詳解RocketMQ消息存儲原理
本文基于RocketMQ 4.6.0進行源碼分析
一. 存儲概要設計
RocketMQ存儲的文件主要包括CommitLog文件、ConsumeQueue文件、Index文件。RocketMQ將所有topic的消息存儲在同一個文件中,確保消息發(fā)送時按順序寫文件,盡最大的能力確保消息發(fā)送的高性能與高吞吐量。因為消息中間件一般是基于消息主題的訂閱機制,所以給按照消息主題檢索消息帶來了極大的不便。為了提高消息消費的效率,RocketMQ引入了ConsumeQueue消息消費隊列文件,每個topic包含多個消息消費隊列,每一個消息隊列有一個消息文件。Index索引文件的設計理念是為了加速消息的檢索性能,根據(jù)消息的屬性從CommitLog文件中快速檢索消息。

-
CommitLog :消息存儲文件,所有topic的消息都存儲在CommitLog 文件中。
-
ConsumeQueue :消息消費隊列,消息到達CommitLog 文件后,將異步轉發(fā)到消息消費隊列,供消息消費者消費。
-
IndexFile :消息索引文件,主要存儲消息Key 與Offset 的對應關系。
1.1 CommitLog
RocketMQ 在消息寫入過程中追求極致的磁盤順序寫,所有topic的消息全部寫入一個文件,即 CommitLog 文件。所有消息按抵達順序依次
追加到 CommitLog 文件中,消息一旦寫入,不支持修改。CommitLog 文件默認創(chuàng)建的大小為 1GB。

一個文件為 1GB 大小,也即 1024 * 1024 * 1024 = 1073741824 字節(jié),CommitLog 每個文件的命名是按照總的字節(jié)偏移量來命名的。例如第一個文件偏移量為 0,那么它的名字為 00000000000000000000;當前這 1G 文件被存儲滿了之后,就會創(chuàng)建下一個文件,下一個文件的偏移量則為 1GB,那么它的名字為 00000000001073741824,以此類推。

默認情況下這些消息文件位于 $HOME/store/commitlog 目錄下,如下圖所示:

基于文件編程與基于內存編程一個很大的不同是基于內存編程時我們有現(xiàn)成的數(shù)據(jù)結構,例如List、HashMap,對數(shù)據(jù)的讀寫非常方便,那么一條一條消息存入CommitLog文件后,該如何查找呢?
正如關系型數(shù)據(jù)庫會為每條數(shù)據(jù)引入一個ID字段,基于文件編程也會為每條消息引入一個身份標志:消息物理偏移量,即消息存儲在文件的起始位置。
正是有了物理偏移量的概念,CommitLog文件的命名方式也是極具技巧性,使用存儲在該文件的第一條消息在整個CommitLog文件組中的偏移量來命名,例如第一個CommitLog文件為0000000000000000000,第二個CommitLog文件為00000000001073741824,依次類推。
這樣做的好處是給出任意一個消息的物理偏移量,可以通過二分法進行查找,快速定位這個文件的位置,然后用消息物理偏移量減去所在文件的名稱,得到的差值就是在該文件中的絕對地址。
1.2 ConsumeQueue
CommitlLog文件的設計理念是追求極致的消息存儲性能,但我們知道消息消費模型是基于主題訂閱機制的,即一個消費組是消費特定主題的消息。根據(jù)主題從CommitlLog文件中檢索消息,這絕不是一個好主意,這樣只能從文件的第一條消息逐條檢索,其性能可想而知,為了解決基于topic的消息檢索問題,RocketMQ引入了ConsumeQueue文件。ConsumeQueue文件結構入下圖:

ConsumeQueue文件是消息消費隊列文件,是 CommitLog 文件基于topic的索引文件,主要用于消費者根據(jù) topic 消費消息,其組織方式為 /topic/queue,同一個隊列中存在多個消息文件。ConsumeQueue 的設計極具技巧,每個條目長度固定(8字節(jié)CommitLog物理偏移量、4字節(jié)消息長度、8字節(jié)tag哈希碼)。這里不是存儲tag的原始字符串,而是存儲哈希碼,目的是確保每個條目的長度固定,可以使用訪問類似數(shù)組下標的方式快速定位條目,極大地提高了ConsumeQueue文件的讀取性能。消息消費者根據(jù)topic、消息消費進度(ConsumeQueue邏輯偏移量),即第幾個ConsumeQueue條目,這樣的消費進度去訪問消息,通過邏輯偏移量logicOffset×20,即可找到該條目的起始偏移量(ConsumeQueue文件中的偏移量),然后讀取該偏移量后20個字節(jié)即可得到一個條目,無須遍歷ConsumeQueue文件。
ConsumeQueue文件可以看作基于topic維度的CommitLog索引文件,故ConsumeQueue文件夾的組織方式為topic/queue/file三層組織結構,文件存儲在 $HOME/store/consumequeue/{topic}/{queueId}/{fileName},單個文件由30萬個條目組成,每個文件大小約5.72MB。同樣的單個ConsumeQueue文件寫滿后,會繼續(xù)寫入下一個文件中。
1.3 Index
RocketMQ與Kafka相比具有一個強大的優(yōu)勢,就是支持按消息屬性檢索消息,引入ConsumeQueue文件解決了基于topic查找消息的問題,但如果想基于消息的某一個屬性進行查找,ConsumeQueue文件就無能為力了。故RocketMQ又引入了Index索引文件,實現(xiàn)基于文件的哈希索引。Index文件的存儲結構如下圖所示。

