本地事件總線和事務
本地事件總線和事務
通過重寫Ef Core的SaveChanges/SaveChangesAsync來實現(xiàn)事務。當然,如果您愿意實現(xiàn)倉儲層,可以在倉儲層實現(xiàn)展開對應實體包含的事件,并且調(diào)整事件的處理順序。
Github倉庫地址:soda-event-bus
實現(xiàn)AggregateRoot類
AggregateRoot類主要通過一個集合來記錄本次事務的所有事件,到保存前再展開讀取,在Abp中采用的ICollection記錄的本地事件,通過實現(xiàn)一個排序器來保證順序問題,我這里直接采用了ConcurrentQueue,保證原子操作的同時保證了順序性,實現(xiàn)更簡單一些。
public abstract class AggregateRoot
{
public ConcurrentQueue<object> LocalEvents { get; } = new();
public void AddLocalEvent<TEvent>(TEvent eventData) where TEvent : IEvent
{
LocalEvents.Enqueue(eventData);
}
public bool GetLocalEvent(out object? @event)
{
LocalEvents.TryDequeue(out var eventData);
@event = eventData;
return @event is not null;
}
public void ClearLocalEvents()
{
LocalEvents.Clear();
}
}
重寫DbContext
主要是從ServiceProvider中獲取對應實體類包含的事件,并且找到對應的Handler進行處理,然后再當作一個事務提交。
public class EventBusDbContext<TDbContext> : DbContext
where TDbContext : DbContext
{
private readonly IServiceProvider _serviceProvider;
public EventBusDbContext(DbContextOptions<TDbContext> options, IServiceProvider serviceProvider) : base(options)
{
_serviceProvider = serviceProvider;
}
public override int SaveChanges()
{
return base.SaveChanges();
}
public override int SaveChanges(bool acceptAllChangesOnSuccess)
{
return base.SaveChanges(acceptAllChangesOnSuccess);
}
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
await HandleEventsAsync();
return await base.SaveChangesAsync(cancellationToken);
}
public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = default)
{
await HandleEventsAsync();
return await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
}
private async Task HandleEventsAsync()
{
foreach (var entityEntry in ChangeTracker.Entries<AggregateRoot>())
{
while (entityEntry.Entity.GetLocalEvent(out var @event))
{
if (@event is null) break;
await HandleEventAsync(@event);
}
entityEntry.Entity.ClearLocalEvents();
}
}
private async Task HandleEventAsync(object @event)
{
var eventHandlerType = typeof(IAsyncEventHandler<>).MakeGenericType(@event.GetType());
var eventHandler = _serviceProvider.GetRequiredService(eventHandlerType);
var method = eventHandler.GetType().GetMethod(nameof(IAsyncEventHandler<IEvent>.HandleAsync));
var exceptionHandleMethod = eventHandlerType.GetMethod(nameof(IAsyncEventHandler<IEvent>.HandleException));
try
{
await (Task)method!.Invoke(eventHandler, new[] { @event })!;
}
catch (Exception ex)
{
exceptionHandleMethod!.Invoke(eventHandler, new[] { @event, ex });
}
}
}
分布式事件總線和事務
根據(jù)需要擴展即可,基本邏輯相同,但可能需要增加確認機制等。

浙公網(wǎng)安備 33010602011771號