深入剖析RocketMQ消息消費(fèi)原理
本文參考轉(zhuǎn)載至《RocketMQ技術(shù)內(nèi)幕 第2版》
一. 消息消費(fèi)概述
消息消費(fèi)以組的模式開展,一個(gè)消費(fèi)組可以包含多個(gè)消費(fèi)者,每個(gè)消費(fèi)組可以訂閱多個(gè)主題,消費(fèi)組之間有集群模式和廣播模式兩種消費(fèi)模式。集群模式是當(dāng)前主題下的同一條消息只允許被其中一個(gè)消費(fèi)者消費(fèi)。廣播模式是當(dāng)前主題下的同一條消息將被集群內(nèi)的所有消費(fèi)者消費(fèi)一次。
消息服務(wù)器與消費(fèi)者之間的消息傳送也有兩種方式:推模式和拉模式。所謂的拉模式,是消費(fèi)端主動發(fā)起拉取消息的請求,而推模式是消息到達(dá)消息服務(wù)器后,再推送給消息消費(fèi)者。RocketMQ消息推模式基于拉模式實(shí)現(xiàn),在拉模式上包裝一層,一個(gè)拉取任務(wù)完成后開始下一個(gè)拉取任務(wù)。
集群模式下,多個(gè)消費(fèi)者如何對消息隊(duì)列進(jìn)行負(fù)載呢?消息隊(duì)列負(fù)載機(jī)制遵循一個(gè)通用的思想:一個(gè)消息隊(duì)列同一時(shí)間只允許被一個(gè)消費(fèi)者消費(fèi),一個(gè)消費(fèi)者可以消費(fèi)多個(gè)消息隊(duì)列。
RocketMQ 支持局部順序消息消費(fèi),也就是保證同一個(gè)消息隊(duì)列上的消息按順序消費(fèi)。不支持消息全局順序消費(fèi),如果要實(shí)現(xiàn)某一主題的全局順序消息消費(fèi),可以將該主題的隊(duì)列數(shù)設(shè)置為1,犧牲高可用性。RocketMQ支持兩種消息過濾模式:表達(dá)式(TAG、SQL92)與類過濾模式。
消息拉模式主要是由客戶端手動調(diào)用消息拉取API,而消息推模式是消息服務(wù)器主動將消息推送到消息消費(fèi)端,本章將以推模式為突破 口,重點(diǎn)介紹 RocketMQ 消息消費(fèi)的實(shí)現(xiàn)原理。
1.1 消費(fèi)隊(duì)列負(fù)載機(jī)制與重平衡
正如上文提到的,RocketMQ提供了兩種消費(fèi)模式,集群模式與廣播模式。廣播模式中所有的消費(fèi)者會消費(fèi)全部的隊(duì)列,故沒有所謂的消費(fèi)隊(duì)列負(fù)載問題,而集群模式下需要考慮同一個(gè)消費(fèi)組內(nèi)的多個(gè)消費(fèi)者之間如何分配隊(duì)列。
RocketMQ提供了多種隊(duì)列負(fù)載算法,其中比較常用的是AVG、AVG_BY_CIRCLE這兩種平均分配算法,例如8個(gè)隊(duì)列分別為b1_q0、 b1_q1、b1_q2、b1_q3、b2_q0、b2_q1、b2_q2、b2_q3,一個(gè)消費(fèi)組有 3個(gè)消費(fèi)者,分別用C1、C2、C3表示。
采用AVG的分配機(jī)制,各個(gè)消費(fèi)者分配到的隊(duì)列如下。
-
c1:b1_q0、b1_q1、b1_q2
-
c2:b1_q3、b2_q0、b2_q1
-
c3:b2_q2、b2_q3
采用AVG_BY_CIRCLE的分配機(jī)制,各個(gè)消費(fèi)者分配到的隊(duì)列如下。
-
c1:b1_q0、b1_q3、b2_q2
-
c2:b1_q1、b2_q0 b2_q3
-
c3:b1_q2、b2_q1
這兩種分配算法各有使用場景。通常要求發(fā)送方發(fā)送的消息盡量在各個(gè)隊(duì)列上分布均勻,如果分布均衡,就會使用第一種平均算法。但有些時(shí)候,一臺Broker上的消息會明顯多于第二臺,如果使用第一種分配算法,c1消費(fèi)者處理的消息就太多了,但其他消費(fèi)者又空閑, 而且還不能通過增加消費(fèi)者來改變這種情況,此種情況使用AVG_BY_CIRCLE方式更加合適。
在消費(fèi)時(shí)間過程中可能會遇到消息消費(fèi)隊(duì)列增加或減少、消息消費(fèi)者增加或減少,比如需要對消息消費(fèi)隊(duì)列進(jìn)行重新平衡,即重新分配,這就是所謂的重平衡機(jī)制。在RocketMQ中,每隔20s會根據(jù)當(dāng)前隊(duì)列數(shù)量、消費(fèi)者數(shù)量重新進(jìn)行隊(duì)列負(fù)載計(jì)算,如果計(jì)算出來的結(jié)果與當(dāng)前不一樣,則觸發(fā)消息消費(fèi)隊(duì)列的重平衡。
1.2 并發(fā)消費(fèi)模型
RocketMQ 支持并發(fā)消費(fèi)與順序消費(fèi)兩種消費(fèi)方式,消息的拉取與消費(fèi)模型基本一致,只是順序消費(fèi)在某些環(huán)節(jié)為了保證順序性,需要引入鎖機(jī)制,RocketMQ的消息拉取與消費(fèi)模式如圖下圖所示:

一個(gè)MQ客戶端(MQClientInstance)只會創(chuàng)建一個(gè)消息拉取服務(wù)線程(PullMessageService)向Broker拉取消息,但是拉取消息網(wǎng)絡(luò)IO操作是異步的,所以在拉取一個(gè)消費(fèi)隊(duì)列消息時(shí)發(fā)生長輪詢阻塞并不會影響其它消費(fèi)隊(duì)列的消息拉取。PullMessageService會不斷獲取PullRequest拉取請求,將拉取請求放入IO線程池中后會立即返回(不會等Broker響應(yīng)),然后繼續(xù)“不知疲倦”地獲取下一個(gè)PullRequest拉取請求。當(dāng)IO線程收到broker相應(yīng)后,會執(zhí)行回調(diào)方法,將拉取到的消息提交到消費(fèi)組的線程池。
RocketMQ客戶端為每一個(gè)消費(fèi)組創(chuàng)建獨(dú)立的消費(fèi)線程池,即在并發(fā)消費(fèi)模式下,單個(gè)消費(fèi)組內(nèi)的并發(fā)度為線程池線程個(gè)數(shù)。線程池處理一批消息后會向Broker匯報(bào)消息消費(fèi)進(jìn)度。
1.3 消息消費(fèi)進(jìn)度反饋機(jī)制
RocketMQ客戶端消費(fèi)一批數(shù)據(jù)后,需要向Broker反饋消息的消費(fèi)進(jìn)度,Broker會記錄消息消費(fèi)進(jìn)度,這樣在客戶端重啟或隊(duì)列重平衡時(shí)會根據(jù)其消費(fèi)進(jìn)度重新向Broker拉取消息,消息消費(fèi)進(jìn)度反饋機(jī)制,如下圖所示:

消息消費(fèi)進(jìn)度反饋機(jī)制核心要點(diǎn)如下。
-
消費(fèi)線程池在處理完一批消息后,會將消息消費(fèi)進(jìn)度存儲在本地內(nèi)存中。
-
客戶端會啟動一個(gè)定時(shí)線程,每5s將存儲在本地內(nèi)存中的所有隊(duì)列消息消費(fèi)偏移量提交到Broker中。
-
Broker收到的消息消費(fèi)進(jìn)度會存儲在內(nèi)存中,每隔5s將消息消費(fèi)偏移量持久化到磁盤文件中。
-
在客戶端向Broker拉取消息時(shí)也會將該隊(duì)列的消息消費(fèi)偏移量提交到Broker。
再來思考一個(gè)問題,假設(shè)線程池中有T1、T2、T3三個(gè)線程,此時(shí)分別依次獲取到msg1、msg2、msg3消息,消息msg3的偏移量大于msg1、msg2的偏移量,由于支持并發(fā)消費(fèi),如果線程t3先處理完msg3,而t1、t2還未處理,那么線程t3如何提交消費(fèi)偏移量呢?
試想一下,如果提交msg3的偏移量是作為消費(fèi)進(jìn)度被提交,如果此時(shí)消費(fèi)端重啟,消息消費(fèi)msg1、msg2就不會再被消費(fèi),這樣就會造成“消息丟失”。因此t3線程并不會提交msg3的偏移量,而是提交線程池中偏移量最小的消息的偏移量,即t3線程在消費(fèi)完msg3后,提交的消息消費(fèi)進(jìn)度依然是msg1的偏移量,這樣能避免消息丟失,但同樣有消息重復(fù)消費(fèi)的風(fēng)險(xiǎn)。
二. 消息消費(fèi)者初探
下面我們介紹推模式消費(fèi)者 MQPushConsumer 的主要屬性:
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
private final InternalLogger log = ClientLogger.getLog();
/**
* Internal implementation. Most of the functions herein are delegated to it.
*/
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
/**
* 消費(fèi)者所屬組
*/
private String consumerGroup;
/**
* 消息消費(fèi)模式,分為集群模式、廣播模式,默認(rèn)為集群模式
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
/**
* 第一次消費(fèi)時(shí)指定消費(fèi)策略。
* CONSUME_FROM_LAST_OFFSET:此處分為兩種情況,如果磁盤消息未過期且未被刪除,則從最小偏移量開始消費(fèi)。如果磁盤已過期
* 并被刪除,則從最大偏移量開始消費(fèi)。
* CONSUME_FROM_FIRST_OFFSET:從隊(duì)列當(dāng)前最小偏移量開始消費(fèi)。
* CONSUME_FROM_TIMESTAMP:從消費(fèi)者指定時(shí)間戳開始消費(fèi)。
*
* 注意:如果從消息進(jìn)度服務(wù)OffsetStore讀取到MessageQueue中的
* 偏移量不小于0,則使用讀取到的偏移量拉取消息,只有在讀到的偏移
* 量小于0時(shí),上述策略才會生效
*/
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
/**
* 集群模式下消息隊(duì)列的負(fù)載策略
*/
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
/**
* 訂閱信息
*/
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
/**
* 消息業(yè)務(wù)監(jiān)聽器
*/
private MessageListener messageListener;
/**
* 消息消費(fèi)進(jìn)度存儲器
*/
private OffsetStore offsetStore;
/**
* 消費(fèi)者最小線程數(shù)
*/
private int consumeThreadMin = 20;
/**
* 消費(fèi)者最大線程數(shù),因?yàn)橄M(fèi)者線程池使用無界隊(duì)列,所以此參數(shù)不生效
*/
private int consumeThreadMax = 20;
/**
* Threshold for dynamic adjustment of the number of thread pool
*/
private long adjustThreadPoolNumsThreshold = 100000;
/**
* 并發(fā)消息消費(fèi)時(shí)處理隊(duì)列最大跨度,默認(rèn)2000,表示如果消息處理隊(duì)列中偏移量最大的消息
* 與偏移量最小的消息的跨度超過2000,則延遲50ms后再拉取消息。
*/
private int consumeConcurrentlyMaxSpan = 2000;
/**
* 隊(duì)列級別的流量控制閾值,默認(rèn)情況下每個(gè)消息隊(duì)列最多緩存1000條消息;
*/
private int pullThresholdForQueue = 1000;
/**
* 在隊(duì)列級別限制緩存的消息大小,默認(rèn)情況下每個(gè)消息隊(duì)列最多緩存100 MiB消息。
* 考慮{@code pullBatchSize},瞬時(shí)值可能超過限制消息的大小僅由消息體來衡量,因此不準(zhǔn)確
*/
private int pullThresholdSizeForQueue = 100;
/**
* 推模式下拉取任務(wù)的間隔時(shí)間,默認(rèn)一次拉取任務(wù)完成后繼續(xù)拉取
*/
private long pullInterval = 0;
/**
* 消息并發(fā)消費(fèi)時(shí)一次消費(fèi)消息的條數(shù),通俗點(diǎn)說,就是每次傳入MessageListener#consumeMessage中的消息條數(shù)
*/
private int consumeMessageBatchMaxSize = 1;
/**
* 每次消息拉取的條數(shù),默認(rèn)32條
*/
private int pullBatchSize = 32;
/**
* 是否每次拉取消息都更新訂閱信息,默認(rèn)為false
*/
private boolean postSubscriptionWhenPull = false;
/**
* 最大消費(fèi)重試次數(shù)。如果消息消費(fèi)次數(shù)超過maxReconsume Times還未成功,則將該消息轉(zhuǎn)移到一個(gè)失敗隊(duì)列,等待被刪除
*/
private int maxReconsumeTimes = -1;
/**
* 延遲將該隊(duì)列的消息提交到消費(fèi)者線程的等待時(shí)間,默認(rèn)延遲1s。
*/
private long suspendCurrentQueueTimeMillis = 1000;
/**
* 消息消費(fèi)超時(shí)時(shí)間,默認(rèn)為15,單位為分鐘
*/
private long consumeTimeout = 15;
}
三. 消費(fèi)者啟動流程
本節(jié)介紹消息消費(fèi)者是如何啟動的,請跟我一起來分析 DefaultMQPushConsumerImpl 的 start() 方法:
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#copySubscription
private void copySubscription() throws MQClientException {
try {
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subString);
// 構(gòu)建主題訂閱信息SubscriptionData并加入RebalanceImpl的訂閱消息中
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
// 如果是集群消費(fèi)模式下,還需要將重試Topic的消息放入RebalanceImpl的訂閱消息中
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
// RocketMQ消息重試是以消費(fèi)組為單位,而不是主題,消息重試主題名為%RETRY%+消費(fèi)組名
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
第一步:構(gòu)建主題訂閱信息 SubscriptionData 并加入RebalanceImpl的訂閱消息中。訂閱關(guān)系來源主要有兩個(gè)。
- 通過調(diào)用DefaultMQPushConsumerImpl#subscribe(String topic, String subExpression)方法獲取。
- 訂閱重試主題消息。RocketMQ消息重試是以消費(fèi)組為單位,而不是主題,消息重試主題名為
%RETRY%+消費(fèi)組名。消費(fèi)者在啟動時(shí)會自動訂閱該主題,參與該主題的消息隊(duì)列負(fù)載。
//org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 創(chuàng)建MQClientInstance實(shí)例。單例模式:同一個(gè)clientId只會創(chuàng)建一個(gè)MQClientInstance實(shí)例
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
第二步:初始化MQClientInstance、RebalanceImple(消息重新負(fù)載實(shí)現(xiàn)類)等。
//org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
// 初始化消息進(jìn)度
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
// 初始化消息進(jìn)度。如果消息消費(fèi)采用集群模式,那么消
// 息進(jìn)度存儲在Broker上,如果采用廣播模式,那么消息消費(fèi)進(jìn)度存儲
// 在消費(fèi)端
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
第三步:始化消息進(jìn)度。如果消息消費(fèi)采用集群模式,那么消息進(jìn)度存儲在Broker上,如果采用廣播模式,那么消息消費(fèi)進(jìn)度存儲在消費(fèi)端。具體實(shí)現(xiàn)細(xì)節(jié)后面將重點(diǎn)探討。
//org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
// 如果是順序消費(fèi),創(chuàng)建消費(fèi)端消費(fèi)線程服務(wù)。ConsumeMessageService主要負(fù)責(zé)消息消費(fèi),在內(nèi)部維護(hù)一個(gè)線程池
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
// 如果是順序消費(fèi),將 consumeOrderly 置為 true,這樣在RebalanceService負(fù)載隊(duì)列時(shí)對隊(duì)列加鎖,實(shí)現(xiàn)消費(fèi)端順序消費(fèi),防止多個(gè)客戶端同時(shí)消費(fèi)同一個(gè)隊(duì)列
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
// 如果是并發(fā)消費(fèi),則設(shè)置 consumeOrderly 為 false,消費(fèi)時(shí)不會對隊(duì)列進(jìn)行加鎖
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
第四步:如果是順序消費(fèi),創(chuàng)建消費(fèi)端消費(fèi)線程服務(wù)。ConsumeMessageService主要負(fù)責(zé)消息消費(fèi),在內(nèi)部維護(hù)一個(gè)線程池。
//org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
// 向MQClientInstance注冊消費(fèi)者
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 啟動MQClientInstance,JVM中的所有消費(fèi)者、生產(chǎn)者持有同一個(gè)MQClientInstance,MQClientInstance只會啟動一次
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
第五步:向MQClientInstance注冊消費(fèi)者并啟動MQClientInstance,JVM中的所有消費(fèi)者、生產(chǎn)者持有同一個(gè)MQClientInstance,MQClientInstance只會啟動一次。
四. 消息拉取
本節(jié)將基于推模式詳細(xì)分析消息拉取機(jī)制。消息消費(fèi)有兩種模式:廣播模式與集群模式,廣播模式比較簡單,每一個(gè)消費(fèi)者需要拉取訂閱主題下所有消費(fèi)隊(duì)列的消息。本節(jié)主要基于集群模式進(jìn)行介紹。在集群模式下,同一個(gè)消費(fèi)組內(nèi)有多個(gè)消息消費(fèi)者,同一個(gè)主題 存在多個(gè)消費(fèi)隊(duì)列,那么消費(fèi)者如何進(jìn)行消息隊(duì)列負(fù)載呢?從第3節(jié)介紹的啟動流程可知,每一個(gè)消費(fèi)組內(nèi)維護(hù)一個(gè)線程池來消費(fèi)消息,那么這些線程又是如何分工合作的呢?
消息隊(duì)列負(fù)載通常的做法是一個(gè)消息隊(duì)列在同一時(shí)間只允許被一個(gè)消息消費(fèi)者消費(fèi),一個(gè)消息消費(fèi)者可以同時(shí)消費(fèi)多個(gè)消息隊(duì)列,那么RocketMQ是如何實(shí)現(xiàn)消息隊(duì)列負(fù)載的呢?帶著上述問題,我們開始RocketMQ消息消費(fèi)機(jī)制的探討。
從MQClientInstance的啟動流程中可以看出,RocketMQ使用一個(gè)單獨(dú)的線程PullMessageService執(zhí)行消息的拉取。
4.1 PullMessageService實(shí)現(xiàn)機(jī)制
PullMessageService繼承的是ServiceThread,從名稱來看,它是服務(wù)線程,通過run()方法啟動:
//org.apache.rocketmq.client.impl.consumer.PullMessageService#run
@Override
public void run() {
log.info(this.getServiceName() + " service started");
// while (!this.isStopped()) 是一種通用的設(shè)計(jì)技巧,Stopped
// 聲明為volatile,每執(zhí)行一次業(yè)務(wù)邏輯,檢測一下其運(yùn)行狀態(tài),可以
// 通過其他線程將Stopped設(shè)置為true,從而停止該線程
while (!this.isStopped()) {
try {
// 從pullRequestQueue中獲取一個(gè)PullRequest消息拉取任務(wù),
// 如果pullRequestQueue為空,則線程將阻塞,直到有拉取任務(wù)被放入
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
PullMessageService消息拉取服務(wù)線程,run()方法是其核心邏輯,如代碼清單5-7所示。run()方法的核心要點(diǎn)如下。
- while(!this.isStopped())是一種通用的設(shè)計(jì)技巧,Stopped 聲明為volatile,每執(zhí)行一次業(yè)務(wù)邏輯,檢測一下其運(yùn)行狀態(tài),可以通過其他線程將Stopped設(shè)置為true,從而停止該線程。
- 從pullRequestQueue中獲取一個(gè)PullRequest消息拉取任務(wù), 如果pullRequestQueue為空,則線程將阻塞,直到有拉取任務(wù)被放入。
- 調(diào)用pullMessage方法進(jìn)行消息拉取。思考一下,PullRequest 是什么時(shí)候添加的呢?
// org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestLater
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
PullMessageService.this.executePullRequestImmediately(pullRequest);
}
}, timeDelay, TimeUnit.MILLISECONDS);
} else {
log.warn("PullMessageServiceScheduledThread has shutdown");
}
}
原來,PullMessageService提供了延遲添加與立即添加兩種方式將PullRequest放入pullRequestQueue。那么PullRequest是在什么時(shí)候創(chuàng)建的呢?executePullRequestImmediately方法調(diào)用鏈:

