開源一款功能強大的 .NET 消息隊列通訊模型框架 Maomi.MQ
文檔說明
作者:癡者工良
倉庫地址:https://github.com/whuanle/Maomi.MQ
作者博客:
導讀
Maomi.MQ 是一個消息通訊模型項目,目前只支持了 RabbitMQ。
Maomi.MQ.RabbitMQ 是一個用于專為 RabbitMQ 設計的發布者和消費者通訊模型,大大簡化了發布和消息的代碼,并提供一系列簡便和實用的功能,開發者可以通過框架提供的消費模型實現高性能消費、事件編排,框架還支持發布者確認機制、自定義重試機制、補償機制、死信隊列、延遲隊列、連接通道復用等一系列的便利功能。開發者可以把更多的精力放到業務邏輯中,通過 Maomi.MQ.RabbitMQ 框架簡化跨進程消息通訊模式,使得跨進程消息傳遞更加簡單和可靠。
此外,框架通過 runtime 內置的 api 支持了分布式可觀測性,可以通過進一步使用 OpenTelemetry 等框架進一步收集可觀測性信息,推送到基礎設施平臺中。
快速開始
本文將快速介紹 Maomi.MQ.RabbitMQ 的使用方法。
引入 Maomi.MQ.RabbitMQ 包,在 Web 配置中注入服務:
builder.Services.AddSwaggerGen();
builder.Services.AddLogging();
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
var app = builder.Build();
- WorkId: 指定用于生成分布式雪花 id 的節點 id,默認為 0。
- AppName:用于標識消息的生產者,以及在日志和鏈路追蹤中標識消息的生產者或消費者。
- Rabbit:RabbitMQ 客戶端配置,請參考 ConnectionFactory。
如果是控制臺項目,則需要引入 Microsoft.Extensions.Hosting 包。
var host = new HostBuilder()
.ConfigureLogging(options =>
{
options.AddConsole();
options.AddDebug();
})
.ConfigureServices(services =>
{
services.AddMaomiMQ(options =>
{
options.WorkId = 1;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, new System.Reflection.Assembly[] { typeof(Program).Assembly });
// Your services.
services.AddHostedService<MyPublishAsync>();
}).Build();
await host.RunAsync();
定義消息模型類,該模型類將會被序列化為二進制內容傳遞到 RabbitMQ 服務器中。
public class TestEvent
{
public int Id { get; set; }
public override string ToString()
{
return Id.ToString();
}
}
定義消費者,消費者需要實現 IConsumer<TEvent> 接口,以及使用 [Consumer] 特性注解配置消費者屬性。
[Consumer("test", Qos = 1, RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
private static int _retryCount = 0;
// 消費
public async Task ExecuteAsync(EventBody<TestEvent> message)
{
Console.WriteLine($"事件 id: {message.Id} {DateTime.Now}");
await Task.CompletedTask;
}
// 每次消費失敗時執行
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;
// 補償
public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
}
然后注入 IMessagePublisher 服務發布消息:
[ApiController]
[Route("[controller]")]
public class IndexController : ControllerBase
{
private readonly IMessagePublisher _messagePublisher;
public IndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}
[HttpGet("publish")]
public async Task<string> Publisher()
{
// 發布消息
await _messagePublisher.PublishAsync(queue: "test", message: new TestEvent
{
Id = i
});
return "ok";
}
}
消息發布者
消息發布者用于推送消息到 RabbitMQ 服務器中。
通過注入 IMessagePublisher 接口即可向 RabbitMQ 推送消息,示例項目請參考 PublisherWeb。
定義一個事件模型類:
public class TestEvent
{
public int Id { get; set; }
public override string ToString()
{
return Id.ToString();
}
}
注入 IMessagePublisher 服務后發布消息:
[ApiController]
[Route("[controller]")]
public class IndexController : ControllerBase
{
private readonly IMessagePublisher _messagePublisher;
public IndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}
[HttpGet("publish")]
public async Task<string> Publisher()
{
for (var i = 0; i < 100; i++)
{
await _messagePublisher.PublishAsync(queue: "PublisherWeb", message: new TestEvent
{
Id = i
});
}
return "ok";
}
}
IMessagePublisher
IMessagePublisher 定義比較簡單,只有三個方法和一個屬性:
public ConnectionPool ConnectionPool { get; }
Task PublishAsync<TEvent>(string queue, TEvent message, Action<IBasicProperties>? properties = null)
where TEvent : class;
Task PublishAsync<TEvent>(string queue, TEvent message, IBasicProperties properties);
// 不建議直接使用該接口。
Task CustomPublishAsync<TEvent>(string queue, EventBody<TEvent> message, BasicProperties properties);
三個 PublishAsync 方法用于發布事件,ConnectionPool 屬性用于獲取 RabbitMQ.Client.IConnection 對象。
由于直接公開了 BasicProperties ,因此開發者完全自由配置 RabbitMQ 原生的消息屬性,所以 Maomi.MQ.RabbitMQ 沒必要過度設計,只提供了簡單的功能接口。
例如,可以通過 BasicProperties 配置單條消息的過期時間:
await _messagePublisher.PublishAsync(queue: "RetryWeb", message: new TestEvent
{
Id = i
}, (BasicProperties p) =>
{
p.Expiration = "1000";
});
當發布一條消息時,實際上框架傳遞的是 EventBody<T> 類型,EventBody<T> 中包含了一些重要的附加消息屬性,這些屬性會給消息處理和故障診斷帶來很大的方便。
public class EventBody<TEvent>
{
// 事件唯一 id.
public long Id { get; init; }
// Queue.
public string Queue { get; init; } = null!;
// App name.
public string Publisher { get; init; } = null!;
// 事件創建時間.
public DateTimeOffset CreationTime { get; init; }
// 事件體.
public TEvent Body { get; init; } = default!;
}
Maomi.MQ 通過 DefaultMessagePublisher 類型實現了 IMessagePublisher,DefaultMessagePublisher 默認生命周期是 Singleton:
services.AddSingleton<IMessagePublisher, DefaultMessagePublisher>();
生命周期不重要,如果需要修改默認的生命周期,可以手動修改替換。
services.AddScoped<IMessagePublisher, DefaultMessagePublisher>();
開發者也可以自行實現 IMessagePublisher 接口,具體示例請參考 DefaultMessagePublisher 類型。
連接池
為了復用 RabbitMQ.Client.IConnection ,Maomi.MQ.RabbitMQ 內部實現了 ConnectionPool 類型,通過對象池維護復用的 RabbitMQ.Client.IConnection 對象。
默認對象池中的 RabbitMQ.Client.IConnection 數量為 0,只有當連接被真正使用時才會從對象池委托中創建,連接對象會隨著程序并發量而自動增加,但是,默認最大連接對象數量為 Environment.ProcessorCount * 2。
除了 IMessagePublisher 接口提供的 PublishAsync 方法可以發布事件,開發者還可以從 ConnectionPool 獲取連接對象,請務必在使用完畢后通過 ConnectionPool.Return() 方法將其歸還到連接對象池。
通過連接池直接使用 IConnection 對象發布消息:
[HttpGet("publish")]
public async Task<string> Publisher()
{
for (var i = 0; i < 100; i++)
{
var connectionPool = _messagePublisher.ConnectionPool;
var connection = connectionPool.Get();
try
{
connection.Channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: "queue",
basicProperties: properties,
body: _jsonSerializer.Serializer(message),
mandatory: true);
}
finally
{
connectionPool.Return(connection);
}
}
return "ok";
}
你也可以繞開 IMessagePublisher ,直接注入 ConnectionPool 使用 RabbitMQ 連接對象,但是不建議這樣使用。
private readonly ConnectionPool _connectionPool;
public DefaultMessagePublisher(ConnectionPool connectionPool)
{
_connectionPool = connectionPool;
}
public async Task MyPublshAsync()
{
var connection = _connectionPool.Get();
try
{
await connection.Channel.BasicPublishAsync(...);
}
finally
{
_connectionPool.Return(connection);
}
}
為了更加簡便地管理連接對象,可以使用 CreateAutoReturn() 函數創建連接管理對象,該對象被釋放時會自動將 IConnection 返還給連接池。
using var poolObject = _messagePublisher.ConnectionPool.CreateAutoReturn();
poolObject.Channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: "queue",
basicProperties: properties,
body: _jsonSerializer.Serializer(message),
mandatory: true);
如果你自行使用 ConnectionPool 推送消息到 RabbitMQ,請務必通過序列化 EventBody<TEvent> 事件對象,這樣 Maomi.MQ.RabbitMQ 消費者才能正常工作。同時,Moami.MQ 對可觀測性做了支持,如果自行使用 ConnectionPool 獲取連接對象推送消息,可能會導致可觀測性信息缺失。
正常情況下,RabbitMQ.Client 中包含了可觀測性的功能,但是 Maomi.MQ.RabbitMQ 附加的可觀測性信息有助于診斷故障問題。
請注意:
-
Maomi.MQ.RqbbitMQ 通過
EventBody<TEvent>泛型對象發布和接收事件。 -
DefaultMessagePublisher 包含了鏈路追蹤等可觀測性代碼。
消息過期
IMessagePublisher 對外開放了 BasicProperties 或 BasicProperties,可以自由配置消息屬性。
例如為消息配置過期時間:
[HttpGet("publish")]
public async Task<string> Publisher()
{
for (var i = 0; i < 1; i++)
{
await _messagePublisher.PublishAsync(queue: "test", message: new TestEvent
{
Id = i
}, properties =>
{
properties.Expiration = "6000";
});
}
return "ok";
}
如果此時為 test 綁定死信隊列,那么該消息長時間沒有被消費時,會被移動到另一個隊列,請參考 死信隊列。
還可以通過配置消息屬性實現更多的功能,請參考 IBasicProperties。
事務
RabbitMQ 支持事務,不過據 RabbitMQ 官方文檔顯示,事務會使吞吐量減少 250 倍。
RabbitMQ 事務使用上比較簡單,可以保證發布的消息已經被推送到 RabbitMQ 服務器,只有當提交事務時,提交的消息才會被 RabbitMQ 存儲并推送給消費者。
使用示例:
[HttpGet("publish_tran")]
public async Task<string> Publisher_Tran()
{
using var tranPublisher = await _messagePublisher.TxSelectAsync();
try
{
await tranPublisher.PublishAsync(queue: "publish_tran", message: new TestEvent
{
Id = 666
});
await tranPublisher.TxCommitAsync();
}
catch
{
await tranPublisher.TxRollbackAsync();
throw;
}
return "ok";
}
或者手動開啟事務:
[HttpGet("publish_tran")]
public async Task<string> Publisher_Tran()
{
using var tranPublisher = _messagePublisher.CreateTransaction();
try
{
await tranPublisher.TxSelectAsync();
await tranPublisher.PublishAsync(queue: "publish_tran", message: new TestEvent
{
Id = 666
});
await tranPublisher.TxCommitAsync();
}
catch
{
await tranPublisher.TxRollbackAsync();
throw;
}
return "ok";
}
注意,在該種模式之下,創建 TransactionPublisher 對象時,會從對象池中取出一個連接對象,因為開啟事務模式可能會污染當前連接通道,因此 TransactionPublisher 不會向連接池歸還連接對象,而是直接釋放。
發送方確認模式
雖然事務模式可以保證消息會被推送到 RabbitMQ 服務器中,但是由于事務模式會導致吞吐量降低 250 倍,因此不是一個好的選擇。為了解決這個問題, RabbitMQ 引入了一種確認機制,這種機制就像滑動窗口,能夠保證消息推送到服務器中,并且具備高性能的特性。
使用示例:
[HttpGet("publish_confirm")]
public async Task<string> Publisher_Confirm()
{
using var confirmPublisher = await _messagePublisher.ConfirmSelectAsync();
for (var i = 0; i < 5; i++)
{
await confirmPublisher.PublishAsync(queue: "publish_confirm1", message: new TestEvent
{
Id = 666
});
var result = await confirmPublisher.WaitForConfirmsAsync();
// 如果在超時內沒有接收到 nacks,則為 True,否則為 false。
Console.WriteLine($"發布 {i},{result}");
}
return "ok";
}
WaitForConfirmsAsync 方法會返回一個值,如果正常被服務器確認了消息已經傳達,則結果為 true,如果超時沒有被服務器確認,則返回 false。
此外,還有一個 WaitForConfirmsOrDieAsync 方法,它會一直等待該頻道上的所有已發布消息都得到確認,使用示例:
using var confirmPublisher = await _messagePublisher.ConfirmSelectAsync();
for (var i = 0; i < 5; i++)
{
await confirmPublisher.PublishAsync(queue: "publish_confirm1", message: new TestEvent
{
Id = 666
});
Console.WriteLine($"發布 {i}");
}
await confirmPublisher.WaitForConfirmsOrDieAsync();
注意,在該種模式之下,創建 ConfirmPublisher 對象時,會從對象池中取出一個連接對象,因為開啟事務模式可能會污染當前連接通道,因此 ConfirmPublisher 不會向連接池歸還連接對象,而是直接釋放。
注意,同一個通道不能同時使用事務和發送方確認模式。
獨占模式
默認情況下,每次使用 IMessagePublisher.PublishAsync() 發布消息時,都會從連接池中取出連接對象,然后使用該連接通道發布消息,發布完畢后就會歸還連接對象給連接池。
如果需要在短時間內大批量發布消息,則需要每次都要重復獲取和返還連接對象。
使用獨占模式時可以在一段時間內獨占一個連接對象,超出作用域后,連接對象會自動放回連接池。這種模式對于需要大量發布消息的場景提高吞吐量非常有幫助。為了能夠將連接通道歸還連接池,請務必使用 using 關鍵字修飾變量,或者手動調用 Dispose 函數。
使用示例:
// 創建獨占模式
using var singlePublisher = _messagePublisher.CreateSingle();
for (var i = 0; i < 500; i++)
{
await singlePublisher.PublishAsync(queue: "publish_single", message: new TestEvent
{
Id = 666
});
}
消費者
Maomi.MQ.RabbitMQ 中,有兩種消費模式,一種是消費者模式,一種是事件模式(事件總線模式)。
下面簡單了解這兩種模式的使用方法。
消費者模式
消費者服務需要實現 IConsumer<TEvent> 接口,并且配置 [Consumer("queue")] 特性綁定隊列名稱,通過消費者對象來控制消費行為。
消費者模式有具有失敗通知和補償能力,使用上也比較簡單。
public class TestEvent
{
public int Id { get; set; }
}
[Consumer("PublisherWeb", Qos = 1, RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
private static int _retryCount = 0;
// 消費或重試
public async Task ExecuteAsync(EventBody<TestEvent> message)
{
_retryCount++;
Console.WriteLine($"執行次數:{_retryCount} 事件 id: {message.Id} {DateTime.Now}");
await Task.CompletedTask;
}
// 失敗
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;
// 補償
public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
}
事件模式
事件模式是通過事件總線的方式實現的,以事件模型為中心,通過事件來控制消費行為。
[EventTopic("web2", Qos = 1, RetryFaildRequeue = true)]
public class TestEvent
{
public string Message { get; set; }
}
然后使用 [EventOrder] 特性編排事件執行順序。
// 編排事件消費順序
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id},事件 1 已被執行");
}
}
[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id},事件 2 已被執行");
}
}
當然,事件模式也可以通過創建中間件增加補償功能,通過中間件還可以將所有排序事件放到同一個事務中,一起成功或失敗,避免事件執行時出現程序退出導致的一致性問題。
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public TestEventMiddleware(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
{
using (var transaction = _bloggingContext.Database.BeginTransaction())
{
await next(@event, CancellationToken.None);
await transaction.CommitAsync();
}
}
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
return Task.CompletedTask;
}
public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
return Task.FromResult(true);
}
}
分組
消費者模式和事件模式都可以設置分組,在特性上設置了 Group 屬性,具有同一個分組的事件會被放到一個連接通道(RabbitMQ.Client.IConnection)中,對于消費頻率不高的事件,復用連接通道可以有效較低資源消耗。
消費者模式分組示例:
[Consumer("ConsumerWeb_group_1", Qos = 1, Group = "group")]
public class Group_1_Consumer : IConsumer<GroupEvent>
{
public Task ExecuteAsync(EventBody<GroupEvent> message) => Task.CompletedTask;
public Task FaildAsync(Exception ex, int retryCount, EventBody<GroupEvent>? message) => Task.CompletedTask;
public Task<bool> FallbackAsync(EventBody<GroupEvent>? message) => Task.FromResult(true);
}
[Consumer("ConsumerWeb_group_2", Qos = 1, Group = "group")]
public class Group_2_Consumer : IConsumer<GroupEvent>
{
public Task ExecuteAsync(EventBody<GroupEvent> message) => Task.CompletedTask;
public Task FaildAsync(Exception ex, int retryCount, EventBody<GroupEvent>? message) => Task.CompletedTask;
public Task<bool> FallbackAsync(EventBody<GroupEvent>? message) => Task.FromResult(true);
}
事件總線模式分組示例:
[EventTopic("web1", Qos = 1, RetryFaildRequeue = true, Group = "group")]
public class Test1Event
{
public string Message { get; set; }
}
[EventTopic("web2", Qos = 1, RetryFaildRequeue = true, Group = "group")]
public class Test2Event
{
public string Message { get; set; }
}
消費者模式
消費者模式要求服務實現 IConsumer<TEvent> 接口,并添加 [Connsumer] 特性。
IConsumer<TEvent> 接口比較簡單,其定義如下:
public interface IConsumer<TEvent>
where TEvent : class
{
// 消息處理.
public Task ExecuteAsync(EventBody<TEvent> message);
// ExecuteAsync 異常后立即執行此代碼.
public Task FaildAsync(Exception ex, int retryCount, EventBody<TEvent>? message);
// 最后一次重試失敗時執行,用于補償.
public Task<bool> FallbackAsync(EventBody<TEvent>? message);
}
使用消費者模式時,需要先定義一個模型類,用于發布者和消費者之間傳遞消息,事件模型類只要是類即可,能夠正常序列化和反序列化,沒有其它要求。
public class TestEvent
{
public int Id { get; set; }
public override string ToString()
{
return Id.ToString();
}
}
然后繼承 IConsumer<TEvent> 接口實現消費者功能:
[Consumer("web1", Qos = 1)]
public class MyConsumer : IConsumer<TestEvent>
{
// 消費
public async Task ExecuteAsync(EventBody<TestEvent> message)
{
Console.WriteLine(message.Body.Id);
}
// 每次失敗時被執行
public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
Console.WriteLine($"重試 {message.Body.Id},次數 {retryCount}");
await Task.CompletedTask;
}
// 最后一次失敗時執行
public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
Console.WriteLine($"最后一次 {message.Body.Id}");
// 如果返回 true,說明補償成功。
return true;
}
}
特性配置的說明請參考 消費者配置 。
消費、重試和補償
消費者收到服務器推送的消息時,ExecuteAsync 方法會被自動執行。當 ExecuteAsync 執行異常時,FaildAsync 方法會馬上被觸發,開發者可以利用 FaildAsync 記錄相關日志信息。
// 每次失敗時被執行
public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
// 當 retryCount == -1 時,錯誤并非是 ExecuteAsync 方法導致的
if (retryCount == -1)
{
_logger.LogError(ex, "Consumer error,event id: {Id}", message?.Id);
// 可以在此處添加告警通知代碼
await Task.Delay(1000);
}
else
{
_logger.LogError(ex, "Consumer exception,event id: {Id},retry count: {retryCount}", message!.Id, retryCount);
}
}
如果 FaildAsync 方法也出現異常時,不會影響整體流程,框架會等待到達間隔時間后繼續重試 ExecuteAsync 方法。
建議 FaildAsync 使用 try{}cathc{} 套住代碼,不要對外拋出異常,FaildAsync 的邏輯不要包含太多邏輯,并且 FaildAsync 只應記錄日志或進行告警使用。
FaildAsync 被執行有一個額外情況,就是在消費消息之前就已經發生錯誤,例如一個事件模型類有構造函數導致不能被反序列化,這個時候 FaildAsync 會被立即執行,且 retryCount = -1。
當 ExecuteAsync 方法執行異常時,框架會自動重試,默認會重試五次,如果五次都失敗,則會執行 FallbackAsync 方法進行補償。
重試間隔時間會逐漸增大,請參考 重試。
當重試五次之后,就會立即啟動補償機制。
// 最后一次失敗時執行
public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
return true;
}
FallbackAsync 方法需要返回 bool,如果返回 true ,表示雖然 ExecuteAsync 出現異常,但是 FallbackAsync 補償后已經正常,該消息會被正常消費掉。如果返回 false,則說補償失敗,該消息按照消費失敗處理。
只有 ExecuteAsync 異常時,才會觸發 FaildAsync 和 FallbackAsync ,如果是在處理消息之前的異常,會直接失敗。

