
      RocketMQ (9): How Master-Slave Synchronization Is Implemented

        We will not rehash CAP, the classic theory of distributed systems; but for a distributed messaging system like RocketMQ, master-slave replication is the most fundamental means of guaranteeing availability. The mechanism may be less commonly used these days, yet it is still instructive for understanding how distributed systems typically work.

        Today, let's dig into how RocketMQ implements master-slave data synchronization.

       

      1. Overview of Master-Slave Synchronization

        Master-slave synchronization is a concept most of us run into at work. Its main purposes are backup-style redundancy and read-write splitting. MySQL, the relational database we use every day, ships with master-slave replication, for example.

        Master-slave synchronization simply means copying the data on the master server to a slave server, which amounts to adding one more replica of the data.

        The concrete implementations vary: MySQL replicates through the binlog, Elasticsearch recovers replicas through the translog, and Redis replicates through an initial snapshot plus a propagated command stream. So how does RocketMQ implement master-slave synchronization?

        And what questions does a master-slave design have to answer?

          1. How timely is the data synchronization? (latency and consistency)
          2. How much does it impact the master? (availability)
          3. Can the slave replace the master? (availability / partition tolerance)

        The first two must always be addressed; the third often is not, because a system struggles to do it well on its own, so many systems simply skip it for simplicity: the slave exists purely as a backup and accepts no writes, and a master-slave switch requires human intervention as a planned, lossy operation. As the technology has matured, though, services that switch master and slave automatically are now plentiful, the inevitable trend in today's world full of distributed systems.

       

      2. Configuring RocketMQ Master-Slave Synchronization

        The core component of RocketMQ is the broker, which handles nearly all storage, reads, and writes. So master-slave synchronization is necessarily a broker concern. Recall RocketMQ's deployment architecture diagram (not reproduced here) for the global picture:

        The architecture itself needs little commentary. Since our topic is master-slave synchronization, we only care about the broker component, so the whole picture reduces to BrokerMaster -> BrokerSlave; reduced further, master-slave synchronization is simply the process of getting the master's data over to the slave.

        So how do we configure it?

        conf/broker-a.properties (master configuration)

      # Cluster this broker belongs to
      brokerClusterName=DefaultCluster
      # Broker name. A master and its slaves must share the same name, e.g. master broker-a and its slave are both named broker-a
      brokerName=broker-a
      # 0 means master, >0 means slave
      brokerId=0
      # Broker role:
      # - ASYNC_MASTER: master with asynchronous replication
      # - SYNC_MASTER: master with synchronous double-write
      # - SLAVE
      brokerRole=ASYNC_MASTER
      # Disk flush mode:
      # - ASYNC_FLUSH: asynchronous flush
      # - SYNC_FLUSH: synchronous flush
      flushDiskType=ASYNC_FLUSH
      # Name server addresses, semicolon-separated
      namesrvAddr=172.0.1.5:9876;172.0.1.6:9876
      # Number of queues created for a topic auto-created on send
      defaultTopicQueueNums=4
      # Whether the broker may auto-create topics; recommended on for development, off in production
      autoCreateTopicEnable=true
      # Whether the broker may auto-create subscription groups; recommended on for development, off in production
      autoCreateSubscriptionGroup=true
      # Port the broker listens on
      listenPort=10911
      # Hour of day at which expired files are deleted (04 = 4 a.m.)
      deleteWhen=04
      # File retention time in hours (RocketMQ's default is 48; 120 here)
      fileReservedTime=120
      # Size of each commitlog file, default 1 GB
      mapedFileSizeCommitLog=1073741824
      # Entries per ConsumeQueue file, default 300,000; tune to your workload
      mapedFileSizeConsumeQueue=300000
      #destroyMapedFileIntervalForcibly=120000
      #redeleteHangedFileInterval=120000
      # Disk usage threshold (%) that triggers physical file cleanup
      diskMaxUsedSpaceRatio=88
      # Storage root path
      storePathRootDir=/usr/local/rocketmq/store/broker-a
      # Commitlog storage path
      storePathCommitLog=/usr/local/rocketmq/store/broker-a/commitlog
      # Consume queue storage path
      storePathConsumeQueue=/usr/local/rocketmq/store/broker-a/consumequeue
      # Message index storage path
      storePathIndex=/usr/local/rocketmq/store/broker-a/index
      # Checkpoint file path
      storeCheckpoint=/usr/local/rocketmq/store/checkpoint
      # Abort file path
      abortFile=/usr/local/rocketmq/store/abort
      # Maximum message size, in bytes
      maxMessageSize=65536
      #flushCommitLogLeastPages=4
      #flushConsumeQueueLeastPages=2
      #flushCommitLogThoroughInterval=10000
      #flushConsumeQueueThoroughInterval=60000
      #checkTransactionMessageEnable=false
      # Send-message thread pool size
      #sendMessageThreadPoolNums=128
      # Pull-message thread pool size
      #pullMessageThreadPoolNums=128

        conf/broker-a-s.properties (slave configuration)

      # Cluster this broker belongs to
      brokerClusterName=DefaultCluster
      # Broker name. A master and its slaves must share the same name, e.g. master broker-a and its slave are both named broker-a
      brokerName=broker-a
      # 0 means master, >0 means slave
      brokerId=1
      # Broker role:
      # - ASYNC_MASTER: master with asynchronous replication
      # - SYNC_MASTER: master with synchronous double-write
      # - SLAVE
      brokerRole=SLAVE
      # Disk flush mode:
      # - ASYNC_FLUSH: asynchronous flush
      # - SYNC_FLUSH: synchronous flush
      flushDiskType=ASYNC_FLUSH
      # Name server addresses, semicolon-separated
      namesrvAddr=172.0.1.5:9876;172.0.1.6:9876
      # Number of queues created for a topic auto-created on send
      defaultTopicQueueNums=4
      # Whether the broker may auto-create topics; recommended on for development, off in production
      autoCreateTopicEnable=true
      # Whether the broker may auto-create subscription groups; recommended on for development, off in production
      autoCreateSubscriptionGroup=true
      # Port the broker listens on
      listenPort=10920
      # Hour of day at which expired files are deleted (04 = 4 a.m.)
      deleteWhen=04
      # File retention time in hours (RocketMQ's default is 48; 120 here)
      fileReservedTime=120
      # Size of each commitlog file, default 1 GB
      mapedFileSizeCommitLog=1073741824
      # Entries per ConsumeQueue file, default 300,000; tune to your workload
      mapedFileSizeConsumeQueue=300000
      #destroyMapedFileIntervalForcibly=120000
      #redeleteHangedFileInterval=120000
      # Disk usage threshold (%) that triggers physical file cleanup
      diskMaxUsedSpaceRatio=88
      # Storage root path
      storePathRootDir=/usr/local/rocketmq/store/broker-a-s
      # Commitlog storage path
      storePathCommitLog=/usr/local/rocketmq/store/broker-a-s/commitlog
      # Consume queue storage path
      storePathConsumeQueue=/usr/local/rocketmq/store/broker-a-s/consumequeue
      # Message index storage path
      storePathIndex=/usr/local/rocketmq/store/broker-a-s/index
      # Checkpoint file path
      storeCheckpoint=/usr/local/rocketmq/store/checkpoint
      # Abort file path
      abortFile=/usr/local/rocketmq/store/abort
      # Maximum message size, in bytes
      maxMessageSize=65536
      #flushCommitLogLeastPages=4
      #flushConsumeQueueLeastPages=2
      #flushCommitLogThoroughInterval=10000
      #flushConsumeQueueThoroughInterval=60000
      #checkTransactionMessageEnable=false
      # Send-message thread pool size
      #sendMessageThreadPoolNums=128
      # Pull-message thread pool size
      #pullMessageThreadPoolNums=128

        The file names themselves do not matter; what matters is pointing each broker at the right configuration file at startup. The master and slave are started like this:

          nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties > logs/broker-a.log 2>&1 &
          nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties > logs/broker-a-s.log 2>&1 &

        If you worry about mistyping the startup command, you can instead use a single file name, broker.properties (looked up by default), with different contents on each machine; then the startup command is identical everywhere, which removes one source of operator error.

        Remember to start the name server nodes before the brokers. With that, a RocketMQ master-slave cluster is up. The configuration looks lengthy, but the core is a single idea: keep brokerName identical and set brokerRole=ASYNC_MASTER|SYNC_MASTER|SLAVE. That value alone determines who is master and who is slave, and therefore who replicates data from whom.

       

      3. How RocketMQ Implements Master-Slave Synchronization

        Understanding the configuration is only the starting point for understanding the implementation. As the configuration shows, whether a broker is a master or a slave is fixed in its config file and cannot change at runtime, so the master/slave behavior diverges as soon as the broker starts.

        Let's first look at the overall frame in which a broker runs its synchronization:

          // org.apache.rocketmq.broker.BrokerController#start
          public void start() throws Exception {
              if (this.messageStore != null) {
                  this.messageStore.start();
              }
      
              if (this.remotingServer != null) {
                  this.remotingServer.start();
              }
      
              if (this.fastRemotingServer != null) {
                  this.fastRemotingServer.start();
              }
      
              if (this.fileWatchService != null) {
                  this.fileWatchService.start();
              }
      
              if (this.brokerOuterAPI != null) {
                  this.brokerOuterAPI.start();
              }
      
              if (this.pullRequestHoldService != null) {
                  this.pullRequestHoldService.start();
              }
      
              if (this.clientHousekeepingService != null) {
                  this.clientHousekeepingService.start();
              }
      
              if (this.filterServerManager != null) {
                  this.filterServerManager.start();
              }
      
              if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                  startProcessorByHa(messageStoreConfig.getBrokerRole());
                // Handle slave metadata synchronization
                  handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
                // Force one registration with the name servers right away
                  this.registerBrokerAll(true, false, true);
              }
            // Periodically register this broker's state with the name servers
              this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
      
                  @Override
                  public void run() {
                      try {
                          BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                      } catch (Throwable e) {
                          log.error("registerBrokerAll Exception", e);
                      }
                  }
              }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
      
              if (this.brokerStatsManager != null) {
                  this.brokerStatsManager.start();
              }
      
              if (this.brokerFastFailure != null) {
                  this.brokerFastFailure.start();
              }
      
      
          }
      
          private void handleSlaveSynchronize(BrokerRole role) {
            // Only slave nodes run the sync task
              if (role == BrokerRole.SLAVE) {
                  if (null != slaveSyncFuture) {
                      slaveSyncFuture.cancel(false);
                  }
                // Clear the master address so syncing does not start immediately;
                // some other code path must set the master info later --
                // in fact it is set during registerBrokerAll(), from the name server's response
                  this.slaveSynchronize.setMasterAddr(null);
                // Sync once every 10 seconds (first run after 3 seconds)
                  slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                      @Override
                      public void run() {
                          try {
                              BrokerController.this.slaveSynchronize.syncAll();
                          }
                          catch (Throwable e) {
                              log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                          }
                      }
                  }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
              } else {
                  //handle the slave synchronise
                  if (null != slaveSyncFuture) {
                      slaveSyncFuture.cancel(false);
                  }
                  this.slaveSynchronize.setMasterAddr(null);
              }
          }
          public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
              TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
      
              if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                  || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
                  ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
                  for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                      TopicConfig tmp =
                          new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                              this.brokerConfig.getBrokerPermission());
                      topicConfigTable.put(topicConfig.getTopicName(), tmp);
                  }
                  topicConfigWrapper.setTopicConfigTable(topicConfigTable);
              }
            // Register with the name servers when forced, or when the periodic check says it is time
              if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
                                              this.getBrokerAddr(),
                                              this.brokerConfig.getBrokerName(),
                                              this.brokerConfig.getBrokerId(),
                                              this.brokerConfig.getRegisterBrokerTimeoutMills())) {
                  doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
              }
          }
      
          private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
              TopicConfigSerializeWrapper topicConfigWrapper) {
            // Register with every configured name server
              List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
                  this.brokerConfig.getBrokerClusterName(),
                  this.getBrokerAddr(),
                  this.brokerConfig.getBrokerName(),
                  this.brokerConfig.getBrokerId(),
                  this.getHAServerAddr(),
                  topicConfigWrapper,
                  this.filterServerManager.buildNewFilterServerList(),
                  oneway,
                  this.brokerConfig.getRegisterBrokerTimeoutMills(),
                  this.brokerConfig.isCompressedRegister());
      
              if (registerBrokerResultList.size() > 0) {
                  RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
                  if (registerBrokerResult != null) {
                      if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                          this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                      }
                    // Update the master address: this is where a slave learns its master
                      this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
      
                      if (checkOrderConfig) {
                          this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                      }
                  }
              }
          }
          // org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
          public List<RegisterBrokerResult> registerBrokerAll(
              final String clusterName,
              final String brokerAddr,
              final String brokerName,
              final long brokerId,
              final String haServerAddr,
              final TopicConfigSerializeWrapper topicConfigWrapper,
              final List<String> filterServerList,
              final boolean oneway,
              final int timeoutMills,
              final boolean compressed) {
      
              final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
              List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
              if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
      
                  final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
                  requestHeader.setBrokerAddr(brokerAddr);
                  requestHeader.setBrokerId(brokerId);
                  requestHeader.setBrokerName(brokerName);
                  requestHeader.setClusterName(clusterName);
                  requestHeader.setHaServerAddr(haServerAddr);
                  requestHeader.setCompressed(compressed);
      
                  RegisterBrokerBody requestBody = new RegisterBrokerBody();
                  requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
                  requestBody.setFilterServerList(filterServerList);
                  final byte[] body = requestBody.encode(compressed);
                  final int bodyCrc32 = UtilAll.crc32(body);
                  requestHeader.setBodyCrc32(bodyCrc32);
                  final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
                  for (final String namesrvAddr : nameServerAddressList) {
                    // Register with all name servers concurrently for speed
                      brokerOuterExecutor.execute(new Runnable() {
                          @Override
                          public void run() {
                              try {
                                  RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                                  if (result != null) {
                                      registerBrokerResultList.add(result);
                                  }
      
                                  log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                              } catch (Exception e) {
                                  log.warn("registerBroker Exception, {}", namesrvAddr, e);
                              } finally {
                                  countDownLatch.countDown();
                              }
                          }
                      });
                  }
      
                  try {
                      countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
                  } catch (InterruptedException e) {
                  }
              }
      
              return registerBrokerResultList;
          }

        Broadly, master and slave differ very little: every broker gets the full set of services. Both open their service ports, both clean up files, both register themselves with the name servers, and so on. The one difference is that a slave additionally schedules a sync task that sends a synchronization request to the master every 10 seconds, namely syncAll(). So what exactly does this "synchronization" synchronize, and how?

        The whole of this scheduled synchronization lives in syncAll():

          // org.apache.rocketmq.broker.slave.SlaveSynchronize#syncAll
          public void syncAll() {
              // Sync topic configuration
              this.syncTopicConfig();
              // Sync consumer offsets
              this.syncConsumerOffset();
              // Sync delay (scheduled message) offsets
              this.syncDelayOffset();
              // Sync subscription group configuration
              this.syncSubscriptionGroupConfig();
          }
      
          // Sync topic configuration
          private void syncTopicConfig() {
              String masterAddrBak = this.masterAddr;
              // Only sync when a master address is known and it is not ourselves
              if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
                  try {
                      TopicConfigSerializeWrapper topicWrapper =
                          this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
                    // Only when the data version changed, i.e. the data differs, adopt the new version
                      if (!this.brokerController.getTopicConfigManager().getDataVersion()
                          .equals(topicWrapper.getDataVersion())) {
      
                          this.brokerController.getTopicConfigManager().getDataVersion()
                              .assignNewOne(topicWrapper.getDataVersion());
                          this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
                          this.brokerController.getTopicConfigManager().getTopicConfigTable()
                              .putAll(topicWrapper.getTopicConfigTable());
                        // Persist the topic configuration
                          this.brokerController.getTopicConfigManager().persist();
      
                          log.info("Update slave topic config from master, {}", masterAddrBak);
                      }
                  } catch (Exception e) {
                      log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
                  }
              }
          }
      
          // Sync consumer offsets
          private void syncConsumerOffset() {
              String masterAddrBak = this.masterAddr;
              if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
                  try {
                      ConsumerOffsetSerializeWrapper offsetWrapper =
                          this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
                      this.brokerController.getConsumerOffsetManager().getOffsetTable()
                          .putAll(offsetWrapper.getOffsetTable());
                      this.brokerController.getConsumerOffsetManager().persist();
                      log.info("Update slave consumer offset from master, {}", masterAddrBak);
                  } catch (Exception e) {
                      log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
                  }
              }
          }
          // The delay offsets are just a JSON string, persisted under config/delayOffset.json
          private void syncDelayOffset() {
              String masterAddrBak = this.masterAddr;
              if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
                  try {
                      String delayOffset =
                          this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
                      if (delayOffset != null) {
      
                          String fileName =
                              StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
                                  .getMessageStoreConfig().getStorePathRootDir());
                          try {
                              MixAll.string2File(delayOffset, fileName);
                          } catch (IOException e) {
                              log.error("Persist file Exception, {}", fileName, e);
                          }
                      }
                      log.info("Update slave delay offset from master, {}", masterAddrBak);
                  } catch (Exception e) {
                      log.error("SyncDelayOffset Exception, {}", masterAddrBak, e);
                  }
              }
          }
      
          // Sync subscription group configuration
          private void syncSubscriptionGroupConfig() {
              String masterAddrBak = this.masterAddr;
              if (masterAddrBak != null  && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
                  try {
                      SubscriptionGroupWrapper subscriptionWrapper =
                          this.brokerController.getBrokerOuterAPI()
                              .getAllSubscriptionGroupConfig(masterAddrBak);
      
                      if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
                          .equals(subscriptionWrapper.getDataVersion())) {
                          SubscriptionGroupManager subscriptionGroupManager =
                              this.brokerController.getSubscriptionGroupManager();
                          subscriptionGroupManager.getDataVersion().assignNewOne(
                              subscriptionWrapper.getDataVersion());
                          subscriptionGroupManager.getSubscriptionGroupTable().clear();
                          subscriptionGroupManager.getSubscriptionGroupTable().putAll(
                              subscriptionWrapper.getSubscriptionGroupTable());
                        // Persist the subscription group configuration
                          subscriptionGroupManager.persist();
                          log.info("Update slave Subscription Group from master, {}", masterAddrBak);
                      }
                  } catch (Exception e) {
                      log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e);
                  }
              }
          }

        That is the whole skeleton of RocketMQ's master-slave synchronization, and it answers the earlier questions. What is synchronized? Four kinds of data: topic configuration, consumer offsets, delay offsets, and subscription group configuration. How timely is it? A sync request fires every 10 seconds, so the lag is on the order of 10 seconds.

        But wait: everything synchronized above is metadata. Where did the synchronization of the message data itself go? That is what we care about most!

       

      4. How RocketMQ Synchronizes Message Data

        The previous section only scratched the surface, and what it found is not what we were after: the scheduled task syncs nothing but metadata. So where does the real data get synchronized? It is carried by a dedicated HAService, whose main loop continuously pulls data from the master and appends it to the local commitlog files, which is the actual data synchronization.

       

      4.1. Starting the HAService

        The synchronization service is a small dedicated subsystem with a server side, a client side, and some housekeeping threads, best understood separately. It is wired up when the message store is initialized: the store reads a separate port setting (haListenPort; in the 4.x startup code it is derived as the broker listen port + 1, e.g. 10912) and starts the HA service on it.

          // org.apache.rocketmq.store.DefaultMessageStore#DefaultMessageStore
          public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
              final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
              this.messageArrivingListener = messageArrivingListener;
              this.brokerConfig = brokerConfig;
              this.messageStoreConfig = messageStoreConfig;
              this.brokerStatsManager = brokerStatsManager;
              this.allocateMappedFileService = new AllocateMappedFileService(this);
              if (messageStoreConfig.isEnableDLegerCommitLog()) {
                  this.commitLog = new DLedgerCommitLog(this);
              } else {
                  this.commitLog = new CommitLog(this);
              }
              this.consumeQueueTable = new ConcurrentHashMap<>(32);
      
              this.flushConsumeQueueService = new FlushConsumeQueueService();
              this.cleanCommitLogService = new CleanCommitLogService();
              this.cleanConsumeQueueService = new CleanConsumeQueueService();
              this.storeStatsService = new StoreStatsService();
              this.indexService = new IndexService(this);
              if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                // Initialize the HAService
                  this.haService = new HAService(this);
              } else {
                  this.haService = null;
              }
              ...
              File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
              MappedFile.ensureDirOK(file.getParent());
              lockFile = new RandomAccessFile(file, "rw");
          }
      
          // org.apache.rocketmq.store.ha.HAService#HAService
          public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
              this.defaultMessageStore = defaultMessageStore;
              // Server side: accepts slave connections
              this.acceptSocketService =
                  new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
              this.groupTransferService = new GroupTransferService();
              // Client side: used when this broker is a slave
              this.haClient = new HAClient();
          }
          // Each part runs as a background thread; they are all launched in start()
          public void start() throws Exception {
              // Start the server side; meaningful on the master
              this.acceptSocketService.beginAccept();
              this.acceptSocketService.start();
              // GroupTransferService: for synchronous replication, parks write requests
              // until the slave has acked the corresponding offset
              this.groupTransferService.start();
              // Start the client side; only does work on a slave
              this.haClient.start();
          }

        The HAService is a miniature server running on background threads inside RocketMQ; for simplicity and isolation it uses its own port and its own hand-rolled networking. Small as the sparrow is, all its organs are present. The next three parts walk through how the data synchronization is actually done.

       

      4.2. The Slave Side of Synchronization

        The slave actively pulls data from the master, which is the crucial step. It is implemented in HAClient: once started, the client keeps requesting new data from the master and appending it to its own commitlog.

              // org.apache.rocketmq.store.ha.HAService.HAClient#run
              @Override
              public void run() {
                  log.info(this.getServiceName() + " service started");
      
                  while (!this.isStopped()) {
                      try {
                        // Connect to the master using plain NIO
                          if (this.connectMaster()) {
      
                              if (this.isTimeToReportOffset()) {
                                // Periodically report this slave's max offset to the master
                                  boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                                // If the connection is broken, close it; the next loop iteration reconnects
                                  if (!result) {
                                      this.closeMaster();
                                  }
                              }
                              this.selector.select(1000);
                            // Core logic: process the message data received from the master
                              boolean ok = this.processReadEvent();
                              if (!ok) {
                                  this.closeMaster();
                              }
      
                              if (!reportSlaveMaxOffsetPlus()) {
                                  continue;
                              }
      
                              long interval =
                                  HAService.this.getDefaultMessageStore().getSystemClock().now()
                                      - this.lastWriteTimestamp;
                              if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                                  .getHaHousekeepingInterval()) {
                                  log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                                      + "] expired, " + interval);
                                  this.closeMaster();
                                  log.warn("HAClient, master not response some time, so close connection");
                              }
                          } else {
                            // Not connected; retry in 5 seconds (may spin forever if there is no master)
                              this.waitForRunning(1000 * 5);
                          }
                      } catch (Exception e) {
                          log.warn(this.getServiceName() + " service has exception. ", e);
                          this.waitForRunning(1000 * 5);
                      }
                  }
      
                  log.info(this.getServiceName() + " service end");
              }
              
              private boolean connectMaster() throws ClosedChannelException {
                  // One long-lived connection
                  if (null == socketChannel) {
                      String addr = this.masterAddress.get();
                      // If no master address is known, stay disconnected;
                      // a master broker runs this same loop but never connects anywhere
                      if (addr != null) {
      
                          SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
                          if (socketAddress != null) {
                          // Plain NIO connect
                              this.socketChannel = RemotingUtil.connect(socketAddress);
                              if (this.socketChannel != null) {
                                  this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                              }
                          }
                      }
      
                      this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
      
                      this.lastWriteTimestamp = System.currentTimeMillis();
                  }
      
                  return this.socketChannel != null;
              }
          // org.apache.rocketmq.remoting.common.RemotingUtil#connect
          public static SocketChannel connect(SocketAddress remote) {
              return connect(remote, 1000 * 5);
          }
          public static SocketChannel connect(SocketAddress remote, final int timeoutMillis) {
              SocketChannel sc = null;
              try {
                  sc = SocketChannel.open();
                  sc.configureBlocking(true);
                  sc.socket().setSoLinger(false, -1);
                  sc.socket().setTcpNoDelay(true);
                  sc.socket().setReceiveBufferSize(1024 * 64);
                  sc.socket().setSendBufferSize(1024 * 64);
                  sc.socket().connect(remote, timeoutMillis);
                  sc.configureBlocking(false);
                  return sc;
              } catch (Exception e) {
                  if (sc != null) {
                      try {
                          sc.close();
                      } catch (IOException e1) {
                          e1.printStackTrace();
                      }
                  }
              }
              return null;
          }
        processReadEvent() is where data newly received from the master is written into this broker's commitlog; the actual append is delegated to the message store, shown below.
              // org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent
              private boolean processReadEvent() {
                  int readSizeZeroTimes = 0;
                  while (this.byteBufferRead.hasRemaining()) {
                      try {
                          int readSize = this.socketChannel.read(this.byteBufferRead);
                          if (readSize > 0) {
                              readSizeZeroTimes = 0;
                              boolean result = this.dispatchReadRequest();
                              if (!result) {
                                  log.error("HAClient, dispatchReadRequest error");
                                  return false;
                              }
                          } else if (readSize == 0) {
                              if (++readSizeZeroTimes >= 3) {
                                  break;
                              }
                          } else {
                              log.info("HAClient, processReadEvent read socket < 0");
                              return false;
                          }
                      } catch (IOException e) {
                          log.info("HAClient, processReadEvent read socket exception", e);
                          return false;
                      }
                  }
      
                  return true;
              }
      
              private boolean dispatchReadRequest() {
                  // Read data according to the transfer protocol
                  final int msgHeaderSize = 8 + 4; // phyoffset + size
                  int readSocketPos = this.byteBufferRead.position();
      
                  while (true) {
                      int diff = this.byteBufferRead.position() - this.dispatchPosition;
                      if (diff >= msgHeaderSize) {
                          long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                          int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
      
                          long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
      
                          if (slavePhyOffset != 0) {
                              if (slavePhyOffset != masterPhyOffset) {
                                  log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                                      + slavePhyOffset + " MASTER: " + masterPhyOffset);
                                  return false;
                              }
                          }
                        // A complete frame has arrived; append it to the store immediately
                          if (diff >= (msgHeaderSize + bodySize)) {
                              byte[] bodyData = new byte[bodySize];
                              this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
                              this.byteBufferRead.get(bodyData);
      
                              HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
      
                              this.byteBufferRead.position(readSocketPos);
                              this.dispatchPosition += msgHeaderSize + bodySize;
      
                              if (!reportSlaveMaxOffsetPlus()) {
                                  return false;
                              }
      
                              continue;
                          }
                      }
      
                      if (!this.byteBufferRead.hasRemaining()) {
                          this.reallocateByteBuffer();
                      }
      
                      break;
                  }
      
                  return true;
              }
          // org.apache.rocketmq.store.DefaultMessageStore#appendToCommitLog
          @Override
          public boolean appendToCommitLog(long startOffset, byte[] data) {
              if (this.shutdown) {
                  log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
                  return false;
              }
              // Append to the commitlog; consumeQueue and index entries are then rebuilt from it
              boolean result = this.commitLog.appendData(startOffset, data);
              if (result) {
                  this.reputMessageService.wakeup();
              } else {
                  log.error("appendToPhyQueue failed " + startOffset + " " + data.length);
              }
      
              return result;
          }

        The slave-side flow gives us essentially the full picture of how RocketMQ synchronizes data: a dedicated port carries the replication traffic, the slave polls the master continuously, and whatever new data it receives is appended to its own commitlog, rebuilding the master's data set locally and staying in sync. (Note the consistency check along the way: the offset pushed by the master must match the slave's current max offset.)
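
        The wire format that dispatchReadRequest parses above is deliberately tiny: each frame is a 12-byte header (the 8-byte physical offset at which the batch starts, plus a 4-byte body length) followed by raw commitlog bytes. Below is a minimal sketch of encoding and decoding such a frame; the class and method names are illustrative, not part of RocketMQ:

          import java.nio.ByteBuffer;

          // Hypothetical helper mirroring the phyOffset(8) + bodySize(4) + body layout
          // that the master writes and HAClient#dispatchReadRequest parses.
          public final class HaFrame {
              public static final int HEADER_SIZE = 8 + 4; // phyOffset + size

              // Master side: frame one batch of commitlog bytes starting at phyOffset.
              public static ByteBuffer encode(long phyOffset, byte[] body) {
                  ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length);
                  buf.putLong(phyOffset);  // where this batch begins in the commitlog
                  buf.putInt(body.length); // how many payload bytes follow
                  buf.put(body);
                  buf.flip();
                  return buf;
              }

              // Slave side: return the payload if one whole frame is buffered, else null.
              public static byte[] decode(ByteBuffer buf, long slaveMaxPhyOffset) {
                  if (buf.remaining() < HEADER_SIZE) return null; // header incomplete
                  long masterPhyOffset = buf.getLong(buf.position());
                  int bodySize = buf.getInt(buf.position() + 8);
                  if (buf.remaining() < HEADER_SIZE + bodySize) return null; // body incomplete
                  // Same consistency guard as dispatchReadRequest: the batch must start
                  // exactly where the slave's commitlog currently ends.
                  if (slaveMaxPhyOffset != 0 && slaveMaxPhyOffset != masterPhyOffset) {
                      throw new IllegalStateException(
                          "offset mismatch, slave: " + slaveMaxPhyOffset + " master: " + masterPhyOffset);
                  }
                  buf.position(buf.position() + HEADER_SIZE);
                  byte[] body = new byte[bodySize];
                  buf.get(body);
                  return body;
              }
          }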

       

      4.3. The Master's Data Synchronization Service

        Since the slave does the pulling, the master in principle only has to hand data over. At minimum, though, it must run a network service that accepts the slaves' requests.

        This also lives in HAService, which opens a listening port directly with NIO to accept connections:

          // org.apache.rocketmq.store.ha.HAService.AcceptSocketService
          /**
           * Listens to slave connections to create {@link HAConnection}.
           */
          class AcceptSocketService extends ServiceThread {
              private final SocketAddress socketAddressListen;
              private ServerSocketChannel serverSocketChannel;
              private Selector selector;
              // Listen on the given port
              public AcceptSocketService(final int port) {
                  this.socketAddressListen = new InetSocketAddress(port);
              }
      
              /**
               * Starts listening to slave connections.
               *
               * @throws Exception If fails.
               */
              public void beginAccept() throws Exception {
                  this.serverSocketChannel = ServerSocketChannel.open();
                  this.selector = RemotingUtil.openSelector();
                  this.serverSocketChannel.socket().setReuseAddress(true);
                  this.serverSocketChannel.socket().bind(this.socketAddressListen);
                  this.serverSocketChannel.configureBlocking(false);
                  this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
              }
      
              /**
               * {@inheritDoc}
               */
              @Override
              public void run() {
                  log.info(this.getServiceName() + " service started");
      
                  while (!this.isStopped()) {
                      try {
                          this.selector.select(1000);
                          Set<SelectionKey> selected = this.selector.selectedKeys();
      
                          if (selected != null) {
                              for (SelectionKey k : selected) {
                                  if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                      SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
      
                                      if (sc != null) {
                                          HAService.log.info("HAService receive new connection, "
                                              + sc.socket().getRemoteSocketAddress());
      
                                          try {
                                              HAConnection conn = new HAConnection(HAService.this, sc);
                                              // After accepting, start dedicated read/write threads for this connection
                                              conn.start();
                                              HAService.this.addConnection(conn);
                                          } catch (Exception e) {
                                              log.error("new HAConnection exception", e);
                                              sc.close();
                                          }
                                      }
                                  } else {
                                      log.warn("Unexpected ops in select " + k.readyOps());
                                  }
                              }
      
                              selected.clear();
                          }
                      } catch (Exception e) {
                          log.error(this.getServiceName() + " service has exception.", e);
                      }
                  }
      
                  log.info(this.getServiceName() + " service end");
              }
              ...
          }
          // org.apache.rocketmq.store.ha.HAConnection#start
          public void start() {
              this.readSocketService.start();
              this.writeSocketService.start();
          }
              // org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#run
              @Override
              public void run() {
                  HAConnection.log.info(this.getServiceName() + " service started");
      
                  while (!this.isStopped()) {
                      try {
                          this.selector.select(1000);
                          boolean ok = this.processReadEvent();
                          if (!ok) {
                              HAConnection.log.error("processReadEvent error");
                              break;
                          }
      
                          long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
                          if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                              log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                              break;
                          }
                      } catch (Exception e) {
                          HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                          break;
                      }
                  }
      
                  this.makeStop();
      
                  writeSocketService.makeStop();
      
                  haService.removeConnection(HAConnection.this);
      
                  HAConnection.this.haService.getConnectionCount().decrementAndGet();
      
                  SelectionKey sk = this.socketChannel.keyFor(this.selector);
                  if (sk != null) {
                      sk.cancel();
                  }
      
                  try {
                      this.selector.close();
                      this.socketChannel.close();
                  } catch (IOException e) {
                      HAConnection.log.error("", e);
                  }
      
                  HAConnection.log.info(this.getServiceName() + " service end");
              }
      
              private boolean processReadEvent() {
                  int readSizeZeroTimes = 0;
      
                  if (!this.byteBufferRead.hasRemaining()) {
                      this.byteBufferRead.flip();
                      this.processPosition = 0;
                  }
      
                  while (this.byteBufferRead.hasRemaining()) {
                      try {
                          int readSize = this.socketChannel.read(this.byteBufferRead);
                          if (readSize > 0) {
                              readSizeZeroTimes = 0;
                              this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                              if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                                  int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                                  long readOffset = this.byteBufferRead.getLong(pos - 8);
                                  this.processPosition = pos;
                                  // Read the only field in the request: the slave's max offset
                                  HAConnection.this.slaveAckOffset = readOffset;
                                  if (HAConnection.this.slaveRequestOffset < 0) {
                                      HAConnection.this.slaveRequestOffset = readOffset;
                                      log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                                  }
                                  // ...
                                  HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                              }
                          } else if (readSize == 0) {
                              if (++readSizeZeroTimes >= 3) {
                                  break;
                              }
                          } else {
                              log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                              return false;
                          }
                      } catch (IOException e) {
                          log.error("processReadEvent exception", e);
                          return false;
                      }
                  }
      
                  return true;
              }
          // org.apache.rocketmq.store.ha.HAService#notifyTransferSome
          public void notifyTransferSome(final long offset) {
              for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
                  boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
                  if (ok) {
                      this.groupTransferService.notifyTransferSome();
                      break;
                  } else {
                      value = this.push2SlaveMaxOffset.get();
                  }
              }
          }

        Opening the port and accepting connections is the easy part; how the master answers the client (WriteSocketService pushing commitlog data from the requested offset) is more involved, and readers are encouraged to dig into it on their own.
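
        One part simple enough to show here is what the master just read in processReadEvent: the slave's entire request -- both the initial "start pushing from here" and every subsequent ACK -- is a single 8-byte long carrying the slave's current max physical offset, which is why the master reads longs at 8-byte boundaries. A hedged sketch of the slave's side of this exchange (illustrative only; the real code is HAClient#reportSlaveMaxOffset):

          import java.io.IOException;
          import java.nio.ByteBuffer;
          import java.nio.channels.SocketChannel;

          // Illustrative sketch: the slave's report is one long -- its max commitlog
          // offset. The master uses the first one as slaveRequestOffset (where to start
          // pushing from) and later ones as slaveAckOffset (the replication ACK).
          final class SlaveOffsetReporter {
              private final ByteBuffer reportBuffer = ByteBuffer.allocate(8);

              boolean report(SocketChannel master, long slaveMaxPhyOffset) {
                  reportBuffer.clear();
                  reportBuffer.putLong(slaveMaxPhyOffset);
                  reportBuffer.flip();
                  try {
                      // like HAClient, attempt a few writes on the non-blocking channel
                      for (int i = 0; i < 3 && reportBuffer.hasRemaining(); i++) {
                          master.write(reportBuffer);
                      }
                      return !reportBuffer.hasRemaining();
                  } catch (IOException e) {
                      return false; // caller closes and reconnects, as HAClient does
                  }
              }
          }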

        GroupTransferService (shown below) works through a write queue and a read queue: it is invoked whenever a message is written, and swapping the two lists lets writers enqueue without contending with the thread that is waiting for slave ACKs, so waiters are notified in near real time.

              // org.apache.rocketmq.store.ha.HAService.GroupTransferService#putRequest
              public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
                  synchronized (this.requestsWrite) {
                      this.requestsWrite.add(request);
                  }
                  this.wakeup();
              }
      
              public void notifyTransferSome() {
                  this.notifyTransferObject.wakeup();
              }
      
              private void swapRequests() {
                  // Swap the read and write lists
                  List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
                  this.requestsWrite = this.requestsRead;
                  this.requestsRead = tmp;
              }
      
              private void doWaitTransfer() {
                  synchronized (this.requestsRead) {
                      if (!this.requestsRead.isEmpty()) {
                          for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                              boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                              long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
                                  + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
                              while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
                                  this.notifyTransferObject.waitForRunning(1000);
                                  transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                              }
      
                              if (!transferOK) {
                                  log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                              }
      
                              req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                          }
      
                          this.requestsRead.clear();
                      }
                  }
              }
      
              public void run() {
                  log.info(this.getServiceName() + " service started");
      
                  while (!this.isStopped()) {
                      try {
                          this.waitForRunning(10);
                          this.doWaitTransfer();
                      } catch (Exception e) {
                          log.warn(this.getServiceName() + " service has exception. ", e);
                      }
                  }
      
                  log.info(this.getServiceName() + " service end");
              }

        That concludes the walkthrough of RocketMQ master-slave synchronization. RocketMQ builds the core replication on the commitlog, adds a simple timer-driven sync for the various pieces of metadata, and with the two-buffer request queue above pushes data to the slave and unblocks writers promptly, achieving as good a level of data consistency as this design permits.

       

        Finally, one question deserves attention: how exactly is master-slave consistency guaranteed? The master writes its data directly; how does the slave end up identical, or put simply, how is write order preserved? If two records were inserted into the commitlog in different orders, the logs would diverge, and after a master-slave switch, reading at the same offset would return different results.

        In fact, the slave uses a single thread for data synchronization: data is pulled in order and written to the commitlog by that same thread, so out-of-order writes simply cannot happen. This is probably also why the replication path avoids a communication framework like Netty: it is neither necessary nor appropriate here. Master-slave synchronization demands strict ordering far more than concurrency, much like single-threaded Redis still delivering very high performance. Built on raw NIO plus the page cache and mmap, RocketMQ's replication is likewise very fast, so the single sync thread does not introduce significant lag.
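
        The single-writer invariant is easy to demonstrate: as long as every replicated batch passes through one thread, the offset check we saw in dispatchReadRequest suffices to detect any gap or reordering. A toy model (not RocketMQ code) of the slave's apply path:

          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;

          // Toy model: one applier thread, strictly contiguous offsets.
          final class SingleThreadedApplier {
              private final ExecutorService applier = Executors.newSingleThreadExecutor();
              private long maxPhyOffset; // analogous to DefaultMessageStore#getMaxPhyOffset

              SingleThreadedApplier(long startOffset) {
                  this.maxPhyOffset = startOffset;
              }

              void submit(long masterPhyOffset, byte[] body) {
                  applier.execute(() -> {
                      // Same guard as dispatchReadRequest: the batch must begin exactly
                      // where the local log ends, or ordering has been broken somewhere.
                      if (maxPhyOffset != masterPhyOffset) {
                          throw new IllegalStateException(
                              "gap detected, local: " + maxPhyOffset + " incoming: " + masterPhyOffset);
                      }
                      // append(body) would go here; one thread means no interleaving,
                      // so master and slave commitlogs stay byte-identical
                      maxPhyOffset += body.length;
                  });
              }
          }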

       

      posted @ 2020-12-27 21:02 阿牛20