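[RabbitMQ] RPC Pattern (Request/Reply)
Chapter Goals
- Understand how the RabbitMQ RPC pattern works and where it applies.
- Learn to use the callback queue and the correlation ID (Correlation Id).
- Implement asynchronous RPC calls on top of RabbitMQ.
- Learn error handling and timeout mechanisms for the RPC pattern.
- Build a complete solution for synchronous-style communication between microservices.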
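Part 1: Theory
1. Introduction to the RPC Pattern
The RPC (Remote Procedure Call) pattern lets a client application call a method on a remote server as if it were a local method. In RabbitMQ, RPC is built on message queues: the client publishes a request message and receives the reply asynchronously on a separate queue.
How it differs from traditional HTTP-based RPC:
- HTTP RPC: synchronous, direct connection; the server must be online when the call is made.
- Message-queue RPC: asynchronous, routed through a message broker; supports decoupling and load balancing.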
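2. Core Components of RabbitMQ RPC
- Request queue: the queue to which clients send requests.
- Reply queue: the queue to which the server sends responses.
- Correlation ID (Correlation Id): a unique identifier used to match a response to its request.
- Message properties: IBasicProperties.ReplyTo and IBasicProperties.CorrelationId carry the reply queue name and the correlation ID.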
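3. RPC Workflow
Client:
1. Generate a unique CorrelationId.
2. Create a temporary reply queue.
3. Send the request to the request queue with ReplyTo and CorrelationId set.
4. Listen on the reply queue and wait for a matching CorrelationId.
Server:
1. Listen on the request queue.
2. Process the request.
3. Send the response to the queue named in the request's ReplyTo.
4. Set the same CorrelationId on the response.
Client:
5. Receive the response and match it to the request by CorrelationId.
6. Process the response.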
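The workflow can be tried without a broker. The following is a minimal in-memory sketch, not RabbitMQ code: two System.Threading.Channels channels stand in for the request and reply queues, and the names Message, RpcFlowDemo, and the upper-casing "server" are assumptions made purely for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical message shape: stands in for an AMQP message and its properties.
public record Message(string CorrelationId, string ReplyTo, string Body);

public static class RpcFlowDemo
{
    // In-memory stand-ins for the request queue and the client's reply queue.
    private static readonly Channel<Message> RequestQueue = Channel.CreateUnbounded<Message>();
    private static readonly Channel<Message> ReplyQueue = Channel.CreateUnbounded<Message>();

    // Pending calls keyed by correlation ID.
    private static readonly ConcurrentDictionary<string, TaskCompletionSource<string>> Pending = new();

    public static async Task<string> CallAsync(string body)
    {
        var correlationId = Guid.NewGuid().ToString();                       // client step 1
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        Pending[correlationId] = tcs;
        await RequestQueue.Writer.WriteAsync(new Message(correlationId, "reply", body)); // client step 3
        return await tcs.Task;                                               // client step 4
    }

    public static async Task ServerLoopAsync()
    {
        await foreach (var request in RequestQueue.Reader.ReadAllAsync())    // server step 1
        {
            var result = request.Body.ToUpperInvariant();                    // server step 2
            // Server steps 3 and 4: reply with the same correlation ID.
            await ReplyQueue.Writer.WriteAsync(new Message(request.CorrelationId, "", result));
        }
    }

    public static async Task ClientReplyLoopAsync()
    {
        await foreach (var reply in ReplyQueue.Reader.ReadAllAsync())
        {
            // Client step 5: match the reply to its pending request.
            if (Pending.TryRemove(reply.CorrelationId, out var tcs))
                tcs.TrySetResult(reply.Body);
        }
    }

    public static async Task Main()
    {
        _ = ServerLoopAsync();
        _ = ClientReplyLoopAsync();
        var answers = await Task.WhenAll(CallAsync("ping"), CallAsync("hello"));
        Console.WriteLine(string.Join(", ", answers));   // prints "PING, HELLO"
    }
}
```

Because replies are matched by correlation ID rather than by arrival order, several calls can be in flight on the same reply queue at once.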
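4. When to Use It
- Asynchronous operations that need a synchronous-style response
- Synchronous-style communication between microservices
- Distributed processing of compute-intensive tasks
- Synchronous-style calls that need load balancing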
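Part 2: Hands-On: Building a Distributed Computing Service
We will build a distributed Fibonacci calculation service to demonstrate a complete implementation of the RPC pattern.
Step 1: Create the Project Structure
# Create the solution
dotnet new sln -n RpcSystem

# Create the projects (the server is a worker project so that it can be started with dotnet run)
dotnet new webapi -n RpcClient.API
dotnet new classlib -n RpcClient.Core
dotnet new worker -n RpcServer.Service
dotnet new classlib -n RpcShared

# Add the projects to the solution
dotnet sln add RpcClient.API/RpcClient.API.csproj
dotnet sln add RpcClient.Core/RpcClient.Core.csproj
dotnet sln add RpcServer.Service/RpcServer.Service.csproj
dotnet sln add RpcShared/RpcShared.csproj

# Add project references
dotnet add RpcClient.API reference RpcClient.Core
dotnet add RpcClient.API reference RpcShared
dotnet add RpcClient.Core reference RpcShared
dotnet add RpcServer.Service reference RpcShared

# Add NuGet packages (the code in this chapter uses the RabbitMQ.Client 6.x API: IModel, CreateModel)
cd RpcClient.API
dotnet add package RabbitMQ.Client --version 6.8.1
# Provides AddRabbitMQ for health checks; pick a version built against RabbitMQ.Client 6.x
dotnet add package AspNetCore.HealthChecks.Rabbitmq
cd ../RpcClient.Core
dotnet add package RabbitMQ.Client --version 6.8.1
dotnet add package Microsoft.Extensions.Logging.Abstractions
# Used by the retrying client in Step 6
dotnet add package Polly
cd ../RpcServer.Service
dotnet add package RabbitMQ.Client --version 6.8.1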
Step 2: Define the Shared Models (RpcShared)
Models/RpcRequest.cs
using System.Text.Json;
using System.Text.Json.Serialization;

namespace RpcShared.Models
{
    public class RpcRequest
    {
        [JsonPropertyName("requestId")]
        public string RequestId { get; set; } = Guid.NewGuid().ToString();

        [JsonPropertyName("method")]
        public string Method { get; set; } = string.Empty;

        [JsonPropertyName("parameters")]
        public Dictionary<string, object> Parameters { get; set; } = new();

        [JsonPropertyName("timestamp")]
        public DateTime Timestamp { get; set; } = DateTime.UtcNow;

        public RpcRequest WithParameter(string key, object value)
        {
            Parameters[key] = value;
            return this;
        }

        public T? GetParameter<T>(string key)
        {
            if (!Parameters.TryGetValue(key, out var value))
                return default;

            try
            {
                // After a System.Text.Json round trip, object-typed values arrive as
                // JsonElement, which Convert.ChangeType cannot convert.
                if (value is JsonElement element)
                    return element.Deserialize<T>();

                if (value is T typed)
                    return typed;

                var targetType = Nullable.GetUnderlyingType(typeof(T)) ?? typeof(T);
                return (T)Convert.ChangeType(value, targetType);
            }
            catch
            {
                return default;
            }
        }
    }
}
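One detail matters when a Dictionary<string, object> travels through System.Text.Json: on the receiving side the values are JsonElement instances, not int or bool. The sketch below shows the effect and one way to read typed values back; JsonRoundTripDemo, Read, and RoundTrip are illustrative names, not part of the project.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class JsonRoundTripDemo
{
    // Reads a value back as T, whether it is still the original CLR object
    // or has become a JsonElement after deserialization.
    public static T? Read<T>(Dictionary<string, object> parameters, string key)
    {
        if (!parameters.TryGetValue(key, out var value)) return default;
        if (value is JsonElement element) return element.Deserialize<T>();
        return value is T typed ? typed : default;
    }

    // Serializes and deserializes the dictionary, as happens between client and server.
    public static Dictionary<string, object> RoundTrip(Dictionary<string, object> parameters)
    {
        var json = JsonSerializer.Serialize(parameters);
        return JsonSerializer.Deserialize<Dictionary<string, object>>(json)!;
    }

    public static void Main()
    {
        var sent = new Dictionary<string, object> { ["number"] = 10, ["useOptimized"] = true };
        var received = RoundTrip(sent);

        Console.WriteLine(received["number"].GetType().Name);      // prints "JsonElement"
        Console.WriteLine(Read<int>(received, "number"));           // prints "10"
        Console.WriteLine(Read<bool>(received, "useOptimized"));    // prints "True"
    }
}
```

A parameter reader that relies only on Convert.ChangeType fails on JsonElement and silently falls back to the default value, so the server would see number = 0 for every request.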
Models/RpcResponse.cs
using System.Text.Json;
using System.Text.Json.Serialization;

namespace RpcShared.Models
{
    public class RpcResponse
    {
        [JsonPropertyName("requestId")]
        public string RequestId { get; set; } = string.Empty;

        [JsonPropertyName("success")]
        public bool Success { get; set; }

        [JsonPropertyName("data")]
        public object? Data { get; set; }

        [JsonPropertyName("error")]
        public string? Error { get; set; }

        [JsonPropertyName("timestamp")]
        public DateTime Timestamp { get; set; } = DateTime.UtcNow;

        [JsonPropertyName("processingTimeMs")]
        public long ProcessingTimeMs { get; set; }

        public static RpcResponse SuccessResponse(string requestId, object data, long processingTimeMs = 0)
        {
            return new RpcResponse
            {
                RequestId = requestId,
                Success = true,
                Data = data,
                ProcessingTimeMs = processingTimeMs
            };
        }

        public static RpcResponse ErrorResponse(string requestId, string error, long processingTimeMs = 0)
        {
            return new RpcResponse
            {
                RequestId = requestId,
                Success = false,
                Error = error,
                ProcessingTimeMs = processingTimeMs
            };
        }

        public T? GetData<T>()
        {
            // On the receiving side, Data is a JsonElement (requires using System.Text.Json)
            if (Data is JsonElement jsonElement)
            {
                return jsonElement.Deserialize<T>();
            }
            return Data is T typedData ? typedData : default;
        }
    }
}
Messages/FibonacciRequest.cs
namespace RpcShared.Messages
{
    public class FibonacciRequest
    {
        public int Number { get; set; }
        public bool UseOptimizedAlgorithm { get; set; } = true;
    }

    public class FibonacciResponse
    {
        public long Result { get; set; }
        public long CalculationTimeMs { get; set; }
        public int InputNumber { get; set; }
    }
}
Step 3: RPC Client Core Library (RpcClient.Core)
Services/IRpcClient.cs
using RpcShared.Models;

namespace RpcClient.Core.Services
{
    public interface IRpcClient : IDisposable
    {
        Task<RpcResponse> CallAsync(RpcRequest request, TimeSpan timeout);

        Task<TResponse?> CallAsync<TResponse>(RpcRequest request, TimeSpan timeout)
            where TResponse : class;
    }
}
Services/RpcClient.cs
using System.Collections.Concurrent;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RpcShared.Models;

namespace RpcClient.Core.Services
{
    public class RpcClient : IRpcClient
    {
        private readonly IConnection _connection;
        private readonly IModel _channel;
        private readonly ILogger<RpcClient> _logger;
        private readonly string _replyQueueName;
        private readonly ConcurrentDictionary<string, TaskCompletionSource<RpcResponse>> _pendingRequests;
        private readonly AsyncEventingBasicConsumer _consumer;
        private readonly object _publishLock = new();
        private bool _disposed = false;

        public RpcClient(IConnectionFactory connectionFactory, ILogger<RpcClient> logger)
        {
            _logger = logger;
            _pendingRequests = new ConcurrentDictionary<string, TaskCompletionSource<RpcResponse>>();

            // Open the connection and the channel
            _connection = connectionFactory.CreateConnection();
            _channel = _connection.CreateModel();

            // Declare a temporary reply queue (exclusive; deleted when the connection closes)
            _replyQueueName = _channel.QueueDeclare(
                queue: "",
                durable: false,
                exclusive: true,
                autoDelete: true,
                arguments: null).QueueName;

            // Create a consumer that listens on the reply queue
            _consumer = new AsyncEventingBasicConsumer(_channel);
            _consumer.Received += OnResponseReceived;

            // Start consuming from the reply queue
            _channel.BasicConsume(
                queue: _replyQueueName,
                autoAck: false,
                consumer: _consumer);

            _logger.LogInformation("RPC Client initialized with reply queue: {ReplyQueue}", _replyQueueName);
        }

        public async Task<RpcResponse> CallAsync(RpcRequest request, TimeSpan timeout)
        {
            if (_disposed)
                throw new ObjectDisposedException(nameof(RpcClient));

            // Run continuations asynchronously so an awaiting caller cannot block the consumer callback
            var tcs = new TaskCompletionSource<RpcResponse>(TaskCreationOptions.RunContinuationsAsynchronously);

            // Add the request to the pending dictionary before arming the timeout
            if (!_pendingRequests.TryAdd(request.RequestId, tcs))
            {
                throw new InvalidOperationException($"Request with ID {request.RequestId} is already pending");
            }

            // Register the timeout; disposing both objects when the call ends releases the timer
            using var cancellationTokenSource = new CancellationTokenSource(timeout);
            using var registration = cancellationTokenSource.Token.Register(() =>
            {
                if (_pendingRequests.TryRemove(request.RequestId, out var removedTcs))
                {
                    removedTcs.TrySetException(new TimeoutException($"RPC call timed out after {timeout.TotalSeconds} seconds"));
                    _logger.LogWarning("RPC request {RequestId} timed out", request.RequestId);
                }
            });

            try
            {
                // Serialize the request
                var requestJson = JsonSerializer.Serialize(request);
                var requestBody = Encoding.UTF8.GetBytes(requestJson);

                _logger.LogDebug("Sending RPC request {RequestId} to queue: rpc_queue", request.RequestId);

                // A channel must not be used for publishing from several threads at once,
                // and this client is registered as a singleton, so publishing is serialized.
                lock (_publishLock)
                {
                    // Set the message properties
                    var properties = _channel.CreateBasicProperties();
                    properties.ReplyTo = _replyQueueName;
                    properties.CorrelationId = request.RequestId;
                    properties.Persistent = true;

                    // Publish the request to the RPC queue
                    _channel.BasicPublish(
                        exchange: "",
                        routingKey: "rpc_queue",
                        basicProperties: properties,
                        body: requestBody);
                }

                _logger.LogInformation("RPC request {RequestId} sent successfully", request.RequestId);
            }
            catch (Exception ex)
            {
                // Remove the pending request if sending failed
                _pendingRequests.TryRemove(request.RequestId, out _);
                _logger.LogError(ex, "Error sending RPC request {RequestId}", request.RequestId);
                throw;
            }

            // Wait for the response (or for the timeout to fault the task)
            return await tcs.Task;
        }

        public async Task<TResponse?> CallAsync<TResponse>(RpcRequest request, TimeSpan timeout)
            where TResponse : class
        {
            var response = await CallAsync(request, timeout);

            if (!response.Success)
            {
                throw new InvalidOperationException($"RPC call failed: {response.Error}");
            }

            return response.GetData<TResponse>();
        }

        private Task OnResponseReceived(object sender, BasicDeliverEventArgs ea)
        {
            var responseJson = Encoding.UTF8.GetString(ea.Body.ToArray());
            var correlationId = ea.BasicProperties.CorrelationId;

            _logger.LogDebug("Received RPC response for correlation ID: {CorrelationId}", correlationId);

            // A message without a correlation ID cannot be matched to any request
            if (string.IsNullOrEmpty(correlationId))
            {
                _logger.LogWarning("Received RPC response without a correlation ID");
                _channel.BasicAck(ea.DeliveryTag, false);
                return Task.CompletedTask;
            }

            try
            {
                var response = JsonSerializer.Deserialize<RpcResponse>(responseJson)
                    ?? throw new JsonException("RPC response deserialized to null");

                // Look up the matching pending request
                if (_pendingRequests.TryRemove(correlationId, out var tcs))
                {
                    tcs.TrySetResult(response);
                    _logger.LogDebug("RPC response for {CorrelationId} delivered to waiting task", correlationId);
                }
                else
                {
                    _logger.LogWarning("Received response for unknown correlation ID: {CorrelationId}", correlationId);
                }

                // Acknowledge the message manually
                _channel.BasicAck(ea.DeliveryTag, false);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing RPC response for correlation ID: {CorrelationId}", correlationId);

                // Reject the message without requeueing it
                _channel.BasicNack(ea.DeliveryTag, false, false);

                // Still notify the waiting task if the response could not be processed
                if (_pendingRequests.TryRemove(correlationId, out var tcs))
                {
                    tcs.TrySetException(new InvalidOperationException("Failed to process RPC response", ex));
                }
            }

            return Task.CompletedTask;
        }

        public void Dispose()
        {
            if (!_disposed)
            {
                _disposed = true;

                // Cancel all pending requests
                foreach (var (requestId, tcs) in _pendingRequests)
                {
                    tcs.TrySetCanceled();
                }
                _pendingRequests.Clear();

                _channel?.Close();
                _channel?.Dispose();
                _connection?.Close();
                _connection?.Dispose();

                _logger.LogInformation("RPC Client disposed");
            }
        }
    }
}
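The heart of the client is the pairing of a pending-request dictionary with a timeout. The sketch below isolates that mechanism without RabbitMQ so it can be run on its own; PendingCallDemo, WaitForReplyAsync, and CompleteReply are hypothetical names, and replies are plain strings here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class PendingCallDemo
{
    private static readonly ConcurrentDictionary<string, TaskCompletionSource<string>> Pending = new();

    // Registers a pending call and faults it with TimeoutException if no reply arrives in time.
    public static async Task<string> WaitForReplyAsync(string correlationId, TimeSpan timeout)
    {
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (!Pending.TryAdd(correlationId, tcs))
            throw new InvalidOperationException($"Request {correlationId} is already pending");

        using var cts = new CancellationTokenSource(timeout);
        using var registration = cts.Token.Register(() =>
        {
            // Whoever removes the entry first (timeout or reply) decides the outcome.
            if (Pending.TryRemove(correlationId, out var timedOut))
                timedOut.TrySetException(new TimeoutException($"No reply within {timeout.TotalMilliseconds} ms"));
        });

        return await tcs.Task;
    }

    // Called by the reply consumer; returns false for late or unknown replies.
    public static bool CompleteReply(string correlationId, string body) =>
        Pending.TryRemove(correlationId, out var tcs) && tcs.TrySetResult(body);

    public static async Task Main()
    {
        var answered = WaitForReplyAsync("a", TimeSpan.FromSeconds(5));
        CompleteReply("a", "55");
        Console.WriteLine(await answered);

        try
        {
            await WaitForReplyAsync("b", TimeSpan.FromMilliseconds(50));
        }
        catch (TimeoutException ex)
        {
            Console.WriteLine($"Timed out: {ex.Message}");
            Console.WriteLine(CompleteReply("b", "late"));   // a late reply is dropped
        }
    }
}
```

Using TryRemove as the single arbitration point means a reply that arrives just as the timeout fires cannot complete the same call twice.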
Services/FibonacciRpcClient.cs
using Microsoft.Extensions.Logging;
using RpcShared.Messages;
using RpcShared.Models;

namespace RpcClient.Core.Services
{
    public class FibonacciRpcClient
    {
        private readonly IRpcClient _rpcClient;
        private readonly ILogger<FibonacciRpcClient> _logger;

        public FibonacciRpcClient(IRpcClient rpcClient, ILogger<FibonacciRpcClient> logger)
        {
            _rpcClient = rpcClient;
            _logger = logger;
        }

        public async Task<long> CalculateFibonacciAsync(int number, bool useOptimized = true, TimeSpan? timeout = null)
        {
            var request = new RpcRequest
            {
                Method = "fibonacci.calculate",
                Timestamp = DateTime.UtcNow
            }
            .WithParameter("number", number)
            .WithParameter("useOptimized", useOptimized);

            timeout ??= TimeSpan.FromSeconds(30);

            try
            {
                _logger.LogInformation("Calculating Fibonacci({Number}) with timeout {Timeout}s",
                    number, timeout.Value.TotalSeconds);

                var response = await _rpcClient.CallAsync<FibonacciResponse>(request, timeout.Value);

                if (response != null)
                {
                    _logger.LogInformation(
                        "Fibonacci({Number}) = {Result} (calculated in {Time}ms)",
                        number, response.Result, response.CalculationTimeMs);
                    return response.Result;
                }

                throw new InvalidOperationException("Received null response from RPC server");
            }
            catch (TimeoutException ex)
            {
                _logger.LogError(ex, "Fibonacci calculation timed out for number {Number}", number);
                throw;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error calculating Fibonacci for number {Number}", number);
                throw;
            }
        }

        public async Task<FibonacciResponse> CalculateFibonacciDetailedAsync(int number, bool useOptimized = true, TimeSpan? timeout = null)
        {
            var request = new RpcRequest
            {
                Method = "fibonacci.calculate",
                Timestamp = DateTime.UtcNow
            }
            .WithParameter("number", number)
            .WithParameter("useOptimized", useOptimized);

            timeout ??= TimeSpan.FromSeconds(30);

            var response = await _rpcClient.CallAsync<FibonacciResponse>(request, timeout.Value);
            return response ?? throw new InvalidOperationException("Received null response from RPC server");
        }
    }
}
Step 4: RPC Client API (RpcClient.API)
Program.cs
using RpcClient.API.Services;
using RpcClient.Core.Services;
using RabbitMQ.Client;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// Configure RabbitMQ
builder.Services.AddSingleton<IConnectionFactory>(sp =>
{
    var configuration = sp.GetRequiredService<IConfiguration>();
    return new ConnectionFactory
    {
        HostName = configuration["RabbitMQ:HostName"],
        UserName = configuration["RabbitMQ:UserName"],
        Password = configuration["RabbitMQ:Password"],
        Port = int.Parse(configuration["RabbitMQ:Port"] ?? "5672"),
        VirtualHost = configuration["RabbitMQ:VirtualHost"] ?? "/",
        // Required by AsyncEventingBasicConsumer
        DispatchConsumersAsync = true
    };
});

// Register RPC services.
// The type name is fully qualified because the bare name RpcClient resolves to the
// RpcClient namespace in this file, not to the class.
builder.Services.AddSingleton<IRpcClient, RpcClient.Core.Services.RpcClient>();
builder.Services.AddScoped<FibonacciRpcClient>();
builder.Services.AddScoped<IMathRpcService, MathRpcService>();

// Add health checks.
// The connection comes from IConnectionFactory (IConfiguration has no CreateConnection method).
// AddRabbitMQ overloads differ between versions of AspNetCore.HealthChecks.Rabbitmq;
// this assumes one that accepts a Func<IServiceProvider, IConnection>.
builder.Services.AddHealthChecks()
    .AddRabbitMQ(provider =>
    {
        var factory = provider.GetRequiredService<IConnectionFactory>();
        return factory.CreateConnection();
    });

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.MapHealthChecks("/health");

app.Run();
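Program.cs reads its connection settings from the RabbitMQ configuration section, which the chapter does not show. As a sketch, an appsettings.json for a local broker might look like the following; the values are assumptions (a broker on localhost with the default guest account, which RabbitMQ only accepts for local connections by default) and should be replaced with your own.

```json
{
  "RabbitMQ": {
    "HostName": "localhost",
    "Port": "5672",
    "UserName": "guest",
    "Password": "guest",
    "VirtualHost": "/"
  }
}
```

The same section is needed by the server project in Step 5, which reads the identical keys.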
Services/IMathRpcService.cs
using RpcShared.Messages;

namespace RpcClient.API.Services
{
    public interface IMathRpcService
    {
        Task<long> CalculateFibonacciAsync(int number);
        Task<FibonacciResponse> CalculateFibonacciDetailedAsync(int number);
        Task<bool> HealthCheckAsync();
    }
}
Services/MathRpcService.cs
using RpcClient.Core.Services;
using RpcShared.Messages;

namespace RpcClient.API.Services
{
    public class MathRpcService : IMathRpcService
    {
        private readonly FibonacciRpcClient _fibonacciClient;
        private readonly ILogger<MathRpcService> _logger;

        public MathRpcService(FibonacciRpcClient fibonacciClient, ILogger<MathRpcService> logger)
        {
            _fibonacciClient = fibonacciClient;
            _logger = logger;
        }

        public async Task<long> CalculateFibonacciAsync(int number)
        {
            if (number < 0)
                throw new ArgumentException("Number must be non-negative", nameof(number));
            if (number > 50)
                throw new ArgumentException("Number too large for demonstration", nameof(number));

            return await _fibonacciClient.CalculateFibonacciAsync(number);
        }

        public async Task<FibonacciResponse> CalculateFibonacciDetailedAsync(int number)
        {
            if (number < 0)
                throw new ArgumentException("Number must be non-negative", nameof(number));
            if (number > 50)
                throw new ArgumentException("Number too large for demonstration", nameof(number));

            return await _fibonacciClient.CalculateFibonacciDetailedAsync(number);
        }

        public async Task<bool> HealthCheckAsync()
        {
            try
            {
                // Simple health check: compute Fibonacci(1) end to end
                var result = await _fibonacciClient.CalculateFibonacciAsync(1, timeout: TimeSpan.FromSeconds(5));
                return result == 1;
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "RPC health check failed");
                return false;
            }
        }
    }
}
Controllers/MathController.cs
using Microsoft.AspNetCore.Mvc;
using RpcClient.API.Services;
using RpcShared.Messages;

namespace RpcClient.API.Controllers
{
    [ApiController]
    [Route("api/[controller]")]
    public class MathController : ControllerBase
    {
        private readonly IMathRpcService _mathService;
        private readonly ILogger<MathController> _logger;

        public MathController(IMathRpcService mathService, ILogger<MathController> logger)
        {
            _mathService = mathService;
            _logger = logger;
        }

        [HttpGet("fibonacci/{number}")]
        public async Task<ActionResult<long>> CalculateFibonacci(int number)
        {
            try
            {
                _logger.LogInformation("Calculating Fibonacci({Number}) via RPC", number);
                var result = await _mathService.CalculateFibonacciAsync(number);
                return Ok(result);
            }
            catch (ArgumentException ex)
            {
                return BadRequest(ex.Message);
            }
            catch (TimeoutException ex)
            {
                _logger.LogWarning(ex, "Fibonacci calculation timed out for number {Number}", number);
                // 504: the upstream RPC server did not answer in time
                // (408 would mean the HTTP client was too slow to send its request)
                return StatusCode(StatusCodes.Status504GatewayTimeout, "Calculation timed out");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error calculating Fibonacci for number {Number}", number);
                return StatusCode(500, "Internal server error");
            }
        }

        [HttpGet("fibonacci/{number}/detailed")]
        public async Task<ActionResult<FibonacciResponse>> CalculateFibonacciDetailed(int number)
        {
            try
            {
                _logger.LogInformation("Calculating Fibonacci({Number}) with details via RPC", number);
                var result = await _mathService.CalculateFibonacciDetailedAsync(number);
                return Ok(result);
            }
            catch (ArgumentException ex)
            {
                return BadRequest(ex.Message);
            }
            catch (TimeoutException ex)
            {
                _logger.LogWarning(ex, "Fibonacci calculation timed out for number {Number}", number);
                return StatusCode(StatusCodes.Status504GatewayTimeout, "Calculation timed out");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error calculating Fibonacci for number {Number}", number);
                return StatusCode(500, "Internal server error");
            }
        }

        [HttpGet("health")]
        public async Task<ActionResult> HealthCheck()
        {
            var isHealthy = await _mathService.HealthCheckAsync();
            return isHealthy
                ? Ok("RPC service is healthy")
                : StatusCode(503, "RPC service is unavailable");
        }
    }
}
Step 5: RPC Server (RpcServer.Service)
Program.cs
using RpcServer.Service.Services;
using RabbitMQ.Client;

var builder = Host.CreateApplicationBuilder(args);

builder.Services.AddHostedService<FibonacciRpcServer>();

// Configure RabbitMQ
builder.Services.AddSingleton<IConnectionFactory>(sp =>
{
    var configuration = sp.GetRequiredService<IConfiguration>();
    return new ConnectionFactory
    {
        HostName = configuration["RabbitMQ:HostName"],
        UserName = configuration["RabbitMQ:UserName"],
        Password = configuration["RabbitMQ:Password"],
        Port = int.Parse(configuration["RabbitMQ:Port"] ?? "5672"),
        VirtualHost = configuration["RabbitMQ:VirtualHost"] ?? "/",
        // Required by AsyncEventingBasicConsumer
        DispatchConsumersAsync = true
    };
});

builder.Services.AddSingleton<FibonacciCalculator>();

var host = builder.Build();
host.Run();
Services/FibonacciCalculator.cs
using System.Collections.Concurrent;
using System.Diagnostics;
using RpcShared.Messages;

namespace RpcServer.Service.Services
{
    public class FibonacciCalculator
    {
        private readonly ILogger<FibonacciCalculator> _logger;

        // The calculator is a singleton, so the cache must tolerate concurrent access
        private readonly ConcurrentDictionary<int, long> _cache = new();

        public FibonacciCalculator(ILogger<FibonacciCalculator> logger)
        {
            _logger = logger;
        }

        public FibonacciResponse Calculate(int number, bool useOptimized = true)
        {
            // Stopwatch is monotonic, unlike differences of DateTime.UtcNow
            var stopwatch = Stopwatch.StartNew();

            try
            {
                _logger.LogInformation("Calculating Fibonacci({Number}) with optimized: {Optimized}",
                    number, useOptimized);

                long result = useOptimized
                    ? CalculateOptimized(number)
                    : CalculateNaive(number);

                var calculationTime = stopwatch.ElapsedMilliseconds;

                _logger.LogInformation(
                    "Fibonacci({Number}) = {Result} (calculated in {Time}ms)",
                    number, result, calculationTime);

                return new FibonacciResponse
                {
                    Result = result,
                    CalculationTimeMs = calculationTime,
                    InputNumber = number
                };
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error calculating Fibonacci({Number})", number);
                throw;
            }
        }

        private long CalculateOptimized(int n)
        {
            if (n < 0) throw new ArgumentException("Number must be non-negative");
            if (n <= 1) return n;

            // Check the cache
            if (_cache.TryGetValue(n, out var cachedResult))
            {
                _logger.LogDebug("Cache hit for Fibonacci({Number})", n);
                return cachedResult;
            }

            long a = 0, b = 1;

            for (int i = 2; i <= n; i++)
            {
                var temp = a + b;
                a = b;
                b = temp;

                // Cache every tenth intermediate result to limit memory use
                if (i % 10 == 0)
                {
                    _cache[i] = b;
                }
            }

            // Cache the final result
            _cache[n] = b;
            return b;
        }

        private long CalculateNaive(int n)
        {
            if (n < 0) throw new ArgumentException("Number must be non-negative");
            if (n <= 1) return n;

            // The exponential recursion is itself the compute-intensive workload.
            // A Thread.Sleep(100) in every call would add about 36 minutes for n = 20 alone.
            return CalculateNaive(n - 1) + CalculateNaive(n - 2);
        }

        public void ClearCache()
        {
            _cache.Clear();
            _logger.LogInformation("Fibonacci cache cleared");
        }
    }
}
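Both the API and the server cap the input at 50. The hard limit of the long return type lies further out: Fibonacci(92) is the largest value that fits. The sketch below, with the illustrative name FibonacciBounds, uses checked arithmetic so that an overflow raises an exception instead of silently wrapping around.

```csharp
using System;

public static class FibonacciBounds
{
    // Iterative Fibonacci; 'checked' turns silent long overflow into OverflowException.
    public static long Compute(int n)
    {
        if (n < 0) throw new ArgumentOutOfRangeException(nameof(n), "Number must be non-negative");
        if (n <= 1) return n;

        long a = 0, b = 1;
        for (int i = 2; i <= n; i++)
        {
            long next = checked(a + b);
            a = b;
            b = next;
        }
        return b;
    }

    public static void Main()
    {
        Console.WriteLine(Compute(10));   // prints "55"
        Console.WriteLine(Compute(50));   // prints "12586269025"
        Console.WriteLine(Compute(92));   // prints "7540113804746346429"
        try { Compute(93); }
        catch (OverflowException) { Console.WriteLine("Fibonacci(93) does not fit in a long"); }
    }
}
```

If the limit of 50 is ever raised, a check of this kind keeps the server from returning a wrapped, negative result.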
Services/FibonacciRpcServer.cs
using System.Text;
using System.Text.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RpcShared.Messages;
using RpcShared.Models;

namespace RpcServer.Service.Services
{
    public class FibonacciRpcServer : BackgroundService
    {
        private readonly IConnection _connection;
        private readonly IModel _channel;
        private readonly FibonacciCalculator _calculator;
        private readonly ILogger<FibonacciRpcServer> _logger;
        private const string QueueName = "rpc_queue";

        public FibonacciRpcServer(
            IConnectionFactory connectionFactory,
            FibonacciCalculator calculator,
            ILogger<FibonacciRpcServer> logger)
        {
            _calculator = calculator;
            _logger = logger;

            // Open the connection and the channel
            _connection = connectionFactory.CreateConnection();
            _channel = _connection.CreateModel();

            InitializeQueue();
        }

        private void InitializeQueue()
        {
            // Declare the RPC request queue (durable)
            _channel.QueueDeclare(
                queue: QueueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            // Fair dispatch: handle one request at a time
            _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            _logger.LogInformation("Fibonacci RPC Server initialized and listening on queue: {QueueName}", QueueName);
        }

        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            stoppingToken.ThrowIfCancellationRequested();

            var consumer = new AsyncEventingBasicConsumer(_channel);
            consumer.Received += async (model, ea) =>
            {
                RpcResponse? response = null;
                string correlationId = ea.BasicProperties.CorrelationId;
                string replyTo = ea.BasicProperties.ReplyTo;

                // Copy the body before the first await; the delivery buffer is only valid during the callback
                var body = ea.Body.ToArray();

                _logger.LogDebug("Received RPC request with correlation ID: {CorrelationId}", correlationId);

                try
                {
                    // Process the request off the dispatch thread
                    response = await Task.Run(() => ProcessRequest(body));
                    response.RequestId = correlationId;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error processing RPC request {CorrelationId}", correlationId);
                    response = RpcResponse.ErrorResponse(correlationId, ex.Message);
                }
                finally
                {
                    // Send the response
                    if (!string.IsNullOrEmpty(replyTo))
                    {
                        SendResponse(replyTo, correlationId,
                            response ?? RpcResponse.ErrorResponse(correlationId, "Unknown error occurred"));
                    }

                    // Acknowledge that the message has been handled
                    _channel.BasicAck(ea.DeliveryTag, false);
                }
            };

            _channel.BasicConsume(
                queue: QueueName,
                autoAck: false, // manual acknowledgement
                consumer: consumer);

            _logger.LogInformation("Fibonacci RPC Server started successfully");

            return Task.CompletedTask;
        }

        private RpcResponse ProcessRequest(byte[] body)
        {
            var startTime = DateTime.UtcNow;

            try
            {
                var requestJson = Encoding.UTF8.GetString(body);
                var request = JsonSerializer.Deserialize<RpcRequest>(requestJson);

                if (request == null)
                {
                    return RpcResponse.ErrorResponse("unknown", "Invalid request format");
                }

                _logger.LogInformation("Processing RPC request {RequestId}, Method: {Method}",
                    request.RequestId, request.Method);

                // Route to the handler that matches the method name
                object result = request.Method.ToLowerInvariant() switch
                {
                    "fibonacci.calculate" => ProcessFibonacciRequest(request),
                    "ping" => new { message = "pong", timestamp = DateTime.UtcNow },
                    _ => throw new NotSupportedException($"Method {request.Method} is not supported")
                };

                var processingTime = (DateTime.UtcNow - startTime).TotalMilliseconds;

                return RpcResponse.SuccessResponse(request.RequestId, result, (long)processingTime);
            }
            catch (Exception ex)
            {
                var processingTime = (DateTime.UtcNow - startTime).TotalMilliseconds;
                _logger.LogError(ex, "Error processing RPC request");

                // The caller overwrites the request ID with the correlation ID
                return RpcResponse.ErrorResponse("unknown", ex.Message, (long)processingTime);
            }
        }

        private FibonacciResponse ProcessFibonacciRequest(RpcRequest request)
        {
            var number = request.GetParameter<int>("number");

            // bool? is needed here: GetParameter<bool> returns a non-nullable bool,
            // and the ?? operator cannot be applied to it
            var useOptimized = request.GetParameter<bool?>("useOptimized") ?? true;

            if (number < 0)
            {
                throw new ArgumentException("Fibonacci number must be non-negative");
            }

            // Guard against inputs that would consume excessive resources
            if (number > 50)
            {
                throw new ArgumentException("Number too large for calculation");
            }

            return _calculator.Calculate(number, useOptimized);
        }

        private void SendResponse(string replyTo, string correlationId, RpcResponse response)
        {
            try
            {
                var responseJson = JsonSerializer.Serialize(response);
                var responseBody = Encoding.UTF8.GetBytes(responseJson);

                var properties = _channel.CreateBasicProperties();
                properties.CorrelationId = correlationId;
                properties.Persistent = true;

                _channel.BasicPublish(
                    exchange: "",
                    routingKey: replyTo,
                    basicProperties: properties,
                    body: responseBody);

                _logger.LogDebug("Sent RPC response for correlation ID: {CorrelationId}", correlationId);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error sending RPC response for correlation ID: {CorrelationId}", correlationId);
            }
        }

        public override void Dispose()
        {
            _channel?.Close();
            _channel?.Dispose();
            _connection?.Close();
            _connection?.Dispose();

            _logger.LogInformation("Fibonacci RPC Server disposed");

            base.Dispose();
        }
    }
}
Step 6: Advanced Feature: an RPC Client with Retries
Services/ResilientRpcClient.cs
using Microsoft.Extensions.Logging;
using Polly;
using Polly.Retry;
using RpcShared.Models;

namespace RpcClient.Core.Services
{
    public class ResilientRpcClient : IRpcClient
    {
        private readonly IRpcClient _innerClient;
        private readonly ILogger<ResilientRpcClient> _logger;
        private readonly AsyncRetryPolicy _retryPolicy;

        public ResilientRpcClient(IRpcClient innerClient, ILogger<ResilientRpcClient> logger)
        {
            _innerClient = innerClient;
            _logger = logger;

            // Configure the retry policy.
            // Note: InvalidOperationException also covers failures reported by the server
            // (for example invalid input), so those are retried as well. Retries are only
            // safe for idempotent calls, because a timed-out request may still be processed.
            _retryPolicy = Policy
                .Handle<TimeoutException>()
                .Or<InvalidOperationException>()
                .WaitAndRetryAsync(
                    retryCount: 3,
                    sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                    onRetry: (exception, delay, retryCount, context) =>
                    {
                        _logger.LogWarning(
                            "RPC call failed. Retry {RetryCount} after {Delay}ms. Error: {Error}",
                            retryCount, delay.TotalMilliseconds, exception.Message);
                    });
        }

        public async Task<RpcResponse> CallAsync(RpcRequest request, TimeSpan timeout)
        {
            return await _retryPolicy.ExecuteAsync(async () =>
            {
                return await _innerClient.CallAsync(request, timeout);
            });
        }

        public async Task<TResponse?> CallAsync<TResponse>(RpcRequest request, TimeSpan timeout)
            where TResponse : class
        {
            return await _retryPolicy.ExecuteAsync(async () =>
            {
                return await _innerClient.CallAsync<TResponse>(request, timeout);
            });
        }

        public void Dispose()
        {
            _innerClient?.Dispose();
        }
    }
}
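It helps to know what this retry policy costs in the worst case. The sketch below computes the delays with the same formula as the sleepDurationProvider above (2^attempt seconds) and adds the per-call timeouts; BackoffSchedule and its methods are illustrative names and do not use Polly.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BackoffSchedule
{
    // Delay before each retry: 2^attempt seconds, with attempts numbered from 1.
    public static IReadOnlyList<TimeSpan> Delays(int retryCount) =>
        Enumerable.Range(1, retryCount)
                  .Select(attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)))
                  .ToList();

    // Worst case: the first call and every retry all run into the per-call timeout.
    public static TimeSpan WorstCase(int retryCount, TimeSpan perCallTimeout) =>
        Delays(retryCount).Aggregate(TimeSpan.Zero, (sum, delay) => sum + delay)
        + TimeSpan.FromTicks(perCallTimeout.Ticks * (retryCount + 1));

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Delays(3).Select(d => d.TotalSeconds)));  // prints "2, 4, 8"
        Console.WriteLine(WorstCase(3, TimeSpan.FromSeconds(30)).TotalSeconds);       // prints "134"
    }
}
```

With three retries and the 30-second default timeout, a caller can wait more than two minutes before the final exception surfaces, which is worth weighing against the HTTP timeout of whatever sits in front of the API.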
Step 7: Run and Test
- Start the services
# Terminal 1: start the RPC server
cd RpcServer.Service
dotnet run
# Terminal 2: start the RPC client API
cd RpcClient.API
dotnet run
- Test the API
# Calculate Fibonacci numbers
curl -X GET "https://localhost:7000/api/math/fibonacci/10"
curl -X GET "https://localhost:7000/api/math/fibonacci/20/detailed"
# Health check
curl -X GET "https://localhost:7000/api/math/health"
- Test error scenarios
# Test a timeout (configure a very small timeout value)
# Test invalid input
curl -X GET "https://localhost:7000/api/math/fibonacci/-5"
curl -X GET "https://localhost:7000/api/math/fibonacci/100"
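- Watch the log output
  - The client sends the request and generates a CorrelationId.
  - The server receives the request and runs the calculation.
  - The server sends the response with the same CorrelationId.
  - The client receives the response and matches it by CorrelationId.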
Step 8: Performance Testing and Monitoring
Create a benchmark controller
using Microsoft.AspNetCore.Mvc;
using RpcClient.API.Services;

namespace RpcClient.API.Controllers
{
    [ApiController]
    [Route("api/[controller]")]
    public class BenchmarkController : ControllerBase
    {
        private readonly IMathRpcService _mathService;
        private readonly ILogger<BenchmarkController> _logger;

        public BenchmarkController(IMathRpcService mathService, ILogger<BenchmarkController> logger)
        {
            _mathService = mathService;
            _logger = logger;
        }

        [HttpPost("fibonacci/batch")]
        public async Task<ActionResult> CalculateFibonacciBatch([FromBody] List<int> numbers)
        {
            var results = new List<object>();
            var totalStopwatch = System.Diagnostics.Stopwatch.StartNew();

            // Requests are sent one after another, so the total is the sum of the individual round trips
            foreach (var number in numbers)
            {
                var stopwatch = System.Diagnostics.Stopwatch.StartNew();
                try
                {
                    var result = await _mathService.CalculateFibonacciAsync(number);
                    results.Add(new
                    {
                        number,
                        result,
                        success = true,
                        durationMs = stopwatch.ElapsedMilliseconds
                    });
                }
                catch (Exception ex)
                {
                    results.Add(new
                    {
                        number,
                        success = false,
                        error = ex.Message,
                        durationMs = stopwatch.ElapsedMilliseconds
                    });
                }
            }

            return Ok(new
            {
                totalDurationMs = totalStopwatch.ElapsedMilliseconds,
                requests = numbers.Count,
                results
            });
        }
    }
}
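The batch endpoint awaits each call before sending the next, so it measures sequential latency. To measure throughput, the calls can be issued concurrently with a cap on how many are in flight. The following is a minimal sketch of that idea; ConcurrentBatchDemo, RunAsync, and FakeFibonacciAsync are hypothetical names, and the fake method merely stands in for the RPC call with a simulated 100 ms latency.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConcurrentBatchDemo
{
    // Runs 'call' for every input with at most 'maxParallel' calls in flight,
    // returning the results in input order.
    public static async Task<TResult[]> RunAsync<TInput, TResult>(
        IEnumerable<TInput> inputs, Func<TInput, Task<TResult>> call, int maxParallel)
    {
        using var gate = new SemaphoreSlim(maxParallel);
        var tasks = inputs.Select(async input =>
        {
            await gate.WaitAsync();
            try { return await call(input); }
            finally { gate.Release(); }
        }).ToList();
        return await Task.WhenAll(tasks);
    }

    // Hypothetical stand-in for the RPC call, with simulated latency.
    public static async Task<long> FakeFibonacciAsync(int n)
    {
        await Task.Delay(100);
        long a = 0, b = 1;
        for (int i = 0; i < n; i++) (a, b) = (b, a + b);
        return a;
    }

    public static async Task Main()
    {
        var results = await RunAsync(new[] { 5, 10, 20 }, n => FakeFibonacciAsync(n), maxParallel: 4);
        Console.WriteLine(string.Join(", ", results));   // prints "5, 55, 6765"
    }
}
```

Against the real system this only pays off when the RPC client is safe to call concurrently and more than one server instance consumes from rpc_queue, since each server takes one request at a time (prefetchCount is 1).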
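Chapter Summary
In this chapter we implemented the RabbitMQ RPC pattern end to end:
- RPC core concepts: the callback queue, the correlation ID, and the request-response pattern.
- Client implementation: an RPC client that sends requests and waits for responses asynchronously.
- Server implementation: an RPC server that processes requests and returns responses.
- Error handling: timeout control, exception handling, and retries.
- Performance optimization: caching and an optimized algorithm to speed up the calculation.
- Resilience: a retry policy with exponential backoff, implemented with Polly.
The RPC pattern gives a microservice architecture a synchronous-style way to communicate. Combined with the asynchronous nature of message queues, it keeps the services decoupled while retaining the convenience of a call that returns a result. The pattern is a particularly good fit for distributed tasks whose callers must wait for the computed result.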