消費失敗
當 ExecuteAsync 失敗次數達到閾值時,并且 FallbackAsync 返回 false,則該條消息消費失敗,或者由于序列化等錯誤時直接失敗。
在 [Consumer] 特性中有三個很重要的配置:
public class ConsumerAttribute : Attribute
{
// 消費失敗次數達到條件時,是否放回隊列.
public bool RetryFaildRequeue { get; set; }
// 現異常時是否放回隊列,例如序列化錯誤等原因導致的,而不是消費時發生異常導致的.
public bool ExecptionRequeue { get; set; } = true;
// 綁定死信隊列.
public string? DeadQueue { get; set; }
}
當 ExecuteAsync 失敗次數達到閾值時,并且 FallbackAsync 返回 false,則該條消息消費失敗。
如果 RetryFaildRequeue == false,那么該條消息會被 RabbitMQ 丟棄。
如果綁定了死信隊列,則會先推送到死信隊列,接著再丟棄。
如果 RetryFaildRequeue == true,那么該條消息會被返回 RabbbitMQ 隊列中,等待下一次消費。
由于消息失敗后會被放回隊列,因此綁定的死信隊列不會收到該消息。
當序列化異常或者其它問題導致錯誤而不能進入 ExecuteAsync 方法時,FaildAsync 方法會首先被觸發一次,此時 retryCount 參數值為 -1。
出現此種問題時,一般是開發者 bug 導致的,不會進行補償等操作,開發者可以在 FaildAsync 中處理該事件,記錄相關日志信息。
// 每次失敗時被執行,或者出現無法進入 ExecuteAsync 的異常
public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
// 當 retryCount == -1 時,錯誤并非是 ExecuteAsync 方法導致的
if (retryCount == -1)
{
_logger.LogError(ex, "Consumer error,event id: {Id}", message?.Id);
// 可以在此處添加告警通知代碼
await Task.Delay(1000);
}
else
{
_logger.LogError(ex, "Consumer exception,event id: {Id},retry count: {retryCount}", message!.Id, retryCount);
}
}
由于這種情況不妥善處理,會導致消息丟失,因此框架默認將 ExecptionRequeue 設置為 true,也就是說出現這種異常時,消息會被放回隊列。如果問題一致沒有得到解決,則會出現循環:調用 FaildAsync 、放回隊列、調用 FaildAsync 、放回隊列... ...
所以應該在 FaildAsync 中添加代碼通知開發者相關信息,并且設置間隔時間,避免重試太頻繁。
自動創建隊列
框架默認會自動創建隊列,如果需要關閉自動創建功能,把 AutoQueueDeclare 設置為 false 即可。
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AppName = "myapp";
options.AutoQueueDeclare = false;
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
當然還可以單獨為消費者配置是否自動創建隊列:
[Consumer("ConsumerWeb_create", AutoQueueDeclare = AutoQueueDeclare.Enable)]
默認情況下,關閉了全局自動創建,則不會自動創建隊列。
如果關閉全局自動創建,但是消費者配置了 AutoQueueDeclare = AutoQueueDeclare.Enable,則還是會自動創建隊列。
如果消費者配置了 AutoQueueDeclare = AutoQueueDeclare.Disable ,則會忽略全局配置,不會創建隊列。
Qos
讓程序需要嚴格根據順序消費時,可以使用 Qos = 1,框架會嚴格保證逐條消費,如果程序不需要順序消費,希望可以快速處理所有消息,則可以將 Qos 設置大一些。由于 Qos 和重試、補償機制組合使用會有多種情況,因此請參考 重試。
Qos 是通過特性來配置的:
[Consumer("ConsumerWeb", Qos = 1)]
可以通過調高 Qos 值,讓程序在可以并發消息,提高并發量。
延遲隊列
延遲隊列有兩種,一種設置消息過期時間,一種是設置隊列過期時間。
設置消息過期時間,那么該消息在一定時間沒有被消費時,會被丟棄或移動到死信隊列中,該配置只對單個消息有效,請參考 消息過期。
隊列設置過期后,當消息在一定時間內沒有被消費時,會被丟棄或移動到死信隊列中,該配置只對所有消息有效。基于這一點,我們可以實現延遲隊列。
首先創建消費者,繼承 EmptyConsumer,那么該隊列會在程序啟動時被創建,但是不會創建 IConnection 進行消費。然后設置隊列消息過期時間以及綁定死信隊列,綁定的死信隊列既可以使用消費者模式實現,也可以使用事件模式實現。
[Consumer("ConsumerWeb_dead_2", Expiration = 6000, DeadQueue = "ConsumerWeb_dead_queue_2")]
public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
{
}
// ConsumerWeb_dead 消費失敗的消息會被此消費者消費。
[Consumer("ConsumerWeb_dead_queue_2", Qos = 1)]
public class Dead_2_QueueConsumer : IConsumer<DeadQueueEvent>
{
// 消費
public Task ExecuteAsync(EventBody<DeadQueueEvent> message)
{
Console.WriteLine($"死信隊列,事件 id:{message.Id}");
return Task.CompletedTask;
}
// 每次失敗時被執行
public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;
// 最后一次失敗時執行
public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false);
}
空消費者
當識別到空消費者時,框架只會創建隊列,而不會啟動消費者消費消息。
可以結合延遲隊列一起使用,該隊列不會有任何消費者,當該隊列的消息過期時,都由死信隊列直接消費,示例如下:
[Consumer("ConsumerWeb_empty", Expiration = 6000, DeadQueue = "ConsumerWeb_empty_dead")]
public class MyEmptyConsumer : EmptyConsumer<TestEvent> { }
[Consumer("ConsumerWeb_empty_dead", Qos = 10)]
public class MyDeadConsumer : IConsumer<TestEvent>
{
public Task ExecuteAsync(EventBody<TestEvent> message) => Task.CompletedTask;
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;
public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
}
對于跨進程的隊列,A 服務不消費只發布,B 服務負責消費,A 服務中可以加一個空消費者,保證 A 服務啟動時該隊列一定存在,另一方面,消費者服務不應該關注隊列的定義,也不太應該創建隊列。
分組
通過配置 Group 屬性將多個消費者放到同一個連接通道中執行,對于那些并發量不高的隊列,復用連接通道可以降低資源消耗。
示例:
[Consumer("ConsumerWeb_group_1", Qos = 1, Group = "group")]
public class Group_1_Consumer : IConsumer<GroupEvent>
{
}
[Consumer("ConsumerWeb_group_2", Qos = 1, Group = "group")]
public class Group_2_Consumer : IConsumer<GroupEvent>
{
}
事件總線模式
Maomi.MQ 內部設計了一個事件總線,可以幫助開發者實現事件編排、實現本地事務、正向執行和補償。
首先定義一個事件類型,該事件綁定一個 topic 或隊列,事件需要使用 [EventTopic] 標識,并設置該事件對于的隊列名稱。
[EventTopic] 特性擁有與 [Consumer] 相同的特性,可參考 [Consumer] 的使用配置事件,請參考 消費者配置。
[EventTopic("EventWeb")]
public class TestEvent
{
public string Message { get; set; }
public override string ToString()
{
return Message;
}
}
然后編排事件執行器,每個執行器都需要繼承 IEventHandler<T> 接口,然后使用 [EventOrder] 特性標記執行順序。
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id},事件 1 已被執行");
}
}
[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id},事件 2 已被執行");
}
}
每個事件執行器都必須實現 IEventHandler<T> 接口,并且設置 [EventOrder] 特性以便確認事件的執行順序,框架會按順序執行 IEventHandler<T> 的 ExecuteAsync 方法,當 ExecuteAsync 出現異常時,則反向按順序調用 CancelAsync。
由于程序可能隨時掛掉,因此通過 CancelAsync 實現補償是不太可能的,CancelAsync 主要作為記錄相關信息而使用。
中間件
中間件的作用是便于開發者攔截事件、記錄信息、實現本地事務等,如果開發者不配置,則框架會自動創建 DefaultEventMiddleware<TEvent> 類型作為該事件的中間件服務。
自定義事件中間件示例代碼:
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
public async Task HandleAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
{
await next(@event, CancellationToken.None);
}
}
next 委托是框架構建的事件執行鏈路,在中間件中可以攔截事件、決定是否執行事件鏈路。
在中間件中調用 next() 委托時,框架開始按順序執行事件,即前面提到的 My1EventEventHandler、My2EventEventHandler。
當一個事件有多個執行器時,由于程序可能會在任何時刻掛掉,因此本地事務必不可少。
例如,在中間件中注入數據庫上下文,然后啟動事務執行數據庫操作,當其中一個 EventHandler 執行失敗時,執行鏈路會回滾,同時不會提交事務。
可以參考 消費者模式 實現中間件的重試和補償方法。
示例如下:
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public TestEventMiddleware(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task HandleAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
{
using (var transaction = _bloggingContext.Database.BeginTransaction())
{
await next(@event, CancellationToken.None);
await transaction.CommitAsync();
}
}
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
return Task.CompletedTask;
}
public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
return Task.FromResult(true);
}
}
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public My1EventEventHandler(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id} 被補償,[1]");
}
public async Task HandlerAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
await _bloggingContext.Posts.AddAsync(new Post
{
Title = "魯濱遜漂流記",
Content = "隨便寫寫就對了"
});
await _bloggingContext.SaveChangesAsync();
}
}
[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public My2EventEventHandler(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id} 被補償,[2]");
}
public async Task HandlerAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
await _bloggingContext.Posts.AddAsync(new Post
{
Title = "紅樓夢",
Content = "賈寶玉初試云雨情"
});
await _bloggingContext.SaveChangesAsync();
throw new OperationCanceledException("故意報錯");
}
}

