netcore下RabbitMQ隊列、死信隊列、延時隊列及小應用
關于安裝rabbitmq這里一筆掠過了。
下面進入正題:
1.新建aspnetcorewebapi空項目,NormalQueue,刪除controllers文件夾已經無關的文件,這里為了偷懶不用console控制臺:
public class Program { public static void Main(string[] args) { var builder = WebApplication.CreateBuilder(args); // Add services to the container. builder.Services.AddControllers(); // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen(); builder.Services.AddHostedService<ConsumerService>(); builder.Services.AddHostedService<DeadLetterExchangeConsuerService>(); builder.Services.AddHostedService<DelayExchangeConsumerService>(); var app = builder.Build(); // Configure the HTTP request pipeline. if (app.Environment.IsDevelopment()) { app.UseSwagger(); app.UseSwaggerUI(); } app.MapGet("/normal/{message}", ([FromRoute] string message) => { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.Port = 5672; using (IConnection connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { var queueName = "rbTest202301"; channel.QueueDeclare(queueName, true, false, false, null); { string sendMessage = string.Format("Message_{0}", message); byte[] buffer = Encoding.UTF8.GetBytes(sendMessage); IBasicProperties basicProperties = channel.CreateBasicProperties(); basicProperties.DeliveryMode = 2; //持久化 1=非持久化 channel.BasicPublish("", queueName, basicProperties, buffer); Console.WriteLine("消息發送成功:" + sendMessage); } } } }); app.MapGet("/deadletterexchange/{message}",([FromRoute] string message) =>{ DeadLetterExchange.Send(message); }); app.MapGet("/delayexchange/{message}", ([FromRoute] string message) => { DelayExchange.SendMessage(message); }); app.UseHttpsRedirection(); app.UseAuthorization(); app.MapControllers(); app.Run(); } }
大概的介紹一下program文件:
這里有三個mini控制器,從這里發送對應的消息到rabbitmq
"/normal/{message}" 普通隊列,
"/deadletterexchange/{message}" 死信隊列
"/deadletterexchange/{message}" 延時隊列
builder.Services.AddHostedService<ConsumerService>(); builder.Services.AddHostedService<DeadLetterExchangeConsuerService>(); builder.Services.AddHostedService<DelayExchangeConsumerService>();
這里就是消費的服務,注冊成HostedService。
ConsumerService代碼如下:
public class ConsumerService : BackgroundService { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { Console.WriteLine("normal Rabbitmq消費端開始工作!"); while (!stoppingToken.IsCancellationRequested) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.Port = 5672; IConnection connection = factory.CreateConnection(); { IModel channel = connection.CreateModel(); { var queueName = "rbTest202301"; channel.QueueDeclare(queueName, true, false, false, null); //輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息 channel.BasicQos(0, 1, false); //在隊列上定義一個消費者 var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queueName, false, consumer); consumer.Received += (ch, ea) => { byte[] bytes = ea.Body.ToArray(); string str = Encoding.UTF8.GetString(bytes); Console.WriteLine("隊列消息:" + str.ToString()); //回復確認 channel.BasicAck(ea.DeliveryTag, false); }; } } await Task.Delay(5000); } } }
DeadLetterExchangeConsuerService代碼如下:
public class DeadLetterExchangeConsuerService : BackgroundService { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { Console.WriteLine("RabbitMQ消費端死信隊列開始工作"); while (!stoppingToken.IsCancellationRequested) { DeadLetterExchange.Consumer(); await Task.Delay(5000); } } }
public class DeadLetterExchange { public static string dlxExchange = "dlx.exchange"; public static string dlxQueueName = "dlx.queue"; static string exchange = "direct-exchange"; static string queueName = "queue_Testdlx"; static string dlxExchangeKey = "x-dead-letter-exchange"; static string dlxQueueKey = "x-dead-letter-rounting-key"; public static void Send(string message) { using (var connection = new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection()) { using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, ExchangeType.Direct, true, false); //創建交換機 channel.QueueDeclare(queueName, true, false, false,new Dictionary<string, object> { { dlxExchangeKey,dlxExchange }, {dlxQueueKey,dlxQueueName } }); // 創建隊列 channel.QueueBind(queueName, exchange, queueName); var properties = channel.CreateBasicProperties(); properties.Persistent= true;//持久化 channel.BasicPublish(exchange,queueName,properties,Encoding.UTF8.GetBytes(message)); Console.WriteLine($"向隊列:{queueName}發送消息:{message}"); } } } public static void Consumer() { var connection = new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection(); var channel = connection.CreateModel(); channel.ExchangeDeclare(dlxExchange, ExchangeType.Direct, true, false); //創建sixin交換機 channel.QueueDeclare(dlxQueueName, true, false, false); // 創建sixin隊列 channel.QueueBind(dlxQueueName, dlxExchange, dlxQueueName); //綁定sixin隊列sixin交換機 channel.ExchangeDeclare(exchange, ExchangeType.Direct, true, false); //創建交換機 channel.QueueDeclare(queueName, true, false, false, new Dictionary<string, object> { { dlxExchangeKey,dlxExchange }, {dlxQueueKey,dlxQueueName } }); // 創建隊列 channel.QueueBind(queueName, exchange, queueName); var consumer = new EventingBasicConsumer(channel); channel.BasicQos(0, 1, false); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"隊列{queueName}消費消息:{message},不做ack確認"); channel.BasicNack(ea.DeliveryTag, false, requeue: false); }; channel.BasicConsume(queueName, autoAck: false, consumer); } }
DelayExchangeConsumerService代碼如下:
public class DelayExchangeConsumerService : BackgroundService { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { Console.WriteLine("RabbitMQ消費端延遲隊列開始工作"); while (!stoppingToken.IsCancellationRequested) { DelayExchange.Consumer(); await Task.Delay(5000); } } }
public class DelayExchange { public static void SendMessage(string message) { //死信交換機 string dlxexChange = "dlx.exchange"; //死信隊列 string dlxQueueName = "dlx.queue"; //消息交換機 string exchange = "direct-exchange"; //消息隊列 string queueName = "delay_queue"; using (var connection = new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection()) { using (var channel = connection.CreateModel()) { ////創建死信交換機 //channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); ////創建死信隊列 //channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); ////死信隊列綁定死信交換機 //channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); // 創建消息交換機 channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false); //創建消息隊列,并指定死信隊列,和設置這個隊列的消息過期時間為10s channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-dead-letter-exchange",dlxexChange}, //設置當前隊列的DLX(死信交換機) { "x-dead-letter-routing-key",dlxQueueName}, //設置DLX的路由key,DLX會根據該值去找到死信消息存放的隊列 { "x-message-ttl",10000} //設置隊列的消息過期時間 }); //消息隊列綁定消息交換機 channel.QueueBind(queueName, exchange, routingKey: queueName); var properties = channel.CreateBasicProperties(); properties.Persistent = true; //properties.Expiration = "5000";發布消息,延時5s //發布消息 channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message)); Console.WriteLine($"{DateTime.Now},向隊列:{queueName}發送消息:{message}"); } } } public static void Consumer() { //死信交換機 string dlxexChange = "dlx.exchange"; //死信隊列 string dlxQueueName = "dlx.queue"; var connection = new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection(); { //創建信道 var channel = connection.CreateModel(); { //創建死信交換機 channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); //創建死信隊列 channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); //死信隊列綁定死信交換機 channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); var consumer = new EventingBasicConsumer(channel); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true); consumer.Received += (model, ea) => { //處理業務 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"{DateTime.Now},隊列{dlxQueueName}消費消息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume(dlxQueueName, autoAck: false, consumer); } } } }




延時隊列實際應用場景可能比較復雜,比如每條消息的過期時間不一樣,收到的消息的順序有可能會亂掉。這些不做深究,自行百度。
關于死信隊列常見應用場景之一下單,支付,支付超時的各種場景,下面通過一個簡單的例子模擬一下
同樣的新建一個空的webapi項目DeadLetterQueue,
program代碼如下:
public class Program { public static void Main(string[] args) { var builder = WebApplication.CreateBuilder(args); // Add services to the container. builder.Services.AddControllers(); // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen(); builder.Services.AddHostedService<ConsumerService>(); builder.Services.AddHostedService<DeadConsumerService>(); var app = builder.Build(); // Configure the HTTP request pipeline. if (app.Environment.IsDevelopment()) { app.UseSwagger(); app.UseSwaggerUI(); } app.MapGet("/normal/{message}", ([FromRoute] string message) => { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.Port = 5672; using (IConnection connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { var queueName = "rbTest2023010"; //channel.ExchangeDeclare("exchange.dlx", ExchangeType.Direct, true); //channel.QueueDeclare("queue.dlx", true, false, false, null); channel.ExchangeDeclare("exchange.normal", ExchangeType.Fanout, true); channel.QueueDeclare(queueName, true, false, false, new Dictionary<string, object> { { "x-message-ttl" ,10000}, {"x-dead-letter-exchange","exchange.dlx" }, {"x-dead-letter-routing-key","routingkey" } } ); channel.QueueBind(queueName, "exchange.normal", ""); { string sendMessage = string.Format("Message_{0}", message); byte[] buffer = Encoding.UTF8.GetBytes(sendMessage); IBasicProperties basicProperties = channel.CreateBasicProperties(); basicProperties.DeliveryMode = 2; //持久化 1=非持久化 channel.BasicPublish("exchange.normal", queueName, basicProperties, buffer); Console.WriteLine($"{DateTime.Now}消息發送成功:{sendMessage}" ); } } } }); app.UseHttpsRedirection(); app.UseAuthorization(); app.MapControllers(); app.Run(); } }
下單后消費代碼ConsumerService如下
using Microsoft.AspNetCore.Connections; using Microsoft.Extensions.Hosting; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; namespace App { public class ConsumerService : BackgroundService { private readonly IModel channel; private readonly IConnection connection; public ConsumerService() { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.Port = 5672; factory.UserName = "admin"; factory.Password = "admin"; connection = factory.CreateConnection(); channel = connection.CreateModel(); var queueName = "rbTest2023010"; channel.ExchangeDeclare("exchange.normal", ExchangeType.Fanout, true); channel.QueueDeclare(queueName, true, false, false, new Dictionary<string, object> { { "x-message-ttl" ,10000}, {"x-dead-letter-exchange","exchange.dlx" }, {"x-dead-letter-routing-key","routingkey" } }); channel.QueueBind(queueName, "exchange.normal", ""); //輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息 channel.BasicQos(0, 1, false); //在隊列上定義一個消費者 var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queueName, false, consumer); consumer.Received += (ch, ea) => { byte[] bytes = ea.Body.ToArray(); string str = Encoding.UTF8.GetString(bytes); Console.WriteLine($"{DateTime.Now}來自死信隊列獲取的消息: {str.ToString()}"); //回復確認 if (str.Contains("跳過")) //假設超時不處理,留給后面deadconsumerservice處理 { Console.WriteLine($"{DateTime.Now}來自死信隊列獲取的消息: {str.ToString()},該消息被拒絕"); channel.BasicNack(ea.DeliveryTag, false, false); } else //正常消息處理 { Console.WriteLine($"{DateTime.Now}來自死信隊列獲取的消息: {str.ToString()},該消息被接受"); channel.BasicAck(ea.DeliveryTag, false); } }; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { Console.WriteLine("normal Rabbitmq消費端開始工作!"); while (!stoppingToken.IsCancellationRequested) { await Task.Delay(5000); } } public override void Dispose() { // 在服務結束時關閉連接和通道 channel.Close(); connection.Close(); base.Dispose(); } } }
通過模擬發送的消息加入跳過兩個字會拒收這條消息,這樣就會跳到設置的exchange.dlx交換機隊列去,如果沒有跳過那么這條消息就正常處理掉,消費確認。
超時不處理后我們通過新的消費服務DelayConsumerService來處理這異常的消費,比如回復庫存,訂單狀態改為取消等等
using RabbitMQ.Client.Events; using RabbitMQ.Client; using System.Text; namespace App { public class DelayConsumerService:BackgroundService { private readonly IModel channel; private readonly IConnection connection; public DelayConsumerService() { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.Port = 5672; factory.UserName = "admin"; factory.Password = "admin"; connection = factory.CreateConnection(); channel = connection.CreateModel(); var queueName = "queue.dlx"; channel.ExchangeDeclare("exchange.dlx", ExchangeType.Direct, true); //channel.QueueDeclare("queue.dlx", true, false, false, null); channel.QueueDeclare(queueName, true, false, false, null); channel.QueueBind(queueName, "exchange.dlx", "routingkey"); //可能是新版問題吧,不綁定routingkey消費不了。 //輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息 channel.BasicQos(0, 1, false); //在隊列上定義一個消費者 var consumer = new EventingBasicConsumer(channel); channel.BasicConsume("queue.dlx", false, consumer); consumer.Received += (ch, ea) => { byte[] bytes = ea.Body.ToArray(); string str = Encoding.UTF8.GetString(bytes); Console.WriteLine($"{DateTime.Now}超時未處理的消息: {str.ToString()}"); //回復確認 { channel.BasicAck(ea.DeliveryTag, false); } }; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { Console.WriteLine("delay Rabbitmq消費端開始工作!"); while (!stoppingToken.IsCancellationRequested) { await Task.Delay(5000); } } public override void Dispose() { // 在服務結束時關閉連接和通道 channel.Close(); connection.Close(); base.Dispose(); } } }
上面第一個例子的循環代碼也需要放到構造函數去,否則rabbitmq會不停的新增消費者。
運行結果:



關于rabbitmq的死信隊列和延時隊列的介紹什么的這里不去貼baidu了,應用demo就這么多了,代碼這里exercisebook/RabbitMQ.Test at main · liuzhixin405/exercisebook (github.com) 。小面分享一個完整一點的例子。
exercisebook/cat.seckill/cat.seckill at main · liuzhixin405/exercisebook (github.com)
感覺自己還是不合適寫這些玩意兒,沒有那么細心和耐心,有這時間真不如寫寫demo。


浙公網安備 33010602011771號