<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Mocha MemoryBufferQueue 設計概述

      前言

      Mocha 是一個基于 .NET 開發的 APM 系統,同時提供可伸縮的可觀測性數據分析和存儲平臺。

      更多關于 Mocha 的介紹,可以參考 http://www.rzrgm.cn/eventhorizon/p/17979677

      Mocha 會需要收集大量的數據,為處理這些數據,我們需要有一個緩沖區。初期我們實現了一個基于內存的緩沖區,下文稱之為 MemoryBufferQueue。

      Buffer 模塊的代碼地址:
      https://github.com/dotnetcore/mocha/tree/main/src/Mocha.Core/Buffer

      本文介紹的版本是 v0.1.0,后續版本可能會有變化。

      MemoryBufferQueue 功能概述

      MemoryBufferQueue 將數據緩沖到內存中,消費者可以從隊列中獲取數據,當隊列中無數據時,消費者會異步等待數據到來。

      MemoryBufferQueue 提供了以下功能:

      1. 支持創建多個 Topic,每個 Topic 都是一個獨立的隊列。
      2. 支持創建多個 Consumer Group,每個 Consumer Group 的消費進度都是獨立的。支持多個 Consumer Group 并發消費同一個 Topic。
      3. 支持同一個 Consumer Group 創建多個 Consumer,以負載均衡的方式消費數據。
      4. 支持數據的批量消費,可以一次性獲取多條數據。
      5. 支持重試機制,當消費者處理數據失敗時,可以選擇不確認消費,這樣數據會被重新消費。

      需要注意的是,當前版本出于簡化實現的考慮,暫不支持消費者的動態擴容和縮容,需要在創建消費者時指定消費者數量。

      Buffer 模塊 API 設計

      MemoryBufferQueue 的出發點的是在項目初期提供一個性能足夠高的內存緩存隊列。后期隨著項目的發展,我們可能會將其替換為別的實現,比如支持持久化的隊列。

      為了解耦,Buffer 模塊使用 Interface 進行了抽象。

      public interface IBufferQueue
      {
          IBufferProducer<T> CreateProducer<T>(string topicName);
      
          IBufferConsumer<T> CreateConsumer<T>(BufferConsumerOptions options);
      
          IEnumerable<IBufferConsumer<T>> CreateConsumers<T>(BufferConsumerOptions options, int consumerNumber);
      }
      
      internal interface IBufferQueue<T>
      {
          string TopicName { get; }
      
          IBufferProducer<T> CreateProducer();
      
          IBufferConsumer<T> CreateConsumer(BufferConsumerOptions options);
      
          IEnumerable<IBufferConsumer<T>> CreateConsumers(BufferConsumerOptions options, int consumerNumber);
      }
      
      public interface IBufferProducer<in T>
      {
          string TopicName { get; }
      
          ValueTask ProduceAsync(T item);
      }
      public interface IBufferConsumer<out T>
      {
          string TopicName { get; }
      
          string GroupName { get; }
      
          IAsyncEnumerable<IEnumerable<T>> ConsumeAsync(CancellationToken cancellationToken = default);
      
          ValueTask CommitAsync();
      }
      
      public class BufferConsumerOptions
      {
          public required string TopicName { get; init; }
      
          public required string GroupName { get; init; }
      
          public bool AutoCommit { get; init; }
      
          public int BatchSize { get; init; } = 100;
      }
      

      數據通過 Producer 寫入 BufferQueue,由 Consumer 進行消費。

      我們對 BufferQueue 有以下的要求:

      • 同一個數據類型 下的 不同 Topic 的 BufferQueue 互不干擾。

      • 同一個 Topic 下的 不同數據類型 的 BufferQueue 互不干擾。

      BufferQueue

      因此我們設計了兩個層級的接口:

      • IBufferQueue:根據 TopicName類型參數 T 將請求轉發給具體的 IBufferQueue<T> 實現(借助 KeyedService 實現),其中參數 T 代表 Buffer 所承載的數據實體的類型。

      • IBufferQueue<T>:具體的 BufferQueue 實現,負責管理 Topic 下的數據。屬于 Buffer 模塊的內部實現,不對外暴露。

      IBufferQueue

      Buffer 模塊提供了通過 ServiceCollection 進行注冊的擴展方法:

      public static class BufferServiceCollectionExtensions
      {
          public static IServiceCollection AddBuffer(
              this IServiceCollection services,
              Action<BufferOptionsBuilder> configure)
          {
              services.AddSingleton<IBufferQueue, BufferQueue>();
              configure(new BufferOptionsBuilder(services));
              return services;
          }
      }
      

      MemoryBufferQueue 模塊通過提供 BufferOptionsBuilder 來進行配置:

      public static class BufferOptionsBuilderExtensions
      {
          public static BufferOptionsBuilder UseMemory(
              this BufferOptionsBuilder builder,
              Action<MemoryBufferOptions> configure)
          {
              var options = new MemoryBufferOptions(builder.Services);
              configure(options);
      
              return builder;
          }
      }
      

      下面是配置和使用 MemoryBufferQueue 的示例:

      var services = new ServiceCollection();
      
      services.AddBuffer(options =>
      {
          options.UseMemory(bufferOptions =>
          {
              bufferOptions.AddTopic<MochaSpan>("otlp-span", Environment.ProcessorCount);
          });
      });
      
      var provider = services.BuildServiceProvider();
      
      var bufferQueue = provider.GetRequiredService<IBufferQueue>();
      
      var producer = bufferQueue.CreateProducer<MochaSpan>("otlp-span");
      
      var consumers = bufferQueue.CreateConsumers<MochaSpan>(new BufferConsumerOptions
      {
          TopicName = "otlp-span",
          GroupName = "test",
          AutoCommit = true, // 配置為 false 時,需要手動調用 CommitAsync 方法
          BatchSize = 100
      }, 2);
      
      var consumerTasks = consumers.Select(async consumer =>
      {
          await foreach (var batch in consumer.ConsumeAsync())
          {
              foreach (var item in batch)
              {
                  Console.WriteLine(item);
              }
              // 如果 AutoCommit 為 false,需要手動調用 CommitAsync 方法
              // await consumer.CommitAsync();
          }
      });
      
      Task.Run(async () =>
      {
          for (int i = 0; i < 1000; i++)
          {
              await producer.ProduceAsync(new MochaSpan());
          }
      });
      
      await Task.WhenAll(consumerTasks);
      

      MemoryBufferQueue 的設計

      Partition 的設計

      為了保證消費速度,MemoryBufferQueue 將數據劃分為多個 Partition,每個 Partition 都是一個獨立的隊列,每個 Partition 都有一個對應的消費者線程。

      Producer 以輪詢的方式往每個 Partition 中寫入數據。
      Consumer 最多不允許超過 Partition 的數量,Partition 按平均分配到組內每個 Customer 上。
      當一個 Consumer 被分配了多個 Partition 時,以輪訓的方式進行消費。
      每個 Partition 上會記錄不同消費組的消費進度,不同組之間的消費進度互不干擾。

      Partition

      對并發的支持

      Producer 支持并發寫入。

      Consumer 消費時是綁定 Partition 的,為保證能正確管理 Partition 的消費進度,Consumer 不支持并發消費。

      如果要增加消費速度,需創建多個 Consumer。

      Partition 的動態擴容

      Partition 的基本組成單元是 Segment,Segment 代表保存數據的數組,多個 Segment 通過鏈表的形式組合成一個 Partition。

      當一個 Segment 寫滿后,通過在其后面追加一個 Segment 實現擴容。

      Segment 中用于保存數據的數組的每一個元素稱為 Slot,每個 Slot 都有一個Partition 內唯一的自增 Offset。

      Segment

      Offset

      Offset 用于標識數據在 Partition 內位置,Partition 都會用 Offset 記錄各個消費組的消費進度。

      Offset 被設計為可以無限自增,一個 Offset 由 Generation 和 Index 組成。

      Generation 和 Index 默認值都為 0,每 Offset 自增一次,Index 加 1,直至溢出后歸 0,Index 每溢出一次,Generation 加 1。

      Generation 最終也會溢出歸 0,此時如果一個 Generation 等于 0 的 Offset 和一個 Generation 等于 ulong.Max 的 Offset 進行比較,則認為前者更大。

      readonly record struct MemoryBufferPartitionOffset(ulong Generation, ulong Index)
      

      Segment 的回收機制

      每次在 Partition 中新增 Segment 時,會從頭判斷此前的 Segment 是否已經被所有消費組消費完,回收最后一個消費完的 Segment 作為新的 Segment 追加到 Partition 末尾使用。

      SegmentRecycle

      歡迎關注個人技術公眾號

      posted @ 2024-01-30 20:47  黑洞視界  閱讀(675)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 久久九九久精品国产免费直播 | 成在线人免费视频| 欧美饥渴熟妇高潮喷水| 97人人模人人爽人人少妇 | 天堂v亚洲国产v第一次| 精品无码国产污污污免费| 色综合视频一区二区三区| 国产中文字幕在线一区| 亚洲熟女乱一区二区三区| 亚洲高清日韩专区精品| 污网站大全免费| 成人免费A级毛片无码网站入口| 国产欧美亚洲精品a第一页| 亚洲成av人片在线观看www| 精品亚洲精品日韩精品| 亚洲自拍偷拍中文字幕色| 丰满少妇内射一区| 欧美成人免费一区二区三区视频| 亚洲精品国偷自产在线| 午夜福利片一区二区三区| 欧美三级中文字幕在线观看| 他掀开裙子把舌头伸进去添视频 | 狠狠色丁香婷婷综合尤物| 性欧美牲交在线视频| 狠狠躁夜夜躁无码中文字幕| 精品亚洲女同一区二区| 性色欲情网站iwww| 精品中文人妻在线不卡| 亚洲国产区男人本色vr| 国产精品熟女孕妇一区二区| 中文字幕国产精品日韩| 一本大道久久香蕉成人网| 无码精品国产VA在线观看DVD| 天堂中文8资源在线8| 99久久国产综合精品成人影院| 日本高清视频网站www| 上司人妻互换中文字幕| 日本熟妇乱一区二区三区| 欧美激情一区二区三区成人| 国产精品久久久久久爽爽爽| 熟女亚洲综合精品伊人久久|