事件執行時,如果出現異常,也是會被重試的,中間件 TestEventMiddleware 的 FaildAsync、FallbackAsync 會被依次執行。
分組消費
事件分組消費主要是利用同一個 IConnection 同時處理多個消息隊列,提高通道利用率。
示例:
[EventTopic("EventGroup_1", Group = "aaa")]
public class Test1Event
{
public string Message { get; set; }
public override string ToString()
{
return Message;
}
}
[EventTopic("EventGroup_2", Group = "aaa")]
public class Test2Event
{
public string Message { get; set; }
public override string ToString()
{
return Message;
}
}
Maomi.MQ 的 IConsumer<T> 是一個消費者(一個隊列)使用一個 IConnection,默認情況下事件總線也是。
對于哪些并發量不大或利用率較低的隊列,可以通過事件分組將其合并到同一個 IConnection 中進行處理。
使用方法很簡單,只需要在定義事件時,配置 [EventTopic] 特性的 Group 方法即可。
由于不同隊列被放到一個 IConnection 中消費,如果事件都設置了 Qos,那么框架會默認計算平均值,例如:
[EventTopic("web3_1", Group = "aaa", Qos = 10)]
public class Test1Event
[EventTopic("web3_2", Group = "aaa", Qos = 6)]
public class Test2Event
此時框架會設置 Qos 為 8。
配置
在引入 Maomi.MQ 框架時,可以配置相關屬性,示例和說明如下:
// this.
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
// 當前程序節點,用于配置分布式雪花 id
options.WorkId = 1;
// 是否自動創建隊列
options.AutoQueueDeclare = true;
// 當前應用名稱,用于標識消息的發布者和消費者程序
options.AppName = "myapp";
// RabbitMQ 配置
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]); // 要被掃描的程序集
消費者配置
消費者模式 [Consumer] 和事件總線模式 [EventTopic] 具有相同的屬性配置,其配置說明如下:
| 名稱 | 類型 | 默認值 | 說明 |
|---|---|---|---|
| Queue | string | 隊列名稱 | |
| DeadQueue | string? | 綁定死信隊列名稱 | |
| ExecptionRequeue | bool | true | 出現異常時是否放回隊列,例如序列化錯誤等原因導致的,而不是消費時發生異常導致的 |
| Expiration | int | 0 | 隊列消息過期時間,單位毫秒 |
| Qos | ushort | 1 | Qos |
| RetryFaildRequeue | bool | false | 消費失敗次數達到條件時,是否放回隊列 |
| Group | string? | 分組名稱 | |
| AutoQueueDeclare | AutoQueueDeclare | AutoQueueDeclare.None | 是否自動創建隊列 |
環境隔離
目前還在考慮要不要支持多租戶模式。
在開發中,往往需要在本地調試,本地程序啟動后會連接到開發服務器上,一個隊列收到消息時,會向其中一個消費者推送消息。那么我本地調試時,發布一個消息后,可能本地程序收不到該消息,而是被開發環境中的程序消費掉了。
這個時候,我們希望可以將本地調試環境跟開發環境隔離開來,可以使用 RabbitMQ 提供的 VirtualHost 功能。
首先通過 put 請求創建一個新的 VirtualHost,請參考文檔:https://www.rabbitmq.com/docs/vhosts#using-http-api

