<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      RabbitMQ常見問題

      RabbitMQ

      1、記一次線上RabbitMQ的堵塞問題

      當時解決問題參考的文檔:https://www.codenong.com/cs109484329/

      1、背景

      RabbitMQ同步外省市運單到本系統中

      2、問題

      某天早上上班,發現運維群里有很多企業反饋,在系統中查不到自己最新的運單了,當時我趕到公司,打開系統一看,昨天晚上10:30點之后,運單就沒有新增了,當時我首先想到的是交通部那邊沒有將運單推送到MQ的隊列中,我打開服務器log,發現沒有MQ消費相關的log,最早的MQ消費log還是昨天晚上10:30左右的。當時我趕緊讓運維同事看了一下,發現MQ中堆積了一兩千個消息沒有消費。

      并且再10:30左右的log出現了很多報錯log

      經過分析錯誤log,發現昨天晚上10:30左右,MQ消費的消息報錯了(業務報錯)

      3、解決

      1、根據報錯原因,解決報錯

      解決報錯之后,發布線上,發現消費端又開始正常消費了,經過一天的觀察,MQ消費正常

      4、問題原因

      后面有時間的時候,想了一下MQ不消費的原因:

      MQ設置的是手動應答,由于消費端消費報錯,沒有正常ack,導致MQ中出現了很多unacked的消息。這個時候MQ會認為消費端已經沒有能力去消費消息了,就不會再發送消息給消費者了,但是消息生產者繼續將消息推送到MQ中,導致ready消息越來越多,但又不消費了,就導致了消息堵塞。

      上面的其實是MQ的一種保護消費者的機制:QOS(服務質量保證)

      5、QOS(服務質量保證)

      在手動應答模式下啟用, 在消費端出現大量報錯,無法正常ack的情況下,MQ出現一定數量unacked,MQ為了保護消費端不在報錯,MQ將不在發送消息給消費者,進而保護消費端服務的正常運行。

      可以通過設置參數:PrefetchCount(spring.rabbitmq.listener.simple.prefetch),來設置MQ支持的最大未正常確認消息數量。

      spring:
        # 消息隊列
        rabbitmq:
          host: 1.1.1.1
          port: 5672
          username: 1
          password: 1
          #虛擬主機,用于隔離業務
          virtual-host: 1
          # 消息發送確認
          publisher-confirm-type: correlated
          # 開啟發送失敗退回
          publisher-returns: true
          listener:
            simple:
              # 消費端最小并發數
              concurrency: 1
              # 消費端最大并發數
              max-concurrency: 5
              # 一次請求中預處理的消息數量
              prefetch: 2
              # 手動應答
              acknowledge-mode: manual
              # 重試配置
              retry:
                enabled: true
                max-attempts: 3
                initial-interval: 5000ms
                max-interval: 1200000ms
                multiplier: 2
      

      如上面Prefetch=2,那么當有兩個消息沒有正常ack的時候,MQ就會不再發送消息了

      6、為什么重啟之后,消息又正常消費了呢

      因為重啟之后,unacked的消息,會重新會排到隊列開頭重新被消費,那么后面正常的消息就能繼續被推送

      7、如何判斷是否又堵塞的風險

      參考:https://www.codenong.com/cs109484329/

      堵塞是因為unacked數量達到了限制

      允許出現unacked的數量可以通過channelCount * prefetchCount * 節點數量 得出。

      channlCount就是由concurrency,max-concurrency決定的。

      所以

      min = concurrency * prefetch * 節點數量
      max = max-concurrency * prefetch * 節點數量
      

      結論

      unacked_msg_count < min 隊列不會阻塞。但需要及時處理unacked的消息。
      unacked_msg_count >= min 可能會出現堵塞。
      unacked_msg_count >= max 隊列一定阻塞
      

      消費者消費MQ消息,有一個緩沖池,會一下拉一批消息到緩沖池中,消費者從緩沖池中消費消息,緩沖池大小=max-concurrency(最大并發數) * prefetch(一次預處理消費的消息數)

      消息再緩沖池中,屬于待消費的消息,也就是unacked狀態,所以緩沖池中的消息數量=unacked最大數量,如果unacked超過這個值,會觸發QPS保護

      如:max-concurrency=5,prefetch=20

      max-concurrency:最大并發送,如設置5,那么MQ消費者就有5個

      8、事故重現

      1、環境

      1、生產者

      @PostMapping(value = "/pushOkMsg")
      public R<String> pushOkMsg(@RequestParam(value = "num")Integer num,@RequestParam(value = "msg")String msg){
          for (int integer = 0; integer < num; integer++) {
              String msgId = UUID.randomUUID().toString().toLowerCase().replaceAll("-", "");
              CorrelationData correlationData = new CorrelationData(msgId);
              rabbitTemplate.convertAndSend(MqConfigV2.TEST_QUEUE_KEY_V1,msg.getBytes(StandardCharsets.UTF_8),correlationData);
          }
          return R.ok("success!!!");
      }
      

      2、生產者配置

      3、隊列

      4、消費者

      @RabbitListener(queues = MqConfigV2.TEST_QUEUE_KEY_V1,containerFactory = "customContainerFactory")
      @RabbitHandlerpublic void test4(Message message, Channel channel, @Headers Map<String, Object> heads) throws Exception {
          String data = new String(message.getBody(), StandardCharsets.UTF_8);
          System.out.println(MqConfigV2.BASE_YD_QUEUE+" 消息接收=" + data);
          if("error".equals(data)){
              throw new RuntimeException("系統報錯了!!!");
          }
          //模擬業務處理
          Thread.sleep(1000);
          channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
      }
      

      2、重現

      1、先發送10條正常消息

      系統正常

      2、發送一條error消息

      系統報錯,消費失敗,未正常ack,mq中出現一條unacked

      3、再發送10條正常消息

      由于只有一條unacked消息,小于配置的prefetch=2

      系統正常消費

      4、再發送一條error

      MQ出現兩條unacked

      5、發送10條正常的消息

      消費者不消費了,MQ消息也堵塞了,因為unacked=2,大于等于prefetch=2

      問題重現成功了

      其他博客的描述

      posted @ 2025-04-14 10:19  wushusong  閱讀(155)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 偷拍精品一区二区三区| 最新日韩精品中文字幕| 日日碰狠狠躁久久躁96avv| 黄色免费在线网址| 精品无码久久久久国产电影| 好男人社区影视在线WWW| 男女扒开双腿猛进入爽爽免费看| 性动态图无遮挡试看30秒| 国产成人一区二区三区免费| 精品无码av无码专区| 无码人妻斩一区二区三区| 美女无遮挡免费视频网站| 亚洲国产精品无码久久久秋霞1| 国产高清小视频一区二区| 色成年激情久久综合国产| 九九热爱视频精品| 视频一区二区三区自拍偷拍| 久久精品这里热有精品| 亚洲鸥美日韩精品久久| 亚洲中文一区二区av| 久久久久久伊人高潮影院| 亚洲av鲁丝一区二区三区黄| 亚洲精品自拍视频在线看| 午夜夫妻试看120国产| 精品中文人妻在线不卡| 毛片网站在线观看| 欧美黑人大战白嫩在线| 欧洲一区二区中文字幕| 成全我在线观看免费第二季| 国产AV无码专区亚洲AV潘金链| 保靖县| 国产精品综合一区二区三区| 毛片无遮挡高清免费| 九九在线精品国产| 久久99精品国产麻豆婷婷| 亚洲高清WWW色好看美女| 盐池县| 国产精品亚洲专区无码导航| 日本大片免A费观看视频三区| 靖安县| 少妇人妻偷人精品免费|