RabbitMQ-基礎(chǔ)
1. 簡介
MQ(Message Queue)消息隊(duì)列,是基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)中“FIFO(先進(jìn)先出)”的一種數(shù)據(jù)結(jié)構(gòu)。
一般用來解決應(yīng)用解耦,異步消息,流量削峰等問題,實(shí)現(xiàn)高性能,高可用,可伸縮和最終一致性架構(gòu)。
應(yīng)用解耦
MQ相當(dāng)于一個(gè)中介,生產(chǎn)方通過MQ與消費(fèi)方交互,它將應(yīng)用程序進(jìn)行解耦合。
異步消息
將不需要同步處理的并且耗時(shí)長的操作由消息隊(duì)列通知消息接收方進(jìn)行異步處理。提高了應(yīng)用程序的響應(yīng)時(shí)間。
流量削峰
如訂單系統(tǒng),在下單的時(shí)候就會往數(shù)據(jù)庫寫數(shù)據(jù)。但是數(shù)據(jù)庫只能支撐每秒1000左右的并發(fā)寫入,并發(fā)量再高就容易宕機(jī)。低峰期的時(shí)候并發(fā)也就100多個(gè),但是在高峰期時(shí)候,并發(fā)量會突然激增到5000以上,這個(gè)時(shí)候數(shù)據(jù)庫肯定卡死了。
這時(shí)候我們可以使用MQ將消息保存起來,然后系統(tǒng)就可以按照自己的消費(fèi)能力來消費(fèi),比如每秒1000個(gè)數(shù)據(jù),這樣慢慢寫入數(shù)據(jù)庫,這樣就不會卡死數(shù)據(jù)庫了。
但是使用了MQ之后,限制消費(fèi)消息的速度為1000,但是這樣一來,高峰期產(chǎn)生的數(shù)據(jù)勢必會被積壓在MQ中,高峰就被“削”掉了。但是因?yàn)橄⒎e壓,在高峰期過后的一段時(shí)間內(nèi),消費(fèi)消息的速度還是會維持在1000QPS,直到消費(fèi)完積壓的消息,這就叫做“填谷”。

2. RabbitMQ
RabbitMQ是由erlang語言開發(fā),基于AMQP(Advanced Message Queue 高級消息隊(duì)列協(xié)議)協(xié)議實(shí)現(xiàn)的消息隊(duì)列。
RabbitMQ 其實(shí)是一個(gè)消息代理:它接受和轉(zhuǎn)發(fā)消息。可以將其視為郵局:當(dāng)你把 要投遞的郵件放入郵箱時(shí),你可以確定郵遞員最終會將郵件遞送給你的收件人。在這個(gè)比喻中,RabbitMQ 是一個(gè)郵箱、一個(gè)郵局和一個(gè)信件載體。 RabbitMQ 和郵局之間的主要區(qū)別在于它不處理紙張,而是接受、存儲和轉(zhuǎn)發(fā)二進(jìn)制數(shù)據(jù)塊 - 消息。
3. 模式
這里僅介紹了常用的模式,最近看官網(wǎng)又多個(gè)模式Publisher Confirms,完了有時(shí)間再補(bǔ)充上。
關(guān)于官網(wǎng)中提到的第六種模式RPC,由于RPC通信一般不使用RabbitMQ,所以這里也沒有講。
3.1 簡單模式
如圖所示:只有一個(gè)生產(chǎn)者(P)一個(gè)隊(duì)列(紅色塊)和 一個(gè)消費(fèi)者(C)。

應(yīng)用場景:可以實(shí)現(xiàn)對應(yīng)用程序的解耦,并且可以實(shí)現(xiàn)對業(yè)務(wù)的異步處理。事實(shí)上這是mq最基本的功能。
3.2 工作模式
如圖所示:一個(gè)生產(chǎn)者對應(yīng)多個(gè)消費(fèi)者。多個(gè)消費(fèi)者功能消費(fèi)一個(gè)隊(duì)列(負(fù)載均衡)。
每個(gè)消息只能被其中的一個(gè)消費(fèi)者消費(fèi)。

