<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      RabbitMQ-基礎(chǔ)

      1. 簡介

      MQ(Message Queue)消息隊(duì)列,是基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)中“FIFO(先進(jìn)先出)”的一種數(shù)據(jù)結(jié)構(gòu)。

      一般用來解決應(yīng)用解耦,異步消息流量削峰等問題,實(shí)現(xiàn)高性能,高可用,可伸縮和最終一致性架構(gòu)。

      應(yīng)用解耦

      MQ相當(dāng)于一個(gè)中介,生產(chǎn)方通過MQ與消費(fèi)方交互,它將應(yīng)用程序進(jìn)行解耦合。

      異步消息

      將不需要同步處理的并且耗時(shí)長的操作由消息隊(duì)列通知消息接收方進(jìn)行異步處理。提高了應(yīng)用程序的響應(yīng)時(shí)間。

      流量削峰

      如訂單系統(tǒng),在下單的時(shí)候就會往數(shù)據(jù)庫寫數(shù)據(jù)。但是數(shù)據(jù)庫只能支撐每秒1000左右的并發(fā)寫入,并發(fā)量再高就容易宕機(jī)。低峰期的時(shí)候并發(fā)也就100多個(gè),但是在高峰期時(shí)候,并發(fā)量會突然激增到5000以上,這個(gè)時(shí)候數(shù)據(jù)庫肯定卡死了。

      這時(shí)候我們可以使用MQ將消息保存起來,然后系統(tǒng)就可以按照自己的消費(fèi)能力來消費(fèi),比如每秒1000個(gè)數(shù)據(jù),這樣慢慢寫入數(shù)據(jù)庫,這樣就不會卡死數(shù)據(jù)庫了。

      但是使用了MQ之后,限制消費(fèi)消息的速度為1000,但是這樣一來,高峰期產(chǎn)生的數(shù)據(jù)勢必會被積壓在MQ中,高峰就被“削”掉了。但是因?yàn)橄⒎e壓,在高峰期過后的一段時(shí)間內(nèi),消費(fèi)消息的速度還是會維持在1000QPS,直到消費(fèi)完積壓的消息,這就叫做“填谷”。

      2. RabbitMQ

      RabbitMQ是由erlang語言開發(fā),基于AMQP(Advanced Message Queue 高級消息隊(duì)列協(xié)議)協(xié)議實(shí)現(xiàn)的消息隊(duì)列。

      RabbitMQ 其實(shí)是一個(gè)消息代理:它接受和轉(zhuǎn)發(fā)消息。可以將其視為郵局:當(dāng)你把 要投遞的郵件放入郵箱時(shí),你可以確定郵遞員最終會將郵件遞送給你的收件人。在這個(gè)比喻中,RabbitMQ 是一個(gè)郵箱、一個(gè)郵局和一個(gè)信件載體。 RabbitMQ 和郵局之間的主要區(qū)別在于它不處理紙張,而是接受、存儲和轉(zhuǎn)發(fā)二進(jìn)制數(shù)據(jù)塊 - 消息。

      3. 模式

      官方網(wǎng)站

      這里僅介紹了常用的模式,最近看官網(wǎng)又多個(gè)模式Publisher Confirms,完了有時(shí)間再補(bǔ)充上。

      關(guān)于官網(wǎng)中提到的第六種模式RPC,由于RPC通信一般不使用RabbitMQ,所以這里也沒有講。

      3.1 簡單模式

      如圖所示:只有一個(gè)生產(chǎn)者(P)一個(gè)隊(duì)列(紅色塊)和 一個(gè)消費(fèi)者(C)。

      應(yīng)用場景:可以實(shí)現(xiàn)對應(yīng)用程序的解耦,并且可以實(shí)現(xiàn)對業(yè)務(wù)的異步處理。事實(shí)上這是mq最基本的功能。

      3.2 工作模式

      如圖所示:一個(gè)生產(chǎn)者對應(yīng)多個(gè)消費(fèi)者。多個(gè)消費(fèi)者功能消費(fèi)一個(gè)隊(duì)列(負(fù)載均衡)。

      每個(gè)消息只能被其中的一個(gè)消費(fèi)者消費(fèi)。

      應(yīng)用場景:對于 任務(wù)過重或任務(wù)較多情況使用工作隊(duì)列可以提高任務(wù)處理的速度。

      3.3 發(fā)布訂閱模式

      如圖所示:在生產(chǎn)者和隊(duì)列之間多了個(gè)交換機(jī)(X),此時(shí)的交換機(jī)類型為:扇形交換機(jī)(Fanout Exchange)。

      事實(shí)上,簡單模式和工作模式也都有自己的Exchange,只不過不用顯性的聲明,因?yàn)槟J(rèn)使用default Exchange。

      即:一個(gè)發(fā)送到Exchange的消息都會被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。

      每一個(gè)消息能被多個(gè)消費(fèi)者都消費(fèi)。

      Fanout Exchange消息路由規(guī)則如圖所示:

      應(yīng)用場景:顧名思義,一個(gè)消息想被多個(gè)訂閱者消費(fèi)。

      3.4 路由模式

      如圖所示:相比發(fā)布訂閱模式,Exchange和Queue之間多了個(gè)路由關(guān)系,此時(shí)的交換機(jī)類型為:直連交換機(jī)(Direct Exchange)

      • 隊(duì)列和交換機(jī)不是任意綁定了,而是要指定一個(gè)Routingkey。

      • 生產(chǎn)者在向Exchange發(fā)送消息時(shí),也必須指定消息的RoutingKey。

      • Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的 Routing key完全一致,才會接收到消息。

      Direct Exchange消息路由規(guī)則如圖所示:

      3.5 通配符模式/主題模式

      如圖所示:相比路由模式,Exchange和Queue之間不只是通過固定的RoutingKey進(jìn)行綁定,還支持通配符的方式,此時(shí)的交換機(jī)類型為:主題交換機(jī)/通配符交換機(jī)(Topic Exchange)。

      Topic Exchange消息路由規(guī)則如圖所示:

      3. 安裝RabbitMQ

      version: '2'
      services:
          rabbitmq:
             hostname: rabbitmq
             image: rabbitmq:3.8.3-management
             restart: always
             environment:
               # 默認(rèn)的用戶名
               - RABBITMQ_DEFAULT_USER=admin
               # 默認(rèn)的密碼
               - RABBITMQ_DEFAULT_PASS=admin123
             volumes:
               - ./data:/var/lib/rabbitmq
               - ./log:/var/log/rabbitmq/log
             ports:
               # rabbit ui 默認(rèn)端口
               - "15672:15672"
               # Epmd 是 Erlang Port Mapper Daemon 的縮寫,
               # 在 Erlang 集群中相當(dāng)于 dns 的作用,綁定在4369端口上
               - "4369:4369"
               # rabbit 默認(rèn)的端口
               - "5672:5672"
               # 25672端口用于節(jié)點(diǎn)間和CLI工具通信(Erlang分發(fā)服務(wù)器端口),
               # 并從動態(tài)范圍分配(默認(rèn)情況下僅限于單個(gè)端口,
               # 計(jì)算方式為AMQP 0-9-1和AMQP 1.0端口+20000),
               # 默認(rèn)情況下通過 RABBITMQ_NODE_PORT 計(jì)算是25672
               - "25672:25672"
      

      4. 各種模式的簡單實(shí)現(xiàn)

      4.1 項(xiàng)目搭建

      4.1.1 引入依賴

      我們這里使用spring-boot-starter-amqp操作RabbitMQ。

      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
          <parent>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-parent</artifactId>
              <version>2.5.4</version>
              <relativePath/> <!-- lookup parent from repository -->
          </parent>
          <groupId>com.ldx</groupId>
          <artifactId>rabbitmq</artifactId>
          <version>0.0.1-SNAPSHOT</version>
          <name>rabbitmq</name>
          <description>Demo project for Spring Boot</description>
          <properties>
              <java.version>1.8</java.version>
          </properties>
          <dependencies>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-web</artifactId>
              </dependency>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-amqp</artifactId>
              </dependency>
              <dependency>
                  <groupId>org.projectlombok</groupId>
                  <artifactId>lombok</artifactId>
                  <optional>true</optional>
              </dependency>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-test</artifactId>
                  <scope>test</scope>
              </dependency>
              <dependency>
                  <groupId>org.springframework.amqp</groupId>
                  <artifactId>spring-rabbit-test</artifactId>
                  <scope>test</scope>
              </dependency>
          </dependencies>
      </project>
      

      4.1.2 application.yaml

      spring:
        rabbitmq:
          host: localhost
          port: 5672
          # rabbit 默認(rèn)的虛擬主機(jī)
          virtual-host: /
          # rabbitmq 安裝時(shí)指定的超管信息
          username: admin
          password: admin123
      

      4.2 簡單模式

      4.2.1 聲明一個(gè)簡單隊(duì)列

      package com.ldx.rabbitmq.config;
      
      import org.springframework.amqp.core.Queue;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      /**
       * rabbit 快速開始
       *
       * @author ludangxin
       * @date 2021/8/23
       */
      @Configuration
      public class RabbitSimpleConfig {
      
          /**
           * 設(shè)置一個(gè)簡單的隊(duì)列
           */
          @Bean
          public Queue queue() {
              return new Queue("helloMQ");
          }
      }
      

      4.2.2 創(chuàng)建生產(chǎn)者

      package com.ldx.rabbitmq.producer;
      
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Component;
      
      /**
       * 生產(chǎn)者
       *
       * @author ludangxin
       * @date 2021/8/23
       */
      @Component
      public class SimpleProducer {
      
          @Autowired
          private RabbitTemplate rabbitTemplate;
      
          public void send() {
              String context = "helloMQ " + System.currentTimeMillis();
              rabbitTemplate.convertAndSend("helloMQ", context);
          }
      }
      

      4.2.3 創(chuàng)建消費(fèi)者

      package com.ldx.rabbitmq.consumer;
      
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.rabbit.annotation.RabbitHandler;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;
      
      /**
       * 消費(fèi)者
       *
       * @author ludangxin
       * @date 2021/8/23
       */
      @Slf4j
      @Component
      @RabbitListener(queues = {"helloMQ"})
      public class SimpleConsumer {
        
          @RabbitHandler
          public void process(String hello) {
              log.info("Message:{} ", hello);
          }
        
      }
      

      4.2.4 創(chuàng)建測試類

      package com.ldx.rabbitmq;
      
      import com.ldx.rabbitmq.producer.SimpleProducer;
      import org.junit.jupiter.api.Test;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.boot.test.context.SpringBootTest;
      
      @SpringBootTest
      public class RabbitMQTest {
      
          @Autowired
          private SimpleProducer simpleSender;
      
          @Test
          public void hello() throws Exception {
              // 每秒發(fā)送一條消息
              for (int i = 0; i < 10; i++) {
                  simpleSender.send();
                  Thread.sleep(1000);
              }
          }
      }
      

      4.2.5 啟動測試

      啟動測試類,輸出內(nèi)容如下:

      每秒消費(fèi)一條消息。

      2021-09-08 23:58:01.837  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116681827 
      2021-09-08 23:58:02.839  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116682833 
      2021-09-08 23:58:03.842  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116683838 
      2021-09-08 23:58:04.852  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116684843 
      2021-09-08 23:58:05.853  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116685844 
      2021-09-08 23:58:06.853  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116686847 
      2021-09-08 23:58:07.857  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116687850 
      2021-09-08 23:58:08.863  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116688855 
      2021-09-08 23:58:09.868  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116689858 
      2021-09-08 23:58:10.870  INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631116690862 
      

      4.2.6 小節(jié)

      簡單模式,顧名思義,很簡單,相當(dāng)于Hello World程序。我們在編寫的時(shí)候

      1. 指定了一個(gè)Queue并且名稱為helloMQ。
      2. 消息生產(chǎn)者通過SpringBoot 提供的RabbitTemplate發(fā)送消息,我們在發(fā)送時(shí)指定了QueuehelloMQ且發(fā)送了指定內(nèi)容。
      3. 消息消費(fèi)者通過@RabbitListener注解監(jiān)聽了指定QueuehelloMQ,且使用@RabbitHandler注解指定消費(fèi)方法SimpleConsumer::process()
      4. 最后編寫測試類循環(huán)調(diào)用生產(chǎn)者消息發(fā)送邏輯,實(shí)現(xiàn)了消息的生產(chǎn)與消費(fèi)。

      4.3 工作模式

      首先分析:其實(shí)工作模式和簡單模式相比,僅僅是由一個(gè)消費(fèi)者變成了多個(gè)消費(fèi)者。ok,很好辦,我們通過代碼再多加一個(gè)消費(fèi)者即可。

      4.3.1 添加消費(fèi)者

      package com.ldx.rabbitmq.consumer;
      
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.rabbit.annotation.RabbitHandler;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;
      
      /**
       * 消費(fèi)者
       *
       * @author ludangxin
       * @date 2021/8/23
       */
      @Slf4j
      @Component
      @RabbitListener(queues = {"helloMQ"})
      public class SimpleConsumer2 {
      
          @RabbitHandler
          public void process(String hello) {
              log.info("Message2:{} ", hello);
          }
      
      }
      

      4.3.2 啟動測試

      我們再次執(zhí)行test方法,查看消息消費(fèi)情況。

      輸出日志如下:

      SimpleConsumerSimpleConsumer2交替消費(fèi)隊(duì)列中的消息(消費(fèi)者之間消費(fèi)消息是通過輪詢的關(guān)系)。

      2021-09-09 20:24:35.043  INFO 41927 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631190275019 
      2021-09-09 20:24:36.038  INFO 41927 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.SimpleConsumer2  : Message2:helloMQ 1631190276029 
      2021-09-09 20:24:37.036  INFO 41927 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631190277032 
      2021-09-09 20:24:38.046  INFO 41927 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.SimpleConsumer2  : Message2:helloMQ 1631190278036 
      2021-09-09 20:24:39.049  INFO 41927 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631190279041 
      2021-09-09 20:24:40.049  INFO 41927 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.SimpleConsumer2  : Message2:helloMQ 1631190280042 
      2021-09-09 20:24:41.054  INFO 41927 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631190281047 
      2021-09-09 20:24:42.060  INFO 41927 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.SimpleConsumer2  : Message2:helloMQ 1631190282051 
      2021-09-09 20:24:43.062  INFO 41927 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer   : Message:helloMQ 1631190283055 
      2021-09-09 20:24:44.074  INFO 41927 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.SimpleConsumer2  : Message2:helloMQ 1631190284057 
      

      4.3.3 小節(jié)

      在一個(gè)隊(duì)列中如果有多個(gè)消費(fèi)者,那么消費(fèi)者之間是輪詢的關(guān)系。

      4.4 發(fā)布訂閱模式

      首先分析:發(fā)布訂閱模式其實(shí)是將消息先發(fā)送給扇形交換機(jī),交換機(jī)再將消息轉(zhuǎn)發(fā)給其綁定到此交換機(jī)的隊(duì)列上。

      這里,我們聲明一個(gè)交換機(jī),給交換機(jī)綁定兩個(gè)隊(duì)列,并且使用兩個(gè)消費(fèi)者分別綁定到兩個(gè)隊(duì)列上(其實(shí)就是為了和3.3保持一致)。

      4.4.1 聲明交換機(jī)和隊(duì)列

      package com.ldx.rabbitmq.config;
      
      import org.springframework.amqp.core.*;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      /**
       * 扇形交換機(jī)配置
       *
       * @author ludangxin
       * @date 2021/9/9
       */
      @Configuration
      public class RabbitFanoutConfig {
      
          public static final String EXCHANGE_NAME = "FANOUT_EXCHANGE";
          public static final String QUEUE_NAME = "FANOUT_QUEUE";
          public static final String QUEUE_NAME_1 = "FANOUT_QUEUE_1";
      
          /**
           * 1.交換機(jī)
           */
          @Bean(EXCHANGE_NAME)
          public Exchange fanoutExchange() {
              return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).durable(true).build();
          }
      
          /**
           * 2.Queue 隊(duì)列
           */
          @Bean(QUEUE_NAME)
          public Queue fanoutQueue() {
              return QueueBuilder.durable(QUEUE_NAME).build();
          }
      
          /**
           * 2.1 Queue 隊(duì)列
           */
          @Bean(QUEUE_NAME_1)
          public Queue fanoutQueue1() {
              return QueueBuilder.durable(QUEUE_NAME_1).build();
          }
      
          /**
           * 3. 隊(duì)列和交互機(jī)綁定關(guān)系 Binding
           */
          @Bean
          public Binding bindFanoutExchange(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
              // fanout :routing key 默認(rèn)為 "",指定了別的值也沒用
              return BindingBuilder.bind(queue).to(exchange).with("").noargs();
          }
      
          /**
           * 3.1 隊(duì)列和交互機(jī)綁定關(guān)系 Binding
           */
          @Bean
          public Binding bindFanoutExchange1(@Qualifier(QUEUE_NAME_1) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
              // fanout :routing key 默認(rèn)為 "",指定了別的值也沒用,我們這里隨便寫個(gè)值,看會不會有影響
              return BindingBuilder.bind(queue).to(exchange).with("aaabbb").noargs();
          }
      }
      

      4.4.2 創(chuàng)建生產(chǎn)者

      package com.ldx.rabbitmq.producer;
      
      import com.ldx.rabbitmq.config.RabbitFanoutConfig;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Component;
      
      /**
       * 扇形交換機(jī)消息生產(chǎn)者
       *
       * @author ludangxin
       * @date 2021/9/9
       */
      @Component
      public class FanoutProducer {
      
          @Autowired
          private RabbitTemplate rabbitTemplate;
      
         public void sendWithFanout() {
              rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE_NAME, "", "fanout mq hello~~~");
              // 指定一個(gè)routingKey 看消費(fèi)方能不能正常接收消息
              rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE_NAME, "abc", "fanout2 mq hello~~~");
          }
      }
      

      4.4.3 創(chuàng)建消費(fèi)者

      package com.ldx.rabbitmq.consumer;
      
      import com.ldx.rabbitmq.config.RabbitFanoutConfig;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;
      
      /**
       * 扇形交換機(jī)消息消費(fèi)者
       *
       * @author ludangxin
       * @date 2021/9/9
       */
      @Slf4j
      @Component
      public class FanoutConsumer {
      
          @RabbitListener(queues = {RabbitFanoutConfig.QUEUE_NAME})
          public void process(String message){
              log.info("queue === " + message);
          }
      
          @RabbitListener(queues = {RabbitFanoutConfig.QUEUE_NAME_1})
          public void process1(String message){
              log.info("queue1 === " + message);
          }
      }
      

      4.4.4 創(chuàng)建測試代碼

          @Autowired
          private FanoutProducer producer;
      
          @Test
          @SneakyThrows
          public void sendWithFanout(){
              producer.sendWithFanout();
              // 為了阻塞進(jìn)程,使消費(fèi)者能正常消費(fèi)。
              System.in.read();
          }
      

      4.4.5 啟動測試

      執(zhí)行測試方法,輸出內(nèi)容如下:

      生產(chǎn)者發(fā)送的兩條消息,被兩個(gè)消費(fèi)者共同消費(fèi)了。實(shí)現(xiàn)了消息的廣播。

      2021-09-09 21:59:17.538  INFO 43749 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.FanoutConsumer   : queue1 === fanout mq hello~~~
      2021-09-09 21:59:17.538  INFO 43749 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.FanoutConsumer   : queue === fanout mq hello~~~
      2021-09-09 21:59:17.539  INFO 43749 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.FanoutConsumer   : queue1 === fanout2 mq hello~~~
      2021-09-09 21:59:17.539  INFO 43749 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.FanoutConsumer   : queue === fanout2 mq hello~~~
      

      4.4.6 小節(jié)

      本節(jié)代碼中我們創(chuàng)建了一個(gè)fanout Exchange,并且創(chuàng)建了兩個(gè)隊(duì)列與其綁定,其中一個(gè)隊(duì)列進(jìn)行綁定的時(shí)候還指定了routing key,但程序執(zhí)行時(shí)消息正常被消費(fèi),說明fanout Exchange不用指定routing key。

      發(fā)布訂閱模式與工作隊(duì)列模式的區(qū)別

      1、工作隊(duì)列模式不用定義交換機(jī),而發(fā)布/訂閱模式需要定義交換機(jī)。

      2、發(fā)布/訂閱模式的生產(chǎn)方是面向交換機(jī)發(fā)送消息,工作隊(duì)列模式的生產(chǎn)方是面向隊(duì)列發(fā)送消息(底層使用默認(rèn)交換機(jī))。

      3、發(fā)布/訂閱模式需要設(shè)置隊(duì)列和交換機(jī)的綁定,工作隊(duì)列模式不需要設(shè)置,實(shí)際上工作隊(duì)列模式會將隊(duì)列綁定到默認(rèn)的交換機(jī) 。

      4.5 路由模式

      首先分析:路由模式其實(shí)就是將 發(fā)布訂閱模式中的 fanout Exchange 換成了 direct Exchange 從而指定相應(yīng)的路由規(guī)則即可。

      4.5.1 聲明交換機(jī)和隊(duì)列

      package com.ldx.rabbitmq.config;
      
      import org.springframework.amqp.core.*;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      /**
       * 直連交換機(jī)配置
       *
       * @author ludangxin
       * @date 2021/9/9
       */
      @Configuration
      public class RabbitDirectConfig {
      
          public static final String EXCHANGE_NAME = "DIRECT_EXCHANGE";
          public static final String QUEUE_NAME_INSERT = "DIRECT_QUEUE_INSERT";
          public static final String QUEUE_NAME_UPDATE = "DIRECT_QUEUE_UPDATE";
      
          /**
           * 1.交換機(jī)
           */
          @Bean(EXCHANGE_NAME)
          public Exchange bootExchange() {
              return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();
          }
      
          /**
           * 2.Queue insert隊(duì)列
           */
          @Bean(QUEUE_NAME_INSERT)
          public Queue bootQueueInsert() {
              return QueueBuilder.durable(QUEUE_NAME_INSERT).build();
          }
      
          /**
           * 2.Queue update隊(duì)列
           */
          @Bean(QUEUE_NAME_UPDATE)
          public Queue bootQueueUpdate() {
              return QueueBuilder.durable(QUEUE_NAME_UPDATE).build();
          }
      
          /**
           * 3. 綁定insert 隊(duì)列
           * 3. routing key: insert
           */
          @Bean
          public Binding bindInsertDirectExchange(@Qualifier(QUEUE_NAME_INSERT) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
              return BindingBuilder.bind(queue).to(exchange).with("insert").noargs();
          }
      
          /**
           * 3. 綁定update 隊(duì)列
           * 3. routing key: update
           */
          @Bean
          public Binding bindUpdateDirectExchange(@Qualifier(QUEUE_NAME_UPDATE) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
              return BindingBuilder.bind(queue).to(exchange).with("update").noargs();
          }
      }
      

      4.5.2 創(chuàng)建生產(chǎn)者

      package com.ldx.rabbitmq.producer;
      
      import com.ldx.rabbitmq.config.RabbitDirectConfig;
      import com.ldx.rabbitmq.config.RabbitFanoutConfig;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Component;
      
      /**
       * 直連交換機(jī)消息生產(chǎn)者
       *
       * @author ludangxin
       * @date 2021/9/9
       */
      @Component
      public class DirectProducer {
      
          @Autowired
          private RabbitTemplate rabbitTemplate;
      
         public void sendWithDirect() {
            rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE_NAME, "insert", "diect insert mq hello~~~");
            rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE_NAME, "update", "diect update mq hello~~~");
            // 指定一個(gè)沒有配置的routingKey 看消費(fèi)方能不能接收消息
            rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE_NAME, "delete", "diect update mq hello~~~");
         }
      }
      

      4.5.3 創(chuàng)建消息者

      package com.ldx.rabbitmq.consumer;
      
      import com.ldx.rabbitmq.config.RabbitDirectConfig;
      import com.ldx.rabbitmq.config.RabbitFanoutConfig;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;
      
      /**
       * 直連交換機(jī)消息消費(fèi)者
       *
       * @author ludangxin
       * @date 2021/9/9
       */
      @Slf4j
      @Component
      public class DirectConsumer {
      		/**
      		 * @param message message 為springboot 封裝的消息存儲的實(shí)例對象,其對象中不僅封裝了生產(chǎn)者發(fā)送的消息
      		 *    而且也封裝了很多消息的元數(shù)據(jù),例如:headers contentType receivedRoutingKey ...
      		 */
          @RabbitListener(queues = {RabbitDirectConfig.QUEUE_NAME_INSERT, RabbitDirectConfig.QUEUE_NAME_UPDATE})
          public void directQueue(Message message){
              log.info(message.toString());
              log.info(new String(message.getBody()));
          }
      
      }
      

      4.5.4 創(chuàng)建測試代碼

          @Autowired
          private DirectProducer directProducer;
      
          @Test
          @SneakyThrows
          public void sendWithDirect() {
              directProducer.sendWithDirect();
              System.in.read();
          }
      

      4.5.5 啟動測試

      執(zhí)行測試代碼,輸出內(nèi)容如下:

      insert 和 update 對應(yīng)的消息都被正常消費(fèi),其中值得注意的是指定routing key=delete的消息丟失了,因?yàn)殛?duì)列與交換機(jī)綁定時(shí)根本沒有此routing key,而交換機(jī)之所以叫交換機(jī),因?yàn)槠洳淮鎯ο?,只是轉(zhuǎn)發(fā)消息,其沒有持久化消息的能力,所以消息還沒有到queue,然后嗝屁。

      2021-09-09 22:38:49.451  INFO 44436 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.DirectConsumer   : (Body:'diect insert mq hello~~~' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=DIRECT_EXCHANGE, receivedRoutingKey=insert, deliveryTag=1, consumerTag=amq.ctag-WJmYhQDljkKkM1pFeW99Yg, consumerQueue=DIRECT_QUEUE_INSERT])
      2021-09-09 22:38:49.451  INFO 44436 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.DirectConsumer   : diect insert mq hello~~~
      2021-09-09 22:38:49.452  INFO 44436 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.DirectConsumer   : (Body:'diect update mq hello~~~' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=DIRECT_EXCHANGE, receivedRoutingKey=update, deliveryTag=2, consumerTag=amq.ctag-guzpfaF0BdII70w85ywiCg, consumerQueue=DIRECT_QUEUE_UPDATE])
      2021-09-09 22:38:49.452  INFO 44436 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.DirectConsumer   : diect update mq hello~~~
      

      4.5.6 小節(jié)

      路由模式特點(diǎn):

      • 隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)rutingKey(路由key)。
      • 消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的 routingKey。
      • Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的routing Key進(jìn)行判斷,只有隊(duì)列的routingkey與消息的 routing key完全一致,才會接收到消息。

      4.6 主題模式

      首先分析:通配符模式其實(shí)就是將 路由模式中的 direct Exchange 換成了 topic Exchange, 使其不僅可以將exchangequeuerouting key全匹配的方式進(jìn)行綁定,而且還支持通配符

      routingkey 一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: item.insert

      通配符規(guī)則:

      #:匹配一個(gè)或多個(gè)單詞

      *:匹配一個(gè)單詞

      舉例:

      item.#:能夠匹配item.insert.abc 或者 item.insert

      item.*:只能匹配item.insert

      圖解:

      • 紅色Queue:綁定的是usa.# ,因此凡是以 usa.開頭的routing key 都會被匹配到
      • 黃色Queue:綁定的是#.news ,因此凡是以 .news結(jié)尾的 routing key 都會被匹配

      4.6.1 聲明交換機(jī)和隊(duì)列

      package com.ldx.rabbitmq.config;
      
      import org.springframework.amqp.core.*;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      /**
       * 主題交換機(jī)配置
       *
       * @author ludangxin
       * @date 2021/9/9
       */
      @Configuration
      public class RabbitTopicConfig {
      
          public static final String EXCHANGE_NAME = "TOPIC_EXCHANGE";
          public static final String QUEUE_NAME1 = "TOPIC_QUEUE1";
          public static final String QUEUE_NAME2 = "TOPIC_QUEUE2";
      
          /**
           * 1.交換機(jī)
           * topicExchange:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列
           */
          @Bean(EXCHANGE_NAME)
          public Exchange bootExchange() {
              return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
          }
      
          /**
           * 2.Queue 隊(duì)列
           */
          @Bean(QUEUE_NAME1)
          public Queue bootQueue1() {
              return QueueBuilder.durable(QUEUE_NAME1).build();
          }
      
          /**
           * 2.Queue 隊(duì)列
           */
          @Bean(QUEUE_NAME2)
          public Queue bootQueue2() {
              return QueueBuilder.durable(QUEUE_NAME2).build();
          }
      
          /**
           * 3. 隊(duì)列和交互機(jī)綁定關(guān)系 Binding
           * 匹配 routing key 以 insert 開頭的 如 insert.user ; insert.user.log
           */
          @Bean
          public Binding bindTopicExchange1(@Qualifier(QUEUE_NAME1) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
              return BindingBuilder.bind(queue).to(exchange).with("insert.#").noargs();
          }
      
          /**
           * 3. 隊(duì)列和交互機(jī)綁定關(guān)系 Binding
           * routing key 中的 * 只能匹配單個(gè)單詞
           * 匹配 routing key 以 update 開頭的 如 update.user
           * 不能匹配 如 update.user.log
           */
          @Bean
          public Binding bindTopicExchange2(@Qualifier(QUEUE_NAME1) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
              return BindingBuilder.bind(queue).to(exchange).with("update.*").noargs();
          }
      
          /**
           * 3. 隊(duì)列和交互機(jī)綁定關(guān)系 Binding
           * routing key 中的 * 只能匹配單個(gè)單詞
           * 匹配 routing key 以 . 分割的
           * 不能匹配 如 update.user.log
           */
          @Bean
          public Binding bindTopicExchange3(@Qualifier(QUEUE_NAME2) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
              return BindingBuilder.bind(queue).to(exchange).with("*.*").noargs();
          }
      }
      

      4.6.2 創(chuàng)建生產(chǎn)者

      package com.ldx.rabbitmq.producer;
      
      import com.ldx.rabbitmq.config.RabbitTopicConfig;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Component;
      
      /**
       * 主題交換機(jī)消息生產(chǎn)者
       *
       * @author ludangxin
       * @date 2021/9/9
       */
      @Component
      public class TopicProducer {
      
          @Autowired
          private RabbitTemplate rabbitTemplate;
      
         public void sendWithTopic() {
            rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE_NAME, "insert.user.log", "topic mq hello~~~ routing is insert.user.lo");
            rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE_NAME, "update.user", "topic mq hello~~~ routing is update.user");
            rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE_NAME, "update.user.log", "topic mq hello~~~ routing is update.user.log");
            rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE_NAME, "delete.user", "topic mq hello~~~ routing is delete.user");
            rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE_NAME, "delete.user.log", "topic mq hello~~~routing is delete.user.log");
         }
      }
      

      4.6.3 創(chuàng)建消費(fèi)者

      package com.ldx.rabbitmq.consumer;
      
      import com.ldx.rabbitmq.config.RabbitTopicConfig;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;
      
      /**
       * 主題交換機(jī)消息消費(fèi)者
       *
       * @author ludangxin
       * @date 2021/9/9
       */
      @Slf4j
      @Component
      public class TopicConsumer {
      
          @RabbitListener(queues = {RabbitTopicConfig.QUEUE_NAME1, RabbitTopicConfig.QUEUE_NAME2})
          public void topicQueue(Message message){
              log.info(message.toString());
              log.info(new String(message.getBody()));
          }
      }
      

      4.6.4 創(chuàng)建測試代碼

      @Autowired
      private TopicProducer topicProducer;
      
      @Test
      @SneakyThrows
      public void sendWithTopic() {
          topicProducer.sendWithTopic();
          System.in.read();
      }
      

      4.6.5 啟動測試

      執(zhí)行測試代碼,輸入內(nèi)容如下:

      其中符合通配符條件的消息均已消費(fèi)。

      2021-09-09 23:02:57.131  INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer  : (Body:'topic mq hello~~~ routing is insert.user.lo' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=TOPIC_EXCHANGE, receivedRoutingKey=insert.user.log, deliveryTag=1, consumerTag=amq.ctag-PeBOPjJFHMF3BMW1zDXCvw, consumerQueue=TOPIC_QUEUE1])
      2021-09-09 23:02:57.132  INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer  : topic mq hello~~~ routing is insert.user.lo
      2021-09-09 23:02:57.132  INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer  : (Body:'topic mq hello~~~ routing is update.user' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=TOPIC_EXCHANGE, receivedRoutingKey=update.user, deliveryTag=2, consumerTag=amq.ctag-PeBOPjJFHMF3BMW1zDXCvw, consumerQueue=TOPIC_QUEUE1])
      2021-09-09 23:02:57.132  INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer  : topic mq hello~~~ routing is update.user
      2021-09-09 23:02:57.133  INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer  : (Body:'topic mq hello~~~ routing is update.user' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=TOPIC_EXCHANGE, receivedRoutingKey=update.user, deliveryTag=3, consumerTag=amq.ctag-ocNDmCGDF8-aPxJ4lK1c8g, consumerQueue=TOPIC_QUEUE2])
      2021-09-09 23:02:57.133  INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer  : topic mq hello~~~ routing is update.user
      2021-09-09 23:02:57.133  INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer  : (Body:'topic mq hello~~~ routing is delete.user' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=TOPIC_EXCHANGE, receivedRoutingKey=delete.user, deliveryTag=4, consumerTag=amq.ctag-ocNDmCGDF8-aPxJ4lK1c8g, consumerQueue=TOPIC_QUEUE2])
      2021-09-09 23:02:57.133  INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer  : topic mq hello~~~ routing is delete.user
      

      4.6.6 小節(jié)

      Topic主題模式可以實(shí)現(xiàn) Publish/Subscribe發(fā)布與訂閱模式 Routing路由模式 的功能;只是Topic在配置routing key 的時(shí)候可以使用通配符,顯得更加靈活。

      5. 模式總結(jié)

      RabbitMQ工作模式:
      1、簡單模式 HelloWorld
      一個(gè)生產(chǎn)者、一個(gè)消費(fèi)者,不需要設(shè)置交換機(jī)(使用default Exchange)。

      2、工作隊(duì)列模式 Work Queue
      一個(gè)生產(chǎn)者、多個(gè)消費(fèi)者(平均分配消息),不需要設(shè)置交換機(jī)(使用default Exchange)。

      3、發(fā)布訂閱模式 Publish/subscribe
      需要設(shè)置交換機(jī)類型為fanout Exchange,并且交換機(jī)和隊(duì)列進(jìn)行綁定,當(dāng)發(fā)送消息到交換機(jī)后,交換機(jī)會將消息發(fā)送到綁定的隊(duì)列。

      4、路由模式 Routing
      需要設(shè)置交換機(jī)類型為direct Exchange,交換機(jī)和隊(duì)列進(jìn)行綁定,并且指定routing key,發(fā)送消息時(shí)也要指定對應(yīng)的routing key到交換機(jī),交換機(jī)會根據(jù)routing key將消息發(fā)送到對應(yīng)的隊(duì)列。

      5、主題模式 Topic
      需要設(shè)置交換機(jī)類型為topic Exchange,,交換機(jī)和隊(duì)列進(jìn)行綁定,并且指定通配符方式的routing key,發(fā)送消息時(shí)指定routing key到交換機(jī)后,交換機(jī)會根據(jù)routing key規(guī)則將消息發(fā)送到對應(yīng)的隊(duì)列。主題模式比上面四類更靈活。

      posted @ 2021-09-09 23:28  張鐵牛  閱讀(226)  評論(0)    收藏  舉報(bào)
      主站蜘蛛池模板: 国产精品一码二码三码| 亚洲成人av高清在线| 临桂县| 色综合夜夜嗨亚洲一二区| 亚洲中文字幕无码久久精品1| 国产精品美女www爽爽爽视频| 亚洲AV日韩AV高清在线观看| 日韩深夜免费在线观看| 麻豆精品在线| 免费看国产精品3a黄的视频| 久久夜色精品国产噜噜亚洲sv| 精品久久国产字幕高潮| 18禁精品一区二区三区| 亚洲av无码成人精品区一区| 好吊妞人成视频在线观看27du| 国产亚洲精品一区二区无| 94人妻少妇偷人精品| 自拍视频在线观看成人| 亚洲产在线精品亚洲第一站一| 亚洲精品视频一二三四区| 国产在线观看免费观看| 丰满的人妻hd高清日本| 亚洲AV午夜成人无码电影| 一区二区三区精品不卡| 成av人电影在线观看| 亚洲人亚洲人成电影网站色| 好紧好滑好湿好爽免费视频| 国产女人18毛片水真多1| 人妻熟女一区无中文字幕| 国产精品免费看久久久| 成人网站免费观看永久视频下载 | 青草热在线观看精品视频| 色成人亚洲| 中文字幕有码在线第十页| 色欲久久久天天天综合网精品| 亚洲最大av一区二区| av在线播放观看国产| 狠狠亚洲色一日本高清色| 国产精品午夜福利小视频| AV最新高清无码专区| 亚洲精品成人无限看|