RocketMQ_01筆記
1 概述
? MQ(Message Queue)消息隊列,是一種用來保存消息數據的隊列
? 隊列:數據結構的一種,特征為 “先進先出”


2.MQ 的作用
-
應用解耦(技術上必須弄好才能使用MQ )
-
快速應用變更維護
-
流量削鋒(削峰填谷)
3.MQ的優缺點
缺點:
1系統可用性降低: 集群
2系統復雜度提高:(程序員提升水平)
3異步消息機制(都有解決方案)
消息順序性
消息丟失
消息一致性
消息重復使用
4.常見產品
ActiveMQ:java語言實現,萬級數據吞吐量,處理速度ms級,主從架構,成熟度高
RabbitMQ :erlang語言實現,萬級數據吞吐量,處理速度us級,主從架構,
RocketMQ :java語言實現,十萬級數據吞吐量,處理速度ms級,分布式架構,功能強大,擴展性強
kafka :scala語言實現,十萬級數據吞吐量,處理速度ms級,分布式架構,功能較少,應用于大數據較多
簡介
RocketMQ是阿里開源的一款非常優秀中間件產品,脫胎于阿里的另一款隊列技術MetaQ,后捐贈給Apache基金會作為一款孵化技術,僅僅經歷了一年多的時間就成為Apache基金會的頂級項目。并且它現在已經在阿里內部被廣泛的應用,并且經受住了多次雙十一的這種極致場景的壓力(2017年的雙十一,RocketMQ流轉的消息量達到了萬億級,峰值TPS達到5600萬)
解決所有缺點
Windows下安裝
參考文章
1、rocketMQ啟動時出現找不到或無法加載主類
http://www.rzrgm.cn/ckfeng/p/14658570.html
2、windows下RocketMQ下載安裝教程
https://blog.csdn.net/qq_40420367/article/details/105575579
本機按照此文章安裝
修改一行 為 set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""
3、 Windows 下安裝 RocketMQ https://blog.csdn.net/zzvar/article/details/119873053
rocketMQ 不能放在有空格的目錄下
myps: 特別注意: rocketMQ 不能放在有空格的目錄下
5.linux下安裝

jdk
1)解壓 jdk
tar -zxvf jdk-8u171-linux-x64.tar.gz
2)配置環境變量
>vim /etc/profile
export JAVA_HOME=/opt/jdk1.8.0_171
export PATH=$PATH:${JAVA_HOME}/bin
3)重新加載配置
>source /etc/profile
>java -version
錯誤解決
如果安裝完畢 jdk 后 java -version 看到的是 openjdk(需要刪除)
因為 操作系統默認已經安裝了 opendjdk,
# 查看
rpm -qa | grep java
# 刪除(把上一個命令看到的所有的jdk文件 用 如下命令刪除)
rpm -e --nodeps java-1.8.0-openjdk-1.8.0.232.b09-0.el7_7.x86_64
rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.232.b09-0.el7_7.x86_64
rpm -e --nodeps java-1.7.0-openjdk-headless-1.7.0.241-2.6.20.0.el7_7.x86_64
rmp -e --nodeps java-1.7.0-openjdk-1.7.0.241-2.6.20.0.el7_7.x86_64
rpm -e --nodeps java-1.7.0-openjdk-1.7.0.241-2.6.20.0.el7_7.x86_64

rocketMQ
# 解壓
unzip rocketmq-all-4.5.2-bin-release.zip
# 修改目錄名稱
mv rocketmq-all-4.5.2-bin-release rocketmq
# 調整啟動內存 為128m
runserver.sh
runbroker.sh
如果和后天的課程(docker 一起 需要修改)
conf/broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUS
# 解決和docker 沖突的
brokerIP1=192.168.31.80
namesrvAddr=192.168.31.80:9876
啟動
#啟動nameserv
sh mqnamesrv
# 啟動mq 服務 -n 指定 nameserv 的地址(bin)
sh mqbroker -n localhost:9876 -c ../conf/broker.conf
# 關閉防火墻
systemctl stop firewalld.service
測試
export NAMESRV_ADDR=localhost:9876
bin 目錄
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
6.使用
6.1 負載均衡
環境搭建
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
消息

