<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      命令模式的深度解析:從標準實現到TPL Dataflow高性能架構

      命令模式是對一類對象公共操作的抽象,它們具有相同的方法簽名,所以具有類似的操作,可以被抽象出來,成為一個抽象的命令對象。實際操作的調用者就不是和一組對象打交道,它是需要以來這個命令對象的方法簽名,并根據這個簽名調用相關的方法。

      以上是命令模式的大概含義,這里可以聯想到事件驅動,command和handler,也可以聯想到AOP的思想。聯想到數據流的操作我就寫了個數據流操作類庫。

      Snipaste_2025-09-14_15-03-02

      Snipaste_2025-09-14_15-03-15

      之前寫了一些有關AOP的,但是感覺還是差點意思,補上這次的可能在項目中會彌補一些短板回來,就是靈活性。
      但是該項目重點是數據流的處理,所以web端來實現只是一個例子,大量數據的處理最主要的是后臺任務吧,通過接口調用只是一個實例展示。

      有關數據流這塊代碼核心如下:

      using System;
      using System.Collections.Concurrent;
      using System.Collections.Generic;
      using System.Linq;
      using System.Reflection;
      using System.Threading;
      using System.Threading.Tasks;
      using System.Threading.Tasks.Dataflow;
      using Microsoft.Extensions.DependencyInjection;
      using Microsoft.Extensions.Logging;
      using Common.Bus.Core;
      using Common.Bus.Monitoring;
      
      namespace Common.Bus.Implementations
      {
          /// <summary>
          /// 基于TPL數據流的高性能CommandBus實現
          /// 支持并行處理、背壓控制和監控
          /// </summary>
          public class DataflowCommandBus : ICommandBus, IDisposable
          {
              private readonly IServiceProvider _provider;
              private readonly ILogger<DataflowCommandBus>? _logger;
              private readonly ConcurrentDictionary<Type, Func<object>> _handlerCache = new();
              private readonly ConcurrentDictionary<Type, Func<object[]>> _behaviorsCache = new();
              
              // 數據流網絡
              private ActionBlock<DataflowCommandRequest> _commandProcessor = null!;
              
              // 背壓控制
              private readonly SemaphoreSlim _concurrencyLimiter;
              private readonly int _maxConcurrency;
              
              // 監控指標
              private long _processedCommands;
              private long _failedCommands;
              private long _totalProcessingTime;
      
              public DataflowCommandBus(IServiceProvider serviceProvider, ILogger<DataflowCommandBus>? logger = null, 
                  int? maxConcurrency = null)
              {
                  _provider = serviceProvider;
                  _logger = logger;
                  _maxConcurrency = maxConcurrency ?? Environment.ProcessorCount * 2;
                  _concurrencyLimiter = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);
                  
                  // 創建數據流網絡
                  CreateDataflowNetwork();
              }
      
              private void CreateDataflowNetwork()
              {
                  // 創建命令處理器
                  _commandProcessor = new ActionBlock<DataflowCommandRequest>(
                      async request =>
                      {
                          try
                          {
                              await _concurrencyLimiter.WaitAsync();
                              var startTime = DateTime.UtcNow;
                              
                              // 執行完整的命令處理管道
                              var result = await ProcessCommandPipeline(request);
                              
                              var processingTime = DateTime.UtcNow - startTime;
                              Interlocked.Add(ref _totalProcessingTime, processingTime.Ticks);
                              Interlocked.Increment(ref _processedCommands);
                              
                              request.TaskCompletionSource.SetResult(result);
                          }
                          catch (Exception ex)
                          {
                              Interlocked.Increment(ref _failedCommands);
                              _logger?.LogError(ex, "Command processing failed for {CommandType}", request.CommandType.Name);
                              request.TaskCompletionSource.SetException(ex);
                          }
                          finally
                          {
                              _concurrencyLimiter.Release();
                          }
                      },
                      new ExecutionDataflowBlockOptions
                      {
                          MaxDegreeOfParallelism = _maxConcurrency,
                          BoundedCapacity = _maxConcurrency * 2
                      });
              }
      
              public async Task<TResult> SendAsync<TCommand, TResult>(TCommand command, CancellationToken ct = default) 
                  where TCommand : ICommand<TResult>
              {
                  var commandType = typeof(TCommand);
                  var requestId = Guid.NewGuid();
                  var tcs = new TaskCompletionSource<object>();
                  
                  var request = new DataflowCommandRequest(requestId, commandType, typeof(TResult), command, tcs);
                  
                  // 發送到數據流網絡
                  if (!_commandProcessor.Post(request))
                  {
                      throw new InvalidOperationException("Unable to queue command for processing - system may be overloaded");
                  }
                  
                  try
                  {
                      var result = await tcs.Task.WaitAsync(ct);
                      return (TResult)result;
                  }
                  catch (OperationCanceledException) when (ct.IsCancellationRequested)
                  {
                      _logger?.LogWarning("Command {CommandType} was cancelled", commandType.Name);
                      throw;
                  }
              }
      
              private async Task<object> ProcessCommandPipeline(DataflowCommandRequest request)
              {
                  // 使用反射調用泛型方法
                  var method = typeof(DataflowCommandBus).GetMethod(nameof(ProcessCommandPipelineGeneric), BindingFlags.NonPublic | BindingFlags.Instance);
                  var genericMethod = method!.MakeGenericMethod(request.CommandType, request.ResultType);
                  
                  var task = (Task)genericMethod.Invoke(this, new object[] { request })!;
                  await task;
                  
                  var resultProperty = task.GetType().GetProperty("Result");
                  return resultProperty?.GetValue(task) ?? throw new InvalidOperationException("Failed to get result from task");
              }
      
              private async Task<TResult> ProcessCommandPipelineGeneric<TCommand, TResult>(DataflowCommandRequest request) 
                  where TCommand : ICommand<TResult>
              {
                  // 獲取處理器和行為的工廠函數
                  var handlerFactory = GetCachedHandler<TCommand, TResult>(request.CommandType);
                  var behaviorsFactory = GetCachedBehaviors<TCommand, TResult>(request.CommandType);
                  
                  // 創建處理器和行為的實例
                  var handler = handlerFactory();
                  var behaviors = behaviorsFactory();
                  
                  // 構建處理管道
                  Func<Task<TResult>> pipeline = () => ExecuteHandler<TCommand, TResult>(handler, (TCommand)request.Command);
                  
                  // 按順序應用管道行為
                  foreach (var behavior in behaviors.Reverse())
                  {
                      var currentBehavior = behavior;
                      var currentPipeline = pipeline;
                      pipeline = async () => (TResult)await ExecuteBehavior(currentBehavior, (TCommand)request.Command, currentPipeline);
                  }
                  
                  return await pipeline();
              }
      
              private async Task<object> ExecuteBehavior<TCommand, TResult>(
                  ICommandPipelineBehavior<TCommand, TResult> behavior, 
                  TCommand command, 
                  Func<Task<TResult>> next) 
                  where TCommand : ICommand<TResult>
              {
                  try
                  {
                      var result = await behavior.Handle(command, next, CancellationToken.None);
                      return result!;
                  }
                  catch (Exception ex)
                  {
                      throw new InvalidOperationException($"Error executing behavior {behavior.GetType().Name}: {ex.Message}", ex);
                  }
              }
      
              private Func<ICommandHandler<TCommand, TResult>> GetCachedHandler<TCommand, TResult>(Type commandType) 
                  where TCommand : ICommand<TResult>
              {
                  return (Func<ICommandHandler<TCommand, TResult>>)_handlerCache.GetOrAdd(commandType, _ =>
                  {
                      return new Func<ICommandHandler<TCommand, TResult>>(() =>
                      {
                          using var scope = _provider.CreateScope();
                          var handler = scope.ServiceProvider.GetService<ICommandHandler<TCommand, TResult>>();
                          if (handler == null)
                              throw new InvalidOperationException($"No handler registered for {commandType.Name}");
                          return handler;
                      });
                  });
              }
      
              private Func<ICommandPipelineBehavior<TCommand, TResult>[]> GetCachedBehaviors<TCommand, TResult>(Type commandType) 
                  where TCommand : ICommand<TResult>
              {
                  return (Func<ICommandPipelineBehavior<TCommand, TResult>[]>)_behaviorsCache.GetOrAdd(commandType, _ =>
                  {
                      return new Func<ICommandPipelineBehavior<TCommand, TResult>[]>(() =>
                      {
                          using var scope = _provider.CreateScope();
                          var behaviors = scope.ServiceProvider.GetServices<ICommandPipelineBehavior<TCommand, TResult>>().ToArray();
                          return behaviors;
                      });
                  });
              }
      
              private async Task<TResult> ExecuteHandler<TCommand, TResult>(ICommandHandler<TCommand, TResult> handler, TCommand command) 
                  where TCommand : ICommand<TResult>
              {
                  return await handler.HandleAsync(command, CancellationToken.None);
              }
      
              private async Task<object> ExecuteHandler(object handler, object command)
              {
                  var handlerType = handler.GetType();
                  var handleMethod = handlerType.GetMethod("HandleAsync");
                  
                  if (handleMethod == null)
                      throw new InvalidOperationException($"Handler {handlerType.Name} does not have HandleAsync method");
      
                  var task = (Task)handleMethod.Invoke(handler, new object[] { command, CancellationToken.None })!;
                  await task;
                  
                  var resultProperty = task.GetType().GetProperty("Result");
                  return resultProperty?.GetValue(task) ?? throw new InvalidOperationException("Failed to get result from task");
              }
      
              private Func<object> GetCachedHandler(Type commandType)
              {
                  return _handlerCache.GetOrAdd(commandType, _ =>
                  {
                      // 獲取命令類型實現的ICommand<TResult>接口
                      var commandInterface = commandType.GetInterfaces()
                          .FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ICommand<>));
                      
                      if (commandInterface == null)
                          throw new InvalidOperationException($"Command type {commandType.Name} does not implement ICommand<TResult>");
                      
                      var resultType = commandInterface.GetGenericArguments()[0];
                      var handlerType = typeof(ICommandHandler<,>).MakeGenericType(commandType, resultType);
                      
                      // 返回一個工廠函數,而不是直接返回處理器實例
                      return new Func<object>(() =>
                      {
                          using var scope = _provider.CreateScope();
                          var handler = scope.ServiceProvider.GetService(handlerType);
                          if (handler == null)
                              throw new InvalidOperationException($"No handler registered for {commandType.Name}");
                          return handler;
                      });
                  });
              }
      
              private Func<object[]> GetCachedBehaviors(Type commandType)
              {
                  return _behaviorsCache.GetOrAdd(commandType, _ =>
                  {
                      // 獲取命令類型實現的ICommand<TResult>接口
                      var commandInterface = commandType.GetInterfaces()
                          .FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ICommand<>));
                      
                      if (commandInterface == null)
                          throw new InvalidOperationException($"Command type {commandType.Name} does not implement ICommand<TResult>");
                      
                      var resultType = commandInterface.GetGenericArguments()[0];
                      var behaviorType = typeof(ICommandPipelineBehavior<,>).MakeGenericType(commandType, resultType);
                      
                      // 返回一個工廠函數,而不是直接返回行為實例
                      return new Func<object[]>(() =>
                      {
                          using var scope = _provider.CreateScope();
                          var behaviors = scope.ServiceProvider.GetServices(behaviorType).Where(b => b != null).ToArray();
                          return behaviors!;
                      });
                  });
              }
      
              // 監控和統計方法
              public DataflowMetrics GetMetrics()
              {
                  return new DataflowMetrics
                  {
                      ProcessedCommands = Interlocked.Read(ref _processedCommands),
                      FailedCommands = Interlocked.Read(ref _failedCommands),
                      TotalProcessingTime = TimeSpan.FromTicks(Interlocked.Read(ref _totalProcessingTime)),
                      AverageProcessingTime = _processedCommands > 0 
                          ? TimeSpan.FromTicks(Interlocked.Read(ref _totalProcessingTime) / _processedCommands)
                          : TimeSpan.Zero,
                      AvailableConcurrency = _concurrencyLimiter.CurrentCount,
                      MaxConcurrency = _maxConcurrency,
                      InputQueueSize = _commandProcessor.InputCount
                  };
              }
      
              public void ClearCache()
              {
                  _handlerCache.Clear();
                  _behaviorsCache.Clear();
              }
      
              public void Dispose()
              {
                  _commandProcessor?.Complete();
                  _concurrencyLimiter?.Dispose();
              }
          }
      
          // 輔助類
          internal class DataflowCommandRequest
          {
              public Guid Id { get; }
              public Type CommandType { get; }
              public Type ResultType { get; }
              public object Command { get; }
              public TaskCompletionSource<object> TaskCompletionSource { get; }
      
              public DataflowCommandRequest(Guid id, Type commandType, Type resultType, object command, TaskCompletionSource<object> tcs)
              {
                  Id = id;
                  CommandType = commandType;
                  ResultType = resultType;
                  Command = command;
                  TaskCompletionSource = tcs;
              }
          }
      
      }

       

      這里如果不是數據流方式可以使用通用模式:

      using System;
      using System.Collections.Concurrent;
      using System.Collections.Generic;
      using System.Linq;
      using System.Text;
      using System.Threading.Tasks;
      using Microsoft.Extensions.DependencyInjection;
      using Common.Bus.Core;
      
      namespace Common.Bus.Implementations
      {
          public class CommandBus : ICommandBus
          {
              private readonly IServiceProvider _provider;
              private readonly ConcurrentDictionary<Type, Func<object>> _handlerCache = new();
              private readonly ConcurrentDictionary<Type, Func<object[]>> _behaviorsCache = new();
              private readonly ConcurrentDictionary<Type, Func<object, object, CancellationToken, Task<object>>> _pipelineCache = new();
      
              public CommandBus(IServiceProvider serviceProvider)
              {
                  _provider = serviceProvider;
              }
      
              // 添加清理緩存的方法,用于測試或動態重新加載
              public void ClearCache()
              {
                  _handlerCache.Clear();
                  _behaviorsCache.Clear();
                  _pipelineCache.Clear();
              }
      
              public async Task<TResult> SendAsync<TCommand, TResult>(TCommand command, CancellationToken ct = default) where TCommand : ICommand<TResult>
              {
                  var commandType = typeof(TCommand);
                  
                  // 獲取緩存的Handler
                  var handler = GetCachedHandler<TCommand, TResult>(commandType);
                  
                  // 獲取緩存的Pipeline
                  var pipeline = GetCachedPipeline<TCommand, TResult>(commandType);
                  
                  // 執行Pipeline
                  var result = await pipeline(handler, command, ct);
                  return (TResult)result;
              }
      
              private ICommandHandler<TCommand, TResult> GetCachedHandler<TCommand, TResult>(Type commandType) where TCommand : ICommand<TResult>
              {
                  var handlerFactory = (Func<object>)_handlerCache.GetOrAdd(commandType, _ =>
                  {
                      return new Func<object>(() =>
                      {
                          using var scope = _provider.CreateScope();
                          var handler = scope.ServiceProvider.GetService(typeof(ICommandHandler<TCommand, TResult>));
                          if (handler == null)
                              throw new InvalidOperationException($"No handler registered for {commandType.Name}");
                          return handler;
                      });
                  });
                  return (ICommandHandler<TCommand, TResult>)handlerFactory();
              }
      
              private ICommandPipelineBehavior<TCommand, TResult>[] GetCachedBehaviors<TCommand, TResult>(Type commandType) where TCommand : ICommand<TResult>
              {
                  var behaviorsFactory = (Func<object[]>)_behaviorsCache.GetOrAdd(commandType, _ =>
                  {
                      return new Func<object[]>(() =>
                      {
                          using var scope = _provider.CreateScope();
                          var behaviors = scope.ServiceProvider.GetServices<ICommandPipelineBehavior<TCommand, TResult>>().ToArray();
                          return behaviors.Cast<object>().ToArray();
                      });
                  });
                  return behaviorsFactory().Cast<ICommandPipelineBehavior<TCommand, TResult>>().ToArray();
              }
      
              private Func<object, object, CancellationToken, Task<object>> GetCachedPipeline<TCommand, TResult>(Type commandType) where TCommand : ICommand<TResult>
              {
                  return _pipelineCache.GetOrAdd(commandType, _ =>
                  {
                      var behaviors = GetCachedBehaviors<TCommand, TResult>(commandType);
                      
                      // 預構建Pipeline,避免每次調用時重新構建
                      return async (handler, command, ct) =>
                      {
                          if (handler == null || command == null)
                              throw new ArgumentNullException("Handler or command cannot be null");
                              
                          var typedHandler = (ICommandHandler<TCommand, TResult>)handler;
                          var typedCommand = (TCommand)command;
      
                          // 如果沒有behaviors,直接調用handler
                          if (behaviors.Length == 0)
                          {
                              var result = await typedHandler.HandleAsync(typedCommand, ct);
                              return (object)result!;
                          }
      
                          // 使用遞歸方式構建pipeline,減少委托創建
                          var pipelineResult = await ExecutePipeline(typedHandler, typedCommand, behaviors, 0, ct);
                          return (object)pipelineResult!;
                      };
                  });
              }
      
              private async Task<TResult> ExecutePipeline<TCommand, TResult>(
                  ICommandHandler<TCommand, TResult> handler, 
                  TCommand command, 
                  ICommandPipelineBehavior<TCommand, TResult>[] behaviors, 
                  int behaviorIndex, 
                  CancellationToken ct) where TCommand : ICommand<TResult>
              {
                  if (behaviorIndex >= behaviors.Length)
                  {
                      return await handler.HandleAsync(command, ct);
                  }
      
                  var behavior = behaviors[behaviorIndex];
                  return await behavior.Handle(command, () => ExecutePipeline(handler, command, behaviors, behaviorIndex + 1, ct), ct);
              }
          }
      }

       

      其他批量操作、帶監控等模式就參考其他代碼:
      exercisebook/AOP/EventBusAOP/AopNew at main · liuzhixin405/exercisebook


      一下是項目更詳細介紹,如有錯誤多多指正:

      # CommandBus AOP 項目

      這是一個基于AOP(面向切面編程)的CommandBus項目,使用TPL Dataflow進行數據流處理優化,支持多種CommandBus實現和實時監控。

      ## CommandBus實現類型

      ### 1. Standard CommandBus
      - **類型**: `CommandBusType.Standard`
      - **特點**: 標準同步處理,適合簡單場景
      - **控制器**: `StandardCommandBusController`

      ### 2. Dataflow CommandBus
      - **類型**: `CommandBusType.Dataflow`
      - **特點**: 基于TPL Dataflow的異步并發處理,適合高并發場景
      - **控制器**: `DataflowCommandBusController`

      ### 3. Batch Dataflow CommandBus
      - **類型**: `CommandBusType.BatchDataflow`
      - **特點**: 支持批量處理,適合大批量數據場景
      - **控制器**: `BatchDataflowCommandBusController`

      ### 4. Typed Dataflow CommandBus
      - **類型**: `CommandBusType.TypedDataflow`
      - **特點**: 強類型安全,適合復雜業務場景
      - **控制器**: `TypedDataflowCommandBusController`

      ### 5. Monitored CommandBus
      - **類型**: `CommandBusType.Monitored`
      - **特點**: 包含性能監控,適合生產環境
      - **控制器**: `MonitoredCommandBusController`
       
      這里有一個擴展點behavior,可以注入前后時間,當前代代碼只做了業務前的攔截,業務后的可以如法炮制。這樣的話就是一個aop,那么跟aop切面編程又有什么區別和共同點呢?

                  // 構建處理管道
                  Func<Task<TResult>> pipeline = () => ExecuteHandler<TCommand, TResult>(handler, (TCommand)request.Command);
                  
                  // 按順序應用管道行為
                  foreach (var behavior in behaviors.Reverse())
                  {
                      var currentBehavior = behavior;
                      var currentPipeline = pipeline;
                      pipeline = async () => (TResult)await ExecuteBehavior(currentBehavior, (TCommand)request.Command, currentPipeline);
                  }
                  
                  return await pipeline();
              }

       

       

      ?? 共同點

      • 目標一致:都是把 橫切關注點(Logging、Validation、Transaction、Caching 等)從業務邏輯里抽離出來。

      • 調用鏈模式:無論是 AOP 的 攔截器鏈,還是 CommandBus 的 Behavior 管道,最終都是一層層包裝,最后執行真正的業務邏輯。

      • 可插拔:可以動態增加/減少某個橫切邏輯,而不用改業務代碼。


      ?? 區別

      特性AOP (動態代理 / 攔截器)CommandBus + Behavior
      觸發方式 方法調用時攔截(通過代理/動態代理實現) 命令執行時經過管道(需要顯式通過 CommandBus 調用)
      范圍 通用(任何類方法都能攔截) 限定在 Command 處理(CQRS 場景特化)
      技術實現 依賴 DI 容器攔截中間件編譯期注入 依賴 Pipeline 模式,類似 MediatR 的 IPipelineBehavior<TRequest,TResponse>
      靈活度 更通用,可以橫跨全項目(比如給 Service 層所有方法加日志) 針對性更強,主要是 Command/Query 的執行鏈
      侵入性 低,業務代碼不用改(只要接口/虛方法即可)


      性能對比總結

      維度AOP(動態代理)CommandBus + Behavior
      調用開銷 需要動態代理/反射 普通方法調用
      可擴展性 全局通用 局部(命令/查詢)
      性能損耗 相對較高 相對較低
      場景 橫切關注點,通用功能 CQRS 業務管道,高性能場景


       兩者一句話總結:

      • AOP 更通用,但性能稍差(尤其高并發、核心鏈路要慎用)。

      • CommandBus + Behavior 更高效,但應用范圍窄(主要適合命令/查詢處理管道)。

      posted @ 2025-09-15 00:44  星仔007  閱讀(80)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 男女啪祼交视频| 欧洲亚洲国内老熟女超碰| 被灌满精子的波多野结衣| 97一期涩涩97片久久久久久久 | 诱人的老师hd中文字幕| 龙陵县| 精品午夜福利在线视在亚洲| 最新国产精品拍自在线观看| 四虎成人精品永久网站| 国产日韩精品一区二区三区在线| 女人与牲口性恔配视频免费| 久久精品色妇熟妇丰满人| 国自产在线精品一本无码中文| 日韩三级一区二区在线看| 国产性生大片免费观看性| 少妇又爽又刺激视频| 18禁精品一区二区三区| 国产精品中文字幕日韩| 国产精品午夜无码AV天美传媒| 亚洲第一香蕉视频啪啪爽| 99久久国产福利自产拍| 伊人久久精品无码麻豆一区| 亚洲中文精品一区二区| 激情国产一区二区三区四区| 国产永久免费高清在线| 欧美午夜精品久久久久久浪潮 | 免费网站看V片在线毛| 春色校园综合人妻av| 中文字幕色偷偷人妻久久| 成人亚洲av免费在线| 干老熟女干老穴干老女人| 国产成人精品中文字幕| 成年女人免费视频播放体验区| 日韩精品18禁一区二区| 成人啪精品视频网站午夜| 免费国产好深啊好涨好硬视频| 翘臀少妇被扒开屁股日出水爆乳| 国产av成人精品播放| 久久人妻无码一区二区| 国产99青青成人A在线| 亚洲精品日本久久久中文字幕|