RabbitMQ常見問題
RabbitMQ
1、記一次線上RabbitMQ的堵塞問題
當時解決問題參考的文檔:https://www.codenong.com/cs109484329/
1、背景
RabbitMQ同步外省市運單到本系統中
2、問題
某天早上上班,發現運維群里有很多企業反饋,在系統中查不到自己最新的運單了,當時我趕到公司,打開系統一看,昨天晚上10:30點之后,運單就沒有新增了,當時我首先想到的是交通部那邊沒有將運單推送到MQ的隊列中,我打開服務器log,發現沒有MQ消費相關的log,最早的MQ消費log還是昨天晚上10:30左右的。當時我趕緊讓運維同事看了一下,發現MQ中堆積了一兩千個消息沒有消費。

并且再10:30左右的log出現了很多報錯log
經過分析錯誤log,發現昨天晚上10:30左右,MQ消費的消息報錯了(業務報錯)
3、解決
1、根據報錯原因,解決報錯
解決報錯之后,發布線上,發現消費端又開始正常消費了,經過一天的觀察,MQ消費正常
4、問題原因
后面有時間的時候,想了一下MQ不消費的原因:
MQ設置的是手動應答,由于消費端消費報錯,沒有正常ack,導致MQ中出現了很多unacked的消息。這個時候MQ會認為消費端已經沒有能力去消費消息了,就不會再發送消息給消費者了,但是消息生產者繼續將消息推送到MQ中,導致ready消息越來越多,但又不消費了,就導致了消息堵塞。
上面的其實是MQ的一種保護消費者的機制:QOS(服務質量保證)
5、QOS(服務質量保證)
在手動應答模式下啟用, 在消費端出現大量報錯,無法正常ack的情況下,MQ出現一定數量unacked,MQ為了保護消費端不在報錯,MQ將不在發送消息給消費者,進而保護消費端服務的正常運行。
可以通過設置參數:PrefetchCount(spring.rabbitmq.listener.simple.prefetch),來設置MQ支持的最大未正常確認消息數量。
spring:
# 消息隊列
rabbitmq:
host: 1.1.1.1
port: 5672
username: 1
password: 1
#虛擬主機,用于隔離業務
virtual-host: 1
# 消息發送確認
publisher-confirm-type: correlated
# 開啟發送失敗退回
publisher-returns: true
listener:
simple:
# 消費端最小并發數
concurrency: 1
# 消費端最大并發數
max-concurrency: 5
# 一次請求中預處理的消息數量
prefetch: 2
# 手動應答
acknowledge-mode: manual
# 重試配置
retry:
enabled: true
max-attempts: 3
initial-interval: 5000ms
max-interval: 1200000ms
multiplier: 2
如上面Prefetch=2,那么當有兩個消息沒有正常ack的時候,MQ就會不再發送消息了
6、為什么重啟之后,消息又正常消費了呢
因為重啟之后,unacked的消息,會重新會排到隊列開頭重新被消費,那么后面正常的消息就能繼續被推送
7、如何判斷是否又堵塞的風險
參考:https://www.codenong.com/cs109484329/
堵塞是因為unacked數量達到了限制
允許出現unacked的數量可以通過channelCount * prefetchCount * 節點數量 得出。
channlCount就是由concurrency,max-concurrency決定的。
所以
min = concurrency * prefetch * 節點數量
max = max-concurrency * prefetch * 節點數量
結論
unacked_msg_count < min 隊列不會阻塞。但需要及時處理unacked的消息。
unacked_msg_count >= min 可能會出現堵塞。
unacked_msg_count >= max 隊列一定阻塞
消費者消費MQ消息,有一個緩沖池,會一下拉一批消息到緩沖池中,消費者從緩沖池中消費消息,緩沖池大小=max-concurrency(最大并發數) * prefetch(一次預處理消費的消息數)
消息再緩沖池中,屬于待消費的消息,也就是unacked狀態,所以緩沖池中的消息數量=unacked最大數量,如果unacked超過這個值,會觸發QPS保護
如:max-concurrency=5,prefetch=20


max-concurrency:最大并發送,如設置5,那么MQ消費者就有5個


8、事故重現
1、環境
1、生產者
@PostMapping(value = "/pushOkMsg")
public R<String> pushOkMsg(@RequestParam(value = "num")Integer num,@RequestParam(value = "msg")String msg){
for (int integer = 0; integer < num; integer++) {
String msgId = UUID.randomUUID().toString().toLowerCase().replaceAll("-", "");
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(MqConfigV2.TEST_QUEUE_KEY_V1,msg.getBytes(StandardCharsets.UTF_8),correlationData);
}
return R.ok("success!!!");
}
2、生產者配置

3、隊列

4、消費者
@RabbitListener(queues = MqConfigV2.TEST_QUEUE_KEY_V1,containerFactory = "customContainerFactory")
@RabbitHandlerpublic void test4(Message message, Channel channel, @Headers Map<String, Object> heads) throws Exception {
String data = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(MqConfigV2.BASE_YD_QUEUE+" 消息接收=" + data);
if("error".equals(data)){
throw new RuntimeException("系統報錯了!!!");
}
//模擬業務處理
Thread.sleep(1000);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
2、重現
1、先發送10條正常消息
系統正常



2、發送一條error消息
系統報錯,消費失敗,未正常ack,mq中出現一條unacked



3、再發送10條正常消息
由于只有一條unacked消息,小于配置的prefetch=2
系統正常消費



4、再發送一條error
MQ出現兩條unacked



5、發送10條正常的消息
消費者不消費了,MQ消息也堵塞了,因為unacked=2,大于等于prefetch=2
問題重現成功了



其他博客的描述


浙公網安備 33010602011771號