我工作中用MQ的10種場景
前言
最近有球友問我:MQ的使用場景有哪些?工作中一定要使用MQ嗎?
記得剛工作那會兒,我總是想不明白:為什么明明直接調用接口就能完成的功能,非要引入MQ這么個"中間商"?
直到經歷了系統崩潰、數據丟失、性能瓶頸等一系列問題后,我才真正理解了MQ的價值。
今天我想和大家分享我在實際工作中使用消息隊列(MQ)的10種典型場景,希望對你會有所幫助。
一、為什么需要消息隊列(MQ)?
在深入具體場景之前,我們先來思考一個基本問題:為什么要使用消息隊列?
系統間的直接調用:

引入消息隊列后:

接下來我們將通過10個具體場景,帶大家來深入理解MQ的價值。
場景一:系統解耦
背景描述
在我早期參與的一個電商項目中,訂單創建后需要通知多個系統:
// 早期的緊耦合設計
public class OrderService {
private InventoryService inventoryService;
private PointsService pointsService;
private EmailService emailService;
private AnalyticsService analyticsService;
public void createOrder(Order order) {
// 1. 保存訂單
orderDao.save(order);
// 2. 調用庫存服務
inventoryService.updateInventory(order);
// 3. 調用積分服務
pointsService.addPoints(order.getUserId(), order.getAmount());
// 4. 發送郵件通知
emailService.sendOrderConfirmation(order);
// 5. 記錄分析數據
analyticsService.trackOrderCreated(order);
// 更多服務...
}
}
這種架構存在嚴重問題:
- 緊耦合:訂單服務需要知道所有下游服務
- 單點故障:任何一個下游服務掛掉都會導致訂單創建失敗
- 性能瓶頸:同步調用導致響應時間慢
MQ解決方案
引入MQ后,架構變為:

代碼實現:
// 訂單服務 - 生產者
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void createOrder(Order order) {
// 1. 保存訂單
orderDao.save(order);
// 2. 發送消息到MQ
rabbitTemplate.convertAndSend(
"order.exchange",
"order.created",
new OrderCreatedEvent(order.getId(), order.getUserId(), order.getAmount())
);
}
}
// 庫存服務 - 消費者
@Component
@RabbitListener(queues = "inventory.queue")
public class InventoryConsumer {
@Autowired
private InventoryService inventoryService;
@RabbitHandler
public void handleOrderCreated(OrderCreatedEvent event) {
inventoryService.updateInventory(event.getOrderId());
}
}
技術要點
- 消息協議選擇:根據業務需求選擇RabbitMQ、Kafka或RocketMQ
- 消息格式:使用JSON或Protobuf等跨語言格式
- 錯誤處理:實現重試機制和死信隊列
場景二:異步處理
背景描述
用戶上傳視頻后需要執行轉碼、生成縮略圖、內容審核等耗時操作,如果同步處理,用戶需要等待很長時間。
MQ解決方案
// 視頻服務 - 生產者
@Service
public class VideoService {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public UploadResponse uploadVideo(MultipartFile file, String userId) {
// 1. 保存原始視頻
String videoId = saveOriginalVideo(file);
// 2. 發送處理消息
kafkaTemplate.send("video-processing", new VideoProcessingEvent(videoId, userId));
// 3. 立即返回響應
return new UploadResponse(videoId, "upload_success");
}
}
// 視頻處理服務 - 消費者
@Service
public class VideoProcessingConsumer {
@KafkaListener(topics = "video-processing")
public void processVideo(VideoProcessingEvent event) {
// 異步執行耗時操作
videoProcessor.transcode(event.getVideoId());
videoProcessor.generateThumbnails(event.getVideoId());
contentModerationService.checkContent(event.getVideoId());
// 發送處理完成通知
notificationService.notifyUser(event.getUserId(), event.getVideoId());
}
}
架構優勢
- 快速響應:用戶上傳后立即得到響應
- 彈性擴展:可以根據處理壓力動態調整消費者數量
- 故障隔離:處理服務故障不會影響上傳功能
場景三:流量削峰
背景描述
電商秒殺活動時,瞬時流量可能是平時的百倍以上,直接沖擊數據庫和服務。
MQ解決方案

