Orleans基礎(chǔ)知識(shí)以及使用示例
Orleans簡(jiǎn)介
Orleans 是一個(gè)與ABP齊名,支持有狀態(tài)云生應(yīng)用/服務(wù)水平伸縮的基于Virtual Actor 模型的.NET分布式應(yīng)用框架。
Actor模型
簡(jiǎn)單來(lái)講:Actor模型 = 狀態(tài) + 行為 + 消息。一個(gè)應(yīng)用/服務(wù)由多個(gè)Actor組成,每個(gè)Actor都是一個(gè)獨(dú)立的運(yùn)行單元,擁有隔離的運(yùn)行空間,在隔離的空間內(nèi),其有獨(dú)立的狀態(tài)和行為,不被外界干預(yù),Actor之間通過(guò)消息進(jìn)行交互,而同一時(shí)刻,每個(gè)Actor只能被單個(gè)線程執(zhí)行,這樣既有效避免了數(shù)據(jù)共享和并發(fā)問(wèn)題,又確保了應(yīng)用的伸縮性。
另外Actor基于事件驅(qū)動(dòng)模型進(jìn)行異步通信,性能良好。且位置透明,無(wú)論Actor是在本機(jī)亦或是在集群中的其他機(jī)器,都可以直接進(jìn)行透明調(diào)用。
因此Actor模型賦予了應(yīng)用/服務(wù)的生命力(有狀態(tài))、高并發(fā)的處理能力和彈性伸縮能力。
然而Actor模型作為一個(gè)偏底層的技術(shù)框架,對(duì)于開(kāi)發(fā)者來(lái)說(shuō),需要有一定分布式應(yīng)用的開(kāi)發(fā)經(jīng)驗(yàn),才能用好Actor(包括Actor的生命周期管理,狀態(tài)管理等等)。為了進(jìn)一步簡(jiǎn)化分布式編程,微軟的研究人員引入了 Virtual Actor 模型概念,簡(jiǎn)單來(lái)講Virtual Actor模型是對(duì)Actor模型的進(jìn)一步封裝和抽象。 其與Actor模型的最大的區(qū)別在于,Actor的物理實(shí)例完全被抽象出來(lái),并由Virtual Actor所在的運(yùn)行時(shí)自動(dòng)管理。
Orleans 就是作為一款面向.NET的Virtual Actor模型的實(shí)現(xiàn)框架,提供了開(kāi)發(fā)者友好的編程方式,簡(jiǎn)化了分布式應(yīng)用的開(kāi)發(fā)成本。在Orleans中Virtual Actor由Grain來(lái)體現(xiàn)。
Orleans框架的基本構(gòu)成
Grains(顆粒)
Orleans模型里,每一個(gè)actor有專門的類代表它,叫做Grain,這個(gè)grain類就是模擬通信場(chǎng)景中的”人”
所有通過(guò)Orleans建立的應(yīng)用程序的基本單位都是Grains. 也可以理解為任何Orleans程序都是由一個(gè)一個(gè)的Grain組成的. Grain是一個(gè)由用戶自定義標(biāo)識(shí),行為和狀態(tài)組成的實(shí)體. 標(biāo)識(shí)是用戶自定義的鍵(Key),其他應(yīng)用程序或Grain通過(guò)鍵來(lái)調(diào)用該Grain. Grains是通過(guò)強(qiáng)類型接口(協(xié)議)與其他Grains或客戶端進(jìn)行通信. Graint是實(shí)現(xiàn)一個(gè)或多個(gè)這些接口的實(shí)例.
Orleans為了解決多線程帶來(lái)的“資源競(jìng)爭(zhēng)”等問(wèn)題,在Orleans框架內(nèi),它保證每個(gè)grain類符合以下行為規(guī)范:
A. 發(fā)往同一個(gè)grain類實(shí)例的任何消息都會(huì)在固定線程內(nèi)執(zhí)行。
B. grain類按照接受消息的先后,依次處理消息。在任意時(shí)間點(diǎn),一個(gè)grain實(shí)例只處理一個(gè)消息。
C. grain實(shí)例內(nèi)的字段屬性,只能由實(shí)例本身訪問(wèn)。外界不能訪問(wèn)。
Silos(筒倉(cāng))
Silos是Orleans運(yùn)行時(shí)的主要組件,Silos 是托管和執(zhí)行 Grains 的容器。Orleans 通過(guò) Silos 創(chuàng)建和管理 Grains 對(duì)象,并且執(zhí)行 Grains 對(duì)象,客戶端僅通過(guò) Grains 定義的接口去調(diào)用。從而將 Grains 的對(duì)象狀態(tài)封裝起來(lái),只公開(kāi) Grains 聲明的接口方法。
Orleans 運(yùn)行時(shí)會(huì)根據(jù)需要自行實(shí)例化或管理 Grains 對(duì)象。會(huì)將長(zhǎng)期不使用的 Grains 對(duì)象從內(nèi)存中釋放。當(dāng) Grain 出現(xiàn)異常時(shí)會(huì)自動(dòng)恢復(fù)。Orleans 運(yùn)行時(shí)會(huì)自動(dòng)管理 Grain 的整個(gè)生命周期,使得開(kāi)發(fā)人員可以專注業(yè)務(wù)開(kāi)發(fā)中。
通常, 一組silo是以集群方式運(yùn)行的, 并以此來(lái)實(shí)現(xiàn)可伸縮性和容錯(cuò)性. 當(dāng)這些silo作為集群方式運(yùn)行的時(shí)候,silo之間彼此協(xié)調(diào)分配工作, 檢測(cè)故障以及故障恢復(fù). Orleans運(yùn)行時(shí)使得集群中的Grian能夠像在一個(gè)進(jìn)程中一樣彼此相互通信.
Clients(客戶端)
客戶端又稱 Grains 客戶端,即調(diào)用 Grains 程序代碼。客戶端分為兩種:一種與 Silos 存在相同進(jìn)程中,即共同托管的客戶端;另外一種是運(yùn)行 Silos 外的進(jìn)程中,即外部客戶端。
簡(jiǎn)單Orleans持久化解決方案示例
項(xiàng)目環(huán)境:.Net Core3.1+MySql+Orleans
示例項(xiàng)目源碼地址:https://github.com/wswind/learn-orleans

