(1)消費消息失敗后,重新加入隊列并優先級處理;
(2)根據消息的內容篩選出優先級高的進行設置,并發送
1. 生產者
using RabbitMQMsgProducer.MessageProducer; using Microsoft.Extensions.Configuration; using System; using System.IO; namespace RabbitMQMsgProducer { class Program { static void Main(string[] args) { try { { // 優先級 // 1. 消費消息失敗后,重新加入隊列并優先級處理 PriorityMsg.Send01(); } { // 優先級 // 2. 根據消息的內容篩選出優先級高的進行設置,并發送 //PriorityMsg.Send02(); } Console.ReadLine(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } } }
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; namespace RabbitMQMsgProducer.MessageProducer { public class PriorityMsg { /// <summary> /// 消費消息失敗后,重新加入隊列并優先級處理 /// </summary> public static void Send01() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.HostName = "localhost"; connectionFactory.UserName = "guest"; connectionFactory.Password = "guest"; string queueName = "PriorityMsgQueue"; string exchangeName = "PriorityMsgExchange"; string routingKeyName = "PriorityKey"; using (IConnection connection = connectionFactory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { // 聲明exchange channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); // 聲明隊列 // x-max-priority 指定隊列的優先級設置,必須的 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>() { {"x-max-priority",10 } }); // 綁定 channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKeyName); IBasicProperties props = channel.CreateBasicProperties(); props.DeliveryMode = 2; int i = 1; while (true) { props.Priority = 1; // 設置優先級 string msg = $"the message is {i}"; channel.BasicPublish(exchange: exchangeName, routingKey: routingKeyName, basicProperties: props, body: Encoding.UTF8.GetBytes(msg)); Console.WriteLine($"{msg} is send."); i++; if (i > 29) { break; } } Console.WriteLine("press [enter] exit."); Console.Read(); } } } /// <summary> /// 根據消息的內容篩選出優先級高的進行設置,并發送 /// </summary> public static void Send02() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.HostName = "localhost"; connectionFactory.UserName = "guest"; connectionFactory.Password = "guest"; string queueName = "PriorityMsgQueue"; string exchangeName = "PriorityMsgExchange"; string routingKeyName = "PriorityKey"; using (IConnection connection = connectionFactory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { // 聲明exchange channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); // 聲明隊列 // x-max-priority 指定隊列的優先級設置,必須的 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>() { {"x-max-priority",10 } }); // 綁定 channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKeyName); string[] msgList = { "頂頭上司 給你一個任務1", "1部門領導 約你溝通問題", "2部門領導 約你溝通問題", "3部門領導 約你溝通問題", "人力 談談漲薪", "頂頭上司 給你一個任務2", }; IBasicProperties props = channel.CreateBasicProperties(); foreach (string msg in msgList) { ////沒有優先級這樣寫 //channel.BasicPublish(exchange: exchangeName, routingKey: routingKeyName, basicProperties: null, body: Encoding.UTF8.GetBytes(msg)); if (msg.StartsWith("頂頭上司")) { props.Priority = 10; } else if (msg.Contains("漲薪")) { props.Priority = 9; } else { props.Priority = 1; } channel.BasicPublish(exchange: exchangeName, routingKey: routingKeyName, basicProperties: props, body: Encoding.UTF8.GetBytes(msg)); Console.WriteLine($"{msg} is send."); } Console.WriteLine("press [enter] exit."); Console.Read(); } } } } }
2. 消費者
using RabbitMQMsgProducer.MessageProducer; using Microsoft.Extensions.Configuration; using System; using System.IO; namespace RabbitMQMsgProducer { class Program { static void Main(string[] args) { try { { // 優先級 // 1. 消費消息失敗后,重新加入隊列并優先級處理 PriorityMsg.Send01(); } { // 優先級 // 2. 根據消息的內容篩選出優先級高的進行設置,并發送 //PriorityMsg.Send02(); } Console.ReadLine(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } } }
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; namespace RabbitMQMsgConsumer001.MessageConsumer { public class PriorityMsg { /// <summary> /// 消費消息失敗后,重新加入隊列并優先級處理 /// </summary> public static void Receive01() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.HostName = "localhost"; connectionFactory.UserName = "guest"; connectionFactory.Password = "guest"; string queueName = "PriorityMsgQueue"; string exchangeName = "PriorityMsgExchange"; string routingKeyName = "PriorityKey"; using (IConnection connection = connectionFactory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { string msg = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"the consumer receive {msg}"); if(msg.Equals("the message is 1")) { // 這里重新刪除消息隊列中消息,并重新設置消息優先級,寫入隊列 // 消息默認隊列優先級是1,重新寫入設置為10。 //否定:告訴Broker,這個消息我沒有正常消費; requeue: true:重新寫入到隊列里去; false:你還是刪除掉; channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); ///設置消息優先級最高 重新寫入到隊列中去 IBasicProperties props = channel.CreateBasicProperties(); props.Priority = 10; channel.BasicPublish(exchange: exchangeName, routingKey: routingKeyName, basicProperties: props, body: Encoding.UTF8.GetBytes(msg + "double..")); } else { //手動確認 消息正常消費 告訴Broker:你可以把當前這條消息刪除掉了 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } }; Console.WriteLine("the consumer is ready."); //處理消息 //autoAck: false 顯示確認; channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); Console.ReadKey(); } } } /// <summary> /// 根據消息的內容篩選出優先級高的進行設置,并發送 /// </summary> public static void Receive02() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.HostName = "localhost"; connectionFactory.UserName = "guest"; connectionFactory.Password = "guest"; string queueName = "PriorityMsgQueue"; string exchangeName = "PriorityMsgExchange"; string routingKeyName = "PriorityKey"; using (IConnection connection = connectionFactory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { string msg = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"the consumer receive {msg}"); //手動確認 消息正常消費 告訴Broker:你可以把當前這條消息刪除掉了 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; Console.WriteLine("the consumer is ready."); //處理消息 //autoAck: false 顯示確認; channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); Console.ReadKey(); } } } } }
3. 結果
消費消息失敗后,重新加入隊列并優先級處理;

4. 結果
根據消息的內容篩選出優先級高的進行設置,并發送

浙公網安備 33010602011771號