消息隊列上篇--基礎
消息隊列--上篇--入門
本文示例代碼見GITEE
地址:https://gitee.com/quercus-sp204/qmm-study/tree/master/reslove-mq 【reslove-mq模塊部分】
0.消息隊列
消息隊列(Message Queue)是一種中間件組件,用于在不同應用或服務之間以異步方式傳遞數據包或“消息”,并在接收方準備好之前將其暫存于隊列中,保證數據不丟失并按順序處理,常常在分布式系統或微服務架構中,實現應用組件之間松耦合且可靠的數據傳遞。其核心價值在于解耦生產者與消費者、提高系統的可靠性、可伸縮性和彈性,并在高并發或網絡不穩定情況下保證消息不丟失。常見中間件包括 RabbitMQ、Apache Kafka、RocketMQ,本文就以RocketMQ5.3.2為主體,來探索一下消息隊列。
微服務架構中:各微服務通過消息隊列解耦,實現異步任務、事件驅動(Event-Driven)和領域事件通知;
電商系統中:用戶下單后需要發送短信通知、更新積分、記錄日志等非核心操作。將這些操作放入消息隊列,主線程無需等待子任務完成即可立即響應,顯著提升系統處理速度。還比如,電商下單流程中,訂單創建成功后通過消息隊列異步處理庫存更新、支付通知等,縮短用戶等待時間。還比如,當秒殺活動、促銷期間短時間內大量請求涌入時,消息隊列作為 “緩沖池”,將突發流量存入隊列,接收方按自身處理能力消費消息,避免系統因瞬間壓力過大而崩潰。
在日志收集與分析系統中:各服務日志寫入隊列,由專門的日志處理組件異步消費并持久化到 ELK 等系統中;
在物聯網(IoT)設備通信中:大量終端設備產生數據,通過消息隊列匯聚后再統一處理和存儲,有效緩解突發流量;
首先,假設我們不知道有上述的mq產品,如果說我們想要自己實現一個消息中間件,我們需要考慮這個消息中間件模型呢?

市面上主要的消息中間件,大致模型都是如上圖所示:
- 消息的產生【生產者】:他們負責產生消息,給我們設計的消息中間件;
- MQ層:它可以接口生產者發送過來的消息,存到“內存”/“磁盤”中,(這里為什么要存到磁盤中?因為如果重啟的話,內存數據會丟失,以此來達到持久化的目的),然后根據某種關系可以把消息主動推送給想要這個消息的對象;
- 消費者層:它與我們設計的消息中間件建立某種意義上的聯系,可以主動拉取/被動推送 自己感興趣的消息。
一個良好的消息隊列需要具備哪些好的設計呢?
1.消息隊列中最重要的肯定就是“消息”了,可靠消息傳遞,保證數據不丟失這個是至關重要的,消息隊列中間件支持持久化存儲、重試機制、確認機制(ACK)等,確保消息被可靠處理; 2.能夠支持集群,保證高可用;
那么,馬上就來到了喜聞樂見的比較環節:【筆者只用過第一二兩款消息中間件】
| 中間件 | 協議支持 | 吞吐能力 | 持久化方式 | 消息排序 | 可伸縮性 | 典型應用場景 |
|---|---|---|---|---|---|---|
| RocketMQ | 自定義協議,支持 JMS、MQTT、HTTP REST | 高(優化后數十萬–百萬級?TPS) | 高性能 Journal(LevelDB、KahaDB)、JDBC | 支持獨占隊列/順序消費 | 分布式高可用集群,多主題分區 | 大規模電商事件、事務消息、金融級消息中間件 |
| RabbitMQ | AMQP、MQTT、STOMP、HTTP,Streams 插件 | 中等(數萬–數十萬?msg/s) | 可選磁盤持久化(持久隊列) | 隊列級順序保障 | 鏡像隊列集群,水平擴展較有限 | 企業級消息、分布式任務、IoT 消息傳遞 |
| Kafka | Kafka 原生二進制協議 | 高(可達百萬級?msg/s) | 持久化日志(段文件、可配置保留策略) | 分區內順序保障 | 基于分區的水平擴展,可橫向無限擴容 | 流處理、日志聚合、實時分析 |
| ActiveMQ | OpenWire、AMQP、MQTT、STOMP、JMS | 中等(數十萬?msg/s) | KahaDB、LevelDB、Artemis 日志、JDBC | 隊列級順序 + JMS 優先級/獨占消費 | Broker 網絡集群,擴展性次于 Kafka | Java EE 集成、企業應用消息中間件 |
RocketMQ的搭建與部署本文就不說明了,在官網可以看到詳細步驟:https://rocketmq.apache.org/zh/docs/quickStart/01quickstart
部署看這個:https://rocketmq.apache.org/zh/docs/deploymentOperations/01deploy
但是由于照著官網來的教程都有問題:筆者在此寫一些重要的部署內容“,【本文是以虛擬機Local本地部署的,單組節點單副本模式】
第一,下載rocketmq然后上傳到服務器我這里就不嚴實了。給出主要啟動命令【本文以版本5.3.2為例子】
##=========================mq bin/
1.啟動NameServer:
# 后臺啟動----------------------------------
nohup sh mqnamesrv > myNamesrv.out 2>&1 &
2.啟動Broker+proxy:
# 后臺啟動---------------------------------
nohup sh mqbroker -n localhost:9876 --enable-proxy > mybroker.out 2>&1 &
# 指定配置文件、
nohup sh mqbroker -n localhost:9876 -c /usr/dev-env/rocketmq-all-5.3.3/conf/broker.conf --enable-proxy > mybroker.out 2>&1 &
##=========================console--這個是控制臺項目打包的jar
# idea打成jar包之后,上傳到機器上面即可,然后在該jar包執行命令運行jar包
3.啟動console-dashboard
nohup java -jar dashboard-rocketmq-2.0.0.jar > log.out 2>&1 &
sh mqshutdown namesrv //關閉NameServer命令
sh mqshutdown broker //關閉Broker命令
firewall-cmd --zone=public --add-port=8081/tcp --permanent #記得開放8081端口
#記得開放8090端口,筆者的控制臺jar包時8090端口,所以開放8090,方便查看控制臺
firewall-cmd --zone=public --add-port=8090/tcp --permanent
firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --reload
# 下面是多余的,不用看了
firewall-cmd --zone=public --add-port=8080/tcp --permanent
firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent
firewall-cmd --zone=public --add-port=11011/tcp --permanent
firewall-cmd --zone=public --add-port=10909/tcp --permanent
注意點: dashboard控制臺項目記得把server.port換成8090,其實只要是不與后面倆端口沖突就行了,不要8080/8081, 8080/8081留給proxy這個進程如下圖:

第二,注意官網這一句話,Client連接的時候,是Proxy的,筆者經過第一步里面的lsof命令,找到筆者機器的proxy是8081端口。

第三,控制臺界面要切換到5.x版本

1.RocketMQ相關介紹
大部分摘選自官網
RocketMQ的領域模型:(來自官網)

可以看到RocketMQ的模型和生產者-隊列-消費者模型類似。只不過RocketMQ中間那一層是叫做Topic的。生產者這一端沒有什么好說的,主要來看一下中間消息存儲端和消費者端。
- 消息存儲:[ MQ里面當然是允許多個Topic存在的 ]
Apache RocketMQ 消息傳輸和存儲的分組容器,主題內部由多個隊列組成,消息的存儲和水平擴展實際是通過主題內的隊列實現的。
Apache RocketMQ 消息傳輸和存儲的實際單元容器,類比于其他消息隊列中的分區。 Apache RocketMQ 通過流式特性的無限隊列結構來存儲消息,消息在隊列內具備順序性存儲特征。
Apache RocketMQ 的最小傳輸單元。消息具備不可變性,在初始化發送和完成存儲后即不可變。
- 消費者
Apache RocketMQ 發布訂閱模型中定義的獨立的消費身份分組,用于統一管理底層運行的多個消費者(Consumer)。同一個消費組的多個消費者必須保持消費邏輯和配置一致,共同分擔該消費組訂閱的消息,實現消費能力的水平擴展。
Apache RocketMQ 消費消息的運行實體,一般集成在業務調用鏈路的下游。消費者必須被指定到某一個消費組中。
Apache RocketMQ 發布訂閱模型中消息過濾、重試、消費進度的規則配置。訂閱關系以消費組粒度進行管理,消費組通過定義訂閱關系控制指定消費組下的消費者如何實現消息過濾、消費重試及消費進度恢復等。
Apache RocketMQ 的訂閱關系除過濾表達式之外都是持久化的,即服務端重啟或請求斷開,訂閱關系依然保留。
消息的傳輸模型:
點對點模型:
- 消費匿名:消息上下游溝通的唯一的身份就是隊列,下游消費者從隊列獲取消息無法申明獨立身份。
- 一對一通信:基于消費匿名特點,下游消費者即使有多個,但都沒有自己獨立的身份,因此共享隊列中的消息,每一條消息都只會被唯一一個消費者處理。因此點對點模型只能實現一對一通信。
發布訂閱模型:
- 消費獨立:相比隊列模型的匿名消費方式,發布訂閱模型中消費方都會具備的身份,一般叫做訂閱組(訂閱關系),不同訂閱組之間相互獨立不會相互影響。
- 一對多通信:基于獨立身份的設計,同一個主題內的消息可以被多個訂閱組處理,每個訂閱組都可以拿到全量消息。因此發布訂閱模型可以實現一對多通信。
RocketMQ就是采用的發布訂閱模型。
2.基本功能
①消息發送
- 普通消息:創建主題的時候,Topic的消息類型要是普通消息【message.type=NORMAL】
發送消息時,建議設置業務上唯一的信息作為索引,方便后續快速定位消息。例如,訂單ID,用戶ID等。

