<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      開源一款功能強大的 .NET 消息隊列通訊模型框架 Maomi.MQ

      文檔說明

      作者:癡者工良

      文檔地址:https://mmq.whuanle.cn

      倉庫地址:https://github.com/whuanle/Maomi.MQ

      作者博客:

      導讀

      Maomi.MQ 是一個消息通訊模型項目,目前只支持了 RabbitMQ。

      Maomi.MQ.RabbitMQ 是一個用于專為 RabbitMQ 設計的發布者和消費者通訊模型,大大簡化了發布和消息的代碼,并提供一系列簡便和實用的功能,開發者可以通過框架提供的消費模型實現高性能消費、事件編排,框架還支持發布者確認機制、自定義重試機制、補償機制、死信隊列、延遲隊列、連接通道復用等一系列的便利功能。開發者可以把更多的精力放到業務邏輯中,通過 Maomi.MQ.RabbitMQ 框架簡化跨進程消息通訊模式,使得跨進程消息傳遞更加簡單和可靠。

      此外,框架通過 runtime 內置的 api 支持了分布式可觀測性,可以通過進一步使用 OpenTelemetry 等框架進一步收集可觀測性信息,推送到基礎設施平臺中。

      快速開始

      本文將快速介紹 Maomi.MQ.RabbitMQ 的使用方法。

      引入 Maomi.MQ.RabbitMQ 包,在 Web 配置中注入服務:

      builder.Services.AddSwaggerGen();
      builder.Services.AddLogging();
      
      builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
      {
      	options.WorkId = 1;
      	options.AppName = "myapp";
      	options.Rabbit = (ConnectionFactory options) =>
      	{
      		options.HostName = "192.168.3.248";
      		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
      	};
      }, [typeof(Program).Assembly]);
      
      var app = builder.Build();
      
      • WorkId: 指定用于生成分布式雪花 id 的節點 id,默認為 0。
      • AppName:用于標識消息的生產者,以及在日志和鏈路追蹤中標識消息的生產者或消費者。
      • Rabbit:RabbitMQ 客戶端配置,請參考 ConnectionFactory

      如果是控制臺項目,則需要引入 Microsoft.Extensions.Hosting 包。

      var host = new HostBuilder()
      	.ConfigureLogging(options =>
      	{
      		options.AddConsole();
      		options.AddDebug();
      	})
      	.ConfigureServices(services =>
      	{
      		services.AddMaomiMQ(options =>
      		{
      			options.WorkId = 1;
      			options.AppName = "myapp";
      			options.Rabbit = (ConnectionFactory options) =>
      			{
      				options.HostName = "192.168.3.248";
      				options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
      			};
      		}, new System.Reflection.Assembly[] { typeof(Program).Assembly });
      		
      		// Your services.
      		services.AddHostedService<MyPublishAsync>();
      	}).Build();
      
      await host.RunAsync();
      

      定義消息模型類,該模型類將會被序列化為二進制內容傳遞到 RabbitMQ 服務器中。

      public class TestEvent
      {
          public int Id { get; set; }
      
          public override string ToString()
          {
              return Id.ToString();
          }
      }
      

      定義消費者,消費者需要實現 IConsumer<TEvent> 接口,以及使用 [Consumer] 特性注解配置消費者屬性。

      [Consumer("test", Qos = 1, RetryFaildRequeue = true)]
      public class MyConsumer : IConsumer<TestEvent>
      {
          private static int _retryCount = 0;
      
          // 消費
          public async Task ExecuteAsync(EventBody<TestEvent> message)
          {
              Console.WriteLine($"事件 id: {message.Id} {DateTime.Now}");
              await Task.CompletedTask;
          }
          
          // 每次消費失敗時執行
          public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;
          
          // 補償
          public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
      }
      

      然后注入 IMessagePublisher 服務發布消息:

      [ApiController]
      [Route("[controller]")]
      public class IndexController : ControllerBase
      {
          private readonly IMessagePublisher _messagePublisher;
      
          public IndexController(IMessagePublisher messagePublisher)
          {
              _messagePublisher = messagePublisher;
          }
      
          [HttpGet("publish")]
          public async Task<string> Publisher()
          {
              // 發布消息
              await _messagePublisher.PublishAsync(queue: "test", message: new TestEvent
      		{
              	Id = i
              });
              return "ok";
          }
      }
      

      消息發布者

      消息發布者用于推送消息到 RabbitMQ 服務器中。

      通過注入 IMessagePublisher 接口即可向 RabbitMQ 推送消息,示例項目請參考 PublisherWeb

      定義一個事件模型類:

      public class TestEvent
      {
      	public int Id { get; set; }
      
      	public override string ToString()
      	{
      		return Id.ToString();
      	}
      }
      

      注入 IMessagePublisher 服務后發布消息:

      [ApiController]
      [Route("[controller]")]
      public class IndexController : ControllerBase
      {
      	private readonly IMessagePublisher _messagePublisher;
      
      	public IndexController(IMessagePublisher messagePublisher)
      	{
      		_messagePublisher = messagePublisher;
      	}
      
      	[HttpGet("publish")]
      	public async Task<string> Publisher()
      	{
      		for (var i = 0; i < 100; i++)
      		{
      			await _messagePublisher.PublishAsync(queue: "PublisherWeb", message: new TestEvent
      			{
      				Id = i
      			});
      		}
      
      		return "ok";
      	}
      }
      

      IMessagePublisher

      IMessagePublisher 定義比較簡單,只有三個方法和一個屬性:

      public ConnectionPool ConnectionPool { get; }
      
      Task PublishAsync<TEvent>(string queue, TEvent message, Action<IBasicProperties>? properties = null)
      where TEvent : class;
      
      Task PublishAsync<TEvent>(string queue, TEvent message, IBasicProperties properties);
      
      //  不建議直接使用該接口。
      Task CustomPublishAsync<TEvent>(string queue, EventBody<TEvent> message, BasicProperties properties);
      

      三個 PublishAsync 方法用于發布事件,ConnectionPool 屬性用于獲取 RabbitMQ.Client.IConnection 對象。

      由于直接公開了 BasicProperties ,因此開發者完全自由配置 RabbitMQ 原生的消息屬性,所以 Maomi.MQ.RabbitMQ 沒必要過度設計,只提供了簡單的功能接口。

      例如,可以通過 BasicProperties 配置單條消息的過期時間:

      await _messagePublisher.PublishAsync(queue: "RetryWeb", message: new TestEvent
      {
      	Id = i
      }, (BasicProperties p) =>
      {
      	p.Expiration = "1000";
      });
      

      當發布一條消息時,實際上框架傳遞的是 EventBody<T> 類型,EventBody<T> 中包含了一些重要的附加消息屬性,這些屬性會給消息處理和故障診斷帶來很大的方便。

      public class EventBody<TEvent>
      {
      	// 事件唯一 id.
      	public long Id { get; init; }
      
      	// Queue.
      	public string Queue { get; init; } = null!;
      
      	// App name.
      	public string Publisher { get; init; } = null!;
      
      	// 事件創建時間.
      	public DateTimeOffset CreationTime { get; init; }
      
      	// 事件體.
      	public TEvent Body { get; init; } = default!;
      }
      

      Maomi.MQ 通過 DefaultMessagePublisher 類型實現了 IMessagePublisher,DefaultMessagePublisher 默認生命周期是 Singleton:

      services.AddSingleton<IMessagePublisher, DefaultMessagePublisher>();
      

      生命周期不重要,如果需要修改默認的生命周期,可以手動修改替換。

      services.AddScoped<IMessagePublisher, DefaultMessagePublisher>();
      

      開發者也可以自行實現 IMessagePublisher 接口,具體示例請參考 DefaultMessagePublisher 類型。

      連接池

      為了復用 RabbitMQ.Client.IConnection ,Maomi.MQ.RabbitMQ 內部實現了 ConnectionPool 類型,通過對象池維護復用的 RabbitMQ.Client.IConnection 對象。

      默認對象池中的 RabbitMQ.Client.IConnection 數量為 0,只有當連接被真正使用時才會從對象池委托中創建,連接對象會隨著程序并發量而自動增加,但是,默認最大連接對象數量為 Environment.ProcessorCount * 2

      除了 IMessagePublisher 接口提供的 PublishAsync 方法可以發布事件,開發者還可以從 ConnectionPool 獲取連接對象,請務必在使用完畢后通過 ConnectionPool.Return() 方法將其歸還到連接對象池。

      通過連接池直接使用 IConnection 對象發布消息:

      [HttpGet("publish")]
      public async Task<string> Publisher()
      {
      	for (var i = 0; i < 100; i++)
      	{
      		var connectionPool = _messagePublisher.ConnectionPool;
      		var connection = connectionPool.Get();
      
      		try
      		{
      			connection.Channel.BasicPublishAsync(
      			exchange: string.Empty,
      			routingKey: "queue",
      			basicProperties: properties,
      			body: _jsonSerializer.Serializer(message),
      			mandatory: true);
      		}
      		finally
      		{
      			connectionPool.Return(connection);
      		}
      	}
      
      	return "ok";
      }
      

      你也可以繞開 IMessagePublisher ,直接注入 ConnectionPool 使用 RabbitMQ 連接對象,但是不建議這樣使用。

      private readonly ConnectionPool _connectionPool;
      
      public DefaultMessagePublisher(ConnectionPool connectionPool)
      {
      	_connectionPool = connectionPool;
      }
      
      public async Task MyPublshAsync()
      {
      	var connection = _connectionPool.Get();
      	try
      	{
      		await connection.Channel.BasicPublishAsync(...);
      	}
      	finally
      	{
      		_connectionPool.Return(connection);
      	}
      }
      

      為了更加簡便地管理連接對象,可以使用 CreateAutoReturn() 函數創建連接管理對象,該對象被釋放時會自動將 IConnection 返還給連接池。

      using var poolObject = _messagePublisher.ConnectionPool.CreateAutoReturn();
      poolObject.Channel.BasicPublishAsync(
      	exchange: string.Empty,
      	routingKey: "queue",
      	basicProperties: properties,
      	body: _jsonSerializer.Serializer(message),
      	mandatory: true);
      

      如果你自行使用 ConnectionPool 推送消息到 RabbitMQ,請務必通過序列化 EventBody<TEvent> 事件對象,這樣 Maomi.MQ.RabbitMQ 消費者才能正常工作。同時,Moami.MQ 對可觀測性做了支持,如果自行使用 ConnectionPool 獲取連接對象推送消息,可能會導致可觀測性信息缺失。

      正常情況下,RabbitMQ.Client 中包含了可觀測性的功能,但是 Maomi.MQ.RabbitMQ 附加的可觀測性信息有助于診斷故障問題。

      請注意:

      • Maomi.MQ.RqbbitMQ 通過 EventBody<TEvent> 泛型對象發布和接收事件。

      • DefaultMessagePublisher 包含了鏈路追蹤等可觀測性代碼。

      消息過期

      IMessagePublisher 對外開放了 BasicProperties 或 BasicProperties,可以自由配置消息屬性。

      例如為消息配置過期時間:

      [HttpGet("publish")]
      public async Task<string> Publisher()
      {
      	for (var i = 0; i < 1; i++)
      	{
      		await _messagePublisher.PublishAsync(queue: "test", message: new TestEvent
      		{
      			Id = i
      		}, properties =>
      		{
      			properties.Expiration = "6000";
      		});
      	}
      
      	return "ok";
      }
      

      如果此時為 test 綁定死信隊列,那么該消息長時間沒有被消費時,會被移動到另一個隊列,請參考 死信隊列

      還可以通過配置消息屬性實現更多的功能,請參考 IBasicProperties

      事務

      RabbitMQ 支持事務,不過據 RabbitMQ 官方文檔顯示,事務會使吞吐量減少 250 倍。

      RabbitMQ 事務使用上比較簡單,可以保證發布的消息已經被推送到 RabbitMQ 服務器,只有當提交事務時,提交的消息才會被 RabbitMQ 存儲并推送給消費者。

      使用示例:

      [HttpGet("publish_tran")]
      public async Task<string> Publisher_Tran()
      {
      	using var tranPublisher = await _messagePublisher.TxSelectAsync();
      
      	try
      	{
      		await tranPublisher.PublishAsync(queue: "publish_tran", message: new TestEvent
      		{
      			Id = 666
      		});
      		await tranPublisher.TxCommitAsync();
      	}
      	catch
      	{
      		await tranPublisher.TxRollbackAsync();
      		throw;
      	}
      
      	return "ok";
      }
      

      或者手動開啟事務:

      [HttpGet("publish_tran")]
      public async Task<string> Publisher_Tran()
      {
      	using var tranPublisher = _messagePublisher.CreateTransaction();
      
      	try
      	{
      		await tranPublisher.TxSelectAsync();
      		await tranPublisher.PublishAsync(queue: "publish_tran", message: new TestEvent
      		{
      			Id = 666
      		});
      		await tranPublisher.TxCommitAsync();
      	}
      	catch
      	{
      		await tranPublisher.TxRollbackAsync();
      		throw;
      	}
      
      	return "ok";
      }
      

      注意,在該種模式之下,創建 TransactionPublisher 對象時,會從對象池中取出一個連接對象,因為開啟事務模式可能會污染當前連接通道,因此 TransactionPublisher 不會向連接池歸還連接對象,而是直接釋放

      發送方確認模式

      雖然事務模式可以保證消息會被推送到 RabbitMQ 服務器中,但是由于事務模式會導致吞吐量降低 250 倍,因此不是一個好的選擇。為了解決這個問題, RabbitMQ 引入了一種確認機制,這種機制就像滑動窗口,能夠保證消息推送到服務器中,并且具備高性能的特性。

      請參考 https://www.rabbitmq.com/docs/confirms

      使用示例:

      [HttpGet("publish_confirm")]
      public async Task<string> Publisher_Confirm()
      {
      	using var confirmPublisher = await _messagePublisher.ConfirmSelectAsync();
      
      	for (var i = 0; i < 5; i++)
      	{
      		await confirmPublisher.PublishAsync(queue: "publish_confirm1", message: new TestEvent
      		{
      			Id = 666
      		});
      
      		var result = await confirmPublisher.WaitForConfirmsAsync();
      
      		// 如果在超時內沒有接收到 nacks,則為 True,否則為 false。
      		Console.WriteLine($"發布 {i},{result}");
      	}
      
      	return "ok";
      }
      

      WaitForConfirmsAsync 方法會返回一個值,如果正常被服務器確認了消息已經傳達,則結果為 true,如果超時沒有被服務器確認,則返回 false。

      此外,還有一個 WaitForConfirmsOrDieAsync 方法,它會一直等待該頻道上的所有已發布消息都得到確認,使用示例:

      using var confirmPublisher = await _messagePublisher.ConfirmSelectAsync();
      
      for (var i = 0; i < 5; i++)
      {
      	await confirmPublisher.PublishAsync(queue: "publish_confirm1", message: new TestEvent
      	{
      		Id = 666
      	});
      
      	Console.WriteLine($"發布 {i}");
      }
      
      await confirmPublisher.WaitForConfirmsOrDieAsync();
      

      注意,在該種模式之下,創建 ConfirmPublisher 對象時,會從對象池中取出一個連接對象,因為開啟事務模式可能會污染當前連接通道,因此 ConfirmPublisher 不會向連接池歸還連接對象,而是直接釋放

      注意,同一個通道不能同時使用事務和發送方確認模式。

      獨占模式

      默認情況下,每次使用 IMessagePublisher.PublishAsync() 發布消息時,都會從連接池中取出連接對象,然后使用該連接通道發布消息,發布完畢后就會歸還連接對象給連接池。

      如果需要在短時間內大批量發布消息,則需要每次都要重復獲取和返還連接對象。

      使用獨占模式時可以在一段時間內獨占一個連接對象,超出作用域后,連接對象會自動放回連接池。這種模式對于需要大量發布消息的場景提高吞吐量非常有幫助。為了能夠將連接通道歸還連接池,請務必使用 using 關鍵字修飾變量,或者手動調用 Dispose 函數。

      使用示例:

      // 創建獨占模式
      using var singlePublisher = _messagePublisher.CreateSingle();
      
      for (var i = 0; i < 500; i++)
      {
      	await singlePublisher.PublishAsync(queue: "publish_single", message: new TestEvent
      	{
      		Id = 666
      	});
      }
      

      消費者

      Maomi.MQ.RabbitMQ 中,有兩種消費模式,一種是消費者模式,一種是事件模式(事件總線模式)。

      下面簡單了解這兩種模式的使用方法。

      消費者模式

      消費者服務需要實現 IConsumer<TEvent> 接口,并且配置 [Consumer("queue")] 特性綁定隊列名稱,通過消費者對象來控制消費行為。

      消費者模式有具有失敗通知和補償能力,使用上也比較簡單。

      public class TestEvent
      {
          public int Id { get; set; }
      }
      
      [Consumer("PublisherWeb", Qos = 1, RetryFaildRequeue = true)]
      public class MyConsumer : IConsumer<TestEvent>
      {
          private static int _retryCount = 0;
      
          // 消費或重試
          public async Task ExecuteAsync(EventBody<TestEvent> message)
          {
              _retryCount++;
              Console.WriteLine($"執行次數:{_retryCount} 事件 id: {message.Id} {DateTime.Now}");
              await Task.CompletedTask;
          }
          
          // 失敗
          public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;
          
          // 補償
          public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
      }
      

      事件模式

      事件模式是通過事件總線的方式實現的,以事件模型為中心,通過事件來控制消費行為。

      [EventTopic("web2", Qos = 1, RetryFaildRequeue = true)]
      public class TestEvent
      {
      	public string Message { get; set; }
      }
      

      然后使用 [EventOrder] 特性編排事件執行順序。

      // 編排事件消費順序
      [EventOrder(0)]
      public class My1EventEventHandler : IEventHandler<TestEvent>
      {
      	public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
      	{
      	}
      
      	public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
      	{
      		Console.WriteLine($"{@event.Id},事件 1 已被執行");
      	}
      }
      
      [EventOrder(1)]
      public class My2EventEventHandler : IEventHandler<TestEvent>
      {
      	public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
      	{
      	}
      
      	public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
      	{
      		Console.WriteLine($"{@event.Id},事件 2 已被執行");
      	}
      }
      

      當然,事件模式也可以通過創建中間件增加補償功能,通過中間件還可以將所有排序事件放到同一個事務中,一起成功或失敗,避免事件執行時出現程序退出導致的一致性問題。

      public class TestEventMiddleware : IEventMiddleware<TestEvent>
      {
          private readonly BloggingContext _bloggingContext;
      
          public TestEventMiddleware(BloggingContext bloggingContext)
          {
              _bloggingContext = bloggingContext;
          }
      
          public async Task ExecuteAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
          {
              using (var transaction = _bloggingContext.Database.BeginTransaction())
              {
                  await next(@event, CancellationToken.None);
                  await transaction.CommitAsync();
              }
          }
      
          public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
          {
              return Task.CompletedTask;
          }
      
          public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
          {
              return Task.FromResult(true);
          }
      }
      

      分組

      消費者模式和事件模式都可以設置分組,在特性上設置了 Group 屬性,具有同一個分組的事件會被放到一個連接通道(RabbitMQ.Client.IConnection)中,對于消費頻率不高的事件,復用連接通道可以有效較低資源消耗。

      消費者模式分組示例:

      [Consumer("ConsumerWeb_group_1", Qos = 1, Group = "group")]
      public class Group_1_Consumer : IConsumer<GroupEvent>
      {
          public Task ExecuteAsync(EventBody<GroupEvent> message) => Task.CompletedTask;
      
          public Task FaildAsync(Exception ex, int retryCount, EventBody<GroupEvent>? message) => Task.CompletedTask;
      
          public Task<bool> FallbackAsync(EventBody<GroupEvent>? message) => Task.FromResult(true);
      }
      
      [Consumer("ConsumerWeb_group_2", Qos = 1, Group = "group")]
      public class Group_2_Consumer : IConsumer<GroupEvent>
      {
          public Task ExecuteAsync(EventBody<GroupEvent> message) => Task.CompletedTask;
      
          public Task FaildAsync(Exception ex, int retryCount, EventBody<GroupEvent>? message) => Task.CompletedTask;
      
          public Task<bool> FallbackAsync(EventBody<GroupEvent>? message) => Task.FromResult(true);
      }
      

      事件總線模式分組示例:

      [EventTopic("web1", Qos = 1, RetryFaildRequeue = true, Group = "group")]
      public class Test1Event
      {
      	public string Message { get; set; }
      }
      [EventTopic("web2", Qos = 1, RetryFaildRequeue = true, Group = "group")]
      public class Test2Event
      {
      	public string Message { get; set; }
      }
      
      

      消費者模式

      消費者模式要求服務實現 IConsumer<TEvent> 接口,并添加 [Connsumer] 特性。

      IConsumer<TEvent> 接口比較簡單,其定義如下:

      public interface IConsumer<TEvent>
          where TEvent : class
      {
          // 消息處理.
          public Task ExecuteAsync(EventBody<TEvent> message);
      
          // ExecuteAsync 異常后立即執行此代碼.
          public Task FaildAsync(Exception ex, int retryCount, EventBody<TEvent>? message);
      
          // 最后一次重試失敗時執行,用于補償.
          public Task<bool> FallbackAsync(EventBody<TEvent>? message);
      }
      

      使用消費者模式時,需要先定義一個模型類,用于發布者和消費者之間傳遞消息,事件模型類只要是類即可,能夠正常序列化和反序列化,沒有其它要求。

      public class TestEvent
      {
      	public int Id { get; set; }
      
      	public override string ToString()
      	{
      		return Id.ToString();
      	}
      }
      

      然后繼承 IConsumer<TEvent> 接口實現消費者功能:

      [Consumer("web1", Qos = 1)]
      public class MyConsumer : IConsumer<TestEvent>
      {
      	// 消費
      	public async Task ExecuteAsync(EventBody<TestEvent> message)
      	{
      		Console.WriteLine(message.Body.Id);
      	}
      
      	// 每次失敗時被執行
      	public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
      	{
      		Console.WriteLine($"重試 {message.Body.Id},次數 {retryCount}");
      		await Task.CompletedTask;
      	}
      
      	// 最后一次失敗時執行
      	public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
      	{
      		Console.WriteLine($"最后一次 {message.Body.Id}");
              // 如果返回 true,說明補償成功。
      		return true;
      	}
      }
      

      特性配置的說明請參考 消費者配置

      消費、重試和補償

      消費者收到服務器推送的消息時,ExecuteAsync 方法會被自動執行。當 ExecuteAsync 執行異常時,FaildAsync 方法會馬上被觸發,開發者可以利用 FaildAsync 記錄相關日志信息。

      // 每次失敗時被執行
      public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
      {
      	// 當 retryCount == -1 時,錯誤并非是 ExecuteAsync 方法導致的
      	if (retryCount == -1)
      	{
      		_logger.LogError(ex, "Consumer error,event id: {Id}", message?.Id);
      
      		// 可以在此處添加告警通知代碼
      		await Task.Delay(1000);
      	}
      	else
      	{
      		_logger.LogError(ex, "Consumer exception,event id: {Id},retry count: {retryCount}", message!.Id, retryCount);
      	}
      }
      

      如果 FaildAsync 方法也出現異常時,不會影響整體流程,框架會等待到達間隔時間后繼續重試 ExecuteAsync 方法。

      建議 FaildAsync 使用 try{}cathc{} 套住代碼,不要對外拋出異常,FaildAsync 的邏輯不要包含太多邏輯,并且 FaildAsync 只應記錄日志或進行告警使用。

      FaildAsync 被執行有一個額外情況,就是在消費消息之前就已經發生錯誤,例如一個事件模型類有構造函數導致不能被反序列化,這個時候 FaildAsync 會被立即執行,且 retryCount = -1

      ExecuteAsync 方法執行異常時,框架會自動重試,默認會重試五次,如果五次都失敗,則會執行 FallbackAsync 方法進行補償。

      重試間隔時間會逐漸增大,請參考 重試

      當重試五次之后,就會立即啟動補償機制。

      // 最后一次失敗時執行
      public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
      {
      	return true;
      }
      

      FallbackAsync 方法需要返回 bool,如果返回 true ,表示雖然 ExecuteAsync 出現異常,但是 FallbackAsync 補償后已經正常,該消息會被正常消費掉。如果返回 false,則說補償失敗,該消息按照消費失敗處理。

      只有 ExecuteAsync 異常時,才會觸發 FaildAsyncFallbackAsync ,如果是在處理消息之前的異常,會直接失敗。

      retry

      消費失敗

      ExecuteAsync 失敗次數達到閾值時,并且 FallbackAsync 返回 false,則該條消息消費失敗,或者由于序列化等錯誤時直接失敗。

      [Consumer] 特性中有三個很重要的配置:

      public class ConsumerAttribute : Attribute
      {
          // 消費失敗次數達到條件時,是否放回隊列.
          public bool RetryFaildRequeue { get; set; }
      
          // 現異常時是否放回隊列,例如序列化錯誤等原因導致的,而不是消費時發生異常導致的.
          public bool ExecptionRequeue { get; set; }  = true;
          
          // 綁定死信隊列.
          public string? DeadQueue { get; set; }
      }
      

      ExecuteAsync 失敗次數達到閾值時,并且 FallbackAsync 返回 false,則該條消息消費失敗。

      如果 RetryFaildRequeue == false,那么該條消息會被 RabbitMQ 丟棄。

      如果綁定了死信隊列,則會先推送到死信隊列,接著再丟棄。

      如果 RetryFaildRequeue == true,那么該條消息會被返回 RabbbitMQ 隊列中,等待下一次消費。

      由于消息失敗后會被放回隊列,因此綁定的死信隊列不會收到該消息。

      當序列化異常或者其它問題導致錯誤而不能進入 ExecuteAsync 方法時,FaildAsync 方法會首先被觸發一次,此時 retryCount 參數值為 -1

      出現此種問題時,一般是開發者 bug 導致的,不會進行補償等操作,開發者可以在 FaildAsync 中處理該事件,記錄相關日志信息。

      // 每次失敗時被執行,或者出現無法進入 ExecuteAsync 的異常
      public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
      {
      	// 當 retryCount == -1 時,錯誤并非是 ExecuteAsync 方法導致的
      	if (retryCount == -1)
      	{
      		_logger.LogError(ex, "Consumer error,event id: {Id}", message?.Id);
      
      		// 可以在此處添加告警通知代碼
      		await Task.Delay(1000);
      	}
      	else
      	{
      		_logger.LogError(ex, "Consumer exception,event id: {Id},retry count: {retryCount}", message!.Id, retryCount);
      	}
      }
      

      由于這種情況不妥善處理,會導致消息丟失,因此框架默認將 ExecptionRequeue 設置為 true,也就是說出現這種異常時,消息會被放回隊列。如果問題一致沒有得到解決,則會出現循環:調用 FaildAsync 、放回隊列、調用 FaildAsync 、放回隊列... ...

      所以應該在 FaildAsync 中添加代碼通知開發者相關信息,并且設置間隔時間,避免重試太頻繁。

      自動創建隊列

      框架默認會自動創建隊列,如果需要關閉自動創建功能,把 AutoQueueDeclare 設置為 false 即可。

      builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
      {
      	options.WorkId = 1;
      	options.AppName = "myapp";
      	options.AutoQueueDeclare = false;
      	options.Rabbit = (ConnectionFactory options) =>
      	{
      		options.HostName = "192.168.3.248";
      		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
      	};
      }, [typeof(Program).Assembly]);
      

      當然還可以單獨為消費者配置是否自動創建隊列:

      [Consumer("ConsumerWeb_create", AutoQueueDeclare = AutoQueueDeclare.Enable)]
      

      默認情況下,關閉了全局自動創建,則不會自動創建隊列。

      如果關閉全局自動創建,但是消費者配置了 AutoQueueDeclare = AutoQueueDeclare.Enable,則還是會自動創建隊列。

      如果消費者配置了 AutoQueueDeclare = AutoQueueDeclare.Disable ,則會忽略全局配置,不會創建隊列。

      Qos

      讓程序需要嚴格根據順序消費時,可以使用 Qos = 1,框架會嚴格保證逐條消費,如果程序不需要順序消費,希望可以快速處理所有消息,則可以將 Qos 設置大一些。由于 Qos 和重試、補償機制組合使用會有多種情況,因此請參考 重試

      Qos 是通過特性來配置的:

      [Consumer("ConsumerWeb", Qos = 1)]
      

      可以通過調高 Qos 值,讓程序在可以并發消息,提高并發量。

      延遲隊列

      延遲隊列有兩種,一種設置消息過期時間,一種是設置隊列過期時間。

      設置消息過期時間,那么該消息在一定時間沒有被消費時,會被丟棄或移動到死信隊列中,該配置只對單個消息有效,請參考 消息過期

      隊列設置過期后,當消息在一定時間內沒有被消費時,會被丟棄或移動到死信隊列中,該配置只對所有消息有效。基于這一點,我們可以實現延遲隊列。

      首先創建消費者,繼承 EmptyConsumer,那么該隊列會在程序啟動時被創建,但是不會創建 IConnection 進行消費。然后設置隊列消息過期時間以及綁定死信隊列,綁定的死信隊列既可以使用消費者模式實現,也可以使用事件模式實現。

      [Consumer("ConsumerWeb_dead_2", Expiration = 6000, DeadQueue = "ConsumerWeb_dead_queue_2")]
      public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
      {
      }
      
      // ConsumerWeb_dead 消費失敗的消息會被此消費者消費。
      [Consumer("ConsumerWeb_dead_queue_2", Qos = 1)]
      public class Dead_2_QueueConsumer : IConsumer<DeadQueueEvent>
      {
          // 消費
          public Task ExecuteAsync(EventBody<DeadQueueEvent> message)
          {
              Console.WriteLine($"死信隊列,事件 id:{message.Id}");
              return Task.CompletedTask;
          }
      
          // 每次失敗時被執行
          public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;
      
          // 最后一次失敗時執行
          public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false);
      }
      

      空消費者

      當識別到空消費者時,框架只會創建隊列,而不會啟動消費者消費消息。

      可以結合延遲隊列一起使用,該隊列不會有任何消費者,當該隊列的消息過期時,都由死信隊列直接消費,示例如下:

      [Consumer("ConsumerWeb_empty", Expiration = 6000, DeadQueue = "ConsumerWeb_empty_dead")]
      public class MyEmptyConsumer : EmptyConsumer<TestEvent> { }
      
      [Consumer("ConsumerWeb_empty_dead", Qos = 10)]
      public class MyDeadConsumer : IConsumer<TestEvent>
      {
      	public Task ExecuteAsync(EventBody<TestEvent> message) => Task.CompletedTask;
      
      	public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;
      
      	public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
      }
      

      對于跨進程的隊列,A 服務不消費只發布,B 服務負責消費,A 服務中可以加一個空消費者,保證 A 服務啟動時該隊列一定存在,另一方面,消費者服務不應該關注隊列的定義,也不太應該創建隊列。

      分組

      通過配置 Group 屬性將多個消費者放到同一個連接通道中執行,對于那些并發量不高的隊列,復用連接通道可以降低資源消耗。

      示例:

      [Consumer("ConsumerWeb_group_1", Qos = 1, Group = "group")]
      public class Group_1_Consumer : IConsumer<GroupEvent>
      {
      }
      
      [Consumer("ConsumerWeb_group_2", Qos = 1, Group = "group")]
      public class Group_2_Consumer : IConsumer<GroupEvent>
      {
      }
      

      事件總線模式

      Maomi.MQ 內部設計了一個事件總線,可以幫助開發者實現事件編排、實現本地事務、正向執行和補償。

      首先定義一個事件類型,該事件綁定一個 topic 或隊列,事件需要使用 [EventTopic] 標識,并設置該事件對于的隊列名稱。

      [EventTopic] 特性擁有與 [Consumer] 相同的特性,可參考 [Consumer] 的使用配置事件,請參考 消費者配置

      [EventTopic("EventWeb")]
      public class TestEvent
      {
      	public string Message { get; set; }
      
      	public override string ToString()
      	{
      		return Message;
      	}
      }
      

      然后編排事件執行器,每個執行器都需要繼承 IEventHandler<T> 接口,然后使用 [EventOrder] 特性標記執行順序。

      [EventOrder(0)]
      public class My1EventEventHandler : IEventHandler<TestEvent>
      {
      	public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
      	{
      	}
      
      	public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
      	{
      		Console.WriteLine($"{@event.Id},事件 1 已被執行");
      	}
      }
      
      [EventOrder(1)]
      public class My2EventEventHandler : IEventHandler<TestEvent>
      {
      	public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
      	{
      	}
      
      	public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
      	{
      		Console.WriteLine($"{@event.Id},事件 2 已被執行");
      	}
      }
      

      每個事件執行器都必須實現 IEventHandler<T> 接口,并且設置 [EventOrder] 特性以便確認事件的執行順序,框架會按順序執行 IEventHandler<T>ExecuteAsync 方法,當 ExecuteAsync 出現異常時,則反向按順序調用 CancelAsync

      由于程序可能隨時掛掉,因此通過 CancelAsync 實現補償是不太可能的,CancelAsync 主要作為記錄相關信息而使用。

      中間件

      中間件的作用是便于開發者攔截事件、記錄信息、實現本地事務等,如果開發者不配置,則框架會自動創建 DefaultEventMiddleware<TEvent> 類型作為該事件的中間件服務。

      自定義事件中間件示例代碼:

      public class TestEventMiddleware : IEventMiddleware<TestEvent>
      {
      	public async Task HandleAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
      	{
      		await next(@event, CancellationToken.None);
      	}
      }
      

      next 委托是框架構建的事件執行鏈路,在中間件中可以攔截事件、決定是否執行事件鏈路。

      在中間件中調用 next() 委托時,框架開始按順序執行事件,即前面提到的 My1EventEventHandlerMy2EventEventHandler

      當一個事件有多個執行器時,由于程序可能會在任何時刻掛掉,因此本地事務必不可少。

      例如,在中間件中注入數據庫上下文,然后啟動事務執行數據庫操作,當其中一個 EventHandler 執行失敗時,執行鏈路會回滾,同時不會提交事務。

      可以參考 消費者模式 實現中間件的重試和補償方法。

      示例如下:

      public class TestEventMiddleware : IEventMiddleware<TestEvent>
      {
          private readonly BloggingContext _bloggingContext;
      
          public TestEventMiddleware(BloggingContext bloggingContext)
          {
              _bloggingContext = bloggingContext;
          }
      
          public async Task HandleAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
          {
              using (var transaction = _bloggingContext.Database.BeginTransaction())
              {
                  await next(@event, CancellationToken.None);
                  await transaction.CommitAsync();
              }
          }
      
          public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
          {
              return Task.CompletedTask;
          }
      
          public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
          {
              return Task.FromResult(true);
          }
      }
      
      [EventOrder(0)]
      public class My1EventEventHandler : IEventHandler<TestEvent>
      {
          private readonly BloggingContext _bloggingContext;
      
          public My1EventEventHandler(BloggingContext bloggingContext)
          {
              _bloggingContext = bloggingContext;
          }
      
          public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
          {
              Console.WriteLine($"{@event.Id} 被補償,[1]");
          }
      
          public async Task HandlerAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
          {
              await _bloggingContext.Posts.AddAsync(new Post
              {
                  Title = "魯濱遜漂流記",
                  Content = "隨便寫寫就對了"
              });
              await _bloggingContext.SaveChangesAsync();
          }
      }
      
      [EventOrder(1)]
      public class My2EventEventHandler : IEventHandler<TestEvent>
      {
          private readonly BloggingContext _bloggingContext;
      
          public My2EventEventHandler(BloggingContext bloggingContext)
          {
              _bloggingContext = bloggingContext;
          }
          public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
          {
              Console.WriteLine($"{@event.Id} 被補償,[2]");
          }
      
          public async Task HandlerAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
          {
              await _bloggingContext.Posts.AddAsync(new Post
              {
                  Title = "紅樓夢",
                  Content = "賈寶玉初試云雨情"
              });
              await _bloggingContext.SaveChangesAsync();
      
              throw new OperationCanceledException("故意報錯");
          }
      }
      

      image-20240525155639461

      事件執行時,如果出現異常,也是會被重試的,中間件 TestEventMiddleware 的 FaildAsync、FallbackAsync 會被依次執行。

      你可以參考 消費者模式 或者 重試

      分組消費

      事件分組消費主要是利用同一個 IConnection 同時處理多個消息隊列,提高通道利用率。

      示例:

      [EventTopic("EventGroup_1", Group = "aaa")]
      public class Test1Event
      {
      	public string Message { get; set; }
      
      	public override string ToString()
      	{
      		return Message;
      	}
      }
      
      [EventTopic("EventGroup_2", Group = "aaa")]
      public class Test2Event
      {
      	public string Message { get; set; }
      
      	public override string ToString()
      	{
      		return Message;
      	}
      }
      

      Maomi.MQ 的 IConsumer<T> 是一個消費者(一個隊列)使用一個 IConnection,默認情況下事件總線也是。

      對于哪些并發量不大或利用率較低的隊列,可以通過事件分組將其合并到同一個 IConnection 中進行處理。

      使用方法很簡單,只需要在定義事件時,配置 [EventTopic] 特性的 Group 方法即可。

      由于不同隊列被放到一個 IConnection 中消費,如果事件都設置了 Qos,那么框架會默認計算平均值,例如:

      [EventTopic("web3_1", Group = "aaa", Qos = 10)]
      public class Test1Event
      
      [EventTopic("web3_2", Group = "aaa", Qos = 6)]
      public class Test2Event
      

      此時框架會設置 Qos 為 8

      配置

      在引入 Maomi.MQ 框架時,可以配置相關屬性,示例和說明如下:

      // this.
      builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
      {
          // 當前程序節點,用于配置分布式雪花 id
      	options.WorkId = 1;
          
          // 是否自動創建隊列
      	options.AutoQueueDeclare = true;
          
          // 當前應用名稱,用于標識消息的發布者和消費者程序
      	options.AppName = "myapp";
          
          // RabbitMQ 配置
      	options.Rabbit = (ConnectionFactory options) =>
      	{
      		options.HostName = "192.168.3.248";
      		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
      	};
      }, [typeof(Program).Assembly]);  // 要被掃描的程序集
      

      消費者配置

      消費者模式 [Consumer] 和事件總線模式 [EventTopic] 具有相同的屬性配置,其配置說明如下:

      名稱 類型 默認值 說明
      Queue string 隊列名稱
      DeadQueue string? 綁定死信隊列名稱
      ExecptionRequeue bool true 出現異常時是否放回隊列,例如序列化錯誤等原因導致的,而不是消費時發生異常導致的
      Expiration int 0 隊列消息過期時間,單位毫秒
      Qos ushort 1 Qos
      RetryFaildRequeue bool false 消費失敗次數達到條件時,是否放回隊列
      Group string? 分組名稱
      AutoQueueDeclare AutoQueueDeclare AutoQueueDeclare.None 是否自動創建隊列

      環境隔離

      目前還在考慮要不要支持多租戶模式。

      在開發中,往往需要在本地調試,本地程序啟動后會連接到開發服務器上,一個隊列收到消息時,會向其中一個消費者推送消息。那么我本地調試時,發布一個消息后,可能本地程序收不到該消息,而是被開發環境中的程序消費掉了。

      這個時候,我們希望可以將本地調試環境跟開發環境隔離開來,可以使用 RabbitMQ 提供的 VirtualHost 功能。

      首先通過 put 請求創建一個新的 VirtualHost,請參考文檔:https://www.rabbitmq.com/docs/vhosts#using-http-api

      image-20240612193415867

      然后在代碼中配置 VirtualHost:

      builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
      {
      	options.WorkId = 1;
      	options.AutoQueueDeclare = true;
      	options.AppName = "myapp";
      	options.Rabbit = (ConnectionFactory options) =>
      	{
      		options.HostName = "192.168.3.248";
      #if DEBUG
      		options.VirtualHost = "debug";
      #endif
      		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
      	};
      }, [typeof(Program).Assembly]);
      

      雪花 id 配置

      Maomi.MQ.RabbitMQ 使用了 IdGenerator 生成雪花 id,使得每個事件在集群中都有一個唯一 id。

      框架通過 IIdFactory 接口創建雪花 id,你可以通過替換 IIdFactory 接口配置雪花 id 生成規則。

      services.AddSingleton<IIdFactory>(new DefaultIdFactory((ushort)optionsBuilder.WorkId));
      

      示例:

      public class DefaultIdFactory : IIdFactory
      {
          /// <summary>
          /// Initializes a new instance of the <see cref="DefaultIdFactory"/> class.
          /// </summary>
          /// <param name="workId"></param>
          public DefaultIdFactory(ushort workId)
          {
              var options = new IdGeneratorOptions(workId) { SeqBitLength = 10 };
              YitIdHelper.SetIdGenerator(options);
          }
      
          /// <inheritdoc />
          public long NextId() => YitIdHelper.NextId();
      }
      

      IdGenerator 框架生成雪花 id 配置請參考:

      https://github.com/yitter/IdGenerator/tree/master/C%23

      Qos 并發和順序

      基于消費者模式和基于事件模式都是通過特性來配置消費屬性,Qos 是其中一個重要的屬性。

      Qos 場景

      對于消費者模式和事件總線模式,在沒有使用 Group 屬性配置消費行為時,每個隊列都會獨占一個 IConnection 以及 Host service。

      對于消費頻率很高但是不能并發的隊列,最好不要設置 Group 屬性,以及務必設置 Qos = 1。這樣依賴,該消費者會獨占資源進行消費,在保證順序的情況下,獨占資源有助于提高消費能力。

      [Consumer("web1", Qos = 1)]
      public class MyConsumer : IConsumer<TestEvent>
      {
      }
      

      當需要需要提高消費吞吐量,而且不需要順序消費時,可以將 Qos 設置高一些,RabbitMQ Client 框架會通過預取等方式提高吞吐量,并且多條消息可以并發消費。

      如果判斷一些消費者的消費頻率不會很高時,可以將這些消費者放到一個分組中。

      當多個消費者或事件配置共用一個分組時,那么這些事件的 Qos 應當一致,否則按照平均值來算。

      示例:

      [Consumer("web1", Qos = 10, Group = "group")]
      public class My1Consumer : IConsumer<TestEvent>
      {
      }
      
      [Consumer("web2", Qos = 6, Group = "group")]
      public class My2Consumer : IConsumer<TestEvent>
      {
      }
      

      由于兩個消費者使用相同的分組,因此復用通道的 Qos 會被設置為 8。

      如果消費頻率不高,但是需要順序消費時,可以將這些消費者放到同一個分組中,并且 Qos 設置為 1。

      [Consumer("web1", Qos = 1, Group = "group1")]
      public class My1Consumer : IConsumer<TestEvent>
      {
      }
      
      [Consumer("web2", Qos = 1, Group = "group1")]
      public class My2Consumer : IConsumer<TestEvent>
      {
      }
      

      并發和異常處理

      第一次情況,Qos 為 1 時,不設置 ExecptionRequeue 、RetryFaildRequeue。

      第二種情況,Qos 為 1 時,設置 ExecptionRequeue 、RetryFaildRequeue。

      Qos 為 1 時,會保證嚴格順序消費,ExecptionRequeue 、RetryFaildRequeue 會影響失敗的消息是否會被放回隊列,如果放回隊列,下一次消費會繼續消費之前失敗的消息。如果錯誤(如 bug)得不到解決,則會出現消費、失敗、放回隊列、重新消費這樣的循環。

      第三次情況,Qos > 1 時,不設置 ExecptionRequeue 、RetryFaildRequeue。

      第四種情況,Qos > 1 時,設置 ExecptionRequeue 、RetryFaildRequeue。

      當 Qos 大于 1 時,如果設置了 RetryFaildRequeue = true,那么消費失敗的消息會被放回隊列中,但是不一定下一次會立即重新消費該條消息。

      重試

      重試時間

      當消費者 ExecuteAsync 方法異常時,框架會進行重試,默認會重試五次,按照 2 作為指數設置重試時間間隔。

      第一次失敗后,間隔 2 秒重試,第二次失敗后,間隔 4 秒,接著分別是 8、16、32 秒。

      Maomi.MQ.RabbitMQ 使用了 Polly 框架做重試策略管理器,默認通過 DefaultRetryPolicyFactory 服務生成重試間隔策略。

      DefaultRetryPolicyFactory 代碼示例如下:

      /// <summary>
      /// Default retry policy.<br />
      /// 默認的策略提供器.
      /// </summary>
      public class DefaultRetryPolicyFactory : IRetryPolicyFactory
      {
          /// <inheritdoc/>
          public virtual Task<AsyncRetryPolicy> CreatePolicy(string queue, long id)
          {
              // Create a retry policy.
              // 創建重試策略.
              var retryPolicy = Policy
                  .Handle<Exception>()
                  .WaitAndRetryAsync(
                      retryCount: 5,
                      sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                      onRetry: async (exception, timeSpan, retryCount, context) =>
                      {
                          _logger.LogDebug("Retry execution event,queue [{Queue}],retry count [{RetryCount}],timespan [{TimeSpan}]", queue, retryCount, timeSpan);
                          await FaildAsync(queue, exception, timeSpan, retryCount, context);
                      });
      
              return Task.FromResult(retryPolicy);
          }
      
          
          public virtual Task FaildAsync(string queue, Exception ex, TimeSpan timeSpan, int retryCount, Context context)
          {
              return Task.CompletedTask;
          }
      }
      

      你可以通過實現 IRetryPolicyFactory 接口,替換默認的重試策略服務服務。

      services.AddSingleton<IRetryPolicyFactory, DefaultRetryPolicyFactory>();
      

      重試機制

      設定消費者代碼如下:

          [Consumer("web1", Qos = 1 , RetryFaildRequeue = true)]
          public class MyConsumer : IConsumer<TestEvent>
          {
              private  int _retryCount = 0;
              // 消費
              public async Task ExecuteAsync(EventBody<TestEvent> message)
              {
                  Console.WriteLine($"執行 {message.Body.Id} 第幾次:{_retryCount} {DateTime.Now}");
                  _retryCount++;
                  throw new Exception("1");
              }
      
              // 每次失敗時被執行
              public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
              {
                  Console.WriteLine($"重試 {message.Body.Id} 第幾次:{retryCount} {DateTime.Now}");
                  await Task.CompletedTask;
              }
      
      
              // 最后一次失敗時執行
              public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
              {
                  Console.WriteLine($"執行 {message.Body.Id} 補償 {DateTime.Now}");
                  return true;
              }
          }
      }
      

      retry

      首先會執行 IConsumer<TEvent>.ExecuteAsync()IEventMiddleware<TEvent>.ExecuteAsync() 消費消息,此時 ExecuteAsync() 執行失敗,立即觸發 FaildAsync() 函數。

      然后等待一段時間間隔后,接著會重新執行 ExecuteAsync() 方法。

      比如默認重試機制是重試五次,那么最終 IConsumer<TEvent>.ExecuteAsync()IEventMiddleware<TEvent>.ExecuteAsync() 都會被執行 6次,一次正常消費和五次重試消費。

      FallbackAsync() 方法會在最后一次重試失敗后被調用,該函數要返回一個 bool 類型。

      當多次重試失敗后,框架會調用 FallbackAsync 方法,如果該方法放回 true,那么框架會認為雖然 ExecuteAsync() 執行失敗,但是通過 FallbackAsync() 已經補償好了,該消息會被當做正常完成消費,框架會向 RabbitMQ 服務器發送 ACK,接著消費下一條消息。

      如果 FallbackAsync() 返回 false,框架會認為該消息徹底失敗,如果設置了 RetryFaildRequeue = true,那么該條消息會被放回消息隊列,等待下一次消費。否則該條消息會被直接丟棄。

      持久化剩余重試次數

      當消費者處理消息失敗時,默認消費者會重試 5 次,如果已經重試了 3 次,此時程序重啟,那么下一次消費該消息時,依然是繼續重試五次。

      需要記憶重試次數,在程序重啟時,能夠按照剩余次數進行重試。

      引入 Maomi.MQ.RedisRetry 包。

      配置示例:

      builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
      {
      	options.WorkId = 1;
      	options.AutoQueueDeclare = true;
      	options.AppName = "myapp";
      	options.Rabbit = (ConnectionFactory options) =>
      	{
      		options.HostName = "192.168.3.248";
      		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
      	};
      }, [typeof(Program).Assembly]);
      
      builder.Services.AddMaomiMQRedisRetry((s) =>
      {
      	ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.3.248");
      	IDatabase db = redis.GetDatabase();
      	return db;
      });
      

      默認 key 只會保留 5 分鐘。也就是說,如果五分鐘之后程序才重新消費該消息,那么就會剩余重試次數就會重置。

      死信隊列

      死信隊列

      可以給一個消費者或事件綁定死信隊列,當該隊列的消息失敗后并且不會放回隊列時,該消息會被推送到死信隊列中,示例:

      [Consumer("ConsumerWeb_dead", Qos = 1, DeadQueue = "ConsumerWeb_dead_queue", RetryFaildRequeue = false)]
      public class DeadConsumer : IConsumer<DeadEvent>
      {
      	// 消費
      	public Task ExecuteAsync(EventBody<DeadEvent> message)
      	{
      		Console.WriteLine($"事件 id:{message.Id}");
      		throw new OperationCanceledException();
      	}
      
      	// 每次失敗時被執行
      	public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadEvent>? message) => Task.CompletedTask;
      
      	// 最后一次失敗時執行
      	public Task<bool> FallbackAsync(EventBody<DeadEvent>? message) => Task.FromResult(false);
      }
      
      // ConsumerWeb_dead 消費失敗的消息會被此消費者消費。
      [Consumer("ConsumerWeb_dead_queue", Qos = 1)]
      public class DeadQueueConsumer : IConsumer<DeadQueueEvent>
      {
      	// 消費
      	public Task ExecuteAsync(EventBody<DeadQueueEvent> message)
      	{
      		Console.WriteLine($"死信隊列,事件 id:{message.Id}");
      		return Task.CompletedTask;
      	}
      
      	// 每次失敗時被執行
      	public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;
      
      	// 最后一次失敗時執行
      	public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false);
      }
      
      

      image-20240601012127169

      如果使用死信隊列,則務必將 RetryFaildRequeue 設置為 false,那么消費者會在重試多次失敗后,向 RabbitMQ 發送 nack 信號,RabbitMQ 就會將該消息轉發到綁定的死信隊列中。

      延遲隊列

      創建一個消費者,繼承 EmptyConsumer,那么該隊列會在程序啟動時被創建,但是不會創建 IConnection 進行消費。然后設置隊列消息過期時間以及綁定死信隊列,綁定的死信隊列既可以使用消費者模式實現,也可以使用事件模式實現。

      [Consumer("ConsumerWeb_dead_2", Expiration = 6000, DeadQueue = "ConsumerWeb_dead_queue_2")]
      public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
      {
      }
      
      // ConsumerWeb_dead 消費失敗的消息會被此消費者消費。
      [Consumer("ConsumerWeb_dead_queue_2", Qos = 1)]
      public class Dead_2_QueueConsumer : IConsumer<DeadQueueEvent>
      {
          // 消費
          public Task ExecuteAsync(EventBody<DeadQueueEvent> message)
          {
              Console.WriteLine($"事件 id:{message.Id} 已到期");
              return Task.CompletedTask;
          }
      
          // 每次失敗時被執行
          public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;
      
          // 最后一次失敗時執行
          public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false);
      }
      

      例如,用戶下單之后,如果 15 分鐘之內沒有付款,那么消息到期時,自動取消訂單。

      可觀測性

      功能還在繼續完善中。請參考 ActivitySourceApi 示例。

      為了快速部署可觀測性平臺,可以使用 OpenTelemetry 官方提供的示例包快速部署相關的服務。

      下載示例倉庫源碼:

      git clone https://github.com/open-telemetry/opentelemetry-demo.git
      

      由于示例中會包含大量的 demo 微服務,因此我們需要打開 docker-compose.yml 文件,將 services 節點的 Core Demo ServicesDependent Services 服務直接刪除,只保留可觀測性組件。或者直接點擊下載筆者已經修改好的版本: docker-compose.yml

      image-20240612200711787

      執行命令部署可觀測性服務:

      docker-compose up -d
      

      image-20240612201100976

      opentelemetry-collector-contrib 用于收集鏈路追蹤的可觀測性信息,有 grpc 和 http 兩種,監聽端口如下:

      Port Protocol Endpoint Function
      4317 gRPC n/a Accepts traces in OpenTelemetry OTLP format? (Protobuf).
      4318 HTTP /v1/traces Accepts traces in OpenTelemetry OTLP format? (Protobuf and JSON).

      經過容器端口映射后,對外端口可能不是 4317、4318 了。

      1718196602032.png

      引入 Maomi.MQ.Instrumentation 包,以及其它相關 OpenTelemetry 包。

      <PackageReference Include="Maomi.MQ.Instrumentation " Version="1.1.0" />
      <PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.8.1" />
      <PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.8.1" />
      <PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.8.1" />
      <PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.8.1" />
      

      然后注入服務:

      const string serviceName = "myapp";
      
      builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
      {
      	options.WorkId = 1;
      	options.AutoQueueDeclare = true;
      	options.AppName = serviceName;
      	options.Rabbit = (ConnectionFactory options) =>
      	{
      		options.HostName = "192.168.3.248";
      		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
      	};
      }, [typeof(Program).Assembly]);
      
      builder.Services.AddOpenTelemetry()
      	  .ConfigureResource(resource => resource.AddService(serviceName))
      	  .WithTracing(tracing =>
      	  {
      		  tracing.AddMaomiMQInstrumentation(options =>
      		  {
                    options.Sources.AddRange(MaomiMQDiagnostic.Sources);
      			  options.RecordException = true;
      		  })
      		  .AddAspNetCoreInstrumentation()
      		  .AddOtlpExporter(options =>
      		  {
      			  options.Endpoint = new Uri("http://127.0.0.1:32772/v1/traces");
      			  options.Protocol = OtlpExportProtocol.HttpProtobuf;
      		  });
      	  });
      

      啟動服務后,進行發布、消費,鏈路追蹤信息會被自動推送到 OpenTelemetry Collector 中,通過 Jaeger 、Skywalking 等組件可以讀取出來。

      image-20240612205140595

      由于 publish、consumer 屬于兄弟 trace 而不是同一個 trace,因此需要通過 Tags 查詢相關聯的 trace,格式 event.id=xxx

      1718196773292

      3662d0c35aaac72c77046a430988e87

      posted @ 2024-06-13 08:31  癡者工良  閱讀(4538)  評論(25)    收藏  舉報
      主站蜘蛛池模板: av亚洲在线一区二区| 国产伦码精品一区二区| 国99久9在线 | 免费| 亚洲日韩性欧美中文字幕| 国内精品久久人妻互换| 精品视频福利| 视频一区二区不中文字幕| 九九综合va免费看| 日本亚洲色大成网站www久久| 国产精品乱码一区二区三| 亚洲+成人+国产| 亚洲av成人无码精品电影在线| 亚洲第一无码AV无码专区| 亚洲国内精品一区二区| 临沭县| 久久精品无码免费不卡| 手机看片日本在线观看视频| 山阳县| 国精品91人妻无码一区二区三区 | 三门县| 日本亚洲一区二区精品久久| 亚洲天堂激情av在线| 国产精品国三级国产av| 熟妇无码熟妇毛片| 国产精品久久久久影院亚瑟| 中文字幕亚洲人妻系列| 亚洲色最新高清AV网站| 亚洲欧美中文日韩v在线97| 亚洲国产精品午夜福利| 国产精品黄色片在线观看| 暖暖免费观看电视在线高清 | 国产片av在线观看国语| 乱码中字在线观看一二区| 抚宁县| 亚洲欧洲日产国产 最新| 广东少妇大战黑人34厘米视频| 亚洲AVAV天堂AV在线网阿V| 国产av亚洲一区二区| 四虎库影成人在线播放| 激情文学一区二区国产区| 国产精品中文字幕自拍|