RocketMQ閱讀源碼前的準備
本文將講解如何在IDEA中導入 RocketMQ 源碼,并運行 Broker 和 NameServer,編寫一個消息發送與消息消費的示例。
一. 源碼導入及調試
1.1 導入源碼
RocketMQ 原先是阿里巴巴集團內部的消息中間件,于2016年提交至Apache基金會孵化,并最終成為Apache頂級項目。
第一步:從GitHub上clone RocketMQ 源碼:RocketMQ。點擊 File->Open 導入項目。

后面關于RocketMQ的源碼分析若無特殊說明,將默認使用 4.6.0 版本
第二步:構建項目
在項目根目錄下執行下列命令
mvn clean install -Dmaven.test.skip=true
1.2 啟動 NameServer
第一步:展開 namesrv 模塊,鼠標右鍵選中 NamesrvStartup.java,將其拖拽到Debug As,選中 Debug ‘NamesrvStartup.java.main()’,彈出下圖:


第二步:單擊 Environment variables 后面的按鈕,彈出配置界面,配置如下環境變量:
ROCKETMQ_HOME=E:\software-engineer\java\open-source\rocketmq-home
點擊 OK->Apply 保存即可。
第三步:在剛剛指定的文件夾內創建conf、logs、store文件夾。

第四步:從 distribution 部署目錄中將 broker.conf、logback_broker.xml、logback_namesrv.xml 等文件復制到 conf 目錄中,按需修改 broker.conf
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
#nameServer地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
#存儲路徑
storePathRootDir=E:\software-engineer\java\open-source\rocketmq-home\store
#CommitLog存儲路徑
storePatchCommitLog=E:\software-engineer\java\open-source\rocketmq-home\store\commitlog
#消費隊列存儲路徑
storePathConsumeQueue=E:\software-engineer\java\open-source\rocketmq-home\store\consumequeue
#消息索引存儲路徑
storePathIndex=E:\software-engineer\java\open-source\rocketmq-home\store\index
#checkpoint 文件存儲路徑
storeCheckpoint=E:\software-engineer\java\open-source\rocketmq-home\store\checkpoint
#abort 文件存儲路徑
abortFile=E:\software-engineer\java\open-source\rocketmq-home\store\abort
第五步:點擊 Debug 運行 NamesrvStartup.java,并輸出“The Name Server boot success. serializeType=JSON”

1.3 啟動 Broker
第一步:展開 broker 模塊,啟動 BrokerStartup.java ,會提示需要配置 ROCKETMQ_HOME 環境變量。與 NameServer啟動時一樣,配置環境變量即可:

第二步:配置-c參數,指定broker.conf 配置文件地址:

第三步:以 Debug 模式啟動 BrokerStartup.java,查看${ROCKET_HOME}/logs/broker.log 文件,未報錯則表示啟動成功:


1.4 代碼測試
1.4.1 生產者
第一步:修改org.apache.rocketmq.example.quickstart.Producer 示例程序,設置 RocketMQ NameServer地址:
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++) {
try {
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
第二步:運行該方法發送消息

1.4.2 消費者
第一步:修改 org.apache.rocketmq.example.quickstart.Consumer 示例程序,設置RocketMQ NameServer地址
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
第二步:運行該方法,消費消息

二. RocketMQ 源碼結構
RocketMQ源碼組織方式基于 Maven 模塊組織,RocketMQ核心目錄說明如下:
- acl:權限控制模塊
- broker:broker模塊(broker啟動進程)
- client:消息客戶端,包含消息生產者和消費者
- common:公共包
- dev:開發者信息(非源碼)
- distribution:打包分發目錄(非源碼)
- example:示例代碼
- filter:消息過濾相關基礎類
- logappender:日志實現相關類
- logging:自主實現日志相關類
- namesrv:NameServer實現相關類(NameServer啟動進程)
- openmessaging:消息開放標準,已發布
- remoting:遠程通信模塊,基于Netty
- srvutil:服務器工具類
- store:消息存儲相關實現類
- style:checkstyle相關實現
- test:測試相關類
- tools:工具類,監控命令相關實現類。
三. Rocket設計理念和設計目標
3.1 設計理念
RocketMQ 設計基于主題的發布與訂閱模式,其核心功能包括消息發送、消息存儲和消息消費,整體設計追求簡單和性能高效,主要體現在一下三個方面:
- 首先,NameServer設計極其簡單,摒棄了業界常用的使用Zookeeper充當信息管理的“注冊中心”,而是自研NameServer來實現元數據的管理(Topic路由信息等)。從實際需求出發,因為Topic路由信息無須在集群之間保持強一致,追求最終一致性,并且能容忍分鐘級的不一致。正是基于此種情況,RocketMQ的NameServer集群之間互不通信,極大地降低了NameServer實現的復雜程度,對網絡的要求也降低了不少,但是性能相比較Zookeeper有了極大的提升。
- 其次是高效的IO存儲機制。RocketMQ追求消息發送的高吞吐量,RocketMQ的消息存儲文件設計成文件組的概念,組內單個文件大小固定,方便引入內存映射機制,所有主題的消息存儲基于順序寫,極大地提供了消息寫性能,同時為了兼顧消息消費與消息查找,引入了消息消費隊列文件與索引文件。
- 最后是容忍存在設計缺陷,適當將某些工作下放給RocketMQ使用者。消息中間件的實現者經常會遇到一個難題:如何保證消息一定能被消息消費者消費,并且保證只消費一次。RocketMQ的設計者給出的解決辦法是不解決這個難題,而是退而求其次,只保證消息被消費者消費,但設計上允許消息被重復消費,這樣極大地簡化了消息中間件的內核,使得實現消息發送高可用變得非常簡單與高效,消息重復問題由消費者在消息消費時實現冪等。
3.2 設計目標
RocketMQ作為一款消息中間件,需要解決如下問題:
-
架構模式 RocketMQ與大部分消息中間件一樣,采用發布訂閱模式,基本的參與組件主要包括:消息發送者、消息服務器(消息存儲)、消息消費、路由發現。
-
順序消息 所謂順序消息,就是消息消費者按照消息達到消息存儲服務器的順序消費。RocketMQ可以嚴格保證消息有序。
-
消息過濾 消息過濾是指在消息消費時,消息消費者可以對同一主題下的消息按照規則只消費自己感興趣的消息。RocketMQ消息過濾支持在服務端與消費端的消息過濾機制。
- 消息在Broker端過濾。Broker只將消息消費者感興趣的消息發送給消息消費者。
- 消息在消息消費端過濾,消息過濾方式完全由消息消費者自定義,但缺點是有很多無用的消息會從Broker傳輸到消費端。
-
消息存儲 消息中間件的一個核心實現是消息的存儲,對消息存儲一般有如下兩個維度的考量:消息堆積能力和消息存儲性能。RocketMQ追求消息存儲的高性能,引入內存映射機制,所有主題的消息順序存儲在同一個文件中。同時為了避免消息無限在消息存儲服務器中累積,引入了消息文件過期機制與文件存儲空間報警機制。
-
消息高可用性 通常影響消息可靠性的有以下幾種情況。
- Broker正常關機。
- Broker異常Crash。
- OS Crash。
- 機器斷電,但是能立即恢復供電情況。
- 機器無法開機(可能是CPU、主板、內存等關鍵設備損壞)。
- 磁盤設備損壞。
針對上述情況,情況14的RocketMQ在同步刷盤機制下可以確保不丟失消息,在異步刷盤模式下,會丟失少量消息。情況56屬于單點故障,一旦發生,該節點上的消息全部丟失,如果開啟了異步復制機制,RoketMQ能保證只丟失少量消息,RocketMQ在后續版本中將引入雙寫機制,以滿足消息可靠性要求極高的場合。
-
消息到達(消費)低延遲 RocketMQ在消息不發生消息堆積時,以長輪詢模式實現準實時的消息推送模式。
-
確保消息必須被消費一次 RocketMQ 通過消息消費確認機制(ACK)來確保消息至少被消費一次(least once),但由于ACK消息有可能丟失等其他原因,RocketMQ無法做到消息只被消費一次,有重復消費的可能。
-
回溯消息 回溯消息是指消息消費端已經消費成功的消息,由于業務要求需要重新消費消息。RocketMQ支持按時間回溯消息,時間維度可精確到毫秒,可以向前或向后回溯。
-
消息堆積 消息中間件的主要功能是異步解耦,必須具備應對前端的數據洪峰,提高后端系統的可用性,必然要求消息中間件具備一定的消息堆積能力。RocketMQ消息存儲使用磁盤文件(內存映射機制),并且在物理布局上為多個大小相等的文件組成邏輯文件組,可以無限循環使用。RocketMQ消息存儲文件并不是永久存儲在消息服務器端,而是提供了過期機制,默認保留3天。
-
定時消息 定時消息是指消息發送到Broker后,不能被消息消費端立即消費,要到特定的時間點或者等待特定的時間后才能被消費。如果要支持任意精度的定時消息消費,必須在消息服務端對消息進行排序,勢必帶來很大的性能損耗,故RocketMQ不支持任意進度的定時消息,而只支持特定延遲級別。
-
消息重試機制 消息重試是指消息在消費時,如果發送異常,消息中間件需要支持消息重新投遞,RocketMQ支持消息重試機制。

浙公網安備 33010602011771號