應(yīng)用場景:對于 任務(wù)過重或任務(wù)較多情況使用工作隊(duì)列可以提高任務(wù)處理的速度。
3.3 發(fā)布訂閱模式
如圖所示:在生產(chǎn)者和隊(duì)列之間多了個(gè)交換機(jī)(X),此時(shí)的交換機(jī)類型為:扇形交換機(jī)(Fanout Exchange)。
事實(shí)上,簡單模式和工作模式也都有自己的Exchange,只不過不用顯性的聲明,因?yàn)槟J(rèn)使用default Exchange。
即:一個(gè)發(fā)送到Exchange的消息都會被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。
每一個(gè)消息能被多個(gè)消費(fèi)者都消費(fèi)。

Fanout Exchange消息路由規(guī)則如圖所示:

應(yīng)用場景:顧名思義,一個(gè)消息想被多個(gè)訂閱者消費(fèi)。
3.4 路由模式
如圖所示:相比發(fā)布訂閱模式,Exchange和Queue之間多了個(gè)路由關(guān)系,此時(shí)的交換機(jī)類型為:直連交換機(jī)(Direct Exchange)。
-
隊(duì)列和交換機(jī)不是任意綁定了,而是要指定一個(gè)
Routingkey。 -
生產(chǎn)者在向Exchange發(fā)送消息時(shí),也必須指定消息的
RoutingKey。 -
Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的
Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的Routing key完全一致,才會接收到消息。

Direct Exchange消息路由規(guī)則如圖所示:

3.5 通配符模式/主題模式
如圖所示:相比路由模式,Exchange和Queue之間不只是通過固定的RoutingKey進(jìn)行綁定,還支持通配符的方式,此時(shí)的交換機(jī)類型為:主題交換機(jī)/通配符交換機(jī)(Topic Exchange)。

Topic Exchange消息路由規(guī)則如圖所示:

