<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      如何優雅的使用RabbitMQ

      RabbitMQ無疑是目前最流行的消息隊列之一,對各種語言環境的支持也很豐富,作為一個.NET developer有必要學習和了解這一工具。消息隊列的使用場景大概有3種:

      1、系統集成,分布式系統的設計。各種子系統通過消息來對接,這種解決方案也逐步發展成一種架構風格,即“通過消息傳遞的架構”。

      2、當系統中的同步處理方式嚴重影響了吞吐量,比如日志記錄。假如需要記錄系統中所有的用戶行為日志,如果通過同步的方式記錄日志勢必會影響系統的響應速度,當我們將日志消息發送到消息隊列,記錄日志的子系統就會通過異步的方式去消費日志消息。

      3、系統的高可用性,比如電商的秒殺場景。當某一時刻應用服務器或數據庫服務器收到大量請求,將會出現系統宕機。如果能夠將請求轉發到消息隊列,再由服務器去消費這些消息將會使得請求變得平穩,提高系統的可用性。

      一、開始使用RabbitMQ

      RabbitMQ官網提供了詳細的安裝步驟,另外官網還提供了RabbitMQ在六種場景的使用教程。其中教程1、3、6將覆蓋99%的使用場景,所以正常來說只需要搞清楚這3個教程即可快速上手。

      二、簡單分析

      我們以官方提供的教程1做個簡單梳理:該教程展示了Producer如何向一個消息隊列(message queue)發送一個消息(message),消息消費者(Consumer)收到該消息后消費該消息。

      1、producer端:

                 var factory = new ConnectionFactory() { HostName = "localhost" };
                  using (var connection = factory.CreateConnection())
                  {
                      while (Console.ReadLine() != null)
                      {
                          using (var channel = connection.CreateModel())
                          {
                              //創建一個名叫"hello"的消息隊列
                              channel.QueueDeclare(queue: "hello",
                                  durable: false,
                                  exclusive: false,
                                  autoDelete: false,
                                  arguments: null);
      
                              var message = "Hello World!";
                              var body = Encoding.UTF8.GetBytes(message);
      
                              //向該消息隊列發送消息message
                              channel.BasicPublish(exchange: "",
                                  routingKey: "hello",
                                  basicProperties: null,
                                  body: body);
                              Console.WriteLine(" [x] Sent {0}", message);
                          }
                      }
                  }

      該段代碼非常簡單,幾乎到了無法精簡的地步:創建了一個信道(channel)->創建一個隊列->向該隊列發送消息。

      2、Consumer端

                 var factory = new ConnectionFactory() { HostName = "localhost" };
                  using (var connection = factory.CreateConnection())
                  {
                      using (var channel = connection.CreateModel())
                      {
                          //創建一個名為"hello"的隊列,防止producer端沒有創建該隊列
                          channel.QueueDeclare(queue: "hello",
                                               durable: false,
                                               exclusive: false,
                                               autoDelete: false,
                                               arguments: null);
      
                          //回調,當consumer收到消息后會執行該函數
                          var consumer = new EventingBasicConsumer(channel);
                          consumer.Received += (model, ea) =>
                          {
                              var body = ea.Body;
                              var message = Encoding.UTF8.GetString(body);
                              Console.WriteLine(" [x] Received {0}", message);
                          };
      
                          //消費隊列"hello"中的消息
                          channel.BasicConsume(queue: "hello",
                                               noAck: true,
                                               consumer: consumer);
      
                          Console.WriteLine(" Press [enter] to exit.");
                          Console.ReadLine();
                      }
                  }
      

      該段代碼可以理解為:創建信道->創建隊列->定義回調函數->消費消息。

      該實例描述了Send/Receive模式,可以簡單理解為1(producer) VS 1(consumer)的場景;

      實例3則描述了Publish/Subscriber模式,即1(producer) VS 多個(consumer);

      在以上兩個示例中,producer只需要發送消息即可,并不關心consumer的返回結果。實例6則描述了一個RPC調用場景,producer發送消息后還要接收consumer的返回結果,這一場景看起來跟使用消息隊列的目的有點相悖。因為使用消息隊列的目的之一就是要異步,但是這一場景似乎又將異步變成了同步,不過這一場景也很有用,比如一個用戶操作產生了一個消息,應用服務收到該消息后執行了一些邏輯并使得數據庫發生了變化,UI會一直等待應用服務的返回結果才刷新頁面。

      三、 發現抽象

      我桌子上放著一本RabbitMQ in Action,另外官網提供的文檔也很詳細,我感覺在一個月內我就能精通RabbitMQ,到時候簡歷上又可以寫上“精通…”,感覺有點小得意呢... ,但是我知道這并不是使用RabbitMQ的最佳方式。

      我們知道合理的抽象可以幫我們隱藏掉一些技術細節,讓我們將重心放在核心業務上,比如一個人問你:“大雁塔如何走?”你的回答可能是“小寨往東,一直走兩站,右手邊”,如果你回答:“右轉45度,向前走100米,再轉90度…”,對方就會迷失在這些細節中。

      消息隊列的使用過程中實際隱藏著一種抽象——服務總線(Service Bus)。

      我們在回頭看第一個例子,這個例子隱含的業務是:ClientA發送一個指令,ClientB收到該指令后做出反應。如果是這樣,我們為什么要關心如何創建channel,如何創建一個queue? 我僅僅是要發送一個消息而已。另外這個例子寫的其實不夠健壯:

      沒有重試機制:如果ClientB第一次沒有執行成功如何對該消息處理?

      沒有錯誤處理機制:如果ClientB在重試了N次之后還是異常如何處理該消息?

      沒有熔斷機制;

      如何對ClientA做一個schedule(計劃安排),比如定時發送等;

      沒有消息審計機制;

      無法對消息的各個狀態做追蹤;

      事物處理等。

      服務總線正是這種場景的抽象,并且為我們提供了這些機制,讓我們趕快來看個究竟吧。

      四、初識MassTransit

      MassTransit是.NET平臺下的一款開源免費的ESB產品,官網:http://masstransit-project.com/,GitHub 700 star,500 Fork,類似的產品還有NServiceBus,之所以要選用MassTransit是因為他要比NServiceBus輕量級,另外在MassTransit開發之初就選用了RabbitMQ作為消息傳輸組建;同時我想拿他跟NServiceBus做個比較,看看他們到底有哪些側重點。

      1、新建控制臺應用程序:Masstransit.RabbitMQ.GreetingClient

      使用MassTransit可以從Nuget中安裝:

      Install-Package MassTransit.RabbitMQ

      2、創建服務總線,發送一個命令

              static void Main(string[] args)
              {
                  Console.WriteLine("Press 'Enter' to send a message.To exit, Ctrl + C");
      
                  var bus = BusCreator.CreateBus();
                  var sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}{RabbitMqConstants.GreetingQueue}");
      
                  while (Console.ReadLine()!=null)
                  {
                      Task.Run(() => SendCommand(bus, sendToUri)).Wait();
                  }
      
                  Console.ReadLine();
              }
      
              private static async void SendCommand(IBusControl bus,Uri sendToUri)
              {
                  var endPoint =await bus.GetSendEndpoint(sendToUri);
                  var command = new GreetingCommand()
                  {
                      Id = Guid.NewGuid(),
                      DateTime = DateTime.Now
                  };
      
                  await endPoint.Send(command);
      
                  Console.WriteLine($"send command:id={command.Id},{command.DateTime}"); 
              }
      

      這一段代碼隱藏了眾多關于消息隊列的細節,將我們的注意力集中在發送消息上,同時ServiceBus提供的API也更接近業務,我們雖然發送的是一個消息,但是在這種場景下體現出來是一個命令,Send(command)這一API描述了我們的意圖。

      3、服務端接收這一命令

      新建一個命令臺控制程序:Masstransit.RabbitMQ.GreetingServer

                  var bus = BusCreator.CreateBus((cfg, host) =>
                  {
                      cfg.ReceiveEndpoint(host, RabbitMqConstants.GreetingQueue, e =>
                      {
                          e.Consumer<GreetingConsumer>();
      
                      });
                  });
      

      這一代碼可以理解為服務端在監聽消息,我們在服務端注冊了一個名為“GreetingConsumer”的消費者,GreetingConsumer的定義:

          public class GreetingConsumer :IConsumer<GreetingCommand>
          {
              public async Task Consume(ConsumeContext<GreetingCommand> context)
              {
      
                  await Console.Out.WriteLineAsync($"receive greeting commmand: {context.Message.Id},{context.Message.DateTime}");
              }
          }
      

      該consumer可以消費類型為GreetingCommand的消息。這一實例幾乎隱藏了有關RabbitMQ的技術細節,將代碼中心放在了業務中,將這兩個控制臺應用跑起來試試:

      五、實現Publish/Subscribe模式

      發布/訂閱模式使得基于消息傳遞的軟件架構成為可能,這一能力表現為ClientA發送消息X,ClientB和ClientC都可以訂閱消息X。

      1、我們在上面的例子中改造一下,當GreetingConsumer收到GreetingCommand后發送一個GreetingEvent:

                 var greetingEvent = new GreetingEvent()
                  {
                      Id = context.Message.Id,
                      DateTime = DateTime.Now
                  };
      
                  await context.Publish(greetingEvent);
      

      2、新建控制臺程序Masstransit.RabbitMQ.GreetingEvent.SubscriberA用來訂閱GreetingEvent消息:

                 var bus = BusCreator.CreateBus((cfg, host) =>
                  {
                      cfg.ReceiveEndpoint(host, RabbitMqConstants.GreetingEventSubscriberAQueue, e =>
                      {
                          e.Consumer<GreetingEventConsumer>();
                      });
                  });
      
                  bus.Start();
      

      定義GreetingEventConsumer:

         public class GreetingEventConsumer:IConsumer<Greeting.Message.GreetingEvent>
          {
              public async Task Consume(ConsumeContext<Greeting.Message.GreetingEvent> context)
              {
                  await Console.Out.WriteLineAsync($"receive greeting event: id {context.Message.Id}");
              }
          }
      

      這一代碼跟Masstransit.RabbitMQ.GreetingServer接受一個命令幾乎一模一樣,唯一的區別在于:

      在Send/Receive模式中Client首先要獲得對方(Server)的終結點(endpoint),直接向該終結點發送命令。Server方監聽自己的終結點并消費命令。

      而Publish/Subscribe模式中Client publish一個事件,SubscriberA在自己的終結點(endpointA)監聽事件,SubscriberB在自己的終結點(endpointB)監聽事件。

      3、根據上面的分析再定義一個Masstransit.RabbitMQ.GreetingEvent.SubscriberB

      4、將4個控制臺應用程序跑起來看看

      六、實現RPC模式

      這一模式在Masstransit中被稱作Request/Response模式,通過IRequestClient<IRequest, IResponse> 接口來實現相關操作。一個相關的例子在官方的github

       

      結束語:本篇文章分析了如何使用Masstransit來抽象業務,避免直接使用具體的消息隊列,當然本文提到的眾多服務總線機制,如“重試、熔斷等”并沒有在該文中出現,需要大家進一步去了解該項目。

      通過對Masstransit的一些試用和NServiceBus的對比,Masstransit在實際項目中很容易上手并且免費,各種API定義的也非常清晰,但是官方的文檔有點過于簡單,實際使用中還需要去做深入的研究。作為.NET平臺下為數不多的ESB開源產品,其關注程度還是不夠,期待大家為開源項目做出貢獻。

       

      本文例子提供下載:http://git.oschina.net/richieyangs/RabbitMQ.Practice

      posted @ 2016-05-16 06:50  richiezhang  閱讀(28373)  評論(42)    收藏  舉報
      主站蜘蛛池模板: a男人的天堂久久a毛片| 91亚洲国产成人久久蜜臀| 熟女一区二区中文字幕| 国产人妻精品无码av在线| 国产熟女一区二区三区蜜臀| 18禁在线一区二区三区| 国产手机在线αⅴ片无码观看| 国产肥臀视频一区二区三区| 99久久国产综合精品成人影院| 日韩秘 无码一区二区三区 | 免费A级毛片无码A∨蜜芽试看 | 伊在人间香蕉最新视频| 国产成人精品日本亚洲专区6| 精品无人区一区二区三区在线| 丰满人妻被黑人猛烈进入| 亚洲日本精品国产第一区| 崇左市| 国产精品视频白浆免费视频| 久久久天堂国产精品女人| 欧美人与动牲交a免费| 十九岁的日本电影免费观看| 国产精品免费AⅤ片在线观看| 成人毛片一区二区| 国产美女在线精品免费观看| 在线人成免费视频69国产| 久久精品国产精品亚洲综合| 男人扒开女人内裤强吻桶进去| 色综合五月伊人六月丁香| 中阳县| 亚洲精品天堂一区二区| 欧洲亚洲精品免费二区| 亚洲av二区三区在线| 亚洲国产精品一区二区久久| 亚洲午夜av一区二区| 人妻少妇无码精品专区| 亚洲精品日本一区二区| 亚洲高请码在线精品av| 国产精品天干天干综合网| 久久99精品国产99久久6尤物| 色av永久无码影院av| 18禁亚洲一区二区三区|