<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      【RabbitMQ】RPC模式(請求/回復)

      本章目標

      • 理解RabbitMQ RPC模式的工作原理和適用場景。

      • 掌握回調隊列(Callback Queue)和關聯ID(Correlation Id)的使用。

      • 實現基于RabbitMQ的異步RPC調用。

      • 學習RPC模式下的錯誤處理和超時機制。

      • 構建完整的微服務間同步通信解決方案。


      一、理論部分

      1. RPC模式簡介

      RPC(Remote Procedure Call)模式允許客戶端應用程序調用遠程服務器上的方法,就像調用本地方法一樣。在RabbitMQ中,RPC是通過消息隊列實現的異步RPC。

      與傳統HTTP RPC的區別:

      • HTTP RPC:同步,直接連接,需要服務端在線

      • 消息隊列RPC:異步,通過消息代理,支持解耦和負載均衡

      2. RabbitMQ RPC核心組件

      1. 請求隊列(Request Queue):客戶端發送請求的隊列

      2. 回復隊列(Reply Queue):服務器返回響應的隊列

      3. 關聯ID(Correlation Id):匹配請求和響應的唯一標識

      4. 消息屬性:使用IBasicProperties.ReplyToIBasicProperties.CorrelationId

      3. RPC工作流程

      Client端:
      1. 生成唯一CorrelationId
      2. 創建臨時回復隊列
      3. 發送請求到請求隊列,設置ReplyTo和CorrelationId
      4. 監聽回復隊列,等待匹配的CorrelationId
      
      Server端:
      1. 監聽請求隊列
      2. 處理請求
      3. 將響應發送到請求中的ReplyTo隊列
      4. 設置相同的CorrelationId
      
      Client端:
      5. 收到響應,根據CorrelationId匹配請求
      6. 處理響應

      4. 適用場景

      • 需要同步響應的異步操作

      • 微服務間的同步通信

      • 計算密集型任務的分布式處理

      • 需要負載均衡的同步調用


      二、實操部分:構建分布式計算服務

      我們將創建一個分布式斐波那契數列計算服務,演示完整的RPC模式實現。

      第1步:創建項目結構

      # 創建解決方案
      dotnet new sln -n RpcSystem
      
      # 創建項目
      dotnet new webapi -n RpcClient.API
      dotnet new classlib -n RpcClient.Core
      dotnet new classlib -n RpcServer.Service
      dotnet new classlib -n RpcShared
      
      # 添加到解決方案
      dotnet sln add RpcClient.API/RpcClient.API.csproj
      dotnet sln add RpcClient.Core/RpcClient.Core.csproj
      dotnet sln add RpcServer.Service/RpcServer.Service.csproj
      dotnet sln add RpcShared/RpcShared.csproj
      
      # 添加項目引用
      dotnet add RpcClient.API reference RpcClient.Core
      dotnet add RpcClient.API reference RpcShared
      dotnet add RpcClient.Core reference RpcShared
      dotnet add RpcServer.Service reference RpcShared
      
      # 添加NuGet包
      cd RpcClient.API
      dotnet add package RabbitMQ.Client
      
      cd ../RpcClient.Core
      dotnet add package RabbitMQ.Client
      
      cd ../RpcServer.Service
      dotnet add package RabbitMQ.Client

      第2步:定義共享模型(RpcShared)

      Models/RpcRequest.cs

      using System.Text.Json.Serialization;
      
      namespace RpcShared.Models
      {
          public class RpcRequest
          {
              [JsonPropertyName("requestId")]
              public string RequestId { get; set; } = Guid.NewGuid().ToString();
      
              [JsonPropertyName("method")]
              public string Method { get; set; } = string.Empty;
      
              [JsonPropertyName("parameters")]
              public Dictionary<string, object> Parameters { get; set; } = new();
      
              [JsonPropertyName("timestamp")]
              public DateTime Timestamp { get; set; } = DateTime.UtcNow;
      
              public RpcRequest WithParameter(string key, object value)
              {
                  Parameters[key] = value;
                  return this;
              }
      
              public T? GetParameter<T>(string key)
              {
                  if (Parameters.TryGetValue(key, out var value))
                  {
                      try
                      {
                          return (T)Convert.ChangeType(value, typeof(T));
                      }
                      catch
                      {
                          return default;
                      }
                  }
                  return default;
              }
          }
      }
      View Code

      Models/RpcResponse.cs

      using System.Text.Json.Serialization;
      
      namespace RpcShared.Models
      {
          public class RpcResponse
          {
              [JsonPropertyName("requestId")]
              public string RequestId { get; set; } = string.Empty;
      
              [JsonPropertyName("success")]
              public bool Success { get; set; }
      
              [JsonPropertyName("data")]
              public object? Data { get; set; }
      
              [JsonPropertyName("error")]
              public string? Error { get; set; }
      
              [JsonPropertyName("timestamp")]
              public DateTime Timestamp { get; set; } = DateTime.UtcNow;
      
              [JsonPropertyName("processingTimeMs")]
              public long ProcessingTimeMs { get; set; }
      
              public static RpcResponse SuccessResponse(string requestId, object data, long processingTimeMs = 0)
              {
                  return new RpcResponse
                  {
                      RequestId = requestId,
                      Success = true,
                      Data = data,
                      ProcessingTimeMs = processingTimeMs
                  };
              }
      
              public static RpcResponse ErrorResponse(string requestId, string error, long processingTimeMs = 0)
              {
                  return new RpcResponse
                  {
                      RequestId = requestId,
                      Success = false,
                      Error = error,
                      ProcessingTimeMs = processingTimeMs
                  };
              }
      
              public T? GetData<T>()
              {
                  if (Data is JsonElement jsonElement)
                  {
                      return jsonElement.Deserialize<T>();
                  }
                  return Data is T typedData ? typedData : default;
              }
          }
      }
      View Code

      Messages/FibonacciRequest.cs

      namespace RpcShared.Messages
      {
          public class FibonacciRequest
          {
              public int Number { get; set; }
              public bool UseOptimizedAlgorithm { get; set; } = true;
          }
      
          public class FibonacciResponse
          {
              public long Result { get; set; }
              public long CalculationTimeMs { get; set; }
              public int InputNumber { get; set; }
          }
      }
      View Code

      第3步:RPC客戶端核心庫(RpcClient.Core)

      Services/IRpcClient.cs

      using RpcShared.Models;
      
      namespace RpcClient.Core.Services
      {
          public interface IRpcClient : IDisposable
          {
              Task<RpcResponse> CallAsync(RpcRequest request, TimeSpan timeout);
              Task<TResponse?> CallAsync<TResponse>(RpcRequest request, TimeSpan timeout) where TResponse : class;
          }
      }
      View Code

      Services/RpcClient.cs

      using System.Collections.Concurrent;
      using System.Text;
      using System.Text.Json;
      using Microsoft.Extensions.Logging;
      using RabbitMQ.Client;
      using RabbitMQ.Client.Events;
      using RpcShared.Models;
      
      namespace RpcClient.Core.Services
      {
          public class RpcClient : IRpcClient
          {
              private readonly IConnection _connection;
              private readonly IModel _channel;
              private readonly ILogger<RpcClient> _logger;
              private readonly string _replyQueueName;
              private readonly ConcurrentDictionary<string, TaskCompletionSource<RpcResponse>> _pendingRequests;
              private readonly AsyncEventingBasicConsumer _consumer;
              private bool _disposed = false;
      
              public RpcClient(
                  IConnectionFactory connectionFactory,
                  ILogger<RpcClient> logger)
              {
                  _logger = logger;
                  _pendingRequests = new ConcurrentDictionary<string, TaskCompletionSource<RpcResponse>>();
      
                  // 建立連接和通道
                  _connection = connectionFactory.CreateConnection();
                  _channel = _connection.CreateModel();
      
                  // 聲明臨時回復隊列(排他性,連接關閉時自動刪除)
                  _replyQueueName = _channel.QueueDeclare(
                      queue: "",
                      durable: false,
                      exclusive: true,
                      autoDelete: true,
                      arguments: null).QueueName;
      
                  // 創建消費者監聽回復隊列
                  _consumer = new AsyncEventingBasicConsumer(_channel);
                  _consumer.Received += OnResponseReceived;
      
                  // 開始消費回復隊列
                  _channel.BasicConsume(
                      queue: _replyQueueName,
                      autoAck: false,
                      consumer: _consumer);
      
                  _logger.LogInformation("RPC Client initialized with reply queue: {ReplyQueue}", _replyQueueName);
              }
      
              public async Task<RpcResponse> CallAsync(RpcRequest request, TimeSpan timeout)
              {
                  if (_disposed)
                      throw new ObjectDisposedException(nameof(RpcClient));
      
                  var tcs = new TaskCompletionSource<RpcResponse>();
                  var cancellationTokenSource = new CancellationTokenSource(timeout);
      
                  // 注冊超時取消
                  cancellationTokenSource.Token.Register(() =>
                  {
                      if (_pendingRequests.TryRemove(request.RequestId, out var removedTcs))
                      {
                          removedTcs.TrySetException(new TimeoutException($"RPC call timed out after {timeout.TotalSeconds} seconds"));
                          _logger.LogWarning("RPC request {RequestId} timed out", request.RequestId);
                      }
                  });
      
                  // 將請求添加到待處理字典
                  if (!_pendingRequests.TryAdd(request.RequestId, tcs))
                  {
                      throw new InvalidOperationException($"Request with ID {request.RequestId} is already pending");
                  }
      
                  try
                  {
                      // 序列化請求
                      var requestJson = JsonSerializer.Serialize(request);
                      var requestBody = Encoding.UTF8.GetBytes(requestJson);
      
                      // 設置消息屬性
                      var properties = _channel.CreateBasicProperties();
                      properties.ReplyTo = _replyQueueName;
                      properties.CorrelationId = request.RequestId;
                      properties.Persistent = true;
      
                      _logger.LogDebug("Sending RPC request {RequestId} to queue: rpc_queue", request.RequestId);
      
                      // 發布請求到RPC隊列
                      _channel.BasicPublish(
                          exchange: "",
                          routingKey: "rpc_queue",
                          basicProperties: properties,
                          body: requestBody);
      
                      _logger.LogInformation("RPC request {RequestId} sent successfully", request.RequestId);
      
                      // 等待響應
                      return await tcs.Task;
                  }
                  catch (Exception ex)
                  {
                      // 發生異常時移除待處理請求
                      _pendingRequests.TryRemove(request.RequestId, out _);
                      _logger.LogError(ex, "Error sending RPC request {RequestId}", request.RequestId);
                      throw;
                  }
              }
      
              public async Task<TResponse?> CallAsync<TResponse>(RpcRequest request, TimeSpan timeout) where TResponse : class
              {
                  var response = await CallAsync(request, timeout);
                  
                  if (!response.Success)
                  {
                      throw new InvalidOperationException($"RPC call failed: {response.Error}");
                  }
      
                  return response.GetData<TResponse>();
              }
      
              private async Task OnResponseReceived(object sender, BasicDeliverEventArgs ea)
              {
                  var responseBody = ea.Body.ToArray();
                  var responseJson = Encoding.UTF8.GetString(responseBody);
                  var correlationId = ea.BasicProperties.CorrelationId;
      
                  _logger.LogDebug("Received RPC response for correlation ID: {CorrelationId}", correlationId);
      
                  try
                  {
                      var response = JsonSerializer.Deserialize<RpcResponse>(responseJson);
                      if (response == null)
                      {
                          _logger.LogError("Failed to deserialize RPC response for correlation ID: {CorrelationId}", correlationId);
                          return;
                      }
      
                      // 查找匹配的待處理請求
                      if (_pendingRequests.TryRemove(correlationId, out var tcs))
                      {
                          tcs.TrySetResult(response);
                          _logger.LogDebug("RPC response for {CorrelationId} delivered to waiting task", correlationId);
                      }
                      else
                      {
                          _logger.LogWarning("Received response for unknown correlation ID: {CorrelationId}", correlationId);
                      }
      
                      // 手動確認消息
                      _channel.BasicAck(ea.DeliveryTag, false);
                  }
                  catch (Exception ex)
                  {
                      _logger.LogError(ex, "Error processing RPC response for correlation ID: {CorrelationId}", correlationId);
                      
                      // 處理失敗時拒絕消息(不重新入隊)
                      _channel.BasicNack(ea.DeliveryTag, false, false);
                      
                      // 如果反序列化失敗,仍然通知等待的任務
                      if (_pendingRequests.TryRemove(correlationId, out var tcs))
                      {
                          tcs.TrySetException(new InvalidOperationException("Failed to process RPC response"));
                      }
                  }
      
                  await Task.CompletedTask;
              }
      
              public void Dispose()
              {
                  if (!_disposed)
                  {
                      _disposed = true;
      
                      // 取消所有待處理的請求
                      foreach (var (requestId, tcs) in _pendingRequests)
                      {
                          tcs.TrySetCanceled();
                      }
                      _pendingRequests.Clear();
      
                      _channel?.Close();
                      _channel?.Dispose();
                      _connection?.Close();
                      _connection?.Dispose();
      
                      _logger.LogInformation("RPC Client disposed");
                  }
              }
          }
      }

      Services/FibonacciRpcClient.cs

      using RpcClient.Core.Services;
      using RpcShared.Messages;
      using RpcShared.Models;
      
      namespace RpcClient.Core.Services
      {
          public class FibonacciRpcClient
          {
              private readonly IRpcClient _rpcClient;
              private readonly ILogger<FibonacciRpcClient> _logger;
      
              public FibonacciRpcClient(IRpcClient rpcClient, ILogger<FibonacciRpcClient> logger)
              {
                  _rpcClient = rpcClient;
                  _logger = logger;
              }
      
              public async Task<long> CalculateFibonacciAsync(int number, bool useOptimized = true, TimeSpan? timeout = null)
              {
                  var request = new RpcRequest
                  {
                      Method = "fibonacci.calculate",
                      Timestamp = DateTime.UtcNow
                  }
                  .WithParameter("number", number)
                  .WithParameter("useOptimized", useOptimized);
      
                  timeout ??= TimeSpan.FromSeconds(30);
      
                  try
                  {
                      _logger.LogInformation("Calculating Fibonacci({Number}) with timeout {Timeout}s", 
                          number, timeout.Value.TotalSeconds);
      
                      var response = await _rpcClient.CallAsync<FibonacciResponse>(request, timeout.Value);
                      
                      if (response != null)
                      {
                          _logger.LogInformation(
                              "Fibonacci({Number}) = {Result} (calculated in {Time}ms)", 
                              number, response.Result, response.CalculationTimeMs);
                          
                          return response.Result;
                      }
      
                      throw new InvalidOperationException("Received null response from RPC server");
                  }
                  catch (TimeoutException ex)
                  {
                      _logger.LogError(ex, "Fibonacci calculation timed out for number {Number}", number);
                      throw;
                  }
                  catch (Exception ex)
                  {
                      _logger.LogError(ex, "Error calculating Fibonacci for number {Number}", number);
                      throw;
                  }
              }
      
              public async Task<FibonacciResponse> CalculateFibonacciDetailedAsync(int number, bool useOptimized = true, TimeSpan? timeout = null)
              {
                  var request = new RpcRequest
                  {
                      Method = "fibonacci.calculate",
                      Timestamp = DateTime.UtcNow
                  }
                  .WithParameter("number", number)
                  .WithParameter("useOptimized", useOptimized);
      
                  timeout ??= TimeSpan.FromSeconds(30);
      
                  var response = await _rpcClient.CallAsync<FibonacciResponse>(request, timeout.Value);
                  return response ?? throw new InvalidOperationException("Received null response from RPC server");
              }
          }
      }

      第4步:RPC客戶端API(RpcClient.API)

      Program.cs

      using RpcClient.API.Services;
      using RpcClient.Core.Services;
      using RpcShared.Models;
      using RabbitMQ.Client;
      
      var builder = WebApplication.CreateBuilder(args);
      
      // Add services to the container.
      builder.Services.AddControllers();
      builder.Services.AddEndpointsApiExplorer();
      builder.Services.AddSwaggerGen();
      
      // Configure RabbitMQ
      builder.Services.AddSingleton<IConnectionFactory>(sp =>
      {
          var configuration = sp.GetRequiredService<IConfiguration>();
          return new ConnectionFactory
          {
              HostName = configuration["RabbitMQ:HostName"],
              UserName = configuration["RabbitMQ:UserName"],
              Password = configuration["RabbitMQ:Password"],
              Port = int.Parse(configuration["RabbitMQ:Port"] ?? "5672"),
              VirtualHost = configuration["RabbitMQ:VirtualHost"] ?? "/",
              DispatchConsumersAsync = true
          };
      });
      
      // Register RPC services
      builder.Services.AddSingleton<IRpcClient, RpcClient>();
      builder.Services.AddScoped<FibonacciRpcClient>();
      builder.Services.AddScoped<IMathRpcService, MathRpcService>();
      
      // Add health checks
      builder.Services.AddHealthChecks()
          .AddRabbitMQ(provider => 
          {
              var factory = provider.GetRequiredService<IConfiguration>();
              return factory.CreateConnection();
          });
      
      var app = builder.Build();
      
      // Configure the HTTP request pipeline.
      if (app.Environment.IsDevelopment())
      {
          app.UseSwagger();
          app.UseSwaggerUI();
      }
      
      app.UseHttpsRedirection();
      app.UseAuthorization();
      app.MapControllers();
      app.MapHealthChecks("/health");
      
      app.Run();
      View Code

      Services/IMathRpcService.cs

      using RpcShared.Messages;
      
      namespace RpcClient.API.Services
      {
          public interface IMathRpcService
          {
              Task<long> CalculateFibonacciAsync(int number);
              Task<FibonacciResponse> CalculateFibonacciDetailedAsync(int number);
              Task<bool> HealthCheckAsync();
          }
      }
      View Code

      Services/MathRpcService.cs

      using RpcClient.Core.Services;
      using RpcShared.Messages;
      
      namespace RpcClient.API.Services
      {
          public class MathRpcService : IMathRpcService
          {
              private readonly FibonacciRpcClient _fibonacciClient;
              private readonly ILogger<MathRpcService> _logger;
      
              public MathRpcService(FibonacciRpcClient fibonacciClient, ILogger<MathRpcService> logger)
              {
                  _fibonacciClient = fibonacciClient;
                  _logger = logger;
              }
      
              public async Task<long> CalculateFibonacciAsync(int number)
              {
                  if (number < 0)
                      throw new ArgumentException("Number must be non-negative", nameof(number));
      
                  if (number > 50)
                      throw new ArgumentException("Number too large for demonstration", nameof(number));
      
                  return await _fibonacciClient.CalculateFibonacciAsync(number);
              }
      
              public async Task<FibonacciResponse> CalculateFibonacciDetailedAsync(int number)
              {
                  if (number < 0)
                      throw new ArgumentException("Number must be non-negative", nameof(number));
      
                  if (number > 50)
                      throw new ArgumentException("Number too large for demonstration", nameof(number));
      
                  return await _fibonacciClient.CalculateFibonacciDetailedAsync(number);
              }
      
              public async Task<bool> HealthCheckAsync()
              {
                  try
                  {
                      // 簡單的健康檢查:計算 Fibonacci(1)
                      var result = await _fibonacciClient.CalculateFibonacciAsync(1, timeout: TimeSpan.FromSeconds(5));
                      return result == 1;
                  }
                  catch (Exception ex)
                  {
                      _logger.LogWarning(ex, "RPC health check failed");
                      return false;
                  }
              }
          }
      }
      View Code

      Controllers/MathController.cs

      using Microsoft.AspNetCore.Mvc;
      using RpcClient.API.Services;
      using RpcShared.Messages;
      
      namespace RpcClient.API.Controllers
      {
          [ApiController]
          [Route("api/[controller]")]
          public class MathController : ControllerBase
          {
              private readonly IMathRpcService _mathService;
              private readonly ILogger<MathController> _logger;
      
              public MathController(IMathRpcService mathService, ILogger<MathController> logger)
              {
                  _mathService = mathService;
                  _logger = logger;
              }
      
              [HttpGet("fibonacci/{number}")]
              public async Task<ActionResult<long>> CalculateFibonacci(int number)
              {
                  try
                  {
                      _logger.LogInformation("Calculating Fibonacci({Number}) via RPC", number);
                      var result = await _mathService.CalculateFibonacciAsync(number);
                      return Ok(result);
                  }
                  catch (ArgumentException ex)
                  {
                      return BadRequest(ex.Message);
                  }
                  catch (TimeoutException ex)
                  {
                      _logger.LogWarning(ex, "Fibonacci calculation timed out for number {Number}", number);
                      return StatusCode(408, "Calculation timed out");
                  }
                  catch (Exception ex)
                  {
                      _logger.LogError(ex, "Error calculating Fibonacci for number {Number}", number);
                      return StatusCode(500, "Internal server error");
                  }
              }
      
              [HttpGet("fibonacci/{number}/detailed")]
              public async Task<ActionResult<FibonacciResponse>> CalculateFibonacciDetailed(int number)
              {
                  try
                  {
                      _logger.LogInformation("Calculating Fibonacci({Number}) with details via RPC", number);
                      var result = await _mathService.CalculateFibonacciDetailedAsync(number);
                      return Ok(result);
                  }
                  catch (ArgumentException ex)
                  {
                      return BadRequest(ex.Message);
                  }
                  catch (TimeoutException ex)
                  {
                      _logger.LogWarning(ex, "Fibonacci calculation timed out for number {Number}", number);
                      return StatusCode(408, "Calculation timed out");
                  }
                  catch (Exception ex)
                  {
                      _logger.LogError(ex, "Error calculating Fibonacci for number {Number}", number);
                      return StatusCode(500, "Internal server error");
                  }
              }
      
              [HttpGet("health")]
              public async Task<ActionResult> HealthCheck()
              {
                  var isHealthy = await _mathService.HealthCheckAsync();
                  return isHealthy ? Ok("RPC service is healthy") : StatusCode(503, "RPC service is unavailable");
              }
          }
      }
      View Code

      第5步:RPC服務器(RpcServer.Service)

      Program.cs

      using RpcServer.Service.Services;
      using RpcShared.Models;
      using RabbitMQ.Client;
      
      var builder = Host.CreateApplicationBuilder(args);
      
      builder.Services.AddHostedService<FibonacciRpcServer>();
      
      // Configure RabbitMQ
      builder.Services.AddSingleton<IConnectionFactory>(sp =>
      {
          var configuration = sp.GetRequiredService<IConfiguration>();
          return new ConnectionFactory
          {
              HostName = configuration["RabbitMQ:HostName"],
              UserName = configuration["RabbitMQ:UserName"],
              Password = configuration["RabbitMQ:Password"],
              Port = int.Parse(configuration["RabbitMQ:Port"] ?? "5672"),
              VirtualHost = configuration["RabbitMQ:VirtualHost"] ?? "/",
              DispatchConsumersAsync = true
          };
      });
      
      builder.Services.AddSingleton<FibonacciCalculator>();
      
      var host = builder.Build();
      host.Run();
      View Code

      Services/FibonacciCalculator.cs

      using RpcShared.Messages;
      
      namespace RpcServer.Service.Services
      {
          public class FibonacciCalculator
          {
              private readonly ILogger<FibonacciCalculator> _logger;
              private readonly Dictionary<int, long> _cache = new();
      
              public FibonacciCalculator(ILogger<FibonacciCalculator> logger)
              {
                  _logger = logger;
              }
      
              public FibonacciResponse Calculate(int number, bool useOptimized = true)
              {
                  var startTime = DateTime.UtcNow;
      
                  try
                  {
                      _logger.LogInformation("Calculating Fibonacci({Number}) with optimized: {Optimized}", 
                          number, useOptimized);
      
                      long result;
                      if (useOptimized)
                      {
                          result = CalculateOptimized(number);
                      }
                      else
                      {
                          result = CalculateNaive(number);
                      }
      
                      var calculationTime = (DateTime.UtcNow - startTime).TotalMilliseconds;
      
                      _logger.LogInformation(
                          "Fibonacci({Number}) = {Result} (calculated in {Time}ms)", 
                          number, result, calculationTime);
      
                      return new FibonacciResponse
                      {
                          Result = result,
                          CalculationTimeMs = (long)calculationTime,
                          InputNumber = number
                      };
                  }
                  catch (Exception ex)
                  {
                      _logger.LogError(ex, "Error calculating Fibonacci({Number})", number);
                      throw;
                  }
              }
      
              private long CalculateOptimized(int n)
              {
                  if (n < 0) throw new ArgumentException("Number must be non-negative");
                  if (n <= 1) return n;
      
                  // 檢查緩存
                  if (_cache.TryGetValue(n, out var cachedResult))
                  {
                      _logger.LogDebug("Cache hit for Fibonacci({Number})", n);
                      return cachedResult;
                  }
      
                  long a = 0, b = 1;
      
                  for (int i = 2; i <= n; i++)
                  {
                      var temp = a + b;
                      a = b;
                      b = temp;
      
                      // 緩存中間結果
                      if (i % 10 == 0) // 每10個數緩存一次以減少內存使用
                      {
                          _cache[i] = b;
                      }
                  }
      
                  // 緩存最終結果
                  _cache[n] = b;
                  return b;
              }
      
              private long CalculateNaive(int n)
              {
                  if (n < 0) throw new ArgumentException("Number must be non-negative");
                  if (n <= 1) return n;
      
                  // 模擬計算密集型任務
                  Thread.Sleep(100);
      
                  return CalculateNaive(n - 1) + CalculateNaive(n - 2);
              }
      
              public void ClearCache()
              {
                  _cache.Clear();
                  _logger.LogInformation("Fibonacci cache cleared");
              }
          }
      }
      View Code

      Services/FibonacciRpcServer.cs

      using System.Text;
      using System.Text.Json;
      using Microsoft.Extensions.Options;
      using RabbitMQ.Client;
      using RabbitMQ.Client.Events;
      using RpcShared.Messages;
      using RpcShared.Models;
      
      namespace RpcServer.Service.Services
      {
          public class FibonacciRpcServer : BackgroundService
          {
              private readonly IConnection _connection;
              private readonly IModel _channel;
              private readonly FibonacciCalculator _calculator;
              private readonly ILogger<FibonacciRpcServer> _logger;
              private const string QueueName = "rpc_queue";
      
              public FibonacciRpcServer(
                  IConnectionFactory connectionFactory,
                  FibonacciCalculator calculator,
                  ILogger<FibonacciRpcServer> logger)
              {
                  _calculator = calculator;
                  _logger = logger;
      
                  // 建立連接和通道
                  _connection = connectionFactory.CreateConnection();
                  _channel = _connection.CreateModel();
      
                  InitializeQueue();
              }
      
              private void InitializeQueue()
              {
                  // 聲明RPC請求隊列(持久化)
                  _channel.QueueDeclare(
                      queue: QueueName,
                      durable: true,
                      exclusive: false,
                      autoDelete: false,
                      arguments: null);
      
                  // 設置公平分發,每次只處理一個請求
                  _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
      
                  _logger.LogInformation("Fibonacci RPC Server initialized and listening on queue: {QueueName}", QueueName);
              }
      
              protected override Task ExecuteAsync(CancellationToken stoppingToken)
              {
                  stoppingToken.ThrowIfCancellationRequested();
      
                  var consumer = new AsyncEventingBasicConsumer(_channel);
                  consumer.Received += async (model, ea) =>
                  {
                      RpcResponse response = null;
                      string correlationId = ea.BasicProperties.CorrelationId;
                      string replyTo = ea.BasicProperties.ReplyTo;
      
                      _logger.LogDebug("Received RPC request with correlation ID: {CorrelationId}", correlationId);
      
                      try
                      {
                          // 處理請求
                          response = await ProcessRequestAsync(ea.Body.ToArray());
                          response.RequestId = correlationId;
                      }
                      catch (Exception ex)
                      {
                          _logger.LogError(ex, "Error processing RPC request {CorrelationId}", correlationId);
                          response = RpcResponse.ErrorResponse(correlationId, ex.Message);
                      }
                      finally
                      {
                          // 發送響應
                          if (!string.IsNullOrEmpty(replyTo))
                          {
                              SendResponse(replyTo, correlationId, response ?? 
                                  RpcResponse.ErrorResponse(correlationId, "Unknown error occurred"));
                          }
      
                          // 確認消息處理完成
                          _channel.BasicAck(ea.DeliveryTag, false);
                      }
                  };
      
                  _channel.BasicConsume(
                      queue: QueueName,
                      autoAck: false, // 手動確認
                      consumer: consumer);
      
                  _logger.LogInformation("Fibonacci RPC Server started successfully");
      
                  return Task.CompletedTask;
              }
      
              private async Task<RpcResponse> ProcessRequestAsync(byte[] body)
              {
                  var startTime = DateTime.UtcNow;
      
                  try
                  {
                      var requestJson = Encoding.UTF8.GetString(body);
                      var request = JsonSerializer.Deserialize<RpcRequest>(requestJson);
      
                      if (request == null)
                      {
                          return RpcResponse.ErrorResponse("unknown", "Invalid request format");
                      }
      
                      _logger.LogInformation("Processing RPC request {RequestId}, Method: {Method}", 
                          request.RequestId, request.Method);
      
                      // 根據方法名路由到不同的處理邏輯
                      object result = request.Method.ToLowerInvariant() switch
                      {
                          "fibonacci.calculate" => ProcessFibonacciRequest(request),
                          "ping" => new { message = "pong", timestamp = DateTime.UtcNow },
                          _ => throw new NotSupportedException($"Method {request.Method} is not supported")
                      };
      
                      var processingTime = (DateTime.UtcNow - startTime).TotalMilliseconds;
      
                      return RpcResponse.SuccessResponse(
                          request.RequestId, 
                          result, 
                          (long)processingTime);
                  }
                  catch (Exception ex)
                  {
                      var processingTime = (DateTime.UtcNow - startTime).TotalMilliseconds;
                      _logger.LogError(ex, "Error processing RPC request");
                      
                      return RpcResponse.ErrorResponse(
                          "unknown", 
                          ex.Message, 
                          (long)processingTime);
                  }
              }
      
              private FibonacciResponse ProcessFibonacciRequest(RpcRequest request)
              {
                  var number = request.GetParameter<int>("number");
                  var useOptimized = request.GetParameter<bool>("useOptimized") ?? true;
      
                  if (number < 0)
                  {
                      throw new ArgumentException("Fibonacci number must be non-negative");
                  }
      
                  // 防止過大的計算消耗資源
                  if (number > 50)
                  {
                      throw new ArgumentException("Number too large for calculation");
                  }
      
                  return _calculator.Calculate(number, useOptimized);
              }
      
              private void SendResponse(string replyTo, string correlationId, RpcResponse response)
              {
                  try
                  {
                      var responseJson = JsonSerializer.Serialize(response);
                      var responseBody = Encoding.UTF8.GetBytes(responseJson);
      
                      var properties = _channel.CreateBasicProperties();
                      properties.CorrelationId = correlationId;
                      properties.Persistent = true;
      
                      _channel.BasicPublish(
                          exchange: "",
                          routingKey: replyTo,
                          basicProperties: properties,
                          body: responseBody);
      
                      _logger.LogDebug("Sent RPC response for correlation ID: {CorrelationId}", correlationId);
                  }
                  catch (Exception ex)
                  {
                      _logger.LogError(ex, "Error sending RPC response for correlation ID: {CorrelationId}", correlationId);
                  }
              }
      
              public override void Dispose()
              {
                  _channel?.Close();
                  _channel?.Dispose();
                  _connection?.Close();
                  _connection?.Dispose();
                  
                  _logger.LogInformation("Fibonacci RPC Server disposed");
                  
                  base.Dispose();
              }
          }
      }
      View Code

      第6步:高級特性 - 帶重試的RPC客戶端

      Services/ResilientRpcClient.cs

      using Microsoft.Extensions.Logging;
      using Polly;
      using Polly.Retry;
      using RpcShared.Models;
      
      namespace RpcClient.Core.Services
      {
          public class ResilientRpcClient : IRpcClient
          {
              private readonly IRpcClient _innerClient;
              private readonly ILogger<ResilientRpcClient> _logger;
              private readonly AsyncRetryPolicy _retryPolicy;
      
              public ResilientRpcClient(IRpcClient innerClient, ILogger<ResilientRpcClient> logger)
              {
                  _innerClient = innerClient;
                  _logger = logger;
      
                  // 配置重試策略
                  _retryPolicy = Policy
                      .Handle<TimeoutException>()
                      .Or<InvalidOperationException>()
                      .WaitAndRetryAsync(
                          retryCount: 3,
                          sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                          onRetry: (exception, delay, retryCount, context) =>
                          {
                              _logger.LogWarning(
                                  "RPC call failed. Retry {RetryCount} after {Delay}ms. Error: {Error}",
                                  retryCount, delay.TotalMilliseconds, exception.Message);
                          });
              }
      
              public async Task<RpcResponse> CallAsync(RpcRequest request, TimeSpan timeout)
              {
                  return await _retryPolicy.ExecuteAsync(async () =>
                  {
                      return await _innerClient.CallAsync(request, timeout);
                  });
              }
      
              public async Task<TResponse?> CallAsync<TResponse>(RpcRequest request, TimeSpan timeout) where TResponse : class
              {
                  return await _retryPolicy.ExecuteAsync(async () =>
                  {
                      return await _innerClient.CallAsync<TResponse>(request, timeout);
                  });
              }
      
              public void Dispose()
              {
                  _innerClient?.Dispose();
              }
          }
      }
      View Code

      第7步:運行與測試

      1. 啟動服務

        # 終端1:啟動RPC服務器
        cd RpcServer.Service
        dotnet run
        
        # 終端2:啟動RPC客戶端API
        cd RpcClient.API
        dotnet run
      2. 測試API

        # 計算斐波那契數列
        curl -X GET "https://localhost:7000/api/math/fibonacci/10"
        curl -X GET "https://localhost:7000/api/math/fibonacci/20/detailed"
        
        # 健康檢查
        curl -X GET "https://localhost:7000/api/math/health"
      3. 測試錯誤場景

        # 測試超時(設置很小的超時時間)
        # 測試無效輸入
        curl -X GET "https://localhost:7000/api/math/fibonacci/-5"
        curl -X GET "https://localhost:7000/api/math/fibonacci/100"
      4. 觀察日志輸出

        • 客戶端發送請求,生成CorrelationId

        • 服務器接收請求,處理計算

        • 服務器發送響應,使用相同的CorrelationId

        • 客戶端接收響應,匹配CorrelationId

      第8步:性能測試和監控

      創建性能測試控制器

      [ApiController]
      [Route("api/[controller]")]
      public class BenchmarkController : ControllerBase
      {
          private readonly IMathRpcService _mathService;
          private readonly ILogger<BenchmarkController> _logger;
      
          public BenchmarkController(IMathRpcService mathService, ILogger<BenchmarkController> logger)
          {
              _mathService = mathService;
              _logger = logger;
          }
      
          [HttpPost("fibonacci/batch")]
          public async Task<ActionResult> CalculateFibonacciBatch([FromBody] List<int> numbers)
          {
              var results = new List<object>();
              var totalStopwatch = System.Diagnostics.Stopwatch.StartNew();
      
              foreach (var number in numbers)
              {
                  var stopwatch = System.Diagnostics.Stopwatch.StartNew();
                  try
                  {
                      var result = await _mathService.CalculateFibonacciAsync(number);
                      results.Add(new
                      {
                          number,
                          result,
                          success = true,
                          durationMs = stopwatch.ElapsedMilliseconds
                      });
                  }
                  catch (Exception ex)
                  {
                      results.Add(new
                      {
                          number,
                          success = false,
                          error = ex.Message,
                          durationMs = stopwatch.ElapsedMilliseconds
                      });
                  }
              }
      
              return Ok(new
              {
                  totalDurationMs = totalStopwatch.ElapsedMilliseconds,
                  requests = numbers.Count,
                  results
              });
          }
      }

      本章總結

      在這一章中,我們完整實現了RabbitMQ的RPC模式:

      1. RPC核心概念:理解了回調隊列、關聯ID、請求-響應模式。

      2. 客戶端實現:創建了能夠發送請求并異步等待響應的RPC客戶端。

      3. 服務器實現:構建了處理請求并返回響應的RPC服務器。

      4. 錯誤處理:實現了超時控制、異常處理和重試機制。

      5. 性能優化:使用緩存和優化算法提高計算效率。

      6. ** resilience**:通過Polly實現了彈性重試策略。

      RPC模式為微服務架構提供了強大的同步通信能力,結合消息隊列的異步特性,既保持了系統的解耦性,又提供了同步調用的便利性。這種模式特別適合需要等待計算結果的分布式任務。

      posted @ 2025-10-31 23:57  即興隨緣  閱讀(136)  評論(0)    收藏  舉報
      主站蜘蛛池模板: yy111111在线尤物| 国产蜜臀一区二区在线播放| 亚洲国产高清精品线久久| 欧美牲交a欧美牲交aⅴ免费| 国产高清在线男人的天堂| 强奷漂亮少妇高潮麻豆| 成人做爰www网站视频| 亚洲嫩模一区二区三区| 国产伦视频一区二区三区| 色又黄又爽18禁免费网站现观看| 秋霞AV鲁丝片一区二区| 少妇伦子伦精品无吗| 亚洲AⅤ天堂AV天堂无码| 日韩av片无码一区二区三区| 少妇人妻无码专区在线视频| 自拍偷拍第一区二区三区| 深夜av免费在线观看| 亚洲av无在线播放中文| 国产av不卡一区二区| 亚洲码欧洲码一二三四五| 久久综合九色综合久桃花| 日本道播放一区二区三区| 国产精品久久久国产盗摄| 成熟妇女性成熟满足视频| 国产伦精品一区二区三区妓女| 国产精品自在线拍国产手青青机版| 国产一区二区av天堂热| 精品人妻午夜福利一区二区| 久爱无码精品免费视频在线观看| 精品午夜福利在线视在亚洲| 亚洲欧洲精品日韩av| 久热这里只有精品视频六| 少妇特黄a一区二区三区| 亚洲美女被黑人巨大在线播放| 国产精品成人午夜福利| 国产精品第一二三区久久| 猫咪网网站免费观看| 亚洲 欧美 唯美 国产 伦 综合| 精品久久久久久中文字幕202| 性欧洲大肥性欧洲大肥女 | 又爽又黄又无遮掩的免费视频|