Index文件基于物理磁盤文件實現(xiàn)哈希索引。Index文件由40字節(jié)的文件頭、500萬個哈希槽、2000萬個Index條目組成,每個哈希槽4字節(jié)、每個Index條目含有20個字節(jié),分別為4字節(jié)索引key的哈希碼、8字節(jié)消息物理偏移量、4字節(jié)時間戳、4字節(jié)的前一個Index條目(哈希沖突的鏈表結構)。
1.4 內存映射
雖然基于磁盤的順序寫消息可以極大提高I/O的寫效率,但如果基于文件的存儲采用常規(guī)的Java文件操作API,例如FileOutputStream等,將性能提升會很有限,故RocketMQ又引入了內存映射,將磁盤文件映射到內存中,以操作內存的方式操作磁盤,將性能又提升了一個檔次。
在Java中可通過FileChannel的map方法創(chuàng)建內存映射文件。在Linux服務器中由該方法創(chuàng)建的文件使用的就是操作系統(tǒng)的頁緩存(pagecache)。Linux操作系統(tǒng)中內存使用策略時會盡可能地利用機器的物理內存,并常駐內存中,即頁緩存。在操作系統(tǒng)的內存不夠的情況下,采用緩存置換算法,例如LRU將不常用的頁緩存回收,即操作系統(tǒng)會自動管理這部分內存。
如果RocketMQ Broker進程異常退出,存儲在頁緩存中的數(shù)據(jù)并不會丟失,操作系統(tǒng)會定時將頁緩存中的數(shù)據(jù)持久化到磁盤,實現(xiàn)數(shù)據(jù)安全可靠。不過如果是機器斷電等異常情況,存儲在頁緩存中的數(shù)據(jù)也有可能丟失。
1.5 TransientStorePool機制
RocketMQ 中的 TransientStorePool 機制是一種優(yōu)化磁盤寫入性能的技術,主要應用于異步刷盤場景。這種機制主要是通過預先在堆外內存(Direct Memory)中分配一塊固定大小的內存區(qū)域,然后將消息數(shù)據(jù)首先寫入堆外內存,再由單獨的線程負責把堆外內存中的數(shù)據(jù)批量地、按頁對齊的方式寫入到 MappedFile 中(CommitRealTimeService線程),也就是說無需每次寫入數(shù)據(jù)時都進行系統(tǒng)調用從而提高寫入效率。
以下是 RocketMQ 使用 TransientStorePool 的主要流程:
- 預先在堆外內存中創(chuàng)建一個內存池(即 TransientStorePool),并初始化為一段連續(xù)的內存空間。
- 當生產者發(fā)送消息時,RocketMQ 先將消息寫入到 TransientStorePool 中的堆外內存里。
- 刷盤線程定時或者達到一定數(shù)量的消息后,將堆外內存中的數(shù)據(jù)按頁對齊的方式,批量寫入到 MappedFile(MappedByteBuffer)中。
- 最后,再由 MappedByteBuffer 進行真正的磁盤刷盤操作。
有了 TransientStorePool 的存在,消息可以批量寫入內存緩沖區(qū),RocketMQ 也就可以有效地控制何時以及如何將臟頁(Dirty Page,即已修改但還未寫入磁盤的內存頁)刷寫到磁盤,避免了操作系統(tǒng)自動進行的隨機性、不可預測的臟頁刷寫操作,從而提升了I/O性能,特別是在大量寫入請求的場景下。
值得一提的是,使用TransientStorePool并非沒有代價。因為需要額外的一次內存復制操作,即從堆外內存復制到內存映射區(qū)域。但是在大多數(shù)情況下,通過控制臟頁刷寫帶來的性能提升,相比于增加的內存復制開銷,更加明顯。
并且開啟 transientStorePool 機制后,由于消息數(shù)據(jù)會先寫入堆外內存,然后由特定后臺線程(CommitRealTimeService),將堆外內存中的修改 commit 到內存映射區(qū)域,而這一步如果發(fā)生斷電、服務宕機,都會產生消息丟失。而普通的異步刷盤,由于消息是直接寫入內存映射區(qū)域,所以服務宕機并不會丟失數(shù)據(jù),只有在服務器突然斷電時才會丟失少量數(shù)據(jù)。
1.6 刷盤策略
有了順序寫和內存映射的加持,RocketMQ的寫入性能得到了極大的保證,但凡事都有利弊,引入了內存映射和頁緩存機制,消息會先寫入頁緩存,此時消息并沒有真正持久化到磁盤。那么Broker收到客戶端的消息后,是存儲到頁緩存中就直接返回成功,還是要持久化到磁盤中才返回成功呢?
這是一個“艱難”的選擇,是在性能與消息可靠性方面進行權衡。為此,RocketMQ提供了三種策略:同步刷盤、異步刷盤、異步刷盤+緩沖區(qū)。
| 類型 | 描述 |
|---|---|
| SYNC_FLUSH | 同步刷盤 |
| ASYNC_FLUSH && transientStorePoolEnable=false(默認為false) | 異步刷盤 |
| ASYNC_FLUSH && transientStorePoolEnable=true | 異步刷盤+緩沖區(qū) |
- 同步刷盤時,只有消息被真正持久化到磁盤才會響應ACK,可靠性非常高,但是性能會受到較大影響,適用于金融業(yè)務。
- 異步刷盤時,消息寫入PageCache就會響應ACK,然后由后臺線程異步將PageCache里的內容持久化到磁盤,降低了讀寫延遲,提高了性能和吞吐量。服務宕機消息不丟失(操作系統(tǒng)會完成內存映射區(qū)域的刷盤),機器斷電少量消息丟失。
- 異步刷盤+緩沖區(qū),消息先寫入直接內存緩沖區(qū),然后由后臺線程異步將緩沖區(qū)里的內容持久化到磁盤,性能最好。但是最不可靠,服務宕機和機器斷電都會丟失消息。
1.7 文件恢復機制
我們知道,RocketMQ主要的數(shù)據(jù)存儲文件包括CommitLog、ConsumeQueue和Index,而ConsumeQueue、Index文件是根據(jù)CommitLog文件異步構建的。既然是異步操作,這兩者之間的數(shù)據(jù)就不可能始終保持一致,那么,重啟broker時需要如何恢復數(shù)據(jù)呢?我們考慮如下異常場景。
-
消息采用同步刷盤方式寫入CommitLog文件,準備轉發(fā)給ConsumeQueue文件時由于斷電等異常,導致存儲失敗。
-
在刷盤的時候,突然記錄了100MB消息,準備將這100MB消息寫入磁盤,由于機器突然斷電,只寫入50MB消息到CommitLog文件。
-
在RocketMQ存儲目錄下有一個檢查點(Checkpoint)文件,用于記錄CommitLog等文件的刷盤點。但將數(shù)據(jù)寫入CommitLog文件后才會將刷盤點記錄到檢查點文件中,有可能在從刷盤點寫入檢查點文件前數(shù)據(jù)就丟失了。
在RocketMQ中有broker異常停止恢復和正常停止恢復兩種場景。這兩種場景的區(qū)別是定位從哪個文件開始恢復的邏輯不一樣,大致思路如下。
- 嘗試恢復ConsumeQueue文件,根據(jù)文件的存儲格式(8字節(jié)物理偏移量、4字節(jié)長度、8字節(jié)tag哈希碼),找到最后一條完整的消息格式所對應的物理偏移量,用maxPhysical OfConsumequeue表示。
- 嘗試恢復CommitLog文件,先通過文件的魔數(shù)判斷該文件是否為ComitLog文件,然后按照消息的存儲格式尋找最后一條合格的消息,拿到其物理偏移量,如果CommitLog文件的有效偏移量小于ConsumeQueue文件存儲的最大物理偏移量,將會刪除ConsumeQueue中多余的內容,如果大于,說明ConsuemQueue文件存儲的內容少于CommitLog文件,則會重推數(shù)據(jù)。
那么如何定位要恢復的文件呢?
正常停止刷盤的情況下,先從倒數(shù)第三個文件開始進行恢復,然后按照消息的存儲格式進行查找,如果該文件中所有的消息都符合消息存儲格式,則繼續(xù)查找下一個文件,直到找到最后一條消息所在的位置。
異常停止刷盤的情況下,RocketMQ會借助檢查點文件,即存儲的刷盤點,定位恢復的文件。刷盤點記錄的是CommitLog、ConsuemQueue、Index文件最后的刷盤時間戳,但并不是只認為該時間戳之前的消息是有效的,超過這個時間戳之后的消息就是不可靠的。
異常停止刷盤時,從最后一個文件開始尋找,在尋找時讀取該文件第一條消息的存儲時間,如果這個存儲時間小于檢查點文件中的刷盤時間,就可以從這個文件開始恢復,如果這個文件中第一條消息的存儲時間大于刷盤點,說明不能從這個文件開始恢復,需要尋找上一個文件,因為檢查點文件中的刷盤點代表的是100%可靠的消息。
二. 存儲文件組織與內存映射實現(xiàn)
RocketMQ通過使用內存映射文件來提高I/O訪問性能,無論是CommitLog、Consume-Queue還是Index,單個文件都被設計為固定長度,一個文件寫滿以后再創(chuàng)建新文件,文件名就為該文件第一條消息對應的全局物理偏移量。
RocketMQ使用MappedFile、MappedFileQueue來封裝存儲文件。
2.1 MappedFileQueue映射文件隊列
MappedFileQueue是MappedFile的管理容器,MappedFileQueue對存儲目錄進行封裝,例如CommitLog文件的存儲路徑為${ROCKET_HOME}/store/commitlog/,該目錄下會存在多個內存映射文件MappedFile。MappedFileQueue結構如下:
/**
* RocketMQ通過使用內存映射文件來提高I/O訪問性能,無論是
* CommitLog、Consume-Queue還是Index,單個文件都被設計為固定長
* 度,一個文件寫滿以后再創(chuàng)建新文件,文件名就為該文件第一條消息
* 對應的全局物理偏移量。
* RocketMQ使用MappedFile、MappedFileQueue來封裝存儲文件。
*
* MappedFileQueue是MappedFile的管理容器,MappedFileQueue對
* 存儲目錄進行封裝,
*
* 例如CommitLog文件的存儲場景下,存儲路徑為${ROCKET_HOME}/store/commitlog/,
* 該目錄下會存在多個內存映射文件MappedFile
*/
public class MappedFileQueue {
private static final int DELETE_FILES_BATCH_MAX = 10;
/**
* 存儲目錄
*/
private final String storePath;
/**
* 單個文件的存儲大小
*/
private final int mappedFileSize;
/**
* MappedFile集合
*/
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
/**
* 創(chuàng)建MappedFile服務類
*/
private final AllocateMappedFileService allocateMappedFileService;
/**
* 當前刷盤指針,表示該指針之前的所有數(shù)據(jù)全部持久化到磁盤
*/
private long flushedWhere = 0;
/**
* 當前數(shù)據(jù)提交指針,內存中ByteBuffer當前的寫指針,該值大于、等于flushedWhere
*/
private long committedWhere = 0;
private volatile long storeTimestamp = 0;
}
2.1.1 根據(jù)消息存儲時間戳查找MappdFile
/**
* 根據(jù)消息存儲時間戳查找MappdFile。從MappedFile列表中第一個
* 文件開始查找,找到第一個最后一次更新時間大于待查找時間戳的文
* 件,如果不存在,則返回最后一個MappedFile
* @param timestamp
* @return
*/
public MappedFile getMappedFileByTime(final long timestamp) {
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return null;
for (int i = 0; i < mfs.length; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
return mappedFile;
}
}
return (MappedFile) mfs[mfs.length - 1];
}
根據(jù)消息存儲時間戳查找MappdFile。從MappedFile列表中第一個文件開始查找,找到第一個最后一次更新時間大于待查找時間戳的文件,如果不存在,則返回最后一個MappedFile。
2.1.2 根據(jù)消息偏移量offset查找MappedFile、
// org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
try {
MappedFile firstMappedFile = this.getFirstMappedFile();
MappedFile lastMappedFile = this.getLastMappedFile();
if (firstMappedFile != null && lastMappedFile != null) {
if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
offset,
firstMappedFile.getFileFromOffset(),
lastMappedFile.getFileFromOffset() + this.mappedFileSize,
this.mappedFileSize,
this.mappedFiles.size());
} else {
// 計算文件索引
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
MappedFile targetFile = null;
try {
targetFile = this.mappedFiles.get(index);
} catch (Exception ignored) {
}
if (targetFile != null && offset >= targetFile.getFileFromOffset()
&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
return targetFile;
}
for (MappedFile tmpMappedFile : this.mappedFiles) {
if (offset >= tmpMappedFile.getFileFromOffset()
&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
return tmpMappedFile;
}
}
}
if (returnFirstOnNotFound) {
return firstMappedFile;
}
}
} catch (Exception e) {
log.error("findMappedFileByOffset Exception", e);
}
return null;
}
根據(jù)消息偏移量offset查找MappedFile,但是不能直接使用 offset%mappedFileSize。這是因為使用了內存映射,只要是存在于存儲目錄下的文件,都需要對應創(chuàng)建內存映射文件,如果不定期進行將已消費的消息從存儲文件中刪除,會造成極大的內存壓力與資源浪費,所以RocketMQ采取定時刪除存儲文件的策略。也就是說,在存儲文件中,第一個文件不一定是00000000000000000000,因為該文件在某一時刻會被刪除,所以根據(jù)offset定位MappedFile的算法為(int)((offset/this.mappedFileSize)(mappedFile.getFileFromOffset()/this.MappedFileSize))
2.2 MappedFile內存映射文件
MappedFile是RocketMQ內存映射文件的具體實現(xiàn)。核心屬性有:
public class MappedFile extends ReferenceResource {
/**
* 操作系統(tǒng)頁大小
*/
public static final int OS_PAGE_SIZE = 1024 * 4;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
/**
* 當前JVM實例中MappedFile的虛擬內存。
*/
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
/**
* 當前JVM實例中MappedFile對象個數(shù)。
*/
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
/**
* 當前文件的寫指針,從0開始(內存映射文件中的寫指針)。
*/
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
/**
* 當前文件的提交指針,如果開啟transientStorePool,則數(shù)據(jù)會存儲在
* TransientStorePool中,然后提交到內存映射ByteBuffer中,再寫入磁盤。
*/
protected final AtomicInteger committedPosition = new AtomicInteger(0);
/**
* 將該指針之前的數(shù)據(jù)持久化存儲到磁盤中。
*/
private final AtomicInteger flushedPosition = new AtomicInteger(0);
protected int fileSize;
/**
* 文件通道
*/
protected FileChannel fileChannel;
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
/**
* 堆外內存ByteBuffer,如果不為空,數(shù)據(jù)首先將存儲在該Buffer中,然后提交到MappedFile創(chuàng)建的
* FileChannel中。transientStorePoolEnable為true時不為空。
*/
protected ByteBuffer writeBuffer = null;
/**
* 堆外內存池,該內存池中的內存會提供內存鎖機制。transientStorePoolEnable為true時啟用。
*/
protected TransientStorePool transientStorePool = null;
/**
* 文件名稱
*/
private String fileName;
/**
* 該文件的初始偏移量
*/
private long fileFromOffset;
/**
* 物理文件
*/
private File file;
/**
* 物理文件對應的內存映射Buffer。(內存映射對象,對其進行數(shù)據(jù)寫入,會由操作系統(tǒng)同步至磁盤)
*/
private MappedByteBuffer mappedByteBuffer;
/**
* 文件最后一次寫入內容的時間
*/
private volatile long storeTimestamp = 0;
/**
* 是否是MappedFileQueue隊列中第一個文件。
*/
private boolean firstCreateInQueue = false;
}
其中 MappedByteBuffer 就是我們之前所說的內存映射對象,當你向 MappedByteBuffer 寫入數(shù)據(jù)時,這些改動會直接寫入到磁盤上。如果你修改 MappedByteBuffer 中的內容,改動對所有訪問同一文件的其他映射文件也是可見的。從API角度來看使用MappedByteBuffer操作文件可以像操作內存中的一個連續(xù)區(qū)域一樣,修改內存中的數(shù)據(jù),會由操作系統(tǒng)同步至磁盤。
內存映射機制可以參考:《Java中使用內存映射操作文件》
2.2.1 MappedFile初始化
第一步:根據(jù)是否開啟transientStorePoolEnable存在兩種初始化情況。transientStorePool-Enable為true表示內容先存儲在堆外內存,然后通過Commit線程將數(shù)據(jù)提交到FileChannel中,再通過Flush線程將數(shù)據(jù)持久化到磁盤中:
public void init(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool;
}
第二步:初始化fileFromOffset為文件名,也就是文件名代表該文件的起始偏移量,通過RandomAccessFile創(chuàng)建讀寫文件通道,并將文件內容使用NIO的內存映射Buffer將文件映射到內存中
//org.apache.rocketmq.store.MappedFile#init(java.lang.String, int)
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
ensureDirOK(this.file.getParent());
try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
// 創(chuàng)建內存映射 MappedByteBuffer
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("Failed to create file " + this.fileName, e);
throw e;
} catch (IOException e) {
log.error("Failed to map file " + this.fileName, e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
2.2.2 MappedFile提交
內存映射文件的提交動作由MappedFile的commit()方法實現(xiàn):
/**
* 內存映射文件的提交動作由MappedFile的commit()方法實現(xiàn)
* @param commitLeastPages 本次提交的最小頁數(shù),如果待提交數(shù)據(jù)不滿足commitLeastPages,則不執(zhí)行本次提交操作,等待下次提交
* @return
*/
public int commit(final int commitLeastPages) {
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return this.wrotePosition.get();
}
if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
commit0(commitLeastPages);
this.release();
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
}
}
// All dirty data has been committed to FileChannel.
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
this.transientStorePool.returnBuffer(writeBuffer);
this.writeBuffer = null;
}
return this.committedPosition.get();
}
isAbleToCommit 方法校驗本次提交的最小頁數(shù),如果待提交數(shù)據(jù)不滿足commitLeastPages,則不執(zhí)行本次提交操作,等待下次提交:
//org.apache.rocketmq.store.MappedFile#isAbleToCommit
/**
* 判斷是否執(zhí)行commit操作。如果文件已滿,返回true。如果
* commitLeastPages大于0,則計算wrotePosition(當前writeBuffe的
* 寫指針)與上一次提交的指針(committedPosition)的差值,將其除
* 以OS_PAGE_SIZE得到當前臟頁的數(shù)量,如果大于commitLeastPages,
* 則返回true。如果commitLeastPages小于0,表示只要存在臟頁就提交
* @param commitLeastPages
* @return
*/
protected boolean isAbleToCommit(final int commitLeastPages) {
int flush = this.committedPosition.get();
int write = this.wrotePosition.get();
if (this.isFull()) {
return true;
}
if (commitLeastPages > 0) {
return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
}
return write > flush;
}
有了 TransientStorePool 的存在,消息可以批量寫入堆外內存緩沖區(qū),RocketMQ 也就可以有效地控制何時以及如何將臟頁(Dirty Page,即已修改但還未寫入磁盤的內存頁)刷寫到磁盤,避免了操作系統(tǒng)自動進行的隨機性、不可預測的臟頁刷寫操作,從而提升了I/O性能,特別是在大量寫入請求的場景下。但是引入此種機制會帶來新的問題,前文已經(jīng)詳細介紹過了,此處不再贅述。
2.2.3 MappedFile刷盤
刷盤指的是將內存中的數(shù)據(jù)寫入磁盤,永久存儲在磁盤中,由MappedFile的flush()方法實現(xiàn):
// org.apache.rocketmq.store.MappedFile#flush
public int flush(final int flushLeastPages) {
if (this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
int value = getReadPosition();
try {
//We only append data to fileChannel or mappedByteBuffer, never both.
if (writeBuffer != null || this.fileChannel.position() != 0) {
this.fileChannel.force(false);
} else {
// 直接調用mappedByteBuffer或fileChannel的force()方法將數(shù)據(jù)
// 寫入磁盤,將內存中的數(shù)據(jù)持久化到磁盤中,
this.mappedByteBuffer.force();
}
} catch (Throwable e) {
log.error("Error occurred when force data to disk.", e);
}
this.flushedPosition.set(value);
this.release();
} else {
log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
}
}
return this.getFlushedPosition();
}
2.2.4 TransientStorePool
TransientStorePool即短暫的存儲池。RocketMQ單獨創(chuàng)建了一個DirectByteBuffer內存緩存池,用來臨時存儲數(shù)據(jù),數(shù)據(jù)先寫入該內存映射中,然后由Commit線程定時將數(shù)據(jù)從該內存復制到與目標物理文件對應的內存映射中。RokcetMQ引入該機制是為了提供一種內存鎖定,將當前堆外內存一直鎖定在內存中,避免被進程將內存交換到磁盤中。
TransientStorePool核心屬性如下:
public class TransientStorePool {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
/**
* avaliableBuffers個數(shù),可在broker配置文件中通過transient StorePoolSize進行設置,默認為5
*/
private final int poolSize;
/**
* 每個ByteBuffer的大小,默認為mapedFileSizeCommitLog,表明TransientStorePool為CommitLog文件服務。
*/
private final int fileSize;
/**
* ByteBuffer容器,雙端隊列。
*/
private final Deque<ByteBuffer> availableBuffers;
private final MessageStoreConfig storeConfig;
}
2.2.5 內存映射整體流程

