Rabbitmq的使用
rabbitmq的使用
1. 使用場景及它的特點介紹

2. mq的5種常用消息模型
2.1 隊列模型—-1 對 1


2.2 隊列模型 — 1(生產者)對多(消費者)
特點:
1.當有多個消費者時,無論消費者處理的性能是否相同,生產者的消費會平均分配給每一個消費者
2.每個消費者處理的消息是否存在重復? 不會重復
解釋:為什么開啟多個消費者時,會出現有的消費者雖然處理的慢,但是也會收到相同的消息的個數?
rabbitmq有消息默認的分配機制:平均分配(有多少個消費者,都將平均分配要處理的消息數)
優化: 能者多勞
在消費處理消息時,可以設置由隊列每次分配給消費者的消息數量,不要一次性全分完
2.3 隊列模式的代碼實現
2.3.1 生產的核心代碼
import cn.itsource.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
private static final String QUEUE_NAME = "queue_workqueue";
public static void main(String[] args) throws Exception {
//1.創建連接
Connection connection = ConnectionUtil.getConnection();
//2.生產者與服務端之間建立通道
Channel channel = connection.createChannel();
for (int i = 0; i < 20; i++) {
/**
* 發送消息到隊列
* @param exchange 交換機名稱
* @param routingKey 發送到哪個隊列(這個參數很容易搞錯:沒有交換機時,這個參數必須填隊列名稱;有交換機的時候,就填路由)
* @param props 消息的其他屬性
* @param body 消息體
*/
//在實際開發中,我們也會將發送的內容,以字符串進行傳輸。但是涉及到對象類型,會將其先轉為json字符串。
String message = "queue_workqueue: 這是一個消息!" + i;
System.out.println(message);
//3. 調用API進行消息的發送
channel.basicPublish("",QUEUE_NAME,null,message.getBytes("utf-8"));
}
//5.關閉連接
connection.close();
}
}
2.3.2 消費者的代碼實現
import cn.itsource.mq.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer01 {
//隊列的名稱,必須要與接收的消息生產者,設置的隊列名相同
private static final String QUEUE_NAME = "queue_workqueue";
public static void main(String[] args) throws Exception {
//1.創建連接
Connection connection = ConnectionUtil.getConnection();
//2.生產者與服務端之間建立通道
Channel channel = connection.createChannel();
//3.聲明隊列:因為生產者那邊已經聲明過隊列了,所以這邊就不需要聲明隊列
/**
* 3.聲明隊列
* @param queue 隊列名稱
* @param durable 是否持久化
* @param exclusive 是否為專用隊列
* @param autoDelete 是否自動刪除
* @param arguments 其他參數
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//設置消費者每次預提取1個消息【這是一個提高消息處理效率的參數。表示每次接收幾個消息】
channel.basicQos(1);
//4. 采用匿名內部類 寫一個DefaultConsumer的子類,子類中重寫handleDelivery方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//接收消息
String message = new String(body, "utf-8");
Thread.sleep(500);
System.out.println("消費者收到消息:" + message);
long deliveryTag = envelope.getDeliveryTag();
/**
【如果采用默認的 自動確認ACK機制 ,則可省略】
* 正常情況下的手動回執
* @param deliveryTag 處理消息的標識
* @param multiple 是否自動批量處理(自動處理隊列中當前消息,及之前的所有消息) false表示只處理當前消息
*/
//注意:當ACK采用手動確認機制時,確認消息的成功發送的代碼,一定要放在當前方法體的最后一行
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
e.printStackTrace();
}
}
};
/**
* 5.監聽隊列
* 一旦被監聽的隊列中有新的消息,就自動調用consumer對象的handleDelivery方法來接收消息
* @param queue 隊列名稱
* @param autoAck 是否自動回執 true表示自動回執,false表示手動回執
* @param callback 接收消息的回調方法Consumer
*/
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
2.4 訂閱模型的代碼實現
2.4.1 訂閱模型分3種
1. fanout類型 : 1.不需要設置routekey,生產者的消息,會統一分別發給每一個消費者
2. direct : 1. 設置routekey,且生產者在發送消息時,也要指定routekey,且消息在過濾時,需要完全匹配生產指定的routekey
3. topic : 1. 在設置toutekey時,可以引用【通配符】 ;2.通配符分2種:*:單個匹配;#:多個匹配
-
fanout模型的效果圖

