<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      我工作中用MQ的10種場景

      前言

      最近有球友問我:MQ的使用場景有哪些?工作中一定要使用MQ嗎?

      記得剛工作那會兒,我總是想不明白:為什么明明直接調(diào)用接口就能完成的功能,非要引入MQ這么個"中間商"?

      直到經(jīng)歷了系統(tǒng)崩潰、數(shù)據(jù)丟失、性能瓶頸等一系列問題后,我才真正理解了MQ的價值。

      今天我想和大家分享我在實際工作中使用消息隊列(MQ)的10種典型場景,希望對你會有所幫助。

      一、為什么需要消息隊列(MQ)?

      在深入具體場景之前,我們先來思考一個基本問題:為什么要使用消息隊列?

      系統(tǒng)間的直接調(diào)用:
      image

      引入消息隊列后:
      image

      接下來我們將通過10個具體場景,帶大家來深入理解MQ的價值。

      場景一:系統(tǒng)解耦

      背景描述

      在我早期參與的一個電商項目中,訂單創(chuàng)建后需要通知多個系統(tǒng):

      // 早期的緊耦合設(shè)計
      public class OrderService {
          private InventoryService inventoryService;
          private PointsService pointsService;
          private EmailService emailService;
          private AnalyticsService analyticsService;
          
          public void createOrder(Order order) {
              // 1. 保存訂單
              orderDao.save(order);
              
              // 2. 調(diào)用庫存服務(wù)
              inventoryService.updateInventory(order);
              
              // 3. 調(diào)用積分服務(wù)
              pointsService.addPoints(order.getUserId(), order.getAmount());
              
              // 4. 發(fā)送郵件通知
              emailService.sendOrderConfirmation(order);
              
              // 5. 記錄分析數(shù)據(jù)
              analyticsService.trackOrderCreated(order);
              
              // 更多服務(wù)...
          }
      }
      

      這種架構(gòu)存在嚴(yán)重問題:

      • 緊耦合:訂單服務(wù)需要知道所有下游服務(wù)
      • 單點故障:任何一個下游服務(wù)掛掉都會導(dǎo)致訂單創(chuàng)建失敗
      • 性能瓶頸:同步調(diào)用導(dǎo)致響應(yīng)時間慢

      MQ解決方案

      引入MQ后,架構(gòu)變?yōu)椋?br> image

      代碼實現(xiàn)

      // 訂單服務(wù) - 生產(chǎn)者
      @Service
      public class OrderService {
          @Autowired
          private RabbitTemplate rabbitTemplate;
          
          public void createOrder(Order order) {
              // 1. 保存訂單
              orderDao.save(order);
              
              // 2. 發(fā)送消息到MQ
              rabbitTemplate.convertAndSend(
                  "order.exchange",
                  "order.created",
                  new OrderCreatedEvent(order.getId(), order.getUserId(), order.getAmount())
              );
          }
      }
      
      // 庫存服務(wù) - 消費者
      @Component
      @RabbitListener(queues = "inventory.queue")
      public class InventoryConsumer {
          @Autowired
          private InventoryService inventoryService;
          
          @RabbitHandler
          public void handleOrderCreated(OrderCreatedEvent event) {
              inventoryService.updateInventory(event.getOrderId());
          }
      }
      

      技術(shù)要點

      1. 消息協(xié)議選擇:根據(jù)業(yè)務(wù)需求選擇RabbitMQ、Kafka或RocketMQ
      2. 消息格式:使用JSON或Protobuf等跨語言格式
      3. 錯誤處理:實現(xiàn)重試機制和死信隊列

      場景二:異步處理

      背景描述

      用戶上傳視頻后需要執(zhí)行轉(zhuǎn)碼、生成縮略圖、內(nèi)容審核等耗時操作,如果同步處理,用戶需要等待很長時間。

      MQ解決方案

      // 視頻服務(wù) - 生產(chǎn)者
      @Service
      public class VideoService {
          @Autowired
          private KafkaTemplate<String, Object> kafkaTemplate;
          
          public UploadResponse uploadVideo(MultipartFile file, String userId) {
              // 1. 保存原始視頻
              String videoId = saveOriginalVideo(file);
              
              // 2. 發(fā)送處理消息
              kafkaTemplate.send("video-processing", new VideoProcessingEvent(videoId, userId));
              
              // 3. 立即返回響應(yīng)
              return new UploadResponse(videoId, "upload_success");
          }
      }
      
      // 視頻處理服務(wù) - 消費者
      @Service
      public class VideoProcessingConsumer {
          @KafkaListener(topics = "video-processing")
          public void processVideo(VideoProcessingEvent event) {
              // 異步執(zhí)行耗時操作
              videoProcessor.transcode(event.getVideoId());
              videoProcessor.generateThumbnails(event.getVideoId());
              contentModerationService.checkContent(event.getVideoId());
              
              // 發(fā)送處理完成通知
              notificationService.notifyUser(event.getUserId(), event.getVideoId());
          }
      }
      

      架構(gòu)優(yōu)勢

      1. 快速響應(yīng):用戶上傳后立即得到響應(yīng)
      2. 彈性擴展:可以根據(jù)處理壓力動態(tài)調(diào)整消費者數(shù)量
      3. 故障隔離:處理服務(wù)故障不會影響上傳功能

      場景三:流量削峰

      背景描述

      電商秒殺活動時,瞬時流量可能是平時的百倍以上,直接沖擊數(shù)據(jù)庫和服務(wù)。

      MQ解決方案

      image

      代碼實現(xiàn)

      // 秒殺服務(wù)
      @Service
      public class SecKillService {
          @Autowired
          private RedisTemplate<String, Object> redisTemplate;
          
          @Autowired
          private RabbitTemplate rabbitTemplate;
          
          public SecKillResponse secKill(SecKillRequest request) {
              // 1. 校驗用戶資格
              if (!checkUserQualification(request.getUserId())) {
                  return SecKillResponse.failed("用戶無資格");
              }
              
              // 2. 預(yù)減庫存(Redis原子操作)
              Long remaining = redisTemplate.opsForValue().decrement(
                  "sec_kill_stock:" + request.getItemId());
              
              if (remaining == null || remaining < 0) {
                  // 庫存不足,恢復(fù)庫存
                  redisTemplate.opsForValue().increment("sec_kill_stock:" + request.getItemId());
                  return SecKillResponse.failed("庫存不足");
              }
              
              // 3. 發(fā)送秒殺成功消息到MQ
              rabbitTemplate.convertAndSend(
                  "sec_kill.exchange",
                  "sec_kill.success",
                  new SecKillSuccessEvent(request.getUserId(), request.getItemId())
              );
              
              return SecKillResponse.success("秒殺成功");
          }
      }
      
      // 訂單處理消費者
      @Component
      @RabbitListener(queues = "sec_kill.order.queue")
      public class SecKillOrderConsumer {
          @RabbitHandler
          public void handleSecKillSuccess(SecKillSuccessEvent event) {
              // 異步創(chuàng)建訂單
              orderService.createSecKillOrder(event.getUserId(), event.getItemId());
          }
      }
      

      技術(shù)要點

      1. 庫存預(yù)扣:使用Redis原子操作避免超賣
      2. 隊列緩沖:MQ緩沖請求,避免直接沖擊數(shù)據(jù)庫
      3. 限流控制:在網(wǎng)關(guān)層進行限流,拒絕過多請求

      場景四:數(shù)據(jù)同步

      背景描述

      在微服務(wù)架構(gòu)中,不同服務(wù)有自己的數(shù)據(jù)庫,需要保證數(shù)據(jù)一致性。

      MQ解決方案

      // 用戶服務(wù) - 數(shù)據(jù)變更時發(fā)送消息
      @Service
      public class UserService {
          @Transactional
          public User updateUser(User user) {
              // 1. 更新數(shù)據(jù)庫
              userDao.update(user);
              
              // 2. 發(fā)送消息(在事務(wù)內(nèi))
              rocketMQTemplate.sendMessageInTransaction(
                  "user-update-topic",
                  MessageBuilder.withPayload(new UserUpdateEvent(user.getId(), user.getStatus()))
                      .build(),
                  null
              );
              
              return user;
          }
      }
      
      // 其他服務(wù) - 消費用戶更新消息
      @Service
      @RocketMQMessageListener(topic = "user-update-topic", consumerGroup = "order-group")
      public class UserUpdateConsumer implements RocketMQListener<UserUpdateEvent> {
          @Override
          public void onMessage(UserUpdateEvent event) {
              // 更新本地用戶信息緩存
              orderService.updateUserCache(event.getUserId(), event.getStatus());
          }
      }
      

      一致性保證

      1. 本地事務(wù)表:將消息和業(yè)務(wù)數(shù)據(jù)放在同一個數(shù)據(jù)庫事務(wù)中
      2. 事務(wù)消息:使用RocketMQ的事務(wù)消息機制
      3. 冪等消費:消費者實現(xiàn)冪等性,避免重復(fù)處理

      場景五:日志收集

      背景描述

      分布式系統(tǒng)中,日志分散在各個節(jié)點,需要集中收集和分析。

      MQ解決方案

      image

      代碼實現(xiàn)

      // 日志收集組件
      @Component
      public class LogCollector {
          @Autowired
          private KafkaTemplate<String, String> kafkaTemplate;
          
          public void collectLog(String appId, String level, String message, Map<String, Object> context) {
              LogEvent logEvent = new LogEvent(appId, level, message, context, System.currentTimeMillis());
              
              // 發(fā)送到Kafka
              kafkaTemplate.send("app-logs", appId, JsonUtils.toJson(logEvent));
          }
      }
      
      // 日志消費者
      @Service
      public class LogConsumer {
          @KafkaListener(topics = "app-logs", groupId = "log-es")
          public void consumeLog(String message) {
              LogEvent logEvent = JsonUtils.fromJson(message, LogEvent.class);
              
              // 存儲到Elasticsearch
              elasticsearchService.indexLog(logEvent);
              
              // 實時監(jiān)控檢查
              if ("ERROR".equals(logEvent.getLevel())) {
                  alertService.checkAndAlert(logEvent);
              }
          }
      }
      

      技術(shù)優(yōu)勢

      1. 解耦:應(yīng)用節(jié)點無需關(guān)心日志如何處理
      2. 緩沖:應(yīng)對日志產(chǎn)生速率波動
      3. 多消費:同一份日志可以被多個消費者處理

      場景六:消息廣播

      背景描述

      系統(tǒng)配置更新后,需要通知所有服務(wù)節(jié)點更新本地配置。

      MQ解決方案

      // 配置服務(wù) - 廣播配置更新
      @Service
      public class ConfigService {
          @Autowired
          private RedisTemplate<String, Object> redisTemplate;
          
          public void updateConfig(String configKey, String configValue) {
              // 1. 更新配置存儲
              configDao.updateConfig(configKey, configValue);
              
              // 2. 廣播配置更新消息
              redisTemplate.convertAndSend("config-update-channel", 
                  new ConfigUpdateEvent(configKey, configValue));
          }
      }
      
      // 服務(wù)節(jié)點 - 訂閱配置更新
      @Component
      public class ConfigUpdateListener {
          @Autowired
          private LocalConfigCache localConfigCache;
          
          @RedisListener(channel = "config-update-channel")
          public void handleConfigUpdate(ConfigUpdateEvent event) {
              // 更新本地配置緩存
              localConfigCache.updateConfig(event.getKey(), event.getValue());
          }
      }
      

      應(yīng)用場景

      1. 功能開關(guān):動態(tài)開啟或關(guān)閉功能
      2. 參數(shù)調(diào)整:調(diào)整超時時間、限流閾值等
      3. 黑白名單:更新黑白名單配置

      場景七:順序消息

      背景描述

      在某些業(yè)務(wù)場景中,消息的處理順序很重要,如訂單狀態(tài)變更。

      MQ解決方案

      // 訂單狀態(tài)變更服務(wù)
      @Service
      public class OrderStateService {
          @Autowired
          private RocketMQTemplate rocketMQTemplate;
          
          public void changeOrderState(String orderId, String oldState, String newState) {
              OrderStateEvent event = new OrderStateEvent(orderId, oldState, newState);
              
              // 發(fā)送順序消息,使用orderId作為sharding key
              rocketMQTemplate.syncSendOrderly(
                  "order-state-topic", 
                  event, 
                  orderId  // 保證同一訂單的消息按順序處理
              );
          }
      }
      
      // 訂單狀態(tài)消費者
      @Service
      @RocketMQMessageListener(
          topic = "order-state-topic",
          consumerGroup = "order-state-group",
          consumeMode = ConsumeMode.ORDERLY  // 順序消費
      )
      public class OrderStateConsumer implements RocketMQListener<OrderStateEvent> {
          @Override
          public void onMessage(OrderStateEvent event) {
              // 按順序處理訂單狀態(tài)變更
              orderService.processStateChange(event);
          }
      }
      

      順序保證機制

      1. 分區(qū)順序:同一分區(qū)內(nèi)的消息保證順序
      2. 順序投遞:MQ保證消息按發(fā)送順序投遞
      3. 順序處理:消費者順序處理消息

      場景八:延遲消息

      背景描述

      需要實現(xiàn)定時任務(wù),如訂單超時未支付自動取消。

      MQ解決方案

      // 訂單服務(wù) - 發(fā)送延遲消息
      @Service
      public class OrderService {
          @Autowired
          private RabbitTemplate rabbitTemplate;
          
          public void createOrder(Order order) {
              // 保存訂單
              orderDao.save(order);
              
              // 發(fā)送延遲消息,30分鐘后檢查支付狀態(tài)
              rabbitTemplate.convertAndSend(
                  "order.delay.exchange",
                  "order.create",
                  new OrderCreateEvent(order.getId()),
                  message -> {
                      message.getMessageProperties().setDelay(30 * 60 * 1000); // 30分鐘
                      return message;
                  }
              );
          }
      }
      
      // 訂單超時檢查消費者
      @Component
      @RabbitListener(queues = "order.delay.queue")
      public class OrderTimeoutConsumer {
          @RabbitHandler
          public void checkOrderPayment(OrderCreateEvent event) {
              Order order = orderDao.findById(event.getOrderId());
              if ("UNPAID".equals(order.getStatus())) {
                  // 超時未支付,取消訂單
                  orderService.cancelOrder(order.getId(), "超時未支付");
              }
          }
      }
      

      替代方案對比

      方案 優(yōu)點 缺點
      數(shù)據(jù)庫輪詢 實現(xiàn)簡單 實時性差,數(shù)據(jù)庫壓力大
      延時隊列 實時性好 實現(xiàn)復(fù)雜,消息堆積問題
      定時任務(wù) 可控性強 分布式協(xié)調(diào)復(fù)雜

      場景九:消息重試

      背景描述

      處理消息時可能遇到臨時故障,需要重試機制保證最終處理成功。

      MQ解決方案

      // 消息消費者 with 重試機制
      @Service
      @Slf4j
      public class RetryableConsumer {
          @Autowired
          private RabbitTemplate rabbitTemplate;
          
          @RabbitListener(queues = "business.queue")
          public void processMessage(Message message, Channel channel) {
              try {
                  // 業(yè)務(wù)處理
                  businessService.process(message);
                  
                  // 確認(rèn)消息
                  channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                  
              } catch (TemporaryException e) {
                  // 臨時異常,重試
                  log.warn("處理失敗,準(zhǔn)備重試", e);
                  
                  // 拒絕消息,requeue=true
                  channel.basicNack(
                      message.getMessageProperties().getDeliveryTag(),
                      false,
                      true  // 重新入隊
                  );
                  
              } catch (PermanentException e) {
                  // 永久異常,進入死信隊列
                  log.error("處理失敗,進入死信隊列", e);
                  
                  channel.basicNack(
                      message.getMessageProperties().getDeliveryTag(),
                      false,
                      false  // 不重新入隊
                  );
              }
          }
      }
      

      重試策略

      1. 立即重試:臨時故障立即重試
      2. 延遲重試:逐步增加重試間隔
      3. 死信隊列:最終無法處理的消息進入死信隊列

      場景十:事務(wù)消息

      背景描述

      分布式系統(tǒng)中,需要保證多個服務(wù)的數(shù)據(jù)一致性。

      MQ解決方案

      // 事務(wù)消息生產(chǎn)者
      @Service
      public class TransactionalMessageService {
          @Autowired
          private RocketMQTemplate rocketMQTemplate;
          
          @Transactional
          public void createOrderWithTransaction(Order order) {
              // 1. 保存訂單(數(shù)據(jù)庫事務(wù))
              orderDao.save(order);
              
              // 2. 發(fā)送事務(wù)消息
              TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
                  "order-tx-topic",
                  MessageBuilder.withPayload(new OrderCreatedEvent(order.getId()))
                      .build(),
                  order  // 事務(wù)參數(shù)
              );
              
              if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
                  throw new RuntimeException("事務(wù)消息發(fā)送失敗");
              }
          }
      }
      
      // 事務(wù)消息監(jiān)聽器
      @Component
      @RocketMQTransactionListener
      public class OrderTransactionListener implements RocketMQLocalTransactionListener {
          @Autowired
          private OrderDao orderDao;
          
          @Override
          public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
              try {
                  // 檢查本地事務(wù)狀態(tài)
                  Order order = (Order) arg;
                  Order existOrder = orderDao.findById(order.getId());
                  
                  if (existOrder != null && "CREATED".equals(existOrder.getStatus())) {
                      return RocketMQLocalTransactionState.COMMIT_MESSAGE;
                  } else {
                      return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
                  }
              } catch (Exception e) {
                  return RocketMQLocalTransactionState.UNKNOWN;
              }
          }
          
          @Override
          public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
              // 回查本地事務(wù)狀態(tài)
              String orderId = (String) msg.getHeaders().get("order_id");
              Order order = orderDao.findById(orderId);
              
              if (order != null && "CREATED".equals(order.getStatus())) {
                  return RocketMQLocalTransactionState.COMMIT_MESSAGE;
              } else {
                  return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
              }
          }
      }
      

      事務(wù)消息流程

      image

      總結(jié)

      通過以上10個場景,我們可以總結(jié)出MQ使用的核心原則:

      適用場景

      1. 異步處理:提升系統(tǒng)響應(yīng)速度
      2. 系統(tǒng)解耦:降低系統(tǒng)間依賴
      3. 流量削峰:應(yīng)對突發(fā)流量
      4. 數(shù)據(jù)同步:保證最終一致性
      5. 分布式事務(wù):解決數(shù)據(jù)一致性問題

      技術(shù)選型建議

      場景 推薦MQ 原因
      高吞吐 Kafka 高吞吐量,持久化存儲
      事務(wù)消息 RocketMQ 完整的事務(wù)消息機制
      復(fù)雜路由 RabbitMQ 靈活的路由配置
      延遲消息 RabbitMQ 原生支持延遲隊列

      最佳實踐

      1. 消息冪等性:消費者必須實現(xiàn)冪等處理
      2. 死信隊列:處理失敗的消息要有兜底方案
      3. 監(jiān)控告警:完善的消息堆積監(jiān)控和告警
      4. 性能優(yōu)化:根據(jù)業(yè)務(wù)特點調(diào)整MQ參數(shù)

      最后說一句(求關(guān)注,別白嫖我)

      如果這篇文章對您有所幫助,或者有所啟發(fā)的話,幫忙關(guān)注一下我的同名公眾號:蘇三說技術(shù),您的支持是我堅持寫作最大的動力。

      求一鍵三連:點贊、轉(zhuǎn)發(fā)、在看。

      關(guān)注公眾號:【蘇三說技術(shù)】,在公眾號中回復(fù):進大廠,可以免費獲取我最近整理的10萬字的面試寶典,好多小伙伴靠這個寶典拿到了多家大廠的offer。

      本文收錄于我的技術(shù)網(wǎng)站:http://www.susan.net.cn

      posted @ 2025-10-09 09:59  蘇三說技術(shù)  閱讀(1570)  評論(2)    收藏  舉報
      主站蜘蛛池模板: 亚洲精品综合久久国产二区| 米林县| 久久精品国产成人午夜福利| 亚洲国产成人资源在线| 日本伊人色综合网| 一本无码在线观看| 中文乱码字幕在线中文乱码 | 天天综合色一区二区三区| 中文字幕人妻有码久视频| 国产精品一区二区无线| 在线 欧美 中文 亚洲 精品| 日韩精品中文字幕一线不卡| 色综合热无码热国产| 婷婷六月天在线| 国产精品久久无中文字幕| 丁香婷婷综合激情五月色| 亚洲香蕉免费有线视频| 香蕉EEWW99国产精选免费| 亚洲欧美日韩综合一区二区| 国产成人剧情AV麻豆果冻| 久久视频这里只精品| 精品人妻午夜福利一区二区| 亚洲精品中文av在线| 久久精品国产九一九九九| 曲阳县| 夜夜躁狠狠躁日日躁| 东北女人毛多水多牲交视频| 色欲狠狠躁天天躁无码中文字幕| 日韩精品一区二区在线看| 忘忧草日本在线播放www| 99国产精品白浆无码流出| 99在线精品免费视频| 日韩精品一区二区蜜臀av| 护士张开腿被奷日出白浆| 人妻少妇偷人无码视频| 午夜福利国产精品小视频| 漂亮的保姆hd完整版免费韩国| 亚洲这里只有久热精品伊人| 中文字幕久久久久人妻中出| 国精品午夜福利不卡视频| 日韩欧美亚洲综合久久|