<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      netcore下RabbitMQ隊列、死信隊列、延時隊列及小應用

      關于安裝rabbitmq這里一筆掠過了。

      下面進入正題:

      1.新建aspnetcorewebapi空項目,NormalQueue,刪除controllers文件夾已經無關的文件,這里為了偷懶不用console控制臺:

      public class Program
          {
              public static void Main(string[] args)
              {
                  var builder = WebApplication.CreateBuilder(args);
      
                  // Add services to the container.
      
                  builder.Services.AddControllers();
                  // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
                  builder.Services.AddEndpointsApiExplorer();
                  builder.Services.AddSwaggerGen();
                  builder.Services.AddHostedService<ConsumerService>();
                  builder.Services.AddHostedService<DeadLetterExchangeConsuerService>();
                  builder.Services.AddHostedService<DelayExchangeConsumerService>();
                  var app = builder.Build();
      
                  // Configure the HTTP request pipeline.
                  if (app.Environment.IsDevelopment())
                  {
                      app.UseSwagger();
                      app.UseSwaggerUI();
                  }
                  app.MapGet("/normal/{message}", ([FromRoute] string message) =>
                  {
                      ConnectionFactory factory = new ConnectionFactory();
                      factory.HostName = "localhost";
                      factory.Port = 5672;
                      using (IConnection connection = factory.CreateConnection())
                      {
                          using (IModel channel = connection.CreateModel())
                          {
                              var queueName = "rbTest202301";
                              channel.QueueDeclare(queueName, true, false, false, null);
      
                              {
                                  string sendMessage = string.Format("Message_{0}", message);
                                  byte[] buffer = Encoding.UTF8.GetBytes(sendMessage);
                                  IBasicProperties basicProperties = channel.CreateBasicProperties();
                                  basicProperties.DeliveryMode = 2; //持久化  1=非持久化
                                  channel.BasicPublish("", queueName, basicProperties, buffer);
                                  Console.WriteLine("消息發送成功:" + sendMessage);
                              }
                          }
                      }
                  });
      
                  app.MapGet("/deadletterexchange/{message}",([FromRoute] string message) =>{
                      DeadLetterExchange.Send(message);
                  });
      
                  app.MapGet("/delayexchange/{message}", ([FromRoute] string message) => {
                      DelayExchange.SendMessage(message);
                  });
                  app.UseHttpsRedirection();
      
                  app.UseAuthorization();
      
      
                  app.MapControllers();
      
                  app.Run();
              }
          }

      大概的介紹一下program文件:

      這里有三個mini控制器,從這里發送對應的消息到rabbitmq

      "/normal/{message}"   普通隊列,
      "/deadletterexchange/{message}" 死信隊列
      "/deadletterexchange/{message}"   延時隊列

       

              builder.Services.AddHostedService<ConsumerService>();
                  builder.Services.AddHostedService<DeadLetterExchangeConsuerService>();
                  builder.Services.AddHostedService<DelayExchangeConsumerService>();

      這里就是消費的服務,注冊成HostedService。

      ConsumerService代碼如下:
       public class ConsumerService : BackgroundService
          {
              protected override async Task ExecuteAsync(CancellationToken stoppingToken)
              {
                  Console.WriteLine("normal Rabbitmq消費端開始工作!");
                  while (!stoppingToken.IsCancellationRequested)
                  {
                      ConnectionFactory factory = new ConnectionFactory();
                      factory.HostName = "localhost";
                      factory.Port = 5672;
                     
                      IConnection connection = factory.CreateConnection();
                      {
                          IModel channel = connection.CreateModel();
                          {
                              var queueName = "rbTest202301";
                              channel.QueueDeclare(queueName, true, false, false, null);
                              //輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息
                              channel.BasicQos(0, 1, false);
                              //在隊列上定義一個消費者
                              var consumer = new EventingBasicConsumer(channel);
                              channel.BasicConsume(queueName, false, consumer);
                              consumer.Received += (ch, ea) =>
                              {
                                  byte[] bytes = ea.Body.ToArray();
                                  string str = Encoding.UTF8.GetString(bytes);
                                  Console.WriteLine("隊列消息:" + str.ToString());
                                  //回復確認
                                  channel.BasicAck(ea.DeliveryTag, false);
                              };
                          }
                      }
                      await Task.Delay(5000);
                  }
              }
          }

       

      DeadLetterExchangeConsuerService代碼如下:

       public class DeadLetterExchangeConsuerService : BackgroundService
          {
              protected override async Task ExecuteAsync(CancellationToken stoppingToken)
              {
                  Console.WriteLine("RabbitMQ消費端死信隊列開始工作");
                  while (!stoppingToken.IsCancellationRequested)
                  {
                      DeadLetterExchange.Consumer();
                      await Task.Delay(5000);
                  }
              }
          }
        public class DeadLetterExchange
          {
              public static string dlxExchange = "dlx.exchange";
              public static string dlxQueueName = "dlx.queue";
              static string exchange = "direct-exchange";
              static string queueName = "queue_Testdlx";
              static string dlxExchangeKey = "x-dead-letter-exchange";
              static string dlxQueueKey = "x-dead-letter-rounting-key";
              public static void Send(string message)
              {
                  using (var connection = new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection())
                  {
                      using(var channel = connection.CreateModel())
                      {
                         
                          channel.ExchangeDeclare(exchange, ExchangeType.Direct, true, false); //創建交換機
                          channel.QueueDeclare(queueName, true, false, false,new Dictionary<string, object>
                          {
                              { dlxExchangeKey,dlxExchange },
                              {dlxQueueKey,dlxQueueName }
                          }); // 創建隊列
                          channel.QueueBind(queueName, exchange, queueName);
                          
      
                          var properties = channel.CreateBasicProperties();
                          properties.Persistent= true;//持久化
                          channel.BasicPublish(exchange,queueName,properties,Encoding.UTF8.GetBytes(message));
                          Console.WriteLine($"向隊列:{queueName}發送消息:{message}");
                      }
                  }
              }
              
              public static void Consumer()
              {
                  var connection = new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection();
                  var channel = connection.CreateModel();
                  channel.ExchangeDeclare(dlxExchange, ExchangeType.Direct, true, false); //創建sixin交換機
                  channel.QueueDeclare(dlxQueueName, true, false, false); // 創建sixin隊列
                  channel.QueueBind(dlxQueueName, dlxExchange, dlxQueueName); //綁定sixin隊列sixin交換機
      
                  channel.ExchangeDeclare(exchange, ExchangeType.Direct, true, false); //創建交換機
                  channel.QueueDeclare(queueName, true, false, false, new Dictionary<string, object>
                          {
                              { dlxExchangeKey,dlxExchange },
                              {dlxQueueKey,dlxQueueName }
                          }); // 創建隊列
                  channel.QueueBind(queueName, exchange, queueName);
      
                  var consumer = new EventingBasicConsumer(channel);
                  channel.BasicQos(0, 1, false);
                  consumer.Received += (model, ea) =>
                  {
                      var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                      Console.WriteLine($"隊列{queueName}消費消息:{message},不做ack確認");
                      channel.BasicNack(ea.DeliveryTag, false, requeue: false);
                  };
                  channel.BasicConsume(queueName, autoAck: false, consumer);
              }
          }

       

      DelayExchangeConsumerService代碼如下:

       public class DelayExchangeConsumerService : BackgroundService
          {
              protected override async Task ExecuteAsync(CancellationToken stoppingToken)
              {
                  Console.WriteLine("RabbitMQ消費端延遲隊列開始工作");
                  while (!stoppingToken.IsCancellationRequested)
                  {
                    
                      DelayExchange.Consumer();
                      await Task.Delay(5000);
                  }
              }
          }
       public class DelayExchange
          {
      
              public static void SendMessage(string message)
              {
                  //死信交換機
                  string dlxexChange = "dlx.exchange";
                  //死信隊列
                  string dlxQueueName = "dlx.queue";
      
                  //消息交換機
                  string exchange = "direct-exchange";
                  //消息隊列
                  string queueName = "delay_queue";
      
                  using (var connection = new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection())
                  {
                      using (var channel = connection.CreateModel())
                      {
                          ////創建死信交換機
                          //channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                          ////創建死信隊列
                          //channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                          ////死信隊列綁定死信交換機
                          //channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);
      
                          // 創建消息交換機
                          channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                          //創建消息隊列,并指定死信隊列,和設置這個隊列的消息過期時間為10s
                          channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
                                              new Dictionary<string, object> {
                                                   { "x-dead-letter-exchange",dlxexChange}, //設置當前隊列的DLX(死信交換機)
                                                   { "x-dead-letter-routing-key",dlxQueueName}, //設置DLX的路由key,DLX會根據該值去找到死信消息存放的隊列
                                                   { "x-message-ttl",10000} //設置隊列的消息過期時間
                                              });
                          //消息隊列綁定消息交換機
                          channel.QueueBind(queueName, exchange, routingKey: queueName);
      
                          var properties = channel.CreateBasicProperties();
                          properties.Persistent = true;
                          //properties.Expiration = "5000";發布消息,延時5s
                          //發布消息
                          channel.BasicPublish(exchange: exchange,
                                               routingKey: queueName,
                                               basicProperties: properties,
                                               body: Encoding.UTF8.GetBytes(message));
                          Console.WriteLine($"{DateTime.Now},向隊列:{queueName}發送消息:{message}");
                      }
                  }
              }
      
              public static void Consumer()
              {
                  //死信交換機
                  string dlxexChange = "dlx.exchange";
                  //死信隊列
                  string dlxQueueName = "dlx.queue";
                  var connection = new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection();
                  {
                      //創建信道
                      var channel = connection.CreateModel();
                      {
                          //創建死信交換機
                          channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                          //創建死信隊列
                          channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                          //死信隊列綁定死信交換機
                          channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);
      
                          var consumer = new EventingBasicConsumer(channel);
                          channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
                          consumer.Received += (model, ea) =>
                          {
                              //處理業務
                              var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                              Console.WriteLine($"{DateTime.Now},隊列{dlxQueueName}消費消息:{message}");
                              channel.BasicAck(ea.DeliveryTag, false);
                          };
                          channel.BasicConsume(dlxQueueName, autoAck: false, consumer);
                      }
                  }
              }
          }

       

       

      延時隊列實際應用場景可能比較復雜,比如每條消息的過期時間不一樣,收到的消息的順序有可能會亂掉。這些不做深究,自行百度。

      關于死信隊列常見應用場景之一下單,支付,支付超時的各種場景,下面通過一個簡單的例子模擬一下

      同樣的新建一個空的webapi項目DeadLetterQueue,

      program代碼如下:

      public class Program
          {
              public static void Main(string[] args)
              {
                  var builder = WebApplication.CreateBuilder(args);
      
                  // Add services to the container.
      
                  builder.Services.AddControllers();
                  // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
                  builder.Services.AddEndpointsApiExplorer();
                  builder.Services.AddSwaggerGen();
                  builder.Services.AddHostedService<ConsumerService>();
                  builder.Services.AddHostedService<DeadConsumerService>();
                  var app = builder.Build();
      
                  // Configure the HTTP request pipeline.
                  if (app.Environment.IsDevelopment())
                  {
                      app.UseSwagger();
                      app.UseSwaggerUI();
                  }
                  app.MapGet("/normal/{message}", ([FromRoute] string message) =>
                  {
                      ConnectionFactory factory = new ConnectionFactory();
                      factory.HostName = "localhost";
                      factory.Port = 5672;
                      using (IConnection connection = factory.CreateConnection())
                      {
                          using (IModel channel = connection.CreateModel())
                          {
                              var queueName = "rbTest2023010";
                            
                              //channel.ExchangeDeclare("exchange.dlx", ExchangeType.Direct, true);
                              //channel.QueueDeclare("queue.dlx", true, false, false, null);
                              channel.ExchangeDeclare("exchange.normal", ExchangeType.Fanout, true);
                              channel.QueueDeclare(queueName, true, false, false,
                                  new Dictionary<string, object>
                              {
                                  { "x-message-ttl" ,10000},
                                  {"x-dead-letter-exchange","exchange.dlx" },
                                  {"x-dead-letter-routing-key","routingkey" }
                              }
                                  );
                             
                              channel.QueueBind(queueName, "exchange.normal", "");
                              {
                                  string sendMessage = string.Format("Message_{0}", message);
                                  byte[] buffer = Encoding.UTF8.GetBytes(sendMessage);
                                  IBasicProperties basicProperties = channel.CreateBasicProperties();
                                  basicProperties.DeliveryMode = 2; //持久化  1=非持久化
                                  channel.BasicPublish("exchange.normal", queueName, basicProperties, buffer);
                                  Console.WriteLine($"{DateTime.Now}消息發送成功:{sendMessage}" );
                              }
                          }
                      }
                  });
                  app.UseHttpsRedirection();
      
                  app.UseAuthorization();
      
      
                  app.MapControllers();
      
                  app.Run();
              }
          }

       

      下單后消費代碼ConsumerService如下

      using Microsoft.AspNetCore.Connections;
      using Microsoft.Extensions.Hosting;
      using RabbitMQ.Client;
      using RabbitMQ.Client.Events;
      using System.Text;
      
      namespace App
      {
          public class ConsumerService : BackgroundService
          {
              private readonly IModel channel;
              private readonly IConnection connection;
              public ConsumerService()
              {
                  ConnectionFactory factory = new ConnectionFactory();
                  factory.HostName = "localhost";
                  factory.Port = 5672;
                  factory.UserName = "admin";
                  factory.Password = "admin";
                  connection = factory.CreateConnection();
                  channel = connection.CreateModel();
                  var queueName = "rbTest2023010";
                  channel.ExchangeDeclare("exchange.normal", ExchangeType.Fanout, true);
                  channel.QueueDeclare(queueName, true, false, false, new Dictionary<string, object>
                              {
                                  { "x-message-ttl" ,10000},
                                  {"x-dead-letter-exchange","exchange.dlx" },
                                  {"x-dead-letter-routing-key","routingkey" }
                              });
      
                  channel.QueueBind(queueName, "exchange.normal", "");
                  //輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息
                  channel.BasicQos(0, 1, false);
                  //在隊列上定義一個消費者
                  var consumer = new EventingBasicConsumer(channel);
                  channel.BasicConsume(queueName, false, consumer);
                  consumer.Received += (ch, ea) =>
                  {
                      byte[] bytes = ea.Body.ToArray();
                      string str = Encoding.UTF8.GetString(bytes);
                      Console.WriteLine($"{DateTime.Now}來自死信隊列獲取的消息: {str.ToString()}");
                      //回復確認
                      if (str.Contains("跳過")) //假設超時不處理,留給后面deadconsumerservice處理
                      {
                          Console.WriteLine($"{DateTime.Now}來自死信隊列獲取的消息: {str.ToString()},該消息被拒絕");
                          channel.BasicNack(ea.DeliveryTag, false, false);
                      }
                      else  //正常消息處理
                      {
                          Console.WriteLine($"{DateTime.Now}來自死信隊列獲取的消息: {str.ToString()},該消息被接受");
                          channel.BasicAck(ea.DeliveryTag, false);
                      }
                  };
      
              }
              protected override async Task ExecuteAsync(CancellationToken stoppingToken)
              {
                  Console.WriteLine("normal Rabbitmq消費端開始工作!");
                  while (!stoppingToken.IsCancellationRequested)
                  {
                      await Task.Delay(5000);
                  }
              }
              public override void Dispose()
              {
                  // 在服務結束時關閉連接和通道
                  channel.Close();
                  connection.Close();
                  base.Dispose();
              }
          }
      }

       

      通過模擬發送的消息加入跳過兩個字會拒收這條消息,這樣就會跳到設置的exchange.dlx交換機隊列去,如果沒有跳過那么這條消息就正常處理掉,消費確認。

      超時不處理后我們通過新的消費服務DelayConsumerService來處理這異常的消費,比如回復庫存,訂單狀態改為取消等等

       

      using RabbitMQ.Client.Events;
      using RabbitMQ.Client;
      using System.Text;
      
      namespace App
      {
          public class DelayConsumerService:BackgroundService
          {
              private readonly IModel channel;
              private readonly IConnection connection;
              public DelayConsumerService()
              {
                  ConnectionFactory factory = new ConnectionFactory();
                  factory.HostName = "localhost";
                  factory.Port = 5672;
                  factory.UserName = "admin";
                  factory.Password = "admin";
                  connection = factory.CreateConnection();
                  channel = connection.CreateModel();
                  var queueName = "queue.dlx";
                  channel.ExchangeDeclare("exchange.dlx", ExchangeType.Direct, true);
                  //channel.QueueDeclare("queue.dlx", true, false, false, null);
      
                  channel.QueueDeclare(queueName, true, false, false, null);
      
                  channel.QueueBind(queueName, "exchange.dlx", "routingkey");  //可能是新版問題吧,不綁定routingkey消費不了。
                  //輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息
                  channel.BasicQos(0, 1, false);
                  //在隊列上定義一個消費者
                  var consumer = new EventingBasicConsumer(channel);
                  channel.BasicConsume("queue.dlx", false, consumer);
                  consumer.Received += (ch, ea) =>
                  {
                      byte[] bytes = ea.Body.ToArray();
                      string str = Encoding.UTF8.GetString(bytes);
                      Console.WriteLine($"{DateTime.Now}超時未處理的消息: {str.ToString()}");
                      //回復確認
                      {
                          channel.BasicAck(ea.DeliveryTag, false);
                      }
                  };
              }
              protected override async Task ExecuteAsync(CancellationToken stoppingToken)
              {
                  Console.WriteLine("delay Rabbitmq消費端開始工作!");
                  while (!stoppingToken.IsCancellationRequested)
                  {
                      await Task.Delay(5000);
                  }
              }
      
              public override void Dispose()
              {
                  // 在服務結束時關閉連接和通道
                  channel.Close();
                  connection.Close();
                  base.Dispose();
              }
          }
      }

       

      上面第一個例子的循環代碼也需要放到構造函數去,否則rabbitmq會不停的新增消費者。

      運行結果:

      關于rabbitmq的死信隊列和延時隊列的介紹什么的這里不去貼baidu了,應用demo就這么多了,代碼這里exercisebook/RabbitMQ.Test at main · liuzhixin405/exercisebook (github.com) 。小面分享一個完整一點的例子。

      exercisebook/cat.seckill/cat.seckill at main · liuzhixin405/exercisebook (github.com)

      感覺自己還是不合適寫這些玩意兒,沒有那么細心和耐心,有這時間真不如寫寫demo。

      posted @ 2023-01-02 21:22  星仔007  閱讀(948)  評論(8)    收藏  舉報
      主站蜘蛛池模板: 2020无码专区人妻系列日韩| 国产精品揄拍一区二区久久| 亚洲 中文 欧美 日韩 在线| 奇米777四色成人影视| 红桃视频成人传媒| 东京热大乱系列无码| 亚洲精品国产男人的天堂| 国产精品久久久久久无毒不卡| 四虎库影成人在线播放| 中字幕人妻一区二区三区| 高清日韩一区二区三区视频| 99久久久无码国产精品免费 | 少妇人妻偷人免费观看| 久久久亚洲欧洲日产国码二区| 国产精品免费第一区二区| 秋霞鲁丝片成人无码| 黎城县| 亚洲中文字幕精品久久| 日韩中文字幕亚洲精品| 欧洲人妻丰满av无码久久不卡| 乱老年女人伦免费视频| 成人3D动漫一区二区三区| 溆浦县| 越南毛茸茸的少妇| 国产99久久无码精品| 国产成人一区二区三区视频免费| 自拍第一区视频在线观看| 国产精品深夜福利免费观看 | 久热色视频精品在线观看| 又爽又大又黄a级毛片在线视频| 老太脱裤子让老头玩xxxxx| 久久亚洲国产精品五月天| 国产一卡2卡三卡4卡免费网站| 合水县| 亚洲精品美女久久久久9999| 久久久久成人片免费观看蜜芽| 安溪县| 成av人电影在线观看| 99九九视频高清在线| 亚洲蜜臀av乱码久久| 黑巨人与欧美精品一区|