A Deep Dive into the Command Pattern: From a Standard Implementation to a High-Performance TPL Dataflow Architecture
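The command pattern abstracts an operation that a group of objects has in common. Because those objects expose the same method signature, the operation can be lifted out into an abstract command object. The caller then no longer deals with each concrete object; it depends only on the command object's method signature and invokes the operation through it.
That is the gist of the command pattern. It naturally brings to mind event-driven design, the command/handler split, and AOP. Thinking about it in terms of data flow led me to write a small dataflow library.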
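To make the command/handler split concrete, here is a minimal sketch. The interface shapes are assumptions inferred from how the bus code later in this post calls them (`HandleAsync(command, ct)` and `Handle(command, next, ct)`); the real definitions live in `Common.Bus.Core` and may differ. `CreateOrderCommand` and `CreateOrderHandler` are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Assumed shapes, inferred from usage; not copied from Common.Bus.Core.
public interface ICommand<TResult> { }

public interface ICommandHandler<TCommand, TResult> where TCommand : ICommand<TResult>
{
    Task<TResult> HandleAsync(TCommand command, CancellationToken ct);
}

public interface ICommandPipelineBehavior<TCommand, TResult> where TCommand : ICommand<TResult>
{
    Task<TResult> Handle(TCommand command, Func<Task<TResult>> next, CancellationToken ct);
}

// Hypothetical command: the request is plain data with a declared result type.
public sealed record CreateOrderCommand(string Product, int Quantity) : ICommand<string>;

// Hypothetical handler: the only place that knows how to execute the command.
public sealed class CreateOrderHandler : ICommandHandler<CreateOrderCommand, string>
{
    public Task<string> HandleAsync(CreateOrderCommand command, CancellationToken ct)
        => Task.FromResult($"Ordered {command.Quantity} x {command.Product}");
}
```

A caller only needs the command type and its result type; which handler runs is decided by whatever dispatches the command, which is the role the bus plays in the rest of this post.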


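I have written about AOP before, but something always felt missing. This version should make up for one of those gaps in real projects: flexibility.
The focus of the project is dataflow processing, so the web front end is only a demo. Heavy data processing mostly belongs in background jobs; calling it through an HTTP API is just for illustration.
The core of the dataflow code is as follows: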
```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Common.Bus.Core;
using Common.Bus.Monitoring;

namespace Common.Bus.Implementations
{
    /// <summary>
    /// High-performance CommandBus built on TPL Dataflow.
    /// Supports parallel processing, backpressure, and monitoring.
    /// </summary>
    public class DataflowCommandBus : ICommandBus, IDisposable
    {
        private readonly IServiceProvider _provider;
        private readonly ILogger<DataflowCommandBus>? _logger;
        private readonly ConcurrentDictionary<Type, Func<object>> _handlerCache = new();
        private readonly ConcurrentDictionary<Type, Func<object[]>> _behaviorsCache = new();

        // Dataflow network
        private ActionBlock<DataflowCommandRequest> _commandProcessor = null!;

        // Backpressure control
        private readonly SemaphoreSlim _concurrencyLimiter;
        private readonly int _maxConcurrency;

        // Monitoring counters
        private long _processedCommands;
        private long _failedCommands;
        private long _totalProcessingTime;

        public DataflowCommandBus(
            IServiceProvider serviceProvider,
            ILogger<DataflowCommandBus>? logger = null,
            int? maxConcurrency = null)
        {
            _provider = serviceProvider;
            _logger = logger;
            _maxConcurrency = maxConcurrency ?? Environment.ProcessorCount * 2;
            _concurrencyLimiter = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);

            // Build the dataflow network
            CreateDataflowNetwork();
        }

        private void CreateDataflowNetwork()
        {
            // The command processor block
            _commandProcessor = new ActionBlock<DataflowCommandRequest>(
                async request =>
                {
                    // Acquire before entering try so that Release() in finally
                    // always pairs with a successful WaitAsync().
                    await _concurrencyLimiter.WaitAsync();
                    try
                    {
                        var startTime = DateTime.UtcNow;

                        // Run the full command pipeline
                        var result = await ProcessCommandPipeline(request);

                        var processingTime = DateTime.UtcNow - startTime;
                        Interlocked.Add(ref _totalProcessingTime, processingTime.Ticks);
                        Interlocked.Increment(ref _processedCommands);

                        request.TaskCompletionSource.TrySetResult(result);
                    }
                    catch (Exception ex)
                    {
                        Interlocked.Increment(ref _failedCommands);
                        _logger?.LogError(ex, "Command processing failed for {CommandType}", request.CommandType.Name);
                        request.TaskCompletionSource.TrySetException(ex);
                    }
                    finally
                    {
                        _concurrencyLimiter.Release();
                    }
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = _maxConcurrency,
                    BoundedCapacity = _maxConcurrency * 2
                });
        }

        public async Task<TResult> SendAsync<TCommand, TResult>(TCommand command, CancellationToken ct = default)
            where TCommand : ICommand<TResult>
        {
            var commandType = typeof(TCommand);
            var requestId = Guid.NewGuid();

            // Run the caller's continuation asynchronously so it does not execute
            // on the block's worker while the concurrency slot is still held.
            var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
            var request = new DataflowCommandRequest(requestId, commandType, typeof(TResult), command, tcs);

            // Post to the dataflow network; Post returns false at once when the bounded queue is full
            if (!_commandProcessor.Post(request))
            {
                throw new InvalidOperationException("Unable to queue command for processing - system may be overloaded");
            }

            try
            {
                var result = await tcs.Task.WaitAsync(ct);
                return (TResult)result;
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                _logger?.LogWarning("Command {CommandType} was cancelled", commandType.Name);
                throw;
            }
        }

        private async Task<object> ProcessCommandPipeline(DataflowCommandRequest request)
        {
            // Call the generic method through reflection
            var method = typeof(DataflowCommandBus).GetMethod(
                nameof(ProcessCommandPipelineGeneric),
                BindingFlags.NonPublic | BindingFlags.Instance);
            var genericMethod = method!.MakeGenericMethod(request.CommandType, request.ResultType);
            var task = (Task)genericMethod.Invoke(this, new object[] { request })!;
            await task;

            var resultProperty = task.GetType().GetProperty("Result");
            return resultProperty?.GetValue(task)
                ?? throw new InvalidOperationException("Failed to get result from task");
        }

        private async Task<TResult> ProcessCommandPipelineGeneric<TCommand, TResult>(DataflowCommandRequest request)
            where TCommand : ICommand<TResult>
        {
            // Get the factory functions for the handler and the behaviors
            var handlerFactory = GetCachedHandler<TCommand, TResult>(request.CommandType);
            var behaviorsFactory = GetCachedBehaviors<TCommand, TResult>(request.CommandType);

            // Create handler and behavior instances
            var handler = handlerFactory();
            var behaviors = behaviorsFactory();

            // Build the pipeline, starting from the handler itself
            Func<Task<TResult>> pipeline = () => ExecuteHandler<TCommand, TResult>(handler, (TCommand)request.Command);

            // Wrap in reverse so that behaviors execute in registration order
            foreach (var behavior in behaviors.Reverse())
            {
                var currentBehavior = behavior;
                var currentPipeline = pipeline;
                pipeline = async () => (TResult)await ExecuteBehavior(currentBehavior, (TCommand)request.Command, currentPipeline);
            }

            return await pipeline();
        }

        private async Task<object> ExecuteBehavior<TCommand, TResult>(
            ICommandPipelineBehavior<TCommand, TResult> behavior,
            TCommand command,
            Func<Task<TResult>> next)
            where TCommand : ICommand<TResult>
        {
            try
            {
                var result = await behavior.Handle(command, next, CancellationToken.None);
                return result!;
            }
            catch (Exception ex)
            {
                throw new InvalidOperationException($"Error executing behavior {behavior.GetType().Name}: {ex.Message}", ex);
            }
        }

        private Func<ICommandHandler<TCommand, TResult>> GetCachedHandler<TCommand, TResult>(Type commandType)
            where TCommand : ICommand<TResult>
        {
            return (Func<ICommandHandler<TCommand, TResult>>)_handlerCache.GetOrAdd(commandType, _ =>
            {
                return new Func<ICommandHandler<TCommand, TResult>>(() =>
                {
                    // Note: the scope is disposed when this factory returns, so the
                    // handler must not rely on scoped disposable services afterwards.
                    using var scope = _provider.CreateScope();
                    var handler = scope.ServiceProvider.GetService<ICommandHandler<TCommand, TResult>>();
                    if (handler == null)
                        throw new InvalidOperationException($"No handler registered for {commandType.Name}");
                    return handler;
                });
            });
        }

        private Func<ICommandPipelineBehavior<TCommand, TResult>[]> GetCachedBehaviors<TCommand, TResult>(Type commandType)
            where TCommand : ICommand<TResult>
        {
            return (Func<ICommandPipelineBehavior<TCommand, TResult>[]>)_behaviorsCache.GetOrAdd(commandType, _ =>
            {
                return new Func<ICommandPipelineBehavior<TCommand, TResult>[]>(() =>
                {
                    using var scope = _provider.CreateScope();
                    var behaviors = scope.ServiceProvider.GetServices<ICommandPipelineBehavior<TCommand, TResult>>().ToArray();
                    return behaviors;
                });
            });
        }

        private async Task<TResult> ExecuteHandler<TCommand, TResult>(
            ICommandHandler<TCommand, TResult> handler,
            TCommand command)
            where TCommand : ICommand<TResult>
        {
            return await handler.HandleAsync(command, CancellationToken.None);
        }

        // The non-generic variants below are not called by the code shown here.
        // They share the caches with the generic versions above.
        private async Task<object> ExecuteHandler(object handler, object command)
        {
            var handlerType = handler.GetType();
            var handleMethod = handlerType.GetMethod("HandleAsync");
            if (handleMethod == null)
                throw new InvalidOperationException($"Handler {handlerType.Name} does not have HandleAsync method");

            var task = (Task)handleMethod.Invoke(handler, new object[] { command, CancellationToken.None })!;
            await task;

            var resultProperty = task.GetType().GetProperty("Result");
            return resultProperty?.GetValue(task)
                ?? throw new InvalidOperationException("Failed to get result from task");
        }

        private Func<object> GetCachedHandler(Type commandType)
        {
            return _handlerCache.GetOrAdd(commandType, _ =>
            {
                // Find the ICommand<TResult> interface implemented by the command type
                var commandInterface = commandType.GetInterfaces()
                    .FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ICommand<>));
                if (commandInterface == null)
                    throw new InvalidOperationException($"Command type {commandType.Name} does not implement ICommand<TResult>");

                var resultType = commandInterface.GetGenericArguments()[0];
                var handlerType = typeof(ICommandHandler<,>).MakeGenericType(commandType, resultType);

                // Return a factory function rather than a handler instance
                return new Func<object>(() =>
                {
                    using var scope = _provider.CreateScope();
                    var handler = scope.ServiceProvider.GetService(handlerType);
                    if (handler == null)
                        throw new InvalidOperationException($"No handler registered for {commandType.Name}");
                    return handler;
                });
            });
        }

        private Func<object[]> GetCachedBehaviors(Type commandType)
        {
            return _behaviorsCache.GetOrAdd(commandType, _ =>
            {
                // Find the ICommand<TResult> interface implemented by the command type
                var commandInterface = commandType.GetInterfaces()
                    .FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ICommand<>));
                if (commandInterface == null)
                    throw new InvalidOperationException($"Command type {commandType.Name} does not implement ICommand<TResult>");

                var resultType = commandInterface.GetGenericArguments()[0];
                var behaviorType = typeof(ICommandPipelineBehavior<,>).MakeGenericType(commandType, resultType);

                // Return a factory function rather than behavior instances
                return new Func<object[]>(() =>
                {
                    using var scope = _provider.CreateScope();
                    var behaviors = scope.ServiceProvider.GetServices(behaviorType).Where(b => b != null).ToArray();
                    return behaviors!;
                });
            });
        }

        // Monitoring and statistics
        public DataflowMetrics GetMetrics()
        {
            return new DataflowMetrics
            {
                ProcessedCommands = Interlocked.Read(ref _processedCommands),
                FailedCommands = Interlocked.Read(ref _failedCommands),
                TotalProcessingTime = TimeSpan.FromTicks(Interlocked.Read(ref _totalProcessingTime)),
                AverageProcessingTime = _processedCommands > 0
                    ? TimeSpan.FromTicks(Interlocked.Read(ref _totalProcessingTime) / _processedCommands)
                    : TimeSpan.Zero,
                AvailableConcurrency = _concurrencyLimiter.CurrentCount,
                MaxConcurrency = _maxConcurrency,
                InputQueueSize = _commandProcessor.InputCount
            };
        }

        public void ClearCache()
        {
            _handlerCache.Clear();
            _behaviorsCache.Clear();
        }

        public void Dispose()
        {
            _commandProcessor?.Complete();
            _concurrencyLimiter?.Dispose();
        }
    }

    // Helper class: one queued command plus the completion source its caller awaits
    internal class DataflowCommandRequest
    {
        public Guid Id { get; }
        public Type CommandType { get; }
        public Type ResultType { get; }
        public object Command { get; }
        public TaskCompletionSource<object> TaskCompletionSource { get; }

        public DataflowCommandRequest(Guid id, Type commandType, Type resultType, object command, TaskCompletionSource<object> tcs)
        {
            Id = id;
            CommandType = commandType;
            ResultType = resultType;
            Command = command;
            TaskCompletionSource = tcs;
        }
    }
}
```
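The backpressure in this class comes from `BoundedCapacity`: once the block is full, `Post` returns `false` immediately, which is why `SendAsync` above throws when the system is overloaded. The sketch below isolates that behavior; it is not part of the project. `BackpressureDemo` and the gate that holds the worker are hypothetical, and it assumes the `System.Threading.Tasks.Dataflow` library is available.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BackpressureDemo
{
    // Posts five items to a bounded block whose single worker is held open,
    // then shows that SendAsync waits for room instead of being rejected.
    public static async Task<(int Accepted, int Rejected, bool Sent)> RunAsync()
    {
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        var block = new ActionBlock<int>(
            async _ => await gate.Task, // every item waits until the gate opens
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 2 // counts queued items and the one in flight
            });

        int accepted = 0, rejected = 0;
        for (var i = 0; i < 5; i++)
        {
            if (block.Post(i)) accepted++; // Post never waits
            else rejected++;               // the block is full, so the item is declined
        }

        // SendAsync returns a task that completes once the block takes the item.
        var pending = block.SendAsync(99);
        gate.SetResult(true); // release the worker so capacity frees up
        var sent = await pending;

        block.Complete();
        await block.Completion;
        return (accepted, rejected, sent);
    }
}
```

Whether to reject with `Post` or wait with `SendAsync` is a design choice: rejecting fails fast under load, while waiting pushes the pressure back onto the caller.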
If you do not need the dataflow approach, the general-purpose implementation below works:
```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // CancellationToken
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Common.Bus.Core;

namespace Common.Bus.Implementations
{
    public class CommandBus : ICommandBus
    {
        private readonly IServiceProvider _provider;
        private readonly ConcurrentDictionary<Type, Func<object>> _handlerCache = new();
        private readonly ConcurrentDictionary<Type, Func<object[]>> _behaviorsCache = new();
        private readonly ConcurrentDictionary<Type, Func<object, object, CancellationToken, Task<object>>> _pipelineCache = new();

        public CommandBus(IServiceProvider serviceProvider)
        {
            _provider = serviceProvider;
        }

        // Clears the caches; useful in tests or for dynamic reloading
        public void ClearCache()
        {
            _handlerCache.Clear();
            _behaviorsCache.Clear();
            _pipelineCache.Clear();
        }

        public async Task<TResult> SendAsync<TCommand, TResult>(TCommand command, CancellationToken ct = default)
            where TCommand : ICommand<TResult>
        {
            var commandType = typeof(TCommand);

            // Resolve a handler through the cached factory
            var handler = GetCachedHandler<TCommand, TResult>(commandType);

            // Get the cached pipeline
            var pipeline = GetCachedPipeline<TCommand, TResult>(commandType);

            // Execute the pipeline
            var result = await pipeline(handler, command, ct);
            return (TResult)result;
        }

        private ICommandHandler<TCommand, TResult> GetCachedHandler<TCommand, TResult>(Type commandType)
            where TCommand : ICommand<TResult>
        {
            var handlerFactory = (Func<object>)_handlerCache.GetOrAdd(commandType, _ =>
            {
                return new Func<object>(() =>
                {
                    using var scope = _provider.CreateScope();
                    var handler = scope.ServiceProvider.GetService(typeof(ICommandHandler<TCommand, TResult>));
                    if (handler == null)
                        throw new InvalidOperationException($"No handler registered for {commandType.Name}");
                    return handler;
                });
            });
            return (ICommandHandler<TCommand, TResult>)handlerFactory();
        }

        private ICommandPipelineBehavior<TCommand, TResult>[] GetCachedBehaviors<TCommand, TResult>(Type commandType)
            where TCommand : ICommand<TResult>
        {
            var behaviorsFactory = (Func<object[]>)_behaviorsCache.GetOrAdd(commandType, _ =>
            {
                return new Func<object[]>(() =>
                {
                    using var scope = _provider.CreateScope();
                    var behaviors = scope.ServiceProvider.GetServices<ICommandPipelineBehavior<TCommand, TResult>>().ToArray();
                    return behaviors.Cast<object>().ToArray();
                });
            });
            return behaviorsFactory().Cast<ICommandPipelineBehavior<TCommand, TResult>>().ToArray();
        }

        private Func<object, object, CancellationToken, Task<object>> GetCachedPipeline<TCommand, TResult>(Type commandType)
            where TCommand : ICommand<TResult>
        {
            return _pipelineCache.GetOrAdd(commandType, _ =>
            {
                // Behaviors are resolved once here and captured by the cached pipeline
                var behaviors = GetCachedBehaviors<TCommand, TResult>(commandType);

                // Pre-build the pipeline so it is not rebuilt on every call
                return async (handler, command, ct) =>
                {
                    // ArgumentNullException's single-string constructor takes a parameter name
                    if (handler == null) throw new ArgumentNullException(nameof(handler));
                    if (command == null) throw new ArgumentNullException(nameof(command));

                    var typedHandler = (ICommandHandler<TCommand, TResult>)handler;
                    var typedCommand = (TCommand)command;

                    // With no behaviors, call the handler directly
                    if (behaviors.Length == 0)
                    {
                        var result = await typedHandler.HandleAsync(typedCommand, ct);
                        return (object)result!;
                    }

                    // Walk the behaviors recursively to reduce delegate allocations
                    var pipelineResult = await ExecutePipeline(typedHandler, typedCommand, behaviors, 0, ct);
                    return (object)pipelineResult!;
                };
            });
        }

        private async Task<TResult> ExecutePipeline<TCommand, TResult>(
            ICommandHandler<TCommand, TResult> handler,
            TCommand command,
            ICommandPipelineBehavior<TCommand, TResult>[] behaviors,
            int behaviorIndex,
            CancellationToken ct)
            where TCommand : ICommand<TResult>
        {
            if (behaviorIndex >= behaviors.Length)
            {
                return await handler.HandleAsync(command, ct);
            }

            var behavior = behaviors[behaviorIndex];
            return await behavior.Handle(
                command,
                () => ExecutePipeline(handler, command, behaviors, behaviorIndex + 1, ct),
                ct);
        }
    }
}
```
For the batch, monitored, and other variants, see the rest of the code in the repository:
exercisebook/AOP/EventBusAOP/AopNew at main · liuzhixin405/exercisebook
Below is a more detailed introduction to the project. Corrections are welcome.
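# CommandBus AOP Project
This is a CommandBus project built on AOP (aspect-oriented programming) ideas. It uses TPL Dataflow to optimize dataflow processing and supports several CommandBus implementations as well as real-time monitoring.
## CommandBus implementation types
### 1. Standard CommandBus
- **Type**: `CommandBusType.Standard`
- **Characteristics**: Standard processing in the caller's own flow, with no queue; suited to simple scenarios
- **Controller**: `StandardCommandBusController`
### 2. Dataflow CommandBus
- **Type**: `CommandBusType.Dataflow`
- **Characteristics**: Asynchronous, concurrent processing built on TPL Dataflow; suited to high-concurrency scenarios
- **Controller**: `DataflowCommandBusController`
### 3. Batch Dataflow CommandBus
- **Type**: `CommandBusType.BatchDataflow`
- **Characteristics**: Supports batch processing; suited to large data volumes
- **Controller**: `BatchDataflowCommandBusController`
### 4. Typed Dataflow CommandBus
- **Type**: `CommandBusType.TypedDataflow`
- **Characteristics**: Strongly typed; suited to complex business scenarios
- **Controller**: `TypedDataflowCommandBusController`
### 5. Monitored CommandBus
- **Type**: `CommandBusType.Monitored`
- **Characteristics**: Includes performance monitoring; suited to production environments
- **Controller**: `MonitoredCommandBusController`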
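The pipeline has one extension point, the behavior, where logic can be injected before and after the handler runs. The current code only intercepts before the business logic; interception after it can be added in the same way. That effectively makes it a form of AOP, so what does it share with conventional aspect-oriented programming, and where does it differ?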
```csharp
// Build the pipeline, starting from the handler itself
Func<Task<TResult>> pipeline = () => ExecuteHandler<TCommand, TResult>(handler, (TCommand)request.Command);

// Wrap in reverse so that behaviors execute in registration order
foreach (var behavior in behaviors.Reverse())
{
    var currentBehavior = behavior;
    var currentPipeline = pipeline;
    pipeline = async () => (TResult)await ExecuteBehavior(currentBehavior, (TCommand)request.Command, currentPipeline);
}

return await pipeline();
```
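As a rough illustration of "before" and "after" interception, the sketch below composes two behaviors around a handler with the same reverse loop and records the order in which each part runs. Everything here is hypothetical: `Ping`, `TraceBehavior`, and `PipelineDemo` are made-up names, and the two interfaces are assumed shapes inferred from how the bus calls them, not the definitions in `Common.Bus.Core`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Assumed interface shapes, inferred from usage in the bus code.
public interface ICommand<TResult> { }

public interface ICommandPipelineBehavior<TCommand, TResult> where TCommand : ICommand<TResult>
{
    Task<TResult> Handle(TCommand command, Func<Task<TResult>> next, CancellationToken ct);
}

public sealed record Ping(string Text) : ICommand<string>;

// Hypothetical behavior that logs before and after calling the next step.
public sealed class TraceBehavior : ICommandPipelineBehavior<Ping, string>
{
    private readonly string _name;
    private readonly List<string> _log;

    public TraceBehavior(string name, List<string> log)
    {
        _name = name;
        _log = log;
    }

    public async Task<string> Handle(Ping command, Func<Task<string>> next, CancellationToken ct)
    {
        _log.Add($"{_name}:before"); // runs before the business logic
        var result = await next();   // the rest of the pipeline, ending in the handler
        _log.Add($"{_name}:after");  // runs after the business logic
        return result;
    }
}

public static class PipelineDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var command = new Ping("hello");
        var behaviors = new ICommandPipelineBehavior<Ping, string>[]
        {
            new TraceBehavior("A", log),
            new TraceBehavior("B", log)
        };

        // Innermost step: stands in for the real handler.
        Func<Task<string>> pipeline = () =>
        {
            log.Add("handler");
            return Task.FromResult(command.Text.ToUpperInvariant());
        };

        // Wrapping in reverse makes the first registered behavior the outermost one.
        foreach (var behavior in Enumerable.Reverse(behaviors))
        {
            var current = behavior;
            var next = pipeline;
            pipeline = () => current.Handle(command, next, CancellationToken.None);
        }

        await pipeline();
        return log;
    }
}
```

Calling `PipelineDemo.RunAsync()` yields the entries `A:before`, `B:before`, `handler`, `B:after`, `A:after`, which is the familiar onion shape of interceptor chains.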
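Common ground
- Same goal: both pull cross-cutting concerns (logging, validation, transactions, caching, and so on) out of the business logic.
- Call-chain pattern: whether it is an AOP interceptor chain or the CommandBus behavior pipeline, each layer wraps the next, and the real business logic runs last.
- Pluggable: a cross-cutting concern can be added or removed without touching business code.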
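Differences
| Aspect | AOP (dynamic proxy / interceptor) | CommandBus + Behavior |
|---|---|---|
| Trigger | Intercepts method calls (through a proxy or dynamic proxy) | The command passes through the pipeline when executed (callers must go through the CommandBus explicitly) |
| Scope | General (any class method can be intercepted) | Limited to command handling (specialized for CQRS) |
| Implementation | Relies on DI-container interception, middleware, or compile-time weaving | Relies on the pipeline pattern, similar to MediatR's `IPipelineBehavior<TRequest,TResponse>` |
| Flexibility | More general; can span the whole project (for example, logging every method in the service layer) | More targeted; mainly the execution chain of commands and queries |
| Invasiveness | Low; business code stays unchanged (an interface or virtual methods are enough) | Higher; callers must send commands through the CommandBus explicitly |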
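For contrast with the behavior pipeline, here is a minimal sketch of the dynamic-proxy style of AOP using .NET's built-in `System.Reflection.DispatchProxy`. It is only an illustration of the left-hand column, not code from the project; `IOrderService`, `OrderService`, and `LoggingProxy<T>` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

public interface IOrderService
{
    string Create(string product);
}

// Business code: unaware that it is being intercepted.
public sealed class OrderService : IOrderService
{
    public string Create(string product) => $"created {product}";
}

// Hypothetical logging aspect implemented as a dynamic proxy.
public class LoggingProxy<T> : DispatchProxy where T : class
{
    private T _target = default!;
    private List<string> _log = default!;

    // Creates a proxy that implements T and forwards every call to target.
    public static T Wrap(T target, List<string> log)
    {
        var proxy = Create<T, LoggingProxy<T>>();
        var typed = (LoggingProxy<T>)(object)proxy;
        typed._target = target;
        typed._log = log;
        return proxy;
    }

    // Every interface call lands here; the real method is invoked via reflection.
    protected override object? Invoke(MethodInfo? targetMethod, object?[]? args)
    {
        _log.Add($"before {targetMethod!.Name}");
        var result = targetMethod.Invoke(_target, args);
        _log.Add($"after {targetMethod.Name}");
        return result;
    }
}
```

A caller would write `IOrderService svc = LoggingProxy<IOrderService>.Wrap(new OrderService(), log);` and then use `svc` as usual. The call site does not change, which is the low invasiveness noted in the table, but every call goes through `MethodInfo.Invoke`.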
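Performance comparison summary
| Dimension | AOP (dynamic proxy) | CommandBus + Behavior |
|---|---|---|
| Call overhead | Needs a dynamic proxy or reflection | Ordinary method and delegate calls in the standard bus (the `DataflowCommandBus` shown above also makes one reflective call per command) |
| Extensibility | Global, general-purpose | Local (commands and queries) |
| Performance cost | Relatively high | Relatively low |
| Scenarios | Cross-cutting concerns, general-purpose features | CQRS business pipelines, performance-sensitive scenarios |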
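Each in one sentence:
- AOP is more general but somewhat slower (use it with care on high-concurrency, critical paths).
- CommandBus + Behavior is more efficient but narrower in scope (best suited to command/query pipelines).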