代碼實現:
// 秒殺服務
@Service
public class SecKillService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
public SecKillResponse secKill(SecKillRequest request) {
// 1. 校驗用戶資格
if (!checkUserQualification(request.getUserId())) {
return SecKillResponse.failed("用戶無資格");
}
// 2. 預減庫存(Redis原子操作)
Long remaining = redisTemplate.opsForValue().decrement(
"sec_kill_stock:" + request.getItemId());
if (remaining == null || remaining < 0) {
// 庫存不足,恢復庫存
redisTemplate.opsForValue().increment("sec_kill_stock:" + request.getItemId());
return SecKillResponse.failed("庫存不足");
}
// 3. 發送秒殺成功消息到MQ
rabbitTemplate.convertAndSend(
"sec_kill.exchange",
"sec_kill.success",
new SecKillSuccessEvent(request.getUserId(), request.getItemId())
);
return SecKillResponse.success("秒殺成功");
}
}
// 訂單處理消費者
@Component
@RabbitListener(queues = "sec_kill.order.queue")
public class SecKillOrderConsumer {
@RabbitHandler
public void handleSecKillSuccess(SecKillSuccessEvent event) {
// 異步創建訂單
orderService.createSecKillOrder(event.getUserId(), event.getItemId());
}
}
技術要點
- 庫存預扣:使用Redis原子操作避免超賣
- 隊列緩沖:MQ緩沖請求,避免直接沖擊數據庫
- 限流控制:在網關層進行限流,拒絕過多請求
場景四:數據同步
背景描述
在微服務架構中,不同服務有自己的數據庫,需要保證數據一致性。
MQ解決方案
// 用戶服務 - 數據變更時發送消息
@Service
public class UserService {
@Transactional
public User updateUser(User user) {
// 1. 更新數據庫
userDao.update(user);
// 2. 發送消息(在事務內)
rocketMQTemplate.sendMessageInTransaction(
"user-update-topic",
MessageBuilder.withPayload(new UserUpdateEvent(user.getId(), user.getStatus()))
.build(),
null
);
return user;
}
}
// 其他服務 - 消費用戶更新消息
@Service
@RocketMQMessageListener(topic = "user-update-topic", consumerGroup = "order-group")
public class UserUpdateConsumer implements RocketMQListener<UserUpdateEvent> {
@Override
public void onMessage(UserUpdateEvent event) {
// 更新本地用戶信息緩存
orderService.updateUserCache(event.getUserId(), event.getStatus());
}
}
一致性保證
- 本地事務表:將消息和業務數據放在同一個數據庫事務中
- 事務消息:使用RocketMQ的事務消息機制
- 冪等消費:消費者實現冪等性,避免重復處理
場景五:日志收集
背景描述
分布式系統中,日志分散在各個節點,需要集中收集和分析。
MQ解決方案

