day04
MQ高級
保證消息可靠性——發送者可靠性
生產者重試機制
引入生產者重試機制,用于解決發送失敗場景,配置
mq自動重發機制注意:
當網絡不穩定的時候,利用重試機制可以有效提高消息發送的成功率。不過SpringAMQP提供的重試機制是阻塞式的重試,也就是說多次重試等待的過程中,當前線程是被阻塞的。
如果對于業務性能有要求,建議禁用重試機制。如果一定要使用,請合理配置等待時長和重試次數,當然也可以考慮使用異步線程來執行發送消息的代碼。
spring:
rabbitmq:
connection-timeout: 1s # 設置MQ的連接超時時間
template:
retry:
enabled: true # 開啟超時重試機制
initial-interval: 1000ms # 失敗后的初始等待時間
multiplier: 1 # 失敗后下次的等待時長倍數,下次等待時長 = initial-interval * multiplier
max-attempts: 3 # 最大重試次數
生產者確認機制
在少數情況下,也會出現消息發送到MQ之后丟失的現象,MQ對此提供了生產者確認機制.
在開啟確認機制的情況下,當生產者發送消息給MQ后,MQ會根據消息處理的情況返回不同的回執:
- 當消息投遞到MQ,但是路由失敗時,通過Publisher Return返回異常信息,同時返回ack的確認信息,代表投遞成功
- 臨時消息投遞到了MQ,并且入隊成功,返回ACK,告知投遞成功
- 持久消息投遞到了MQ,并且入隊完成持久化,返回ACK ,告知投遞成功
- 其它情況都會返回NACK,告知投遞失敗
開啟生產者確認機制:
- 通過配置文件開啟機制
一共有三種type,通常用none或correlated(異步回調)
spring:
rabbitmq:
publisher-confirm-type: correlated # 開啟publisher confirm機制,并設置confirm類型
publisher-returns: true # 開啟publisher return機制
- 定義
ReturnCallback回調
每個RabbitTemplate只能配置一個ReturnCallback,因此可以在配置類中統一配置
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
//1
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
//2
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("觸發return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}
- 定義
ConfirmCallback
定義這個來監聽返回的回執是
ack還是nack并進行處理,具體來說是在convertAndSend方法時,多傳遞一個CorrelationData參數,通常MQ返回的回執是一個Future,我們需要為Future添加一個回調函數
@Test
void testPublisherConfirm() {
// 1.創建CorrelationData
CorrelationData cd = new CorrelationData();
// 2.給Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 2.1.Future發生異常時的處理邏輯,基本不會觸發
log.error("send message fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2.Future接收到回執的處理邏輯,參數中的result就是回執內容
if(result.isAck()){ // result.isAck(),boolean類型,true代表ack回執,false 代表 nack回執
log.debug("發送消息成功,收到 ack!");
}else{ // result.getReason(),String類型,返回nack時的異常描述
log.error("發送消息失敗,收到 nack, reason : {}", result.getReason());
}
}
});
// 3.發送消息
rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
MQ的可靠性
- 數據持久化
- Lazy Queue
延遲消息
莫愁前路無知己,天下誰人不識君

浙公網安備 33010602011771號