然后在代碼中配置 VirtualHost:
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
#if DEBUG
options.VirtualHost = "debug";
#endif
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
雪花 id 配置
Maomi.MQ.RabbitMQ 使用了 IdGenerator 生成雪花 id,使得每個事件在集群中都有一個唯一 id。
框架通過 IIdFactory 接口創建雪花 id,你可以通過替換 IIdFactory 接口配置雪花 id 生成規則。
services.AddSingleton<IIdFactory>(new DefaultIdFactory((ushort)optionsBuilder.WorkId));
示例:
public class DefaultIdFactory : IIdFactory
{
/// <summary>
/// Initializes a new instance of the <see cref="DefaultIdFactory"/> class.
/// </summary>
/// <param name="workId"></param>
public DefaultIdFactory(ushort workId)
{
var options = new IdGeneratorOptions(workId) { SeqBitLength = 10 };
YitIdHelper.SetIdGenerator(options);
}
/// <inheritdoc />
public long NextId() => YitIdHelper.NextId();
}
IdGenerator 框架生成雪花 id 配置請參考:
https://github.com/yitter/IdGenerator/tree/master/C%23
Qos 并發和順序
基于消費者模式和基于事件模式都是通過特性來配置消費屬性,Qos 是其中一個重要的屬性。
Qos 場景
對于消費者模式和事件總線模式,在沒有使用 Group 屬性配置消費行為時,每個隊列都會獨占一個 IConnection 以及 Host service。
對于消費頻率很高但是不能并發的隊列,最好不要設置 Group 屬性,以及務必設置 Qos = 1。這樣依賴,該消費者會獨占資源進行消費,在保證順序的情況下,獨占資源有助于提高消費能力。
[Consumer("web1", Qos = 1)]
public class MyConsumer : IConsumer<TestEvent>
{
}
當需要需要提高消費吞吐量,而且不需要順序消費時,可以將 Qos 設置高一些,RabbitMQ Client 框架會通過預取等方式提高吞吐量,并且多條消息可以并發消費。
如果判斷一些消費者的消費頻率不會很高時,可以將這些消費者放到一個分組中。
當多個消費者或事件配置共用一個分組時,那么這些事件的 Qos 應當一致,否則按照平均值來算。
示例:
[Consumer("web1", Qos = 10, Group = "group")]
public class My1Consumer : IConsumer<TestEvent>
{
}
[Consumer("web2", Qos = 6, Group = "group")]
public class My2Consumer : IConsumer<TestEvent>
{
}
由于兩個消費者使用相同的分組,因此復用通道的 Qos 會被設置為 8。
如果消費頻率不高,但是需要順序消費時,可以將這些消費者放到同一個分組中,并且 Qos 設置為 1。
[Consumer("web1", Qos = 1, Group = "group1")]
public class My1Consumer : IConsumer<TestEvent>
{
}
[Consumer("web2", Qos = 1, Group = "group1")]
public class My2Consumer : IConsumer<TestEvent>
{
}
并發和異常處理
第一次情況,Qos 為 1 時,不設置 ExecptionRequeue 、RetryFaildRequeue。
第二種情況,Qos 為 1 時,設置 ExecptionRequeue 、RetryFaildRequeue。
Qos 為 1 時,會保證嚴格順序消費,ExecptionRequeue 、RetryFaildRequeue 會影響失敗的消息是否會被放回隊列,如果放回隊列,下一次消費會繼續消費之前失敗的消息。如果錯誤(如 bug)得不到解決,則會出現消費、失敗、放回隊列、重新消費這樣的循環。
第三次情況,Qos > 1 時,不設置 ExecptionRequeue 、RetryFaildRequeue。
第四種情況,Qos > 1 時,設置 ExecptionRequeue 、RetryFaildRequeue。
當 Qos 大于 1 時,如果設置了 RetryFaildRequeue = true,那么消費失敗的消息會被放回隊列中,但是不一定下一次會立即重新消費該條消息。
重試
重試時間
當消費者 ExecuteAsync 方法異常時,框架會進行重試,默認會重試五次,按照 2 作為指數設置重試時間間隔。
第一次失敗后,間隔 2 秒重試,第二次失敗后,間隔 4 秒,接著分別是 8、16、32 秒。
Maomi.MQ.RabbitMQ 使用了 Polly 框架做重試策略管理器,默認通過 DefaultRetryPolicyFactory 服務生成重試間隔策略。
DefaultRetryPolicyFactory 代碼示例如下:
/// <summary>
/// Default retry policy.<br />
/// 默認的策略提供器.
/// </summary>
public class DefaultRetryPolicyFactory : IRetryPolicyFactory
{
/// <inheritdoc/>
public virtual Task<AsyncRetryPolicy> CreatePolicy(string queue, long id)
{
// Create a retry policy.
// 創建重試策略.
var retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(
retryCount: 5,
sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
onRetry: async (exception, timeSpan, retryCount, context) =>
{
_logger.LogDebug("Retry execution event,queue [{Queue}],retry count [{RetryCount}],timespan [{TimeSpan}]", queue, retryCount, timeSpan);
await FaildAsync(queue, exception, timeSpan, retryCount, context);
});
return Task.FromResult(retryPolicy);
}
public virtual Task FaildAsync(string queue, Exception ex, TimeSpan timeSpan, int retryCount, Context context)
{
return Task.CompletedTask;
}
}
你可以通過實現 IRetryPolicyFactory 接口,替換默認的重試策略服務服務。
services.AddSingleton<IRetryPolicyFactory, DefaultRetryPolicyFactory>();
重試機制
設定消費者代碼如下:
[Consumer("web1", Qos = 1 , RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
private int _retryCount = 0;
// 消費
public async Task ExecuteAsync(EventBody<TestEvent> message)
{
Console.WriteLine($"執行 {message.Body.Id} 第幾次:{_retryCount} {DateTime.Now}");
_retryCount++;
throw new Exception("1");
}
// 每次失敗時被執行
public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
Console.WriteLine($"重試 {message.Body.Id} 第幾次:{retryCount} {DateTime.Now}");
await Task.CompletedTask;
}
// 最后一次失敗時執行
public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
Console.WriteLine($"執行 {message.Body.Id} 補償 {DateTime.Now}");
return true;
}
}
}

