aspnetcore微服務中使用發件箱模式實例
aspnetcore微服務種服務之間的通信一般都有用到消息中間件,如何確保該服務的持久層保存創建的數據同時又把消息成功投遞到了關聯服務,關聯服務做對應的處理。
下面就以一個簡單的例子來演示實現方式之一,即發件箱模式。
下面解決方案有兩個服務,做演示用的比較簡單,一個是訂單服務,一個是賬單服務。完成訂單的同時把訂單信息通過本例的rabbitmq發送到billapi服務中去。

首先trading服務有一個領域內事件接收器
public abstract class IEntity { private int id; public virtual int Id { get { return id; } protected set { id = value; } } private List<IEvent> _domainEvents; public IReadOnlyCollection<IEvent> DomainEvents => _domainEvents?.AsReadOnly(); public void AddDomainEvent(IEvent eventItem) { _domainEvents = _domainEvents ?? new List<IEvent>(); _domainEvents.Add(eventItem); } public void RemoveDomainEvent(IEvent eventItem) { _domainEvents?.Remove(eventItem); } public void ClearDomainEvents() { _domainEvents?.Clear(); } }
public class CreateOrderEvent:IEvent { public Guid EventId { get; set; } public int CustomerId { get; set; } public CreateOrderEvent(Guid EventId,int customerId) { this.EventId = EventId; CustomerId = customerId; } }
我把事件簡化到實體類里面,也可以不需要這個IEntity,那每次都需要自己創建order的同時創建一個事件,當然事件集合需要自己定義存起來。
發件箱顧名思義就是所有郵件定時定期的投遞到郵箱中,定時定期的取出來往需要的地方去投遞。
這里的郵件就是事件了,而投遞就是事件發布。
這個實例的事件放到實體類種有領域的味道,因為在一個領域order內可以把關聯的事件都放一起。下面代碼就是借助efcore的攔截器來統一在savechange的地方來把事件寫到數據庫中去。
我新建一個order,同時把發布的order事件存到數據,這就是發件箱模式。好多數據庫和中間件操作的最終一致性大體都是這個模式,借助數據庫的分布式事務。
public sealed class OutBoxMessageInterceptor:SaveChangesInterceptor { public override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData, InterceptionResult<int> result, CancellationToken cancellationToken = default) { DbContext? dbContxt = eventData.Context; if (dbContxt is null) { return base.SavingChangesAsync(eventData, result, cancellationToken); } var events = dbContxt.ChangeTracker.Entries<IEntity>().Select(x => x.Entity).SelectMany(x => { List<IEvent> entities = new List<IEvent>(); foreach (var item in x.DomainEvents) { if(!(item is null)) entities.Add(item); } x.ClearDomainEvents(); return entities; }).Select(x => new OutBoxMessage { Id = Guid.NewGuid(), OccurredOnUtc = DateTime.UtcNow, Type = x.GetType().Name, Content = System.Text.Json.JsonSerializer.Serialize((CreateOrderEvent)x) }).ToList(); if(events!=null && events.Any()) dbContxt.Set<OutBoxMessage>().AddRangeAsync(events); return base.SavingChangesAsync(eventData, result, cancellationToken); } }
數據庫攔截器注入的代碼少不了,寫是寫進去了,下面就是怎么去往另外的服務的發布呢?
builder.Services.AddDbContext<TradingDbContext>((sp,ops) => { ops.UseSqlServer("Data Source=(localdb)\\MSSQLLocalDB;Initial Catalog=Traing;Integrated Security=True;Connect Timeout=30;Encrypt=False;Trust Server Certificate=False;Application Intent=ReadWrite;Multi Subnet Failover=False"); var interceptor = sp.GetService<OutBoxMessageInterceptor>(); ops.AddInterceptors(interceptor); }, ServiceLifetime.Scoped);
這里就是后臺任務去取數據做處理了
public class OutBoxMessageBackgroundService : BackgroundService { private readonly IServiceProvider _serviceProvider; private readonly IRabbitMQEventBus _publisher; public OutBoxMessageBackgroundService(IServiceProvider serviceProvider, IRabbitMQEventBus publisher) { _serviceProvider = serviceProvider; _publisher = publisher; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { using var scope = _serviceProvider.CreateScope(); var _orderingContext = scope.ServiceProvider.GetService<TradingDbContext>(); var messages = await _orderingContext.Set<OutBoxMessage>().Where(m => m.ProceddedOnUtc == null) .Take(10).ToListAsync(stoppingToken); foreach (var message in messages) { if (string.IsNullOrEmpty(message.Content)) continue; var retries = 3; var retry = Policy.Handle<Exception>() .WaitAndRetry( retries, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (exception, timeSpan, retry, ctx) => { Console.WriteLine($"發布時間失敗:{message}"); }); retry.Execute(() => _publisher.Publish(new { Content=message.Content,Id = message.Id }, exchange: "RabbitMQ.EventBus.Simple", routingKey: "rabbitmq.eventbus.test")); message.ProceddedOnUtc = DateTime.UtcNow; } await _orderingContext.SaveChangesAsync(stoppingToken); } }
就是這么簡單,tradinfgapi的任務就這么愉快地完成了,這里保證了數據庫寫數據和發布事件出去最終是同步的,即使服務出問題重啟也一樣能完成任務。
下面就是接受事件的billapi的服務了,因為上面代碼用來重試機制,而且其他情況也比面不了事件重復發送,下面就簡單的處理下訂閱事件的冪等性。
public class IDomainEvent : IEvent { public Guid Id { get; set; } public string Content { get; set; } } public class IdempotentDomainEventHandler : IEventResponseHandler<IDomainEvent,int>,IDisposable { private readonly IServiceProvider _serviceProvider; public IdempotentDomainEventHandler(IServiceProvider serviceProvider) { _serviceProvider = serviceProvider; } public void Dispose() { Console.WriteLine("MessageBodyHandle Disposable."); } public async Task<int> HandleAsync(HandlerEventArgs<IDomainEvent> args) { using var scope = _serviceProvider.CreateScope(); BillingDbContext _context = scope.ServiceProvider.GetService<BillingDbContext>(); string consumer = args.GetType().Name; if (await _context.Set<OutboxMessageConsumer>().AnyAsync(o => o.Guid == args.EventObject.Id && o.Name==consumer)) { return default; } Console.WriteLine($"等待處理的消息{args.EventObject.Content}"); CreateOrderEvent createOrderEvent = System.Text.Json.JsonSerializer.Deserialize<CreateOrderEvent>(args.EventObject.Content); await _context.BillingRecords.AddAsync(new BillingRecord { CreateTime=DateTime.UtcNow, OrderEventId=createOrderEvent.EventId}); Console.WriteLine($"處理的消息完畢"); _context.Set<OutboxMessageConsumer>().Add(new OutboxMessageConsumer { Guid = args.EventObject.Id, Name = consumer }); return await _context.SaveChangesAsync(); } } public class OutboxMessageConsumer { public int Id { get; set; } public Guid Guid { get; set; } public string Name { get; set; } }
/// <summary> /// 來自tradingapi的數據 /// </summary> public class CreateOrderEvent { public Guid EventId { get; set; } public int CustomerId { get; set; } }
同樣是把事件處理后寫入到數據庫,每次進來去數據庫看看有沒有,就這么簡單的完成了事件訂閱的重復處理。
下面運行一下程序看看效果,創建order前billingrecord是沒有記錄的。




這里出現了一個喜聞樂見的事情,trading服務已經發布了事件,billing服務沒收到,可能是rabbitmq卡住了,不過沒關系,因為有這個發件箱模式可以重啟下服務,這個時間丟不了。
重啟了下服務就消費掉了這條數據。

至于重復消費的測試就省了,有需要自己下載源碼去測試


浙公網安備 33010602011771號