雙主雙從集群搭建和消息的發送消費示例
1、總體架構介紹
下面我們搭建一個雙主雙從的集群,并且采用同步的方式來同步主從之間的信息,總體架構如下:

2、集群工作流程
集群工作流程如下:
- 啟動NameServer,NameServer起來后監聽端口,等待Broker、Producer、Consumer連上來,相當于一個路由控制中心。
- Broker啟動,跟所有的NameServer保持長連接,定時發送心跳包。心跳包中包含當前Broker信息(IP+端口等)以及存儲所有Topic信息。注冊成功后,NameServer集群中就有Topic跟Broker的映射關系。
- 收發消息前,先創建Topic,創建Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動創建Topic。
- Producer發送消息,啟動時先跟NameServer集群中的其中一臺建立長連接,并從NameServer中獲取當前發送的Topic存在哪些Broker上,輪詢從隊列列表中選擇一個隊列,然后與隊列所在的Broker建立長連接從而向Broker發消息。
- Consumer跟Producer類似,跟其中一臺NameServer建立長連接,獲取當前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道,開始消費消息。
通常情況下,broker 的從服務器很少承擔請求處理任務,更多是處于備份和待命狀態。從服務器的主要職責是與主服務器進行數據同步,實時復制主服務器上的消息數據,以保證數據的冗余和高可用性。當主服務器出現故障時,從服務器會通過選舉機制升級為主服務器,承擔起處理所有讀寫請求的任務。
3、搭建集群
3.1、服務器準備
先準備兩臺服務器,比如兩臺虛擬機,可以分別在這兩臺服務器上放不同的主節點和從節點,但注意同一組的 broker 不要放在同一臺服務器上,避免服務器宕機后主從節點一起崩潰。信息如下:
| 序號 | IP | 角色 | 架構模式 |
|---|---|---|---|
| 1 | 192.168.32.130 | nameserver、brokerserver | Master1、Slave2 |
| 2 | 192.168.32.131 | nameserver、brokerserver | Master2、Slave1 |
3.2、修改域名映射
修改 hosts 文件:
vim /etc/hosts
往 hosts 文件中添加以下信息:
# nameserver
192.168.32.130 rocketmq-nameserver1
192.168.32.131 rocketmq-nameserver2
# broker
192.168.32.130 rocketmq-master1
192.168.32.130 rocketmq-slave2
192.168.32.131 rocketmq-master2
192.168.32.131 rocketmq-slave1
3.3、關閉防火墻或者開放端口
宿主機需要遠程訪問虛擬機的rocketmq服務和web服務,需要開放相關的端口號,簡單粗暴的方式是直接關閉防火墻
# 關閉防火墻
systemctl stop firewalld.service
# 查看防火墻的狀態
firewall-cmd --state
# 禁止firewall開機啟動
systemctl disable firewalld.service
或者為了安全,只開放特定的端口號,RocketMQ默認使用3個端口:9876 、10911 、11011 。如果防火墻沒有關閉的話,那么防火墻就必須開放這些端口:
nameserver默認使用 9876 端口master默認使用 10911 端口slave默認使用11011 端口
執行以下命令:
# 開放name server默認端口
firewall-cmd --remove-port=9876/tcp --permanent
# 開放master默認端口
firewall-cmd --remove-port=10911/tcp --permanent
# 開放slave默認端口 (當前集群模式可不開啟)
firewall-cmd --remove-port=11011/tcp --permanent
# 重啟防火墻
firewall-cmd --reload
3.4、配置環境變量
修改 profile 文件:
vim /etc/profile
直接在該文件最后添加以下配置即可:
# set rocketmq 注意:下面的ROCKETMQ_HOME指定的路徑應該是你服務器上rocketmq實際安裝的路徑
ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.4.0-bin-release
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
修改后執行命令重新加載配置,使得配置立刻生效:
source /etc/profile
3.5、創建消息存儲路徑
mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index
因為同一主機上啟動多個broker時,store路徑要不同,所以需要額外另外一個 store1 目錄給從 broker 使用:
mkdir /usr/local/rocketmq/store1 mkdir /usr/local/rocketmq/store1/commitlog mkdir /usr/local/rocketmq/store1/consumequeue mkdir /usr/local/rocketmq/store1/index
3.6、修改broker配置文件
在安裝目錄下的 conf 目錄下,我們可以看到以下目錄結構:

因為我們是采用雙主雙從,同步更新的集群,所以修改 2m-2s-sync 下的配置文件。
- master1
服務器:192.168.32.130 下:
vi /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties
修改配置如下:
# 所屬集群名字 brokerClusterName=rocketmq-cluster # broker名字,注意此處不同的配置文件填寫的不一樣 brokerName=broker-a # 0 表示 Master,>0 表示 Slave brokerId=0 # nameServer地址,分號分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 # 在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數 defaultTopicQueueNums=4 # 是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉 autoCreateTopicEnable=true # 是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉 autoCreateSubscriptionGroup=true # Broker 對外服務的監聽端口 listenPort=10911 # 刪除文件時間點,默認凌晨 4點 deleteWhen=04 # 文件保留時間,默認 48 小時 fileReservedTime=120 # commitLog每個文件的大小默認1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每個文件默認存30W條,根據業務情況調整 mapedFileSizeConsumeQueue=300000 # destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # 檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 # 存儲路徑 storePathRootDir=/usr/local/rocketmq/store # commitLog 存儲路徑 storePathCommitLog=/usr/local/rocketmq/store/commitlog # 消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue # 消息索引存儲路徑 storePathIndex=/usr/local/rocketmq/store/index # checkpoint 文件存儲路徑 storeCheckpoint=/usr/local/rocketmq/store/checkpoint # abort 文件存儲路徑 abortFile=/usr/local/rocketmq/store/abort # 限制的消息大小 maxMessageSize=65536 # flushCommitLogLeastPages=4 # flushConsumeQueueLeastPages=2 # flushCommitLogThoroughInterval=10000 # flushConsumeQueueThoroughInterval=60000 # Broker 的角色 # - ASYNC_MASTER 異步復制Master # - SYNC_MASTER 同步雙寫Master # - SLAVE brokerRole=SYNC_MASTER # 刷盤方式 # - ASYNC_FLUSH 異步刷盤 # - SYNC_FLUSH 同步刷盤 flushDiskType=SYNC_FLUSH # checkTransactionMessageEnable=false # 發消息線程池數量 # sendMessageThreadPoolNums=128 # 拉消息線程池數量 # pullMessageThreadPoolNums=128
- slave2
服務器:192.168.32.130 下:
vi /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b-s.properties
修改配置如下:
# 所屬集群名字 brokerClusterName=rocketmq-cluster # broker名字,注意此處不同的配置文件填寫的不一樣 brokerName=broker-b # 0 表示 Master,>0 表示 Slave brokerId=1 # nameServer地址,分號分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 # 在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數 defaultTopicQueueNums=4 # 是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉 autoCreateTopicEnable=true # 是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉 autoCreateSubscriptionGroup=true # Broker 對外服務的監聽端口 listenPort=11011 # 刪除文件時間點,默認凌晨 4點 deleteWhen=04 # 文件保留時間,默認 48 小時 fileReservedTime=120 # commitLog每個文件的大小默認1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每個文件默認存30W條,根據業務情況調整 mapedFileSizeConsumeQueue=300000 # destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # 檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 # 存儲路徑 storePathRootDir=/usr/local/rocketmq/store1 # commitLog 存儲路徑 storePathCommitLog=/usr/local/rocketmq/store1/commitlog # 消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/usr/local/rocketmq/store1/consumequeue # 消息索引存儲路徑 storePathIndex=/usr/local/rocketmq/store1/index # checkpoint 文件存儲路徑 storeCheckpoint=/usr/local/rocketmq/store1/checkpoint # abort 文件存儲路徑 abortFile=/usr/local/rocketmq/store1/abort # 限制的消息大小 maxMessageSize=65536 # flushCommitLogLeastPages=4 # flushConsumeQueueLeastPages=2 # flushCommitLogThoroughInterval=10000 # flushConsumeQueueThoroughInterval=60000 # Broker 的角色 # - ASYNC_MASTER 異步復制Master # - SYNC_MASTER 同步雙寫Master # - SLAVE brokerRole=SLAVE # 刷盤方式 # - ASYNC_FLUSH 異步刷盤 # - SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH # checkTransactionMessageEnable=false # 發消息線程池數量 # sendMessageThreadPoolNums=128 # 拉消息線程池數量 # pullMessageThreadPoolNums=128
- master2
服務器:192.168.32.131
vi /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b.properties
修改配置如下:
# 所屬集群名字 brokerClusterName=rocketmq-cluster # broker名字,注意此處不同的配置文件填寫的不一樣 brokerName=broker-b # 0 表示 Master,>0 表示 Slave brokerId=0 # nameServer地址,分號分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 # 在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數 defaultTopicQueueNums=4 # 是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉 autoCreateTopicEnable=true # 是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉 autoCreateSubscriptionGroup=true # Broker 對外服務的監聽端口 listenPort=10911 # 刪除文件時間點,默認凌晨 4點 deleteWhen=04 # 文件保留時間,默認 48 小時 fileReservedTime=120 # commitLog每個文件的大小默認1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每個文件默認存30W條,根據業務情況調整 mapedFileSizeConsumeQueue=300000 # destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # 檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 # 存儲路徑 storePathRootDir=/usr/local/rocketmq/store # commitLog 存儲路徑 storePathCommitLog=/usr/local/rocketmq/store/commitlog # 消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue # 消息索引存儲路徑 storePathIndex=/usr/local/rocketmq/store/index # checkpoint 文件存儲路徑 storeCheckpoint=/usr/local/rocketmq/store/checkpoint # abort 文件存儲路徑 abortFile=/usr/local/rocketmq/store/abort # 限制的消息大小 maxMessageSize=65536 # flushCommitLogLeastPages=4 # flushConsumeQueueLeastPages=2 # flushCommitLogThoroughInterval=10000 # flushConsumeQueueThoroughInterval=60000 # Broker 的角色 # - ASYNC_MASTER 異步復制Master # - SYNC_MASTER 同步雙寫Master # - SLAVE brokerRole=SYNC_MASTER # 刷盤方式 # - ASYNC_FLUSH 異步刷盤 # - SYNC_FLUSH 同步刷盤 flushDiskType=SYNC_FLUSH # checkTransactionMessageEnable=false # 發消息線程池數量 # sendMessageThreadPoolNums=128 # 拉消息線程池數量 # pullMessageThreadPoolNums=128
-
slave1
服務器:192.168.32.131:
vi /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a-s.properties
修改配置如下:
# 所屬集群名字 brokerClusterName=rocketmq-cluster # broker名字,注意此處不同的配置文件填寫的不一樣 brokerName=broker-a # 0 表示 Master,>0 表示 Slave brokerId=1 # nameServer地址,分號分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 # 在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數 defaultTopicQueueNums=4 # 是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉 autoCreateTopicEnable=true # 是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉 autoCreateSubscriptionGroup=true # Broker 對外服務的監聽端口 listenPort=11011 # 刪除文件時間點,默認凌晨 4點 deleteWhen=04 # 文件保留時間,默認 48 小時 fileReservedTime=120 # commitLog每個文件的大小默認1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每個文件默認存30W條,根據業務情況調整 mapedFileSizeConsumeQueue=300000 # destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # 檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 # 存儲路徑 storePathRootDir=/usr/local/rocketmq/store1 # commitLog 存儲路徑 storePathCommitLog=/usr/local/rocketmq/store1/commitlog # 消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/usr/local/rocketmq/store1/consumequeue # 消息索引存儲路徑 storePathIndex=/usr/local/rocketmq/store1/index # checkpoint 文件存儲路徑 storeCheckpoint=/usr/local/rocketmq/store1/checkpoint # abort 文件存儲路徑 abortFile=/usr/local/rocketmq/store1/abort # 限制的消息大小 maxMessageSize=65536 # flushCommitLogLeastPages=4 # flushConsumeQueueLeastPages=2 # flushCommitLogThoroughInterval=10000 # flushConsumeQueueThoroughInterval=60000 # Broker 的角色 # - ASYNC_MASTER 異步復制Master # - SYNC_MASTER 同步雙寫Master # - SLAVE brokerRole=SLAVE # 刷盤方式 # - ASYNC_FLUSH 異步刷盤 # - SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH # checkTransactionMessageEnable=false # 發消息線程池數量 # sendMessageThreadPoolNums=128 # 拉消息線程池數量 # pullMessageThreadPoolNums=128
3.7、修改啟動腳本文件
根據內存大小對JVM參數進行適當的調整:
vi /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/bin/runbroker.sh
vi /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/bin/runserver.sh
參考修改如下:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
3.8、啟動服務
先關閉 rocketmq,然后再重新啟動集群:
# 關閉NameServer
sh mqshutdown namesrv
# 關閉Broker
sh mqshutdown broker
- 啟動nameserve集群
分別在192.168.32.130和192.168.32.131啟動NameServer:
# 由于已經配置了環境變量,所以可以直接執行以下命令,不然需要在 bin 目錄下執行 nohup sh mqnamesrv &
- 啟動broker集群
在192.168.32.130上啟動master1和slave2:
# 啟動master1: nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties & # 啟動slave2: nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &
在192.168.32.131上啟動master2和slave1:
# 啟動master2: nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b.properties & # 啟動slave1: nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &
啟動后可通過JPS查看啟動進程