通過跟蹤發(fā)現(xiàn),主要有兩個(gè)地方會調(diào)用executePullRequestImmediately:一個(gè)是在RocketMQ根據(jù)PullRequest拉取任務(wù)執(zhí)行完一次消息拉取任務(wù)后,又將PullRequest 對象放入pullRequestQueue;另一個(gè)是在RebalanceImpl中創(chuàng)建的。RebalanceImpl是后文要重點(diǎn)介紹的消息隊(duì)列負(fù)載機(jī)制,也就是PullRequest對象真正創(chuàng)建的地方。
從上面的分析可知,PullMessageService只有在得到 PullRequest 對象時(shí)才會執(zhí)行拉取任務(wù),那么PullRequest究竟是什么呢?
public class PullRequest {
/**
* 消費(fèi)者組
*/
private String consumerGroup;
/**
* 待拉取消費(fèi)隊(duì)列
*/
private MessageQueue messageQueue;
/**
* 消息處理隊(duì)列,從Broker中拉取到的消息會先存入ProccessQueue,然后再提交到消費(fèi)者消費(fèi)線程池進(jìn)行消費(fèi)
*/
private ProcessQueue processQueue;
/**
* 待拉取的MessageQueue偏移量
*/
private long nextOffset;
/**
* 是否被鎖定
*/
private boolean lockedFirst = false;
}
PullMessageService 會不斷拉取隊(duì)列中的PullRequest請求,去Broker中拉取消息:
// org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessage
/**
* 根據(jù)消費(fèi)組名從MQClientInstance中獲取消費(fèi)者的內(nèi)部實(shí)現(xiàn)類
* MQConsumerInner,令人意外的是,這里將consumer強(qiáng)制轉(zhuǎn)換為
* DefaultMQPushConsumerImpl,也就是PullMessageService,該線程只
* 為推模式服務(wù),那拉模式如何拉取消息呢?其實(shí)細(xì)想也不難理解,對
* 于拉模式,RocketMQ只需要提供拉取消息API,再由應(yīng)用程序調(diào)用API
*
* @param pullRequest
*/
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
根據(jù)消費(fèi)組名從MQClientInstance中獲取消費(fèi)者的內(nèi)部實(shí)現(xiàn)類MQConsumerInner,令人意外的是,這里將consumer強(qiáng)制轉(zhuǎn)換為DefaultMQPushConsumerImpl,也就是PullMessageService,該線程只為推模式服務(wù),那拉模式如何拉取消息呢?其實(shí)細(xì)想也不難理解,對 于拉模式,RocketMQ只需要提供拉取消息API,再由應(yīng)用程序調(diào)用 API。
4.2 ProcessQueue實(shí)現(xiàn)機(jī)制
ProcessQueue 是 MessageQueue 在消費(fèi)端的重現(xiàn)、快照。PullMessageService從消息服務(wù)器默認(rèn)每次拉取32條消息,按消息隊(duì)列偏移量的順序存放在ProcessQueue中,PullMessageService將消息提交到消費(fèi)者消費(fèi)線程池,消息成功消費(fèi)后,再從ProcessQueue中移除。
// org.apache.rocketmq.client.impl.consumer.ProcessQueue
/**
* ProcessQueue是MessageQueue在消費(fèi)端的重現(xiàn)、快照。
* PullMessageService從消息服務(wù)器默認(rèn)每次拉取32條消息,按消息隊(duì)
* 列偏移量的順序存放在ProcessQueue中,PullMessageService將消息
* 提交到消費(fèi)者消費(fèi)線程池,消息成功消費(fèi)后,再從ProcessQueue中移
* 除。
* Queue consumption snapshot
*/
public class ProcessQueue {
/**
* 讀寫鎖,控制多線程并發(fā)修改msgTreeMap
*/
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
/**
* 消息存儲容器,鍵為消息在ConsumeQueue中的偏移量
*/
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
/**
* ProcessQueue中總消息數(shù)
*/
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();
private final Lock lockConsume = new ReentrantLock();
/**
* A subset of msgTreeMap, will only be used when orderly consume
*
* 用于存儲消息消費(fèi)隊(duì)列中正在被順序消費(fèi)的消息。其鍵值對的關(guān)系為 Offset -> Message Queue,也就是按照消息在 Broker 中存儲的物理偏移量進(jìn)行排序。
*/
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
/**
* 當(dāng)前ProcessQueue中包含的最大隊(duì)列偏移量
*/
private volatile long queueOffsetMax = 0L;
/**
* 當(dāng)前ProccesQueue是否被丟棄
*/
private volatile boolean dropped = false;
/**
* 上一次開始拉取消息的時(shí)間戳
*/
private volatile long lastPullTimestamp = System.currentTimeMillis();
/**
* 上一次消費(fèi)消息的時(shí)間戳
*/
private volatile long lastConsumeTimestamp = System.currentTimeMillis();
}
4.3 消息拉取的基本流程
本節(jié)將以并發(fā)消息消費(fèi)來探討整個(gè)消息消費(fèi)的流程,消息拉取分為3個(gè)主要步驟:
- 封裝拉取請求,并請求broker拉取消息。
- broker查找消息并返回。
- 消息拉取客戶端處理返回的消息。
4.3.1 客戶端封裝消息拉取請求
消息拉取入口為DefaultMQPushConsumerImpl#pullMessage:
第一步:從PullRequest中獲取ProcessQueue,如果處理隊(duì)列當(dāng)前狀態(tài)未被丟棄,則更新ProcessQueue的lastPullTimestamp為當(dāng)前時(shí)間戳。如果當(dāng)前消費(fèi)者被掛起,則將拉取任務(wù)延遲1s再放入PullMessageService的拉取任務(wù)隊(duì)列中,最后結(jié)束本次消息拉取。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
}
if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
第二步:進(jìn)行消息拉取流控。從消息消費(fèi)數(shù)量與消費(fèi)間隔兩個(gè)維度進(jìn)行控制。
- 消息處理總數(shù),如果ProcessQueue當(dāng)前處理的消息條數(shù)超過了pullThresholdForQueue=1000,將觸發(fā)流控,放棄本次拉取任務(wù),并且該隊(duì)列的下一次拉取任務(wù)將在50ms后才加入拉取任務(wù)隊(duì)列。每觸發(fā)1000次流控后輸出提示語:“the consumer message buffer is full, so do flow control, minOffset={隊(duì)列最小偏移量}, maxOffset={隊(duì)列最大偏移量}, size={消息總條數(shù)}, pullRequest={拉取任務(wù)}, flowControlTimes={流控觸發(fā)次數(shù)}”。
- ProcessQueue 中隊(duì)列最大偏移量與最小偏離量的間距不能超過 consumeConcurrentlyMaxSpan,否則觸發(fā)流控。每觸發(fā)1000次流控后輸出提示語:“the queue's messages, span too long, so do flow control, minOffset={隊(duì)列最小偏移量}, maxOffset={隊(duì)列最大偏移量}, maxSpan={間隔}, pullRequest={拉取任務(wù)信息}, flowControlTimes={流控觸發(fā)次數(shù)}”。這里主要的考量是擔(dān)心因?yàn)橐粭l消息堵塞,使消息進(jìn)度無法向前推進(jìn),可能會造成大量消息重復(fù)消費(fèi)。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// 已緩存消息總數(shù)維度的流量控制
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
// 如果ProcessQueue當(dāng)前處理的消息條數(shù)超過了pullThresholdForQueue=1000,將觸發(fā)流控,放棄本次拉取任務(wù),并
// 且該隊(duì)列的下一次拉取任務(wù)將在50ms后才加入拉取任務(wù)隊(duì)列。
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
// 緩存消息大小維度的流量控制
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
// 緩存消息的大小不能超過pullThresholdSizeForQueue,否則觸發(fā)流控
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
第三步:拉取該主題的訂閱信息,如果為空則結(jié)束本次消息拉取,關(guān)于該隊(duì)列的下一次拉取任務(wù)將延遲3s執(zhí)行。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
// 獲取主題的訂閱信息
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
// 拉取該主題的訂閱信息,如果為空則結(jié)束本次消息拉取,關(guān)于該隊(duì)列的下一次拉取任務(wù)將延遲3s執(zhí)行
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
第四步:構(gòu)建消息拉取系統(tǒng)標(biāo)記。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
// 是否更新已消費(fèi)物理偏移量
boolean commitOffsetEnable = false;
// 已消費(fèi)偏移量
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
// 如果是集群消費(fèi),更新已消費(fèi)偏移量
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
// 是否開啟長輪詢
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
如果當(dāng)前 ConsumerGroup 是集群消費(fèi),會將本地的消費(fèi)進(jìn)度在拉取請求中上報(bào)給Broker。
下面逐一介紹PullSysFlag的枚舉值含義:
// org.apache.rocketmq.common.sysflag.PullSysFlag
public class PullSysFlag {
/**
* 是否更新已消費(fèi)偏移量
*/
private final static int FLAG_COMMIT_OFFSET = 0x1;
/**
* 表示消息拉取時(shí)是否支持掛起(長輪詢)
*/
private final static int FLAG_SUSPEND = 0x1 << 1;
/**
* 消息過濾機(jī)制為表達(dá)式,則設(shè)置該標(biāo)記位
*/
private final static int FLAG_SUBSCRIPTION = 0x1 << 2;
/**
* 消息過濾機(jī)制為類模式,則設(shè)置該標(biāo)記
*/
private final static int FLAG_CLASS_FILTER = 0x1 << 3;
}
第五步:調(diào)用PullAPIWrapper.pullKernelImpl方法后與服務(wù)端交互。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
try {
// 通過遠(yuǎn)程調(diào)用,從broker中拉取消息。拉取成功后,調(diào)用 pullCallback.onSuccess 方法
this.pullAPIWrapper.pullKernelImpl(
// 需要拉取的消息隊(duì)列信息
pullRequest.getMessageQueue(),
// 消息過濾表達(dá)式
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
// 拉取的物理偏移量
pullRequest.getNextOffset(),
// 拉取的消息數(shù)量
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
// 已消費(fèi)偏移量
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
pullKernelImpl 方法的參數(shù):
- MessageQueue mq:從哪個(gè)消息消費(fèi)隊(duì)列拉取消息。
- String subExpression:消息過濾表達(dá)式。
- String expressionType:消息表達(dá)式類型,分為TAG、SQL92。
- long offset:消息拉取偏移量。
- int maxNums:本次拉取最大消息條數(shù),默認(rèn)32條。
- int sysFlag:拉取系統(tǒng)標(biāo)記。
- long commitOffset:當(dāng)前MessageQueue的消費(fèi)進(jìn)度(內(nèi)存中)。
- long brokerSuspendMaxTimeMillis:消息拉取過程中允許Broker掛起的時(shí)間,默認(rèn)15s。
- long timeoutMillis:消息拉取超時(shí)時(shí)間。
- CommunicationMode communicationMode:消息拉取模式,默認(rèn)為異步拉取。
- PullCallback pullCallback:從Broker拉取到消息后的回調(diào)方法。
第六步:根據(jù)brokerName、BrokerId從MQClientInstance中獲取Broker地址,在整個(gè)RocketMQ Broker的部署結(jié)構(gòu)中,相同名稱的Broker構(gòu)成主從結(jié)構(gòu),其BrokerId會不一樣,在每次拉取消息后,會給出一個(gè)建議,下次是從主節(jié)點(diǎn)還是從節(jié)點(diǎn)拉?。?/p>
// org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl
// 根據(jù)brokerName、BrokerId從MQClientInstance中獲取
//Broker地址,在整個(gè)RocketMQ Broker的部署結(jié)構(gòu)中,相同名稱的
//Broker構(gòu)成主從結(jié)構(gòu),其BrokerId會不一樣,在每次拉取消息后,會
//給出一個(gè)建議,下次是從主節(jié)點(diǎn)還是從節(jié)點(diǎn)拉取
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
}
第七步:如果消息過濾模式為類過濾,則需要根據(jù)主題名稱、broker地址找到注冊在Broker上的FilterServer地址,從FilterServer上拉取消息,否則從Broker上拉取消息。上述步驟完成后,RocketMQ通過MQClientAPIImpl#pullMessageAsync方法異步向Broker拉取消息。
// org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl
if (findBrokerResult != null) {
{
// check version
if (!ExpressionType.isTagType(expressionType)
&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
}
}
int sysFlagInner = sysFlag;
if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
// 消費(fèi)組名稱
requestHeader.setConsumerGroup(this.consumerGroup);
// topic名稱
requestHeader.setTopic(mq.getTopic());
// 隊(duì)列ID
requestHeader.setQueueId(mq.getQueueId());
// 隊(duì)列的偏移量
requestHeader.setQueueOffset(offset);
// 拉取的消息數(shù)量
requestHeader.setMaxMsgNums(maxNums);
// 消息拉取的標(biāo)識位,參考:org.apache.rocketmq.common.sysflag.PullSysFlag
requestHeader.setSysFlag(sysFlagInner);
// 已經(jīng)消費(fèi)完成的消息偏移量
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
return pullResult;
}
4.3.2 Broker組裝消息
根據(jù)消息拉取命令 RequestCode.PULL_MESSAGE,很容易找到 Brokder 端處理消息拉取的入口: org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest
第一步:根據(jù)訂閱信息構(gòu)建消息過濾器
// org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
// 構(gòu)建消息過濾器
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
//表示支持對重試topic的屬性進(jìn)行過濾
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
} else {
// 表示不支持對重試topic的屬性進(jìn)行過濾
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
}
第二步:調(diào)用MessageStore.getMessage查找消息
// org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
// 讀取broker中存儲的消息
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
該方法參數(shù)含義如下:
- String group:消費(fèi)組名稱。
- String topic:主題名稱。
- int queueId:隊(duì)列ID。
- long offset:待拉取偏移量。
- int maxMsgNums:最大拉取消息條數(shù)。
- MessageFilter messageFilter:消息過濾器。
第三步:根據(jù)主題名稱與隊(duì)列編號獲取消息消費(fèi)隊(duì)列
//org.apache.rocketmq.store.DefaultMessageStore#getMessage
long beginTime = this.getSystemClock().now();
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
// 待查找隊(duì)列的偏移量
long nextBeginOffset = offset;
// 當(dāng)前消息隊(duì)列的最小偏移量
long minOffset = 0;
// 當(dāng)前消息隊(duì)列的最大偏移量
long maxOffset = 0;
- nextBeginOffset:待查找隊(duì)列的偏移量。
- minOffset:當(dāng)前消息隊(duì)列的最小偏移量。
- maxOffset:當(dāng)前消息隊(duì)列的最大偏移量。
- maxOffsetPy:當(dāng)前CommitLog文件的最大偏移量。
第四步:消息偏移量異常情況校對下一次拉取偏移量:
//org.apache.rocketmq.store.DefaultMessageStore#getMessage
GetMessageResult getResult = new GetMessageResult();
// 當(dāng)前CommitLog文件的最大偏移量
final long maxOffsetPy = this.commitLog.getMaxOffset();
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
if (maxOffset == 0) {
//表示當(dāng)前消費(fèi)隊(duì)列中沒有消息,拉取結(jié)果為
//NO_MESSAGE_IN_QUEUE。如果當(dāng)前Broker為主節(jié)點(diǎn),下次拉取偏移量為
//0。如果當(dāng)前Broker為從節(jié)點(diǎn)并且offsetCheckInSlave為true,設(shè)置下
//次拉取偏移量為0。其他情況下次拉取時(shí)使用原偏移量
status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset < minOffset) {
//表示待拉取消息偏移量小于隊(duì)列的起始偏
//移量,拉取結(jié)果為OFFSET_TOO_SMALL。如果當(dāng)前Broker為主節(jié)點(diǎn),下
//次拉取偏移量為隊(duì)列的最小偏移量。如果當(dāng)前Broker為從節(jié)點(diǎn)并且
//offsetCheckInSlave為true,下次拉取偏移量為隊(duì)列的最小偏移量。
//其他情況下次拉取時(shí)使用原偏移量。
status = GetMessageStatus.OFFSET_TOO_SMALL;
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else if (offset == maxOffset) {
// 如果待拉取偏移量等于隊(duì)列最大偏移
//量,拉取結(jié)果為OFFSET_OVERFLOW_ONE,則下次拉取偏移量依然為
//offset。
status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) {
// 表示偏移量越界,拉取結(jié)果為
//OFFSET_OVERFLOW_BADLY。此時(shí)需要考慮當(dāng)前隊(duì)列的偏移量是否為0,
//如果當(dāng)前隊(duì)列的最小偏移量為0,則使用最小偏移量糾正下次拉取偏移
//量。如果當(dāng)前隊(duì)列的最小偏移量不為0,則使用該隊(duì)列的最大偏移量來
//糾正下次拉取偏移量
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
if (0 == minOffset) {
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else {
nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
}
}
- maxOffset=0:表示當(dāng)前消費(fèi)隊(duì)列中沒有消息,拉取結(jié)果為NO_MESSAGE_IN_QUEUE。如果當(dāng)前Broker為主節(jié)點(diǎn),下次拉取偏移量為 0。如果當(dāng)前Broker為從節(jié)點(diǎn)并且offsetCheckInSlave為true,設(shè)置下次拉取偏移量為0。其他情況下次拉取時(shí)使用原偏移量。
- offset<minOffset:表示待拉取消息偏移量小于隊(duì)列的起始偏移量,拉取結(jié)果為OFFSET_TOO_SMALL。如果當(dāng)前Broker為主節(jié)點(diǎn),下 次拉取偏移量為隊(duì)列的最小偏移量。如果當(dāng)前Broker為從節(jié)點(diǎn)并且offsetCheckInSlave為true,下次拉取偏移量為隊(duì)列的最小偏移量。 其他情況下次拉取時(shí)使用原偏移量。
- offset==maxOffset:如果待拉取偏移量等于隊(duì)列最大偏移量,拉取結(jié)果為OFFSET_OVERFLOW_ONE,則下次拉取偏移量依然為offset。
- offset>maxOffset:表示偏移量越界,拉取結(jié)果為OFFSET_OVERFLOW_BADLY。此時(shí)需要考慮當(dāng)前隊(duì)列的偏移量是否為0, 如果當(dāng)前隊(duì)列的最小偏移量為0,則使用最小偏移量糾正下次拉取偏移量。如果當(dāng)前隊(duì)列的最小偏移量不為0,則使用該隊(duì)列的最大偏移量來糾正下次拉取偏移量。糾正邏輯與1)、2)相同。
第五步:如果待拉取偏移量大于minOffset并且小于maxOffset,從當(dāng)前offset處嘗試?yán)?2條消息。
//org.apache.rocketmq.store.DefaultMessageStore#getMessage
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;
int i = 0;
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// 消息物理偏移量
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
// 消息長度
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
// 消息TAG的hash碼
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
maxPhyOffsetPulling = offsetPy;
if (nextPhyFileStartOffset != Long.MIN_VALUE) {
if (offsetPy < nextPhyFileStartOffset)
continue;
}
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
isInDisk)) {
break;
}
boolean extRet = false, isTagsCodeLegal = true;
if (consumeQueue.isExtAddr(tagsCode)) {
extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
if (extRet) {
tagsCode = cqExtUnit.getTagsCode();
} else {
// can't find ext content.Client will filter messages by tag also.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
tagsCode, offsetPy, sizePy, topic, group);
isTagsCodeLegal = false;
}
}
// 對消息TAG的Hash碼進(jìn)行比對,如果未匹配,則繼續(xù)拉取下一條消息
if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue;
}
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
continue;
}
// 對消息屬性進(jìn)行SQL92過濾,此種過濾方式會反序列化消息內(nèi)容,性能相對TAG過濾會差一點(diǎn)
if (messageFilter != null
&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
// release...
selectResult.release();
continue;
}
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}
if (diskFallRecorded) {
long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
}
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
// diff:是 maxOffsetPy 和 maxPhyOffsetPulling 兩者的差值,表示還有多少消息沒有拉取
long diff = maxOffsetPy - maxPhyOffsetPulling;
// StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE:表示當(dāng)前 Master Broker 全部的物理內(nèi)存大小。
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
// 如果消息堆積大于內(nèi)存 40% 則建議從Slave Broker拉取消息(實(shí)現(xiàn)讀寫分離)
getResult.setSuggestPullingFromSlave(diff > memory);
第六步:根據(jù)PullResult填充responseHeader的NextBeginOffset、MinOffset、MaxOffset。
//org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
第七步:根據(jù)主從同步延遲,如果從節(jié)點(diǎn)數(shù)據(jù)包含下一次拉取的偏移量,則設(shè)置下一次拉取任務(wù)的brokerId。
//org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
第八步:GetMessageResult與Response進(jìn)行狀態(tài)編碼轉(zhuǎn)換。
//org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
switch (getMessageResult.getStatus()) {
case FOUND:
response.setCode(ResponseCode.SUCCESS);
break;
case MESSAGE_WAS_REMOVING:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case NO_MATCHED_LOGIC_QUEUE:
case NO_MESSAGE_IN_QUEUE:
if (0 != requestHeader.getQueueOffset()) {
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
requestHeader.getQueueOffset(),
getMessageResult.getNextBeginOffset(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getConsumerGroup()
);
} else {
response.setCode(ResponseCode.PULL_NOT_FOUND);
}
break;
case NO_MATCHED_MESSAGE:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case OFFSET_FOUND_NULL:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_OVERFLOW_BADLY:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
break;
case OFFSET_OVERFLOW_ONE:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
break;
default:
assert false;
break;
}
GetMessageStatus和ResponseCode轉(zhuǎn)換關(guān)系:
| ResponseCode | GetMessageStatus |
|---|---|
| SUCCESS | FOUND |
| PULL_RETRY_IMMEDIATELY | NO_MATCHED_MESSAGE、MESSAGE_WAS_REMOVING(消息存在下一個(gè)CommitLog里) |
| PULL_NOT_FOUND | NO_MESSAGE_IN_QUEUE(隊(duì)列中未包含消息)、NO_MATCHED_LOGIC_QUEUE(未找到隊(duì)列)、OFFSET_FOUND_NULL、OFFSET_OVERFLOW_ONE(offset剛好等于當(dāng)前最大的offset) |
| PULL_OFFSET_MOVED | NO_MESSAGE_IN_QUEUE(隊(duì)列中未包含消息)、NO_MATCHED_LOGIC_QUEUE(未找到隊(duì)列)、OFFSET_OVERFLOW_BADLY(offset越界)、OFFSET_TOO_SMALL(offset不在隊(duì)列中) |
第九步:如果CommitLog標(biāo)記為可用并且當(dāng)前節(jié)點(diǎn)為主節(jié)點(diǎn),則更新消息消費(fèi)進(jìn)度,消費(fèi)進(jìn)度將先存儲在Broker內(nèi)存中,由定時(shí)任務(wù)將消費(fèi)進(jìn)度寫入Broker本地的JSON文件中。
//org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
//如果CommitLog標(biāo)記為可用并且當(dāng)前節(jié)點(diǎn)為主節(jié)點(diǎn),則更新消息消費(fèi)進(jìn)度,
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
服務(wù)端消息拉取處理完畢,將返回結(jié)果拉取到消息調(diào)用方。在調(diào)用方,需要重點(diǎn)關(guān)注PULL_RETRY_IMMEDIATELY、PULL_OFFSET_MOVED、PULL_NOT_FOUND等情況下如何校正拉取偏移量。
4.3.3 消息拉取客戶端處理消息
回到消息拉取客戶端調(diào)用入口:MQClientAPIImpl#pullMessageAsync
第一步:根據(jù)響應(yīng)結(jié)果解碼成PullResultExt對象,此時(shí)只是從網(wǎng)絡(luò)中讀取消息列表中的byte[] messageBinary屬性
// org.apache.rocketmq.client.impl.MQClientAPIImpl#processPullResponse
private PullResult processPullResponse(
final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
PullStatus pullStatus = PullStatus.NO_NEW_MSG;
switch (response.getCode()) {
case ResponseCode.SUCCESS:
pullStatus = PullStatus.FOUND;
break;
case ResponseCode.PULL_NOT_FOUND:
pullStatus = PullStatus.NO_NEW_MSG;
break;
case ResponseCode.PULL_RETRY_IMMEDIATELY:
pullStatus = PullStatus.NO_MATCHED_MSG;
break;
case ResponseCode.PULL_OFFSET_MOVED:
pullStatus = PullStatus.OFFSET_ILLEGAL;
break;
default:
throw new MQBrokerException(response.getCode(), response.getRemark());
}
NettyRemotingClient在收到服務(wù)端響應(yīng)結(jié)構(gòu)后,會回調(diào)PullCallback的onSuccess或onException,PullCallBack對象在DefaultMQPushConsumerImpl#pullMessage中創(chuàng)建。
第二步:調(diào)用pullAPIWrapper的processPullResult,將消息字節(jié)數(shù)組解碼成消息列表并填充msgFoundList,對消息進(jìn)行消息過濾(TAG 模式)
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
接下來按照正常流程,即分析拉取結(jié)果為PullStatus.FOUND(找到對應(yīng)的消息)的情況來分析整個(gè)消息拉取過程。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
switch (pullResult.getPullStatus()) {
case FOUND:
// 獲取到了消息
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
// 如果msgFoundList為空,則立即將PullReqeuest放入PullMessageService的pullRequestQueue,
// 以便PullMessageSerivce能及時(shí)喚醒并再次執(zhí)行消息拉取
// 為什么PullResult.msgFoundList
//還會為空呢?因?yàn)镽ocketMQ根據(jù)TAG進(jìn)行消息過濾時(shí),在服務(wù)端只是驗(yàn)
//證了TAG的哈希碼,所以客戶端再次對消息進(jìn)行過濾時(shí),可能會出現(xiàn)
//msgFoundList為空的情況
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
// 將消息存入processQueue
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// 然后將拉取到的消息提交到Consume MessageService中供消費(fèi)者消費(fèi)
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
- 更新PullRequest的下一次拉取偏移量,如果msgFoundList為空,則立即將PullReqeuest放入PullMessageService 的pullRequestQueue,以便PullMessageSerivce能及時(shí)喚醒并再次執(zhí)行消息拉取。為什么PullStatus.msgFoundList 還會為空呢?因?yàn)镽ocketMQ根據(jù)TAG進(jìn)行消息過濾時(shí),在服務(wù)端只是驗(yàn)證了TAG的哈希碼,所以客戶端再次對消息進(jìn)行過濾時(shí),可能會出現(xiàn)msgFoundList為空的情況。
- 將拉取到的消息存入ProcessQueue,然后將拉取到的消息提交到ConsumeMessageService中供消費(fèi)者消費(fèi)。該方法是一個(gè)異步方法,也就是PullCallBack將消息提交到ConsumeMessageService中就會立即返回,至于這些消息如何消費(fèi),PullCallBack不會關(guān)注。
- 將消息提交給消費(fèi)者線程之后,PullCallBack將立即返回,可以說本次消息拉取順利完成。然后查看pullInterval參數(shù),如果pullInterval>0,則等待pullInterval毫秒后將PullRequest對象放入PullMessageService的pullRequestQueue中,該消息隊(duì)列的下次拉 取即將被激活,達(dá)到持續(xù)消息拉取,實(shí)現(xiàn)準(zhǔn)實(shí)時(shí)拉取消息的效果。
再來分析消息拉取異常處理是如何校對拉取偏移量。
- NO_NEW_MSG、NO_MATCHED_MSG
如果返回NO_NEW_MSG(沒有新消息)、NO_MATCHED_MSG(沒有匹配消息),則直接使用服務(wù)器端校正的偏移量進(jìn)行下一次消息的拉
?。?/p>
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
再來看服務(wù)端如何校正Offset:
NO_NEW_MSG 對應(yīng) GetMessageResult.OFFSET_FOUND_NULL、GetMessageResult.OFFSET_OVERFLOW_ONE。
OFFSET_OVERFLOW_ONE表示待拉取消息的物理偏移量等于消息隊(duì)列最大的偏移量,如果有新的消息到達(dá),此時(shí)會創(chuàng)建一個(gè)新的ConsumeQueue文件,因?yàn)樯弦粋€(gè)ConsueQueue文件的最大偏移量就是下一個(gè)文件的起始偏移量,所以可以按照該物理偏移量第二次拉取消息。
OFFSET_FOUND_NULL表示根據(jù)ConsumeQueue文件的偏移量沒有找到內(nèi)容,使用偏移量定位到下一個(gè)ConsumeQueue文件,其實(shí)就是offset+(一個(gè)ConsumeQueue文件包含多少個(gè)條目=MappedFileSize/20)。
- OFFSET_ILLEGAL
如果拉取結(jié)果顯示偏移量非法,首先將ProcessQueue的dropped設(shè)為true,表示丟棄該消費(fèi)隊(duì)列,意味著ProcessQueue中拉取的消息將停止消費(fèi),然后根據(jù)服務(wù)端下一次校對的偏移量嘗試更新消息消費(fèi)進(jìn)度(內(nèi)存中),然后嘗試持久化消息消費(fèi)進(jìn)度,并將該消息隊(duì)列從RebalacnImpl的處理隊(duì)列中移除,意味著暫停該消息隊(duì)列的消息拉取,等待下一次消息隊(duì)列重新負(fù)載。OFFSET_ILLEGAL對應(yīng)服務(wù)端GetMessageResult狀態(tài)的NO_MATCHED_LOGIC_QUEUE、NO_MESSAGE_IN_QUEUE、OFFSET_OVERFLOW_BADLY、OFFSET_TOO_SMALL,這些狀態(tài)服務(wù)端偏移量校正基本上使用原偏移量,在客戶端更新消息消費(fèi)進(jìn)度時(shí)只有當(dāng)消息進(jìn)度比當(dāng)前消費(fèi)進(jìn)度大才會覆蓋,以此保證消息進(jìn)度的準(zhǔn)確性。