- 內存映射文件MappedFile通過AllocateMappedFileService創(chuàng)建
- MappedFile的創(chuàng)建是典型的生產者-消費者模型
- MappedFileQueue調用getLastMappedFile獲取MappedFile時,將請求放入隊列中
- AllocateMappedFileService線程持續(xù)監(jiān)聽隊列,隊列有請求時,創(chuàng)建出MappedFile對象
- 最后將MappedFile對象預熱,底層調用force方法和mlock方法
三. 消息存儲整體流程
3.1 ConmmitLog存儲流程
消息存儲入口為org.apache.rocketmq.store.DefaultMessageStore#putMessage。
// org.apache.rocketmq.store.DefaultMessageStore#putMessage
// 如果當前broker停止工作或當前不支持寫入,則拒絕消息寫入。
if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden");
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
// 如果當前broker是從節(jié)點,則拒絕寫入消息
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is slave mode, so putMessage is forbidden ");
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
// 判斷當前broker是否能夠寫入
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {
this.printTimes.set(0);
}
// topic長度大于127字符,則報錯
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
// 消息屬性長度大于 32767,則報錯
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
// OSPageCacheBusy通常是出現(xiàn)在操作系統(tǒng)在試圖緩存太多頁面時。當物理內存充滿了,操作系統(tǒng)可能試圖清除一些頁面來為新的頁面騰出空間。
// 如果這個過程中所有的頁面都在使用(即“繁忙”),那么就會報告OSPageCacheBusy。
if (this.isOSPageCacheBusy()) {
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
第一步:先對broker和消息進行基礎校驗。如果當前broker停止工作或當前不支持寫入,則拒絕消息寫入。如果消息主題長度超過127個字符、消息屬性長度超過32767個字符,同樣拒絕該消息寫入。如果日志中出現(xiàn)“message store is not writeable, so putMessage is forbidden”提示,最有可能是因為磁盤空間不足,在寫入ConsumeQueue、Index文件出現(xiàn)錯誤時會拒絕消息再次寫入。
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
// 記錄消息寫入時間
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
// 如果消息的延遲級別大于0,將消息的原主題名稱與原消息隊列ID存入消息屬性中,用延遲消息主題SCHEDULE_TOPIC_XXXX、消
//息隊列ID更新原先消息的主題與隊列,這是并發(fā)消息重試關鍵的異步
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
msg.setBornHostV6Flag();
}
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag();
}
long eclipsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
// 獲取當前可以寫入的CommitLog文件,對應 ${ROCKET_HOME}/store/commitlog 文件夾下的文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 在將消息寫入CommitLog之前,先申請putMessageLock
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
//如果mappedFile為空,表明 ${ROCKET_HOME}/store/commitlog目錄下不存在任何文件,說明本次
//消息是第一次發(fā)送,用偏移量0創(chuàng)建第一個CommitLog文件,文件名為00000000000000000000,
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
// 如果文件創(chuàng)建失敗,拋出CREATE_MAPEDFILE_FAILED,這很有可能是磁盤空間不足或權限不夠導致的,
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
// 將消息追加到 CommitLog 中
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
// 當前CommitLog文件不夠寫入此條消息 (CommitLog定長1GB)
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
eclipsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
// 追加完畢,釋放鎖
putMessageLock.unlock();
}
if (eclipsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipsedTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
// appendMessage只是將消息追加到內存中,需要根據(jù)采取的是同步刷盤方式還是異步刷盤方式,將內存中的數(shù)據(jù)持久化到磁盤中。
handleDiskFlush(result, putMessageResult, msg);
// HA主從同步復制
handleHA(result, putMessageResult, msg);
return putMessageResult;
}
第二步:如果消息的延遲級別大于0,將消息的原主題名稱與原消息隊列ID存入消息屬性中,用延遲消息主題SCHEDULE_TOPIC_XXXX、消息隊列ID更新原先消息的主題與隊列。這是并發(fā)消息消費重試關鍵的一步,第5章會重點探討消息重試機制與定時消息的實現(xiàn)原理。
第三步:獲取當前可以寫入的CommitLog文件。CommitLog文件的存儲目錄為 ${ROCKET_HOME}/store/commitlog,每個文件默認1GB,一個文件寫滿后再創(chuàng)建另一個,以該文件中第一個偏移量為文件名,如果偏移量少于20位則用0補齊。第一個文件初始偏移量為0,第二個文件名中的“1073741824”代表該文件第一條消息的物理偏移量為1073741824,這樣根據(jù)物理偏移量可以快速定位到消息。MappedFileQueue可以看作${ROCKET_HOME}/store/commitlog文件夾,而MappedFile則對應該文件夾下的文件。
第四步:在將消息寫入CommitLog之前,先申請putMessageLock
第五步:設置消息的存儲時間,如果mappedFile為空,表明${ROCKET_HOME}/store/commitlog目錄下不存在任何文件,說明本次消息是第一次發(fā)送,用偏移量0創(chuàng)建第一個CommitLog文件,文件名為00000000000000000000,如果文件創(chuàng)建失敗,拋出CREATE_MAPEDFILE_FAILED,這很有可能是磁盤空間不足或權限不夠導致的。
第六步:將消息追加到MappedFile中。首先獲取MappedFile當前的寫指針,如果currentPos大于或等于文件大小,表明文件已寫滿,拋出AppendMessageStatus.UNKNOWN_ERROR。如果currentPos小于文件大小,通過slice()方法創(chuàng)建一個與原ByteBuffer共享的內存區(qū),且擁有獨立的position、limit、capacity等指針,并設置position為當前指針:
// org.apache.rocketmq.store.MappedFile#appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
// 首先獲取MappedFile當前的寫指針,如果currentPos大于或等于文件大小,表明文件已寫滿
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
// 通過slice()方法創(chuàng)建一個與原ByteBuffer共享的內存區(qū),且擁有獨立的position、limit、capacity等指針(零拷貝)
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
// 并設置position為當前指針
byteBuffer.position(currentPos);
AppendMessageResult result;
// 執(zhí)行寫入操作,將消息內容存儲到ByteBuffer中,這里只是將消息存儲在MappedFile對應的內存映射Buffer中,并沒有寫入磁盤
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
第七步:創(chuàng)建全局唯一消息ID,消息ID有16字節(jié)。

