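      [RabbitMQ] Integrating with ASP.NET Core

      Chapter goals

      • Configure RabbitMQ services in ASP.NET Core and register them for dependency injection.

      • Use IHostedService/BackgroundService to implement a long-running consumer service.

      • Implement the request-response pattern on top of RabbitMQ.

      • Build a complete solution for asynchronous communication between microservices.

      • Learn configuration management and health checks.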


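      I. Theory

      1. ASP.NET Core integration patterns

      When integrating RabbitMQ into an ASP.NET Core application, several key aspects need to be considered:

      • Dependency injection: manage the lifetimes of connections and channels correctly.

      • Hosted services: implement background message consumers.

      • Configuration management: read the RabbitMQ connection settings from configuration files.

      • Health checks: monitor the state of the RabbitMQ connection.

      • Logging: use the ASP.NET Core logging system.

      2. Lifetime management

      • IConnection: register as a singleton, because creating a TCP connection is expensive.

      • IModel: register as transient or scoped, because channels are not thread-safe.

      • Producer services: can be registered as scoped or transient.

      • Consumer services: usually managed inside a hosted service.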
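
      The lifetime recommendations above can be illustrated with a small console sketch. FakeConnection and FakeChannel are hypothetical stand-ins for IConnection and IModel so that the example runs without a broker; the only assumed dependency is the Microsoft.Extensions.DependencyInjection package.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical stand-ins for IConnection / IModel so the sketch runs without a broker.
public sealed class FakeConnection { }

public sealed class FakeChannel
{
    public FakeChannel(FakeConnection connection) => Connection = connection;
    public FakeConnection Connection { get; }
}

public static class LifetimeDemo
{
    public static void Main()
    {
        var services = new ServiceCollection();
        services.AddSingleton<FakeConnection>(); // one expensive connection per process
        services.AddTransient<FakeChannel>();    // a fresh, cheap channel for every resolution

        using var provider = services.BuildServiceProvider();

        var first = provider.GetRequiredService<FakeChannel>();
        var second = provider.GetRequiredService<FakeChannel>();

        // Two distinct channels ...
        Console.WriteLine(ReferenceEquals(first, second));                       // False
        // ... that share the same singleton connection.
        Console.WriteLine(ReferenceEquals(first.Connection, second.Connection)); // True
    }
}
```

      Because every caller gets its own channel object, no channel is shared between threads, while the expensive connection is created only once.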

      3. Hosted services

      ASP.NET Core provides the IHostedService interface and the BackgroundService base class for implementing long-running background tasks. They are a natural fit for RabbitMQ consumers.
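
      As a minimal sketch of the BackgroundService shape, independent of RabbitMQ: HeartbeatWorker and its one-second interval are hypothetical and only show where long-running work and cancellation handling go.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

// Hypothetical worker; the name and the one-second interval are illustrative only.
public sealed class HeartbeatWorker : BackgroundService
{
    private readonly ILogger<HeartbeatWorker> _logger;

    public HeartbeatWorker(ILogger<HeartbeatWorker> logger) => _logger = logger;

    // Invoked once by the host at startup; the returned task spans the lifetime of the worker.
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            _logger.LogInformation("Worker alive at {Time}", DateTimeOffset.UtcNow);

            try
            {
                await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break; // the host is shutting down
            }
        }
    }
}
```

      Such a worker would be registered with builder.Services.AddHostedService<HeartbeatWorker>(); the host then starts it on startup and signals stoppingToken on shutdown.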

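      4. Messaging patterns in a microservice architecture

      • Asynchronous command: send an instruction without expecting an immediate response.

      • Event notification: broadcast a state change.

      • Request-response: similar to RPC, but carried over the message broker.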
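
      The request-response pattern is not implemented in the walkthrough that follows, so here is a rough sketch of the client side. It assumes RabbitMQ.Client 6.x (the IModel API) and a connection created with DispatchConsumersAsync = true; RpcClient, its members, and the request queue name passed by the caller are all hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical client, written against RabbitMQ.Client 6.x.
// Assumes the connection was created with DispatchConsumersAsync = true.
public sealed class RpcClient : IDisposable
{
    private readonly IModel _channel;
    private readonly string _replyQueue;
    private readonly object _publishLock = new();
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _pending = new();

    public RpcClient(IConnection connection)
    {
        _channel = connection.CreateModel();

        // Server-named, exclusive, auto-delete queue that receives the replies.
        _replyQueue = _channel.QueueDeclare().QueueName;

        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.Received += (_, ea) =>
        {
            // Match the reply to its pending request by correlation id.
            var correlationId = ea.BasicProperties.CorrelationId;
            if (correlationId != null && _pending.TryRemove(correlationId, out var pending))
            {
                pending.TrySetResult(Encoding.UTF8.GetString(ea.Body.ToArray()));
            }
            return Task.CompletedTask;
        };
        _channel.BasicConsume(queue: _replyQueue, autoAck: true, consumer: consumer);
    }

    public Task<string> CallAsync(string requestQueue, string payload)
    {
        var correlationId = Guid.NewGuid().ToString();
        var pending = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[correlationId] = pending;

        lock (_publishLock) // channels are not thread-safe
        {
            var properties = _channel.CreateBasicProperties();
            properties.CorrelationId = correlationId; // lets the client match the reply
            properties.ReplyTo = _replyQueue;         // tells the server where to answer

            _channel.BasicPublish(
                exchange: "",
                routingKey: requestQueue,
                basicProperties: properties,
                body: Encoding.UTF8.GetBytes(payload));
        }

        return pending.Task;
    }

    public void Dispose() => _channel.Dispose();
}
```

      The sketch has no timeout: if the server never replies, the returned task never completes, so a real client would combine it with a cancellation token or a timeout.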


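      II. Hands-on: building an order-processing microservice

      We will create a complete order-processing system that includes:

      • Order.API: receives order requests over HTTP and publishes messages

      • OrderProcessor.Service: processes orders in the background

      • An order status query API

      • Health checks

      • Configuration management

      Step 1: Create the project structure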

      # Create the solution
      dotnet new sln -n OrderSystem
      
      # Create the Web API project and the class libraries
      dotnet new webapi -n Order.API
      dotnet new classlib -n Order.Core
      dotnet new classlib -n Order.Infrastructure
      # The order processor is started with "dotnet run", so it needs the worker template rather than classlib
      dotnet new worker -n OrderProcessor.Service
      
      # Add the projects to the solution
      dotnet sln add Order.API/Order.API.csproj
      dotnet sln add Order.Core/Order.Core.csproj
      dotnet sln add Order.Infrastructure/Order.Infrastructure.csproj
      dotnet sln add OrderProcessor.Service/OrderProcessor.Service.csproj
      
      # Add project references
      dotnet add Order.API reference Order.Core
      dotnet add Order.API reference Order.Infrastructure
      dotnet add OrderProcessor.Service reference Order.Core
      dotnet add OrderProcessor.Service reference Order.Infrastructure
      dotnet add Order.Infrastructure reference Order.Core
      
      # Add NuGet packages
      # The code in this chapter uses the IModel API of RabbitMQ.Client 6.x; 7.x replaced it with IChannel
      cd Order.API
      dotnet add package RabbitMQ.Client --version 6.8.1
      # Provides the AddRabbitMQ health check extension; pick a release that targets RabbitMQ.Client 6.x
      dotnet add package AspNetCore.HealthChecks.Rabbitmq
      
      cd ../Order.Infrastructure
      dotnet add package RabbitMQ.Client --version 6.8.1
      # ILogger<T> is used by the infrastructure services
      dotnet add package Microsoft.Extensions.Logging.Abstractions
      
      cd ../OrderProcessor.Service
      dotnet add package RabbitMQ.Client --version 6.8.1

      Step 2: Define the domain model (Order.Core)

      Models/Order.cs

      namespace Order.Core.Models
      {
          public class Order
          {
              public string Id { get; set; } = Guid.NewGuid().ToString();
              public string CustomerId { get; set; }
              public string ProductId { get; set; }
              public int Quantity { get; set; }
              public decimal TotalAmount { get; set; }
              public OrderStatus Status { get; set; } = OrderStatus.Pending;
              public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
              public DateTime? ProcessedAt { get; set; }
          }
      
          public enum OrderStatus
          {
              Pending,
              Processing,
              Completed,
              Failed,
              Cancelled
          }
      }

      Messages/OrderMessage.cs

      using Order.Core.Models; // OrderStatus is defined in Order.Core.Models
      
      namespace Order.Core.Messages
      {
          public class OrderMessage
          {
              public string OrderId { get; set; }
              public string CustomerId { get; set; }
              public string ProductId { get; set; }
              public int Quantity { get; set; }
              public decimal TotalAmount { get; set; }
              public string Action { get; set; } // "create", "cancel"
          }
      
          public class OrderStatusMessage
          {
              public string OrderId { get; set; }
              public OrderStatus Status { get; set; }
              public string Message { get; set; }
              public DateTime Timestamp { get; set; } = DateTime.UtcNow;
          }
      }
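
      These message classes are later serialized with System.Text.Json, so it helps to know what the wire format looks like. The sketch below uses StatusDto, a hypothetical trimmed-down copy of OrderStatusMessage without the Timestamp so that the output is stable, and shows that enums are written as numbers unless a JsonStringEnumConverter is configured.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

public enum OrderStatus { Pending, Processing, Completed, Failed, Cancelled }

// Hypothetical, trimmed-down copy of OrderStatusMessage (no Timestamp).
public class StatusDto
{
    public string OrderId { get; set; } = "";
    public OrderStatus Status { get; set; }
}

public static class SerializationDemo
{
    public static void Main()
    {
        var dto = new StatusDto { OrderId = "A100", Status = OrderStatus.Completed };

        // Default options: property names are kept as-is and enums become numbers.
        Console.WriteLine(JsonSerializer.Serialize(dto));
        // {"OrderId":"A100","Status":2}

        // Optional: write enums as strings, which is easier to read in the management UI.
        var options = new JsonSerializerOptions();
        options.Converters.Add(new JsonStringEnumConverter());
        var json = JsonSerializer.Serialize(dto, options);
        Console.WriteLine(json);
        // {"OrderId":"A100","Status":"Completed"}

        // Publisher and consumer must use the same options to round-trip the message.
        var roundTripped = JsonSerializer.Deserialize<StatusDto>(json, options);
        Console.WriteLine(roundTripped!.Status);
        // Completed
    }
}
```

      The walkthrough uses the default options on both sides, which is consistent; the converter only matters if other consumers expect status names instead of numbers.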

      Step 3: Infrastructure layer (Order.Infrastructure)

      Services/IRabbitMQConnection.cs

      using RabbitMQ.Client;
      
      namespace Order.Infrastructure.Services
      {
          public interface IRabbitMQConnection : IDisposable
          {
              bool IsConnected { get; }
              IModel CreateModel();
              bool TryConnect();
          }
      }

      Services/RabbitMQConnection.cs

      using System.Net.Sockets;
      using Microsoft.Extensions.Logging;
      using RabbitMQ.Client;
      using RabbitMQ.Client.Events;
      using RabbitMQ.Client.Exceptions;
      
      namespace Order.Infrastructure.Services
      {
          public class RabbitMQConnection : IRabbitMQConnection
          {
              private readonly IConnectionFactory _connectionFactory;
              private readonly ILogger<RabbitMQConnection> _logger;
              private IConnection _connection;
              private bool _disposed;
              
              private readonly object _syncRoot = new object();
      
              public RabbitMQConnection(IConnectionFactory connectionFactory, ILogger<RabbitMQConnection> logger)
              {
                  _connectionFactory = connectionFactory;
                  _logger = logger;
              }
      
              public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed;
      
              public IModel CreateModel()
              {
                  if (!IsConnected)
                  {
                      throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
                  }
      
                  return _connection.CreateModel();
              }
      
              public bool TryConnect()
              {
                  lock (_syncRoot)
                  {
                      if (IsConnected) return true;
      
                      _logger.LogInformation("RabbitMQ Client is trying to connect");
      
                      try
                      {
                          _connection = _connectionFactory.CreateConnection();
                          
                          _connection.ConnectionShutdown += OnConnectionShutdown;
                          _connection.CallbackException += OnCallbackException;
                          _connection.ConnectionBlocked += OnConnectionBlocked;
      
                          // IConnectionFactory does not expose HostName, so read it from the connected endpoint
                          _logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", 
                              _connection.Endpoint.HostName);
      
                          return true;
                      }
                      catch (BrokerUnreachableException ex)
                      {
                          _logger.LogError(ex, "RabbitMQ connection failed: {Message}", ex.Message);
                          return false;
                      }
                      catch (SocketException ex)
                      {
                          _logger.LogError(ex, "RabbitMQ connection failed: {Message}", ex.Message);
                          return false;
                      }
                  }
              }
      
              private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
              {
                  if (_disposed) return;
      
                  _logger.LogWarning("A RabbitMQ connection is blocked. Reason: {Reason}", e.Reason);
                  
                  // Reconnection logic can be implemented here (a blocked connection is still open, so TryConnect() returns immediately)
                  TryConnect();
              }
      
              private void OnCallbackException(object sender, CallbackExceptionEventArgs e)
              {
                  if (_disposed) return;
      
                  _logger.LogWarning(e.Exception, "A RabbitMQ connection threw an exception. Trying to re-connect...");
                  
                  // Reconnection logic can be implemented here
                  TryConnect();
              }
      
              private void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
              {
                  if (_disposed) return;
      
                  _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");
      
                  // Reconnection logic can be implemented here
                  TryConnect();
              }
      
              public void Dispose()
              {
                  if (_disposed) return;
      
                  _disposed = true;
      
                  try
                  {
                      _connection?.Dispose();
                  }
                  catch (IOException ex)
                  {
                      _logger.LogCritical(ex, "Error disposing RabbitMQ connection");
                  }
              }
          }
      }

      Services/IOrderPublisher.cs

      using Order.Core.Messages;
      
      namespace Order.Infrastructure.Services
      {
          public interface IOrderPublisher
          {
              Task PublishOrderCreatedAsync(OrderMessage order);
              Task PublishOrderStatusAsync(OrderStatusMessage status);
          }
      }

      Services/OrderPublisher.cs

      using System.Text;
      using System.Text.Json;
      using Microsoft.Extensions.Logging;
      using Order.Core.Messages;
      using RabbitMQ.Client;
      
      namespace Order.Infrastructure.Services
      {
          public class OrderPublisher : IOrderPublisher
          {
              private readonly IRabbitMQConnection _connection;
              private readonly ILogger<OrderPublisher> _logger;
              private const string ExchangeName = "order.events";
              private const string OrderCreatedRoutingKey = "order.created";
              private const string OrderStatusRoutingKey = "order.status";
      
              public OrderPublisher(IRabbitMQConnection connection, ILogger<OrderPublisher> logger)
              {
                  _connection = connection;
                  _logger = logger;
                  
                  // Make sure the exchange and the queues exist
                  InitializeInfrastructure();
              }
      
              private void InitializeInfrastructure()
              {
                  // The publisher may be resolved before anything else has opened the connection
                  if (!_connection.IsConnected)
                  {
                      _connection.TryConnect();
                  }
      
                  using var channel = _connection.CreateModel();
                  
                  // Declare the topic exchange
                  channel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, durable: true);
                  
                  // Declare the order-created queue
                  channel.QueueDeclare("order.created.queue", durable: true, exclusive: false, autoDelete: false);
                  channel.QueueBind("order.created.queue", ExchangeName, OrderCreatedRoutingKey);
                  
                  // Declare the order-status queue
                  channel.QueueDeclare("order.status.queue", durable: true, exclusive: false, autoDelete: false);
                  channel.QueueBind("order.status.queue", ExchangeName, OrderStatusRoutingKey);
                  
                  _logger.LogInformation("RabbitMQ infrastructure initialized");
              }
      
              public async Task PublishOrderCreatedAsync(OrderMessage order)
              {
                  await PublishMessageAsync(order, OrderCreatedRoutingKey, "OrderCreated");
              }
      
              public async Task PublishOrderStatusAsync(OrderStatusMessage status)
              {
                  await PublishMessageAsync(status, OrderStatusRoutingKey, "OrderStatus");
              }
      
              private async Task PublishMessageAsync<T>(T message, string routingKey, string messageType)
              {
                  if (!_connection.IsConnected)
                  {
                      _connection.TryConnect();
                  }
      
                  using var channel = _connection.CreateModel();
                  
                  var json = JsonSerializer.Serialize(message);
                  var body = Encoding.UTF8.GetBytes(json);
      
                  var properties = channel.CreateBasicProperties();
                  properties.Persistent = true;
                  properties.ContentType = "application/json";
                  properties.Type = messageType;
      
                  try
                  {
                      channel.BasicPublish(
                          exchange: ExchangeName,
                          routingKey: routingKey,
                          mandatory: true,
                          basicProperties: properties,
                          body: body);
      
                      _logger.LogInformation("Published {MessageType} message for Order {OrderId}", 
                          messageType, GetOrderId(message));
                  }
                  catch (Exception ex)
                  {
                      _logger.LogError(ex, "Error publishing {MessageType} message for Order {OrderId}", 
                          messageType, GetOrderId(message));
                      throw;
                  }
      
                  await Task.CompletedTask;
              }
      
              private static string GetOrderId<T>(T message)
              {
                  return message switch
                  {
                      OrderMessage order => order.OrderId,
                      OrderStatusMessage status => status.OrderId,
                      _ => "unknown"
                  };
              }
          }
      }
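
      The publisher above sends persistent messages but does not wait for the broker to acknowledge them. One possible way to close that gap is publisher confirms; the following is a minimal sketch against RabbitMQ.Client 6.x, where ConfirmedPublisher and the five-second timeout are hypothetical choices, not part of the original code.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

// Hypothetical helper, written against RabbitMQ.Client 6.x.
public static class ConfirmedPublisher
{
    public static void Publish(IModel channel, string exchange, string routingKey, string json)
    {
        // Put the channel into confirm mode so the broker acknowledges every publish.
        channel.ConfirmSelect();

        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;
        properties.ContentType = "application/json";

        channel.BasicPublish(
            exchange: exchange,
            routingKey: routingKey,
            mandatory: true,
            basicProperties: properties,
            body: Encoding.UTF8.GetBytes(json));

        // Blocks until the broker has confirmed the message; throws if it is
        // nacked or if the (arbitrary) five-second timeout elapses.
        channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
    }
}
```

      Waiting for each confirm is the simplest variant and also the slowest; it trades throughput for the guarantee that a successful return means the broker accepted the message.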

      Step 4: Configure the Order.API project

      appsettings.json

      {
        "RabbitMQ": {
          "HostName": "localhost",
          "UserName": "myuser",
          "Password": "mypassword",
          "Port": 5672,
          "VirtualHost": "/"
        },
        "Logging": {
          "LogLevel": {
            "Default": "Information",
            "Microsoft.AspNetCore": "Warning"
          }
        },
        "AllowedHosts": "*"
      }
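
      Instead of reading each key by hand, the RabbitMQ section can also be bound to a typed class. The sketch below is one way to do that; RabbitMQOptions is a hypothetical class, and an in-memory collection stands in for appsettings.json so that the example runs on its own (it assumes the Microsoft.Extensions.Configuration and Microsoft.Extensions.Configuration.Binder packages).

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// Hypothetical options class mirroring the "RabbitMQ" section of appsettings.json.
public sealed class RabbitMQOptions
{
    public string HostName { get; set; } = "localhost";
    public string UserName { get; set; } = "guest";
    public string Password { get; set; } = "guest";
    public int Port { get; set; } = 5672;
    public string VirtualHost { get; set; } = "/";
}

public static class OptionsDemo
{
    public static void Main()
    {
        // In-memory stand-in for appsettings.json so the sketch is self-contained.
        var configuration = new ConfigurationBuilder()
            .AddInMemoryCollection(new Dictionary<string, string?>
            {
                ["RabbitMQ:HostName"] = "localhost",
                ["RabbitMQ:UserName"] = "myuser",
                ["RabbitMQ:Password"] = "mypassword",
                ["RabbitMQ:Port"] = "5672"
            })
            .Build();

        // Get<T>() comes from Microsoft.Extensions.Configuration.Binder.
        // Keys missing from the section (VirtualHost here) keep the class defaults.
        var options = configuration.GetSection("RabbitMQ").Get<RabbitMQOptions>()
                      ?? new RabbitMQOptions();

        Console.WriteLine($"{options.UserName}@{options.HostName}:{options.Port}{options.VirtualHost}");
        // myuser@localhost:5672/
    }
}
```

      Typed binding also converts the port to an int, which removes the manual int.Parse call and its failure mode on malformed values.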
      
      

      Program.cs

      using Order.API.Controllers;
      using Order.API.Services;
      using Order.Core.Models;
      using Order.Infrastructure.Services;
      using RabbitMQ.Client;
      
      var builder = WebApplication.CreateBuilder(args);
      
      // Add services to the container.
      builder.Services.AddControllers();
      builder.Services.AddEndpointsApiExplorer();
      builder.Services.AddSwaggerGen();
      
      // Configure RabbitMQ
      builder.Services.AddSingleton<IConnectionFactory>(sp =>
      {
          var configuration = sp.GetRequiredService<IConfiguration>();
          return new ConnectionFactory
          {
              HostName = configuration["RabbitMQ:HostName"],
              UserName = configuration["RabbitMQ:UserName"],
              Password = configuration["RabbitMQ:Password"],
              Port = int.Parse(configuration["RabbitMQ:Port"] ?? "5672"),
              VirtualHost = configuration["RabbitMQ:VirtualHost"] ?? "/",
              DispatchConsumersAsync = true
          };
      });
      
      // Register RabbitMQ services
      builder.Services.AddSingleton<IRabbitMQConnection, RabbitMQConnection>();
      builder.Services.AddScoped<IOrderPublisher, OrderPublisher>();
      builder.Services.AddScoped<IOrderService, OrderService>();
      
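      // Add Health Checks
      // AddRabbitMQ is provided by the AspNetCore.HealthChecks.Rabbitmq package, and its overloads differ between major versions.
      // The factory below opens a new connection each time it is invoked; reuse a shared connection in production.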
      builder.Services.AddHealthChecks()
          .AddRabbitMQ(provider => 
          {
              var factory = provider.GetRequiredService<IConnectionFactory>();
              return factory.CreateConnection();
          }, name: "rabbitmq");
      
      // Add hosted service for status updates consumer
      builder.Services.AddHostedService<OrderStatusConsumerService>();
      
      var app = builder.Build();
      
      // Configure the HTTP request pipeline.
      if (app.Environment.IsDevelopment())
      {
          app.UseSwagger();
          app.UseSwaggerUI();
      }
      
      app.UseHttpsRedirection();
      app.UseAuthorization();
      app.MapControllers();
      
      // Add health check endpoint
      app.MapHealthChecks("/health");
      
      app.Run();
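
      As an alternative to the package-based check, a health check can report on the singleton connection that the application already holds. This is a sketch only: RabbitMQConnectionHealthCheck and its namespace are hypothetical, while IHealthCheck and HealthCheckResult come from Microsoft.Extensions.Diagnostics.HealthChecks and IRabbitMQConnection is the interface defined in Step 3.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Order.Infrastructure.Services;

namespace Order.API.HealthChecks
{
    // Hypothetical health check that reports on the shared singleton connection
    // instead of opening a new connection for every probe.
    public sealed class RabbitMQConnectionHealthCheck : IHealthCheck
    {
        private readonly IRabbitMQConnection _connection;

        public RabbitMQConnectionHealthCheck(IRabbitMQConnection connection)
        {
            _connection = connection;
        }

        public Task<HealthCheckResult> CheckHealthAsync(
            HealthCheckContext context,
            CancellationToken cancellationToken = default)
        {
            var result = _connection.IsConnected
                ? HealthCheckResult.Healthy("RabbitMQ connection is open")
                : HealthCheckResult.Unhealthy("RabbitMQ connection is closed");

            return Task.FromResult(result);
        }
    }
}
```

      It would be registered with builder.Services.AddHealthChecks().AddCheck<RabbitMQConnectionHealthCheck>("rabbitmq-connection"). The check only reflects the local connection state; it does not verify that queues exist or that messages flow.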

      Services/IOrderService.cs

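      namespace Order.API.Services
      {
          // Imported inside the namespace so that "Order" binds to the Order.Core.Models.Order class.
          // With a file-level using, the root namespace "Order" is found first and the file fails with CS0118.
          using Order.Core.Models;
      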
          public interface IOrderService
          {
              Task<Order> CreateOrderAsync(string customerId, string productId, int quantity, decimal unitPrice);
              Task<Order?> GetOrderAsync(string orderId);
              Task UpdateOrderStatusAsync(string orderId, OrderStatus status);
          }
      }

      Services/OrderService.cs

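      using System.Collections.Concurrent;
      
      namespace Order.API.Services
      {
          // Imported inside the namespace so that "Order" binds to the Order.Core.Models.Order class
          // rather than to the root namespace "Order" (which would fail with CS0118).
          using Order.Core.Messages;
          using Order.Core.Models;
          using Order.Infrastructure.Services;
      
          public class OrderService : IOrderService
          {
              private readonly IOrderPublisher _orderPublisher;
              private readonly ILogger<OrderService> _logger;
              
              // In-memory store for demonstration only (use a database in production).
              // ConcurrentDictionary, because the static store is shared by concurrent requests.
              private static readonly ConcurrentDictionary<string, Order> _orders = new();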
      
              public OrderService(IOrderPublisher orderPublisher, ILogger<OrderService> logger)
              {
                  _orderPublisher = orderPublisher;
                  _logger = logger;
              }
      
              public async Task<Order> CreateOrderAsync(string customerId, string productId, int quantity, decimal unitPrice)
              {
                  var order = new Order
                  {
                      CustomerId = customerId,
                      ProductId = productId,
                      Quantity = quantity,
                      TotalAmount = quantity * unitPrice,
                      Status = OrderStatus.Pending
                  };
      
                  // Save to the in-memory store
                  _orders[order.Id] = order;
      
                  // Publish the order-created event
                  var orderMessage = new OrderMessage
                  {
                      OrderId = order.Id,
                      CustomerId = order.CustomerId,
                      ProductId = order.ProductId,
                      Quantity = order.Quantity,
                      TotalAmount = order.TotalAmount,
                      Action = "create"
                  };
      
                  await _orderPublisher.PublishOrderCreatedAsync(orderMessage);
                  
                  _logger.LogInformation("Order {OrderId} created and published", order.Id);
      
                  return order;
              }
      
              public Task<Order?> GetOrderAsync(string orderId)
              {
                  _orders.TryGetValue(orderId, out var order);
                  return Task.FromResult(order);
              }
      
              public async Task UpdateOrderStatusAsync(string orderId, OrderStatus status)
              {
                  if (_orders.TryGetValue(orderId, out var order))
                  {
                      order.Status = status;
                      order.ProcessedAt = DateTime.UtcNow;
                      
                      // Publish the status update
                      var statusMessage = new OrderStatusMessage
                      {
                          OrderId = orderId,
                          Status = status,
                          Message = $"Order {status.ToString().ToLower()}"
                      };
      
                      await _orderPublisher.PublishOrderStatusAsync(statusMessage);
                      
                      _logger.LogInformation("Order {OrderId} status updated to {Status}", orderId, status);
                  }
              }
          }
      }

      Services/OrderStatusConsumerService.cs

      using System.Text;
      using System.Text.Json;
      using Microsoft.Extensions.Options;
      using Order.API.Services;
      using Order.Core.Messages;
      using Order.Infrastructure.Services;
      using RabbitMQ.Client;
      using RabbitMQ.Client.Events;
      
      namespace Order.API.Services
      {
          public class OrderStatusConsumerService : BackgroundService
          {
              private readonly IRabbitMQConnection _connection;
              private readonly IServiceProvider _serviceProvider;
              private readonly ILogger<OrderStatusConsumerService> _logger;
              private IModel _channel;
              private const string QueueName = "order.status.queue";
      
              public OrderStatusConsumerService(
                  IRabbitMQConnection connection,
                  IServiceProvider serviceProvider,
                  ILogger<OrderStatusConsumerService> logger)
              {
                  _connection = connection;
                  _serviceProvider = serviceProvider;
                  _logger = logger;
                  InitializeChannel();
              }
      
              private void InitializeChannel()
              {
                  if (!_connection.IsConnected)
                  {
                      _connection.TryConnect();
                  }
      
                  _channel = _connection.CreateModel();
                  
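                  // Make sure the queue exists (the publisher already declares it; this is a safety net)
                  _channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false);
                  
                  _channel.BasicQos(0, 1, false); // Fair dispatch: at most one unacknowledged message per consumer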
                  
                  _logger.LogInformation("OrderStatusConsumerService channel initialized");
              }
      
              protected override async Task ExecuteAsync(CancellationToken stoppingToken)
              {
                  stoppingToken.ThrowIfCancellationRequested();
      
                  var consumer = new AsyncEventingBasicConsumer(_channel);
                  consumer.Received += async (model, ea) =>
                  {
                      var body = ea.Body.ToArray();
                      var message = Encoding.UTF8.GetString(body);
      
                      try
                      {
                          await ProcessMessageAsync(message);
                          _channel.BasicAck(ea.DeliveryTag, false);
                      }
                      catch (Exception ex)
                      {
                          _logger.LogError(ex, "Error processing message: {Message}", message);
                          _channel.BasicNack(ea.DeliveryTag, false, false); // requeue: false, so the message is not put back on the queue
                      }
                  };
      
                  _channel.BasicConsume(QueueName, false, consumer);
      
                  _logger.LogInformation("OrderStatusConsumerService started consuming");
      
                  await Task.CompletedTask;
              }
      
              private async Task ProcessMessageAsync(string message)
              {
                  using var scope = _serviceProvider.CreateScope();
                  var orderService = scope.ServiceProvider.GetRequiredService<IOrderService>();
      
                  try
                  {
                      var statusMessage = JsonSerializer.Deserialize<OrderStatusMessage>(message);
                      if (statusMessage != null)
                      {
                          // Handle the status update here, for example update the database or send a notification
                          _logger.LogInformation("Received order status update: {OrderId} -> {Status}", 
                              statusMessage.OrderId, statusMessage.Status);
                          
                          // In a real application this would typically update the order status in the database
                          // await orderService.UpdateOrderStatusAsync(statusMessage.OrderId, statusMessage.Status);
                      }
                  }
                  catch (JsonException ex)
                  {
                      _logger.LogError(ex, "Error deserializing message: {Message}", message);
                      throw;
                  }
              }
      
              public override void Dispose()
              {
                  _channel?.Close();
                  _channel?.Dispose();
                  base.Dispose();
              }
          }
      }
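
      The consumer above rejects failed messages with requeue set to false, which means they are discarded unless the queue has a dead-letter exchange. The following sketch shows one way to declare such a topology with RabbitMQ.Client 6.x; DeadLetterTopology and all exchange and queue names in it are hypothetical.

```csharp
using System.Collections.Generic;
using RabbitMQ.Client;

// Hypothetical topology helper (RabbitMQ.Client 6.x); all names are illustrative.
public static class DeadLetterTopology
{
    public const string DeadLetterExchange = "order.dlx";
    public const string DeadLetterQueue = "order.status.dead";
    public const string WorkQueue = "order.status.queue.v2";

    public static void Declare(IModel channel)
    {
        // Exchange and queue that collect rejected messages.
        channel.ExchangeDeclare(DeadLetterExchange, ExchangeType.Fanout, durable: true);
        channel.QueueDeclare(DeadLetterQueue, durable: true, exclusive: false, autoDelete: false);
        channel.QueueBind(DeadLetterQueue, DeadLetterExchange, routingKey: "");

        // Work queue whose rejected (nacked without requeue) messages are
        // re-routed to the dead-letter exchange instead of being dropped.
        var arguments = new Dictionary<string, object>
        {
            ["x-dead-letter-exchange"] = DeadLetterExchange
        };
        channel.QueueDeclare(WorkQueue, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
    }
}
```

      A new queue name is used on purpose: redeclaring an existing queue with different arguments fails with a PRECONDITION_FAILED channel error, so an existing queue has to be deleted or replaced before the argument can be added.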

      Controllers/OrdersController.cs

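      using Microsoft.AspNetCore.Mvc;
      using Order.API.Services;
      
      namespace Order.API.Controllers
      {
          // Imported inside the namespace so that "Order" binds to the Order.Core.Models.Order class
          // rather than to the root namespace "Order" (which would fail with CS0118).
          using Order.Core.Models;
      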
          [ApiController]
          [Route("api/[controller]")]
          public class OrdersController : ControllerBase
          {
              private readonly IOrderService _orderService;
              private readonly ILogger<OrdersController> _logger;
      
              public OrdersController(IOrderService orderService, ILogger<OrdersController> logger)
              {
                  _orderService = orderService;
                  _logger = logger;
              }
      
              [HttpPost]
              public async Task<ActionResult<Order>> CreateOrder([FromBody] CreateOrderRequest request)
              {
                  try
                  {
                      var order = await _orderService.CreateOrderAsync(
                          request.CustomerId, 
                          request.ProductId, 
                          request.Quantity, 
                          request.UnitPrice);
      
                      return Ok(order);
                  }
                  catch (Exception ex)
                  {
                      _logger.LogError(ex, "Error creating order");
                      return StatusCode(500, "Error creating order");
                  }
              }
      
              [HttpGet("{orderId}")]
              public async Task<ActionResult<Order>> GetOrder(string orderId)
              {
                  var order = await _orderService.GetOrderAsync(orderId);
                  if (order == null)
                  {
                      return NotFound();
                  }
                  return Ok(order);
              }
      
              [HttpGet]
              public ActionResult<IEnumerable<Order>> GetOrders()
              {
                  // Demonstration only; a real implementation would load the orders from a database
                  return Ok(new List<Order>());
              }
          }
      
          public class CreateOrderRequest
          {
              public string CustomerId { get; set; }
              public string ProductId { get; set; }
              public int Quantity { get; set; }
              public decimal UnitPrice { get; set; }
          }
      }

      Step 5: Order processor service (OrderProcessor.Service)

      Program.cs

      using Order.Core.Messages;
      using Order.Infrastructure.Services;
      using OrderProcessor.Service.Services;
      using RabbitMQ.Client;
      
      var builder = Host.CreateApplicationBuilder(args);
      
      builder.Services.AddHostedService<OrderProcessorService>();
      
      // Configure RabbitMQ (the worker's appsettings.json needs the same RabbitMQ section as Order.API)
      builder.Services.AddSingleton<IConnectionFactory>(sp =>
      {
          var configuration = sp.GetRequiredService<IConfiguration>();
          return new ConnectionFactory
          {
              HostName = configuration["RabbitMQ:HostName"],
              UserName = configuration["RabbitMQ:UserName"],
              Password = configuration["RabbitMQ:Password"],
              Port = int.Parse(configuration["RabbitMQ:Port"] ?? "5672"),
              VirtualHost = configuration["RabbitMQ:VirtualHost"] ?? "/",
              DispatchConsumersAsync = true
          };
      });
      
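      builder.Services.AddSingleton<IRabbitMQConnection, RabbitMQConnection>();
      // Singleton: the hosted service that consumes the publisher is itself a singleton,
      // and injecting a scoped service into it fails scope validation.
      builder.Services.AddSingleton<IOrderPublisher, OrderPublisher>();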
      
      builder.Services.AddLogging();
      
      var host = builder.Build();
      host.Run();

      Services/OrderProcessorService.cs

      using System.Text;
      using System.Text.Json;
      using Microsoft.Extensions.Logging;
      using Order.Core.Messages;
      using Order.Infrastructure.Services;
      using RabbitMQ.Client;
      using RabbitMQ.Client.Events;
      
      namespace OrderProcessor.Service.Services
      {
          public class OrderProcessorService : BackgroundService
          {
              private readonly IRabbitMQConnection _connection;
              private readonly IOrderPublisher _orderPublisher;
              private readonly ILogger<OrderProcessorService> _logger;
              private IModel _channel;
              private const string QueueName = "order.created.queue";
      
              public OrderProcessorService(
                  IRabbitMQConnection connection,
                  IOrderPublisher orderPublisher,
                  ILogger<OrderProcessorService> logger)
              {
                  _connection = connection;
                  _orderPublisher = orderPublisher;
                  _logger = logger;
                  InitializeChannel();
              }
      
              private void InitializeChannel()
              {
                  if (!_connection.IsConnected)
                  {
                      _connection.TryConnect();
                  }
      
                  _channel = _connection.CreateModel();
                  _channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false);
                  _channel.BasicQos(0, 1, false);
                  
                  _logger.LogInformation("OrderProcessorService channel initialized");
              }
      
              protected override async Task ExecuteAsync(CancellationToken stoppingToken)
              {
                  stoppingToken.ThrowIfCancellationRequested();
      
                  var consumer = new AsyncEventingBasicConsumer(_channel);
                  consumer.Received += async (model, ea) =>
                  {
                      var body = ea.Body.ToArray();
                      var message = Encoding.UTF8.GetString(body);
      
                      try
                      {
                          await ProcessOrderAsync(message);
                          _channel.BasicAck(ea.DeliveryTag, false);
                      }
                      catch (Exception ex)
                      {
                          _logger.LogError(ex, "Error processing order: {Message}", message);
                          _channel.BasicNack(ea.DeliveryTag, false, false);
                      }
                  };
      
                  _channel.BasicConsume(QueueName, false, consumer);
      
                  _logger.LogInformation("OrderProcessorService started consuming orders");
      
                  await Task.CompletedTask;
              }
      
              private async Task ProcessOrderAsync(string message)
              {
                  try
                  {
                      var orderMessage = JsonSerializer.Deserialize<OrderMessage>(message);
                      if (orderMessage == null)
                      {
                          _logger.LogWarning("Received invalid order message: {Message}", message);
                          return;
                      }
      
                      _logger.LogInformation("Processing order {OrderId} for customer {CustomerId}", 
                          orderMessage.OrderId, orderMessage.CustomerId);
      
                      // Simulate the order-processing logic
                      await ProcessOrderBusinessLogic(orderMessage);
      
                      // Publish the "processing completed" status
                      var statusMessage = new OrderStatusMessage
                      {
                          OrderId = orderMessage.OrderId,
                          Status = Order.Core.Models.OrderStatus.Completed,
                          Message = "Order processed successfully"
                      };
      
                      await _orderPublisher.PublishOrderStatusAsync(statusMessage);
                      
                      _logger.LogInformation("Order {OrderId} processed successfully", orderMessage.OrderId);
                  }
                  catch (JsonException ex)
                  {
                      _logger.LogError(ex, "Error deserializing order message: {Message}", message);
                      throw;
                  }
              }
      
              private async Task ProcessOrderBusinessLogic(OrderMessage orderMessage)
              {
                  // Simulate complex business logic
                  _logger.LogInformation("Starting business logic for order {OrderId}", orderMessage.OrderId);
                  
                  // Simulate processing time
                  var random = new Random();
                  var processingTime = random.Next(2000, 5000);
                  await Task.Delay(processingTime);
                  
                  // Simulate a 10% failure rate
                  if (random.Next(0, 10) == 0)
                  {
                      throw new Exception("Simulated business logic failure");
                  }
                  
                  _logger.LogInformation("Business logic completed for order {OrderId}", orderMessage.OrderId);
              }
      
              public override void Dispose()
              {
                  _channel?.Close();
                  _channel?.Dispose();
                  base.Dispose();
              }
          }
      }

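      Step 6: Run and test

      1. Start the services

        # Terminal 1: start Order.API
        cd Order.API
        dotnet run
        
        # Terminal 2: start OrderProcessor.Service
        cd OrderProcessor.Service
        dotnet run
      2. Test the API

        # Create an order (replace 7000 with the HTTPS port listed in Properties/launchSettings.json)
        curl -X POST "https://localhost:7000/api/orders" \
             -H "Content-Type: application/json" \
             -d '{
               "customerId": "customer-123",
               "productId": "product-456", 
               "quantity": 2,
               "unitPrice": 29.99
             }'
        
        # Query the order status (replace {orderId} with the id returned by the create call)
        curl "https://localhost:7000/api/orders/{orderId}"
      3. Test the health check

        GET https://localhost:7000/health
      4. Watch the log output

        • Order.API: receives the HTTP request and publishes the order-created message

        • OrderProcessor.Service: consumes the order message, runs the business logic, and publishes a status update

        • Order.API: consumes the status update message

      5. Test failure scenarios

        • Stop the RabbitMQ service and observe the reconnection behavior

        • Stop OrderProcessor.Service and watch messages accumulate in the queue

        • Restart the service and watch message processing resume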

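      Step 7: Advanced features - configuring retries and resilience

      Add Polly support to Order.Infrastructure:

      # Add the NuGet packages
      dotnet add package Polly
      dotnet add package Microsoft.Extensions.Http.Polly
      
      // Add a retry policy in Program.cs.
      // Note: this policy only retries outgoing HTTP calls made through the named HttpClient; it does not retry RabbitMQ operations.
      builder.Services.AddHttpClient("retry-client")
          .AddTransientHttpErrorPolicy(policy => 
              policy.WaitAndRetryAsync(3, retryAttempt => 
                  TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))));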
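
      To apply the same exponential backoff to the broker connection itself, Polly can wrap the TryConnect() method from Step 3. The following is a sketch under assumptions: the extension method, its namespace, and the default of five retries are hypothetical, and it relies on Polly's Policy.HandleResult / WaitAndRetry / Execute API.

```csharp
using System;
using Order.Infrastructure.Services;
using Polly;

namespace Order.Infrastructure.Resilience
{
    // Hypothetical extension; the retry count and the delays are illustrative.
    public static class RabbitMQConnectionExtensions
    {
        public static bool TryConnectWithRetry(this IRabbitMQConnection connection, int retryCount = 5)
        {
            // TryConnect() reports failure through its return value rather than
            // an exception, so the policy retries whenever the result is false.
            var policy = Policy
                .HandleResult<bool>(connected => !connected)
                .WaitAndRetry(retryCount, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)));

            // Returns true as soon as one attempt succeeds, or false after the last retry.
            return policy.Execute(() => connection.TryConnect());
        }
    }
}
```

      With the defaults shown, the waits between attempts are 2, 4, 8, 16 and 32 seconds. The call blocks the calling thread while waiting, so it suits startup code better than request handling.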
      
      

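      Chapter summary

      In this chapter we integrated RabbitMQ into an ASP.NET Core application and built a complete microservice system:

      1. Dependency injection configuration: manage the lifetimes of RabbitMQ connections and channels correctly.

      2. Hosted services: use BackgroundService to implement long-running consumer services.

      3. Domain-driven design: a layered architecture that separates concerns.

      4. Message serialization: message bodies are serialized as JSON.

      5. Health checks: RabbitMQ health monitoring is integrated.

      6. Error handling: errors are caught and logged on both the publishing and the consuming side.

      7. Configuration management: connection settings are read from configuration files.

      This architecture provides a solid foundation for building production-grade microservice systems. In the next chapter we will look at the RPC pattern in RabbitMQ.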

      posted @ 2025-10-28 23:44  即興隨緣