public static void sendSyncMessage() throws ClientException, IOException {
// 接入點地址,需要設置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
String endpoint = "192.168.110.128:8081";
// 消息發送的目標Topic名稱,需要提前創建。
String topic = Topic.DEMO_TOPIC;
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint)
.enableSsl(false);
ClientConfiguration configuration = builder.build();
// 初始化Producer時需要設置通信配置以及預綁定的Topic。
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// 普通消息發送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
// 設置消息索引鍵,可根據關鍵字精確查找某條消息。
.setKeys("messageKey")
// 設置消息Tag,用于消費端根據指定Tag過濾消息。
.setTag("messageTag")
// 消息體。
.setBody("我是消息體".getBytes())
.build();
try {
// 發送消息,需要關注發送結果,并捕獲失敗等異常。
SendReceipt sendReceipt = producer.send(message);
System.out.println("Send message successfully, messageId=: "+ sendReceipt.getMessageId());
} catch (ClientException e) {
System.out.println("Failed to send message" + e);;
}
producer.close();
}
- 定時/延時消息:同理,注意Topic的消息類型。定時消息是 Apache RocketMQ 提供的一種高級消息類型,消息被發送至服務端后,在指定時間后才能被消費者消費。通過設置一定的定時時間可以實現分布式場景的延時調度觸發效果。
典型場景:分布式定時調度,任務超時處理
public static void sendDelayMessage() throws ClientException, IOException {
// 接入點地址,需要設置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
String endpoint = "192.168.110.128:8081";
// 消息發送的目標Topic名稱,需要提前創建。
String topic = Topic.DELAY_TOPIC;
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint)
.enableSsl(false);
ClientConfiguration configuration = builder.build();
// 初始化Producer時需要設置通信配置以及預綁定的Topic。
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// 普通消息發送。
//以下示例表示:延遲時間為1分鐘之后的Unix時間戳。
long deliverTimeStamp = System.currentTimeMillis() + 60 * 1000;
Message message = provider.newMessageBuilder()
.setTopic(topic)
// 設置消息索引鍵,可根據關鍵字精確查找某條消息。
.setKeys("delay-messageKey")
// 設置消息Tag,用于消費端根據指定Tag過濾消息。
.setTag("delay-messageTag")
.setDeliveryTimestamp(deliverTimeStamp)
// 消息體。
.setBody("我是延時消息消息體".getBytes())
.build();
try {
// 發送消息,需要關注發送結果,并捕獲失敗等異常。
SendReceipt sendReceipt = producer.send(message);
System.out.println("Send delay message ok, messageId=: "+ sendReceipt.getMessageId());
} catch (ClientException e) {
System.out.println("Failed to send message" + e);;
}
producer.close();
}
- 順序消息:順序消息是指在消息隊列中,消息按照生產者發送的順序被消費者接收和處理的一種消息類型。其核心目的是確保消息在邏輯關聯性強的場景下,嚴格按照業務預期的順序執行,避免因并發或異步處理導致的數據不一致問題。【和普通消息發送相比,順序消息發送必須要設置消息組。】
普通消息消費者處理的順序可能并不相同,以普通消息為例子:
// 發送10條消息
for (int i = 0; i < 10; i++) {
// 普通消息發送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
// 設置消息索引鍵,可根據關鍵字精確查找某條消息。
.setKeys("messageKey" + i)
// 設置消息Tag,用于消費端根據指定Tag過濾消息。
.setTag("messageTag" + i )
// 消息體。
.setBody(("我是消息體 " + i).getBytes())
.build();
SendReceipt sendReceipt = producer.send(message);
System.out.println("Send message successfully, messageId=: "+ sendReceipt.getMessageId());
}
// 然后消費者來消費消息:
消費者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000000【】我是消息體 0
消費者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000002【】我是消息體 2
消費者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000004【】我是消息體 4
消費者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000007【】我是消息體 7
消費者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000005【】我是消息體 5
消費者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000008【】我是消息體 8
消費者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000009【】我是消息體 9
消費者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000001【】我是消息體 1
消費者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000006【】我是消息體 6
消費者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000003【】我是消息體 3
可以看到消費者,消費消息的順序是亂序的。在有些場景里面,以證券、股票交易撮合場景為例,對于出價相同的交易單,堅持按照先出價先交易的原則,下游處理訂單的系統需要嚴格按照出價順序來處理訂單。
第二個場景,以數據庫變更增量同步場景為例,上游源端數據庫按需執行增刪改操作,將二進制操作日志作為消息,通過 Apache RocketMQ 傳輸到下游搜索系統,下游系統按順序還原消息數據,實現狀態數據按序刷新。如果是普通消息則可能會導致狀態混亂,和預期操作結果不符,基于順序消息可以實現下游狀態和上游操作結果一致。
示例:
public static void sendOrderMessage() throws Exception {
// 接入點地址,需要設置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
String endpoint = "192.168.110.128:8081";
// 消息發送的目標Topic名稱,需要提前創建。
String topic = Topic.ORDER_TOPIC;
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint)
.enableSsl(false);
ClientConfiguration configuration = builder.build();
// 初始化Producer時需要設置通信配置以及預綁定的Topic。
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
try {
// 發送消息,需要關注發送結果,并捕獲失敗等異常。
for (int i = 0; i < 10; i++) {
// 普通消息發送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
// 設置消息索引鍵,可根據關鍵字精確查找某條消息。
.setKeys("order-messageKey" + i)
// 設置消息Tag,用于消費端根據指定Tag過濾消息。
.setTag("order-messageTag" + i )
//設置順序消息的排序分組,該分組盡量保持離散,避免熱點排序分組。
.setMessageGroup("fifoGroup001") //v【和普通消息發送相比,順序消息發送必須要設置消息組】
// 消息體。
.setBody(("我是順序消息體 " + i).getBytes())
.build();
SendReceipt sendReceipt = producer.send(message);
System.out.println("Send message successfully, messageId=: "+ sendReceipt.getMessageId());
}
} catch (ClientException e) {
System.out.println("Failed to send message" + e);;
}
producer.close();
}
// 消費者
public static void simpleOrderConsumer() throws ClientException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 接入點地址,需要設置成Proxy的地址和端口列表,一般是xxx:8081。
String endpoints = "192.168.110.128:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
// 訂閱消息的過濾規則,表示訂閱所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 為消費者指定所屬的消費者分組,Group需要提前創建。
String consumerGroup = "Order-Consumer-Group";
// 指定需要訂閱哪個目標Topic,Topic需要提前創建。
String topic = Topic.ORDER_TOPIC;
// 初始化PushConsumer,需要綁定消費者分組ConsumerGroup、通信參數以及訂閱關系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 設置消費者分組。
.setConsumerGroup(consumerGroup)
// 設置預綁定的訂閱關系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 設置消費監聽器。
.setMessageListener(messageView -> {
// 處理消息并返回消費結果。
ByteBuffer body = messageView.getBody();
int remaining = body.remaining();
byte[] content = new byte[remaining];
body.get(content);
String str = new String(content);
System.out.println("消費者1 【順序消息】 successfully, messageId=" + messageView.getMessageId() + "【】" +str);
return ConsumeResult.SUCCESS;
})
.build();
try {
Thread.sleep(30_000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
// 如果不需要再使用 PushConsumer,可關閉該實例。
pushConsumer.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
消費者1 【順序消息】 successfully, messageId=0100FF10576F2973F40840624B00000000【】我是順序消息體 0
消費者1 【順序消息】 successfully, messageId=0100FF10576F2973F40840624B00000001【】我是順序消息體 1
消費者1 【順序消息】 successfully, messageId=0100FF10576F2973F40840624B00000002【】我是順序消息體 2
消費者1 【順序消息】 successfully, messageId=0100FF10576F2973F40840624B00000003【】我是順序消息體 3
消費者1 【順序消息】 successfully, messageId=0100FF10576F2973F40840624B00000004【】我是順序消息體 4
消費者1 【順序消息】 successfully, messageId=0100FF10576F2973F40840624B00000005【】我是順序消息體 5
消費者1 【順序消息】 successfully, messageId=0100FF10576F2973F40840624B00000006【】我是順序消息體 6
消費者1 【順序消息】 successfully, messageId=0100FF10576F2973F40840624B00000007【】我是順序消息體 7
消費者1 【順序消息】 successfully, messageId=0100FF10576F2973F40840624B00000008【】我是順序消息體 8
消費者1 【順序消息】 successfully, messageId=0100FF10576F2973F40840624B00000009【】我是順序消息體 9
可以看到,嚴格有序的。
順序消息原理:順序消息的順序關系通過消息組(MessageGroup)判定和識別,發送順序消息時需要為每條消息設置歸屬的消息組,相同消息組的多條消息之間遵循先進先出的順序關系,不同消息組、無消息組的消息之間不涉及順序性。
要保證順序性,需要做到以下幾點:
- 生產順序性:單一生產者:消息生產的順序性僅支持單一生產者,不同生產者分布在不同的系統,即使設置相同的消息組,不同生產者之間產生的消息也無法判定其先后順序。串行發送:Apache RocketMQ 生產者客戶端支持多線程安全訪問,但如果生產者使用多線程并行發送,則不同線程間產生的消息將無法判定其先后順序。滿足以上條件的生產者,將順序消息發送至 Apache RocketMQ 后,會保證設置了同一消息組的消息,按照發送順序存儲在同一隊列中,相同消息組的消息按照先后順序被存儲在同一個隊列。不同消息組的消息可以混合在同一個隊列中,且不保證連續。
- 消費順序性:投遞順序,RocketMQ 通過客戶端SDK和服務端通信協議保障消息按照服務端存儲順序投遞,但業務方消費消息時需要嚴格按照接收---處理---應答的語義處理消息,避免因異步處理導致消息亂序;有限重試,RocketMQ 順序消息投遞僅在重試次數限定范圍內,即一條消息如果一直重試失敗,超過最大重試次數后將不再重試,跳過這條消息消費,不會一直阻塞后續消息處理,對于需要嚴格保證消費順序的場景,請務設置合理的重試次數,避免參數不合理導致消息亂序。
+++
- 事務消息:這個就涉及到分布式事務了,讀者們先見官網吧:https://rocketmq.apache.org/zh/docs/featureBehavior/04transactionmessage,這個分布式事務,我認為是分布式系統中的最難問題了。本文僅作簡單探討。
②消息(可靠性保證&消費)
在分布式系統中,消息中間件的可靠性是保障業務數據一致性和系統健壯性的關鍵。RocketMQ 通過多層次的機制確保消息從生產者發送到Broker存儲再到消費者消費的整個生命周期中不丟失、不重復且有序傳遞。
- 生產者端消息發送可靠性保證:內置消息發送重試機制
觸發消息發送重試機制的條件如下:客戶端消息發送請求調用失敗或請求超時;網絡異常造成連接失敗或請求超時;服務端節點處于重啟或下線等狀態造成連接失敗;服務端運行慢造成請求超時;服務端返回失敗錯誤碼,系統邏輯錯誤:因運行邏輯不正確造成的錯誤,系統流控錯誤【這個重試有點不一樣,見官網】:因容量超限造成的流控錯誤。
1.同步發送模式
生產者發送消息后阻塞等待Broker的確認響應(SendResult),若未收到ACK或收到失敗響應,觸發重試機制。
【這個是最可靠的方式。生產者在發送消息后,會等待 Broker 的確認(ack),確認消息成功到達 Broker 后,生產者才會認為發送成功。】
SendResult sendResult = producer.send(msg);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息發送成功");
}
確保消息成功到達Broker,避免異步發送因網絡抖動導致的消息丟失。
在構建生產者的時候,
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setMaxAttempts(3) // 重試次數默認是3次
.setClientConfiguration(configuration)
.build();
2.異步發送模式
異步發送:調用線程不會阻塞,但調用結果會通過異常事件或者成功事件返回。
CompletableFuture<SendReceipt> future = producer.sendAsync(message);
future.whenCompleteAsync((sendReceipt1, throwable) -> {
if (throwable != null) {
// 消息發送失敗,原因等異常。
System.out.println("Failed to send message");
} else {
// 消息發送成功。
System.out.println("Send message successfully, messageId=: "+ sendReceipt1.getMessageId());
}
});
- Broker端消息可靠性保證
1.消息持久化
刷盤策略:
- 同步刷盤(FlushDiskType.SYNC_FLUSH):消息寫入內存后立即刷盤,保證宕機不丟數據,但性能較低。
- 異步刷盤(FlushDiskType.ASYNC_FLUSH):定期批量刷盤,性能高但存在數據丟失風險(默認配置)。
配置建議:金融級場景建議同步刷盤,一般場景異步刷盤+主從復制。
2.主從復制
復制模式:
- 同步復制:消息寫入主節點后需同步到從節點才返回成功,保證數據強一致。
- 異步復制:主節點寫入成功即返回,從節點異步復制(性能更高,默認模式)。
broker角色配置:
brokerRole 默認ASYNC_MASTER 可選值:SYNC_MASTER,ASYNC_MASTER,SLAVE
broker刷盤類型:
flushDiskType 默認ASYNC_FLUSH 可選值:SYNC_FLUSH,ASYNC_FLUSH
SYNC_FLUSH 模式下的 broker 保證在收到確認生產者之前將消息刷盤。
ASYNC_FLUSH 模式下的 broker 則利用刷盤一組消息的模式,可以取得更好的性能。
- 消費者端消息可靠性保證
推薦使用消息重試場景如下:
-
業務處理失敗,且失敗原因跟當前的消息內容相關,比如該消息對應的事務狀態還未獲取到,預期一段時間后可執行成功。
-
消費失敗的原因不會導致連續性,即當前消息消費失敗是一個小概率事件,不是常態化的失敗,后面的消息大概率會消費成功。此時可以對當前消息進行重試,避免進程阻塞。
消費者可靠性見第三章吧。
3.實操及相關分析
本節就使用Springboot2.7.18 整合 RocketMQ,來模擬平時的一些場景。
3.1 訂單超時關閉
這個場景用到它還是比較合適的,就用他的延時消息,官網也說到了這個場景的:以電商交易場景為例,訂單下單后暫未支付,此時不可以直接關閉訂單,而是需要等待一段時間后才能關閉訂單。使用 Apache RocketMQ 定時消息可以實現超時任務的檢查觸發。
我們先創建延時相關的主題,然后見如下代碼示例:
@RestController
public class OrderExpireController {
@Resource
private OrderExpireProducer orderExpireProducer;
@PostMapping("/orderSubmit")
public R orderSubmit(String orderId) {
// 模擬訂單超時關閉
// 1.mq發送一條延遲消息,延遲時間60秒
orderExpireProducer.sendOrderExpireMessage(orderId, 60);
return R.success().setData("msg", "訂單提交成功,請耐心等待");
}
}
// 生產者-----------------
@Service
@Slf4j
public class OrderExpireProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendOrderExpireMessage(String message, long delayTime) {
log.info("訂單延遲消息發送,訂單ID:{},延遲時間:{}", message, delayTime);
message = "[訂單號]" + message + "|" + delayTime;
SendResult sendResult = null;
int retryCount = 0;
// 這里是模擬重試!!!要注意!!!
do {
try {
sendResult = rocketMQTemplate.syncSendDelayTimeSeconds(TopicConstant.ORDER_EXPIRE_TOPIC, message, delayTime);
//sendResult = rocketMQTemplate.syncSendDelayTimeSeconds(TopicConstant.ORDER_EXPIRE_TOPIC, message, 0); // 模擬錯誤
} catch (Exception e) {
log.error("訂單延遲消息發送出現異常,訂單ID:{},錯誤信息:{}", message, e.getMessage());
}
if ( sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK ) {
log.info("訂單延遲消息發送成功,訂單ID:{}", message); break;
} else {
log.error("訂單延遲消息發送失敗,訂單ID:{}", message);
// 可以考慮將此消息存起來,或者重試.....
}
retryCount++;
} while (sendResult == null && retryCount < 3);
}
}
// 消費者----------------------
@Service
@Slf4j
@RocketMQMessageListener(
topic = TopicConstant.ORDER_EXPIRE_TOPIC,
consumerGroup = ConsumerGroup.ORDER_EXPIRE_GROUP,
/*
默認集群消費模式:當使用集群消費模式時,RocketMQ 認為任意一條消息只需要被消費組內的任意一個消費者處理即可。
廣播消費模式:當使用廣播消費模式時,RocketMQ 會將每條消息推送給消費組所有的消費者,保證消息至少被每個消費者消費一次。
*/
messageModel = MessageModel.CLUSTERING
)
public class OrderExpireConsumer implements RocketMQListener<String> {
public void onMessage(String message) {
log.info("【訂單過期消息消費者】收到消息:{}", message);
log.info("【訂單過期消息消費者】處理訂單消息開始......修改訂單狀態..等等操作");
// 處理訂單過期消息.....
}
}
2025-06-27 15:09:42.575 INFO 5192 --- [nio-8900-exec-1] c.f.m.r.producer1.OrderExpireProducer : 訂單延遲消息發送,訂單ID:123456789,延遲時間:60
2025-06-27 15:09:42.664 INFO 5192 --- [nio-8900-exec-1] c.f.m.r.producer1.OrderExpireProducer : 訂單延遲消息發送成功,訂單ID:[訂單號]123456789|60
2025-06-27 15:10:44.026 INFO 5192 --- [-expire-group_1] c.f.m.r.consumer1.OrderExpireConsumer : 【訂單過期消息消費者】收到消息:[訂單號]123456789|60
2025-06-27 15:10:44.026 INFO 5192 --- [-expire-group_1] c.f.m.r.consumer1.OrderExpireConsumer : 【訂單過期消息消費者】處理訂單消息開始......修改訂單狀態..等等操作
現在有一個疑問,那就是如果只有一個消費者,我現在發送了兩條延時消息,會導致第二條消息處理的時間往后延五秒鐘嗎?
看下面的實驗結果:【只有一個消費者的時候,發送兩條】

