RabbitMQ - 方法的簡(jiǎn)單封裝
說(shuō)明
為了方便使用,做了簡(jiǎn)單的封裝,生產(chǎn)消息可以使用泛型等,可以每次創(chuàng)建,可以使用單例模式,或者IOC使配合單例模式使用。這里就不一一介紹,請(qǐng)大家根據(jù)自己的業(yè)務(wù)場(chǎng)景設(shè)計(jì)。
測(cè)試代碼:只有簡(jiǎn)單隊(duì)列的代碼作為參考
- RabbitMQHelper
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace RabbitMQTest.Common
{
public class RabbitMQHelper
{
/// <summary>
/// 連接工廠
/// </summary>
ConnectionFactory connectionFactory;
/// <summary>
/// 連接
/// </summary>
IConnection connection;
/// <summary>
/// 通道
/// </summary>
IModel channel;
/// <summary>
/// 交換機(jī)名稱(chēng)
/// </summary>
string exchangeName;
/// <summary>
/// 構(gòu)造函數(shù)
/// </summary>
private RabbitMQHelper()
{
//創(chuàng)建連接工廠
connectionFactory = new ConnectionFactory()
{
HostName = "192.168.1.101",
Port = 5672,
UserName = "admin",
Password = "admin",
VirtualHost = "testhost"
};
//創(chuàng)建連接
connection = connectionFactory.CreateConnection();
//創(chuàng)建通道
channel = connection.CreateModel();
}
private static readonly Lazy<RabbitMQHelper> _singletonLock = new Lazy<RabbitMQHelper>(() => new RabbitMQHelper());
public static RabbitMQHelper Instance
{
get
{
return _singletonLock.Value;
}
}
/// <summary>
/// 生產(chǎn)消息
/// <para>簡(jiǎn)單隊(duì)列(一對(duì)一模式)、Worker隊(duì)列(一對(duì)多模式)</para>
/// </summary>
/// <param name="queueName">隊(duì)列名稱(chēng)</param>
/// <param name="msg">消息內(nèi)容</param>
public void SendMsg(string queueName, string msg)
{
//聲明一個(gè)隊(duì)列
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
byte[] body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
}
/// <summary>
/// 消費(fèi)消息
/// <para>簡(jiǎn)單隊(duì)列(一對(duì)一模式)、Worker隊(duì)列(一對(duì)多模式)</para>
/// </summary>
/// <param name="queueName">隊(duì)列名稱(chēng)</param>
/// <param name="received">消費(fèi)消息</param>
public void Receive(string queueName, Action<string> received)
{
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
string message = Encoding.Default.GetString(ea.Body.ToArray());
received(message);
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queueName, false, consumer);
}
}
}
- 使用方式
using RabbitMQTest.Common;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace RabbitMQTest.Con
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("請(qǐng)輸入消息隊(duì)列測(cè)試數(shù)據(jù):");
string input = string.Empty;
Task.Run(() =>
{
do
{
input = Console.ReadLine();
RabbitMQHelper.Instance.SendMsg("test_queue_1", input);
} while (input.Trim().ToLower() != "exit");
});
Task.Run(() =>
{
RabbitMQHelper.Instance.Receive("test_queue_1", item =>
{
Console.WriteLine($"消費(fèi)消息:{item}");
});
});
//主線程不死
while (true)
{
Thread.Sleep(10000);
if (input == "exit") return;
}
}
}
}

浙公網(wǎng)安備 33010602011771號(hào)