首先會執行 IConsumer<TEvent>.ExecuteAsync() 或 IEventMiddleware<TEvent>.ExecuteAsync() 消費消息,此時 ExecuteAsync() 執行失敗,立即觸發 FaildAsync() 函數。
然后等待一段時間間隔后,接著會重新執行 ExecuteAsync() 方法。
比如默認重試機制是重試五次,那么最終 IConsumer<TEvent>.ExecuteAsync() 或 IEventMiddleware<TEvent>.ExecuteAsync() 都會被執行 6次,一次正常消費和五次重試消費。
FallbackAsync() 方法會在最后一次重試失敗后被調用,該函數要返回一個 bool 類型。
當多次重試失敗后,框架會調用 FallbackAsync 方法,如果該方法放回 true,那么框架會認為雖然 ExecuteAsync() 執行失敗,但是通過 FallbackAsync() 已經補償好了,該消息會被當做正常完成消費,框架會向 RabbitMQ 服務器發送 ACK,接著消費下一條消息。
如果 FallbackAsync() 返回 false,框架會認為該消息徹底失敗,如果設置了 RetryFaildRequeue = true,那么該條消息會被放回消息隊列,等待下一次消費。否則該條消息會被直接丟棄。
持久化剩余重試次數
當消費者處理消息失敗時,默認消費者會重試 5 次,如果已經重試了 3 次,此時程序重啟,那么下一次消費該消息時,依然是繼續重試五次。
需要記憶重試次數,在程序重啟時,能夠按照剩余次數進行重試。
引入 Maomi.MQ.RedisRetry 包。
配置示例:
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
builder.Services.AddMaomiMQRedisRetry((s) =>
{
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.3.248");
IDatabase db = redis.GetDatabase();
return db;
});
默認 key 只會保留 5 分鐘。也就是說,如果五分鐘之后程序才重新消費該消息,那么就會剩余重試次數就會重置。
死信隊列
死信隊列
可以給一個消費者或事件綁定死信隊列,當該隊列的消息失敗后并且不會放回隊列時,該消息會被推送到死信隊列中,示例:
[Consumer("ConsumerWeb_dead", Qos = 1, DeadQueue = "ConsumerWeb_dead_queue", RetryFaildRequeue = false)]
public class DeadConsumer : IConsumer<DeadEvent>
{
// 消費
public Task ExecuteAsync(EventBody<DeadEvent> message)
{
Console.WriteLine($"事件 id:{message.Id}");
throw new OperationCanceledException();
}
// 每次失敗時被執行
public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadEvent>? message) => Task.CompletedTask;
// 最后一次失敗時執行
public Task<bool> FallbackAsync(EventBody<DeadEvent>? message) => Task.FromResult(false);
}
// ConsumerWeb_dead 消費失敗的消息會被此消費者消費。
[Consumer("ConsumerWeb_dead_queue", Qos = 1)]
public class DeadQueueConsumer : IConsumer<DeadQueueEvent>
{
// 消費
public Task ExecuteAsync(EventBody<DeadQueueEvent> message)
{
Console.WriteLine($"死信隊列,事件 id:{message.Id}");
return Task.CompletedTask;
}
// 每次失敗時被執行
public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;
// 最后一次失敗時執行
public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false);
}

