<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      CQRS+ES項目解析-Diary.CQRS

      在《當(dāng)我們在討論CQRS時,我們在討論些神馬》中,我們討論了當(dāng)使用CQRS的過程中,需要關(guān)心的一些問題。其中與CQRS關(guān)聯(lián)最為緊密的模式莫過于Event Sourcing了,CQRS與ES的結(jié)合,為我們構(gòu)造高性能、可擴(kuò)展系統(tǒng)提供了基本思路。本文將介紹
      Kanasz Robert在《Introduction to CQRS》中的示例項目Diary.CQRS。

      獲取Diary.CQRS項目

      該項目為Kanasz Robert為了介紹CQRS模式而寫的一個測試項目,原始項目可以通過訪問《Introduction to CQRS》來獲取,由于項目版本比較舊,沒有使用nuget管理程序包等,導(dǎo)致下載以后并不能正常運(yùn)行,我下載了這個項目,升級到Visual Studio 2017,重新引用了StructMap框架(使用nuget),移除了Web層報錯的代碼,并上傳到博客園,可以從這里下載:Diary.CQRS.rar

      Diary.CQRS項目簡介

      Diary.CQRS項目的場景為日記本管理,提供了新增、編輯、刪除、列表等功能,整個解決方案分為三個項目:

      • Diary.CQRS:核心項目,完成了EventBus、CommandBus、Domain、Storage等功能,也是我們分析的重點(diǎn)。
      • Diary.CQRS.Configuration:服務(wù)配置,通過ServiceLocator類進(jìn)行依賴注入、服務(wù)查找功能。
      • Diary.CQRS.Web:用戶界面,MVC項目。

      這是一個很好的入門項目,功能簡單、結(jié)構(gòu)清晰,概念覆蓋全面。如果CQRS是一個城堡,那么Diary.CQRS則是打開第一重門的鑰匙,接下來讓我們一起推開這扇門吧。

      Diary.CQRS.Web

      運(yùn)行項目,最先看到的是一個Web頁面,如下圖:

      image

      很簡單,只有一個Add按鈕,當(dāng)我們點(diǎn)擊以后,會進(jìn)入添加的頁面:

      image

      我們填上一些內(nèi)容,然后點(diǎn)擊Save按鈕,就會返回到列表頁,我們可以看到已添加的條目:

      image

      然后我們進(jìn)行編輯操作,點(diǎn)擊列表中的Edit按鈕,跳轉(zhuǎn)到編輯頁面:

      image

      雖然頁面中顯示的是Add,但確實(shí)是Edit頁面。我們編輯以后點(diǎn)擊Save按鈕,然后返回列表頁即可看到編輯后的內(nèi)容。

      在列表頁中,如果我們點(diǎn)擊Delete按鈕,則會刪除改條目。

      到此為止,我們已經(jīng)看到了這個項目的所有頁面,一個簡單的CURD操作。我們繼續(xù)看它的代碼(在HomeController中)。

      Index:列表頁面

      public ActionResult Index()
      {
          ViewBag.Model = ServiceLocator.ReportDatabase.GetItems();
          return View();
      }
      

      通過ServiceLocator定位ReportDatabase,并從ReportDatabase中獲取所有條目。

      Add:新增頁面

      public ActionResult Add()
      {
          return View();
      }
      
      [HttpPost]
      public ActionResult Add(DiaryItemDto item)
      {
          ServiceLocator.CommandBus.Send(new CreateItemCommand(Guid.NewGuid(), item.Title, item.Description, -1, item.From, item.To));
          return RedirectToAction("Index");
      }
      

      兩個方法:

      • Add()方法,處理Get請求,返回新增視圖;
      • Add(DiaryItemDto item)方法,接收DiaryItemDto參數(shù),處理Post請求,創(chuàng)建并發(fā)送CreateItemCommand命令,然后返回到Index頁面

      Edit:編輯頁面

      public ActionResult Edit(Guid id)
      {
          var item = ServiceLocator.ReportDatabase.GetById(id);
          var model = new DiaryItemDto()
          {
              Description = item.Description,
              From = item.From,
              Id = item.Id,
              Title = item.Title,
              To = item.To,
              Version = item.Version
          };
          return View(model);
      }
      
      [HttpPost]
      public ActionResult Edit(DiaryItemDto item)
      {
          ServiceLocator.CommandBus.Send(new ChangeItemCommand(item.Id, item.Title, item.Description, item.From, item.To, item.Version));
          return RedirectToAction("Index");
      }
      

      仍然是兩個方法:

      • Edit(Guid id)方法,接收Guid作為參數(shù),并從ReportDatabase中獲取數(shù)據(jù),構(gòu)建dto對象返回給頁面
      • Edit(DiaryItemDto item)方法,接收DiaryItemDto對象,處理Post請求,接收到請求以后根據(jù)dto對象創(chuàng)建ChangeItemCommand命令,然后返回到Index頁面

      Delete:刪除操作

      public ActionResult Delete(Guid id)
      {
          var item = ServiceLocator.ReportDatabase.GetById(id);
          ServiceLocator.CommandBus.Send(new DeleteItemCommand(item.Id, item.Version));
          return RedirectToAction("Index");
      }
      

      對于刪除操作來說,它沒有視圖頁面,接收到請求以后,先獲取該記錄,創(chuàng)建并發(fā)送DeleteImteCommand命令,然后返回到Index頁面

      題外話:對于改變數(shù)據(jù)狀態(tài)的操作,使用Get請求是不可取的,可能存在安全隱患

      通過上面的代碼,你會發(fā)現(xiàn)所有的操作都是從ServiceLocator發(fā)起的,通過它我們能夠定位到CommandBus和ReportDatabase,從而進(jìn)行相應(yīng)的操作,我們在接下來會介紹ServiceLocator類。

      Diary.CQRS.Configuration

      Diary.CQRS.Configuration 項目中定義了ServiceLocator類,這個類的作用是完成IoC容器的服務(wù)注冊、服務(wù)定位功能。例如我們可以通過ServiceLocator獲取到CommandBus實(shí)例、獲取ReportDatabase實(shí)例。

      服務(wù)注冊

      ServiceLocator使用StructureMap作為依賴注入框架,提供了服務(wù)注冊、服務(wù)導(dǎo)航的功能。ServiceLocator類通過靜態(tài)構(gòu)造函數(shù)完成對服務(wù)注冊和服務(wù)實(shí)例化工作:

      static ServiceLocator()
      {
          if (!_isInitialized)
          {
              lock (_lockThis)
              {
                  ContainerBootstrapper.BootstrapStructureMap();
                  _commandBus = ObjectFactory.GetInstance<ICommandBus>();
                  _reportDatabase = ObjectFactory.GetInstance<IReportDatabase>();
                  _isInitialized = true;
              }
          }
      }
      

      首先調(diào)用ContainerBootstrapper.BootstrapStructureMap()方法,這個方法里面包含了對將服務(wù)添加到容器的代碼;然后使用容器創(chuàng)建CommandBus和ReportDatabase的實(shí)例。

      • CommandBus:命令總線,對應(yīng)Command操作,用來發(fā)送命令,程序中需要定義相應(yīng)的命令處理器,從而完成具體的操作。
      • ReportDatabase:報表數(shù)據(jù)庫,對應(yīng)Query操作,用來獲取數(shù)據(jù)。

      ServiceLocator的重要之處在于對外暴露了兩個至關(guān)重要的實(shí)例,分別處理CQRS中的Command和Query。

      為什么沒有Event相關(guān)操作呢?到目前為止我們還沒有涉及到,因?yàn)閷τ赨I層來說,用戶的意圖都是通過Command表示的,而數(shù)據(jù)的狀態(tài)變化才會觸發(fā)Event。

      Diary.CQRS

      在ServiceLocator中定義了獲取CommandBus和ReportDatabase的方法,我們順著這兩個對象繼續(xù)分析。

      CommandBus

      在基于消息的系統(tǒng)設(shè)計中,我們常會看到總線的身影,Command也是一種消息,所以使用總線是再合適不過的了。CommandBus就是我們在Diary.CQRS項目中用到的一種消息總線。

      在Diary.CQRS中,它被定義在Messaging目錄,在這個目錄下面,還有與Event相關(guān)的EventBus,我們稍后再進(jìn)行介紹。

      CommandBus實(shí)現(xiàn)ICommandBus接口,ICommandBus接口的定義如下:

      public interface ICommandBus
      {
          void Send<T>(T command) where T : Command;
      }
      

      它只包含了Send方法,用來將命令發(fā)送到對應(yīng)的處理程序。

      CommandBus是ICommand的實(shí)現(xiàn),具體代碼如下:

      public class CommandBus:ICommandBus
      {
          private readonly ICommandHandlerFactory _commandHandlerFactory;
      
          public CommandBus(ICommandHandlerFactory commandHandlerFactory)
          {
              _commandHandlerFactory = commandHandlerFactory;
          }
      
          public void Send<T>(T command) where T : Command
          {
              var handler = _commandHandlerFactory.GetHandler<T>();
              if (handler!=null)
              {
                  handler.Execute(command);
              }
              else
              {
                  throw new Exception();
              }
          }
      }
      

      在CommandBus中,顯式依賴ICommandHandlerFactory類,通過構(gòu)造函數(shù)進(jìn)行注入。那么 _commandHandlerFactory 的作用是什么呢?我們在Send方法中可以看到,通過 _commandHandlerFactory 可以獲取到與Command對應(yīng)的CommandHandler(命令處理程序),在程序的設(shè)計上,每一個Command都會有一個對應(yīng)的CommandHandler,而手工判斷類型、實(shí)例化處理程序顯然不符合使用習(xí)慣,此處采用工廠模式來獲取命令處理程序。

      當(dāng)獲取到與Command對應(yīng)的CommandHandler后,調(diào)用handler的Execute方法,執(zhí)行該命令。

      截止目前為止,我們又接觸了三個概念:CommandHandlerFactory、CommandHandler、Command:

      • CommandHandlerFactory:命令處理程序工廠,通過GetHandler方法獲取到與命令對應(yīng)的處理程序
      • CommandHandler:命令處理程序,用于執(zhí)行對應(yīng)的命令
      • Command:命令,描述用戶的意圖、并包含與意圖相關(guān)的數(shù)據(jù)

      CommandHandlerFactory

      使用簡單工廠模式,用來獲取與命令對應(yīng)的處理程序。它的代碼在Utils文件夾中,它的作用是提供一種獲取Handler的方式,所以它只能作為工具存在。

      接口定義如下:

      public interface ICommandHandlerFactory
      {
          ICommandHandler<T> GetHandler<T>() where T : Command;
      }
      

      只有GetHandler一個方法,它的實(shí)現(xiàn)是 StructureMapCommandHandlerFactory,即通過StructureMap作為依賴注入框架來實(shí)現(xiàn)的,代碼也比較簡單,這里不再貼出來了。

      Command和CommandHandler

      命令是代表用戶的意圖、并包含與意圖相關(guān)的數(shù)據(jù),比如用戶想要添加一條數(shù)據(jù),這便是一個意圖,于是就有了CreateItemCommand,用戶要在界面上填寫添加操作必須的數(shù)據(jù),于是就有了命令的屬性。

      關(guān)于命令的定義如下:

      public interface ICommand
      {
          Guid Id { get; }
      }
      
      public class Command : ICommand
      {
          public Guid Id { get; private set; }
          public int Version { get; set; }
      
          public Command(Guid id, int version)
          {
              Id = id;
              Version = version;
          }
      }
      
      • ICommand接口:包含Id屬性,這個Id表示Command對應(yīng)聚合的Id。聚合是領(lǐng)域驅(qū)動開發(fā)(DDD)的概念,表示一組強(qiáng)關(guān)聯(lián)的領(lǐng)域?qū)ο螅鴮酆现袪顟B(tài)的變更,只能通過聚合根(AggregateRoot)來完成。
      • Command類:實(shí)現(xiàn)了ICommand接口,并增加了Version屬性,用來標(biāo)記當(dāng)前操作對應(yīng)的聚合跟的版本。

      為什么要有版本的概念的?因?yàn)楫?dāng)使用ES模式的時候,數(shù)據(jù)庫中的數(shù)據(jù)都是事件產(chǎn)生的數(shù)據(jù)鏡像,保存了某個時間點(diǎn)的數(shù)據(jù)快照,如果要獲取到最新的數(shù)據(jù),則需要通過加載該聚合根對應(yīng)的所有Event來回放到最新狀態(tài)。如果引入版本的概念,每一個Event對應(yīng)一個版本,而景象中的數(shù)據(jù)也有一個版本,在進(jìn)行回放的時候,可以僅加載高版本的Event進(jìn)行回放,節(jié)省了系統(tǒng)資源,并提高了運(yùn)行效率。

      命令處理程序,它的作用是處理與它相對應(yīng)的命令,處理CQRS的核心,接口定義如下:

      public interface ICommandHandler<TCommand> where TCommand : Command
      {
          void Execute(TCommand command);
      }
      

      它接收command作為參數(shù),執(zhí)行該命令的處理邏輯。每一個命令都有一個與之對應(yīng)的處理程序。

      我們再重新梳理一下流程,首先用戶要新增一個數(shù)據(jù),點(diǎn)擊保存按鈕后,生成CreateItemCommand命令,隨后這個命令被發(fā)送到CommandBus中,CommandBus通過CommandHandlerFactory找到該Command的處理程序,此時在CommandBus的Send方法中,我們有一個Command和CommandHandler,然后調(diào)用CommandHandler的Execute方法,即完成了該方法的處理。至此,Command的處理流程完結(jié)。

      CreateItemCommand和CreateItemCommandHandler

      我們來看一下CreateItemCommand的代碼:

      public class CreateItemCommand : Command
      {
          public string Title { get; internal set; }
          public string Description { get; internal set; }
          public DateTime From { get; internal set; }
          public DateTime To { get; internal set; }
      
          public CreateItemCommand(Guid aggregateId, string title,
              string description, int version, DateTime from, DateTime to)
              : base(aggregateId, version)
          {
              Title = title;
              Description = description;
              From = from;
              To = to;
          }
      }
      

      它繼承自Command基類,繼承后即擁有了Id和Version屬性,然后又定義了幾個其它的屬性。它只包含數(shù)據(jù),與該命令對應(yīng)的處理程序叫做CreateItemCommandHandler,代碼如下:

      public class CreateItemCommandHandler : ICommandHandler<CreateItemCommand>
      {
          private IRepository<DiaryItem> _repository;
      
          public CreateItemCommandHandler(IRepository<DiaryItem> repository)
          {
              _repository = repository;
          }
      
          public void Execute(CreateItemCommand command)
          {
              if (command == null)
              {
                  throw new Exception();
              }
              if (_repository == null)
              {
                  throw new Exception();
              }
              var aggregate = new DiaryItem(command.Id, command.Title, command.Description, command.From, command.To);
              aggregate.Version = -1;
              _repository.Save(aggregate, aggregate.Version);
          }
      }
      

      這才是我們要分析的核心,在Handler中,我們看到了Repository,看到了DiaryItem聚合:

      • IRepository:倉儲類,代表數(shù)據(jù)的儲存方式,通過倉儲能夠進(jìn)行數(shù)據(jù)操作
      • DiaryItem:領(lǐng)域?qū)ο螅酆细袛?shù)據(jù)狀態(tài)的變更只能通過聚合根來修改

      在上面的代碼中,由于是新增,所以聚合的版本為-1,然后調(diào)用倉儲的Save方法進(jìn)行保存。我們繼續(xù)往下扒,看看倉儲和聚合的實(shí)現(xiàn)。

      Repository

      對于Repository的定義,仍然先看一下接口中的定義,代碼如下:

      public interface IRepository<T> where T : AggregateRoot, new()
      {
          void Save(AggregateRoot aggregate, int expectedVersion);
          T GetById(Guid id);
      }
      

      在倉儲中只有兩個方法:

      • Save(AggregateRoot aggregate, int expectedVersion):保存期望版本的聚合根
      • GetById(Guid id):根據(jù)聚合根Id獲取聚合根

      關(guān)于IRepository的實(shí)現(xiàn),代碼在Repository.cs中,我們拆開來進(jìn)行介紹:

      private readonly IEventStorage _eventStorage;
      private static object _lock = new object();
      
      public Repository(IEventStorage eventStorage)
      {
          _eventStorage = eventStorage;
      }
      

      首先是它的構(gòu)造函數(shù),強(qiáng)依賴IEventStorage,通過構(gòu)造函數(shù)注入。EventStorage是事件的儲存?zhèn)}庫,有個更為熟知的名字EventStore,我們稍后進(jìn)行介紹。

      public T GetById(Guid id)
      {
          IEnumerable<Event> events;
          var memento = _eventStorage.GetMemento<BaseMemento>(id);
          if (memento != null)
          {
              events = _eventStorage.GetEvents(id).Where(e => e.Version >= memento.Version);
          }
          else
          {
              events = _eventStorage.GetEvents(id);
          }
          var obj = new T();
          if (memento != null)
          {
              ((IOriginator)obj).SetMemento(memento);
          }
          obj.LoadsFromHistory(events);
          return obj;
      }
      

      GetById(Guid id)方法通過Id獲取一個聚合對象,獲取一個聚合對象有以下幾個步驟:

      • 首先會從EventStorage中獲取到該聚合的快照(memento的翻譯為記憶碎片、紀(jì)念品、備忘錄,用來聚合對象的快照)。
      • 加載Event列表,加載到的事件列表將用來做事件回放。

      如果獲取到快照的話,則加載版本高于該快照版本的事件列表,如果沒有獲取到快照,則加載全部事件列表。此處在上面已經(jīng)介紹過,通過快照的方式保存聚合對象,在獲取數(shù)據(jù)時可以減少重放事件的數(shù)量,起到提高加載速度的作用。

      • 實(shí)例化聚合根,對應(yīng)代碼中的var obj = new T();
      • 從快照中設(shè)置聚合根的狀態(tài)。在獲取到快照以后,如果快照不為空,則調(diào)用聚合根的SetMemento方法設(shè)置為快照中的狀態(tài),SetMemento方法定義在IOriginator接口中,聚合根需要實(shí)現(xiàn)該接口。
      • 加載歷史事件,完成重放。完成這個步驟以后,聚合根將更新到最新狀態(tài)。

      通過這幾個步驟以后,我們得到了一個最新狀態(tài)的聚合根對象。

      public void Save(AggregateRoot aggregate, int expectedVersion)
      {
          if (aggregate.GetUncommittedChanges().Any())
          {
              lock (_lock)
              {
                  var item = new T();
                  if (expectedVersion != -1)
                  {
                      item = GetById(aggregate.Id);
                      if (item.Version != expectedVersion)
                      {
                          throw new Exception();
                      }
                  }
                  _eventStorage.Save(aggregate);
              }
          }
      }
      

      Save方法,用來保存一個聚合根對象。在這個方法中,參數(shù)expectedVersion表示期望的版本,這里約定-1為新增的聚合根,當(dāng)聚合根為新增的時候,會直接調(diào)用EventStorage中的Save方法。

      關(guān)于expectedVersion參數(shù),我們可以理解為對并發(fā)的控制,只有當(dāng)expectedVersion與GetById獲取到的聚合根對象的版本相同時才能進(jìn)行保存操作。

      在介紹Repository類的時候,我們接觸了兩個新的概念:EventStorage和AggregateRoot,接下來我們分別進(jìn)行介紹。

      AggregateRoot

      AggregateRoot是聚合根,他表示一組強(qiáng)關(guān)聯(lián)的領(lǐng)域?qū)ο螅袑ο蟮臓顟B(tài)變更只能通過聚合根來完成,這樣可以保證數(shù)據(jù)的一致性,以及減少并發(fā)沖突。應(yīng)用到EventSourcing模式中,聚合根的好處也是很明顯的,我們所有對數(shù)據(jù)狀態(tài)的變更都通過聚合根完成,而每次變更,聚合根都會生成相應(yīng)的事件,在進(jìn)行事件回放的時候,又通過聚合根來完成歷史事件的加載。由此我們可以看到,聚合根對象應(yīng)該具備生成事件、重放事件的能力。

      我們來看看聚合根基類的定義,在Domain文件夾中:

      public abstract class AggregateRoot : IEventProvider{
          // ......
      }
      

      首先這是一個抽象類,實(shí)現(xiàn)了IEventProvider接口,該接口的定義如下:

      public interface IEventProvider
      {
          void LoadsFromHistory(IEnumerable<Event> history);
          IEnumerable<Event> GetUncommittedChanges();
      }
      

      它定義了兩個方法,我們分別進(jìn)行說明:

      • LoadsFromHistory()方法:加載歷史事件,還原聚合根的最新狀態(tài),我們在Repository中已經(jīng)用過這個方法。
      • GetUncommittedChanges()方法:獲取未提交的事件。一個命令可能造成聚合根發(fā)生多次更改,每次更改都會產(chǎn)生一個事件,這些事件被暫時的保存在聚合根對象中,通過該方法可以獲取到未提交的事件列表。

      為了實(shí)現(xiàn)這個接口,聚合根中定義了 List<Event> _changes對象,用來臨時存儲所有未提交的事件,該對象在構(gòu)造函數(shù)中進(jìn)行初始化。

      AggregateRoot中對于該事件的實(shí)現(xiàn)如下:

      public void LoadsFromHistory(IEnumerable<Event> history)
      {
          foreach (var e in history)
          {
              ApplyChange(e, false);
          }
          Version = history.Last().Version;
          EventVersion = Version;
      }
      
      public IEnumerable<Event> GetUncommittedChanges()
      {
          return _changes;
      }
      

      LoadsFromHistory方法遍歷歷史事件,并調(diào)用ApplyChange方法更新聚合根的狀態(tài),在完成更新后設(shè)置版本號為最后一個事件的版本。GetUncommittedChanges方法比較簡單,返回對象的_changes事件列表。

      接下來我們看看ApplyChange方法,該方法有兩個實(shí)現(xiàn),代碼如下:

      protected void ApplyChange(Event @event)
      {
          ApplyChange(@event, true);
      }
      
      protected void ApplyChange(Event @event, bool isNew)
      {
          dynamic d = this;
          d.Handle(Converter.ChangeTo(@event, @event.GetType()));
          if (isNew)
          {
              _changes.Add(@event);
          }
      }
      

      這兩個方法定義為protected,只能被子類訪問。我們可以理解為,ApplyChange(Event @event)方法為簡化操作,對第二個參數(shù)進(jìn)行了默認(rèn)為true的操作,然后調(diào)用ApplyChange(Event @event, bool isNew)方法。

      在ApplyChange(Event @event, bool isNew)方法中,調(diào)用了聚合根的Handle方法,用來處理事件。如果isNew參數(shù)為true,則將事件添加到change列表中,如果為false,則認(rèn)為是在進(jìn)行事件回放,所以不進(jìn)行事件的添加。

      需要注意的是,聚合根的Handle方法,與EventHandler不同,當(dāng)Event產(chǎn)生以后,首先由它對應(yīng)的聚合根進(jìn)行處理,因此聚合根要具備處理該事件的能力,如何具備呢?聚合根要實(shí)現(xiàn)IHandle接口,該接口的定義如下:

      public interface IHandle<TEvent> where TEvent:Event
      {
          void Handle(TEvent e);
      }
      

      這里可以看出,IHandle接口是泛型的,它只對一個具體的Event類型生效,在代碼上的體現(xiàn)如下:

      public class DiaryItem : AggregateRoot,
          IHandle<ItemCreatedEvent>,
          IHandle<ItemRenamedEvent>,
          IHandle<ItemFromChangedEvent>,
          IHandle<ItemToChangedEvent>,
          IHandle<ItemDescriptionChangedEvent>,
          IOriginator
      {
          //......
      }
      

      最后,聚合根還定義了清除所有事件的方法,代碼如下:

      public void MarkChangesAsCommitted()
      {
          _changes.Clear();
      }
      

      MarkChangesAsCommitted()方法用來清空事件列表。

      Event

      終于到我們今天的另外一個核心內(nèi)容了,Event是ES中的一等公民,所有的狀態(tài)變更最終都以Event的形式進(jìn)行存儲,當(dāng)我們要查看聚合根最新狀態(tài)的時候,可以通過事件回放來獲取。我們來看看Event的定義:

      public interface IEvent
      {
          Guid Id { get; }
      }
      

      IEvent接口定義了一個事件必須擁有唯一的Id進(jìn)行標(biāo)識。然后Event實(shí)現(xiàn)了IEvent接口:

      public class Event:IEvent
      {
          public int Version;
          public Guid AggregateId { get; set; }
          public Guid Id { get; private set; }
      }
      

      可以看到,除了Id屬性外,還添加了兩個字段Version和AggregateId。AggregateId表示該事件關(guān)聯(lián)的聚合根Id,通過該Id可以獲取到唯一的聚合根對象;Version表示事件發(fā)生時該事件的版本,每次產(chǎn)生新的事件,Version都會進(jìn)行累加。

      從而可以知道,在EventStorage中,聚合根Id對應(yīng)的所有Event中的Version是順序累加的,按照Version進(jìn)行排序可以得到事件發(fā)生的先后順序。

      EventStorage

      顧名思義,EventStorage是用來存儲Event的地方。在Diary.CQRS中,EventStorage的定義如下:

      public interface IEventStorage
      {
          IEnumerable<Event> GetEvents(Guid aggregateId);
          void Save(AggregateRoot aggregate);
          T GetMemento<T>(Guid aggregateId) where T : BaseMemento;
          void SaveMemento(BaseMemento memento);
      }
      
      • GetEvents(Guid aggregateId):根據(jù)聚合根Id獲取該聚合根的所有事件
      • Save(AggregateRoot aggregate):保存方法,入?yún)榫酆细鶎ο螅趯?shí)現(xiàn)上則是獲取聚合根中所有未提交的事件,隨后對這些事件進(jìn)行處理
      • GetMemento():獲取快照
      • SaveMemento():存儲快照

      Diary.CQRS中使用InMemory的方式實(shí)現(xiàn)了EventStorage,屬性和構(gòu)造函數(shù)如下:

      private List<Event> _events;
      private List<BaseMemento> _mementoes;
      private readonly IEventBus _eventBus;
      
      public InMemoryEventStorage(IEventBus eventBus)
      {
          _events = new List<Event>();
          _mementoes = new List<BaseMemento>();
          _eventBus = eventBus;
      }
      
      • _events:事件列表,內(nèi)存中存儲事件的位置,所有事件最終都會存儲在該列表中
      • _mementoes:快照列表,用于存儲聚合根的某個事件版本的狀態(tài)
      • _eventBus:事件總線,用于發(fā)布任務(wù)

      當(dāng)Event生成后,它并沒有馬上存入EventStorage,而是在Repository顯示調(diào)用Save方法時,倉儲將存儲權(quán)交給了EventStorage,EventStorage是事件倉庫,事件倉儲在存儲時進(jìn)行了如下操作:

      • 獲取聚合根中所有未提交的Event,同時獲取到聚合根當(dāng)前的版本號
      • 遍歷未提交Event列表,根據(jù)聚合根版本號自動為Event生成版本號,保持自增長的特性;
      • 生成聚合根快照。示例中每3個版本生成一次,并保持到事件倉儲中。
      • 將任務(wù)添加到事件倉庫中。
      • 再次遍歷未提交Event列表,此時將進(jìn)行任務(wù)發(fā)布,調(diào)用事件總線的Publish方法進(jìn)行發(fā)布。

      Save方法的代碼如下:

      public void Save(AggregateRoot aggregate)
      {
          var uncommittedChanges = aggregate.GetUncommittedChanges();
          var version = aggregate.Version;
      
          foreach (var @event in uncommittedChanges)
          {
              version++;
              if (version > 2)
              {
                  if (version % 3 == 0)
                  {
                      var originator = (IOriginator)aggregate;
                      var memento = originator.GetMemento();
                      memento.Version = version;
                      SaveMemento(memento);
                  }
              }
              @event.Version = version;
              _events.Add(@event);
          }
          foreach (var @event in uncommittedChanges)
          {
              var desEvent = Converter.ChangeTo(@event, @event.GetType());
              _eventBus.Publish(desEvent);
          }
      }
      

      至此Event的處理流程就算完結(jié)了。此時所有的操作都是在主庫完成的,當(dāng)事件被發(fā)布以后,訂閱了該事件的所有Handler都將會被觸發(fā)。

      在Diary.CQRS項目中,EventHandler都被用來處理ReportDatabase了。

      ReportDatabase

      當(dāng)你使用ES模式時,都存在一個嚴(yán)重問題,那就是數(shù)據(jù)查詢的問題。當(dāng)用戶進(jìn)行數(shù)據(jù)檢索是,必然會使用各種查詢條件,然而無論那種事件倉庫都很難滿足復(fù)雜查詢。為了解決此問題,ReportDatabase就顯得格外重要。

      ReportDatabase的作用被定義為獲取數(shù)據(jù)、應(yīng)對數(shù)據(jù)查詢、生成報表等,它的結(jié)構(gòu)與主庫不同,可以根據(jù)不同的業(yè)務(wù)場景進(jìn)行定義。

      ReportDatabase的數(shù)據(jù)不是通過業(yè)務(wù)邏輯進(jìn)行更新的,它通過訂閱Event進(jìn)行更新。在本示例中ReportDatabase實(shí)現(xiàn)的很簡單,接口定義如下:

      public interface IReportDatabase
      {
          DiaryItemDto GetById(Guid id);
          void Add(DiaryItemDto item);
          void Delete(Guid id);
          List<DiaryItemDto> GetItems();
      }
      

      實(shí)現(xiàn)上,通過內(nèi)存中維護(hù)一個列表,每次接收到事件以后,都對相應(yīng)數(shù)據(jù)進(jìn)行更新,此處不在貼出。

      EventHandler、EventHandlerFactory和EventBus

      在上文中已經(jīng)介紹過Event,而針對Event的處理,實(shí)現(xiàn)邏輯上與Command非常相似,唯一的區(qū)別是,命令只可以有一個對應(yīng)的處理程序,而事件則可以有多個處理程序。所以在EventHandlerFactory中獲取處理程序的方法返回了EventHandler列表,代碼如下:

      public IEnumerable<IEventHandler<T>> GetHandlers<T>() where T : Event
      {
          var handlers = GetHandlerType<T>();
      
          var lstHandlers = handlers.Select(handler => (IEventHandler<T>)ObjectFactory.GetInstance(handler)).ToList();
          return lstHandlers;
      }
      

      在EventBus中,如果一個事件沒有處理程序也不會引發(fā)錯誤,如果有一個或多個處理程序,則會以此調(diào)用他們的Handle方法,代碼如下:

      public void Publish<T>(T @event) where T : Event
      {
          var handlers = _eventHandlerFactory.GetHandlers<T>();
          foreach (var eventHandler in handlers)
          {
              eventHandler.Handle(@event);
          }
      }
      

      總結(jié)

      Diary.CQRS是一個典型的CQRS+ES演示項目,通過對該項目的分析,我們能了解到Command、AggregateRoot、Event、EventStorage、ReportDatabase的基礎(chǔ)知識,了解他們相互關(guān)系,尤其是如何進(jìn)行事件存儲、如何進(jìn)行事件回放的內(nèi)容。

      另外,我們發(fā)現(xiàn)在使用CQRS+ES的過程中,項目的復(fù)雜度增加了很多,我們不可避免的要使用EventStore、Messaging等架構(gòu),從而影響那些不了解CQRS的團(tuán)隊成員的加入,因此在應(yīng)用到實(shí)際項目的時候,要適可而止,慎重選擇,避免過度設(shè)計。

      由于這是一個示例,項目代碼中存在很多不夠嚴(yán)謹(jǐn)?shù)牡胤剑蠹以趯W(xué)習(xí)的過程中應(yīng)進(jìn)行甄別。

      由于本人的知識有限,如果內(nèi)容中存在不準(zhǔn)確或錯誤的地方,還請不吝賜教!

      posted @ 2019-06-24 00:00  拓荒者IT  閱讀(1791)  評論(2)    收藏  舉報
      皮膚配置 參考地址:https://www.yuque.com/awescnb/user
      主站蜘蛛池模板: 久久综合色之久久综合色| 新竹市| 大屁股国产白浆一二区| 精品久久久久国产免费| 国产日韩一区二区天美麻豆 | 国产精品熟妇视频国产偷人| 孟村| 国产精品午夜福利合集| 性动态图无遮挡试看30秒| 亚洲高潮喷水无码AV电影| 国产粉嫩区一区二区三区| 三级国产在线观看| 人妻教师痴汉电车波多野结衣| 湘阴县| 国产黑色丝袜在线播放| 国产欧美另类精品久久久| 国产一区二区高潮视频| 最新亚洲av日韩av二区| 偷拍激情视频一区二区三区| 性做久久久久久久久| 人人妻人人澡人人爽欧美一区双 | 亚洲精品久久国产高清| 女同在线观看亚洲国产精品| 亚洲国产精品无码一区二区三区| 国产明星精品无码AV换脸| 国产精品午夜福利免费看| 中文字幕日韩一区二区不卡 | 嫩草成人AV影院在线观看| 99精品国产一区二区三| 国产极品美女高潮无套| 狠狠色噜噜狠狠狠狠av不卡| 日韩亚洲中文图片小说| 少妇人妻无码专区在线视频| 亚洲av日韩av综合在线观看| 免费久久人人爽人人爽AV| 亚洲午夜天堂| 成人国产精品免费网站| 男女爽爽无遮挡午夜视频| 国产真实野战在线视频| 精品一区二区三区少妇蜜臀| 无码专区 人妻系列 在线|