RabbitMQ - 常用消息隊(duì)列之:發(fā)布訂閱模式【fanout Exchange】
Exchange fanout模式介紹

Producer:消息的生產(chǎn)者(發(fā)送消息的程序)。Exchange:交換機(jī),負(fù)責(zé)發(fā)送消息給指定隊(duì)列。Queue:消息隊(duì)列,理解為一個(gè)容器,生產(chǎn)者向它發(fā)送消息,它把消息存儲(chǔ),等待消費(fèi)者消費(fèi)。Consumer:消息的消費(fèi)者(接收消息的程序)。
如何理解
Fanout 直譯為 “扇出” 但是大家更多的會(huì)把它叫做廣播或者發(fā)布與訂閱,它是一種沒有路由key的模式,生產(chǎn)者將消息發(fā)送給交換機(jī),交換機(jī)會(huì)把所有消息復(fù)制同步到所有與它綁定過的隊(duì)列上,而每個(gè)隊(duì)列只能有一個(gè)消費(fèi)者拿到這條消息,如果在一個(gè)消費(fèi)者連接中,創(chuàng)建多個(gè)通道,則會(huì)出現(xiàn)爭(zhēng)搶消息的結(jié)果。
在RabbitMQ的Exchange模式中生產(chǎn)者并不會(huì)直接把消息發(fā)送到Queue中,而是將消息發(fā)送到Exchange(交換機(jī)),消費(fèi)者創(chuàng)建各自的隊(duì)列綁定到交換機(jī).
- 發(fā)布訂閱模式(fanout)