4.3.4 消息拉取長輪詢機(jī)制分析
RocketMQ 并沒有真正實(shí)現(xiàn)推模式,而是消費(fèi)者主動向消息服務(wù)器拉取消息,RocketMQ 推模式是循環(huán)向消息服務(wù)端發(fā)送消息拉取請求,如果消息消費(fèi)者向 RocketMQ 發(fā)送消息拉取時(shí),消息并未到達(dá)消費(fèi)隊(duì)列,且未啟用長輪詢機(jī)制,則會在服務(wù)端等待 shortPollingTimeMills 時(shí)間后(掛起),再去判斷消息是否已到達(dá)消息隊(duì)列。如果消息未到達(dá),則提示消息拉取客戶端 PULL_NOT_FOUND(消息不存在),如果開啟長輪詢模式,RocketMQ 一方面會每 5s 輪詢檢查一次消息是否可達(dá),同時(shí)一有新消息到達(dá)后,立即通知掛起線程再次驗(yàn)證新消息是否是自己感興趣的,如果是則從 CommitLog 文件提取消息返回給消息拉取客戶端,否則掛起超時(shí),超時(shí)時(shí)間由消息拉取方在消息拉取時(shí)封裝在請求參數(shù)中,推模式默認(rèn)為15s,拉模式通過 DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis 進(jìn)行設(shè)置。RocketMQ 通過在 Broker 端配置 longPollingEnable 為true來開啟長輪詢模式。
消息拉取時(shí)服務(wù)端從CommitLog文件中未找到消息的處理邏輯:
// org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
/**
* 處理消息拉取請求
* @param channel 請求對應(yīng)網(wǎng)絡(luò)連接通道
* @param request 請求的內(nèi)容
* @param brokerAllowSuspend Broker端是否支持掛起,處理消息拉取時(shí)默認(rèn)傳入true,表示如果未找到消息則掛起,如果該參數(shù)為false,未找到消息時(shí)直接返回客戶端消息未找到。
* @return
* @throws RemotingCommandException
*/
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend){
.......省略其它邏輯.......
case ResponseCode.PULL_NOT_FOUND:
// broker是否開啟了長輪詢 以及 客戶端是否使用長輪詢
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
// 每隔5s重試一次
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
}
- Channel channel:網(wǎng)絡(luò)通道,通過該通道向消息拉取客戶端發(fā)送響應(yīng)結(jié)果。
- RemotingCommand request:消息拉取請求。
- boolean brokerAllowSuspend:Broker端是否支持掛起,處理消息拉取時(shí)默認(rèn)傳入true,表示如果未找到消息則掛起,如果該參數(shù) 為false,未找到消息時(shí)直接返回客戶端消息未找到。
如果brokerAllowSuspend為true,表示支持掛起,則將響應(yīng)對象response設(shè)置為null,不會立即向客戶端寫入響應(yīng),hasSuspendFlag 參數(shù)在拉取消息時(shí)構(gòu)建的拉取標(biāo)記默認(rèn)為true。
默認(rèn)支持掛起,根據(jù)是否開啟長輪詢決定掛起方式。如果開啟長輪詢模式,掛起超時(shí)時(shí)間來自請求參數(shù),推模式默認(rèn)為15s,拉模式通過DefaultMQPullConsumer#brokerSuspenMaxTimeMillis 進(jìn)行設(shè)置,默認(rèn)20s。然后創(chuàng)建拉取任務(wù) PullRequest 并提交到 PullRequestHoldService 線程中。
RocketMQ 輪詢機(jī)制由兩個(gè)線程共同完成。
- PullRequestHoldService:每隔5s重試一次。
- DefaultMessageStore#ReputMessageService:每處理一次重新拉取,線程休眠1s,繼續(xù)下一次檢查。
PullRequestHoldService線程詳解
// org.apache.rocketmq.broker.longpolling.PullRequestHoldService#suspendPullRequest
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (null == mpr) {
mpr = new ManyPullRequest();
ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
if (prev != null) {
mpr = prev;
}
}
mpr.addPullRequest(pullRequest);
}
根據(jù)消息主題與消息隊(duì)列構(gòu)建key,從 ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable 中獲取該主題隊(duì)列對應(yīng)的 ManyPullRequest,通過 ConcurrentMap 的并發(fā)特性,維護(hù)主題隊(duì)列的 ManyPullRequest,然后將 PullRequest 放入ManyPullRequest。ManyPullRequest 對象內(nèi)部持有一個(gè) PullRequest 列表,表示同一主題隊(duì)列的累積拉取消息任務(wù)。
//org.apache.rocketmq.broker.longpolling.PullRequestHoldService#run
@Override
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
// 默認(rèn)情況下,每個(gè)5s檢測所有長輪詢連接是否有新消息到達(dá)
this.waitForRunning(5 * 1000);
} else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
// 檢查長輪詢連接是否有新的消息到達(dá)
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info("{} service end", this.getServiceName());
}
如果開啟長輪詢,每5s判斷一次新消息是否到達(dá)。如果未開啟長輪詢,則默認(rèn)等待1s再次判斷,可以通BrokerConfig#shortPollingTimeMills 改變等待時(shí)間。
// org.apache.rocketmq.broker.longpolling.PullRequestHoldService#checkHoldRequest
private void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
}
}
}
}
遍歷拉取任務(wù)表,根據(jù)主題與隊(duì)列獲取消息消費(fèi)隊(duì)列的最大偏移量,如果該偏移量大于待拉取偏移量,說明有新的消息到達(dá),調(diào)用notifyMessageArriving觸發(fā)消息拉取。
//org.apache.rocketmq.broker.longpolling.PullRequestHoldService#notifyMessageArriving
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
List<PullRequest> requestList = mpr.cloneListAndClear();
第一步:首先從ManyPullRequest中獲取當(dāng)前該主題隊(duì)列所有的掛起拉取任務(wù)。
//org.apache.rocketmq.broker.longpolling.PullRequestHoldService#notifyMessageArriving
for (PullRequest request : requestList) {
long newestOffset = maxOffset;
if (newestOffset <= request.getPullFromThisOffset()) {
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}
if (newestOffset > request.getPullFromThisOffset()) {
boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
// match by bit map, need eval again when properties is not null.
if (match && properties != null) {
match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
}
if (match) {
try {
// 如果消息匹配,則調(diào)用executeRequestWhenWakeup方法,將消息返回給客戶端
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
}
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
// 如果掛起超時(shí),則不繼續(xù)等待,直接返回客戶端消息未找到
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
replayList.add(request);
}
第二步:如果消息隊(duì)列的最大偏移量大于待拉取偏移量,且消息匹配,則調(diào)用executeRequestWhenWakeup將消息返回給消息拉取客戶端,否則等待下一次嘗試。如果掛起超時(shí),則不繼續(xù)等待,直接返回客戶消息未找到。
// org.apache.rocketmq.broker.processor.PullMessageProcessor#executeRequestWhenWakeup
public void executeRequestWhenWakeup(final Channel channel,
final RemotingCommand request) throws RemotingCommandException {
Runnable run = new Runnable() {
@Override
public void run() {
try {
// 拉取磁盤消息,并生成response。注意此處 brokerAllowSuspend 參數(shù)為false,代表不管是否拉取到消息都會直接返回,而不會hold線程
final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
if (response != null) {
response.setOpaque(request.getOpaque());
response.markResponseType();
try {
// 將response寫入連接channel中
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
log.error("processRequestWrapper response to {} failed",
future.channel().remoteAddress(), future.cause());
log.error(request.toString());
log.error(response.toString());
}
}
});
} catch (Throwable e) {
log.error("processRequestWrapper process request over, but response failed", e);
log.error(request.toString());
log.error(response.toString());
}
}
} catch (RemotingCommandException e1) {
log.error("excuteRequestWhenWakeup run", e1);
}
}
};
this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
}
第四步:這里的核心又回到長輪詢的入口代碼了,其核心是設(shè)置brokerAllowSuspend為false,表示不支持拉取線程掛起,即當(dāng)根據(jù)偏移量無法獲取消息時(shí),將不掛起線程并等待新消息,而是直接返回告 訴客戶端本次消息拉取未找到消息。
回想一下,如果開啟了長輪詢機(jī)制,PullRequestHoldService線程每隔5s被喚醒,嘗試檢測是否有新消息到來,直到超時(shí)才停止,如果被掛起,需要等待5s再執(zhí)行。消息拉取的實(shí)時(shí)性比較差,為了避免這種情況,RocketMQ引入另外一種機(jī)制:當(dāng)消息到達(dá)時(shí)喚醒掛起線程,觸發(fā)一次檢查。
DefaultMessageStore#ReputMessageService詳解
ReputMessageService線程主要是根據(jù)CommitLog文件將消息轉(zhuǎn)發(fā)到ConsumeQueue、Index等文件。這里我們關(guān)注doReput()方法關(guān)于長輪詢的相關(guān)實(shí)現(xiàn):
//org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput
// 如果開啟了長輪詢
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
// 有新消息到達(dá),喚醒長輪詢等待的線程,返回消息
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
當(dāng)新消息達(dá)到CommitLog文件時(shí),ReputMessageService線程負(fù)責(zé) 將消息轉(zhuǎn)發(fā)給Consume Queue文件和Index文件,如果Broker端開啟了長輪詢模式并且當(dāng)前節(jié)點(diǎn)角色主節(jié)點(diǎn),則將調(diào)用PullRequestHoldService線程的notifyMessageArriving()方法喚醒掛起線程,判斷當(dāng)前消費(fèi)隊(duì)列最大偏移量是否大于待拉取偏移量,如果 大于則拉取消息。長輪詢模式實(shí)現(xiàn)了準(zhǔn)實(shí)時(shí)消息拉取。
小結(jié)
RocketMQ broker在處理消息拉取請求時(shí),如果消息未找到且brokerAllowSuspend為true(Broker端支持掛起)且開啟了長輪詢,會設(shè)置掛起超時(shí)時(shí)間,創(chuàng)建PullRequest并提交到PullRequestHoldService線程中。PullRequestHoldService 線程每 5 秒掃描所有被 hold 住的長輪詢請求,檢查是否有新消息到達(dá)并返回。為提高實(shí)時(shí)性,在 DefaultMessageStore#ReputMessageService 線程將 CommitLog 消息轉(zhuǎn)發(fā)到 ConsumeQueue 文件時(shí),若 Broker 端開啟長輪詢且當(dāng)前節(jié)點(diǎn)為主節(jié)點(diǎn),則調(diào)用 PullRequestHoldService 的 notifyMessageArriving 方法喚醒掛起線程,判斷消費(fèi)隊(duì)列最大偏移量與待拉取偏移量關(guān)系,若前者大于后者則拉取消息。
五. 消息隊(duì)列負(fù)載與重平衡
試想一下一個(gè)消費(fèi)組通常由多個(gè)消費(fèi)者實(shí)例消費(fèi),而一個(gè) ConsumeQueue 同一時(shí)間只能被一個(gè)消費(fèi)者實(shí)例消費(fèi),Topic的多個(gè)隊(duì)列該以何種方式分配給各個(gè)消費(fèi)者實(shí)例呢?在生產(chǎn)環(huán)境中會涉及到節(jié)點(diǎn)的擴(kuò)縮容以及發(fā)布場景下的節(jié)點(diǎn)上下線問題,如果消費(fèi)者實(shí)例數(shù)量發(fā)生變更,如何重新將 ConsumeQueue 重新分配給消費(fèi)者實(shí)例呢?
RocketMQ消息隊(duì)列重新分布是由RebalanceService線程實(shí)現(xiàn)的,一個(gè)MQClientInstance持有一個(gè)RebalanceService實(shí)現(xiàn),并隨著MQClientInstance的啟動而啟動。接下來我們帶著上面的問題,了解一下RebalanceService的run()方法。
// org.apache.rocketmq.client.impl.consumer.RebalanceService#run
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
// 默認(rèn)每20s執(zhí)行一次 doRebalance
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
RebalanceService 線程默認(rèn)每隔 20s 執(zhí)行一次 doRebalance() 方法。可以使用 -Drocketmq.client.rebalance.waitInterval=interval 改變默認(rèn)配置。
// org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance
public void doRebalance() {
// 遍歷消費(fèi)者
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
// 對消費(fèi)者執(zhí)行 doRebalance
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
doRebalance()方法中,會循環(huán)遍歷所有注冊的消費(fèi)者信息,然后執(zhí)行對應(yīng)消費(fèi)組的 doRebalance() 方法:
// org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance
public void doRebalance(final boolean isOrder) {
// 一個(gè)consumerGroup可以同時(shí)消費(fèi)多個(gè)topic,所以此處返回的是一個(gè)Map,key 是 topic 名稱
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
每個(gè) DefaultMQPushConsumerImpl 都持有一個(gè)單獨(dú)的 RebalanceImpl 對象,該方法主要遍歷訂閱信息對每個(gè)主題的隊(duì)列進(jìn)行重新負(fù)載。RebalanceImpl 的 Map<String, SubscriptionData> subTable 在調(diào)用消費(fèi)者 DefaultMQPushConsumerImpl#subscribe 方法時(shí)填充。如果訂閱信息發(fā)生變化,例如調(diào)用了 unsubscribe() 方法,則需要將不關(guān)心的主題消費(fèi)隊(duì)列從 processQueueTable 中移除。接下來重點(diǎn)分析 RebalanceImpl#rebalanceByTopic,了解RocketMQ如何針對單個(gè)主題進(jìn)行消息隊(duì)列重新負(fù)載(以集群模式)。
第一步:從主題訂閱信息緩存表中獲取主題的隊(duì)列信息。發(fā)送請求從Broker中獲取該消費(fèi)組內(nèi)當(dāng)前所有的消費(fèi)者客戶端ID,主題的隊(duì)列可能分布在多個(gè)Broker上,那么請求該發(fā)往哪個(gè)Broker呢? RocketeMQ從主題的路由信息表中隨機(jī)選擇一個(gè)Broker。Broker為什么會存在消費(fèi)組內(nèi)所有消費(fèi)者的信息呢?我們不妨回憶一下,消費(fèi)者在啟動的時(shí)候會向MQClientInstance中注冊消費(fèi)者,然后MQClientInstance會向所有的Broker發(fā)送心跳包,心跳包中包含MQClientInstance的消費(fèi)者信息。如果mqSet、cidAll任意一個(gè)為空,則忽略本次消息隊(duì)列負(fù)載。
// org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
case CLUSTERING: {
// 獲取topic 的隊(duì)列信息
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 發(fā)送請求,從broker中獲取該消費(fèi)組內(nèi)當(dāng)前所有的消費(fèi)者客戶端ID
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
第二步:對cidAll、mqAll進(jìn)行排序。這一步很重要,同一個(gè)消費(fèi)組內(nèi)看到的視圖應(yīng)保持一致,確保同一個(gè)消費(fèi)隊(duì)列不會被多個(gè)消費(fèi)者分配。然后調(diào)用 AllocateMessageQueueStrategy 隊(duì)列負(fù)載策略進(jìn)行隊(duì)列負(fù)載:
// org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
AllocateMessageQueueStrategy 是隊(duì)列分配算法的接口:
/**
* 多個(gè)消費(fèi)者之間分配消息隊(duì)列算法策略
*/
public interface AllocateMessageQueueStrategy {
/**
* Allocating by consumer id
*
* @param consumerGroup current consumer group
* @param currentCID current consumer id
* @param mqAll message queue set in current topic
* @param cidAll consumer set in current consumer group
* @return The allocate result of given strategy
*/
List<MessageQueue> allocate(
final String consumerGroup,
final String currentCID,
final List<MessageQueue> mqAll,
final List<String> cidAll
);
/**
* Algorithm name
*
* @return The strategy name
*/
String getName();
}
RocketMQ默認(rèn)提供5種分配算法:
-
AllocateMessageQueueAveragely:平均分配,推薦使用。
舉例來說,如果現(xiàn)在有8個(gè)消息消費(fèi)隊(duì)列q1、q2、q3、q4、q5、 q6、q7、q8,有3個(gè)消費(fèi)者c1、c2、c3,那么根據(jù)該負(fù)載算法,消息隊(duì)列分配如下。
c1:q1、q2、q3 c2:q4、q5、q6 c3:q7、q8。 -
AllocateMessageQueueAveragelyByCircle:平均輪詢分配,推薦使用。
舉例來說,如果現(xiàn)在有8個(gè)消息消費(fèi)隊(duì)列q1、q2、q3、q4、q5、 q6、q7、q8,有3個(gè)消費(fèi)者c1、c2、c3,那么根據(jù)該負(fù)載算法,消息隊(duì)列分配如下。
c1:q1、q4、q7 c2:q2、q5、q8 c3:q3、q6 -
AllocateMessageQueueConsistentHash:一致性哈希。因?yàn)橄㈥?duì)列負(fù)載信息不容易跟蹤,所以不推薦使用。
-
AllocateMessageQueueByConfig:根據(jù)配置,為每一個(gè)消費(fèi)者配置固定的消息隊(duì)列。
-
AllocateMessageQueueByMachineRoom:根據(jù)Broker部署機(jī)房名,對每個(gè)消費(fèi)者負(fù)責(zé)不同的Broker上的隊(duì)列。
當(dāng)然你也可以擴(kuò)展該接口實(shí)現(xiàn)自己的負(fù)載策略。
消息負(fù)載算法如果沒有特殊的要求,盡量使用 AllocateMessageQueueAveragely、AllocateMessageQueueAveragelyByCircle,這是因?yàn)榉峙渌惴ū容^直觀。消息隊(duì)列分配原則為一個(gè)消費(fèi)者可以分配多個(gè)消息隊(duì)列,但同一個(gè)消息隊(duì)列只會分配給一個(gè)消費(fèi)者,故如果消費(fèi)者個(gè)數(shù)大于消息隊(duì)列數(shù)量,則有些消費(fèi)者無法消費(fèi)消息。
對比消息隊(duì)列是否發(fā)生變化,主要思路是遍歷當(dāng)前負(fù)載隊(duì)列集合,如果隊(duì)列不在新分配隊(duì)列的集合中,需要將該隊(duì)列停止消費(fèi)并保存消費(fèi)進(jìn)度;遍歷已分配的隊(duì)列,如果隊(duì)列不在隊(duì)列負(fù)載表中(processQueueTable),則需要?jiǎng)?chuàng)建該隊(duì)列拉取任務(wù)PullRequest, 然后添加到PullMessageService 線程的 pullRequestQueue中,PullMessageService才會繼續(xù)拉取任務(wù):
第三步:ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable 是當(dāng)前消費(fèi)者負(fù)載的消息隊(duì)列緩存表,如果緩存表中的MessageQueue 不包含在 mqSet 中,說明經(jīng)過本次消息隊(duì)列負(fù)載后,該mq被分配給其他消費(fèi)者,需要暫停該消息隊(duì)列消息的消費(fèi)。方法是將 ProccessQueue 的狀態(tài)設(shè)置為 droped=true,該 ProcessQueue 中的消息將不會再被消費(fèi),調(diào)用 removeUnnecessaryMessageQueue 方法判斷是否將 MessageQueue、ProccessQueue 從緩存表中移除。removeUnnecessaryMessageQueue 在 RebalanceImple 中定義為抽象方法。removeUnnecessaryMessageQueue 方法主要用于持久化待移除 MessageQueue 的消息消費(fèi)進(jìn)度。在推模式下,如果是集群模式并且是順序消息消費(fèi),還需要先解鎖隊(duì)列。關(guān)于順序消息我們在 《RocketMQ順序消息實(shí)現(xiàn)原理》 詳細(xì)介紹。
// org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance
// processQueueTable是當(dāng)前消費(fèi)者負(fù)載的消息隊(duì)列緩存表,如果緩存表中的MessageQueue不包含在mqSet中,說明經(jīng)過本次消息隊(duì)列負(fù)載后,
// 該mq被分配給其他消費(fèi)者,需要暫停該消息隊(duì)列消息的消費(fèi)
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
// 如果緩存表中的MessageQueue不包含在mqSet中,說明經(jīng)過本次消息隊(duì)列負(fù)載后,該mq被分配給其他消費(fèi)者
pq.setDropped(true);
// 持久化消費(fèi)進(jìn)度至broker
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
第四步:遍歷本次負(fù)載分配到的隊(duì)列集合,如果 processQueueTable 中沒有包含該消息隊(duì)列,表明這是本次新增加的消息隊(duì)列,首先從內(nèi)存中移除該消息隊(duì)列的消費(fèi)進(jìn)度,然后從磁盤或者Broker中(根據(jù)消費(fèi)模式)讀取該消息隊(duì)列的消費(fèi)進(jìn)度。然后將新分配隊(duì)列的PullRequest放入 PullMessageService 任務(wù)隊(duì)列中,這樣消費(fèi)者就可以開始拉取消息。
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
// 如果 mqSet 不存在于本地緩存中,則證明是新增加的消息隊(duì)列
// 如果是順序消費(fèi),則需要對queue進(jìn)行加鎖,只有加鎖成功才會創(chuàng)建 PullRequest 拉取broker消息
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
// 從內(nèi)存中移除消息進(jìn)度(臟數(shù)據(jù))
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
// 計(jì)算消息消費(fèi)的起始偏移量
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 將PullRequest 放入 PullMessageService 任務(wù)隊(duì)列中,這樣消費(fèi)者就可以開始拉取消息
this.dispatchPullRequest(pullRequestList);
return changed;
}
小結(jié)
RocketMQ 消息隊(duì)列的負(fù)載與重平衡由 RebalanceService 線程實(shí)現(xiàn),默認(rèn)每隔 20s 執(zhí)行一次 doRebalance 方法。具體過程如下:
- RebalanceService 線程每隔 20s 觸發(fā)一次 doRebalance,遍歷所有注冊的消費(fèi)者信息執(zhí)行對應(yīng)消費(fèi)組的 doRebalance 方法,對每個(gè)主題的隊(duì)列進(jìn)行重新負(fù)載。
- 在 RebalanceImpl#rebalanceByTopic 中,首先從主題訂閱信息緩存表中獲取主題的隊(duì)列信息,并從 Broker 中獲取消費(fèi)組內(nèi)當(dāng)前所有消費(fèi)者客戶端 ID。
- 對消費(fèi)者 ID 和隊(duì)列進(jìn)行排序,然后調(diào)用 AllocateMessageQueueStrategy 隊(duì)列負(fù)載策略進(jìn)行隊(duì)列負(fù)載,RocketMQ 默認(rèn)提供五種分配算法,推薦使用平均分配或平均輪詢分配算法。
- 對比消息隊(duì)列是否發(fā)生變化,若緩存表中的 MessageQueue 不在新分配隊(duì)列集合中,需暫停該隊(duì)列消息消費(fèi)并保存消費(fèi)進(jìn)度;若已分配隊(duì)列不在緩存表中,則創(chuàng)建該隊(duì)列拉取任務(wù)并添加到 PullMessageService 線程任務(wù)隊(duì)列。
- 遍歷本次負(fù)載分配到的隊(duì)列集合,若 processQueueTable 中沒有包含該消息隊(duì)列,表明是新增加的消息隊(duì)列,需讀取消費(fèi)進(jìn)度并將新分配隊(duì)列的 PullRequest 放入 PullMessageService 任務(wù)隊(duì)列以便消費(fèi)者拉取消息。

