[NewLife.XCode]實體隊列(多線程生產(chǎn)的大數(shù)據(jù)集中保存)
NewLife.XCode是一個有15年歷史的開源數(shù)據(jù)中間件,支持netcore/net45/net40,由新生命團隊(2002~2020)開發(fā)完成并維護至今,以下簡稱XCode。
整個系列教程會大量結(jié)合示例代碼和運行日志來進行深入分析,蘊含多年開發(fā)經(jīng)驗于其中,代表作有百億級大數(shù)據(jù)實時計算項目。
開源地址:https://github.com/NewLifeX/X (求star, 1067+)
在大數(shù)據(jù)分析處理中,需要對海量數(shù)據(jù)進行添刪改操作,常規(guī)單行操作難以滿足要求,批量操作勢在必行!
飛仙(http://feixian.newlifex.com/)有收藏各種數(shù)據(jù)庫批量插入數(shù)據(jù)的性能排行榜,其中MySql冠軍是60萬tps,SQLite冠軍是56.6萬tps!
然而很多時候,數(shù)據(jù)來自多個渠道(多線程、多網(wǎng)絡(luò)連接),單個渠道數(shù)據(jù)量不大,甚至只有一行,就難以使用批量添刪改操作了。例如物聯(lián)網(wǎng)數(shù)據(jù)采集、埋點日志等,在多線程上有大量數(shù)據(jù)需要寫入。因此,XCode創(chuàng)造性設(shè)計了實體隊列技術(shù)!
!!閱讀本文之前,建議閱讀:https://www.yuque.com/smartstone/xcode/batch
什么是實體隊列
要說實體隊列EntityDeferredQueue,就不得不提它的基類延遲隊列DeferredQueue。
延遲隊列DeferredQueue的核心思想就是“湊批”,把要處理的零散數(shù)據(jù)放入一個“隊列”,然后定時集中處理。
例如物聯(lián)網(wǎng)采集服務(wù)端從多個連接收到數(shù)據(jù),需要寫入數(shù)據(jù)庫,為了提升吞吐,可以把實體數(shù)據(jù)放入延遲隊列,然后定時的落庫,此時,延遲隊列得到一批數(shù)據(jù),可以使用批量插入技術(shù)。

實際上DeferredQueue內(nèi)部并不是一個隊列,而是一個并發(fā)字典,因為有些業(yè)務(wù)場景,需要在“入隊列”時去重,例如統(tǒng)計數(shù)據(jù),需要拿出某省份的統(tǒng)計數(shù)據(jù),多次累加后集中保存。
private static readonly DeferredQueue _statCache = new EntityDeferredQueue { Name = "Gun", Action = EntityActions.Save };
private static void SaveStat(DateTime date, Int32 provinceID, String kind, ScanKinds scanKind, String code)
{
var key = $"{date:yyMMdd}_{provinceID}_{kind}";
var stat = _statCache.GetOrAdd(key, k => GunProvinceStat.FindByKey(k, true) ?? new GunProvinceStat());
stat.StatDate = date;
stat.Kind = kind;
stat.ProvinceID = provinceID;
stat.LastCode = code;
stat.ProcessStat(scanKind);
_statCache.Commit(key);
}
主要流程

對于統(tǒng)計型數(shù)據(jù)來說,可以在內(nèi)存里面多次累加計算指標(biāo),然后一次性保存,并且是批量保存,極大減少了數(shù)據(jù)庫寫入次數(shù)。這是大數(shù)據(jù)分析必備利器!
延遲隊列主要屬性
/// <summary>跟蹤數(shù)。達到該值時輸出跟蹤日志,默認(rèn)1000</summary>
public Int32 TraceCount { get; set; } = 1000;
/// <summary>周期。默認(rèn)10_000毫秒</summary>
public Int32 Period { get; set; } = 10_000;
/// <summary>最大個數(shù)。超過該個數(shù)時,進入隊列將產(chǎn)生堵塞。默認(rèn)100_000</summary>
public Int32 MaxEntity { get; set; } = 100_000;
/// <summary>批大小。默認(rèn)5_000</summary>
public Int32 BatchSize { get; set; } = 5_000;
/// <summary>等待借出對象確認(rèn)修改的時間,默認(rèn)3000ms</summary>
public Int32 WaitForBusy { get; set; } = 3_000;
/// <summary>保存速度,每秒保存多少個實體</summary>
public Int32 Speed { get; private set; }
/// <summary>是否異步處理。默認(rèn)true表示異步處理,共用DQ定時調(diào)度;false表示同步處理,獨立線程</summary>
public Boolean Async { get; set; } = true;
回過頭來,實體隊列EntityDeferredQueue作為延遲隊列的擴展延伸,實際上是定義了“隊列數(shù)據(jù)”的處理行為。延遲隊列只負(fù)責(zé)收集數(shù)據(jù)和定時調(diào)度,實際處理行為Process需要擴展。
EntityDeferredQueue定義了 Save/Insert/Update/Upsert/Delete 等行為供選擇。
如何使用實體隊列提升吞吐
再次深入分析前文的例子
private static readonly DeferredQueue _statCache = new EntityDeferredQueue { Name = "Gun", Action = EntityActions.Save };
private static void SaveStat(DateTime date, Int32 provinceID, String kind, ScanKinds scanKind, String code)
{
var key = $"{date:yyMMdd}_{provinceID}_{kind}";
var stat = _statCache.GetOrAdd(key, k => GunProvinceStat.FindByKey(k, true) ?? new GunProvinceStat());
stat.StatDate = date;
stat.Kind = kind;
stat.ProvinceID = provinceID;
stat.LastCode = code;
stat.ProcessStat(scanKind);
_statCache.Commit(key);
}
這是一個非常簡單的數(shù)據(jù)分析項目,統(tǒng)計每天各省每一種掃描類型的操作次數(shù)。日均分析處理5億行數(shù)據(jù),每一行數(shù)據(jù)都要識別出日期、省份、類別等字段,也就是SaveStat每天要調(diào)用5億次,結(jié)果數(shù)據(jù)分類存入統(tǒng)計表。共31省份27種類別,每日統(tǒng)計行數(shù)約800行(并非每個省都有全部類別)。通俗來講,5億行數(shù)據(jù),分組聚合得到800行,實時計算,每5秒計算一次。
采用流式計算框架,逐行遍歷5億行實時數(shù)據(jù),如果Insert/Update數(shù)據(jù)庫5億次,顯然很不現(xiàn)實!
平均每行寫入62.5萬次(5億/800),如果能夠在內(nèi)存里面“湊一湊”,每1000次更新,才寫入一次數(shù)據(jù)庫,那么總寫入次數(shù)降低為50萬次,平均每行寫入625次。
實體隊列/延遲隊列,正是為了這類場景而設(shè)計!
首先,根據(jù)業(yè)務(wù)去構(gòu)造一個唯一key,在這里就是日期+省份+類別;
其次,GetOrAdd嘗試從隊列里獲取該key對應(yīng)的統(tǒng)計對象,99%時候內(nèi)存命中,如果不存在,則查數(shù)據(jù)庫或者new一個;
再次,取得統(tǒng)計對象后,可以進行字段累加,stat.ProcessStat(scanKind);
最后,Commit告訴隊列,該key對應(yīng)的實體對象已經(jīng)使用完成,可以提交;
在延遲隊列內(nèi)部,定時(Period=10_000ms)執(zhí)行一次保存,把內(nèi)存里面的統(tǒng)計對象批量保存到數(shù)據(jù)庫,并清空隊列。
這里遇到的第一個問題就是,少量統(tǒng)計對象仍然使用怎么辦?請放心,定時任務(wù)會等待一定時間(WaitForBusy=3000ms),如果使用方Commit則提前完成。因此,上面的Commit可以不要,效果會變差一些,同時,統(tǒng)計邏輯必須盡快完成(<3000ms)。
第二個問題很重要,定時間隔(Period=10_000ms)之內(nèi),內(nèi)存數(shù)據(jù)是高危狀態(tài),如果此時進程退出,則意味著統(tǒng)計數(shù)據(jù)丟失。標(biāo)準(zhǔn)架構(gòu)應(yīng)該是在數(shù)據(jù)落庫以后做Ack確認(rèn),但是原始數(shù)據(jù)實在太多(5億),很不現(xiàn)實。因此,實際工作中,我們是通過提升系統(tǒng)可靠性來規(guī)避該問題,采用螞蟻調(diào)度AntJob,結(jié)合分布式多節(jié)點部署,在實時計算中,內(nèi)存保留數(shù)據(jù)并不多。每次需要更新程序時,先停止調(diào)度一分鐘,等待數(shù)據(jù)落庫和冷卻,才能推出應(yīng)用進程。在數(shù)據(jù)分析領(lǐng)域,一般允許有一定的數(shù)據(jù)誤差(<0.01%),或者白天實時計算加夜晚離線重算的模式!
實際經(jīng)驗表明,只要應(yīng)用沒有非法退出,不存在數(shù)據(jù)丟失問題!
再來看看 ProcessStat內(nèi)部,(這里的GunProvinceStat是XCode實體類,一張統(tǒng)計表)
public void ProcessStat(ScanKinds kind)
{
//stat.Total++;
Interlocked.Increment(ref _Total);
switch (kind)
{
case ScanKinds.Receipt:
//stat.Receipts++;
Interlocked.Increment(ref _Receipts);
break;
case ScanKinds.SendBill:
case ScanKinds.SendAir:
//stat.Sends++;
Interlocked.Increment(ref _Sends);
break;
case ScanKinds.SendBag:
Interlocked.Increment(ref _SendBags);
break;
case ScanKinds.ComeBill:
case ScanKinds.ComeAir:
//stat.Comes++;
Interlocked.Increment(ref _Comes);
break;
case ScanKinds.ComeBag:
Interlocked.Increment(ref _ComeBags);
break;
case ScanKinds.SendCar:
case ScanKinds.ComeCar:
Interlocked.Increment(ref _Cars);
break;
case ScanKinds.Dispatch:
//stat.Dispatchs++;
Interlocked.Increment(ref _Dispatchs);
break;
case ScanKinds.Sign:
//stat.Signs++;
Interlocked.Increment(ref _Signs);
break;
case ScanKinds.Back:
Interlocked.Increment(ref _Backs);
break;
case ScanKinds.Problem:
Interlocked.Increment(ref _Problems);
break;
case ScanKinds.Stay:
case ScanKinds.Other:
case ScanKinds.Input:
case ScanKinds.Order:
case ScanKinds.Electronic:
default:
Interlocked.Increment(ref _Others);
break;
}
}
數(shù)據(jù)表結(jié)構(gòu)
<Table Name="GunProvinceStat" Description="巴槍省份統(tǒng)計" IgnoreNameCase="False">
<Columns>
<Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="編號" />
<Column Name="StatDate" DataType="DateTime" Description="統(tǒng)計日期" />
<Column Name="ProvinceID" DataType="Int32" Description="省份。0表示全國" />
<Column Name="Kind" DataType="String" Description="類別。All表示所有類型" />
<Column Name="Total" DataType="Int64" Description="總次數(shù)" />
<Column Name="Receipts" DataType="Int64" Description="收件數(shù)" />
<Column Name="Sends" DataType="Int64" Description="發(fā)件數(shù)" />
<Column Name="Comes" DataType="Int64" Description="到件數(shù)" />
<Column Name="Dispatchs" DataType="Int64" Description="派件數(shù)" />
<Column Name="Signs" DataType="Int64" Description="簽收數(shù)" />
<Column Name="SendBags" DataType="Int64" Description="發(fā)包數(shù)" />
<Column Name="ComeBags" DataType="Int64" Description="到包數(shù)" />
<Column Name="Cars" DataType="Int64" Description="掃車數(shù)" />
<Column Name="Backs" DataType="Int64" Description="退件數(shù)" />
<Column Name="Problems" DataType="Int64" Description="問題件數(shù)" />
<Column Name="Others" DataType="Int64" Description="其它數(shù)" />
<Column Name="LastCode" DataType="String" Description="最后單號" />
<Column Name="CreateTime" DataType="DateTime" Description="創(chuàng)建時間" />
<Column Name="UpdateTime" DataType="DateTime" Description="更新時間" />
</Columns>
<Indexes>
<Index Columns="StatDate,ProvinceID,Kind" Unique="True" />
<Index Columns="Kind,ProvinceID" />
</Indexes>
</Table>
系列教程
NewLife.XCode教程系列[2019版]
- 增刪改查入門。快速展現(xiàn)用法,代碼配置連接字符串
- 數(shù)據(jù)模型文件。建立表格字段和索引,名字以及數(shù)據(jù)類型規(guī)范,推薦字段(時間,用戶,IP)
- 實體類詳解。數(shù)據(jù)類業(yè)務(wù)類,泛型基類,接口
- 功能設(shè)置。連接字符串,調(diào)試開關(guān),SQL日志,慢日志,參數(shù)化,執(zhí)行超時。代碼與配置文件設(shè)置,連接字符串局部設(shè)置
- 反向工程。自動建立數(shù)據(jù)庫數(shù)據(jù)表
- 數(shù)據(jù)初始化。InitData寫入初始化數(shù)據(jù)
- 高級增刪改。重載攔截,自增字段,Valid驗證,實體模型(時間,用戶,IP)
- 臟數(shù)據(jù)。如何產(chǎn)生,怎么利用
- 增量累加。高并發(fā)統(tǒng)計
- 事務(wù)處理。單表和多表,不同連接,多種寫法
- 擴展屬性。多表關(guān)聯(lián),Map映射
- 高級查詢。復(fù)雜條件,分頁,自定義擴展FieldItem,查總記錄數(shù),查匯總統(tǒng)計
- 數(shù)據(jù)層緩存。Sql緩存,更新機制
- 實體緩存。全表整理緩存,更新機制
- 對象緩存。字典緩存,適用用戶等數(shù)據(jù)較多場景。
- 百億級性能。字段精煉,索引完備,合理查詢,充分利用緩存
- 實體工廠。元數(shù)據(jù),通用處理程序
- 角色權(quán)限。Membership
- 導(dǎo)入導(dǎo)出。Xml,Json,二進制,網(wǎng)絡(luò)或文件
- 分表分庫。常見拆分邏輯
- 高級統(tǒng)計。聚合統(tǒng)計,分組統(tǒng)計
- 批量寫入。批量插入,批量Upsert,異步保存
- 實體隊列。寫入級緩存,提升性能。
- 備份同步。備份數(shù)據(jù),恢復(fù)數(shù)據(jù),同步數(shù)據(jù)
- 數(shù)據(jù)服務(wù)。提供RPC接口服務(wù),遠(yuǎn)程執(zhí)行查詢,例如SQLite網(wǎng)絡(luò)版
- 大數(shù)據(jù)分析。ETL抽取,調(diào)度計算處理,結(jié)果持久化

浙公網(wǎng)安備 33010602011771號