RocketMQ主從同步原理
一. 主從同步概述
主從同步這個概念相信大家在平時的工作中,多少都會聽到。其目的主要是用于做一備份類操作,以及一些讀寫分離場景。比如我們常用的關系型數(shù)據(jù)庫mysql,就有主從同步功能在。
主從同步,就是將主服務器上的數(shù)據(jù)同步到從服務器上,也就是相當于新增了一個副本。
而具體的主從同步的實現(xiàn)也各有千秋,如mysql中通過binlog實現(xiàn)主從同步,es中通過translog實現(xiàn)主從同步,redis中通過aof實現(xiàn)主從同步。那么,rocketmq又是如何實現(xiàn)的主從同步呢?
另外,主從同步需要考慮的問題是哪些呢?
- 數(shù)據(jù)同步的及時性?(延遲與一致性)
- 對主服務器的影響性?(可用性)
- 是否可替代主服務器?(可用性或者分區(qū)容忍性)
前面兩個點是必須要考慮的,但對于第3個點,則可能不會被考慮。因為通過系統(tǒng)可能無法很好的做到這一點,所以很多系統(tǒng)就直接忽略這一點了,簡單嘛。即很多時候只把從服務器當作是一個備份存在,不會接受寫請求。如果要進行主從切換,必須要人工介入,做預知的有損切換。但隨著技術的發(fā)展,現(xiàn)在已有非常多的自動切換主從的服務存在,這是在分布式系統(tǒng)滿天下的當今的必然趨勢。
二. RocketMQ如何配置主從同步
在 RocketMQ 中,最核心的組件是 broker, 它負責幾乎所有的存儲讀取業(yè)務。所以,要談主從同步,那必然是針對broker進行的。我們再來回看 RocketMQ 的部署架構圖,以便全局觀察:

非常清晰的架構,無需多言。因為我們講的是主從同步,所以只看broker這個組件,那么整個架構就可以簡化為: BrokerMaster -> BrokerSlave 了。同樣,再簡化,主從同步就是如何將Master的數(shù)據(jù)同步到Slave這么個過程。
那么如何配置主從同步呢?
2.1 BrokerMaster配置
創(chuàng)建配置文件 conf/broker-a.properties
#所屬集群名字
brokerClusterName=DefaultCluster
#broker名字,名字可重復。相同名字的broker構成一組主從節(jié)點(每個master起一個名字,它的slave同他,eg:Amaster叫broker-a,他的slave也叫broker-a)
brokerName=broker-a
#節(jié)點ID,0 表示 Master 節(jié)點,大于 0 表示 Slave 節(jié)點
brokerId=0
#Broker 的角色
#- ASYNC_MASTER 異步復制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#nameServer地址,分號分割
namesrvAddr=172.0.1.5:9876;172.0.1.6:9876
2.2 BrokerSlave配置
創(chuàng)建配置文件 conf/broker-a-s.properties
#所屬集群名字
brokerClusterName=DefaultCluster
#broker名字,和Master節(jié)點保持一致
brokerName=broker-a
#節(jié)點ID,大于1表示這是Slave節(jié)點
brokerId=1
#Broker 的角色
#- ASYNC_MASTER 異步復制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=SLAVE
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#nameServer地址,分號分割
namesrvAddr=172.0.1.5:9876;172.0.1.6:9876
配置項看起來有點多,但核心實際上只有兩個:
- 在保持
brokderName相同的前提下配置brokerRole=ASYNC_MASTER|SLAVE|SYNC_MASTER,通過這個節(jié)點就能知道主從同步復制模式。 - 在保持
brokderName相同的前提下配置 Master節(jié)點的 brokerId 配置為0,Slave節(jié)點配置為大于0,通過這個值就可以確定是主是從。
具體配置文件叫什么名字不重要,重要的是要在啟動時指定指定對應的配置文件位置即可。啟動master/slave命令如下:
# 啟動Master節(jié)點
nohup sh /usr/local/rocketmq/bin/mqbroker -c conf/broker-a.properties > logs/broker-a.log 2>&1 &
# 啟動Slave節(jié)點
nohup sh /usr/local/rocketmq/bin/mqbroker -c conf/broker-a-s.properties > logs/broker-a-s.log 2>&1 &
三. 源碼分析
主從同步的實現(xiàn)邏輯主要在HAService中,在DefaultMessageStore的構造函數(shù)中,對HAService進行了實例化,并在start方法中,啟動了HAService:
public class DefaultMessageStore implements MessageStore {
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
// ...
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
// 初始化HAService
this.haService = new HAService(this);
} else {
this.haService = null;
}
// ...
}
public void start() throws Exception {
// ...
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
// 啟動HAService
this.haService.start();
// 主節(jié)點啟動延遲消息實現(xiàn),從節(jié)點關閉Slave消息實現(xiàn),主節(jié)點將延遲消息從延遲隊列轉移到真實的topic隊列時,也會由主從同步同步至從節(jié)點,
// 所以不需要從節(jié)點開啟延遲隊列
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
// ...
}
}
在HAService的構造函數(shù)中,創(chuàng)建了AcceptSocketService、GroupTransferService和HAClient,在start方法中主要做了如下幾件事:
- 調用
AcceptSocketService的beginAccept方法,這一步主要是進行端口綁定,在端口上監(jiān)聽從節(jié)點的連接請求(可以看做是運行在master節(jié)點的); - 調用
AcceptSocketService的start方法啟動服務,這一步主要為了處理從節(jié)點的連接請求,與從節(jié)點建立連接(可以看做是運行在master節(jié)點的); - 調用
GroupTransferService的start方法,主要用于在主從同步的時候,等待數(shù)據(jù)傳輸完畢(可以看做是運行在master節(jié)點的); - 調用
HAClient的start方法啟動,里面與master節(jié)點建立連接,向master匯報主從同步進度并存儲master發(fā)送過來的同步數(shù)據(jù)(可以看做是運行在從節(jié)點的);
public class HAService {
public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
this.defaultMessageStore = defaultMessageStore;
// 創(chuàng)建AcceptSocketService
this.acceptSocketService =
new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
this.groupTransferService = new GroupTransferService();
// 創(chuàng)建HAClient
this.haClient = new HAClient();
}
public void start() throws Exception {
// 開始監(jiān)聽從服務器的連接
this.acceptSocketService.beginAccept();
// 啟動服務
this.acceptSocketService.start();
// 啟動GroupTransferService
this.groupTransferService.start();
// 啟動
this.haClient.start();
}
}

