<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      RabbitMQ最佳實踐

      在使用消息機制時,我們通常需要考慮以下幾個問題:

      • 消息不能丟失
      • 保證消息一定能投遞到目的地
      • 保證業務處理和消息發送/消費的一致性

      本文以RabbitMQ為例,討論如何解決以上問題。

      消息持久化

      如果希望RabbitMQ重啟之后消息不丟失,那么需要對以下3種實體均配置持久化:

      • exchange
      • queue
      • message

      聲明exchange時設置持久化(durable = true)并且不自動刪除(autoDelete = false):

      boolean durable = true;
      boolean autoDelete = false;
      channel.exchangeDeclare("dlx", TOPIC, durable, autoDelete, null)
      

      聲明queue時設置持久化(durable = true)并且不自動刪除(autoDelete = false):

      boolean durable = true;
      boolean autoDelete = false;
      channel.queueDeclare("order-summary-queue", durable, false, autoDelete, queueArguments);
      

      發送消息時通過設置deliveryMode=2持久化消息:

      AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                          .contentType("application/json")
                          .deliveryMode(2)
                          .priority(0)
                          .build();
      channel.basicPublish("order", "order.created", false, properties, "sample-data".getBytes())
      

      發送確認

      有時,業務處理成功,消息也發了,但是我們并不知道消息是否成功到達了rabbitmq,如果由于網絡等原因導致業務成功而消息發送失敗,那么發送方將出現不一致的問題,此時可以使用rabbitmq的發送確認功能,即要求rabbitmq顯式告知我們消息是否已成功發送。

      首先需要在channel上設置ConfirmListener:

      channel.addConfirmListener(new ConfirmListener() {
                      public void handleAck(long seqNo, boolean multiple) {
                          if (multiple) {
                              logger.info(seqNo + "號及其以前的所有消息發送成功,當消息發送成功后執行相應邏輯,比如標記事件為已發送或者刪除原來事件");
                          } else {
                              logger.info(seqNo + "號發送成功,當消息發送成功后執行相應邏輯,比如標記事件為已發送或者刪除原來事件");
                          }
                      }
      
                      public void handleNack(long seqNo, boolean multiple) {
                          if (multiple) {
                              logger.info(seqNo + "號及其以前的所有消息發送失敗,當消息發送失敗后執行相應邏輯,比如重試或者標記事件發送失敗");
                          } else {
                              logger.info(seqNo + "號發送失敗,當消息發送失敗后執行相應邏輯,比如重試或者標記事件發送失敗");
      
                          }
                      }
                  });
      

      然后在發送消息直線需要開啟發送確認模式:

      //開啟發送者確認
      channel.confirmSelect();
      

      然后發送消息:

      channel.basicPublish("order", "order.created", false, properties, "sample-data".getBytes());
      

      當消息正常投遞時,rabbitmq客戶端將異步調用handleAck()表示消息已經成功投遞,此時程序可以自行處理投遞成功之后的邏輯,比如在數據庫中將消息設置為已發送。當消息投遞出現異常時,handleNack()將被調用。

      通常來講,發送端只需要保證消息能夠發送到exchange即可,而無需關注消息是否被正確地投遞到了某個queue,這個是rabbitmq和消息的接收方需要考慮的事情。基于此,如果rabbitmq找不到任何需要投遞的queue,那么rabbitmq依然會ack給發送方,此時發送方可以認為消息已經正確投遞,而不好用關系消息沒有queue接收的問題。但是,對于rabbitmq而言,這種消息是需要記錄下來的,否則rabbitmq將直接丟棄該消息。此時可以為exchange設置alternate-exchange,即表示rabbitmq將把無法投遞到任何queue的消息發送到alternate-exchange指定的exchange中,通常來說可以設置一個死信交換(DLX)。

      事實上,對于exchange存在但是卻找不到任何接收queue時,如果發送是設置了mandatory=true,那么在消息被ack前將return給客戶端,此時客戶端可以創建一個ReturnListener用于接收返回的消息:

      channel.addReturnListener(new ReturnListener() {
                      @Override
                      public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          logger.warn("消息無法正確投遞,已返回。");
                      }
                  });
      

      但是需要注意的是,在return之后,消息依然會被ack而不是nack,還不如不設置madatory呢,因此return有時并不見得有用。

      需要注意的是,在發送消息時如果exchange不存在,rabbitmq直接丟棄該消息,并且不會ack或者nack操作,但是在Spring中,會nack。

      綜合起來,要完成發送方確認,需要做以下幾個點:

      • 設置ConfirmListener
      • 通過confirmSelect()開啟
      • 為exchange設置alternate-exchange到DLX
      • 發送時沒有必要設置mandotory
      • 發送方將消息記錄在數據庫中,收到ack時在數據庫中標記消息為已發送狀態
      • 如果收到reject或者由于網絡原因沒有收到ack,那么消息狀態不會改變,下次發送時再次發送,此時可能導致消息重復,解決重復問題請參考“保證至少一次投遞,并且消費端冪”小節。

      手動消費確認

      有時,消息被正確投遞到消費方,但是消費方處理失敗,那么便會出現消費方的不一致問題。比如訂單已創建的消息發送到用戶積分子系統中用于增加用戶積分,但是積分消費法處理卻都失敗了,用戶就會問:我購買了東西為什么積分并沒有增加呢?

      要解決這個問題,需要引入消費方確認,即只有消息被成功處理之后才告知rabbitmq以ack,否則告知rabbitmq以nack,此時的處理流程如下:

      1. 接收消息,不作ack,處理消息成功則ack,不成功nack
      2. 對于nack的消息,可以配置rabbitmq要么重新投遞,要么直接扔掉,要么傳到死信交換(DLX)
      3. 如果處理成功,但是由于網絡等問題導致確認(無論是ack還是nack)不成功,那么rabbitmq會重新投遞消息,但是此時由于消息已經成功,重新投遞便導致了消費重復的消息,此時請參考“保證至少一次投遞,并且消費端冪”小節。

      在rabbitmq中,消息默認是字段ack的,即消息到達消費方立即ack,而不管消費方業務處理是否成功,為此可以開啟手動確認模式,即有消費方自行決定何時應該ack,通過設置autoAck=false開啟手動確認模式:

              boolean autoAck = false;
              channel.basicConsume("order-summary-queue", autoAck,
                      new DefaultConsumer(channel) {
                          @Override
                          public void handleDelivery(String consumerTag,
                                                     Envelope envelope,
                                                     AMQP.BasicProperties properties,
                                                     byte[] body)
                                  throws IOException {
                              long deliveryTag = envelope.getDeliveryTag();
                              if (success()) {
                                  logger.info("成功消費消息" + deliveryTag);
                                  channel.basicAck(deliveryTag, false);
                              } else {
                                  if (!envelope.isRedeliver()) {
                                      logger.warn("首次消費消息" + deliveryTag + "不成功,嘗試重試");
                                      boolean requeue = true;
                                      channel.basicNack(deliveryTag, false, requeue);
                                  } else {
                                      logger.warn("第二次消費消息" + deliveryTag + "不成功,扔到DLX");
                                      boolean requeue = false;
                                      channel.basicNack(deliveryTag, false, requeue);
                                  }
                              }
                          }
                      });
      

      可以看到,在autoAck=false情況下,通過業務處理的是否成功(success())來判斷應該ack還是nack。

      另外,為了避免消息反復requeue的情況,如果消息第一次消費不成功,則在nack時設置requeue=true,表示告知rabbitmq將reject的消息重新投遞,如果第二次消費依然不成功,那么nack時設置requeue=false,告知rabbitmq不要重新投遞了,此時rabbitmq將根據自己的配置要么直接扔掉消息,要么將消息發送到DLX中,具體配置請參考“設置死信交換(DLX)和死信隊列(DLQ)”。

      保證至少一次投遞,并且消費端冪等

      通常來說,程序中會先完成寫數據庫的操作,然后發送消息,此時一個重要的點是保證這兩者的一致性,即一旦數據庫保存成功消息必須也能夠發送成功。要保證發送發一致性,一種做法是使用全局事務,即將數據庫操作和消息發送放到一個事務中,比如JTA,但是全局事務是很重的,并且rabbitmq目前并不支持全局事務。

      要解決發送發的一致性問題,可以實現將消息保存到數據庫的事件表中,此時業務處理的數據庫操作和保存消息到數據庫屬于同一個本地數據庫事務,那么到此可以保證業務處理和消息產生的原子性,然后有一個異步的后臺任務從數據庫的事件表中一次讀取未發送的消息發送至rabbitmq,發送成功后更新消息的狀態為已發布

      然而,此時我們依然無法保證發送消息和更新消息狀態之間的原子性,因為可能發生消息發送成功但是數據庫狀態更新不成功的情況,為了解決這種極端情況,可以多次重試消息發送,步驟如下:

      1. 讀取時間表中未發送消息,發送到rabbitmq
      2. 如果發送成功,事件表中消息狀態也更新成功,皆大歡喜
      3. 如果消息發送不成功,那么消息狀態也不作改變,下次重試
      4. 如果消息發送成功而狀態更新不成功,下次重試

      不斷重試,總有一個能夠達到發送消息和狀態更新的原子性。

      那么問題也來了:rabbitmq中可能出現多條重復消息,此時消費端就懵了。為了解決這個問題,消費方應該設計為冪等的,即對相同消息的多次消費與單次消費結果相同。有些消費方的業務邏輯本身便是冪等的,而對于本身不冪等的消費方,需要在數據庫中記錄已經被正確消費的消息,當重復消息來時,判斷該消息是否已經被消費,如果沒有則執行消費邏輯,如果已經消費則直接忽略。此時消費方的處理步驟如下:

      1. 接收到消息,判斷消息是否已經消費,如果是,則直接忽略,此時已然需要做消費成功確認
      2. 如果消息還未被消費,則處理業務邏輯,記錄消息,業務邏輯本身和記錄消息在同一個數據庫事務中,如果都成功,則皆大歡喜;如果失敗,那么消費方業務回滾,消息也不記錄,此時reject消息,等下次重發

      設置消息的TTL和消息隊列的max-length

      為了保證消息的時效性,可以設置隊列中消息的TTL(x-message-ttl),而為了保證消息隊列不至于太大而影響性能,可以設置隊列的最大消息數(x-max-length)。在創建隊列時設置如下:

      ImmutableMap<String, Object> orderSummaryQueueArguments = of(
                          "x-max-length",
                          300,
                          "x-message-ttl",
                          24 * 60 * 60 * 1000);
      channel.queueDeclare("order-summary-queue", true, false, false, orderSummaryQueueArguments);
      

      設置死信交換(DLX)和死信隊列(DLQ)

      對于無法投遞的消息,我們需要將其記錄下來便于后續跟蹤排查,此時可以將這樣的消息放入DLX和DLQ中。默認情況下,queue中被拋棄的消息將被直接丟掉,但是可以通過設置queue的x-dead-letter-exchange參數,將被拋棄的消息發送到x-dead-letter-exchange做指定的exchange中,這樣的exchange成為DLX。

      設置了x-dead-letter-exchange之后,在以下三種情況下消息將被扔到DLX中:

      1. 消費方nack時指定了requeue=false
      2. 消息的TTL已到
      3. 消息隊列的max-length已到

      在聲明queue時定義x-dead-letter-exchange

      ImmutableMap<String, Object> orderNotificationQueueArguments = of("x-dead-letter-exchange", "dlx");
      channel.queueDeclare("order-notification-queue", true, false, false, orderNotificationQueueArguments);
      
      • 設置DLQ為lazy,并且沒有TTL,并且沒有max-length
        在以下3種情況下,消息會被投遞到DLX中:

      需要注意的是,在發送消息時,當已經達到queue的上限,而當queue定義為x-overflow=reject-publish時,rabbitmq將nack。當有多個queue同時綁定到exchange時,如果有些queue設置了reject-publish,而有些卻沒有,那么依然會nack,這對發送方來說不好處理。因此,還是那句話,發送方只需要保證正確地投遞到了exchange即可,而不用關系exchange后面有哪些queue。

      設置Prefetch count

      Prefetch count表示消費方一次性從rabbitmq讀取的消息數量,如果設置過大,那么消費方可能始終處于高負荷運轉狀態,而如果太小又會增加網絡開銷,通常設置為20-50。另外,有時為了保證多個消費方均衡地分攤消息處理任務,通常設置prefetch count為1。

      異常處理

      在以上設置的情況下,我們來看看當各種異常發生時,rabbitmq是如何運作的:

      • broker不可達:直接拋出異常;
      • 發送方自己始終發送不出去:消息狀態始終處于“未發送”,不會破壞一致性,但是對于事件表中累計太多的事件需要關注;
      • exchange不存在:消息被丟掉,rabbitmq不會ack,消息狀態始終處于“未發送”,下次將重新發送,不會破壞一致性,但是當exchange持續不存在下去,那么事件表中事件也會累計太多;
      • exchange存在但是沒有接受queue:消息將被ack并標記為“已發送”,但由于設置了alternative exchange為dlx,那么消息將發送到dlx對應的dlq中保存以便后續處理;
      • consumer不在線,而累積消息太多:消息一致性沒有問題,但是當累計到了max-length上限,消息隊列頭部的消息將被放置dlq中以便后續處理;
      • consumer臨時性失敗:通過redelivered判斷是否為重復投遞,如果是,則nack并且requeue=false,表示如果重復投遞的一次的消息如果再失敗,那么直接扔到dlx中,也即消息最多重復投遞一次;
      • consumer始終失敗:所有消息均被投入dlq以便后續處理,此時可能需要關注dlq的長度是否太長。

      路由策略

      系統中往往會發布多種類型的消息,在發送時有幾種路由策略:

      • 所有類型的消息都發送到同一個exchange中
      • 每種類型的消息都單獨配置一個exchange
      • 對消息類型進行歸類,同一類型的消息對應一個exchange

      筆者建議采用最后一種,并且結合DDD中的聚合劃分,路由策略建議如下:

      每一個聚合根下發布的所有類型的事件對應一個exchange,exchange設置為topic,queue可以配置接收某一種類型的事件,也可以配置接收所有某種聚合相關的事件,還可以配置接收所有事件。

      案例

      假設有個訂單(Order)系統,用戶下單后需要向用戶發送短信通知,而所有對訂單的數據顯示采用了CQRS架構,即將訂單的讀模型和寫模型分離,即所有訂單的更新都通過事件發到rabbitmq,然后專門有個consumer接收這些消息用于更新訂單的讀模型。

      訂單相關有兩個事件:order.created和order.updated,所有與訂單相關的事件都發布到同一個 topic exchange中,exchange名為“order",設置短信通知queue(order-notification-queue)只接收order.created消息,因為只有訂單在新建時才會發出通知,即order-notification-queue的routing key為order.created,設置讀模型的queue(order-summary-queue)接收所有與Order相關的消息,即配置order-summary-queue的routing key為order.#,示例代碼如下:

      package com.ecommerce.order.spike.rabbitmq;
      
      import com.ecommerce.order.common.logging.AutoNamingLoggerFactory;
      import com.google.common.collect.ImmutableMap;
      import com.rabbitmq.client.*;
      import org.slf4j.Logger;
      
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;
      
      import static com.google.common.collect.ImmutableMap.of;
      import static com.rabbitmq.client.BuiltinExchangeType.TOPIC;
      
      public class RabbitMQSender {
          private static final Logger logger = AutoNamingLoggerFactory.getLogger();
      
          public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
              ConnectionFactory factory = new ConnectionFactory();
              factory.setHost("localhost");
              factory.setUsername("rabbitmq-user");
              factory.setPassword("rabbitmq-password");
              factory.setVirtualHost("/");
              factory.setPort(5672);
      
              try (Connection conn = factory.newConnection(); Channel channel = conn.createChannel();) {
                  //設置死信交換,Topic類型,持久化
                  channel.exchangeDeclare("dlx", TOPIC, true, false, null);
      
                  //設置死信隊列,持久化,lazy型
                  channel.queueDeclare("dlq", true, false, false, of("x-queue-mode", "lazy"));
      
                  //接收所有發給dlx的消息,另外可以定義其他queue接收指定類型的消息
                  channel.queueBind("dlq", "dlx", "#");
      
      
                  //定義與order相關的事件exchange,如果無法路由,則路由到死信交換dlx
                  channel.exchangeDeclare("order", TOPIC, true, false, of("alternate-exchange", "dlx"));
      
      
                  //定義用于異步更新order讀模型的queue,設置死信交換為dlx,隊列滿(x-overflow)時將頭部消息發到dlx
                  //定義queue的最大消息數(x-max-length)為300,滿后發到dlx,另外定義消息的存活時間(x-message-ttl)為1天,1天后發送到dlx
                  ImmutableMap<String, Object> orderSummaryQueueArguments = of("x-dead-letter-exchange",
                          "dlx",
                          "x-overflow",
                          "drop-head",
                          "x-max-length",
                          300,
                          "x-message-ttl",
                          24 * 60 * 60 * 1000);
                  channel.queueDeclare("order-summary-queue", true, false, false, orderSummaryQueueArguments);
                  channel.queueBind("order-summary-queue", "order", "order.#");
      
      
                  //定義用于order創建時向用戶發出通知的queue,設置死信交換為dlx
                  ImmutableMap<String, Object> orderNotificationQueueArguments = of("x-dead-letter-exchange",
                          "dlx",
                          "x-overflow",
                          "drop-head",
                          "x-max-length",
                          300,
                          "x-message-ttl",
                          24 * 60 * 60 * 1000);
                  channel.queueDeclare("order-notification-queue", true, false, false, orderNotificationQueueArguments);
                  channel.queueBind("order-notification-queue", "order", "order.created");
      
      
                  //設置發送端確認
                  channel.addConfirmListener(new ConfirmListener() {
                      public void handleAck(long seqNo, boolean multiple) {
                          if (multiple) {
                              logger.info(seqNo + "號及其以前的所有消息發送成功,當消息發送成功后執行相應邏輯,比如標記事件為已發送或者刪除原來事件");
                          } else {
                              logger.info(seqNo + "號發送成功,當消息發送成功后執行相應邏輯,比如標記事件為已發送或者刪除原來事件");
      
                          }
                      }
      
                      public void handleNack(long seqNo, boolean multiple) {
                          if (multiple) {
                              logger.info(seqNo + "號及其以前的所有消息發送失敗,當消息發送失敗后執行相應邏輯,比如重試或者標記事件發送失敗");
                          } else {
                              logger.info(seqNo + "號發送失敗,當消息發送失敗后執行相應邏輯,比如重試或者標記事件發送失敗");
      
                          }
                      }
                  });
      
                  //開啟發送者確認
                  channel.confirmSelect();
      
                  //設置消息持久化
                  AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                          .contentType("application/json")
                          .deliveryMode(2)
                          .priority(0)
                          .build();
      
      
                  //發送時沒有必要設置mandatory,因為無法路由的消息會記錄在dlq中
                  //達到queue的上限時,queue頭部消息將被放入dlx中
                  try {
                      channel.basicPublish("order", "order.created", false, properties, "create order data".getBytes());
                      channel.basicPublish("order", "order.updated", false, properties, "update order data".getBytes());
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
                  Thread.sleep(5000);
              }
      
          }
      }
      
      

      以上,我們發送了一條order.created消息和一條order.updated消息,基于routing key設置,兩條消息都會到達order-summary-queue,但是只有order.created消息到達了order-notification-queue:
      所有消息都到達了order-summary-queue,但是只有order.created消息到達了order-notification-queue

      在consumer端,開啟手動ack,并且對于處理失敗的場景,只允許重新投遞一次,否則扔到DLX中:

      package com.ecommerce.order.spike.rabbitmq;
      
      import com.ecommerce.order.common.logging.AutoNamingLoggerFactory;
      import com.rabbitmq.client.*;
      import org.slf4j.Logger;
      
      import java.io.IOException;
      import java.util.Random;
      import java.util.concurrent.TimeoutException;
      
      public class RabbitMQReceiver {
          private static final Logger logger = AutoNamingLoggerFactory.getLogger();
      
          public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
              ConnectionFactory factory = new ConnectionFactory();
              factory.setHost("localhost");
              factory.setUsername("rabbitmq-user");
              factory.setPassword("rabbitmq-password");
              factory.setVirtualHost("/");
              factory.setPort(5672);
      
              Connection conn = factory.newConnection();
              Channel channel = conn.createChannel();
              channel.basicQos(1, true);
      
              boolean autoAck = false;
              channel.basicConsume("order-summary-queue", autoAck,
                      new DefaultConsumer(channel) {
                          @Override
                          public void handleDelivery(String consumerTag,
                                                     Envelope envelope,
                                                     AMQP.BasicProperties properties,
                                                     byte[] body)
                                  throws IOException {
                              long deliveryTag = envelope.getDeliveryTag();
      
                              //用Random來模擬有時處理成功有時處理失敗的場景
                              if (new Random().nextBoolean()) {
                                  logger.info("成功消費消息" + deliveryTag);
                                  channel.basicAck(deliveryTag, false);
                              } else {
                                  if (!envelope.isRedeliver()) {
                                      logger.warn("首次消費消息" + deliveryTag + "不成功,嘗試重試");
                                      channel.basicNack(deliveryTag, false, true);
                                  } else {
                                      logger.warn("第二次消費消息" + deliveryTag + "不成功,扔到DLX");
                                      channel.basicNack(deliveryTag, false, false);
                                  }
                              }
                          }
                      });
          }
      }
      
      
      
      posted @ 2019-05-21 22:11  無知者云  閱讀(18669)  評論(12)    收藏  舉報
      主站蜘蛛池模板: 青草青草久热精品视频在线观看| 亚洲成女人图区一区二区| 亚洲国产美女精品久久久| 亚洲免费观看在线视频| 国产一区二区不卡91| 国产精品天干天干综合网| 激情影院内射美女| 日韩精品卡一卡二卡三卡四| 欧美不卡无线在线一二三区观| 国产av国片精品一区二区| 国产毛片基地| 乱码精品一区二区三区| 中文在线天堂中文在线天堂| 日本国产精品第一页久久| 一本久久a久久精品综合| 亚洲精品乱码久久久久久蜜桃| 九九热精品视频在线免费| 亚洲人妻精品中文字幕| 中文字幕V亚洲日本在线电影| 中文字幕一区二区三区精华液| 国产一区二区三区精品综合| 久久成人 久久鬼色| 九九热精品在线视频免费| 蜜桃av无码免费看永久| 色8久久人人97超碰香蕉987| 无码日韩精品一区二区三区免费| 夜夜爽日日澡人人添| 亚洲精品成人一二三专区| 亚洲精品国产免费av| 日韩深夜免费在线观看| 亚洲码国产精品高潮在线| 99在线精品国自产拍中文字幕| 精品自拍偷拍一区二区三区| 国产玖玖玖玖精品电影| 亚洲天堂av在线免费看| 日韩av无码精品人妻系列| 中文字幕第一页国产| 精品偷拍一区二区三区在| 伊人精品成人久久综合| 成人免费A级毛片无码片2022| 亚洲一区成人av在线|