<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      【RocketMQ】【源碼】主從同步實(shí)現(xiàn)原理

      主從同步的實(shí)現(xiàn)邏輯主要在HAService中,在DefaultMessageStore的構(gòu)造函數(shù)中,對(duì)HAService進(jìn)行了實(shí)例化,并在start方法中,啟動(dòng)了HAService

      public class DefaultMessageStore implements MessageStore {
          public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
              final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
              // ...
              if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                  // 初始化HAService
                  this.haService = new HAService(this);
              } else {
                  this.haService = null;
              }
              // ...
          }
      
          public void start() throws Exception {
              // ...
              if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                  // 啟動(dòng)HAService
                  this.haService.start();
                  this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
              }
              // ...
          }
      }
      

      HAService的構(gòu)造函數(shù)中,創(chuàng)建了AcceptSocketServiceGroupTransferServiceHAClient,在start方法中主要做了如下幾件事:

      1. 調(diào)用AcceptSocketService的beginAccept方法,這一步主要是進(jìn)行端口綁定,在端口上監(jiān)聽從節(jié)點(diǎn)的連接請(qǐng)求(可以看做是運(yùn)行在master節(jié)點(diǎn)的);
      2. 調(diào)用AcceptSocketService的start方法啟動(dòng)服務(wù),這一步主要為了處理從節(jié)點(diǎn)的連接請(qǐng)求,與從節(jié)點(diǎn)建立連接(可以看做是運(yùn)行在master節(jié)點(diǎn)的);
      3. 調(diào)用GroupTransferService的start方法,主要用于在主從同步的時(shí)候,等待數(shù)據(jù)傳輸完畢(可以看做是運(yùn)行在master節(jié)點(diǎn)的);
      4. 調(diào)用HAClient的start方法啟動(dòng),里面與master節(jié)點(diǎn)建立連接,向master匯報(bào)主從同步進(jìn)度并存儲(chǔ)master發(fā)送過來(lái)的同步數(shù)據(jù)(可以看做是運(yùn)行在從節(jié)點(diǎn)的);
      public class HAService {
           public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
              this.defaultMessageStore = defaultMessageStore;
              // 創(chuàng)建AcceptSocketService
              this.acceptSocketService =
                  new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
              this.groupTransferService = new GroupTransferService();
              // 創(chuàng)建HAClient
              this.haClient = new HAClient();
          }
      
          public void start() throws Exception {
              // 開始監(jiān)聽從服務(wù)器的連接
              this.acceptSocketService.beginAccept();
              // 啟動(dòng)服務(wù)
              this.acceptSocketService.start();
              // 啟動(dòng)GroupTransferService
              this.groupTransferService.start();
              // 啟動(dòng)
              this.haClient.start();
          }
      }
      

      監(jiān)聽從節(jié)點(diǎn)連接請(qǐng)求

      AcceptSocketServicebeginAccept方法里面首先獲取了ServerSocketChannel,然后進(jìn)行端口綁定,并在selector上面注冊(cè)了OP_ACCEPT事件的監(jiān)聽,監(jiān)聽從節(jié)點(diǎn)的連接請(qǐng)求:

      public class HAService {
          class AcceptSocketService extends ServiceThread {
              /**
               * 監(jiān)聽從節(jié)點(diǎn)的連接
               *
               * @throws Exception If fails.
               */
              public void beginAccept() throws Exception {
                  // 創(chuàng)建ServerSocketChannel
                  this.serverSocketChannel = ServerSocketChannel.open();
                  // 獲取selector
                  this.selector = RemotingUtil.openSelector();
                  this.serverSocketChannel.socket().setReuseAddress(true);
                  // 綁定端口
                  this.serverSocketChannel.socket().bind(this.socketAddressListen);
                  // 設(shè)置非阻塞
                  this.serverSocketChannel.configureBlocking(false);
                  // 注冊(cè)O(shè)P_ACCEPT連接事件的監(jiān)聽
                  this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
              }
          }
      }
      

      處理從節(jié)點(diǎn)連接請(qǐng)求

      AcceptSocketService的run方法中,對(duì)監(jiān)聽到的連接請(qǐng)求進(jìn)行了處理,處理邏輯大致如下:

      1. 從selector中獲取到監(jiān)聽到的事件;
      2. 如果是OP_ACCEPT連接事件,創(chuàng)建與從節(jié)點(diǎn)的連接對(duì)象HAConnection,與從節(jié)點(diǎn)建立連接,然后調(diào)用HAConnection的start方法進(jìn)行啟動(dòng),并創(chuàng)建的HAConnection對(duì)象加入到連接集合中,HAConnection中封裝了Master節(jié)點(diǎn)和從節(jié)點(diǎn)的數(shù)據(jù)同步邏輯
      public class HAService {
          class AcceptSocketService extends ServiceThread {
              @Override
              public void run() {
                  log.info(this.getServiceName() + " service started");
                  // 如果服務(wù)未停止
                  while (!this.isStopped()) {
                      try {
                          this.selector.select(1000);
                          // 獲取監(jiān)聽到的事件
                          Set<SelectionKey> selected = this.selector.selectedKeys();
                          // 處理事件
                          if (selected != null) {
                              for (SelectionKey k : selected) {
                                  // 如果是連接事件
                                  if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                      SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                                      if (sc != null) {
                                          HAService.log.info("HAService receive new connection, "
                                              + sc.socket().getRemoteSocketAddress());
                                          try {
                                              // 創(chuàng)建HAConnection,建立連接
                                              HAConnection conn = new HAConnection(HAService.this, sc);
                                              // 啟動(dòng)
                                              conn.start();
                                              // 添加連接
                                              HAService.this.addConnection(conn);
                                          } catch (Exception e) {
                                              log.error("new HAConnection exception", e);
                                              sc.close();
                                          }
                                      }
                                  } else {
                                      log.warn("Unexpected ops in select " + k.readyOps());
                                  }
                              }
                              selected.clear();
                          }
                      } catch (Exception e) {
                          log.error(this.getServiceName() + " service has exception.", e);
                      }
                  }
                  log.info(this.getServiceName() + " service end");
              }
          }
      }
      

      等待主從復(fù)制傳輸結(jié)束

      GroupTransferService的run方法主要是為了在進(jìn)行主從數(shù)據(jù)同步的時(shí)候,等待從節(jié)點(diǎn)數(shù)據(jù)同步完畢。

      在運(yùn)行時(shí)首先進(jìn)會(huì)調(diào)用waitForRunning進(jìn)行等待,因?yàn)榇藭r(shí)可能還有沒有開始主從同步,所以先進(jìn)行等待,之后如果有同步請(qǐng)求,會(huì)喚醒該線程,然后調(diào)用doWaitTransfer方法等待數(shù)據(jù)同步完成:

      public class HAService {
          class GroupTransferService extends ServiceThread {
      
               public void run() {
                  log.info(this.getServiceName() + " service started");
                  // 如果服務(wù)未停止
                  while (!this.isStopped()) {
                      try {
                          // 等待運(yùn)行
                          this.waitForRunning(10);
                          // 如果被喚醒,調(diào)用doWaitTransfer等待主從同步完成
                          this.doWaitTransfer();
                      } catch (Exception e) {
                          log.warn(this.getServiceName() + " service has exception. ", e);
                      }
                  }
      
                  log.info(this.getServiceName() + " service end");
              }
          }
      }
      

      在看doWaitTransfer方法之前,首先看下是如何判斷有數(shù)據(jù)需要同步的。

      Master節(jié)點(diǎn)中,當(dāng)消息被寫入到CommitLog以后,會(huì)調(diào)用submitReplicaRequest方法處主從同步,首先判斷當(dāng)前Broker的角色是否是SYNC_MASTER,如果是則會(huì)構(gòu)建消息提交請(qǐng)求GroupCommitRequest,然后調(diào)用HAServiceputRequest添加到請(qǐng)求集合中,并喚醒GroupTransferService中在等待的線程:

      public class CommitLog {
          public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
              if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
                  HAService service = this.defaultMessageStore.getHaService();
                  if (messageExt.isWaitStoreMsgOK()) {
                      if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
                          // 構(gòu)建GroupCommitRequest
                          GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                                  this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
                          // 添加請(qǐng)求
                          service.putRequest(request);
                          // 喚醒GroupTransferService中在等待的線程
                          service.getWaitNotifyObject().wakeupAll();
                          return request.future();
                      }
                      else {
                          return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                      }
                  }
              }
              return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
          }
      }
      

      doWaitTransfer方法中,會(huì)判斷CommitLog提交請(qǐng)求集合requestsRead是否為空,如果不為空,表示有消息寫入了CommitLog,Master節(jié)點(diǎn)需要等待將數(shù)據(jù)傳輸給從節(jié)點(diǎn):

      1. push2SlaveMaxOffset記錄了從節(jié)點(diǎn)已經(jīng)同步的消息偏移量,判斷push2SlaveMaxOffset是否大于本次CommitLog提交的偏移量,也就是請(qǐng)求中設(shè)置的偏移量;
      2. 獲取請(qǐng)求中設(shè)置的等待截止時(shí)間;
      3. 開啟循環(huán),判斷數(shù)據(jù)是否還未傳輸完畢,并且未超過截止時(shí)間,如果是則等待1s,然后繼續(xù)判斷傳輸是否完畢,不斷進(jìn)行,直到超過截止時(shí)間或者數(shù)據(jù)已經(jīng)傳輸完畢;
        (向從節(jié)點(diǎn)發(fā)送的消息最大偏移量push2SlaveMaxOffset超過了請(qǐng)求中設(shè)置的偏移量表示本次同步數(shù)據(jù)傳輸完畢);
      4. 喚醒在等待數(shù)據(jù)同步完畢的線程;
      public class HAService {
          // CommitLog提交請(qǐng)求集合
          private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();
      
          class GroupTransferService extends ServiceThread {
      
              private void doWaitTransfer() {
                  // 如果CommitLog提交請(qǐng)求集合不為空
                  if (!this.requestsRead.isEmpty()) {
                      // 處理消息提交請(qǐng)求
                      for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                          // 判斷傳輸?shù)綇墓?jié)點(diǎn)最大偏移量是否超過了請(qǐng)求中設(shè)置的偏移量
                          boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                          // 獲取截止時(shí)間
                          long deadLine = req.getDeadLine();
                          // 如果從節(jié)點(diǎn)還未同步完畢并且未超過截止時(shí)間
                          while (!transferOK && deadLine - System.nanoTime() > 0) {
                              // 等待
                              this.notifyTransferObject.waitForRunning(1000);
                              // 判斷從節(jié)點(diǎn)同步的最大偏移量是否超過了請(qǐng)求中設(shè)置的偏移量
                              transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                          }
                          // 喚醒
                          req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                      }
      
                      this.requestsRead = new LinkedList<>();
                  }
              }
          }
      }
      

      啟動(dòng)HAClient

      HAClient可以看做是在從節(jié)點(diǎn)上運(yùn)行的,主要進(jìn)行的處理如下:

      1. 調(diào)用connectMaster方法連接Master節(jié)點(diǎn),Master節(jié)點(diǎn)上也會(huì)運(yùn)行,但是它本身就是Master沒有可連的Master節(jié)點(diǎn),所以可以忽略;
      2. 調(diào)用isTimeToReportOffset方法判斷是否需要向Master節(jié)點(diǎn)匯報(bào)同步偏移量,如果需要?jiǎng)t調(diào)用reportSlaveMaxOffset方法將當(dāng)前的消息同步偏移量發(fā)送給Master節(jié)點(diǎn);
      3. 調(diào)用processReadEvent處理網(wǎng)絡(luò)請(qǐng)求中的可讀事件,也就是處理Master發(fā)送過來(lái)的消息,將消息存入CommitLog
      public class HAService {
          class HAClient extends ServiceThread {
         
              @Override
              public void run() {
                  log.info(this.getServiceName() + " service started");
                  while (!this.isStopped()) {
                      try {
                          // 連接Master節(jié)點(diǎn)
                          if (this.connectMaster()) {
                              // 是否需要報(bào)告消息同步偏移量
                              if (this.isTimeToReportOffset()) {
                                  // 向Master節(jié)點(diǎn)發(fā)送同步偏移量
                                  boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                                  if (!result) {
                                      this.closeMaster();
                                  }
                              }
                              // 等待事件產(chǎn)生
                              this.selector.select(1000);
                              // 處理讀事件,也就是Master節(jié)點(diǎn)發(fā)送的數(shù)據(jù)
                              boolean ok = this.processReadEvent();
                              if (!ok) {
                                  this.closeMaster();
                              }
                              // ...
                          } else {
                              this.waitForRunning(1000 * 5);
                          }
                      } catch (Exception e) {
                          log.warn(this.getServiceName() + " service has exception. ", e);
                          this.waitForRunning(1000 * 5);
                      }
                  }
      
                  log.info(this.getServiceName() + " service end");
              }
          }
      }
      

      連接主節(jié)點(diǎn)

      connectMaster方法中會(huì)獲取Master節(jié)點(diǎn)的地址,并轉(zhuǎn)換為SocketAddress對(duì)象,然后向Master節(jié)點(diǎn)請(qǐng)求建立連接,并在selector注冊(cè)O(shè)P_READ可讀事件監(jiān)聽:

      public class HAService {
          class HAClient extends ServiceThread {
          // 當(dāng)前的主從復(fù)制進(jìn)度
          private long currentReportedOffset = 0;
      
          private boolean connectMaster() throws ClosedChannelException {
              if (null == socketChannel) {
                  String addr = this.masterAddress.get();
                  if (addr != null) {
                      // 將地址轉(zhuǎn)為SocketAddress
                      SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
                      if (socketAddress != null) {
                          // 連接master
                          this.socketChannel = RemotingUtil.connect(socketAddress);
                          if (this.socketChannel != null) {
                              // 注冊(cè)O(shè)P_READ可讀事件監(jiān)聽
                              this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                          }
                      }
                  }
                  // 獲取CommitLog中當(dāng)前最大的偏移量
                  this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                  // 更新上次寫入時(shí)間
                  this.lastWriteTimestamp = System.currentTimeMillis();
              }
              return this.socketChannel != null;
          }
      }
      

      發(fā)送主從同步消息拉取偏移量

      isTimeToReportOffset方法中,首先獲取當(dāng)前時(shí)間與上一次進(jìn)行主從同步的時(shí)間間隔interval,如果時(shí)間間隔interval大于配置的發(fā)送心跳時(shí)間間隔,表示需要向Master節(jié)點(diǎn)發(fā)送從節(jié)點(diǎn)消息同步的偏移量,接下來(lái)會(huì)調(diào)用reportSlaveMaxOffset方法發(fā)送同步偏移量,也就是說(shuō)從節(jié)點(diǎn)會(huì)定時(shí)向Master節(jié)點(diǎn)發(fā)送請(qǐng)求,反饋CommitLog中同步消息的偏移量

      public class HAService {
          class HAClient extends ServiceThread {
             // 當(dāng)前從節(jié)點(diǎn)已經(jīng)同步消息的偏移量大小
             private long currentReportedOffset = 0;
      
             private boolean isTimeToReportOffset() {
                  // 獲取距離上一次主從同步的間隔時(shí)間
                  long interval =
                      HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
                  // 判斷是否超過了配置的發(fā)送心跳包時(shí)間間隔
                  boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
                      .getHaSendHeartbeatInterval();
      
                  return needHeart;
              }
      
              // 發(fā)送同步偏移量,傳入的參數(shù)是當(dāng)前的主從復(fù)制偏移量currentReportedOffset
              private boolean reportSlaveMaxOffset(final long maxOffset) {
                  this.reportOffset.position(0);
                  this.reportOffset.limit(8); // 設(shè)置數(shù)據(jù)傳輸大小為8個(gè)字節(jié)
                  this.reportOffset.putLong(maxOffset);// 設(shè)置同步偏移量
                  this.reportOffset.position(0);
                  this.reportOffset.limit(8);
      
                  for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
                      try {
                          // 向Master節(jié)點(diǎn)發(fā)送拉取偏移量
                          this.socketChannel.write(this.reportOffset);
                      } catch (IOException e) {
                          log.error(this.getServiceName()
                              + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                          return false;
                      }
                  }
                  // 更新發(fā)送時(shí)間
                  lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
                  return !this.reportOffset.hasRemaining();
              }
          }
      }
      

      處理網(wǎng)絡(luò)可讀事件

      processReadEvent方法中處理了可讀事件,也就是處理Master節(jié)點(diǎn)發(fā)送的同步數(shù)據(jù), 首先從socketChannel中讀取數(shù)據(jù)到byteBufferRead中,byteBufferRead是讀緩沖區(qū),讀取數(shù)據(jù)的方法會(huì)返回讀取到的字節(jié)數(shù),對(duì)字節(jié)數(shù)大小進(jìn)行判斷:

      • 如果可讀字節(jié)數(shù)大于0表示有數(shù)據(jù)需要處理,調(diào)用dispatchReadRequest方法進(jìn)行處理;
      • 如果可讀字節(jié)數(shù)為0表示沒有可讀數(shù)據(jù),此時(shí)記錄讀取到空數(shù)據(jù)的次數(shù),如果連續(xù)讀到空數(shù)據(jù)的次數(shù)大于3次,將終止本次處理;
        class HAClient extends ServiceThread {
              // 讀緩沖區(qū),會(huì)將從socketChannel讀入緩沖區(qū)
              private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
      
              private boolean processReadEvent() {
                  int readSizeZeroTimes = 0;
                  while (this.byteBufferRead.hasRemaining()) {
                      try {
                          // 從socketChannel中讀取數(shù)據(jù)到byteBufferRead中,返回讀取到的字節(jié)數(shù)
                          int readSize = this.socketChannel.read(this.byteBufferRead);
                          if (readSize > 0) {
                              // 重置readSizeZeroTimes
                              readSizeZeroTimes = 0;
                              // 處理數(shù)據(jù)
                              boolean result = this.dispatchReadRequest();
                              if (!result) {
                                  log.error("HAClient, dispatchReadRequest error");
                                  return false;
                              }
                          } else if (readSize == 0) {
                              // 記錄讀取到空數(shù)據(jù)的次數(shù)
                              if (++readSizeZeroTimes >= 3) {
                                  break;
                              }
                          } else {
                              log.info("HAClient, processReadEvent read socket < 0");
                              return false;
                          }
                      } catch (IOException e) {
                          log.info("HAClient, processReadEvent read socket exception", e);
                          return false;
                      }
                  }
      
                  return true;
              }
        }
      
      消息寫入ComitLog

      dispatchReadRequest方法中會(huì)將從節(jié)點(diǎn)讀取到的數(shù)據(jù)寫入CommitLog,dispatchPosition記錄了已經(jīng)處理的數(shù)據(jù)在讀緩沖區(qū)中的位置,從讀緩沖區(qū)byteBufferRead獲取剩余可讀取的字節(jié)數(shù),如果可讀數(shù)據(jù)的字節(jié)數(shù)大于一個(gè)消息頭的字節(jié)數(shù)(12個(gè)字節(jié)),表示有數(shù)據(jù)還未處理完畢,反之表示消息已經(jīng)處理完畢結(jié)束處理。
      對(duì)數(shù)據(jù)的處理邏輯如下:

      1. 從緩沖區(qū)中讀取數(shù)據(jù),首先獲取到的是消息在master節(jié)點(diǎn)的物理偏移量masterPhyOffset;
      2. 向后讀取8個(gè)字節(jié),得到消息體內(nèi)容的字節(jié)數(shù)bodySize;
      3. 獲取從節(jié)點(diǎn)當(dāng)前CommitLog的最大物理偏移量slavePhyOffset,如果不為0并且不等于masterPhyOffset,表示與Master節(jié)點(diǎn)的傳輸偏移量不一致,也就是數(shù)據(jù)不一致,此時(shí)終止處理
      4. 如果可讀取的字節(jié)數(shù)大于一個(gè)消息頭的字節(jié)數(shù) + 消息體大小,表示有消息可處理,繼續(xù)進(jìn)行下一步;
      5. 計(jì)算消息體在讀緩沖區(qū)中的起始位置,從讀緩沖區(qū)中根據(jù)起始位置,讀取消息內(nèi)容,將消息追加到從節(jié)點(diǎn)的CommitLog中
      6. 更新dispatchPosition的值為消息頭大小 + 消息體大小,dispatchPosition之前的數(shù)據(jù)表示已經(jīng)處理完畢;
          class HAClient extends ServiceThread {
              // 已經(jīng)處理的數(shù)據(jù)在讀緩沖區(qū)中的位置,初始化為0
              private int dispatchPosition = 0;
              // 讀緩沖區(qū)
              private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
      
              private boolean dispatchReadRequest() {
                  // 消息頭大小
                  final int msgHeaderSize = 8 + 4; // phyoffset + size
                  // 開啟循環(huán)不斷讀取數(shù)據(jù)
                  while (true) {
                      // 獲可讀取的字節(jié)數(shù)
                      int diff = this.byteBufferRead.position() - this.dispatchPosition;
                      // 如果字節(jié)數(shù)大于一個(gè)消息頭的字節(jié)數(shù)
                      if (diff >= msgHeaderSize) {
                          // 獲取消息在master節(jié)點(diǎn)的物理偏移量
                          long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                          // 獲取消息體大小
                          int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
                          // 獲取從節(jié)點(diǎn)當(dāng)前CommitLog的最大物理偏移量
                          long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                          if (slavePhyOffset != 0) {
                              // 如果不一致結(jié)束處理
                              if (slavePhyOffset != masterPhyOffset) {
                                  log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                                      + slavePhyOffset + " MASTER: " + masterPhyOffset);
                                  return false;
                              }
                          }
                          // 如果可讀取的字節(jié)數(shù)大于一個(gè)消息頭的字節(jié)數(shù) + 消息體大小
                          if (diff >= (msgHeaderSize + bodySize)) {
                              // 將度緩沖區(qū)的數(shù)據(jù)轉(zhuǎn)為字節(jié)數(shù)組
                              byte[] bodyData = byteBufferRead.array();
                              // 計(jì)算消息體在讀緩沖區(qū)中的起始位置
                              int dataStart = this.dispatchPosition + msgHeaderSize;
                              // 從讀緩沖區(qū)中根據(jù)消息的位置,讀取消息內(nèi)容,將消息追加到從節(jié)點(diǎn)的CommitLog中
                              HAService.this.defaultMessageStore.appendToCommitLog(
                                      masterPhyOffset, bodyData, dataStart, bodySize);
                              // 更新dispatchPosition的值為消息頭大小+消息體大小
                              this.dispatchPosition += msgHeaderSize + bodySize;
                              if (!reportSlaveMaxOffsetPlus()) {
                                  return false;
                              }
                              continue;
                          }
                      }
                      if (!this.byteBufferRead.hasRemaining()) {
                          this.reallocateByteBuffer();
                      }
      
                      break;
                  }
      
                  return true;
              }
          }
      

      HAConnection

      HAConnection中封裝了Master節(jié)點(diǎn)與從節(jié)點(diǎn)的網(wǎng)絡(luò)通信處理,分別在ReadSocketServiceWriteSocketService中。

      ReadSocketService

      ReadSocketService啟動(dòng)后處理監(jiān)聽到的可讀事件,前面知道HAClient中從節(jié)點(diǎn)會(huì)定時(shí)向Master節(jié)點(diǎn)匯報(bào)從節(jié)點(diǎn)的消息同步偏移量,Master節(jié)點(diǎn)對(duì)匯報(bào)請(qǐng)求的處理就在這里,如果從網(wǎng)絡(luò)中監(jiān)聽到了可讀事件,會(huì)調(diào)用processReadEvent處理讀事件:

      public class HAConnection {
           class ReadSocketService extends ServiceThread {
              @Override
              public void run() {
                  HAConnection.log.info(this.getServiceName() + " service started");
                  while (!this.isStopped()) {
                      try {
                          this.selector.select(1000);
                          // 處理可讀事件
                          boolean ok = this.processReadEvent();
                          if (!ok) {
                              HAConnection.log.error("processReadEvent error");
                              break;
                          }
                          // ...
                      } catch (Exception e) {
                          HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                          break;
                      }
                  }
                  // ...
                  HAConnection.log.info(this.getServiceName() + " service end");
              }
           }
      }
      

      處理可讀事件

      processReadEvent中從網(wǎng)絡(luò)中處理讀事件的方式與上面HAClientdispatchReadRequest類似,都是將網(wǎng)絡(luò)中的數(shù)據(jù)讀取到讀緩沖區(qū)中,并用一個(gè)變量記錄已讀取數(shù)據(jù)的位置,processReadEvent方法的處理邏輯如下:

      1. 從socketChannel讀取數(shù)據(jù)到讀緩沖區(qū)byteBufferRead中,返回讀取到的字節(jié)數(shù);
      2. 如果讀取到的字節(jié)數(shù)大于0,進(jìn)入下一步,如果讀取到的字節(jié)數(shù)為0,記錄連續(xù)讀取到空字節(jié)數(shù)的次數(shù)是否超過三次,如果超過終止處理;
      3. 判斷剩余可讀取的字節(jié)數(shù)是否大于等于8,前面知道,從節(jié)點(diǎn)發(fā)送同步消息拉取偏移量的時(shí)候設(shè)置的字節(jié)大小為8,所以字節(jié)數(shù)大于等于8的時(shí)候表示需要讀取從節(jié)點(diǎn)發(fā)送的偏移量;
      4. 計(jì)算數(shù)據(jù)在緩沖區(qū)中的位置,從緩沖區(qū)讀取從節(jié)點(diǎn)發(fā)送的同步偏移量readOffset;
      5. 更新processPosition的值,processPosition表示讀緩沖區(qū)中已經(jīng)處理數(shù)據(jù)的位置;
      6. 更新slaveAckOffset為從節(jié)點(diǎn)發(fā)送的同步偏移量readOffset的值;
      7. 如果當(dāng)前Master節(jié)點(diǎn)記錄的從節(jié)點(diǎn)的同步偏移量slaveRequestOffset小于0,表示還未進(jìn)行同步,此時(shí)將slaveRequestOffset更新為從節(jié)點(diǎn)發(fā)送的同步偏移量;
      8. 如果從節(jié)點(diǎn)發(fā)送的同步偏移量比當(dāng)前Master節(jié)點(diǎn)的最大物理偏移量還要大,終止本次處理;
      9. 調(diào)用notifyTransferSome,更新Master節(jié)點(diǎn)記錄的向從節(jié)點(diǎn)同步消息的偏移量;
      public class HAConnection {
      
           class ReadSocketService extends ServiceThread {
           // 讀緩沖區(qū)    
           private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
           // 讀緩沖區(qū)中已經(jīng)處理的數(shù)據(jù)位置
           private int processPosition = 0;
      
           private boolean processReadEvent() {
                  int readSizeZeroTimes = 0;
                  // 如果沒有可讀數(shù)據(jù)
                  if (!this.byteBufferRead.hasRemaining()) {
                      this.byteBufferRead.flip();
                      // 處理位置置為0
                      this.processPosition = 0;
                  }
                  // 如果數(shù)據(jù)未讀取完畢
                  while (this.byteBufferRead.hasRemaining()) {
                      try {
                          // 從socketChannel讀取數(shù)據(jù)到byteBufferRead中,返回讀取到的字節(jié)數(shù)
                          int readSize = this.socketChannel.read(this.byteBufferRead);
                          // 如果讀取數(shù)據(jù)字節(jié)數(shù)大于0
                          if (readSize > 0) {
                              // 重置readSizeZeroTimes
                              readSizeZeroTimes = 0;
                              // 獲取上次處理讀事件的時(shí)間戳
                              this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                              // 判斷剩余可讀取的字節(jié)數(shù)是否大于等于8
                              if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                                  // 獲取偏移量?jī)?nèi)容的結(jié)束位置
                                  int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                                  // 從結(jié)束位置向前讀取8個(gè)字節(jié)得到從點(diǎn)發(fā)送的同步偏移量
                                  long readOffset = this.byteBufferRead.getLong(pos - 8);
                                  // 更新處理位置
                                  this.processPosition = pos;
                                  // 更新slaveAckOffset為從節(jié)點(diǎn)發(fā)送的同步進(jìn)度
                                  HAConnection.this.slaveAckOffset = readOffset;
                                  // 如果記錄的從節(jié)點(diǎn)的同步進(jìn)度小于0,表示還未進(jìn)行同步
                                  if (HAConnection.this.slaveRequestOffset < 0) {
                                      // 更新為從節(jié)點(diǎn)發(fā)送的同步進(jìn)度
                                      HAConnection.this.slaveRequestOffset = readOffset;
                                      log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                                  } else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) {
                                      // 如果從節(jié)點(diǎn)發(fā)送的拉取偏移量比當(dāng)前Master節(jié)點(diǎn)的最大物理偏移量還要大
                                      log.warn("slave[{}] request offset={} greater than local commitLog offset={}. ",
                                              HAConnection.this.clientAddr,
                                              HAConnection.this.slaveAckOffset,
                                              HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset());
                                      return false;
                                  }
                                  // 更新Master節(jié)點(diǎn)記錄的向從節(jié)點(diǎn)同步消息的偏移量
                                  HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                              }
                          } else if (readSize == 0) 
                              // 判斷連續(xù)讀取到空數(shù)據(jù)的次數(shù)是否超過三次
                              if (++readSizeZeroTimes >= 3) {
                                  break;
                              }
                          } else {
                              log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                              return false;
                          }
                      } catch (IOException e) {
                          log.error("processReadEvent exception", e);
                          return false;
                      }
                  }
      
                  return true;
              }
          }
      }
      

      前面在GroupTransferService中可以看到是通過push2SlaveMaxOffset的值判斷本次同步是否完成的,在notifyTransferSome方法中可以看到當(dāng)Master節(jié)點(diǎn)收到從節(jié)點(diǎn)反饋的消息拉取偏移量時(shí),對(duì)push2SlaveMaxOffset的值進(jìn)行了更新:

      public class HAService {
          // 向從節(jié)點(diǎn)推送的消息最大偏移量
          private final GroupTransferService groupTransferService;
      
          public void notifyTransferSome(final long offset) {
              // 如果傳入的偏移大于push2SlaveMaxOffset記錄的值,進(jìn)行更新
              for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
                  // 更新向從節(jié)點(diǎn)推送的消息最大偏移量
                  boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
                  if (ok) {
                      this.groupTransferService.notifyTransferSome();
                      break;
                  } else {
                      value = this.push2SlaveMaxOffset.get();
                  }
              }
          }
      }
      

      WriteSocketService

      WriteSocketService用于Master節(jié)點(diǎn)向從節(jié)點(diǎn)發(fā)送同步消息,處理邏輯如下:

      1. 根據(jù)從節(jié)點(diǎn)發(fā)送的主從同步消息拉取偏移量slaveRequestOffset進(jìn)行判斷:

        • 如果slaveRequestOffset值為-1,表示還未收到從節(jié)點(diǎn)報(bào)告的同步偏移量,此時(shí)睡眠一段時(shí)間等待從節(jié)點(diǎn)發(fā)送消息拉取偏移量;
        • 如果slaveRequestOffset值不為-1,表示已經(jīng)開始進(jìn)行主從同步進(jìn)行下一步;
      2. 判斷nextTransferFromWhere值是否為-1,nextTransferFromWhere記錄了下次需要傳輸?shù)南⒃贑ommitLog中的偏移量,如果值為-1表示初次進(jìn)行數(shù)據(jù)同步,此時(shí)有兩種情況:

        • 如果從節(jié)點(diǎn)發(fā)送的拉取偏移量slaveRequestOffset為0,就從當(dāng)前CommitLog文件最大偏移量開始同步;
        • 如果slaveRequestOffset不為0,則從slaveRequestOffset位置處進(jìn)行數(shù)據(jù)同步;
      3. 判斷上次寫事件是否已經(jīng)將數(shù)據(jù)都寫入到從節(jié)點(diǎn)

        • 如果已經(jīng)寫入完畢,判斷距離上次寫入數(shù)據(jù)的時(shí)間間隔是否超過了設(shè)置的心跳時(shí)間,如果超過,為了避免連接空閑被關(guān)閉,需要發(fā)送一個(gè)心跳包,此時(shí)構(gòu)建心跳包的請(qǐng)求數(shù)據(jù),調(diào)用transferData方法傳輸數(shù)據(jù);
        • 如果上次的數(shù)據(jù)還未傳輸完畢,調(diào)用transferData方法繼續(xù)傳輸,如果還是未完成,則結(jié)束此處處理;
      4. 根據(jù)nextTransferFromWhere從CommitLog中獲取消息,如果未獲取到消息,等待100ms,如果獲取到消息,從CommitLog中獲取消息進(jìn)行傳輸:
        (1)如果獲取到消息的字節(jié)數(shù)大于最大傳輸?shù)拇笮。O(shè)置最最大傳輸數(shù)量,分批進(jìn)行傳輸;
        (2)更新下次傳輸?shù)钠屏康刂芬簿褪莕extTransferFromWhere的值;
        (3)從CommitLog中獲取的消息內(nèi)容設(shè)置到將讀取到的消息數(shù)據(jù)設(shè)置到selectMappedBufferResult中;
        (4)設(shè)置消息頭信息,包括消息頭字節(jié)數(shù)、拉取消息的偏移量等;
        (5)調(diào)用transferData發(fā)送數(shù)據(jù);

      public class HAConnection {
          class WriteSocketService extends ServiceThread {
              private final int headerSize = 8 + 4;// 消息頭大小
              @Override
              public void run() {
                  HAConnection.log.info(this.getServiceName() + " service started");
                  while (!this.isStopped()) {
                      try {
                          this.selector.select(1000);
                          // 如果slaveRequestOffset為-1,表示還未收到從節(jié)點(diǎn)報(bào)告的拉取進(jìn)度
                          if (-1 == HAConnection.this.slaveRequestOffset) {
                              // 等待一段時(shí)間
                              Thread.sleep(10);
                              continue;
                          }
                          // 初次進(jìn)行數(shù)據(jù)同步
                          if (-1 == this.nextTransferFromWhere) {
                              // 如果拉取進(jìn)度為0
                              if (0 == HAConnection.this.slaveRequestOffset) {
                                  // 從master節(jié)點(diǎn)最大偏移量從開始傳輸
                                  long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                                  masterOffset =
                                      masterOffset
                                          - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                          .getMappedFileSizeCommitLog());
      
                                  if (masterOffset < 0) {
                                      masterOffset = 0;
                                  }
                                  // 更新nextTransferFromWhere
                                  this.nextTransferFromWhere = masterOffset;
                              } else {
                                  // 根據(jù)從節(jié)點(diǎn)發(fā)送的偏移量開始數(shù)據(jù)同步
                                  this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                              }
      
                              log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                                  + "], and slave request " + HAConnection.this.slaveRequestOffset);
                          }
                          // 判斷上次傳輸是否完畢
                          if (this.lastWriteOver) {
                              // 獲取當(dāng)前時(shí)間距離上次寫入數(shù)據(jù)的時(shí)間間隔
                              long interval =
                                  HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                              // 如果距離上次寫入數(shù)據(jù)的時(shí)間間隔超過了設(shè)置的心跳時(shí)間
                              if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                  .getHaSendHeartbeatInterval()) {
                                  // 構(gòu)建header
                                  this.byteBufferHeader.position(0);
                                  this.byteBufferHeader.limit(headerSize);
                                  this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                                  this.byteBufferHeader.putInt(0);
                                  this.byteBufferHeader.flip();
                                  // 發(fā)送心跳包
                                  this.lastWriteOver = this.transferData();
                                  if (!this.lastWriteOver)
                                      continue;
                              }
                          } else {
                              // 未傳輸完畢,繼續(xù)上次的傳輸
                              this.lastWriteOver = this.transferData();
                              // 如果依舊未完成,結(jié)束本次處理
                              if (!this.lastWriteOver)
                                  continue;
                          }
                          // 根據(jù)偏移量獲取消息數(shù)據(jù)
                          SelectMappedBufferResult selectResult =
                              HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
                          if (selectResult != null) {// 獲取消息不為空
                              // 獲取消息內(nèi)容大小
                              int size = selectResult.getSize();
                              // 如果消息的字節(jié)數(shù)大于最大傳輸?shù)拇笮?                        if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                                  // 設(shè)置為最大傳輸大小
                                  size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                              }
      
                              long thisOffset = this.nextTransferFromWhere;
                              // 更新下次傳輸?shù)钠屏康刂?                        this.nextTransferFromWhere += size;
      
                              selectResult.getByteBuffer().limit(size);
                              // 將讀取到的消息數(shù)據(jù)設(shè)置到selectMappedBufferResult
                              this.selectMappedBufferResult = selectResult;
      
                              // 設(shè)置消息頭
                              this.byteBufferHeader.position(0);
                              // 設(shè)置消息頭大小
                              this.byteBufferHeader.limit(headerSize);
                              // 設(shè)置偏移量地址
                              this.byteBufferHeader.putLong(thisOffset);
                              // 設(shè)置消息內(nèi)容大小
                              this.byteBufferHeader.putInt(size);
                              this.byteBufferHeader.flip();
                              // 發(fā)送數(shù)據(jù)
                              this.lastWriteOver = this.transferData();
                          } else {
                              // 等待100ms
                              HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                          }
                      } catch (Exception e) {
      
                          HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                          break;
                      }
                  }
      
                  HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
      
                  // ...
                  HAConnection.log.info(this.getServiceName() + " service end");
              }
          }
      }
      

      發(fā)送數(shù)據(jù)

      transferData方法的處理邏輯如下:

      1. 發(fā)送消息頭數(shù)據(jù);
      2. 消息頭數(shù)據(jù)發(fā)送完畢之后,發(fā)送消息內(nèi)容,前面知道從CommitLog中讀取的消息內(nèi)容放入到了selectMappedBufferResult,將selectMappedBufferResult的內(nèi)容發(fā)送給從節(jié)點(diǎn);
      public class HAConnection {
          class WriteSocketService extends ServiceThread {
              private boolean transferData() throws Exception {
                  int writeSizeZeroTimes = 0;
                  // 寫入消息頭
                  while (this.byteBufferHeader.hasRemaining()) {
                      // 發(fā)送消息頭數(shù)據(jù)
                      int writeSize = this.socketChannel.write(this.byteBufferHeader);
                      if (writeSize > 0) {
                          writeSizeZeroTimes = 0;
                          // 記錄發(fā)送時(shí)間
                          this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                      } else if (writeSize == 0) {
                          if (++writeSizeZeroTimes >= 3) {
                              break;
                          }
                      } else {
                          throw new Exception("ha master write header error < 0");
                      }
                  }
      
                  if (null == this.selectMappedBufferResult) {
                      return !this.byteBufferHeader.hasRemaining();
                  }
      
                  writeSizeZeroTimes = 0;
      
                  // 消息頭數(shù)據(jù)發(fā)送完畢之后,發(fā)送消息內(nèi)容
                  if (!this.byteBufferHeader.hasRemaining()) {
                      while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                          // 發(fā)送消息內(nèi)容
                          int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
                          if (writeSize > 0) {
                              writeSizeZeroTimes = 0;
                              this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                          } else if (writeSize == 0) {
                              if (++writeSizeZeroTimes >= 3) {
                                  break;
                              }
                          } else {
                              throw new Exception("ha master write body error < 0");
                          }
                      }
                  }
                  // ...
                  return result;
              }
          }
      }
      

      總結(jié)

      主從同步流程

      有新消息寫入之后的同步流程

      參考
      丁威、周繼鋒《RocketMQ技術(shù)內(nèi)幕》

      RocketMQ版本:4.9.3

      posted @ 2022-12-05 09:05  shanml  閱讀(978)  評(píng)論(0)    收藏  舉報(bào)
      主站蜘蛛池模板: gogo无码大胆啪啪艺术| 丝袜老师办公室里做好紧好爽| 坐盗市亚洲综合一二三区| 极品无码人妻巨屁股系列| 国产绿帽在线视频看| 国产自产对白一区| 国产精品一区二区日韩精品| 亚洲av男人电影天堂热app| 云南省| 丰满无码人妻热妇无码区| 国产乱码精品一区二区麻豆| 蜜桃av无码免费看永久| 制服 丝袜 亚洲 中文 综合| 蜜臀av一区二区三区精品| 精品国产大片中文字幕| 久久国产精品无码网站| 欧美精品国产综合久久| 一区二区三区国产综合在线| 国产成人女人在线观看| 亚洲卡1卡2卡3精品| 久久精品国产免费观看频道| 亚洲欧美在线综合一区二区三区| 亚洲一区成人在线视频| 成在人线av无码免费看网站直播 | 国产初高中生粉嫩无套第一次| 忍着娇喘人妻被中出中文字幕| 国产成人精品亚洲午夜| 亚洲男人电影天堂无码| 中文字幕在线视频不卡一区二区 | 日本高清一区免费中文视频| 国产亚洲精品综合一区二区| 国产精品毛片av999999| 男女一级国产片免费视频| 亚洲av中文一区二区| 操操操综合网| 精品熟女少妇免费久久| 久久精品熟女亚洲av艳妇| 五月丁香综合缴情六月小说| 大地资源免费视频观看| 成年午夜免费韩国做受视频| 华人在线亚洲欧美精品|