<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      解決MQ消息丟失問題的5種方案

      前言

      今天我們來聊聊一個(gè)讓很多開發(fā)者頭疼的話題——MQ消息丟失問題。

      有些小伙伴在工作中,一提到消息隊(duì)列就覺得很簡單,但真正遇到線上消息丟失時(shí),排查起來卻讓人抓狂。

      其實(shí),我在實(shí)際工作中,也遇到過MQ消息丟失的情況。

      今天這篇文章,專門跟大家一起聊聊這個(gè)話題,希望對你會有所幫助。

      一、消息丟失的三大環(huán)節(jié)

      在深入解決方案之前,我們先搞清楚消息在哪幾個(gè)環(huán)節(jié)可能丟失:

      1. 生產(chǎn)者發(fā)送階段

      • 網(wǎng)絡(luò)抖動導(dǎo)致發(fā)送失敗
      • 生產(chǎn)者宕機(jī)未發(fā)送
      • Broker處理失敗未返回確認(rèn)

      2. Broker存儲階段

      • 內(nèi)存消息未持久化,重啟丟失
      • 磁盤故障導(dǎo)致數(shù)據(jù)丟失
      • 集群切換時(shí)消息丟失

      3. 消費(fèi)者處理階段

      • 自動確認(rèn)模式下處理異常
      • 消費(fèi)者宕機(jī)處理中斷
      • 手動確認(rèn)但忘記確認(rèn)

      理解了問題根源,接下來我們看5種實(shí)用的解決方案。

      二、方案一:生產(chǎn)者確認(rèn)機(jī)制

      核心原理

      生產(chǎn)者發(fā)送消息后等待Broker確認(rèn),確保消息成功到達(dá)。

      這是防止消息丟失的第一道防線。

      關(guān)鍵實(shí)現(xiàn)

      // RabbitMQ生產(chǎn)者確認(rèn)配置
      @Bean
      public RabbitTemplate rabbitTemplate() {
          RabbitTemplate template = new RabbitTemplate(connectionFactory);
          template.setConfirmCallback((correlationData, ack, cause) -> {
              if (ack) {
                  // 消息成功到達(dá)Broker
                  messageStatusService.markConfirmed(correlationData.getId());
              } else {
                  // 發(fā)送失敗,觸發(fā)重試
                  retryService.scheduleRetry(correlationData.getId());
              }
          });
          return template;
      }
      
      // 可靠發(fā)送方法
      public void sendReliable(String exchange, String routingKey, Object message) {
          String messageId = generateId();
          // 先落庫保存發(fā)送狀態(tài)
          messageStatusService.saveSendingStatus(messageId, message);
          
          // 發(fā)送持久化消息
          rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
              msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
              msg.getMessageProperties().setMessageId(messageId);
              return msg;
          }, new CorrelationData(messageId));
      }
      

      適用場景

      • 對消息可靠性要求高的業(yè)務(wù)
      • 金融交易、訂單處理等關(guān)鍵業(yè)務(wù)
      • 需要精確知道消息發(fā)送結(jié)果的場景

      三、方案二:消息持久化機(jī)制

      核心原理

      將消息保存到磁盤,確保Broker重啟后消息不丟失。

      這是防止Broker端消息丟失的關(guān)鍵。

      關(guān)鍵實(shí)現(xiàn)

      // 持久化隊(duì)列配置
      @Bean
      public Queue orderQueue() {
          return QueueBuilder.durable("order.queue")  // 隊(duì)列持久化
                  .deadLetterExchange("order.dlx")    // 死信交換機(jī)
                  .build();
      }
      
      // 發(fā)送持久化消息
      public void sendPersistentMessage(Object message) {
          rabbitTemplate.convertAndSend("order.exchange", "order.create", message, msg -> {
              msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化
              return msg;
          });
      }
      
      // Kafka持久化配置
      @Bean
      public ProducerFactory<String, Object> producerFactory() {
          Map<String, Object> props = new HashMap<>();
          props.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本確認(rèn)
          props.put(ProducerConfig.RETRIES_CONFIG, 3);   // 重試次數(shù)
          props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 冪等性
          return new DefaultKafkaProducerFactory<>(props);
      }
      

      優(yōu)缺點(diǎn)

      優(yōu)點(diǎn):

      • 有效防止Broker重啟導(dǎo)致的消息丟失
      • 配置簡單,效果明顯

      缺點(diǎn):

      • 磁盤IO影響性能
      • 需要足夠的磁盤空間

      四、方案三:消費(fèi)者確認(rèn)機(jī)制

      核心原理

      消費(fèi)者處理完消息后手動向Broker發(fā)送確認(rèn),Broker收到確認(rèn)后才刪除消息。

      這是保證消息不丟失的最后一道防線。

      關(guān)鍵實(shí)現(xiàn)

      // 手動確認(rèn)消費(fèi)者
      @RabbitListener(queues = "order.queue")
      public void handleMessage(Order order, Message message, Channel channel) {
          long deliveryTag = message.getMessageProperties().getDeliveryTag();
          
          try {
              // 業(yè)務(wù)處理
              orderService.processOrder(order);
              
              // 手動確認(rèn)
              channel.basicAck(deliveryTag, false);
              log.info("消息處理完成: {}", order.getOrderId());
              
          } catch (Exception e) {
              log.error("消息處理失敗: {}", order.getOrderId(), e);
              
              // 處理失敗,重新入隊(duì)
              channel.basicNack(deliveryTag, false, true);
          }
      }
      
      // 消費(fèi)者容器配置
      @Bean
      public SimpleRabbitListenerContainerFactory containerFactory() {
          SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
          factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手動確認(rèn)
          factory.setPrefetchCount(10); // 預(yù)取數(shù)量
          factory.setConcurrentConsumers(3); // 并發(fā)消費(fèi)者
          return factory;
      }
      

      注意事項(xiàng)

      • 確保業(yè)務(wù)處理完成后再確認(rèn)
      • 合理設(shè)置預(yù)取數(shù)量,避免內(nèi)存溢出
      • 處理異常時(shí)要正確使用NACK

      五、方案四:事務(wù)消息機(jī)制

      核心原理

      通過事務(wù)保證本地業(yè)務(wù)操作和消息發(fā)送的原子性,要么都成功,要么都失敗。

      關(guān)鍵實(shí)現(xiàn)

      // 本地事務(wù)表方案
      @Transactional
      public void createOrder(Order order) {
          // 1. 保存訂單到數(shù)據(jù)庫
          orderRepository.save(order);
          
          // 2. 保存消息到本地消息表
          LocalMessage localMessage = new LocalMessage();
          localMessage.setBusinessId(order.getOrderId());
          localMessage.setContent(JSON.toJSONString(order));
          localMessage.setStatus(MessageStatus.PENDING);
          localMessageRepository.save(localMessage);
          
          // 3. 事務(wù)提交,本地業(yè)務(wù)和消息存儲保持一致性
      }
      
      // 定時(shí)任務(wù)掃描并發(fā)送消息
      @Scheduled(fixedDelay = 5000)
      public void sendPendingMessages() {
          List<LocalMessage> pendingMessages = localMessageRepository.findByStatus(MessageStatus.PENDING);
          
          for (LocalMessage message : pendingMessages) {
              try {
                  // 發(fā)送消息到MQ
                  rabbitTemplate.convertAndSend("order.exchange", "order.create", message.getContent());
                  
                  // 更新消息狀態(tài)為已發(fā)送
                  message.setStatus(MessageStatus.SENT);
                  localMessageRepository.save(message);
                  
              } catch (Exception e) {
                  log.error("發(fā)送消息失敗: {}", message.getId(), e);
              }
          }
      }
      
      // RocketMQ事務(wù)消息
      public void sendTransactionMessage(Order order) {
          TransactionMQProducer producer = new TransactionMQProducer("order_producer");
          
          // 發(fā)送事務(wù)消息
          Message msg = new Message("order_topic", "create", 
                                   JSON.toJSONBytes(order));
          
          TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
          
          if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
              log.info("事務(wù)消息提交成功");
          }
      }
      

      適用場景

      • 需要嚴(yán)格保證業(yè)務(wù)和消息一致性的場景
      • 分布式事務(wù)場景
      • 金融、電商等對數(shù)據(jù)一致性要求高的業(yè)務(wù)

      六、方案五:消息重試與死信隊(duì)列

      核心原理

      通過重試機(jī)制處理臨時(shí)故障,通過死信隊(duì)列處理最終無法消費(fèi)的消息。

      關(guān)鍵實(shí)現(xiàn)

      // 重試隊(duì)列配置
      @Bean
      public Queue orderQueue() {
          return QueueBuilder.durable("order.queue")
                  .withArgument("x-dead-letter-exchange", "order.dlx") // 死信交換機(jī)
                  .withArgument("x-dead-letter-routing-key", "order.dead")
                  .withArgument("x-message-ttl", 60000) // 60秒后進(jìn)入死信
                  .build();
      }
      
      // 死信隊(duì)列配置
      @Bean
      public Queue orderDeadLetterQueue() {
          return QueueBuilder.durable("order.dead.queue").build();
      }
      
      // 消費(fèi)者重試邏輯
      @RabbitListener(queues = "order.queue")
      public void handleMessageWithRetry(Order order, Message message, Channel channel) {
          long deliveryTag = message.getMessageProperties().getDeliveryTag();
          
          try {
              orderService.processOrder(order);
              channel.basicAck(deliveryTag, false);
              
          } catch (TemporaryException e) {
              // 臨時(shí)異常,重新入隊(duì)重試
              channel.basicNack(deliveryTag, false, true);
              
          } catch (PermanentException e) {
              // 永久異常,直接確認(rèn)進(jìn)入死信隊(duì)列
              channel.basicAck(deliveryTag, false);
              log.error("消息進(jìn)入死信隊(duì)列: {}", order.getOrderId(), e);
          }
      }
      
      // 死信隊(duì)列消費(fèi)者
      @RabbitListener(queues = "order.dead.queue")
      public void handleDeadLetterMessage(Order order) {
          log.warn("處理死信消息: {}", order.getOrderId());
          // 發(fā)送告警、記錄日志、人工處理等
          alertService.sendAlert("死信消息告警", order.toString());
      }
      

      重試策略建議

      1. 指數(shù)退避:1s, 5s, 15s, 30s
      2. 最大重試次數(shù):3-5次
      3. 死信處理:人工介入或特殊處理流程

      七、方案對比與選型指南

      為了幫助大家選擇合適的方案,我整理了詳細(xì)的對比表:

      方案 可靠性 性能影響 復(fù)雜度 適用場景
      生產(chǎn)者確認(rèn) 所有需要可靠發(fā)送的場景
      消息持久化 Broker重啟保護(hù)
      消費(fèi)者確認(rèn) 確保消息被成功處理
      事務(wù)消息 最高 強(qiáng)一致性要求的業(yè)務(wù)
      重試+死信 處理臨時(shí)故障和最終死信

      選型建議

      初創(chuàng)項(xiàng)目/簡單業(yè)務(wù):

      • 生產(chǎn)者確認(rèn) + 消息持久化 + 消費(fèi)者確認(rèn)
      • 滿足大部分場景,實(shí)現(xiàn)簡單

      電商/交易系統(tǒng):

      • 生產(chǎn)者確認(rèn) + 事務(wù)消息 + 重試機(jī)制
      • 保證數(shù)據(jù)一致性,處理復(fù)雜業(yè)務(wù)

      大數(shù)據(jù)/日志處理:

      • 消息持久化 + 消費(fèi)者確認(rèn)
      • 允許少量丟失,追求吞吐量

      金融/支付系統(tǒng):

      • 全方案組合使用
      • 最高可靠性要求

      總結(jié)

      消息丟失問題是消息隊(duì)列使用中的常見挑戰(zhàn),通過今天介紹的5種方案,我們可以構(gòu)建一個(gè)可靠的消息系統(tǒng):

      1. 生產(chǎn)者確認(rèn)機(jī)制 - 保證消息成功發(fā)送到Broker
      2. 消息持久化機(jī)制 - 防止Broker重啟導(dǎo)致消息丟失
      3. 消費(fèi)者確認(rèn)機(jī)制 - 確保消息被成功處理
      4. 事務(wù)消息機(jī)制 - 保證業(yè)務(wù)和消息的一致性
      5. 重試與死信隊(duì)列 - 處理異常情況和最終死信

      有些小伙伴可能會問:"我需要全部使用這些方案嗎?

      "我的建議是:根據(jù)業(yè)務(wù)需求選擇合適的組合

      對于關(guān)鍵業(yè)務(wù),建議至少使用前三種方案;對于普通業(yè)務(wù),可以根據(jù)實(shí)際情況適當(dāng)簡化。

      記住,沒有完美的方案,只有最適合的方案。

      最后說一句(求關(guān)注,別白嫖我)

      如果這篇文章對您有所幫助,或者有所啟發(fā)的話,幫忙關(guān)注一下我的同名公眾號:蘇三說技術(shù),您的支持是我堅(jiān)持寫作最大的動力。

      求一鍵三連:點(diǎn)贊、轉(zhuǎn)發(fā)、在看。

      關(guān)注公眾號:【蘇三說技術(shù)】,在公眾號中回復(fù):進(jìn)大廠,可以免費(fèi)獲取我最近整理的10萬字的面試寶典,好多小伙伴靠這個(gè)寶典拿到了多家大廠的offer。

      更多項(xiàng)目實(shí)戰(zhàn)在我的技術(shù)網(wǎng)站:http://www.susan.net.cn/project

      posted @ 2025-10-28 15:24  蘇三說技術(shù)  閱讀(298)  評論(1)    收藏  舉報(bào)
      主站蜘蛛池模板: 国产午夜福利精品视频| 精品国产中文字幕在线| 粗壮挺进人妻水蜜桃成熟| 亚洲十八禁一区二区三区| 欧美成人精品手机在线| 国产成人无码A区在线观看视频 | 四虎国产成人永久精品免费| 中文无码vr最新无码av专区| julia无码中文字幕一区| аⅴ天堂中文在线网| 在线高清免费不卡全码| 国精产品一品二品国精在线观看| 亚洲AV日韩AV激情亚洲| 亚洲欧美国产日韩天堂区| 三上悠亚精品一区二区久久| 精品无码国产自产拍在线观看蜜| av色国产色拍| 在线免费播放av观看| 国产一区二区三区精品综合| 成人精品色一区二区三区| 久久久久99精品成人片牛牛影视 | 亚洲av成人网在线观看| 久久se精品一区二区三区| 内射人妻视频国内| 日本道播放一区二区三区| 亚洲精品久久久久玩吗| 久久久精品人妻一区二区三区| 午夜在线观看成人av| 白水县| 激情人妻自拍中文夜夜嗨| 宫西光有码视频中文字幕| 国产成人AV男人的天堂| 国产永久免费高清在线| 亚洲丶国产丶欧美一区二区三区| 欧美黑人XXXX性高清版| 中文字幕永久精品国产| 国产精品色三级在线观看| 丁香花成人电影| 国产亚洲色视频在线| 国产乱码精品一区二区三| 亚洲亚洲人成综合网络|