<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      RabbitMQ簡(jiǎn)單入門

      服務(wù)端安裝及配置

      docker安裝

      使用docker安裝RabbitMQ,注意,要選擇tag包含management的鏡像(包含web端管理插件)

      docker pull rabbitmq:3.7.7-management
      docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xxx rabbitmq:3.7.7-management
      

      設(shè)置初始賬號(hào)和密碼,web頁(yè)面地址如下

      http://ip:15672
      

      客戶端訪問(wèn)

      添加依賴

      <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.4.3</version>
      </dependency>
      

      發(fā)布消息

      public class TestRabbitClient {
      
        public static void main(String[] args) throws IOException, TimeoutException {
          String host = "";
          String queueName = "hello_queue";
          ConnectionFactory factory = new ConnectionFactory();
          factory.setHost(host);
          factory.setUsername("");
          factory.setPassword("");
          try (Connection connection = factory.newConnection();
              Channel channel = connection.createChannel()) {
              //隊(duì)列名稱 是否持久化到硬盤(pán) 是否消息共享(多個(gè)消費(fèi)者消費(fèi)) 是否自動(dòng)刪除
            channel.queueDeclare(queueName, false, false, false, null);
            String message = "Hello World!";
            //發(fā)布消息 使用的默認(rèn)交換機(jī)
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
          }
        }
      }
      

      消費(fèi)消息

      channel.basicConsume(queueName, true, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
              System.out.println("receive message: " + new String(message.getBody()));
            }
          }, (x) -> {
            System.out.println("消息被中斷");
          });
      

      注意:channel被關(guān)閉之后 就不能接收到消息了

      交換機(jī)

      交換機(jī)類型

      • Direct (直連交換機(jī)): 最常使用,會(huì)根據(jù)routingkey進(jìn)行精準(zhǔn)匹配。直連交換機(jī)可以分發(fā)任務(wù)給多個(gè)工作者(worker)
      • Fanout (扇形交換機(jī)): 將消費(fèi)分發(fā)給所有綁定的隊(duì)列,而不會(huì)理會(huì)routingkey。優(yōu)點(diǎn)是轉(zhuǎn)發(fā)消息最快,性能最好。一般會(huì)用來(lái)處理廣播消息(broadcast routing)。
      • Topic(主題交換機(jī)): 根據(jù)routingkey進(jìn)行模糊匹配,將消息分發(fā)給一個(gè)或多個(gè)隊(duì)列(delimited by dots)。 routingkey可以有通配符 *,#。* 表示匹配一個(gè)單詞,# 匹配0個(gè)或多個(gè)單詞。當(dāng)綁定鍵為#時(shí),表示接收所有消息,和Fanout交換機(jī)類似了,當(dāng)綁定鍵不包含*和#時(shí),和Direct交換機(jī)功能類似了。
      • Headers (頭交換機(jī)): 類似于直連交換機(jī)。不同點(diǎn)在于頭交換機(jī)的路由規(guī)則建立在頭屬性之上而不是路由鍵,一般開(kāi)發(fā)使用較少。

      使用Fanout交換機(jī)實(shí)現(xiàn)廣播

      @Configuration
      public class RabbitConfig {
      
          @Bean
          public FanoutExchange testExchange2() {
              return new FanoutExchange("test_exchange2");
          }
      }
      
      @Component
      @RabbitListener(bindings = @QueueBinding(
              value = @Queue(), //注意這里不要定義隊(duì)列名稱,系統(tǒng)會(huì)隨機(jī)生成 spring.gen-4klrfbJ0TfmrGQnCpyPAQg這種格式
              exchange = @Exchange(value = "test_exchange2", type = ExchangeTypes.FANOUT))
      )
      public class RabbitMqReceiver {
          /**
           * 測(cè)試消息接收
           */
          @RabbitHandler
          public void process(String context, Message message, Channel channel) {
          }
      }
      
      @Component
      @RabbitListener(bindings = @QueueBinding(
              value = @Queue(), //注意這里不要定義隊(duì)列名稱,系統(tǒng)會(huì)隨機(jī)生成 spring.gen-4klrfbJ0TfmrGQnCpyPAQg這種格式
              exchange = @Exchange(value = "test_exchange2", type = ExchangeTypes.FANOUT))
      )
      public class RabbitMqReceiver {
          /**
           * 測(cè)試消息接收
           */
          @RabbitHandler
          public void process(String context, Message message, Channel channel) {
          }
      }
      

      通過(guò)Spring來(lái)創(chuàng)建動(dòng)態(tài)隊(duì)列,動(dòng)態(tài)綁定關(guān)系。如果我們不指定隊(duì)列名稱,Spring 會(huì)創(chuàng)建非持久化、排他、自動(dòng)刪除的隊(duì)列,具體邏輯可以看 RabbitListenerAnnotationBeanPostProcessor 的 declareQueue 方法。

      死信隊(duì)列

      當(dāng)一條消息在隊(duì)列中出現(xiàn)以下三種情況的時(shí)候,該消息就會(huì)變成一條死信。

      • 消息被拒絕(basic.reject / basic.nack),并且requeue = false
      • 消息TTL過(guò)期(通過(guò)x-message-ttl屬性設(shè)置)
      • 隊(duì)列達(dá)到最大長(zhǎng)度(通過(guò)x-max-length屬性設(shè)置)

      當(dāng)消息在一個(gè)隊(duì)列中變成一個(gè)死信之后,如果配置了死信隊(duì)列,它將被重新publish到死信交換機(jī),死信交換機(jī)將死信投遞到一個(gè)隊(duì)列上,這個(gè)隊(duì)列就是死信隊(duì)列。

      //聲明訂單取消正常隊(duì)列 當(dāng)消息過(guò)期時(shí),將消息發(fā)布到死信隊(duì)列
      Map<String, Object> params = Map
              .of("x-dead-letter-exchange", "order_cancel_delay_queue_exchange",
                  "x-dead-letter-routing-key", "order_cancel_delay_queue_key",
                  "x-message-ttl", 20 * 1000);//消息過(guò)期時(shí)間 毫秒 在這里設(shè)置不靈活(不能改),可以在消息發(fā)布時(shí)設(shè)置
          channel.queueDeclare("order_cancel_queue", true, false, false, params);
          channel.exchangeDeclare("order_cancel_queue_exchange", BuiltinExchangeType.DIRECT, true);
          channel
              .queueBind("order_cancel_queue", "order_cancel_queue_exchange", "order_cancel_queue_key");
      
      

      在發(fā)布消息時(shí)指定過(guò)期時(shí)間

      //聲明訂單取消正常隊(duì)列 當(dāng)消息過(guò)期時(shí),將消息發(fā)布到死信隊(duì)列
      Map<String, Object> params = Map
              .of("x-dead-letter-exchange", "order_cancel_delay_queue_exchange",
                  "x-dead-letter-routing-key", "order_cancel_delay_queue_key");//消息過(guò)期時(shí)間 毫秒
          channel.queueDeclare("order_cancel_queue", true, false, false, params);
          channel.exchangeDeclare("order_cancel_queue_exchange", BuiltinExchangeType.DIRECT, true);
          channel
              .queueBind("order_cancel_queue", "order_cancel_queue_exchange", "order_cancel_queue_key");
      
          BasicProperties basicProperties = new Builder()
              .expiration("15000")//消息過(guò)期時(shí)間 15s
              .build();
          //向正常隊(duì)列發(fā)布消息
          channel.basicPublish("order_cancel_queue_exchange", "order_cancel_queue_key", basicProperties,
              "hello".getBytes());
      

      延時(shí)隊(duì)列

      • 使用死信隊(duì)列的消息過(guò)期方式來(lái)實(shí)現(xiàn)
        有一個(gè)問(wèn)題,如果一個(gè)隊(duì)列配置了死信隊(duì)列,在發(fā)布消息時(shí)指定過(guò)期時(shí)間,第一條20S,第二條5S,那么第二條也會(huì)延遲到20S后執(zhí)行,因?yàn)閞abbitmq只會(huì)檢查第一個(gè)消息是否過(guò)期。
      • 使用rabbitmq的延遲插件,創(chuàng)建的交換機(jī)類型為x-delayed-message,并設(shè)置x-delayed-type屬性為direct,這種方式也沒(méi)有第一種方式的問(wèn)題。
      public Exchange demo08Exchange() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(Demo01Message.NEW_DELAY_EXCHANGE, "x-delayed-message", true, false, args);
          }
      //消息發(fā)布
      rabbitTemplate.convertAndSend(Demo01Message.NEW_DELAY_EXCHANGE, Demo01Message.ROUTING_KEY, message, msg -> {
            msg.getMessageProperties().setDelay(20 * 1000);//延遲20S
            return msg;
          });
      

      整合Spring

      • @EnableRabbit注解導(dǎo)入的RabbitListenerAnnotationBeanPostProcessor來(lái)處理@RabbitListener注解和@RabbitHandler注解
      • 將消息處理器包裝成MultiMethodRabbitListenerEndpoint,注冊(cè)到RabbitListenerEndpointRegistrar對(duì)象中
      • RabbitListenerEndpointRegistrar對(duì)象只是一個(gè)工具類,最終是注冊(cè)到RabbitListenerEndpointRegistry對(duì)象中,它也是通過(guò)@EnableRabbit注解導(dǎo)入的
      • RabbitListenerEndpointRegistry將RabbitListenerEndpoint對(duì)象包裝成MessageListenerContainer對(duì)象,它其中包含MessageListener對(duì)象(包裝了消息處理器)
      • 在MessageListenerContainer的start()方法過(guò)程中,調(diào)用checkMismatchedQueues()方法
      • 通過(guò)調(diào)用CachingConnectionFactory創(chuàng)建connection,在這個(gè)過(guò)程中調(diào)用CompositeConnectionListener的onCreate()方法
      • ConnectionListener是通過(guò)RabbitAdmin對(duì)象添加到CachingConnectionFactory中的
      • 通過(guò)RabbitAdmin對(duì)象聲明Queue,Exchange,Binding
      • 繼續(xù)MessageListenerContainer的start()方法,將消息處理器包裝成AsyncMessageProcessingConsumer對(duì)象,它是一個(gè)Runnable,交給線程池處理,可以通過(guò) concurrentConsumers 屬性來(lái)控制消費(fèi)者數(shù)量從而并發(fā)消費(fèi)
      • AsyncMessageProcessingConsumer中包含一個(gè)BlockingQueueConsumer對(duì)象,在它的start()方法中會(huì)注冊(cè)消息處理的回調(diào),具體類為InternalConsumer
      • 有消息到來(lái)時(shí),InternalConsumer向BlockingQueueConsumer的queue中加入一條消息(推拉結(jié)合)
      • AsyncMessageProcessingConsumer一直在輪訓(xùn)獲取BlockingQueueConsumer的queue,最大等待時(shí)間1秒。
      • 獲取到消息,交給最終的消息處理器處理(就是我們標(biāo)記@RabbitListener注解和@RabbitHandler注解的方法)

      關(guān)于ConfirmCallback和ReturnCallback的使用

      • ConfirmCallback : 注意:只能確認(rèn)消息是否能到達(dá) Exchange
      • ReturnCallback : 注意 它是當(dāng)交換機(jī)路由不到 隊(duì)列的時(shí)候 它才會(huì)被觸發(fā)

      如何保證消息不被重復(fù)消費(fèi)

      每個(gè)消息添加一個(gè)Id,消費(fèi)時(shí)判斷是否已經(jīng)消費(fèi)過(guò)

      //消息發(fā)送方
      Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
              .setContentType(
                  MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
              .setContentEncoding(StandardCharsets.UTF_8.name())
              .setMessageId(UUID.randomUUID().toString()).build();
      rabbitTemplate.convertAndSend("test_exchange1", "test_ronting_key1", message);
      //消息消費(fèi)方
      @RabbitHandler
      public void process(String context, Message message, Channel channel) {
          System.out.println("接收到消息: " + context);
          System.out.println(message.getMessageProperties().getMessageId());
          // 不管成功失敗,向原隊(duì)列確認(rèn)消費(fèi)
          try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
          } catch (IOException e) {
            log.error("原隊(duì)列確認(rèn)消費(fèi)失敗:" + e.getLocalizedMessage());
          }
        }
      

      參數(shù)中context的內(nèi)容類似new String(message.getBody(), StandardCharsets.UTF_8);,源碼位于 SimpleMessageConverter 的 fromMessage() 方法的100行

      如何保證消息不丟失

      1. 生產(chǎn)端,使用confirm機(jī)制,發(fā)送失敗重試
      2. RabbitMQ服務(wù)端,隊(duì)列持久化,消息內(nèi)容持久化
      3. 消費(fèi)端,手動(dòng)確認(rèn)機(jī)制

      集群部署

      1. 普通集群模式(無(wú)高可用性): 默認(rèn)模式,以兩個(gè)節(jié)點(diǎn)(rabbit01、rabbit02)為例來(lái)進(jìn)行說(shuō)明。對(duì)于Queue來(lái)說(shuō),消息實(shí)體只存在于其中一個(gè)節(jié)點(diǎn)rabbit01(或者rabbit02),rabbit01和rabbit02兩個(gè)節(jié)點(diǎn)僅有相同的元數(shù)據(jù),即隊(duì)列的結(jié)構(gòu)。當(dāng)消息進(jìn)入rabbit01節(jié)點(diǎn)的Queue后,consumer從rabbit02節(jié)點(diǎn)消費(fèi)時(shí),RabbitMQ會(huì)臨時(shí)在rabbit01、rabbit02間進(jìn)行消息傳輸,把A中的消息實(shí)體取出并經(jīng)過(guò)B發(fā)送給consumer。

      2. 鏡像集群模式(高可用性): 最常用的集群模式,把需要的隊(duì)列做成鏡像隊(duì)列,存在于多個(gè)節(jié)點(diǎn),屬于RabbitMQ的HA方案。該模式解決了普通模式中的問(wèn)題,其實(shí)質(zhì)和普通模式不同之處在于,消息實(shí)體會(huì)主動(dòng)在鏡像節(jié)點(diǎn)間同步,而不是在客戶端取數(shù)據(jù)時(shí)臨時(shí)拉取。

      當(dāng)然,負(fù)載均衡還是需要我們自己做的。

      參考

      RabbitMQ 多實(shí)例 廣播消息_松月的博客-程序員宅基地_rabbitmq 廣播消息
      RabbitMQ集群鏡像模式部署
      消息冪等

      posted @ 2024-05-19 19:15  strongmore  閱讀(74)  評(píng)論(0)    收藏  舉報(bào)
      主站蜘蛛池模板: 久久人爽人人爽人人片av| 亚洲日韩AV秘 无码一区二区 | 国产亚洲精品国产福APP| 国产精品亚洲一区二区z| 最近中文字幕完整版| 99热久久这里只有精品| 极品少妇xxxx| 国产精品久久国产丁香花| 双柏县| 国产精品爽爽爽一区二区| 在线涩涩免费观看国产精品| 精品人妻伦一二二区久久| 亚洲狠狠爱一区二区三区| 国产精品剧情亚洲二区| 东京热人妻丝袜无码AV一二三区观| 日韩一区二区三区在线视频| 国产玩具酱一区二区三区| 亚洲国产超清无码专区| 1区2区3区4区产品不卡码网站| 2021av在线天堂网| 国产无人区码一区二区| 久久精品亚洲中文无东京热| 日本激情久久精品人妻热| 男女性高爱潮免费网站| 青草内射中出高潮| 2020精品自拍视频曝光| 亚洲精品第一区二区三区| 成人av午夜在线观看| 欧美成人精品手机在线| 91人妻无码成人精品一区91| 国产成人精品无码免费看| 国产精品爽爽久久久久久竹菊| 亚洲va久久久噜噜噜久久狠狠| 国产偷自视频区视频| 亚洲av无码国产在丝袜线观看| 另类 专区 欧美 制服丝袜| 亚洲青青草视频在线播放| 国产三级精品三级在线专区1| 人人玩人人添人人澡超碰| 精品无码一区二区三区水蜜桃| 精品人妻一区二区三区蜜臀|