Mocha MemoryBufferQueue 設計概述
前言
Mocha 是一個基于 .NET 開發的 APM 系統,同時提供可伸縮的可觀測性數據分析和存儲平臺。
更多關于 Mocha 的介紹,可以參考 http://www.rzrgm.cn/eventhorizon/p/17979677
Mocha 會需要收集大量的數據,為處理這些數據,我們需要有一個緩沖區。初期我們實現了一個基于內存的緩沖區,下文稱之為 MemoryBufferQueue。
Buffer 模塊的代碼地址:
https://github.com/dotnetcore/mocha/tree/main/src/Mocha.Core/Buffer
本文介紹的版本是 v0.1.0,后續版本可能會有變化。
MemoryBufferQueue 功能概述
MemoryBufferQueue 將數據緩沖到內存中,消費者可以從隊列中獲取數據,當隊列中無數據時,消費者會異步等待數據到來。
MemoryBufferQueue 提供了以下功能:
- 支持創建多個 Topic,每個 Topic 都是一個獨立的隊列。
- 支持創建多個 Consumer Group,每個 Consumer Group 的消費進度都是獨立的。支持多個 Consumer Group 并發消費同一個 Topic。
- 支持同一個 Consumer Group 創建多個 Consumer,以負載均衡的方式消費數據。
- 支持數據的批量消費,可以一次性獲取多條數據。
- 支持重試機制,當消費者處理數據失敗時,可以選擇不確認消費,這樣數據會被重新消費。
需要注意的是,當前版本出于簡化實現的考慮,暫不支持消費者的動態擴容和縮容,需要在創建消費者時指定消費者數量。
Buffer 模塊 API 設計
MemoryBufferQueue 的出發點的是在項目初期提供一個性能足夠高的內存緩存隊列。后期隨著項目的發展,我們可能會將其替換為別的實現,比如支持持久化的隊列。
為了解耦,Buffer 模塊使用 Interface 進行了抽象。
public interface IBufferQueue
{
IBufferProducer<T> CreateProducer<T>(string topicName);
IBufferConsumer<T> CreateConsumer<T>(BufferConsumerOptions options);
IEnumerable<IBufferConsumer<T>> CreateConsumers<T>(BufferConsumerOptions options, int consumerNumber);
}
internal interface IBufferQueue<T>
{
string TopicName { get; }
IBufferProducer<T> CreateProducer();
IBufferConsumer<T> CreateConsumer(BufferConsumerOptions options);
IEnumerable<IBufferConsumer<T>> CreateConsumers(BufferConsumerOptions options, int consumerNumber);
}
public interface IBufferProducer<in T>
{
string TopicName { get; }
ValueTask ProduceAsync(T item);
}
public interface IBufferConsumer<out T>
{
string TopicName { get; }
string GroupName { get; }
IAsyncEnumerable<IEnumerable<T>> ConsumeAsync(CancellationToken cancellationToken = default);
ValueTask CommitAsync();
}
public class BufferConsumerOptions
{
public required string TopicName { get; init; }
public required string GroupName { get; init; }
public bool AutoCommit { get; init; }
public int BatchSize { get; init; } = 100;
}
數據通過 Producer 寫入 BufferQueue,由 Consumer 進行消費。
我們對 BufferQueue 有以下的要求:
-
同一個數據類型 下的 不同 Topic 的 BufferQueue 互不干擾。
-
同一個 Topic 下的 不同數據類型 的 BufferQueue 互不干擾。

因此我們設計了兩個層級的接口:
-
IBufferQueue:根據 TopicName 和 類型參數 T 將請求轉發給具體的 IBufferQueue<T> 實現(借助 KeyedService 實現),其中參數 T 代表 Buffer 所承載的數據實體的類型。
-
IBufferQueue<T>:具體的 BufferQueue 實現,負責管理 Topic 下的數據。屬于 Buffer 模塊的內部實現,不對外暴露。

