【RocketMQ】【Source Code】How Master-Slave Synchronization Is Implemented
Master-slave synchronization is implemented mainly in HAService. DefaultMessageStore instantiates HAService in its constructor and starts it in its start method (when the DLedger CommitLog is enabled, no HAService is created):
```java
public class DefaultMessageStore implements MessageStore {
    public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        // ...
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            // Create the HAService
            this.haService = new HAService(this);
        } else {
            this.haService = null;
        }
        // ...
    }

    public void start() throws Exception {
        // ...
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            // Start the HAService
            this.haService.start();
            this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
        }
        // ...
    }
}
```
The HAService constructor creates an AcceptSocketService, a GroupTransferService and an HAClient. Its start method does the following:
- Calls AcceptSocketService.beginAccept, which binds the HA port and listens on it for connection requests from slaves (this runs on the master);
- Calls AcceptSocketService.start to start the service thread, which handles the slaves' connection requests and establishes the connections (this runs on the master);
- Calls GroupTransferService.start; this service waits for the data transfer to finish during master-slave synchronization (this runs on the master);
- Calls HAClient.start; the HAClient connects to the master, reports the slave's synchronization progress to it and stores the data the master sends (this runs on the slave).
```java
public class HAService {
    public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
        this.defaultMessageStore = defaultMessageStore;
        // Create the AcceptSocketService that listens on the HA port
        this.acceptSocketService =
            new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
        this.groupTransferService = new GroupTransferService();
        // Create the HAClient
        this.haClient = new HAClient();
    }

    public void start() throws Exception {
        // Start listening for connections from slaves
        this.acceptSocketService.beginAccept();
        // Start the accept service thread
        this.acceptSocketService.start();
        // Start the GroupTransferService
        this.groupTransferService.start();
        // Start the HAClient
        this.haClient.start();
    }
}
```

Listening for slave connection requests
AcceptSocketService.beginAccept first opens a ServerSocketChannel, binds it to the HA port and registers it with a selector for OP_ACCEPT events, so that connection requests from slaves are detected:
```java
public class HAService {
    class AcceptSocketService extends ServiceThread {
        /**
         * Starts listening for connections from slaves.
         *
         * @throws Exception If fails.
         */
        public void beginAccept() throws Exception {
            // Open the ServerSocketChannel
            this.serverSocketChannel = ServerSocketChannel.open();
            // Open the selector
            this.selector = RemotingUtil.openSelector();
            this.serverSocketChannel.socket().setReuseAddress(true);
            // Bind the HA listen port
            this.serverSocketChannel.socket().bind(this.socketAddressListen);
            // Switch to non-blocking mode
            this.serverSocketChannel.configureBlocking(false);
            // Register interest in OP_ACCEPT (incoming connection) events
            this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        }
    }
}
```
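The open/bind/register sequence can be tried outside the broker with plain JDK NIO. The following is a minimal sketch, not RocketMQ's code: `Selector.open()` stands in for `RemotingUtil.openSelector()`, and the class name `HaAcceptorSketch` is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical sketch of the beginAccept steps using only JDK NIO.
// Assumption: Selector.open() stands in for RemotingUtil.openSelector().
final class HaAcceptorSketch {
    final ServerSocketChannel serverSocketChannel;
    final Selector selector;
    final SelectionKey acceptKey;

    HaAcceptorSketch(InetSocketAddress listenAddress) {
        try {
            this.serverSocketChannel = ServerSocketChannel.open();
            this.selector = Selector.open();
            // Must be set before bind() to take effect; allows quick rebinding after a restart
            this.serverSocketChannel.socket().setReuseAddress(true);
            this.serverSocketChannel.socket().bind(listenAddress);
            // A channel has to be non-blocking before it can be registered with a selector
            this.serverSocketChannel.configureBlocking(false);
            // From now on, selector.select() reports incoming connection requests
            this.acceptKey = this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int localPort() {
        return serverSocketChannel.socket().getLocalPort();
    }

    void close() {
        try {
            selector.close();
            serverSocketChannel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the broker the listen address is built from `getHaListenPort()`; passing port 0 instead lets the operating system choose a free port, which is convenient for a quick local test.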
Handling slave connection requests
AcceptSocketService.run processes the detected connection requests roughly as follows:
- Get the ready events from the selector;
- For an OP_ACCEPT event, accept the connection, create an HAConnection for the slave, call its start method, and add it to the connection list. HAConnection encapsulates the data synchronization logic between the master and a slave.
```java
public class HAService {
    class AcceptSocketService extends ServiceThread {
        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
            // Loop until the service is stopped
            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
                    // Get the ready events
                    Set<SelectionKey> selected = this.selector.selectedKeys();
                    // Handle the events
                    if (selected != null) {
                        for (SelectionKey k : selected) {
                            // Incoming connection event
                            if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                                if (sc != null) {
                                    HAService.log.info("HAService receive new connection, "
                                        + sc.socket().getRemoteSocketAddress());
                                    try {
                                        // Create an HAConnection for this slave
                                        HAConnection conn = new HAConnection(HAService.this, sc);
                                        // Start the connection's services
                                        conn.start();
                                        // Add it to the connection list
                                        HAService.this.addConnection(conn);
                                    } catch (Exception e) {
                                        log.error("new HAConnection exception", e);
                                        sc.close();
                                    }
                                }
                            } else {
                                log.warn("Unexpected ops in select " + k.readyOps());
                            }
                        }
                        selected.clear();
                    }
                } catch (Exception e) {
                    log.error(this.getServiceName() + " service has exception.", e);
                }
            }
            log.info(this.getServiceName() + " service end");
        }
    }
}
```
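The loop body can be reduced to a single, testable iteration. This is a sketch under assumptions: `AcceptLoopSketch` and its `onConnection` callback are hypothetical, the callback stands in for creating, starting and registering an `HAConnection`, and removing each key through the iterator has the same effect as the original `selected.clear()`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.function.Consumer;

// Hypothetical sketch: one iteration of AcceptSocketService.run().
final class AcceptLoopSketch {
    // Waits up to timeoutMillis for ready keys, accepts every pending connection and hands
    // it to onConnection (standing in for new HAConnection(...).start() + addConnection).
    static int acceptOnce(Selector selector, long timeoutMillis, Consumer<SocketChannel> onConnection) {
        int accepted = 0;
        try {
            selector.select(timeoutMillis);
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey k = it.next();
                it.remove(); // processed keys must leave the selected set, like selected.clear()
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) == 0) {
                    continue; // the original logs "Unexpected ops" here
                }
                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                if (sc == null) {
                    continue; // non-blocking accept() returns null when nothing is pending
                }
                try {
                    onConnection.accept(sc);
                    accepted++;
                } catch (RuntimeException e) {
                    sc.close(); // same as the original: close the channel if setup fails
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return accepted;
    }
}
```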
Waiting for replication to finish
GroupTransferService.run waits, during master-slave synchronization, until the slaves have received the data.
Each iteration first calls waitForRunning, which waits for up to 10 ms because replication may not have started yet; when a synchronization request arrives, the thread is woken up and calls doWaitTransfer to wait for the data to be synchronized:
```java
public class HAService {
    class GroupTransferService extends ServiceThread {
        public void run() {
            log.info(this.getServiceName() + " service started");
            // Loop until the service is stopped
            while (!this.isStopped()) {
                try {
                    // Wait up to 10 ms, or until woken up by a new request
                    this.waitForRunning(10);
                    // After waking up (or timing out), wait for pending requests to be replicated
                    this.doWaitTransfer();
                } catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }
            log.info(this.getServiceName() + " service end");
        }
    }
}
```
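`waitForRunning` and its counterpart `wakeup` come from RocketMQ's `ServiceThread`. The sketch below only illustrates the contract this loop relies on: block for at most a timeout, return early when woken, and do not lose a wakeup that arrives before the wait starts. `WaitPointSketch` is hypothetical and built on `synchronized`/`wait`; `ServiceThread` itself is implemented differently.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the waitForRunning/wakeup pair used by service threads.
final class WaitPointSketch {
    // Remembers a wakeup that arrives before anyone is waiting
    private boolean notified = false;

    // Producer side, e.g. after a request has been queued
    synchronized void wakeup() {
        notified = true;
        notifyAll();
    }

    // Blocks for at most timeoutMillis. Returns true if woken by wakeup(),
    // false on timeout. Either way the pending signal is consumed.
    synchronized boolean waitForRunning(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (!notified) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return false;
                }
                // Releases the monitor while waiting; the loop also absorbs spurious wakeups
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            notified = false;
        }
    }
}
```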
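Before looking at doWaitTransfer, let's see how the master knows that there is data to synchronize.
On the master, after a message has been written to the CommitLog, submitReplicaRequest is called to handle replication. It first checks whether the broker role is SYNC_MASTER. If it is, and the message needs to wait for storage (isWaitStoreMsgOK) and the slave is available (isSlaveOK), it builds a GroupCommitRequest whose target offset is wroteOffset + wroteBytes, i.e. the end of the message just written, and adds it to the request list with HAService.putRequest, which also wakes up GroupTransferService. It then calls wakeupAll on HAService's WaitNotifyObject, which wakes up the WriteSocketService threads that are waiting for new data (the allWaitForRunning call in WriteSocketService below). If the slave is not available, SLAVE_NOT_AVAILABLE is returned immediately; for other broker roles, or when the message does not need to wait, PUT_OK is returned without waiting: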
```java
public class CommitLog {
    public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (messageExt.isWaitStoreMsgOK()) {
                if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
                    // Build a GroupCommitRequest whose target offset is the end of the message just written
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                        this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
                    // Queue the request (this also wakes up GroupTransferService)
                    service.putRequest(request);
                    // Wake up the WriteSocketService threads waiting for new data to push
                    service.getWaitNotifyObject().wakeupAll();
                    return request.future();
                } else {
                    return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}
```
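The hand-off between the writing thread and GroupTransferService is a `CompletableFuture`: the request is queued, and the caller receives a future that is completed later. A minimal sketch of the branches above, assuming hypothetical stand-ins `SketchPutStatus`, `SketchGroupCommitRequest` and `SubmitReplicaSketch` (the real types are `PutMessageStatus` and `CommitLog.GroupCommitRequest`), with booleans replacing the role, `isWaitStoreMsgOK` and `isSlaveOK` checks:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-ins for PutMessageStatus and CommitLog.GroupCommitRequest.
enum SketchPutStatus { PUT_OK, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

final class SketchGroupCommitRequest {
    private final long nextOffset;   // wroteOffset + wroteBytes: end of the message just written
    private final long deadLine;     // absolute deadline on the System.nanoTime() clock
    private final CompletableFuture<SketchPutStatus> future = new CompletableFuture<>();

    SketchGroupCommitRequest(long nextOffset, long timeoutMillis) {
        this.nextOffset = nextOffset;
        this.deadLine = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    }

    long getNextOffset() { return nextOffset; }
    long getDeadLine() { return deadLine; }
    CompletableFuture<SketchPutStatus> future() { return future; }

    // Completes the future the sending thread is waiting on
    void wakeupCustomer(SketchPutStatus status) { future.complete(status); }
}

final class SubmitReplicaSketch {
    // Mirrors the branches of submitReplicaRequest; putRequest stands in for HAService.putRequest
    static CompletableFuture<SketchPutStatus> submit(boolean syncMaster, boolean waitStoreMsgOK, boolean slaveOK,
                                                     long wroteOffset, int wroteBytes, long slaveTimeoutMillis,
                                                     Consumer<SketchGroupCommitRequest> putRequest) {
        if (!syncMaster || !waitStoreMsgOK) {
            return CompletableFuture.completedFuture(SketchPutStatus.PUT_OK);
        }
        if (!slaveOK) {
            return CompletableFuture.completedFuture(SketchPutStatus.SLAVE_NOT_AVAILABLE);
        }
        SketchGroupCommitRequest request = new SketchGroupCommitRequest(wroteOffset + wroteBytes, slaveTimeoutMillis);
        putRequest.accept(request);
        return request.future();
    }
}
```

Returning a future instead of blocking lets the broker finish the write path and resume the response asynchronously once the replication outcome is known.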
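In doWaitTransfer, the service checks whether the request list requestsRead is empty. If it is not, messages have been written to the CommitLog and the master must wait until they have been transferred to the slave:
- push2SlaveMaxOffset records the maximum offset that has been synchronized to the slaves (it is updated from the offsets the slaves acknowledge, see notifyTransferSome below). Compare it with the offset of this CommitLog write, i.e. the offset set in the request; once push2SlaveMaxOffset is greater than or equal to that offset, the data for this request has been transferred;
- Get the deadline set in the request;
- Loop while the data has not been transferred and the deadline has not passed: wait for up to 1 s (the wait ends early when a slave acknowledgement arrives), then check again. The loop ends when the data has been transferred or the deadline has passed;
- Wake up the thread waiting for the synchronization result, with PUT_OK on success or FLUSH_SLAVE_TIMEOUT on timeout.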
```java
public class HAService {
    class GroupTransferService extends ServiceThread {
        // Waited on in doWaitTransfer; woken up when a slave acknowledgement arrives
        private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
        // Pending CommitLog commit requests (a field of GroupTransferService, hence this.requestsRead)
        private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();

        private void doWaitTransfer() {
            // If there are pending commit requests
            if (!this.requestsRead.isEmpty()) {
                // Process each request
                for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                    // Has the offset transferred to the slaves reached the offset in the request?
                    boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                    // Get the deadline (based on System.nanoTime())
                    long deadLine = req.getDeadLine();
                    // While the slave has not caught up and the deadline has not passed
                    while (!transferOK && deadLine - System.nanoTime() > 0) {
                        // Wait up to 1 s, or until a slave acknowledgement wakes us up
                        this.notifyTransferObject.waitForRunning(1000);
                        // Check again whether the slave has reached the offset in the request
                        transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                    }
                    // Wake up the waiting producer with the result
                    req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
                this.requestsRead = new LinkedList<>();
            }
        }
    }
}
```
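The waiting logic can be exercised without a broker. The sketch below is an assumption-laden model of `doWaitTransfer`: a plain monitor replaces `notifyTransferObject`, a `Boolean` field replaces `wakeupCustomer`, and, unlike the excerpt above, each wait is capped at the remaining time so that a short deadline is not overshot by up to 1 s. The swap between a write list and a read list is how queued requests reach `requestsRead` in this sketch; every name other than `push2SlaveMaxOffset`, `requestsRead` and `doWaitTransfer` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of doWaitTransfer; not RocketMQ's implementation.
final class TransferWaitSketch {
    static final class Req {
        final long nextOffset;         // offset the slaves must acknowledge
        final long deadLineNanos;      // absolute deadline on the System.nanoTime() clock
        volatile Boolean transferOK;   // null until processed; stands in for wakeupCustomer

        Req(long nextOffset, long timeoutMillis) {
            this.nextOffset = nextOffset;
            this.deadLineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        }
    }

    final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
    private final Object notifyTransferObject = new Object();
    // Assumption: producers append to requestsWrite; the service swaps it with requestsRead
    private List<Req> requestsWrite = new ArrayList<>();
    private List<Req> requestsRead = new ArrayList<>();

    synchronized void putRequest(Req req) {
        requestsWrite.add(req);
    }

    private synchronized void swapRequests() {
        List<Req> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    // Called after push2SlaveMaxOffset has been raised
    void notifyTransferSome() {
        synchronized (notifyTransferObject) {
            notifyTransferObject.notifyAll();
        }
    }

    void doWaitTransfer() {
        swapRequests();
        for (Req req : requestsRead) {
            boolean transferOK = push2SlaveMaxOffset.get() >= req.nextOffset;
            while (!transferOK) {
                long remaining = req.deadLineNanos - System.nanoTime();
                if (remaining <= 0) {
                    break; // deadline passed
                }
                // Wait at most 1 s and never past the deadline (wait(0) would mean "forever")
                long waitMillis = Math.max(1L, Math.min(1000L, TimeUnit.NANOSECONDS.toMillis(remaining)));
                synchronized (notifyTransferObject) {
                    try {
                        notifyTransferObject.wait(waitMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                transferOK = push2SlaveMaxOffset.get() >= req.nextOffset;
            }
            req.transferOK = transferOK;
        }
        requestsRead.clear();
    }
}
```

Because the offset is re-checked after every wait, a notification that fires before the wait begins only costs one bounded wait instead of being lost.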
Starting the HAClient
The HAClient can be seen as running on the slave. Its main steps are:
- Call connectMaster to connect to the master. The HAClient also runs on the master, but a master has no master to connect to, so this can be ignored there;
- Call isTimeToReportOffset to decide whether the slave should report its synchronization offset to the master; if so, call reportSlaveMaxOffset to send the current offset;
- Call processReadEvent to handle readable network events, i.e. the data sent by the master, and store that data in the CommitLog.
```java
public class HAService {
    class HAClient extends ServiceThread {
        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    // Connect to the master
                    if (this.connectMaster()) {
                        // Is it time to report the synchronization offset?
                        if (this.isTimeToReportOffset()) {
                            // Send the synchronization offset to the master
                            boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                            if (!result) {
                                this.closeMaster();
                            }
                        }
                        // Wait for events
                        this.selector.select(1000);
                        // Handle read events, i.e. data sent by the master
                        boolean ok = this.processReadEvent();
                        if (!ok) {
                            this.closeMaster();
                        }
                        // ...
                    } else {
                        // No master connection: retry after 5 s
                        this.waitForRunning(1000 * 5);
                    }
                } catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                    this.waitForRunning(1000 * 5);
                }
            }
            log.info(this.getServiceName() + " service end");
        }
    }
}
```
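Connecting to the master
connectMaster gets the master's address, converts it into a SocketAddress, connects to the master and registers the channel with the selector for OP_READ events. It then initializes currentReportedOffset to the slave's current maximum CommitLog offset, which is the first offset it will report: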
```java
public class HAService {
    class HAClient extends ServiceThread {
        // Offset most recently reported to the master (replication progress)
        private long currentReportedOffset = 0;

        private boolean connectMaster() throws ClosedChannelException {
            if (null == socketChannel) {
                String addr = this.masterAddress.get();
                if (addr != null) {
                    // Convert the address string into a SocketAddress
                    SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
                    if (socketAddress != null) {
                        // Connect to the master
                        this.socketChannel = RemotingUtil.connect(socketAddress);
                        if (this.socketChannel != null) {
                            // Register interest in OP_READ events
                            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                        }
                    }
                }
                // Start from the current maximum CommitLog offset
                this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                // Reset the last-write timestamp
                this.lastWriteTimestamp = System.currentTimeMillis();
            }
            return this.socketChannel != null;
        }
    }
}
```
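`RemotingUtil.string2SocketAddress` turns the `host:port` string held in `masterAddress` into a socket address. A hypothetical helper with the same purpose follows; its parsing rules (split at the last colon, reject a missing port) are assumptions for the sketch, not a copy of RocketMQ's code.

```java
import java.net.InetSocketAddress;

// Hypothetical helper in the spirit of RemotingUtil.string2SocketAddress.
final class MasterAddressSketch {
    // Splits "host:port" at the last ':' and builds an InetSocketAddress
    static InetSocketAddress toSocketAddress(String addr) {
        int split = addr.lastIndexOf(':');
        if (split <= 0 || split == addr.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got " + addr);
        }
        String host = addr.substring(0, split);
        int port = Integer.parseInt(addr.substring(split + 1));
        // A literal IP such as 127.0.0.1 needs no DNS lookup; a host name is resolved here
        return new InetSocketAddress(host, port);
    }
}
```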
Reporting the pull offset to the master
isTimeToReportOffset computes the interval between now and the slave's last write to the master (lastWriteTimestamp). If the interval exceeds the configured heartbeat interval (haSendHeartbeatInterval), the slave must send its synchronization offset to the master, which reportSlaveMaxOffset does. In other words, the slave periodically reports to the master the CommitLog offset it has synchronized up to, and this report also serves as the heartbeat:
```java
public class HAService {
    class HAClient extends ServiceThread {
        // Offset up to which this slave has synchronized messages
        private long currentReportedOffset = 0;

        private boolean isTimeToReportOffset() {
            // Time elapsed since the last write to the master
            long interval =
                HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
            // Has the configured heartbeat interval been exceeded?
            boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
                .getHaSendHeartbeatInterval();
            return needHeart;
        }

        // Sends the synchronization offset; the argument is the current replication offset currentReportedOffset
        private boolean reportSlaveMaxOffset(final long maxOffset) {
            this.reportOffset.position(0);
            this.reportOffset.limit(8); // the report is exactly 8 bytes
            this.reportOffset.putLong(maxOffset); // write the offset
            this.reportOffset.position(0);
            this.reportOffset.limit(8);
            // Up to 3 write attempts to send all 8 bytes
            for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
                try {
                    // Send the pull offset to the master
                    this.socketChannel.write(this.reportOffset);
                } catch (IOException e) {
                    log.error(this.getServiceName()
                        + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                    return false;
                }
            }
            // Record the time of this write
            lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
            return !this.reportOffset.hasRemaining();
        }
    }
}
```
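The report is a single 8-byte long, and `ByteBuffer` writes longs in big-endian order by default. The sketch below, with a hypothetical `ReportOffsetSketch` class and explicit millisecond timestamps instead of the store's system clock, reproduces the timing rule of `isTimeToReportOffset` and the buffer preparation of `reportSlaveMaxOffset`.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the slave's offset report and heartbeat timing.
final class ReportOffsetSketch {
    private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
    private long lastWriteTimestamp;

    ReportOffsetSketch(long nowMillis) {
        this.lastWriteTimestamp = nowMillis;
    }

    // Same rule as isTimeToReportOffset: report once the heartbeat interval has elapsed
    boolean isTimeToReportOffset(long nowMillis, long heartbeatIntervalMillis) {
        return nowMillis - lastWriteTimestamp > heartbeatIntervalMillis;
    }

    // Fills the 8-byte buffer and returns it ready for socketChannel.write(...);
    // records nowMillis as the last write time, as the original does after writing
    ByteBuffer encode(long maxOffset, long nowMillis) {
        reportOffset.clear();          // position = 0, limit = capacity (8)
        reportOffset.putLong(maxOffset);
        reportOffset.flip();           // position = 0, limit = 8: ready to be written
        lastWriteTimestamp = nowMillis;
        return reportOffset;
    }
}
```

Since this buffer's capacity is exactly 8 bytes, `clear()` followed by `flip()` has the same effect as the `position(0)`/`limit(8)` calls in the original.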
Handling readable network events
processReadEvent handles readable events, i.e. the synchronization data sent by the master. It reads data from the socketChannel into the read buffer byteBufferRead; the read call returns the number of bytes read, which is then checked:
- If it is greater than 0, there is data to process, so dispatchReadRequest is called;
- If it is 0, there is nothing to read; the number of consecutive empty reads is counted, and after 3 consecutive empty reads the method stops for now;
- If it is negative, the master has closed the connection, and the method returns false so that the caller closes it.
```java
class HAClient extends ServiceThread {
    // Read buffer: data from the socketChannel is read into it
    private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

    private boolean processReadEvent() {
        int readSizeZeroTimes = 0;
        while (this.byteBufferRead.hasRemaining()) {
            try {
                // Read from the socketChannel into byteBufferRead; returns the number of bytes read
                int readSize = this.socketChannel.read(this.byteBufferRead);
                if (readSize > 0) {
                    // Reset the empty-read counter
                    readSizeZeroTimes = 0;
                    // Process the data
                    boolean result = this.dispatchReadRequest();
                    if (!result) {
                        log.error("HAClient, dispatchReadRequest error");
                        return false;
                    }
                } else if (readSize == 0) {
                    // Count consecutive empty reads; stop after 3
                    if (++readSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    log.info("HAClient, processReadEvent read socket < 0");
                    return false;
                }
            } catch (IOException e) {
                log.info("HAClient, processReadEvent read socket exception", e);
                return false;
            }
        }
        return true;
    }
}
```
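The loop's termination rules (stop after three consecutive zero-byte reads, fail on end-of-stream or an I/O error) do not depend on sockets, so they can be shown against any `ReadableByteChannel`. A sketch under assumptions: `ReadLoopSketch` is hypothetical, and its `dispatch` callback stands in for `dispatchReadRequest()`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of HAClient.processReadEvent over any ReadableByteChannel.
final class ReadLoopSketch {
    // dispatch stands in for dispatchReadRequest(): it processes whatever is buffered
    // and returns false on a protocol error.
    static boolean processReadEvent(ReadableByteChannel ch, ByteBuffer buf, BooleanSupplier dispatch) {
        int readSizeZeroTimes = 0;
        while (buf.hasRemaining()) {
            int readSize;
            try {
                readSize = ch.read(buf);
            } catch (IOException e) {
                return false;               // caller closes the connection
            }
            if (readSize > 0) {
                readSizeZeroTimes = 0;      // progress: reset the empty-read counter
                if (!dispatch.getAsBoolean()) {
                    return false;
                }
            } else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;                  // nothing more for now; try again after the next select
                }
            } else {
                return false;               // -1: the peer closed the connection
            }
        }
        return true;
    }
}
```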
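Writing messages to the CommitLog
dispatchReadRequest writes the data received from the master into the slave's CommitLog. dispatchPosition records how far the read buffer has been processed, so the number of unprocessed bytes is byteBufferRead.position() - dispatchPosition. If that is at least the size of one header (12 bytes), there may be a message to process; otherwise processing ends.
The data is processed as follows:
- Read the first 8 bytes of the header: the physical offset of the data on the master, masterPhyOffset;
- Read the next 4 bytes: the size of the message body, bodySize;
- Get the slave's current maximum CommitLog physical offset slavePhyOffset. If it is not 0 and differs from masterPhyOffset, the slave's data is out of step with the master's, so processing stops;
- If the unprocessed bytes amount to at least header size + body size, a complete message is available, so continue;
- Compute where the body starts in the read buffer, read the body from there and append it to the slave's CommitLog;
- Advance dispatchPosition by header size + body size; everything before dispatchPosition has been processed. Then call reportSlaveMaxOffsetPlus, which reports the new offset to the master if it has advanced;
- If the read buffer is full, call reallocateByteBuffer, which moves the unprocessed bytes to the start of a backup buffer, swaps the two buffers and resets dispatchPosition to 0.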
```java
class HAClient extends ServiceThread {
    // Position in the read buffer up to which data has been processed; starts at 0
    private int dispatchPosition = 0;
    // Read buffer
    private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

    private boolean dispatchReadRequest() {
        // Header size
        final int msgHeaderSize = 8 + 4; // phyoffset + size
        // Keep processing while complete data is available
        while (true) {
            // Number of unprocessed bytes
            int diff = this.byteBufferRead.position() - this.dispatchPosition;
            // At least one full header is available
            if (diff >= msgHeaderSize) {
                // Physical offset of the data on the master
                long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                // Body size
                int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
                // Current maximum CommitLog physical offset on the slave
                long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                if (slavePhyOffset != 0) {
                    // Stop if the offsets do not match
                    if (slavePhyOffset != masterPhyOffset) {
                        log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                            + slavePhyOffset + " MASTER: " + masterPhyOffset);
                        return false;
                    }
                }
                // A complete message (header + body) is available
                if (diff >= (msgHeaderSize + bodySize)) {
                    // Backing array of the read buffer
                    byte[] bodyData = byteBufferRead.array();
                    // Start of the body in the read buffer
                    int dataStart = this.dispatchPosition + msgHeaderSize;
                    // Append the body to the slave's CommitLog at the master's offset
                    HAService.this.defaultMessageStore.appendToCommitLog(
                        masterPhyOffset, bodyData, dataStart, bodySize);
                    // Advance dispatchPosition past the header and the body
                    this.dispatchPosition += msgHeaderSize + bodySize;
                    // Report the new offset to the master if it has advanced
                    if (!reportSlaveMaxOffsetPlus()) {
                        return false;
                    }
                    continue;
                }
            }
            // Read buffer full: move the unprocessed bytes to the front
            if (!this.byteBufferRead.hasRemaining()) {
                this.reallocateByteBuffer();
            }
            break;
        }
        return true;
    }
}
```
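Each frame decoded by `dispatchReadRequest` has a 12-byte header (an 8-byte master offset followed by a 4-byte body size) and then the body. The parser below is a hypothetical sketch (`FrameParserSketch`) that collects frames into a list instead of calling `appendToCommitLog`, and it leaves out the slave/master offset consistency check. Like the original, it reads from the buffer's backing array, so it assumes a heap buffer created with `ByteBuffer.allocate`.

```java
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical sketch of the framing handled by dispatchReadRequest:
// [8-byte masterPhyOffset][4-byte bodySize][bodySize bytes of CommitLog data]
final class FrameParserSketch {
    static final int HEADER_SIZE = 8 + 4;

    static final class Frame {
        final long masterPhyOffset;
        final byte[] body;

        Frame(long masterPhyOffset, byte[] body) {
            this.masterPhyOffset = masterPhyOffset;
            this.body = body;
        }
    }

    // Bytes received so far occupy [0, buf.position()). Decodes every complete frame
    // starting at dispatchPosition and returns the new dispatchPosition.
    static int dispatch(ByteBuffer buf, int dispatchPosition, List<Frame> out) {
        while (true) {
            int diff = buf.position() - dispatchPosition;
            if (diff < HEADER_SIZE) {
                return dispatchPosition;      // header incomplete: wait for more bytes
            }
            // Absolute reads: the buffer's own position is not moved
            long masterPhyOffset = buf.getLong(dispatchPosition);
            int bodySize = buf.getInt(dispatchPosition + 8);
            if (diff < HEADER_SIZE + bodySize) {
                return dispatchPosition;      // body incomplete: wait for more bytes
            }
            byte[] body = new byte[bodySize];
            int dataStart = dispatchPosition + HEADER_SIZE;
            System.arraycopy(buf.array(), dataStart, body, 0, bodySize);
            out.add(new Frame(masterPhyOffset, body));
            dispatchPosition += HEADER_SIZE + bodySize;
        }
    }
}
```

A partially received frame is left untouched until more bytes arrive, which is why the caller only needs to remember one integer between reads.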
HAConnection
HAConnection encapsulates the network communication between the master and a slave, implemented in ReadSocketService and WriteSocketService.
ReadSocketService
Once started, ReadSocketService handles readable events. As shown in HAClient, the slave periodically reports its synchronization offset to the master; this is where the master processes those reports. When a readable event is detected, processReadEvent is called:
```java
public class HAConnection {
    class ReadSocketService extends ServiceThread {
        @Override
        public void run() {
            HAConnection.log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
                    // Handle readable events
                    boolean ok = this.processReadEvent();
                    if (!ok) {
                        HAConnection.log.error("processReadEvent error");
                        break;
                    }
                    // ...
                } catch (Exception e) {
                    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                    break;
                }
            }
            // ...
            HAConnection.log.info(this.getServiceName() + " service end");
        }
    }
}
```
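Handling readable events on the master
processReadEvent handles read events in a similar way to HAClient's dispatchReadRequest above: data from the network is read into a read buffer, and a variable records how far the buffer has been processed. Its logic is as follows:
- If the read buffer is full, reset it and set processPosition back to 0;
- Read data from the socketChannel into byteBufferRead; the call returns the number of bytes read;
- If more than 0 bytes were read, continue with the next step; if 0 bytes were read, count the consecutive empty reads and stop once there have been three;
- Check whether at least 8 unprocessed bytes are available. As shown earlier, the slave reports its pull offset as an 8-byte value, so 8 or more bytes means at least one complete offset report has arrived;
- Compute pos, the end of the last complete 8-byte value in the buffer, and read the offset readOffset from the 8 bytes before it. If several reports have accumulated, only the newest one is used;
- Update processPosition, which marks how far the read buffer has been processed;
- Set slaveAckOffset to readOffset;
- If slaveRequestOffset, the master's record of the offset requested by the slave, is less than 0, synchronization has not started yet, so set slaveRequestOffset to the offset sent by the slave;
- Otherwise, if the offset sent by the slave is greater than the master's maximum physical offset, stop processing (the method returns false);
- Call notifyTransferSome to update the offset the master records as synchronized to the slaves.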
```java
public class HAConnection {
    class ReadSocketService extends ServiceThread {
        // Read buffer
        private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
        // Position in the read buffer up to which data has been processed
        private int processPosition = 0;

        private boolean processReadEvent() {
            int readSizeZeroTimes = 0;
            // If the read buffer is full, reuse it from the beginning
            if (!this.byteBufferRead.hasRemaining()) {
                this.byteBufferRead.flip();
                // Reset the processed position to 0
                this.processPosition = 0;
            }
            // While the buffer still has space
            while (this.byteBufferRead.hasRemaining()) {
                try {
                    // Read from the socketChannel into byteBufferRead; returns the number of bytes read
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    // Some bytes were read
                    if (readSize > 0) {
                        // Reset the empty-read counter
                        readSizeZeroTimes = 0;
                        // Record the time of this read
                        this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                        // At least 8 unprocessed bytes?
                        if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                            // End of the last complete 8-byte offset in the buffer
                            int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                            // The 8 bytes before it hold the newest offset sent by the slave
                            long readOffset = this.byteBufferRead.getLong(pos - 8);
                            // Update the processed position
                            this.processPosition = pos;
                            // Record the slave's acknowledged offset
                            HAConnection.this.slaveAckOffset = readOffset;
                            // A negative slaveRequestOffset means synchronization has not started yet
                            if (HAConnection.this.slaveRequestOffset < 0) {
                                // Use the offset sent by the slave as its request offset
                                HAConnection.this.slaveRequestOffset = readOffset;
                                log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                            } else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) {
                                // The slave's offset is beyond the master's maximum physical offset
                                log.warn("slave[{}] request offset={} greater than local commitLog offset={}. ",
                                    HAConnection.this.clientAddr,
                                    HAConnection.this.slaveAckOffset,
                                    HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset());
                                return false;
                            }
                            // Update the offset the master records as synchronized to the slaves
                            HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                        }
                    } else if (readSize == 0) {
                        // Stop after 3 consecutive empty reads
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.error("processReadEvent exception", e);
                    return false;
                }
            }
            return true;
        }
    }
}
```
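The `position % 8` step is easy to check in isolation. The sketch below (hypothetical `LatestOffsetSketch`) assumes, as the original does implicitly, that the buffer contains only 8-byte reports written from index 0, so complete reports end at multiples of 8; when several reports have piled up, only the newest one is returned.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of how the master extracts the newest offset report.
// Assumption: the buffer holds only 8-byte reports written from index 0.
final class LatestOffsetSketch {
    private int processPosition = 0;   // everything before this index has been handled

    // Returns the newest complete offset, or -1 if fewer than 8 new bytes are buffered
    long readLatest(ByteBuffer buf) {
        if (buf.position() - processPosition < 8) {
            return -1L;
        }
        // Round the write position down to a multiple of 8: end of the last complete report
        int pos = buf.position() - (buf.position() % 8);
        processPosition = pos;
        // Absolute read: does not move the buffer's position
        return buf.getLong(pos - 8);
    }

    int processPosition() {
        return processPosition;
    }
}
```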
As seen in GroupTransferService, whether a synchronization has completed is decided by the value of push2SlaveMaxOffset. notifyTransferSome shows that when the master receives a pull offset reported by a slave, it updates push2SlaveMaxOffset and wakes up GroupTransferService:
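```java
public class HAService {
    // Maximum offset synchronized to the slaves (raised from their acknowledged offsets)
    private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
    private final GroupTransferService groupTransferService;

    public void notifyTransferSome(final long offset) {
        // Only update when the new offset is greater than the recorded value
        for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
            // Try to raise the maximum offset synchronized to the slaves
            boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
            if (ok) {
                // Wake up GroupTransferService so that waiting requests re-check
                this.groupTransferService.notifyTransferSome();
                break;
            } else {
                // Another thread changed the value: reload and retry
                value = this.push2SlaveMaxOffset.get();
            }
        }
    }
}
```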
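`notifyTransferSome` is a lock-free, forward-only update: a stale acknowledgement can never lower the offset, and a failed `compareAndSet` simply reloads and retries. A hypothetical `MonotonicOffsetSketch` with the same loop, returning whether this call raised the value (the point at which the original wakes up GroupTransferService):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the CAS loop in notifyTransferSome.
final class MonotonicOffsetSketch {
    final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);

    // Raises push2SlaveMaxOffset to offset if offset is larger. Returns true if this call
    // changed the value, i.e. when the original would wake up GroupTransferService.
    boolean advanceTo(long offset) {
        for (long value = push2SlaveMaxOffset.get(); offset > value; value = push2SlaveMaxOffset.get()) {
            if (push2SlaveMaxOffset.compareAndSet(value, offset)) {
                return true;
            }
            // CAS failed: another thread updated the value, so reload and re-check
        }
        return false;
    }
}
```

`AtomicLong.accumulateAndGet(offset, Math::max)` reaches the same monotonic result in one call; the explicit loop has the advantage of telling the caller whether the value actually changed, which is what decides whether a wakeup is needed.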
WriteSocketService
WriteSocketService sends synchronization data from the master to a slave. Its logic is as follows:
- Check slaveRequestOffset, the pull offset reported by the slave:
  - If slaveRequestOffset is -1, the slave has not reported an offset yet, so sleep for 10 ms and wait for the slave to report its pull offset;
  - Otherwise, synchronization has started, so continue with the next step.
- Check whether nextTransferFromWhere is -1. nextTransferFromWhere records the CommitLog offset from which data will be transferred next; -1 means this is the first transfer, and there are two cases:
  - If the pull offset slaveRequestOffset reported by the slave is 0, start from the beginning of the CommitLog file that contains the master's current maximum offset (the maximum offset rounded down to a multiple of the CommitLog file size);
  - Otherwise, start from slaveRequestOffset.
- Check whether the previous write has sent all its data to the slave:
  - If it has, check whether the time since the last write exceeds the heartbeat interval. If so, send a heartbeat so that the idle connection is not closed: build a header carrying nextTransferFromWhere and a body size of 0, and call transferData to send it;
  - If the previous data has not been fully sent, call transferData to continue; if it is still incomplete, skip the rest of this iteration.
- Get data from the CommitLog starting at nextTransferFromWhere. If nothing is available, wait for up to 100 ms; otherwise transfer it:
  (1) If the data is larger than the maximum transfer size (haTransferBatchSize), cap it at that size so that the data is sent in batches;
  (2) Advance the next transfer offset, nextTransferFromWhere, by the size being sent;
  (3) Keep the data read from the CommitLog in selectMappedBufferResult;
  (4) Build the header: the 8-byte start offset of this batch followed by the 4-byte body size;
  (5) Call transferData to send the data.
```java
public class HAConnection {
    class WriteSocketService extends ServiceThread {
        private final int headerSize = 8 + 4; // header: 8-byte offset + 4-byte body size

        @Override
        public void run() {
            HAConnection.log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
                    // slaveRequestOffset == -1: the slave has not reported its pull offset yet
                    if (-1 == HAConnection.this.slaveRequestOffset) {
                        // Wait briefly
                        Thread.sleep(10);
                        continue;
                    }
                    // First transfer
                    if (-1 == this.nextTransferFromWhere) {
                        // The slave requested offset 0
                        if (0 == HAConnection.this.slaveRequestOffset) {
                            // Start from the beginning of the CommitLog file holding the master's max offset
                            long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                            masterOffset =
                                masterOffset
                                    - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                    .getMappedFileSizeCommitLog());
                            if (masterOffset < 0) {
                                masterOffset = 0;
                            }
                            // Update nextTransferFromWhere
                            this.nextTransferFromWhere = masterOffset;
                        } else {
                            // Start from the offset reported by the slave
                            this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                        }
                        log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                            + "], and slave request " + HAConnection.this.slaveRequestOffset);
                    }
                    // Was the previous transfer completed?
                    if (this.lastWriteOver) {
                        // Time elapsed since the last write
                        long interval =
                            HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                        // The heartbeat interval has been exceeded
                        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                            .getHaSendHeartbeatInterval()) {
                            // Build a header-only heartbeat: current offset + body size 0
                            this.byteBufferHeader.position(0);
                            this.byteBufferHeader.limit(headerSize);
                            this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                            this.byteBufferHeader.putInt(0);
                            this.byteBufferHeader.flip();
                            // Send the heartbeat
                            this.lastWriteOver = this.transferData();
                            if (!this.lastWriteOver)
                                continue;
                        }
                    } else {
                        // Not finished: continue the previous transfer
                        this.lastWriteOver = this.transferData();
                        // Still not finished: skip the rest of this iteration
                        if (!this.lastWriteOver)
                            continue;
                    }
                    // Get data starting at nextTransferFromWhere
                    SelectMappedBufferResult selectResult =
                        HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
                    if (selectResult != null) { // data available
                        // Size of the data
                        int size = selectResult.getSize();
                        // Larger than the maximum transfer size?
                        if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                            // Cap it at the maximum transfer size
                            size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                        }
                        long thisOffset = this.nextTransferFromWhere;
                        // Advance the offset for the next transfer
                        this.nextTransferFromWhere += size;
                        selectResult.getByteBuffer().limit(size);
                        // Keep the data to send in selectMappedBufferResult
                        this.selectMappedBufferResult = selectResult;
                        // Build the header
                        this.byteBufferHeader.position(0);
                        // Header size
                        this.byteBufferHeader.limit(headerSize);
                        // Start offset of this batch
                        this.byteBufferHeader.putLong(thisOffset);
                        // Body size
                        this.byteBufferHeader.putInt(size);
                        this.byteBufferHeader.flip();
                        // Send the data
                        this.lastWriteOver = this.transferData();
                    } else {
                        // No new data: wait up to 100 ms (woken early by wakeupAll)
                        HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                    }
                } catch (Exception e) {
                    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                    break;
                }
            }
            HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
            // ...
            HAConnection.log.info(this.getServiceName() + " service end");
        }
    }
}
```
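The offset arithmetic in WriteSocketService can be checked with plain numbers. In this hypothetical `TransferPlanSketch`, the 1 GiB file size and 32 KiB batch size used in the test are example values chosen for the sketch; in the broker they come from `getMappedFileSizeCommitLog()` and `getHaTransferBatchSize()`.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of WriteSocketService's offset arithmetic.
final class TransferPlanSketch {
    static final int HEADER_SIZE = 8 + 4;

    // First transfer: slaveRequestOffset == 0 means "start at the beginning of the
    // CommitLog file that holds the master's current max offset"
    static long initialTransferOffset(long slaveRequestOffset, long masterMaxOffset, long mappedFileSize) {
        if (slaveRequestOffset != 0) {
            return slaveRequestOffset;
        }
        long masterOffset = masterMaxOffset - (masterMaxOffset % mappedFileSize);
        return Math.max(masterOffset, 0);
    }

    // Cap one transfer at the configured batch size
    static int batchSize(int available, int haTransferBatchSize) {
        return Math.min(available, haTransferBatchSize);
    }

    // Header = 8-byte start offset + 4-byte body size; a body size of 0 makes it a heartbeat
    static ByteBuffer header(long offset, int bodySize) {
        ByteBuffer h = ByteBuffer.allocate(HEADER_SIZE);
        h.putLong(offset).putInt(bodySize);
        h.flip();
        return h;
    }
}
```

Starting a fresh slave at a file boundary rather than at offset 0 means it does not have to copy the master's entire history before it can follow new writes.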
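Sending data
transferData works as follows:
- Send the header;
- Once the header has been sent completely, send the body. As shown above, the data read from the CommitLog is kept in selectMappedBufferResult, whose contents are sent to the slave.
In both steps, three consecutive writes of 0 bytes end the attempt, and the remaining data is sent on a later call.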
```java
public class HAConnection {
    class WriteSocketService extends ServiceThread {
        private boolean transferData() throws Exception {
            int writeSizeZeroTimes = 0;
            // Write the header
            while (this.byteBufferHeader.hasRemaining()) {
                // Send header bytes
                int writeSize = this.socketChannel.write(this.byteBufferHeader);
                if (writeSize > 0) {
                    writeSizeZeroTimes = 0;
                    // Record the time of this write
                    this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                } else if (writeSize == 0) {
                    if (++writeSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    throw new Exception("ha master write header error < 0");
                }
            }
            // No body to send: done once the header is fully written
            if (null == this.selectMappedBufferResult) {
                return !this.byteBufferHeader.hasRemaining();
            }
            writeSizeZeroTimes = 0;
            // Once the header is fully written, send the body
            if (!this.byteBufferHeader.hasRemaining()) {
                while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                    // Send body bytes
                    int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
                    if (writeSize > 0) {
                        writeSizeZeroTimes = 0;
                        this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                    } else if (writeSize == 0) {
                        if (++writeSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        throw new Exception("ha master write body error < 0");
                    }
                }
            }
            // ... (omitted in this excerpt; `result` is computed here)
            return result;
        }
    }
}
```
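A sketch of the same two-phase write against any `WritableByteChannel`, assuming a hypothetical `TransferDataSketch` class. It returns `true` only when both the header and the body have been written, which is one plausible reading of the elided `result` computation above, not a copy of it.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical sketch of WriteSocketService.transferData over any WritableByteChannel.
final class TransferDataSketch {
    // Sends the header first and the body only after the header is complete.
    // body may be null for a header-only message such as a heartbeat.
    static boolean transfer(WritableByteChannel ch, ByteBuffer header, ByteBuffer body) throws IOException {
        if (!writeFully(ch, header)) {
            return false;                   // header unfinished: resume on the next call
        }
        return body == null || writeFully(ch, body);
    }

    // Writes until the buffer is drained or three consecutive writes send nothing
    private static boolean writeFully(WritableByteChannel ch, ByteBuffer buf) throws IOException {
        int writeSizeZeroTimes = 0;
        while (buf.hasRemaining()) {
            int n = ch.write(buf);
            if (n > 0) {
                writeSizeZeroTimes = 0;
            } else if (n == 0) {
                if (++writeSizeZeroTimes >= 3) {
                    break;                  // e.g. the socket send buffer is full
                }
            } else {
                throw new IOException("write returned " + n);
            }
        }
        return !buf.hasRemaining();
    }
}
```

Because the buffers keep their positions between calls, a partially sent batch resumes exactly where it stopped, which is what the `lastWriteOver` flag in the run loop relies on.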
Summary
[Figure: master-slave synchronization flow]

[Figure: synchronization flow after a new message is written]
References
Ding Wei, Zhou Jifeng, 《RocketMQ技术内幕》 (RocketMQ Internals)
RocketMQ version: 4.9.3

