【RocketMQ】主從同步實現原理
RocketMQ支持集群部署來保證高可用。它基于主從模式,將節點分為Master、Slave兩個角色,集群中可以有多個Master節點,一個Master節點可以有多個Slave節點。Master節點負責接收生產者發送的寫入請求,將消息寫入CommitLog文件,Slave節點會與Master節點建立連接,從Master節點同步消息數據。消費者可以從Master節點拉取消息,也可以從Slave節點拉取消息。

在RocketMQ 4.5版本之前,如果Master宕機,不支持自動將Slave切換為Master,需要人工介入,在4.5版本之后引入了DLedger來解決Master/Slave的主從切換問題,今天先來看主從模式下的數據同步原理。
RocketMQ主從模式下,是通過Slave節點主動向Master節點發送請求通知主節點進行數據同步的。
主從消息同步
建立連接
主節點監聽連接事件
主從節點既然需要傳輸數據,那么肯定會先建立一個連接,所以主節點在啟動的時候,會開啟一個端口(haListenPort)用于監聽從節點的連接請求(注冊了ACCEPT連接事件的監聽),默認端口是10912,當然也可以通過配置修改haListenPort的值,使用其他端口。
在端口綁定之后,主節點會專門開啟一個線程,用于監聽到從節點的連接事件,如果從節點發起了連接請求,會與從節點建立連接,與從節點的連接信息會封裝在HAConnection類中,主節點和從節點的數據同步邏輯也在HAConnection中。
從節點發起連接請求
從節點在啟動時會向主節點發起連接請求,上面說過主節點會監聽從節點的連接請求,所以經過這一步主從節點的連接建立完成,建立成功后,從節點會在連接上注冊READ可讀事件監聽,處理連接上的可讀事件。
消息同步
從節點主從同步處理
從節點主從同步的邏輯主要在HAClient中,它開啟了一個線程處理主從同步,只要Slave節點未停止,就會不斷循環進行如下處理:
- 從節點會定時向主節點匯報消息同步的偏移量(同步進度),所以每次循環開始都會判斷是否需要向主節點發送消息同步偏移量,如果已經有一段時間內沒有向主節點匯報,此時就會向主節點發送消息同步偏移量,告訴主節點現在同步到哪條消息;
- 等待與Master節點建立的連接上產生可讀事件;
- 處理可讀事件,主要是判斷Master節點是否發來了數據,如果Master節點發送了數據,就要從網絡中讀取數據,將讀取到的消息內容寫到從節點自己的CommmitLog。
主節點處理從節點發送請求
讀事件處理(處理從節點發送的請求)
上面說到從節點會定時向主節點匯報消息同步的進度,主節點會開啟一個線程專門處理監聽到的可讀事件,也就是處理從節點發來的請求,處理邏輯在ReadSocketService中。
主節點會將從節點發送的消息同步偏移量記錄在slaveAckOffset中,表示從節點已經同步的消息位置,同時也會將消息同步偏移量更新到push2SlaveMaxOffset中,它代表了主節點向從節點推送消息的偏移量。slaveRequestOffset的值如果小于0,也會將其更新為從節點反饋的同步偏移量。
這里再對比一下這三個變量的值,之后主節點向從節點發送消息數據會使用slaveRequestOffset來判斷是否需要向從節點推送數據:
-
slaveAckOffset:從節點響應的同步消息的偏移量,記錄從節點已經同步的消息位置,每次收到從節點反饋的同步偏移量都會對這個值進行更新; -
slaveRequestOffset:默認值為-1,此時表示還未收到從節點反饋的消息偏移量,在收到從節點發送的消息同步偏移量之后,如果slaveRequestOffset的值小于0才會對其進行更新,也就是主節點首次收到從節點的反饋進度或者主節點重啟等原因值又被恢復成了默認值-1再次收到反饋進度才會更新,之后不會對其進行更新; -
push2SlaveMaxOffset:默認值為0,在收到從節點反饋的消息偏移量時,會對該值進行更新,與slaveRequestOffset不同的是它每次收到從節點反饋的時候都會更新,表示主節點向從節點推送消息的偏移量;
寫事件處理(向從節點發送消息數據)
主節點同樣開啟了一個線程來處理網絡中的寫事件,主節點向從節點發送同步消息數據的處理就是在這里進行的,它也會開啟一個循環,只要主節點未停止服務,就不斷進行如下處理:
-
首先根據
slaveRequestOffset的值判斷是否需要進行推送,有以下兩種情況:slaveRequestOffset值為-1(默認值),表示還未收到過從節點反饋的消息偏移量,所以此時會睡眠一段時間等待從節點發送消息拉取偏移量;slaveRequestOffset值不為-1表示已經接收到過從節點反饋的消息偏移量(上面提到從節點向主節點反饋同步進度之后,主節點會更新這個值),此時進入下一步;
-
判斷
nextTransferFromWhere的值(默認值-1),它是主節點中記錄的下次需要傳輸的消息在CommitLog文件中的偏移量,如果值不為-1表示已經進行過數據同步,此時可以進入下一步。這里我們看下值為-1也就是首次進行主從同步的情況:-
slaveRequestOffset為0,表示從節點向主節點發送的消息同步偏移量為0,也就是從節點還未同步到消息,本次是首次進行同步,那么就從主節點當前CommitLog文件記錄的最新的那條消息開始同步,此時更新nextTransferFromWhere的值為當前CommitLog的最大的那個偏移量,然后進入下一步;每個CommitLog文件大小為1G,所以可能會有多個CommitLog文件,首次進行主從同步的時候從最近那個也就是當前正使用的那個CommitLog文件中的消息開始進行同步;
-
slaveRequestOffset大于0,表示從節點之前已經同步過消息,那么就從反饋的位置處開始消息同步,也就是之前同步到哪個消息了,就從那個消息繼續往后同步,此時將nextTransferFromWhere的值更新為slaveRequestOffset的值,然后進入下一步;這一步主要是對
nextTransferFromWhere的值進行處理。
-
-
判斷上次向從節點發送的消息是否已經傳輸完畢(有可能網絡等原因數據還在發送中):
- 如果數據都已經發送完畢,會判斷距離上次發送數據的時間間隔是否超過了設置的心跳時間,如果超過,為了避免連接空閑被關閉,需要發送一個心跳包,維護長連接;
- 如果上次發送的數據還在傳輸中,會繼續先傳輸上次同步的數據;
-
根據
nextTransferFromWhere的值從CommitLog中獲取本次要同步的消息內容; -
更新
nextTransferFromWhere的值為下次發送消息的偏移量; -
將第4步中獲取到的消息內容,每次最大發送32KB的數據,發送給從節點,進行數據同步;
從節點對收到消息的處理
在從節點主從同步處理一節中,提到從節點會開啟一個線程處理可讀事件,當主節點向從節點推送消息數據進行同步后,從節點監聽到可讀事件,就會從請求中獲取發送的消息數據,進行同步:
- 從緩沖區中讀取數據,首先獲取到的是消息在master節點的物理偏移量masterPhyOffset;
- 獲取從節點當前CommitLog的最大物理偏移量
slavePhyOffset,如果不為0并且不等于masterPhyOffset,表示與Master節點的傳輸偏移量不一致,也就是數據不一致,此時終止處理; - 計算消息體在讀緩沖區中的起始位置,從讀緩沖區中根據起始位置,讀取消息內容,將消息追加到從節點的CommitLog中;
- 繼續處理下一條消息直到請求中的消息處理完畢;
從節點會監聽到網絡中的可讀數據,收到消息后將消息寫入從節點的CommitLog中。