六. 消息的消費(fèi)過程
我們先回顧一下消息拉取的過程:PullMessageService 負(fù)責(zé)對消息隊(duì)列進(jìn)行消息拉取,從遠(yuǎn)端服務(wù)器拉取消息后存入 ProcessQueue 消息處理隊(duì)列中,然后調(diào)用 ConsumeMessageService#submitConsumeRequest 方法進(jìn)行消息消費(fèi)。 使用線程池消費(fèi)消息,確保了消息拉取與消息消費(fèi)的解耦。RocketMQ 使用 ConsumeMessageService 來實(shí)現(xiàn)消息消費(fèi)的處理邏輯。RocketMQ 支持順序消費(fèi)與并發(fā)消費(fèi),本節(jié)將重點(diǎn)關(guān)注并發(fā)消費(fèi)的流程。ConsumeMessageService 核心方法如下:
- ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName): 直接消費(fèi)消息,主要用于通過管理命令接收消費(fèi)消息。
- void submitConsumeRequest(List msgs, ProcessQueue processQueue, MessageQueue messageQueue, boolean dispathToConsume):提交消息消費(fèi)。
ConsumeMessageConcurrentlyService并發(fā)消息消費(fèi)核心參數(shù)解釋:
- DefaultMQPushConsumerImpl defaultMQPushConsumerImpl: 消息推模式實(shí)現(xiàn)類。
- DefaultMQPushConsumer defaultMQPushConsumer:消費(fèi)者對象。
- MessageListenerConcurrently messageListener:并發(fā)消息業(yè)務(wù)事件類。
- BlockingQueue consumeRequestQueue:消息消費(fèi)任務(wù)隊(duì)列。
- ThreadPoolExecutor consumeExecutor:消息消費(fèi)線程池。
- String consumerGroup:消費(fèi)組。
- ScheduledExecutorService scheduledExecutorService:添加消費(fèi)任務(wù)到consumeExecutor延遲調(diào)度器
- ScheduledExecutorService cleanExpireMsgExecutors:定時(shí)刪除過期消息線程池。為了揭示消息消費(fèi)的完整過程,從服務(wù)器拉取到消息后,回調(diào)
PullCallBack方法,先將消息放入ProccessQueue中, 然后把消息提交到消費(fèi)線程池中執(zhí)行,也就是調(diào)用ConsumeMessageService#submitConsumeRequest開始進(jìn)入消息消費(fèi)的流程。
6.1 消息消費(fèi)
消費(fèi)者消息消費(fèi)服務(wù) ConsumeMessageConcurrentlyService 的主要方法是 submitConsumeRequest 提交消費(fèi)請求
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest
// consumeMessageBatchMaxSize表示消息批次,也就是一次消息消費(fèi)任務(wù)ConsumeRequest中包含的消息條數(shù),默認(rèn)為1。
// msgs.size()默認(rèn)最多為32條消息,受DefaultMQPushConsumer.pullBatchSize屬性控制,如果msgs.size()
// 小于consumeMessage BatchMaxSize,則直接將拉取到的消息放入ConsumeRequest,然后將consumeRequest提交到消息消費(fèi)者線程池中
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
// 如果提交過程中出現(xiàn)拒絕提交異常,則延遲5s再提交
this.submitConsumeRequestLater(consumeRequest);
}
}
第一步:consumeMessageBatchMaxSize 表示消息批次,也就是一次消息消費(fèi)任務(wù) ConsumeRequest 中包含的消息條數(shù),默認(rèn)為1。msgs.size() 默認(rèn)最多為32條消息,受 DefaultMQPushConsumer.pullBatchSize 屬性控制,如果 msgs.size() 小于 consumeMessage BatchMaxSize,則直接將拉取到的消息放入 ConsumeRequest,然后將 consumeRequest 提交到消息消費(fèi)者線程池中。如果提交過程中出現(xiàn)拒絕提交異常,則延遲5s再提交。這里其實(shí)是給出一種標(biāo)準(zhǔn)的拒絕提交實(shí)現(xiàn)方式,實(shí)際上,由于消費(fèi)者線程池使用的任務(wù)隊(duì)列LinkedBlockingQueue 為無界隊(duì)列,故不會出現(xiàn)拒絕提交異常。
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest
// 如果拉取的消息條數(shù)大于consumeMessageBatchMaxSize,則對拉取消息進(jìn)行分頁,每頁
// consumeMessageBatchMaxSize條消息,創(chuàng)建多個(gè)ConsumeRequest任務(wù)并提交到消費(fèi)線程池
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
第二步:如果拉取的消息條數(shù)大于 consumeMessageBatchMaxSize,則對拉取消息進(jìn)行分頁,每頁 consumeMessageBatchMaxSize 條消息,創(chuàng)建多個(gè) ConsumeRequest 任務(wù)并提交到消費(fèi)線程池。
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
第三步:進(jìn)入具體的消息消費(fèi)隊(duì)列時(shí),會先檢查 processQueue 的 dropped,如果設(shè)置為true,則停止該隊(duì)列的消費(fèi)。在進(jìn)行消息重新負(fù)載時(shí),如果該消息隊(duì)列被分配給消費(fèi)組內(nèi)的其他消費(fèi)者,需要將 droped 設(shè)置為true,阻止消費(fèi)者繼續(xù)消費(fèi)不屬于自己的消息隊(duì)列。
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
// 執(zhí)行消息消費(fèi)鉤子函數(shù)ConsumeMessageHook#consumeMessageBefore。通過consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(hook)方法消息消費(fèi)執(zhí)行鉤子函數(shù)
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
第四步:執(zhí)行消息消費(fèi)鉤子函數(shù)。
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
// 消息消費(fèi)開始時(shí)間
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
// 執(zhí)行業(yè)務(wù)代碼消費(fèi)消息
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
// 若出現(xiàn)異常,則設(shè)置未true
hasException = true;
}
// 消息消費(fèi)耗時(shí)
long consumeRT = System.currentTimeMillis() - beginTimestamp;
第六步:執(zhí)行具體的消息消費(fèi),調(diào)用應(yīng)用程序消息監(jiān)聽器的 consumeMessage 方法,進(jìn)入具體的消息消費(fèi)業(yè)務(wù)邏輯,返回該批消息的消費(fèi)結(jié)果,即CONSUME_SUCCESS(消費(fèi)成功)或 RECONSUME_LATER(需要重新消費(fèi))。
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
// 執(zhí)行消息消費(fèi)鉤子函數(shù)ConsumeMessageHook#consumeMessageAfter
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
第七步:執(zhí)行消息消費(fèi)后置鉤子函數(shù)。
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
// 執(zhí)行業(yè)務(wù)消息消費(fèi)后,在處理結(jié)果前再次驗(yàn)證一次ProcessQueue的isDroped狀態(tài)值。如果狀態(tài)值為true,將不對結(jié)果進(jìn)
// 行任何處理。也就是說,在消息消費(fèi)進(jìn)入第四步時(shí),如果因新的消費(fèi)者加入或原先的消費(fèi)者出現(xiàn)宕機(jī),導(dǎo)致原先分配給消費(fèi)者的隊(duì)列在負(fù)
// 載之后分配給了別的消費(fèi)者,那么消息會被重復(fù)消費(fèi)
if (!processQueue.isDropped()) {
// 處理消息消費(fèi)結(jié)果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
第八步:執(zhí)行業(yè)務(wù)消息消費(fèi)后,在處理結(jié)果前再次驗(yàn)證一次 ProcessQueue 的 isDroped 狀態(tài)值。如果狀態(tài)值為true,將不對結(jié)果進(jìn)行任何處理。也就是說,在消息消費(fèi)進(jìn)入第四步時(shí),如果因新的消費(fèi)者加入或原先的消費(fèi)者出現(xiàn)宕機(jī),導(dǎo)致原先分配給消費(fèi)者的隊(duì)列在負(fù)載之后分配給了別的消費(fèi)者,那么消息會被重復(fù)消費(fèi)。
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
// 根據(jù)消息監(jiān)聽器返回的結(jié)果計(jì)算ackIndex
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
// 如果返回CONSUME_SUCCESS,則將ackIndex設(shè)置為msgs.size()-1,這樣在后面就不會執(zhí)行 sendMessageBack,將消息重新
// 發(fā)送至broker retry隊(duì)列中去嘗試重新消費(fèi)該消息。
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
// 如果返回 RECONSUME_LATER,則將ackIndex設(shè)置為-1。這樣就會將這一批消息全部發(fā)送至broker retry topic中,然后消費(fèi)者就能重新消費(fèi)到這一批消息
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
第九步:根據(jù)消息監(jiān)聽器返回的結(jié)果計(jì)算 ackIndex,如果返回 CONSUME_SUCCESS,則將 ackIndex 設(shè)置為 msgs.size()-1,如果返回RECONSUME_LATER ,則將 ackIndex 設(shè)置為 -1,這是為下文消息ACK做準(zhǔn)備。
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// 將消息重新發(fā)送至broker的 retry topic中,
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
// 消息確認(rèn)失敗,則五秒后重新消費(fèi)消息
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
第十步:如果是廣播模式,業(yè)務(wù)代碼返回 RECONSUME_LATER,消息并不會被重新消費(fèi),而是以警告級別輸出到日志文件中。
如果是集群模式,消息消費(fèi)成功,因?yàn)?ackIndex=consumeRequest.getMsgs().size()-1,所以 i=ackIndex+1 等于consumeRequest.getMsgs().size(),并不會執(zhí)行 sendMessageBack。 只有在業(yè)務(wù)代碼返回 RECONSUME_LATER 時(shí),該批消息都需要發(fā)送至Broker的重試隊(duì)列中,如果消息發(fā)送失敗,則直接將本次發(fā)送失敗的消息再次封裝為ConsumeRequest,然后延遲5s重新消費(fèi)。如果ACK消息發(fā)送成功,則該消息會延遲消費(fèi)。
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
// 從 processQueue中移除已確認(rèn)消息,返回的偏移量是移除該批消息后最小的偏移量。
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
// 然后更新已消費(fèi)的offset,以便消費(fèi)者重啟后能從上一次的消費(fèi)進(jìn)度開始消費(fèi)
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
第十一步:從 ProcessQueue 中移除這批消息,這里返回的偏移量是移除該批消息后內(nèi)存中正在處理的消息的最小的偏移量。然后用該偏移量更新消息消費(fèi)進(jìn)度,以便消費(fèi)者重啟后能從上一次的消費(fèi)進(jìn)度開始消費(fèi),避免消息重復(fù)消費(fèi)。值得注意的是,當(dāng)消息監(jiān)聽器返回 RECONSUME_LATER 時(shí),消息消費(fèi)進(jìn)度也會向前推進(jìn),并用 ProcessQueue 中最小的隊(duì)列偏移量調(diào)用消息消費(fèi)進(jìn)度存儲器 OffsetStore 更新消費(fèi)進(jìn)度。這是因?yàn)楫?dāng)返回 RECONSUME_LATER 時(shí),RocketMQ 會創(chuàng)建一條與原消息屬性相同的消息,擁有一個(gè)唯一的新 msgId,并存儲原消息ID,該消息會存入 CommitLog 文件,與原消息沒有任何關(guān)聯(lián),所以該消息也會進(jìn)入 ConsuemeQueue, 并擁有一個(gè)全新的隊(duì)列偏移量。
為啥會使用內(nèi)存中剩余消息最小偏移量更新消費(fèi)進(jìn)度,這是因?yàn)椴l(fā)消費(fèi)模式下,不同消息的消費(fèi)完成無法保證順序。例如按照順序拉取到了4條消息 a,b,c,d,由于是并發(fā)消費(fèi),這四條消息可能被消費(fèi)者線程同時(shí)消費(fèi),假設(shè)消息d先消費(fèi)完成,此時(shí)更新消費(fèi)進(jìn)度,因?yàn)閍、b、c沒有消費(fèi)完成,不能將進(jìn)度更新為消息d的offset,而是將消息d從
ProcessQueue中移除,移除后內(nèi)存只剩下 a、b、c 三條消息,此時(shí)會將消費(fèi)進(jìn)度更新為ProcessQueue中最小的消息偏移量,也就是 a。
6.2 消息消費(fèi)失敗重試機(jī)制
如果消息監(jiān)聽器返回的消費(fèi)結(jié)果為 RECONSUME_LATER,則需要將這些消息發(fā)送給Broker的重試topic中。如果客戶端發(fā)送重試消息至Broker失敗,將延遲5s后提交線程池進(jìn)行消費(fèi)。
重試消息發(fā)送的網(wǎng)絡(luò)客戶端入口為 MQClientAPIImpl#consumerSendMessageBack,命令編碼為 RequestCode.CONSUMER_SEND_MSG_BACK。
ConsumerSendMsgBackRequestHeader 的核心屬性:
public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader {
/**
* 消息物理偏移量,因?yàn)樾枰卦嚨南⒃贐roker中本來就有,所以發(fā)送重試消息只需要發(fā)送消息的物理偏移量即可
*/
@CFNotNull
private Long offset;
/**
* 消費(fèi)組名
*/
@CFNotNull
private String group;
/**
* 延遲級別。RcketMQ不支持精確的定時(shí)消息調(diào)度,而是提供幾個(gè)延時(shí)級別,MessageStoreConfig#messageDelayLevel = "1s 5s 10s 30s 1m 2m
* 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",delayLevel=1,表示延遲5s,delayLevel=2,表示延遲10s。
*/
@CFNotNull
private Integer delayLevel;
/**
* 原消息的消息ID
*/
private String originMsgId;
/**
* 原消息的topic
*/
private String originTopic;
@CFNullable
private boolean unitMode = false;
/**
* 最大重新消費(fèi)次數(shù),默認(rèn)16次。
*/
private Integer maxReconsumeTimes;
}
客戶端以同步方式發(fā)送 RequestCode.CONSUMER_SEND 到服務(wù)端。服務(wù)端命令處理器為 org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
// org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
// 獲取消費(fèi)組的訂閱配置信息
// 獲取消費(fèi)組的訂閱配置信息
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return response;
}
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
return response;
}
if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
第一步:獲取消費(fèi)組的訂閱配置信息,如果配置信息為空,返回配置組信息不存在錯(cuò)誤,如果重試隊(duì)列數(shù)量小于等于0,則直接返回成功,說明該消費(fèi)組不支持重試。
我們先逐一介紹 SubscriptionGroupConfig 的核心屬性:
public class SubscriptionGroupConfig {
/**
* 消費(fèi)組名
*/
private String groupName;
/**
* 是否可以消費(fèi),默認(rèn)為true,如果consumeEnable=false,該消費(fèi)組無法拉取消息,因而無法消費(fèi)消息
*/
private boolean consumeEnable = true;
/**
* 是否允許從隊(duì)列最小偏移量開始消費(fèi),默認(rèn)為true,目前未使用該參數(shù)
*/
private boolean consumeFromMinEnable = true;
/**
* 設(shè)置該消費(fèi)組是否能以廣播模式消費(fèi),默認(rèn)為true,如果設(shè)置為false,表示只能以集群模式消費(fèi)
*/
private boolean consumeBroadcastEnable = true;
/**
* 重試隊(duì)列個(gè)數(shù),默認(rèn)為1,每一個(gè)Broker上有一個(gè)重試隊(duì)列
*/
private int retryQueueNums = 1;
/**
* 消息最大重試次數(shù),默認(rèn)16次
*/
private int retryMaxTimes = 16;
/**
* 主節(jié)點(diǎn)ID
*/
private long brokerId = MixAll.MASTER_ID;
/**
* 如果消息堵塞(主節(jié)點(diǎn)),將轉(zhuǎn)向該brokerId的服務(wù)器上拉取消息,默認(rèn)為1
*/
private long whichBrokerWhenConsumeSlowly = 1;
/**
* 當(dāng)消費(fèi)發(fā)生變化時(shí),是否
* 立即進(jìn)行消息隊(duì)列重新負(fù)載。消費(fèi)組訂閱信息配置信息存儲在Broker
* 的 ${ROCKET_HOME}/store/config/subscriptionGroup.json中。
* BrokerConfig.autoCreateSubscriptionGroup默認(rèn)為true,表示在第
* 一次使用消費(fèi)組配置信息時(shí)如果不存在消費(fèi)組,則使用上述默認(rèn)值自
* 動創(chuàng)建一個(gè),如果為false,則只能通過客戶端命令mqadmin
* updateSubGroup創(chuàng)建消費(fèi)組后再修改相關(guān)參數(shù)
*/
private boolean notifyConsumerIdsChangedEnable = true;
}
回到 consumerSendMsgBack 方法:
// org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
// 創(chuàng)建重試主題,重試主題名稱為%RETRY%+消費(fèi)組名稱,從重試隊(duì)列中隨機(jī)選擇一個(gè)隊(duì)列,并構(gòu)建TopicConfig主題配置信息
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
// 如果沒有重試主題則創(chuàng)建一個(gè)
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
if (!PermName.isWriteable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
return response;
}
第二步:創(chuàng)建重試主題,重試主題名稱為%RETRY%+消費(fèi)組名稱,從重試隊(duì)列中隨機(jī)選擇一個(gè)隊(duì)列,并構(gòu)建 TopicConfig 主題配置信息。
// org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
// 根據(jù)消息物理偏移量從CommitLog中獲取消息
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return response;
}
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
// 將消息的主題信息存入屬性
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
第三步:根據(jù)消息物理偏移量從CommitLog文件中獲取消息,因?yàn)樾枰卦嚨南⒃贐roker中本來就有,所以發(fā)送重試消息只發(fā)送消息的物理偏移量并沒有發(fā)送消息內(nèi)容。同時(shí)將消息的主題存入屬性。
// org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
int delayLevel = requestHeader.getDelayLevel();
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
// 設(shè)置消息重試次數(shù),如果消息重試次數(shù)已超過maxReconsumeTimes,再次改變newTopic主題為DLQ("%DLQ%"),該主
// 題的權(quán)限為只寫,說明消息一旦進(jìn)入DLQ隊(duì)列,RocketMQ將不負(fù)責(zé)再次調(diào)度消費(fèi)了,需要人工干預(yù)
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
} else {
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
// 設(shè)置延時(shí)級別
msgExt.setDelayTimeLevel(delayLevel);
}
第四步:設(shè)置消息重試次數(shù),如果消息重試次數(shù)已超過 maxReconsumeTimes,再次改變 newTopic 主題為DLQ("%DLQ%")也就是死信隊(duì)列,該主題的權(quán)限為只寫,說明消息一旦進(jìn)入DLQ隊(duì)列,RocketMQ將不負(fù)責(zé)再次調(diào)度消費(fèi)了,需要人工干預(yù)。
// org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
// 根據(jù)原先的消息創(chuàng)建一個(gè)新的消息對象,重試消息會擁有一個(gè)唯一消息ID(msgId)并存入CommitLog文件。這里不會更新原
// 先的消息,而是會將原先的主題、消息ID存入消息屬性,主題名稱為重試主題,其他屬性與原消息保持一致。
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
// 將重試的消息放入重試topic,或則死信topic
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
// 原始的消息ID
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
// 設(shè)置原始的消息ID
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
第五步:根據(jù)原先的消息創(chuàng)建一個(gè)新的消息對象,重試消息會擁有一個(gè)唯一消息ID(msgId)并存入CommitLog文件。這里不會更新原先的消息,而是會將原先的主題、消息ID存入消息屬性,主題名稱為重試主題,其他屬性與原消息保持一致。
// org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
// 將重試的消息寫入重試topic,或則死信topic。根據(jù) newTopic 變量決定寫入哪個(gè)topic
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
if (putMessageResult != null) {
switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(putMessageResult.getPutMessageStatus().name());
return response;
}
第六步:將消息存入 CommitLog 文件。這里想再重點(diǎn)突出消息重試機(jī)制,該機(jī)制的實(shí)現(xiàn)依托于RocketMQ延時(shí)消息機(jī)制。在第四步,會設(shè)置消息的延時(shí)級別,設(shè)置延時(shí)級別后消息實(shí)際上并不會直接發(fā)送至重試topic中(SCHEDULE_TOPIC_XXXX),而是先發(fā)送至延時(shí)隊(duì)列中,當(dāng)延時(shí)到期后才會由定時(shí)任務(wù)將消息發(fā)送回真正的重試隊(duì)列中,此時(shí)就能由客戶端消費(fèi)到重試隊(duì)列中的消息了。延時(shí)消息具體原理可參考:RocketMQ延遲消息
6.3 消費(fèi)進(jìn)度管理
消息消費(fèi)者在消費(fèi)一批消息后,需要記錄該批消息已經(jīng)消費(fèi)完畢,否則當(dāng)消費(fèi)者重新啟動時(shí),又要從消息消費(fèi)隊(duì)列最開始消費(fèi)。從6.1節(jié)也可以看到,一次消息消費(fèi)后會從 ProcessQueue 處理隊(duì)列中移除該批消息,返回 ProcessQueue 內(nèi)存中正在處理消息的最小偏移量,并存入消息進(jìn)度表。 那么消息進(jìn)度文件存儲在哪里合適呢?
1)廣播模式:同一個(gè)消費(fèi)組的所有消息消費(fèi)者都需要消費(fèi)主題下的所有消息,也就是同組內(nèi)消費(fèi)者的消息消費(fèi)行為是對立的,互相不影響,故消息進(jìn)度需要獨(dú)立存儲,最理想的存儲地方應(yīng)該是與消費(fèi)者綁定。
2)集群模式:同一個(gè)消費(fèi)組內(nèi)的所有消息消費(fèi)者共享消息主題下的所有消息,同一條消息(同一個(gè)消息消費(fèi)隊(duì)列)在同一時(shí)間只會被消費(fèi)組內(nèi)的一個(gè)消費(fèi)者消費(fèi),并且隨著消費(fèi)隊(duì)列的動態(tài)變化而重新負(fù)載,因此消費(fèi)進(jìn)度需要保存在每個(gè)消費(fèi)者都能訪問到的地方。
RocketMQ消息消費(fèi)進(jìn)度接口如下:
/**
* Offset store interface
*/
public interface OffsetStore {
/**
* 從消息進(jìn)度存儲文件加載消息進(jìn)度到內(nèi)存
* Load
*/
void load() throws MQClientException;
/**
* 更新內(nèi)存中的消息消費(fèi)進(jìn)度
* @param mq 消息消費(fèi)隊(duì)列
* @param offset 消息消費(fèi)偏移量
* @param increaseOnly true表示offset必須大于內(nèi)存中當(dāng)前的消費(fèi)偏移量才更新
*/
void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);
/**
* 讀取消息消費(fèi)進(jìn)度
* @param mq 消息消費(fèi)隊(duì)列
* @param type 讀取方式,可選值包括
* READ_FROM_MEMORY,即從內(nèi)存中讀取,READ_FROM_STORE,即從磁盤中讀取,MEMORY_FIRST_THEN_STORE,即先從內(nèi)存中讀取,再從磁盤中讀取
* @return
*/
long readOffset(final MessageQueue mq, final ReadOffsetType type);
/**
* 持久化指定消息隊(duì)列進(jìn)度到磁盤
* @param mqs
*/
void persistAll(final Set<MessageQueue> mqs);
/**
* Persist the offset,may be in local storage or remote name server
*/
void persist(final MessageQueue mq);
/**
* 將消息隊(duì)列的消息消費(fèi)進(jìn)度從內(nèi)存中移除。
* @param mq
*/
void removeOffset(MessageQueue mq);
/**
* 復(fù)制該主題下所有消息隊(duì)列的消息消費(fèi)進(jìn)度。
* @param topic
* @return
*/
Map<MessageQueue, Long> cloneOffsetTable(String topic);
/**
* 使用集群模式更新存儲在Broker端的消息消費(fèi)進(jìn)度
* @param mq
* @param offset
* @param isOneway
*/
void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;
}
6.3.1 廣播模式消費(fèi)進(jìn)度存儲
廣播模式消息消費(fèi)進(jìn)度存儲在消費(fèi)者本地,其實(shí)現(xiàn)類為 LocalFileOffsetStore:
public class LocalFileOffsetStore implements OffsetStore {
/**
* 消息進(jìn)度存儲目錄
*/
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
"rocketmq.client.localOffsetStoreDir",
System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
private final static InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory;
private final String groupName;
/**
* 消息進(jìn)度存儲文件LOCAL_OFFSET_STORE_DIR/.rocketmq_offsets/{mQClientFactory.getClientId()}/groupName/offsets.json
*/
private final String storePath;
/**
* 消息消費(fèi)進(jìn)度(內(nèi)存)
*/
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>();
}
下面對 LocalFileOffsetStore 核心方法進(jìn)行簡單介紹,load方法用于從磁盤加載消息消費(fèi)進(jìn)度:
// org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore#load
@Override
public void load() throws MQClientException {
// OffsetSerializeWrapper內(nèi)部就是ConcurrentMap<MessageQueue,AtomicLong>offsetTable數(shù)據(jù)結(jié)構(gòu)的封裝,readLocalOffset方法首先
// 從storePath中嘗試加載內(nèi)容,如果讀取的內(nèi)容為空,嘗試從storePath+".bak"中加載,如果還是未找到內(nèi)容,則返回null。
OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
log.info("load consumer's offset, {} {} {}",
this.groupName,
mq,
offset.get());
}
}
}
OffsetSerializeWrapper 內(nèi)部就是 ConcurrentMap<MessageQueue, AtomicLong> offsetTable 數(shù)據(jù)結(jié)構(gòu)的封裝,readLocalOffset 方法首先從 storePath 中嘗試加載內(nèi)容,如果讀取的內(nèi)容為空,嘗試從 storePath+".bak" 中加載,如果還是未找到內(nèi)容,則返回null。
廣播消息消費(fèi)進(jìn)度默認(rèn)存儲在:

消息進(jìn)度文件存儲內(nèi)容:
{
"offsetTable":{{
"brokerName":"broker-a",
"queueId":2,
"topic":"TopicTest"
}:333,{
"brokerName":"broker-a",
"queueId":1,
"topic":"TopicTest"
}:335,{
"brokerName":"broker-a",
"queueId":0,
"topic":"TopicTest"
}:335
}
}
persistAll方法用于將內(nèi)存中的消費(fèi)進(jìn)度持久化到磁盤中:
// org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore#persistAll
@Override
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;
OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
if (mqs.contains(entry.getKey())) {
AtomicLong offset = entry.getValue();
offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
}
}
String jsonString = offsetSerializeWrapper.toJson(true);
if (jsonString != null) {
try {
// 將內(nèi)存中的廣播消費(fèi)進(jìn)度存儲至磁盤
MixAll.string2File(jsonString, this.storePath);
} catch (IOException e) {
log.error("persistAll consumer offset Exception, " + this.storePath, e);
}
}
}
持久化消息進(jìn)度就是將 ConcurrentMap<MessageQueue, AtomicLong> offsetTable 序列化到磁盤文件中。在 MQClientInstance 中會啟動一個(gè)定時(shí)任務(wù),默認(rèn)每5s持久化消息消費(fèi)進(jìn)度一次,可通過 persistConsumerOffsetInterval 進(jìn)行設(shè)置。
// org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
對廣播模式的消息消費(fèi)進(jìn)度進(jìn)行存儲、更新、持久化還是比較容易的,本文就簡單介紹到這里,接下來重點(diǎn)分析集群模式下的消息進(jìn)度管理。
6.3.2 集群模式消費(fèi)進(jìn)度存儲
集群模式消息進(jìn)度存儲文件存放在消息服務(wù)端。消息消費(fèi)進(jìn)度集群模式實(shí)現(xiàn)類 RemoteBrokerOffsetStore。
集群模式下消息消費(fèi)進(jìn)度的讀取、持久化與廣播模式的實(shí)現(xiàn)細(xì)節(jié)差不多,如果是集群消費(fèi)模式,當(dāng) RebalanceService 將 MessageQueue 負(fù)載到當(dāng)前客戶端時(shí),會調(diào)用 org.apache.rocketmq.client.impl.consumer.RebalancePushImpl#computePullFromWhere 方法從broker中獲取當(dāng)前 MessageQueue 的消費(fèi)進(jìn)度:
// org.apache.rocketmq.client.impl.consumer.RebalancePushImpl#computePullFromWhere
@Override
public long computePullFromWhere(MessageQueue mq) {
long result = -1;
final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
switch (consumeFromWhere) {
case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
case CONSUME_FROM_MIN_OFFSET:
case CONSUME_FROM_MAX_OFFSET:
case CONSUME_FROM_LAST_OFFSET: {
// 集群模式下獲取broker中的消費(fèi)進(jìn)度、廣播模式下從本地讀取消費(fèi)進(jìn)度
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
}
// First start,no offset
else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
result = 0L;
} else {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
....
}
而該方法內(nèi)會調(diào)用 RemoteBrokerOffsetStore#readOffset 方法從broker獲取消費(fèi)進(jìn)度:
// org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#readOffset
@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: {
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
return offset.get();
} else if (ReadOffsetType.READ_FROM_MEMORY == type) {
return -1;
}
}
case READ_FROM_STORE: {
try {
// 從broker獲取指定MessageQueue當(dāng)前消費(fèi)組的消費(fèi)進(jìn)度
long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
AtomicLong offset = new AtomicLong(brokerOffset);
// 更新內(nèi)存中的消費(fèi)進(jìn)度
this.updateOffset(mq, offset.get(), false);
return brokerOffset;
}
// No offset in broker
catch (MQBrokerException e) {
return -1;
}
//Other exceptions
catch (Exception e) {
log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
return -2;
}
}
default:
break;
}
}
return -1;
}
RemoteBrokerOffsetStore 會把從broker查詢到的消費(fèi)進(jìn)度進(jìn)行緩存,其余時(shí)間客戶端查詢和更新消費(fèi)進(jìn)度都是基于內(nèi)存中的數(shù)據(jù),而消費(fèi)進(jìn)度的持久化同樣是通過定時(shí)任務(wù)(默認(rèn)每5s)調(diào)用 RemoteBrokerOffsetStore#persistAll 將內(nèi)存中的消費(fèi)進(jìn)度通過網(wǎng)絡(luò)請求寫入broker磁盤。
broker處理客戶端消費(fèi)進(jìn)度更新請求的處理類是 ConsumerManageProcessor#updateConsumerOffset:
// org.apache.rocketmq.broker.processor.ConsumerManageProcessor#updateConsumerOffset
private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
final UpdateConsumerOffsetRequestHeader requestHeader =
(UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
// 更新broker中的消費(fèi)進(jìn)度
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
更新的請求中,主要包含如下信息:
public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader {
/**
* 更新的消費(fèi)組
*/
@CFNotNull
private String consumerGroup;
/**
* 主題名稱
*/
@CFNotNull
private String topic;
/**
* 隊(duì)列ID
*/
@CFNotNull
private Integer queueId;
/**
* 更新的消費(fèi)進(jìn)度
*/
@CFNotNull
private Long commitOffset;
}
updateConsumerOffset 方法最終會調(diào)用下列方法更新broker內(nèi)存中的消費(fèi)進(jìn)度:
// org.apache.rocketmq.broker.offset.ConsumerOffse消費(fèi)進(jìn)度文件內(nèi)容tManager#commitOffset(java.lang.String, java.lang.String, int, long)
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
final long offset) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
this.commitOffset(clientHost, key, queueId, offset);
}
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
if (null == map) {
map = new ConcurrentHashMap<Integer, Long>(32);
map.put(queueId, offset);
this.offsetTable.put(key, map);
} else {
Long storeOffset = map.put(queueId, offset);
if (storeOffset != null && offset < storeOffset) {
log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
}
}
}
broker存儲消費(fèi)進(jìn)度的思路和客戶端類似,先將消費(fèi)進(jìn)度存儲在內(nèi)存中,然后通過JOB每隔10s調(diào)用 ConfigManager#persist() 刷盤,broker在啟動時(shí)會創(chuàng)建這個(gè)JOB:org.apache.rocketmq.broker.BrokerController#initialize()
// org.apache.rocketmq.broker.offset.ConsumerOffsetManager#persist
public synchronized void persist() {
String jsonString = this.encode(true);
if (jsonString != null) {
String fileName = this.configFilePath();
try {
MixAll.string2File(jsonString, fileName);
} catch (IOException e) {
log.error("persist file " + fileName + " exception", e);
}
}
}
消費(fèi)進(jìn)度文件存儲目錄在broker配置文件中配置:消費(fèi)進(jìn)度文件內(nèi)容

