<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      深入剖析RocketMQ消息消費(fèi)原理

      本文參考轉(zhuǎn)載至《RocketMQ技術(shù)內(nèi)幕 第2版》

      一. 消息消費(fèi)概述

      消息消費(fèi)以組的模式開展,一個(gè)消費(fèi)組可以包含多個(gè)消費(fèi)者,每個(gè)消費(fèi)組可以訂閱多個(gè)主題,消費(fèi)組之間有集群模式和廣播模式兩種消費(fèi)模式。集群模式是當(dāng)前主題下的同一條消息只允許被其中一個(gè)消費(fèi)者消費(fèi)。廣播模式是當(dāng)前主題下的同一條消息將被集群內(nèi)的所有消費(fèi)者消費(fèi)一次。

      消息服務(wù)器與消費(fèi)者之間的消息傳送也有兩種方式:推模式和拉模式。所謂的拉模式,是消費(fèi)端主動發(fā)起拉取消息的請求,而推模式是消息到達(dá)消息服務(wù)器后,再推送給消息消費(fèi)者。RocketMQ消息推模式基于拉模式實(shí)現(xiàn),在拉模式上包裝一層,一個(gè)拉取任務(wù)完成后開始下一個(gè)拉取任務(wù)。

      集群模式下,多個(gè)消費(fèi)者如何對消息隊(duì)列進(jìn)行負(fù)載呢?消息隊(duì)列負(fù)載機(jī)制遵循一個(gè)通用的思想:一個(gè)消息隊(duì)列同一時(shí)間只允許被一個(gè)消費(fèi)者消費(fèi),一個(gè)消費(fèi)者可以消費(fèi)多個(gè)消息隊(duì)列。

      RocketMQ 支持局部順序消息消費(fèi),也就是保證同一個(gè)消息隊(duì)列上的消息按順序消費(fèi)。不支持消息全局順序消費(fèi),如果要實(shí)現(xiàn)某一主題的全局順序消息消費(fèi),可以將該主題的隊(duì)列數(shù)設(shè)置為1,犧牲高可用性。RocketMQ支持兩種消息過濾模式:表達(dá)式(TAG、SQL92)與類過濾模式。

      消息拉模式主要是由客戶端手動調(diào)用消息拉取API,而消息推模式是消息服務(wù)器主動將消息推送到消息消費(fèi)端,本章將以推模式為突破 口,重點(diǎn)介紹 RocketMQ 消息消費(fèi)的實(shí)現(xiàn)原理。

      1.1 消費(fèi)隊(duì)列負(fù)載機(jī)制與重平衡

      正如上文提到的,RocketMQ提供了兩種消費(fèi)模式,集群模式與廣播模式。廣播模式中所有的消費(fèi)者會消費(fèi)全部的隊(duì)列,故沒有所謂的消費(fèi)隊(duì)列負(fù)載問題,而集群模式下需要考慮同一個(gè)消費(fèi)組內(nèi)的多個(gè)消費(fèi)者之間如何分配隊(duì)列。

      RocketMQ提供了多種隊(duì)列負(fù)載算法,其中比較常用的是AVG、AVG_BY_CIRCLE這兩種平均分配算法,例如8個(gè)隊(duì)列分別為b1_q0、 b1_q1、b1_q2、b1_q3、b2_q0、b2_q1、b2_q2、b2_q3,一個(gè)消費(fèi)組有 3個(gè)消費(fèi)者,分別用C1、C2、C3表示。

      采用AVG的分配機(jī)制,各個(gè)消費(fèi)者分配到的隊(duì)列如下。

      • c1:b1_q0、b1_q1、b1_q2

      • c2:b1_q3、b2_q0、b2_q1

      • c3:b2_q2、b2_q3

      采用AVG_BY_CIRCLE的分配機(jī)制,各個(gè)消費(fèi)者分配到的隊(duì)列如下。

      • c1:b1_q0、b1_q3、b2_q2

      • c2:b1_q1、b2_q0 b2_q3

      • c3:b1_q2、b2_q1

      這兩種分配算法各有使用場景。通常要求發(fā)送方發(fā)送的消息盡量在各個(gè)隊(duì)列上分布均勻,如果分布均衡,就會使用第一種平均算法。但有些時(shí)候,一臺Broker上的消息會明顯多于第二臺,如果使用第一種分配算法,c1消費(fèi)者處理的消息就太多了,但其他消費(fèi)者又空閑, 而且還不能通過增加消費(fèi)者來改變這種情況,此種情況使用AVG_BY_CIRCLE方式更加合適。

      在消費(fèi)時(shí)間過程中可能會遇到消息消費(fèi)隊(duì)列增加或減少、消息消費(fèi)者增加或減少,比如需要對消息消費(fèi)隊(duì)列進(jìn)行重新平衡,即重新分配,這就是所謂的重平衡機(jī)制。在RocketMQ中,每隔20s會根據(jù)當(dāng)前隊(duì)列數(shù)量、消費(fèi)者數(shù)量重新進(jìn)行隊(duì)列負(fù)載計(jì)算,如果計(jì)算出來的結(jié)果與當(dāng)前不一樣,則觸發(fā)消息消費(fèi)隊(duì)列的重平衡。

      1.2 并發(fā)消費(fèi)模型

      RocketMQ 支持并發(fā)消費(fèi)與順序消費(fèi)兩種消費(fèi)方式,消息的拉取與消費(fèi)模型基本一致,只是順序消費(fèi)在某些環(huán)節(jié)為了保證順序性,需要引入鎖機(jī)制,RocketMQ的消息拉取與消費(fèi)模式如圖下圖所示:

      一個(gè)MQ客戶端(MQClientInstance)只會創(chuàng)建一個(gè)消息拉取服務(wù)線程(PullMessageService)向Broker拉取消息,但是拉取消息網(wǎng)絡(luò)IO操作是異步的,所以在拉取一個(gè)消費(fèi)隊(duì)列消息時(shí)發(fā)生長輪詢阻塞并不會影響其它消費(fèi)隊(duì)列的消息拉取。PullMessageService會不斷獲取PullRequest拉取請求,將拉取請求放入IO線程池中后會立即返回(不會等Broker響應(yīng)),然后繼續(xù)“不知疲倦”地獲取下一個(gè)PullRequest拉取請求。當(dāng)IO線程收到broker相應(yīng)后,會執(zhí)行回調(diào)方法,將拉取到的消息提交到消費(fèi)組的線程池。

      RocketMQ客戶端為每一個(gè)消費(fèi)組創(chuàng)建獨(dú)立的消費(fèi)線程池,即在并發(fā)消費(fèi)模式下,單個(gè)消費(fèi)組內(nèi)的并發(fā)度為線程池線程個(gè)數(shù)。線程池處理一批消息后會向Broker匯報(bào)消息消費(fèi)進(jìn)度。

      1.3 消息消費(fèi)進(jìn)度反饋機(jī)制

      RocketMQ客戶端消費(fèi)一批數(shù)據(jù)后,需要向Broker反饋消息的消費(fèi)進(jìn)度,Broker會記錄消息消費(fèi)進(jìn)度,這樣在客戶端重啟或隊(duì)列重平衡時(shí)會根據(jù)其消費(fèi)進(jìn)度重新向Broker拉取消息,消息消費(fèi)進(jìn)度反饋機(jī)制,如下圖所示:

      消息消費(fèi)進(jìn)度反饋機(jī)制核心要點(diǎn)如下。

      1. 消費(fèi)線程池在處理完一批消息后,會將消息消費(fèi)進(jìn)度存儲在本地內(nèi)存中。

      2. 客戶端會啟動一個(gè)定時(shí)線程,每5s將存儲在本地內(nèi)存中的所有隊(duì)列消息消費(fèi)偏移量提交到Broker中。

      3. Broker收到的消息消費(fèi)進(jìn)度會存儲在內(nèi)存中,每隔5s將消息消費(fèi)偏移量持久化到磁盤文件中。

      4. 在客戶端向Broker拉取消息時(shí)也會將該隊(duì)列的消息消費(fèi)偏移量提交到Broker。

      再來思考一個(gè)問題,假設(shè)線程池中有T1、T2、T3三個(gè)線程,此時(shí)分別依次獲取到msg1、msg2、msg3消息,消息msg3的偏移量大于msg1、msg2的偏移量,由于支持并發(fā)消費(fèi),如果線程t3先處理完msg3,而t1、t2還未處理,那么線程t3如何提交消費(fèi)偏移量呢?

      試想一下,如果提交msg3的偏移量是作為消費(fèi)進(jìn)度被提交,如果此時(shí)消費(fèi)端重啟,消息消費(fèi)msg1、msg2就不會再被消費(fèi),這樣就會造成“消息丟失”。因此t3線程并不會提交msg3的偏移量,而是提交線程池中偏移量最小的消息的偏移量,即t3線程在消費(fèi)完msg3后,提交的消息消費(fèi)進(jìn)度依然是msg1的偏移量,這樣能避免消息丟失,但同樣有消息重復(fù)消費(fèi)的風(fēng)險(xiǎn)。

      二. 消息消費(fèi)者初探

      下面我們介紹推模式消費(fèi)者 MQPushConsumer 的主要屬性:

      public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
      
          private final InternalLogger log = ClientLogger.getLog();
      
          /**
           * Internal implementation. Most of the functions herein are delegated to it.
           */
          protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
      
          /**
           * 消費(fèi)者所屬組
           */
          private String consumerGroup;
      
          /**
           * 消息消費(fèi)模式,分為集群模式、廣播模式,默認(rèn)為集群模式
           */
          private MessageModel messageModel = MessageModel.CLUSTERING;
      
          /**
           * 第一次消費(fèi)時(shí)指定消費(fèi)策略。
           * CONSUME_FROM_LAST_OFFSET:此處分為兩種情況,如果磁盤消息未過期且未被刪除,則從最小偏移量開始消費(fèi)。如果磁盤已過期
           * 并被刪除,則從最大偏移量開始消費(fèi)。
           * CONSUME_FROM_FIRST_OFFSET:從隊(duì)列當(dāng)前最小偏移量開始消費(fèi)。
           * CONSUME_FROM_TIMESTAMP:從消費(fèi)者指定時(shí)間戳開始消費(fèi)。
           *
           * 注意:如果從消息進(jìn)度服務(wù)OffsetStore讀取到MessageQueue中的
           * 偏移量不小于0,則使用讀取到的偏移量拉取消息,只有在讀到的偏移
           * 量小于0時(shí),上述策略才會生效
           */
          private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
      
          /**
           * 集群模式下消息隊(duì)列的負(fù)載策略
           */
          private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
      
          /**
           * 訂閱信息
           */
          private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
      
          /**
           * 消息業(yè)務(wù)監(jiān)聽器
           */
          private MessageListener messageListener;
      
          /**
           * 消息消費(fèi)進(jìn)度存儲器
           */
          private OffsetStore offsetStore;
      
          /**
           * 消費(fèi)者最小線程數(shù)
           */
          private int consumeThreadMin = 20;
      
          /**
           * 消費(fèi)者最大線程數(shù),因?yàn)橄M(fèi)者線程池使用無界隊(duì)列,所以此參數(shù)不生效
           */
          private int consumeThreadMax = 20;
      
          /**
           * Threshold for dynamic adjustment of the number of thread pool
           */
          private long adjustThreadPoolNumsThreshold = 100000;
      
          /**
           * 并發(fā)消息消費(fèi)時(shí)處理隊(duì)列最大跨度,默認(rèn)2000,表示如果消息處理隊(duì)列中偏移量最大的消息
           * 與偏移量最小的消息的跨度超過2000,則延遲50ms后再拉取消息。
           */
          private int consumeConcurrentlyMaxSpan = 2000;
      
          /**
           * 隊(duì)列級別的流量控制閾值,默認(rèn)情況下每個(gè)消息隊(duì)列最多緩存1000條消息;
           */
          private int pullThresholdForQueue = 1000;
      
          /**
           * 在隊(duì)列級別限制緩存的消息大小,默認(rèn)情況下每個(gè)消息隊(duì)列最多緩存100 MiB消息。
           * 考慮{@code pullBatchSize},瞬時(shí)值可能超過限制消息的大小僅由消息體來衡量,因此不準(zhǔn)確
           */
          private int pullThresholdSizeForQueue = 100;
      
          /**
           * 推模式下拉取任務(wù)的間隔時(shí)間,默認(rèn)一次拉取任務(wù)完成后繼續(xù)拉取
           */
          private long pullInterval = 0;
      
          /**
           * 消息并發(fā)消費(fèi)時(shí)一次消費(fèi)消息的條數(shù),通俗點(diǎn)說,就是每次傳入MessageListener#consumeMessage中的消息條數(shù)
           */
          private int consumeMessageBatchMaxSize = 1;
      
          /**
           * 每次消息拉取的條數(shù),默認(rèn)32條
           */
          private int pullBatchSize = 32;
      
          /**
           * 是否每次拉取消息都更新訂閱信息,默認(rèn)為false
           */
          private boolean postSubscriptionWhenPull = false;
          /**
           * 最大消費(fèi)重試次數(shù)。如果消息消費(fèi)次數(shù)超過maxReconsume Times還未成功,則將該消息轉(zhuǎn)移到一個(gè)失敗隊(duì)列,等待被刪除
           */
          private int maxReconsumeTimes = -1;
      
          /**
           * 延遲將該隊(duì)列的消息提交到消費(fèi)者線程的等待時(shí)間,默認(rèn)延遲1s。
           */
          private long suspendCurrentQueueTimeMillis = 1000;
      
          /**
           * 消息消費(fèi)超時(shí)時(shí)間,默認(rèn)為15,單位為分鐘
           */
          private long consumeTimeout = 15;
      
      }
      

      三. 消費(fèi)者啟動流程

      本節(jié)介紹消息消費(fèi)者是如何啟動的,請跟我一起來分析 DefaultMQPushConsumerImplstart() 方法:

      	// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#copySubscription
          private void copySubscription() throws MQClientException {
              try {
                  Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
                  if (sub != null) {
                      for (final Map.Entry<String, String> entry : sub.entrySet()) {
                          final String topic = entry.getKey();
                          final String subString = entry.getValue();
                          SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                              topic, subString);
                          // 構(gòu)建主題訂閱信息SubscriptionData并加入RebalanceImpl的訂閱消息中
                          this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                      }
                  }
      
                  if (null == this.messageListenerInner) {
                      this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
                  }
      
                  // 如果是集群消費(fèi)模式下,還需要將重試Topic的消息放入RebalanceImpl的訂閱消息中
                  switch (this.defaultMQPushConsumer.getMessageModel()) {
                      case BROADCASTING:
                          break;
                      case CLUSTERING:
                          // RocketMQ消息重試是以消費(fèi)組為單位,而不是主題,消息重試主題名為%RETRY%+消費(fèi)組名
                          final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                          SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                              retryTopic, SubscriptionData.SUB_ALL);
                          this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                          break;
                      default:
                          break;
                  }
              } catch (Exception e) {
                  throw new MQClientException("subscription exception", e);
              }
          }
      

      第一步:構(gòu)建主題訂閱信息 SubscriptionData 并加入RebalanceImpl的訂閱消息中。訂閱關(guān)系來源主要有兩個(gè)。

      1. 通過調(diào)用DefaultMQPushConsumerImpl#subscribe(String topic, String subExpression)方法獲取。
      2. 訂閱重試主題消息。RocketMQ消息重試是以消費(fèi)組為單位,而不是主題,消息重試主題名為%RETRY%+消費(fèi)組名。消費(fèi)者在啟動時(shí)會自動訂閱該主題,參與該主題的消息隊(duì)列負(fù)載。
      //org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
      
                      if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                          this.defaultMQPushConsumer.changeInstanceNameToPID();
                      }
      
                      // 創(chuàng)建MQClientInstance實(shí)例。單例模式:同一個(gè)clientId只會創(chuàng)建一個(gè)MQClientInstance實(shí)例
                      this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
      
                      this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                      this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                      this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                      this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
      
                      this.pullAPIWrapper = new PullAPIWrapper(
                          mQClientFactory,
                          this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                      this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
      

      第二步:初始化MQClientInstance、RebalanceImple(消息重新負(fù)載實(shí)現(xiàn)類)等。

      //org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start 
      				// 初始化消息進(jìn)度
                      if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                          this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                      } else {
                          // 初始化消息進(jìn)度。如果消息消費(fèi)采用集群模式,那么消
                          // 息進(jìn)度存儲在Broker上,如果采用廣播模式,那么消息消費(fèi)進(jìn)度存儲
                          // 在消費(fèi)端
                          switch (this.defaultMQPushConsumer.getMessageModel()) {
                              case BROADCASTING:
                                  this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                  break;
                              case CLUSTERING:
                                  this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                  break;
                              default:
                                  break;
                          }
                          this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                      }
                      this.offsetStore.load();
      

      第三步:始化消息進(jìn)度。如果消息消費(fèi)采用集群模式,那么消息進(jìn)度存儲在Broker上,如果采用廣播模式,那么消息消費(fèi)進(jìn)度存儲在消費(fèi)端。具體實(shí)現(xiàn)細(xì)節(jié)后面將重點(diǎn)探討。

      //org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start 
                      // 如果是順序消費(fèi),創(chuàng)建消費(fèi)端消費(fèi)線程服務(wù)。ConsumeMessageService主要負(fù)責(zé)消息消費(fèi),在內(nèi)部維護(hù)一個(gè)線程池
                      if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                          // 如果是順序消費(fèi),將 consumeOrderly 置為 true,這樣在RebalanceService負(fù)載隊(duì)列時(shí)對隊(duì)列加鎖,實(shí)現(xiàn)消費(fèi)端順序消費(fèi),防止多個(gè)客戶端同時(shí)消費(fèi)同一個(gè)隊(duì)列
                          this.consumeOrderly = true;
                          this.consumeMessageService =
                              new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                      } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                          // 如果是并發(fā)消費(fèi),則設(shè)置 consumeOrderly 為 false,消費(fèi)時(shí)不會對隊(duì)列進(jìn)行加鎖
                          this.consumeOrderly = false;
                          this.consumeMessageService =
                              new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                      }
      
                      this.consumeMessageService.start();
      

      第四步:如果是順序消費(fèi),創(chuàng)建消費(fèi)端消費(fèi)線程服務(wù)。ConsumeMessageService主要負(fù)責(zé)消息消費(fèi),在內(nèi)部維護(hù)一個(gè)線程池。

      //org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start 
                      // 向MQClientInstance注冊消費(fèi)者
                      boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                      if (!registerOK) {
                          this.serviceState = ServiceState.CREATE_JUST;
                          this.consumeMessageService.shutdown();
                          throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                              + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                              null);
                      }
                      // 啟動MQClientInstance,JVM中的所有消費(fèi)者、生產(chǎn)者持有同一個(gè)MQClientInstance,MQClientInstance只會啟動一次
                      mQClientFactory.start();
                      log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                      this.serviceState = ServiceState.RUNNING;
      

      第五步:向MQClientInstance注冊消費(fèi)者并啟動MQClientInstance,JVM中的所有消費(fèi)者、生產(chǎn)者持有同一個(gè)MQClientInstance,MQClientInstance只會啟動一次。

      四. 消息拉取

      本節(jié)將基于推模式詳細(xì)分析消息拉取機(jī)制。消息消費(fèi)有兩種模式:廣播模式與集群模式,廣播模式比較簡單,每一個(gè)消費(fèi)者需要拉取訂閱主題下所有消費(fèi)隊(duì)列的消息。本節(jié)主要基于集群模式進(jìn)行介紹。在集群模式下,同一個(gè)消費(fèi)組內(nèi)有多個(gè)消息消費(fèi)者,同一個(gè)主題 存在多個(gè)消費(fèi)隊(duì)列,那么消費(fèi)者如何進(jìn)行消息隊(duì)列負(fù)載呢?從第3節(jié)介紹的啟動流程可知,每一個(gè)消費(fèi)組內(nèi)維護(hù)一個(gè)線程池來消費(fèi)消息,那么這些線程又是如何分工合作的呢?

      消息隊(duì)列負(fù)載通常的做法是一個(gè)消息隊(duì)列在同一時(shí)間只允許被一個(gè)消息消費(fèi)者消費(fèi),一個(gè)消息消費(fèi)者可以同時(shí)消費(fèi)多個(gè)消息隊(duì)列,那么RocketMQ是如何實(shí)現(xiàn)消息隊(duì)列負(fù)載的呢?帶著上述問題,我們開始RocketMQ消息消費(fèi)機(jī)制的探討。

      從MQClientInstance的啟動流程中可以看出,RocketMQ使用一個(gè)單獨(dú)的線程PullMessageService執(zhí)行消息的拉取。

      4.1 PullMessageService實(shí)現(xiàn)機(jī)制

      PullMessageService繼承的是ServiceThread,從名稱來看,它是服務(wù)線程,通過run()方法啟動:

      //org.apache.rocketmq.client.impl.consumer.PullMessageService#run
          @Override
          public void run() {
              log.info(this.getServiceName() + " service started");
      
              //  while (!this.isStopped()) 是一種通用的設(shè)計(jì)技巧,Stopped
              // 聲明為volatile,每執(zhí)行一次業(yè)務(wù)邏輯,檢測一下其運(yùn)行狀態(tài),可以
              // 通過其他線程將Stopped設(shè)置為true,從而停止該線程
              while (!this.isStopped()) {
                  try {
                      // 從pullRequestQueue中獲取一個(gè)PullRequest消息拉取任務(wù),
                      // 如果pullRequestQueue為空,則線程將阻塞,直到有拉取任務(wù)被放入
                      PullRequest pullRequest = this.pullRequestQueue.take();
                      this.pullMessage(pullRequest);
                  } catch (InterruptedException ignored) {
                  } catch (Exception e) {
                      log.error("Pull Message Service Run Method exception", e);
                  }
              }
      
              log.info(this.getServiceName() + " service end");
          }
      

      PullMessageService消息拉取服務(wù)線程,run()方法是其核心邏輯,如代碼清單5-7所示。run()方法的核心要點(diǎn)如下。

      1. while(!this.isStopped())是一種通用的設(shè)計(jì)技巧,Stopped 聲明為volatile,每執(zhí)行一次業(yè)務(wù)邏輯,檢測一下其運(yùn)行狀態(tài),可以通過其他線程將Stopped設(shè)置為true,從而停止該線程。
      2. 從pullRequestQueue中獲取一個(gè)PullRequest消息拉取任務(wù), 如果pullRequestQueue為空,則線程將阻塞,直到有拉取任務(wù)被放入。
      3. 調(diào)用pullMessage方法進(jìn)行消息拉取。思考一下,PullRequest 是什么時(shí)候添加的呢?
      // org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestLater
          public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
              if (!isStopped()) {
                  this.scheduledExecutorService.schedule(new Runnable() {
                      @Override
                      public void run() {
                          PullMessageService.this.executePullRequestImmediately(pullRequest);
                      }
                  }, timeDelay, TimeUnit.MILLISECONDS);
              } else {
                  log.warn("PullMessageServiceScheduledThread has shutdown");
              }
          }
      

      原來,PullMessageService提供了延遲添加與立即添加兩種方式將PullRequest放入pullRequestQueue。那么PullRequest是在什么時(shí)候創(chuàng)建的呢?executePullRequestImmediately方法調(diào)用鏈:

      通過跟蹤發(fā)現(xiàn),主要有兩個(gè)地方會調(diào)用executePullRequestImmediately:一個(gè)是在RocketMQ根據(jù)PullRequest拉取任務(wù)執(zhí)行完一次消息拉取任務(wù)后,又將PullRequest 對象放入pullRequestQueue;另一個(gè)是在RebalanceImpl中創(chuàng)建的。RebalanceImpl是后文要重點(diǎn)介紹的消息隊(duì)列負(fù)載機(jī)制,也就是PullRequest對象真正創(chuàng)建的地方。

      從上面的分析可知,PullMessageService只有在得到 PullRequest 對象時(shí)才會執(zhí)行拉取任務(wù),那么PullRequest究竟是什么呢?

      public class PullRequest {
      
          /**
           * 消費(fèi)者組
           */
          private String consumerGroup;
          /**
           * 待拉取消費(fèi)隊(duì)列
           */
          private MessageQueue messageQueue;
          /**
           * 消息處理隊(duì)列,從Broker中拉取到的消息會先存入ProccessQueue,然后再提交到消費(fèi)者消費(fèi)線程池進(jìn)行消費(fèi)
           */
          private ProcessQueue processQueue;
          /**
           * 待拉取的MessageQueue偏移量
           */
          private long nextOffset;
          /**
           * 是否被鎖定
           */
          private boolean lockedFirst = false;
      }
      

      PullMessageService 會不斷拉取隊(duì)列中的PullRequest請求,去Broker中拉取消息:

      // org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessage
          /**
           * 根據(jù)消費(fèi)組名從MQClientInstance中獲取消費(fèi)者的內(nèi)部實(shí)現(xiàn)類
           * MQConsumerInner,令人意外的是,這里將consumer強(qiáng)制轉(zhuǎn)換為
           * DefaultMQPushConsumerImpl,也就是PullMessageService,該線程只
           * 為推模式服務(wù),那拉模式如何拉取消息呢?其實(shí)細(xì)想也不難理解,對
           * 于拉模式,RocketMQ只需要提供拉取消息API,再由應(yīng)用程序調(diào)用API
           *
           * @param pullRequest
           */
          private void pullMessage(final PullRequest pullRequest) {
              final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
              if (consumer != null) {
                  DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
                  impl.pullMessage(pullRequest);
              } else {
                  log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
              }
          }
      

      根據(jù)消費(fèi)組名從MQClientInstance中獲取消費(fèi)者的內(nèi)部實(shí)現(xiàn)類MQConsumerInner,令人意外的是,這里將consumer強(qiáng)制轉(zhuǎn)換為DefaultMQPushConsumerImpl,也就是PullMessageService,該線程只為推模式服務(wù),那拉模式如何拉取消息呢?其實(shí)細(xì)想也不難理解,對 于拉模式,RocketMQ只需要提供拉取消息API,再由應(yīng)用程序調(diào)用 API。

      4.2 ProcessQueue實(shí)現(xiàn)機(jī)制

      ProcessQueue 是 MessageQueue 在消費(fèi)端的重現(xiàn)、快照。PullMessageService從消息服務(wù)器默認(rèn)每次拉取32條消息,按消息隊(duì)列偏移量的順序存放在ProcessQueue中,PullMessageService將消息提交到消費(fèi)者消費(fèi)線程池,消息成功消費(fèi)后,再從ProcessQueue中移除。

      // org.apache.rocketmq.client.impl.consumer.ProcessQueue
      /**
       * ProcessQueue是MessageQueue在消費(fèi)端的重現(xiàn)、快照。
       * PullMessageService從消息服務(wù)器默認(rèn)每次拉取32條消息,按消息隊(duì)
       * 列偏移量的順序存放在ProcessQueue中,PullMessageService將消息
       * 提交到消費(fèi)者消費(fèi)線程池,消息成功消費(fèi)后,再從ProcessQueue中移
       * 除。
       * Queue consumption snapshot
       */
      public class ProcessQueue {
          /**
           * 讀寫鎖,控制多線程并發(fā)修改msgTreeMap
           */
          private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
          /**
           * 消息存儲容器,鍵為消息在ConsumeQueue中的偏移量
           */
          private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
          /**
           * ProcessQueue中總消息數(shù)
           */
          private final AtomicLong msgCount = new AtomicLong();
          private final AtomicLong msgSize = new AtomicLong();
          private final Lock lockConsume = new ReentrantLock();
          /**
           * A subset of msgTreeMap, will only be used when orderly consume
           *
           * 用于存儲消息消費(fèi)隊(duì)列中正在被順序消費(fèi)的消息。其鍵值對的關(guān)系為 Offset -> Message Queue,也就是按照消息在 Broker 中存儲的物理偏移量進(jìn)行排序。
           */
          private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
          private final AtomicLong tryUnlockTimes = new AtomicLong(0);
          /**
           * 當(dāng)前ProcessQueue中包含的最大隊(duì)列偏移量
           */
          private volatile long queueOffsetMax = 0L;
          /**
           * 當(dāng)前ProccesQueue是否被丟棄
           */
          private volatile boolean dropped = false;
          /**
           * 上一次開始拉取消息的時(shí)間戳
           */
          private volatile long lastPullTimestamp = System.currentTimeMillis();
          /**
           * 上一次消費(fèi)消息的時(shí)間戳
           */
          private volatile long lastConsumeTimestamp = System.currentTimeMillis();
      }
      

      4.3 消息拉取的基本流程

      本節(jié)將以并發(fā)消息消費(fèi)來探討整個(gè)消息消費(fèi)的流程,消息拉取分為3個(gè)主要步驟:

      1. 封裝拉取請求,并請求broker拉取消息。
      2. broker查找消息并返回。
      3. 消息拉取客戶端處理返回的消息。

      4.3.1 客戶端封裝消息拉取請求

      消息拉取入口為DefaultMQPushConsumerImpl#pullMessage:

      第一步:從PullRequest中獲取ProcessQueue,如果處理隊(duì)列當(dāng)前狀態(tài)未被丟棄,則更新ProcessQueue的lastPullTimestamp為當(dāng)前時(shí)間戳。如果當(dāng)前消費(fèi)者被掛起,則將拉取任務(wù)延遲1s再放入PullMessageService的拉取任務(wù)隊(duì)列中,最后結(jié)束本次消息拉取。

      // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
              final ProcessQueue processQueue = pullRequest.getProcessQueue();
              if (processQueue.isDropped()) {
                  log.info("the pull request[{}] is dropped.", pullRequest.toString());
                  return;
              }
      
              pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
      
              try {
                  this.makeSureStateOK();
              } catch (MQClientException e) {
                  log.warn("pullMessage exception, consumer state not ok", e);
                  this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                  return;
              }
      
              if (this.isPause()) {
                  log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
                  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
                  return;
              }
      

      第二步:進(jìn)行消息拉取流控。從消息消費(fèi)數(shù)量與消費(fèi)間隔兩個(gè)維度進(jìn)行控制。

      1. 消息處理總數(shù),如果ProcessQueue當(dāng)前處理的消息條數(shù)超過了pullThresholdForQueue=1000,將觸發(fā)流控,放棄本次拉取任務(wù),并且該隊(duì)列的下一次拉取任務(wù)將在50ms后才加入拉取任務(wù)隊(duì)列。每觸發(fā)1000次流控后輸出提示語:“the consumer message buffer is full, so do flow control, minOffset={隊(duì)列最小偏移量}, maxOffset={隊(duì)列最大偏移量}, size={消息總條數(shù)}, pullRequest={拉取任務(wù)}, flowControlTimes={流控觸發(fā)次數(shù)}”。
      2. ProcessQueue 中隊(duì)列最大偏移量與最小偏離量的間距不能超過 consumeConcurrentlyMaxSpan,否則觸發(fā)流控。每觸發(fā)1000次流控后輸出提示語:“the queue's messages, span too long, so do flow control, minOffset={隊(duì)列最小偏移量}, maxOffset={隊(duì)列最大偏移量}, maxSpan={間隔}, pullRequest={拉取任務(wù)信息}, flowControlTimes={流控觸發(fā)次數(shù)}”。這里主要的考量是擔(dān)心因?yàn)橐粭l消息堵塞,使消息進(jìn)度無法向前推進(jìn),可能會造成大量消息重復(fù)消費(fèi)。
      // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
              long cachedMessageCount = processQueue.getMsgCount().get();
              long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
      
              // 已緩存消息總數(shù)維度的流量控制
              if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
                  // 如果ProcessQueue當(dāng)前處理的消息條數(shù)超過了pullThresholdForQueue=1000,將觸發(fā)流控,放棄本次拉取任務(wù),并
                  // 且該隊(duì)列的下一次拉取任務(wù)將在50ms后才加入拉取任務(wù)隊(duì)列。
                  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                  if ((queueFlowControlTimes++ % 1000) == 0) {
                      log.warn(
                          "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                          this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
                  }
                  return;
              }
      
              // 緩存消息大小維度的流量控制
              if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
                  // 緩存消息的大小不能超過pullThresholdSizeForQueue,否則觸發(fā)流控
                  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                  if ((queueFlowControlTimes++ % 1000) == 0) {
                      log.warn(
                          "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                          this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
                  }
                  return;
              }
      

      第三步:拉取該主題的訂閱信息,如果為空則結(jié)束本次消息拉取,關(guān)于該隊(duì)列的下一次拉取任務(wù)將延遲3s執(zhí)行。

      // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
              // 獲取主題的訂閱信息
              final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
              if (null == subscriptionData) {
                  // 拉取該主題的訂閱信息,如果為空則結(jié)束本次消息拉取,關(guān)于該隊(duì)列的下一次拉取任務(wù)將延遲3s執(zhí)行
                  this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                  log.warn("find the consumer's subscription failed, {}", pullRequest);
                  return;
              }
      

      第四步:構(gòu)建消息拉取系統(tǒng)標(biāo)記。

      // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
              // 是否更新已消費(fèi)物理偏移量
              boolean commitOffsetEnable = false;
              // 已消費(fèi)偏移量
              long commitOffsetValue = 0L;
              if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
                  // 如果是集群消費(fèi),更新已消費(fèi)偏移量
                  commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
                  if (commitOffsetValue > 0) {
                      commitOffsetEnable = true;
                  }
              }
      
              String subExpression = null;
              boolean classFilter = false;
              SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
              if (sd != null) {
                  if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                      subExpression = sd.getSubString();
                  }
      
                  classFilter = sd.isClassFilterMode();
              }
      
              int sysFlag = PullSysFlag.buildSysFlag(
                  commitOffsetEnable, // commitOffset
                  // 是否開啟長輪詢
                  true, // suspend
                  subExpression != null, // subscription
                  classFilter // class filter
              );
      

      如果當(dāng)前 ConsumerGroup 是集群消費(fèi),會將本地的消費(fèi)進(jìn)度在拉取請求中上報(bào)給Broker。

      下面逐一介紹PullSysFlag的枚舉值含義:

      // org.apache.rocketmq.common.sysflag.PullSysFlag
      public class PullSysFlag {
      
          /**
           * 是否更新已消費(fèi)偏移量
           */
          private final static int FLAG_COMMIT_OFFSET = 0x1;
          /**
           * 表示消息拉取時(shí)是否支持掛起(長輪詢)
           */
          private final static int FLAG_SUSPEND = 0x1 << 1;
          /**
           * 消息過濾機(jī)制為表達(dá)式,則設(shè)置該標(biāo)記位
           */
          private final static int FLAG_SUBSCRIPTION = 0x1 << 2;
          /**
           * 消息過濾機(jī)制為類模式,則設(shè)置該標(biāo)記
           */
          private final static int FLAG_CLASS_FILTER = 0x1 << 3;
      }
      

      第五步:調(diào)用PullAPIWrapper.pullKernelImpl方法后與服務(wù)端交互。

      // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
              try {
                  // 通過遠(yuǎn)程調(diào)用,從broker中拉取消息。拉取成功后,調(diào)用 pullCallback.onSuccess 方法
                  this.pullAPIWrapper.pullKernelImpl(
                      // 需要拉取的消息隊(duì)列信息
                      pullRequest.getMessageQueue(),
                      // 消息過濾表達(dá)式
                      subExpression,
                      subscriptionData.getExpressionType(),
                      subscriptionData.getSubVersion(),
                      // 拉取的物理偏移量
                      pullRequest.getNextOffset(),
                      // 拉取的消息數(shù)量
                      this.defaultMQPushConsumer.getPullBatchSize(),
                      sysFlag,
                      // 已消費(fèi)偏移量
                      commitOffsetValue,
                      BROKER_SUSPEND_MAX_TIME_MILLIS,
                      CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                      CommunicationMode.ASYNC,
                      pullCallback
                  );
              } catch (Exception e) {
                  log.error("pullKernelImpl exception", e);
                  this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
              }
      

      pullKernelImpl 方法的參數(shù):

      1. MessageQueue mq:從哪個(gè)消息消費(fèi)隊(duì)列拉取消息。
      2. String subExpression:消息過濾表達(dá)式。
      3. String expressionType:消息表達(dá)式類型,分為TAG、SQL92。
      4. long offset:消息拉取偏移量。
      5. int maxNums:本次拉取最大消息條數(shù),默認(rèn)32條。
      6. int sysFlag:拉取系統(tǒng)標(biāo)記。
      7. long commitOffset:當(dāng)前MessageQueue的消費(fèi)進(jìn)度(內(nèi)存中)。
      8. long brokerSuspendMaxTimeMillis:消息拉取過程中允許Broker掛起的時(shí)間,默認(rèn)15s。
      9. long timeoutMillis:消息拉取超時(shí)時(shí)間。
      10. CommunicationMode communicationMode:消息拉取模式,默認(rèn)為異步拉取。
      11. PullCallback pullCallback:從Broker拉取到消息后的回調(diào)方法。

      第六步:根據(jù)brokerName、BrokerId從MQClientInstance中獲取Broker地址,在整個(gè)RocketMQ Broker的部署結(jié)構(gòu)中,相同名稱的Broker構(gòu)成主從結(jié)構(gòu),其BrokerId會不一樣,在每次拉取消息后,會給出一個(gè)建議,下次是從主節(jié)點(diǎn)還是從節(jié)點(diǎn)拉?。?/p>

      // org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl
                  // 根據(jù)brokerName、BrokerId從MQClientInstance中獲取
                  //Broker地址,在整個(gè)RocketMQ Broker的部署結(jié)構(gòu)中,相同名稱的
                  //Broker構(gòu)成主從結(jié)構(gòu),其BrokerId會不一樣,在每次拉取消息后,會
                  //給出一個(gè)建議,下次是從主節(jié)點(diǎn)還是從節(jié)點(diǎn)拉取
                  this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                      this.recalculatePullFromWhichNode(mq), false);
              if (null == findBrokerResult) {
                  this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
                  findBrokerResult =
                      this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                          this.recalculatePullFromWhichNode(mq), false);
              }
      

      第七步:如果消息過濾模式為類過濾,則需要根據(jù)主題名稱、broker地址找到注冊在Broker上的FilterServer地址,從FilterServer上拉取消息,否則從Broker上拉取消息。上述步驟完成后,RocketMQ通過MQClientAPIImpl#pullMessageAsync方法異步向Broker拉取消息。

      // org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl
              if (findBrokerResult != null) {
                  {
                      // check version
                      if (!ExpressionType.isTagType(expressionType)
                          && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                          throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                              + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                      }
                  }
                  int sysFlagInner = sysFlag;
      
                  if (findBrokerResult.isSlave()) {
                      sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
                  }
      
                  PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
                  // 消費(fèi)組名稱
                  requestHeader.setConsumerGroup(this.consumerGroup);
                  // topic名稱
                  requestHeader.setTopic(mq.getTopic());
                  // 隊(duì)列ID
                  requestHeader.setQueueId(mq.getQueueId());
                  // 隊(duì)列的偏移量
                  requestHeader.setQueueOffset(offset);
                  // 拉取的消息數(shù)量
                  requestHeader.setMaxMsgNums(maxNums);
                  // 消息拉取的標(biāo)識位,參考:org.apache.rocketmq.common.sysflag.PullSysFlag
                  requestHeader.setSysFlag(sysFlagInner);
                  // 已經(jīng)消費(fèi)完成的消息偏移量
                  requestHeader.setCommitOffset(commitOffset);
                  requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
                  requestHeader.setSubscription(subExpression);
                  requestHeader.setSubVersion(subVersion);
                  requestHeader.setExpressionType(expressionType);
      
                  String brokerAddr = findBrokerResult.getBrokerAddr();
                  if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                      brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
                  }
      
                  PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                      brokerAddr,
                      requestHeader,
                      timeoutMillis,
                      communicationMode,
                      pullCallback);
      
                  return pullResult;
              }
      

      4.3.2 Broker組裝消息

      根據(jù)消息拉取命令 RequestCode.PULL_MESSAGE,很容易找到 Brokder 端處理消息拉取的入口: org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest

      第一步:根據(jù)訂閱信息構(gòu)建消息過濾器

      // org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
              // 構(gòu)建消息過濾器
              MessageFilter messageFilter;
              if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
                  //表示支持對重試topic的屬性進(jìn)行過濾
                  messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
                      this.brokerController.getConsumerFilterManager());
              } else {
                  // 表示不支持對重試topic的屬性進(jìn)行過濾
                  messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
                      this.brokerController.getConsumerFilterManager());
              }
      

      第二步:調(diào)用MessageStore.getMessage查找消息

      // org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
              // 讀取broker中存儲的消息
              final GetMessageResult getMessageResult =
                  this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                      requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
      

      該方法參數(shù)含義如下:

      1. String group:消費(fèi)組名稱。
      2. String topic:主題名稱。
      3. int queueId:隊(duì)列ID。
      4. long offset:待拉取偏移量。
      5. int maxMsgNums:最大拉取消息條數(shù)。
      6. MessageFilter messageFilter:消息過濾器。

      第三步:根據(jù)主題名稱與隊(duì)列編號獲取消息消費(fèi)隊(duì)列

      //org.apache.rocketmq.store.DefaultMessageStore#getMessage
              long beginTime = this.getSystemClock().now();
      
              GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
              // 待查找隊(duì)列的偏移量
              long nextBeginOffset = offset;
              // 當(dāng)前消息隊(duì)列的最小偏移量
              long minOffset = 0;
              // 當(dāng)前消息隊(duì)列的最大偏移量
              long maxOffset = 0;
      
      1. nextBeginOffset:待查找隊(duì)列的偏移量。
      2. minOffset:當(dāng)前消息隊(duì)列的最小偏移量。
      3. maxOffset:當(dāng)前消息隊(duì)列的最大偏移量。
      4. maxOffsetPy:當(dāng)前CommitLog文件的最大偏移量。

      第四步:消息偏移量異常情況校對下一次拉取偏移量:

      //org.apache.rocketmq.store.DefaultMessageStore#getMessage
          GetMessageResult getResult = new GetMessageResult();
      
              // 當(dāng)前CommitLog文件的最大偏移量
              final long maxOffsetPy = this.commitLog.getMaxOffset();
      
              ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
              if (consumeQueue != null) {
                  minOffset = consumeQueue.getMinOffsetInQueue();
                  maxOffset = consumeQueue.getMaxOffsetInQueue();
      
                  if (maxOffset == 0) {
                      //表示當(dāng)前消費(fèi)隊(duì)列中沒有消息,拉取結(jié)果為
                      //NO_MESSAGE_IN_QUEUE。如果當(dāng)前Broker為主節(jié)點(diǎn),下次拉取偏移量為
                      //0。如果當(dāng)前Broker為從節(jié)點(diǎn)并且offsetCheckInSlave為true,設(shè)置下
                      //次拉取偏移量為0。其他情況下次拉取時(shí)使用原偏移量
                      status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                      nextBeginOffset = nextOffsetCorrection(offset, 0);
                  } else if (offset < minOffset) {
                      //表示待拉取消息偏移量小于隊(duì)列的起始偏
                      //移量,拉取結(jié)果為OFFSET_TOO_SMALL。如果當(dāng)前Broker為主節(jié)點(diǎn),下
                      //次拉取偏移量為隊(duì)列的最小偏移量。如果當(dāng)前Broker為從節(jié)點(diǎn)并且
                      //offsetCheckInSlave為true,下次拉取偏移量為隊(duì)列的最小偏移量。
                      //其他情況下次拉取時(shí)使用原偏移量。
                      status = GetMessageStatus.OFFSET_TOO_SMALL;
                      nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                  } else if (offset == maxOffset) {
                      // 如果待拉取偏移量等于隊(duì)列最大偏移
                      //量,拉取結(jié)果為OFFSET_OVERFLOW_ONE,則下次拉取偏移量依然為
                      //offset。
                      status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                      nextBeginOffset = nextOffsetCorrection(offset, offset);
                  } else if (offset > maxOffset) {
                      // 表示偏移量越界,拉取結(jié)果為
                      //OFFSET_OVERFLOW_BADLY。此時(shí)需要考慮當(dāng)前隊(duì)列的偏移量是否為0,
                      //如果當(dāng)前隊(duì)列的最小偏移量為0,則使用最小偏移量糾正下次拉取偏移
                      //量。如果當(dāng)前隊(duì)列的最小偏移量不為0,則使用該隊(duì)列的最大偏移量來
                      //糾正下次拉取偏移量
                      status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                      if (0 == minOffset) {
                          nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                      } else {
                          nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                      }
                  }
      
      1. maxOffset=0:表示當(dāng)前消費(fèi)隊(duì)列中沒有消息,拉取結(jié)果為NO_MESSAGE_IN_QUEUE。如果當(dāng)前Broker為主節(jié)點(diǎn),下次拉取偏移量為 0。如果當(dāng)前Broker為從節(jié)點(diǎn)并且offsetCheckInSlave為true,設(shè)置下次拉取偏移量為0。其他情況下次拉取時(shí)使用原偏移量。
      2. offset<minOffset:表示待拉取消息偏移量小于隊(duì)列的起始偏移量,拉取結(jié)果為OFFSET_TOO_SMALL。如果當(dāng)前Broker為主節(jié)點(diǎn),下 次拉取偏移量為隊(duì)列的最小偏移量。如果當(dāng)前Broker為從節(jié)點(diǎn)并且offsetCheckInSlave為true,下次拉取偏移量為隊(duì)列的最小偏移量。 其他情況下次拉取時(shí)使用原偏移量。
      3. offset==maxOffset:如果待拉取偏移量等于隊(duì)列最大偏移量,拉取結(jié)果為OFFSET_OVERFLOW_ONE,則下次拉取偏移量依然為offset。
      4. offset>maxOffset:表示偏移量越界,拉取結(jié)果為OFFSET_OVERFLOW_BADLY。此時(shí)需要考慮當(dāng)前隊(duì)列的偏移量是否為0, 如果當(dāng)前隊(duì)列的最小偏移量為0,則使用最小偏移量糾正下次拉取偏移量。如果當(dāng)前隊(duì)列的最小偏移量不為0,則使用該隊(duì)列的最大偏移量來糾正下次拉取偏移量。糾正邏輯與1)、2)相同。

      第五步:如果待拉取偏移量大于minOffset并且小于maxOffset,從當(dāng)前offset處嘗試?yán)?2條消息。

      //org.apache.rocketmq.store.DefaultMessageStore#getMessage
                      SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                      if (bufferConsumeQueue != null) {
                          try {
                              status = GetMessageStatus.NO_MATCHED_MESSAGE;
      
                              long nextPhyFileStartOffset = Long.MIN_VALUE;
                              long maxPhyOffsetPulling = 0;
      
                              int i = 0;
                              final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
                              final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                              ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                              for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                                  // 消息物理偏移量
                                  long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                                  // 消息長度
                                  int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                                  // 消息TAG的hash碼
                                  long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
      
                                  maxPhyOffsetPulling = offsetPy;
      
                                  if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                      if (offsetPy < nextPhyFileStartOffset)
                                          continue;
                                  }
      
                                  boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
      
                                  if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
                                      isInDisk)) {
                                      break;
                                  }
      
                                  boolean extRet = false, isTagsCodeLegal = true;
                                  if (consumeQueue.isExtAddr(tagsCode)) {
                                      extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
                                      if (extRet) {
                                          tagsCode = cqExtUnit.getTagsCode();
                                      } else {
                                          // can't find ext content.Client will filter messages by tag also.
                                          log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
                                              tagsCode, offsetPy, sizePy, topic, group);
                                          isTagsCodeLegal = false;
                                      }
                                  }
      
                                  // 對消息TAG的Hash碼進(jìn)行比對,如果未匹配,則繼續(xù)拉取下一條消息
                                  if (messageFilter != null
                                      && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                                      if (getResult.getBufferTotalSize() == 0) {
                                          status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                      }
      
                                      continue;
                                  }
      
                                  SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                                  if (null == selectResult) {
                                      if (getResult.getBufferTotalSize() == 0) {
                                          status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                      }
      
                                      nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                      continue;
                                  }
      
                                  // 對消息屬性進(jìn)行SQL92過濾,此種過濾方式會反序列化消息內(nèi)容,性能相對TAG過濾會差一點(diǎn)
                                  if (messageFilter != null
                                      && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                                      if (getResult.getBufferTotalSize() == 0) {
                                          status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                      }
                                      // release...
                                      selectResult.release();
                                      continue;
                                  }
      
                                  this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                                  getResult.addMessage(selectResult);
                                  status = GetMessageStatus.FOUND;
                                  nextPhyFileStartOffset = Long.MIN_VALUE;
                              }
      
                              if (diskFallRecorded) {
                                  long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
                                  brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
                              }
      
                              nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
      
                              // diff:是 maxOffsetPy 和 maxPhyOffsetPulling 兩者的差值,表示還有多少消息沒有拉取
                              long diff = maxOffsetPy - maxPhyOffsetPulling;
                              // StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE:表示當(dāng)前 Master Broker 全部的物理內(nèi)存大小。
                              long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                                  * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                              // 如果消息堆積大于內(nèi)存 40% 則建議從Slave Broker拉取消息(實(shí)現(xiàn)讀寫分離)
                              getResult.setSuggestPullingFromSlave(diff > memory);
      

      第六步:根據(jù)PullResult填充responseHeader的NextBeginOffset、MinOffset、MaxOffset。

      //org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
                  response.setRemark(getMessageResult.getStatus().name());
                  responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
                  responseHeader.setMinOffset(getMessageResult.getMinOffset());
                  responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
      

      第七步:根據(jù)主從同步延遲,如果從節(jié)點(diǎn)數(shù)據(jù)包含下一次拉取的偏移量,則設(shè)置下一次拉取任務(wù)的brokerId。

      //org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
                  if (getMessageResult.isSuggestPullingFromSlave()) {
                      responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                  } else {
                      responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                  }
      

      第八步:GetMessageResult與Response進(jìn)行狀態(tài)編碼轉(zhuǎn)換。

      //org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
                  switch (getMessageResult.getStatus()) {
                      case FOUND:
                          response.setCode(ResponseCode.SUCCESS);
                          break;
                      case MESSAGE_WAS_REMOVING:
                          response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                          break;
                      case NO_MATCHED_LOGIC_QUEUE:
                      case NO_MESSAGE_IN_QUEUE:
                          if (0 != requestHeader.getQueueOffset()) {
                              response.setCode(ResponseCode.PULL_OFFSET_MOVED);
      
                              // XXX: warn and notify me
                              log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
                                  requestHeader.getQueueOffset(),
                                  getMessageResult.getNextBeginOffset(),
                                  requestHeader.getTopic(),
                                  requestHeader.getQueueId(),
                                  requestHeader.getConsumerGroup()
                              );
                          } else {
                              response.setCode(ResponseCode.PULL_NOT_FOUND);
                          }
                          break;
                      case NO_MATCHED_MESSAGE:
                          response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                          break;
                      case OFFSET_FOUND_NULL:
                          response.setCode(ResponseCode.PULL_NOT_FOUND);
                          break;
                      case OFFSET_OVERFLOW_BADLY:
                          response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                          // XXX: warn and notify me
                          log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
                              requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
                          break;
                      case OFFSET_OVERFLOW_ONE:
                          response.setCode(ResponseCode.PULL_NOT_FOUND);
                          break;
                      case OFFSET_TOO_SMALL:
                          response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                          log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
                              requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
                              getMessageResult.getMinOffset(), channel.remoteAddress());
                          break;
                      default:
                          assert false;
                          break;
                  }
      

      GetMessageStatus和ResponseCode轉(zhuǎn)換關(guān)系:

      ResponseCode GetMessageStatus
      SUCCESS FOUND
      PULL_RETRY_IMMEDIATELY NO_MATCHED_MESSAGE、MESSAGE_WAS_REMOVING(消息存在下一個(gè)CommitLog里)
      PULL_NOT_FOUND NO_MESSAGE_IN_QUEUE(隊(duì)列中未包含消息)、NO_MATCHED_LOGIC_QUEUE(未找到隊(duì)列)、OFFSET_FOUND_NULL、OFFSET_OVERFLOW_ONE(offset剛好等于當(dāng)前最大的offset)
      PULL_OFFSET_MOVED NO_MESSAGE_IN_QUEUE(隊(duì)列中未包含消息)、NO_MATCHED_LOGIC_QUEUE(未找到隊(duì)列)、OFFSET_OVERFLOW_BADLY(offset越界)、OFFSET_TOO_SMALL(offset不在隊(duì)列中)

      第九步:如果CommitLog標(biāo)記為可用并且當(dāng)前節(jié)點(diǎn)為主節(jié)點(diǎn),則更新消息消費(fèi)進(jìn)度,消費(fèi)進(jìn)度將先存儲在Broker內(nèi)存中,由定時(shí)任務(wù)將消費(fèi)進(jìn)度寫入Broker本地的JSON文件中。

      //org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
              boolean storeOffsetEnable = brokerAllowSuspend;
              storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
              storeOffsetEnable = storeOffsetEnable
                  && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
              if (storeOffsetEnable) {
                  //如果CommitLog標(biāo)記為可用并且當(dāng)前節(jié)點(diǎn)為主節(jié)點(diǎn),則更新消息消費(fèi)進(jìn)度,
                  this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
                      requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
              }
      

      服務(wù)端消息拉取處理完畢,將返回結(jié)果拉取到消息調(diào)用方。在調(diào)用方,需要重點(diǎn)關(guān)注PULL_RETRY_IMMEDIATELY、PULL_OFFSET_MOVED、PULL_NOT_FOUND等情況下如何校正拉取偏移量。

      4.3.3 消息拉取客戶端處理消息

      回到消息拉取客戶端調(diào)用入口:MQClientAPIImpl#pullMessageAsync

      第一步:根據(jù)響應(yīng)結(jié)果解碼成PullResultExt對象,此時(shí)只是從網(wǎng)絡(luò)中讀取消息列表中的byte[] messageBinary屬性

      // org.apache.rocketmq.client.impl.MQClientAPIImpl#processPullResponse
          private PullResult processPullResponse(
              final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
              PullStatus pullStatus = PullStatus.NO_NEW_MSG;
              switch (response.getCode()) {
                  case ResponseCode.SUCCESS:
                      pullStatus = PullStatus.FOUND;
                      break;
                  case ResponseCode.PULL_NOT_FOUND:
                      pullStatus = PullStatus.NO_NEW_MSG;
                      break;
                  case ResponseCode.PULL_RETRY_IMMEDIATELY:
                      pullStatus = PullStatus.NO_MATCHED_MSG;
                      break;
                  case ResponseCode.PULL_OFFSET_MOVED:
                      pullStatus = PullStatus.OFFSET_ILLEGAL;
                      break;
      
                  default:
                      throw new MQBrokerException(response.getCode(), response.getRemark());
              }
      

      NettyRemotingClient在收到服務(wù)端響應(yīng)結(jié)構(gòu)后,會回調(diào)PullCallback的onSuccess或onException,PullCallBack對象在DefaultMQPushConsumerImpl#pullMessage中創(chuàng)建。

      第二步:調(diào)用pullAPIWrapper的processPullResult,將消息字節(jié)數(shù)組解碼成消息列表并填充msgFoundList,對消息進(jìn)行消息過濾(TAG 模式)

      // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
                          pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                              subscriptionData);
      

      接下來按照正常流程,即分析拉取結(jié)果為PullStatus.FOUND(找到對應(yīng)的消息)的情況來分析整個(gè)消息拉取過程。

      // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
                          switch (pullResult.getPullStatus()) {
                              case FOUND:
                                  // 獲取到了消息
                                  long prevRequestOffset = pullRequest.getNextOffset();
                                  pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                                  long pullRT = System.currentTimeMillis() - beginTimestamp;
                                  DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                      pullRequest.getMessageQueue().getTopic(), pullRT);
      
                                  long firstMsgOffset = Long.MAX_VALUE;
                                  if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                      // 如果msgFoundList為空,則立即將PullReqeuest放入PullMessageService的pullRequestQueue,
                                      // 以便PullMessageSerivce能及時(shí)喚醒并再次執(zhí)行消息拉取
      
                                      // 為什么PullResult.msgFoundList
                                      //還會為空呢?因?yàn)镽ocketMQ根據(jù)TAG進(jìn)行消息過濾時(shí),在服務(wù)端只是驗(yàn)
                                      //證了TAG的哈希碼,所以客戶端再次對消息進(jìn)行過濾時(shí),可能會出現(xiàn)
                                      //msgFoundList為空的情況
                                      DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                  } else {
                                      firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
      
                                      DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                          pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
      
                                      // 將消息存入processQueue
                                      boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
      
                                      // 然后將拉取到的消息提交到Consume MessageService中供消費(fèi)者消費(fèi)
                                      DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                          pullResult.getMsgFoundList(),
                                          processQueue,
                                          pullRequest.getMessageQueue(),
                                          dispatchToConsume);
      
                                      if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                          DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                              DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                      } else {
                                          DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                      }
                                  }
      
                                  if (pullResult.getNextBeginOffset() < prevRequestOffset
                                      || firstMsgOffset < prevRequestOffset) {
                                      log.warn(
                                          "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                          pullResult.getNextBeginOffset(),
                                          firstMsgOffset,
                                          prevRequestOffset);
                                  }
      
                                  break;
      
      1. 更新PullRequest的下一次拉取偏移量,如果msgFoundList為空,則立即將PullReqeuest放入PullMessageService 的pullRequestQueue,以便PullMessageSerivce能及時(shí)喚醒并再次執(zhí)行消息拉取。為什么PullStatus.msgFoundList 還會為空呢?因?yàn)镽ocketMQ根據(jù)TAG進(jìn)行消息過濾時(shí),在服務(wù)端只是驗(yàn)證了TAG的哈希碼,所以客戶端再次對消息進(jìn)行過濾時(shí),可能會出現(xiàn)msgFoundList為空的情況。
      2. 將拉取到的消息存入ProcessQueue,然后將拉取到的消息提交到ConsumeMessageService中供消費(fèi)者消費(fèi)。該方法是一個(gè)異步方法,也就是PullCallBack將消息提交到ConsumeMessageService中就會立即返回,至于這些消息如何消費(fèi),PullCallBack不會關(guān)注。
      3. 將消息提交給消費(fèi)者線程之后,PullCallBack將立即返回,可以說本次消息拉取順利完成。然后查看pullInterval參數(shù),如果pullInterval>0,則等待pullInterval毫秒后將PullRequest對象放入PullMessageService的pullRequestQueue中,該消息隊(duì)列的下次拉 取即將被激活,達(dá)到持續(xù)消息拉取,實(shí)現(xiàn)準(zhǔn)實(shí)時(shí)拉取消息的效果。

      再來分析消息拉取異常處理是如何校對拉取偏移量。

      1. NO_NEW_MSG、NO_MATCHED_MSG

      如果返回NO_NEW_MSG(沒有新消息)、NO_MATCHED_MSG(沒有匹配消息),則直接使用服務(wù)器端校正的偏移量進(jìn)行下一次消息的拉

      ?。?/p>

      // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
                              case NO_NEW_MSG:
                                  pullRequest.setNextOffset(pullResult.getNextBeginOffset());
      
                                  DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
      
                                  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                  break;
                              case NO_MATCHED_MSG:
                                  pullRequest.setNextOffset(pullResult.getNextBeginOffset());
      
                                  DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
      
                                  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                  break;
      

      再來看服務(wù)端如何校正Offset:

      NO_NEW_MSG 對應(yīng) GetMessageResult.OFFSET_FOUND_NULL、GetMessageResult.OFFSET_OVERFLOW_ONE。

      OFFSET_OVERFLOW_ONE表示待拉取消息的物理偏移量等于消息隊(duì)列最大的偏移量,如果有新的消息到達(dá),此時(shí)會創(chuàng)建一個(gè)新的ConsumeQueue文件,因?yàn)樯弦粋€(gè)ConsueQueue文件的最大偏移量就是下一個(gè)文件的起始偏移量,所以可以按照該物理偏移量第二次拉取消息。

      OFFSET_FOUND_NULL表示根據(jù)ConsumeQueue文件的偏移量沒有找到內(nèi)容,使用偏移量定位到下一個(gè)ConsumeQueue文件,其實(shí)就是offset+(一個(gè)ConsumeQueue文件包含多少個(gè)條目=MappedFileSize/20)。

      1. OFFSET_ILLEGAL

      如果拉取結(jié)果顯示偏移量非法,首先將ProcessQueue的dropped設(shè)為true,表示丟棄該消費(fèi)隊(duì)列,意味著ProcessQueue中拉取的消息將停止消費(fèi),然后根據(jù)服務(wù)端下一次校對的偏移量嘗試更新消息消費(fèi)進(jìn)度(內(nèi)存中),然后嘗試持久化消息消費(fèi)進(jìn)度,并將該消息隊(duì)列從RebalacnImpl的處理隊(duì)列中移除,意味著暫停該消息隊(duì)列的消息拉取,等待下一次消息隊(duì)列重新負(fù)載。OFFSET_ILLEGAL對應(yīng)服務(wù)端GetMessageResult狀態(tài)的NO_MATCHED_LOGIC_QUEUE、NO_MESSAGE_IN_QUEUE、OFFSET_OVERFLOW_BADLY、OFFSET_TOO_SMALL,這些狀態(tài)服務(wù)端偏移量校正基本上使用原偏移量,在客戶端更新消息消費(fèi)進(jìn)度時(shí)只有當(dāng)消息進(jìn)度比當(dāng)前消費(fèi)進(jìn)度大才會覆蓋,以此保證消息進(jìn)度的準(zhǔn)確性。

      4.3.4 消息拉取長輪詢機(jī)制分析

      RocketMQ 并沒有真正實(shí)現(xiàn)推模式,而是消費(fèi)者主動向消息服務(wù)器拉取消息,RocketMQ 推模式是循環(huán)向消息服務(wù)端發(fā)送消息拉取請求,如果消息消費(fèi)者向 RocketMQ 發(fā)送消息拉取時(shí),消息并未到達(dá)消費(fèi)隊(duì)列,且未啟用長輪詢機(jī)制,則會在服務(wù)端等待 shortPollingTimeMills 時(shí)間后(掛起),再去判斷消息是否已到達(dá)消息隊(duì)列。如果消息未到達(dá),則提示消息拉取客戶端 PULL_NOT_FOUND(消息不存在),如果開啟長輪詢模式,RocketMQ 一方面會每 5s 輪詢檢查一次消息是否可達(dá),同時(shí)一有新消息到達(dá)后,立即通知掛起線程再次驗(yàn)證新消息是否是自己感興趣的,如果是則從 CommitLog 文件提取消息返回給消息拉取客戶端,否則掛起超時(shí),超時(shí)時(shí)間由消息拉取方在消息拉取時(shí)封裝在請求參數(shù)中,推模式默認(rèn)為15s,拉模式通過 DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis 進(jìn)行設(shè)置。RocketMQ 通過在 Broker 端配置 longPollingEnable 為true來開啟長輪詢模式。

      消息拉取時(shí)服務(wù)端從CommitLog文件中未找到消息的處理邏輯:

      // org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
          /**
           * 處理消息拉取請求
           * @param channel 請求對應(yīng)網(wǎng)絡(luò)連接通道
           * @param request 請求的內(nèi)容
           * @param brokerAllowSuspend Broker端是否支持掛起,處理消息拉取時(shí)默認(rèn)傳入true,表示如果未找到消息則掛起,如果該參數(shù)為false,未找到消息時(shí)直接返回客戶端消息未找到。
           * @return
           * @throws RemotingCommandException
           */
          private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend){
              .......省略其它邏輯.......
                      case ResponseCode.PULL_NOT_FOUND:
      
                          // broker是否開啟了長輪詢 以及 客戶端是否使用長輪詢
                          if (brokerAllowSuspend && hasSuspendFlag) {
                              long pollingTimeMills = suspendTimeoutMillisLong;
                              if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                                  pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                              }
      
                              String topic = requestHeader.getTopic();
                              long offset = requestHeader.getQueueOffset();
                              int queueId = requestHeader.getQueueId();
                              PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                                  this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                              // 每隔5s重試一次
                              this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                              response = null;
                              break;
                          }
          }
      
      1. Channel channel:網(wǎng)絡(luò)通道,通過該通道向消息拉取客戶端發(fā)送響應(yīng)結(jié)果。
      2. RemotingCommand request:消息拉取請求。
      3. boolean brokerAllowSuspend:Broker端是否支持掛起,處理消息拉取時(shí)默認(rèn)傳入true,表示如果未找到消息則掛起,如果該參數(shù) 為false,未找到消息時(shí)直接返回客戶端消息未找到。

      如果brokerAllowSuspend為true,表示支持掛起,則將響應(yīng)對象response設(shè)置為null,不會立即向客戶端寫入響應(yīng),hasSuspendFlag 參數(shù)在拉取消息時(shí)構(gòu)建的拉取標(biāo)記默認(rèn)為true。

      默認(rèn)支持掛起,根據(jù)是否開啟長輪詢決定掛起方式。如果開啟長輪詢模式,掛起超時(shí)時(shí)間來自請求參數(shù),推模式默認(rèn)為15s,拉模式通過DefaultMQPullConsumer#brokerSuspenMaxTimeMillis 進(jìn)行設(shè)置,默認(rèn)20s。然后創(chuàng)建拉取任務(wù) PullRequest 并提交到 PullRequestHoldService 線程中。

      RocketMQ 輪詢機(jī)制由兩個(gè)線程共同完成。

      1. PullRequestHoldService:每隔5s重試一次。
      2. DefaultMessageStore#ReputMessageService:每處理一次重新拉取,線程休眠1s,繼續(xù)下一次檢查。

      PullRequestHoldService線程詳解

      // org.apache.rocketmq.broker.longpolling.PullRequestHoldService#suspendPullRequest
          public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
              String key = this.buildKey(topic, queueId);
              ManyPullRequest mpr = this.pullRequestTable.get(key);
              if (null == mpr) {
                  mpr = new ManyPullRequest();
                  ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
                  if (prev != null) {
                      mpr = prev;
                  }
              }
      
              mpr.addPullRequest(pullRequest);
          }
      

      根據(jù)消息主題與消息隊(duì)列構(gòu)建key,從 ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable 中獲取該主題隊(duì)列對應(yīng)的 ManyPullRequest,通過 ConcurrentMap 的并發(fā)特性,維護(hù)主題隊(duì)列的 ManyPullRequest,然后將 PullRequest 放入ManyPullRequest。ManyPullRequest 對象內(nèi)部持有一個(gè) PullRequest 列表,表示同一主題隊(duì)列的累積拉取消息任務(wù)。

      //org.apache.rocketmq.broker.longpolling.PullRequestHoldService#run
          @Override
          public void run() {
              log.info("{} service started", this.getServiceName());
              while (!this.isStopped()) {
                  try {
                      if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                          // 默認(rèn)情況下,每個(gè)5s檢測所有長輪詢連接是否有新消息到達(dá)
                          this.waitForRunning(5 * 1000);
                      } else {
                          this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                      }
      
                      long beginLockTimestamp = this.systemClock.now();
                      // 檢查長輪詢連接是否有新的消息到達(dá)
                      this.checkHoldRequest();
                      long costTime = this.systemClock.now() - beginLockTimestamp;
                      if (costTime > 5 * 1000) {
                          log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                      }
                  } catch (Throwable e) {
                      log.warn(this.getServiceName() + " service has exception. ", e);
                  }
              }
      
              log.info("{} service end", this.getServiceName());
          }
      

      如果開啟長輪詢,每5s判斷一次新消息是否到達(dá)。如果未開啟長輪詢,則默認(rèn)等待1s再次判斷,可以通BrokerConfig#shortPollingTimeMills 改變等待時(shí)間。

      // org.apache.rocketmq.broker.longpolling.PullRequestHoldService#checkHoldRequest
          private void checkHoldRequest() {
              for (String key : this.pullRequestTable.keySet()) {
                  String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
                  if (2 == kArray.length) {
                      String topic = kArray[0];
                      int queueId = Integer.parseInt(kArray[1]);
                      final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                      try {
                          this.notifyMessageArriving(topic, queueId, offset);
                      } catch (Throwable e) {
                          log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
                      }
                  }
              }
          }
      

      遍歷拉取任務(wù)表,根據(jù)主題與隊(duì)列獲取消息消費(fèi)隊(duì)列的最大偏移量,如果該偏移量大于待拉取偏移量,說明有新的消息到達(dá),調(diào)用notifyMessageArriving觸發(fā)消息拉取。

      //org.apache.rocketmq.broker.longpolling.PullRequestHoldService#notifyMessageArriving
              String key = this.buildKey(topic, queueId);
              ManyPullRequest mpr = this.pullRequestTable.get(key);
              if (mpr != null) {
                  List<PullRequest> requestList = mpr.cloneListAndClear();
      

      第一步:首先從ManyPullRequest中獲取當(dāng)前該主題隊(duì)列所有的掛起拉取任務(wù)。

      //org.apache.rocketmq.broker.longpolling.PullRequestHoldService#notifyMessageArriving
                      for (PullRequest request : requestList) {
                          long newestOffset = maxOffset;
                          if (newestOffset <= request.getPullFromThisOffset()) {
                              newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                          }
      
      
                          if (newestOffset > request.getPullFromThisOffset()) {
                              boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                                  new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                              // match by bit map, need eval again when properties is not null.
                              if (match && properties != null) {
                                  match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                              }
      
                              if (match) {
                                  try {
                                      // 如果消息匹配,則調(diào)用executeRequestWhenWakeup方法,將消息返回給客戶端
                                      this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                          request.getRequestCommand());
                                  } catch (Throwable e) {
                                      log.error("execute request when wakeup failed.", e);
                                  }
                                  continue;
                              }
                          }
      
                          if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                              // 如果掛起超時(shí),則不繼續(xù)等待,直接返回客戶端消息未找到
                              try {
                                  this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                      request.getRequestCommand());
                              } catch (Throwable e) {
                                  log.error("execute request when wakeup failed.", e);
                              }
                              continue;
                          }
      
                          replayList.add(request);
                      }
      

      第二步:如果消息隊(duì)列的最大偏移量大于待拉取偏移量,且消息匹配,則調(diào)用executeRequestWhenWakeup將消息返回給消息拉取客戶端,否則等待下一次嘗試。如果掛起超時(shí),則不繼續(xù)等待,直接返回客戶消息未找到。

      // org.apache.rocketmq.broker.processor.PullMessageProcessor#executeRequestWhenWakeup
          public void executeRequestWhenWakeup(final Channel channel,
              final RemotingCommand request) throws RemotingCommandException {
              Runnable run = new Runnable() {
                  @Override
                  public void run() {
                      try {
                          // 拉取磁盤消息,并生成response。注意此處 brokerAllowSuspend 參數(shù)為false,代表不管是否拉取到消息都會直接返回,而不會hold線程
                          final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
      
                          if (response != null) {
                              response.setOpaque(request.getOpaque());
                              response.markResponseType();
                              try {
                                  // 將response寫入連接channel中
                                  channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
                                      @Override
                                      public void operationComplete(ChannelFuture future) throws Exception {
                                          if (!future.isSuccess()) {
                                              log.error("processRequestWrapper response to {} failed",
                                                  future.channel().remoteAddress(), future.cause());
                                              log.error(request.toString());
                                              log.error(response.toString());
                                          }
                                      }
                                  });
                              } catch (Throwable e) {
                                  log.error("processRequestWrapper process request over, but response failed", e);
                                  log.error(request.toString());
                                  log.error(response.toString());
                              }
                          }
                      } catch (RemotingCommandException e1) {
                          log.error("excuteRequestWhenWakeup run", e1);
                      }
                  }
              };
              this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
          }
      

      第四步:這里的核心又回到長輪詢的入口代碼了,其核心是設(shè)置brokerAllowSuspend為false,表示不支持拉取線程掛起,即當(dāng)根據(jù)偏移量無法獲取消息時(shí),將不掛起線程并等待新消息,而是直接返回告 訴客戶端本次消息拉取未找到消息。

      回想一下,如果開啟了長輪詢機(jī)制,PullRequestHoldService線程每隔5s被喚醒,嘗試檢測是否有新消息到來,直到超時(shí)才停止,如果被掛起,需要等待5s再執(zhí)行。消息拉取的實(shí)時(shí)性比較差,為了避免這種情況,RocketMQ引入另外一種機(jī)制:當(dāng)消息到達(dá)時(shí)喚醒掛起線程,觸發(fā)一次檢查。

      DefaultMessageStore#ReputMessageService詳解

      ReputMessageService線程主要是根據(jù)CommitLog文件將消息轉(zhuǎn)發(fā)到ConsumeQueue、Index等文件。這里我們關(guān)注doReput()方法關(guān)于長輪詢的相關(guān)實(shí)現(xiàn):

      //org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput
                    // 如果開啟了長輪詢
                    if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                        && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                        // 有新消息到達(dá),喚醒長輪詢等待的線程,返回消息
                        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                    }
      

      當(dāng)新消息達(dá)到CommitLog文件時(shí),ReputMessageService線程負(fù)責(zé) 將消息轉(zhuǎn)發(fā)給Consume Queue文件和Index文件,如果Broker端開啟了長輪詢模式并且當(dāng)前節(jié)點(diǎn)角色主節(jié)點(diǎn),則將調(diào)用PullRequestHoldService線程的notifyMessageArriving()方法喚醒掛起線程,判斷當(dāng)前消費(fèi)隊(duì)列最大偏移量是否大于待拉取偏移量,如果 大于則拉取消息。長輪詢模式實(shí)現(xiàn)了準(zhǔn)實(shí)時(shí)消息拉取。

      小結(jié)

      RocketMQ broker在處理消息拉取請求時(shí),如果消息未找到且brokerAllowSuspendtrue(Broker端支持掛起)且開啟了長輪詢,會設(shè)置掛起超時(shí)時(shí)間,創(chuàng)建PullRequest并提交到PullRequestHoldService線程中。PullRequestHoldService 線程每 5 秒掃描所有被 hold 住的長輪詢請求,檢查是否有新消息到達(dá)并返回。為提高實(shí)時(shí)性,在 DefaultMessageStore#ReputMessageService 線程將 CommitLog 消息轉(zhuǎn)發(fā)到 ConsumeQueue 文件時(shí),若 Broker 端開啟長輪詢且當(dāng)前節(jié)點(diǎn)為主節(jié)點(diǎn),則調(diào)用 PullRequestHoldServicenotifyMessageArriving 方法喚醒掛起線程,判斷消費(fèi)隊(duì)列最大偏移量與待拉取偏移量關(guān)系,若前者大于后者則拉取消息。

      五. 消息隊(duì)列負(fù)載與重平衡

      試想一下一個(gè)消費(fèi)組通常由多個(gè)消費(fèi)者實(shí)例消費(fèi),而一個(gè) ConsumeQueue 同一時(shí)間只能被一個(gè)消費(fèi)者實(shí)例消費(fèi),Topic的多個(gè)隊(duì)列該以何種方式分配給各個(gè)消費(fèi)者實(shí)例呢?在生產(chǎn)環(huán)境中會涉及到節(jié)點(diǎn)的擴(kuò)縮容以及發(fā)布場景下的節(jié)點(diǎn)上下線問題,如果消費(fèi)者實(shí)例數(shù)量發(fā)生變更,如何重新將 ConsumeQueue 重新分配給消費(fèi)者實(shí)例呢?

      RocketMQ消息隊(duì)列重新分布是由RebalanceService線程實(shí)現(xiàn)的,一個(gè)MQClientInstance持有一個(gè)RebalanceService實(shí)現(xiàn),并隨著MQClientInstance的啟動而啟動。接下來我們帶著上面的問題,了解一下RebalanceService的run()方法。

      // org.apache.rocketmq.client.impl.consumer.RebalanceService#run
          @Override
          public void run() {
              log.info(this.getServiceName() + " service started");
      
              while (!this.isStopped()) {
                  // 默認(rèn)每20s執(zhí)行一次 doRebalance
                  this.waitForRunning(waitInterval);
                  this.mqClientFactory.doRebalance();
              }
      
              log.info(this.getServiceName() + " service end");
          }
      

      RebalanceService 線程默認(rèn)每隔 20s 執(zhí)行一次 doRebalance() 方法。可以使用 -Drocketmq.client.rebalance.waitInterval=interval 改變默認(rèn)配置。

      // org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance
          public void doRebalance() {
              // 遍歷消費(fèi)者
              for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                  MQConsumerInner impl = entry.getValue();
                  if (impl != null) {
                      try {
                          // 對消費(fèi)者執(zhí)行 doRebalance
                          impl.doRebalance();
                      } catch (Throwable e) {
                          log.error("doRebalance exception", e);
                      }
                  }
              }
          }
      

      doRebalance()方法中,會循環(huán)遍歷所有注冊的消費(fèi)者信息,然后執(zhí)行對應(yīng)消費(fèi)組的 doRebalance() 方法:

      // org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance
          public void doRebalance(final boolean isOrder) {
              // 一個(gè)consumerGroup可以同時(shí)消費(fèi)多個(gè)topic,所以此處返回的是一個(gè)Map,key 是 topic 名稱
              Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
              if (subTable != null) {
                  for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                      final String topic = entry.getKey();
                      try {
                          this.rebalanceByTopic(topic, isOrder);
                      } catch (Throwable e) {
                          if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                              log.warn("rebalanceByTopic Exception", e);
                          }
                      }
                  }
              }
      
              this.truncateMessageQueueNotMyTopic();
          }
      

      每個(gè) DefaultMQPushConsumerImpl 都持有一個(gè)單獨(dú)的 RebalanceImpl 對象,該方法主要遍歷訂閱信息對每個(gè)主題的隊(duì)列進(jìn)行重新負(fù)載。RebalanceImplMap<String, SubscriptionData> subTable 在調(diào)用消費(fèi)者 DefaultMQPushConsumerImpl#subscribe 方法時(shí)填充。如果訂閱信息發(fā)生變化,例如調(diào)用了 unsubscribe() 方法,則需要將不關(guān)心的主題消費(fèi)隊(duì)列從 processQueueTable 中移除。接下來重點(diǎn)分析 RebalanceImpl#rebalanceByTopic,了解RocketMQ如何針對單個(gè)主題進(jìn)行消息隊(duì)列重新負(fù)載(以集群模式)。

      第一步:從主題訂閱信息緩存表中獲取主題的隊(duì)列信息。發(fā)送請求從Broker中獲取該消費(fèi)組內(nèi)當(dāng)前所有的消費(fèi)者客戶端ID,主題的隊(duì)列可能分布在多個(gè)Broker上,那么請求該發(fā)往哪個(gè)Broker呢? RocketeMQ從主題的路由信息表中隨機(jī)選擇一個(gè)Broker。Broker為什么會存在消費(fèi)組內(nèi)所有消費(fèi)者的信息呢?我們不妨回憶一下,消費(fèi)者在啟動的時(shí)候會向MQClientInstance中注冊消費(fèi)者,然后MQClientInstance會向所有的Broker發(fā)送心跳包,心跳包中包含MQClientInstance的消費(fèi)者信息。如果mqSet、cidAll任意一個(gè)為空,則忽略本次消息隊(duì)列負(fù)載。

      // org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
                  case CLUSTERING: {
                      // 獲取topic 的隊(duì)列信息
                      Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                      // 發(fā)送請求,從broker中獲取該消費(fèi)組內(nèi)當(dāng)前所有的消費(fèi)者客戶端ID
                      List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
      

      第二步:對cidAll、mqAll進(jìn)行排序。這一步很重要,同一個(gè)消費(fèi)組內(nèi)看到的視圖應(yīng)保持一致,確保同一個(gè)消費(fèi)隊(duì)列不會被多個(gè)消費(fèi)者分配。然后調(diào)用 AllocateMessageQueueStrategy 隊(duì)列負(fù)載策略進(jìn)行隊(duì)列負(fù)載:

      // org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
                          Collections.sort(mqAll);
                          Collections.sort(cidAll);
      
                          AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
      
                          List<MessageQueue> allocateResult = null;
                          try {
                              allocateResult = strategy.allocate(
                                  this.consumerGroup,
                                  this.mQClientFactory.getClientId(),
                                  mqAll,
                                  cidAll);
                          } catch (Throwable e) {
                              log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                                  e);
                              return;
                          }
      

      AllocateMessageQueueStrategy 是隊(duì)列分配算法的接口:

      /**
       * 多個(gè)消費(fèi)者之間分配消息隊(duì)列算法策略
       */
      public interface AllocateMessageQueueStrategy {
      
          /**
           * Allocating by consumer id
           *
           * @param consumerGroup current consumer group
           * @param currentCID current consumer id
           * @param mqAll message queue set in current topic
           * @param cidAll consumer set in current consumer group
           * @return The allocate result of given strategy
           */
          List<MessageQueue> allocate(
              final String consumerGroup,
              final String currentCID,
              final List<MessageQueue> mqAll,
              final List<String> cidAll
          );
      
          /**
           * Algorithm name
           *
           * @return The strategy name
           */
          String getName();
      }
      

      RocketMQ默認(rèn)提供5種分配算法:

      1. AllocateMessageQueueAveragely:平均分配,推薦使用。

        舉例來說,如果現(xiàn)在有8個(gè)消息消費(fèi)隊(duì)列q1、q2、q3、q4、q5、 q6、q7、q8,有3個(gè)消費(fèi)者c1、c2、c3,那么根據(jù)該負(fù)載算法,消息隊(duì)列分配如下。

        c1:q1、q2、q3
        c2:q4、q5、q6
        c3:q7、q8。
        
      2. AllocateMessageQueueAveragelyByCircle:平均輪詢分配,推薦使用。

        舉例來說,如果現(xiàn)在有8個(gè)消息消費(fèi)隊(duì)列q1、q2、q3、q4、q5、 q6、q7、q8,有3個(gè)消費(fèi)者c1、c2、c3,那么根據(jù)該負(fù)載算法,消息隊(duì)列分配如下。

        c1:q1、q4、q7
        c2:q2、q5、q8
        c3:q3、q6
        
      3. AllocateMessageQueueConsistentHash:一致性哈希。因?yàn)橄㈥?duì)列負(fù)載信息不容易跟蹤,所以不推薦使用。

      4. AllocateMessageQueueByConfig:根據(jù)配置,為每一個(gè)消費(fèi)者配置固定的消息隊(duì)列。

      5. AllocateMessageQueueByMachineRoom:根據(jù)Broker部署機(jī)房名,對每個(gè)消費(fèi)者負(fù)責(zé)不同的Broker上的隊(duì)列。

      當(dāng)然你也可以擴(kuò)展該接口實(shí)現(xiàn)自己的負(fù)載策略。

      消息負(fù)載算法如果沒有特殊的要求,盡量使用 AllocateMessageQueueAveragelyAllocateMessageQueueAveragelyByCircle,這是因?yàn)榉峙渌惴ū容^直觀。消息隊(duì)列分配原則為一個(gè)消費(fèi)者可以分配多個(gè)消息隊(duì)列,但同一個(gè)消息隊(duì)列只會分配給一個(gè)消費(fèi)者,故如果消費(fèi)者個(gè)數(shù)大于消息隊(duì)列數(shù)量,則有些消費(fèi)者無法消費(fèi)消息。

      對比消息隊(duì)列是否發(fā)生變化,主要思路是遍歷當(dāng)前負(fù)載隊(duì)列集合,如果隊(duì)列不在新分配隊(duì)列的集合中,需要將該隊(duì)列停止消費(fèi)并保存消費(fèi)進(jìn)度;遍歷已分配的隊(duì)列,如果隊(duì)列不在隊(duì)列負(fù)載表中(processQueueTable),則需要?jiǎng)?chuàng)建該隊(duì)列拉取任務(wù)PullRequest, 然后添加到PullMessageService 線程的 pullRequestQueue中,PullMessageService才會繼續(xù)拉取任務(wù):

      第三步:ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable 是當(dāng)前消費(fèi)者負(fù)載的消息隊(duì)列緩存表,如果緩存表中的MessageQueue 不包含在 mqSet 中,說明經(jīng)過本次消息隊(duì)列負(fù)載后,該mq被分配給其他消費(fèi)者,需要暫停該消息隊(duì)列消息的消費(fèi)。方法是將 ProccessQueue 的狀態(tài)設(shè)置為 droped=true,該 ProcessQueue 中的消息將不會再被消費(fèi),調(diào)用 removeUnnecessaryMessageQueue 方法判斷是否將 MessageQueue、ProccessQueue 從緩存表中移除。removeUnnecessaryMessageQueueRebalanceImple 中定義為抽象方法。removeUnnecessaryMessageQueue 方法主要用于持久化待移除 MessageQueue 的消息消費(fèi)進(jìn)度。在推模式下,如果是集群模式并且是順序消息消費(fèi),還需要先解鎖隊(duì)列。關(guān)于順序消息我們在 《RocketMQ順序消息實(shí)現(xiàn)原理》 詳細(xì)介紹。

      // org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance
              // processQueueTable是當(dāng)前消費(fèi)者負(fù)載的消息隊(duì)列緩存表,如果緩存表中的MessageQueue不包含在mqSet中,說明經(jīng)過本次消息隊(duì)列負(fù)載后,
              // 該mq被分配給其他消費(fèi)者,需要暫停該消息隊(duì)列消息的消費(fèi)
              Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
              while (it.hasNext()) {
                  Entry<MessageQueue, ProcessQueue> next = it.next();
                  MessageQueue mq = next.getKey();
                  ProcessQueue pq = next.getValue();
      
                  if (mq.getTopic().equals(topic)) {
                      if (!mqSet.contains(mq)) {
                          // 如果緩存表中的MessageQueue不包含在mqSet中,說明經(jīng)過本次消息隊(duì)列負(fù)載后,該mq被分配給其他消費(fèi)者
                          pq.setDropped(true);
                          // 持久化消費(fèi)進(jìn)度至broker
                          if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                              it.remove();
                              changed = true;
                              log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                          }
                      } else if (pq.isPullExpired()) {
                          switch (this.consumeType()) {
                              case CONSUME_ACTIVELY:
                                  break;
                              case CONSUME_PASSIVELY:
                                  pq.setDropped(true);
                                  if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                      it.remove();
                                      changed = true;
                                      log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                          consumerGroup, mq);
                                  }
                                  break;
                              default:
                                  break;
                          }
                      }
                  }
              }
      

      第四步:遍歷本次負(fù)載分配到的隊(duì)列集合,如果 processQueueTable 中沒有包含該消息隊(duì)列,表明這是本次新增加的消息隊(duì)列,首先從內(nèi)存中移除該消息隊(duì)列的消費(fèi)進(jìn)度,然后從磁盤或者Broker中(根據(jù)消費(fèi)模式)讀取該消息隊(duì)列的消費(fèi)進(jìn)度。然后將新分配隊(duì)列的PullRequest放入 PullMessageService 任務(wù)隊(duì)列中,這樣消費(fèi)者就可以開始拉取消息。

      
              List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
              for (MessageQueue mq : mqSet) {
                  if (!this.processQueueTable.containsKey(mq)) {
                      // 如果 mqSet 不存在于本地緩存中,則證明是新增加的消息隊(duì)列
                      // 如果是順序消費(fèi),則需要對queue進(jìn)行加鎖,只有加鎖成功才會創(chuàng)建 PullRequest 拉取broker消息
                      if (isOrder && !this.lock(mq)) {
                          log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                          continue;
                      }
                      // 從內(nèi)存中移除消息進(jìn)度(臟數(shù)據(jù))
                      this.removeDirtyOffset(mq);
                      ProcessQueue pq = new ProcessQueue();
                      // 計(jì)算消息消費(fèi)的起始偏移量
                      long nextOffset = this.computePullFromWhere(mq);
                      if (nextOffset >= 0) {
                          ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                          if (pre != null) {
                              log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                          } else {
                              log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                              PullRequest pullRequest = new PullRequest();
                              pullRequest.setConsumerGroup(consumerGroup);
                              pullRequest.setNextOffset(nextOffset);
                              pullRequest.setMessageQueue(mq);
                              pullRequest.setProcessQueue(pq);
                              pullRequestList.add(pullRequest);
                              changed = true;
                          }
                      } else {
                          log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                      }
                  }
              }
      
              // 將PullRequest 放入 PullMessageService 任務(wù)隊(duì)列中,這樣消費(fèi)者就可以開始拉取消息
              this.dispatchPullRequest(pullRequestList);
      
              return changed;
          }
      

      小結(jié)

      RocketMQ 消息隊(duì)列的負(fù)載與重平衡由 RebalanceService 線程實(shí)現(xiàn),默認(rèn)每隔 20s 執(zhí)行一次 doRebalance 方法。具體過程如下:

      1. RebalanceService 線程每隔 20s 觸發(fā)一次 doRebalance,遍歷所有注冊的消費(fèi)者信息執(zhí)行對應(yīng)消費(fèi)組的 doRebalance 方法,對每個(gè)主題的隊(duì)列進(jìn)行重新負(fù)載。
      2. 在 RebalanceImpl#rebalanceByTopic 中,首先從主題訂閱信息緩存表中獲取主題的隊(duì)列信息,并從 Broker 中獲取消費(fèi)組內(nèi)當(dāng)前所有消費(fèi)者客戶端 ID。
      3. 對消費(fèi)者 ID 和隊(duì)列進(jìn)行排序,然后調(diào)用 AllocateMessageQueueStrategy 隊(duì)列負(fù)載策略進(jìn)行隊(duì)列負(fù)載,RocketMQ 默認(rèn)提供五種分配算法,推薦使用平均分配或平均輪詢分配算法。
      4. 對比消息隊(duì)列是否發(fā)生變化,若緩存表中的 MessageQueue 不在新分配隊(duì)列集合中,需暫停該隊(duì)列消息消費(fèi)并保存消費(fèi)進(jìn)度;若已分配隊(duì)列不在緩存表中,則創(chuàng)建該隊(duì)列拉取任務(wù)并添加到 PullMessageService 線程任務(wù)隊(duì)列。
      5. 遍歷本次負(fù)載分配到的隊(duì)列集合,若 processQueueTable 中沒有包含該消息隊(duì)列,表明是新增加的消息隊(duì)列,需讀取消費(fèi)進(jìn)度并將新分配隊(duì)列的 PullRequest 放入 PullMessageService 任務(wù)隊(duì)列以便消費(fèi)者拉取消息。

      六. 消息的消費(fèi)過程

      我們先回顧一下消息拉取的過程:PullMessageService 負(fù)責(zé)對消息隊(duì)列進(jìn)行消息拉取,從遠(yuǎn)端服務(wù)器拉取消息后存入 ProcessQueue 消息處理隊(duì)列中,然后調(diào)用 ConsumeMessageService#submitConsumeRequest 方法進(jìn)行消息消費(fèi)。 使用線程池消費(fèi)消息,確保了消息拉取與消息消費(fèi)的解耦。RocketMQ 使用 ConsumeMessageService 來實(shí)現(xiàn)消息消費(fèi)的處理邏輯。RocketMQ 支持順序消費(fèi)與并發(fā)消費(fèi),本節(jié)將重點(diǎn)關(guān)注并發(fā)消費(fèi)的流程。ConsumeMessageService 核心方法如下:

      • ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName): 直接消費(fèi)消息,主要用于通過管理命令接收消費(fèi)消息。
      • void submitConsumeRequest(List msgs, ProcessQueue processQueue, MessageQueue messageQueue, boolean dispathToConsume):提交消息消費(fèi)。

      ConsumeMessageConcurrentlyService并發(fā)消息消費(fèi)核心參數(shù)解釋:

      • DefaultMQPushConsumerImpl defaultMQPushConsumerImpl: 消息推模式實(shí)現(xiàn)類。
      • DefaultMQPushConsumer defaultMQPushConsumer:消費(fèi)者對象。
      • MessageListenerConcurrently messageListener:并發(fā)消息業(yè)務(wù)事件類。
      • BlockingQueue consumeRequestQueue:消息消費(fèi)任務(wù)隊(duì)列。
      • ThreadPoolExecutor consumeExecutor:消息消費(fèi)線程池。
      • String consumerGroup:消費(fèi)組。
      • ScheduledExecutorService scheduledExecutorService:添加消費(fèi)任務(wù)到consumeExecutor延遲調(diào)度器
      • ScheduledExecutorService cleanExpireMsgExecutors:定時(shí)刪除過期消息線程池。為了揭示消息消費(fèi)的完整過程,從服務(wù)器拉取到消息后,回調(diào) PullCallBack 方法,先將消息放入 ProccessQueue 中, 然后把消息提交到消費(fèi)線程池中執(zhí)行,也就是調(diào)用ConsumeMessageService#submitConsumeRequest 開始進(jìn)入消息消費(fèi)的流程。

      6.1 消息消費(fèi)

      消費(fèi)者消息消費(fèi)服務(wù) ConsumeMessageConcurrentlyService 的主要方法是 submitConsumeRequest 提交消費(fèi)請求

      // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest
              // consumeMessageBatchMaxSize表示消息批次,也就是一次消息消費(fèi)任務(wù)ConsumeRequest中包含的消息條數(shù),默認(rèn)為1。
              // msgs.size()默認(rèn)最多為32條消息,受DefaultMQPushConsumer.pullBatchSize屬性控制,如果msgs.size()
              // 小于consumeMessage BatchMaxSize,則直接將拉取到的消息放入ConsumeRequest,然后將consumeRequest提交到消息消費(fèi)者線程池中
              final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
              if (msgs.size() <= consumeBatchSize) {
                  ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
                  try {
                      this.consumeExecutor.submit(consumeRequest);
                  } catch (RejectedExecutionException e) {
                      // 如果提交過程中出現(xiàn)拒絕提交異常,則延遲5s再提交
                      this.submitConsumeRequestLater(consumeRequest);
                  }
              } 
      

      第一步:consumeMessageBatchMaxSize 表示消息批次,也就是一次消息消費(fèi)任務(wù) ConsumeRequest 中包含的消息條數(shù),默認(rèn)為1。msgs.size() 默認(rèn)最多為32條消息,受 DefaultMQPushConsumer.pullBatchSize 屬性控制,如果 msgs.size() 小于 consumeMessage BatchMaxSize,則直接將拉取到的消息放入 ConsumeRequest,然后將 consumeRequest 提交到消息消費(fèi)者線程池中。如果提交過程中出現(xiàn)拒絕提交異常,則延遲5s再提交。這里其實(shí)是給出一種標(biāo)準(zhǔn)的拒絕提交實(shí)現(xiàn)方式,實(shí)際上,由于消費(fèi)者線程池使用的任務(wù)隊(duì)列LinkedBlockingQueue 為無界隊(duì)列,故不會出現(xiàn)拒絕提交異常。

      // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest
                  // 如果拉取的消息條數(shù)大于consumeMessageBatchMaxSize,則對拉取消息進(jìn)行分頁,每頁
                  // consumeMessageBatchMaxSize條消息,創(chuàng)建多個(gè)ConsumeRequest任務(wù)并提交到消費(fèi)線程池
                  for (int total = 0; total < msgs.size(); ) {
                      List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                      for (int i = 0; i < consumeBatchSize; i++, total++) {
                          if (total < msgs.size()) {
                              msgThis.add(msgs.get(total));
                          } else {
                              break;
                          }
                      }
      
                      ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                      try {
                          this.consumeExecutor.submit(consumeRequest);
                      } catch (RejectedExecutionException e) {
                          for (; total < msgs.size(); total++) {
                              msgThis.add(msgs.get(total));
                          }
      
                          this.submitConsumeRequestLater(consumeRequest);
                      }
                  }
      

      第二步:如果拉取的消息條數(shù)大于 consumeMessageBatchMaxSize,則對拉取消息進(jìn)行分頁,每頁 consumeMessageBatchMaxSize 條消息,創(chuàng)建多個(gè) ConsumeRequest 任務(wù)并提交到消費(fèi)線程池。

      // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
                  if (this.processQueue.isDropped()) {
                      log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                      return;
                  }
      

      第三步:進(jìn)入具體的消息消費(fèi)隊(duì)列時(shí),會先檢查 processQueuedropped,如果設(shè)置為true,則停止該隊(duì)列的消費(fèi)。在進(jìn)行消息重新負(fù)載時(shí),如果該消息隊(duì)列被分配給消費(fèi)組內(nèi)的其他消費(fèi)者,需要將 droped 設(shè)置為true,阻止消費(fèi)者繼續(xù)消費(fèi)不屬于自己的消息隊(duì)列。

      // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
                  // 執(zhí)行消息消費(fèi)鉤子函數(shù)ConsumeMessageHook#consumeMessageBefore。通過consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(hook)方法消息消費(fèi)執(zhí)行鉤子函數(shù)
                  ConsumeMessageContext consumeMessageContext = null;
                  if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                      consumeMessageContext = new ConsumeMessageContext();
                      consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
                      consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
                      consumeMessageContext.setProps(new HashMap<String, String>());
                      consumeMessageContext.setMq(messageQueue);
                      consumeMessageContext.setMsgList(msgs);
                      consumeMessageContext.setSuccess(false);
                      ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                  }
      

      第四步:執(zhí)行消息消費(fèi)鉤子函數(shù)。

      // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
                  // 消息消費(fèi)開始時(shí)間
                  long beginTimestamp = System.currentTimeMillis();
                  boolean hasException = false;
                  ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                  try {
                      if (msgs != null && !msgs.isEmpty()) {
                          for (MessageExt msg : msgs) {
                              MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                          }
                      }
                      // 執(zhí)行業(yè)務(wù)代碼消費(fèi)消息
                      status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
                  } catch (Throwable e) {
                      log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                          RemotingHelper.exceptionSimpleDesc(e),
                          ConsumeMessageConcurrentlyService.this.consumerGroup,
                          msgs,
                          messageQueue);
                      // 若出現(xiàn)異常,則設(shè)置未true
                      hasException = true;
                  }
                  // 消息消費(fèi)耗時(shí)
                  long consumeRT = System.currentTimeMillis() - beginTimestamp;
      

      第六步:執(zhí)行具體的消息消費(fèi),調(diào)用應(yīng)用程序消息監(jiān)聽器的 consumeMessage 方法,進(jìn)入具體的消息消費(fèi)業(yè)務(wù)邏輯,返回該批消息的消費(fèi)結(jié)果,即CONSUME_SUCCESS(消費(fèi)成功)或 RECONSUME_LATER(需要重新消費(fèi))。

      // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
                  // 執(zhí)行消息消費(fèi)鉤子函數(shù)ConsumeMessageHook#consumeMessageAfter
                  if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                      consumeMessageContext.setStatus(status.toString());
                      consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                      ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                  }
      

      第七步:執(zhí)行消息消費(fèi)后置鉤子函數(shù)。

      // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
                  // 執(zhí)行業(yè)務(wù)消息消費(fèi)后,在處理結(jié)果前再次驗(yàn)證一次ProcessQueue的isDroped狀態(tài)值。如果狀態(tài)值為true,將不對結(jié)果進(jìn)
                  // 行任何處理。也就是說,在消息消費(fèi)進(jìn)入第四步時(shí),如果因新的消費(fèi)者加入或原先的消費(fèi)者出現(xiàn)宕機(jī),導(dǎo)致原先分配給消費(fèi)者的隊(duì)列在負(fù)
                  // 載之后分配給了別的消費(fèi)者,那么消息會被重復(fù)消費(fèi)
                  if (!processQueue.isDropped()) {
                      // 處理消息消費(fèi)結(jié)果
                      ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
                  } else {
                      log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
                  }
      

      第八步:執(zhí)行業(yè)務(wù)消息消費(fèi)后,在處理結(jié)果前再次驗(yàn)證一次 ProcessQueueisDroped 狀態(tài)值。如果狀態(tài)值為true,將不對結(jié)果進(jìn)行任何處理。也就是說,在消息消費(fèi)進(jìn)入第四步時(shí),如果因新的消費(fèi)者加入或原先的消費(fèi)者出現(xiàn)宕機(jī),導(dǎo)致原先分配給消費(fèi)者的隊(duì)列在負(fù)載之后分配給了別的消費(fèi)者,那么消息會被重復(fù)消費(fèi)。

      // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
              // 根據(jù)消息監(jiān)聽器返回的結(jié)果計(jì)算ackIndex
              switch (status) {
                  case CONSUME_SUCCESS:
                      if (ackIndex >= consumeRequest.getMsgs().size()) {
                          // 如果返回CONSUME_SUCCESS,則將ackIndex設(shè)置為msgs.size()-1,這樣在后面就不會執(zhí)行 sendMessageBack,將消息重新
                          // 發(fā)送至broker retry隊(duì)列中去嘗試重新消費(fèi)該消息。
                          ackIndex = consumeRequest.getMsgs().size() - 1;
                      }
                      int ok = ackIndex + 1;
                      int failed = consumeRequest.getMsgs().size() - ok;
                      this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                      this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                      break;
                  case RECONSUME_LATER:
                      // 如果返回 RECONSUME_LATER,則將ackIndex設(shè)置為-1。這樣就會將這一批消息全部發(fā)送至broker retry topic中,然后消費(fèi)者就能重新消費(fèi)到這一批消息
                      ackIndex = -1;
                      this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                          consumeRequest.getMsgs().size());
                      break;
                  default:
                      break;
              }
      

      第九步:根據(jù)消息監(jiān)聽器返回的結(jié)果計(jì)算 ackIndex,如果返回 CONSUME_SUCCESS,則將 ackIndex 設(shè)置為 msgs.size()-1,如果返回RECONSUME_LATER ,則將 ackIndex 設(shè)置為 -1,這是為下文消息ACK做準(zhǔn)備。

      // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
              switch (this.defaultMQPushConsumer.getMessageModel()) {
                  case BROADCASTING:
                      for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                          MessageExt msg = consumeRequest.getMsgs().get(i);
                          log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                      }
                      break;
                  case CLUSTERING:
                      List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                      for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                          MessageExt msg = consumeRequest.getMsgs().get(i);
                          // 將消息重新發(fā)送至broker的 retry topic中,
                          boolean result = this.sendMessageBack(msg, context);
                          if (!result) {
                              msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                              msgBackFailed.add(msg);
                          }
                      }
      
                      if (!msgBackFailed.isEmpty()) {
                          consumeRequest.getMsgs().removeAll(msgBackFailed);
                          // 消息確認(rèn)失敗,則五秒后重新消費(fèi)消息
                          this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                      }
                      break;
                  default:
                      break;
              }
      

      第十步:如果是廣播模式,業(yè)務(wù)代碼返回 RECONSUME_LATER,消息并不會被重新消費(fèi),而是以警告級別輸出到日志文件中。

      如果是集群模式,消息消費(fèi)成功,因?yàn)?ackIndex=consumeRequest.getMsgs().size()-1,所以 i=ackIndex+1 等于consumeRequest.getMsgs().size(),并不會執(zhí)行 sendMessageBack。 只有在業(yè)務(wù)代碼返回 RECONSUME_LATER 時(shí),該批消息都需要發(fā)送至Broker的重試隊(duì)列中,如果消息發(fā)送失敗,則直接將本次發(fā)送失敗的消息再次封裝為ConsumeRequest,然后延遲5s重新消費(fèi)。如果ACK消息發(fā)送成功,則該消息會延遲消費(fèi)。

      // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
              // 從 processQueue中移除已確認(rèn)消息,返回的偏移量是移除該批消息后最小的偏移量。
              long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
              if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
                  // 然后更新已消費(fèi)的offset,以便消費(fèi)者重啟后能從上一次的消費(fèi)進(jìn)度開始消費(fèi)
                  this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
              }
      

      第十一步:ProcessQueue 中移除這批消息,這里返回的偏移量是移除該批消息后內(nèi)存中正在處理的消息的最小的偏移量。然后用該偏移量更新消息消費(fèi)進(jìn)度,以便消費(fèi)者重啟后能從上一次的消費(fèi)進(jìn)度開始消費(fèi),避免消息重復(fù)消費(fèi)。值得注意的是,當(dāng)消息監(jiān)聽器返回 RECONSUME_LATER 時(shí),消息消費(fèi)進(jìn)度也會向前推進(jìn),并用 ProcessQueue 中最小的隊(duì)列偏移量調(diào)用消息消費(fèi)進(jìn)度存儲器 OffsetStore 更新消費(fèi)進(jìn)度。這是因?yàn)楫?dāng)返回 RECONSUME_LATER 時(shí),RocketMQ 會創(chuàng)建一條與原消息屬性相同的消息,擁有一個(gè)唯一的新 msgId,并存儲原消息ID,該消息會存入 CommitLog 文件,與原消息沒有任何關(guān)聯(lián),所以該消息也會進(jìn)入 ConsuemeQueue, 并擁有一個(gè)全新的隊(duì)列偏移量。

      為啥會使用內(nèi)存中剩余消息最小偏移量更新消費(fèi)進(jìn)度,這是因?yàn)椴l(fā)消費(fèi)模式下,不同消息的消費(fèi)完成無法保證順序。例如按照順序拉取到了4條消息 a,b,c,d,由于是并發(fā)消費(fèi),這四條消息可能被消費(fèi)者線程同時(shí)消費(fèi),假設(shè)消息d先消費(fèi)完成,此時(shí)更新消費(fèi)進(jìn)度,因?yàn)閍、b、c沒有消費(fèi)完成,不能將進(jìn)度更新為消息d的offset,而是將消息d從 ProcessQueue 中移除,移除后內(nèi)存只剩下 a、b、c 三條消息,此時(shí)會將消費(fèi)進(jìn)度更新為ProcessQueue 中最小的消息偏移量,也就是 a。

      6.2 消息消費(fèi)失敗重試機(jī)制

      如果消息監(jiān)聽器返回的消費(fèi)結(jié)果為 RECONSUME_LATER,則需要將這些消息發(fā)送給Broker的重試topic中。如果客戶端發(fā)送重試消息至Broker失敗,將延遲5s后提交線程池進(jìn)行消費(fèi)。

      重試消息發(fā)送的網(wǎng)絡(luò)客戶端入口為 MQClientAPIImpl#consumerSendMessageBack,命令編碼為 RequestCode.CONSUMER_SEND_MSG_BACK。

      ConsumerSendMsgBackRequestHeader 的核心屬性:

      public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader {
          /**
           * 消息物理偏移量,因?yàn)樾枰卦嚨南⒃贐roker中本來就有,所以發(fā)送重試消息只需要發(fā)送消息的物理偏移量即可
           */
          @CFNotNull
          private Long offset;
          /**
           * 消費(fèi)組名
           */
          @CFNotNull
          private String group;
          /**
           * 延遲級別。RcketMQ不支持精確的定時(shí)消息調(diào)度,而是提供幾個(gè)延時(shí)級別,MessageStoreConfig#messageDelayLevel = "1s 5s 10s 30s 1m 2m
           * 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",delayLevel=1,表示延遲5s,delayLevel=2,表示延遲10s。
           */
          @CFNotNull
          private Integer delayLevel;
          /**
           * 原消息的消息ID
           */
          private String originMsgId;
          /**
           * 原消息的topic
           */
          private String originTopic;
          @CFNullable
          private boolean unitMode = false;
          /**
           * 最大重新消費(fèi)次數(shù),默認(rèn)16次。
           */
          private Integer maxReconsumeTimes;
      }
      

      客戶端以同步方式發(fā)送 RequestCode.CONSUMER_SEND 到服務(wù)端。服務(wù)端命令處理器為 org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack

      // org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
              // 獲取消費(fèi)組的訂閱配置信息
              // 獲取消費(fèi)組的訂閱配置信息
              SubscriptionGroupConfig subscriptionGroupConfig =
                  this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
              if (null == subscriptionGroupConfig) {
                  response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
                  response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
                      + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                  return response;
              }
      
              if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
                  response.setCode(ResponseCode.NO_PERMISSION);
                  response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
                  return response;
              }
      
              if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
                  response.setCode(ResponseCode.SUCCESS);
                  response.setRemark(null);
                  return response;
              }
      

      第一步:獲取消費(fèi)組的訂閱配置信息,如果配置信息為空,返回配置組信息不存在錯(cuò)誤,如果重試隊(duì)列數(shù)量小于等于0,則直接返回成功,說明該消費(fèi)組不支持重試。

      我們先逐一介紹 SubscriptionGroupConfig 的核心屬性:

      public class SubscriptionGroupConfig {
      
          /**
           * 消費(fèi)組名
           */
          private String groupName;
          /**
           * 是否可以消費(fèi),默認(rèn)為true,如果consumeEnable=false,該消費(fèi)組無法拉取消息,因而無法消費(fèi)消息
           */
          private boolean consumeEnable = true;
          /**
           * 是否允許從隊(duì)列最小偏移量開始消費(fèi),默認(rèn)為true,目前未使用該參數(shù)
           */
          private boolean consumeFromMinEnable = true;
          /**
           * 設(shè)置該消費(fèi)組是否能以廣播模式消費(fèi),默認(rèn)為true,如果設(shè)置為false,表示只能以集群模式消費(fèi)
           */
          private boolean consumeBroadcastEnable = true;
          /**
           * 重試隊(duì)列個(gè)數(shù),默認(rèn)為1,每一個(gè)Broker上有一個(gè)重試隊(duì)列
           */
          private int retryQueueNums = 1;
          /**
           * 消息最大重試次數(shù),默認(rèn)16次
           */
          private int retryMaxTimes = 16;
          /**
           * 主節(jié)點(diǎn)ID
           */
          private long brokerId = MixAll.MASTER_ID;
          /**
           * 如果消息堵塞(主節(jié)點(diǎn)),將轉(zhuǎn)向該brokerId的服務(wù)器上拉取消息,默認(rèn)為1
           */
          private long whichBrokerWhenConsumeSlowly = 1;
          /**
           * 當(dāng)消費(fèi)發(fā)生變化時(shí),是否
           * 立即進(jìn)行消息隊(duì)列重新負(fù)載。消費(fèi)組訂閱信息配置信息存儲在Broker
           * 的 ${ROCKET_HOME}/store/config/subscriptionGroup.json中。
           * BrokerConfig.autoCreateSubscriptionGroup默認(rèn)為true,表示在第
           * 一次使用消費(fèi)組配置信息時(shí)如果不存在消費(fèi)組,則使用上述默認(rèn)值自
           * 動創(chuàng)建一個(gè),如果為false,則只能通過客戶端命令mqadmin
           * updateSubGroup創(chuàng)建消費(fèi)組后再修改相關(guān)參數(shù)
           */
          private boolean notifyConsumerIdsChangedEnable = true;
      }
      

      回到 consumerSendMsgBack 方法:

      // org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
              // 創(chuàng)建重試主題,重試主題名稱為%RETRY%+消費(fèi)組名稱,從重試隊(duì)列中隨機(jī)選擇一個(gè)隊(duì)列,并構(gòu)建TopicConfig主題配置信息
              String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
              int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
              // 如果沒有重試主題則創(chuàng)建一個(gè)
              TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                  newTopic,
                  subscriptionGroupConfig.getRetryQueueNums(),
                  PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
              if (null == topicConfig) {
                  response.setCode(ResponseCode.SYSTEM_ERROR);
                  response.setRemark("topic[" + newTopic + "] not exist");
                  return response;
              }
      
              if (!PermName.isWriteable(topicConfig.getPerm())) {
                  response.setCode(ResponseCode.NO_PERMISSION);
                  response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
                  return response;
              }
      

      第二步:創(chuàng)建重試主題,重試主題名稱為%RETRY%+消費(fèi)組名稱,從重試隊(duì)列中隨機(jī)選擇一個(gè)隊(duì)列,并構(gòu)建 TopicConfig 主題配置信息。

      // org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
      
              // 根據(jù)消息物理偏移量從CommitLog中獲取消息
              MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
              if (null == msgExt) {
                  response.setCode(ResponseCode.SYSTEM_ERROR);
                  response.setRemark("look message by offset failed, " + requestHeader.getOffset());
                  return response;
              }
              final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
              if (null == retryTopic) {
                  // 將消息的主題信息存入屬性
                  MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
              }
              msgExt.setWaitStoreMsgOK(false);
      

      第三步:根據(jù)消息物理偏移量從CommitLog文件中獲取消息,因?yàn)樾枰卦嚨南⒃贐roker中本來就有,所以發(fā)送重試消息只發(fā)送消息的物理偏移量并沒有發(fā)送消息內(nèi)容。同時(shí)將消息的主題存入屬性。

      // org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
         int delayLevel = requestHeader.getDelayLevel();
      
              int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
              if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
                  maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
              }
      
              if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
                  || delayLevel < 0) {
                  // 設(shè)置消息重試次數(shù),如果消息重試次數(shù)已超過maxReconsumeTimes,再次改變newTopic主題為DLQ("%DLQ%"),該主
                  // 題的權(quán)限為只寫,說明消息一旦進(jìn)入DLQ隊(duì)列,RocketMQ將不負(fù)責(zé)再次調(diào)度消費(fèi)了,需要人工干預(yù)
                  newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
                  queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
      
                  topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                      DLQ_NUMS_PER_GROUP,
                      PermName.PERM_WRITE, 0
                  );
                  if (null == topicConfig) {
                      response.setCode(ResponseCode.SYSTEM_ERROR);
                      response.setRemark("topic[" + newTopic + "] not exist");
                      return response;
                  }
              } else {
                  if (0 == delayLevel) {
                      delayLevel = 3 + msgExt.getReconsumeTimes();
                  }
      			// 設(shè)置延時(shí)級別
                  msgExt.setDelayTimeLevel(delayLevel);
              }
      
      

      第四步:設(shè)置消息重試次數(shù),如果消息重試次數(shù)已超過 maxReconsumeTimes,再次改變 newTopic 主題為DLQ("%DLQ%")也就是死信隊(duì)列,該主題的權(quán)限為只寫,說明消息一旦進(jìn)入DLQ隊(duì)列,RocketMQ將不負(fù)責(zé)再次調(diào)度消費(fèi)了,需要人工干預(yù)。

      // org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
              // 根據(jù)原先的消息創(chuàng)建一個(gè)新的消息對象,重試消息會擁有一個(gè)唯一消息ID(msgId)并存入CommitLog文件。這里不會更新原
              // 先的消息,而是會將原先的主題、消息ID存入消息屬性,主題名稱為重試主題,其他屬性與原消息保持一致。
              MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
              // 將重試的消息放入重試topic,或則死信topic
              msgInner.setTopic(newTopic);
              msgInner.setBody(msgExt.getBody());
              msgInner.setFlag(msgExt.getFlag());
              MessageAccessor.setProperties(msgInner, msgExt.getProperties());
              msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
              msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
      
              msgInner.setQueueId(queueIdInt);
              msgInner.setSysFlag(msgExt.getSysFlag());
              msgInner.setBornTimestamp(msgExt.getBornTimestamp());
              msgInner.setBornHost(msgExt.getBornHost());
              msgInner.setStoreHost(this.getStoreHost());
              msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
      
              // 原始的消息ID
              String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
              // 設(shè)置原始的消息ID
              MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
      

      第五步:根據(jù)原先的消息創(chuàng)建一個(gè)新的消息對象,重試消息會擁有一個(gè)唯一消息ID(msgId)并存入CommitLog文件。這里不會更新原先的消息,而是會將原先的主題、消息ID存入消息屬性,主題名稱為重試主題,其他屬性與原消息保持一致。

      // org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
              // 將重試的消息寫入重試topic,或則死信topic。根據(jù) newTopic 變量決定寫入哪個(gè)topic
              PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
              if (putMessageResult != null) {
                  switch (putMessageResult.getPutMessageStatus()) {
                      case PUT_OK:
                          String backTopic = msgExt.getTopic();
                          String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                          if (correctTopic != null) {
                              backTopic = correctTopic;
                          }
      
                          this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
      
                          response.setCode(ResponseCode.SUCCESS);
                          response.setRemark(null);
      
                          return response;
                      default:
                          break;
                  }
      
                  response.setCode(ResponseCode.SYSTEM_ERROR);
                  response.setRemark(putMessageResult.getPutMessageStatus().name());
                  return response;
              }
      

      第六步:將消息存入 CommitLog 文件。這里想再重點(diǎn)突出消息重試機(jī)制,該機(jī)制的實(shí)現(xiàn)依托于RocketMQ延時(shí)消息機(jī)制。在第四步,會設(shè)置消息的延時(shí)級別,設(shè)置延時(shí)級別后消息實(shí)際上并不會直接發(fā)送至重試topic中(SCHEDULE_TOPIC_XXXX),而是先發(fā)送至延時(shí)隊(duì)列中,當(dāng)延時(shí)到期后才會由定時(shí)任務(wù)將消息發(fā)送回真正的重試隊(duì)列中,此時(shí)就能由客戶端消費(fèi)到重試隊(duì)列中的消息了。延時(shí)消息具體原理可參考:RocketMQ延遲消息

      6.3 消費(fèi)進(jìn)度管理

      消息消費(fèi)者在消費(fèi)一批消息后,需要記錄該批消息已經(jīng)消費(fèi)完畢,否則當(dāng)消費(fèi)者重新啟動時(shí),又要從消息消費(fèi)隊(duì)列最開始消費(fèi)。從6.1節(jié)也可以看到,一次消息消費(fèi)后會從 ProcessQueue 處理隊(duì)列中移除該批消息,返回 ProcessQueue 內(nèi)存中正在處理消息的最小偏移量,并存入消息進(jìn)度表。 那么消息進(jìn)度文件存儲在哪里合適呢?

      1)廣播模式:同一個(gè)消費(fèi)組的所有消息消費(fèi)者都需要消費(fèi)主題下的所有消息,也就是同組內(nèi)消費(fèi)者的消息消費(fèi)行為是對立的,互相不影響,故消息進(jìn)度需要獨(dú)立存儲,最理想的存儲地方應(yīng)該是與消費(fèi)者綁定。

      2)集群模式:同一個(gè)消費(fèi)組內(nèi)的所有消息消費(fèi)者共享消息主題下的所有消息,同一條消息(同一個(gè)消息消費(fèi)隊(duì)列)在同一時(shí)間只會被消費(fèi)組內(nèi)的一個(gè)消費(fèi)者消費(fèi),并且隨著消費(fèi)隊(duì)列的動態(tài)變化而重新負(fù)載,因此消費(fèi)進(jìn)度需要保存在每個(gè)消費(fèi)者都能訪問到的地方。

      RocketMQ消息消費(fèi)進(jìn)度接口如下:

      /**
       * Offset store interface
       */
      public interface OffsetStore {
          /**
           * 從消息進(jìn)度存儲文件加載消息進(jìn)度到內(nèi)存
           * Load
           */
          void load() throws MQClientException;
      
          /**
           * 更新內(nèi)存中的消息消費(fèi)進(jìn)度
           * @param mq 消息消費(fèi)隊(duì)列
           * @param offset 消息消費(fèi)偏移量
           * @param increaseOnly true表示offset必須大于內(nèi)存中當(dāng)前的消費(fèi)偏移量才更新
           */
          void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);
      
          /**
           * 讀取消息消費(fèi)進(jìn)度
           * @param mq 消息消費(fèi)隊(duì)列
           * @param type 讀取方式,可選值包括
           * READ_FROM_MEMORY,即從內(nèi)存中讀取,READ_FROM_STORE,即從磁盤中讀取,MEMORY_FIRST_THEN_STORE,即先從內(nèi)存中讀取,再從磁盤中讀取
           * @return
           */
          long readOffset(final MessageQueue mq, final ReadOffsetType type);
      
          /**
           * 持久化指定消息隊(duì)列進(jìn)度到磁盤
           * @param mqs
           */
          void persistAll(final Set<MessageQueue> mqs);
      
          /**
           * Persist the offset,may be in local storage or remote name server
           */
          void persist(final MessageQueue mq);
      
          /**
           * 將消息隊(duì)列的消息消費(fèi)進(jìn)度從內(nèi)存中移除。
           * @param mq
           */
          void removeOffset(MessageQueue mq);
      
          /**
           * 復(fù)制該主題下所有消息隊(duì)列的消息消費(fèi)進(jìn)度。
           * @param topic
           * @return
           */
          Map<MessageQueue, Long> cloneOffsetTable(String topic);
      
          /**
           * 使用集群模式更新存儲在Broker端的消息消費(fèi)進(jìn)度
           * @param mq
           * @param offset
           * @param isOneway
           */
          void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
              MQBrokerException, InterruptedException, MQClientException;
      }
      

      6.3.1 廣播模式消費(fèi)進(jìn)度存儲

      廣播模式消息消費(fèi)進(jìn)度存儲在消費(fèi)者本地,其實(shí)現(xiàn)類為 LocalFileOffsetStore

      public class LocalFileOffsetStore implements OffsetStore {
          /**
           * 消息進(jìn)度存儲目錄
           */
          public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
              "rocketmq.client.localOffsetStoreDir",
              System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
          private final static InternalLogger log = ClientLogger.getLog();
          private final MQClientInstance mQClientFactory;
          private final String groupName;
          /**
           * 消息進(jìn)度存儲文件LOCAL_OFFSET_STORE_DIR/.rocketmq_offsets/{mQClientFactory.getClientId()}/groupName/offsets.json
           */
          private final String storePath;
          /**
           * 消息消費(fèi)進(jìn)度(內(nèi)存)
           */
          private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
              new ConcurrentHashMap<MessageQueue, AtomicLong>();
      }
      

      下面對 LocalFileOffsetStore 核心方法進(jìn)行簡單介紹,load方法用于從磁盤加載消息消費(fèi)進(jìn)度:

      // org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore#load
          @Override
          public void load() throws MQClientException {
              // OffsetSerializeWrapper內(nèi)部就是ConcurrentMap<MessageQueue,AtomicLong>offsetTable數(shù)據(jù)結(jié)構(gòu)的封裝,readLocalOffset方法首先
              // 從storePath中嘗試加載內(nèi)容,如果讀取的內(nèi)容為空,嘗試從storePath+".bak"中加載,如果還是未找到內(nèi)容,則返回null。
              OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
              if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
                  offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
      
                  for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
                      AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
                      log.info("load consumer's offset, {} {} {}",
                          this.groupName,
                          mq,
                          offset.get());
                  }
              }
          }
      

      OffsetSerializeWrapper 內(nèi)部就是 ConcurrentMap<MessageQueue, AtomicLong> offsetTable 數(shù)據(jù)結(jié)構(gòu)的封裝,readLocalOffset 方法首先從 storePath 中嘗試加載內(nèi)容,如果讀取的內(nèi)容為空,嘗試從 storePath+".bak" 中加載,如果還是未找到內(nèi)容,則返回null。

      廣播消息消費(fèi)進(jìn)度默認(rèn)存儲在:

      消息進(jìn)度文件存儲內(nèi)容:

      {
      	"offsetTable":{{
      			"brokerName":"broker-a",
      			"queueId":2,
      			"topic":"TopicTest"
      		}:333,{
      			"brokerName":"broker-a",
      			"queueId":1,
      			"topic":"TopicTest"
      		}:335,{
      			"brokerName":"broker-a",
      			"queueId":0,
      			"topic":"TopicTest"
      		}:335
      	}
      }
      

      persistAll方法用于將內(nèi)存中的消費(fèi)進(jìn)度持久化到磁盤中:

      // org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore#persistAll
          @Override
          public void persistAll(Set<MessageQueue> mqs) {
              if (null == mqs || mqs.isEmpty())
                  return;
      
              OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
              for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
                  if (mqs.contains(entry.getKey())) {
                      AtomicLong offset = entry.getValue();
                      offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
                  }
              }
      
              String jsonString = offsetSerializeWrapper.toJson(true);
              if (jsonString != null) {
                  try {
                      // 將內(nèi)存中的廣播消費(fèi)進(jìn)度存儲至磁盤
                      MixAll.string2File(jsonString, this.storePath);
                  } catch (IOException e) {
                      log.error("persistAll consumer offset Exception, " + this.storePath, e);
                  }
              }
          }
      

      持久化消息進(jìn)度就是將 ConcurrentMap<MessageQueue, AtomicLong> offsetTable 序列化到磁盤文件中。在 MQClientInstance 中會啟動一個(gè)定時(shí)任務(wù),默認(rèn)每5s持久化消息消費(fèi)進(jìn)度一次,可通過 persistConsumerOffsetInterval 進(jìn)行設(shè)置。

      // org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask
              this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
      
                  @Override
                  public void run() {
                      try {
                          MQClientInstance.this.persistAllConsumerOffset();
                      } catch (Exception e) {
                          log.error("ScheduledTask persistAllConsumerOffset exception", e);
                      }
                  }
              }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
      

      對廣播模式的消息消費(fèi)進(jìn)度進(jìn)行存儲、更新、持久化還是比較容易的,本文就簡單介紹到這里,接下來重點(diǎn)分析集群模式下的消息進(jìn)度管理。

      6.3.2 集群模式消費(fèi)進(jìn)度存儲

      集群模式消息進(jìn)度存儲文件存放在消息服務(wù)端。消息消費(fèi)進(jìn)度集群模式實(shí)現(xiàn)類 RemoteBrokerOffsetStore

      集群模式下消息消費(fèi)進(jìn)度的讀取、持久化與廣播模式的實(shí)現(xiàn)細(xì)節(jié)差不多,如果是集群消費(fèi)模式,當(dāng) RebalanceServiceMessageQueue 負(fù)載到當(dāng)前客戶端時(shí),會調(diào)用 org.apache.rocketmq.client.impl.consumer.RebalancePushImpl#computePullFromWhere 方法從broker中獲取當(dāng)前 MessageQueue 的消費(fèi)進(jìn)度:

      // org.apache.rocketmq.client.impl.consumer.RebalancePushImpl#computePullFromWhere
          @Override
          public long computePullFromWhere(MessageQueue mq) {
              long result = -1;
              final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
              final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
              switch (consumeFromWhere) {
                  case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
                  case CONSUME_FROM_MIN_OFFSET:
                  case CONSUME_FROM_MAX_OFFSET:
                  case CONSUME_FROM_LAST_OFFSET: {
                      // 集群模式下獲取broker中的消費(fèi)進(jìn)度、廣播模式下從本地讀取消費(fèi)進(jìn)度
                      long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                      if (lastOffset >= 0) {
                          result = lastOffset;
                      }
                      // First start,no offset
                      else if (-1 == lastOffset) {
                          if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                              result = 0L;
                          } else {
                              try {
                                  result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                              } catch (MQClientException e) {
                                  result = -1;
                              }
                          }
                      } else {
                          result = -1;
                      }
                      break;
                  }
                  ....
          }
      

      而該方法內(nèi)會調(diào)用 RemoteBrokerOffsetStore#readOffset 方法從broker獲取消費(fèi)進(jìn)度:

      // org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#readOffset
          @Override
          public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
              if (mq != null) {
                  switch (type) {
                      case MEMORY_FIRST_THEN_STORE:
                      case READ_FROM_MEMORY: {
                          AtomicLong offset = this.offsetTable.get(mq);
                          if (offset != null) {
                              return offset.get();
                          } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
                              return -1;
                          }
                      }
                      case READ_FROM_STORE: {
                          try {
                              // 從broker獲取指定MessageQueue當(dāng)前消費(fèi)組的消費(fèi)進(jìn)度
                              long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
                              AtomicLong offset = new AtomicLong(brokerOffset);
                              // 更新內(nèi)存中的消費(fèi)進(jìn)度
                              this.updateOffset(mq, offset.get(), false);
                              return brokerOffset;
                          }
                          // No offset in broker
                          catch (MQBrokerException e) {
                              return -1;
                          }
                          //Other exceptions
                          catch (Exception e) {
                              log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
                              return -2;
                          }
                      }
                      default:
                          break;
                  }
              }
      
              return -1;
          }
      

      RemoteBrokerOffsetStore 會把從broker查詢到的消費(fèi)進(jìn)度進(jìn)行緩存,其余時(shí)間客戶端查詢和更新消費(fèi)進(jìn)度都是基于內(nèi)存中的數(shù)據(jù),而消費(fèi)進(jìn)度的持久化同樣是通過定時(shí)任務(wù)(默認(rèn)每5s)調(diào)用 RemoteBrokerOffsetStore#persistAll 將內(nèi)存中的消費(fèi)進(jìn)度通過網(wǎng)絡(luò)請求寫入broker磁盤。

      broker處理客戶端消費(fèi)進(jìn)度更新請求的處理類是 ConsumerManageProcessor#updateConsumerOffset:

      // org.apache.rocketmq.broker.processor.ConsumerManageProcessor#updateConsumerOffset
          private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
              throws RemotingCommandException {
              final RemotingCommand response =
                  RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
              final UpdateConsumerOffsetRequestHeader requestHeader =
                  (UpdateConsumerOffsetRequestHeader) request
                      .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
              // 更新broker中的消費(fèi)進(jìn)度
              this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
                  requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
              response.setCode(ResponseCode.SUCCESS);
              response.setRemark(null);
              return response;
          }
      

      更新的請求中,主要包含如下信息:

      public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader {
      
          /**
           * 更新的消費(fèi)組
           */
          @CFNotNull
          private String consumerGroup;
          /**
           * 主題名稱
           */
          @CFNotNull
          private String topic;
          /**
           * 隊(duì)列ID
           */
          @CFNotNull
          private Integer queueId;
          /**
           * 更新的消費(fèi)進(jìn)度
           */
          @CFNotNull
          private Long commitOffset;
      }
      

      updateConsumerOffset 方法最終會調(diào)用下列方法更新broker內(nèi)存中的消費(fèi)進(jìn)度:

      // org.apache.rocketmq.broker.offset.ConsumerOffse消費(fèi)進(jìn)度文件內(nèi)容tManager#commitOffset(java.lang.String, java.lang.String, int, long)
          public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
              final long offset) {
              // topic@group
              String key = topic + TOPIC_GROUP_SEPARATOR + group;
              this.commitOffset(clientHost, key, queueId, offset);
          }
      
          private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
              ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
              if (null == map) {
                  map = new ConcurrentHashMap<Integer, Long>(32);
                  map.put(queueId, offset);
                  this.offsetTable.put(key, map);
              } else {
                  Long storeOffset = map.put(queueId, offset);
                  if (storeOffset != null && offset < storeOffset) {
                      log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
                  }
              }
          }
      

      broker存儲消費(fèi)進(jìn)度的思路和客戶端類似,先將消費(fèi)進(jìn)度存儲在內(nèi)存中,然后通過JOB每隔10s調(diào)用 ConfigManager#persist() 刷盤,broker在啟動時(shí)會創(chuàng)建這個(gè)JOB:org.apache.rocketmq.broker.BrokerController#initialize()

      // org.apache.rocketmq.broker.offset.ConsumerOffsetManager#persist
      
          public synchronized void persist() {
              String jsonString = this.encode(true);
              if (jsonString != null) {
                  String fileName = this.configFilePath();
                  try {
                      MixAll.string2File(jsonString, fileName);
                  } catch (IOException e) {
                      log.error("persist file " + fileName + " exception", e);
                  }
              }
          }
      

      消費(fèi)進(jìn)度文件存儲目錄在broker配置文件中配置:消費(fèi)進(jìn)度文件內(nèi)容

      消費(fèi)進(jìn)度文件內(nèi)容:

      {
      	"offsetTable":{
              // Topic名稱@ConsumeGroup名稱 : 消費(fèi)進(jìn)度
      		"TopicTest@please_rename_unique_group_name_4":{
                  // queueId : 消費(fèi)進(jìn)度
                  0:1,
                  1:0,
                  2:0,
                  3:0
      		},
      		"TopicTest@TopicTest-Consumer":{0:335,1:335,2:333
      		},
      		"%RETRY%TopicTest-Consumer@TopicTest-Consumer":{0:0
      		},
      		"%RETRY%rocket-mq-consumer-demo@rocket-mq-consumer-demo":{0:2
      		},
      		"%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
      		},
      		"rocket-mq-topic@rocket-mq-consumer-demo":{0:0,1:0,2:1,3:1
      		}
      	}
      }
      

      RocketMQ 控制臺消費(fèi)進(jìn)度查詢:

      小結(jié)

      客戶端拉取消費(fèi)進(jìn)度與消費(fèi)進(jìn)度的持久化

      集群消費(fèi)模式,當(dāng) RebalanceServiceMessageQueue 負(fù)載到當(dāng)前客戶端時(shí),會調(diào)用RebalancePushImpl#computePullFromWhere方法從broker中獲取當(dāng)前 MessageQueue 的消費(fèi)進(jìn)度 RemoteBrokerOffsetStore 會把從broker查詢到的消費(fèi)進(jìn)度進(jìn)行緩存,其余時(shí)間客戶端查詢和更新消費(fèi)進(jìn)度都是基于內(nèi)存中的數(shù)據(jù),而消費(fèi)進(jìn)度的持久化同樣是通過定時(shí)任務(wù)(默認(rèn)每5s)調(diào)用 RemoteBrokerOffsetStore#persistAll 將內(nèi)存中的消費(fèi)進(jìn)度通過網(wǎng)絡(luò)請求寫入broker磁盤。

      客戶端何時(shí)更新內(nèi)存中的消費(fèi)進(jìn)度

      業(yè)務(wù)代碼消費(fèi) PullMessageService 拉取到的消息時(shí),不管是消費(fèi)成功還是消費(fèi)失敗,都會將消費(fèi)完的消息從 ProcessQueue 移除,并獲取移除后內(nèi)存中剩余消息的最小偏移量,并已該最小偏移量更新客戶端內(nèi)存中的消費(fèi)進(jìn)度。

      為啥會使用內(nèi)存中剩余消息最小偏移量更新消費(fèi)進(jìn)度,這是因?yàn)椴l(fā)消費(fèi)模式下,不同消息的消費(fèi)完成無法保證順序。例如按照順序拉取到了4條消息 a,b,c,d,由于是并發(fā)消費(fèi),這四條消息可能被消費(fèi)者線程同時(shí)消費(fèi),假設(shè)消息d先消費(fèi)完成,此時(shí)更新消費(fèi)進(jìn)度,因?yàn)閍、b、c沒有消費(fèi)完成,不能將進(jìn)度更新為消息d的offset,而是將消息d從 ProcessQueue 中移除,移除后內(nèi)存只剩下 a、b、c 三條消息,此時(shí)會將消費(fèi)進(jìn)度更新為ProcessQueue 中最小的消息偏移量,也就是 a。

      客戶端何時(shí)更新broker消費(fèi)進(jìn)度

      除了定時(shí)任務(wù)上報(bào)消費(fèi)進(jìn)度,消費(fèi)者client每次拉取消息時(shí)都會在請求頭中攜帶其本地的消費(fèi)進(jìn)度,broker收到拉取請求后會調(diào)用 ConsumerOffsetManager#commitOffset 方法更新broker的消費(fèi)進(jìn)度。

      • 消費(fèi)者Client攜帶本地消費(fèi)進(jìn)度拉取消息的入口:DefaultMQPushConsumerImpl#pullMessage
      • broker處理拉取請求,并存儲消費(fèi)進(jìn)度的入口:PullMessageProcessor#processRequest

      broker存儲消費(fèi)進(jìn)度的思路和客戶端類似,先將消費(fèi)進(jìn)度存儲在內(nèi)存中,然后通過JOB每隔10s調(diào)用

      ConfigManager#persist() 刷盤,broker在啟動時(shí)會創(chuàng)建這個(gè)JOB:BrokerController#initialize()。

      七. 總結(jié)

      該文章主要介紹了 RocketMQ 客戶端消息消費(fèi)的原理,包括消息消費(fèi)模型、消費(fèi)者啟動流程、消息拉取、消息隊(duì)列負(fù)載與重平衡以及消息的消費(fèi)過程等內(nèi)容,具體總結(jié)如下:

      1. 消息消費(fèi)模型:

        • 兩種消費(fèi)模式:廣播模式與集群模式,廣播模式下每一個(gè)消費(fèi)者需要拉取訂閱主題下所有消費(fèi)隊(duì)列的消息,集群模式下同一個(gè)消費(fèi)組內(nèi)的多個(gè)消息消費(fèi)者共享消息主題下的所有消息。
        • 并發(fā)消費(fèi)模型:RocketMQ 客戶端為每一個(gè)消費(fèi)組創(chuàng)建獨(dú)立的消費(fèi)線程池,單個(gè)消費(fèi)組內(nèi)的并發(fā)度為線程池線程個(gè)數(shù),線程池處理一批消息后會向 Broker 匯報(bào)消息消費(fèi)進(jìn)度。
        • 消息消費(fèi)進(jìn)度反饋機(jī)制:消費(fèi)線程池在處理完一批消息后,會將消息消費(fèi)進(jìn)度存儲在本地內(nèi)存中,客戶端會啟動一個(gè)定時(shí)線程,每 5s 將存儲在本地內(nèi)存中的所有隊(duì)列消息消費(fèi)偏移量提交到 Broker 中,Broker 收到的消息消費(fèi)進(jìn)度會存儲在內(nèi)存中,每隔 5s 將消息消費(fèi)偏移量持久化到磁盤文件中。
      2. 消費(fèi)者啟動流程:

        • 構(gòu)建主題訂閱信息:將主題訂閱信息構(gòu)建為 SubscriptionData 并加入 RebalanceImpl 的訂閱消息中。
        • 初始化相關(guān)組件:初始化 MQClientInstance、RebalanceImple 等,初始化消息進(jìn)度,根據(jù)消費(fèi)模式選擇不同的消息進(jìn)度存儲方式。
        • 創(chuàng)建消費(fèi)線程服務(wù):根據(jù)消費(fèi)模式創(chuàng)建相應(yīng)的消費(fèi)線程服務(wù),如順序消費(fèi)創(chuàng)建 ConsumeMessageOrderlyService,并發(fā)消費(fèi)創(chuàng)建 ConsumeMessageConcurrentlyService。
        • 向 MQClientInstance 注冊消費(fèi)者并啟動:向 MQClientInstance 注冊消費(fèi)者,啟動 MQClientInstance。
      3. 消息拉?。?/p>

        • PullMessageService 實(shí)現(xiàn)機(jī)制:PullMessageService 繼承 ServiceThread,通過 run () 方法從 pullRequestQueue 中獲取 PullRequest 消息拉取任務(wù),然后調(diào)用 pullMessage 方法進(jìn)行消息拉取。
        • ProcessQueue 實(shí)現(xiàn)機(jī)制:ProcessQueue 是 MessageQueue 在消費(fèi)端的重現(xiàn)、快照,用于存儲從 Broker 拉取的消息,PullMessageService 將消息提交到消費(fèi)者消費(fèi)線程池,消息成功消費(fèi)后,再從 ProcessQueue 中移除。
        • 消息拉取的基本流程:
          • 客戶端封裝消息拉取請求:包括獲取 ProcessQueue、進(jìn)行消息拉取流控、獲取主題訂閱信息、構(gòu)建消息拉取系統(tǒng)標(biāo)記、調(diào)用 PullAPIWrapper.pullKernelImpl 方法與服務(wù)端交互等步驟。
          • Broker 組裝消息:根據(jù)訂閱信息構(gòu)建消息過濾器,調(diào)用 MessageStore.getMessage 查找消息,根據(jù)主題名稱與隊(duì)列編號獲取消息消費(fèi)隊(duì)列,校對下一次拉取偏移量,組裝消息并返回。
          • 消息拉取客戶端處理消息:根據(jù)響應(yīng)結(jié)果解碼成 PullResultExt 對象,對消息進(jìn)行過濾,將消息存入 processQueue,然后將拉取到的消息提交到 ConsumeMessageService 中供消費(fèi)者消費(fèi)。
          • 消息拉取長輪詢機(jī)制分析:RocketMQ 通過在 Broker 端配置 longPollingEnable 為 true 來開啟長輪詢模式,Broker 在處理消息拉取請求時(shí),如果消息未找到且 brokerAllowSuspend 為 true 且開啟了長輪詢,會設(shè)置掛起超時(shí)時(shí)間,創(chuàng)建 PullRequest 并提交到 PullRequestHoldService 線程中,PullRequestHoldService 線程每 5 秒掃描所有被 hold 住的長輪詢請求,檢查是否有新消息到達(dá)并返回,DefaultMessageStore#ReputMessageService 線程在將 CommitLog 消息轉(zhuǎn)發(fā)到 ConsumeQueue 文件時(shí),若 Broker 端開啟長輪詢且當(dāng)前節(jié)點(diǎn)為主節(jié)點(diǎn),則調(diào)用 PullRequestHoldService 的 notifyMessageArriving 方法喚醒掛起線程,判斷消費(fèi)隊(duì)列最大偏移量與待拉取偏移量關(guān)系,若前者大于后者則拉取消息。
      4. 消息隊(duì)列負(fù)載與重平衡:

        • RebalanceService 線程實(shí)現(xiàn):默認(rèn)每隔 20s 執(zhí)行一次 doRebalance 方法,遍歷所有注冊的消費(fèi)者信息,執(zhí)行對應(yīng)消費(fèi)組的 doRebalance 方法,對每個(gè)主題的隊(duì)列進(jìn)行重新負(fù)載。
        • RebalanceImpl.rebalanceByTopic 實(shí)現(xiàn):從主題訂閱信息緩存表中獲取主題的隊(duì)列信息,從 Broker 中獲取消費(fèi)組內(nèi)當(dāng)前所有消費(fèi)者客戶端 ID,對消費(fèi)者 ID 和隊(duì)列進(jìn)行排序,調(diào)用 AllocateMessageQueueStrategy 隊(duì)列負(fù)載策略進(jìn)行隊(duì)列負(fù)載,對比消息隊(duì)列是否發(fā)生變化,暫停不屬于當(dāng)前消費(fèi)者的隊(duì)列消息消費(fèi)并保存消費(fèi)進(jìn)度,創(chuàng)建新分配隊(duì)列的 PullRequest 并添加到 PullMessageService 線程任務(wù)隊(duì)列。
      5. 消息的消費(fèi)過程:

        • 消息消費(fèi):消費(fèi)者消息消費(fèi)服務(wù) ConsumeMessageConcurrentlyService 的主要方法是 submitConsumeRequest 提交消費(fèi)請求,根據(jù)消息數(shù)量和 consumeMessageBatchMaxSize 進(jìn)行處理,將消息放入 ConsumeRequest 并提交到消費(fèi)線程池。進(jìn)入具體消費(fèi)隊(duì)列時(shí),會檢查 processQueue 的 dropped 狀態(tài),執(zhí)行消息消費(fèi)鉤子函數(shù),執(zhí)行業(yè)務(wù)代碼消費(fèi)消息,根據(jù)消費(fèi)結(jié)果計(jì)算 ackIndex,處理消息消費(fèi)結(jié)果,從 ProcessQueue 中移除已確認(rèn)消息并更新消費(fèi)進(jìn)度。
        • 消息消費(fèi)失敗重試機(jī)制:如果消息監(jiān)聽器返回的消費(fèi)結(jié)果為 RECONSUME_LATER,則將消息發(fā)送給 Broker 的重試 topic 中??蛻舳艘酝椒绞桨l(fā)送請求到服務(wù)端,服務(wù)端創(chuàng)建重試主題,根據(jù)消息物理偏移量獲取消息,設(shè)置消息重試次數(shù)和延時(shí)級別,創(chuàng)建新的消息對象并存入 CommitLog 文件。
      6. 消費(fèi)進(jìn)度管理:

        • 廣播模式消費(fèi)進(jìn)度存儲:廣播模式消息消費(fèi)進(jìn)度存儲在消費(fèi)者本地,實(shí)現(xiàn)類為 LocalFileOffsetStore,通過 load 方法從磁盤加載消息消費(fèi)進(jìn)度,persistAll 方法將內(nèi)存中的消費(fèi)進(jìn)度持久化到磁盤中。
        • 集群模式消費(fèi)進(jìn)度存儲:集群模式消息進(jìn)度存儲文件存放在消息服務(wù)端,實(shí)現(xiàn)類為 RemoteBrokerOffsetStore,從 broker 獲取指定 MessageQueue 當(dāng)前消費(fèi)組的消費(fèi)進(jìn)度,并更新內(nèi)存中的消費(fèi)進(jìn)度。
      posted @ 2024-09-21 21:42  聽到微笑  閱讀(205)  評論(0)    收藏  舉報(bào)
      主站蜘蛛池模板: 宜兰县| 精品国产品香蕉在线| 男人的天堂av社区在线| 国产乱老熟女乱老熟女视频| 亚洲av精选一区二区| 国产亚洲视频免费播放| 欧美乱码伦视频免费| 精品人妻系列无码天堂| 五月婷婷开心中文字幕| 亚洲蜜臀av乱码久久| 久久午夜私人影院| 久久人人97超碰人人澡爱香蕉| 午夜国产精品福利一二| 久久精品国产亚洲精品色婷婷 | 国产香蕉尹人在线视频你懂的| 欧美性猛交xxxx乱大交极品| 亚洲欧美成人综合久久久| 久久人与动人物a级毛片 | 国产无套护士在线观看| 鄂温| 久热这里只有精品在线观看| 亚洲欧美综合人成在线| 日本高清视频在线www色| 摸丰满大乳奶水www免费| 国产小受被做到哭咬床单GV| 泰宁县| 强奷白丝美女在线观看| 国产 亚洲 制服 无码 中文| 性一交一乱一伦| 国内揄拍国内精品少妇国语| 中文字幕亚洲无线码一区女同| 国产高清在线不卡一区| 国产成人黄色自拍小视频| 国产在线精品国偷产拍| 无套内谢少妇毛片在线| 国产精品色一区二区三区| 国产精品疯狂输出jk草莓视频 | 中文字幕久无码免费久久| 午夜免费无码福利视频麻豆| 荔浦县| 又大又硬又爽免费视频|