等待主從復制傳輸結束
SYNC_MASTER同步復制:消息寫入主節點之后,需要等待從節點也寫入完畢才能返回成功。
ASYNC_MASTER異步復制:消息寫入主節點之后即可返回成功,主從同步數據異步進行,不需要等待從節點寫入完畢即可返回成功。
當主從同步開始之后,如果有新的消息寫入主節點的CommitLog,如果Master節點配置的是SYNC_MASTER同步復制,在消息寫入主節點之后還需要等待從節點同步完畢,主節點會開啟一個線程,可以記作數據同步判斷線程(GroupTransferService中實現),它專門來判斷數據是否同步完畢。
首先消息在寫入CommitLog之后會構建一個消息提交請求GroupCommitRequest,請求中會攜帶本次消息寫入之后的偏移量,將其提交到一個求集合requestsRead中,這個線程可以記作主線程,然后主線程會喚醒數據同步判斷線程來判斷數據是否同步完畢,之后主線程進入等待狀態。
在數據同步判斷線程中,它會對消息提交請求集合requestsRead中的每一個請求進行處理,開啟循環做如下處理:
push2SlaveMaxOffset記錄了從節點已經同步的消息偏移量,將push2SlaveMaxOffset與本次消息提交請求的偏移量作對比:- 如果
push2SlaveMaxOffset值大,說明當前提交請求中的消息已經同步完畢,此時進入第2步喚醒正在等待的主線程,繼續執行主線程的處理邏輯; - 如果
push2SlaveMaxOffset值比請求中的偏移量小,表示這條消息還未同步到從節點,此時當前線程會等待一段時間再進行判斷,直到數據已經同步到從節點或者超時;
- 如果
- 喚醒主線程;