3.1 監(jiān)聽從節(jié)點連接請求
AcceptSocketService 的 beginAccept 方法里面首先獲取了 ServerSocketChannel,然后進行端口綁定,并在selector上面注冊了OP_ACCEPT事件的監(jiān)聽,監(jiān)聽從節(jié)點的連接請求:
public class HAService {
class AcceptSocketService extends ServiceThread {
/**
* 監(jiān)聽從節(jié)點的連接
*
* @throws Exception If fails.
*/
public void beginAccept() throws Exception {
// 創(chuàng)建ServerSocketChannel
this.serverSocketChannel = ServerSocketChannel.open();
// 獲取selector
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
// 綁定端口
this.serverSocketChannel.socket().bind(this.socketAddressListen);
// 設置非阻塞
this.serverSocketChannel.configureBlocking(false);
// 注冊OP_ACCEPT連接事件的監(jiān)聽
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
}
}
3.2 處理從節(jié)點連接請求
AcceptSocketService的run方法中,對監(jiān)聽到的連接請求進行了處理,處理邏輯大致如下:
- 從selector中獲取到監(jiān)聽到的事件;
- 如果是
OP_ACCEPT連接事件,創(chuàng)建與從節(jié)點的連接對象HAConnection,與從節(jié)點建立連接,然后調用HAConnection的start方法進行啟動,并創(chuàng)建的HAConnection對象加入到連接集合中,HAConnection中封裝了Master節(jié)點和從節(jié)點的數(shù)據(jù)同步邏輯;
public class HAService {
class AcceptSocketService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
// 如果服務未停止
while (!this.isStopped()) {
try {
this.selector.select(1000);
// 獲取監(jiān)聽到的事件
Set<SelectionKey> selected = this.selector.selectedKeys();
// 處理事件
if (selected != null) {
for (SelectionKey k : selected) {
// 如果是連接事件
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
// 創(chuàng)建HAConnection,建立連接
HAConnection conn = new HAConnection(HAService.this, sc);
// 啟動
conn.start();
// 添加連接
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
log.info(this.getServiceName() + " service end");
}
}
}
3.3 等待主從復制傳輸結束
GroupTransferService的run方法主要是為了在進行主從數(shù)據(jù)同步的時候,等待從節(jié)點數(shù)據(jù)同步完畢。
在運行時首先進會調用waitForRunning進行等待,因為此時可能還有沒有開始主從同步,所以先進行等待,之后如果有同步請求,會喚醒該線程,然后調用doWaitTransfer方法等待數(shù)據(jù)同步完成:
public class HAService {
class GroupTransferService extends ServiceThread {
public void run() {
log.info(this.getServiceName() + " service started");
// 如果服務未停止
while (!this.isStopped()) {
try {
// 等待運行
this.waitForRunning(10);
// 如果被喚醒,調用doWaitTransfer等待主從同步完成
this.doWaitTransfer();
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}
}
}
在看doWaitTransfer方法之前,首先看下是如何判斷有數(shù)據(jù)需要同步的。
Master節(jié)點中,當消息被寫入到 CommitLog 以后,會調用 handleHA 方法處主從同步,首先判斷當前Broker的角色是否是SYNC_MASTER,如果是則會構建消息提交請求GroupCommitRequest,然后調用HAService的putRequest添加到請求集合中,并喚醒GroupTransferService中在等待的線程:
public class CommitLog {
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// 判斷是否是同步復制到從節(jié)點
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
// 如果是同步復制,也可以在發(fā)送消息時手動設置不等待同步成功就返回,這樣增加了同步復制場景下的靈活性。在小部分不需要的同步復制,
// 但是RocketMQ又配置為主從同步復制的場景下,也使得消息異步復制
if (messageExt.isWaitStoreMsgOK()) {
// Determine whether to wait
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
// 提交主從同步任務
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
// 添加請求
service.putRequest(request);
// 喚醒GroupTransferService中在等待的線程
service.getWaitNotifyObject().wakeupAll();
// 等待同步復制任務完成,異步同步線程復制完畢后會喚醒當前線程,超時時間默認為5s,如果超時則返回刷盤錯誤,刷盤成功后正常返回給調用方。
boolean flushOK =
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
}
}
在HAService.GroupCommitRequest.doWaitTransfer方法中,會判斷 CommitLog 提交請求集合 requestsRead 是否為空,如果不為空,表示有消息寫入了 CommitLog,Master節(jié)點需要等待將數(shù)據(jù)傳輸給從節(jié)點:
push2SlaveMaxOffset記錄了從節(jié)點已經(jīng)同步的消息偏移量,判斷push2SlaveMaxOffset是否大于本次CommitLog提交的偏移量,也就是請求中設置的偏移量;- 獲取請求中設置的等待截止時間;
- 開啟循環(huán),判斷數(shù)據(jù)是否還未傳輸完畢,并且未超過截止時間,如果是則等待1s,然后繼續(xù)判斷傳輸是否完畢,不斷進行,直到超過截止時間或者數(shù)據(jù)已經(jīng)傳輸完畢;
(向從節(jié)點發(fā)送的消息最大偏移量push2SlaveMaxOffset超過了請求中設置的偏移量表示本次同步數(shù)據(jù)傳輸完畢); - 喚醒在等待數(shù)據(jù)同步完畢的線程;
public class HAService {
// CommitLog提交請求集合
private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();
class GroupTransferService extends ServiceThread {
private void doWaitTransfer() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
// 判斷主從同步是否完成,判斷傳輸?shù)綇墓?jié)點最大偏移量是否超過了請求中設置的偏移量
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
// 默認同步有5s的超時時間
long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
+ HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
// 如果從節(jié)點還未同步完畢并且未超過截止時間
while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
// 等待
this.notifyTransferObject.waitForRunning(1000);
// 判斷從節(jié)點同步的最大偏移量是否超過了請求中設置的偏移量
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
// 同步完成,喚醒消息發(fā)送處理線程
req.wakeupCustomer(transferOK);
}
this.requestsRead.clear();
}
}
}
}
}
3.4 啟動HAClient
HAClient可以看做是在從節(jié)點上運行的,主要進行的處理如下:
- 調用
connectMaster方法連接Master節(jié)點,Master節(jié)點上也會運行,但是它本身就是Master沒有可連的Master節(jié)點,所以可以忽略; - 調用
isTimeToReportOffset方法判斷是否需要向Master節(jié)點匯報同步偏移量,如果需要則調用reportSlaveMaxOffset方法將當前的消息同步偏移量發(fā)送給Master節(jié)點; - 調用
processReadEvent處理網(wǎng)絡請求中的可讀事件,也就是處理Master發(fā)送過來的消息,將消息存入CommitLog;
public class HAService {
class HAClient extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 連接Master節(jié)點
if (this.connectMaster()) {
// 是否需要報告消息同步偏移量
if (this.isTimeToReportOffset()) {
// 向Master節(jié)點發(fā)送同步偏移量
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
// 等待事件產(chǎn)生
this.selector.select(1000);
// 處理讀事件,也就是Master節(jié)點發(fā)送的數(shù)據(jù)
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
// ...
} else {
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}
log.info(this.getServiceName() + " service end");
}
}
}
3.4.1 連接主節(jié)點
connectMaster 方法中會獲取Master節(jié)點的地址,并轉換為 SocketAddress 對象,然后向Master節(jié)點請求建立連接,并在selector注冊OP_READ可讀事件監(jiān)聽:
public class HAService {
class HAClient extends ServiceThread {
// 當前的主從復制進度
private long currentReportedOffset = 0;
private boolean connectMaster() throws ClosedChannelException {
// 如果socketChannel為空,則嘗試連接主服務器
if (null == socketChannel) {
String addr = this.masterAddress.get();
if (addr != null) {
// 將地址轉為SocketAddress
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
// 建立與Master的連接
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
// 注冊OP_READ(網(wǎng)絡讀事件)
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
// 獲取從節(jié)點本地的CommitLog最大偏移量
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
// 更新上次心跳發(fā)送時間
this.lastWriteTimestamp = System.currentTimeMillis();
}
// 如果主服務器地址為空,返回false
return this.socketChannel != null;
}
}
3.4.2 發(fā)送主從同步消息拉取偏移量
在isTimeToReportOffset方法中,首先獲取當前時間與上一次進行主從同步的時間間隔interval,如果時間間隔interval大于配置的發(fā)送心跳時間間隔,表示需要向Master節(jié)點發(fā)送從節(jié)點消息同步的偏移量,接下來會調用reportSlaveMaxOffset方法發(fā)送同步偏移量,也就是說從節(jié)點會定時向Master節(jié)點發(fā)送請求,反饋CommitLog中同步消息的偏移量:
public class HAService {
class HAClient extends ServiceThread {
// 當前從節(jié)點已經(jīng)同步消息的偏移量大小
private long currentReportedOffset = 0;
private boolean isTimeToReportOffset() {
// 獲取距離上一次主從同步的間隔時間
long interval =
HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
// 判斷是否超過了配置的發(fā)送心跳包時間間隔
boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
.getHaSendHeartbeatInterval();
return needHeart;
}
// 發(fā)送同步偏移量,傳入的參數(shù)是當前的主從復制偏移量currentReportedOffset
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8); // 設置數(shù)據(jù)傳輸大小為8個字節(jié)
this.reportOffset.putLong(maxOffset);// 設置同步偏移量
this.reportOffset.position(0);
this.reportOffset.limit(8);
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
// 向Master節(jié)點發(fā)送拉取偏移量
this.socketChannel.write(this.reportOffset);
} catch (IOException e) {
log.error(this.getServiceName()
+ "reportSlaveMaxOffset this.socketChannel.write exception", e);
return false;
}
}
// 更新上次心跳發(fā)送時間
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
return !this.reportOffset.hasRemaining();
}
}
}
3.4.3 處理網(wǎng)絡讀事件
processReadEvent方法中處理了可讀事件,也就是處理Master節(jié)點發(fā)送的同步數(shù)據(jù),首先從 socketChannel 中讀取數(shù)據(jù)到byteBufferRead 中,byteBufferRead 是讀緩沖區(qū),讀取數(shù)據(jù)的方法會返回讀取到的字節(jié)數(shù),對字節(jié)數(shù)大小進行判斷:
- 如果可讀字節(jié)數(shù)大于0表示有數(shù)據(jù)需要處理,調用
dispatchReadRequest方法進行處理; - 如果可讀字節(jié)數(shù)為0表示沒有可讀數(shù)據(jù),此時記錄讀取到空數(shù)據(jù)的次數(shù),如果連續(xù)讀到空數(shù)據(jù)的次數(shù)大于3次,將終止本次處理;
class HAClient extends ServiceThread {
// 讀緩沖區(qū),會將從socketChannel讀入緩沖區(qū)
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
// 循環(huán)判斷readByteBuffer是否還有剩余空間,如果存在剩余空間,則調用
// SocketChannel#read(ByteBuffer readByteBuffer)方法,將通道中
// 的數(shù)據(jù)讀入讀緩存區(qū)。
while (this.byteBufferRead.hasRemaining()) {
try {
// 從socketChannel中讀取數(shù)據(jù)到byteBufferRead中,返回讀取到的字節(jié)數(shù)
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
// 重置readSizeZeroTimes
readSizeZeroTimes = 0;
// 然后調用dispatchReadRequest方法將讀取到的所有消息全部追加到消息內存映射文件中,再次反饋拉取進度給主服務器。
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
// 記錄讀取到空數(shù)據(jù)的次數(shù)
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
}
dispatchReadRequest方法中會將從節(jié)點讀取到的數(shù)據(jù)寫入 CommitLog,dispatchPosition記錄了已經(jīng)處理的數(shù)據(jù)在讀緩沖區(qū)中的位置,從讀緩沖區(qū)byteBufferRead獲取剩余可讀取的字節(jié)數(shù),如果可讀數(shù)據(jù)的字節(jié)數(shù)大于一個消息頭的字節(jié)數(shù)(12個字節(jié)),表示有數(shù)據(jù)還未處理完畢,反之表示消息已經(jīng)處理完畢結束處理。對數(shù)據(jù)的處理邏輯如下:
- 從緩沖區(qū)中讀取數(shù)據(jù),首先獲取到的是消息在master節(jié)點的物理偏移量masterPhyOffset;
- 向后讀取8個字節(jié),得到消息體內容的字節(jié)數(shù)bodySize;
- 獲取從節(jié)點當前
CommitLog的最大物理偏移量slavePhyOffset,如果不為0并且不等于masterPhyOffset,表示與Master節(jié)點的傳輸偏移量不一致,也就是數(shù)據(jù)不一致,此時終止處理; - 如果可讀取的字節(jié)數(shù)大于一個消息頭的字節(jié)數(shù) + 消息體大小,表示有消息可處理,繼續(xù)進行下一步;
- 計算消息體在讀緩沖區(qū)中的起始位置,從讀緩沖區(qū)中根據(jù)起始位置,讀取消息內容,將消息追加到從節(jié)點的CommitLog中;
- 更新dispatchPosition的值為消息頭大小 + 消息體大小,dispatchPosition之前的數(shù)據(jù)表示已經(jīng)處理完畢;

class HAClient extends ServiceThread {
// 已經(jīng)處理的數(shù)據(jù)在讀緩沖區(qū)中的位置,初始化為0
private int dispatchPosition = 0;
// 讀緩沖區(qū)
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private boolean dispatchReadRequest() {
// 消息頭大小
final int msgHeaderSize = 8 + 4; // phyoffset + size
// 開啟循環(huán)不斷讀取數(shù)據(jù)
while (true) {
// 獲可讀取的字節(jié)數(shù)
int diff = this.byteBufferRead.position() - this.dispatchPosition;
// 如果字節(jié)數(shù)大于一個消息頭的字節(jié)數(shù)
if (diff >= msgHeaderSize) {
// 獲取消息在master節(jié)點的物理偏移量
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
// 獲取消息體大小
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
// 獲取從節(jié)點當前CommitLog的最大物理偏移量
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
if (slavePhyOffset != 0) {
// 如果不一致結束處理
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
// 如果可讀取的字節(jié)數(shù)大于一個消息頭的字節(jié)數(shù) + 消息體大小
if (diff >= (msgHeaderSize + bodySize)) {
// 將度緩沖區(qū)的數(shù)據(jù)轉為字節(jié)數(shù)組
byte[] bodyData = byteBufferRead.array();
// 計算消息體在讀緩沖區(qū)中的起始位置
int dataStart = this.dispatchPosition + msgHeaderSize;
// 從讀緩沖區(qū)中根據(jù)消息的位置,讀取消息內容,將消息追加到從節(jié)點的CommitLog中
HAService.this.defaultMessageStore.appendToCommitLog(
masterPhyOffset, bodyData, dataStart, bodySize);
// 更新dispatchPosition的值為消息頭大小+消息體大小
this.dispatchPosition += msgHeaderSize + bodySize;
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
continue;
}
}
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
break;
}
return true;
}
}
3.5 HAConnection
HAConnection中封裝了Master節(jié)點與從節(jié)點的網(wǎng)絡通信處理,分別在ReadSocketService和WriteSocketService中。
3.5.1 ReadSocketService
ReadSocketService啟動后處理監(jiān)聽到的可讀事件,前面知道HAClient中從節(jié)點會定時向Master節(jié)點匯報從節(jié)點的消息同步偏移量,Master節(jié)點對匯報請求的處理就在這里,如果從網(wǎng)絡中監(jiān)聽到了可讀事件,會調用processReadEvent處理讀事件:
public class HAConnection {
class ReadSocketService extends ServiceThread {
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
// 處理可讀事件
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error("processReadEvent error");
break;
}
// ...
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
// ...
HAConnection.log.info(this.getServiceName() + " service end");
}
}
}
processReadEvent中從網(wǎng)絡中處理讀事件的方式與上面HAClient的dispatchReadRequest類似,都是將網(wǎng)絡中的數(shù)據(jù)讀取到讀緩沖區(qū)中,并用一個變量記錄已讀取數(shù)據(jù)的位置,processReadEvent方法的處理邏輯如下:
- 從socketChannel讀取數(shù)據(jù)到讀緩沖區(qū)byteBufferRead中,返回讀取到的字節(jié)數(shù);
- 如果讀取到的字節(jié)數(shù)大于0,進入下一步,如果讀取到的字節(jié)數(shù)為0,記錄連續(xù)讀取到空字節(jié)數(shù)的次數(shù)是否超過三次,如果超過終止處理;
- 判斷剩余可讀取的字節(jié)數(shù)是否大于等于8,前面知道,從節(jié)點發(fā)送同步消息拉取偏移量的時候設置的字節(jié)大小為8,所以字節(jié)數(shù)大于等于8的時候表示需要讀取從節(jié)點發(fā)送的偏移量;
- 計算數(shù)據(jù)在緩沖區(qū)中的位置,從緩沖區(qū)讀取從節(jié)點發(fā)送的同步偏移量readOffset;
- 更新processPosition的值,processPosition表示讀緩沖區(qū)中已經(jīng)處理數(shù)據(jù)的位置;
- 更新slaveAckOffset為從節(jié)點發(fā)送的同步偏移量readOffset的值;
- 如果當前Master節(jié)點記錄的從節(jié)點的同步偏移量slaveRequestOffset小于0,表示還未進行同步,此時將slaveRequestOffset更新為從節(jié)點發(fā)送的同步偏移量;
- 如果從節(jié)點發(fā)送的同步偏移量比當前Master節(jié)點的最大物理偏移量還要大,終止本次處理;
- 調用notifyTransferSome,更新Master節(jié)點記錄的向從節(jié)點同步消息的偏移量;
public class HAConnection {
class ReadSocketService extends ServiceThread {
// 讀緩沖區(qū)
private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
// 讀緩沖區(qū)中已經(jīng)處理的數(shù)據(jù)位置
private int processPosition = 0;
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
// 如果沒有可讀數(shù)據(jù)
if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
// 處理位置置為0
this.processPosition = 0;
}
// 如果數(shù)據(jù)未讀取完畢
while (this.byteBufferRead.hasRemaining()) {
try {
// 從socketChannel讀取數(shù)據(jù)到byteBufferRead中,返回讀取到的字節(jié)數(shù)
int readSize = this.socketChannel.read(this.byteBufferRead);
// 如果讀取數(shù)據(jù)字節(jié)數(shù)大于0
if (readSize > 0) {
// 重置readSizeZeroTimes
readSizeZeroTimes = 0;
// 獲取上次處理讀事件的時間戳
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
// 判斷剩余可讀取的字節(jié)數(shù)是否大于等于8
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
// 獲取偏移量內容的結束位置
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
// 從結束位置向前讀取8個字節(jié)得到從點發(fā)送的同步偏移量
long readOffset = this.byteBufferRead.getLong(pos - 8);
// 更新處理位置
this.processPosition = pos;
// 更新slaveAckOffset為從節(jié)點發(fā)送的同步進度
HAConnection.this.slaveAckOffset = readOffset;
// 如果記錄的從節(jié)點的同步進度小于0,表示還未進行同步
if (HAConnection.this.slaveRequestOffset < 0) {
// 更新為從節(jié)點發(fā)送的同步進度
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
} else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) {
// 如果從節(jié)點發(fā)送的拉取偏移量比當前Master節(jié)點的最大物理偏移量還要大
log.warn("slave[{}] request offset={} greater than local commitLog offset={}. ",
HAConnection.this.clientAddr,
HAConnection.this.slaveAckOffset,
HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset());
return false;
}
// 更新Master節(jié)點記錄的向從節(jié)點同步消息的偏移量
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0)
// 判斷連續(xù)讀取到空數(shù)據(jù)的次數(shù)是否超過三次
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}
return true;
}
}
}
前面在 GroupTransferService 中可以看到是通過 push2SlaveMaxOffset 的值判斷本次同步是否完成的,在 notifyTransferSome 方法中可以看到當Master節(jié)點收到從節(jié)點反饋的消息拉取偏移量時,對 push2SlaveMaxOffset 的值進行了更新:
public class HAService {
// 向從節(jié)點推送的消息最大偏移量
private final GroupTransferService groupTransferService;
public void notifyTransferSome(final long offset) {
// 如果傳入的偏移大于push2SlaveMaxOffset記錄的值,進行更新
for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
// 更新向從節(jié)點推送的消息最大偏移量
boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
if (ok) {
this.groupTransferService.notifyTransferSome();
break;
} else {
value = this.push2SlaveMaxOffset.get();
}
}
}
}
3.5.2 WriteSocketService
WriteSocketService用于Master節(jié)點向從節(jié)點發(fā)送同步消息,處理邏輯如下:
- 根據(jù)從節(jié)點發(fā)送的主從同步消息拉取偏移量
slaveRequestOffset進行判斷:- 如果
slaveRequestOffset值為-1,表示還未收到從節(jié)點報告的同步偏移量,此時睡眠一段時間等待從節(jié)點發(fā)送消息拉取偏移量; - 如果
slaveRequestOffset值不為-1,表示已經(jīng)開始進行主從同步進行下一步;
- 如果
- 判斷
nextTransferFromWhere值是否為-1,nextTransferFromWhere記錄了下次需要傳輸?shù)南⒃贑ommitLog中的偏移量,如果值為-1表示初次進行數(shù)據(jù)同步,此時有兩種情況:- 如果從節(jié)點發(fā)送的拉取偏移量slaveRequestOffset為0,就從當前CommitLog文件最大偏移量開始同步;
- 如果slaveRequestOffset不為0,則從slaveRequestOffset位置處進行數(shù)據(jù)同步;
- 判斷上次寫事件是否已經(jīng)將數(shù)據(jù)都寫入到從節(jié)點
- 如果已經(jīng)寫入完畢,判斷距離上次寫入數(shù)據(jù)的時間間隔是否超過了設置的心跳時間,如果超過,為了避免連接空閑被關閉,需要發(fā)送一個心跳包,此時構建心跳包的請求數(shù)據(jù),調用transferData方法傳輸數(shù)據(jù);
- 如果上次的數(shù)據(jù)還未傳輸完畢,調用transferData方法繼續(xù)傳輸,如果還是未完成,則結束此處處理;
- 根據(jù)nextTransferFromWhere從CommitLog中獲取消息,如果未獲取到消息,等待100ms,如果獲取到消息,從CommitLog中獲取消息進行傳輸:
(1)如果獲取到消息的字節(jié)數(shù)大于最大傳輸?shù)拇笮。O置最最大傳輸數(shù)量,分批進行傳輸;
(2)更新下次傳輸?shù)钠屏康刂芬簿褪莕extTransferFromWhere的值;
(3)從CommitLog中獲取的消息內容設置到將讀取到的消息數(shù)據(jù)設置到selectMappedBufferResult中;
(4)設置消息頭信息,包括消息頭字節(jié)數(shù)、拉取消息的偏移量等;
(5)調用transferData發(fā)送數(shù)據(jù);
public class HAConnection {
class WriteSocketService extends ServiceThread {
private final int headerSize = 8 + 4;// 消息頭大小
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
// 如果slaveRequestOffset為-1,表示還未收到從節(jié)點報告的拉取進度
if (-1 == HAConnection.this.slaveRequestOffset) {
// 等待一段時間
Thread.sleep(10);
continue;
}
// 初次進行數(shù)據(jù)同步
if (-1 == this.nextTransferFromWhere) {
// 如果拉取進度為0
if (0 == HAConnection.this.slaveRequestOffset) {
// 從master節(jié)點最大偏移量從開始傳輸
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMappedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
// 更新nextTransferFromWhere
this.nextTransferFromWhere = masterOffset;
} else {
// 根據(jù)從節(jié)點發(fā)送的偏移量開始數(shù)據(jù)同步
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
// 判斷上次傳輸是否完畢
if (this.lastWriteOver) {
// 獲取當前時間距離上次寫入數(shù)據(jù)的時間間隔
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
// 如果距離上次寫入數(shù)據(jù)的時間間隔超過了設置的心跳時間
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {
// 構建header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
// 發(fā)送心跳包
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
} else {
// 未傳輸完畢,繼續(xù)上次的傳輸
this.lastWriteOver = this.transferData();
// 如果依舊未完成,結束本次處理
if (!this.lastWriteOver)
continue;
}
// 根據(jù)偏移量獲取消息數(shù)據(jù)
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {// 獲取消息不為空
// 獲取消息內容大小
int size = selectResult.getSize();
// 如果消息的字節(jié)數(shù)大于最大傳輸?shù)拇笮?/span>
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
// 設置為最大傳輸大小
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
// 更新下次傳輸?shù)钠屏康刂?/span>
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);
// 將讀取到的消息數(shù)據(jù)設置到selectMappedBufferResult
this.selectMappedBufferResult = selectResult;
// 設置消息頭
this.byteBufferHeader.position(0);
// 設置消息頭大小
this.byteBufferHeader.limit(headerSize);
// 設置偏移量地址
this.byteBufferHeader.putLong(thisOffset);
// 設置消息內容大小
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
// 發(fā)送數(shù)據(jù)
this.lastWriteOver = this.transferData();
} else {
// 等待100ms
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
// ...
HAConnection.log.info(this.getServiceName() + " service end");
}
}
}
四. 總結

有新消息寫入之后的同步流程

本文參考至:
【RocketMQ】【源碼】主從同步實現(xiàn)原理 - shanml - 博客園 (cnblogs.com)

浙公網(wǎng)安備 33010602011771號