消費(fèi)進(jìn)度文件內(nèi)容:
{
"offsetTable":{
// Topic名稱@ConsumeGroup名稱 : 消費(fèi)進(jìn)度
"TopicTest@please_rename_unique_group_name_4":{
// queueId : 消費(fèi)進(jìn)度
0:1,
1:0,
2:0,
3:0
},
"TopicTest@TopicTest-Consumer":{0:335,1:335,2:333
},
"%RETRY%TopicTest-Consumer@TopicTest-Consumer":{0:0
},
"%RETRY%rocket-mq-consumer-demo@rocket-mq-consumer-demo":{0:2
},
"%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
},
"rocket-mq-topic@rocket-mq-consumer-demo":{0:0,1:0,2:1,3:1
}
}
}
RocketMQ 控制臺消費(fèi)進(jìn)度查詢:

小結(jié)
客戶端拉取消費(fèi)進(jìn)度與消費(fèi)進(jìn)度的持久化:
集群消費(fèi)模式,當(dāng) RebalanceService 將 MessageQueue 負(fù)載到當(dāng)前客戶端時(shí),會調(diào)用RebalancePushImpl#computePullFromWhere方法從broker中獲取當(dāng)前 MessageQueue 的消費(fèi)進(jìn)度 RemoteBrokerOffsetStore 會把從broker查詢到的消費(fèi)進(jìn)度進(jìn)行緩存,其余時(shí)間客戶端查詢和更新消費(fèi)進(jìn)度都是基于內(nèi)存中的數(shù)據(jù),而消費(fèi)進(jìn)度的持久化同樣是通過定時(shí)任務(wù)(默認(rèn)每5s)調(diào)用 RemoteBrokerOffsetStore#persistAll 將內(nèi)存中的消費(fèi)進(jìn)度通過網(wǎng)絡(luò)請求寫入broker磁盤。
客戶端何時(shí)更新內(nèi)存中的消費(fèi)進(jìn)度:
業(yè)務(wù)代碼消費(fèi) PullMessageService 拉取到的消息時(shí),不管是消費(fèi)成功還是消費(fèi)失敗,都會將消費(fèi)完的消息從 ProcessQueue 移除,并獲取移除后內(nèi)存中剩余消息的最小偏移量,并已該最小偏移量更新客戶端內(nèi)存中的消費(fèi)進(jìn)度。
為啥會使用內(nèi)存中剩余消息最小偏移量更新消費(fèi)進(jìn)度,這是因?yàn)椴l(fā)消費(fèi)模式下,不同消息的消費(fèi)完成無法保證順序。例如按照順序拉取到了4條消息 a,b,c,d,由于是并發(fā)消費(fèi),這四條消息可能被消費(fèi)者線程同時(shí)消費(fèi),假設(shè)消息d先消費(fèi)完成,此時(shí)更新消費(fèi)進(jìn)度,因?yàn)閍、b、c沒有消費(fèi)完成,不能將進(jìn)度更新為消息d的offset,而是將消息d從 ProcessQueue 中移除,移除后內(nèi)存只剩下 a、b、c 三條消息,此時(shí)會將消費(fèi)進(jìn)度更新為ProcessQueue 中最小的消息偏移量,也就是 a。
客戶端何時(shí)更新broker消費(fèi)進(jìn)度:
除了定時(shí)任務(wù)上報(bào)消費(fèi)進(jìn)度,消費(fèi)者client每次拉取消息時(shí)都會在請求頭中攜帶其本地的消費(fèi)進(jìn)度,broker收到拉取請求后會調(diào)用 ConsumerOffsetManager#commitOffset 方法更新broker的消費(fèi)進(jìn)度。
- 消費(fèi)者Client攜帶本地消費(fèi)進(jìn)度拉取消息的入口:
DefaultMQPushConsumerImpl#pullMessage
- broker處理拉取請求,并存儲消費(fèi)進(jìn)度的入口:
PullMessageProcessor#processRequest
broker存儲消費(fèi)進(jìn)度的思路和客戶端類似,先將消費(fèi)進(jìn)度存儲在內(nèi)存中,然后通過JOB每隔10s調(diào)用
ConfigManager#persist() 刷盤,broker在啟動時(shí)會創(chuàng)建這個(gè)JOB:BrokerController#initialize()。
七. 總結(jié)

