<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      RocketMQ 整合SpringBoot發送事務消息

      環境

      jdk: 8u22
      rocketmq: rocketmq-all-4.5.2-bin-release
      springboot: 2.1.6.RELEASE
      rocketmq-springboot: 2.0.3 

      發送流程(事務消息)

      Rocket發送事務消息:

      Rocket發送事務消息是二次提交的,第一次發送prepare提交到服務器時消息主題會替換為RMQ_SYS_TRANS_HALF_TOPIC。等到本地事務執行完畢以后才進行二次提交,這時會發送給原本消息的topic。

      1、由producer發送prepare(半消息)給MQ的broker。MQ會把消息記錄到本地,然后回復prepare消息狀態給producer。

      2、prepare消息發送以后獲取發送狀態,如果是成功則執行本地業務(本地事務),根據本地事務執行結果手動返回相應狀態(RocketMQLocalTransactionState.COMMIT、RocketMQLocalTransactionState.ROLLBACK等)給MQ。

      3、如果是COMMIT則說明本地事務執行成功,prepare為可提交狀態,MQ收到COMMIT消息就會發送給consumer,然后consumer執行本地業務。如果是ROLLBACK則會刪除prepare消息。

      4、如果MQ一直沒收到返回狀態則會啟動定時任務檢查本地事務狀態

      5、消費者、生產者的事務各由開發者自己保證。MQ的事務是由MQ保證,這里會根據使用者配置的參數來決定如何執行。

      生產者

      發送事務消息

      生產者

      /**
       * @author Zhang Qiang
       * @date 2019/12/4 15:55
       */
      @Slf4j
      @Service
      public class SyncProducer {
      
          @Resource
          private RocketMQTemplate rocketMQTemplate;
      
          public TransactionSendResult sendSyncMessage(String msg, String topic, String tag){
              log.info("【發送消息】:{}", msg);
              Message message = MessageBuilder.withPayload(msg).build();
              TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(MQGroup.SPRING_BOOT_PRODUCER_GROUP.getGroup(), topic, message, tag);
              log.info("【發送狀態】:{}", result.getLocalTransactionState());
              return result;
          }
      
      }
      

       

      監聽生產者發送事務消息

      每次推送消息會執行executeLocalTransaction方法,首先會發送半消息,到這里的時候是執行具體本地業務,執行成功后手動返回RocketMQLocalTransactionState.COMMIT狀態,這里是保證本地事務執行成功,如果本地事務執行失敗則可以返回ROLLBACK進行消息回滾。 此時消息只是被保存到broker,并沒有發送到topic中,broker會根據本地返回的狀態來決定消息的處理方式。

      checkLocalTransaction方法是rocket定時回查時所執行的,本環境下并沒有執行,原因版本原因,是因為當前版本沒有執行回調(待驗證)。

      /**
       * @author Zhang Qiang
       * @date 2019/12/4 16:07
       */
      @Slf4j
      @Component
      @RocketMQTransactionListener(txProducerGroup = "spring_boot_producer_group")
      public class SyncProducerListener implements RocketMQLocalTransactionListener {
          private AtomicInteger trnner = new AtomicInteger(0);
          private ConcurrentHashMap<String, Object> localTrans = new ConcurrentHashMap<>();
          @Autowired
          private LocalService localService;
          @Override
          public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
              try {
                  localService.executeLocalService(message.getPayload().toString());
                  log.info("【本地業務執行完畢】 msg:{}, Object:{}", message, o);
                  localTrans.put(message.getHeaders().getId()+"", message.getPayload());
                  return RocketMQLocalTransactionState.COMMIT;
              } catch (Exception e) {
                  e.printStackTrace();
                  log.error("【執行本地業務異常】 exception message:{}", e.getMessage());
                  return RocketMQLocalTransactionState.ROLLBACK;
              }
          }
          @Override
          public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
              log.info("【執行檢查任務】");
              return RocketMQLocalTransactionState.UNKNOWN;
          }
      }
      

      消費者

      這里注解可以使用selectorExpression = "tag",來標明tag,注意這里如果隊列發送的時候當前消費者無法消費的時候消息就會發送給其它tag,也就是說tag并不能保證一定能消費到消息。

      /**
       * @author Zhang Qiang
       * @date 2019/12/4 16:07
       */
      @Slf4j
      @Component
      @RocketMQMessageListener(topic = "springboot_topic", consumerGroup = "spring_boot_consumer_group")
      public class SyncConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
      
          @Autowired
          ConsumerService service;
      
          @Override
          public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
              defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently)(exts, context) ->{
                  try {
                      log.info("【獲取消息】");
                      if (!CollectionUtils.isEmpty(exts)) {
                          exts.forEach(ext->{
                              Integer con = ext.getReconsumeTimes();
                              service.readMessage(new String(ext.getBody()));
                              log.info("【消費消息】 次數:{}, ext :{}", con, ext);
                          });
                      }
                      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                  } catch (Exception e){
                      e.printStackTrace();
                      log.error("【消費消息失敗】,message:{}", e.getMessage());
                      return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                  }
              });
          }
      
          @Override
          public void onMessage(MessageExt messageExt) {
              String msg = null;
              try {
                  msg = new String(messageExt.getBody(),"utf-8");
                  log.info("MsgConsumer >>> {}, msgId:{}", msg, messageExt.getMsgId());
                  service.readMessage(msg);
              } catch (UnsupportedEncodingException e) {
                  e.printStackTrace();
                  log.error("【消費異常】:{}", e.getMessage());
              }
          }
      
      
      }
      

        

      執行結果:

      2019-12-04 17:36:43.711  INFO 9792 --- [io-10100-exec-3] com.zang.rocket.producer.SyncProducer    : 【發送狀態】:COMMIT_MESSAGE
      2019-12-04 17:36:59.956  INFO 9792 --- [io-10100-exec-5] com.zang.rocket.producer.SyncProducer    : 【發送消息】:farst
      2019-12-04 17:36:59.958  INFO 9792 --- [io-10100-exec-5] com.zang.rocket.service.LocalService     : 【執行業務,讀取發送消息】:[B@5dc81a2c
      2019-12-04 17:36:59.958  INFO 9792 --- [io-10100-exec-5] c.z.r.producer.SyncProducerListener      : 【本地業務執行完畢】 msg:GenericMessage [payload=byte[5], headers={rocketmq_TOPIC=springboot_topic, rocketmq_FLAG=0, id=252178bf-1d37-2f33-0892-721a12c0fc23, rocketmq_TRANSACTION_ID=C0A80A8B264018B4AAC2133ACA340005, timestamp=1575452219958}], Object:springboot_tag
      2019-12-04 17:36:59.958  INFO 9792 --- [io-10100-exec-5] com.zang.rocket.producer.SyncProducer    : 【發送狀態】:COMMIT_MESSAGE
      2019-12-04 17:37:19.508  INFO 9792 --- [io-10100-exec-7] com.zang.rocket.producer.SyncProducer    : 【發送消息】:oneworld
      2019-12-04 17:37:19.509  INFO 9792 --- [io-10100-exec-7] com.zang.rocket.service.LocalService     : 【執行業務,讀取發送消息】:[B@532e2914
      2019-12-04 17:37:19.510  INFO 9792 --- [io-10100-exec-7] c.z.r.producer.SyncProducerListener      : 【本地業務執行完畢】 msg:GenericMessage [payload=byte[8], headers={rocketmq_TOPIC=springboot_topic, rocketmq_FLAG=0, id=e9f68709-929a-acc9-bc90-426dc299fc5e, rocketmq_TRANSACTION_ID=C0A80A8B264018B4AAC2133B16940006, timestamp=1575452239509}], Object:springboot_tag
      2019-12-04 17:37:19.510  INFO 9792 --- [io-10100-exec-7] com.zang.rocket.producer.SyncProducer    : 【發送狀態】:COMMIT_MESSAGE
      2019-12-04 17:37:42.620  INFO 9792 --- [io-10100-exec-9] com.zang.rocket.producer.SyncProducer    : 【發送消息】:好快!
      2019-12-04 17:37:42.622  INFO 9792 --- [io-10100-exec-9] com.zang.rocket.service.LocalService     : 【執行業務,讀取發送消息】:[B@1addc4e0
      2019-12-04 17:37:42.622  INFO 9792 --- [io-10100-exec-9] c.z.r.producer.SyncProducerListener      : 【本地業務執行完畢】 msg:GenericMessage [payload=byte[7], headers={rocketmq_TOPIC=springboot_topic, rocketmq_FLAG=0, id=088cfba8-7b49-6ecd-8c7d-1c4b3e543f7d, rocketmq_TRANSACTION_ID=C0A80A8B264018B4AAC2133B70DC0007, timestamp=1575452262622}], Object:springboot_tag
      2019-12-04 17:37:42.622  INFO 9792 --- [io-10100-exec-9] com.zang.rocket.producer.SyncProducer    : 【發送狀態】:COMMIT_MESSAGE
      2019-12-04 17:38:00.541  INFO 9792 --- [io-10100-exec-2] com.zang.rocket.producer.SyncProducer    : 【發送消息】:事務消息!
      2019-12-04 17:38:00.543  INFO 9792 --- [io-10100-exec-2] com.zang.rocket.service.LocalService     : 【執行業務,讀取發送消息】:[B@2c4f2e8d
      2019-12-04 17:38:00.543  INFO 9792 --- [io-10100-exec-2] c.z.r.producer.SyncProducerListener      : 【本地業務執行完畢】 msg:GenericMessage [payload=byte[13], headers={rocketmq_TOPIC=springboot_topic, rocketmq_FLAG=0, id=b0fb1bcd-723a-bcac-d650-08eeff193762, rocketmq_TRANSACTION_ID=C0A80A8B264018B4AAC2133BB6DD0008, timestamp=1575452280543}], Object:springboot_tag
      
      

       

      消費結果

      2019-12-04 17:36:59.966  INFO 30024 --- [MessageThread_3] com.zang.rocket.cunsumer.SyncConsumer    : 【獲取消息】
      2019-12-04 17:36:59.966  INFO 30024 --- [MessageThread_3] com.zang.rocket.service.ConsumerService  : 【讀取消息服務】:farst
      2019-12-04 17:36:59.966  INFO 30024 --- [MessageThread_3] com.zang.rocket.cunsumer.SyncConsumer    : 【消費消息】 次數:0 ext :MessageExt [queueId=1, storeSize=316, queueOffset=0, sysFlag=8, bornTimestamp=1575452219956, bornHost=/192.168.10.139:60014, storeTimestamp=1575452219960, storeHost=/192.168.10.139:10911, msgId=C0A80A8B00002A9F0000000003C8971E, commitLogOffset=63477534, bodyCRC=1472579256, reconsumeTimes=0, preparedTransactionOffset=63477210, toString()=Message{topic='springboot_topic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=springboot_topic, MAX_OFFSET=1, TRAN_MSG=true, CONSUME_START_TIME=1575452219966, id=b63e043d-d3d6-59ad-b455-bbc02add83e0, UNIQ_KEY=C0A80A8B264018B4AAC2133ACA340005, WAIT=false, PGROUP=spring_boot_producer_group, timestamp=1575452219956, REAL_QID=1}, body=[102, 97, 114, 115, 116], transactionId='C0A80A8B264018B4AAC2133ACA340005'}]
      2019-12-04 17:37:19.518  INFO 30024 --- [MessageThread_4] com.zang.rocket.cunsumer.SyncConsumer    : 【獲取消息】
      2019-12-04 17:37:19.518  INFO 30024 --- [MessageThread_4] com.zang.rocket.service.ConsumerService  : 【讀取消息服務】:oneworld
      2019-12-04 17:37:19.518  INFO 30024 --- [MessageThread_4] com.zang.rocket.cunsumer.SyncConsumer    : 【消費消息】 次數:0 ext :MessageExt [queueId=8, storeSize=319, queueOffset=0, sysFlag=8, bornTimestamp=1575452239508, bornHost=/192.168.10.139:60014, storeTimestamp=1575452239512, storeHost=/192.168.10.139:10911, msgId=C0A80A8B00002A9F0000000003C89A2D, commitLogOffset=63478317, bodyCRC=319562353, reconsumeTimes=0, preparedTransactionOffset=63477990, toString()=Message{topic='springboot_topic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=springboot_topic, MAX_OFFSET=1, TRAN_MSG=true, CONSUME_START_TIME=1575452239518, id=9c9acec6-1fd4-d45c-1bc2-bc489e08e8bc, UNIQ_KEY=C0A80A8B264018B4AAC2133B16940006, WAIT=false, PGROUP=spring_boot_producer_group, timestamp=1575452239508, REAL_QID=8}, body=[111, 110, 101, 119, 111, 114, 108, 100], transactionId='C0A80A8B264018B4AAC2133B16940006'}]
      2019-12-04 17:37:42.634  INFO 30024 --- [MessageThread_5] com.zang.rocket.cunsumer.SyncConsumer    : 【獲取消息】
      2019-12-04 17:37:42.634  INFO 30024 --- [MessageThread_5] com.zang.rocket.service.ConsumerService  : 【讀取消息服務】:好快!
      2019-12-04 17:37:42.634  INFO 30024 --- [MessageThread_5] com.zang.rocket.cunsumer.SyncConsumer    : 【消費消息】 次數:0 ext :MessageExt [queueId=6, storeSize=318, queueOffset=0, sysFlag=8, bornTimestamp=1575452262620, bornHost=/192.168.10.139:60014, storeTimestamp=1575452262624, storeHost=/192.168.10.139:10911, msgId=C0A80A8B00002A9F0000000003C89D3E, commitLogOffset=63479102, bodyCRC=1129015149, reconsumeTimes=0, preparedTransactionOffset=63478776, toString()=Message{topic='springboot_topic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=springboot_topic, MAX_OFFSET=1, TRAN_MSG=true, CONSUME_START_TIME=1575452262634, id=92e37d9b-b7b5-6064-65e5-deee043a5c48, UNIQ_KEY=C0A80A8B264018B4AAC2133B70DC0007, WAIT=false, PGROUP=spring_boot_producer_group, timestamp=1575452262620, REAL_QID=6}, body=[-27, -91, -67, -27, -65, -85, 33], transactionId='C0A80A8B264018B4AAC2133B70DC0007'}]
      2019-12-04 17:38:00.549  INFO 30024 --- [MessageThread_6] com.zang.rocket.cunsumer.SyncConsumer    : 【獲取消息】
      2019-12-04 17:38:00.549  INFO 30024 --- [MessageThread_6] com.zang.rocket.service.ConsumerService  : 【讀取消息服務】:事務消息!
      2019-12-04 17:38:00.549  INFO 30024 --- [MessageThread_6] com.zang.rocket.cunsumer.SyncConsumer    : 【消費消息】 次數:0 ext :MessageExt [queueId=13, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1575452280541, bornHost=/192.168.10.139:60014, storeTimestamp=1575452280544, storeHost=/192.168.10.139:10911, msgId=C0A80A8B00002A9F0000000003C8A055, commitLogOffset=63479893, bodyCRC=880426184, reconsumeTimes=0, preparedTransactionOffset=63479560, toString()=Message{topic='springboot_topic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=springboot_topic, MAX_OFFSET=1, TRAN_MSG=true, CONSUME_START_TIME=1575452280549, id=1bc5beda-d52a-9fa3-3847-e301a28095b8, UNIQ_KEY=C0A80A8B264018B4AAC2133BB6DD0008, WAIT=false, PGROUP=spring_boot_producer_group, timestamp=1575452280541, REAL_QID=13}, body=[-28, -70, -117, -27, -118, -95, -26, -74, -120, -26, -127, -81, 33], transactionId='C0A80A8B264018B4AAC2133BB6DD0008'}]
      

        

       

      源碼地址

      https://gitee.com/ghostsugar/rocketMq

      posted on 2019-12-04 18:17  GhostSugar  閱讀(5516)  評論(2)    收藏  舉報

      主站蜘蛛池模板: 四虎永久免费高清视频| 国产免费无遮挡吸奶头视频| 不卡AV中文字幕手机看| 国产精品∧v在线观看| 亚洲精品欧美综合二区| 国产精品天天看天天狠| av午夜久久蜜桃传媒软件| 国产无遮挡吃胸膜奶免费看| 日夜啪啪一区二区三区| 日韩一本不卡一区二区三区| 九九久久精品国产免费看小说| 亚洲av成人无码天堂| 亚洲一级特黄大片在线观看| 九九热在线免费视频精品| 国产a在视频线精品视频下载| 国产无遮挡吃胸膜奶免费看| 亚洲gv猛男gv无码男同| 高颜值午夜福利在线观看| 文水县| 一区二区三区自拍偷拍视频| 国产目拍亚洲精品二区| 亚洲色大成网站WWW久久| 国产精品麻豆成人av网| 综合色久七七综合尤物| 亚洲精品人妻中文字幕| 亚洲 欧美 唯美 国产 伦 综合| 国产精品男女午夜福利片| 少妇被粗大猛进进出出| 中国大陆高清aⅴ毛片| 朔州市| 2020国产成人精品视频| 花式道具play高h文调教| 日韩中文字幕精品人妻| 干老熟女干老穴干老女人| 最近中文字幕完整版2019| 九九热爱视频精品视频| 福利一区二区视频在线| 精品无码久久久久国产| 亚洲国产成人久久一区久久| 超碰伊人久久大香线蕉综合| 成人午夜福利精品一区二区|