第二條消息是05秒發出的,按照我們的想法,應該是25 + 5 = 30秒被收到,但是實際結果沒有延遲5秒,消息發送到處理,這個過程兩條消息間隔差不多都是20秒。說明是多線程消費的!

看一下RocketMQMessageListener注解里面的參數,我們會發現一些端倪。當未顯式設置consumeMode時,默認使用ConsumeMode.CONCURRENTLY(并發消費)。
雖然是多線程的了,但是它還是會有最大線程數,一個人的能力畢竟是有限的。我們可以新增消費者2,監聽同一個主題,在同一個消費者組里面。新增消費者實例會觸發RocketMQ的Rebalance機制,系統會重新分配Topic的MessageQueue到所有消費者實例。假設原Topic有4個隊列【創建Topic的時候默認是八個隊列】,原消費者組有1個實例 → 新增1個實例后,隊列分配可能變為:
消費者1 → 隊列0、1 消費者2 → 隊列2、3【每個隊列僅被一個消費者實例處理,確保消息不重復消費】
但是我們要注意一個問題:
- 若Topic隊列數 ≥ 消費者實例數,新增實例會直接提升并行處理能力
- 若Topic隊列數 < 消費者實例數,多余實例將處于空閑狀態(無隊列分配)
倘若我們這樣設置,那么就是單線程處理消息了:
@Service
@Slf4j
@RocketMQMessageListener(
topic = TopicConstant.ORDER_EXPIRE_TOPIC,
consumerGroup = ConsumerGroup.ORDER_EXPIRE_GROUP,
messageModel = MessageModel.CLUSTERING,
consumeThreadMax = 1, // 設置這個
consumeThreadNumber = 1 // 設置這個
)
public class OrderExpireConsumer implements RocketMQListener<String> {
public void onMessage(String message) {
log.info("【===============================】");
log.info("【訂單過期消息消費者】收到消息:{}", message);
log.info("【訂單過期消息消費者】處理訂單消息開始......修改訂單狀態..等等操作");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("【===============================】");
// 處理訂單過期消息.....
}
}
SpringBoot的RocketMQTemplate是有重試機制的,所以我們上面只是模擬重試的:
// DefaultMQProducerImpl.java
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
...
// 重試
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
for (; times < timesTotal; times++) {
.......
}
}
3.2 消費消息
模擬這樣一個場景,用戶下單后,系統需調用庫存服務扣減庫存。若庫存服務暫時不可用(如網絡抖動),需自動重試;若多次重試仍失敗,則轉人工處理。
核心流程:訂單服務發送訂單消息到 order-topic,庫存消費者嘗試扣減庫存,失敗時觸發重試,重試3次后仍失敗,消息進入死信隊列,死信隊列消費者記錄日志并發送告警。【注意哦,RocketMQ也是有死信隊列的喔】
@RocketMQMessageListener的形式:配置簡單,參數固定。
// 發送消息
public void sendReliabilityOrderTest( String orderId ) {
SendResult sendResult = rocketMQTemplate.syncSend(TopicConstant.CONSUMER_RELIABILITY_TOPIC, MessageBuilder.withPayload(orderId).build());
if ( sendResult.getSendStatus() == SendStatus.SEND_OK )
System.out.printf("【消費者可靠性測試】同步發送ok結果: %s\n", orderId);
else
System.out.printf("【消費者可靠性測試】同步發送失敗: %s\n", orderId);
}
// 扣減庫存的消費者
@Service
@Slf4j
@RocketMQMessageListener(
topic = TopicConstant.CONSUMER_RELIABILITY_TOPIC,
consumerGroup = ConsumerGroup.CONSUMER_RELIABILITY,
messageModel = MessageModel.CLUSTERING,
maxReconsumeTimes = 2, // 最大重試次數
suspendCurrentQueueTimeMillis = 1500 // 暫停時間
)
public class ReliabilityConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("【處理訂單消息-準備扣減庫存========================】");
boolean res = handleMessage(s); // 這里要拋異常出去----------
}
private boolean handleMessage(String message) {
// 隨機生成true或者false
Random random = new Random();
// boolean flag = random.nextBoolean();
boolean flag = false;
log.info("處理消息:{}", message);
if ( !flag ) {
log.error("處理消息失敗--出現了意想不到的異常");
throw new RuntimeException("處理消息失敗");
}
// 處理消息
return true;
}
}
按照上面的寫法,會自動重試喔,重試之后,我們會發現,dashboard里面出現了新的死信topic:%DLQ%reliability-order-group,它是以%DLQ%字符串開頭的。但是它的topic后綴是那個處理消息失敗的消費者組名稱。
// 死信消費
@Service
@Slf4j
@RocketMQMessageListener(
topic = "%DLQ%" + ConsumerGroup.CONSUMER_RELIABILITY, // 死信TOPIC,注意他的組成
consumerGroup = ConsumerGroup.CONSUMER_RELIABILITY_DLQ, // 死信隊列消費組
messageModel = MessageModel.CLUSTERING
)
public class ReliabilityDLQConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("【死信隊列消費者】收到消息:{}", s);
// 通知
log.info("【死信隊列消費者】處理死信消息開始......通知人工處理~~~~");
// 存儲該死信
log.info("【死信隊列消費者】處理死信消息結束......存儲數據..");
}
}
從官網我們得知:消費重試指的是,消費者在消費某條消息失敗后,RocketMQ 服務端會根據重試策略重新消費該消息,超過一定次數后若還未消費成功,則該消息將不再繼續重試,直接被發送到死信隊列中。
會在如下情況觸發重試:
- 消費失敗,包括消費者返回消息失敗狀態標識或拋出非預期異常。
- 消息處理超時,包括在PushConsumer中排隊超時。
看到這里,相信讀者們肯定有這樣一個想法,這個注解好神奇啊,我們只需要按照約定,配置一下邏輯,一個消費者就完成了,包括其自動重試機制也有,那么,這個原理是什么呢?本文制作簡單引導,詳情見后續文章。
熟悉SpringBoot自動配置原理的同學肯定知道,我們導入了rocketmq-spring-boot-starter這樣一個依賴,我們去類路徑下找一下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
導入了其自動配置:RocketMQAutoConfiguration
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass({MQAdmin.class})
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
// 導入了這個`ListenerContainerConfiguration`
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class,
ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class, RocketMQListenerConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration implements ApplicationContextAware {
....
}
@Configuration
@ConditionalOnMissingBean(RocketMQMessageListenerContainerRegistrar.class)
public class ListenerContainerConfiguration {
// 創建了這樣一個bean
@Bean
public RocketMQMessageListenerContainerRegistrar rocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter rocketMQMessageConverter, ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {
return new RocketMQMessageListenerContainerRegistrar(rocketMQMessageConverter, environment, rocketMQProperties);
}
}
// 實現了這樣一個ApplicationContextAware接口
public class RocketMQMessageListenerContainerRegistrar implements ApplicationContextAware {
....
// 在下面的BeanPostProcessor里面被調用了,識別注解進行解析
public void registerContainer(String beanName, Object bean, RocketMQMessageListener annotation) {
.....
}
}
同時呢,
@Configuration
@AutoConfigureAfter(RocketMQAutoConfiguration.class)
public class RocketMQListenerConfiguration implements ImportBeanDefinitionRegistrar {
// 注冊了這樣一個bean定義,所以容器就會有一個這樣的bean:RocketMQMessageListenerBeanPostProcessor
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
if (!registry.containsBeanDefinition(RocketMQMessageListenerBeanPostProcessor.class.getName())) {
registry.registerBeanDefinition(RocketMQMessageListenerBeanPostProcessor.class.getName(),
new RootBeanDefinition(RocketMQMessageListenerBeanPostProcessor.class));
}
}
}
// RocketMQMessageListenerBeanPostProcessor.java
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> targetClass = AopUtils.getTargetClass(bean);
RocketMQMessageListener ann = targetClass.getAnnotation(RocketMQMessageListener.class);
if (ann != null) {
RocketMQMessageListener enhance = enhance(targetClass, ann);
if (listenerContainerRegistrar != null) {
// 這里調用registerContainer方法
listenerContainerRegistrar.registerContainer方法(beanName, bean, enhance);
}
}
return bean;
}
// 在上面注冊好相關的bean之后,然后通過Lifecycle接口的start
// 【在Spring容器創建過程中,創建好bean之后,在onRefresh方法里面】,在里面啟動消費者
@Override
public void start() {
....
try {
// DefaultMQPushConsumer.start---本質是這個
consumer.start();
} catch (MQClientException e) {
throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
}
this.setRunning(true);
log.info("running container: {}", this.toString());
}
大致過程就是這樣,具體過程就留給讀者自行閱讀源碼了。【需要讀者了解spring創建初始化bean的完整流程,及其擴展點】-- 據說這個是Spring面試的重點
所以說,spring整合的時候除了注解的形式,我們還可以使用下面的形式:
/**
* @version 1.0
* @Author txf
* @Date 2025/6/30 14:48
* @注釋 @Bean的形式創建消費者
*/
@Component
@Slf4j
public class BeanConsumer {
@Value("${rocketmq.name-server}")
private String nameServer;
@Bean
public DefaultMQPushConsumer pushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(ConsumerGroup.CONSUMER_RELIABILITY);
consumer.setNamesrvAddr(nameServer);
consumer.subscribe(TopicConstant.CONSUMER_RELIABILITY_TOPIC, "*");
// 配置重試參數
consumer.setMaxReconsumeTimes(3); // 最多重試3次
consumer.setSuspendCurrentQueueTimeMillis(10000); // 重試間隔10秒
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
MessageExt messageExt = msgs.get(0); // 這里模擬方便,只取了第一個
String msg = new String(messageExt.getBody());
log.info("【===============================】 {}", msg);
try {
// 業務邏輯....
Random random = new Random();
boolean b = random.nextBoolean();
if (b) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
throw new Exception("消費失敗");
} catch (Exception e) {
log.error("消費失敗--{}", e.getMessage());
// 記錄重試次數
int reconsumeTimes = msgs.get(0).getReconsumeTimes();
log.error("消費失敗,重試次數:{}", reconsumeTimes, e);
// 達到最大重試次數后轉死信隊列
if (reconsumeTimes >= 2) { // 已重試2次,本次是第3次
// 發送到死信隊列:sendToDLQ(msgs.get(0));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 觸發重試
}
});
consumer.start();
return consumer;
}
}
3.3 消息冪等性
RocketMQ 保證消息至少送達一次,這意味著在網絡不穩定、客戶端重啟、負載均衡重平衡等情況下,消息極有可能被重復消費。確保消息消費的冪等性是構建可靠消息系統的關鍵要求。我們的核心思路就是: 冪等性保證的責任主要在消費者端。消費者需要設計自己的業務邏輯,使得無論同一條消息被消費多少次,最終的業務結果都是一致的。
現在我們復現一下重復消費的情況:
// 消息生產者
@Service
@Slf4j
public class IdempotentProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
// 發送一個普通消息--topic是 idempotent-topic
public void sendNormalMessage(String message){
Message<String> msg = MessageBuilder.withPayload(message)
// 發送消息的時候附帶一個唯一標識 -- 這里僅是舉個例子,實際使用中請自行處理
// 需確保全局唯一性,推薦使用業務流水號(如訂單號+時間戳),這里用UUID作為例子。
.setHeader("msgId", UUID.randomUUID().toString().substring(0, 8))
.build();
rocketMQTemplate.asyncSend(TopicConstant.IDEMPOTENT_TOPIC, msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("發送成功: {}", sendResult);
}
@Override
public void onException(Throwable e) {
log.error("發送失敗: {}", e.getMessage());
}
});
}
}
// 消息消費者
@Service
@Slf4j
@RocketMQMessageListener(
topic = TopicConstant.IDEMPOTENT_TOPIC,
consumerGroup = ConsumerGroup.IDEMPOTENT_GROUP,
maxReconsumeTimes = 1 // 這里重試一次
)
public class IdempotentConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
String body = new String(message.getBody());
handleMessage(body);
}
private void handleMessage(String body) {
// 處理消息....
log.info("【冪等性消費者】處理消息中....: {}", body);
try {
TimeUnit.SECONDS.sleep(20); // 模擬處理業務20秒
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("【冪等性消費者】處理消息完成, 入庫咯~~~");
throw new RuntimeException("出現異常"); // 觸發重試
}
}
上面模擬會出現如下情況:

如果沒有做好冪等性,真實業務可能會一次操作,記錄兩次結果,假設是在扣庫存方面,就會一次下單,然后扣減了兩邊甚至是多次庫存,數據就會產生不一致。
冪等性主要從消費者端來處理:
public class IdempotentConsumer implements RocketMQListener<MessageExt> {
private final Set<String> msgIds = new HashSet<>(128);
@Override
public void onMessage(MessageExt message) {
String msgId = message.getProperty("msgId"); // 自定義的
// 消息在Broker中的唯一ID,全局唯一,看起來適合做冪等,但是 msgId 是系統生成的,
// 與業務含義無關。無法直接關聯到具體的業務操作實例。業務系統通常需要自己的業務唯一標識。
// String msgId1 = message.getMsgId();
String body = new String(message.getBody());
// 判斷消息的唯一性
if ( judgeUnique(msgId) ) {
handleMessage(msgId, body);
}else {
log.info("消息重復,請勿重復處理");
}
}
private void handleMessage(String msgId, String body) {
// 處理消息....
log.info("【冪等性消費者】處理消息: {}", body);
try {
TimeUnit.SECONDS.sleep(20); // 模擬處理業務20秒
// 該消息處理成功之后,msgId可以標識為已處理了
log.info("【冪等性消費者】處理消息完成, 入庫咯~~~");
msgIds.add(msgId);
/*
* if ( ok ) msgIds.add(msgId); // 如果處理是ok的
* else msgIds.remove(msgId); // 反之,移除掉
* */
} catch (InterruptedException e) {
log.info("處理消息異常", e);
// msgIds.remove(msgId);
}
// 這里再次模擬重試的話,消息處理完成之后,只會被處理一次了
throw new RuntimeException("出現異常");
}
private boolean judgeUnique(String msgId) {
// 這里就僅僅用一個Set來判斷消息的唯一性
return msgIds.add(msgId);
/*
方式二:可以結合中間件Redis來判斷
使用Redis的SETNX命令或Redisson實現分布式鎖。消費者獲取鎖后處理消息,完成后釋放鎖。
但是要注意內存問題,如果key過多,可能會內存溢出,建議根據實際情況定期清理key
*/
// 方式三:使用數據庫:【唯一約束等等】
}
}