該文章主要介紹了 RocketMQ 客戶端消息消費(fèi)的原理,包括消息消費(fèi)模型、消費(fèi)者啟動流程、消息拉取、消息隊(duì)列負(fù)載與重平衡以及消息的消費(fèi)過程等內(nèi)容,具體總結(jié)如下:
-
消息消費(fèi)模型:
- 兩種消費(fèi)模式:廣播模式與集群模式,廣播模式下每一個(gè)消費(fèi)者需要拉取訂閱主題下所有消費(fèi)隊(duì)列的消息,集群模式下同一個(gè)消費(fèi)組內(nèi)的多個(gè)消息消費(fèi)者共享消息主題下的所有消息。
- 并發(fā)消費(fèi)模型:RocketMQ 客戶端為每一個(gè)消費(fèi)組創(chuàng)建獨(dú)立的消費(fèi)線程池,單個(gè)消費(fèi)組內(nèi)的并發(fā)度為線程池線程個(gè)數(shù),線程池處理一批消息后會向 Broker 匯報(bào)消息消費(fèi)進(jìn)度。
- 消息消費(fèi)進(jìn)度反饋機(jī)制:消費(fèi)線程池在處理完一批消息后,會將消息消費(fèi)進(jìn)度存儲在本地內(nèi)存中,客戶端會啟動一個(gè)定時(shí)線程,每 5s 將存儲在本地內(nèi)存中的所有隊(duì)列消息消費(fèi)偏移量提交到 Broker 中,Broker 收到的消息消費(fèi)進(jìn)度會存儲在內(nèi)存中,每隔 5s 將消息消費(fèi)偏移量持久化到磁盤文件中。
-
消費(fèi)者啟動流程:
- 構(gòu)建主題訂閱信息:將主題訂閱信息構(gòu)建為 SubscriptionData 并加入 RebalanceImpl 的訂閱消息中。
- 初始化相關(guān)組件:初始化 MQClientInstance、RebalanceImple 等,初始化消息進(jìn)度,根據(jù)消費(fèi)模式選擇不同的消息進(jìn)度存儲方式。
- 創(chuàng)建消費(fèi)線程服務(wù):根據(jù)消費(fèi)模式創(chuàng)建相應(yīng)的消費(fèi)線程服務(wù),如順序消費(fèi)創(chuàng)建 ConsumeMessageOrderlyService,并發(fā)消費(fèi)創(chuàng)建 ConsumeMessageConcurrentlyService。
- 向 MQClientInstance 注冊消費(fèi)者并啟動:向 MQClientInstance 注冊消費(fèi)者,啟動 MQClientInstance。
-
消息拉?。?/p>
- PullMessageService 實(shí)現(xiàn)機(jī)制:PullMessageService 繼承 ServiceThread,通過 run () 方法從 pullRequestQueue 中獲取 PullRequest 消息拉取任務(wù),然后調(diào)用 pullMessage 方法進(jìn)行消息拉取。
- ProcessQueue 實(shí)現(xiàn)機(jī)制:ProcessQueue 是 MessageQueue 在消費(fèi)端的重現(xiàn)、快照,用于存儲從 Broker 拉取的消息,PullMessageService 將消息提交到消費(fèi)者消費(fèi)線程池,消息成功消費(fèi)后,再從 ProcessQueue 中移除。
- 消息拉取的基本流程:
- 客戶端封裝消息拉取請求:包括獲取 ProcessQueue、進(jìn)行消息拉取流控、獲取主題訂閱信息、構(gòu)建消息拉取系統(tǒng)標(biāo)記、調(diào)用 PullAPIWrapper.pullKernelImpl 方法與服務(wù)端交互等步驟。
- Broker 組裝消息:根據(jù)訂閱信息構(gòu)建消息過濾器,調(diào)用 MessageStore.getMessage 查找消息,根據(jù)主題名稱與隊(duì)列編號獲取消息消費(fèi)隊(duì)列,校對下一次拉取偏移量,組裝消息并返回。
- 消息拉取客戶端處理消息:根據(jù)響應(yīng)結(jié)果解碼成 PullResultExt 對象,對消息進(jìn)行過濾,將消息存入 processQueue,然后將拉取到的消息提交到 ConsumeMessageService 中供消費(fèi)者消費(fèi)。
- 消息拉取長輪詢機(jī)制分析:RocketMQ 通過在 Broker 端配置 longPollingEnable 為 true 來開啟長輪詢模式,Broker 在處理消息拉取請求時(shí),如果消息未找到且 brokerAllowSuspend 為 true 且開啟了長輪詢,會設(shè)置掛起超時(shí)時(shí)間,創(chuàng)建 PullRequest 并提交到 PullRequestHoldService 線程中,PullRequestHoldService 線程每 5 秒掃描所有被 hold 住的長輪詢請求,檢查是否有新消息到達(dá)并返回,DefaultMessageStore#ReputMessageService 線程在將 CommitLog 消息轉(zhuǎn)發(fā)到 ConsumeQueue 文件時(shí),若 Broker 端開啟長輪詢且當(dāng)前節(jié)點(diǎn)為主節(jié)點(diǎn),則調(diào)用 PullRequestHoldService 的 notifyMessageArriving 方法喚醒掛起線程,判斷消費(fèi)隊(duì)列最大偏移量與待拉取偏移量關(guān)系,若前者大于后者則拉取消息。
-
消息隊(duì)列負(fù)載與重平衡:
- RebalanceService 線程實(shí)現(xiàn):默認(rèn)每隔 20s 執(zhí)行一次 doRebalance 方法,遍歷所有注冊的消費(fèi)者信息,執(zhí)行對應(yīng)消費(fèi)組的 doRebalance 方法,對每個(gè)主題的隊(duì)列進(jìn)行重新負(fù)載。
- RebalanceImpl.rebalanceByTopic 實(shí)現(xiàn):從主題訂閱信息緩存表中獲取主題的隊(duì)列信息,從 Broker 中獲取消費(fèi)組內(nèi)當(dāng)前所有消費(fèi)者客戶端 ID,對消費(fèi)者 ID 和隊(duì)列進(jìn)行排序,調(diào)用 AllocateMessageQueueStrategy 隊(duì)列負(fù)載策略進(jìn)行隊(duì)列負(fù)載,對比消息隊(duì)列是否發(fā)生變化,暫停不屬于當(dāng)前消費(fèi)者的隊(duì)列消息消費(fèi)并保存消費(fèi)進(jìn)度,創(chuàng)建新分配隊(duì)列的 PullRequest 并添加到 PullMessageService 線程任務(wù)隊(duì)列。
-
消息的消費(fèi)過程:
- 消息消費(fèi):消費(fèi)者消息消費(fèi)服務(wù) ConsumeMessageConcurrentlyService 的主要方法是 submitConsumeRequest 提交消費(fèi)請求,根據(jù)消息數(shù)量和 consumeMessageBatchMaxSize 進(jìn)行處理,將消息放入 ConsumeRequest 并提交到消費(fèi)線程池。進(jìn)入具體消費(fèi)隊(duì)列時(shí),會檢查 processQueue 的 dropped 狀態(tài),執(zhí)行消息消費(fèi)鉤子函數(shù),執(zhí)行業(yè)務(wù)代碼消費(fèi)消息,根據(jù)消費(fèi)結(jié)果計(jì)算 ackIndex,處理消息消費(fèi)結(jié)果,從 ProcessQueue 中移除已確認(rèn)消息并更新消費(fèi)進(jìn)度。
- 消息消費(fèi)失敗重試機(jī)制:如果消息監(jiān)聽器返回的消費(fèi)結(jié)果為 RECONSUME_LATER,則將消息發(fā)送給 Broker 的重試 topic 中??蛻舳艘酝椒绞桨l(fā)送請求到服務(wù)端,服務(wù)端創(chuàng)建重試主題,根據(jù)消息物理偏移量獲取消息,設(shè)置消息重試次數(shù)和延時(shí)級別,創(chuàng)建新的消息對象并存入 CommitLog 文件。
-
消費(fèi)進(jìn)度管理:
- 廣播模式消費(fèi)進(jìn)度存儲:廣播模式消息消費(fèi)進(jìn)度存儲在消費(fèi)者本地,實(shí)現(xiàn)類為 LocalFileOffsetStore,通過 load 方法從磁盤加載消息消費(fèi)進(jìn)度,persistAll 方法將內(nèi)存中的消費(fèi)進(jìn)度持久化到磁盤中。
- 集群模式消費(fèi)進(jìn)度存儲:集群模式消息進(jìn)度存儲文件存放在消息服務(wù)端,實(shí)現(xiàn)類為 RemoteBrokerOffsetStore,從 broker 獲取指定 MessageQueue 當(dāng)前消費(fèi)組的消費(fèi)進(jìn)度,并更新內(nèi)存中的消費(fèi)進(jìn)度。

浙公網(wǎng)安備 33010602011771號