// org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)
// 創(chuàng)建全局唯一的消息ID
if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
} else {
msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
}
為了消息ID具備可讀性,返回給應用程序的msgId為字符類型,可以通過UtilAll. bytes2string方法將msgId字節(jié)數(shù)組轉換成字符串,通過UtilAll.string2bytes方法將msgId字符串還原成16字節(jié)的數(shù)組,根據(jù)提取的消息物理偏移量,可以快速通過msgId找到消息內容。
// org.apache.rocketmq.common.message.MessageDecoder#createMessageId(java.nio.ByteBuffer, java.nio.ByteBuffer, long)
public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
input.flip();
int msgIDLength = addr.limit() == 8 ? 16 : 28;
input.limit(msgIDLength);
input.put(addr);
input.putLong(offset);
return UtilAll.bytes2string(input.array());
}
第九步:根據(jù)消息體、主題和屬性的長度,結合消息存儲格式,計算消息的總長度,RocketMQ消息存儲格式如下:

- TOTALSIZE:消息條目總長度,4字節(jié)。
- MAGICCODE:魔數(shù),4字節(jié)。固定值0xdaa320a7。
- BODYCRC:消息體的crc校驗碼,4字節(jié)。
- QUEUEID:消息消費隊列ID,4字節(jié)。
- FLAG:消息標記,RocketMQ對其不做處理,供應用程序使用,默認4字節(jié)。
- QUEUEOFFSET:消息在ConsumeQuene文件中的物理偏移量,8字節(jié)。
- PHYSICALOFFSET:消息在CommitLog文件中的物理偏移量,8字節(jié)。
- SYSFLAG:消息系統(tǒng)標記,例如是否壓縮、是否是事務消息等,4字節(jié)。
- BORNTIMESTAMP:消息生產者調用消息發(fā)送API的時間戳,8字節(jié)。
- BORNHOST:消息發(fā)送者IP、端口號,8字節(jié)。
- STORETIMESTAMP:消息存儲時間戳,8字節(jié)。
- STOREHOSTADDRESS:Broker服務器IP+端口號,8字節(jié)。
- RECONSUMETIMES:消息重試次數(shù),4字節(jié)。
- Prepared Transaction Offset:事務消息的物理偏移量,8字節(jié)。
- BodyLength:消息體長度,4字節(jié)。
- Body:消息體內容,長度為bodyLenth中存儲的值。
- TopicLength:主題存儲長度,1字節(jié),表示主題名稱不能超過255個字符。
- Topic:主題,長度為TopicLength中存儲的值。
- PropertiesLength:消息屬性長度,2字節(jié),表示消息屬性長度不能超過65536個字符。
- Properties:消息屬性,長度為PropertiesLength中存儲的值。
計算消息長度的代碼:
//org.apache.rocketmq.store.CommitLog#calMsgLength
/**
* CommitLog條目是不定長的,每一個條目的長度存儲在前4個字節(jié)中。
* @param sysFlag
* @param bodyLength 消息body長度
* @param topicLength topic長度
* @param propertiesLength properties長度
* @return
*/
protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
final int msgLen = 4 //TOTALSIZE 消息頭部4字節(jié),記錄消息條目總長度
+ 4 //MAGICCODE
+ 4 //BODYCRC 消息體的crc校驗碼
+ 4 //QUEUEID 消息消費隊列ID
+ 4 //FLAG 消息標記,RocketMQ對其不做處理,供應用程序使用, 默認4字節(jié)。
+ 8 //QUEUEOFFSET 消息在ConsumeQuene文件中的物理偏移量,8字節(jié)
+ 8 //PHYSICALOFFSET 消息在CommitLog文件中的物理偏移量,8字 節(jié)。
+ 4 //SYSFLAG 消息系統(tǒng)標記,例如是否壓縮、是否是事務消息 等,4字節(jié)。
+ 8 //BORNTIMESTAMP 消息生產者調用消息發(fā)送API的時間戳,8字 節(jié)。
+ bornhostLength //BORNHOST 消息發(fā)送者IP、端口號,8字節(jié)。
+ 8 //STORETIMESTAMP 消息存儲時間戳,8字節(jié)。
+ storehostAddressLength //STOREHOSTADDRESS Broker服務器IP+端口號,8字節(jié)。
+ 4 //RECONSUMETIMES 消息重試次數(shù),4字節(jié)。
+ 8 //Prepared Transaction Offset 事務消息的物理偏移量,8字節(jié)
+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY 消息體長度,4字節(jié) + 消息體內容,長度為bodyLenth中存儲的值。
+ 1 + topicLength //TOPIC 主題存儲長度,1字節(jié),表示主題名稱不能超 過255個字符 + Topic內容,長度為 topicLength
+ 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength 消息屬性長度,2字節(jié),表示消息屬性長度不能超過65536個字符 + 消息屬性內容
+ 0;
return msgLen;
}
第十步:如果消息長度+END_FILE_MIN_BLANK_LENGTH大于CommitLog文件的空閑空間,則返回AppendMessageStatus.END_OF_FILE,Broker會創(chuàng)建一個新的CommitLog文件來存儲該消息。從這里可以看出,每個CommitLog文件最少空閑8字節(jié),高4字節(jié)存儲當前文件的剩余空間,低4字節(jié)存儲魔數(shù)CommitLog.BLANK_MAGIC_CODE:
// 如果消息長度+END_FILE_MIN_BLANK_LENGTH大于CommitLog文件的空閑空間,
// 則返回AppendMessageStatus.END_OF_FILE,Broker會創(chuàng)建一個新的CommitLog文件來存儲該消息。
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
第十一步:將消息內容存儲到ByteBuffer中,然后創(chuàng)建AppendMessageResult。這里只是將消息存儲在MappedFile對應的內存映射Buffer中,并沒有寫入磁盤:
// org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
// 將消息內容存儲到ByteBuffer中,然后創(chuàng)建AppendMessageResult。這里只是將消息存儲在MappedFile對應的內存映射Buffer中,并沒有寫入磁盤
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
AppendMessageResult 結構如下:
public class AppendMessageResult {
// Return code
/**
* 消息追加結果。取值為PUT_OK則代表追加成功、
* END_OF_FILE則代表超過文件大小、
* MESSAGE_SIZE_EXCEEDED則代表消息長度超過最大允許長度、
* PROPERTIES_SIZE_EXCEEDED則代表消息屬性超過最大允許長度、
* UNKNOWN_ERROR則代表未知異常。
*/
private AppendMessageStatus status;
// Where to start writing
/**
* 消息的物理偏移量。
*/
private long wroteOffset;
// Write Bytes
private int wroteBytes;
// 消息ID
private String msgId;
// Message storage timestamp
/**
* 消息存儲時間戳
*/
private long storeTimestamp;
// Consume queue's offset(step by one)
/**
* 消息消費隊列的邏輯偏移量,類似于數(shù)組下標
*/
private long logicsOffset;
/**
* 寫入頁緩存的響應時間
*/
private long pagecacheRT = 0;
/**
* 批量發(fā)送消息時的消息條數(shù)
*/
private int msgNum = 1;
}
第十二步:更新消息隊列的邏輯偏移量
第十三步:處理完消息追加邏輯后將釋放putMessageLock。
第十四步:DefaultAppendMessageCallback#doAppend只是將消息追加到內存中,需要根據(jù)采取的是同步刷盤方式還是異步刷盤方式,將內存中的數(shù)據(jù)持久化到磁盤中,后文會詳細介紹刷盤操作。
// org.apache.rocketmq.store.CommitLog#putMessage
// appendMessage只是將消息追加到內存中,需要根據(jù)采取的是同步刷盤方式還是異步刷盤方式,將內存中的數(shù)據(jù)持久化到磁盤中。
handleDiskFlush(result, putMessageResult, msg);
第十五步:然后執(zhí)行HA主從同步復制。
// org.apache.rocketmq.store.CommitLog#putMessage
// HA主從同步復制
handleHA(result, putMessageResult, msg);
3.2 ConsumeQueue、Index消息索引的異步構建
因為ConsumeQueue文件、Index文件都是基于CommitLog文件構建的,所以當消息生產者提交的消息存儲到CommitLog文件中時,ConsumeQueue文件、Index文件需要及時更新,否則消息無法及時被消費,根據(jù)消息屬性查找消息也會出現(xiàn)較大延遲。RocketMQ通過開啟一個線程ReputMessageServcie來準實時轉發(fā)CommitLog文件的更新事件,相應的任務處理器根據(jù)轉發(fā)的消息及時更新ConsumeQueue文件、Index文件。
// org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService
class ReputMessageService extends ServiceThread {
private volatile long reputFromOffset = 0;
/**
* ReputMessageService線程每執(zhí)行一次任務推送,休息1ms后繼續(xù)
* 嘗試推送消息到Consume Queue和Index文件中,消息消費轉發(fā)由
* doReput()方法實現(xiàn)
*/
private void doReput() {
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
// 返回reputFromOffset偏移量開始的全部有效數(shù)據(jù)(CommitLog文件)。然后循環(huán)讀取每一條消息
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// 從result返回的ByteBuffer中循環(huán)讀取消息,一次讀取一條,創(chuàng)建Dispatch Request對象
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
// 執(zhí)行CommitLog轉發(fā)
DefaultMessageStore.this.doDispatch(dispatchRequest);
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
this.reputFromOffset += size;
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.addAndGet(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
if (size > 0) {
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
} else {
doNext = false;
// If user open the dledger pattern or the broker is master node,
// it will not ignore the exception and fix the reputFromOffset variable
if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
}
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
@Override
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
Thread.sleep(1);
// 每休息1ms就執(zhí)行一次消息轉發(fā)
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
}
3.3 整體流程

- Broker端收到消息后,將消息原始信息保存在CommitLog文件對應的MappedFile中,然后異步刷新到磁盤
- ReputMessageServie線程異步的將CommitLog中MappedFile中的消息保存到ConsumerQueue和IndexFile中
- ConsumerQueue和IndexFile只是原始文件的索引信息。
四. 文件創(chuàng)建
當有新的消息到來的時候,其會默認選擇列表中的最后一個文件來進行消息的保存:

當有新的消息到來的時候,其會默認選擇列表中的最后一個文件來進行消息的保存:

org.apache.rocketmq.store.MappedFileQueue:
public class MappedFileQueue {
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;
while (!this.mappedFiles.isEmpty()) {
try {
mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
break;
} catch (IndexOutOfBoundsException e) {
//continue;
} catch (Exception e) {
log.error("getLastMappedFile has exception.", e);
break;
}
}
return mappedFileLast;
}
}
當然如果這個 Broker 之前從未接受過消息的話,那么這個列表肯定是空的。這樣一旦有新的消息需要存儲的時候,其就得需要立即創(chuàng)建一個 MappedFile 文件來存儲消息。
RocketMQ 提供了一個專門用來實例化 MappedFile 文件的服務類 AllocateMappedFileService。在內存中,也同時維護了一張請求表 requestTable 和一個優(yōu)先級請求隊列 requestQueue 。當需要創(chuàng)建文件的時候,Broker 會創(chuàng)建一個 AllocateRequest 對象,其包含了文件的路徑、大小等信息。然后先將其放入 requestTable 表中,再將其放入優(yōu)先級請求隊列 requestQueue 中:
org.apache.rocketmq.store.AllocateMappedFileService#putRequestAndReturnMappedFile
public class AllocateMappedFileService extends ServiceThread {
public MappedFile putRequestAndReturnMappedFile(String nextFilePath,
String nextNextFilePath,
int fileSize) {
// ...
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
if (nextPutOK) {
// ...
boolean offerOK = this.requestQueue.offer(nextReq);
}
}
}
服務類會一直等待優(yōu)先級隊列是否有新的請求到來,如果有,便會從隊列中取出請求,然后創(chuàng)建對應的 MappedFile,并將請求表 requestTable 中 AllocateRequest 對象的字段 mappedFile 設置上值。最后將 AllocateRequest 對象上的 CountDownLatch 的計數(shù)器減 1 ,以標明此分配申請的 MappedFile 已經(jīng)創(chuàng)建完畢了:
org.apache.rocketmq.store.AllocateMappedFileService#mmapOperation
public class AllocateMappedFileService extends ServiceThread {
public void run() {
log.info(this.getServiceName() + " service started");
//會一直嘗試從隊列中獲取請求,從而執(zhí)行創(chuàng)建文件的任務
while (!this.isStopped() && this.mmapOperation()) {
}
log.info(this.getServiceName() + " service end");
}
/**
* Only interrupted by the external thread, will return false
*/
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
// 獲取優(yōu)先隊列中的請求
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
// ...
if (req.getMappedFile() == null) {
long beginTime = System.currentTimeMillis();
MappedFile mappedFile;
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
//創(chuàng)建MappedFile
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
// pre write mappedFile
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
//...
}
req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
this.hasException = true;
return false;
} catch (IOException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.hasException = true;
if (null != req) {
requestQueue.offer(req);
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
}
}
} finally {
if (req != null && isSuccess)
// 文件創(chuàng)建成功,計數(shù)減一。因為創(chuàng)建文件的動作是在獨立的線程中完成的,業(yè)務線程需要等待文件創(chuàng)建完畢
req.getCountDownLatch().countDown();
}
return true;
}
}
等待 MappedFile 創(chuàng)建完畢之后,其便會從請求表 requestTable 中取出并刪除表中記錄:
org.apache.rocketmq.store.AllocateMappedFileService#putRequestAndReturnMappedFile
public class AllocateMappedFileService extends ServiceThread {
public MappedFile putRequestAndReturnMappedFile(String nextFilePath,
String nextNextFilePath,
int fileSize) {
// ...........
//獲取請求
AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
// 等待 MappedFile 的創(chuàng)建完成
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
if (!waitOK) {
// 創(chuàng)建超時
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
//創(chuàng)建成功,則將requestTable中將請求移除
this.requestTable.remove(nextFilePath);
// 返回創(chuàng)建的MappedFiles
return result.getMappedFile();
}
} else {
log.error("find preallocate mmap failed, this never happen");
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
return null;
}
}
創(chuàng)建完成后將其加入列表中:
org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile(long, boolean)
public class MappedFileQueue {
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;
// 嘗試獲取最后一個MappedFile
MappedFile mappedFileLast = getLastMappedFile();
if (mappedFileLast == null) {
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
if (mappedFileLast != null && mappedFileLast.isFull()) {
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
// 第一次啟動未創(chuàng)建MappedFile
if (createOffset != -1 && needCreate) {
// 文件名
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
// 提交一個創(chuàng)建MappedFile的請求
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
//創(chuàng)建成功
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
//加入列表
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
return mappedFileLast;
}
}

至此,MappedFile 已經(jīng)創(chuàng)建完畢,也即可以進行下一步的操作了。
五. 文件初始化
在 MappedFile 的構造函數(shù)中,其使用了 FileChannel 類提供的 map 函數(shù)來將磁盤上的這個文件映射到進程地址空間中。然后當通過 MappedByteBuffer 來讀入或者寫入文件的時候,磁盤上也會有相應的改動。采用這種方式,通常比傳統(tǒng)的基于文件 IO 流的方式讀取效率高。
public class MappedFile extends ReferenceResource {
public MappedFile(final String fileName, final int fileSize)
throws IOException {
init(fileName, fileSize);
}
private void init(final String fileName, final int fileSize)
throws IOException {
// ...
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
// ...
}
}
六. 消息文件加載
前面提到過,Broker 在啟動的時候,會加載磁盤上的文件到一個 mappedFiles 列表中。但是加載完畢后,其還會對這份列表中的消息文件進行驗證 (恢復),確保沒有錯誤。
驗證的基本想法是通過一一讀取列表中的每一個文件,然后再一一讀取每個文件中的每個消息,在讀取的過程中,其會更新整體的消息寫入的偏移量,如下圖中的紅色箭頭 (我們假設最終讀取的消息的總偏移量為 905):

當確定消息整體的偏移量之后,Broker 便會確定每一個單獨的 MappedFile 文件的各自的偏移量,每一個文件的偏移量是通過取余算法確定的:
org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles:
public class MappedFileQueue {
public void truncateDirtyFiles(long offset) {
for (MappedFile file : this.mappedFiles) {
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
if (fileTailOffset > offset) {
if (offset >= file.getFileFromOffset()) {
// 確定每個文件的各自偏移量
file.setWrotePosition((int) (offset % this.mappedFileSize));
file.setCommittedPosition((int) (offset % this.mappedFileSize));
file.setFlushedPosition((int) (offset % this.mappedFileSize));
} else {
// ...
}
}
}
// ...
}
}

在確定每個消息文件各自的寫入位置的同時,其還會刪除起始偏移量大于當前總偏移量的消息文件,這些文件可以視作臟文件,或者也可以說這些文件里面一條消息也沒有。這也是上述文件 1073741824 被打上紅叉的原因:
public void truncateDirtyFiles(long offset) {
List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
for (MappedFile file : this.mappedFiles) {
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
if (fileTailOffset > offset) {
if (offset >= file.getFileFromOffset()) {
// ...
} else {
// 總偏移量 < 文件起始偏移量
// 加入到待刪除列表中
file.destroy(1000);
willRemoveFiles.add(file);
}
}
}
this.deleteExpiredFile(willRemoveFiles);
}
七. 寫入消息
消息寫入口:org.apache.rocketmq.store.CommitLog#putMessage
一旦我們獲取到 MappedFile 文件之后,我們便可以往這個文件里面寫入消息了。寫入消息可能會遇見如下兩種情況,一種是這條消息可以完全追加到這個文件中,另外一種是這條消息完全不能或者只有一小部分能存放到這個文件中,其余的需要放到新的文件中。我們對于這兩種情況分別討論:
7.1 文件可以完全存儲消息
MappedFile 類維護了一個用以標識當前寫位置的指針 wrotePosition,以及一個用來映射文件到進程地址空間的 mappedByteBuffer:
public class MappedFile extends ReferenceResource {
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
private MappedByteBuffer mappedByteBuffer;
}
由這兩個數(shù)據(jù)結構我們可以看出來,單個文件的消息寫入過程其實是非常簡單的。首先獲取到這個文件的寫入位置,然后將消息內容追加到 byteBuffer 中,然后再更新寫入位置。
public class MappedFile extends ReferenceResource {
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
// ...
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer =
writeBuffer != null ?
writeBuffer.slice() :
this.mappedByteBuffer.slice();
// 更新 byteBuffer 位置
byteBuffer.position(currentPos);
// 寫入消息內容
// ...
// 獲取當前需要寫入的消息長度,更新 wrotePosition 指針的位置
this.wrotePosition.addAndGet(result.getWroteBytes());
return result;
}
}
}
示例流程如下所示:

