<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      RocketMQ事務消息源碼解析

      RocketMQ提供了事務消息的功能,采用2PC(兩階段協(xié)議)+補償機制(事務回查)的分布式事務功能,通過這種方式能達到分布式事務的最終一致。

      一. 概述

      • 半事務消息:指的是發(fā)送至broker但是還沒被commit的消息,在半事務消息被確認之前都是無法被消費者消費的。
      • 消息回查:由于網(wǎng)絡閃斷、生產者應用重啟等原因,導致某條事務消息的二次確認丟失,broker 通過掃描發(fā)現(xiàn)某條消息長期處于“半事務消息”時,需要主動向消息生產者詢問該消息的最終狀態(tài)(commit 或是 rollback),該詢問過程即消息回查。

      二. 交互流程

      img

      事務消息發(fā)送步驟如下:

      1. 發(fā)送方將半事務消息發(fā)送至broker。

      2. broker將消息持久化成功之后,向發(fā)送方返回 Ack,確認消息已經(jīng)發(fā)送成功,此時消息為半事務消息。

      3. 發(fā)送方開始執(zhí)行本地事務邏輯。

      4. 發(fā)送方根據(jù)本地事務執(zhí)行結果向服務端提交二次確認(commit 或是 rollback),服務端收到 commit 狀態(tài)則將半事務消息標記為可投遞,訂閱方最終將收到該消息;服務端收到 rollback 狀態(tài)則“刪除”半事務消息,訂閱方將不會接受該消息。

      5. 在斷網(wǎng)或者是應用重啟的特殊情況下,上述步驟 4 提交的二次確認最終未到達服務端,經(jīng)過固定時間后服務端將對該消息發(fā)起消息回查。

      6. 發(fā)送方收到消息回查后,需要檢查對應消息的本地事務執(zhí)行的最終結果。

      7. 發(fā)送方根據(jù)檢查得到的本地事務的最終狀態(tài)再次提交二次確認,服務端仍按照步驟 4 對半事務消息進行操作。

      三. 示例代碼

      public static void main(String[] args) throws MQClientException, InterruptedException {
          TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
          ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
              @Override
              public Thread newThread(Runnable r) {
                  Thread thread = new Thread(r);
                  thread.setName("client-transaction-msg-check-thread");
                  return thread;
              }
          });
      
          producer.setExecutorService(executorService);
          producer.setTransactionListener(new TransactionListener() {
              @Override
              public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                  // 執(zhí)行業(yè)務代碼 ....
      
                  // 最終返回業(yè)務代碼的事務狀態(tài)
                  return LocalTransactionState.COMMIT_MESSAGE;
              }
      
              @Override
              public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                  // 回查本地事務狀態(tài)
                  return LocalTransactionState.ROLLBACK_MESSAGE;
              }
          });
          producer.start();
      
          String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
          for (int i = 0; i < 10; i++) {
              try {
                  Message msg =
                      new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                          ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                  SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                  System.out.printf("%s%n", sendResult);
      
                  Thread.sleep(10);
              } catch (MQClientException | UnsupportedEncodingException e) {
                  e.printStackTrace();
              }
          }
      
          for (int i = 0; i < 100000; i++) {
              Thread.sleep(1000);
          }
          producer.shutdown();
      }
      

      TransactionMQProducer 支持事務消息的生產者,繼承自 DefaultMQProducer,在默認生產者上進行了擴展,支持發(fā)送事務消息。它擁有一個線程池 executorService 用來異步執(zhí)行本地事務和回查事務,還需要注冊 TransactionListener 事務監(jiān)聽器,里面包含了執(zhí)行本地事務和回查事務的邏輯。

      三. 源碼分析

      整個事務消息的處理流程,可以分為以下五個步驟:

      1. Half消息發(fā)送
      2. Half消息存儲
      3. 提交事務狀態(tài)
      4. 處理事務狀態(tài)
      5. 事務回查

      3.1 Half消息發(fā)送

      除事務回查外,事務消息的時序圖大致如下:

      TransactionMQProducer 是RocketMQ提供的,支持發(fā)送事務消息的生產者,它繼承自 DefaultMQProducer,也是一個外觀類,代碼非常的簡單,核心邏輯依然在 DefaultMQProducerImpl,屬性如下:

      public class TransactionMQProducer extends DefaultMQProducer {
          // 事務回查監(jiān)聽
          private TransactionCheckListener transactionCheckListener;
          // 回查線程池最小線程數(shù)
          private int checkThreadPoolMinSize = 1;
          // 回查線程池最大線程數(shù)
          private int checkThreadPoolMaxSize = 1;
          // 最大回查請求數(shù),阻塞隊列容量
          private int checkRequestHoldMax = 2000;
          // 執(zhí)行本地事務/事務回查的線程池
          private ExecutorService executorService;
          // 事務監(jiān)聽器:本地事務、事務回查邏輯
          private TransactionListener transactionListener;
      }
      

      啟動 TransactionMQProducer,必須先注冊 TransactionListener,實現(xiàn)本地事務的執(zhí)行邏輯和事務回查邏輯,Producer在發(fā)送Half消息成功后會自動執(zhí)行executeLocalTransaction,在Broker請求事務回查時自動執(zhí)行checkLocalTransaction

      然后,Producer就可以啟動了,在啟動默認Producer之前,會對checkExecutorcheckRequestQueue進行初始化,如果沒有設置線程池會自動創(chuàng)建。

      public void initTransactionEnv() {
          TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
          if (producer.getExecutorService() != null) {
              this.checkExecutor = producer.getExecutorService();
          } else {
              // 事務回查請求隊列
              this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
              // 事務回查線程池
              this.checkExecutor = new ThreadPoolExecutor(
                  producer.getCheckThreadPoolMinSize(),
                  producer.getCheckThreadPoolMaxSize(),
                  1000 * 60,
                  TimeUnit.MILLISECONDS,
                  this.checkRequestQueue);
          }
      }
      

      初始化完成以后,就是Producer的正常啟動邏輯,這里不再贅述。

      發(fā)送事務消息對應的方法是sendMessageInTransaction

      // org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
      public TransactionSendResult sendMessageInTransaction(final Message msg,
          final LocalTransactionExecuter localTransactionExecuter, final Object arg)
          throws MQClientException {
          // 判斷檢查本地事務Listener是否存在
          TransactionListener transactionListener = getCheckListener();
          if (null == localTransactionExecuter && null == transactionListener) {
              throw new MQClientException("tranExecutor is null", null);
          }
      
          // ignore DelayTimeLevel parameter
          if (msg.getDelayTimeLevel() != 0) {
              MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
          }
      
          // 檢查消息內容
          Validators.checkMessage(msg, this.defaultMQProducer);
      
          SendResult sendResult = null;
          // 標識當前消息為事務消息
          MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
          MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
          try {
              // 發(fā)送消息
              sendResult = this.send(msg);
          } catch (Exception e) {
              throw new MQClientException("send message Exception", e);
          }
      
          LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
          Throwable localException = null;
          switch (sendResult.getSendStatus()) {
              case SEND_OK: {
                  try {
                      if (sendResult.getTransactionId() != null) {
                          msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                      }
                      String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                      if (null != transactionId && !"".equals(transactionId)) {
                          msg.setTransactionId(transactionId);
                      }
                      if (null != localTransactionExecuter) {
                          localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                      } else if (transactionListener != null) {
                          // 發(fā)送消息成功,執(zhí)行本地事務
                          log.debug("Used new transaction API");
                          localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                      }
                      if (null == localTransactionState) {
                          localTransactionState = LocalTransactionState.UNKNOW;
                      }
      
                      if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                          log.info("executeLocalTransactionBranch return {}", localTransactionState);
                          log.info(msg.toString());
                      }
                  } catch (Throwable e) {
                      log.info("executeLocalTransactionBranch exception", e);
                      log.info(msg.toString());
                      localException = e;
                  }
              }
              break;
              case FLUSH_DISK_TIMEOUT:
              case FLUSH_SLAVE_TIMEOUT:
              case SLAVE_NOT_AVAILABLE:
                  localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                  break;
              default:
                  break;
          }
      
          try {
              // 執(zhí)行endTransaction方法,如果半消息發(fā)送失敗或本地事務執(zhí)行失敗告訴服務端是刪除半消息,
              // 半消息發(fā)送成功且本地事務執(zhí)行成功則告訴broker提交半消息
              this.endTransaction(sendResult, localTransactionState, localException);
          } catch (Exception e) {
              log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
          }
      
         // .......
      }
      

      它主要做了以下事情:

      1. 設置屬性TRAN_MSG=true
      2. 同步發(fā)送Half消息
      3. 消息發(fā)送成功,執(zhí)行本地事務
      4. 提交本地事務狀態(tài)

      Tips:事務消息不支持延時,會在發(fā)送前,自動忽略延遲級別。

      if (msg.getDelayTimeLevel() != 0) {
          MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
      }
      

      Broker判斷是否是事務消息的依據(jù)是通過Properties的TRAN_MSG屬性判斷的,Producer在發(fā)送消息前會進行設置。

      MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
      

      消息屬性設置完畢,調用send方法同步發(fā)送給Broker,并獲取發(fā)送狀態(tài)。

      SendResult sendResult = this.send(msg);	
      

      3.2 Half消息存儲

      Half消息發(fā)送到Broker,Broker要負責存儲,且此時消息對Consumer是不可見的,看看它是如何處理的。

      SendMessageProcessor 會對接收到的消息進行判斷,如果是事務消息,會轉交給TransactionalMessageService 處理,普通消息直接轉交給 MessageStore 處理。

      // org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage
      
      // 是否是事務消息?通過TRAN_MSG屬性判斷
      String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
      if (transFlag != null && Boolean.parseBoolean(transFlag)) {
          // 事務消息的處理
          if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
              response.setCode(ResponseCode.NO_PERMISSION);
              response.setRemark(
                  "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                  + "] sending transaction message is forbidden");
              return CompletableFuture.completedFuture(response);
          }
          putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
      }
      

      TransactionalMessageService 使用了橋接模式,大部分操作會交給橋接類 TransactionalMessageBridge 執(zhí)行。在處理Half消息時,為了不讓Consumer可見,會像處理延遲消息一樣,改寫Topic和queueId,將消息統(tǒng)一扔到RMQ_SYS_TRANS_HALF_TOPIC這個Topic下,默認的queueId為0。同時,為了后續(xù)消息Commit時重新寫入正常消息,必須將真實的Topic和queueId等屬性先保留到Properties中。

      // org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner
      private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
          // 真實的Topic和queueId存儲到Properties
          MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
          MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
                                      String.valueOf(msgInner.getQueueId()));
          msgInner.setSysFlag(
              MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
          // 改寫Topic為:RMQ_SYS_TRANS_HALF_TOPIC
          msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
          msgInner.setQueueId(0);
          msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
          return msgInner;
      }
      

      消息的Topic被改寫后,正常寫入CommitLog,但不會對Consumer可見。

      3.3 提交事務狀態(tài)

      Broker將消息寫入CommitLog后,會返回結果SendResult,如果發(fā)送成功,Producer開始執(zhí)行本地事務:

      // org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
      if (sendResult.getTransactionId() != null) {
          // 設置事務ID
          msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
      }
      String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
      if (null != transactionId && !"".equals(transactionId)) {
          msg.setTransactionId(transactionId);
      }
      if (null != localTransactionExecuter) {
          // 老的本地事務處理,已經(jīng)被廢棄
          localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
      } else if (transactionListener != null) {
          // 執(zhí)行本地事務,獲取事務狀態(tài)
          localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
      }
      if (null == localTransactionState) {
          localTransactionState = LocalTransactionState.UNKNOW;
      }
      
      if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
          log.info("executeLocalTransactionBranch return {}", localTransactionState);
          log.info(msg.toString());
      }
      

      將本地事務執(zhí)行狀態(tài) LocalTransactionState 提交到 Broker,方法是endTransaction。先根據(jù) MessageQueue 找到Broker的主機地址,然后構建提交事務請求頭 EndTransactionRequestHeader 并設置相關屬性,請求頭屬性如下:

      public class EndTransactionRequestHeader implements CommandCustomHeader {
          // 生產者組
          private String producerGroup;
          // ConsumeQueue Offset
          private Long tranStateTableOffset;
          // 消息所在CommitLog偏移量
          private Long commitLogOffset;
          // 事務狀態(tài)
          private Integer commitOrRollback;
          // 是否Broker發(fā)起的回查?
          private Boolean fromTransactionCheck = false;
          // 消息ID
          private String msgId;
          // 事務ID
          private String transactionId;
      }
      

      事務狀態(tài) commitOrRollback 用數(shù)字表示,8代表Commit、12代表Rollback、0代表未知狀態(tài)。請求頭構建好以后,通過Netty發(fā)送數(shù)據(jù)包給Broker,對應的RequestCode為END_TRANSACTION

      public void endTransaction(
          final Message msg,
          final SendResult sendResult,
          final LocalTransactionState localTransactionState,
          final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
          final MessageId id;
          // 解析MessageId,內含消息Offset
          if (sendResult.getOffsetMsgId() != null) {
              id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
          } else {
              id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
          }
          String transactionId = sendResult.getTransactionId();
          // 獲取MessageQueue所在Broker的Master主機地址
          final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
          // 創(chuàng)建請求頭
          EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
          // 設置事務ID和偏移量
          requestHeader.setTransactionId(transactionId);
          requestHeader.setCommitLogOffset(id.getOffset());
          // 設置事務狀態(tài)
          switch (localTransactionState) {
              case COMMIT_MESSAGE:
                  requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                  break;
              case ROLLBACK_MESSAGE:
                  requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                  break;
              case UNKNOW:
                  requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                  break;
              default:
                  break;
          }
          // 執(zhí)行鉤子函數(shù)
          doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
          requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
          requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
          requestHeader.setMsgId(sendResult.getMsgId());
          String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
          // 發(fā)送請求
          this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
                                                                         this.defaultMQProducer.getSendMsgTimeout());
      }
      

      3.4 處理事務狀態(tài)

      Broker通過 EndTransactionProcessor 類來處理 Producer 提交的事務請求,首先做校驗,確保是Master處理該請求,因為Slave是沒有寫權限的

      // org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
      if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
          response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
          LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
          return response;
      }
      

      然后,解析請求頭,獲取事務狀態(tài),如果是commit操作,則根據(jù)請求頭里的 CommitLogOffset 讀取出完整的消息,從 Properties 中恢復消息真實的 TopicqueueId 等屬性,再調用 sendFinalMessage 方法將消息重新寫入 CommitLog,稍后構建好 ConsumeQueue 消息對 Consumer 就可見了,最后調用deletePrepareMessage方法刪除Half消息。

      if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
          // 提交事務消息
          result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
          if (result.getResponseCode() == ResponseCode.SUCCESS) {
              RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
              if (res.getCode() == ResponseCode.SUCCESS) {
                  // 創(chuàng)建新的Message,恢復真實的Topic、queueId等屬性,重新寫入CommitLog
                  MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                  msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                  msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                  msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                  msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                  MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                  RemotingCommand sendResult = sendFinalMessage(msgInner);
                  if (sendResult.getCode() == ResponseCode.SUCCESS) {
                      /**
                               * 事務消息提交,刪除Half消息
                               * Half消息不會真的被刪除,通過寫入Op消息來標記它被處理。
                               */
                      this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                  }
                  return sendResult;
              }
              return res;
          }
      }
      

      如果是Rollback,處理就更加簡單了,因為消息本來就對Consumer是不可見的,只需要刪除Half消息即可。

      else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
          // 回滾事務消息,事實上沒做任何處理
          result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
          if (result.getResponseCode() == ResponseCode.SUCCESS) {
              RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
              if (res.getCode() == ResponseCode.SUCCESS) {
                  // 寫入Op消息,代表Half消息被處理
                  this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
              }
              return res;
          }
      }
      

      實際上,Half消息并不會刪除,因為CommitLog是順序寫的,不可能刪除單個消息。「刪除Half消息」僅僅是給該消息打上一個標記,代表它的最終狀態(tài)已知,不需要再回查了。RocketMQ通過引入Op消息來給Half消息打標記,Half消息狀態(tài)確認后,會寫入一條消息到Op隊列,對應的Topic為MQ_SYS_TRANS_OP_HALF_TOPIC,反之Op隊列中不存在的,就是狀態(tài)未確認,需要回查的Half消息。

      3.5 事務回查

      Broker端發(fā)起消息事務回查的時序圖如下:

      Half消息寫入成功,可能因為種種原因沒有收到Producer的事務狀態(tài)提交請求。此時,Broker會主動發(fā)起事務回查請求給Producer,以決定最終將消息Commit還是Rollback。

      Half消息最終狀態(tài)有沒有被確認,是通過Op隊列里的消息判斷的。Broker服務啟動時,會開啟TransactionalMessageCheckService線程,每隔60秒進行一次消息回查。為了避免消息被無限次的回查,RocketMQ通過transactionCheckMax屬性設置消息回查的最大次數(shù),默認是15次。

      // org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#onWaitEnd
      protected void onWaitEnd() {
          // 回查超時
          long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
          // 回查最大次數(shù)
          int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
          long begin = System.currentTimeMillis();
          // 開始回查
          this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
      }
      

      回查Half消息時,首先要獲取Half主題下的所有消息隊列。

      // 獲取RMQ_SYS_TRANS_HALF_TOPIC下所有隊列
      String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
      Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
      if (msgQueues == null || msgQueues.size() == 0) {
          log.warn("The queue of topic is empty :" + topic);
          return;
      }
      

      然后遍歷所有的MessageQueue,按個處理所有隊列里的待回查的消息。怎么判斷消息需要回查呢?前面說過了,通過Op隊列判斷,因此還需要定位到HalfQueue對應的OpQueue,以及它們的ConsumeQueue偏移量。

      // 獲取對應的 Op隊列
      MessageQueue opQueue = getOpQueue(messageQueue);
      // 獲取ConsumeQueue Offset
      long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
      long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
      

      然后,從CommitLog讀取出完整的消息。

      // 根據(jù)offset去CommitLog讀取出消息
      private GetResult getHalfMsg(MessageQueue messageQueue, long offset) {
          GetResult getResult = new GetResult();
      
          PullResult result = pullHalfMsg(messageQueue, offset, PULL_MSG_RETRY_NUMBER);
          getResult.setPullResult(result);
          List<MessageExt> messageExts = result.getMsgFoundList();
          if (messageExts == null) {
              return getResult;
          }
          getResult.setMsg(messageExts.get(0));
          return getResult;
      }
      

      判斷回查次數(shù)是否已達上限,如果是的話,就統(tǒng)一扔到TRANS_CHECK_MAX_TIME_TOPIC下。

      if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
          // 回查次數(shù)超過15,丟棄消息,扔到TRANS_CHECK_MAX_TIME_TOPIC
          listener.resolveDiscardMsg(msgExt);
          newOffset = i + 1;
          i++;
          continue;
      }
      

      如果判斷消息確實需要回查,會調用AbstractTransactionalMessageCheckListener的sendCheckMessage方法,恢復消息真實的Topic、queueId等屬性,然后發(fā)回給Producer進行事務的回查確認。

      // 發(fā)送事務回查消息給Producer
      public void sendCheckMessage(MessageExt msgExt) throws Exception {
          CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
          checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
          checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
          checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
          checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
          checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
          msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
          msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
          msgExt.setStoreSize(0);
          // 獲取消息生產的GroupId
          String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
          // 輪詢出一臺Producer實例
          Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
          if (channel != null) {
              // 發(fā)送回查請求
              brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
          } else {
              LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
          }
      }
      

      Broker將回查請求發(fā)送給Producer后,Producer會執(zhí)行checkLocalTransaction方法檢查本地事務,然后將事務狀態(tài)再發(fā)送給Broker,重復上述流程。

      四. 總結

      RocketMQ實現(xiàn)事務消息的原理和實現(xiàn)延遲消息的原理類似,都是通過改寫Topic和queueId,暫時將消息先寫入一個對Consumer不可見的隊列中,然后等待Producer執(zhí)行本地事務,提交事務狀態(tài)后再決定將Half消息Commit或者Rollback。同時,可能因為服務宕機或網(wǎng)絡抖動等原因,Broker沒有收到Producer的事務狀態(tài)提交請求,為了對二階段進行補償,Broker會主動對未確認的Half消息進行事務回查,判斷消息的最終狀態(tài)是否確認,是通過Op隊列實現(xiàn)的,Half消息一旦確認事務狀態(tài),就會往Op隊列中寫入一條消息,消息內容是Half消息所在ConsumeQueue的偏移量。

      本文參考轉載至:

      【RocketMQ】事務消息實現(xiàn)原理分析 - 掘金 (juejin.cn)

      RocketMq之事務消息實現(xiàn)原理 - 掘金 (juejin.cn)

      posted @ 2023-12-02 10:07  聽到微笑  閱讀(119)  評論(0)    收藏  舉報  來源
      主站蜘蛛池模板: 四川少妇被弄到高潮| 日韩高清视频 一区二区| 国产AV国片精品有毛| 亚洲欧美综合人成在线| 亚洲激情一区二区三区在线| 仁寿县| а∨天堂一区中文字幕| 国产精品亚洲综合第一页| 国产国产乱老熟女视频网站97 | 亚洲国产综合一区二区精品| 成人资源网亚洲精品在线| 国产区精品福利在线观看精品| 国产精品久久久久久人妻精品动漫| 四虎影视www在线播放| 欧美乱大交aaaa片if| 九色精品国产亚洲av麻豆一| 国产女精品视频网站免费| 韩国精品一区二区三区在线观看 | 精品999日本久久久影院| 国产95在线 | 亚洲| 久久精产国品一二三产品| 成人免费A级毛片无码网站入口| 久热综合在线亚洲精品| 成人免费区一区二区三区| 日韩精品专区在线影院重磅| 久久精品国产福利亚洲av| 久草热久草热线频97精品| 沂源县| 精品视频一区二区福利午夜| 亚洲中文字幕在线二页| 91无码人妻精品一区二区蜜桃| 精品无码三级在线观看视频| 日韩亚洲视频一区二区三区| 国产精品成人aaaaa网站| 国精一二二产品无人区免费应用| 国产成熟女人性满足视频| 好吊视频在线一区二区三区 | 激情综合色综合啪啪开心| 国产精品日本一区二区不卡视频| 四虎亚洲国产成人久久精品| 亚洲a∨国产av综合av|