如果使用死信隊列,則務必將 RetryFaildRequeue 設置為 false,那么消費者會在重試多次失敗后,向 RabbitMQ 發送 nack 信號,RabbitMQ 就會將該消息轉發到綁定的死信隊列中。
延遲隊列
創建一個消費者,繼承 EmptyConsumer,那么該隊列會在程序啟動時被創建,但是不會創建 IConnection 進行消費。然后設置隊列消息過期時間以及綁定死信隊列,綁定的死信隊列既可以使用消費者模式實現,也可以使用事件模式實現。
[Consumer("ConsumerWeb_dead_2", Expiration = 6000, DeadQueue = "ConsumerWeb_dead_queue_2")]
public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
{
}
// ConsumerWeb_dead 消費失敗的消息會被此消費者消費。
[Consumer("ConsumerWeb_dead_queue_2", Qos = 1)]
public class Dead_2_QueueConsumer : IConsumer<DeadQueueEvent>
{
// 消費
public Task ExecuteAsync(EventBody<DeadQueueEvent> message)
{
Console.WriteLine($"事件 id:{message.Id} 已到期");
return Task.CompletedTask;
}
// 每次失敗時被執行
public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;
// 最后一次失敗時執行
public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false);
}
例如,用戶下單之后,如果 15 分鐘之內沒有付款,那么消息到期時,自動取消訂單。
可觀測性
功能還在繼續完善中。請參考 ActivitySourceApi 示例。
為了快速部署可觀測性平臺,可以使用 OpenTelemetry 官方提供的示例包快速部署相關的服務。
下載示例倉庫源碼:
git clone https://github.com/open-telemetry/opentelemetry-demo.git
由于示例中會包含大量的 demo 微服務,因此我們需要打開 docker-compose.yml 文件,將 services 節點的 Core Demo Services 和 Dependent Services 服務直接刪除,只保留可觀測性組件。或者直接點擊下載筆者已經修改好的版本: docker-compose.yml

