<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      分布式事務 | 基于MassTransit的StateMachine實現Saga編排式分布式事務

      什么是狀態機

      狀態機作為一種程序開發范例,在實際的應用開發中有很多的應用場景,其中.NET 中的async/await 的核心底層實現就是基于狀態機機制。狀態機分為兩種:有限狀態機和無限狀態機,本文介紹的就是有限狀態機,有限狀態機在任何時候都可以準確地處于有限狀態中的一種,其可以根據一些輸入從一個狀態轉換到另一個狀態。一個有限狀態機是由其狀態列表、初始狀態和觸發每個轉換的輸入來定義的。如下圖展示的就是一個閘機的狀態機示意圖:

      從上圖可以看出,狀態機主要有以下核心概念:

      1. State:狀態,閘機有已開啟(opened)和已關閉(closed)狀態。
      2. Transition:轉移,即閘機從一個狀態轉移到另一個狀態的過程。
      3. Transition Condition:轉移條件,也可理解為事件,即閘機在某一狀態下只有觸發了某個轉移條件,才會執行狀態轉移。比如,閘機處于已關閉狀態時,只有接收到開啟事件才會執行轉移動作,進而轉移到開啟狀態。
      4. Action:動作,即完成狀態轉移要執行的動作。比如要從關閉狀態轉移到開啟狀態,則需要執行開閘動作。

      在.NET中,dotnet-state-machine/statelessMassTransit都提供了開箱即用的狀態機實現。本文將重點介紹MassTransit中的狀態機在Saga 模式中的應用。

      MassTransit StateMachine

      在MassTransit 中MassTransitStateMachine就是狀態機的具體抽象,可以用其編排一系列事件來實現狀態的流轉,也可以用來實現Saga模式的分布式事務。并支持與EF Core和Dapper集成將狀態持久化到關系型數據庫,也支持將狀態持久化到MongoDB、Redis等數據庫。是以簡單的下單流程:創建訂單->扣減庫存->支付訂單舉例而言,其示意圖如下所示。

      基于狀態機實現編排式Saga事務

      那具體如何使用MassTransitStateMachine來應用編排式Saga 模式呢,接下來就來創建解決方案來實現以上下單流程示例。依次創建以下項目,除共享類庫項目外,均安裝MassTransitMassTransit.RabbitMQNuGet包。

      項目 項目名 項目類型
      訂單服務 MassTransit.SmDemo.OrderService ASP.NET Core Web API
      庫存服務 MassTransit.SmDemo.InventoryService Worker Service
      支付服務 MassTransit.SmDemo.PaymentService Worker Service
      共享類庫 MassTransit.SmDemo.Shared Class Library

      三個服務都添加擴展類MassTransitServiceExtensions,并在Program.cs類中調用services.AddMassTransitWithRabbitMq();注冊服務。

      using System.Reflection;
      using MassTransit.CourierDemo.Shared.Models;
      
      namespace MassTransit.CourierDemo.InventoryService;
      
      public static class MassTransitServiceExtensions
      {
          public static IServiceCollection AddMassTransitWithRabbitMq(this IServiceCollection services)
          {
              return services.AddMassTransit(x =>
              {
                  x.SetKebabCaseEndpointNameFormatter();
      
                  // By default, sagas are in-memory, but should be changed to a durable
                  // saga repository.
                  x.SetInMemorySagaRepositoryProvider();
      
                  var entryAssembly = Assembly.GetEntryAssembly();
                  x.AddConsumers(entryAssembly);
                  x.AddSagaStateMachines(entryAssembly);
                  x.AddSagas(entryAssembly);
                  x.AddActivities(entryAssembly);
                  x.UsingRabbitMq((context, busConfig) =>
                  {
                      busConfig.Host(
                          host: "localhost",
                          port: 5672,
                          virtualHost: "masstransit",
                          configure: hostConfig =>
                          {
                              hostConfig.Username("guest");
                              hostConfig.Password("guest");
                          });
      
                      busConfig.ConfigureEndpoints(context);
                  });
              });
          }
      }
      

      訂單服務

      訂單服務作為下單流程中的核心服務,主要職責包含接收創建訂單請求和訂單狀態機的實現。先來定義OrderController如下:

      namespace MassTransit.SmDemo.OrderService.Controllers;
      [ApiController]
      [Route("[controller]")]
      public class OrderController : ControllerBase
      {
          private readonly IBus _bus;
          public OrderController(IBus bus)
          {
              _bus = bus;
          }
      
          [HttpPost]
          public async Task<IActionResult> CreateOrder(CreateOrderDto createOrderDto)
          {
              await _bus.Publish<ICreateOrderCommand>(new
              {
                  createOrderDto.CustomerId,
                  createOrderDto.ShoppingCartItems
              });
              return Ok();
          }
      }
      

      緊接著,訂閱ICreateOrderCommand,執行訂單創建邏輯,訂單創建完畢后會發布ICreateOrderSucceed事件。

      public class CreateOrderConsumer : IConsumer<ICreateOrderCommand>
      {
          private readonly ILogger<CreateOrderConsumer> _logger;
      
          public CreateOrderConsumer(ILogger<CreateOrderConsumer> logger)
          {
              _logger = logger;
          }
          public async Task Consume(ConsumeContext<ICreateOrderCommand> context)
          {
              var shoppingItems =
                  context.Message.ShoppingCartItems.Select(item => new ShoppingCartItem(item.SkuId, item.Price, item.Qty));
              var order = new Order(context.Message.CustomerId).NewOrder(shoppingItems.ToArray());
              await OrderRepository.Insert(order);
              
              _logger.LogInformation($"Order {order.OrderId} created successfully");
              await context.Publish<ICreateOrderSucceed>(new
              {
                  order.OrderId,
                  order.OrderItems
              });
          }
      }
      

      最后來實現訂單狀態機,主要包含以下幾步:

      1. 定義狀態機狀態: 一個狀態機從啟動到結束可能會經歷各種異常,包括程序異常或物理故障,為確保狀態機能從異常中恢復,因此必須保存狀態機的狀態。本例中,定義OrderState以保存狀態機實例狀態數據:
      using MassTransit.SmDemo.OrderService.Domains;
      
      namespace MassTransit.SmDemo.OrderService;
      
      public class OrderState : SagaStateMachineInstance
      {
          public Guid CorrelationId { get; set; }
          public string CurrentState { get; set; }
          public Guid OrderId { get; set; }
          public decimal Amount { get; set; }
          public List<OrderItem> OrderItems { get; set; }
      }
      
      1. 定義狀態機:直接繼承自MassTransitStateMachine并同時指定狀態實例即可:
      namespace MassTransit.SmDemo.OrderService;
      
      public class OrderStateMachine : MassTransitStateMachine<OrderState>
      {
      }
      
      1. 注冊狀態機:這里指定內存持久化方式來持久化狀態,也可指定諸如MongoDb、MySQL等數據庫進行狀態持久化:
      return services.AddMassTransit(x =>
      {
          //...
          x.AddSagaStateMachine<OrderStateMachine, OrderState>()
              .InMemoryRepository();
      }
      
      1. 定義狀態列表:即狀態機涉及到的系列狀態,并通過State類型定義,本例中為:
        1. 已創建:public State Created { get; private set; }
        2. 庫存已扣減:public State InventoryDeducted { get; private set; }
        3. 已支付:public State Paid { get; private set; }
        4. 已取消:public State Canceled { get; private set; }
      2. 定義轉移條件:即推動狀態流轉的事件,通過Event<T>類型定義,本例涉及有:
        1. 訂單成功創建事件:public Event<ICreateOrderSucceed> OrderCreated {get; private set;}
        2. 庫存扣減成功事件:public Event<IDeduceInventorySucceed> DeduceInventorySucceed {get; private set;}
        3. 庫存扣減失敗事件:public Event<IDeduceInventoryFailed> DeduceInventoryFailed {get; private set;}
        4. 訂單支付成功事件:public Event<IPayOrderSucceed> PayOrderSucceed {get; private set;}
        5. 訂單支付失敗事件:public Event<IPayOrderFailed> PayOrderFailed {get; private set;}
        6. 庫存已返還事件:public Event<IReturnInventorySucceed> ReturnInventorySucceed { get; private set; }
        7. 訂單取消事件:public Event<ICancelOrderSucceed> OrderCanceled { get; private set; }
      3. 定義關聯關系:由于每個事件都是孤立的,但相關聯的事件終會作用到某個具體的狀態機實例上,如何關聯事件以推動狀態機的轉移呢?配置關聯Id。以下就是將事件消息中的傳遞的OrderId作為關聯ID。
        1. Event(() => OrderCreated, x => x.CorrelateById(m => m.Message.OrderId));
        2. Event(() => DeduceInventorySucceed, x => x.CorrelateById(m => m.Message.OrderId));
        3. Event(() => DeduceInventoryFailed, x => x.CorrelateById(m => m.Message.OrderId));
        4. Event(() => PayOrderSucceed, x => x.CorrelateById(m => m.Message.OrderId));
      4. 定義狀態轉移:即狀態在什么條件下做怎樣的動作完成狀態的轉移,本例中涉及的正向狀態轉移有:

      (1) 初始狀態->已創建:觸發條件為OrderCreated事件,同時要發送IDeduceInventoryCommand推動庫存服務執行庫存扣減。

      Initially(
          When(OrderCreated)
              .Then(context =>
              {
                  context.Saga.OrderId = context.Message.OrderId;
                  context.Saga.OrderItems = context.Message.OrderItems;
                  context.Saga.Amount = context.Message.OrderItems.Sum(x => x.Price * x.Qty);
              })
              .PublishAsync(context => context.Init<IDeduceInventoryCommand>(new
              {
                  context.Saga.OrderId,
                  DeduceInventoryItems =
                      context.Saga.OrderItems.Select(x => new DeduceInventoryItem(x.SkuId, x.Qty)).ToList()
              }))
              .TransitionTo(Created));
      

      (2) 已創建-> 庫存已扣減:觸發條件為DeduceInventorySucceed事件,同時要發送IPayOrderCommand推動支付服務執行訂單支付。

      During(Created,
          When(DeduceInventorySucceed)
              .Then(context =>
              {
                  context.Publish<IPayOrderCommand>(new
                  {
                      context.Saga.OrderId,
                      context.Saga.Amount
                  });
              }).TransitionTo(InventoryDeducted),
          When(DeduceInventoryFailed).Then(context =>
          {
              context.Publish<ICancelOrderCommand>(new
              {
                  context.Saga.OrderId
              });
          })
      );
      

      (3) 庫存已扣減->已支付:觸發條件為PayOrderSucceed事件,轉移到已支付后,流程結束。

      During(InventoryDeducted,
          When(PayOrderFailed).Then(context =>
          {
              context.Publish<IReturnInventoryCommand>(new
              {
                  context.Message.OrderId,
                  ReturnInventoryItems =
                      context.Saga.OrderItems.Select(x => new ReturnInventoryItem(x.SkuId, x.Qty)).ToList()
              });
          }),
          When(PayOrderSucceed).TransitionTo(Paid).Then(context => context.SetCompleted()));
      

      最終完整版的OrderStateMachine如下所示:

      using MassTransit.SmDemo.OrderService.Events;
      using MassTransit.SmDemo.Shared.Contracts;
      
      namespace MassTransit.SmDemo.OrderService;
      
      public class OrderStateMachine : MassTransitStateMachine<OrderState>
      {
          public State Created { get; private set; }
          public State InventoryDeducted { get; private set; }
          public State Paid { get; private set; }
          public State Canceled { get; private set; }
      
          public Event<ICreateOrderSucceed> OrderCreated { get; private set; }
          public Event<IDeduceInventorySucceed> DeduceInventorySucceed { get; private set; }
          public Event<IDeduceInventoryFailed> DeduceInventoryFailed { get; private set; }
          public Event<ICancelOrderSucceed> OrderCanceled { get; private set; }
          public Event<IPayOrderSucceed> PayOrderSucceed { get; private set; }
          public Event<IPayOrderFailed> PayOrderFailed { get; private set; }
          public Event<IReturnInventorySucceed> ReturnInventorySucceed { get; private set; }
          public Event<IOrderStateRequest> OrderStateRequested { get; private set; }
          
      	public OrderStateMachine()
          {
              Event(() => OrderCreated, x => x.CorrelateById(m => m.Message.OrderId));
              Event(() => DeduceInventorySucceed, x => x.CorrelateById(m => m.Message.OrderId));
              Event(() => DeduceInventoryFailed, x => x.CorrelateById(m => m.Message.OrderId));
              Event(() => ReturnInventorySucceed, x => x.CorrelateById(m => m.Message.OrderId));
              Event(() => PayOrderSucceed, x => x.CorrelateById(m => m.Message.OrderId));
              Event(() => PayOrderFailed, x => x.CorrelateById(m => m.Message.OrderId));
              Event(() => OrderCanceled, x => x.CorrelateById(m => m.Message.OrderId));
              Event(() => OrderStateRequested, x =>
              {
                  x.CorrelateById(m => m.Message.OrderId);
                  x.OnMissingInstance(m =>
                  {
                      return m.ExecuteAsync(x => x.RespondAsync<IOrderNotFoundOrCompleted>(new { x.Message.OrderId }));
                  });
              });
      
              InstanceState(x => x.CurrentState);
      
              Initially(
                  When(OrderCreated)
                      .Then(context =>
                      {
                          context.Saga.OrderId = context.Message.OrderId;
                          context.Saga.OrderItems = context.Message.OrderItems;
      					var amount = context.Message.OrderItems.Sum(x => x.Price * x.Qty);
                          context.Saga.Amount = amount;
                      })
                      .PublishAsync(context => context.Init<IDeduceInventoryCommand>(new
                      {
                          context.Saga.OrderId,
                          DeduceInventoryItems =
                              context.Saga.OrderItems.Select(x => new DeduceInventoryItem(x.SkuId, x.Qty)).ToList()
                      }))
                      .TransitionTo(Created));
      
              During(Created,
                  When(DeduceInventorySucceed)
                      .Then(context =>
                      {
                          context.Publish<IPayOrderCommand>(new
                          {
                              context.Saga.OrderId,
                              context.Saga.Amount
                          });
                      }).TransitionTo(InventoryDeducted),
                  When(DeduceInventoryFailed).Then(context =>
                  {
                      context.Publish<ICancelOrderCommand>(new
                      {
                          context.Saga.OrderId
                      });
                  })
              );
      
              During(InventoryDeducted,
                  When(PayOrderFailed).Then(context =>
                  {
                      context.Publish<IReturnInventoryCommand>(new
                      {
                          context.Message.OrderId,
                          ReturnInventoryItems =
                              context.Saga.OrderItems.Select(x => new ReturnInventoryItem(x.SkuId, x.Qty)).ToList()
                      });
                  }),
                  When(PayOrderSucceed).TransitionTo(Paid).Then(context => context.SetCompleted()),
                  When(ReturnInventorySucceed)
                      .ThenAsync(context => context.Publish<ICancelOrderCommand>(new
                      {
                          context.Saga.OrderId
                      })).TransitionTo(Created));
      
              DuringAny(When(OrderCanceled).TransitionTo(Canceled).ThenAsync(async context =>
              {
                  await Task.Delay(TimeSpan.FromSeconds(10));
                  await context.SetCompleted();
              }));
      
      
              DuringAny(
                  When(OrderStateRequested)
                      .RespondAsync(x => x.Init<IOrderStateResponse>(new
                      {
                          x.Saga.OrderId,
                          State = x.Saga.CurrentState
                      }))
              );
          }
      }
      

      庫存服務

      庫存服務在整個下單流程的職責主要是庫存的扣減和返還,其僅需要訂閱IDeduceInventoryCommandIReturnInventoryCommand兩個命令并實現即可。代碼如下所示:

      using MassTransit.SmDemo.InventoryService.Repositories;
      using MassTransit.SmDemo.Shared.Contracts;
      
      namespace MassTransit.SmDemo.InventoryService.Consumers;
      
      public class DeduceInventoryConsumer : IConsumer<IDeduceInventoryCommand>
      {
          private readonly ILogger<DeduceInventoryConsumer> _logger;
      
          public DeduceInventoryConsumer(ILogger<DeduceInventoryConsumer> logger)
          {
              _logger = logger;
          }
      
          public async Task Consume(ConsumeContext<IDeduceInventoryCommand> context)
          {
              if (!CheckStock(context.Message.DeduceInventoryItems))
              {
                  _logger.LogWarning($"Insufficient stock for order [{context.Message.OrderId}]!");
                  await context.Publish<IDeduceInventoryFailed>(
                      new { context.Message.OrderId, Reason = "insufficient stock" });
              }
              else
              {
                  _logger.LogInformation($"Inventory has been deducted for order [{context.Message.OrderId}]!");
                  DeduceStocks(context.Message.DeduceInventoryItems);
                  await context.Publish<IDeduceInventorySucceed>(new { context.Message.OrderId });
              }
          }
      
      
          private bool CheckStock(List<DeduceInventoryItem> deduceItems)
          {
              foreach (var stockItem in deduceItems)
              {
                  if (InventoryRepository.GetStock(stockItem.SkuId) < stockItem.Qty) return false;
              }
      
              return true;
          }
      
          private void DeduceStocks(List<DeduceInventoryItem> deduceItems)
          {
              foreach (var stockItem in deduceItems)
              {
                  InventoryRepository.TryDeduceStock(stockItem.SkuId, stockItem.Qty);
              }
          }
      }
      
      namespace MassTransit.SmDemo.InventoryService.Consumers;
      
      public class ReturnInventoryConsumer : IConsumer<IReturnInventoryCommand>
      {
          private readonly ILogger<ReturnInventoryConsumer> _logger;
      
          public ReturnInventoryConsumer(ILogger<ReturnInventoryConsumer> logger)
          {
              _logger = logger;
          }
      
          public async Task Consume(ConsumeContext<IReturnInventoryCommand> context)
          {
              foreach (var returnInventoryItem in context.Message.ReturnInventoryItems)
              {
                  InventoryRepository.ReturnStock(returnInventoryItem.SkuId, returnInventoryItem.Qty);
              }
      
              _logger.LogInformation($"Inventory has been returned for order [{context.Message.OrderId}]!");
              await context.Publish<IReturnInventorySucceed>(new { context.Message.OrderId });
          }
      }
      

      支付服務

      對于下單流程的支付用例來說,要么成功要么失敗,因此僅需要訂閱IPayOrderCommand命令即可,具體PayOrderConsumer實現如下:

      using MassTransit.SmDemo.Shared.Contracts;
      
      namespace MassTransit.SmDemo.PaymentService.Consumers;
      
      public class PayOrderConsumer : IConsumer<IPayOrderCommand>
      {
          private readonly ILogger<PayOrderConsumer> _logger;
      
          public PayOrderConsumer(ILogger<PayOrderConsumer> logger)
          {
              _logger = logger;
          }
          public async Task Consume(ConsumeContext<IPayOrderCommand> context)
          {
              await Task.Delay(TimeSpan.FromSeconds(10));
              if (context.Message.Amount % 2 == 0)
              {_logger.LogInformation($"Order [{context.Message.OrderId}] paid successfully!");
                  await context.Publish<IPayOrderSucceed>(new { context.Message.OrderId });
              }
              else
              {
                  _logger.LogWarning($"Order [{context.Message.OrderId}] payment failed!");
                  await context.Publish<IPayOrderFailed>(new
                  {
                      context.Message.OrderId,
                      Reason = "Insufficient account balance"
                  });
              }
          }
      }
      

      運行結果

      啟動三個項目,并在Swagger中發起訂單創建請求,如下圖所示:

      由于訂單總額為奇數,因此支付會失敗,最終控制臺輸出如下圖所示:

      打開RabbitMQ后臺,可以看見MassTransit按照約定創建了以下隊列用于服務間的消息傳遞:

      其中order-state隊列綁定到類型為fanout的同名order-stateExchange,其綁定關系如下圖所示,該Exchange負責從其他同名事件的Exchange轉發事件。

      總結

      通過以上示例的講解,相信了解到MassTransit StateMachine的強大之處。StateMachine充當著事務編排器的角色,通過集中定義狀態、轉移條件和狀態轉移的執行順序,實現高內聚的事務流轉控制,也確保了其他伴生服務僅需關注自己的業務邏輯,而無需關心事務的流轉,真正實現了關注點分離。

      posted @ 2023-01-02 14:57  「圣杰」  閱讀(3579)  評論(5)    收藏  舉報
      主站蜘蛛池模板: 成人国产精品一区二区不卡| 国产乱码精品一区二区三上| 建宁县| 日韩精品中文女同在线播放 | 久久综合伊人77777| 国产亚洲亚洲国产一二区| 无码精品一区二区免费AV| 国产福利片无码区在线观看| 天天澡日日澡狠狠欧美老妇| 久久精品亚洲精品国产色婷| 亚洲av色香蕉一二三区| 亚洲成A人片在线观看的电影| 亚洲中文字幕精品一区二区三区 | 国产老熟女乱子一区二区| 国产偷窥熟女精品视频大全 | 国产免费无遮挡吃奶视频| 柳江县| 亚洲日韩精品一区二区三区无码| 一本久道久久综合狠狠躁av| 无码人妻一区二区三区免费N鬼沢 亚洲国产精品自产在线播放 | 亚洲精品久久久久国色天香| 人妻系列无码专区无码中出| 视频一区视频二区在线视频| 久久国产成人亚洲精品影院老金| 日韩人妻熟女中文字幕a美景之屋| 天堂а√在线最新版中文在线| 中文字幕理伦午夜福利片| 国产真实野战在线视频| 亚洲一区二区三区人妻天堂| 视频一区视频二区视频三区| 久久精品蜜芽亚洲国产AV| 国产av午夜精品福利| 亚洲最大av一区二区| 免费国产午夜理论片不卡 | 饥渴的熟妇张开腿呻吟视频| 久草热在线视频免费播放| 亚洲爆乳少妇无码激情| 亚洲中文字幕无码中字| 老色鬼永久精品网站| 一区二区三区不卡国产| 亚洲www永久成人网站|