3. 安裝RabbitMQ
version: '2'
services:
rabbitmq:
hostname: rabbitmq
image: rabbitmq:3.8.3-management
restart: always
environment:
# 默認(rèn)的用戶名
- RABBITMQ_DEFAULT_USER=admin
# 默認(rèn)的密碼
- RABBITMQ_DEFAULT_PASS=admin123
volumes:
- ./data:/var/lib/rabbitmq
- ./log:/var/log/rabbitmq/log
ports:
# rabbit ui 默認(rèn)端口
- "15672:15672"
# Epmd 是 Erlang Port Mapper Daemon 的縮寫,
# 在 Erlang 集群中相當(dāng)于 dns 的作用,綁定在4369端口上
- "4369:4369"
# rabbit 默認(rèn)的端口
- "5672:5672"
# 25672端口用于節(jié)點(diǎn)間和CLI工具通信(Erlang分發(fā)服務(wù)器端口),
# 并從動態(tài)范圍分配(默認(rèn)情況下僅限于單個(gè)端口,
# 計(jì)算方式為AMQP 0-9-1和AMQP 1.0端口+20000),
# 默認(rèn)情況下通過 RABBITMQ_NODE_PORT 計(jì)算是25672
- "25672:25672"
4. 各種模式的簡單實(shí)現(xiàn)
4.1 項(xiàng)目搭建
4.1.1 引入依賴
我們這里使用spring-boot-starter-amqp操作RabbitMQ。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.ldx</groupId>
<artifactId>rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
4.1.2 application.yaml
spring:
rabbitmq:
host: localhost
port: 5672
# rabbit 默認(rèn)的虛擬主機(jī)
virtual-host: /
# rabbitmq 安裝時(shí)指定的超管信息
username: admin
password: admin123
4.2 簡單模式
4.2.1 聲明一個(gè)簡單隊(duì)列
package com.ldx.rabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* rabbit 快速開始
*
* @author ludangxin
* @date 2021/8/23
*/
@Configuration
public class RabbitSimpleConfig {
/**
* 設(shè)置一個(gè)簡單的隊(duì)列
*/
@Bean
public Queue queue() {
return new Queue("helloMQ");
}
}
4.2.2 創(chuàng)建生產(chǎn)者
package com.ldx.rabbitmq.producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 生產(chǎn)者
*
* @author ludangxin
* @date 2021/8/23
*/
@Component
public class SimpleProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send() {
String context = "helloMQ " + System.currentTimeMillis();
rabbitTemplate.convertAndSend("helloMQ", context);
}
}
4.2.3 創(chuàng)建消費(fèi)者
package com.ldx.rabbitmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消費(fèi)者
*
* @author ludangxin
* @date 2021/8/23
*/
@Slf4j
@Component
@RabbitListener(queues = {"helloMQ"})
public class SimpleConsumer {
@RabbitHandler
public void process(String hello) {
log.info("Message:{} ", hello);
}
}
4.2.4 創(chuàng)建測試類
package com.ldx.rabbitmq;
import com.ldx.rabbitmq.producer.SimpleProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest {
@Autowired
private SimpleProducer simpleSender;
@Test
public void hello() throws Exception {
// 每秒發(fā)送一條消息
for (int i = 0; i < 10; i++) {
simpleSender.send();
Thread.sleep(1000);
}
}
}
4.2.5 啟動測試
啟動測試類,輸出內(nèi)容如下:
每秒消費(fèi)一條消息。
2021-09-08 23:58:01.837 INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer : Message:helloMQ 1631116681827
2021-09-08 23:58:02.839 INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer : Message:helloMQ 1631116682833
2021-09-08 23:58:03.842 INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer : Message:helloMQ 1631116683838
2021-09-08 23:58:04.852 INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer : Message:helloMQ 1631116684843
2021-09-08 23:58:05.853 INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer : Message:helloMQ 1631116685844
2021-09-08 23:58:06.853 INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer : Message:helloMQ 1631116686847
2021-09-08 23:58:07.857 INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer : Message:helloMQ 1631116687850
2021-09-08 23:58:08.863 INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer : Message:helloMQ 1631116688855
2021-09-08 23:58:09.868 INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer : Message:helloMQ 1631116689858
2021-09-08 23:58:10.870 INFO 29956 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer : Message:helloMQ 1631116690862
4.2.6 小節(jié)
簡單模式,顧名思義,很簡單,相當(dāng)于Hello World程序。我們在編寫的時(shí)候
- 指定了一個(gè)
Queue并且名稱為helloMQ。 - 消息生產(chǎn)者通過SpringBoot 提供的
RabbitTemplate發(fā)送消息,我們在發(fā)送時(shí)指定了Queue為helloMQ且發(fā)送了指定內(nèi)容。 - 消息消費(fèi)者通過
@RabbitListener注解監(jiān)聽了指定Queue為helloMQ,且使用@RabbitHandler注解指定消費(fèi)方法SimpleConsumer::process()。 - 最后編寫測試類循環(huán)調(diào)用生產(chǎn)者消息發(fā)送邏輯,實(shí)現(xiàn)了消息的生產(chǎn)與消費(fèi)。
4.3 工作模式
首先分析:其實(shí)工作模式和簡單模式相比,僅僅是由一個(gè)消費(fèi)者變成了多個(gè)消費(fèi)者。ok,很好辦,我們通過代碼再多加一個(gè)消費(fèi)者即可。
4.3.1 添加消費(fèi)者
package com.ldx.rabbitmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消費(fèi)者
*
* @author ludangxin
* @date 2021/8/23
*/
@Slf4j
@Component
@RabbitListener(queues = {"helloMQ"})
public class SimpleConsumer2 {
@RabbitHandler
public void process(String hello) {
log.info("Message2:{} ", hello);
}
}
4.3.2 啟動測試
我們再次執(zhí)行test方法,查看消息消費(fèi)情況。
輸出日志如下:
SimpleConsumer 和 SimpleConsumer2交替消費(fèi)隊(duì)列中的消息(消費(fèi)者之間消費(fèi)消息是通過輪詢的關(guān)系)。
2021-09-09 20:24:35.043 INFO 41927 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer : Message:helloMQ 1631190275019
2021-09-09 20:24:36.038 INFO 41927 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.SimpleConsumer2 : Message2:helloMQ 1631190276029
2021-09-09 20:24:37.036 INFO 41927 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer : Message:helloMQ 1631190277032
2021-09-09 20:24:38.046 INFO 41927 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.SimpleConsumer2 : Message2:helloMQ 1631190278036
2021-09-09 20:24:39.049 INFO 41927 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer : Message:helloMQ 1631190279041
2021-09-09 20:24:40.049 INFO 41927 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.SimpleConsumer2 : Message2:helloMQ 1631190280042
2021-09-09 20:24:41.054 INFO 41927 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer : Message:helloMQ 1631190281047
2021-09-09 20:24:42.060 INFO 41927 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.SimpleConsumer2 : Message2:helloMQ 1631190282051
2021-09-09 20:24:43.062 INFO 41927 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.SimpleConsumer : Message:helloMQ 1631190283055
2021-09-09 20:24:44.074 INFO 41927 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.SimpleConsumer2 : Message2:helloMQ 1631190284057
4.3.3 小節(jié)
在一個(gè)隊(duì)列中如果有多個(gè)消費(fèi)者,那么消費(fèi)者之間是輪詢的關(guān)系。
4.4 發(fā)布訂閱模式
首先分析:發(fā)布訂閱模式其實(shí)是將消息先發(fā)送給扇形交換機(jī),交換機(jī)再將消息轉(zhuǎn)發(fā)給其綁定到此交換機(jī)的隊(duì)列上。
這里,我們聲明一個(gè)交換機(jī),給交換機(jī)綁定兩個(gè)隊(duì)列,并且使用兩個(gè)消費(fèi)者分別綁定到兩個(gè)隊(duì)列上(其實(shí)就是為了和3.3保持一致)。
4.4.1 聲明交換機(jī)和隊(duì)列
package com.ldx.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 扇形交換機(jī)配置
*
* @author ludangxin
* @date 2021/9/9
*/
@Configuration
public class RabbitFanoutConfig {
public static final String EXCHANGE_NAME = "FANOUT_EXCHANGE";
public static final String QUEUE_NAME = "FANOUT_QUEUE";
public static final String QUEUE_NAME_1 = "FANOUT_QUEUE_1";
/**
* 1.交換機(jī)
*/
@Bean(EXCHANGE_NAME)
public Exchange fanoutExchange() {
return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).durable(true).build();
}
/**
* 2.Queue 隊(duì)列
*/
@Bean(QUEUE_NAME)
public Queue fanoutQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
/**
* 2.1 Queue 隊(duì)列
*/
@Bean(QUEUE_NAME_1)
public Queue fanoutQueue1() {
return QueueBuilder.durable(QUEUE_NAME_1).build();
}
/**
* 3. 隊(duì)列和交互機(jī)綁定關(guān)系 Binding
*/
@Bean
public Binding bindFanoutExchange(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
// fanout :routing key 默認(rèn)為 "",指定了別的值也沒用
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
/**
* 3.1 隊(duì)列和交互機(jī)綁定關(guān)系 Binding
*/
@Bean
public Binding bindFanoutExchange1(@Qualifier(QUEUE_NAME_1) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
// fanout :routing key 默認(rèn)為 "",指定了別的值也沒用,我們這里隨便寫個(gè)值,看會不會有影響
return BindingBuilder.bind(queue).to(exchange).with("aaabbb").noargs();
}
}
4.4.2 創(chuàng)建生產(chǎn)者
package com.ldx.rabbitmq.producer;
import com.ldx.rabbitmq.config.RabbitFanoutConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 扇形交換機(jī)消息生產(chǎn)者
*
* @author ludangxin
* @date 2021/9/9
*/
@Component
public class FanoutProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendWithFanout() {
rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE_NAME, "", "fanout mq hello~~~");
// 指定一個(gè)routingKey 看消費(fèi)方能不能正常接收消息
rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE_NAME, "abc", "fanout2 mq hello~~~");
}
}
4.4.3 創(chuàng)建消費(fèi)者
package com.ldx.rabbitmq.consumer;
import com.ldx.rabbitmq.config.RabbitFanoutConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 扇形交換機(jī)消息消費(fèi)者
*
* @author ludangxin
* @date 2021/9/9
*/
@Slf4j
@Component
public class FanoutConsumer {
@RabbitListener(queues = {RabbitFanoutConfig.QUEUE_NAME})
public void process(String message){
log.info("queue === " + message);
}
@RabbitListener(queues = {RabbitFanoutConfig.QUEUE_NAME_1})
public void process1(String message){
log.info("queue1 === " + message);
}
}
4.4.4 創(chuàng)建測試代碼
@Autowired
private FanoutProducer producer;
@Test
@SneakyThrows
public void sendWithFanout(){
producer.sendWithFanout();
// 為了阻塞進(jìn)程,使消費(fèi)者能正常消費(fèi)。
System.in.read();
}
4.4.5 啟動測試
執(zhí)行測試方法,輸出內(nèi)容如下:
生產(chǎn)者發(fā)送的兩條消息,被兩個(gè)消費(fèi)者共同消費(fèi)了。實(shí)現(xiàn)了消息的廣播。
2021-09-09 21:59:17.538 INFO 43749 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.FanoutConsumer : queue1 === fanout mq hello~~~
2021-09-09 21:59:17.538 INFO 43749 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.FanoutConsumer : queue === fanout mq hello~~~
2021-09-09 21:59:17.539 INFO 43749 --- [ntContainer#1-1] c.ldx.rabbitmq.consumer.FanoutConsumer : queue1 === fanout2 mq hello~~~
2021-09-09 21:59:17.539 INFO 43749 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.FanoutConsumer : queue === fanout2 mq hello~~~
4.4.6 小節(jié)
本節(jié)代碼中我們創(chuàng)建了一個(gè)fanout Exchange,并且創(chuàng)建了兩個(gè)隊(duì)列與其綁定,其中一個(gè)隊(duì)列進(jìn)行綁定的時(shí)候還指定了routing key,但程序執(zhí)行時(shí)消息正常被消費(fèi),說明fanout Exchange不用指定routing key。
發(fā)布訂閱模式與工作隊(duì)列模式的區(qū)別
1、工作隊(duì)列模式不用定義交換機(jī),而發(fā)布/訂閱模式需要定義交換機(jī)。
2、發(fā)布/訂閱模式的生產(chǎn)方是面向交換機(jī)發(fā)送消息,工作隊(duì)列模式的生產(chǎn)方是面向隊(duì)列發(fā)送消息(底層使用默認(rèn)交換機(jī))。
3、發(fā)布/訂閱模式需要設(shè)置隊(duì)列和交換機(jī)的綁定,工作隊(duì)列模式不需要設(shè)置,實(shí)際上工作隊(duì)列模式會將隊(duì)列綁定到默認(rèn)的交換機(jī) 。
4.5 路由模式
首先分析:路由模式其實(shí)就是將 發(fā)布訂閱模式中的 fanout Exchange 換成了 direct Exchange 從而指定相應(yīng)的路由規(guī)則即可。
4.5.1 聲明交換機(jī)和隊(duì)列
package com.ldx.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 直連交換機(jī)配置
*
* @author ludangxin
* @date 2021/9/9
*/
@Configuration
public class RabbitDirectConfig {
public static final String EXCHANGE_NAME = "DIRECT_EXCHANGE";
public static final String QUEUE_NAME_INSERT = "DIRECT_QUEUE_INSERT";
public static final String QUEUE_NAME_UPDATE = "DIRECT_QUEUE_UPDATE";
/**
* 1.交換機(jī)
*/
@Bean(EXCHANGE_NAME)
public Exchange bootExchange() {
return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();
}
/**
* 2.Queue insert隊(duì)列
*/
@Bean(QUEUE_NAME_INSERT)
public Queue bootQueueInsert() {
return QueueBuilder.durable(QUEUE_NAME_INSERT).build();
}
/**
* 2.Queue update隊(duì)列
*/
@Bean(QUEUE_NAME_UPDATE)
public Queue bootQueueUpdate() {
return QueueBuilder.durable(QUEUE_NAME_UPDATE).build();
}
/**
* 3. 綁定insert 隊(duì)列
* 3. routing key: insert
*/
@Bean
public Binding bindInsertDirectExchange(@Qualifier(QUEUE_NAME_INSERT) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("insert").noargs();
}
/**
* 3. 綁定update 隊(duì)列
* 3. routing key: update
*/
@Bean
public Binding bindUpdateDirectExchange(@Qualifier(QUEUE_NAME_UPDATE) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("update").noargs();
}
}
4.5.2 創(chuàng)建生產(chǎn)者
package com.ldx.rabbitmq.producer;
import com.ldx.rabbitmq.config.RabbitDirectConfig;
import com.ldx.rabbitmq.config.RabbitFanoutConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 直連交換機(jī)消息生產(chǎn)者
*
* @author ludangxin
* @date 2021/9/9
*/
@Component
public class DirectProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendWithDirect() {
rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE_NAME, "insert", "diect insert mq hello~~~");
rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE_NAME, "update", "diect update mq hello~~~");
// 指定一個(gè)沒有配置的routingKey 看消費(fèi)方能不能接收消息
rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE_NAME, "delete", "diect update mq hello~~~");
}
}
4.5.3 創(chuàng)建消息者
package com.ldx.rabbitmq.consumer;
import com.ldx.rabbitmq.config.RabbitDirectConfig;
import com.ldx.rabbitmq.config.RabbitFanoutConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 直連交換機(jī)消息消費(fèi)者
*
* @author ludangxin
* @date 2021/9/9
*/
@Slf4j
@Component
public class DirectConsumer {
/**
* @param message message 為springboot 封裝的消息存儲的實(shí)例對象,其對象中不僅封裝了生產(chǎn)者發(fā)送的消息
* 而且也封裝了很多消息的元數(shù)據(jù),例如:headers contentType receivedRoutingKey ...
*/
@RabbitListener(queues = {RabbitDirectConfig.QUEUE_NAME_INSERT, RabbitDirectConfig.QUEUE_NAME_UPDATE})
public void directQueue(Message message){
log.info(message.toString());
log.info(new String(message.getBody()));
}
}
4.5.4 創(chuàng)建測試代碼
@Autowired
private DirectProducer directProducer;
@Test
@SneakyThrows
public void sendWithDirect() {
directProducer.sendWithDirect();
System.in.read();
}
4.5.5 啟動測試
執(zhí)行測試代碼,輸出內(nèi)容如下:
insert 和 update 對應(yīng)的消息都被正常消費(fèi),其中值得注意的是指定routing key=delete的消息丟失了,因?yàn)殛?duì)列與交換機(jī)綁定時(shí)根本沒有此routing key,而交換機(jī)之所以叫交換機(jī),因?yàn)槠洳淮鎯ο?,只是轉(zhuǎn)發(fā)消息,其沒有持久化消息的能力,所以消息還沒有到queue,然后嗝屁。
2021-09-09 22:38:49.451 INFO 44436 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.DirectConsumer : (Body:'diect insert mq hello~~~' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=DIRECT_EXCHANGE, receivedRoutingKey=insert, deliveryTag=1, consumerTag=amq.ctag-WJmYhQDljkKkM1pFeW99Yg, consumerQueue=DIRECT_QUEUE_INSERT])
2021-09-09 22:38:49.451 INFO 44436 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.DirectConsumer : diect insert mq hello~~~
2021-09-09 22:38:49.452 INFO 44436 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.DirectConsumer : (Body:'diect update mq hello~~~' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=DIRECT_EXCHANGE, receivedRoutingKey=update, deliveryTag=2, consumerTag=amq.ctag-guzpfaF0BdII70w85ywiCg, consumerQueue=DIRECT_QUEUE_UPDATE])
2021-09-09 22:38:49.452 INFO 44436 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.DirectConsumer : diect update mq hello~~~
4.5.6 小節(jié)
路由模式特點(diǎn):
- 隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)
rutingKey(路由key)。 - 消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的
routingKey。 - Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的
routing Key進(jìn)行判斷,只有隊(duì)列的routingkey與消息的routing key完全一致,才會接收到消息。
4.6 主題模式
首先分析:通配符模式其實(shí)就是將 路由模式中的 direct Exchange 換成了 topic Exchange, 使其不僅可以將exchange和queue以routing key全匹配的方式進(jìn)行綁定,而且還支持通配符。
routingkey 一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: item.insert
通配符規(guī)則:
#:匹配一個(gè)或多個(gè)單詞
*:匹配一個(gè)單詞
舉例:
item.#:能夠匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert

圖解:
- 紅色Queue:綁定的是
usa.#,因此凡是以usa.開頭的routing key都會被匹配到 - 黃色Queue:綁定的是
#.news,因此凡是以.news結(jié)尾的routing key都會被匹配
4.6.1 聲明交換機(jī)和隊(duì)列
package com.ldx.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 主題交換機(jī)配置
*
* @author ludangxin
* @date 2021/9/9
*/
@Configuration
public class RabbitTopicConfig {
public static final String EXCHANGE_NAME = "TOPIC_EXCHANGE";
public static final String QUEUE_NAME1 = "TOPIC_QUEUE1";
public static final String QUEUE_NAME2 = "TOPIC_QUEUE2";
/**
* 1.交換機(jī)
* topicExchange:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列
*/
@Bean(EXCHANGE_NAME)
public Exchange bootExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
/**
* 2.Queue 隊(duì)列
*/
@Bean(QUEUE_NAME1)
public Queue bootQueue1() {
return QueueBuilder.durable(QUEUE_NAME1).build();
}
/**
* 2.Queue 隊(duì)列
*/
@Bean(QUEUE_NAME2)
public Queue bootQueue2() {
return QueueBuilder.durable(QUEUE_NAME2).build();
}
/**
* 3. 隊(duì)列和交互機(jī)綁定關(guān)系 Binding
* 匹配 routing key 以 insert 開頭的 如 insert.user ; insert.user.log
*/
@Bean
public Binding bindTopicExchange1(@Qualifier(QUEUE_NAME1) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("insert.#").noargs();
}
/**
* 3. 隊(duì)列和交互機(jī)綁定關(guān)系 Binding
* routing key 中的 * 只能匹配單個(gè)單詞
* 匹配 routing key 以 update 開頭的 如 update.user
* 不能匹配 如 update.user.log
*/
@Bean
public Binding bindTopicExchange2(@Qualifier(QUEUE_NAME1) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("update.*").noargs();
}
/**
* 3. 隊(duì)列和交互機(jī)綁定關(guān)系 Binding
* routing key 中的 * 只能匹配單個(gè)單詞
* 匹配 routing key 以 . 分割的
* 不能匹配 如 update.user.log
*/
@Bean
public Binding bindTopicExchange3(@Qualifier(QUEUE_NAME2) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("*.*").noargs();
}
}
4.6.2 創(chuàng)建生產(chǎn)者
package com.ldx.rabbitmq.producer;
import com.ldx.rabbitmq.config.RabbitTopicConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 主題交換機(jī)消息生產(chǎn)者
*
* @author ludangxin
* @date 2021/9/9
*/
@Component
public class TopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendWithTopic() {
rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE_NAME, "insert.user.log", "topic mq hello~~~ routing is insert.user.lo");
rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE_NAME, "update.user", "topic mq hello~~~ routing is update.user");
rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE_NAME, "update.user.log", "topic mq hello~~~ routing is update.user.log");
rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE_NAME, "delete.user", "topic mq hello~~~ routing is delete.user");
rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE_NAME, "delete.user.log", "topic mq hello~~~routing is delete.user.log");
}
}
4.6.3 創(chuàng)建消費(fèi)者
package com.ldx.rabbitmq.consumer;
import com.ldx.rabbitmq.config.RabbitTopicConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 主題交換機(jī)消息消費(fèi)者
*
* @author ludangxin
* @date 2021/9/9
*/
@Slf4j
@Component
public class TopicConsumer {
@RabbitListener(queues = {RabbitTopicConfig.QUEUE_NAME1, RabbitTopicConfig.QUEUE_NAME2})
public void topicQueue(Message message){
log.info(message.toString());
log.info(new String(message.getBody()));
}
}
4.6.4 創(chuàng)建測試代碼
@Autowired
private TopicProducer topicProducer;
@Test
@SneakyThrows
public void sendWithTopic() {
topicProducer.sendWithTopic();
System.in.read();
}
4.6.5 啟動測試
執(zhí)行測試代碼,輸入內(nèi)容如下:
其中符合通配符條件的消息均已消費(fèi)。
2021-09-09 23:02:57.131 INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer : (Body:'topic mq hello~~~ routing is insert.user.lo' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=TOPIC_EXCHANGE, receivedRoutingKey=insert.user.log, deliveryTag=1, consumerTag=amq.ctag-PeBOPjJFHMF3BMW1zDXCvw, consumerQueue=TOPIC_QUEUE1])
2021-09-09 23:02:57.132 INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer : topic mq hello~~~ routing is insert.user.lo
2021-09-09 23:02:57.132 INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer : (Body:'topic mq hello~~~ routing is update.user' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=TOPIC_EXCHANGE, receivedRoutingKey=update.user, deliveryTag=2, consumerTag=amq.ctag-PeBOPjJFHMF3BMW1zDXCvw, consumerQueue=TOPIC_QUEUE1])
2021-09-09 23:02:57.132 INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer : topic mq hello~~~ routing is update.user
2021-09-09 23:02:57.133 INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer : (Body:'topic mq hello~~~ routing is update.user' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=TOPIC_EXCHANGE, receivedRoutingKey=update.user, deliveryTag=3, consumerTag=amq.ctag-ocNDmCGDF8-aPxJ4lK1c8g, consumerQueue=TOPIC_QUEUE2])
2021-09-09 23:02:57.133 INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer : topic mq hello~~~ routing is update.user
2021-09-09 23:02:57.133 INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer : (Body:'topic mq hello~~~ routing is delete.user' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=TOPIC_EXCHANGE, receivedRoutingKey=delete.user, deliveryTag=4, consumerTag=amq.ctag-ocNDmCGDF8-aPxJ4lK1c8g, consumerQueue=TOPIC_QUEUE2])
2021-09-09 23:02:57.133 INFO 44812 --- [ntContainer#5-1] com.ldx.rabbitmq.consumer.TopicConsumer : topic mq hello~~~ routing is delete.user
4.6.6 小節(jié)
Topic主題模式可以實(shí)現(xiàn) Publish/Subscribe發(fā)布與訂閱模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的時(shí)候可以使用通配符,顯得更加靈活。
5. 模式總結(jié)
RabbitMQ工作模式:
1、簡單模式 HelloWorld
一個(gè)生產(chǎn)者、一個(gè)消費(fèi)者,不需要設(shè)置交換機(jī)(使用default Exchange)。
2、工作隊(duì)列模式 Work Queue
一個(gè)生產(chǎn)者、多個(gè)消費(fèi)者(平均分配消息),不需要設(shè)置交換機(jī)(使用default Exchange)。
3、發(fā)布訂閱模式 Publish/subscribe
需要設(shè)置交換機(jī)類型為fanout Exchange,并且交換機(jī)和隊(duì)列進(jìn)行綁定,當(dāng)發(fā)送消息到交換機(jī)后,交換機(jī)會將消息發(fā)送到綁定的隊(duì)列。
4、路由模式 Routing
需要設(shè)置交換機(jī)類型為direct Exchange,交換機(jī)和隊(duì)列進(jìn)行綁定,并且指定routing key,發(fā)送消息時(shí)也要指定對應(yīng)的routing key到交換機(jī),交換機(jī)會根據(jù)routing key將消息發(fā)送到對應(yīng)的隊(duì)列。
5、主題模式 Topic
需要設(shè)置交換機(jī)類型為topic Exchange,,交換機(jī)和隊(duì)列進(jìn)行綁定,并且指定通配符方式的routing key,發(fā)送消息時(shí)指定routing key到交換機(jī)后,交換機(jī)會根據(jù)routing key規(guī)則將消息發(fā)送到對應(yīng)的隊(duì)列。主題模式比上面四類更靈活。

浙公網(wǎng)安備 33010602011771號