發送單條消息
public static void main(String[] args) throws Exception {
//1.創建一個發送消息的對象Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.設定發送的命名服務器地址 , 因為需要根據命名服務器(相當于注冊中心)查找消費者集群和Broker,且把消息放在Broker的隊列里面
producer.setNamesrvAddr("192.168.31.80:9876");
//3.1啟動發送的服務
producer.start();
//4.創建要發送的消息對象,指定topic,指定內容body
Message msg = new Message("topic1","hello rocketmq".getBytes("UTF-8"));
//4.2發送消息
SendResult result = producer.send(msg);
System.out.println("返回結果:"+result);
//5.關閉連接
producer.shutdown();
}
發送多條消息
for (int i = 1; i <= 10; i++) {
Message msg = new Message("topic1",("生產者2: hello rocketmq "+i).getBytes("UTF-8"));
SendResult result = producer.send(msg);
System.out.println("返回結果:"+result);
}
消費者
public static void main(String[] args) throws Exception {
//1.創建一個接收消息的對象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.設定接收的命名服務器地址
consumer.setNamesrvAddr("192.168.31.80:9876");
//3.設置接收消息對應的topic,對應的sub標簽為任意*
consumer.subscribe("topic1","*");
//3.開啟監聽,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//遍歷消息
for(MessageExt msg : list){
// System.out.println("收到消息:"+msg);
System.out.println("消息:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個 標記后相同的消息講不會再次發給消費者
}
});
//4.啟動接收消息的服務
consumer.start();// 開啟多線程 監控消息,持續運行
System.out.println("接收消息服務已開啟運行");
}
6.2 廣播模式
發送者
同上
消費者
//1.創建一個接收消息的對象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
System.out.println(consumer.getInstanceName());
//consumer.setInstanceName("instance01");
//2.設定接收的命名服務器地址
consumer.setNamesrvAddr("192.168.31.80:9876");
//3.設置接收消息對應的topic,對應的sub標簽為任意*
consumer.subscribe("topic1","*");
//設置當前消費者的消費模式(默認模式:負載均衡)
// consumer.setMessageModel(MessageModel.CLUSTERING);
//設置當前消費者的消費模式為廣播模式:所有客戶端接收的消息都是一樣的
consumer.setMessageModel(MessageModel.BROADCASTING);
//3.開啟監聽,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//遍歷消息
for(MessageExt msg : list){
// System.out.println("收到消息:"+msg);
System.out.println("消費者1:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//4.啟動接收消息的服務
consumer.start();
System.out.println("接收消息服務已開啟運行");
廣播模式的現象
1) 如果 生產者先發送消息, 后啟動消費者, 消息只能被消費一次
2) 如果多個消費者先啟動(廣播模式),后發消息,才有廣播的效果
結論:
必須先啟動消費者再啟動發送者才有廣播的效果
6.3 發送者發送消息的類型 三種
- 同步消息發送
- 特征:即時性較強,重要的消息,且必須有回執的消息,例如短信,通知(轉賬成功)
- 異步消息發送
- 特征:即時性較弱,但需要有回執的消息,例如訂單中的某些信息
- 單向消息:
- 特征:不需要有回執的消息,例如日志類消息
// 測試消息的種類
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.31.80:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
//同步消息發送
Message msg = new Message("topic2",("同步消息:hello rocketmq "+i).getBytes("UTF-8"));
SendResult result = producer.send(msg);
System.out.println("返回結果:"+result);
//異步消息發送
Message msg2 = new Message("topic2",("異步消息:hello rocketmq "+i).getBytes("UTF-8"));
producer.send(msg, new SendCallback() {
//表示成功返回結果
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
//表示發送消息失敗
public void onException(Throwable t) {
System.out.println(t);
}
});
//發送單向消息(場景:比如:記錄日志等)
Message msg3 = new Message("topic2",("單向消息:hello rocketmq "+i).getBytes("UTF-8"));
producer.sendOneway(msg);// 和第一種的同步消息類似,不同的是沒有返回結果
}
//添加一個休眠操作,確保異步消息返回后能夠輸出(不然就producer.shutdown()了)
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
}
小結:

6.4 延時消息
立刻發送, 只是 告訴MQ ,消息隱藏一段時間再暴露
應用場景
下訂單時 網mq 發一個取消訂單消息 (訂單號 30分鐘演示)
30分鐘后,消費者能看到這個消息,開始處理取消訂單(如果沒付費)
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.31.80:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
Message msg = new Message("topic3",("非延時消息:hello rocketmq "+i).getBytes("UTF-8"));
// 30秒后再發送,而是先發送,但是通知mq , 30s 才對外暴露數據
//設置當前消息的延時效果(比如訂單,下訂單后,20分鐘后,決定這個訂單是否刪除,)
msg.setDelayTimeLevel(3);// 注意,這個參數是等級,代表30秒
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
SendResult result = producer.send(msg);
System.out.println("返回結果:"+result);
}
producer.shutdown();
}
小結:
- 消息發送時并不直接發送到消息服務器,而是根據設定的等待時間到達,起到延時到達的緩沖作用
Message msg = new Message("topic3",("延時消息:hello rocketmq "+i).getBytes("UTF-8"));
//設置當前消息的延時效果
msg.setDelayTimeLevel(3);
SendResult result = producer.send(msg);
System.out.println("返回結果:"+result);
- 目前支持的消息時間
- 秒級:1,5,10,30
- 分級:1~10,20,30
- 時級:1,2
- 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 1sh0m 20m 30m 1h 2h(按順序查出等級數即可)
6.5 批量消息
// 創建一個消息集合即可
List<Message> msgList = new ArrayList<Message>();
SendResult send = producer.send(msgList);
注意事項:
消息內容總長度不超過4M
消息內容總長度包含如下:
topic(字符串字節數)
body (字節數組長度)
消息追加的屬性(key與value對應字符串字節數)
日志(固定20字節)
6.6、消息過濾
- 分類過濾
- 語法過濾(SQL過濾)
1、分類過濾--Tag
發送者
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.31.80:9876");
producer.start();
//創建消息的時候除了制定topic,還可以指定tag
Message msg = new Message("topic6","tag2",("消息過濾按照tag:hello rocketmq 2").getBytes("UTF-8"));
SendResult send = producer.send(msg);
System.out.println(send);
producer.shutdown();
}
消費者
*代表任意tag
"tag1 || tag2" 代表兩個 tag 都行
//接收消息的時候,除了指定topic,還可以指定接收的tag,*代表任意tag
consumer.subscribe("topic6","tag1 || tag2");
2、消息過濾--sql(其他叫法:屬性過濾/語法過濾/SQL過濾)
生產者
//為消息添加屬性
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");
消費者
//使用消息選擇器來過濾對應的屬性,語法格式為類SQL語法
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
注意:SQL過濾需要依賴服務器的功能支持,需要在broker配置文件中添加對應的功能項,并開啟對應功能
enablePropertyFilter=true
需要啟動服務器時加載這個配置文件
sh mqbroker -n localhost:9876 -c ../conf/broker.conf
6.7 順序消息
默認情況下,MQ 開啟了多個隊列, 同時發送多個消息的的話,發送給那個隊列是不確定的,同時消息的消費者讀取消息,每讀取一個消息開啟一個線程,也不能保證消息的順序性,

