<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      實(shí)現(xiàn)一個(gè)事件總線

      使用 C# 實(shí)現(xiàn)一個(gè) Event Bus

      Event Bus(事件總線)是一種用于在應(yīng)用程序內(nèi)部或跨應(yīng)用程序組件之間進(jìn)行事件通信的機(jī)制。它允許不同的組件通過(guò)發(fā)布和訂閱事件來(lái)進(jìn)行解耦和通信。

      在給定的代碼片段中,我們可以看到一個(gè)使用C#實(shí)現(xiàn)的Event Bus。它定義了一些接口和類來(lái)實(shí)現(xiàn)事件的發(fā)布和訂閱。

      首先,我們有兩個(gè)基本的約束接口:IEventIAsyncEventHandler<TEvent>。IEvent是一個(gè)空接口,用于約束事件的類型。IAsyncEventHandler<TEvent>是一個(gè)泛型接口,用于約束事件處理程序的類型。它定義了處理事件的異步方法HandleAsync和處理異常的方法HandleException。

      接下來(lái),我們有一個(gè)IEventBus接口,它定義了一些操作方法用于發(fā)布和訂閱事件。其中,Publish<TEvent>PublishAsync<TEvent>方法用于發(fā)布事件,而OnSubscribe<TEvent>方法用于訂閱事件。

      然后,我們看到一個(gè)實(shí)現(xiàn)了本地事件總線的類LocalEventBusManager<TEvent>。它實(shí)現(xiàn)了ILocalEventBusManager<TEvent>接口,用于在單一管道內(nèi)處理本地事件。它使用了一個(gè)Channel<TEvent>來(lái)存儲(chǔ)事件,并提供了發(fā)布事件的方法PublishPublishAsync。此外,它還提供了一個(gè)自動(dòng)處理事件的方法AutoHandle

      總的來(lái)說(shuō),Event Bus提供了一種方便的方式來(lái)實(shí)現(xiàn)組件之間的松耦合通信。通過(guò)發(fā)布和訂閱事件,組件可以獨(dú)立地進(jìn)行操作,而不需要直接依賴于彼此的實(shí)現(xiàn)細(xì)節(jié)。這種機(jī)制可以提高代碼的可維護(hù)性和可擴(kuò)展性。

      Github倉(cāng)庫(kù)地址:soda-event-bus

      實(shí)現(xiàn)一些基本約束

      先實(shí)現(xiàn)一些約束,實(shí)現(xiàn)IEvent約束事件,實(shí)現(xiàn)IAsyncEvnetHandler<TEvent> where TEvent:IEvent來(lái)約束事件的處理程序。

      public interface IEvent
      {
      
      }
      
      public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
      {
          Task HandleAsync(IEvent @event);
      
          void HandleException(IEvent @event, Exception ex);
      }
      
      

      接下來(lái)規(guī)定一下咱們的IEventBus,會(huì)有哪些操作方法。基本就是發(fā)布和訂閱。

      public interface IEventBus
      {
          void Publish<TEvent>(TEvent @event) where TEvent : IEvent;
          Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;
      
          void OnSubscribe<TEvent>() where TEvent : IEvent;
      }
      

      實(shí)現(xiàn)一個(gè)本地事件總線

      本地事件處理

      本地事件的處理我打算采用兩種方式實(shí)現(xiàn),一種是LocalEventBusManager即本地事件管理,第二種是LocalEventBusPool池化本地事件。

      LocalEvnetBusManager

      LocalEventBusManager主要在單一管道內(nèi)進(jìn)行處理,集中進(jìn)行消費(fèi)。

      public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent
      {
          void Publish(TEvent @event);
          Task PublishAsync(TEvent @event) ;
          
          void AutoHandle();
      }
      
      public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent>
          where TEvent: IEvent
      {
          readonly IServiceProvider _servicesProvider = serviceProvider;
      
          private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();
      
          public void Publish(TEvent @event)
          {
              Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");
              _eventChannel.Writer.WriteAsync(@event);
          }
      
          private CancellationTokenSource Cts { get; } = new();
      
          public void Cancel()
          {
              Cts.Cancel();
          }
          
          public async Task PublishAsync(TEvent @event)
          {
              await _eventChannel.Writer.WriteAsync(@event);
          }
      
          public void AutoHandle()
          {
              // 確保只啟動(dòng)一次
              if (!Cts.IsCancellationRequested) return;
      
              Task.Run(async () =>
              {
                  while (!Cts.IsCancellationRequested)
                  {
                      var reader = await _eventChannel.Reader.ReadAsync();
                      await HandleAsync(reader);
                  }
              }, Cts.Token);
          }
      
          async Task HandleAsync(TEvent @event)
          {
              var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();
      
              if (handler is null)
              {
                  throw new NullReferenceException($"No handler for event {@event.GetType().Name}");
              }
              try
              {
                  await handler.HandleAsync(@event);
              }
              catch (Exception ex)
              {
                  handler.HandleException( @event, ex);
              }
          }
      }
      
      

      LocalEventBusPool

      LocalEventBusPool即所有的Event都會(huì)有一個(gè)單獨(dú)的管道處理,單獨(dú)消費(fèi)處理,并行能力更好一些。

      public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
      {
          private readonly IServiceProvider _serviceProvider = serviceProvider;
      
          private class ChannelKey
          {
              public required string Key { get; init; }
              public int Subscribers { get; set; }
      
              public override bool Equals(object? obj)
              {
                  if (obj is ChannelKey key)
                  {
                      return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);
                  }
      
                  return false;
              }
      
              public override int GetHashCode()
              {
                  return 0;
              }
          }
      
          private Channel<IEvent> Rent(string channel)
          {
              _channels.TryGetValue(new ChannelKey() { Key = channel }, out var value);
      
              if (value != null) return value;
              value = Channel.CreateUnbounded<IEvent>();
              _channels.TryAdd(new ChannelKey() { Key = channel }, value);
              return value;
          }
      
          private Channel<IEvent> Rent(ChannelKey channelKey)
          {
              _channels.TryGetValue(channelKey, out var value);
              if (value != null) return value;
              value = Channel.CreateUnbounded<IEvent>();
              _channels.TryAdd(channelKey, value);
              return value;
          }
      
          private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();
      
          private CancellationTokenSource Cts { get; } = new();
      
          public void Cancel()
          {
              Cts.Cancel();
              _channels.Clear();
              Cts.TryReset();
          }
      
          public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
          {
              await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);
          }
      
          public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
          {
              Rent(typeof(TEvent).Name).Writer.TryWrite(@event);
          }
      
          public void OnSubscribe<TEvent>() where TEvent : IEvent
          {
              var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??
                               new ChannelKey() { Key = typeof(TEvent).Name };
              channelKey.Subscribers++;
      
              Task.Run(async () =>
              {
                  try
                  {
                      while (!Cts.IsCancellationRequested)
                      {
                          var @event = await ReadAsync(channelKey);
      
                          var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();
                          if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}");
                          try
                          {
                              await handler.HandleAsync((TEvent)@event);
                          }
                          catch (Exception ex)
                          {
                              handler.HandleException((TEvent)@event, ex);
                          }
                      }
                  }
                  catch (Exception e)
                  {
                      throw new InvalidOperationException("Error on onSubscribe handler", e);
                  }
              }, Cts.Token);
          }
      
          private async Task<IEvent> ReadAsync(string channel)
          {
              return await Rent(channel).Reader.ReadAsync(Cts.Token);
          }
      
          private async Task<IEvent> ReadAsync(ChannelKey channel)
          {
              return await Rent(channel).Reader.ReadAsync(Cts.Token);
          }
      }
      

      LocalEventBus

      實(shí)現(xiàn)LocalEventBus繼承自IEventBus即可,如果有需要擴(kuò)展的方法自行添加,池化和管理器的情況單獨(dú)處理。

      public interface ILocalEventBus: IEventBus
      {
      
      }
      
      public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
      {
          private  LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();
          
          
          public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
          {
              if (options.Pool)
              {
                  Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
                  EventBusPool.Publish(@event);
              }
              else
              {
                  var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
                  if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
                  manager.Publish(@event);
              }
          }
      
          public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
          {
              if (options.Pool)
              {
                  Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
                  await EventBusPool.PublishAsync(@event);
              }
              else
              {
                  var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
                  if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
                  await manager.PublishAsync(@event);
              }
          }
      
          public void OnSubscribe<TEvent>() where TEvent : IEvent
          {
              if (options.Pool)
              {
                  Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
                  EventBusPool.OnSubscribe<TEvent>();
              }
              else
              {
                  var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
                  if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
                  manager.AutoHandle();
              }
          }
      }
      

      分布式事件總線

      根據(jù)需要擴(kuò)展即可,基本邏輯相同,但可能需要增加確認(rèn)機(jī)制等。

      posted @ 2024-01-02 09:48  胖紙不爭(zhēng)  閱讀(1376)  評(píng)論(1)    收藏  舉報(bào)
      主站蜘蛛池模板: 德钦县| 黄男女激情一区二区三区| 国内精品久久久久久无码不卡| 永和县| 精品国精品国产自在久国产应用男 | 色吊丝永久性观看网站| 久久99精品久久久久久琪琪| 激情综合色综合啪啪开心| 国产精品女生自拍第一区| 国内精品久久人妻无码网站| 国产精品免费中文字幕| 亚洲乱码国产乱码精品精| 老司机午夜精品视频资源| 怀柔区| 国产超碰无码最新上传| 国产精品综合av一区二区国产馆| 国产精品国产三级国产专业| 国产在线视频精品视频| 日韩有码av中文字幕| 国产口爆吞精在线视频2020版| 少妇又爽又刺激视频| 94人妻少妇偷人精品| 日韩在线观看精品亚洲| 99精品国产一区二区三区不卡| 欧美日本国产va高清cabal| 亚洲精品国产精品乱码不卡| 成人网站免费观看永久视频下载| 亚洲av午夜福利大精品| √天堂中文www官网在线| 四虎永久免费精品视频| 午夜福利电影| 成人av午夜在线观看| 国产精品天堂蜜av在线播放| 国语对白做受xxxxx在线中国| 国产精品爽爽爽一区二区| 亚洲欧美中文字幕日韩一区二区 | 精品国产成人国产在线视| 免费大片av手机看片高清| 九九热中文字幕在线视频| 亚洲国产性夜夜综合| 小雪被老外黑人撑破了视频|