<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      使用Masstransit開發基于消息傳遞的分布式應用

      Masstransit作為.Net平臺下的一款優秀的開源產品卻沒有得到應有的關注,這段時間有機會閱讀了Masstransit的源碼,我覺得我有必要普及一下這個框架的使用。

      值得一提的是Masstransit的源碼寫的非常優秀,值得每個想提高自己編程能力的.Net選手閱讀,整個代碼看起來賞心悅目。反之,每次打開自己公司項目的時候心情都異常沉重。所以不是.Net不行,還是咱們水平不行。

      學會了Masstransit你再也不用羨慕別人有Dubbo、Mule、Akka什么的了,當然在某些方面他們的使用場景還是有一些區別。另外插播一條廣告:本人目前在西安求職中,如果那位同學有好的工作機會希望能夠幫忙推薦。

      閱讀本篇文章的前提是你需要對消息隊列有一些了解,特別是RabbitMq,Masstransit作為一款輕量級的ESB默認支持RabbitMq和MSMQ。本文的例子都使用RabbitMq來介紹,所以你最好能讀一下我之前寫的《如何優雅的使用RabbitMq》。

      簡單來說,Masstransit提供了使用消息隊列場景的一種抽象,也就是說,如果你有使用消息隊列的需求,都可以通過Masstransit來完成,當然如果僅僅是拿消息隊列來發個短信、郵件之類的并不能體現出Masstransit的優越性。當整個業務系統都通過Masstransit過來構建和交互的時候,才能真正體現ESB的價值所在。

      我寫了5不同場景個Demo,方便大家學習和參考。我會重點講解Real World的案例,也就是如何在真實場景使用Masstransit。如果僅僅是把一些組件融入到了項目中并且能夠運行,并不能算是一個合格的架構師,一個合格的架構師一定是可以將某個組件以最佳實踐的方式融入到了自己的項目中,并且能夠為開發者提供清晰且合理的抽象,然后針對這一方案制定一些約定和規則,隨著項目的推進,整個項目的代碼都能夠有章可循,始終在架構師的掌控之中。

      一、發送命令模型(Send Command Pattern)

      這種模型最常見的就是CQRS中C,用來向DomainHandler發送一個Command。另外系統的發送郵件服務、發送短信服務也可以通過這種模式來實現。這種模型跟郵遞員向郵箱投遞郵件有點相似。這一模型的特點是你需要知道對方終結點的地址,意味著你要明確要向哪個地址發送消息。從Masstransit提供的api就可以看出來:

      var endPoint =await bus.GetSendEndpoint(sendToUri);
                  var command = new GreetingCommandA()
                  {
                      Id = Guid.NewGuid(),
                      DateTime = DateTime.Now
                  };
      
                  await endPoint.Send(command);
      

      這個Demo主要由2個工程組成,Client發送消息到Server,Server來響應這一消息。

      二、發布/訂閱模型(publish/subscribe pattern)

      之所以有基于消息傳遞的分布式應用這種架構模式,很大程度上就是依靠這種模式來完成。一個典型的例子是子系統A發布了一條消息,子系統B和子系統C都可以訂閱這一消息并異步處理該消息。而這一過程對子系統A來說是不關心的。從而減少不同的子系統之間的耦合,提高系統的可擴展性。

      三、消息的繼承層次

      用過RabbitMQ的同學應該知道,RabbitMQ提供了3中類型的Exchange,分別為direct、fanout和topic。所有這一切都是為了提供一種路由消息的機制。而這一切是通過匹配一種字符串類型的routingKey來實現的,當然有了Masstransit你就不用這么費勁了。C#作為一種強類型的語言,我們可以通過設計消息的繼承層次來實現消息的路由機制。比如我們可以設計下面的消息繼承體系:

       public interface IMessage
          {
              Guid Id { get; set; }
          }
      
          public class Message : IMessage
          {
              public Guid Id { get; set; }
              public string Type { get; set; }
          }
      
          public class UserUpdatedMessage : Message
          {
              public Guid Id { get; set; }
          }
      

      有了這樣的繼承體系,我們可以定義下面的Consumer類型:

      public class BaseInterfaceMessageConsumer:IConsumer<IMessage>
          {
              public async Task Consume(ConsumeContext<IMessage> context)
              {
                  await Console.Out.WriteLineAsync($"consumer is BaseInterfaceMessageConsumer,message type is {context.Message.GetType()}");
              }
          }
      

      還可以定義下面的Consumer類型:

      public class UserUpdatedMessageConsumer: IConsumer<UserUpdatedMessage>
          {
              public async Task Consume(ConsumeContext<UserUpdatedMessage> context)
              {
                  await Console.Out.WriteLineAsync($"consumer is UserUpdatedMessageConsumer,message type is {context.Message.GetType()}");
              }
          }

      這樣就可以路由不同的消息到相應的Consumer中了。

      四、使用Topshelf來構建windows服務

      我們最終要將consumer程序集打成windows服務來安裝在產品環境下,Topshelf為我們提供了一組DSL描述的api來創建window服務:

      HostFactory.Run(x =>                                 
                  {
                      x.Service<GreetingServer>(s =>                        
                      {
                          s.ConstructUsing(name => new GreetingServer());     
                          s.WhenStarted(tc => tc.Start());            
                          s.WhenStopped(tc => tc.Stop());
                      });
                      x.StartAutomatically();
                      x.RunAsLocalSystem();                          
                      x.SetDescription("A greeting service");        
                      x.SetDisplayName("Greeting Service");                      
                      x.SetServiceName("GreetingService");     
                  });

      五、RPC調用(request/response pattern)

      我們還可以通過Masstransit實現RPC調用:

      var response = await client.Request(new SimpleRequest() {CustomerId = customerId});
      
      Console.WriteLine("Customer Name: {0}", response.CusomerName);
      

      這有點像是一個webservice調用,不過在ESB的設計中我們應該盡量避免這種設計,特別是在異構系統之間,應該盡量采用send command pattern和publish/subscriber pattern。

      六、正式場景該如何使用Masstransit

      在使用Masstranit的正式場景中,我們主要考慮以下幾個方面:

      1、配置方式

      定義一個抽象類,用來統一配置方式:

          public abstract class BusConfiguration
          {
              public abstract string RabbitMqAddress { get; }
              public abstract string QueueName { get; }
              public abstract string RabbitMqUserName { get; }
              public abstract string RabbitMqPassword { get; }
              public abstract Action<IRabbitMqBusFactoryConfigurator,IRabbitMqHost> Configuration { get; }
      
      
              public virtual IBus CreateBus()
              {
                  var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
                  {
                      var host = cfg.Host(new Uri(RabbitMqAddress), hst =>
                      {
                          hst.Username(RabbitMqUserName);
                          hst.Password(RabbitMqPassword);
                      });
      
                      Configuration?.Invoke(cfg, host);
                  });
      
                  return bus;
              }
          }
      

      具體的項目會繼承該配置類做對應的配置:如UserManagementBusConfiguration、UserManagementServiceBusConfiguration等

      2、能夠跟DI容器結合,本例以Castle Windsor Container為例:

      在web項目中添加ServiceBusInstaller:

      public class ServiceBusInstaller:IWindsorInstaller
          {
              public void Install(IWindsorContainer container, IConfigurationStore store)
              {
                  container.Register(
                      Component.For<IBus, IBusControl>()
                          .Instance(UserManagementBusConfiguration.BusInstance)
                          .LifestyleSingleton());
              }
          }
      

      然后我們就可以在controller中注入IBus了:

      private readonly IUserProvider _userProvider;
              private readonly IBus _bus;
      
              public ValuesController(IUserProvider userProvider,IBus bus)
              {
                  _userProvider = userProvider;
                  _bus = bus;
              }
      
              [HttpGet]
              [Route("api/values/createuser")]
              public string CreateUser()
              {
                  //save user in local db
      
                  _bus.Publish(new UserCreatedEvent() {UserName = "Tom", Email = "tom@google.com"});
      
                  return "create user named Tom";
              }
      

      同樣的道理,在consumer項目中也可以做同樣的配置,添加ConsumersInstaller:

      public class ConsumersInstaller:IWindsorInstaller
          {
              public void Install(IWindsorContainer container, IConfigurationStore store)
              {
                  container.Register(
                      Classes.FromThisAssembly().BasedOn(typeof (IConsumer)).WithServiceBase().WithServiceSelf().LifestyleTransient());
              }
          }
      

      在Consumer中注入一個組件試試:

      public class UserCreatedEventConsumer : IConsumer<UserCreatedEvent>
          {
              private readonly GreetingWriter _greetingWriter;
      
              public UserCreatedEventConsumer(GreetingWriter greetingWriter)
              {
                  _greetingWriter = greetingWriter;
              }
      
              public async Task Consume(ConsumeContext<UserCreatedEvent> context)
              {
                  _greetingWriter.SayHello();
      
                  await Console.Out.WriteLineAsync($"user name is {context.Message.UserName}");
                  await Console.Out.WriteLineAsync($"user email is {context.Message.Email}");
              }
          }
      

      把web項目和consumer服務都跑起來看看:

      3、重試配置

       cfg.UseRetry(Retry.Interval(3, TimeSpan.FromMinutes(1)));
      

      消息消費失敗后重試3次,每次間隔1分鐘

      4、限速器

      cfg.UseRateLimit(1000, TimeSpan.FromSeconds(1));
      

      每分鐘消息消費數限定在1000之內

      5、熔斷器

      cfg.UseCircuitBreaker(cb =>
                          {
                              cb.TrackingPeriod = TimeSpan.FromMinutes(1);
                              cb.TripThreshold = 15;
                              cb.ActiveThreshold = 10;
                          });

      參照Martin Folwer對熔斷器模式的描述:CircuitBreaker

      6、異常處理

       public class UserUpdatedEventComsumer
              :IConsumer<UserUpdatedEvent>
              ,IConsumer<Fault<UserUpdatedEvent>>
          {
              public Task Consume(ConsumeContext<UserUpdatedEvent> context)
              {
                  throw new System.NotImplementedException();
              }
      
              public async Task Consume(ConsumeContext<Fault<UserUpdatedEvent>> context)
              {
                  await Console.Out.WriteLineAsync($"catch exception: {context.Message.Message}");
              }
          }
      

      只要繼承于對應的Fault<TMessage>即可為對應的消息編寫異常處理。

      7、單元測試(待續)

      8、消息定時發送(待續)

      9、自定義中間件(待續)

      10、自定義觀察者(待續)

      11、長生命周期的消費者:Turnout(待續)

      12、長生命周期的狀態機:saga(待續)

      13、Routing slip pattern的實現:Courier(待續)

      整個Demo代碼提供下載:http://git.oschina.net/richieyangs/RabbitMQ.Practice

      posted @ 2016-08-02 21:49  richiezhang  閱讀(5972)  評論(24)    收藏  舉報
      主站蜘蛛池模板: 最新国产精品好看的精品| 亚洲a∨国产av综合av| 曰韩精品无码一区二区三区视频| 亚洲AV无码秘?蜜桃蘑菇| 亚洲熟妇色xxxxx欧美老妇| 成人av午夜在线观看| 她也色tayese在线视频| 亚洲影院丰满少妇中文字幕无码| 4hu亚洲人成人无码网www电影首页| 亚洲日本欧美日韩中文字幕| 精品亚洲国产成人av| 日韩人妻无码精品系列| 四虎女优在线视频免费看| 中文字幕人妻不卡精品| 国精产品999国精产品官网| 亚洲综合精品第一页| 无码一区二区三区AV免费| 国产一区在线播放av| 国产专区一va亚洲v天堂| 9久久伊人精品综合| 成人国产av精品免费网| 色伊人久久综合中文字幕| 国产旡码高清一区二区三区| 久久av高潮av喷水av无码| 九九热免费精品在线视频| 日本少妇被黑人xxxxx| 精品久久久久中文字幕日本| 欧美伦费免费全部午夜最新| 亚洲一区在线成人av| 亚洲精品一区二区三天美| 亚洲人成电影网站色| 欧美大胆老熟妇乱子伦视频| 欧美亚洲综合成人A∨在线| 四虎精品永久在线视频| 国内熟妇人妻色在线视频| 大陆一级毛片免费播放| 91孕妇精品一区二区三区| 深夜av免费在线观看| 石棉县| 无码av岛国片在线播放| 亚洲影院丰满少妇中文字幕无码|