想要保證消息的有序性,需要指定消息的隊列,同時 消息的消費者應該一個隊列開啟一個線程進行接收而不是一個消息一個線程)
發送者
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.31.80:9876");
producer.start();
//創建要執行的業務隊列
List<Order> orderList = new ArrayList<Order>();
Order order11 = new Order();
order11.setId("a");
order11.setMsg("主單-1");
orderList.add(order11);
Order order12 = new Order();
order12.setId("a");
order12.setMsg("子單-2");
orderList.add(order12);
Order order13 = new Order();
order13.setId("a");
order13.setMsg("支付-3");
orderList.add(order13);
Order order14 = new Order();
order14.setId("a");
order14.setMsg("推送-4");
orderList.add(order14);
Order order21 = new Order();
order21.setId("b");
order21.setMsg("主單-1");
orderList.add(order21);
Order order22 = new Order();
order22.setId("b");
order22.setMsg("子單-2");
orderList.add(order22);
Order order31 = new Order();
order31.setId("c");
order31.setMsg("主單-1");
orderList.add(order31);
Order order32 = new Order();
order32.setId("c");
order32.setMsg("子單-2");
orderList.add(order32);
Order order33 = new Order();
order33.setId("c");
order33.setMsg("支付-3");
orderList.add(order33);
//設置消息進入到指定的消息隊列中
for(final Order order : orderList){
Message msg = new Message("orderTopic",order.toString().getBytes());
//發送時要指定對應的消息隊列選擇器
SendResult result = producer.send(msg, new MessageQueueSelector() {
//設置當前消息發送時使用哪一個消息隊列
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
System.out.println(list.size());// 數量只能通過修改 mq 的配置 改變(阿里開發團隊認為,這個是敏感資源需要服務器管理員控制,而不是編碼人員控制)
//根據發送的信息不同,選擇不同的消息隊列
//根據id來選擇一個消息隊列的對象,并返回->id得到int值
int mqIndex = order.getId().hashCode() % list.size();
return list.get(mqIndex);
}
}, null);
System.out.println(result);
}
producer.shutdown();
}
接受者
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.31.80:9876");
consumer.subscribe("orderTopic","*");
//應該使用單線程的模式從消息隊列中取數據,一個線程綁定一個消息隊列(否則使用多線程順序還是亂)
consumer.registerMessageListener(new MessageListenerOrderly() {
//使用MessageListenerOrderly接口后,對消息隊列的處理由一個消息隊列多個線程服務,轉化為一個消息隊列一個線程服務(myps:相對于這個隊列就是一個單線程模式了)
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for(MessageExt msg : list){
System.out.println(Thread.currentThread().getName()+" 消息:"+new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("接收消息服務已開啟運行");
}
6.8 事務消息
RocketMQ 也允許我們像mysql 一樣發送具有事務特征的消息
MQ 的事務流程(本地代碼正常執行)

MQ 的消息補償過程(當本地代碼執行失敗時)

MQ 事務消息的三種狀態
?提交狀態:允許進入隊列,此消息與非事務消息無區別
?回滾狀態:不允許進入隊列,此消息等同于未發送過
?中間狀態:完成了 half 消息的發送,未對 MQ 進行二次狀態確認(未知狀態)
注意:事務消息僅與生產者有關,與消費者無關
生產者代碼代碼
public static void main1(String[] args) throws Exception {
//事務消息使用的生產者是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.184.128:9876");
//添加本地事務對應的監聽
producer.setTransactionListener(new TransactionListener() {
//正常事務過程
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 此處寫本地事務處理業務
// 如果成功,消息改為提交,如果失敗改為 回滾,如果是多線程處理狀態未知,就提交為未知等待事務補償過程
//事務提交狀態
return LocalTransactionState.COMMIT_MESSAGE;// 類似于msql 的 commit
}
//事務補償過程
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return null;
}
});
producer.start();
Message msg = new Message("topic8",("事務消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg,null);
System.out.println("返回結果:"+result);
producer.shutdown();
}
補償代碼
public static void main(String[] args) throws Exception {
//事務消息使用的生產者是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.184.128:9876");
//添加本地事務對應的監聽
producer.setTransactionListener(new TransactionListener() {
//正常事務過程
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//中間狀態
return LocalTransactionState.UNKNOW;
}
//事務補償過程
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("事務補償過程執行");
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message msg = new Message("topic11",("事務消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg,null);
System.out.println("返回結果:"+result);
//事務補償過程必須保障服務器在運行過程中,否則將無法進行正常的事務補償
// producer.shutdown();
}


浙公網安備 33010602011771號