<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      我工作中用MQ的10種場景

      前言

      最近有球友問我:MQ的使用場景有哪些?工作中一定要使用MQ嗎?

      記得剛工作那會兒,我總是想不明白:為什么明明直接調用接口就能完成的功能,非要引入MQ這么個"中間商"?

      直到經歷了系統崩潰、數據丟失、性能瓶頸等一系列問題后,我才真正理解了MQ的價值。

      今天我想和大家分享我在實際工作中使用消息隊列(MQ)的10種典型場景,希望對你會有所幫助。

      一、為什么需要消息隊列(MQ)?

      在深入具體場景之前,我們先來思考一個基本問題:為什么要使用消息隊列?

      系統間的直接調用:
      image

      引入消息隊列后:
      image

      接下來我們將通過10個具體場景,帶大家來深入理解MQ的價值。

      場景一:系統解耦

      背景描述

      在我早期參與的一個電商項目中,訂單創建后需要通知多個系統:

      // 早期的緊耦合設計
      public class OrderService {
          private InventoryService inventoryService;
          private PointsService pointsService;
          private EmailService emailService;
          private AnalyticsService analyticsService;
          
          public void createOrder(Order order) {
              // 1. 保存訂單
              orderDao.save(order);
              
              // 2. 調用庫存服務
              inventoryService.updateInventory(order);
              
              // 3. 調用積分服務
              pointsService.addPoints(order.getUserId(), order.getAmount());
              
              // 4. 發送郵件通知
              emailService.sendOrderConfirmation(order);
              
              // 5. 記錄分析數據
              analyticsService.trackOrderCreated(order);
              
              // 更多服務...
          }
      }
      

      這種架構存在嚴重問題:

      • 緊耦合:訂單服務需要知道所有下游服務
      • 單點故障:任何一個下游服務掛掉都會導致訂單創建失敗
      • 性能瓶頸:同步調用導致響應時間慢

      MQ解決方案

      引入MQ后,架構變為:
      image

      代碼實現

      // 訂單服務 - 生產者
      @Service
      public class OrderService {
          @Autowired
          private RabbitTemplate rabbitTemplate;
          
          public void createOrder(Order order) {
              // 1. 保存訂單
              orderDao.save(order);
              
              // 2. 發送消息到MQ
              rabbitTemplate.convertAndSend(
                  "order.exchange",
                  "order.created",
                  new OrderCreatedEvent(order.getId(), order.getUserId(), order.getAmount())
              );
          }
      }
      
      // 庫存服務 - 消費者
      @Component
      @RabbitListener(queues = "inventory.queue")
      public class InventoryConsumer {
          @Autowired
          private InventoryService inventoryService;
          
          @RabbitHandler
          public void handleOrderCreated(OrderCreatedEvent event) {
              inventoryService.updateInventory(event.getOrderId());
          }
      }
      

      技術要點

      1. 消息協議選擇:根據業務需求選擇RabbitMQ、Kafka或RocketMQ
      2. 消息格式:使用JSON或Protobuf等跨語言格式
      3. 錯誤處理:實現重試機制和死信隊列

      場景二:異步處理

      背景描述

      用戶上傳視頻后需要執行轉碼、生成縮略圖、內容審核等耗時操作,如果同步處理,用戶需要等待很長時間。

      MQ解決方案

      // 視頻服務 - 生產者
      @Service
      public class VideoService {
          @Autowired
          private KafkaTemplate<String, Object> kafkaTemplate;
          
          public UploadResponse uploadVideo(MultipartFile file, String userId) {
              // 1. 保存原始視頻
              String videoId = saveOriginalVideo(file);
              
              // 2. 發送處理消息
              kafkaTemplate.send("video-processing", new VideoProcessingEvent(videoId, userId));
              
              // 3. 立即返回響應
              return new UploadResponse(videoId, "upload_success");
          }
      }
      
      // 視頻處理服務 - 消費者
      @Service
      public class VideoProcessingConsumer {
          @KafkaListener(topics = "video-processing")
          public void processVideo(VideoProcessingEvent event) {
              // 異步執行耗時操作
              videoProcessor.transcode(event.getVideoId());
              videoProcessor.generateThumbnails(event.getVideoId());
              contentModerationService.checkContent(event.getVideoId());
              
              // 發送處理完成通知
              notificationService.notifyUser(event.getUserId(), event.getVideoId());
          }
      }
      

      架構優勢

      1. 快速響應:用戶上傳后立即得到響應
      2. 彈性擴展:可以根據處理壓力動態調整消費者數量
      3. 故障隔離:處理服務故障不會影響上傳功能

      場景三:流量削峰

      背景描述

      電商秒殺活動時,瞬時流量可能是平時的百倍以上,直接沖擊數據庫和服務。

      MQ解決方案

      image

      代碼實現

      // 秒殺服務
      @Service
      public class SecKillService {
          @Autowired
          private RedisTemplate<String, Object> redisTemplate;
          
          @Autowired
          private RabbitTemplate rabbitTemplate;
          
          public SecKillResponse secKill(SecKillRequest request) {
              // 1. 校驗用戶資格
              if (!checkUserQualification(request.getUserId())) {
                  return SecKillResponse.failed("用戶無資格");
              }
              
              // 2. 預減庫存(Redis原子操作)
              Long remaining = redisTemplate.opsForValue().decrement(
                  "sec_kill_stock:" + request.getItemId());
              
              if (remaining == null || remaining < 0) {
                  // 庫存不足,恢復庫存
                  redisTemplate.opsForValue().increment("sec_kill_stock:" + request.getItemId());
                  return SecKillResponse.failed("庫存不足");
              }
              
              // 3. 發送秒殺成功消息到MQ
              rabbitTemplate.convertAndSend(
                  "sec_kill.exchange",
                  "sec_kill.success",
                  new SecKillSuccessEvent(request.getUserId(), request.getItemId())
              );
              
              return SecKillResponse.success("秒殺成功");
          }
      }
      
      // 訂單處理消費者
      @Component
      @RabbitListener(queues = "sec_kill.order.queue")
      public class SecKillOrderConsumer {
          @RabbitHandler
          public void handleSecKillSuccess(SecKillSuccessEvent event) {
              // 異步創建訂單
              orderService.createSecKillOrder(event.getUserId(), event.getItemId());
          }
      }
      

      技術要點

      1. 庫存預扣:使用Redis原子操作避免超賣
      2. 隊列緩沖:MQ緩沖請求,避免直接沖擊數據庫
      3. 限流控制:在網關層進行限流,拒絕過多請求

      場景四:數據同步

      背景描述

      在微服務架構中,不同服務有自己的數據庫,需要保證數據一致性。

      MQ解決方案

      // 用戶服務 - 數據變更時發送消息
      @Service
      public class UserService {
          @Transactional
          public User updateUser(User user) {
              // 1. 更新數據庫
              userDao.update(user);
              
              // 2. 發送消息(在事務內)
              rocketMQTemplate.sendMessageInTransaction(
                  "user-update-topic",
                  MessageBuilder.withPayload(new UserUpdateEvent(user.getId(), user.getStatus()))
                      .build(),
                  null
              );
              
              return user;
          }
      }
      
      // 其他服務 - 消費用戶更新消息
      @Service
      @RocketMQMessageListener(topic = "user-update-topic", consumerGroup = "order-group")
      public class UserUpdateConsumer implements RocketMQListener<UserUpdateEvent> {
          @Override
          public void onMessage(UserUpdateEvent event) {
              // 更新本地用戶信息緩存
              orderService.updateUserCache(event.getUserId(), event.getStatus());
          }
      }
      

      一致性保證

      1. 本地事務表:將消息和業務數據放在同一個數據庫事務中
      2. 事務消息:使用RocketMQ的事務消息機制
      3. 冪等消費:消費者實現冪等性,避免重復處理

      場景五:日志收集

      背景描述

      分布式系統中,日志分散在各個節點,需要集中收集和分析。

      MQ解決方案

      image

      代碼實現

      // 日志收集組件
      @Component
      public class LogCollector {
          @Autowired
          private KafkaTemplate<String, String> kafkaTemplate;
          
          public void collectLog(String appId, String level, String message, Map<String, Object> context) {
              LogEvent logEvent = new LogEvent(appId, level, message, context, System.currentTimeMillis());
              
              // 發送到Kafka
              kafkaTemplate.send("app-logs", appId, JsonUtils.toJson(logEvent));
          }
      }
      
      // 日志消費者
      @Service
      public class LogConsumer {
          @KafkaListener(topics = "app-logs", groupId = "log-es")
          public void consumeLog(String message) {
              LogEvent logEvent = JsonUtils.fromJson(message, LogEvent.class);
              
              // 存儲到Elasticsearch
              elasticsearchService.indexLog(logEvent);
              
              // 實時監控檢查
              if ("ERROR".equals(logEvent.getLevel())) {
                  alertService.checkAndAlert(logEvent);
              }
          }
      }
      

      技術優勢

      1. 解耦:應用節點無需關心日志如何處理
      2. 緩沖:應對日志產生速率波動
      3. 多消費:同一份日志可以被多個消費者處理

      場景六:消息廣播

      背景描述

      系統配置更新后,需要通知所有服務節點更新本地配置。

      MQ解決方案

      // 配置服務 - 廣播配置更新
      @Service
      public class ConfigService {
          @Autowired
          private RedisTemplate<String, Object> redisTemplate;
          
          public void updateConfig(String configKey, String configValue) {
              // 1. 更新配置存儲
              configDao.updateConfig(configKey, configValue);
              
              // 2. 廣播配置更新消息
              redisTemplate.convertAndSend("config-update-channel", 
                  new ConfigUpdateEvent(configKey, configValue));
          }
      }
      
      // 服務節點 - 訂閱配置更新
      @Component
      public class ConfigUpdateListener {
          @Autowired
          private LocalConfigCache localConfigCache;
          
          @RedisListener(channel = "config-update-channel")
          public void handleConfigUpdate(ConfigUpdateEvent event) {
              // 更新本地配置緩存
              localConfigCache.updateConfig(event.getKey(), event.getValue());
          }
      }
      

      應用場景

      1. 功能開關:動態開啟或關閉功能
      2. 參數調整:調整超時時間、限流閾值等
      3. 黑白名單:更新黑白名單配置

      場景七:順序消息

      背景描述

      在某些業務場景中,消息的處理順序很重要,如訂單狀態變更。

      MQ解決方案

      // 訂單狀態變更服務
      @Service
      public class OrderStateService {
          @Autowired
          private RocketMQTemplate rocketMQTemplate;
          
          public void changeOrderState(String orderId, String oldState, String newState) {
              OrderStateEvent event = new OrderStateEvent(orderId, oldState, newState);
              
              // 發送順序消息,使用orderId作為sharding key
              rocketMQTemplate.syncSendOrderly(
                  "order-state-topic", 
                  event, 
                  orderId  // 保證同一訂單的消息按順序處理
              );
          }
      }
      
      // 訂單狀態消費者
      @Service
      @RocketMQMessageListener(
          topic = "order-state-topic",
          consumerGroup = "order-state-group",
          consumeMode = ConsumeMode.ORDERLY  // 順序消費
      )
      public class OrderStateConsumer implements RocketMQListener<OrderStateEvent> {
          @Override
          public void onMessage(OrderStateEvent event) {
              // 按順序處理訂單狀態變更
              orderService.processStateChange(event);
          }
      }
      

      順序保證機制

      1. 分區順序:同一分區內的消息保證順序
      2. 順序投遞:MQ保證消息按發送順序投遞
      3. 順序處理:消費者順序處理消息

      場景八:延遲消息

      背景描述

      需要實現定時任務,如訂單超時未支付自動取消。

      MQ解決方案

      // 訂單服務 - 發送延遲消息
      @Service
      public class OrderService {
          @Autowired
          private RabbitTemplate rabbitTemplate;
          
          public void createOrder(Order order) {
              // 保存訂單
              orderDao.save(order);
              
              // 發送延遲消息,30分鐘后檢查支付狀態
              rabbitTemplate.convertAndSend(
                  "order.delay.exchange",
                  "order.create",
                  new OrderCreateEvent(order.getId()),
                  message -> {
                      message.getMessageProperties().setDelay(30 * 60 * 1000); // 30分鐘
                      return message;
                  }
              );
          }
      }
      
      // 訂單超時檢查消費者
      @Component
      @RabbitListener(queues = "order.delay.queue")
      public class OrderTimeoutConsumer {
          @RabbitHandler
          public void checkOrderPayment(OrderCreateEvent event) {
              Order order = orderDao.findById(event.getOrderId());
              if ("UNPAID".equals(order.getStatus())) {
                  // 超時未支付,取消訂單
                  orderService.cancelOrder(order.getId(), "超時未支付");
              }
          }
      }
      

      替代方案對比

      方案 優點 缺點
      數據庫輪詢 實現簡單 實時性差,數據庫壓力大
      延時隊列 實時性好 實現復雜,消息堆積問題
      定時任務 可控性強 分布式協調復雜

      場景九:消息重試

      背景描述

      處理消息時可能遇到臨時故障,需要重試機制保證最終處理成功。

      MQ解決方案

      // 消息消費者 with 重試機制
      @Service
      @Slf4j
      public class RetryableConsumer {
          @Autowired
          private RabbitTemplate rabbitTemplate;
          
          @RabbitListener(queues = "business.queue")
          public void processMessage(Message message, Channel channel) {
              try {
                  // 業務處理
                  businessService.process(message);
                  
                  // 確認消息
                  channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                  
              } catch (TemporaryException e) {
                  // 臨時異常,重試
                  log.warn("處理失敗,準備重試", e);
                  
                  // 拒絕消息,requeue=true
                  channel.basicNack(
                      message.getMessageProperties().getDeliveryTag(),
                      false,
                      true  // 重新入隊
                  );
                  
              } catch (PermanentException e) {
                  // 永久異常,進入死信隊列
                  log.error("處理失敗,進入死信隊列", e);
                  
                  channel.basicNack(
                      message.getMessageProperties().getDeliveryTag(),
                      false,
                      false  // 不重新入隊
                  );
              }
          }
      }
      

      重試策略

      1. 立即重試:臨時故障立即重試
      2. 延遲重試:逐步增加重試間隔
      3. 死信隊列:最終無法處理的消息進入死信隊列

      場景十:事務消息

      背景描述

      分布式系統中,需要保證多個服務的數據一致性。

      MQ解決方案

      // 事務消息生產者
      @Service
      public class TransactionalMessageService {
          @Autowired
          private RocketMQTemplate rocketMQTemplate;
          
          @Transactional
          public void createOrderWithTransaction(Order order) {
              // 1. 保存訂單(數據庫事務)
              orderDao.save(order);
              
              // 2. 發送事務消息
              TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
                  "order-tx-topic",
                  MessageBuilder.withPayload(new OrderCreatedEvent(order.getId()))
                      .build(),
                  order  // 事務參數
              );
              
              if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
                  throw new RuntimeException("事務消息發送失敗");
              }
          }
      }
      
      // 事務消息監聽器
      @Component
      @RocketMQTransactionListener
      public class OrderTransactionListener implements RocketMQLocalTransactionListener {
          @Autowired
          private OrderDao orderDao;
          
          @Override
          public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
              try {
                  // 檢查本地事務狀態
                  Order order = (Order) arg;
                  Order existOrder = orderDao.findById(order.getId());
                  
                  if (existOrder != null && "CREATED".equals(existOrder.getStatus())) {
                      return RocketMQLocalTransactionState.COMMIT_MESSAGE;
                  } else {
                      return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
                  }
              } catch (Exception e) {
                  return RocketMQLocalTransactionState.UNKNOWN;
              }
          }
          
          @Override
          public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
              // 回查本地事務狀態
              String orderId = (String) msg.getHeaders().get("order_id");
              Order order = orderDao.findById(orderId);
              
              if (order != null && "CREATED".equals(order.getStatus())) {
                  return RocketMQLocalTransactionState.COMMIT_MESSAGE;
              } else {
                  return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
              }
          }
      }
      

      事務消息流程

      image

      總結

      通過以上10個場景,我們可以總結出MQ使用的核心原則:

      適用場景

      1. 異步處理:提升系統響應速度
      2. 系統解耦:降低系統間依賴
      3. 流量削峰:應對突發流量
      4. 數據同步:保證最終一致性
      5. 分布式事務:解決數據一致性問題

      技術選型建議

      場景 推薦MQ 原因
      高吞吐 Kafka 高吞吐量,持久化存儲
      事務消息 RocketMQ 完整的事務消息機制
      復雜路由 RabbitMQ 靈活的路由配置
      延遲消息 RabbitMQ 原生支持延遲隊列

      最佳實踐

      1. 消息冪等性:消費者必須實現冪等處理
      2. 死信隊列:處理失敗的消息要有兜底方案
      3. 監控告警:完善的消息堆積監控和告警
      4. 性能優化:根據業務特點調整MQ參數

      最后說一句(求關注,別白嫖我)

      如果這篇文章對您有所幫助,或者有所啟發的話,幫忙關注一下我的同名公眾號:蘇三說技術,您的支持是我堅持寫作最大的動力。

      求一鍵三連:點贊、轉發、在看。

      關注公眾號:【蘇三說技術】,在公眾號中回復:進大廠,可以免費獲取我最近整理的10萬字的面試寶典,好多小伙伴靠這個寶典拿到了多家大廠的offer。

      本文收錄于我的技術網站:http://www.susan.net.cn

      posted @ 2025-10-09 09:59  蘇三說技術  閱讀(1570)  評論(2)    收藏  舉報
      主站蜘蛛池模板: 久久99精品久久久久久青青| 蜜桃视频一区二区三区四| 欧美不卡无线在线一二三区观| 午夜成人无码福利免费视频| 久久久久青草线蕉综合超碰| 人妻日韩人妻中文字幕| av中文字幕一区二区| 99riav国产精品视频| 97在线碰| 成人免费看片又大又黄| 亚洲国产成人久久77| 伊人久久精品久久亚洲一区| 精品亚洲国产成人痴汉av| 康保县| 自拍偷在线精品自拍偷99| 成人做受视频试看60秒| 甘肃省| 亚洲国产av剧一区二区三区 | 中文字幕久久六月色综合| 欧美亚洲h在线一区二区| 人妻护士在线波多野结衣| 国产激情精品一区二区三区 | 色吊丝av熟女中文字幕| 精品无码人妻| 精品一区二区久久久久久久网站| 国产福利社区一区二区| 国产最大成人亚洲精品| 中文字幕制服国产精品| 日韩放荡少妇无码视频| 免费人成网站免费看视频| 精品视频一区二区福利午夜| 日本深夜福利在线观看| 国产真人无遮挡免费视频| 不卡一区二区国产在线| 亚洲av无码成人精品区一区| 久久精品色一情一乱一伦| 久久99国内精品自在现线| 午夜av高清在线观看| 久久―日本道色综合久久| 国产精品户外野外| 色爱av综合网国产精品|