-
direct效果圖

-
topic效果圖

2.4.2 生產者的代碼實現
import cn.itsource.mq.utils.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
//1.創建連接
Connection connection = ConnectionUtil.getConnection();
//2.生產者與服務端之間建立通道
Channel channel = connection.createChannel();
/**
* 3.聲明交換機
* @param exchange 交換機名稱
* @param type 交換機類型
* @param durable 是否持久化
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
/**
* 4.發送消息到隊列
* @param exchange 交換機名稱
* @param routingKey 發送到哪個隊列(這個參數很容易搞錯:沒有交換機時,這個參數必須填隊列名稱;有交換機的時候,就填路由)
* @param props 消息的其他屬性
* @param body 消息體
*/
String message = "這是一個消息!" + System.currentTimeMillis();
System.out.println(message);
//要指定 路由key : routekey。設置后,對應的消費者,只要在監聽指定的路由key的消息,才會收取到
channel.basicPublish(EXCHANGE_NAME,"email",null,message.getBytes("utf-8"));
//5.關閉連接
connection.close();
}
}
2.4.3 消費者的代碼實現
import cn.itsource.mq.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerEMAIL {
private static final String QUEUE_NAME_EMAIL = "queue_direct_email";
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
//1.創建連接
Connection connection = ConnectionUtil.getConnection();
//2.生產者與服務端之間建立通道
Channel channel = connection.createChannel();
/**
* 3.聲明隊列
* @param queue 隊列名稱
* @param durable 是否持久化
* @param exclusive 是否為專用隊列
* @param autoDelete 是否自動刪除
* @param arguments 其他參數
*/
channel.queueDeclare(QUEUE_NAME_EMAIL,true,false,false,null);
/**
在綁定到 指定的交換機時,要同時指定接收什么類型的 routekey消息
* 4.將隊列綁定到交換機
* @param queue 隊列名稱
* @param exchange 交換機名稱
* @param routingKey 路由設置
* @param arguments 其他參數
*/
channel.queueBind(QUEUE_NAME_EMAIL,EXCHANGE_NAME,"email",null);
//設置消費者每次預提取1個消息
channel.basicQos(1);
//采用匿名內部類 寫一個DefaultConsumer的子類,子類中重寫handleDelivery方法
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//接收消息
String message = new String(body,"utf-8");
Thread.sleep(2000);
System.out.println("消費者收到消息:"+message);
/**
* 正常情況下的手動回執
* @param deliveryTag 處理消息的標識
* @param multiple 是否自動批量處理(自動處理隊列中當前消息,及之前的所有消息) false表示只處理當前消息
*/
channel.basicAck(envelope.getDeliveryTag(),false);
} catch (Exception e) {
e.printStackTrace();
}
}
};
/**
* 4.監聽隊列
* 一旦被監聽的隊列中有新的消息,就自動調用consumer對象的handleDelivery方法來接收消息
* @param queue 隊列名稱
* @param autoAck 是否自動回執 true表示自動回執,false表示手動回執
* @param callback 接收消息的回調方法Consumer
*/
channel.basicConsume(QUEUE_NAME_EMAIL, false, consumer);
}
}
3. springboot整合mq
- springboot整合mq時,在企業開發中,都會將生產者和消費者分開集成到 2個工程中
3.1 整合生產者
3.1.1 導入pom依賴
<!--spirngboot集成rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.1.2 配置yml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtualHost: /
listener:
simple:
acknowledge-mode: manual #手動簽收
prefetch: 1 #預提取1條消息
publisher-confirms: true #消息發送到交換機失敗回調
publisher-returns: true #消息發送到隊列失敗回調
template:
mandatory: true # 必須設置成true 消息路由失敗通知監聽者,而不是將消息丟棄
3.1.3 配置啟動類的注解
-
不需要在啟動類添加開啟的注解,但是需要添加幾個@Bean的配置
-
配置bean
public static final String EXCHANGE_NAME = "springboot-rabbitmq-exchange"; public static final String QUEUE_NAME_SMS = "springboot-rabbitmq-queue-sms"; public static final String QUEUE_NAME_EMAIL = "springboot-rabbitmq-queue-email"; /** * 聲明交換機 * @return */ @Bean(EXCHANGE_NAME) public Exchange EXCHANGE_NAME(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } /** * 聲明隊列:sms * @return */ @Bean(QUEUE_NAME_SMS) public Queue QUEUE_NAME_SMS(){ return QueueBuilder.durable(QUEUE_NAME_SMS).build(); } /** * 聲明隊列:email * @return */ @Bean(QUEUE_NAME_EMAIL) public Queue QUEUE_NAME_EMAIL(){ return QueueBuilder.durable(QUEUE_NAME_EMAIL).build(); } /** * sms隊列綁定到交換機 * 需要參數有兩個辦法: * 1)直接在方法內部調用其他方法獲取對象 * 2)直接方法參數中寫變量,Spring會自動從Spring容器取出對象進行依賴注入 * @param queue * @param exchange * @return */ @Bean public Binding BINDING_QUEUE_NAME_SMS(@Qualifier(QUEUE_NAME_SMS)Queue queue, Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("user.#.sms").noargs(); } /** * email隊列綁定到交換機 * @param queue * @param exchange * @return */ @Bean public Binding BINDING_QUEUE_NAME_EMAIL(@Qualifier(QUEUE_NAME_EMAIL)Queue queue, Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("user.#.email").noargs(); }
3.1.4 測試
- 先定義一個controller,調用RabbitmqTemplate方法。
在瀏覽器中,調用一次下面的消息發送的方法,就到 rabbitmq服務器中,檢查是否生成了對應的exchange和queue的內容
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg")
public void sendMsg(String msg) {
//發送一個消息給mq服務器
rabbitTemplate.convertAndSend(Contants.EXCHANGE_NAME, "user.email", msg);
}
}
- 檢查rabbitmq服務器,是否會生成對應的exchange和queue的數據