代碼實現:
// 日志收集組件
@Component
public class LogCollector {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void collectLog(String appId, String level, String message, Map<String, Object> context) {
LogEvent logEvent = new LogEvent(appId, level, message, context, System.currentTimeMillis());
// 發送到Kafka
kafkaTemplate.send("app-logs", appId, JsonUtils.toJson(logEvent));
}
}
// 日志消費者
@Service
public class LogConsumer {
@KafkaListener(topics = "app-logs", groupId = "log-es")
public void consumeLog(String message) {
LogEvent logEvent = JsonUtils.fromJson(message, LogEvent.class);
// 存儲到Elasticsearch
elasticsearchService.indexLog(logEvent);
// 實時監控檢查
if ("ERROR".equals(logEvent.getLevel())) {
alertService.checkAndAlert(logEvent);
}
}
}
技術優勢
- 解耦:應用節點無需關心日志如何處理
- 緩沖:應對日志產生速率波動
- 多消費:同一份日志可以被多個消費者處理
場景六:消息廣播
背景描述
系統配置更新后,需要通知所有服務節點更新本地配置。
MQ解決方案
// 配置服務 - 廣播配置更新
@Service
public class ConfigService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void updateConfig(String configKey, String configValue) {
// 1. 更新配置存儲
configDao.updateConfig(configKey, configValue);
// 2. 廣播配置更新消息
redisTemplate.convertAndSend("config-update-channel",
new ConfigUpdateEvent(configKey, configValue));
}
}
// 服務節點 - 訂閱配置更新
@Component
public class ConfigUpdateListener {
@Autowired
private LocalConfigCache localConfigCache;
@RedisListener(channel = "config-update-channel")
public void handleConfigUpdate(ConfigUpdateEvent event) {
// 更新本地配置緩存
localConfigCache.updateConfig(event.getKey(), event.getValue());
}
}
應用場景
- 功能開關:動態開啟或關閉功能
- 參數調整:調整超時時間、限流閾值等
- 黑白名單:更新黑白名單配置
場景七:順序消息
背景描述
在某些業務場景中,消息的處理順序很重要,如訂單狀態變更。
MQ解決方案
// 訂單狀態變更服務
@Service
public class OrderStateService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void changeOrderState(String orderId, String oldState, String newState) {
OrderStateEvent event = new OrderStateEvent(orderId, oldState, newState);
// 發送順序消息,使用orderId作為sharding key
rocketMQTemplate.syncSendOrderly(
"order-state-topic",
event,
orderId // 保證同一訂單的消息按順序處理
);
}
}
// 訂單狀態消費者
@Service
@RocketMQMessageListener(
topic = "order-state-topic",
consumerGroup = "order-state-group",
consumeMode = ConsumeMode.ORDERLY // 順序消費
)
public class OrderStateConsumer implements RocketMQListener<OrderStateEvent> {
@Override
public void onMessage(OrderStateEvent event) {
// 按順序處理訂單狀態變更
orderService.processStateChange(event);
}
}
順序保證機制
- 分區順序:同一分區內的消息保證順序
- 順序投遞:MQ保證消息按發送順序投遞
- 順序處理:消費者順序處理消息
場景八:延遲消息
背景描述
需要實現定時任務,如訂單超時未支付自動取消。
MQ解決方案
// 訂單服務 - 發送延遲消息
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void createOrder(Order order) {
// 保存訂單
orderDao.save(order);
// 發送延遲消息,30分鐘后檢查支付狀態
rabbitTemplate.convertAndSend(
"order.delay.exchange",
"order.create",
new OrderCreateEvent(order.getId()),
message -> {
message.getMessageProperties().setDelay(30 * 60 * 1000); // 30分鐘
return message;
}
);
}
}
// 訂單超時檢查消費者
@Component
@RabbitListener(queues = "order.delay.queue")
public class OrderTimeoutConsumer {
@RabbitHandler
public void checkOrderPayment(OrderCreateEvent event) {
Order order = orderDao.findById(event.getOrderId());
if ("UNPAID".equals(order.getStatus())) {
// 超時未支付,取消訂單
orderService.cancelOrder(order.getId(), "超時未支付");
}
}
}
替代方案對比
| 方案 | 優點 | 缺點 |
|---|---|---|
| 數據庫輪詢 | 實現簡單 | 實時性差,數據庫壓力大 |
| 延時隊列 | 實時性好 | 實現復雜,消息堆積問題 |
| 定時任務 | 可控性強 | 分布式協調復雜 |
場景九:消息重試
背景描述
處理消息時可能遇到臨時故障,需要重試機制保證最終處理成功。
MQ解決方案
// 消息消費者 with 重試機制
@Service
@Slf4j
public class RetryableConsumer {
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "business.queue")
public void processMessage(Message message, Channel channel) {
try {
// 業務處理
businessService.process(message);
// 確認消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (TemporaryException e) {
// 臨時異常,重試
log.warn("處理失敗,準備重試", e);
// 拒絕消息,requeue=true
channel.basicNack(
message.getMessageProperties().getDeliveryTag(),
false,
true // 重新入隊
);
} catch (PermanentException e) {
// 永久異常,進入死信隊列
log.error("處理失敗,進入死信隊列", e);
channel.basicNack(
message.getMessageProperties().getDeliveryTag(),
false,
false // 不重新入隊
);
}
}
}
重試策略
- 立即重試:臨時故障立即重試
- 延遲重試:逐步增加重試間隔
- 死信隊列:最終無法處理的消息進入死信隊列
場景十:事務消息
背景描述
分布式系統中,需要保證多個服務的數據一致性。
MQ解決方案
// 事務消息生產者
@Service
public class TransactionalMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Transactional
public void createOrderWithTransaction(Order order) {
// 1. 保存訂單(數據庫事務)
orderDao.save(order);
// 2. 發送事務消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"order-tx-topic",
MessageBuilder.withPayload(new OrderCreatedEvent(order.getId()))
.build(),
order // 事務參數
);
if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
throw new RuntimeException("事務消息發送失敗");
}
}
}
// 事務消息監聽器
@Component
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderDao orderDao;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 檢查本地事務狀態
Order order = (Order) arg;
Order existOrder = orderDao.findById(order.getId());
if (existOrder != null && "CREATED".equals(existOrder.getStatus())) {
return RocketMQLocalTransactionState.COMMIT_MESSAGE;
} else {
return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
return RocketMQLocalTransactionState.UNKNOWN;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 回查本地事務狀態
String orderId = (String) msg.getHeaders().get("order_id");
Order order = orderDao.findById(orderId);
if (order != null && "CREATED".equals(order.getStatus())) {
return RocketMQLocalTransactionState.COMMIT_MESSAGE;
} else {
return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
事務消息流程

總結
通過以上10個場景,我們可以總結出MQ使用的核心原則:
適用場景
- 異步處理:提升系統響應速度
- 系統解耦:降低系統間依賴
- 流量削峰:應對突發流量
- 數據同步:保證最終一致性
- 分布式事務:解決數據一致性問題
技術選型建議
| 場景 | 推薦MQ | 原因 |
|---|---|---|
| 高吞吐 | Kafka | 高吞吐量,持久化存儲 |
| 事務消息 | RocketMQ | 完整的事務消息機制 |
| 復雜路由 | RabbitMQ | 靈活的路由配置 |
| 延遲消息 | RabbitMQ | 原生支持延遲隊列 |
最佳實踐
- 消息冪等性:消費者必須實現冪等處理
- 死信隊列:處理失敗的消息要有兜底方案
- 監控告警:完善的消息堆積監控和告警
- 性能優化:根據業務特點調整MQ參數
最后說一句(求關注,別白嫖我)
如果這篇文章對您有所幫助,或者有所啟發的話,幫忙關注一下我的同名公眾號:蘇三說技術,您的支持是我堅持寫作最大的動力。
求一鍵三連:點贊、轉發、在看。
關注公眾號:【蘇三說技術】,在公眾號中回復:進大廠,可以免費獲取我最近整理的10萬字的面試寶典,好多小伙伴靠這個寶典拿到了多家大廠的offer。
本文收錄于我的技術網站:http://www.susan.net.cn

浙公網安備 33010602011771號