<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      RocketMQ主從同步原理

      一. 主從同步概述

      主從同步這個概念相信大家在平時的工作中,多少都會聽到。其目的主要是用于做一備份類操作,以及一些讀寫分離場景。比如我們常用的關系型數(shù)據(jù)庫mysql,就有主從同步功能在。

      主從同步,就是將主服務器上的數(shù)據(jù)同步到從服務器上,也就是相當于新增了一個副本。

      而具體的主從同步的實現(xiàn)也各有千秋,如mysql中通過binlog實現(xiàn)主從同步,es中通過translog實現(xiàn)主從同步,redis中通過aof實現(xiàn)主從同步。那么,rocketmq又是如何實現(xiàn)的主從同步呢?

      另外,主從同步需要考慮的問題是哪些呢?

      1. 數(shù)據(jù)同步的及時性?(延遲與一致性)
      2. 對主服務器的影響性?(可用性)
      3. 是否可替代主服務器?(可用性或者分區(qū)容忍性)

      前面兩個點是必須要考慮的,但對于第3個點,則可能不會被考慮。因為通過系統(tǒng)可能無法很好的做到這一點,所以很多系統(tǒng)就直接忽略這一點了,簡單嘛。即很多時候只把從服務器當作是一個備份存在,不會接受寫請求。如果要進行主從切換,必須要人工介入,做預知的有損切換。但隨著技術的發(fā)展,現(xiàn)在已有非常多的自動切換主從的服務存在,這是在分布式系統(tǒng)滿天下的當今的必然趨勢。

      二. RocketMQ如何配置主從同步

      在 RocketMQ 中,最核心的組件是 broker, 它負責幾乎所有的存儲讀取業(yè)務。所以,要談主從同步,那必然是針對broker進行的。我們再來回看 RocketMQ 的部署架構圖,以便全局觀察:

      非常清晰的架構,無需多言。因為我們講的是主從同步,所以只看broker這個組件,那么整個架構就可以簡化為: BrokerMaster -> BrokerSlave 了。同樣,再簡化,主從同步就是如何將Master的數(shù)據(jù)同步到Slave這么個過程。

      那么如何配置主從同步呢?

      2.1 BrokerMaster配置

      創(chuàng)建配置文件 conf/broker-a.properties

      #所屬集群名字
      brokerClusterName=DefaultCluster
      #broker名字,名字可重復。相同名字的broker構成一組主從節(jié)點(每個master起一個名字,它的slave同他,eg:Amaster叫broker-a,他的slave也叫broker-a)
      brokerName=broker-a
      #節(jié)點ID,0 表示 Master 節(jié)點,大于 0 表示 Slave 節(jié)點
      brokerId=0
      #Broker 的角色
      #- ASYNC_MASTER 異步復制Master
      #- SYNC_MASTER 同步雙寫Master
      #- SLAVE
      brokerRole=ASYNC_MASTER
      #刷盤方式
      #- ASYNC_FLUSH 異步刷盤
      #- SYNC_FLUSH 同步刷盤
      flushDiskType=ASYNC_FLUSH
      #nameServer地址,分號分割
      namesrvAddr=172.0.1.5:9876;172.0.1.6:9876
      

      2.2 BrokerSlave配置

      創(chuàng)建配置文件 conf/broker-a-s.properties

      #所屬集群名字
      brokerClusterName=DefaultCluster
      #broker名字,和Master節(jié)點保持一致
      brokerName=broker-a
      #節(jié)點ID,大于1表示這是Slave節(jié)點
      brokerId=1
      #Broker 的角色
      #- ASYNC_MASTER 異步復制Master
      #- SYNC_MASTER 同步雙寫Master
      #- SLAVE
      brokerRole=SLAVE
      #刷盤方式
      #- ASYNC_FLUSH 異步刷盤
      #- SYNC_FLUSH 同步刷盤
      flushDiskType=ASYNC_FLUSH
      #nameServer地址,分號分割
      namesrvAddr=172.0.1.5:9876;172.0.1.6:9876
      

      配置項看起來有點多,但核心實際上只有兩個:

      1. 在保持 brokderName 相同的前提下配置 brokerRole=ASYNC_MASTER|SLAVE|SYNC_MASTER,通過這個節(jié)點就能知道主從同步復制模式。
      2. 在保持 brokderName 相同的前提下配置 Master節(jié)點的 brokerId 配置為0,Slave節(jié)點配置為大于0,通過這個值就可以確定是主是從。

      具體配置文件叫什么名字不重要,重要的是要在啟動時指定指定對應的配置文件位置即可。啟動master/slave命令如下:

      # 啟動Master節(jié)點
      nohup sh /usr/local/rocketmq/bin/mqbroker -c conf/broker-a.properties > logs/broker-a.log 2>&1 &
      
      # 啟動Slave節(jié)點
      nohup sh /usr/local/rocketmq/bin/mqbroker -c conf/broker-a-s.properties > logs/broker-a-s.log 2>&1 &
      

      三. 源碼分析

      主從同步的實現(xiàn)邏輯主要在HAService中,在DefaultMessageStore的構造函數(shù)中,對HAService進行了實例化,并在start方法中,啟動了HAService

      public class DefaultMessageStore implements MessageStore {
          public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
              final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
              // ...
              if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                  // 初始化HAService
                  this.haService = new HAService(this);
              } else {
                  this.haService = null;
              }
              // ...
          }
      
          public void start() throws Exception {
              // ...
              if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                  // 啟動HAService
                  this.haService.start();
                  // 主節(jié)點啟動延遲消息實現(xiàn),從節(jié)點關閉Slave消息實現(xiàn),主節(jié)點將延遲消息從延遲隊列轉移到真實的topic隊列時,也會由主從同步同步至從節(jié)點,
                  // 所以不需要從節(jié)點開啟延遲隊列
                  this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
              }
              // ...
          }
      }
      

      HAService的構造函數(shù)中,創(chuàng)建了AcceptSocketServiceGroupTransferServiceHAClient,在start方法中主要做了如下幾件事:

      1. 調用 AcceptSocketServicebeginAccept 方法,這一步主要是進行端口綁定,在端口上監(jiān)聽從節(jié)點的連接請求(可以看做是運行在master節(jié)點的);
      2. 調用 AcceptSocketServicestart 方法啟動服務,這一步主要為了處理從節(jié)點的連接請求,與從節(jié)點建立連接(可以看做是運行在master節(jié)點的);
      3. 調用 GroupTransferServicestart 方法,主要用于在主從同步的時候,等待數(shù)據(jù)傳輸完畢(可以看做是運行在master節(jié)點的);
      4. 調用 HAClientstart 方法啟動,里面與master節(jié)點建立連接,向master匯報主從同步進度并存儲master發(fā)送過來的同步數(shù)據(jù)(可以看做是運行在從節(jié)點的);
      public class HAService {
           public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
              this.defaultMessageStore = defaultMessageStore;
              // 創(chuàng)建AcceptSocketService
              this.acceptSocketService =
                  new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
              this.groupTransferService = new GroupTransferService();
              // 創(chuàng)建HAClient
              this.haClient = new HAClient();
          }
      
          public void start() throws Exception {
              // 開始監(jiān)聽從服務器的連接
              this.acceptSocketService.beginAccept();
              // 啟動服務
              this.acceptSocketService.start();
              // 啟動GroupTransferService
              this.groupTransferService.start();
              // 啟動
              this.haClient.start();
          }
      }
      

      3.1 監(jiān)聽從節(jié)點連接請求

      AcceptSocketServicebeginAccept 方法里面首先獲取了 ServerSocketChannel,然后進行端口綁定,并在selector上面注冊了OP_ACCEPT事件的監(jiān)聽,監(jiān)聽從節(jié)點的連接請求:

      public class HAService {
          class AcceptSocketService extends ServiceThread {
              /**
               * 監(jiān)聽從節(jié)點的連接
               *
               * @throws Exception If fails.
               */
              public void beginAccept() throws Exception {
                  // 創(chuàng)建ServerSocketChannel
                  this.serverSocketChannel = ServerSocketChannel.open();
                  // 獲取selector
                  this.selector = RemotingUtil.openSelector();
                  this.serverSocketChannel.socket().setReuseAddress(true);
                  // 綁定端口
                  this.serverSocketChannel.socket().bind(this.socketAddressListen);
                  // 設置非阻塞
                  this.serverSocketChannel.configureBlocking(false);
                  // 注冊OP_ACCEPT連接事件的監(jiān)聽
                  this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
              }
          }
      }
      

      3.2 處理從節(jié)點連接請求

      AcceptSocketService的run方法中,對監(jiān)聽到的連接請求進行了處理,處理邏輯大致如下:

      1. 從selector中獲取到監(jiān)聽到的事件;
      2. 如果是OP_ACCEPT連接事件,創(chuàng)建與從節(jié)點的連接對象HAConnection,與從節(jié)點建立連接,然后調用HAConnection的start方法進行啟動,并創(chuàng)建的HAConnection對象加入到連接集合中,HAConnection中封裝了Master節(jié)點和從節(jié)點的數(shù)據(jù)同步邏輯
      public class HAService {
          class AcceptSocketService extends ServiceThread {
              @Override
              public void run() {
                  log.info(this.getServiceName() + " service started");
                  // 如果服務未停止
                  while (!this.isStopped()) {
                      try {
                          this.selector.select(1000);
                          // 獲取監(jiān)聽到的事件
                          Set<SelectionKey> selected = this.selector.selectedKeys();
                          // 處理事件
                          if (selected != null) {
                              for (SelectionKey k : selected) {
                                  // 如果是連接事件
                                  if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                      SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                                      if (sc != null) {
                                          HAService.log.info("HAService receive new connection, "
                                              + sc.socket().getRemoteSocketAddress());
                                          try {
                                              // 創(chuàng)建HAConnection,建立連接
                                              HAConnection conn = new HAConnection(HAService.this, sc);
                                              // 啟動
                                              conn.start();
                                              // 添加連接
                                              HAService.this.addConnection(conn);
                                          } catch (Exception e) {
                                              log.error("new HAConnection exception", e);
                                              sc.close();
                                          }
                                      }
                                  } else {
                                      log.warn("Unexpected ops in select " + k.readyOps());
                                  }
                              }
                              selected.clear();
                          }
                      } catch (Exception e) {
                          log.error(this.getServiceName() + " service has exception.", e);
                      }
                  }
                  log.info(this.getServiceName() + " service end");
              }
          }
      }
      

      3.3 等待主從復制傳輸結束

      GroupTransferService的run方法主要是為了在進行主從數(shù)據(jù)同步的時候,等待從節(jié)點數(shù)據(jù)同步完畢。

      在運行時首先進會調用waitForRunning進行等待,因為此時可能還有沒有開始主從同步,所以先進行等待,之后如果有同步請求,會喚醒該線程,然后調用doWaitTransfer方法等待數(shù)據(jù)同步完成:

      public class HAService {
          class GroupTransferService extends ServiceThread {
      
               public void run() {
                  log.info(this.getServiceName() + " service started");
                  // 如果服務未停止
                  while (!this.isStopped()) {
                      try {
                          // 等待運行
                          this.waitForRunning(10);
                          // 如果被喚醒,調用doWaitTransfer等待主從同步完成
                          this.doWaitTransfer();
                      } catch (Exception e) {
                          log.warn(this.getServiceName() + " service has exception. ", e);
                      }
                  }
      
                  log.info(this.getServiceName() + " service end");
              }
          }
      }
      
      

      在看doWaitTransfer方法之前,首先看下是如何判斷有數(shù)據(jù)需要同步的。

      Master節(jié)點中,當消息被寫入到 CommitLog 以后,會調用 handleHA 方法處主從同步,首先判斷當前Broker的角色是否是SYNC_MASTER,如果是則會構建消息提交請求GroupCommitRequest,然后調用HAServiceputRequest添加到請求集合中,并喚醒GroupTransferService中在等待的線程:

      public class CommitLog {
          public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
              // 判斷是否是同步復制到從節(jié)點
              if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
                  HAService service = this.defaultMessageStore.getHaService();
                  // 如果是同步復制,也可以在發(fā)送消息時手動設置不等待同步成功就返回,這樣增加了同步復制場景下的靈活性。在小部分不需要的同步復制,
                  // 但是RocketMQ又配置為主從同步復制的場景下,也使得消息異步復制
                  if (messageExt.isWaitStoreMsgOK()) {
                      // Determine whether to wait
                      if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                          // 提交主從同步任務
                          GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                          // 添加請求
                          service.putRequest(request);
                          // 喚醒GroupTransferService中在等待的線程
                          service.getWaitNotifyObject().wakeupAll();
                          // 等待同步復制任務完成,異步同步線程復制完畢后會喚醒當前線程,超時時間默認為5s,如果超時則返回刷盤錯誤,刷盤成功后正常返回給調用方。
                          boolean flushOK =
                              request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                          if (!flushOK) {
                              log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                                  + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                              putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                          }
                      }
                      // Slave problem
                      else {
                          // Tell the producer, slave not available
                          putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                      }
                  }
              }
      
          }
      }
      
      

      HAService.GroupCommitRequest.doWaitTransfer方法中,會判斷 CommitLog 提交請求集合 requestsRead 是否為空,如果不為空,表示有消息寫入了 CommitLog,Master節(jié)點需要等待將數(shù)據(jù)傳輸給從節(jié)點:

      1. push2SlaveMaxOffset 記錄了從節(jié)點已經(jīng)同步的消息偏移量,判斷 push2SlaveMaxOffset 是否大于本次 CommitLog 提交的偏移量,也就是請求中設置的偏移量;
      2. 獲取請求中設置的等待截止時間;
      3. 開啟循環(huán),判斷數(shù)據(jù)是否還未傳輸完畢,并且未超過截止時間,如果是則等待1s,然后繼續(xù)判斷傳輸是否完畢,不斷進行,直到超過截止時間或者數(shù)據(jù)已經(jīng)傳輸完畢;
        (向從節(jié)點發(fā)送的消息最大偏移量 push2SlaveMaxOffset 超過了請求中設置的偏移量表示本次同步數(shù)據(jù)傳輸完畢);
      4. 喚醒在等待數(shù)據(jù)同步完畢的線程;
      public class HAService {
          // CommitLog提交請求集合
          private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();
      
          class GroupTransferService extends ServiceThread {
      
              private void doWaitTransfer() {
                  synchronized (this.requestsRead) {
                      if (!this.requestsRead.isEmpty()) {
                          for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                              // 判斷主從同步是否完成,判斷傳輸?shù)綇墓?jié)點最大偏移量是否超過了請求中設置的偏移量
                              boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                              // 默認同步有5s的超時時間
                              long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
                                  + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
                              // 如果從節(jié)點還未同步完畢并且未超過截止時間
                              while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
                                  // 等待
                                  this.notifyTransferObject.waitForRunning(1000);
                                  // 判斷從節(jié)點同步的最大偏移量是否超過了請求中設置的偏移量
                                  transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                              }
      
                              if (!transferOK) {
                                  log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                              }
                              // 同步完成,喚醒消息發(fā)送處理線程
                              req.wakeupCustomer(transferOK);
                          }
      
                          this.requestsRead.clear();
                      }
                  }
              }
          }
      }
      

      3.4 啟動HAClient

      HAClient可以看做是在從節(jié)點上運行的,主要進行的處理如下:

      1. 調用connectMaster方法連接Master節(jié)點,Master節(jié)點上也會運行,但是它本身就是Master沒有可連的Master節(jié)點,所以可以忽略;
      2. 調用isTimeToReportOffset方法判斷是否需要向Master節(jié)點匯報同步偏移量,如果需要則調用reportSlaveMaxOffset方法將當前的消息同步偏移量發(fā)送給Master節(jié)點;
      3. 調用processReadEvent處理網(wǎng)絡請求中的可讀事件,也就是處理Master發(fā)送過來的消息,將消息存入CommitLog
      public class HAService {
          class HAClient extends ServiceThread {
         
              @Override
              public void run() {
                  log.info(this.getServiceName() + " service started");
                  while (!this.isStopped()) {
                      try {
                          // 連接Master節(jié)點
                          if (this.connectMaster()) {
                              // 是否需要報告消息同步偏移量
                              if (this.isTimeToReportOffset()) {
                                  // 向Master節(jié)點發(fā)送同步偏移量
                                  boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                                  if (!result) {
                                      this.closeMaster();
                                  }
                              }
                              // 等待事件產(chǎn)生
                              this.selector.select(1000);
                              // 處理讀事件,也就是Master節(jié)點發(fā)送的數(shù)據(jù)
                              boolean ok = this.processReadEvent();
                              if (!ok) {
                                  this.closeMaster();
                              }
                              // ...
                          } else {
                              this.waitForRunning(1000 * 5);
                          }
                      } catch (Exception e) {
                          log.warn(this.getServiceName() + " service has exception. ", e);
                          this.waitForRunning(1000 * 5);
                      }
                  }
      
                  log.info(this.getServiceName() + " service end");
              }
          }
      }
      
      3.4.1 連接主節(jié)點

      connectMaster 方法中會獲取Master節(jié)點的地址,并轉換為 SocketAddress 對象,然后向Master節(jié)點請求建立連接,并在selector注冊OP_READ可讀事件監(jiān)聽:

      public class HAService {
          class HAClient extends ServiceThread {
          // 當前的主從復制進度
          private long currentReportedOffset = 0;
      
          private boolean connectMaster() throws ClosedChannelException {
              // 如果socketChannel為空,則嘗試連接主服務器
              if (null == socketChannel) {
                  String addr = this.masterAddress.get();
                  if (addr != null) {
                      // 將地址轉為SocketAddress
                      SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
                      if (socketAddress != null) {
                          // 建立與Master的連接
                          this.socketChannel = RemotingUtil.connect(socketAddress);
                          if (this.socketChannel != null) {
                              // 注冊OP_READ(網(wǎng)絡讀事件)
                              this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                          }
                      }
                  }
      
                  // 獲取從節(jié)點本地的CommitLog最大偏移量
                  this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                  // 更新上次心跳發(fā)送時間
                  this.lastWriteTimestamp = System.currentTimeMillis();
              }
      
              // 如果主服務器地址為空,返回false
              return this.socketChannel != null;
          }
      }
      
      
      3.4.2 發(fā)送主從同步消息拉取偏移量

      isTimeToReportOffset方法中,首先獲取當前時間與上一次進行主從同步的時間間隔interval,如果時間間隔interval大于配置的發(fā)送心跳時間間隔,表示需要向Master節(jié)點發(fā)送從節(jié)點消息同步的偏移量,接下來會調用reportSlaveMaxOffset方法發(fā)送同步偏移量,也就是說從節(jié)點會定時向Master節(jié)點發(fā)送請求,反饋CommitLog中同步消息的偏移量

      public class HAService {
          class HAClient extends ServiceThread {
             // 當前從節(jié)點已經(jīng)同步消息的偏移量大小
             private long currentReportedOffset = 0;
      
             private boolean isTimeToReportOffset() {
                  // 獲取距離上一次主從同步的間隔時間
                  long interval =
                      HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
                  // 判斷是否超過了配置的發(fā)送心跳包時間間隔
                  boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
                      .getHaSendHeartbeatInterval();
      
                  return needHeart;
              }
      
              // 發(fā)送同步偏移量,傳入的參數(shù)是當前的主從復制偏移量currentReportedOffset
              private boolean reportSlaveMaxOffset(final long maxOffset) {
                  this.reportOffset.position(0);
                  this.reportOffset.limit(8); // 設置數(shù)據(jù)傳輸大小為8個字節(jié)
                  this.reportOffset.putLong(maxOffset);// 設置同步偏移量
                  this.reportOffset.position(0);
                  this.reportOffset.limit(8);
      
                  for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
                      try {
                          // 向Master節(jié)點發(fā)送拉取偏移量
                          this.socketChannel.write(this.reportOffset);
                      } catch (IOException e) {
                          log.error(this.getServiceName()
                              + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                          return false;
                      }
                  }
                  // 更新上次心跳發(fā)送時間
                  lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
                  return !this.reportOffset.hasRemaining();
              }
          }
      }
      
      3.4.3 處理網(wǎng)絡讀事件

      processReadEvent方法中處理了可讀事件,也就是處理Master節(jié)點發(fā)送的同步數(shù)據(jù),首先從 socketChannel 中讀取數(shù)據(jù)到byteBufferRead 中,byteBufferRead 是讀緩沖區(qū),讀取數(shù)據(jù)的方法會返回讀取到的字節(jié)數(shù),對字節(jié)數(shù)大小進行判斷:

      • 如果可讀字節(jié)數(shù)大于0表示有數(shù)據(jù)需要處理,調用dispatchReadRequest方法進行處理;
      • 如果可讀字節(jié)數(shù)為0表示沒有可讀數(shù)據(jù),此時記錄讀取到空數(shù)據(jù)的次數(shù),如果連續(xù)讀到空數(shù)據(jù)的次數(shù)大于3次,將終止本次處理;
        class HAClient extends ServiceThread {
              // 讀緩沖區(qū),會將從socketChannel讀入緩沖區(qū)
              private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
      
              private boolean processReadEvent() {
                  int readSizeZeroTimes = 0;
                  // 循環(huán)判斷readByteBuffer是否還有剩余空間,如果存在剩余空間,則調用
                  // SocketChannel#read(ByteBuffer readByteBuffer)方法,將通道中
                  // 的數(shù)據(jù)讀入讀緩存區(qū)。
                  while (this.byteBufferRead.hasRemaining()) {
                      try {
                          // 從socketChannel中讀取數(shù)據(jù)到byteBufferRead中,返回讀取到的字節(jié)數(shù)
                          int readSize = this.socketChannel.read(this.byteBufferRead);
                          if (readSize > 0) {
                              // 重置readSizeZeroTimes
                              readSizeZeroTimes = 0;
                                // 然后調用dispatchReadRequest方法將讀取到的所有消息全部追加到消息內存映射文件中,再次反饋拉取進度給主服務器。
                              boolean result = this.dispatchReadRequest();
                              if (!result) {
                                  log.error("HAClient, dispatchReadRequest error");
                                  return false;
                              }
                          } else if (readSize == 0) {
                              // 記錄讀取到空數(shù)據(jù)的次數(shù)
                              if (++readSizeZeroTimes >= 3) {
                                  break;
                              }
                          } else {
                              log.info("HAClient, processReadEvent read socket < 0");
                              return false;
                          }
                      } catch (IOException e) {
                          log.info("HAClient, processReadEvent read socket exception", e);
                          return false;
                      }
                  }
      
                  return true;
              }
        }
      

      dispatchReadRequest方法中會將從節(jié)點讀取到的數(shù)據(jù)寫入 CommitLogdispatchPosition記錄了已經(jīng)處理的數(shù)據(jù)在讀緩沖區(qū)中的位置,從讀緩沖區(qū)byteBufferRead獲取剩余可讀取的字節(jié)數(shù),如果可讀數(shù)據(jù)的字節(jié)數(shù)大于一個消息頭的字節(jié)數(shù)(12個字節(jié)),表示有數(shù)據(jù)還未處理完畢,反之表示消息已經(jīng)處理完畢結束處理。對數(shù)據(jù)的處理邏輯如下:

      1. 從緩沖區(qū)中讀取數(shù)據(jù),首先獲取到的是消息在master節(jié)點的物理偏移量masterPhyOffset;
      2. 向后讀取8個字節(jié),得到消息體內容的字節(jié)數(shù)bodySize;
      3. 獲取從節(jié)點當前 CommitLog 的最大物理偏移量 slavePhyOffset,如果不為0并且不等于 masterPhyOffset表示與Master節(jié)點的傳輸偏移量不一致,也就是數(shù)據(jù)不一致,此時終止處理
      4. 如果可讀取的字節(jié)數(shù)大于一個消息頭的字節(jié)數(shù) + 消息體大小,表示有消息可處理,繼續(xù)進行下一步;
      5. 計算消息體在讀緩沖區(qū)中的起始位置,從讀緩沖區(qū)中根據(jù)起始位置,讀取消息內容,將消息追加到從節(jié)點的CommitLog中
      6. 更新dispatchPosition的值為消息頭大小 + 消息體大小,dispatchPosition之前的數(shù)據(jù)表示已經(jīng)處理完畢;

          class HAClient extends ServiceThread {
              // 已經(jīng)處理的數(shù)據(jù)在讀緩沖區(qū)中的位置,初始化為0
              private int dispatchPosition = 0;
              // 讀緩沖區(qū)
              private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
      
              private boolean dispatchReadRequest() {
                  // 消息頭大小
                  final int msgHeaderSize = 8 + 4; // phyoffset + size
                  // 開啟循環(huán)不斷讀取數(shù)據(jù)
                  while (true) {
                      // 獲可讀取的字節(jié)數(shù)
                      int diff = this.byteBufferRead.position() - this.dispatchPosition;
                      // 如果字節(jié)數(shù)大于一個消息頭的字節(jié)數(shù)
                      if (diff >= msgHeaderSize) {
                          // 獲取消息在master節(jié)點的物理偏移量
                          long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                          // 獲取消息體大小
                          int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
                          // 獲取從節(jié)點當前CommitLog的最大物理偏移量
                          long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                          if (slavePhyOffset != 0) {
                              // 如果不一致結束處理
                              if (slavePhyOffset != masterPhyOffset) {
                                  log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                                      + slavePhyOffset + " MASTER: " + masterPhyOffset);
                                  return false;
                              }
                          }
                          // 如果可讀取的字節(jié)數(shù)大于一個消息頭的字節(jié)數(shù) + 消息體大小
                          if (diff >= (msgHeaderSize + bodySize)) {
                              // 將度緩沖區(qū)的數(shù)據(jù)轉為字節(jié)數(shù)組
                              byte[] bodyData = byteBufferRead.array();
                              // 計算消息體在讀緩沖區(qū)中的起始位置
                              int dataStart = this.dispatchPosition + msgHeaderSize;
                              // 從讀緩沖區(qū)中根據(jù)消息的位置,讀取消息內容,將消息追加到從節(jié)點的CommitLog中
                              HAService.this.defaultMessageStore.appendToCommitLog(
                                      masterPhyOffset, bodyData, dataStart, bodySize);
                              // 更新dispatchPosition的值為消息頭大小+消息體大小
                              this.dispatchPosition += msgHeaderSize + bodySize;
                              if (!reportSlaveMaxOffsetPlus()) {
                                  return false;
                              }
                              continue;
                          }
                      }
                      if (!this.byteBufferRead.hasRemaining()) {
                          this.reallocateByteBuffer();
                      }
      
                      break;
                  }
      
                  return true;
              }
          }
      

      3.5 HAConnection

      HAConnection中封裝了Master節(jié)點與從節(jié)點的網(wǎng)絡通信處理,分別在ReadSocketServiceWriteSocketService中。

      3.5.1 ReadSocketService

      ReadSocketService啟動后處理監(jiān)聽到的可讀事件,前面知道HAClient中從節(jié)點會定時向Master節(jié)點匯報從節(jié)點的消息同步偏移量,Master節(jié)點對匯報請求的處理就在這里,如果從網(wǎng)絡中監(jiān)聽到了可讀事件,會調用processReadEvent處理讀事件:

      public class HAConnection {
           class ReadSocketService extends ServiceThread {
              @Override
              public void run() {
                  HAConnection.log.info(this.getServiceName() + " service started");
                  while (!this.isStopped()) {
                      try {
                          this.selector.select(1000);
                          // 處理可讀事件
                          boolean ok = this.processReadEvent();
                          if (!ok) {
                              HAConnection.log.error("processReadEvent error");
                              break;
                          }
                          // ...
                      } catch (Exception e) {
                          HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                          break;
                      }
                  }
                  // ...
                  HAConnection.log.info(this.getServiceName() + " service end");
              }
           }
      }
      
      

      processReadEvent中從網(wǎng)絡中處理讀事件的方式與上面HAClientdispatchReadRequest類似,都是將網(wǎng)絡中的數(shù)據(jù)讀取到讀緩沖區(qū)中,并用一個變量記錄已讀取數(shù)據(jù)的位置,processReadEvent方法的處理邏輯如下:

      1. 從socketChannel讀取數(shù)據(jù)到讀緩沖區(qū)byteBufferRead中,返回讀取到的字節(jié)數(shù);
      2. 如果讀取到的字節(jié)數(shù)大于0,進入下一步,如果讀取到的字節(jié)數(shù)為0,記錄連續(xù)讀取到空字節(jié)數(shù)的次數(shù)是否超過三次,如果超過終止處理;
      3. 判斷剩余可讀取的字節(jié)數(shù)是否大于等于8,前面知道,從節(jié)點發(fā)送同步消息拉取偏移量的時候設置的字節(jié)大小為8,所以字節(jié)數(shù)大于等于8的時候表示需要讀取從節(jié)點發(fā)送的偏移量;
      4. 計算數(shù)據(jù)在緩沖區(qū)中的位置,從緩沖區(qū)讀取從節(jié)點發(fā)送的同步偏移量readOffset;
      5. 更新processPosition的值,processPosition表示讀緩沖區(qū)中已經(jīng)處理數(shù)據(jù)的位置;
      6. 更新slaveAckOffset為從節(jié)點發(fā)送的同步偏移量readOffset的值;
      7. 如果當前Master節(jié)點記錄的從節(jié)點的同步偏移量slaveRequestOffset小于0,表示還未進行同步,此時將slaveRequestOffset更新為從節(jié)點發(fā)送的同步偏移量;
      8. 如果從節(jié)點發(fā)送的同步偏移量比當前Master節(jié)點的最大物理偏移量還要大,終止本次處理;
      9. 調用notifyTransferSome,更新Master節(jié)點記錄的向從節(jié)點同步消息的偏移量;
      public class HAConnection {
      
           class ReadSocketService extends ServiceThread {
           // 讀緩沖區(qū)    
           private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
           // 讀緩沖區(qū)中已經(jīng)處理的數(shù)據(jù)位置
           private int processPosition = 0;
      
           private boolean processReadEvent() {
                  int readSizeZeroTimes = 0;
                  // 如果沒有可讀數(shù)據(jù)
                  if (!this.byteBufferRead.hasRemaining()) {
                      this.byteBufferRead.flip();
                      // 處理位置置為0
                      this.processPosition = 0;
                  }
                  // 如果數(shù)據(jù)未讀取完畢
                  while (this.byteBufferRead.hasRemaining()) {
                      try {
                          // 從socketChannel讀取數(shù)據(jù)到byteBufferRead中,返回讀取到的字節(jié)數(shù)
                          int readSize = this.socketChannel.read(this.byteBufferRead);
                          // 如果讀取數(shù)據(jù)字節(jié)數(shù)大于0
                          if (readSize > 0) {
                              // 重置readSizeZeroTimes
                              readSizeZeroTimes = 0;
                              // 獲取上次處理讀事件的時間戳
                              this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                              // 判斷剩余可讀取的字節(jié)數(shù)是否大于等于8
                              if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                                  // 獲取偏移量內容的結束位置
                                  int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                                  // 從結束位置向前讀取8個字節(jié)得到從點發(fā)送的同步偏移量
                                  long readOffset = this.byteBufferRead.getLong(pos - 8);
                                  // 更新處理位置
                                  this.processPosition = pos;
                                  // 更新slaveAckOffset為從節(jié)點發(fā)送的同步進度
                                  HAConnection.this.slaveAckOffset = readOffset;
                                  // 如果記錄的從節(jié)點的同步進度小于0,表示還未進行同步
                                  if (HAConnection.this.slaveRequestOffset < 0) {
                                      // 更新為從節(jié)點發(fā)送的同步進度
                                      HAConnection.this.slaveRequestOffset = readOffset;
                                      log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                                  } else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) {
                                      // 如果從節(jié)點發(fā)送的拉取偏移量比當前Master節(jié)點的最大物理偏移量還要大
                                      log.warn("slave[{}] request offset={} greater than local commitLog offset={}. ",
                                              HAConnection.this.clientAddr,
                                              HAConnection.this.slaveAckOffset,
                                              HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset());
                                      return false;
                                  }
                                  // 更新Master節(jié)點記錄的向從節(jié)點同步消息的偏移量
                                  HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                              }
                          } else if (readSize == 0) 
                              // 判斷連續(xù)讀取到空數(shù)據(jù)的次數(shù)是否超過三次
                              if (++readSizeZeroTimes >= 3) {
                                  break;
                              }
                          } else {
                              log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                              return false;
                          }
                      } catch (IOException e) {
                          log.error("processReadEvent exception", e);
                          return false;
                      }
                  }
      
                  return true;
              }
          }
      }
      
      

      前面在 GroupTransferService 中可以看到是通過 push2SlaveMaxOffset 的值判斷本次同步是否完成的,在 notifyTransferSome 方法中可以看到當Master節(jié)點收到從節(jié)點反饋的消息拉取偏移量時,對 push2SlaveMaxOffset 的值進行了更新:

      public class HAService {
          // 向從節(jié)點推送的消息最大偏移量
          private final GroupTransferService groupTransferService;
      
          public void notifyTransferSome(final long offset) {
              // 如果傳入的偏移大于push2SlaveMaxOffset記錄的值,進行更新
              for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
                  // 更新向從節(jié)點推送的消息最大偏移量
                  boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
                  if (ok) {
                      this.groupTransferService.notifyTransferSome();
                      break;
                  } else {
                      value = this.push2SlaveMaxOffset.get();
                  }
              }
          }
      }
      
      3.5.2 WriteSocketService

      WriteSocketService用于Master節(jié)點向從節(jié)點發(fā)送同步消息,處理邏輯如下:

      1. 根據(jù)從節(jié)點發(fā)送的主從同步消息拉取偏移量slaveRequestOffset進行判斷:
        • 如果slaveRequestOffset值為-1,表示還未收到從節(jié)點報告的同步偏移量,此時睡眠一段時間等待從節(jié)點發(fā)送消息拉取偏移量;
        • 如果slaveRequestOffset值不為-1,表示已經(jīng)開始進行主從同步進行下一步;
      2. 判斷nextTransferFromWhere值是否為-1,nextTransferFromWhere記錄了下次需要傳輸?shù)南⒃贑ommitLog中的偏移量,如果值為-1表示初次進行數(shù)據(jù)同步,此時有兩種情況:
        • 如果從節(jié)點發(fā)送的拉取偏移量slaveRequestOffset為0,就從當前CommitLog文件最大偏移量開始同步;
        • 如果slaveRequestOffset不為0,則從slaveRequestOffset位置處進行數(shù)據(jù)同步;
      3. 判斷上次寫事件是否已經(jīng)將數(shù)據(jù)都寫入到從節(jié)點
        • 如果已經(jīng)寫入完畢,判斷距離上次寫入數(shù)據(jù)的時間間隔是否超過了設置的心跳時間,如果超過,為了避免連接空閑被關閉,需要發(fā)送一個心跳包,此時構建心跳包的請求數(shù)據(jù),調用transferData方法傳輸數(shù)據(jù);
        • 如果上次的數(shù)據(jù)還未傳輸完畢,調用transferData方法繼續(xù)傳輸,如果還是未完成,則結束此處處理;
      4. 根據(jù)nextTransferFromWhere從CommitLog中獲取消息,如果未獲取到消息,等待100ms,如果獲取到消息,從CommitLog中獲取消息進行傳輸:
        (1)如果獲取到消息的字節(jié)數(shù)大于最大傳輸?shù)拇笮。O置最最大傳輸數(shù)量,分批進行傳輸;
        (2)更新下次傳輸?shù)钠屏康刂芬簿褪莕extTransferFromWhere的值;
        (3)從CommitLog中獲取的消息內容設置到將讀取到的消息數(shù)據(jù)設置到selectMappedBufferResult中;
        (4)設置消息頭信息,包括消息頭字節(jié)數(shù)、拉取消息的偏移量等;
        (5)調用transferData發(fā)送數(shù)據(jù);
      public class HAConnection {
          class WriteSocketService extends ServiceThread {
              private final int headerSize = 8 + 4;// 消息頭大小
              @Override
              public void run() {
                  HAConnection.log.info(this.getServiceName() + " service started");
                  while (!this.isStopped()) {
                      try {
                          this.selector.select(1000);
                          // 如果slaveRequestOffset為-1,表示還未收到從節(jié)點報告的拉取進度
                          if (-1 == HAConnection.this.slaveRequestOffset) {
                              // 等待一段時間
                              Thread.sleep(10);
                              continue;
                          }
                          // 初次進行數(shù)據(jù)同步
                          if (-1 == this.nextTransferFromWhere) {
                              // 如果拉取進度為0
                              if (0 == HAConnection.this.slaveRequestOffset) {
                                  // 從master節(jié)點最大偏移量從開始傳輸
                                  long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                                  masterOffset =
                                      masterOffset
                                          - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                          .getMappedFileSizeCommitLog());
      
                                  if (masterOffset < 0) {
                                      masterOffset = 0;
                                  }
                                  // 更新nextTransferFromWhere
                                  this.nextTransferFromWhere = masterOffset;
                              } else {
                                  // 根據(jù)從節(jié)點發(fā)送的偏移量開始數(shù)據(jù)同步
                                  this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                              }
      
                              log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                                  + "], and slave request " + HAConnection.this.slaveRequestOffset);
                          }
                          // 判斷上次傳輸是否完畢
                          if (this.lastWriteOver) {
                              // 獲取當前時間距離上次寫入數(shù)據(jù)的時間間隔
                              long interval =
                                  HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                              // 如果距離上次寫入數(shù)據(jù)的時間間隔超過了設置的心跳時間
                              if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                  .getHaSendHeartbeatInterval()) {
                                  // 構建header
                                  this.byteBufferHeader.position(0);
                                  this.byteBufferHeader.limit(headerSize);
                                  this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                                  this.byteBufferHeader.putInt(0);
                                  this.byteBufferHeader.flip();
                                  // 發(fā)送心跳包
                                  this.lastWriteOver = this.transferData();
                                  if (!this.lastWriteOver)
                                      continue;
                              }
                          } else {
                              // 未傳輸完畢,繼續(xù)上次的傳輸
                              this.lastWriteOver = this.transferData();
                              // 如果依舊未完成,結束本次處理
                              if (!this.lastWriteOver)
                                  continue;
                          }
                          // 根據(jù)偏移量獲取消息數(shù)據(jù)
                          SelectMappedBufferResult selectResult =
                              HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
                          if (selectResult != null) {// 獲取消息不為空
                              // 獲取消息內容大小
                              int size = selectResult.getSize();
                              // 如果消息的字節(jié)數(shù)大于最大傳輸?shù)拇笮?/span>
                              if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                                  // 設置為最大傳輸大小
                                  size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                              }
      
                              long thisOffset = this.nextTransferFromWhere;
                              // 更新下次傳輸?shù)钠屏康刂?/span>
                              this.nextTransferFromWhere += size;
      
                              selectResult.getByteBuffer().limit(size);
                              // 將讀取到的消息數(shù)據(jù)設置到selectMappedBufferResult
                              this.selectMappedBufferResult = selectResult;
      
                              // 設置消息頭
                              this.byteBufferHeader.position(0);
                              // 設置消息頭大小
                              this.byteBufferHeader.limit(headerSize);
                              // 設置偏移量地址
                              this.byteBufferHeader.putLong(thisOffset);
                              // 設置消息內容大小
                              this.byteBufferHeader.putInt(size);
                              this.byteBufferHeader.flip();
                              // 發(fā)送數(shù)據(jù)
                              this.lastWriteOver = this.transferData();
                          } else {
                              // 等待100ms
                              HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                          }
                      } catch (Exception e) {
      
                          HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                          break;
                      }
                  }
      
                  HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
      
                  // ...
                  HAConnection.log.info(this.getServiceName() + " service end");
              }
          }
      }
      

      四. 總結

      有新消息寫入之后的同步流程

      本文參考至:

      【RocketMQ】【源碼】主從同步實現(xiàn)原理 - shanml - 博客園 (cnblogs.com)

      RocketMQ(九):主從同步的實現(xiàn) - 阿牛20 - 博客園 (cnblogs.com)

      【RocketMQ】主從同步實現(xiàn)原理 - shanml - 博客園 (cnblogs.com)

      posted @ 2023-12-02 10:18  聽到微笑  閱讀(112)  評論(0)    收藏  舉報  來源
      主站蜘蛛池模板: 又黄又爽又无遮挡免费的网站| 国产精品综合一区二区三区 | 最新精品露脸国产在线| 亚洲精品成人片在线观看精品字幕| brazzers欧美巨大| 久久精品无码免费不卡 | 成年女人免费碰碰视频| 一区二区三区精品偷拍| 亚洲第一无码AV无码专区| 亚洲国产精品人人做人人爱| 中文字幕乱码无码人妻系列蜜桃| 少妇真人直播免费视频| 成人亚洲欧美一区二区三区| 国产亚洲综合欧美视频| 全部免费毛片在线播放| 成人自拍小视频在线观看| 日韩av在线一卡二卡三卡| 欧美中文字幕在线看| 我国产码在线观看av哈哈哈网站| 日韩精品无码免费专区午夜不卡| 人妻中文字幕一区二区视频| 日韩精品中文字幕人妻| 色又黄又爽18禁免费视频| 国产福利精品一区二区 | 成在线人免费视频| 精品一区二区三区日韩版| 日本一区二区国产在线| 亚洲av无码精品蜜桃| 国产精品露脸视频观看| 正在播放的国产A一片| 沾益县| 国产真实younv在线| 亚洲av永久无码精品网站| 久久精品av国产一区二区 | 超碰成人精品一区二区三| 国产亚洲精品中文字幕| 一本大道色婷婷在线| 久久久精品2019中文字幕之3| 国模一区二区三区私拍视频| 精品国产丝袜自在线拍国语| 猫咪AV成人永久网站在线观看|