具體什么方案還是看具體業務的,上面的冪等性處理是消息的唯一鍵標識。在實際過程中,我們還可以給消息處理加上一個狀態,首先為每條消息或對應的業務操作分配一個全局唯一的業務標識(如訂單ID、交易流水號、業務單據號等),在消費消息的時候,先去一個持久化存儲(通常是數據庫)中檢查這個唯一標識對應的業務狀態,基于當前狀態和業務規則,決定是否執行以及如何執行操作。
【生產者端】:為需要保證冪等性的業務操作生成一個全局唯一的標識(uniqueKey),并放入消息的 properties 或消息體中。這個標識必須能唯一確定一個業務操作實例。
【消費者端】:從消息中解析出 uniqueKey,先看這個uniqueKey是否存在過了,如果記錄不存在:說明是第一次處理,正常執行業務邏輯(如創建訂單、扣減庫存等),并在操作成功后,在同一個數據庫事務中將業務記錄和狀態寫入數據庫。如果存在的話,并且它的狀態是已完成/已成功:直接丟棄消息或記錄日志(冪等成功);如果狀態是失敗:根據業務規則決定是重試還是標記為最終失敗;如果狀態是處理中,可以選擇等待一段時間重試檢查,因為處理中的消息可能會失敗喔。最后需要注意的是,檢查狀態、執行業務操作、更新狀態到數據庫,這三個步驟最好在一個數據庫事務中完成,為了防止并發重復消費嘛。
上述過程如下圖所示:
上面僅供參考,實際處理需要讀者自行根據業務判斷,其中Redis并不是必須要引入的。上圖引入Redis僅僅是為了讓處理成功的消息更快被找到,僅此而已。

浙公網安備 33010602011771號