Buffer 模塊提供了通過 ServiceCollection 進行注冊的擴展方法:
public static class BufferServiceCollectionExtensions
{
public static IServiceCollection AddBuffer(
this IServiceCollection services,
Action<BufferOptionsBuilder> configure)
{
services.AddSingleton<IBufferQueue, BufferQueue>();
configure(new BufferOptionsBuilder(services));
return services;
}
}
MemoryBufferQueue 模塊通過提供 BufferOptionsBuilder 來進行配置:
public static class BufferOptionsBuilderExtensions
{
public static BufferOptionsBuilder UseMemory(
this BufferOptionsBuilder builder,
Action<MemoryBufferOptions> configure)
{
var options = new MemoryBufferOptions(builder.Services);
configure(options);
return builder;
}
}
下面是配置和使用 MemoryBufferQueue 的示例:
var services = new ServiceCollection();
services.AddBuffer(options =>
{
options.UseMemory(bufferOptions =>
{
bufferOptions.AddTopic<MochaSpan>("otlp-span", Environment.ProcessorCount);
});
});
var provider = services.BuildServiceProvider();
var bufferQueue = provider.GetRequiredService<IBufferQueue>();
var producer = bufferQueue.CreateProducer<MochaSpan>("otlp-span");
var consumers = bufferQueue.CreateConsumers<MochaSpan>(new BufferConsumerOptions
{
TopicName = "otlp-span",
GroupName = "test",
AutoCommit = true, // 配置為 false 時,需要手動調用 CommitAsync 方法
BatchSize = 100
}, 2);
var consumerTasks = consumers.Select(async consumer =>
{
await foreach (var batch in consumer.ConsumeAsync())
{
foreach (var item in batch)
{
Console.WriteLine(item);
}
// 如果 AutoCommit 為 false,需要手動調用 CommitAsync 方法
// await consumer.CommitAsync();
}
});
Task.Run(async () =>
{
for (int i = 0; i < 1000; i++)
{
await producer.ProduceAsync(new MochaSpan());
}
});
await Task.WhenAll(consumerTasks);
MemoryBufferQueue 的設計
Partition 的設計
為了保證消費速度,MemoryBufferQueue 將數據劃分為多個 Partition,每個 Partition 都是一個獨立的隊列,每個 Partition 都有一個對應的消費者線程。
Producer 以輪詢的方式往每個 Partition 中寫入數據。
Consumer 最多不允許超過 Partition 的數量,Partition 按平均分配到組內每個 Customer 上。
當一個 Consumer 被分配了多個 Partition 時,以輪訓的方式進行消費。
每個 Partition 上會記錄不同消費組的消費進度,不同組之間的消費進度互不干擾。

對并發的支持
Producer 支持并發寫入。
Consumer 消費時是綁定 Partition 的,為保證能正確管理 Partition 的消費進度,Consumer 不支持并發消費。
如果要增加消費速度,需創建多個 Consumer。
Partition 的動態擴容
Partition 的基本組成單元是 Segment,Segment 代表保存數據的數組,多個 Segment 通過鏈表的形式組合成一個 Partition。
當一個 Segment 寫滿后,通過在其后面追加一個 Segment 實現擴容。
Segment 中用于保存數據的數組的每一個元素稱為 Slot,每個 Slot 都有一個Partition 內唯一的自增 Offset。

Offset
Offset 用于標識數據在 Partition 內位置,Partition 都會用 Offset 記錄各個消費組的消費進度。
Offset 被設計為可以無限自增,一個 Offset 由 Generation 和 Index 組成。
Generation 和 Index 默認值都為 0,每 Offset 自增一次,Index 加 1,直至溢出后歸 0,Index 每溢出一次,Generation 加 1。
Generation 最終也會溢出歸 0,此時如果一個 Generation 等于 0 的 Offset 和一個 Generation 等于 ulong.Max 的 Offset 進行比較,則認為前者更大。
readonly record struct MemoryBufferPartitionOffset(ulong Generation, ulong Index)
Segment 的回收機制
每次在 Partition 中新增 Segment 時,會從頭判斷此前的 Segment 是否已經被所有消費組消費完,回收最后一個消費完的 Segment 作為新的 Segment 追加到 Partition 末尾使用。

歡迎關注個人技術公眾號


浙公網安備 33010602011771號