rabbitMq
1,新建exchange 新建queue 在exchange中綁定queue并設置routing_key 2,消息發送,發送到exchange,routingKey 不同的queue會存放自己綁定routingkey的消息 消息監聽,監聽queue 3,各個消費者監聽自己的queue, 每個queue可以存放同樣的消息,就是廣播模式了 4, exchange類型有: direct直連類型 在exchange中queue需要一一逐個綁定routingKey test.aa test.bb topic主題類型 queue只需要模糊匹配綁定就行 test.#
1,多個服務器實例,多個消費者,每個消費者又可以多線程,并發消費同一隊列 2,多個監聽消費者,不同隊列,同一條數據每個消費者都會消費到,廣播 (消費發送到exchange交換機、routingkey上,不同的queue會存放自己綁定routingkey的消息) fanout 可由direct替代,它是一種廣播式的交換器(散開分列),表示分發,所有的消費者得到同樣的隊列信息,發布/訂閱,會將接收到的消息發送給所有綁定到該交換器的隊列, 而不考慮路由鍵(routing key)。這意味著無論消息發布時指定了什么路由鍵,Fanout 交換器都會忽略這些信息,并簡單地將消息復制并分發給所有的綁定隊列。 direct可以建多個queue監聽同一個routingkey來實現分發 可來替代fanout
concurrency 多線程消費
Purge 隊列清理
@Autowired
private RabbitTemplate rabbitTemplate;
public String mq(String name) {
log.info("消息生產:{}", name);
rabbitTemplate.convertAndSend("queuexmh", name);
/**
* one_q one_x 映射到 queue_x1, two_q映射到 queue_x2
*/
rabbitTemplate.convertAndSend("exchange_direct_xmh","one_q", "queue_x1"+name);
rabbitTemplate.convertAndSend("exchange_direct_xmh","one_x", "queue_x1"+name);
rabbitTemplate.convertAndSend("exchange_direct_xmh","two_q", "queue_x2"+name);
/**
* one.t1 映射到 queue_x1, two.t1 two.t2 映射到 queue_x2
* 后臺做了關聯 two.# 映射到 queue_x2, one.# 映射到 queue_x1
*/
rabbitTemplate.convertAndSend("exchange_topic_xmh","two.t1", "queue_x2"+name);
rabbitTemplate.convertAndSend("exchange_topic_xmh","two.t2", "queue_x2"+name);
rabbitTemplate.convertAndSend("exchange_topic_xmh","one.t1", "queue_x1"+name);
return "hello"+name;
}
@Component
public class RabbitConsumer {
//指定queue消費
@RabbitListener(queues = "queue_x1",concurrency = "1")
public void queueX1(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
try {
System.out.println("queue_x1收到消息:" + message);
//設置為手動確認時生效 手動確認消息(第二個參數false表示不批量確認)
//channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 處理失敗時拒絕消息(requeue=false表示不重回隊列)
channel.basicNack(deliveryTag, false, false);
}
}
//指定queue消費
@RabbitListener(queues = "queue_x2",concurrency = "1")
public void queueX2(Message message) {
System.out.println("queue_x2收到消息:" + message);
}
//指定queue消費
@RabbitListener(queues = "${rabbitmq.queue.sendSms}",concurrency = "5",containerFactory =
"rabbitListenerContainerFactory")
public void onMessage(Message message) {
}
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.6.8</version>
</dependency>
spring:
rabbitmq:
port: 5672
host: 192.168.243.128:5672
username: admin
password: admin
virtual-host: xmh
publisher-confirm-type: correlated
publisher-returns: true
listener:
direct:
acknowledge-mode: auto
simple:
acknowledge-mode: auto
retry:
enabled: true
@Configuration
public class RabbitConfig {
@Value("${spring.rabbitmq.host}")
private String addresses;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
// 構建mq實例工廠
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 若設置為手動確認,必須手動調用ack AUTO 自動確認 默認值
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
return new RabbitAdmin(connectionFactory);
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //每次注入的時候回自動創建一個新的bean實例
public RabbitTemplate rabbitTemplate(){
return new RabbitTemplate(connectionFactory());
}
自動綁定
package com.xmh.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// 交換機名稱
public static final String EXCHANGE_NAME = "xmh.exchange";
// 隊列名稱
public static final String QUEUE_NAME = "xmh.queue";
public static final String QUEUE_NAME2 = "xmh2.queue";
// 路由鍵
public static final String ROUTING_KEY = "xmh.routing.key";
public static final String ROUTING_KEY2 = "xmh2.routing.key";
/**
* 聲明交換機
* 這里使用的是Direct類型交換機,也可以根據需要使用Topic、Fanout等類型
*/
@Bean
public DirectExchange exampleExchange() {
// durable: 是否持久化 autoDelete: 是否自動刪除
return new DirectExchange(EXCHANGE_NAME, true, false);
}
/**
* 聲明隊列
*/
@Bean
public Queue exampleQueue() {
// durable: 是否持久化 exclusive: 是否排他 autoDelete: 是否自動刪除
return QueueBuilder.durable(QUEUE_NAME).build();
}
@Bean
public Queue exampleQueue2() {
// durable: 是否持久化 exclusive: 是否排他 autoDelete: 是否自動刪除
return QueueBuilder.durable(QUEUE_NAME2).build();
}
/**
* 綁定交換機和隊列
*/
@Bean
public Binding binding(Queue exampleQueue, DirectExchange exampleExchange) {
// 將隊列通過路由鍵綁定到交換機
return BindingBuilder.bind(exampleQueue).to(exampleExchange).with(ROUTING_KEY);
}
@Bean
public Binding binding2(Queue exampleQueue2, DirectExchange exampleExchange) {
// 將隊列通過路由鍵綁定到交換機
return BindingBuilder.bind(exampleQueue2).to(exampleExchange).with(ROUTING_KEY2);
}
}
rabbitTemplate.convertAndSend(EXCHANGE_NAME,ROUTING_KEY2, ROUTING_KEY2+name);
//指定queue消費
@SneakyThrows
@RabbitListener(queues = QUEUE_NAME2,concurrency = "1")
public void queueX2(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
String msg = new String(message.getBody(),"UTF-8");
System.out.println(QUEUE_NAME2+"收到消息:"+msg+":"+ message);
}
RabbitMq 消費失敗,重試機制
自定義異常業務處理
步驟1 設置自動確認,自定義表記錄失敗數據(包含業務數據、失敗次數)觸發定時任務重新投放該消息,執行3次,每次間隔30分鐘
執行成功,刪除重發任務
執行失敗,第一次失敗執行步驟1,否則記錄失敗次數,達到一定次數刪除定時任務
channel.basicReject(deliveryTag, true);// 拒絕消息并重回隊列
安裝rabbitmq
https://www.rabbitmq.com/release-information rabbitmq erlang CentOS tar -zxf otp_src_28.0.2.tar.gz 修改 Centos 鏡像地址 參考 http://www.rzrgm.cn/xingminghui111/articles/17514510.html sudo yum install -y ncurses-devel sudo yum install -y gcc gcc-c++ openssl-devel libssh-devel unixODBC-devel ./configure && make && make install erl -version rpm -i --nodeps rabbitmq-server-4.0.9-1.el8.noarch.rpm service rabbitmq-server start service rabbitmq-server status 查看服務狀態 配置管理員賬號: rabbitmqctl add_user admin admin rabbitmqctl set_user_tags admin administrator # 設置用戶為管理員 # 授予所有權限 rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" # 啟用管理插件 rabbitmq-plugins enable rabbitmq_management # 重啟服務使配置生效 systemctl restart rabbitmq-server 關閉防火墻 sudo systemctl stop firewalld sudo systemctl disable firewalld 后臺管理 http://192.168.243.128:15672/
# 停止服務 自啟動
systemctl stop rabbitmq-server sudo systemctl enable rabbitmq-server
其他
@Service
public class OrderItemServiceImpl {
/**
* https://blog.csdn.net/aetawt/article/details/128957417
* 監聽方式獲取消息 消息只消費一次,其他消費者消費后,本消費者不再消費
* @RabbitHandler:標注在方法上
* @RabbitListener: 標注在類、方法上
* 使用 @RabbitHandler + @RabbitListener 接受不同類型的消息
*/
@RabbitListener(queues = {MqConstant.SEARCH_DATA_QUEUE})
public void recieveOrderMessage(Message message, Channel channel) throws IOException {
System.out.println("收到了消息了--->" + message + " ====》內容:" + new String(message.getBody()));
System.out.println("渠道數量:" + channel.getChannelNumber());
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("消息處理完成---------------------------------");
//消息順序,自增
long deliveryTag = messageProperties.getDeliveryTag();
System.out.println(deliveryTag);
//回復,簽收消息, fasle表示只簽收當前消息,true簽收所有
channel.basicAck(deliveryTag, false);
}
/**
* RabbitTemplate 拉取的方式獲取消息
*/
@Component
public class RabbitListenerConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void receive(){
Thread thread = new Thread(){
public void run(){
while (true){
System.out.println("*************ready");
String message = (String)rabbitTemplate.receiveAndConvert(MqConstant.SEARCH_DATA_QUEUE,1000*2);
System.out.println("*************receive:"+message);
}
}
};
thread.start();
}
自定義多線程處理
@RabbitListener(queues = {MqConstant.DEAL_TOBE_HANDLE})//,containerFactory = "batchQueueRabbitListenerContainerFactory"
@Bean("batchQueueRabbitListenerContainerFactory")
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//確認方式,manual為手動ack.
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//每次處理數據數量,提高并發量
//factory.setPrefetchCount(250);
//設置線程數
//factory.setConcurrentConsumers(30);
//最大線程數
//factory.setMaxConcurrentConsumers(50);
/* setConnectionFactory:設置spring-amqp的ConnectionFactory。 */
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(1);
factory.setPrefetchCount(1);
//factory.setDefaultRequeueRejected(true);
//使用自定義線程池來啟動消費者。
factory.setTaskExecutor(taskExecutor());
return factory;
}
@Bean("correctTaskExecutor")
@Primary
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 設置核心線程數
executor.setCorePoolSize(100);
// 設置最大線程數
executor.setMaxPoolSize(100);
// 設置隊列容量
executor.setQueueCapacity(0);
// 設置線程活躍時間(秒)
executor.setKeepAliveSeconds(300);
// 設置默認線程名稱
executor.setThreadNamePrefix("thread-rabbitmq");
// 設置拒絕策略rejection-policy:當pool已經達到max size的時候, 調用者執行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任務結束后再關閉線程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}

浙公網安備 33010602011771號