<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Maomi.MQ 2.0 | 功能強大的 .NET 消息隊列通訊模型框架

      說明

      作者:癡者工良

      文檔地址:https://mmq.whuanle.cn

      倉庫地址:https://github.com/whuanle/Maomi.MQ

      作者博客:

      導(dǎo)讀

      Maomi.MQ 是一個簡化了消息隊列使用方式的通訊框架,目前支持了 RabbitMQ。

      Maomi.MQ.RabbitMQ 是一個用于專為 RabbitMQ 設(shè)計的發(fā)布者和消費者通訊模型,大大簡化了發(fā)布和消息的代碼,并提供一系列簡便和實用的功能,開發(fā)者可以通過框架提供的消費模型實現(xiàn)高性能消費、事件編排,框架還支持發(fā)布者確認(rèn)機制、自定義重試機制、補償機制、死信隊列、延遲隊列、連接通道復(fù)用等一系列的便利功能。開發(fā)者可以把更多的精力放到業(yè)務(wù)邏輯中,通過 Maomi.MQ.RabbitMQ 框架簡化跨進(jìn)程消息通訊模式,使得跨進(jìn)程消息傳遞更加簡單和可靠。

      此外,框架通過 runtime 內(nèi)置的 api 支持了分布式可觀測性,可以通過進(jìn)一步使用 OpenTelemetry 等框架進(jìn)一步收集可觀測性信息,推送到基礎(chǔ)設(shè)施平臺中。

      快速開始

      在本篇教程中,將介紹 Maomi.MQ.RabbitMQ 的使用方法,以便讀者能夠快速了解該框架的使用方式和特點。


      創(chuàng)建一個 Web 項目(可參考 WebDemo 項目),引入 Maomi.MQ.RabbitMQ 包,在 Web 配置中注入服務(wù):

      // using Maomi.MQ;
      // using RabbitMQ.Client;
      
      builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
      {
          options.WorkId = 1;
          options.AppName = "myapp";
          options.Rabbit = (ConnectionFactory options) =>
          {
              options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
              options.Port = 5672;
              options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
          };
      }, [typeof(Program).Assembly]);
      
      var app = builder.Build();
      

      • WorkId: 指定用于生成分布式雪花 id 的節(jié)點 id,默認(rèn)為 0。

        每條消息生成一個唯一的 id,便于追蹤。如果不設(shè)置雪花id,在分布式服務(wù)中,多實例并行工作時,可能會產(chǎn)生相同的 id。

      • AppName:用于標(biāo)識消息的生產(chǎn)者,以及在日志和鏈路追蹤中標(biāo)識消息的生產(chǎn)者或消費者。

      • Rabbit:RabbitMQ 客戶端配置,請參考 ConnectionFactory。

      定義消息模型類,模型類是 MQ 通訊的消息基礎(chǔ),該模型類將會被序列化為二進(jìn)制內(nèi)容傳遞到 RabbitMQ 服務(wù)器中。

      public class TestEvent
      {
          public int Id { get; set; }
      
          public override string ToString()
          {
              return Id.ToString();
          }
      }
      

      定義消費者,消費者需要實現(xiàn) IConsumer<TEvent> 接口,以及使用 [Consumer] 特性注解配置消費者屬性,如下所示,[Consumer("test")] 表示該消費者訂閱的隊列名稱是 test。

      IConsumer<TEvent> 接口有三個方法,ExecuteAsync 方法用于處理消息,FaildAsync 會在 ExecuteAsync 異常時立即執(zhí)行,如果代碼一直異常,最終會調(diào)用 FallbackAsync 方法,Maomi.MQ 框架會根據(jù) ConsumerState 值確定是否將消息放回隊列重新消費,或者做其它處理動作。

      [Consumer("test")]
      public class MyConsumer : IConsumer<TestEvent>
      {
          // 消費
          public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
          {
              Console.WriteLine($"事件 id: {message.Id} {DateTime.Now}");
              await Task.CompletedTask;
          }
      
          // 每次消費失敗時執(zhí)行
          public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message) 
              => Task.CompletedTask;
      
          // 補償
          public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex) 
              => Task.FromResult( ConsumerState.Ack);
      }
      

      Maomi.MQ 還具有多種消費者模式,代碼寫法不一樣,后續(xù)會詳細(xì)講解不同的消費者模式。


      如果要發(fā)布消息,只需要注入 IMessagePublisher 服務(wù)即可。

      [ApiController]
      [Route("[controller]")]
      public class IndexController : ControllerBase
      {
          private readonly IMessagePublisher _messagePublisher;
      
          public IndexController(IMessagePublisher messagePublisher)
          {
              _messagePublisher = messagePublisher;
          }
      
          [HttpGet("publish")]
          public async Task<string> Publisher()
          {
              // 發(fā)布消息
              await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "test", message: new TestEvent
              {
                  Id = 123
              });
              return "ok";
          }
      }
      

      啟動 Web 服務(wù),在 swagger 頁面上請求 API 接口,MyConsumer 服務(wù)會立即接收到發(fā)布的消息。

      image-20250206160702304


      如果是控制臺項目,則需要引入 Microsoft.Extensions.Hosting 包,以便讓消費者在后臺訂閱隊列消費消息。

      參考 ConsoleDemo 項目。

      using Maomi.MQ;
      using Microsoft.Extensions.Hosting;
      using Microsoft.Extensions.Logging;
      using RabbitMQ.Client;
      using System.Reflection;
      
      var host = new HostBuilder()
          .ConfigureLogging(options =>
          {
              options.AddConsole();
              options.AddDebug();
          })
          .ConfigureServices(services =>
          {
              services.AddMaomiMQ(options =>
              {
                  options.WorkId = 1;
                  options.AppName = "myapp";
                  options.Rabbit = (ConnectionFactory options) =>
                  {
                      options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
                      options.Port = 5672;
                      options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
                  };
              }, new System.Reflection.Assembly[] { typeof(Program).Assembly });
      
          }).Build();
      
      // 后臺運行
      var task =  host.RunAsync();
      
      Console.ReadLine();
      

      消息發(fā)布者

      消息發(fā)布者用于推送消息到 RabbitMQ 服務(wù)器中,Maomi.MQ 支持多種消息發(fā)布者模式,支持 RabbitMQ 事務(wù)模式等,示例項目請參考 PublisherWeb

      Maomi.MQ 通過 IMessagePublisher 向開發(fā)者提供消息推送服務(wù)。


      在發(fā)布消息之前,需要定義一個事件模型類,用于傳遞消息。

      public class TestEvent
      {
      	public int Id { get; set; }
      
      	public override string ToString()
      	{
      		return Id.ToString();
      	}
      }
      

      然后注入 IMessagePublisher 服務(wù),發(fā)布消息:

      [ApiController]
      [Route("[controller]")]
      public class IndexController : ControllerBase
      {
      	private readonly IMessagePublisher _messagePublisher;
      
      	public IndexController(IMessagePublisher messagePublisher)
      	{
      		_messagePublisher = messagePublisher;
      	}
      
          [HttpGet("publish")]
          public async Task<string> Publisher()
          {
              for (var i = 0; i < 100; i++)
              {
                  await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
                  {
                      Id = i
                  });
              }
      
              return "ok";
          }
      }
      

      一般情況下,一個模型類只應(yīng)該被一個消費者所使用,那么通過事件可以找到唯一的消費者,也就是通過事件類型找到消費者的 IConsumerOptions,此時框架可以使用對應(yīng)的配置發(fā)送消息。

      TestMessageEvent 模型只有一個消費者:

      [Consumer("publish", Qos = 1, RetryFaildRequeue = true)]
      public class TestEventConsumer : IConsumer<TestMessageEvent>
      {
      	// ... ...
      }
      

      可以直接發(fā)送事件,不需要填寫交換器(Exchange)和路由鍵(RoutingKey)。

      [HttpGet("publish_message")]
      public async Task<string> PublisherMessage()
      {
      	// 如果在本項目中 TestMessageEvent 只指定了一個消費者,那么通過 TestMessageEvent 自動尋找對應(yīng)的配置
      	for (var i = 0; i < 100; i++)
      	{
      		await _messagePublisher.PublishAsync(model: new TestMessageEvent
      		{
      			Id = i
      		});
      	}
      
      	return "ok";
      }
      

      IMessagePublisher

      IMessagePublisher 是 Maomi.MQ 的基礎(chǔ)消息發(fā)布接口,有以下方法:

      // 消息發(fā)布者.
      public interface IMessagePublisher
      {
          Task PublishAsync<TMessage>(string exchange,    // 交換器名稱.
                                      string routingKey,  // 隊列/路由鍵名稱.
                                      TMessage message,   // 事件對象.
                                      Action<BasicProperties> properties, 
                                      CancellationToken cancellationToken = default)
              where TMessage : class;
      
          Task PublishAsync<TMessage>(string exchange, 
                                      string routingKey, 
                                      TMessage message, 
                                      BasicProperties? properties = default, 
                                      CancellationToken cancellationToken = default);
      
          Task PublishAsync<TMessage>(TMessage message, 
                                      Action<BasicProperties>? properties = null, 
                                      CancellationToken cancellationToken = default)
              where TMessage : class;
      
          Task PublishAsync<TMessage>(TMessage model, 
                                      BasicProperties? properties = default, 
                                      CancellationToken cancellationToken = default);
          
          Task CustomPublishAsync<TMessage>(string exchange, 
                                            string routingKey, 
                                            TMessage message, 
                                            BasicProperties? properties = default, 
                                            CancellationToken cancellationToken = default);
      }
      

      Maomi.MQ 的消息發(fā)布接口就這么幾個,由于直接公開了 BasicProperties ,因此開發(fā)者完全自由配置 RabbitMQ 原生的消息屬性,所以接口比較簡單,開發(fā)者使用接口時可以靈活一些,使用難度也不大。


      BasicProperties 是 RabbitMQ 中的消息基礎(chǔ)屬性對象,直接面向開發(fā)者,可以消息的發(fā)布和消費變得靈活和豐富功能,例如,可以通過 BasicProperties 配置單條消息的過期時間:

      await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
      {
      	Id = i
      }, (BasicProperties p) =>
      {
      	p.Expiration = "1000";
      });
      

      Maomi.MQ 通過 DefaultMessagePublisher 類型實現(xiàn)了 IMessagePublisher,DefaultMessagePublisher 默認(rèn)生命周期是 Scoped:

      services.AddScoped<IMessagePublisher, DefaultMessagePublisher>();
      

      開發(fā)者也可以自行實現(xiàn) IMessagePublisher 接口,實現(xiàn)自己的消息發(fā)布模型,具體示例請參考 DefaultMessagePublisher 類型。

      原生通道

      開發(fā)者可以通過 ConnectionPool 服務(wù)獲取原生連接對象,直接在 IConnection 上使用 RabbitMQ 的接口發(fā)布消息:

      private readonly ConnectionPool _connectionPool;
      
      var connectionObject = _connectionPool.Get();
      connectionObject.DefaultChannel.BasicPublishAsync(... ...);
      

      常駐內(nèi)存連接對象

      Maomi.MQ 通過 ConnectionPool 管理 RabbitMQ 連接對象,注入 ConnectionPool 服務(wù)后,通過 .Get() 接口獲取全局默認(rèn)連接實例。


      如果開發(fā)者有自己的需求,也可以通過 .Create() 接口創(chuàng)建新的連接對象。

      using var newConnectionObject = _connectionPool.Create();
      using var newConnection = newConnectionObject.Connection;
      using var newChannel = newConnection.CreateChannelAsync();
      

      請務(wù)必妥善使用連接對象,不要頻繁創(chuàng)建和釋放,也不要忘記了管理生命周期,否則容易導(dǎo)致內(nèi)存泄漏。


      單個 IConnectionn 即可滿足大多數(shù)場景下的使用,吞吐量足夠用了,筆者經(jīng)過了多次長時間的測試,發(fā)現(xiàn)一個 IConnection 即可滿足需求,多個 IConnection 并不會帶來任何優(yōu)勢,因此去掉了舊版本的連接池,現(xiàn)在默認(rèn)全局只會存在一個 IConnection,但是不同的消費者使用 IChannel 來隔離。

      程序只維持一個 IConnection 時,四個發(fā)布者同時發(fā)布消息,每秒速度如下:

      image-20240720220241778


      如果消息內(nèi)容非常大時,單個 IConnection 也足夠應(yīng)付,取決于帶寬。

      每條消息 478 KiB。

      image-20240720220937413

      消息過期

      IMessagePublisher 對外開放了 BasicProperties,開發(fā)者可以自由配置消息屬性。

      例如為消息配置過期時間:

      [HttpGet("publish")]
      public async Task<string> Publisher()
      {
      	for (var i = 0; i < 1; i++)
      	{
      		await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
      		{
      			Id = i
      		}, properties =>
      		{
      			properties.Expiration = "6000";
      		});
      	}
      
      	return "ok";
      }
      

      為該消息設(shè)置過期時間后,如果隊列綁定了死信隊列,那么該消息長時間沒有被消費時,會被移動到另一個隊列,請參考 死信隊列。


      還可以通過配置消息屬性實現(xiàn)更多的功能,請參考 IBasicProperties 文檔。

      事務(wù)

      RabbitMQ 原生支持事務(wù)模型,RabbitMQ 的事務(wù)通訊協(xié)議可以參考 https://www.rabbitmq.com/docs/semantics

      據(jù) RabbitMQ 官方文檔顯示,事務(wù)模式會使吞吐量減少 250 倍,這個主要跟事務(wù)機制有關(guān),事務(wù)模式不僅僅要保證消息已經(jīng)推送到 Rabbit broker,還要保證 Rabbit broker 多節(jié)點分區(qū)同步,在 Rabbit broker 掛掉的情況下消息已被完整同步。不過一般可能用不上這么嚴(yán)格的模式,所以也可以使用下一小節(jié)提到的發(fā)送方確認(rèn)機制。


      Maomi.MQ 的事務(wù)接口使用上比較簡單,可以使用擴展方法直接開啟一個 ITransactionPublisher,事務(wù)接口使用上也比較簡潔,示例如下:

      [HttpGet("publish_tran")]
      public async Task<string> Publisher_Tran()
      {
      	using var tranPublisher = _messagePublisher.CreateTransaction();
      	await tranPublisher.TxSelectAsync();
      
      	try
      	{
      		await tranPublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
      		{
      			Id = 666
      		});
      		await Task.Delay(5000);
      		await tranPublisher.TxCommitAsync();
      	}
      	catch
      	{
      		await tranPublisher.TxRollbackAsync();
      		throw;
      	}
      
      	return "ok";
      }
      

      發(fā)送方確認(rèn)模式

      事務(wù)模式可以保證消息會被推送到 RabbitMQ 服務(wù)器中,并在個節(jié)點中已完成同步,但是由于事務(wù)模式會導(dǎo)致吞吐量降低 250 倍,因此 RabbitMQ 引入了一種確認(rèn)機制,這種機制就像滑動窗口,能夠保證消息推送到服務(wù)器中,并且具備高性能的特性,其吞吐量是事務(wù)模式 100 倍,參考資料:

      https://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms

      https://www.rabbitmq.com/docs/confirms


      不過 .NET RabbitMQClient 庫的新版本已經(jīng)去掉了一些 API 接口,改動信息詳細(xì)參考:Issue #1682 、RabbitMQ tutorial - Reliable Publishing with Publisher Confirms


      Maomi.MQ 根據(jù)新版本做了簡化調(diào)整,具體用法是通過創(chuàng)建使用獨立通道的消息發(fā)布者,然后在參數(shù)中指定 IChannel 屬性。

      using var confirmPublisher = _messagePublisher.CreateSingle(
      	new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true));
      
      for (var i = 0; i < 5; i++)
      {
      	await confirmPublisher.PublishAsync(exchange: string.Empty, routingKey: "publish", message: new TestEvent
      	{
      		Id = 666
      	});
      }
      

      事務(wù)模式和確認(rèn)機制模式發(fā)布者是相互隔離的,創(chuàng)建這兩者對象時都會默認(rèn)自動使用新的 IChannel,因此不需要擔(dān)心沖突。


      如果開發(fā)者有自己的需求,也可以 CreateSingle 創(chuàng)建獨立的 IChannel,然后自定義 CreateChannelOptions 屬性,關(guān)于 CreateChannelOptions 的描述,請參考:

      https://rabbitmq.github.io/rabbitmq-dotnet-client/api/RabbitMQ.Client.CreateChannelOptions.html

      廣播模式

      廣播模式是用于將一條消息推送到交換器,然后綁定的多個隊列都可以收到相同的消息,簡單來說該模式是向交換器推送消息,然后交換器將消息轉(zhuǎn)發(fā)到各個綁定的隊列中,這樣一來不同隊列的消費者可以同時收到消息。

      98ae0f8039f4b17a0c14048c82f1e631_post-21430-6555f746c77f1


      RabbitMQ 中,交換器大約有四種模式,在 RabbitMQ 中有常量定義:

      public static class ExchangeType
      {
      	public const string Direct = "direct";
      	public const string Fanout = "fanout";
      	public const string Headers = "headers";
      	public const string Topic = "topic";
      	private static readonly string[] s_all = { Fanout, Direct, Topic, Headers };
      }
      

      但是不同的交換器模式使用上不一樣,下面筆者 fanout 為例,當(dāng)隊列綁定到 fanout 類型的交換器后,Rabbit broker 會忽略 RoutingKey,將消息推送到所有綁定的隊列中。

      所以我們定義兩個消費者,綁定到一個相同的 fanout 類型的交換器:

      [Consumer("fanout_1", BindExchange = "fanouttest", ExchangeType = "fanout")]
      public class FanoutEvent_1_Consumer : IConsumer<FanoutEvent>
      {
          // 消費
          public virtual async Task ExecuteAsync(MessageHeader messageHeader, FanoutEvent message)
          {
              Console.WriteLine($"【fanout_1】,事件 id: {message.Id} {DateTime.Now}");
              await Task.CompletedTask;
          }
          
          // ... ...
      }
      
      [Consumer("fanout_2", BindExchange = "fanouttest", ExchangeType = "fanout")]
      public class FanoutEvent_2_Consumer : IConsumer<FanoutEvent>
      {
          // 消費
          public virtual async Task ExecuteAsync(MessageHeader messageHeader, FanoutEvent message)
          {
              Console.WriteLine($"【fanout_2】,事件 id: {message.Id} {DateTime.Now}");
              await Task.CompletedTask;
          }
          
          // ... ...
      }
      

      image-20250208090419019


      發(fā)布消息時,只需要配置交換器名稱即可,兩個消費者服務(wù)都會同時收到消息:

      [HttpGet("publish_fanout")]
      public async Task<string> Publisher_Fanout()
      {
      	for (var i = 0; i < 5; i++)
      	{
      		await _messagePublisher.PublishAsync(exchange: "fanouttest", routingKey: string.Empty, message: new FanoutEvent
      		{
      			Id = 666
      		});
      	}
      
      	return "ok";
      }
      

      對于 Topic 類型的交換器和隊列,使用方式也是一致的,定義兩個消費者:

      [Consumer("red.yellow.#", BindExchange = "topictest", ExchangeType = "topic")]
      public class TopicEvent_1_Consumer : IConsumer<TopicEvent>
      {
          // 消費
          public virtual async Task ExecuteAsync(MessageHeader messageHeader, TopicEvent message)
          {
              Console.WriteLine($"【red.yellow.#】,事件 id: {message.Id} {DateTime.Now}");
              await Task.CompletedTask;
          }
          
          // ... ...
      }
      
      [Consumer("red.#", BindExchange = "topictest", ExchangeType = "topic")]
      public class TopicEvent_2_Consumer : IConsumer<TopicEvent>
      {
          // 消費
          public virtual async Task ExecuteAsync(MessageHeader messageHeader, TopicEvent message)
          {
              Console.WriteLine($"【red.#】,事件 id: {message.Id} {DateTime.Now}");
              await Task.CompletedTask;
          }
          
          // ... ...
      }
      

      發(fā)布消息:

      [HttpGet("publish_topic")]
      public async Task<string> Publisher_Topic()
      {
      	for (var i = 0; i < 5; i++)
      	{
      		await _messagePublisher.PublishAsync(exchange: "topictest", routingKey: "red.a", message: new TopicEvent
      		{
      			Id = 666
      		});
      		await _messagePublisher.PublishAsync(exchange: "topictest", routingKey: "red.yellow.a", message: new TopicEvent
      		{
      			Id = 666
      		});
      	}
      
      	return "ok";
      }
      

      不可路由消息

      當(dāng)發(fā)布消息時,如果該消息不可路由,即找不對應(yīng)的隊列等情況,那么將會觸發(fā) IBreakdown.BasicReturnAsync 接口,BasicReturnEventArgs 屬性有詳細(xì)的錯誤原因。


      image-20250208092628386

      image-20250206205340364


      對于網(wǎng)絡(luò)故障、RabbitMQ 服務(wù)掛了、沒有對應(yīng)交換器名稱等失敗等情況,則會在當(dāng)前線程上出現(xiàn)異常,并且 TCP 連接會自動重新連接。

      需要注意 RabbitMQ 的機制,推送消息并不是同步發(fā)生的,因此即使推送失敗,也不會在當(dāng)前線程中出現(xiàn)異常,所以不能判斷當(dāng)前消息是否成功推送。

      對于不可路由的消息,Maomi.MQ 只提供了簡單的接口通知,沒有其它處理機制,所以開發(fā)者需要自行處理,社區(qū)中有一款 MQ 通訊框架叫 EasyNetQ,它的默認(rèn)機制是自動創(chuàng)建新的隊列,將當(dāng)前不可路由的隊列推送到新的隊列中,以便持久化保存。


      開發(fā)者可以實現(xiàn)該接口,然后注冊為到容器:

      services.AddScoped<IBreakdown, MyDefaultBreakdown>();
      

      例如將不可路由的消息推送到新的隊列中:

      public class MyDefaultBreakdown : IBreakdown
      {
          private readonly ConnectionPool _connectionPool;
      
          public MyDefaultBreakdown(ConnectionPool connectionPool)
          {
              _connectionPool = connectionPool;
          }
      
          /// <inheritdoc />
          public async Task BasicReturnAsync(object sender, BasicReturnEventArgs @event)
          {
              var connectionObject = _connectionPool.Get();
              await connectionObject.DefaultChannel.BasicPublishAsync<BasicProperties>(
                  @event.Exchange, 
                  @event.RoutingKey + ".faild", 
                  true, 
                  new BasicProperties(@event.BasicProperties), 
                  @event.Body);
          }
      
          /// <inheritdoc />
          public Task NotFoundConsumerAsync(string queue, Type messageType, Type consumerType)
          {
              return Task.CompletedTask;
          }
      }
      

      其實對于這種不可路由消息的情況,不單單只是轉(zhuǎn)發(fā)存儲,要檢查是否誤刪隊列、發(fā)布消息時隊列名稱是否一致等。

      消費者

      Maomi.MQ.RabbitMQ 中,有三種消費模式,分別是消費者模式、事件模式(事件總線模式)、動態(tài)消費者模式,其中動態(tài)消費者模式也支持了多種消費模式。

      下面簡單介紹這三種模式的使用方法,后面會更加詳細(xì)地介紹。


      消費者模式

      消費者服務(wù)需要實現(xiàn) IConsumer<TEvent> 接口,并且配置 [Consumer("queue")] 特性綁定隊列名稱,通過消費者對象來控制消費行為,消費者模式有具有失敗通知和補償能力,使用上也比較簡單。

      在運行時可以修改配置 [ConsumerAttribute]。

      public class TestEvent
      {
          public int Id { get; set; }
      }
      
      [Consumer("PublisherWeb", Qos = 1, RetryFaildRequeue = true)]
      public class MyConsumer : IConsumer<TestEvent>
      {
          private static int _retryCount = 0;
      
          // 消費
          public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
          {
              _retryCount++;
              Console.WriteLine($"執(zhí)行次數(shù):{_retryCount} 事件 id: {message.Id} {DateTime.Now}");
              await Task.CompletedTask;
          }
      
          // 每次消費失敗時執(zhí)行
          public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
              => Task.CompletedTask;
      
          // 補償
          public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
              => Task.FromResult(ConsumerState.Ack);
      }
      

      事件模式

      事件模式是通過事件總線的方式實現(xiàn)的,以事件模型為中心,通過事件來控制消費行為。

      [EventTopic("web2", Qos = 1, RetryFaildRequeue = true)]
      public class TestEvent
      {
      	public string Message { get; set; }
      }
      

      然后使用 [EventOrder] 特性編排事件執(zhí)行順序。

      // 編排事件消費順序
      [EventOrder(0)]
      public class My1EventEventHandler : IEventHandler<TestEvent>
      {
      	public async Task CancelAsync(TestEvent @event, CancellationToken cancellationToken)
      	{
      	}
      
      	public async Task ExecuteAsync(TestEvent @event, CancellationToken cancellationToken)
      	{
      		Console.WriteLine($"{@event.Id},事件 1 已被執(zhí)行");
      	}
      }
      
      [EventOrder(1)]
      public class My2EventEventHandler : IEventHandler<TestEvent>
      {
      	public async Task CancelAsync(TestEvent @event, CancellationToken cancellationToken)
      	{
      	}
      
      	public async Task ExecuteAsync(TestEvent @event, CancellationToken cancellationToken)
      	{
      		Console.WriteLine($"{@event.Id},事件 2 已被執(zhí)行");
      	}
      }
      

      當(dāng)然,事件模式也可以通過創(chuàng)建中間件增加補償功能,通過中間件還可以將所有排序事件放到同一個事務(wù)中,一起成功或失敗,避免事件執(zhí)行時出現(xiàn)程序退出導(dǎo)致的一致性問題。

      public class TestEventMiddleware : IEventMiddleware<TestEvent>
      {
          private readonly BloggingContext _bloggingContext;
      
          public TestEventMiddleware(BloggingContext bloggingContext)
          {
              _bloggingContext = bloggingContext;
          }
      
          public async Task ExecuteAsync(MessageHeader messageHeader, TMessage message, EventHandlerDelegate<TMessage> next)
          {
              using (var transaction = _bloggingContext.Database.BeginTransaction())
              {
                  await next(@event, CancellationToken.None);
                  await transaction.CommitAsync();
              }
          }
      
          public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TMessage? message)
          {
              return Task.CompletedTask;
          }
      
          public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TMessage? message, Exception? ex)
          {
              return Task.FromResult(true);
          }
      }
      

      消費者模式和事件總線模式都可以應(yīng)對大容量的消息,如下圖所示,每個消息接近 500kb,多個隊列并發(fā)拉取消費。

      image-20240720221514504


      如果消息內(nèi)容不大,則可以達(dá)到很高的消費速度。

      image-20240720212715583

      動態(tài)消費者

      動態(tài)消費者可以在運行期間動態(tài)訂閱隊列,并且支持消費者類型、事件總線類型、函數(shù)綁定三種方式

      注入 IDynamicConsumer 即可使用動態(tài)消費者服務(wù)。

      await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions("myqueue")
      {
      	Qos = 10
      });
      
      // 自動事件模型對應(yīng)消費者
      await _dynamicConsumer.ConsumerAsync<TestEvent>(new ConsumerOptions("myqueue")
      {
      	Qos = 10
      });
      
      // 函數(shù)方式消費
      _dynamicConsumer.ConsumerAsync<TestEvent>(new ConsumerOptions("myqueue")
      {
      	Qos = 10
      }, async (header, message) =>
      {
      	Console.WriteLine($"事件 id: {message.Id} {DateTime.Now}");
      	await Task.CompletedTask;
      });
      

      消費者注冊模式

      Maomi.MQ 提供了 ITypeFilter 接口,開發(fā)者可以使用該接口實現(xiàn)自定義消費者注冊模式。

      Maomi.MQ 內(nèi)置三個 ITypeFilter,分別是:

      • 消費者模式 ConsumerTypeFilter
      • 事件總線模式 EventBusTypeFilter
      • 自定義消費者模式 ConsumerTypeFilter

      框架默認(rèn)注冊 ConsumerTypeFilter、EventBusTypeFilter 兩種模式,開發(fā)者可以自行調(diào)整決定使用哪種模式。

      var consumerTypeFilter = new ConsumerTypeFilter();
      // ...
      builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
      {
          // ... ...
      }, 
      [typeof(Program).Assembly], 	// 要自動掃描的程序集
      [new ConsumerTypeFilter(), new EventBusTypeFilter(), consumerTypeFilter]); 	// 配置要使用的消費者注冊模式
      

      消費者模式

      消費者模式要求服務(wù)實現(xiàn) IConsumer<TEvent> 接口,消費者服務(wù)的注冊方式有三種。

      • 添加 [Connsumer] 特性,程序啟動時自動掃描注入,可以動態(tài)修改 [Connsumer]
      • 不設(shè)置 [Connsumer] ,使用 CustomConsumerTypeFilter 手動設(shè)置消費者服務(wù)和配置。
      • 在運行時使用 IDynamicConsumer 動態(tài)綁定消費者。

      本篇示例可參考 ConsumerWeb 項目。


      IConsumer<TEvent> 接口比較簡單,其定義如下:

      public interface IConsumer<TMessage>
          where TMessage : class
      {
          public Task ExecuteAsync(MessageHeader messageHeader, TMessage message);
      
          public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TMessage message);
      
          public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TMessage? message, Exception? ex);
      }
      
      

      使用消費者模式時,需要先定義一個模型類,用于發(fā)布者和消費者之間傳遞消息,事件模型類只要是類即可,能夠正常序列化和反序列化,沒有其它要求。

      public class TestEvent
      {
      	public int Id { get; set; }
      
      	public override string ToString()
      	{
      		return Id.ToString();
      	}
      }
      

      然后繼承 IConsumer<TEvent> 接口實現(xiàn)消費者功能:

      [Consumer("ConsumerWeb", Qos = 1)]
      public class MyConsumer : IConsumer<TestEvent>
      {
          private readonly ILogger<MyConsumer> _logger;
      
          public MyConsumer(ILogger<MyConsumer> logger)
          {
              _logger = logger;
          }
      
          // 消費
          public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
          {
              Console.WriteLine($"事件 id:{message.Id}");
          }
      
          // 每次失敗時被執(zhí)行
          public async Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
          {
              _logger.LogError(ex, "Consumer exception,event id: {Id},retry count: {retryCount}", message!.Id, retryCount);
          }
      
          // 最后一次失敗時執(zhí)行
          public async Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
          {
              return ConsumerState.Ack;
          }
      }
      

      特性配置的說明請參考 消費者配置 。

      手動注入消費者

      開發(fā)者也可以通過 CustomConsumerTypeFilter 手動注冊消費者服務(wù),只需要手動配置 ConsumerOptions 即可。

      var consumerOptions = new ConsumerOptions("test-queue_2")
      {
      	DeadExchange = "test-dead-exchange_2",
      	DeadRoutingKey = "test-dead-routing-key_2",
      	Expiration = 60000,
      	Qos = 10,
      	RetryFaildRequeue = true,
      	AutoQueueDeclare = AutoQueueDeclare.Enable,
      	BindExchange = "test-bind-exchange_2",
      	ExchangeType = "direct",
      	RoutingKey = "test-routing_2"
      };
      
      // 創(chuàng)建自定義的消費者模式
      var consumerTypeFilter = new CustomConsumerTypeFilter();
      var consumerType = typeof(TestConsumer);
      consumerTypeFilter.AddConsumer(consumerType, consumerOptions);
      

      在注冊 MQ 服務(wù)時,添加自定義消費者模式:

      builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
      {
          // ... ...
      }, 
      [typeof(Program).Assembly], 
      [new ConsumerTypeFilter(), new EventBusTypeFilter(),consumerTypeFilter]);	// 添加自定義消費者模式
      

      動態(tài)消費者

      注入 IDynamicConsumer 即可使用動態(tài)消費者服務(wù),添加的消費者會在后臺自動運行。

      var consumerTag = await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions("myqueue")
      {
      	Qos = 10
      });
      

      如果需要需求訂閱,可以通過 consumerTag 或隊列名稱進(jìn)行取消。

      await _dynamicConsumer.StopConsumerTagAsync(consumerTag);
      await _dynamicConsumer.StopConsumerAsync(queueName);
      

      消費、重試和補償

      消費者收到服務(wù)器推送的消息時,ExecuteAsync 方法會被自動執(zhí)行。當(dāng) ExecuteAsync 執(zhí)行異常時,FaildAsync 方法會馬上被觸發(fā),開發(fā)者可以利用 FaildAsync 記錄相關(guān)日志信息。

      // 每次失敗時被執(zhí)行
      public async Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
      {
      	_logger.LogError(ex, "Consumer exception,event id: {Id},retry count: {retryCount}", message!.Id, retryCount);
      }
      

      默認(rèn)情況下,框架會最多重試三次,也就是總共最多執(zhí)行四次 ExecuteAsync 方法。

      如果 FaildAsync 方法也出現(xiàn)異常時,不會影響整體流程,框架會等待到達(dá)間隔時間后繼續(xù)重試 ExecuteAsync 方法。

      建議 FaildAsync 使用 try{}cathc{} 套住代碼,不要對外拋出異常,FaildAsync 的邏輯不要包含太多邏輯,并且 FaildAsync 只應(yīng)記錄日志或進(jìn)行告警使用。


      當(dāng) ExecuteAsync 方法執(zhí)行異常時,框架會自動重試,默認(rèn)會重試三次,如果三次都失敗,則會執(zhí)行 FallbackAsync 方法進(jìn)行補償。

      重試間隔時間會逐漸增大,請參考 重試。


      當(dāng)重試三次之后,就會立即啟動補償機制。

      // 最后一次失敗時執(zhí)行
      public async Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
      {
      	return ConsumerState.Ack;
      }
      

      FallbackAsync 方法需要返回 ConsumerState 表示雖然 ExecuteAsync 出現(xiàn)異常,但是 FallbackAsync 補償后已經(jīng)正常,該消息會被正常消費掉。如果返回 false,則說補償失敗,該消息按照消費失敗處理。

      只有 ExecuteAsync 異常時,才會觸發(fā) FaildAsyncFallbackAsync

      消費失敗

      當(dāng) ExecuteAsync 失敗次數(shù)達(dá)到閾值時,則該條消息消費失敗,或者由于序列化等錯誤時直接失敗,最后會觸發(fā) FallbackAsync。


      在 IConsumerOptions 中有三個很重要的配置:

      public class IConsumerOptions : Attribute
      {
          // 消費失敗次數(shù)達(dá)到條件時,是否放回隊列.
          public bool RetryFaildRequeue { get; set; }
      
          /// 綁定死信交換器
          public string? DeadExchange { get; set; }
      
          /// 綁定死信隊列
          public string? DeadRoutingKey { get; set; }
      
      }
      

      FallbackAsync 返回值是 ConsumerState 枚舉,其定義如下:

      /// 接受 RabbitMQ 消息后,通過狀態(tài)枚舉確定進(jìn)行 ACK、NACK 以及放回隊列等.
      public enum ConsumerState
      {
          /// ACK.
          Ack = 1,
      
          /// 立即 NACK,并使用默認(rèn)配置設(shè)置是否將消息放回隊列.
          Nack = 1 << 1,
      
          /// 立即 NACK,并將消息放回隊列.
          NackAndRequeue = 1 << 2,
      
          /// 立即 NACK,消息將會從服務(wù)器隊列中移除.
          NackAndNoRequeue = 1 << 3,
      
          /// 出現(xiàn)異常情況.
          Exception = 1 << 4
      }
      

      消費失敗的情況有多種,下面列出具體邏輯:

      • 如果反序列化異常或者 FallbackAsync 執(zhí)行異常等,會直接觸發(fā) ConsumerState.Exception,最后根據(jù) IConsumerOptions.RetryFaildRequeue 確定是否要將消息放回隊列中,下次重新消費。
      • 如果 FallbackAsync 返回 ConsumerState.ACK,表示雖然消費消息一直失敗,但是依然 ACK 該條消息。
      • 如果 FallbackAsync 返回 ConsumerState.Nack,表示消費失敗,但是是否要返回隊列,由 IConsumerOptions.RetryFaildRequeue 決定。
      • 如果 FallbackAsync 返回 ConsumerState.NackAndRequeue,表示立即消費失敗,并將消息放回隊列。
      • 如果 FallbackAsync 返回 ConsumerState.NackAndNoRequeue,表示立即消費失敗,并且該消息不再放回隊列。

      自動創(chuàng)建隊列

      框架默認(rèn)會自動創(chuàng)建隊列,如果需要關(guān)閉自動創(chuàng)建功能,把 AutoQueueDeclare 設(shè)置為 false 即可。

      builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
      {
      	options.WorkId = 1;
      	options.AppName = "myapp";
      	options.AutoQueueDeclare = false;
      	options.Rabbit = (ConnectionFactory options) =>
      	{
              // ... ...
      	};
      }, [typeof(Program).Assembly]);
      

      當(dāng)然還可以單獨為消費者配置是否自動創(chuàng)建隊列:

      [Consumer("ConsumerWeb_create", AutoQueueDeclare = AutoQueueDeclare.Enable)]
      

      默認(rèn)情況下,關(guān)閉了全局自動創(chuàng)建,則不會自動創(chuàng)建隊列。

      如果關(guān)閉全局自動創(chuàng)建,但是消費者配置了 AutoQueueDeclare = AutoQueueDeclare.Enable,則還是會自動創(chuàng)建隊列。

      如果消費者配置了 AutoQueueDeclare = AutoQueueDeclare.Disable ,則會忽略全局配置,不會創(chuàng)建隊列。

      Qos

      默認(rèn) Qos = 100

      讓程序需要嚴(yán)格根據(jù)順序消費時,可以使用 Qos = 1,框架會嚴(yán)格保證逐條消費,如果程序不需要順序消費,希望可以快速處理所有消息,則可以將 Qos 設(shè)置大一些。由于 Qos 和重試、補償機制組合使用會有多種情況,因此請參考 重試


      Qos 是通過特性來配置的:

      [Consumer("ConsumerWeb", Qos = 1)]
      

      可以通過調(diào)高 Qos 值,讓程序在可以并發(fā)消息,提高并發(fā)量。


      根據(jù)網(wǎng)絡(luò)環(huán)境、服務(wù)器性能和實例數(shù)量等設(shè)置 Qos 值可以有效提高消息處理速度,請參考 Qos.

      延遲隊列

      延遲隊列有兩種,一種設(shè)置消息過期時間,一種是設(shè)置隊列過期時間。

      設(shè)置消息過期時間,那么該消息在一定時間沒有被消費時,會被丟棄或移動到死信隊列中,該配置只對單個消息有效,請參考 消息過期。

      隊列設(shè)置過期后,當(dāng)消息在一定時間內(nèi)沒有被消費時,會被丟棄或移動到死信隊列中,該配置只對所有消息有效?;谶@一點,我們可以實現(xiàn)延遲隊列。


      首先創(chuàng)建消費者,繼承 EmptyConsumer,那么該隊列會在程序啟動時被創(chuàng)建,但是不會創(chuàng)建 IConnection 進(jìn)行消費。然后設(shè)置隊列消息過期時間以及綁定死信隊列,綁定的死信隊列既可以使用消費者模式實現(xiàn),也可以使用事件模式實現(xiàn)。

      [Consumer("consumerWeb_dead", Expiration = 6000, DeadRoutingKey = "consumerWeb_dead_queue")]
      public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
      {
      }
      
      // ConsumerWeb_dead 消費失敗的消息會被此消費者消費。
      [Consumer("consumerWeb_dead_queue", Qos = 1)]
      public class Dead_QueueConsumer : IConsumer<DeadQueueEvent>
      {
          // 消費
          public Task ExecuteAsync(MessageHeader messageHeader, DeadQueueEvent message)
          {
              Console.WriteLine($"死信隊列,事件 id:{message.Id}");
              return Task.CompletedTask;
          }
      
          // 每次失敗時被執(zhí)行
          public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, DeadQueueEvent message) => Task.CompletedTask;
      
          // 最后一次失敗時執(zhí)行
          public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, DeadQueueEvent? message, Exception? ex)
              => Task.FromResult(ConsumerState.Ack);
      }
      

      空消費者

      當(dāng)識別到空消費者時,框架只會創(chuàng)建隊列,而不會啟動消費者消費消息。

      可以結(jié)合延遲隊列一起使用,該隊列不會有任何消費者,當(dāng)該隊列的消息過期時,都由死信隊列直接消費,示例如下:

      [Consumer("ConsumerWeb_empty", Expiration = 6000, DeadQueue = "ConsumerWeb_empty_dead")]
      public class MyEmptyConsumer : EmptyConsumer<TestEvent> { }
      
      [Consumer("ConsumerWeb_empty_dead", Qos = 10)]
      public class MyDeadConsumer : IConsumer<TestEvent>
      {
          // ... ...
      }
      

      對于跨進(jìn)程的隊列,A 服務(wù)不消費只發(fā)布,B 服務(wù)負(fù)責(zé)消費,A 服務(wù)中可以加一個空消費者,保證 A 服務(wù)啟動時該隊列一定存在,另一方面,消費者服務(wù)不應(yīng)該關(guān)注隊列的定義,也不太應(yīng)該創(chuàng)建隊列。

      廣播模式

      在 RabbitMQ 中,設(shè)置一個 Fanout 或 Topic 交換器之后,多個隊列綁定到該交換器時,每個隊列都會收到一模一樣的消息,在微服務(wù)場景下,比如用戶中心,員工離職后,需要發(fā)布一個消息,所有訂閱了這個消息的系統(tǒng)都要處理員工離職后的相關(guān)數(shù)據(jù)。


      創(chuàng)建兩個消費者隊列,隊列的名稱不能相同,然后綁定到同一個交換器,名稱可以隨意,例如 exchange

      [Consumer("ConsumerWeb_exchange_1", BindExchange = "exchange")]
      public class Exchange_1_Consumer : IConsumer<TestEvent>
      {
          /// ... ...
      }
      
      [Consumer("ConsumerWeb_exchange_2", BindExchange = "exchange")]
      public class Exchange_2_Consumer : IConsumer<TestEvent>
      {
          // ... ... 
      }
      

      發(fā)布者發(fā)布消息時,需要使用廣播發(fā)布者模式發(fā)布,請參考:廣播模式


      當(dāng)然,Maomi.MQ 可以自定義交換器類型和交換器名字。

      基于事件

      Maomi.MQ 內(nèi)部設(shè)計了一個事件總線,可以幫助開發(fā)者實現(xiàn)事件編排、實現(xiàn)本地事務(wù)、正向執(zhí)行和補償。


      Maomi.MQ 沒有設(shè)計本地消息表等分布式事務(wù)保障機制,主要基于以下幾點考慮:

      • Maomi.MQ 是基于消息隊列的通訊模型,不是專門為分布式事務(wù)設(shè)計的,對于分布式事務(wù)沒有什么協(xié)調(diào)能力,要使用到分布式事務(wù)編排,需要使用類似 DTM 、Seata 等類型的分布式事務(wù)管理平臺,分布式事務(wù)需要一個事務(wù)中心協(xié)調(diào)平臺。
      • Maomi.MQ 本身設(shè)計了重試策略和補償策略機制,可以一定程度上解決異常的情況。
      • Maomi.MQ 本身不能保證冪等性、空補償?shù)葐栴},但是也不是什么情況都需要嚴(yán)格保證消費的。
      • 通過事件模式的中間件功能,開發(fā)者也可以很簡單地處理冪等性、空補償、懸掛等問題。

      使用事件模式

      首先定義一個事件類型,該事件綁定一個 topic 或隊列,事件需要使用 [EventTopic] 標(biāo)識,并設(shè)置該事件對于的隊列名稱。

      [EventTopic] 特性擁有與 [Consumer] 相同的特性,可參考 [Consumer] 的使用配置事件,請參考 消費者配置。

      [EventTopic("EventWeb")]
      public class TestEvent
      {
      	public string Message { get; set; }
      
      	public override string ToString()
      	{
      		return Message;
      	}
      }
      

      然后編排事件執(zhí)行器,每個執(zhí)行器都需要繼承 IEventHandler<T> 接口,然后使用 [EventOrder] 特性標(biāo)記執(zhí)行順序。

      [EventOrder(0)]
      public class My1EventEventHandler : IEventHandler<TestEvent>
      {
          public Task CancelAsync(TestEvent message, CancellationToken cancellationToken)
          {
              return Task.CompletedTask;
          }
      
          public Task ExecuteAsync(TestEvent message, CancellationToken cancellationToken)
          {
              Console.WriteLine($"{message.Message},事件 1 已被執(zhí)行");
              return Task.CompletedTask;
          }
      }
      
      [EventOrder(1)]
      public class My2EventEventHandler : IEventHandler<TestEvent>
      {
          public Task CancelAsync(TestEvent message, CancellationToken cancellationToken)
          {
              return Task.CompletedTask;
          }
      
          public Task ExecuteAsync(TestEvent message, CancellationToken cancellationToken)
          {
              Console.WriteLine($"{message.Message},事件 2 已被執(zhí)行");
              return Task.CompletedTask;
          }
      }
      

      每個事件執(zhí)行器都必須實現(xiàn) IEventHandler<T> 接口,并且設(shè)置 [EventOrder] 特性以便確認(rèn)事件的執(zhí)行順序,框架會按順序執(zhí)行 IEventHandler<T>ExecuteAsync 方法,當(dāng) ExecuteAsync 出現(xiàn)異常時,則反向按順序調(diào)用 CancelAsync。


      由于程序可能隨時掛掉,因此通過 CancelAsync 實現(xiàn)補償是不太可能的,CancelAsync 主要作為記錄相關(guān)信息而使用。

      中間件

      中間件的作用是便于開發(fā)者攔截事件、記錄信息、實現(xiàn)本地事務(wù)等,如果開發(fā)者不配置,則框架會自動創(chuàng)建 DefaultEventMiddleware<TEvent> 類型作為該事件的中間件服務(wù)。


      自定義事件中間件示例代碼:

      public class TestEventMiddleware : IEventMiddleware<TestEvent>
      {
          public async Task ExecuteAsync(MessageHeader messageHeader,TestEvent message, EventHandlerDelegate<TestEvent> next)
          {
              await next(messageHeader, message, CancellationToken.None);
          }
          public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent? message) => Task.CompletedTask;
          public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex) => Task.FromResult(ConsumerState.Ack);
      }
      

      next 委托是框架構(gòu)建的事件執(zhí)行鏈路,在中間件中可以攔截事件、決定是否執(zhí)行事件鏈路。


      在中間件中調(diào)用 next() 委托時,框架開始按順序執(zhí)行事件,即前面提到的 My1EventEventHandler、My2EventEventHandler。


      當(dāng)一個事件有多個執(zhí)行器時,由于程序可能會在任何時刻掛掉,因此本地事務(wù)必不可少。


      例如,在中間件中注入數(shù)據(jù)庫上下文,然后啟動事務(wù)執(zhí)行數(shù)據(jù)庫操作,當(dāng)其中一個 EventHandler 執(zhí)行失敗時,執(zhí)行鏈路會回滾,同時不會提交事務(wù)。

      可以參考 消費者模式 實現(xiàn)中間件的重試和補償方法。


      示例如下:

      public class TestEventMiddleware : IEventMiddleware<TestEvent>
      {
          private readonly BloggingContext _bloggingContext;
      
          public TestEventMiddleware(BloggingContext bloggingContext)
          {
              _bloggingContext = bloggingContext;
          }
      
          public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message, EventHandlerDelegate<TestEvent> next)
          {
              using (var transaction = _bloggingContext.Database.BeginTransaction())
              {
                  await next(messageHeader, message, CancellationToken.None);
                  await transaction.CommitAsync();
              }
          }
      
          public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent? message)
          {
              return Task.CompletedTask;
          }
      
          public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
          {
              return Task.FromResult(ConsumerState.Ack);
          }
      }
      

      [EventOrder(0)]
      public class My1EventEventHandler : IEventHandler<TestEvent>
      {
          private readonly BloggingContext _bloggingContext;
      
          public My1EventEventHandler(BloggingContext bloggingContext)
          {
              _bloggingContext = bloggingContext;
          }
      
          public async Task CancelAsync(TestEvent message, CancellationToken cancellationToken)
          {
              Console.WriteLine($"{message.Message} 被補償,[1]");
          }
      
          public async Task ExecuteAsync(TestEvent message, CancellationToken cancellationToken)
          {
              await _bloggingContext.Posts.AddAsync(new Post
              {
                  Title = "魯濱遜漂流記",
                  Content = "隨便寫寫就對了"
              });
              await _bloggingContext.SaveChangesAsync();
          }
      }
      
      [EventOrder(1)]
      public class My2EventEventHandler : IEventHandler<TestEvent>
      {
          private readonly BloggingContext _bloggingContext;
      
          public My2EventEventHandler(BloggingContext bloggingContext)
          {
              _bloggingContext = bloggingContext;
          }
          public async Task CancelAsync(TestEvent message, CancellationToken cancellationToken)
          {
              Console.WriteLine($"{message.Id} 被補償,[2]");
          }
      
          public async Task ExecuteAsync(TestEvent message, CancellationToken cancellationToken)
          {
              await _bloggingContext.Posts.AddAsync(new Post
              {
                  Title = "紅樓夢",
                  Content = "賈寶玉初試云雨情"
              });
              await _bloggingContext.SaveChangesAsync();
      
              throw new OperationCanceledException("故意報錯");
          }
      }
      

      image-20240525155639461


      事件執(zhí)行時,如果出現(xiàn)異常,也是會被重試的,中間件 TestEventMiddleware 的 FaildAsync、FallbackAsync 會被依次執(zhí)行。

      你可以參考 消費者模式 或者 重試

      冪等性、空補償、懸掛

      在微服務(wù)中,一個服務(wù)可能會在任何一個時間掛掉重啟,由此會出現(xiàn)冪等性、空補償、懸掛等問題。


      冪等性

      比如,A 消費者消費消息 01 時,將結(jié)果寫入數(shù)據(jù)庫,然后 Maomi.MQ 還沒有向 RabbitMQ 推送 ack 時,程序就重啟了。程序重啟后,由于 01 還沒有被 ack,因此程序會重復(fù)消費該條消息,如果此時繼續(xù)寫入數(shù)據(jù)庫,就會導(dǎo)致重復(fù)。因此,開發(fā)者需要保證即使重復(fù)消費了該消息,也不會導(dǎo)致數(shù)據(jù)庫的數(shù)據(jù)不一致或重復(fù)操作。

      當(dāng)然,并不是所有情況都不能重復(fù)消費,我們這里只圍繞那些只能消費一次的情況,例如插入訂單信息到數(shù)據(jù)庫。

      這就要求每個消息都有一個特定的業(yè)務(wù) id 或分布式雪花 id,在消費時,需要判斷數(shù)據(jù)庫是否已經(jīng)存在該 id,這樣可以判斷程序是否重復(fù)消費。

      例如:

      public class TestEventMiddleware : IEventMiddleware<TestEvent>
      {
          private readonly BloggingContext _bloggingContext;
      
          public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message, EventHandlerDelegate<TestEvent> next)
          {
              var existId = await _bloggingContext.Posts.AnyAsync(x=>x.PostId == @event.Id);
              if (existId)
              {
                  return;
              }
      
              using (var transaction = _bloggingContext.Database.BeginTransaction())
              {
                  await next(@event, CancellationToken.None);
                  await transaction.CommitAsync();
              }
          }
      }
      

      空補償

      在分布式事務(wù)中,當(dāng)編排 A => B => C 三個服務(wù)的接口時,如果 C 出現(xiàn)了異常,則分布式事務(wù)管理器會先調(diào)用 C 的補償接口,然后調(diào)用 B、A。

      這里每次調(diào)用都是通過接口調(diào)用,因此無法在一個數(shù)據(jù)庫事務(wù)中處理。


      這里兩種情況。

      一種是,C 已經(jīng)完成了插入數(shù)據(jù)庫的操作,給用戶的余額+100 ,但是接著程序重啟了或者超時了等各種情況,導(dǎo)致事務(wù)管理器認(rèn)為失敗了,需要調(diào)用補償接口。此時補償接口撤銷之前修改的數(shù)據(jù)。這里沒問題。

      第二種情況,C 數(shù)據(jù)庫還沒有完成數(shù)據(jù)庫操作就異常了,此時事務(wù)管理器調(diào)用了補償接口,如果補償接口給用戶余額 -100 元,那就不對了。

      因此,服務(wù)必須保證之前的操作到底有沒有成功,如果有,則開始撤銷流程,如果沒有,那就立即返回補償成功的結(jié)果。


      一般情況下 Maomi.MQ 不會出現(xiàn)空補償問題,因為 Maomi.MQ 壓根不是分布式事務(wù)框架,哈哈哈。

      Maomi.MQ 雖然提供了 CancelAsync() 方法用于執(zhí)行撤銷流程,但是這個主要是用于給開發(fā)者記錄日志等,不是用于執(zhí)行補償?shù)摹6沂录幣诺乃辛鞒潭荚诒镜?,完全不會涉及分布式事?wù)的空補償問題,因此只需要保證本地數(shù)據(jù)庫事務(wù)即可,即保證冪等性即可。


      懸掛

      在分布式事務(wù)中,會有一個正向執(zhí)行請求和一個撤銷請求,如果執(zhí)行失敗,就會調(diào)用撤銷接口。但是由于分布式網(wǎng)絡(luò)的復(fù)雜性,事務(wù)管理器并不能很確定 C 服務(wù)的情況,C 服務(wù)相對于一個小黑盒,當(dāng)請求失敗時,事務(wù)管理器就會調(diào)用補償接口。補償接口被調(diào)用之后,由于各種原因,正向執(zhí)行接口被調(diào)用了,可能是因為網(wǎng)關(guān)的自動重試,也可能由于服務(wù)太卡了,結(jié)果補償接口先進(jìn)入代碼,然后正向執(zhí)行接口才進(jìn)入代碼。此時,這個分布式事務(wù)是失敗的,事務(wù)管理器已經(jīng)調(diào)用了補償流程,那么這個事務(wù)已經(jīng)結(jié)束了,但是由于 C 在后面執(zhí)行了一次正向接口,用戶余額 +100,就會導(dǎo)致看起來都正常,實際上不正常。這就是懸掛。


      由于 Maomi.MQ 不涉及多服務(wù)事務(wù)編排,因此只需要關(guān)心冪等性即可,不需要關(guān)心空補償和懸掛問題,而冪等性是否需要保證,則需要開發(fā)者依據(jù)業(yè)務(wù)來定,因此 Maomi.MQ 沒有設(shè)計本地消息表的分布式事務(wù)工作模式。


      事件模式下的配置與消費者模式一致,因此這里不再贅述,可以參考 消費者模式.

      自定義消費者和動態(tài)訂閱

      主要實現(xiàn)了兩部分的功能。

      • 在程序啟動時,可以自定義消費者配置和消費者模型,不需要使用特性注解配置。
      • 在程序啟動后,可以隨時啟動一個消費者或者停止一個消費者。

      參考示例項目:https://github.com/whuanle/Maomi.MQ/tree/main/example/consumer/DynamicConsumerWeb


      自定義消費者

      消費者可以不使用特性注解,只需要實現(xiàn) IConsumer<TEvent> 即可,掃描程序集時會忽略掉沒有添加特性注解的消費者。

      定義消費者模型:

      public class DynamicCustomConsumer : IConsumer<TestEvent>
      {
          public Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
          {
              throw new NotImplementedException();
          }
      
          public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
          {
              throw new NotImplementedException();
          }
      
          public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
          {
              throw new NotImplementedException();
          }
      }
      

      然后通過 DynamicConsumerTypeFilter 手動配置消費者和屬性。

      DynamicConsumerTypeFilter dynamicConsumerTypeFilter = new();
      
      dynamicConsumerTypeFilter.AddConsumer(typeof(DynamicCustomConsumer), new ConsumerOptions
      {
      	Queue = "test1"
      });
      dynamicConsumerTypeFilter.AddConsumer(typeof(DynamicCustomConsumer), new ConsumerOptions
      {
      	Queue = "test2"
      });
      

      然后注入服務(wù)時,手動添加類型過濾器。


      builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
      {
      	options.WorkId = 1;
      	options.AutoQueueDeclare = true;
      	options.AppName = "myapp";
      	options.Rabbit = (ConnectionFactory options) =>
      	{
              // ... ...
      	};
      }, [typeof(Program).Assembly], [
          new ConsumerTypeFilter(),  // 消費者類型過濾器
          new EventBusTypeFilter(),  // 事件總線類型過濾器
          dynamicConsumerTypeFilter  // 動態(tài)消費者過濾器
      ]);
      

      動態(tài)訂閱

      在程序啟動后,通過 IDynamicConsumer 服務(wù)可以動態(tài)啟動或停止一個消費者。對于在程序啟動時就已經(jīng)運行的消費者,不會受到動態(tài)訂閱控制,不能在程序運行時停止。


      動態(tài)啟動消費者:

      private readonly IMessagePublisher _messagePublisher;
      private readonly IDynamicConsumer _dynamicConsumer;
      
      [HttpPost("create")]
      public async Task<string> CreateConsumer([FromBody] ConsumerDto consumer)
      {
      	foreach (var item in consumer.Queues)
      	{
      		await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions(item));
      	}
      
      	return "ok";
      }
      

      如果不想定義模型類,也可以直接使用函數(shù)方式:

      foreach (var item in consumer.Queues)
      {
      	var consumerTag = await _dynamicConsumer.ConsumerAsync<TestEvent>(
      		consumerOptions: new ConsumerOptions(item),
      		execute: async (header, message) =>
      		{
      			await Task.CompletedTask;
      		},
      		faild: async (header, ex, retryCount, message) => { },
      		fallback: async (header, message, ex) => ConsumerState.Ack
      		);
      }
      
      return "ok";
      

      使用隊列名稱可以動態(tài)停止消費者:

      [HttpPost("stop")]
      public async Task<string> StopConsumer([FromBody] ConsumerDto consumer)
      {
      	foreach (string queueName in consumer.Queues)
      	{
      		await _dynamicConsumer.StopConsumerAsync(queueName);
      	}
      
      	return "ok";
      }
      

      也可以使用消費者標(biāo)識:

      var consumerTag = await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions(item));
      await _dynamicConsumer.StopConsumerTagAsync(consumerTag);
      

      配置

      在引入 Maomi.MQ 框架時,可以配置相關(guān)屬性,示例和說明如下:


      // this.
      builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
      {
          // 必填,當(dāng)前程序節(jié)點,用于配置分布式雪花 id,
          // 配置 WorkId 可以避免高并發(fā)情況下同一個消息的 id 重復(fù)。
      	options.WorkId = 1;
          
          // 是否自動創(chuàng)建隊列
      	options.AutoQueueDeclare = true;
          
          // 當(dāng)前應(yīng)用名稱,用于標(biāo)識消息的發(fā)布者和消費者程序
      	options.AppName = "myapp";
          
          // 必填,RabbitMQ 配置
      	options.Rabbit = (ConnectionFactory options) =>
      	{
              options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
              options.Port = 5672;
      		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
      	};
      }, [typeof(Program).Assembly]);  // 要被掃描的程序集
      

      開發(fā)者可以通過 ConnectionFactory 手動管理 RabbitMQ 連接,例如故障恢復(fù)、自定義連接參數(shù)等。

      類型過濾器

      類型過濾器的接口是 ITypeFilter,作用是掃描識別類型,并將其添加為消費者,默認(rèn)啟用 ConsumerTypeFilter、EventBusTypeFilter 兩個類型過濾器,它們會識別并使用消費者模型和事件總線消費者模式,這兩種模型都要求配置對于的特性注解。


      此外還有一個動態(tài)消費者過濾器 DynamicConsumerTypeFilter,可以自定義消費者模型和配置。


      如果開發(fā)者需要自定義消費者模型或者接入內(nèi)存事件總線例如 MediatR ,只需要實現(xiàn) ITypeFilter 即可。

      攔截器

      Maomi.MQ 默認(rèn)啟用消費者模式和事件總線模式,開發(fā)者可以自由配置是否啟用。

      builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
      {
      	options.WorkId = 1;
      	options.AutoQueueDeclare = true;
      	options.AppName = "myapp";
      	options.Rabbit = (ConnectionFactory options) =>
      	{
              // ... ...
      	};
      },
      [typeof(Program).Assembly], 
      [new ConsumerTypeFilter(), new EventBusTypeFilter()]); // 注入消費者模式和事件總線模式
      

      另外框架還提供了動態(tài)配置攔截,可以實現(xiàn)在程序啟動時修改消費者特性的配置。

      builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
      {
      	options.WorkId = 1;
      	options.AutoQueueDeclare = true;
      	options.AppName = "myapp";
      	options.Rabbit = (ConnectionFactory options) =>
      	{
              // ... ...
      	};
      },
      [typeof(Program).Assembly],
      [new ConsumerTypeFilter(ConsumerInterceptor), new EventBusTypeFilter(EventInterceptor)]);
      

      實現(xiàn)攔截器函數(shù):

      private static RegisterQueue ConsumerInterceptor(IConsumerOptions consumerOptions, Type consumerType)
      {
      	var newConsumerOptions = new ConsumerOptions(consumerOptions.Queue);
      	consumerOptions.CopyFrom(newConsumerOptions);
      
      	// 修改 newConsumerOptions 中的配置
      
      	return new RegisterQueue(true, consumerOptions);
      }
      
      private static RegisterQueue EventInterceptor(IConsumerOptions consumerOptions, Type eventType)
      {
      	if (eventType == typeof(TestEvent))
      	{
      		var newConsumerOptions = new ConsumerOptions(consumerOptions.Queue);
      		consumerOptions.CopyFrom(newConsumerOptions);
      		newConsumerOptions.Queue = newConsumerOptions.Queue + "_1";
      
      		return new RegisterQueue(true, newConsumerOptions);
      	}
      	return new RegisterQueue(true, consumerOptions);
      }
      

      開發(fā)者可以在攔截器中修改配置值。

      攔截器有返回值,當(dāng)返回 false 時,框架會忽略注冊該消費者或事件,也就是該隊列不會啟動消費者。

      消費者配置

      Maomi.MQ 中對于消費者的邏輯處理,是通過 IConsumerOptions 接口的屬性來流轉(zhuǎn)的,無論是自定義消費者還是事件總線等消費模式,本身都是向框架注冊 IConsumerOptions 。

      其配置說明如下:

      名稱 類型 必填 默認(rèn)值 說明
      Queue string 必填 隊列名稱
      DeadExchange string? 可選 綁定死信交換器名稱
      DeadRoutingKey string? 可選 綁定死信路由鍵
      Expiration int 可選 隊列消息過期時間,單位毫秒
      Qos ushort 可選 100 每次拉取消息時可以拉取的消息的數(shù)量,有助于提高消費能力
      RetryFaildRequeue bool 可選 false 消費失敗次數(shù)達(dá)到條件時,是否放回隊列
      AutoQueueDeclare AutoQueueDeclare 可選 None 是否自動創(chuàng)建隊列
      BindExchange string? 可選 綁定交換器名稱
      ExchangeType string? 可選 BindExchange 的交換器類型
      RoutingKey string? 可選 BindExchange 的路由鍵名稱

      前面提到,框架會掃描消費者和事件總線的消費者特性,然后生成 IConsumerOptions 綁定該消費者,可以通過攔截函數(shù)的方式修改配置屬性。

      new ConsumerTypeFilter((consumerOptions, type) =>
      {
      	var newConsumerOptions = new ConsumerOptions(consumerOptions.Queue);
      	consumerOptions.CopyFrom(newConsumerOptions);
      
      	newConsumerOptions.Queue = "app1_" + newConsumerOptions.Queue;
      
      	return new RegisterQueue(true, consumerOptions);
      });
      

      此外,還有一個 IRoutingProvider 接口可以動態(tài)映射新的配置,在程序啟動后,Maomi.MQ 會自動創(chuàng)建交換器、隊列,會調(diào)用 IRoutingProvider 映射新的配置,在發(fā)布消息時,如果使用模型類發(fā)布,也會通過 IRoutingProvider 映射配置,所以開發(fā)者可以通過實現(xiàn)此接口動態(tài)修改配置的屬性。

      services.AddSingleton<IRoutingProvider, MyRoutingProvider>();
      

      環(huán)境隔離

      目前還在考慮要不要支持多租戶模式。


      在開發(fā)中,往往需要在本地調(diào)試,本地程序啟動后會連接到開發(fā)服務(wù)器上,一個隊列收到消息時,會向其中一個消費者推送消息。那么我本地調(diào)試時,發(fā)布一個消息后,可能本地程序收不到該消息,而是被開發(fā)環(huán)境中的程序消費掉了。

      這個時候,我們希望可以將本地調(diào)試環(huán)境跟開發(fā)環(huán)境隔離開來,可以使用 RabbitMQ 提供的 VirtualHost 功能。


      首先通過 put 請求 RabbitMQ 創(chuàng)建一個新的 VirtualHost,請參考文檔:https://www.rabbitmq.com/docs/vhosts#using-http-api

      image-20240612193415867


      然后在代碼中配置 VirtualHost 名稱:

      builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
      {
      	options.WorkId = 1;
      	options.AutoQueueDeclare = true;
      	options.AppName = "myapp";
      	options.Rabbit = (ConnectionFactory options) =>
      	{
              options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
              options.Port = 5672;
      #if DEBUG
      		options.VirtualHost = "debug";
      #endif
      		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
      	};
      }, [typeof(Program).Assembly]);
      

      當(dāng)本地調(diào)試時,發(fā)布和接收消息都會跟服務(wù)器環(huán)境隔離。

      雪花 id 配置

      Maomi.MQ.RabbitMQ 使用了 IdGenerator 生成雪花 id,使得每個事件在集群中都有一個唯一 id。

      框架通過 IIdFactory 接口創(chuàng)建雪花 id,你可以通過替換 IIdFactory 接口配置雪花 id 生成規(guī)則。

      services.AddSingleton<IIdFactory>(new DefaultIdFactory((ushort)optionsBuilder.WorkId));
      

      示例:

      public class DefaultIdFactory : IIdFactory
      {
          /// <summary>
          /// Initializes a new instance of the <see cref="DefaultIdFactory"/> class.
          /// </summary>
          /// <param name="workId"></param>
          public DefaultIdFactory(ushort workId)
          {
              var options = new IdGeneratorOptions(workId) { SeqBitLength = 10 };
              YitIdHelper.SetIdGenerator(options);
          }
      
          /// <inheritdoc />
          public long NextId() => YitIdHelper.NextId();
      }
      

      IdGenerator 框架生成雪花 id 配置請參考:

      https://github.com/yitter/IdGenerator/tree/master/C%23

      調(diào)試

      Maomi.MQ 框架在 nuget.org 中有符號包,需要調(diào)試 Maomi.MQ 框架時會非常方便。


      image-20240622110409621

      image-20240622110718661


      第一次使用時建議加載所有模塊,并啟動程序。

      image-20240622112130250


      后面可以手動選擇只加載那些模塊。

      image-20240622110227993


      F12 到要調(diào)試的位置,啟動程序后即可進(jìn)入斷點。

      image-20240622112507607


      如果需要調(diào)試 Maomi.MQ.RabbtiMQ,可以在程序中加一個斷點(不是在 Maomi.MQ 中),然后等待程序啟動到達(dá)這個斷點后,配置符號,點擊加載所有符號。

      然后在 Maomi.MQ.RabbitMQ 中設(shè)置斷點即可進(jìn)入調(diào)試。

      image-20240622112753150

      Qos 并發(fā)和順序

      基于消費者模式和基于事件模式都是通過特性來配置消費屬性,Qos 是其中一個重要的屬性,Qos 默認(rèn)值為 100,Qos 配置指的是一次允許消費者接收多少條未確認(rèn)的消息。


      Qos 場景

      全局所有消費者共用一個 IConnection 對象,每個消費者獨占一個 IChannel。


      對于消費頻率很高但是不能并發(fā)的隊列,請務(wù)必設(shè)置 Qos = 1,這樣 RabbitMQ 會逐個推送消息,在保證順序的情況下,保證消費嚴(yán)格順序。

      [Consumer("web1", Qos = 1)]
      public class MyConsumer : IConsumer<TestEvent>
      {
      }
      

      當(dāng)需要需要提高消費吞吐量,而且不需要順序消費時,可以將 Qos 設(shè)置高一些,RabbitMQ Client 框架會通過預(yù)取等方式提高吞吐量,并且多條消息可以并發(fā)消費。

      并發(fā)和異常處理

      主要根據(jù) Qos 和 RetryFaildRequeue 來處理,RetryFaildRequeue 默認(rèn)是 true。

      Qos = 1 的情況下,結(jié)合 IConsumerOptions.RetryFaildRequeueFallbackAsync ,當(dāng)該消息放回隊列時,下一次還是繼續(xù)消費該條消息。

      Qos > 1 的情況下,由于并發(fā)性,那么消費失敗的消息會被放回隊列中,但是不一定下一次會立即重新消費該條消息。

      Qos 為 1 時,會保證嚴(yán)格順序消費,ExecptionRequeue 、RetryFaildRequeue 會影響失敗的消息是否會被放回隊列,如果放回隊列,下一次消費會繼續(xù)消費之前失敗的消息。如果錯誤(如 bug)得不到解決,則會出現(xiàn)消費、失敗、放回隊列、重新消費這樣的循環(huán)。


      如何設(shè)置 Qos

      注意,在 RabbitMQClient 7.0 版本中,新增了很多東西,其中一個是消費者并發(fā)線程數(shù) ConsumerDispatchConcurrency ,默認(rèn)為 1,如果不修改該配置,會導(dǎo)致消費速度非常低下,每個 IChannel 都可以單獨設(shè)置該屬性,也可以在 ConnectionFactory 設(shè)置默認(rèn)全局屬性。

      services.AddMaomiMQ(options =>
      {
      	options.WorkId = 1;
      	options.AppName = "myapp-consumer";
      	options.Rabbit = (options) =>
      	{
      		options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
      		options.Port = 5672;
      		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
      		options.ConsumerDispatchConcurrency = 100;		// 設(shè)置這里
      	};
      }, new System.Reflection.Assembly[] { typeof(Program).Assembly });
      
      

      Maomi.MQ.RabbitMQ 中的 Qos 指 prefetch_count,取值范圍是 0-65535,為 0 時指不限制,一般默認(rèn)設(shè)置為 100 即可,Qos 設(shè)置再大不一定可以提高消費效率。

      Qos 不等于消費者并發(fā)線程數(shù)量,而是指一次可以接收的未經(jīng)處理的消息數(shù)量,消費者可以一次性拉取 N 條,然后逐個消費。


      根據(jù)官方 Finding bottlenecks with RabbitMQ 3.3 | RabbitMQ 文檔顯示,預(yù)取數(shù)量對象會影響消費者的隊列利用率。

      Prefetch limit預(yù)取限制 Consumer utilisation消費者使用率
      1 14%
      3 25%
      10 46%
      30 70%
      1000 74%

      一般情況下需要開發(fā)者中綜合各類因素去配置 Qos,應(yīng)當(dāng)綜合考慮機器網(wǎng)絡(luò)帶寬、每條消息的大小、發(fā)布消息的頻率、估算程序整體占用的資源、服務(wù)實例等情況。

      當(dāng)程序需要嚴(yán)格順序消費時,可以設(shè)置為 1。

      如果在內(nèi)網(wǎng)連接 RabbitMQ 可以無視網(wǎng)絡(luò)帶寬限制,消息的內(nèi)容十分大、需要極高的并發(fā)量時,可以設(shè)置 Qos = 0。當(dāng) Qos = 0 時,RabbitMQ.Client 會盡可能吃掉機器的性能,請謹(jǐn)慎使用。

      Qos 和消費性能測試

      為了說明不同 Qos 對消費者程序的性能影響,下面設(shè)置不同 Qos 消費 100w 條消息的代碼進(jìn)行測試,在啟動消費者之前, 先向 RabbitMQ 服務(wù)器推送 100w 條數(shù)據(jù)。

      定義事件:

      public class TestEvent
      {
          public int Id { get; set; }
          public string Message { get; set; }
          public int[] Data { get; set; }
      
          public override string ToString()
          {
              return Id.ToString();
          }
      }
      

      QosPublisher 項目的消息發(fā)布者代碼如下,用于向服務(wù)器推送 100w 條消息,每條的消息內(nèi)容約 800 byte,小于 1k。

      [HttpGet("publish")]
      public async Task<string> Publisher()
      {
      	int totalCount = 0;
      	List<Task> tasks = new();
      	var message = string.Join(",", Enumerable.Range(0, 100));
      	var data = Enumerable.Range(0, 100).ToArray();
      	for (var i = 0; i < 100; i++)
      	{
      		var task = Task.Factory.StartNew(async () =>
      		{
      			using var singlePublisher = _messagePublisher.CreateSingle();
      
      			for (int k = 0; k < 10000; k++)
      			{
      				var count = Interlocked.Increment(ref totalCount);
      				await singlePublisher.PublishAsync(exchange: string.Empty, routingKey: "qos", message: new TestEvent
      				{
      					Id = count,
      					Message = message,
      					Data = data
      				});
      			}
      		});
      		tasks.Add(task);
      	}
      
      	await Task.WhenAll(tasks);
      	return "ok";
      }
      

      等待一段時間后,服務(wù)器已經(jīng)有 100w 條消息了。

      image-20240621130733745

      創(chuàng)建消費者項目 QosConsole,人為給消費者增加 50ms 的耗時,運行程序。

      class Program
      {
          static async Task Main()
          {
              var host = new HostBuilder()
                  .ConfigureLogging(options =>
                  {
                      options.AddConsole();
                      options.AddDebug();
                  })
                  .ConfigureServices(services =>
                  {
                      services.AddMaomiMQ(options =>
                      {
                          options.WorkId = 1;
                          options.AppName = "myapp-consumer";
                          options.Rabbit = (options) =>
                          {
                              options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
                              options.Port = 5672;
                              options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
                              options.ConsumerDispatchConcurrency = 1000;
                          };
                      }, new System.Reflection.Assembly[] { typeof(Program).Assembly });
      
                  }).Build();
      
              Console.WriteLine($"start time:{DateTime.Now}");
              await host.RunAsync();
          }
      }
      
      
      [Consumer("qos", Qos = 30)]
      public class QosConsumer : IConsumer<TestEvent>
      {
          private static int Count = 0;
      
          public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
          {
              Interlocked.Increment(ref Count);
              Console.WriteLine($"date time:{DateTime.Now},id:{message.Id}, count:{Count}");
              await Task.Delay(50);
          }
      
          public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
          {
              return Task.CompletedTask;
          }
      
          public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
          {
              return Task.FromResult(ConsumerState.Ack);
          }
      }
      

      為了有直觀的對比,這里也直接使用 RabbitMQ.Client 編寫原生消費者項目 RabbitMQConsole。

      static async Task Main()
      {
      	ConnectionFactory connectionFactory = new ConnectionFactory
      	{
      		HostName = Environment.GetEnvironmentVariable("RABBITMQ")!,
      		Port = 5672,
      		ConsumerDispatchConcurrency = 1000
      	};
      
      	var connection = await connectionFactory.CreateConnectionAsync();
      	var channel = await connection.CreateChannelAsync(new CreateChannelOptions(
      		publisherConfirmationsEnabled: false,
      		publisherConfirmationTrackingEnabled: false,
      		consumerDispatchConcurrency: 1000));
      	var messageSerializer = new DefaultMessageSerializer();
      
      	var consumer = new AsyncEventingBasicConsumer(channel);
      	await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1000, global: true);
      
      	consumer.ReceivedAsync += async (sender, eventArgs) =>
      	{
      		var testEvent = messageSerializer.Deserialize<TestEvent>(eventArgs.Body.Span);
      		Console.WriteLine($"start time:{DateTime.Now} {testEvent.Id}");
      		await Task.Delay(50);
      		await channel.BasicAckAsync(eventArgs.DeliveryTag, false);
      	};
      
      	await channel.BasicConsumeAsync(
      		queue: "qos",
      		autoAck: false,
      		consumer: consumer);
      
      	while (true)
      	{
      		await Task.Delay(10000);
      	}
      }
      

      Maomi.MQ.RabbitMQ 是基于 RabbitMQ.Client 進(jìn)行封裝的,Maomi.MQ.RabbitMQ 消費時需要記錄日志、增加可觀測性信息、構(gòu)建新的依賴注入容器 等,因此耗時和資源消耗肯定會比 RabbitMQ.Client 多一些,因此需要將兩者對比一下。


      以 Release 模式在 VS 中啟動程序,以單進(jìn)程方式,分開啟動 QosConsole、RabbitMQConsole 進(jìn)行測試,并測試不同 Qos 情況下的消費速度。

      穩(wěn)定性測試

      可以參考 可觀測性 搭建監(jiān)控環(huán)境,參考 OpenTelemetryConsole 中的代碼,一個程序中一個有三個消費者,在該程序中發(fā)布消息和消費。


      每秒發(fā)布或消費約 560 條消息,三個小時內(nèi)發(fā)布約 900w 條消息已經(jīng)消費 900w 條消息。

      image-20240629101521224

      image-20240629101645663


      內(nèi)存穩(wěn)定,機器 CPU 性能不高,并且不定期的 GC 等情況都需要消耗 CPU,其波動如下:

      image-20240629101738893

      重試

      重試時間

      當(dāng)消費者 ExecuteAsync 方法異常時,框架會進(jìn)行重試,默認(rèn)會重試三次,按照 2 作為指數(shù)設(shè)置重試時間間隔。

      第一次失敗后,立即重試,然后間隔 2 秒重試,第二次失敗后,間隔 4 秒,接著分別是 8、16 秒。

      Maomi.MQ.RabbitMQ 使用了 Polly 框架做重試策略管理器,默認(rèn)通過 DefaultRetryPolicyFactory 服務(wù)生成重試間隔策略。


      DefaultRetryPolicyFactory 代碼示例如下:

      /// <summary>
      /// Default retry policy.<br />
      /// 默認(rèn)的策略提供器.
      /// </summary>
      public class DefaultRetryPolicyFactory : IRetryPolicyFactory
      {
          protected readonly int RetryCount = 3;
          protected readonly int RetryBaseDelaySeconds = 2;
      
          protected readonly ILogger<DefaultRetryPolicyFactory> _logger;
      
          /// <summary>
          /// Initializes a new instance of the <see cref="DefaultRetryPolicyFactory"/> class.
          /// </summary>
          /// <param name="logger"></param>
          public DefaultRetryPolicyFactory(ILogger<DefaultRetryPolicyFactory> logger)
          {
              _logger = logger;
      
              RetryCount = 3;
              RetryBaseDelaySeconds = 2;
          }
      
          /// <inheritdoc/>
          public virtual Task<AsyncRetryPolicy> CreatePolicy(string queue, string id)
          {
              // Create a retry policy.
              // 創(chuàng)建重試策略.
              var retryPolicy = Policy
                  .Handle<Exception>()
                  .WaitAndRetryAsync(
                      retryCount: RetryCount,
                      sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(RetryBaseDelaySeconds, retryAttempt)),
                      onRetry: async (exception, timeSpan, retryCount, context) =>
                      {
                          _logger.LogDebug("Retry execution event,queue [{Queue}],retry count [{RetryCount}],timespan [{TimeSpan}]", queue, retryCount, timeSpan);
                          await FaildAsync(queue, exception, timeSpan, retryCount, context);
                      });
      
              return Task.FromResult(retryPolicy);
          }
      
          
          public virtual Task FaildAsync(string queue, Exception ex, TimeSpan timeSpan, int retryCount, Context context)
          {
              return Task.CompletedTask;
          }
      }
      

      你可以通過實現(xiàn) IRetryPolicyFactory 接口,替換默認(rèn)的重試策略服務(wù)服務(wù)。

      services.AddSingleton<IRetryPolicyFactory, DefaultRetryPolicyFactory>();
      

      持久化剩余重試次數(shù)

      當(dāng)消費者處理消息失敗時,默認(rèn)消費者會重試 3 次,如果已經(jīng)重試了 2 次,此時程序重啟,那么下一次消費該消息時,最后重試一次。

      需要記憶重試次數(shù),在程序重啟時,能夠按照剩余次數(shù)進(jìn)行重試。


      引入 Maomi.MQ.RedisRetry 包。

      配置示例:

      builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
      {
      	options.WorkId = 1;
      	options.AutoQueueDeclare = true;
      	options.AppName = "myapp";
      	options.Rabbit = (ConnectionFactory options) =>
      	{
              // ... ... 
      	};
      }, [typeof(Program).Assembly]);
      
      builder.Services.AddMaomiMQRedisRetry((s) =>
      {
      	ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.3.248");
      	IDatabase db = redis.GetDatabase();
      	return db;
      });
      

      默認(rèn) key 只會保留 5 分鐘。也就是說,如果五分鐘之后程序才重新消費該消息,那么就會剩余重試次數(shù)就會重置。

      死信隊列

      可以給一個消費者或事件綁定死信隊列,當(dāng)該隊列的消息失敗后并且不會放回隊列時,該消息會被推送到死信隊列中,示例:


      [Consumer("ConsumerWeb_dead", Qos = 1, DeadQueue = "ConsumerWeb_dead_queue", RetryFaildRequeue = false)]
      public class DeadConsumer : IConsumer<DeadEvent>
      {
      	// 消費
      	public Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
      	{
      		Console.WriteLine($"事件 id:{message.Id}");
      		throw new OperationCanceledException();
      	}
      }
      
      // ConsumerWeb_dead 消費失敗的消息會被此消費者消費。
      [Consumer("ConsumerWeb_dead_queue", Qos = 1)]
      public class DeadQueueConsumer : IConsumer<DeadQueueEvent>
      {
      	// 消費
      	public Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
      	{
      		Console.WriteLine($"死信隊列,事件 id:{message.Id}");
      		return Task.CompletedTask;
      	}
      }
      
      

      image-20240601012127169


      如果使用死信隊列,則務(wù)必將 RetryFaildRequeue 設(shè)置為 false,那么消費者會在重試多次失敗后,向 RabbitMQ 發(fā)送 nack 信號,RabbitMQ 就會將該消息轉(zhuǎn)發(fā)到綁定的死信隊列中。

      延遲隊列

      創(chuàng)建一個消費者,繼承 EmptyConsumer,那么該隊列會在程序啟動時被創(chuàng)建,但是不會創(chuàng)建 IConnection 進(jìn)行消費。然后設(shè)置隊列消息過期時間以及綁定死信隊列,綁定的死信隊列既可以使用消費者模式實現(xiàn),也可以使用事件模式實現(xiàn)。


      [Consumer("ConsumerWeb_dead_2", Expiration = 6000, DeadQueue = "ConsumerWeb_dead_queue_2")]
      public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
      {
      }
      
      // ConsumerWeb_dead 消費失敗的消息會被此消費者消費。
      [Consumer("ConsumerWeb_dead_queue_2", Qos = 1)]
      public class Dead_2_QueueConsumer : IConsumer<DeadQueueEvent>
      {
          // 消費
          public Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
          {
              Console.WriteLine($"事件 id:{message.Id} 已到期");
              return Task.CompletedTask;
          }
      }
      

      例如,用戶下單之后,如果 15 分鐘之內(nèi)沒有付款,那么消息到期時,自動取消訂單。

      可觀測性

      請參考 ActivitySourceApi 、OpenTelemetryConsole 示例。


      部署環(huán)境

      為了快速部署可觀測性平臺,可以使用 OpenTelemetry 官方提供的示例包快速部署相關(guān)的服務(wù),里面包含了 Prometheus、Grafana、Jaeger 等中間件。

      open-telemetry 官方集成項目地址:https://github.com/open-telemetry/opentelemetry-demo


      下載示例倉庫源碼:

      git clone -b 1.12.0 https://github.com/open-telemetry/opentelemetry-demo.git
      

      請注意,不要下載 main 分支,因為有可能帶有 bug。

      可以把版本號設(shè)置為最新的版本。


      由于 docker-compose.yml 示例中會包含大量的 demo 微服務(wù),我們只需要基礎(chǔ)設(shè)施即可因此我們需要打開 docker-compose.yml 文件,將 services 節(jié)點的 Core Demo ServicesDependent Services 只保留 valkey-cart,其它直接刪除?;蛘咧苯狱c擊下載筆者已經(jīng)修改好的版本替換到項目中: docker-compose.yml

      注意,不同版本可能不一樣。

      image-20250208154943481


      執(zhí)行命令部署可觀測性服務(wù):

      docker-compose up -d
      

      image-20240612201100976


      opentelemetry-collector-contrib 用于收集鏈路追蹤的可觀測性信息,有 grpc 和 http 兩種,監(jiān)聽端口如下:

      Port Protocol Endpoint Function
      4317 gRPC n/a Accepts traces in OpenTelemetry OTLP format? (Protobuf).
      4318 HTTP /v1/traces Accepts traces in OpenTelemetry OTLP format? (Protobuf and JSON).

      經(jīng)過容器端口映射后,對外端口可能不是 4317、4318 了。

      1718196602032.png


      引入 Maomi.MQ.Instrumentation 包,以及其它相關(guān) OpenTelemetry 包。

      <PackageReference Include="Maomi.MQ.Instrumentation " Version="1.1.0" />
      <PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.8.1" />
      <PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.8.1" />
      <PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.8.1" />
      <PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.8.1" />
      

      引入命名空間:

      using OpenTelemetry.Logs;
      using OpenTelemetry.Metrics;
      using OpenTelemetry.Resources;
      using OpenTelemetry.Trace;
      using Maomi.MQ;
      using OpenTelemetry.Exporter;
      using RabbitMQ.Client;
      using System.Reflection;
      using OpenTelemetry;
      

      注入鏈路追蹤和監(jiān)控,自動上報到 Opentelemetry。

      builder.Services.AddOpenTelemetry()
      	  .ConfigureResource(resource => resource.AddService(serviceName))
      	  .WithTracing(tracing =>
      	  {
      		  tracing.AddMaomiMQInstrumentation(options =>
      		  {
      			  options.Sources.AddRange(MaomiMQDiagnostic.Sources);
      			  options.RecordException = true;
      		  })
      		  .AddAspNetCoreInstrumentation()
      		  .AddOtlpExporter(options =>
      		  {
      			  options.Endpoint = new Uri(Environment.GetEnvironmentVariable("OTLPEndpoint")! + "/v1/traces");
      			  options.Protocol = OtlpExportProtocol.HttpProtobuf;
      		  });
      	  })
      	  .WithMetrics(metrices =>
      	  {
      		  metrices.AddAspNetCoreInstrumentation()
      		  .AddMaomiMQInstrumentation()
      		  .AddOtlpExporter(options =>
      		  {
      			  options.Endpoint = new Uri(Environment.GetEnvironmentVariable("OTLPEndpoint")! + "/v1/metrics");
      			  options.Protocol = OtlpExportProtocol.HttpProtobuf;
      		  });
      	  });
      
      

      鏈路追蹤

      啟動 ActivitySourceApi 服務(wù)后,進(jìn)行發(fā)布、消費,鏈路追蹤信息會被自動推送到 OpenTelemetry Collector 中,通過 Jaeger 、Skywalking 等組件可以讀取出來。


      打開映射了 16686 端口的 Jaejer ui 面板:

      image-20240612205140595


      由于 publish、consumer 屬于兄弟 trace 而不是同一個 trace,因此需要通過 Tags 查詢相關(guān)聯(lián)的 trace,格式 event.id=xxx

      1718196773292

      3662d0c35aaac72c77046a430988e87

      監(jiān)控

      Maomi.MQ 內(nèi)置了以下指標(biāo):

      名稱 說明
      maomimq_consumer_message_pull_count_total 已拉取的消息條數(shù)
      maomimq_consumer_message_faild_count_total 消費失敗的消息數(shù)量
      maomimq_consumer_message_received_Byte_bucket
      maomimq_consumer_message_received_Byte_count
      maomimq_consumer_message_received_Byte_sum 接收到的消息總字節(jié)數(shù)
      maomimq_publisher_message_count_total 發(fā)送的消息數(shù)量
      maomimq_publisher_message_faild_count_total 發(fā)送失敗的消息數(shù)量
      maomimq_publisher_message_sent_Byte_bucket
      maomimq_publisher_message_sent_Byte_count
      maomimq_publisher_message_sent_Byte_sum 發(fā)送的消息的總字節(jié)數(shù)

      接著,要將數(shù)據(jù)顯示到 Grafana 中。

      下載模板文件: maomi.json

      然后在 Grafana 面板的 Dashboards 中導(dǎo)入文件,可以在面板中查看當(dāng)前所有服務(wù)的消息隊列監(jiān)控。


      image-20240629011543582

      image-20250220212204225


      開源項目代碼引用

      OpenTelemetry.Instrumentation.MaomiMQ 項目的 Includes 代碼來源于 https://github.com/open-telemetry/opentelemetry-dotnet-contrib/tree/main/src/Shared

      posted @ 2025-02-21 08:30  癡者工良  閱讀(1630)  評論(5)    收藏  舉報
      主站蜘蛛池模板: 久久精品av国产一区二区 | 精品黄色av一区二区三区| 亚洲av永久无码精品水牛影视| 日韩av在线不卡一区二区| 成人做受120秒试看试看视频| 国产午夜精品久久精品电影| 看免费的无码区特aa毛片| 国产三级精品三级在线看| 麻豆蜜桃伦理一区二区三区| 国精偷拍一区二区三区| 启东市| 亚洲中文字幕无码爆乳| 精品无码一区二区三区电影| 中文字幕人妻不卡精品| 久久亚洲精品天天综合网| 最近免费中文字幕mv在线视频3| 国偷自产一区二区三区在线视频| 视频一区二区三区四区五区| 亚洲成人av综合一区| 沙雅县| 亚洲v欧美v日韩v国产v| av在线中文字幕不卡电影网| 男女啪啪高清无遮挡免费| 97欧美精品系列一区二区| 国产成人欧美一区二区三区| 99网友自拍视频在线| 午夜免费福利小电影| 国产精品久久久久久无毒不卡 | 久久99国产乱子伦精品免费| 国产欧美亚洲精品第一页在线| 国产精品一码二码三码四码| 欧美喷水抽搐magnet| 国产仑乱无码内谢| 国产日韩入口一区二区| 国产成人精品亚洲日本片| 久久久久无码中| 姐姐6电视剧在线观看| 99精品久久久久久久婷婷| 国产毛片精品一区二区色| 94人妻少妇偷人精品| 久热中文字幕在线精品观|