此示例采用的是03.MultiGrain
- 包含 Grains 接口的類庫(kù) —— GrainInterfaces
- 包含 Grains 類庫(kù) —— Grains
- Silos 控制臺(tái)應(yīng)用程序 —— Silo
- Client 控制臺(tái)應(yīng)用程序 —— Client
Silo項(xiàng)目
作為Orleans服務(wù)端程序,單獨(dú)啟動(dòng),依賴Microsoft.Orleans.Server、Microsoft.Orleans.EventSourcing包,如果需要使用Mysql實(shí)現(xiàn)持久化存儲(chǔ),則還需依賴MySql.Data包。主要相關(guān)配置代碼如下:
private static async Task<ISiloHost> StartSilo() { // define the cluster configuration var builder = new SiloHostBuilder() // 因?yàn)槭潜镜亻_(kāi)發(fā), silo 使用 localhost 集群 .UseLocalhostClustering() //配置存儲(chǔ)提供程序--內(nèi)存存儲(chǔ) .AddMemoryGrainStorage("DevStore") //配置存儲(chǔ)提供程序--AdoNet持久化存儲(chǔ) .AddAdoNetGrainStorage("OrleansStorage", options => { //options.Invariant = "System.Data.SqlClient"; options.Invariant = "MySql.Data.MySqlClient"; //options.ConnectionString = "Server=.;Database=o3;Trusted_Connection=True;"; options.ConnectionString = "Server=localhost;DataBase=graintest;uid=root;pwd=12345678;pooling=true;port=3306;CharSet=utf8mb3;sslMode=None;"; options.UseJsonFormat = true;//指定使用Json格式序列化存儲(chǔ)grain狀態(tài) }) //配置集群Id 和 服務(wù)Id .Configure<ClusterOptions>(options => { options.ClusterId = "dev";//獲取或設(shè)置群集標(biāo)識(shí) options.ServiceId = "OrleansBasics";//獲取或設(shè)置此服務(wù)的唯一標(biāo)識(shí)符,該標(biāo)識(shí)符應(yīng)在部署和重新部署后繼續(xù)存在 }) //應(yīng)用程序部分:只需引用我們使用的 Grain 實(shí)現(xiàn) .ConfigureApplicationParts(parts => parts.AddApplicationPart(typeof(HelloGrain).Assembly).WithReferences()) //配置日志輸出到控制臺(tái) .ConfigureLogging(logging => logging.AddConsole()); var host = builder.Build(); await host.StartAsync(); return host; }
GrainInterfaces項(xiàng)目
作為Orleans項(xiàng)目的公共接口,依賴Microsoft.Orleans.Core.Abstractions、Microsoft.Orleans.CodeGenerator.MSBuild、Microsoft.Orleans.Core包
定義Grain的行為接口:IHello : Orleans.IGrainWithIntegerKey
public interface IHello : Orleans.IGrainWithIntegerKey { Task<string> SayHello(string greeting); Task AddCount(); Task<int> GetCount(); }
Grains項(xiàng)目
作為Orleans的Grains的具體實(shí)現(xiàn),依賴Microsoft.Orleans.Persistence.AdoNet、Microsoft.Orleans.CodeGenerator.MSBuild和Microsoft.Orleans.Core.Abstractions包。
定義需要持久化存儲(chǔ)的Grain狀態(tài)數(shù)據(jù)類:PersistentData
public class PersistentData { public int Count { get; set; } }
定義具體的Grain實(shí)體類:HelloGrain : Grain<PersistentData>, IHello
從Grain <T>繼承的Grain類(其中T是需要持久化的特定于應(yīng)用程序的狀態(tài)數(shù)據(jù)類型)將從指定的存儲(chǔ)區(qū)自動(dòng)加載它們的狀態(tài)。
同時(shí)需要指定持久化存儲(chǔ)的配置名稱[StorageProvider(ProviderName= "OrleansStorage")]
并重寫OnActivateAsync()和OnDeactivateAsync()方法,確保Grain實(shí)例在每次激活和未激活狀態(tài)下的數(shù)據(jù)持久化。
[StorageProvider(ProviderName= "OrleansStorage")] public class HelloGrain : Grain<PersistentData>, IHello { private readonly ILogger logger; public override Task OnActivateAsync() { this.ReadStateAsync(); return base.OnActivateAsync(); } public override Task OnDeactivateAsync() { this.WriteStateAsync(); return base.OnDeactivateAsync(); } public HelloGrain(ILogger<HelloGrain> logger) { this.logger = logger; } public async Task AddCount() { this.State.Count ++; await this.WriteStateAsync(); } public Task<int> GetCount() { return Task.FromResult(this.State.Count); } Task<string> IHello.SayHello(string greeting) { logger.LogInformation($"\n SayHello message received: greeting = '{greeting}'"); return Task.FromResult($"\n Client said: '{greeting}', so HelloGrain says: Hello!"); } }
Client項(xiàng)目
作為Orleans項(xiàng)目的客戶端程序,單獨(dú)啟動(dòng),負(fù)責(zé)調(diào)用Orleans的Silo服務(wù)端,依賴Microsoft.Orleans.Client包。主要相關(guān)配置代碼如下:
private static async Task<IClusterClient> ConnectClient() { IClusterClient client; client = new ClientBuilder() // 這里配置與 Silo 相同 .UseLocalhostClustering() //與 Silo 配置的服務(wù)一樣,否則客戶端會(huì)連接失敗 .Configure<ClusterOptions>(options => { options.ClusterId = "dev"; options.ServiceId = "OrleansBasics"; }) .ConfigureLogging(logging => logging.AddConsole()) .Build(); await client.Connect(); Console.WriteLine("Client successfully connected to silo host \n"); return client; }
創(chuàng)建好客戶端連接之后,即可通過(guò)客戶端連接執(zhí)行多個(gè)Grains的操作(節(jié)省篇幅,只列出部分關(guān)鍵代碼)
private static async Task DoClientWork(IClusterClient client) { var client1 = client.GetGrain<IHello>(0); var client2 = client.GetGrain<IHello>(1); //https://dotnet.github.io/orleans/Documentation/grains/grain_identity.html var id1 = client1.GetGrainIdentity().GetPrimaryKeyLong(out string keyExt); var id2 = client2.GetGrainIdentity().GetPrimaryKeyLong(out string keyExt2); Console.WriteLine(id1); Console.WriteLine(keyExt); Console.WriteLine(id2); Console.WriteLine(keyExt2); await client1.AddCount(); var count1 = await client1.GetCount(); Console.WriteLine("count1:{0}", count1); await client2.AddCount(); var count2 = await client2.GetCount(); Console.WriteLine("count2:{0}", count2); await client2.AddCount(); var count3 = await client2.GetCount(); Console.WriteLine("count3:{0}", count3); }
數(shù)據(jù)庫(kù)初始化
要使Orleans代碼在給定的關(guān)系數(shù)據(jù)庫(kù)后端發(fā)揮作用,還需要初始化一個(gè)對(duì)應(yīng)的數(shù)據(jù)庫(kù),并與代碼兼容。這是通過(guò)運(yùn)行供應(yīng)商特定的數(shù)據(jù)庫(kù)創(chuàng)建腳本來(lái)完成的。這些腳本位于OrleansSqlUtils NuGet包中,隨每個(gè)Orleans版本一起發(fā)布。目前有兩個(gè)數(shù)據(jù)庫(kù)腳本:
- SQL Server -
CreateOrleansTables_SqlServer.sql。AdoInvariant是System.Data.SqlClient。
- MySQL -
CreateOrleansTables_MySql.sql。AdoInvariant是MySql.Data.MySqlClient。
但因?yàn)榇舜问纠窃贛ac上使用Rider開(kāi)發(fā),不方便打開(kāi)NuGet包中的內(nèi)容,可以通過(guò)以下地址找到對(duì)應(yīng)數(shù)據(jù)庫(kù)初始化腳本:
https://github.com/dotnet/orleans/tree/main/src/AdoNet/Shared/MySQL-Main.sql
/* Implementation notes: 1) The general idea is that data is read and written through Orleans specific queries. Orleans operates on column names and types when reading and on parameter names and types when writing. 2) The implementations *must* preserve input and output names and types. Orleans uses these parameters to reads query results by name and type. Vendor and deployment specific tuning is allowed and contributions are encouraged as long as the interface contract is maintained. 3) The implementation across vendor specific scripts *should* preserve the constraint names. This simplifies troubleshooting by virtue of uniform naming across concrete implementations. 5) ETag for Orleans is an opaque column that represents a unique version. The type of its actual implementation is not important as long as it represents a unique version. In this implementation we use integers for versioning 6) For the sake of being explicit and removing ambiguity, Orleans expects some queries to return either TRUE as >0 value or FALSE as =0 value. That is, affected rows or such does not matter. If an error is raised or an exception is thrown the query *must* ensure the entire transaction is rolled back and may either return FALSE or propagate the exception. Orleans handles exception as a failure and will retry. 7) The implementation follows the Extended Orleans membership protocol. For more information, see at: https://docs.microsoft.com/dotnet/orleans/implementation/cluster-management https://github.com/dotnet/orleans/blob/main/src/Orleans.Core/SystemTargetInterfaces/IMembershipTable.cs */ -- This table defines Orleans operational queries. Orleans uses these to manage its operations, -- these are the only queries Orleans issues to the database. -- These can be redefined (e.g. to provide non-destructive updates) provided the stated interface principles hold. CREATE TABLE OrleansQuery ( QueryKey VARCHAR(64) NOT NULL, QueryText VARCHAR(8000) NOT NULL, CONSTRAINT OrleansQuery_Key PRIMARY KEY(QueryKey) );
https://github.com/dotnet/orleans/blob/main/src/AdoNet/Orleans.Persistence.AdoNet/MySQL-Persistence.sql
-- The design criteria for this table are: -- -- 1. It can contain arbitrary content serialized as binary, XML or JSON. These formats -- are supported to allow one to take advantage of in-storage processing capabilities for -- these types if required. This should not incur extra cost on storage. -- -- 2. The table design should scale with the idea of tens or hundreds (or even more) types -- of grains that may operate with even hundreds of thousands of grain IDs within each -- type of a grain. -- -- 3. The table and its associated operations should remain stable. There should not be -- structural reason for unexpected delays in operations. It should be possible to also -- insert data reasonably fast without resource contention. -- -- 4. For reasons in 2. and 3., the index should be as narrow as possible so it fits well in -- memory and should it require maintenance, isn't resource intensive. For this -- reason the index is narrow by design (ideally non-clustered). Currently the entity -- is recognized in the storage by the grain type and its ID, which are unique in Orleans silo. -- The ID is the grain ID bytes (if string type UTF-8 bytes) and possible extension key as UTF-8 -- bytes concatenated with the ID and then hashed. -- -- Reason for hashing: Database engines usually limit the length of the column sizes, which -- would artificially limit the length of IDs or types. Even when within limitations, the -- index would be thick and consume more memory. -- -- In the current setup the ID and the type are hashed into two INT type instances, which -- are made a compound index. When there are no collisions, the index can quickly locate -- the unique row. Along with the hashed index values, the NVARCHAR(nnn) values are also -- stored and they are used to prune hash collisions down to only one result row. -- -- 5. The design leads to duplication in the storage. It is reasonable to assume there will -- a low number of services with a given service ID operational at any given time. Or that -- compared to the number of grain IDs, there are a fairly low number of different types of -- grain. The catch is that were these data separated to another table, it would make INSERT -- and UPDATE operations complicated and would require joins, temporary variables and additional -- indexes or some combinations of them to make it work. It looks like fitting strategy -- could be to use table compression. -- -- 6. For the aforementioned reasons, grain state DELETE will set NULL to the data fields -- and updates the Version number normally. This should alleviate the need for index or -- statistics maintenance with the loss of some bytes of storage space. The table can be scrubbed -- in a separate maintenance operation. -- -- 7. In the storage operations queries the columns need to be in the exact same order -- since the storage table operations support optionally streaming. CREATE TABLE OrleansStorage ( -- These are for the book keeping. Orleans calculates -- these hashes (see RelationalStorageProvide implementation), -- which are signed 32 bit integers mapped to the *Hash fields. -- The mapping is done in the code. The -- *String columns contain the corresponding clear name fields. -- -- If there are duplicates, they are resolved by using GrainIdN0, -- GrainIdN1, GrainIdExtensionString and GrainTypeString fields. -- It is assumed these would be rarely needed. GrainIdHash INT NOT NULL, GrainIdN0 BIGINT NOT NULL, GrainIdN1 BIGINT NOT NULL, GrainTypeHash INT NOT NULL, GrainTypeString NVARCHAR(512) NOT NULL, GrainIdExtensionString NVARCHAR(512) NULL, ServiceId NVARCHAR(150) NOT NULL, -- The usage of the Payload records is exclusive in that -- only one should be populated at any given time and two others -- are NULL. The types are separated to advantage on special -- processing capabilities present on database engines (not all might -- have both JSON and XML types. -- -- One is free to alter the size of these fields. PayloadBinary BLOB NULL, PayloadXml LONGTEXT NULL, PayloadJson LONGTEXT NULL, -- Informational field, no other use. ModifiedOn DATETIME NOT NULL, -- The version of the stored payload. Version INT NULL -- The following would in principle be the primary key, but it would be too thick -- to be indexed, so the values are hashed and only collisions will be solved -- by using the fields. That is, after the indexed queries have pinpointed the right -- rows down to [0, n] relevant ones, n being the number of collided value pairs. ) ROW_FORMAT = COMPRESSED KEY_BLOCK_SIZE = 16; ALTER TABLE OrleansStorage ADD INDEX IX_OrleansStorage (GrainIdHash, GrainTypeHash); -- The following alters the column to JSON format if MySQL is at least of version 5.7.8. -- See more at https://dev.mysql.com/doc/refman/5.7/en/json.html for JSON and -- http://dev.mysql.com/doc/refman/5.7/en/comments.html for the syntax. /*!50708 ALTER TABLE OrleansStorage MODIFY COLUMN PayloadJson JSON */; DELIMITER $$ CREATE PROCEDURE ClearStorage ( in _GrainIdHash INT, in _GrainIdN0 BIGINT, in _GrainIdN1 BIGINT, in _GrainTypeHash INT, in _GrainTypeString NVARCHAR(512), in _GrainIdExtensionString NVARCHAR(512), in _ServiceId NVARCHAR(150), in _GrainStateVersion INT ) BEGIN DECLARE _newGrainStateVersion INT; DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN ROLLBACK; RESIGNAL; END; DECLARE EXIT HANDLER FOR SQLWARNING BEGIN ROLLBACK; RESIGNAL; END; SET _newGrainStateVersion = _GrainStateVersion; -- Default level is REPEATABLE READ and may cause Gap Lock issues SET TRANSACTION ISOLATION LEVEL READ COMMITTED; START TRANSACTION; UPDATE OrleansStorage SET PayloadBinary = NULL, PayloadJson = NULL, PayloadXml = NULL, Version = Version + 1 WHERE GrainIdHash = _GrainIdHash AND _GrainIdHash IS NOT NULL AND GrainTypeHash = _GrainTypeHash AND _GrainTypeHash IS NOT NULL AND GrainIdN0 = _GrainIdN0 AND _GrainIdN0 IS NOT NULL AND GrainIdN1 = _GrainIdN1 AND _GrainIdN1 IS NOT NULL AND GrainTypeString = _GrainTypeString AND _GrainTypeString IS NOT NULL AND ((_GrainIdExtensionString IS NOT NULL AND GrainIdExtensionString IS NOT NULL AND GrainIdExtensionString = _GrainIdExtensionString) OR _GrainIdExtensionString IS NULL AND GrainIdExtensionString IS NULL) AND ServiceId = _ServiceId AND _ServiceId IS NOT NULL AND Version IS NOT NULL AND Version = _GrainStateVersion AND _GrainStateVersion IS NOT NULL LIMIT 1; IF ROW_COUNT() > 0 THEN SET _newGrainStateVersion = _GrainStateVersion + 1; END IF; SELECT _newGrainStateVersion AS NewGrainStateVersion; COMMIT; END$$ DELIMITER $$ CREATE PROCEDURE WriteToStorage ( in _GrainIdHash INT, in _GrainIdN0 BIGINT, in _GrainIdN1 BIGINT, in _GrainTypeHash INT, in _GrainTypeString NVARCHAR(512), in _GrainIdExtensionString NVARCHAR(512), in _ServiceId NVARCHAR(150), in _GrainStateVersion INT, in _PayloadBinary BLOB, in _PayloadJson LONGTEXT, in _PayloadXml LONGTEXT ) BEGIN DECLARE _newGrainStateVersion INT; DECLARE _rowCount INT; DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN ROLLBACK; RESIGNAL; END; DECLARE EXIT HANDLER FOR SQLWARNING BEGIN ROLLBACK; RESIGNAL; END; SET _newGrainStateVersion = _GrainStateVersion; -- Default level is REPEATABLE READ and may cause Gap Lock issues SET TRANSACTION ISOLATION LEVEL READ COMMITTED; START TRANSACTION; -- Grain state is not null, so the state must have been read from the storage before. -- Let's try to update it. -- -- When Orleans is running in normal, non-split state, there will -- be only one grain with the given ID and type combination only. This -- grain saves states mostly serially if Orleans guarantees are upheld. Even -- if not, the updates should work correctly due to version number. -- -- In split brain situations there can be a situation where there are two or more -- grains with the given ID and type combination. When they try to INSERT -- concurrently, the table needs to be locked pessimistically before one of -- the grains gets @GrainStateVersion = 1 in return and the other grains will fail -- to update storage. The following arrangement is made to reduce locking in normal operation. -- -- If the version number explicitly returned is still the same, Orleans interprets it so the update did not succeed -- and throws an InconsistentStateException. -- -- See further information at https://docs.microsoft.com/dotnet/orleans/grains/grain-persistence. IF _GrainStateVersion IS NOT NULL THEN UPDATE OrleansStorage SET PayloadBinary = _PayloadBinary, PayloadJson = _PayloadJson, PayloadXml = _PayloadXml, ModifiedOn = UTC_TIMESTAMP(), Version = Version + 1 WHERE GrainIdHash = _GrainIdHash AND _GrainIdHash IS NOT NULL AND GrainTypeHash = _GrainTypeHash AND _GrainTypeHash IS NOT NULL AND GrainIdN0 = _GrainIdN0 AND _GrainIdN0 IS NOT NULL AND GrainIdN1 = _GrainIdN1 AND _GrainIdN1 IS NOT NULL AND GrainTypeString = _GrainTypeString AND _GrainTypeString IS NOT NULL AND ((_GrainIdExtensionString IS NOT NULL AND GrainIdExtensionString IS NOT NULL AND GrainIdExtensionString = _GrainIdExtensionString) OR _GrainIdExtensionString IS NULL AND GrainIdExtensionString IS NULL) AND ServiceId = _ServiceId AND _ServiceId IS NOT NULL AND Version IS NOT NULL AND Version = _GrainStateVersion AND _GrainStateVersion IS NOT NULL LIMIT 1; IF ROW_COUNT() > 0 THEN SET _newGrainStateVersion = _GrainStateVersion + 1; SET _GrainStateVersion = _newGrainStateVersion; END IF; END IF; -- The grain state has not been read. The following locks rather pessimistically -- to ensure only on INSERT succeeds. IF _GrainStateVersion IS NULL THEN INSERT INTO OrleansStorage ( GrainIdHash, GrainIdN0, GrainIdN1, GrainTypeHash, GrainTypeString, GrainIdExtensionString, ServiceId, PayloadBinary, PayloadJson, PayloadXml, ModifiedOn, Version ) SELECT * FROM ( SELECT _GrainIdHash, _GrainIdN0, _GrainIdN1, _GrainTypeHash, _GrainTypeString, _GrainIdExtensionString, _ServiceId, _PayloadBinary, _PayloadJson, _PayloadXml, UTC_TIMESTAMP(), 1) AS TMP WHERE NOT EXISTS ( -- There should not be any version of this grain state. SELECT 1 FROM OrleansStorage WHERE GrainIdHash = _GrainIdHash AND _GrainIdHash IS NOT NULL AND GrainTypeHash = _GrainTypeHash AND _GrainTypeHash IS NOT NULL AND GrainIdN0 = _GrainIdN0 AND _GrainIdN0 IS NOT NULL AND GrainIdN1 = _GrainIdN1 AND _GrainIdN1 IS NOT NULL AND GrainTypeString = _GrainTypeString AND _GrainTypeString IS NOT NULL AND ((_GrainIdExtensionString IS NOT NULL AND GrainIdExtensionString IS NOT NULL AND GrainIdExtensionString = _GrainIdExtensionString) OR _GrainIdExtensionString IS NULL AND GrainIdExtensionString IS NULL) AND ServiceId = _ServiceId AND _ServiceId IS NOT NULL ) LIMIT 1; IF ROW_COUNT() > 0 THEN SET _newGrainStateVersion = 1; END IF; END IF; SELECT _newGrainStateVersion AS NewGrainStateVersion; COMMIT; END$$ DELIMITER ; INSERT INTO OrleansQuery(QueryKey, QueryText) VALUES ( 'ReadFromStorageKey', 'SELECT PayloadBinary, PayloadXml, PayloadJson, UTC_TIMESTAMP(), Version FROM OrleansStorage WHERE GrainIdHash = @GrainIdHash AND GrainTypeHash = @GrainTypeHash AND @GrainTypeHash IS NOT NULL AND GrainIdN0 = @GrainIdN0 AND @GrainIdN0 IS NOT NULL AND GrainIdN1 = @GrainIdN1 AND @GrainIdN1 IS NOT NULL AND GrainTypeString = @GrainTypeString AND GrainTypeString IS NOT NULL AND ((@GrainIdExtensionString IS NOT NULL AND GrainIdExtensionString IS NOT NULL AND GrainIdExtensionString = @GrainIdExtensionString) OR @GrainIdExtensionString IS NULL AND GrainIdExtensionString IS NULL) AND ServiceId = @ServiceId AND @ServiceId IS NOT NULL LIMIT 1;' ); INSERT INTO OrleansQuery(QueryKey, QueryText) VALUES ( 'WriteToStorageKey',' call WriteToStorage(@GrainIdHash, @GrainIdN0, @GrainIdN1, @GrainTypeHash, @GrainTypeString, @GrainIdExtensionString, @ServiceId, @GrainStateVersion, @PayloadBinary, @PayloadJson, @PayloadXml);' ); INSERT INTO OrleansQuery(QueryKey, QueryText) VALUES ( 'ClearStorageKey',' call ClearStorage(@GrainIdHash, @GrainIdN0, @GrainIdN1, @GrainTypeHash, @GrainTypeString, @GrainIdExtensionString, @ServiceId, @GrainStateVersion);' );
將這兩個(gè)腳本依次下載運(yùn)行,既可完成Orleans數(shù)據(jù)庫(kù)的初始化。
運(yùn)行
接著分別啟動(dòng)運(yùn)行Silo和Client項(xiàng)目,則可以在數(shù)據(jù)庫(kù)OrleansStorage表中看到相應(yīng)的Grain狀態(tài)數(shù)據(jù)持久化效果。
事件溯源(Event Sourcing)
事件溯源是一種架構(gòu)模式,是借鑒數(shù)據(jù)庫(kù)事件日志的一種數(shù)據(jù)持久方式。
它存在以下幾個(gè)特點(diǎn):
- 整個(gè)系統(tǒng)以事件為驅(qū)動(dòng),所有業(yè)務(wù)都由事件驅(qū)動(dòng)來(lái)完成。
- 系統(tǒng)的數(shù)據(jù)以事件為基礎(chǔ),事件要保存在某種存儲(chǔ)上。
- 業(yè)務(wù)數(shù)據(jù)只是一些由事件產(chǎn)生的視圖,不一定要保存到數(shù)據(jù)庫(kù)中。
Event Sourcing遵循一個(gè)簡(jiǎn)單的思想,就是存儲(chǔ)的時(shí)候只存儲(chǔ)變化量,而不存儲(chǔ)最終結(jié)果.需要最終結(jié)果的地方,就必須提取所有的變化量以及初始狀態(tài),讓它們相加得到最終結(jié)果.
a.不保存對(duì)象的最新?tīng)顟B(tài),而是保存對(duì)象產(chǎn)生的所有事件;
b.通過(guò)事件溯源(Event Sourcing)得到對(duì)象最新?tīng)顟B(tài);
由于只存儲(chǔ)變化量,意味著數(shù)據(jù)只增不減,意味著數(shù)據(jù)存儲(chǔ)后就不會(huì)被更改,意味著高并發(fā)和高吞吐量.因?yàn)閿?shù)據(jù)庫(kù)的數(shù)據(jù)永遠(yuǎn)不變,所以多線程操作都不需要加鎖。可是這里隱藏著另一個(gè)風(fēng)險(xiǎn),就是讀取的數(shù)據(jù)不一定是最新的。這個(gè)"非最新"的確是個(gè)難題,不過(guò)好消息是,如果一直讀,最終能夠讀到最新的,這就是"最終一致性"。
日志一致性提供程序
Orleans實(shí)現(xiàn)了Event Sourcing機(jī)制,而且它的
StateStorage.LogConsistencyProvider(狀態(tài)存儲(chǔ))
使用可單獨(dú)配置的標(biāo)準(zhǔn)存儲(chǔ)提供程序來(lái)存儲(chǔ) grain 狀態(tài)快照。
保存在存儲(chǔ)中的數(shù)據(jù)是一個(gè)對(duì)象,其中包含 grain 狀態(tài)(由
JournaledGrain 的第一個(gè)類型參數(shù)指定)和一些元數(shù)據(jù)(版本號(hào),以及用于避免在存儲(chǔ)訪問(wèn)失敗時(shí)事件重復(fù)的特殊標(biāo)記)。由于每次我們?cè)L問(wèn)存儲(chǔ)時(shí)都會(huì)讀取/寫入整個(gè) grain 狀態(tài),因此該提供程序不適合用于 grain 狀態(tài)很大的對(duì)象。
LogStorage.LogConsistencyProvider(日志存儲(chǔ))
使用可單獨(dú)配置的標(biāo)準(zhǔn)存儲(chǔ)提供程序?qū)⑼暾氖录蛄写鎯?chǔ)為單個(gè)對(duì)象。
保存在存儲(chǔ)中的數(shù)據(jù)是一個(gè)對(duì)象,其中包含 此提供程序支持 RetrieveConfirmedEvents。 所有事件始終可用并保存在內(nèi)存中。
由于每次我們?cè)L問(wèn)存儲(chǔ)時(shí)都會(huì)讀取/寫入整個(gè)事件序列,因此該提供程序不適合在生產(chǎn)環(huán)境中使用,除非保證事件序列相當(dāng)短。 此提供程序的主要用途是演示事件溯源的語(yǔ)義以及示例/測(cè)試環(huán)境。
CustomStorage.LogConsistencyProvider(自定義存儲(chǔ))
允許開(kāi)發(fā)人員插入其存儲(chǔ)接口,然后一致性協(xié)議將在適當(dāng)?shù)臅r(shí)間調(diào)用該接口。 此提供程序不會(huì)對(duì)存儲(chǔ)的內(nèi)容是狀態(tài)快照還是事件做出具體的假設(shè) – 由程序員控制這種選擇(可以存儲(chǔ)快照和/或事件)。
若要使用此提供程序,grain 必須如前所述派生自 JournaledGrain<TGrainState,TEventBase>,此外必須實(shí)現(xiàn)以下接口:
public interface ICustomStorageInterface<StateType, EventType> { Task<KeyValuePair<int, StateType>> ReadStateFromStorage(); Task<bool> ApplyUpdatesToStorage( IReadOnlyList<EventType> updates, int expectedVersion); }
此提供程序不支持
RetrieveConfirmedEvents。 當(dāng)然,由于開(kāi)發(fā)人員仍然控制著存儲(chǔ)接口,因此他們不需要一開(kāi)始就調(diào)用此方法,而可以實(shí)現(xiàn)事件檢索。
基于Orleans的Event Sourcing持久化解決方案示例
項(xiàng)目環(huán)境:.Net Core3.1+MySql+Orleans Event Sourcing
示例項(xiàng)目源碼地址:https://github.com/wswind/learn-orleans/tree/master/04.EventSourcing
此Event Sourcing示例解決方案采用的是LogStorage.LogConsistencyProvider日志一致性提供程序,但沒(méi)有提供持久化存儲(chǔ),因此需要進(jìn)行一定程度的改造。
Silo項(xiàng)目
作為Orleans服務(wù)端程序,單獨(dú)啟動(dòng),依賴Microsoft.Orleans.Server、Microsoft.Orleans.EventSourcing包,添加依賴MySql.Data包以支持Mysql持久化存儲(chǔ),Program.cs中相關(guān)代碼配置如下:
private static async Task<ISiloHost> StartSilo() { // define the cluster configuration var builder = new SiloHostBuilder() .UseLocalhostClustering() .AddMemoryGrainStorageAsDefault() .AddMemoryGrainStorage("DevStore") .AddAdoNetGrainStorage("OrleansStorage", options => { options.Invariant = "MySql.Data.MySqlClient"; //options.Invariant = "System.Data.SqlClient"; options.ConnectionString = "Server=localhost;DataBase=eventsourcingtest;uid=root;pwd=12345678;pooling=true;port=3306;CharSet=utf8;sslMode=None;"; //options.ConnectionString = "Server=.;Database=o3;Trusted_Connection=True;"; options.UseJsonFormat = true; }) .AddLogStorageBasedLogConsistencyProvider("LogStorage") .Configure<ClusterOptions>(options => { options.ClusterId = "dev"; options.ServiceId = "OrleansBasics"; }) .ConfigureApplicationParts(parts => parts.AddApplicationPart(typeof(HelloGrain).Assembly).WithReferences()) .ConfigureLogging(logging => logging.AddConsole()); var host = builder.Build(); await host.StartAsync(); return host; }
GrainInterfaces項(xiàng)目
作為Orleans項(xiàng)目的公共接口,依賴Microsoft.Orleans.Core.Abstractions、Microsoft.Orleans.CodeGenerator.MSBuild包
定義Grain的行為接口:IHello : Orleans.IGrainWithIntegerKey
這里為了使示例看起來(lái)更簡(jiǎn)單明了一些,刪掉了原示例接口程序中2個(gè)無(wú)用的接口。
public interface IHello : Orleans.IGrainWithIntegerKey { Task<int> GetCount();//獲取某個(gè)Grain狀態(tài)屬性接口 Task NewEvent(EventData @event);//將新事件持久化存儲(chǔ)接口 }
定義Grain相關(guān)的公共事件類,為了使最終示例效果更明顯一些,加入了一個(gè)string類型的Who屬性
[Serializable] public class EventData { public EventData() { When = DateTime.UtcNow; } public DateTime When; public string Who; }
再定義2個(gè)基于此事件類的派生事件類
public class EventDataAdd : EventData { public int AddCount; } public class EventDataMinus : EventData { public int MinusCount; }
Grains項(xiàng)目
作為Orleans的Grains的具體實(shí)現(xiàn),依賴Microsoft.Orleans.Persistence.AdoNet、Microsoft.Orleans.Core.Abstractions、Microsoft.Orleans.CodeGenerator.MSBuild和Microsoft.Orleans.EventSourcing包
日志式 Grain 派生自 JournaledGrain<TGrainState,TEventBase>,具有以下類型參數(shù):
- 表示 grain 狀態(tài)的 所有狀態(tài)和事件對(duì)象都應(yīng)該可序列化(因?yàn)槿罩疽恢滦蕴峁┏绦蚩赡苄枰志帽4嫠鼈儯?或在通知消息中發(fā)送它們)。
簡(jiǎn)單說(shuō),支持Event Sourcing的Grain類需要派生自JournaledGrain<TGrainState,TEventBase>,它需要有兩個(gè)泛型參數(shù),一個(gè)是Grain的狀態(tài)類,一個(gè)是與Grain相關(guān)的事件類
定義具體的Grain實(shí)體類:HelloGrain : JournaledGrain<HelloState, EventData>, IHello
同時(shí)需要指定持久化存儲(chǔ)的配置名稱[StorageProvider(ProviderName= "OrleansStorage")]
[StorageProvider(ProviderName = "OrleansStorage")] [LogConsistencyProvider(ProviderName = "LogStorage")] public class HelloGrain : JournaledGrain<HelloState, EventData>, IHello { private readonly ILogger logger; public HelloGrain(ILogger<HelloGrain> logger) { this.logger = logger; } public Task<int> GetCount() { //讀取Grain狀態(tài)屬性 //為了讀取當(dāng)前 grain 狀態(tài)并確定其版本號(hào),JournaledGrain 提供了屬性 //GrainState State { get; }int Version { get; } //版本號(hào)始終等于已確認(rèn)事件的總數(shù),狀態(tài)是將所有已確認(rèn)事件應(yīng)用于初始狀態(tài)后的結(jié)果。 return Task.FromResult(this.State.Count); } public async Task NewEvent(EventData @event) { //RaiseEvent 將事件寫入存儲(chǔ),但不等待寫入完成。 RaiseEvent(@event); //對(duì)于許多應(yīng)用程序而言,必須等待我們收到已持久保存事件的確認(rèn)。 //在這種情況下,我們始終會(huì)通過(guò)等待 ConfirmEvents 來(lái)跟進(jìn) await ConfirmEvents(); //即使不顯式調(diào)用 ConfirmEvents,事件最終也會(huì)得到確認(rèn) - 確認(rèn)會(huì)在后臺(tái)自動(dòng)發(fā)生 } }
重要說(shuō)明:應(yīng)用程序永遠(yuǎn)不應(yīng)直接修改
State 返回的對(duì)象。 該對(duì)象僅供讀取。 相反,當(dāng)應(yīng)用程序想要修改狀態(tài)時(shí),它必須通過(guò)RaiseEvent(引發(fā)事件)來(lái)間接修改。定義Grain的狀態(tài)類:HelloState
public class HelloState { public int Count { get; set; }//狀態(tài)屬性 //更新?tīng)顟B(tài)以響應(yīng)事件 public void Apply(EventDataAdd addData) { Count += addData.AddCount; } public void Apply(EventDataMinus minusData) { Count -= minusData.MinusCount; } }
每當(dāng)RaiseEvent(引發(fā)事件)時(shí),運(yùn)行時(shí)都會(huì)自動(dòng)更新 grain 狀態(tài)。 應(yīng)用程序無(wú)需在引發(fā)事件后顯式更新?tīng)顟B(tài)。 但是,(a) GrainState 類可以在 (b) 也可以在Grain類重寫
TransitionState 函數(shù)。protected override void TransitionState( State state, EventType @event) { // code that updates the state }
Client項(xiàng)目
作為Orleans項(xiàng)目的客戶端程序,單獨(dú)啟動(dòng),負(fù)責(zé)調(diào)用Orleans的Silo服務(wù)端,依賴Microsoft.Orleans.Client包,Program.cs中主要業(yè)務(wù)邏輯部分代碼如下:
private static async Task DoClientWork(IClusterClient client) { //獲取Grain var client1 = client.GetGrain<IHello>(0); //制作事件對(duì)象 EventDataAdd dataAdd = new EventDataAdd { Who = "syb", AddCount = 3 }; //將事件對(duì)象發(fā)給Grain,觸發(fā)寫入存儲(chǔ)及Grain狀態(tài)更新 await client1.NewEvent(dataAdd); //獲取Grain狀態(tài)屬性,檢查是否更新成功 var count = await client1.GetCount(); Console.WriteLine(count); }
數(shù)據(jù)庫(kù)初始化
同樣需要通過(guò)運(yùn)行供應(yīng)商特定的數(shù)據(jù)庫(kù)創(chuàng)建腳本,初始化一個(gè)與代碼兼容的供Orleans使用的數(shù)據(jù)庫(kù)。
https://github.com/dotnet/orleans/tree/main/src/AdoNet/Shared/MySQL-Main.sql
https://github.com/dotnet/orleans/blob/main/src/AdoNet/Orleans.Persistence.AdoNet/MySQL-Persistence.sql
運(yùn)行
最后,依次運(yùn)行Silo和Client項(xiàng)目,也可以多次修改Client事件對(duì)象參數(shù)并啟動(dòng)運(yùn)行,則可以在Mysql數(shù)據(jù)庫(kù)看到Orleans的Event Sourcing效果。
其中事件類型序列存儲(chǔ)在PayloadJson字段,大致內(nèi)容如下:
{"$id": "1", "Log": {"$type": "System.Collections.Generic.List`1[[OrleansBasics.EventData, GrainInterfaces]], System.Private.CoreLib", "$values": [{"$id": "2", "Who": "syb", "When": "2022-08-18T09:01:51.011686Z", "$type": "OrleansBasics.EventDataAdd, GrainInterfaces", "AddCount": 3}, {"$id": "3", "Who": "shiyibo", "When": "2022-08-18T09:02:30.636478Z", "$type": "OrleansBasics.EventDataAdd, GrainInterfaces", "AddCount": 5}]}, "$type": "Orleans.EventSourcing.LogStorage.LogStateWithMetaData`1[[OrleansBasics.EventData, GrainInterfaces]], Orleans.EventSourcing", "WriteVector": "", "GlobalVersion": 2}
本文為作者月井石原創(chuàng),轉(zhuǎn)載請(qǐng)注明出處~

浙公網(wǎng)安備 33010602011771號(hào)