3.2 整合消費者
3.2.1 導入pom依賴
<!--spirngboot集成rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2.2 配置yml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtualHost: /
listener:
simple:
acknowledge-mode: manual #手動簽收
prefetch: 1 #預提取1條消息
publisher-confirms: true #消息發送到交換機失敗回調
publisher-returns: true #消息發送到隊列失敗回調
template:
mandatory: true # 必須設置成true 消息路由失敗通知監聽者,而不是將消息丟棄
3.2.3 配置啟動注解或bean
無
3.2.4 測試
-
消費者的核心代碼
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class ConsumerListener { public static final String QUEUE_NAME_SMS = "springboot-rabbitmq-queue-sms"; public static final String QUEUE_NAME_EMAIL = "springboot-rabbitmq-queue-email"; /** * 監聽器:監聽一個或者多個隊列 * 被監聽的隊列中一旦有了新的消息,就自動執行此方法來處理消息 * @param msg * @param message * @param channel */ @RabbitListener(queues = {QUEUE_NAME_SMS}) public void accept_sms(String msg, Message message, Channel channel){ try { System.out.println("SMS消費者收到消息:" + msg); //成功接收消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); } } /** * 監聽器:監聽一個或者多個隊列 * 被監聽的隊列中一旦有了新的消息,就自動執行此方法來處理消息 * @param msg * @param message * @param channel */ @RabbitListener(queues = {QUEUE_NAME_EMAIL}) public void accept_email(String msg, Message message, Channel channel){ try { System.out.println("EMAIL消費者收到消息:" + msg); //成功接收消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); } } } -
啟動消費者的工程后,不需要做任何事,只要生產者發送成功一條消息,消費者就應該能接收到 消息內容,如果接收不到 ,說明 環境 配置失敗

浙公網安備 33010602011771號