生產(chǎn)者實(shí)現(xiàn), 把隊(duì)列替換成了交換機(jī),,發(fā)布消息時(shí)把交換機(jī)名稱告訴RabbitMQ,把交換機(jī)設(shè)置成fanout發(fā)布訂閱模式
.net 5.0 測(cè)試代碼:消費(fèi)者隊(duì)列名稱如果一樣就和Worker工作模式一樣
- 生產(chǎn)者:
using RabbitMQ.Client;
using System;
using System.Text;
namespace RabbitMQTest.Producer
{
/// <summary>
/// RabbitMQ測(cè)試_Producer生產(chǎn)者
/// </summary>
class Program
{
static void Main(string[] args)
{
// 1、創(chuàng)建連接工廠
IConnectionFactory factory = new ConnectionFactory()
{
UserName = "admin",
Password = "admin",
HostName = "192.168.1.101", //rabbitmq server ip
Port = 5672,
VirtualHost = "testhost" //在此連接期間要訪問的虛擬主機(jī)。 默認(rèn)值[ / ]
};
// 2、創(chuàng)建連接
IConnection connection = factory.CreateConnection();
// 3、創(chuàng)建通道
IModel channel = connection.CreateModel();
// 交換機(jī)名稱
string exchangeName = "exchangeTest";
// 4、把交換機(jī)設(shè)置成fanout發(fā)布訂閱模式
channel.ExchangeDeclare(exchangeName, type: ExchangeType.Fanout);
Console.WriteLine("\n RabbitMQ連接成功,請(qǐng)輸入消息,輸入exit退出!");
string input;
do
{
input = Console.ReadLine();
byte[] sendBytes = Encoding.UTF8.GetBytes(input);
//發(fā)布消息
channel.BasicPublish(exchangeName, "", null, sendBytes);
//Console.WriteLine("消息發(fā)布完畢");
} while (input.Trim().ToLower() != "exit");
Console.WriteLine("\n RabbitMQ測(cè)試完畢!");
// 6、關(guān)閉通道
channel.Close();
// 7、關(guān)閉連接
connection.Close();
}
}
}
- exchangeDeclare 方法解釋
- 參數(shù)1:exchange(交換機(jī)名稱),如果交換機(jī)不存在,則自動(dòng)創(chuàng)建
- 參數(shù)2:type(類型),此處選擇 fanout 模式
- 消費(fèi)者1
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
namespace RabbitMQTest.Consumer
{
/// <summary>
/// RabbitMQ測(cè)試_Consumer消費(fèi)者
/// </summary>
class Program
{
static void Main(string[] args)
{
// 1、創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory()
{
UserName = "admin", //用戶名
Password = "admin", //密碼
HostName = "192.168.1.101", //rabbitmq server ip
Port = 5672, //端口號(hào)
VirtualHost = "testhost" //在此連接期間要訪問的虛擬主機(jī)。
};
// 2、創(chuàng)建連接
IConnection connection = factory.CreateConnection();
// 3、創(chuàng)建通道
IModel channel = connection.CreateModel();
// 交換機(jī)名稱
string exchangeName = "exchangeTest";
// 4、聲明交換機(jī)
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
// 消息隊(duì)列名稱
string queueName = DateTime.Now.Year.ToString();
// 5、聲明隊(duì)列
channel.QueueDeclare(queueName, false, false, false, null);
channel.QueueBind(queueName, exchangeName, "", null);
// 6、定義消費(fèi)者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
Console.WriteLine($"隊(duì)列名稱:{queueName}");
// 7、接收到消息事件
consumer.Received += (ch, ea) =>
{
string message = Encoding.Default.GetString(ea.Body.ToArray());
Console.WriteLine($"接受到消息:{message}");
Thread.Sleep(5000);
// 8、確認(rèn)該消費(fèi)已被消費(fèi)
channel.BasicAck(ea.DeliveryTag, true);
};
// 9、啟動(dòng)消費(fèi)者 設(shè)置為自動(dòng)應(yīng)答消息
channel.BasicConsume(
queue: queueName, // 消息隊(duì)列名稱
autoAck: false, // 兩種消息確認(rèn)模式false 手動(dòng)模式 true自動(dòng)模式
consumer: consumer);
Console.WriteLine("消費(fèi)者1已啟動(dòng)");
Console.ReadKey();
channel.Close();
connection.Close();
}
}
}
- queueBind 方法解釋
- 參數(shù)1:queue(臨時(shí)隊(duì)列)
- 參數(shù)2:exchange(交換機(jī))
- 參數(shù)3:routingKey(路由key)
- 消費(fèi)者2
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace RabbitMQTest.Consumer2
{
class Program
{
static void Main(string[] args)
{
// 1、創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory()
{
UserName = "admin", //用戶名
Password = "admin", //密碼
HostName = "192.168.1.101", //rabbitmq server ip
Port = 5672, //端口號(hào)
VirtualHost = "testhost" //在此連接期間要訪問的虛擬主機(jī)。
};
// 2、創(chuàng)建連接
IConnection connection = factory.CreateConnection();
// 3、創(chuàng)建通道
IModel channel = connection.CreateModel();
// 交換機(jī)名稱
string exchangeName = "exchangeTest";
// 4、聲明交換機(jī)
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
// 消息隊(duì)列名稱
string queueName = DateTime.Now.Second.ToString();
// 5、聲明隊(duì)列
channel.QueueDeclare(queueName, false, false, false, null);
channel.QueueBind(queueName, exchangeName, "", null);
// 6、定義消費(fèi)者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
Console.WriteLine($"隊(duì)列名稱:{queueName}");
// 7、接收到消息事件
consumer.Received += (ch, ea) =>
{
string message = Encoding.Default.GetString(ea.Body.ToArray());
Console.WriteLine($"接受到消息:{message}");
// 8、確認(rèn)該消費(fèi)已被消費(fèi)
channel.BasicAck(ea.DeliveryTag, true);
};
// 9、啟動(dòng)消費(fèi)者 設(shè)置為自動(dòng)應(yīng)答消息
channel.BasicConsume(
queue: queueName, // 消息隊(duì)列名稱
autoAck: false, // 兩種消息確認(rèn)模式false 手動(dòng)模式 true自動(dòng)模式
consumer: consumer);
Console.WriteLine("消費(fèi)者2已啟動(dòng)");
Console.ReadKey();
channel.Close();
connection.Close();
}
}
}
當(dāng)消費(fèi)者綁定同樣的交換機(jī),可以看到兩個(gè)不同的消費(fèi)者都能接受到生產(chǎn)者發(fā)送的所有消息。


浙公網(wǎng)安備 33010602011771號(hào)