RabbitMQ - 常用消息隊(duì)列之:簡(jiǎn)單隊(duì)列(一對(duì)一模式)
介紹

Producer:消息的生產(chǎn)者(發(fā)送消息的程序)。Queue:消息隊(duì)列,理解為一個(gè)容器,生產(chǎn)者向它發(fā)送消息,它把消息存儲(chǔ),等待消費(fèi)者消費(fèi)。Consumer:消息的消費(fèi)者(接收消息的程序)。
由圖所示,簡(jiǎn)單隊(duì)列模式,一個(gè)生產(chǎn)者,經(jīng)過(guò)一個(gè)隊(duì)列,對(duì)應(yīng)一個(gè)消費(fèi)者。可以看做是點(diǎn)對(duì)點(diǎn)的一種傳輸方式,相較架構(gòu)模型圖,最主要的特點(diǎn)就是看不到 Exchange(交換機(jī)) 和 routekey(路由鍵) ,正是因?yàn)檫@種模式簡(jiǎn)單,所以并不會(huì)涉及到復(fù)雜的條件分發(fā)等等,因此也不需要用戶去顯式的考慮交換機(jī)和路由鍵的問(wèn)題。
- 但是要注意,這種模式并不是生產(chǎn)者直接對(duì)接隊(duì)列,而是用了默認(rèn)的交換機(jī),默認(rèn)的交換機(jī)會(huì)把消息發(fā)送到和 routekey 名稱相同的隊(duì)列中去,這也是我們?cè)诤竺娲a中在 routekey 位置填寫了隊(duì)列名稱的原因
.net 5.0 NuGet引用包
RabbitMQ.Client
簡(jiǎn)單測(cè)試代碼:
- 生產(chǎn)者實(shí)現(xiàn)
using RabbitMQ.Client;
using System;
using System.Text;
namespace RabbitMQTest.Producer
{
/// <summary>
/// RabbitMQ測(cè)試_Producer生產(chǎn)者
/// </summary>
class Program
{
static void Main(string[] args)
{
// 1、創(chuàng)建連接工廠
IConnectionFactory factory = new ConnectionFactory()
{
UserName = "admin",
Password = "admin",
HostName = "192.168.1.101", //rabbitmq server ip
Port = 5672,
VirtualHost = "testhost" //在此連接期間要訪問(wèn)的虛擬主機(jī)。
};
// 2、創(chuàng)建連接
IConnection connection = factory.CreateConnection();
// 3、創(chuàng)建通道
IModel channel = connection.CreateModel();
string name = "testQueue";
// 4、聲明一個(gè)隊(duì)列
channel.QueueDeclare(
queue: name, //消息隊(duì)列名稱
durable: false, //是否持久化,true持久化,隊(duì)列會(huì)保存磁盤,服務(wù)器重啟時(shí)可以保證不丟失相關(guān)信息。
exclusive: false, //是否排他,true排他的,如果一個(gè)隊(duì)列聲明為排他隊(duì)列,該隊(duì)列僅對(duì)首次聲明它的連接可見,并在連接斷開時(shí)自動(dòng)刪除。
autoDelete: false, //是否自動(dòng)刪除,true是自動(dòng)刪除,自動(dòng)刪除的前提是:致少有一個(gè)消費(fèi)者連接到這個(gè)隊(duì)列,之后所有與這個(gè)隊(duì)列連接的消費(fèi)者都斷開時(shí),才會(huì)自動(dòng)刪除。
arguments: null); //設(shè)置隊(duì)列的一些其它參數(shù)。
Console.WriteLine("\n RabbitMQ連接成功,請(qǐng)輸入消息,輸入exit退出!");
string input;
do
{
input = Console.ReadLine();
byte[] sendBytes = Encoding.UTF8.GetBytes(input);
//發(fā)布消息
channel.BasicPublish("", name, null, sendBytes);
Console.WriteLine("消息發(fā)布完畢");
} while (input.Trim().ToLower() != "exit");
Console.WriteLine("\n RabbitMQ測(cè)試完畢!");
// 6、關(guān)閉通道
channel.Close();
// 7、關(guān)閉連接
connection.Close();
}
}
}
- queueDeclare 方法解釋
- 參數(shù)1:queue(隊(duì)列名稱),如果隊(duì)列不存在,則自動(dòng)創(chuàng)建。
- 參數(shù)2:durable(隊(duì)列是否持久化),持久化可以保證服務(wù)器重啟后此隊(duì)列仍然存在。
- 參數(shù)3:exclusive(排他隊(duì)列)即是否獨(dú)占隊(duì)列,如果此項(xiàng)為 true,該隊(duì)列僅對(duì)首次申明它的連接可見,并在連接斷開時(shí)自動(dòng)刪除。
- 參數(shù)4:autoDelete(自動(dòng)刪除),最后一個(gè)消費(fèi)者將消息消費(fèi)完畢后,自動(dòng)刪除隊(duì)列。
- 參數(shù)5:arguments(攜帶附加屬性)。
- basicPublish 方法解釋
- 參數(shù)1:exchange(交換機(jī)名稱)。
- 參數(shù)2:routingKey(路由key),此處填寫隊(duì)列名,可理解為把消息發(fā)送到和 routekey 名稱相同的隊(duì)列中去。
- 參數(shù)3:props(消息的控制狀態(tài)),可以在此處控制消息的持久化。
- 參數(shù)為:MessageProperties.PERSISTENT_TEXT_PLAIN
- 參數(shù)4:body(消息主體),類型是一個(gè)字節(jié)數(shù)組,要轉(zhuǎn)一下類型。
通過(guò)工具關(guān)閉channel和釋放連接:先關(guān)閉通道,再釋放連接。
- 消費(fèi)者實(shí)現(xiàn)
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
namespace RabbitMQTest.Consumer
{
/// <summary>
/// RabbitMQ測(cè)試_Consumer消費(fèi)者
/// </summary>
class Program
{
static void Main(string[] args)
{
// 1、創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory()
{
UserName = "admin", //用戶名
Password = "admin", //密碼
HostName = "192.168.1.101", //rabbitmq server ip
Port = 5672, //端口號(hào)
VirtualHost = "testhost" //在此連接期間要訪問(wèn)的虛擬主機(jī)。
};
// 2、創(chuàng)建連接
IConnection connection = factory.CreateConnection();
// 3、創(chuàng)建通道
IModel channel = connection.CreateModel();
// 4、事件基本消費(fèi)者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
string name = "testQueue";
// 5、接收到消息事件
consumer.Received += (ch, ea) =>
{
string message = Encoding.Default.GetString(ea.Body.ToArray());
Console.WriteLine($"接受到消息:{message}");
Console.WriteLine($"收到該消息[{ea.DeliveryTag}] 延遲1s發(fā)送回執(zhí)!");
Thread.Sleep(1000);
// 6、確認(rèn)該消費(fèi)已被消費(fèi)
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine($"已發(fā)送回執(zhí){ea.DeliveryTag}");
};
// 7、啟動(dòng)消費(fèi)者 設(shè)置為手動(dòng)應(yīng)答消息
channel.BasicConsume(name, false, consumer);
Console.WriteLine("消費(fèi)者已啟動(dòng)");
Console.ReadKey();
channel.Close();
connection.Close();
}
}
}
- basicConsume 方法解釋
- 參數(shù)1:queue(隊(duì)列名稱),即消費(fèi)哪個(gè)隊(duì)列的消息 。
- 參數(shù)2:autoAck(自動(dòng)應(yīng)答)開始消息的自動(dòng)確認(rèn)機(jī)制,只要消費(fèi)了就從隊(duì)列刪除消息。
- 參數(shù)3:callback(消費(fèi)時(shí)的回調(diào)接口),callback 的類型是 Consumer 這里使用了 DefaultConsumer 就是 Consumer 的一個(gè)實(shí)現(xiàn)類。其中重寫 handleDelivery 方法,就可以獲取到消費(fèi)的數(shù)據(jù)內(nèi)容了,這里主要使用了其中的 body,即查看消息主體,其他三個(gè)參數(shù)暫時(shí)還沒用到,有興趣可以先打印輸出一下,能先有個(gè)大概的了解。

浙公網(wǎng)安備 33010602011771號(hào)