<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      aspnetcore微服務中使用發件箱模式實例

      aspnetcore微服務種服務之間的通信一般都有用到消息中間件,如何確保該服務的持久層保存創建的數據同時又把消息成功投遞到了關聯服務,關聯服務做對應的處理。

      下面就以一個簡單的例子來演示實現方式之一,即發件箱模式。

      下面解決方案有兩個服務,做演示用的比較簡單,一個是訂單服務,一個是賬單服務。完成訂單的同時把訂單信息通過本例的rabbitmq發送到billapi服務中去。

      首先trading服務有一個領域內事件接收器

       public abstract class IEntity
          {
              private int id;
              public virtual int Id
              {
                  get { return id; }
                  protected set { id = value; }
              }
      
              private List<IEvent> _domainEvents;
              public IReadOnlyCollection<IEvent> DomainEvents => _domainEvents?.AsReadOnly();
              public void AddDomainEvent(IEvent eventItem)
              {
                  _domainEvents = _domainEvents ?? new List<IEvent>();
                  _domainEvents.Add(eventItem);
              }
              public void RemoveDomainEvent(IEvent eventItem)
              {
                  _domainEvents?.Remove(eventItem);
              }
              public void ClearDomainEvents()
              {
                  _domainEvents?.Clear();
              }
          }

       

       public class CreateOrderEvent:IEvent
          {
              public Guid EventId { get; set; }
              public int CustomerId { get; set; }
              public CreateOrderEvent(Guid EventId,int customerId)
              {
                  this.EventId = EventId;
                  CustomerId = customerId;
              }
      
          }

      我把事件簡化到實體類里面,也可以不需要這個IEntity,那每次都需要自己創建order的同時創建一個事件,當然事件集合需要自己定義存起來。

      發件箱顧名思義就是所有郵件定時定期的投遞到郵箱中,定時定期的取出來往需要的地方去投遞。

      這里的郵件就是事件了,而投遞就是事件發布。

      這個實例的事件放到實體類種有領域的味道,因為在一個領域order內可以把關聯的事件都放一起。下面代碼就是借助efcore的攔截器來統一在savechange的地方來把事件寫到數據庫中去。

      我新建一個order,同時把發布的order事件存到數據,這就是發件箱模式。好多數據庫和中間件操作的最終一致性大體都是這個模式,借助數據庫的分布式事務。

        public sealed class OutBoxMessageInterceptor:SaveChangesInterceptor
          {
              public override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData, InterceptionResult<int> result, CancellationToken cancellationToken = default)
              {
                  DbContext? dbContxt = eventData.Context;
                  if (dbContxt is null)
                  {
                      return base.SavingChangesAsync(eventData, result, cancellationToken);
                  }
                  var events = dbContxt.ChangeTracker.Entries<IEntity>().Select(x => x.Entity).SelectMany(x =>
                  {
                      List<IEvent> entities = new List<IEvent>();
                      foreach (var item in x.DomainEvents)
                      {
                          if(!(item is null))
                          entities.Add(item);
                      }
                      x.ClearDomainEvents();
                      return entities;
                  }).Select(x => new OutBoxMessage
                  {
                      Id = Guid.NewGuid(),
                      OccurredOnUtc = DateTime.UtcNow,
                      Type = x.GetType().Name,
                      Content = System.Text.Json.JsonSerializer.Serialize((CreateOrderEvent)x)
                  }).ToList();
                  if(events!=null && events.Any())
                   dbContxt.Set<OutBoxMessage>().AddRangeAsync(events);
                  return base.SavingChangesAsync(eventData, result, cancellationToken);
              }
          }

      數據庫攔截器注入的代碼少不了,寫是寫進去了,下面就是怎么去往另外的服務的發布呢?

       builder.Services.AddDbContext<TradingDbContext>((sp,ops) =>
                  {
                      ops.UseSqlServer("Data Source=(localdb)\\MSSQLLocalDB;Initial Catalog=Traing;Integrated Security=True;Connect Timeout=30;Encrypt=False;Trust Server Certificate=False;Application Intent=ReadWrite;Multi Subnet Failover=False");
                      var interceptor = sp.GetService<OutBoxMessageInterceptor>();
                      ops.AddInterceptors(interceptor);
                  }, ServiceLifetime.Scoped);

       

      這里就是后臺任務去取數據做處理了

       public class OutBoxMessageBackgroundService : BackgroundService
          {
              private readonly IServiceProvider _serviceProvider;
              private readonly IRabbitMQEventBus _publisher;
              public OutBoxMessageBackgroundService(IServiceProvider serviceProvider, IRabbitMQEventBus publisher)
              {
                  _serviceProvider = serviceProvider;
                  _publisher = publisher;
              }
              protected override async Task ExecuteAsync(CancellationToken stoppingToken)
              {
                  using var scope = _serviceProvider.CreateScope();
                  var _orderingContext = scope.ServiceProvider.GetService<TradingDbContext>();
                  var messages = await _orderingContext.Set<OutBoxMessage>().Where(m => m.ProceddedOnUtc == null)
                      .Take(10).ToListAsync(stoppingToken);
                  foreach (var message in messages)
                  {
                      if (string.IsNullOrEmpty(message.Content))
                          continue;
                      var retries = 3;
                      var retry = Policy.Handle<Exception>()
                          .WaitAndRetry(
                          retries,
                          retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                          (exception, timeSpan, retry, ctx) =>
                          {
                              Console.WriteLine($"發布時間失敗:{message}");
                          });
                      retry.Execute(() => _publisher.Publish(new { Content=message.Content,Id = message.Id }, exchange: "RabbitMQ.EventBus.Simple", routingKey: "rabbitmq.eventbus.test"));
                      message.ProceddedOnUtc = DateTime.UtcNow;
                  }
                  await _orderingContext.SaveChangesAsync(stoppingToken);
              }
          }

      就是這么簡單,tradinfgapi的任務就這么愉快地完成了,這里保證了數據庫寫數據和發布事件出去最終是同步的,即使服務出問題重啟也一樣能完成任務。

      下面就是接受事件的billapi的服務了,因為上面代碼用來重試機制,而且其他情況也比面不了事件重復發送,下面就簡單的處理下訂閱事件的冪等性。

       public class IDomainEvent : IEvent
          {
              public Guid Id { get; set; }  
              public string Content { get; set; }
          }
          public  class IdempotentDomainEventHandler : IEventResponseHandler<IDomainEvent,int>,IDisposable
          {
              private readonly IServiceProvider _serviceProvider;
              public IdempotentDomainEventHandler(IServiceProvider serviceProvider)
              {
                  _serviceProvider = serviceProvider;
              }
      
              public void Dispose()
              {
                  Console.WriteLine("MessageBodyHandle Disposable.");
              }
      
              public async Task<int> HandleAsync(HandlerEventArgs<IDomainEvent> args)
              {
                  using var scope = _serviceProvider.CreateScope();
                  BillingDbContext _context = scope.ServiceProvider.GetService<BillingDbContext>();
                  string consumer = args.GetType().Name;
                  if (await _context.Set<OutboxMessageConsumer>().AnyAsync(o => o.Guid == args.EventObject.Id && o.Name==consumer))
                  {
                      return default;
                  }
                  Console.WriteLine($"等待處理的消息{args.EventObject.Content}");
                  CreateOrderEvent createOrderEvent = System.Text.Json.JsonSerializer.Deserialize<CreateOrderEvent>(args.EventObject.Content);
                  await _context.BillingRecords.AddAsync(new BillingRecord { CreateTime=DateTime.UtcNow, OrderEventId=createOrderEvent.EventId});
                  Console.WriteLine($"處理的消息完畢");
      
                  _context.Set<OutboxMessageConsumer>().Add(new OutboxMessageConsumer
                  {
                      Guid = args.EventObject.Id,
                      Name = consumer
                  });
                 return await _context.SaveChangesAsync();
              }
          }
      
          public class OutboxMessageConsumer
          {
              public int Id { get; set; }
              public Guid Guid { get; set; }
              public string Name { get; set; }
          }
       /// <summary>
          /// 來自tradingapi的數據
          /// </summary>
          public class CreateOrderEvent
          {
              public Guid EventId { get; set; }
              public int CustomerId { get; set; }
          }

       

      同樣是把事件處理后寫入到數據庫,每次進來去數據庫看看有沒有,就這么簡單的完成了事件訂閱的重復處理。

      下面運行一下程序看看效果,創建order前billingrecord是沒有記錄的。

       

       

      這里出現了一個喜聞樂見的事情,trading服務已經發布了事件,billing服務沒收到,可能是rabbitmq卡住了,不過沒關系,因為有這個發件箱模式可以重啟下服務,這個時間丟不了。

      重啟了下服務就消費掉了這條數據。

      至于重復消費的測試就省了,有需要自己下載源碼去測試

      liuzhixin405/outboxpattern: microservice (github.com)

      posted @ 2023-03-22 00:39  星仔007  閱讀(473)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 永久免费av网站可以直接看的| 亚洲国产成人无码av在线影院| 亚洲国产日韩伦中文字幕| 国产成AV人片久青草影院| 亚洲欧美中文日韩在线v日本| 国产亚洲精品久久久久婷婷图片| 国产一区二区三区av在线无码观看| 亚洲精品午夜国产VA久久成人| 免费吃奶摸下激烈视频| 欧美性猛交xxxx乱大交极品| 377p日本欧洲亚洲大胆张筱雨| 亚洲一区成人在线视频| 4虎四虎永久在线精品免费| 国产乱色熟女一二三四区| 国产精品一线二线三线区| 麻豆人妻| 中文字幕av国产精品| 国产成人综合在线观看不卡| 91精品国产吴梦梦在线观看永久| 欧美丰满熟妇bbbbbb| 麻豆精品久久精品色综合| 亚洲AV成人无码久久精品四虎| 日韩加勒比一本无码精品| 精品国产亚洲午夜精品a| 国产按头口爆吞精在线视频| 午夜精品福利亚洲国产| 久久99久久99精品免观看| 亚洲美女高潮不断亚洲| 亚洲熟妇自偷自拍另欧美| 亚洲综合一区二区三区视频| 国产精品香港三级国产av| 亚洲色在线v中文字幕| 久久综合久中文字幕青草| 免费无码午夜福利片| 中国熟妇毛多多裸交视频| 国产粉嫩系列一区二区三| 欧洲无码一区二区三区在线观看| 国产精品人成视频免费播放| 亚洲精品国产精品国在线| 国产亚洲精品久久久久久青梅| 亚洲精国产一区二区三区|