<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      MassTransit 知多少 | .NET 分布式應用框架

      引言

      A free, open-source distributed application framework for .NET.
      一個免費、開源的.NET 分布式應用框架。 -- MassTransit 官網

      MassTransit,直譯公共交通, 是由Chris Patterson開發的基于消息驅動的.NET 分布式應用框架,其核心思想是借助消息來實現服務之間的松耦合異步通信,進而確保應用更高的可用性、可靠性和可擴展性。通過對消息模型的高度抽象,以及對主流的消息代理(包括RabbitMQ、ActiveMQ、Kafaka、Azure Service Bus、Amazon SQS等)的集成,大大簡化了基于消息驅動的開發門檻,同時內置了連接管理、消息序列化和消費者生命周期管理,以及諸如重試、限流、斷路器等異常處理機制,讓開發者更好的專注于業務實現。
      簡而言之,MassTransit實現了消息代理透明化。無需面向消息代理編程進行諸如連接管理、隊列的申明和綁定等操作,即可輕松實現應用間消息的傳遞和消費。

      快速體驗

      空口無憑,創建一個項目快速體驗一下。

      1. 基于worker模板創建一個基礎項目:dotnet new worker -n MassTransit.Demo
      2. 打開項目,添加NuGet包:MassTransit
      3. 定義訂單創建事件消息契約:
      using System;
      
      namespace MassTransit.Demo
      {
          public record OrderCreatedEvent
          {
              public Guid OrderId { get; set; }
          }
      }
      
      1. 修改Worker類,發送訂單創建事件:
      namespace MassTransit.Demo;
      
      public class Worker : BackgroundService
      {
          readonly IBus _bus;//注冊總線
          public Worker(IBus bus)
          {
              _bus = bus;
          }
          protected override async Task ExecuteAsync(CancellationToken stoppingToken)
          {
              while (!stoppingToken.IsCancellationRequested)
              {
                  //模擬并發送訂單創建事件
                  await _bus.Publish(new OrderCreatedEvent(Guid.NewGuid()), stoppingToken);
                  await Task.Delay(1000, stoppingToken);
              }
          }
      }
      
      
      1. 僅需實現IConsumer<OrderCreatedEvent>泛型接口,即可實現消息的訂閱:
      public class OrderCreatedEventConsumer: IConsumer<OrderCreatedEvent>
      {
          private readonly ILogger<OrderCreatedEventConsumer> _logger;
          public OrderCreatedEventConsumer(ILogger<OrderCreatedEventConsumer> logger)
          {
              _logger = logger;
          }
          public Task Consume(ConsumeContext<OrderCreatedEvent> context)
          {
              _logger.LogInformation($"Received Order:{context.Message.OrderId}");
              return Task.CompletedTask;
          }
      }
      
      1. 注冊服務:
      using MassTransit;
      using MassTransit.Demo;
      
      IHost host = Host.CreateDefaultBuilder(args)
          .ConfigureServices(services =>
          {
              services.AddHostedService<Worker>();
              services.AddMassTransit(configurator =>
              {
                  //注冊消費者
                  configurator.AddConsumer<OrderCreatedEventConsumer>();
                  //使用基于內存的消息路由傳輸
                  configurator.UsingInMemory((context, cfg) =>
                  {
                      cfg.ConfigureEndpoints(context);
                  });
              });
          })
          .Build();
      
      await host.RunAsync();
      
      
      1. 運行項目,一個簡單的進程內事件發布訂閱的應用就完成了。

      如果需要使用RabbitMQ 消息代理進行消息傳輸,則僅需安裝MassTransit.RabbitMQNuGet包,然后指定使用RabbitMQ 傳輸消息即可。

      using MassTransit;
      using MassTransit.Demo;
      
      IHost host = Host.CreateDefaultBuilder(args)
          .ConfigureServices(services =>
          {
              services.AddHostedService<Worker>();
              services.AddMassTransit(configurator =>
              {
                  configurator.AddConsumer<OrderCreatedEventConsumer>();
                  
                  // configurator.UsingInMemory((context, cfg) =>
                  // {
                  //     cfg.ConfigureEndpoints(context);
                  // });
                  
                  configurator.UsingRabbitMq((context, cfg) =>
                  {
                      cfg.Host(
                          host: "localhost",
                          port: 5672,
                          virtualHost: "/",
                          configure: hostConfig =>
                          {
                              hostConfig.Username("guest");
                              hostConfig.Password("guest");
                          });
                      cfg.ConfigureEndpoints(context);
                  });
              });
          })
          .Build();
      
      await host.RunAsync();
      
      

      運行項目,MassTransit會自動在指定的RabbitMQ上創建一個類型為fanoutMassTransit.Demo.OrderCreatedEventExchange和一個與OrderCreatedEvent同名的隊列進行消息傳輸,如下圖所示。

      核心概念

      MassTranist 為了實現消息代理的透明化和應用間消息的高效傳輸,抽象了以下概念,其中消息流轉流程如下圖所示:

      1. Message:消息契約,定義了消息生產者和消息消費者之間的契約。
      2. Producer:生產者,發送消息的一方都可以稱為生產者。
      3. SendEndpoint:發送端點,用于將消息內容序列化,并發送到傳輸模塊。
      4. Transport:傳輸模塊,消息代理透明化的核心,用于和消息代理通信,負責發送和接收消息。
      5. ReceiveEndpoint:接收端點,用于從傳輸模塊接收消息,反序列化消息內容,并將消息路由到消費者。
      6. Consumer:消費者,用于消息消費。

      從上圖可知,本質上還是發布訂閱模式的實現,接下來就核心概念進行詳解。

      Message

      Message:消息,可以使用class、interface、struct和record來創建,消息作為一個契約,需確保創建后不能篡改,因此應只保留只讀屬性且不應包含方法和行為。MassTransit使用的是包含命名空間的完全限定名即typeof(T).FullName來表示特定的消息類型。因此若在另外的項目中消費同名的消息類型,需確保消息的命名空間相同。另外需注意消息不應繼承,以避免發送基類消息類型造成的不可預期的結果。為避免此類情況,官方建議使用接口來定義消息。在MassTransit中,消息主要分為兩種類型:

      1. Command:命令,用于告訴服務做什么,命令被發送到指定端點,僅被一個服務接收并執行。一般以動名詞結構命名,如:UpdateAddress、CancelOrder。
      2. Event:事件,用于告訴服務什么發生了,事件被發布到多個端點,可以被多個服務消費。 一般以過去式結構命名,如:AddressUpdated,OrderCanceled。

      經過MassTransit發送的消息,會使用信封包裝,包含一些附加信息,數據結構舉例如下:

      {
          "messageId": "6c600000-873b-00ff-9a8f-08da8da85542",
          "requestId": null,
          "correlationId": null,
          "conversationId": "6c600000-873b-00ff-9526-08da8da85544",
          "initiatorId": null,
          "sourceAddress": "rabbitmq://localhost/THINKPAD_MassTransitDemo_bus_ptoyyyr88cyx9s1gbdpe5kniy1?temporary=true",
          "destinationAddress": "rabbitmq://localhost/MassTransit.Demo:OrderCreatedEvent",
          "responseAddress": null,
          "faultAddress": null,
          "messageType": [
              "urn:message:MassTransit.Demo:OrderCreatedEvent"
          ],
          "message": {
              "orderId": "fd8a3598-4c3a-4ec9-bbf9-d5f508e1a0d8"
          },
          "expirationTime": null,
          "sentTime": "2022-09-03T12:32:15.0796943Z",
          "headers": {},
          "host": {
              "machineName": "THINKPAD",
              "processName": "MassTransit.Demo",
              "processId": 24684,
              "assembly": "MassTransit.Demo",
              "assemblyVersion": "1.0.0.0",
              "frameworkVersion": "6.0.5",
              "massTransitVersion": "8.0.6.0",
              "operatingSystemVersion": "Microsoft Windows NT 10.0.19044.0"
          }
      }
      

      從以上消息實例中可以看出一個包裝后的消息包含以下核心屬性:

      1. messageId:全局唯一的消息ID
      2. messageType:消息類型
      3. message:消息體,也就是具體的消息實例
      4. sourceAddress:消息來源地址
      5. destinationAddress:消息目標地址
      6. responseAddress:響應地址,在請求響應模式中使用
      7. faultAddress:消息異常發送地址,用于存儲異常消費消息
      8. headers:消息頭,允許應用自定義擴展信息
      9. correlationId:關聯Id,在Saga狀態機中會用到,用來關聯系列事件
      10. host:宿主,消息來源應用的宿主信息

      Producer

      Producer,生產者,即用于生產消息。在MassTransit主要借助以下對象進行命令的發送和事件的發布。

      從以上類圖可以看出,消息的發送主要核心依賴于兩個接口:

      1. ISendEndpoint:提供了Send方法,用于發送命令。
      2. IPublishEndpoint:提供了Publish方法,用于發布事件。

      但基于上圖的繼承體系,可以看出通過IBusISendEndpointProviderConsumeContext進行命令的發送;通過IBusIPublishEndpointProvider進行事件的發布。具體舉例如下:

      發送命令

      1. 通過IBus發送:
      private readonly IBus _bus;
      public async Task Post(CreateOrderRequest request)
      {
          //通過以下方式配置對應消息類型的目標地址
          EndpointConvention.Map<CreateOrderRequest>(new Uri("queue:create-order"));
          await _bus.Send(request);
      }
      
      1. 通過ISendEndpointProvider發送:
      private readonly ISendEndpointProvider  _sendEndpointProvider;
      public async Task Post(CreateOrderRequest request)
      {
          var serviceAddress = new Uri("queue:create-order");
          var endpoint = await _sendEndpointProvider.GetSendEndpoint(serviceAddress);
          await endpoint.Send(request);
      }
      
      1. 通過ConsumeContext發送:
      public class CreateOrderRequestConsumer:IConsumer<CreateOrderRequest>
      {    
          public async Task Consume(ConsumeContext<CreateOrderRequest> context)
          {
          	//do something else
              var destinationAddress = new Uri("queue:lock-stock");
              var command = new LockStockRequest(context.Message.OrderId);
             
              await context.Send<LockStockRequest>(destinationAddress, command);
       		// 也可以通過獲取`SendEndpoint`發送命令
              // var endpoint = await context.GetSendEndpoint(destinationAddress);
              // await endpoint.Send<LockStockRequest>(command);
          	
          }
      }
      

      發布事件

      1. 通過IBus發布:
      private readonly IBus _bus;
      public async Task Post(CreateOrderRequest request)
      {
          //do something
          await _bus.Publish(request);
      }
      
      1. 通過IPublishEndpoint發布:
      private readonly IPublishEndpoint _publishEndpoint;
      public async Task Post(CreateOrderRequest request)
      {
          //do something
          var order = CreateOrder(request);
          await _publishEndpoint.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));
      }
      
      1. 通過ConsumeContext發布:
      public class CreateOrderRequestConsumer: IConsumer<CreateOrderRequest>
      {    
          public async Task Consume(ConsumeContext<CreateOrderRequest> context)
          {
              var order = CreateOrder(conext.Message);
          	await context.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));
          }
      }
      

      Consumer

      Consumer,消費者,即用于消費消息。MassTransit 包括多種消費者類型,主要分為無狀態和有狀態兩種消費者類型。

      無狀態消費者

      無狀態消費者,即消費者無狀態,消息消費完畢,消費者就釋放。主要的消費者類型有:IConsumer<TMessage>JobConsumerIActivityRoutingSlip等。其中IConsumer<TMessage>已經在上面的快速體驗部分舉例說明。而JobConsumer<TMessage>主要是對IConsumer<TMessage>的補充,其主要應用場景在于執行耗時任務。
      而對于IActivityRoutingSlip則是MassTransit Courier的核心對象,主要用于實現Saga模式的分布式事務。MassTransit Courier 實現了Routing Slip模式,通過按需有序組合一系列的Activity,得到一個用來限定消息處理順序的Routing Slip。而每個Activity的具體抽象就是IActivityIExecuteActivity。二者的差別在于IActivity定義了ExecuteCompensate兩個方法,而IExecuteActivitiy僅定義了Execute方法。其中Execute代表正向操作,Compensate代表反向補償操作。用一個簡單的下單流程:創建訂單->扣減庫存->支付訂單舉例而言,其示意圖如下所示。而對于具體實現,可參閱文章:AspNetCore&MassTransit Courier實現分布式事務

      有狀態消費者

      有狀態消費者,即消費者有狀態,其狀態會持久化,代表的消費者類型為MassTransitStateMachineMassTransitStateMachineMassTransit Automatonymous 庫定義的,Automatonymous 是一個.NET 狀態機庫,用于定義狀態機,包括狀態、事件和行為。MassTransitStateMachine就是狀態機的具體抽象,可以用其編排一系列事件來實現狀態的流轉,也可以用來實現Saga模式的分布式事務。并支持與EF Core和Dapper集成將狀態持久化到關系型數據庫,也支持將狀態持久化到MongoDB、Redis等數據庫。MassTransitStateMachine對于Saga模式分布式事務的實現方式與RoutingSlip不同,還是以簡單的下單流程:創建訂單->扣減庫存->支付訂單舉例而言,其示意圖如下所示。基于MassTransitStateMachine 實現分布式事務詳參后續文章。

      從上圖可知,通過MassTransitStateMachine可以將事件的執行順序邏輯編排在一個集中的狀態機中,通過發送命令和訂閱事件來推動狀態流轉,而這也正是Saga編排模式的實現。

      應用場景

      了解完MassTransit的核心概念,接下來再來看下MassTransit的核心特性以及應用場景:

      1. 基于消息的請求響應模式:可用于同步通信
      2. Mediator模式:中間者模式的實現,類似MediatR,但功能更完善
      3. 計劃任務:可用于執行定時任務
      4. Routing Slip 模式:可用于實現Saga模式的分布式事務
      5. Saga 狀態機:可用于實現Saga模式的分布式事務
      6. 本地消息表:類似DotNetCore.Cap,用于實現最終一致性

      總體而言,MassTransit是一款優秀的分布式應用框架,可作為分布式應用的消息總線,也可以用作單體應用的事件總線。感興趣的朋友不妨一觀。

      posted @ 2022-10-23 17:43  「圣杰」  閱讀(9656)  評論(25)    收藏  舉報
      主站蜘蛛池模板: 香港经典a毛片免费观看播放| 成人自拍短视频午夜福利| 麻豆蜜桃av蜜臀av色欲av| 久久精品国产亚洲av热一区| 野花香视频在线观看免费高清版| 亚洲综合一区二区国产精品| 色综合热无码热国产| 中文字幕结果国产精品| 国产精品乱人伦一区二区| 国产情侣激情在线对白| 超碰人人超碰人人| 2020国产欧洲精品网站| 又大又粗欧美黑人aaaaa片| 欧美片内射欧美美美妇| 澳门永久av免费网站| 纳雍县| 亚洲天堂亚洲天堂亚洲色图| 亚洲国产精品人人做人人爱| 精品人妻av区乱码| 最新中文字幕av无码专区不| 国产 另类 在线 欧美日韩| 男女扒开双腿猛进入爽爽免费看| 蜜桃臀av在线一区二区| 国产99在线 | 免费| 九九热精品在线观看| 好吊视频一区二区三区人妖| 国产区成人精品视频| 亚洲Av综合日韩精品久久久 | 国产精品视频一区二区不卡| 成人福利国产午夜AV免费不卡在线| 大又大又粗又硬又爽少妇毛片| 男女激情一区二区三区| 麻豆久久久9性大片| 国产精品18久久久久久麻辣| ww污污污网站在线看com| 欧美巨大巨粗黑人性aaaaaa| 亚洲综合久久精品哦夜夜嗨| 国产精品久久中文字幕| 国色天香成人一区二区| 国产一卡2卡三卡4卡免费网站| 久久99九九精品久久久久蜜桃|