RabbitMQ簡(jiǎn)單入門
服務(wù)端安裝及配置
docker安裝
使用docker安裝RabbitMQ,注意,要選擇tag包含management的鏡像(包含web端管理插件)
docker pull rabbitmq:3.7.7-management
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xxx rabbitmq:3.7.7-management
設(shè)置初始賬號(hào)和密碼,web頁(yè)面地址如下
http://ip:15672
客戶端訪問(wèn)
添加依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
發(fā)布消息
public class TestRabbitClient {
public static void main(String[] args) throws IOException, TimeoutException {
String host = "";
String queueName = "hello_queue";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setUsername("");
factory.setPassword("");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//隊(duì)列名稱 是否持久化到硬盤(pán) 是否消息共享(多個(gè)消費(fèi)者消費(fèi)) 是否自動(dòng)刪除
channel.queueDeclare(queueName, false, false, false, null);
String message = "Hello World!";
//發(fā)布消息 使用的默認(rèn)交換機(jī)
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
消費(fèi)消息
channel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("receive message: " + new String(message.getBody()));
}
}, (x) -> {
System.out.println("消息被中斷");
});
注意:channel被關(guān)閉之后 就不能接收到消息了
交換機(jī)
交換機(jī)類型
- Direct (直連交換機(jī)): 最常使用,會(huì)根據(jù)routingkey進(jìn)行精準(zhǔn)匹配。直連交換機(jī)可以分發(fā)任務(wù)給多個(gè)工作者(worker)
- Fanout (扇形交換機(jī)): 將消費(fèi)分發(fā)給所有綁定的隊(duì)列,而不會(huì)理會(huì)routingkey。優(yōu)點(diǎn)是轉(zhuǎn)發(fā)消息最快,性能最好。一般會(huì)用來(lái)處理廣播消息(broadcast routing)。
- Topic(主題交換機(jī)): 根據(jù)routingkey進(jìn)行模糊匹配,將消息分發(fā)給一個(gè)或多個(gè)隊(duì)列(delimited by dots)。 routingkey可以有通配符 *,#。* 表示匹配一個(gè)單詞,# 匹配0個(gè)或多個(gè)單詞。當(dāng)綁定鍵為#時(shí),表示接收所有消息,和Fanout交換機(jī)類似了,當(dāng)綁定鍵不包含*和#時(shí),和Direct交換機(jī)功能類似了。
- Headers (頭交換機(jī)): 類似于直連交換機(jī)。不同點(diǎn)在于頭交換機(jī)的路由規(guī)則建立在頭屬性之上而不是路由鍵,一般開(kāi)發(fā)使用較少。
使用Fanout交換機(jī)實(shí)現(xiàn)廣播
@Configuration
public class RabbitConfig {
@Bean
public FanoutExchange testExchange2() {
return new FanoutExchange("test_exchange2");
}
}
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(), //注意這里不要定義隊(duì)列名稱,系統(tǒng)會(huì)隨機(jī)生成 spring.gen-4klrfbJ0TfmrGQnCpyPAQg這種格式
exchange = @Exchange(value = "test_exchange2", type = ExchangeTypes.FANOUT))
)
public class RabbitMqReceiver {
/**
* 測(cè)試消息接收
*/
@RabbitHandler
public void process(String context, Message message, Channel channel) {
}
}
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(), //注意這里不要定義隊(duì)列名稱,系統(tǒng)會(huì)隨機(jī)生成 spring.gen-4klrfbJ0TfmrGQnCpyPAQg這種格式
exchange = @Exchange(value = "test_exchange2", type = ExchangeTypes.FANOUT))
)
public class RabbitMqReceiver {
/**
* 測(cè)試消息接收
*/
@RabbitHandler
public void process(String context, Message message, Channel channel) {
}
}
通過(guò)Spring來(lái)創(chuàng)建動(dòng)態(tài)隊(duì)列,動(dòng)態(tài)綁定關(guān)系。如果我們不指定隊(duì)列名稱,Spring 會(huì)創(chuàng)建非持久化、排他、自動(dòng)刪除的隊(duì)列,具體邏輯可以看 RabbitListenerAnnotationBeanPostProcessor 的 declareQueue 方法。
死信隊(duì)列
當(dāng)一條消息在隊(duì)列中出現(xiàn)以下三種情況的時(shí)候,該消息就會(huì)變成一條死信。
- 消息被拒絕(basic.reject / basic.nack),并且requeue = false
- 消息TTL過(guò)期(通過(guò)x-message-ttl屬性設(shè)置)
- 隊(duì)列達(dá)到最大長(zhǎng)度(通過(guò)x-max-length屬性設(shè)置)
當(dāng)消息在一個(gè)隊(duì)列中變成一個(gè)死信之后,如果配置了死信隊(duì)列,它將被重新publish到死信交換機(jī),死信交換機(jī)將死信投遞到一個(gè)隊(duì)列上,這個(gè)隊(duì)列就是死信隊(duì)列。
//聲明訂單取消正常隊(duì)列 當(dāng)消息過(guò)期時(shí),將消息發(fā)布到死信隊(duì)列
Map<String, Object> params = Map
.of("x-dead-letter-exchange", "order_cancel_delay_queue_exchange",
"x-dead-letter-routing-key", "order_cancel_delay_queue_key",
"x-message-ttl", 20 * 1000);//消息過(guò)期時(shí)間 毫秒 在這里設(shè)置不靈活(不能改),可以在消息發(fā)布時(shí)設(shè)置
channel.queueDeclare("order_cancel_queue", true, false, false, params);
channel.exchangeDeclare("order_cancel_queue_exchange", BuiltinExchangeType.DIRECT, true);
channel
.queueBind("order_cancel_queue", "order_cancel_queue_exchange", "order_cancel_queue_key");
在發(fā)布消息時(shí)指定過(guò)期時(shí)間
//聲明訂單取消正常隊(duì)列 當(dāng)消息過(guò)期時(shí),將消息發(fā)布到死信隊(duì)列
Map<String, Object> params = Map
.of("x-dead-letter-exchange", "order_cancel_delay_queue_exchange",
"x-dead-letter-routing-key", "order_cancel_delay_queue_key");//消息過(guò)期時(shí)間 毫秒
channel.queueDeclare("order_cancel_queue", true, false, false, params);
channel.exchangeDeclare("order_cancel_queue_exchange", BuiltinExchangeType.DIRECT, true);
channel
.queueBind("order_cancel_queue", "order_cancel_queue_exchange", "order_cancel_queue_key");
BasicProperties basicProperties = new Builder()
.expiration("15000")//消息過(guò)期時(shí)間 15s
.build();
//向正常隊(duì)列發(fā)布消息
channel.basicPublish("order_cancel_queue_exchange", "order_cancel_queue_key", basicProperties,
"hello".getBytes());
延時(shí)隊(duì)列
- 使用死信隊(duì)列的消息過(guò)期方式來(lái)實(shí)現(xiàn)
有一個(gè)問(wèn)題,如果一個(gè)隊(duì)列配置了死信隊(duì)列,在發(fā)布消息時(shí)指定過(guò)期時(shí)間,第一條20S,第二條5S,那么第二條也會(huì)延遲到20S后執(zhí)行,因?yàn)閞abbitmq只會(huì)檢查第一個(gè)消息是否過(guò)期。 - 使用rabbitmq的延遲插件,創(chuàng)建的交換機(jī)類型為x-delayed-message,并設(shè)置x-delayed-type屬性為direct,這種方式也沒(méi)有第一種方式的問(wèn)題。
public Exchange demo08Exchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(Demo01Message.NEW_DELAY_EXCHANGE, "x-delayed-message", true, false, args);
}
//消息發(fā)布
rabbitTemplate.convertAndSend(Demo01Message.NEW_DELAY_EXCHANGE, Demo01Message.ROUTING_KEY, message, msg -> {
msg.getMessageProperties().setDelay(20 * 1000);//延遲20S
return msg;
});
整合Spring
- @EnableRabbit注解導(dǎo)入的RabbitListenerAnnotationBeanPostProcessor來(lái)處理@RabbitListener注解和@RabbitHandler注解
- 將消息處理器包裝成MultiMethodRabbitListenerEndpoint,注冊(cè)到RabbitListenerEndpointRegistrar對(duì)象中
- RabbitListenerEndpointRegistrar對(duì)象只是一個(gè)工具類,最終是注冊(cè)到RabbitListenerEndpointRegistry對(duì)象中,它也是通過(guò)@EnableRabbit注解導(dǎo)入的
- RabbitListenerEndpointRegistry將RabbitListenerEndpoint對(duì)象包裝成MessageListenerContainer對(duì)象,它其中包含MessageListener對(duì)象(包裝了消息處理器)
- 在MessageListenerContainer的start()方法過(guò)程中,調(diào)用checkMismatchedQueues()方法
- 通過(guò)調(diào)用CachingConnectionFactory創(chuàng)建connection,在這個(gè)過(guò)程中調(diào)用CompositeConnectionListener的onCreate()方法
- ConnectionListener是通過(guò)RabbitAdmin對(duì)象添加到CachingConnectionFactory中的
- 通過(guò)RabbitAdmin對(duì)象聲明Queue,Exchange,Binding
- 繼續(xù)MessageListenerContainer的start()方法,將消息處理器包裝成AsyncMessageProcessingConsumer對(duì)象,它是一個(gè)Runnable,交給線程池處理,可以通過(guò) concurrentConsumers 屬性來(lái)控制消費(fèi)者數(shù)量從而并發(fā)消費(fèi)
- AsyncMessageProcessingConsumer中包含一個(gè)BlockingQueueConsumer對(duì)象,在它的start()方法中會(huì)注冊(cè)消息處理的回調(diào),具體類為InternalConsumer
- 有消息到來(lái)時(shí),InternalConsumer向BlockingQueueConsumer的queue中加入一條消息(推拉結(jié)合)
- AsyncMessageProcessingConsumer一直在輪訓(xùn)獲取BlockingQueueConsumer的queue,最大等待時(shí)間1秒。
- 獲取到消息,交給最終的消息處理器處理(就是我們標(biāo)記@RabbitListener注解和@RabbitHandler注解的方法)
關(guān)于ConfirmCallback和ReturnCallback的使用
- ConfirmCallback : 注意:只能確認(rèn)消息是否能到達(dá) Exchange
- ReturnCallback : 注意 它是當(dāng)交換機(jī)路由不到 隊(duì)列的時(shí)候 它才會(huì)被觸發(fā)
如何保證消息不被重復(fù)消費(fèi)
每個(gè)消息添加一個(gè)Id,消費(fèi)時(shí)判斷是否已經(jīng)消費(fèi)過(guò)
//消息發(fā)送方
Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
.setContentType(
MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setContentEncoding(StandardCharsets.UTF_8.name())
.setMessageId(UUID.randomUUID().toString()).build();
rabbitTemplate.convertAndSend("test_exchange1", "test_ronting_key1", message);
//消息消費(fèi)方
@RabbitHandler
public void process(String context, Message message, Channel channel) {
System.out.println("接收到消息: " + context);
System.out.println(message.getMessageProperties().getMessageId());
// 不管成功失敗,向原隊(duì)列確認(rèn)消費(fèi)
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
log.error("原隊(duì)列確認(rèn)消費(fèi)失敗:" + e.getLocalizedMessage());
}
}
參數(shù)中context的內(nèi)容類似new String(message.getBody(), StandardCharsets.UTF_8);,源碼位于 SimpleMessageConverter 的 fromMessage() 方法的100行
如何保證消息不丟失
- 生產(chǎn)端,使用confirm機(jī)制,發(fā)送失敗重試
- RabbitMQ服務(wù)端,隊(duì)列持久化,消息內(nèi)容持久化
- 消費(fèi)端,手動(dòng)確認(rèn)機(jī)制
集群部署
-
普通集群模式(無(wú)高可用性): 默認(rèn)模式,以兩個(gè)節(jié)點(diǎn)(rabbit01、rabbit02)為例來(lái)進(jìn)行說(shuō)明。對(duì)于Queue來(lái)說(shuō),消息實(shí)體只存在于其中一個(gè)節(jié)點(diǎn)rabbit01(或者rabbit02),rabbit01和rabbit02兩個(gè)節(jié)點(diǎn)僅有相同的元數(shù)據(jù),即隊(duì)列的結(jié)構(gòu)。當(dāng)消息進(jìn)入rabbit01節(jié)點(diǎn)的Queue后,consumer從rabbit02節(jié)點(diǎn)消費(fèi)時(shí),RabbitMQ會(huì)臨時(shí)在rabbit01、rabbit02間進(jìn)行消息傳輸,把A中的消息實(shí)體取出并經(jīng)過(guò)B發(fā)送給consumer。
-
鏡像集群模式(高可用性): 最常用的集群模式,把需要的隊(duì)列做成鏡像隊(duì)列,存在于多個(gè)節(jié)點(diǎn),屬于RabbitMQ的HA方案。該模式解決了普通模式中的問(wèn)題,其實(shí)質(zhì)和普通模式不同之處在于,消息實(shí)體會(huì)主動(dòng)在鏡像節(jié)點(diǎn)間同步,而不是在客戶端取數(shù)據(jù)時(shí)臨時(shí)拉取。
當(dāng)然,負(fù)載均衡還是需要我們自己做的。
參考
RabbitMQ 多實(shí)例 廣播消息_松月的博客-程序員宅基地_rabbitmq 廣播消息
RabbitMQ集群鏡像模式部署
消息冪等

浙公網(wǎng)安備 33010602011771號(hào)