Drasi Reactions SDK
Reaction(反應器)是Drasi系統中的一個重要組件,它能夠對數據變化做出響應和處理。簡單來說,當你的數據發生變化時,Reaction就會被觸發,然后執行你定義的操作。擴展Drasi Reactions的文檔參見:https://github.com/drasi-project/docs/blob/main/docs/content/how-to-guides/extend-drasi/implement-a-reaction.md 。
實現自定義Reaction的三個步驟
第一步:創建Docker鏡像
你可以使用任何編程語言來編寫Reaction,目前SDK 支持三種主流編程語言:JavaScript/TypeScript、Python 和 .NET。只要最終能打包成Docker鏡像即可。這個Docker鏡像需要:
- 能夠讀取配置信息
- 能夠接收數據變化的消息
- 對數據變化做出響應
第二步:處理查詢配置
當Reaction運行時,Drasi會在容器內的 /etc/queries 目錄下為每個查詢創建配置文件。
實際例子
假設你配置了一個簡單的Reaction:
apiVersion: query.reactive-graph.io/v1
kind: Reaction
metadata:
name: my-reaction
spec:
reactionImage: my-reaction
queries:
- queryId: query1
- queryId: query2
- queryId: query3
這個配置會在 /etc/queries 目錄下創建三個文件:query1、query2和query3。
你也可以為每個查詢添加額外的配置信息:
apiVersion: query.reactive-graph.io/v1
kind: Reaction
metadata:
name: my-reaction
spec:
reactionImage: my-reaction
queries:
- queryId: query1
options: >
foo
- queryId: query2
options: >
bar
第三步:處理數據變化
當數據發生變化時,Drasi會通過 Dapr 發送消息到你的Reaction。當數據發生變化時,你會收到JSON格式的消息,包含三種類型的變化:
addedResults:新增的數據deletedResults:刪除的數據updatedResults:更新的數據
Drasi Reactions SDK 是一個跨語言的開發工具包,用于實現和處理 Drasi 平臺的 Reactions(反應器)功能。該 SDK 目前支持三種主流編程語言:JavaScript/TypeScript、Python 和 .NET。
主要功能和特點:
核心功能:
- 處理來自 Continuous Query(持續查詢)的變更事件(ChangeEvent)
- 處理控制事件(ControlEvent)
- 支持查詢配置的解析和管理
- 提供事件訂閱和處理機制
主要事件類型:
- ChangeEvent(變更事件):包含查詢結果的添加、刪除和更新信息
- ControlEvent(控制事件):處理啟動、停止等控制信號
支持的語言實現:
JavaScript/TypeScript 版本:
import { DrasiReaction, ChangeEvent } from '@drasi/reaction-sdk';
let myReaction = new DrasiReaction(async (event: ChangeEvent) => {
console.log(`Received change sequence: ${event.sequence} for query ${event.queryId}`);
// 處理添加的結果
for (let added of event.addedResults) {
console.log(`Added result: ${JSON.stringify(added)}`);
}
// 處理刪除的結果
for (let deleted of event.deletedResults) {
console.log(`Removed result: ${JSON.stringify(deleted)}`);
}
// 處理更新的結果
for (let updated of event.updatedResults) {
console.log(`Updated result - before: ${JSON.stringify(updated.before)}, after: ${JSON.stringify(updated.after)}`);
}
});
myReaction.start();
Python 版本:
from drasi.reaction.models.ChangeEvent import ChangeEvent
from drasi.reaction.sdk import DrasiReaction
async def change_event(event: ChangeEvent, query_configs: dict[str, Any] | None = None):
print(f"Received change sequence {event.sequence} for query {event.queryId}")
if event.addedResults:
print(f"Added result: {event.addedResults}")
if event.deletedResults:
print(f"Removed result: {event.deletedResults}")
if event.updatedResults:
print(f"Updated result - before: {event.updatedResults[0].before}, after {event.updatedResults[0].after}")
reaction = DrasiReaction(on_change_event=change_event)
reaction.start()
.NET 版本:
var reaction = new ReactionBuilder()
.UseChangeEventHandler(async (evt, queryConfig) => {
Console.WriteLine($"Received change event from query {evt.QueryId} sequence {evt.Sequence}");
foreach (var item in evt.AddedResults)
Console.WriteLine($"Added result: {item}");
foreach (var item in evt.UpdatedResults)
Console.WriteLine($"Updated result, before {item.Before}, after {item.After}");
foreach (var item in evt.DeletedResults)
Console.WriteLine($"Deleted result: {item}");
})
.Build();
await reaction.StartAsync();
上面是基礎示例,接下來我們來看一個高級示例
- 自定義配置類
class MyQueryConfig
{
[JsonPropertyName("greeting")]
public string? Greeting { get; set; }
}
- 自定義服務實現
class MyService
{
private readonly string _connectionString;
public MyService(IConfiguration configuration)
{
_connectionString = configuration["MyConnectionString"];
}
public void DoSomething()
{
Console.WriteLine("Doing something");
}
}
- 事件處理器實現
- 變更事件處理器(
MyChangeHandler)
class MyChangeHandler : IChangeEventHandler<MyQueryConfig>
{
private readonly MyService _service;
private readonly ILogger<MyChangeHandler> _logger;
public MyChangeHandler(MyService service, ILogger<MyChangeHandler> logger)
{
_service = service;
_logger = logger;
}
public async Task HandleChange(ChangeEvent evt, MyQueryConfig? queryConfig)
{
_logger.LogInformation($"Received change event from query {evt.QueryId} sequence {evt.Sequence}. Query greeting is {queryConfig?.Greeting}");
_logger.LogInformation($"Full event: {evt.ToJson()}");
_service.DoSomething();
}
}
// 控制事件處理器(MyControlSignalHandler)
class MyControlSignalHandler : IControlEventHandler<MyQueryConfig>
{
private readonly ILogger<MyControlSignalHandler> _logger;
public MyControlSignalHandler(ILogger<MyControlSignalHandler> logger)
{
_logger = logger;
}
public async Task HandleControlSignal(ControlEvent evt, MyQueryConfig? queryConfig)
{
_logger.LogWarning($"Received control signal: {evt.ControlSignal?.Kind} for query {evt.QueryId}. Query greeting is {queryConfig?.Greeting}");
}
}var reaction = new ReactionBuilder<MyQueryConfig>() .UseChangeEventHandler<MyChangeHandler>() // Use your custom change handler .UseControlEventHandler<MyControlSignalHandler>() // Use your custom control signal handler .UseYamlQueryConfig() // Parse the per query configuration from Yaml .ConfigureServices((services) => // Register your own services { services.AddSingleton<MyService>(); }) .Build(); // Start the reaction await reaction.StartAsync();
注冊你的ReactionProvider
創建好Reaction后,需要通過ReactionProvider將它注冊到Drasi系統中。 下面是一個ReactionProvider配置:
apiVersion: v1
kind: ReactionProvider
name: Advanced
spec:
services:
reaction:
image: reaction-advanced
externalImage: true
config_schema:
type: object
properties:
MyConnectionString:
type: string
required:
- MyConnectionString
Reaction使用示例
kind: Reaction
apiVersion: v1
name: test-advanced
spec:
kind: Advanced
properties:
MyConnectionString: "some connection string"
queries:
query1: |
greeting: "Hello, World!"
query2: |
greeting: "Howdy!"
如何部署
使用Drasi命令行工具部署你的Reaction:
# 先部署ReactionProvider drasi apply -f reaction-provider.yaml # 然后部署Reaction drasi apply -f reaction.yaml
調試技巧
- 使用環境變量來配置你的Reaction
- 查看Docker容器日志來排查問題
- 利用Drasi的VSCode插件來驗證配置文件
- 確保你的Docker鏡像能夠正確處理Dapr消息
常見問題
Q: 為什么我的Reaction沒有收到數據變化? A: 檢查查詢ID是否配置正確,以及Dapr訂閱是否成功。
Q: 如何測試我的Reaction? A: 可以先使用調試模式運行,打印收到的所有消息。
Q: 配置文件驗證失敗怎么辦? A: 使用Drasi的VSCode插件或CLI工具驗證配置文件格式是否正確。
SDK 的設計理念是提供一個簡單但強大的接口,讓開發者能夠方便地實現和管理 Drasi 平臺的反應器功能。無論使用哪種編程語言,都可以通過相似的 API 設計模式來處理事件和管理配置。
歡迎大家掃描下面二維碼成為我的客戶,扶你上云

浙公網安備 33010602011771號