7.2 文件不可以完全存儲消息
在寫入消息之前,如果判斷出文件已經(jīng)滿了的情況下,其會直接嘗試創(chuàng)建一個新的 MappedFile:
public class CommitLog {
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// 文件為空 || 文件已經(jīng)滿了
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
}
// ...
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
}
}
如果文件未滿,那么在寫入之前會先計算出消息體長度 msgLen,然后判斷這個文件剩下的空間是否有能力容納這條消息。在這個地方我們還需要介紹下每條消息的存儲方式。
每條消息的存儲是按照一個 4 字節(jié)的長度來做界限的,這個長度本身就是整個消息體的長度,當讀完這整條消息體的長度之后,下一次再取出來的一個 4 字節(jié)的數(shù)字,便又是下一條消息的長度:

圍繞著一條消息,還會存儲許多其它內容,我們在這里只需要了解前兩位是 4 字節(jié)的總長度和 4 字節(jié)的 MAGICCODE 即可:

MAGICCODE 的可選值有:
CommitLog.MESSAGE_MAGIC_CODECommitLog.BLANK_MAGIC_CODE
當這個文件有能力容納這條消息體的情況下,其便會存儲 MESSAGE_MAGIC_CODE 值;當這個文件沒有能力容納這條消息體的情況下,其便會存儲 BLANK_MAGIC_CODE 值。所以這個 MAGICCODE 是用來界定這是空消息還是一條正常的消息。
當判定這個文件不足以容納整個消息的時候,其將消息體長度設置為這個文件剩余的最大空間長度,將 MAGICCODE 設定為這是一個空消息文件 (需要去下一個文件去讀)。由此我們可以看出消息體長度 和 MAGICCODE 是判別一條消息格式的最基本要求,這也是 END_FILE_MIN_BLANK_LENGTH 的值為 8 的原因:
org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)
public class CommitLog {
class DefaultAppendMessageCallback implements AppendMessageCallback {
// File at the end of the minimum fixed length empty
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
public AppendMessageResult doAppend(final long fileFromOffset,
final ByteBuffer byteBuffer,
final int maxBlank,
final MessageExtBrokerInner msgInner) {
// ...
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
// ...
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE,
/** other params **/ );
}
}
}
}
由上述方法我們看出在這種情況下返回的結果是 END_OF_FILE。當檢測到這種返回結果的時候,CommitLog 接著又會申請創(chuàng)建新的 MappedFile 并嘗試寫入消息。追加方法同 (1) 相同,不再贅述:

注: 在消息文件加載的過程中,其也是通過判斷
MAGICCODE的類型,來判斷是否繼續(xù)讀取下一個MappedFile來計算整體消息偏移量的。
六. 消息刷盤策略
在消息存儲主流程執(zhí)行完成后,會調用handleDiskFlush觸發(fā)刷盤策略。
// org.apache.rocketmq.store.CommitLog#handleDiskFlush
/**
* 觸發(fā)刷盤CommitLog
* @param result
* @param putMessageResult
* @param messageExt
*/
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 如果是同步刷盤
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
// 構建GroupCommitRequest同步任務并提交到GroupCommitRequest。
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
// 將同步任務GroupCommitRequest提交到GroupCommitService線程
service.putRequest(request);
// 等待同步刷盤任務完成,異步刷盤線程刷寫完畢后會喚醒當前線程,超時時間默認為5s,如果超時則返回刷盤錯誤,刷盤成功后正常返回給調用方。
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
// Asynchronous flush
else {
// 異步刷盤
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
// 如果transientStorePoolEnable為false,消息將追加到與物理文件直接映射的內存中,然后寫入磁盤
// 喚醒刷盤線程,執(zhí)行刷盤操作
flushCommitLogService.wakeup();
} else {
// 如果transientStorePoolEnable為true,RocketMQ會單獨申請一個與目標物理文件(CommitLog)同樣大
// 小的堆外內存,該堆外內存將使用內存鎖定,確保不會被置換到虛擬內存中去,消息首先追加到堆外內存,然后提交到與物理文件的內存
// 映射中,再經(jīng)flush操作到磁盤
commitLogService.wakeup();
}
}
}
刷盤的整體流程:

producer發(fā)送給broker的消息保存在MappedFile中,然后通過刷盤機制同步到磁盤中
刷盤分為同步刷盤和異步刷盤
異步刷盤后臺線程按一定時間間隔執(zhí)行
同步刷盤也是生產者-消費者模型。broker保存消息到MappedFile后,創(chuàng)建GroupCommitRequest請求放入列表,并阻塞等待。后臺線程從列表中獲取請求并刷新磁盤,成功刷盤后通知等待線程。
6.1 異步刷盤
當配置為異步刷盤策略的時候,Broker 會運行一個服務 FlushRealTimeService 用來刷新緩沖區(qū)的消息內容到磁盤,這個服務使用一個獨立的線程來做刷盤這件事情,默認情況下每隔 500ms 來檢查一次是否需要刷盤:
class FlushRealTimeService extends FlushCommitLogService {
public void run() {
// 不停運行
while (!this.isStopped()) {
// interval 默認值是 500ms
if (flushCommitLogTimed) {
Thread.sleep(interval);
} else {
this.waitForRunning(interval);
}
// 刷盤
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
}
}
}
本文參考至:
《RocketMQ技術內幕》

浙公網(wǎng)安備 33010602011771號