RabbitMQ - 常用消息隊列之:路由模式【Direct Exchange】
路由模式介紹

Producer:消息的生產者(發送消息的程序)。Exchange:交換機,負責發送消息給指定隊列。routingKey:路由key,即上圖的 key1,key2 等,相當于在交換機和隊列之間又加了一層限制Queue:消息隊列,理解為一個容器,生產者向它發送消息,它把消息存儲,等待消費者消費。Consumer:消息的消費者(接收消息的程序)。
如何理解
路由模式的交換機類型是 direct,與 fanout 模式相比,多了路由 key 這個概念。生產者發送攜帶指定 routingKey(路由key) 的消息到交換機,交換機拿著此 routingKey 去找到綁定了這個 routingKey 的隊列,然后發送到此隊列,一個隊列可以綁定多個 routingKey 。

路由模式下,在發布消息時指定不同的routeKey,交換機會根據不同的routeKey分發消息到不同的隊列中
.net 5.0 代碼實現:
- 生產者實現
using RabbitMQ.Client;
using System;
using System.Text;
namespace RabbitMQTest.Producer
{
/// <summary>
/// RabbitMQ測試_Producer生產者
/// </summary>
class Program
{
static void Main(string[] args)
{
// 1、創建連接工廠
IConnectionFactory factory = new ConnectionFactory()
{
UserName = "admin",
Password = "admin",
HostName = "192.168.1.101", //rabbitmq server ip
Port = 5672,
VirtualHost = "testhost" //在此連接期間要訪問的虛擬主機。 默認值[ / ]
};
// 2、創建連接
IConnection connection = factory.CreateConnection();
// 3、創建通道
IModel channel = connection.CreateModel();
// 交換機名稱
string exchangeName = "exchangeTest";
string routeKey = "key1";
// 4、把交換機設置成 Direct 路由模式
channel.ExchangeDeclare(exchangeName, type: ExchangeType.Direct);
Console.WriteLine("\n RabbitMQ連接成功,請輸入消息,輸入exit退出!");
string input;
do
{
input = Console.ReadLine();
byte[] sendBytes = Encoding.UTF8.GetBytes(input);
//發布消息
channel.BasicPublish(exchangeName, routeKey, null, sendBytes);
} while (input.Trim().ToLower() != "exit");
Console.WriteLine("\n RabbitMQ測試完畢!");
// 6、關閉通道
channel.Close();
// 7、關閉連接
connection.Close();
}
}
}
申明一個routeKey值為key1,并在發布消息的時候告訴了RabbitMQ,消息傳遞時routeKey必須匹配,才會被隊列接收否則消息會被拋棄。
- 消費者實現:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
namespace RabbitMQTest.Consumer
{
/// <summary>
/// RabbitMQ測試_Consumer消費者
/// </summary>
class Program
{
static void Main(string[] args)
{
Console.WriteLine("輸入接受key的名稱");
string routekey = Console.ReadLine();
// 1、創建連接工廠
ConnectionFactory factory = new ConnectionFactory()
{
UserName = "admin", //用戶名
Password = "admin", //密碼
HostName = "192.168.1.101", //rabbitmq server ip
Port = 5672, //端口號
VirtualHost = "testhost" //在此連接期間要訪問的虛擬主機。
};
// 2、創建連接
IConnection connection = factory.CreateConnection();
// 3、創建通道
IModel channel = connection.CreateModel();
// 交換機名稱
string exchangeName = "exchangeTest";
// 4、聲明交換機
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
// 消息隊列名稱
string queueName = DateTime.Now.Year.ToString();
// 5、聲明隊列
channel.QueueDeclare(queueName, false, false, false, null);
// 5.1、將隊列,交換機和key綁定 這里可以綁定多個key
channel.QueueBind(queueName, exchangeName, routekey, null);
// 6、定義消費者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
Console.WriteLine($"隊列名稱:{queueName}");
// 7、接收到消息事件
consumer.Received += (ch, ea) =>
{
string message = Encoding.Default.GetString(ea.Body.ToArray());
Console.WriteLine($"接受到消息:{message}");
// 8、確認該消費已被消費
channel.BasicAck(ea.DeliveryTag, true);
};
// 9、啟動消費者 設置為自動應答消息
channel.BasicConsume(
queue: queueName, // 消息隊列名稱
autoAck: false, // 兩種消息確認模式false 手動模式 true自動模式
consumer: consumer);
Console.WriteLine("消費者1已啟動");
Console.ReadKey();
channel.Close();
connection.Close();
}
}
}

在routeKey匹配的時候才會接收消息,接收者消息隊列可以聲明多個routeKey與交換機進行綁定
routeKey不匹配則不接收消息。

浙公網安備 33010602011771號