主從模式下的消息消費
在主從模式下,消費者向Broker發送拉取消息請求后,Broker對拉取請求進行處理時會設置一個broker ID,建議消費者下次從這個Broker拉取消息,接下來看下Broker是根據什么條件決定返回哪個Broker ID的。
Broker在處理消費者拉取請求時,獲取消息后會在返回結果中設置一個是否建議從Slave節點拉取值放在isSuggestPullingFromSlave這個變量中,這個值的判斷方式如下:
diff:當前Broker的CommitLog最大偏移量減去本次拉取消息的最大物理偏移量,表示剩余未拉取的消息;
memory:消息在PageCache中的總大小,計算方式是總物理內存 * 消息存儲在內存中的閥值(默認為40)/100,也就是說MQ會緩存一部分消息在操作系統的PageCache中,加速訪問;
如果dif大于memory的值,表示未拉取的消息過多,已經超出了PageCache緩存的數據的大小,還需要從磁盤中獲取消息,所以此時會建議下次從Slave節點拉取,將isSuggestPullingFromSlave的值置為true,否則為false。
訂閱分組配置
mqadmin命令的-i參數可以指定從哪個Broker消費消息(對應SubscriptionGroupConfig中的brokerId變量,默認是MASTER節點的ID);
-w參數可以指定建議從slave節點消費的時候,從哪個slave消費(對應SubscriptionGroupConfig中的whichBrokerWhenConsumeSlowly變量,默認值為1);
接下來會用到以上兩個參數。
Broker獲取到isSuggestPullingFromSlave的值之后,在構建返回結果時,會根據isSuggestPullingFromSlave的值進行以下處理:
- 如果建議從slave節點拉取消息(
isSuggestPullingFromSlave為true),會獲取訂閱分組配置中設置的whichBrokerWhenConsumeSlowly的值(默認-1)作為建議拉取消息的Broker ID,否則下次依舊建議從主節點拉取消息,將MASTER節點的ID設置到響應中; - 如果當前Broker的角色是slave節點,并且配置了不允許從slave節點讀取數據(SlaveReadEnable = false),此時依舊建議從主節點拉取消息,將MASTER節點的ID設置到響應中;
- 如果開啟了允許從slave節點讀取數據(SlaveReadEnable = true),有以下兩種情況:
isSuggestPullingFromSlave為true,表示建議從slave節點拉消息,會使用訂閱分組配置中設置的whichBrokerWhenConsumeSlowly的值(默認-1)作為建議拉取消息的Broker ID;isSuggestPullingFromSlave為false,表示不建議從slave節點拉取消息,會從訂閱分組配置中獲取brokerId(默認值為Master節點ID)的值作為建議拉取消息的Broker ID;
當然,如果未開啟允許從Slave節點讀取數據,下次依舊建議從Master節點拉取;
總結
默認情況下,消費者從Master節點拉取消息,Broker在處理消息拉取時會根據消息的拉取進度,進行判斷,如果未拉取消息的大小超過了總物理內存的40%,此時會建議消費者從Slave節點拉取消息,Broker會將下次建議拉取消息的BrokerID,設置到響應中返回給消費者。
從Slave節點拉取消息,需要開啟配置項SlaveReadEnable,可以通過mqadmin命令更改訂閱分組中的brokerId(默認值為Master節點ID)和whichBrokerWhenConsumeSlowly(默認-1),如果未設置使用默認值。
如果未開啟SlaveReadEnable,依舊會從Master節點拉取消息;
消費進度管理
消費者會優先選擇向主節點發送請求進行消費進度保存,假如主節點宕機等原因未能獲取到主節點的信息,會迭代集合選擇第一個節點返回,所以消費者也可以向從節點發送請求進行進度保存,待主節點恢復后,依舊優先選擇主節點。
消費進度同步
從節點在啟動時會注冊定時任務,定時進行數據同步:
- 從節點向主節點發送請求獲取消費進度數據;
- 從節點將獲取到的消費進度數據進行持久化;

浙公網安備 33010602011771號