RocketMQ事務消息源碼解析
RocketMQ提供了事務消息的功能,采用2PC(兩階段協(xié)議)+補償機制(事務回查)的分布式事務功能,通過這種方式能達到分布式事務的最終一致。
一. 概述
- 半事務消息:指的是發(fā)送至broker但是還沒被commit的消息,在半事務消息被確認之前都是無法被消費者消費的。
- 消息回查:由于網(wǎng)絡閃斷、生產者應用重啟等原因,導致某條事務消息的二次確認丟失,broker 通過掃描發(fā)現(xiàn)某條消息長期處于“半事務消息”時,需要主動向消息生產者詢問該消息的最終狀態(tài)(commit 或是 rollback),該詢問過程即消息回查。
二. 交互流程

事務消息發(fā)送步驟如下:
-
發(fā)送方將半事務消息發(fā)送至broker。
-
broker將消息持久化成功之后,向發(fā)送方返回 Ack,確認消息已經(jīng)發(fā)送成功,此時消息為半事務消息。
-
發(fā)送方開始執(zhí)行本地事務邏輯。
-
發(fā)送方根據(jù)本地事務執(zhí)行結果向服務端提交二次確認(commit 或是 rollback),服務端收到 commit 狀態(tài)則將半事務消息標記為可投遞,訂閱方最終將收到該消息;服務端收到 rollback 狀態(tài)則“刪除”半事務消息,訂閱方將不會接受該消息。
-
在斷網(wǎng)或者是應用重啟的特殊情況下,上述步驟 4 提交的二次確認最終未到達服務端,經(jīng)過固定時間后服務端將對該消息發(fā)起消息回查。
-
發(fā)送方收到消息回查后,需要檢查對應消息的本地事務執(zhí)行的最終結果。
-
發(fā)送方根據(jù)檢查得到的本地事務的最終狀態(tài)再次提交二次確認,服務端仍按照步驟 4 對半事務消息進行操作。
三. 示例代碼
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 執(zhí)行業(yè)務代碼 ....
// 最終返回業(yè)務代碼的事務狀態(tài)
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查本地事務狀態(tài)
return LocalTransactionState.ROLLBACK_MESSAGE;
}
});
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
TransactionMQProducer 支持事務消息的生產者,繼承自 DefaultMQProducer,在默認生產者上進行了擴展,支持發(fā)送事務消息。它擁有一個線程池 executorService 用來異步執(zhí)行本地事務和回查事務,還需要注冊 TransactionListener 事務監(jiān)聽器,里面包含了執(zhí)行本地事務和回查事務的邏輯。
三. 源碼分析
整個事務消息的處理流程,可以分為以下五個步驟:
- Half消息發(fā)送
- Half消息存儲
- 提交事務狀態(tài)
- 處理事務狀態(tài)
- 事務回查
3.1 Half消息發(fā)送
除事務回查外,事務消息的時序圖大致如下:

TransactionMQProducer 是RocketMQ提供的,支持發(fā)送事務消息的生產者,它繼承自 DefaultMQProducer,也是一個外觀類,代碼非常的簡單,核心邏輯依然在 DefaultMQProducerImpl,屬性如下:
public class TransactionMQProducer extends DefaultMQProducer {
// 事務回查監(jiān)聽
private TransactionCheckListener transactionCheckListener;
// 回查線程池最小線程數(shù)
private int checkThreadPoolMinSize = 1;
// 回查線程池最大線程數(shù)
private int checkThreadPoolMaxSize = 1;
// 最大回查請求數(shù),阻塞隊列容量
private int checkRequestHoldMax = 2000;
// 執(zhí)行本地事務/事務回查的線程池
private ExecutorService executorService;
// 事務監(jiān)聽器:本地事務、事務回查邏輯
private TransactionListener transactionListener;
}
啟動 TransactionMQProducer,必須先注冊 TransactionListener,實現(xiàn)本地事務的執(zhí)行邏輯和事務回查邏輯,Producer在發(fā)送Half消息成功后會自動執(zhí)行executeLocalTransaction,在Broker請求事務回查時自動執(zhí)行checkLocalTransaction。
然后,Producer就可以啟動了,在啟動默認Producer之前,會對checkExecutor和checkRequestQueue進行初始化,如果沒有設置線程池會自動創(chuàng)建。
public void initTransactionEnv() {
TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
if (producer.getExecutorService() != null) {
this.checkExecutor = producer.getExecutorService();
} else {
// 事務回查請求隊列
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
// 事務回查線程池
this.checkExecutor = new ThreadPoolExecutor(
producer.getCheckThreadPoolMinSize(),
producer.getCheckThreadPoolMaxSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.checkRequestQueue);
}
}
初始化完成以后,就是Producer的正常啟動邏輯,這里不再贅述。
發(fā)送事務消息對應的方法是sendMessageInTransaction:
// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
// 判斷檢查本地事務Listener是否存在
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
// 檢查消息內容
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
// 標識當前消息為事務消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
// 發(fā)送消息
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
// 發(fā)送消息成功,執(zhí)行本地事務
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
// 執(zhí)行endTransaction方法,如果半消息發(fā)送失敗或本地事務執(zhí)行失敗告訴服務端是刪除半消息,
// 半消息發(fā)送成功且本地事務執(zhí)行成功則告訴broker提交半消息
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
// .......
}
它主要做了以下事情:
- 設置屬性TRAN_MSG=true
- 同步發(fā)送Half消息
- 消息發(fā)送成功,執(zhí)行本地事務
- 提交本地事務狀態(tài)
Tips:事務消息不支持延時,會在發(fā)送前,自動忽略延遲級別。
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Broker判斷是否是事務消息的依據(jù)是通過Properties的TRAN_MSG屬性判斷的,Producer在發(fā)送消息前會進行設置。
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
消息屬性設置完畢,調用send方法同步發(fā)送給Broker,并獲取發(fā)送狀態(tài)。
SendResult sendResult = this.send(msg);
3.2 Half消息存儲
Half消息發(fā)送到Broker,Broker要負責存儲,且此時消息對Consumer是不可見的,看看它是如何處理的。
SendMessageProcessor 會對接收到的消息進行判斷,如果是事務消息,會轉交給TransactionalMessageService 處理,普通消息直接轉交給 MessageStore 處理。
// org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage
// 是否是事務消息?通過TRAN_MSG屬性判斷
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
// 事務消息的處理
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
}
TransactionalMessageService 使用了橋接模式,大部分操作會交給橋接類 TransactionalMessageBridge 執(zhí)行。在處理Half消息時,為了不讓Consumer可見,會像處理延遲消息一樣,改寫Topic和queueId,將消息統(tǒng)一扔到RMQ_SYS_TRANS_HALF_TOPIC這個Topic下,默認的queueId為0。同時,為了后續(xù)消息Commit時重新寫入正常消息,必須將真實的Topic和queueId等屬性先保留到Properties中。
// org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// 真實的Topic和queueId存儲到Properties
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 改寫Topic為:RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
消息的Topic被改寫后,正常寫入CommitLog,但不會對Consumer可見。
3.3 提交事務狀態(tài)
Broker將消息寫入CommitLog后,會返回結果SendResult,如果發(fā)送成功,Producer開始執(zhí)行本地事務:
// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
if (sendResult.getTransactionId() != null) {
// 設置事務ID
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
// 老的本地事務處理,已經(jīng)被廢棄
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
// 執(zhí)行本地事務,獲取事務狀態(tài)
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
將本地事務執(zhí)行狀態(tài) LocalTransactionState 提交到 Broker,方法是endTransaction。先根據(jù) MessageQueue 找到Broker的主機地址,然后構建提交事務請求頭 EndTransactionRequestHeader 并設置相關屬性,請求頭屬性如下:
public class EndTransactionRequestHeader implements CommandCustomHeader {
// 生產者組
private String producerGroup;
// ConsumeQueue Offset
private Long tranStateTableOffset;
// 消息所在CommitLog偏移量
private Long commitLogOffset;
// 事務狀態(tài)
private Integer commitOrRollback;
// 是否Broker發(fā)起的回查?
private Boolean fromTransactionCheck = false;
// 消息ID
private String msgId;
// 事務ID
private String transactionId;
}
事務狀態(tài) commitOrRollback 用數(shù)字表示,8代表Commit、12代表Rollback、0代表未知狀態(tài)。請求頭構建好以后,通過Netty發(fā)送數(shù)據(jù)包給Broker,對應的RequestCode為END_TRANSACTION。
public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
// 解析MessageId,內含消息Offset
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
// 獲取MessageQueue所在Broker的Master主機地址
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
// 創(chuàng)建請求頭
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
// 設置事務ID和偏移量
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
// 設置事務狀態(tài)
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
// 執(zhí)行鉤子函數(shù)
doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
// 發(fā)送請求
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
3.4 處理事務狀態(tài)
Broker通過 EndTransactionProcessor 類來處理 Producer 提交的事務請求,首先做校驗,確保是Master處理該請求,因為Slave是沒有寫權限的
// org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
}
然后,解析請求頭,獲取事務狀態(tài),如果是commit操作,則根據(jù)請求頭里的 CommitLogOffset 讀取出完整的消息,從 Properties 中恢復消息真實的 Topic、queueId 等屬性,再調用 sendFinalMessage 方法將消息重新寫入 CommitLog,稍后構建好 ConsumeQueue 消息對 Consumer 就可見了,最后調用deletePrepareMessage方法刪除Half消息。
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// 提交事務消息
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 創(chuàng)建新的Message,恢復真實的Topic、queueId等屬性,重新寫入CommitLog
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
/**
* 事務消息提交,刪除Half消息
* Half消息不會真的被刪除,通過寫入Op消息來標記它被處理。
*/
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
}
如果是Rollback,處理就更加簡單了,因為消息本來就對Consumer是不可見的,只需要刪除Half消息即可。
else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
// 回滾事務消息,事實上沒做任何處理
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 寫入Op消息,代表Half消息被處理
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
實際上,Half消息并不會刪除,因為CommitLog是順序寫的,不可能刪除單個消息。「刪除Half消息」僅僅是給該消息打上一個標記,代表它的最終狀態(tài)已知,不需要再回查了。RocketMQ通過引入Op消息來給Half消息打標記,Half消息狀態(tài)確認后,會寫入一條消息到Op隊列,對應的Topic為MQ_SYS_TRANS_OP_HALF_TOPIC,反之Op隊列中不存在的,就是狀態(tài)未確認,需要回查的Half消息。
3.5 事務回查
Broker端發(fā)起消息事務回查的時序圖如下:

Half消息寫入成功,可能因為種種原因沒有收到Producer的事務狀態(tài)提交請求。此時,Broker會主動發(fā)起事務回查請求給Producer,以決定最終將消息Commit還是Rollback。
Half消息最終狀態(tài)有沒有被確認,是通過Op隊列里的消息判斷的。Broker服務啟動時,會開啟TransactionalMessageCheckService線程,每隔60秒進行一次消息回查。為了避免消息被無限次的回查,RocketMQ通過transactionCheckMax屬性設置消息回查的最大次數(shù),默認是15次。
// org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#onWaitEnd
protected void onWaitEnd() {
// 回查超時
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
// 回查最大次數(shù)
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
// 開始回查
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
}
回查Half消息時,首先要獲取Half主題下的所有消息隊列。
// 獲取RMQ_SYS_TRANS_HALF_TOPIC下所有隊列
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
然后遍歷所有的MessageQueue,按個處理所有隊列里的待回查的消息。怎么判斷消息需要回查呢?前面說過了,通過Op隊列判斷,因此還需要定位到HalfQueue對應的OpQueue,以及它們的ConsumeQueue偏移量。
// 獲取對應的 Op隊列
MessageQueue opQueue = getOpQueue(messageQueue);
// 獲取ConsumeQueue Offset
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
然后,從CommitLog讀取出完整的消息。
// 根據(jù)offset去CommitLog讀取出消息
private GetResult getHalfMsg(MessageQueue messageQueue, long offset) {
GetResult getResult = new GetResult();
PullResult result = pullHalfMsg(messageQueue, offset, PULL_MSG_RETRY_NUMBER);
getResult.setPullResult(result);
List<MessageExt> messageExts = result.getMsgFoundList();
if (messageExts == null) {
return getResult;
}
getResult.setMsg(messageExts.get(0));
return getResult;
}
判斷回查次數(shù)是否已達上限,如果是的話,就統(tǒng)一扔到TRANS_CHECK_MAX_TIME_TOPIC下。
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
// 回查次數(shù)超過15,丟棄消息,扔到TRANS_CHECK_MAX_TIME_TOPIC
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
如果判斷消息確實需要回查,會調用AbstractTransactionalMessageCheckListener的sendCheckMessage方法,恢復消息真實的Topic、queueId等屬性,然后發(fā)回給Producer進行事務的回查確認。
// 發(fā)送事務回查消息給Producer
public void sendCheckMessage(MessageExt msgExt) throws Exception {
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
// 獲取消息生產的GroupId
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
// 輪詢出一臺Producer實例
Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
if (channel != null) {
// 發(fā)送回查請求
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}
Broker將回查請求發(fā)送給Producer后,Producer會執(zhí)行checkLocalTransaction方法檢查本地事務,然后將事務狀態(tài)再發(fā)送給Broker,重復上述流程。
四. 總結
RocketMQ實現(xiàn)事務消息的原理和實現(xiàn)延遲消息的原理類似,都是通過改寫Topic和queueId,暫時將消息先寫入一個對Consumer不可見的隊列中,然后等待Producer執(zhí)行本地事務,提交事務狀態(tài)后再決定將Half消息Commit或者Rollback。同時,可能因為服務宕機或網(wǎng)絡抖動等原因,Broker沒有收到Producer的事務狀態(tài)提交請求,為了對二階段進行補償,Broker會主動對未確認的Half消息進行事務回查,判斷消息的最終狀態(tài)是否確認,是通過Op隊列實現(xiàn)的,Half消息一旦確認事務狀態(tài),就會往Op隊列中寫入一條消息,消息內容是Half消息所在ConsumeQueue的偏移量。
本文參考轉載至:

浙公網(wǎng)安備 33010602011771號