執行命令部署可觀測性服務:
docker-compose up -d

opentelemetry-collector-contrib 用于收集鏈路追蹤的可觀測性信息,有 grpc 和 http 兩種,監聽端口如下:
| Port | Protocol | Endpoint | Function |
|---|---|---|---|
| 4317 | gRPC | n/a | Accepts traces in OpenTelemetry OTLP format? (Protobuf). |
| 4318 | HTTP | /v1/traces |
Accepts traces in OpenTelemetry OTLP format? (Protobuf and JSON). |
經過容器端口映射后,對外端口可能不是 4317、4318 了。

引入 Maomi.MQ.Instrumentation 包,以及其它相關 OpenTelemetry 包。
<PackageReference Include="Maomi.MQ.Instrumentation " Version="1.1.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.8.1" />
然后注入服務:
const string serviceName = "myapp";
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = serviceName;
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource.AddService(serviceName))
.WithTracing(tracing =>
{
tracing.AddMaomiMQInstrumentation(options =>
{
options.Sources.AddRange(MaomiMQDiagnostic.Sources);
options.RecordException = true;
})
.AddAspNetCoreInstrumentation()
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri("http://127.0.0.1:32772/v1/traces");
options.Protocol = OtlpExportProtocol.HttpProtobuf;
});
});
啟動服務后,進行發布、消費,鏈路追蹤信息會被自動推送到 OpenTelemetry Collector 中,通過 Jaeger 、Skywalking 等組件可以讀取出來。

由于 publish、consumer 屬于兄弟 trace 而不是同一個 trace,因此需要通過 Tags 查詢相關聯的 trace,格式 event.id=xxx。



浙公網安備 33010602011771號