4、發送消息
新建一個 maven 工程,導入MQ客戶端依賴(MQ 集群相當于是服務器端,往集群發送消息也就相當于是客戶端):
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
消息發送者步驟分析:
- 創建消息生產者producer,并制定生產者組名
- 指定Nameserver地址
- 啟動producer
- 創建消息對象,指定主題Topic、Tag(消息標簽)和消息體
- 發送消息
- 關閉生產者producer
消息消費者步驟分析:
- 創建消費者Consumer,制定消費者組名
- 指定Nameserver地址
- 訂閱主題Topic和Tag
- 設置回調函數,處理消息
- 啟動消費者consumer
在發送消息時,需指定主題 Topic、Tag 和消息體,消費者就可以根據主題和 Tag 來訂閱消息,以此來接收到指定主題和 Tag 的消息。
(Tag 標簽的作用:消費者訂閱了某個Topic后,消息隊列RocketMQ版會將該Topic中的所有消息投遞給消費端進行消費。若消費者只需要關注部分消息,可通過設置過濾條件在消息隊列RocketMQ版服務端完成消息過濾,只消費需要關注的消息。)
4.1、發送同步消息
同步消息指的是向 broker 發送消息后會一直等待 broker 服務器返回發送結果才會執行下一步的程序,否則程序進程一直會等待結果返回。這種可靠性同步地發送方式使用的比較廣泛,比如:重要的消息通知,短信通知。
/** * 發送同步消息 */ public class SyncProducer { public static void main(String[] args) throws Exception { //1.創建消息生產者producer,并制定生產者組名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 producer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876"); //3.啟動producer producer.start(); for (int i = 0; i < 10; i++) { //4.創建消息對象,指定主題Topic、Tag和消息體 /** * 參數一:消息主題Topic * 參數二:消息Tag * 參數三:消息內容 */ Message msg = new Message("base", "Tag1", ("同步消息Hello World" + i).getBytes()); //5.發送消息 SendResult result = producer.send(msg); //發送狀態 SendStatus status = result.getSendStatus(); System.out.println("發送結果:" + result); //線程睡1秒 TimeUnit.SECONDS.sleep(1); } //6.關閉生產者producer producer.shutdown(); } }
4.2、發送異步消息
異步消息指的是向 broker 發送消息后,producer發送消息線程不阻塞。發送異步消息時可以指定消息發送成功和發送異常的回調方法,這些回調任務在消息發送返回后會在一個新的線程中執行 。
異步消息通常用在對響應時間敏感的業務場景,即發送端不能容忍長時間地等待Broker的響應。
/** * 發送異步消息 */ public class AsyncProducer { public static void main(String[] args) throws Exception { //1.創建消息生產者producer,并制定生產者組名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 producer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876"); //3.啟動producer producer.start(); for (int i = 0; i < 10; i++) { //4.創建消息對象,指定主題Topic、Tag和消息體 /** * 參數一:消息主題Topic * 參數二:消息Tag * 參數三:消息內容 */ Message msg = new Message("base", "Tag2", ("異步消息Hello World" + i).getBytes()); //5.發送異步消息 producer.send(msg, new SendCallback() { /** * 發送成功回調函數 * @param sendResult */ public void onSuccess(SendResult sendResult) { System.out.println("發送結果:" + sendResult); } /** * 發送失敗回調函數 * @param e */ public void onException(Throwable e) { System.out.println("發送異常:" + e); } }); //線程睡1秒 TimeUnit.SECONDS.sleep(1); } //6.關閉生產者producer producer.shutdown(); } }
4.3、發送單向消息
發送單向消息時,broker 服務器不會有返回結果,producer 客戶端也無需等待broker 服務器的結果 。
這種方式主要用在不特別關心發送結果的場景,例如日志發送。
/** * 發送單向消息 */ public class OneWayProducer { public static void main(String[] args) throws Exception, MQBrokerException { //1.創建消息生產者producer,并制定生產者組名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 producer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876"); //3.啟動producer producer.start(); for (int i = 0; i < 3; i++) { //4.創建消息對象,指定主題Topic、Tag和消息體 /** * 參數一:消息主題Topic * 參數二:消息Tag * 參數三:消息內容 */ Message msg = new Message("base", "Tag3", ("單向消息Hello World" + i).getBytes()); //5.發送單向消息。沒有任何返回結果 producer.sendOneway(msg); //線程睡1秒 TimeUnit.SECONDS.sleep(1); } //6.關閉生產者producer producer.shutdown(); } }
5、接收消息
可以通過 DefaultMQPushConsumer 類來創建消費者,以此接收消息:
/** * 消息的接受者 */ public class Consumer { public static void main(String[] args) throws Exception { //1.創建消費者Consumer,制定消費者組名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876"); //3.訂閱主題Topic和Tag。下面的*表示監聽base的所有Tag consumer.subscribe("base", "*"); //可指定消費模式:負載均衡|廣播模式。默認為負載均衡模式 //MessageModel.CLUSTERING-負載均衡模式 MessageModel.BROADCASTING-廣播模式 //consumer.setMessageModel(MessageModel.BROADCASTING); //4.設置回調函數,處理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息內容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.啟動消費者consumer consumer.start(); } }
消費者啟動后不會自動退出,由此一旦生產者有發送消息,消費者即可接收到。即使在消費者開啟之前已經有消息發送,消費者也可接收到這些消息。
5.1、負載均衡消費模式(集群模式)
消費者消費消息有負載均衡和廣播模式,默認為負載均衡消費模式。
Consumer 在拉取消息之前需要對 Topic 的 Message 進行負載操作,簡單來說就是將Topic下的MessageQueue分配給這些Consumer,至于怎么分,就是通過這些負載策略定義的算法規則來劃分。
RocketMQ 默認使用的負載均衡策略為平均負載策略。如果某個Consumer集群,訂閱了相同的某個Topic,Topic下面的這些MessageQueue會被平均分配給集群中的所有Consumer中。
如下圖:

示例:
比如上面的消費者方法我們手動設置為負載均衡模式:
//指定消費模式為負載均衡(可以不用指定也行,默認就是負載均衡模式) consumer.setMessageModel(MessageModel.CLUSTERING);
然后同時啟動兩個相同的消費者 main 進程,然后發送消息,結果如下:

可以看到,兩個進程消費了相同數量的消息,并且平均分配。
5.2、廣播模式
廣播消費,類似于ActiveMQ中的發布訂閱模式,每條消息都將會對一個Consumer Group下的各個Consumer實例都消費一遍,每個消費者消費的消息都是一樣多的。
示例:
將消費者方法手動設置為廣播模式:
//指定消費模式為廣播模式 consumer.setMessageModel(MessageModel.BROADCASTING);
然后同時啟動兩個相同的消費者 main 進程,然后發送消息,結果如下:

可以看到,兩個進程對每條消息都進行了消費。
6、順序消息
默認情況下,消費者接收消息時,并不能保證接收到的消息順序的。通過發送順序消息,可以保證消費者接收的消息有序。消息有序指的是可以按照消息的發送順序來消費(FIFO)。RocketMQ可以嚴格的保證消息有序,可以分為分區有序或者全局有序。
順序消費的原理解析:在默認的情況下消息發送會采取Round Robin輪詢方式把消息發送到不同的queue(分區隊列),而消費消息時會從多個queue上拉取消息,這種情況發送和消費是不能保證順序。但是如果控制發送的順序消息只依次發送到同一個queue中,消費的時候只從這個queue上依次拉取,則就保證了順序。當發送和消費參與的queue只有一個,則是全局有序;如果發送到多個queue,但是同一系列的消息發送到同一個 queue,則能保證分區有序,即相對每個queue,消息都是有序的。
下面我們用訂單進行分區有序的示例,即保證在隸屬于同一個訂單時,消費者接收到的消息是有序的。假設一個訂單的順序流程是:創建、付款、推送、完成。則我們控制訂單號相同的消息會被先后發送到同一個隊列中,消費時,同一個OrderId獲取到的肯定是同一個隊列。
先創建一個訂單實體類:
/** * 訂單構建者 */ public class OrderStep { private long orderId; private String desc; public long getOrderId() { return orderId; } public void setOrderId(long orderId) { this.orderId = orderId; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}'; } //該方法可用于生成訂單列表 public static List<OrderStep> buildOrders() { // 1039L : 創建 付款 推送 完成 // 1065L : 創建 付款 // 7235L :創建 付款 List<OrderStep> orderList = new ArrayList<OrderStep>(); OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("創建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("創建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("創建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("推送"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("完成"); orderList.add(orderDemo); return orderList; } }
生產者發送消息:
public class Producer { public static void main(String[] args) throws Exception { //1.創建消息生產者producer,并制定生產者組名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 producer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876"); //3.啟動producer producer.start(); //構建消息集合 List<OrderStep> orderSteps = OrderStep.buildOrders(); //發送消息 for (int i = 0; i < orderSteps.size(); i++) { String body = orderSteps.get(i) + ""; //消息體 Message message = new Message("OrderTopic", "Order", "i" + i, body.getBytes()); /** * 發送消息 * 參數一:消息對象 * 參數二:消息隊列的選擇器 * 參數三:選擇隊列的業務標識(訂單ID) */ SendResult sendResult = producer.send(message, new MessageQueueSelector() { /** * 該方法用于控制哪些消息被放在同一隊列中 * @param mqs:隊列集合 * @param msg:消息對象 * @param arg:業務標識的參數 * @return */ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { long orderId = (long) arg; long index = orderId % mqs.size(); return mqs.get((int) index); //最終返回一個消息隊列,消息隊列一致的消息會放在同一隊列中 } }, orderSteps.get(i).getOrderId()); System.out.println("發送結果:" + sendResult); } producer.shutdown(); } }
消費者消費消息:
public class Consumer { public static void main(String[] args) throws MQClientException { //1.創建消費者Consumer,制定消費者組名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876"); //3.訂閱主題Topic和Tag consumer.subscribe("OrderTopic", "*"); //4.注冊消息監聽器。接收順序消息時,參數為MessageListenerOrderly實例 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { System.out.println("線程名稱:【" + Thread.currentThread().getName() + "】:" + new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); //5.啟動消費者 consumer.start(); System.out.println("消費者啟動"); } }
先執行生產者發送消息,然后執行消費者接收消息,運行結果如下:

可以看到,雖然不是全局有序,但是是分區有序的,即同一訂單的內容是有序的。
7、延時消息
rocketmq提供一種延時消息的解決方案,就是在特定的時間到了,消息才會被投遞出去供consumer消費。broker 在接收到延遲消息的時候會把對應延遲級別的消息先存儲到對應的延遲隊列中,等延遲消息時間到達時,會把消息重新存儲到對應的 topic 的 queue 里面,以供消費者使用。
生產者:
public class Producer { public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { //1.創建消息生產者producer,并制定生產者組名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 producer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876"); //3.啟動producer producer.start(); for (int i = 0; i < 10; i++) { //4.創建消息對象,指定主題Topic、Tag和消息體 /** * 參數一:消息主題Topic * 參數二:消息Tag * 參數三:消息內容 */ Message msg = new Message("DelayTopic", "Tag1", ("Hello World" + i).getBytes()); //設定延遲時間 msg.setDelayTimeLevel(2); //5.發送消息 SendResult result = producer.send(msg); //發送狀態 SendStatus status = result.getSendStatus(); System.out.println("發送結果:" + result); //線程睡1秒 TimeUnit.SECONDS.sleep(1); } //6.關閉生產者producer producer.shutdown(); } }
消費者:
public class Consumer { public static void main(String[] args) throws Exception { //1.創建消費者Consumer,制定消費者組名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876"); //3.訂閱主題Topic和Tag consumer.subscribe("DelayTopic", "*"); //4.設置回調函數,處理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息內容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消息ID:【" + msg.getMsgId() + "】,延遲時間:" + (System.currentTimeMillis() - msg.getStoreTimestamp())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.啟動消費者consumer consumer.start(); System.out.println("消費者啟動"); } }
先啟動消費者,然后啟動生產者發送消息,你會發現,消息發送完成后,消費者并不能立即收到消息,而是需等待延遲時間后才能接收到消息。
執行結果如下,可以看到,延遲時間可能并不一定是設定的 2 秒,這是因為可能一些網絡延遲等原因導致,屬正?,F象。

但是現在RocketMq并不支持任意時間的延時,只支持設置一些固定的延時等級,從1s到2h分別對應著等級1到18。如下:
// org/apache/rocketmq/store/config/MessageStoreConfig.java private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
8、批量消息
上面我們發送消息實際上都是循環一條一條地去發送的,通過批量發送消息可以顯著提高傳遞消息的性能。限制是這些批量消息應該有相同的topic,相同的waitStoreMsgOK,而且不能是延時消息。此外,這一批消息的總大小不應超過4MB。
批量消息實際上就是一次性發送多條消息,代碼跟一條條地發送差別不大,只不過是發送的是集合而已。
生產者:
public class Producer { public static void main(String[] args) throws Exception { //1.創建消息生產者producer,并制定生產者組名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 producer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876"); //3.啟動producer producer.start(); List<Message> msgs = new ArrayList<Message>(); //4.創建消息對象,指定主題Topic、Tag和消息體 /** * 參數一:消息主題Topic * 參數二:消息Tag * 參數三:消息內容 */ Message msg1 = new Message("BatchTopic", "Tag1", ("Hello World" + 1).getBytes()); Message msg2 = new Message("BatchTopic", "Tag1", ("Hello World" + 2).getBytes()); Message msg3 = new Message("BatchTopic", "Tag1", ("Hello World" + 3).getBytes()); msgs.add(msg1); msgs.add(msg2); msgs.add(msg3); //5.發送消息 SendResult result = producer.send(msgs); //發送狀態 SendStatus status = result.getSendStatus(); System.out.println("發送結果:" + result); //線程睡1秒 TimeUnit.SECONDS.sleep(1); //6.關閉生產者producer producer.shutdown(); } }
消費者:
public class Consumer { public static void main(String[] args) throws Exception { //1.創建消費者Consumer,制定消費者組名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876"); //3.訂閱主題Topic和Tag consumer.subscribe("BatchTopic", "*"); //4.設置回調函數,處理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息內容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.啟動消費者consumer consumer.start(); System.out.println("消費者啟動"); } }
執行結果:


使用批量發送消息時,一批消息的總大小不應超過4MB,如果消息的總長度可能大于4MB時,這時候最好把消息進行分割。
可使用下面提供的類來實現消息分割:
public class ListSplitter implements Iterator<List<Message>> { private final int SIZE_LIMIT = 1024 * 1024 * 4; private final List<Message> messages; private int currIndex; public ListSplitter(List<Message> messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List<Message> next() { int nextIndex = currIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length() + message.getBody().length; Map<String, String> properties = message.getProperties(); for (Map.Entry<String, String> entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20; // 增加日志的開銷20字節 if (tmpSize > SIZE_LIMIT) { //單個消息超過了最大的限制 //忽略,否則會阻塞分裂的進程 if (nextIndex - currIndex == 0) { //假如下一個子列表沒有元素,則添加這個子列表然后退出循環,否則只是退出循環 nextIndex++; } break; } if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } List<Message> subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } }
通過實現上面的 ListSplitter 類,每次遍歷獲取一個容量大小不超過 4M 的集合,通過發送該集合來實現消息分割:
//把大的消息分裂成若干個小的消息 ListSplitter splitter = new ListSplitter(messages); while (splitter.hasNext()) { try { List<Message> listItem = splitter.next(); producer.send(listItem); } catch (Exception e) { e.printStackTrace(); //處理error } }
9、消息過濾
在消費者消費消息時,可以指定一些條件來過濾消息,rocketmq 支持通過 tag 和 SQL 來過濾消息。
9.1、通過 TAG 過濾消息
每一條消息在發送前都可以指定一個主題 topic 和標簽 tag,在消費者訂閱了指定的主題消費消息時,還可以通過指定 TAG 來指定只消費該 topic 下的某一些 TAG 的消息。
Tag 的作用:消費者訂閱了某個Topic后,消息隊列RocketMQ版會將該Topic中的所有消息投遞給消費端進行消費。若消費者只需要關注部分消息,可通過設置過濾條件在消息隊列RocketMQ版服務端完成消息過濾,只消費需要關注的消息。
使用示例:
//1.創建消費者Consumer,制定消費者組名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876"); //3.訂閱主題Topic和Tag。多個 tag 可以用 || 符號分割開,也可以通過 * 表示該主題下是所有 tag consumer.subscribe("FilterTagTopic", "Tag1 || Tag2 ");
9.2、通過 SQL 過濾消息
通過 TAG 可以實現過濾消息,但是一個消息只能有一個標簽,這可能難以滿足一些復雜的場景。此時,我們可以使用SQL表達式來篩選消息。在發送消息時,我們可以通過 putUserProperty 方法來給消息添加屬性并設置,然后在消費時就可以根據這些屬性來寫一些 SQL 來過濾符合條件的消息。
使用示例:
生產者:
public class Producer { public static void main(String[] args) throws Exception { //1.創建消息生產者producer,并制定生產者組名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 producer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876"); //3.啟動producer producer.start(); for (int i = 0; i < 10; i++) { //4.創建消息對象,指定主題Topic、Tag和消息體 /** * 參數一:消息主題Topic * 參數二:消息Tag * 參數三:消息內容 */ Message msg = new Message("FilterSQLTopic", "Tag1", ("Hello World" + i).getBytes()); msg.putUserProperty("i", String.valueOf(i)); //5.發送消息 SendResult result = producer.send(msg); //發送狀態 SendStatus status = result.getSendStatus(); System.out.println("發送結果:" + result); //線程睡1秒 TimeUnit.SECONDS.sleep(2); } //6.關閉生產者producer producer.shutdown(); } }
消費者:
public class Consumer { public static void main(String[] args) throws Exception { //1.創建消費者Consumer,制定消費者組名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876"); //3.訂閱主題Topic和Tag consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5")); //4.設置回調函數,處理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息內容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.啟動消費者consumer consumer.start(); System.out.println("消費者啟動"); } }
執行結果:


可以看到,雖然發送了 10 條數據,但是消費者只消費了符合 SQL 條件的其中 4 條數據。
9.2.1、rocketmq支持的SQL語法
RocketMQ只定義了一些基本語法來支持這個特性。你也可以很容易地擴展它。
- 數值比較,比如:>,>=,<,<=,BETWEEN,=;
- 字符比較,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 邏輯符號 AND,OR,NOT;
常量支持類型為:
- 數值,比如:123,3.1415;
- 字符,比如:'abc',必須用單引號包裹起來;
- NULL,特殊的常量
- 布爾值,TRUE 或 FALSE
只有使用push模式的消費者才能用使用SQL92標準的sql語句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
9.2.2、解決使用SQL過濾報錯
在使用 SQL 過濾啟動消費者時,可能會提示:The broker does not support consumer to filter message by SQL92 的錯誤,此時需修改 rocketmq 安裝目錄下的 conf/broker.conf 文件,添加 enablePropertyFilter=true 配置。

在兩臺服務器上分別修改該配置文件并重啟 nameserver 和 broker 集群,在 rocketmq-console 上可以看到集群中 enablePropertyFilter 為 true 即說明配置成功,或者直接啟動消費者不報錯即可。

如果仍然不行,則分別需要修改 192.168.32.130 下的 conf/2m-2s-sync/broker-a.properties 和 conf/2m-2s-sync/broker-b-s.properties 配置文件,還有 192.168.32.131 下的 conf/2m-2s-sync/broker-b.properties 和 conf/2m-2s-sync/broker-a-s.properties 配置文件,分別添加 enablePropertyFilter=true 配置,然后重啟 namerserver 和 broker 集群即可。
192.168.32.130 下:
vi /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties
vi /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b-s.properties
192.168.32.131 下:
vi /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b.properties
vi /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a-s.properties
10、事務消息
10.1、事務消息背景
MQ組件是系統架構里必不可少的一門利器,設計層面可以降低系統耦合度,高并發場景又可以起到削峰填谷的作用,從單體應用到集群部署方案,再到現在的微服務架構,MQ憑借其優秀的性能和高可靠性,得到了廣泛的認可。
隨著數據量增多,系統壓力變大,開始出現這種現象:數據庫已經更新了,但消息沒發出來,或者消息先發了,但后來數據庫更新失敗了,結果研發童鞋各種數據修復,這種生產問題出現的概率不大,但讓人很郁悶。這個其實就是數據庫事務與MQ消息的一致性問題,簡單來講,數據庫的事務跟普通MQ消息發送無法直接綁定與數據庫事務綁定在一起,例如上面提及的兩種問題場景:
- 數據庫事務提交后發送MQ消息;
- MQ消息先發,然后再提交數據庫事務。
場景1的問題是數據庫事務可能剛剛提交,服務器就宕機了,MQ消息沒發出去,場景2的問題就是MQ消息發送出去了,但數據庫事務提交失敗,又沒辦法追加已經發出去的MQ消息,結果導致數據沒更新,下游已經收到消息,最終事務出現不一致的情況。
10.2、事務消息的引出
我們以微服務架構的購物場景為例,參照一下RocketMQ官方的例子,用戶A發起訂單,支付100塊錢操作完成后,能得到100積分,賬戶服務和會員服務是兩個獨立的微服務模塊,有各自的數據庫,按照上文提及的問題可能性,將會出現這些情況:
- 如果先扣款,再發消息,可能錢剛扣完,宕機了,消息沒發出去,結果積分沒增加。
- 如果先發消息,再扣款,可能積分增加了,但錢沒扣掉,白送了100積分給人家。
- 錢正常扣了,消息也發送成功了,但會員服務實例消費消息出現問題,結果積分沒增加。

由此引出的是數據庫事務與MQ消息的事務一致性問題,rocketmq事務消息解決的問題:解決本地事務執行與消息發送的原子性問題。這里界限一定要明白,是確保MQ生產端正確無誤地將消息發送出來,沒有多發,也不會漏發。但至于發送后消費端有沒有正常的消費掉(如上面提及的第三種情況,錢正??哿耍⒁舶l了,但下游消費出問題導致積分不對),這種異常場景將由MQ消息消費失敗重試機制來保證,不在此次的討論范圍內。
常用的MQ組件針對此場景都有自己的實現方案,如ActiveMQ使用AMQP協議(二階提交方式)保證消息正確發送,這里我們以RocketMQ為重點進行學習。
10.3、RocketMQ事務消息設計思路
根據CAP理論,RocketMQ事務消息通過異步確保方式,保證事務的最終一致性。設計流程上借鑒兩階段提交理論,流程圖如下:

流程:
- 應用模塊遇到要發送事務消息的場景時,先發送消息給MQ(half 消息)
- 服務端返回消息的寫入結果
- 根據服務器端返回的寫入結果執行數據庫事務(本地事務)(如果寫入失敗,此時 half 消息對業務不可見,本地邏輯也不會執行)
- 根據本地事務執行的結果,再返回 Commit 或 Rollback 給 MQ 服務器
- 如果是Commit,MQ 會把消息下發給 Consumer 端;如果是Rollback,MQ會直接刪掉該消息
- 圖中第3步即本地事務執行后如果沒響應給 MQ 服務器端,或是超時的,啟動定時任務回查事務狀態(最多重試15次,超過了默認丟棄此消息),處理結果同圖中第4步。
MQ消費的成功機制由 MQ 自己保證。
可參考:http://www.rzrgm.cn/huangying2124/p/11702761.html#top
事務補償:對沒有Commit/Rollback的事務消息(pending狀態的消息),從服務端發起“回查, Producer收到回查消息,檢查回查消息對應的本地事務的狀態,根據本地事務狀態,重新Commit或者Rollback。補償階段用于解決消息Commit或者Rollback發生超時或者失敗的情況。
10.4、事務消息狀態
事務消息共有三種狀態,提交狀態、回滾狀態、中間狀態:
- TransactionStatus.CommitTransaction:提交事務,它允許消費者消費此消息。
- TransactionStatus.RollbackTransaction:回滾事務,它代表該消息將被刪除,不允許被消費。
- TransactionStatus.Unknown:中間狀態,它代表需要檢查消息隊列來確定狀態。
10.5、代碼實現
生產者:
/** * 發送同步消息 */ public class Producer { public static void main(String[] args) throws Exception { //1.創建消息生產者producer,并制定生產者組名。注意,這里使用的是TransactionMQProducer來創建 TransactionMQProducer producer = new TransactionMQProducer("group5"); //2.指定Nameserver地址 producer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876"); //添加事務監聽器。在事務監聽器里面執行本地事務,執行完后再確認是否要發送commit或者rollback給MQ服務器 producer.setTransactionListener(new TransactionListener() { /** * 在該方法中執行本地事務 * @param msg * @param arg * @return */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { if (StringUtils.equals("TAGA", msg.getTags())) { return LocalTransactionState.COMMIT_MESSAGE; //commit } else if (StringUtils.equals("TAGB", msg.getTags())) { return LocalTransactionState.ROLLBACK_MESSAGE; //rollback } else if (StringUtils.equals("TAGC", msg.getTags())) { return LocalTransactionState.UNKNOW; //什么都不做 } return LocalTransactionState.UNKNOW; } /** * 該方法是MQ回查事務狀態時的處理 * (上面在TAGC消息的處理中我們都沒做,所以這里實際上會匹配到TAGC的消息) * @param msg * @return */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("消息的Tag:" + msg.getTags()); return LocalTransactionState.COMMIT_MESSAGE; //最后commit } }); //3.啟動producer producer.start(); String[] tags = {"TAGA", "TAGB", "TAGC"}; for (int i = 0; i < 3; i++) { //4.創建消息對象,指定主題Topic、Tag和消息體 /** * 參數一:消息主題Topic * 參數二:消息Tag * 參數三:消息內容 */ Message msg = new Message("TransactionTopic", tags[i], ("Hello World" + i).getBytes()); //5.發送消息。這里會先發送,發送完成后才會執行事務監聽器 SendResult result = producer.sendMessageInTransaction(msg, null); //發送狀態 SendStatus status = result.getSendStatus(); System.out.println("發送結果:" + result); //線程睡1秒 TimeUnit.SECONDS.sleep(1); } //6.關閉生產者producer。由于MQ服務器在未收到生產者確認消息后會回查,所以啟動生產者后先不要關閉 //producer.shutdown(); } }
消費者:
/** * 消息的接受者 */ public class Consumer { public static void main(String[] args) throws Exception { //1.創建消費者Consumer,制定消費者組名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.32.130:9876;192.168.32.131:9876"); //3.訂閱主題Topic和Tag consumer.subscribe("TransactionTopic", "*"); //4.設置回調函數,處理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息內容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.啟動消費者consumer consumer.start(); System.out.println("消費者啟動"); } }
最終執行結果可以看到消費者消費了 TAGA 和 TAGC 的消息,但是沒有消費 TAGB 的消息,因為 TAGB 被回滾了。
10.6、使用限制
事務消息有以下使用限制:
- 事務消息不支持延時消息和批量消息。
- 為了避免單個消息被檢查太多次而導致半隊列消息累積,我們默認將單個消息的檢查次數限制為 15 次,但是用戶可以通過 Broker 配置文件的
transactionCheckMax參數來修改此限制。如果已經檢查某條消息超過 N 次的話( N =transactionCheckMax) 則 Broker 將丟棄此消息,并在默認情況下同時打印錯誤日志。用戶可以通過重寫AbstractTransactionCheckListener類來修改這個行為。 - 事務消息將在 Broker 配置文件中的參數 transactionMsgTimeout 這樣的特定時間長度之后回查。當發送事務消息時,用戶還可以通過設置用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個限制,該參數優先于
transactionMsgTimeout參數。 - 事務性消息可能不止一次被檢查或消費。
- 提交給用戶的目標主題消息可能會失敗,目前這依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機制來保證,如果希望確保事務消息不丟失、并且事務完整性得到保證,建議使用同步的雙重寫入機制。
- 事務消息的生產者 ID 不能與其他類型消息的生產者 ID 共享。與其他類型的消息不同,事務消息允許反向查詢、MQ服務器能通過它們的生產者 ID 查詢到消費者。

浙公網安備 33010602011771號