第六章:C#數據流基礎
第六章:C#數據流基礎
TPL 數據流(Task Parallel Library Dataflow)是一個強大的庫,能夠創建網格或管道,異步處理數據。它具備非常明顯的聲明式編程風格:開發者首先定義完整的網格或管道結構,然后將數據輸入該結構,數據便會自動流經各個處理步驟。這種思維方式與傳統編程有所不同,但一旦掌握,數據流便能簡化許多復雜的并發處理場景。
在 TPL 數據流中,網格由若干個彼此關聯的“塊”(Block)組成。每個塊負責數據處理的一個步驟,完成后將結果傳遞給下一個塊。使用 TPL 數據流需要安裝 System.Threading.Tasks.Dataflow NuGet 包。
6.1 關聯的塊
TPL 數據流(Task Parallel Library Dataflow)中的塊(Block)是數據流中的基本構建塊。塊用于處理數據流中的某個步驟,并將處理后的數據傳遞給其他塊。要實現數據在塊之間的流動,通常需要將塊關聯起來,形成一個數據處理的管道或網格。塊的關聯方式和數據流動的控制是數據流編程中的核心概念之一。
6.1.1 塊的基本概念
數據流塊是 TPL 數據流中最基本的單位,負責接收、處理、傳遞數據。每個塊都有輸入和輸出,塊之間可以通過鏈接(LinkTo)將數據從一個塊傳遞到下一個塊。
常見的塊類型包括:
BufferBlock<T>:用于存儲和緩沖數據,類似于隊列。TransformBlock<TInput, TOutput>:處理輸入并生成輸出。它是最常用的塊之一,常用于數據的轉換、計算等。ActionBlock<T>:用于執行某些操作,但不產生任何輸出。通常用于數據流的最終階段,如保存到文件、數據庫等。BroadcastBlock<T>:將輸入廣播給多個目標塊。適合需要將數據發送到多個消費者的場景。
6.1.2 塊的關聯與 LinkTo
塊之間的關聯通過 LinkTo 方法實現。LinkTo 是一種擴展方法,用于將一個數據流塊的輸出鏈接到另一個塊的輸入。通過這種方式,可以將多個塊組合,形成一個數據流管道。
基本示例:
var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
// 通過 LinkTo 將 multiplyBlock 的輸出鏈接到 subtractBlock 的輸入
multiplyBlock.LinkTo(subtractBlock);
// 發送數據到 multiplyBlock
multiplyBlock.Post(5);
// 從 subtractBlock 獲取結果
Console.WriteLine(await subtractBlock.ReceiveAsync()); // 輸出 8 (5 * 2 - 2)
塊關聯的特點:
- 數據流動:
LinkTo使得數據能夠從一個塊流向另一個塊。數據在multiplyBlock被處理(乘以 2),然后傳遞給subtractBlock進行后續處理(減去 2)。 - 異步處理:各個數據流塊獨立處理數據,整個處理流程是異步的。每個塊都可以并行處理多個數據項,最大限度地利用多核處理器的性能。
6.1.3 PropagateCompletion:傳播完成狀態
在默認情況下,數據流塊只會傳播數據,不會傳播完成狀態或錯誤。這意味著即使第一個塊完成了,后續塊也不會自動完成,可能會導致數據流無法正確結束。因此,通常需要顯式傳播完成狀態。
PropagateCompletion 選項允許在塊之間傳播完成狀態。這樣,當源塊完成時,目標塊也會自動完成。
示例:傳播完成狀態
var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
// 設置 PropagateCompletion 選項,確保完成狀態傳播
var options = new DataflowLinkOptions { PropagateCompletion = true };
multiplyBlock.LinkTo(subtractBlock, options);
// 發送數據并完成第一個塊
multiplyBlock.Post(5);
multiplyBlock.Complete();
// 等待 subtractBlock 完成處理
await subtractBlock.Completion;
Console.WriteLine("All blocks completed.");
解釋:
multiplyBlock.Complete()標記multiplyBlock不再接收任何數據,并且處理完所有當前正在處理的數據后進入完成狀態。- 通過
PropagateCompletion,multiplyBlock的完成狀態會自動傳播給subtractBlock。因此,當multiplyBlock完成時,subtractBlock也會完成。
6.1.4 通過 LinkTo 進行數據過濾
LinkTo 方法還支持數據過濾。通過傳遞一個謂詞(Predicate<T>),可以指定只有滿足條件的數據才會流向下一個塊。未通過過濾器的數據會保留在當前塊中,等待其他處理。
示例:僅允許偶數通過鏈接
var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var evenBlock = new TransformBlock<int, int>(item => item / 2);
// 只允許偶數通過
multiplyBlock.LinkTo(evenBlock, item => item % 2 == 0);
// 提交數據
multiplyBlock.Post(3); // 不會通過到 evenBlock
multiplyBlock.Post(4); // 會通過到 evenBlock
// 獲取結果
Console.WriteLine(await evenBlock.ReceiveAsync()); // 輸出 2
在該示例中,只有偶數會通過鏈接到下一個塊,而奇數會被阻塞在 multiplyBlock 中,不會傳遞到 evenBlock。
6.1.5 數據流中的分叉和循環
數據流不僅可以是線性的,還可以實現分叉和循環,來處理更復雜的場景。通過 LinkTo,一個塊可以連接到多個目標塊,從而實現分叉數據流。
示例:數據流的分叉
var sourceBlock = new BufferBlock<int>();
var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
// 將同一個源塊的數據分叉到兩個目標塊
sourceBlock.LinkTo(multiplyBlock);
sourceBlock.LinkTo(subtractBlock);
// 提交數據
sourceBlock.Post(10);
// 獲取結果
Console.WriteLine(await multiplyBlock.ReceiveAsync()); // 輸出 20
Console.WriteLine(await subtractBlock.ReceiveAsync()); // 輸出 8
在這個例子中,sourceBlock 將數據同時發送給兩個目標塊。每個目標塊獨立地處理相同的數據,實現了數據流的分叉。
6.1.6 討論
塊的關聯和數據的流動是 TPL 數據流的核心機制。LinkTo 提供了靈活的方式來連接塊,形成線性管道、分叉、過濾等多種數據處理模式。
-
數據流的聲明式風格:
TPL 數據流提供了一種聲明式方式來定義數據處理過程。通過定義塊及其關聯關系,開發者可以輕松構建并發數據處理的管道。 -
異步與并行:每個塊獨立處理數據,支持異步和并行執行。
TPL 數據流能夠充分利用多核 CPU,提高處理效率。 -
過濾與分叉:通過
LinkTo的過濾功能,可以輕松實現數據流的條件分支。同時,塊之間可以實現分叉,允許數據同時流向多個目標塊。 -
完成狀態傳播:
PropagateCompletion是數據流中的重要機制,保證數據處理流程的結束狀態能夠在整個數據流管道中傳播。 -
高級場景:對于更復雜的網格結構,可以使用
LinkTo實現循環、分支、多目標塊等高級數據流模式。這些場景雖然不常見,但在某些特定的并發處理需求下非常有用。
6.2 傳播錯誤
在異步和并發程序中,異常處理是不可忽視的一部分。TPL 數據流 中的錯誤傳播機制,是數據流可靠性和健壯性的重要保證。當數據流中的某個塊發生異常時,默認情況下,該塊會進入錯誤狀態,并停止處理后續數據。為了能正確響應和處理這些異常,TPL 數據流 提供了多種機制來捕獲和傳播錯誤。
6.2.1 塊中的異常處理
當數據流塊中的委托(如 TransformBlock<TInput, TOutput> 的處理邏輯)拋出異常時,該塊會進入錯誤狀態。此時,塊會丟棄所有尚未處理的數據,并且不再接收新的數據。處理數據時發生的異常不會立即拋出,而是通過塊的 Completion 屬性表示塊的完成狀態。
示例:塊中的異常
var block = new TransformBlock<int, int>(item =>
{
if (item == 1)
throw new InvalidOperationException("Invalid data: " + item);
return item * 2;
});
block.Post(1); // 這個輸入會拋出異常
block.Post(2); // 這個數據不會被處理,因為塊已進入錯誤狀態
try
{
await block.Completion;
}
catch (InvalidOperationException ex)
{
Console.WriteLine($"Caught exception: {ex.Message}");
}
解釋:
block.Post(1)會觸發塊內部的異常,塊進入錯誤狀態。- 塊的
Completion屬性為一個Task,它會在塊完成后(即處理完所有數據或進入錯誤狀態)完成。如果塊因為異常進入錯誤狀態,則該Task會以錯誤完成,拋出與塊中異常相同的異常。 - 我們可以通過
await block.Completion來捕獲異常。
6.2.2 Completion 屬性與錯誤傳播
每個數據流塊都有一個 Completion 屬性,這是一個 Task,表示塊的完成狀態。當塊中的所有數據處理完成時,Completion 任務也會完成。如果塊內部發生異常,Completion 任務會以錯誤結束,并拋出封裝了異常的 AggregateException。
示例:捕獲塊的異常
var block = new TransformBlock<int, int>(item =>
{
if (item == 1)
throw new InvalidOperationException("Data error.");
return item * 2;
});
block.Post(1); // 觸發異常
block.Post(2); // 數據不會被處理
try
{
// 等待塊完成,如果出錯,會捕獲異常
await block.Completion;
}
catch (Exception ex)
{
// 捕獲異常并處理
Console.WriteLine(ex.Message); // 輸出 "Data error."
}
關鍵點:
- 每個數據流塊都有一個
Completion任務,用于表示塊的完成狀態。 - 如果塊中出現異常,
Completion任務會以錯誤完成,并拋出異常。 - 異常不會立即拋出,而是通過
Completion任務進行傳播。
6.2.3 異常的傳播與 PropagateCompletion
在數據流管道中,異常可以通過 PropagateCompletion 選項傳播到下游的塊。PropagateCompletion 不僅傳播完成狀態,還會將異常封裝在 AggregateException 中,傳遞給下游塊。
示例:傳播異常
var multiplyBlock = new TransformBlock<int, int>(item =>
{
if (item == 1)
throw new InvalidOperationException("Multiply error.");
return item * 2;
});
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
// 傳遞完成狀態和異常
multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true });
multiplyBlock.Post(1); // 觸發異常
try
{
// 等待 subtractBlock 完成
await subtractBlock.Completion;
}
catch (AggregateException ex)
{
Console.WriteLine("Caught propagated exception: " + ex.Flatten().Message);
}
解釋:
multiplyBlock中處理數據時拋出了InvalidOperationException。PropagateCompletion選項確保異常被傳遞到subtractBlock。- 下游塊
subtractBlock的Completion任務會捕獲并封裝異常,異常被包裝在AggregateException中。 Flatten()方法可以解除嵌套的AggregateException,便于處理原始異常。
6.2.4 AggregateException 與錯誤的多層傳播
當塊之間通過 LinkTo 關聯,并且開啟了 PropagateCompletion,異常會被逐層傳遞。每個塊都會將收到的異常封裝在 AggregateException 中,傳遞給下一個塊。如果管道較長,異常會被多次封裝,形成嵌套的 AggregateException。
示例:多層異常傳播
var block1 = new TransformBlock<int, int>(item =>
{
if (item == 1)
throw new InvalidOperationException("Block 1 error.");
return item;
});
var block2 = new TransformBlock<int, int>(item => item);
var block3 = new TransformBlock<int, int>(item => item);
// 傳遞完成狀態和異常
block1.LinkTo(block2, new DataflowLinkOptions { PropagateCompletion = true });
block2.LinkTo(block3, new DataflowLinkOptions { PropagateCompletion = true });
block1.Post(1); // 觸發異常
try
{
// 捕獲 block3 的異常
await block3.Completion;
}
catch (AggregateException ex)
{
var flattenedEx = ex.Flatten();
foreach (var innerEx in flattenedEx.InnerExceptions)
{
Console.WriteLine($"Caught exception: {innerEx.Message}");
}
}
解釋:
block1拋出了InvalidOperationException,并通過PropagateCompletion傳播到block2和block3。- 異常在每個塊中都會被封裝在
AggregateException中,最終在block3捕獲到嵌套的異常。 Flatten()方法用于將嵌套的AggregateException展平。
6.2.5 異常處理策略
在復雜的數據流管道中,當異常發生時,如何設計合理的異常處理機制至關重要。以下是幾種常見的異常處理策略:
-
全局捕獲異常:在數據流的終端(如最后一個塊)捕獲所有異常。這種方式適合簡單的線性管道,所有異常匯總到最后進行處理。
-
局部處理異常:在每個塊的
Completion任務中捕獲異常,允許對異常進行更細粒度的控制。這種方式適合復雜的網格結構,每個塊都可以獨立處理自己的錯誤。 -
繼續處理其他數據:如果希望在異常發生時,仍繼續處理其他數據,可以將異常視作另一種數據,傳遞到下一個塊進行處理。通過這種方式,數據流可以在出現錯誤時繼續工作,而不會因為單個異常停止整個數據流。
示例:將異常作為數據處理
var exceptionHandlingBlock = new TransformBlock<int, string>(item =>
{
try
{
if (item == 1)
throw new InvalidOperationException("Invalid data.");
return $"Processed {item}";
}
catch (Exception ex)
{
return $"Error: {ex.Message}";
}
});
exceptionHandlingBlock.Post(1); // 會觸發異常,但異常被捕獲并作為數據處理
exceptionHandlingBlock.Post(2); // 正常處理
Console.WriteLine(await exceptionHandlingBlock.ReceiveAsync()); // 輸出 "Error: Invalid data."
Console.WriteLine(await exceptionHandlingBlock.ReceiveAsync()); // 輸出 "Processed 2"
解釋:
- 異常被捕獲并作為字符串返回,數據流不會因為異常停止。
- 這種模式允許數據流在處理異常的同時繼續處理其他有效數據。
6.2.6 錯誤處理的設計考慮
在設計數據流時,錯誤處理需要根據業務需求進行精心設計。以下是一些設計考慮:
-
錯誤傳播的范圍:是否需要在整個數據流中傳播錯誤?還是只在某些特定塊中處理錯誤?
-
異常處理的時機:是在每個塊中局部處理錯誤,還是在數據流的終端集中處理所有錯誤?
-
錯誤與數據分離:是否需要將錯誤作為數據的一部分進行處理?這種方式適合需要在錯誤發生時繼續處理其他數據的場景。
-
錯誤的可見性:通過
Completion屬性可以獲得錯誤信息,但如果需要在塊處理數據時即時處理錯誤,可以考慮將異常捕獲在塊的邏輯中,并作為數據傳遞。
6.3 塊的解耦
在 TPL 數據流 中,塊之間通過 LinkTo 方法進行關聯,形成數據流管道或網格。通常情況下,關聯的塊在數據流管道中保持連接,直到整個數據流完成。然而,某些情況下我們可能需要在運行時將塊解耦,即動態地移除塊之間的連接,改變數據流的結構。這種解耦機制在處理動態數據流、負載平衡、錯誤恢復等場景非常有用。
6.3.1 使用 IDisposable 解耦塊
每當我們使用 LinkTo 將兩個塊關聯時,LinkTo 方法會返回一個 IDisposable 對象。這個返回對象代表塊之間的連接。調用 Dispose() 方法可以解耦塊,使得數據不再從源塊流向目標塊。
示例:塊的解耦
var sourceBlock = new BufferBlock<int>();
var targetBlock = new TransformBlock<int, int>(x => x * 2);
// 關聯塊
IDisposable link = sourceBlock.LinkTo(targetBlock);
// 發送數據到源塊
sourceBlock.Post(5);
// 獲取處理結果
Console.WriteLine(await targetBlock.ReceiveAsync()); // 輸出 10 (5 * 2)
// 解耦塊
link.Dispose();
// 發送新的數據,但不會傳遞到 targetBlock
sourceBlock.Post(6);
// 由于解耦,targetBlock 不再接收數據
bool received = await targetBlock.OutputAvailableAsync(TimeSpan.FromSeconds(1));
Console.WriteLine(received); // 輸出 False
解釋:
- 塊之間的連接通過
IDisposable對象表示。 - 當調用
Dispose()時,塊之間的連接斷開,后續數據不會再傳遞到目標塊。 - 在數據流管道中,解耦后,目標塊仍然可以繼續處理在解耦前接收到的數據。
6.3.2 解耦的線程安全性
TPL 數據流 設計時考慮了并發場景,因此 Dispose() 操作是線程安全的。即使在多個線程同時傳輸數據的情況下,調用 Dispose() 解耦塊不會引發競爭條件或數據丟失。
解耦操作并不立即影響已經在處理中的數據。塊之間的連接斷開后,已經傳遞給目標塊的所有數據仍會被正常處理,但新的數據將不再傳遞到目標塊。
示例:解耦后數據的處理
var sourceBlock = new BufferBlock<int>();
var targetBlock = new TransformBlock<int, int>(x => x * 2);
// 關聯數據流塊
IDisposable link = sourceBlock.LinkTo(targetBlock);
// 發送多個數據項
sourceBlock.Post(5);
sourceBlock.Post(6);
// 立即斷開連接
link.Dispose();
// 即使連接已斷開,targetBlock 仍會處理已經傳遞的數據
Console.WriteLine(await targetBlock.ReceiveAsync()); // 輸出 10 (5 * 2)
Console.WriteLine(await targetBlock.ReceiveAsync()); // 輸出 12 (6 * 2)
// 由于連接已斷開,targetBlock 不會再接收新的數據
bool received = await targetBlock.OutputAvailableAsync(TimeSpan.FromSeconds(1));
Console.WriteLine(received); // 輸出 False
解釋:
- 盡管調用了
Dispose(),但已經進入目標塊的數據仍會被處理。 - 解耦只影響后續傳輸的數據,而不會影響已經傳遞的數據。
6.3.3 動態調整數據流結構
解耦塊為我們提供了在運行時動態調整數據流結構的能力。通過這種機制,我們可以根據實時的業務需求,動態地添加或移除塊,改變數據流的拓撲結構。常見的應用場景包括:
- 負載平衡:當某個塊的負載過高時,可以臨時解耦某些下游塊,減輕負載。
- 錯誤恢復:如果某個塊發生異常或需要維護,可以將其解耦,避免影響整個數據流。
- 動態分支:在某些情況下,我們可能需要臨時將數據流的某個分支斷開,或重新連接到新的塊。
示例:動態調整數據流
var sourceBlock = new BufferBlock<int>();
var multiplyBlock = new TransformBlock<int, int>(x => x * 2);
var subtractBlock = new TransformBlock<int, int>(x => x - 2);
// 將 sourceBlock 連接到 multiplyBlock 和 subtractBlock
IDisposable multiplyLink = sourceBlock.LinkTo(multiplyBlock);
IDisposable subtractLink = sourceBlock.LinkTo(subtractBlock);
// 發送數據
sourceBlock.Post(5);
// 獲取 multiplyBlock 和 subtractBlock 的輸出
Console.WriteLine(await multiplyBlock.ReceiveAsync()); // 輸出 10 (5 * 2)
Console.WriteLine(await subtractBlock.ReceiveAsync()); // 輸出 3 (5 - 2)
// 動態調整數據流,斷開 subtractBlock
subtractLink.Dispose();
// 再次發送數據,只會傳遞到 multiplyBlock
sourceBlock.Post(6);
Console.WriteLine(await multiplyBlock.ReceiveAsync()); // 輸出 12 (6 * 2)
// subtractBlock 已解耦,不再接收數據
bool received = await subtractBlock.OutputAvailableAsync(TimeSpan.FromSeconds(1));
Console.WriteLine(received); // 輸出 False
解釋:
sourceBlock同時連接到multiplyBlock和subtractBlock,數據流在兩個方向上并行處理。- 在運行時,我們通過調用
Dispose()斷開了subtractBlock,此后數據只會傳遞到multiplyBlock。
6.3.4 條件性解耦
有時我們希望根據某些條件來動態地解耦塊。例如,當某個數據流塊處理的數據量超過一定閾值時,我們可能希望將其解耦,以避免進一步的過載。這種條件性解耦通常與監控或計數機制結合使用。
示例:基于條件的解耦
var sourceBlock = new BufferBlock<int>();
var targetBlock = new TransformBlock<int, int>(x => x * 2);
// 關聯塊
IDisposable link = sourceBlock.LinkTo(targetBlock);
// 模擬處理數據的計數器
int processedCount = 0;
for (int i = 0; i < 10; i++)
{
sourceBlock.Post(i);
processedCount++;
// 當處理的數量達到 5 時,解耦塊
if (processedCount == 5)
{
link.Dispose();
}
}
// 獲取已經處理的數據
while (await targetBlock.OutputAvailableAsync())
{
Console.WriteLine(await targetBlock.ReceiveAsync());
}
解釋:
- 當處理的數據量達到 5 時,我們調用
Dispose()來解耦塊,停止數據傳輸。 - 在解耦之前發送的數據仍會被正常處理,解耦后數據流停止傳輸。
6.3.5 多次解耦和重新鏈接
在某些動態場景中,塊可能需要多次解耦和重新鏈接。例如,某些塊需要根據負載或外部條件臨時斷開連接,當條件恢復時再重新連接。這種機制可以通過保存 IDisposable 對象的引用,并在需要時調用 Dispose() 或重新調用 LinkTo 方法來實現。
示例:多次解耦和重新鏈接
var sourceBlock = new BufferBlock<int>();
var targetBlock = new TransformBlock<int, int>(x => x * 2);
// 初次關聯塊
IDisposable link = sourceBlock.LinkTo(targetBlock);
// 發送數據
sourceBlock.Post(5);
Console.WriteLine(await targetBlock.ReceiveAsync()); // 輸出 10
// 解耦塊
link.Dispose();
// 重新鏈接塊
link = sourceBlock.LinkTo(targetBlock);
// 發送新的數據
sourceBlock.Post(6);
Console.WriteLine(await targetBlock.ReceiveAsync()); // 輸出 12
解釋:
- 塊可以多次解耦和重新鏈接。
- 當需要重新建立數據流時,只需再次調用
LinkTo方法。
6.3.6 解耦與完成狀態
需要注意的是,調用 Dispose() 解耦塊不會影響塊的完成狀態。也就是說,盡管塊之間的連接斷開了,但塊仍可以接收新的數據(除非調用 Complete())。如果需要停止整個數據流的處理,應該顯式調用塊的 Complete() 方法。
示例:解耦后完成數據流
var sourceBlock = new BufferBlock<int>();
var targetBlock = new TransformBlock<int, int>(x => x * 2);
// 關聯塊
IDisposable link = sourceBlock.LinkTo(targetBlock);
// 發送數據
sourceBlock.Post(5);
// 解耦塊
link.Dispose();
// 完成源塊
sourceBlock.Complete();
// 等待目標塊處理完剩余數據
await targetBlock.Completion;
Console.WriteLine("Dataflow completed.");
解釋:
- 調用
Dispose()只是解耦塊之間的連接,并不會影響塊的完成狀態。 - 如果需要停止整個數據流,必須顯式調用
Complete(),以標記數據流的結束。
6.4 塊的節流
在并發編程中,節流(Throttling)是一種控制資源使用和限制操作速率的技術,以防止系統過載。TPL 數據流 中的塊(Block)同樣可以進行節流,以防止數據流管道處理過多的數據,導致性能下降或資源耗盡。節流機制通過限制并發程度、控制數據流量或設置緩沖區大小,確保塊以可控的速度處理數據。
6.4.1 數據流塊的背壓機制
在 TPL 數據流 中,背壓(Backpressure)是節流的重要概念。當某些塊的處理速度較慢時,背壓會防止上游塊繼續發送數據,直到下游塊有足夠的緩沖空間來接收新數據。背壓機制能夠自動調節數據流速率,避免數據過載。
當數據流中的某個塊飽和(內部緩沖區已滿)時,它會對上游塊施加背壓,阻止上游塊再發送數據。這種機制可以保護整個數據流系統免于過負荷運行。
示例:背壓的簡單演示
var bufferBlock = new BufferBlock<int>();
var slowBlock = new TransformBlock<int, int>(async item =>
{
// 模擬慢速處理
await Task.Delay(1000);
return item * 2;
});
// 將 bufferBlock 連接到 slowBlock
bufferBlock.LinkTo(slowBlock);
// 發送多個數據到 bufferBlock
for (int i = 0; i < 5; i++)
{
bufferBlock.Post(i);
}
while (await slowBlock.OutputAvailableAsync())
{
Console.WriteLine(await slowBlock.ReceiveAsync());
}
解釋:
bufferBlock是一個簡單的緩沖塊,能夠存儲多個數據。slowBlock是一個處理速度較慢的塊,每次處理數據時會延遲 1 秒。- 盡管數據很快被發送到
bufferBlock,但由于slowBlock處理速度較慢,緩沖區會填滿,觸發背壓,限制新數據的發送。
6.4.2 控制塊的并發性
TPL 數據流 中的某些塊(如 TransformBlock 和 ActionBlock)允許通過設置并發級別來控制塊的并行處理能力。通過配置 ExecutionDataflowBlockOptions.MaxDegreeOfParallelism 參數,塊可以限制同時處理的任務數量,從而實現節流。
示例:限制并發性
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 2 // 限制并發任務數為 2
};
var block = new TransformBlock<int, int>(async item =>
{
// 模擬慢速處理
Console.WriteLine($"Processing {item}");
await Task.Delay(1000); // 每次處理需要 1 秒
return item * 2;
}, options);
// 發送多個數據
for (int i = 0; i < 5; i++)
{
block.Post(i);
}
// 打印處理結果
for (int i = 0; i < 5; i++)
{
Console.WriteLine(await block.ReceiveAsync());
}
解釋:
MaxDegreeOfParallelism = 2限制了同時執行的任務數為 2。- 盡管發送了 5 個數據,但只有 2 個數據會被并發處理,其余數據會排隊等待。
- 這種方式對塊的處理能力進行節流,防止過多并發任務壓垮系統。
6.4.3 控制塊的輸入緩沖大小
除了控制塊的并發性,TPL 數據流 中還可以通過設置輸入緩沖區大小來限制塊可以接收的最大數據量。通過 BoundedCapacity 參數,我們可以指定塊的最大緩沖容量。一旦塊的緩沖區達到上限,塊將對上游塊施加背壓,阻止其繼續發送數據。
示例:限制緩沖區大小
var options = new ExecutionDataflowBlockOptions
{
BoundedCapacity = 2 // 限制緩沖區容量為 2
};
var block = new TransformBlock<int, int>(item =>
{
Console.WriteLine($"Processing {item}");
return item * 2;
}, options);
// 發送數據到塊
for (int i = 0; i < 5; i++)
{
if (!block.Post(i))
{
Console.WriteLine($"Could not post {i}, buffer is full");
}
}
// 讀取處理結果
for (int i = 0; i < 5; i++)
{
Console.WriteLine(await block.ReceiveAsync());
}
解釋:
BoundedCapacity = 2限制了塊的輸入緩沖區只能存儲 2 個數據項。- 當緩沖區已滿時,塊會拒絕新的
Post()操作,并返回false,表示無法接收更多數據。 - 這種方式通過限制緩沖區大小來防止數據流管道過度擁塞。
6.4.4 使用 Throttle 模式進行節流
在某些場景中,我們可能希望通過顯式的節流策略來控制數據流的速度。例如,限制每秒處理的數據量,或在一定時間間隔內發送少量數據。我們可以通過自定義邏輯來實現這種時間控制的節流(Throttle)。
示例:基于時間的節流
var block = new TransformBlock<int, int>(item =>
{
Console.WriteLine($"Processing {item} at {DateTime.Now}");
return item * 2;
});
// 節流發送,限制每秒發送 1 個數據
for (int i = 0; i < 5; i++)
{
block.Post(i);
await Task.Delay(1000); // 每秒發送 1 個數據
}
// 讀取處理結果
for (int i = 0; i < 5; i++)
{
Console.WriteLine(await block.ReceiveAsync());
}
解釋:
- 通過
Task.Delay(1000),我們人為地控制數據流的發送速率,使得每秒鐘發送 1 個數據。 - 這種基于時間的節流模式適用于需要限制數據流速率的場景,例如 API 速率限制或定時任務。
6.4.5 合并多個節流策略
在實際應用中,塊的節流通常需要多種策略結合使用。例如,可以同時限制塊的并發性、緩沖區大小,并應用顯式的時間節流策略。通過這種組合,可以更精細地控制數據流的行為,確保系統在高負載下依然能夠穩定運行。
示例:合并并發性和緩沖區限制
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 3, // 限制最大并發處理數為 3
BoundedCapacity = 2 // 限制緩沖區大小為 2
};
var block = new TransformBlock<int, int>(async item =>
{
Console.WriteLine($"Processing {item} at {DateTime.Now}");
await Task.Delay(1000); // 模擬 1 秒的處理時間
return item * 2;
}, options);
// 發送多個數據到塊
for (int i = 0; i < 10; i++)
{
if (!block.Post(i))
{
Console.WriteLine($"Could not post {i}, buffer is full.");
}
}
// 讀取處理結果
for (int i = 0; i < 10; i++)
{
Console.WriteLine(await block.ReceiveAsync());
}
解釋:
MaxDegreeOfParallelism = 3限制了塊的最大并發處理能力,確保最多只能并發處理 3 個數據。BoundedCapacity = 2限制了塊的緩沖區大小,使得緩沖區只能存儲 2 個數據項。如果塊的緩沖區已滿,新數據將被拒絕。- 這種組合策略在數據流量較大時能夠有效防止系統過載。
6.4.6 節流策略的設計考慮
在設計數據流的節流策略時,需要綜合考慮以下幾個因素:
-
系統資源的限制:節流的主要目的是防止系統資源(如 CPU、內存、網絡帶寬等)被耗盡。因此,節流策略應根據系統的資源限制進行配置。
-
數據處理的優先級:某些數據可能具有較高的優先級,必須優先處理。節流策略應考慮不同的數據處理優先級,避免因節流導致高優先級數據的延遲。
-
負載的動態性:系統負載通常是動態變化的。節流策略應能夠根據負載情況動態調整,例如在負載較低時放寬節流限制,在負載較高時加強節流。
-
吞吐量與響應時間的平衡:節流策略往往需要在系統吞吐量和響應時間之間做出權衡。過度節流可能導致數據處理延遲,而節流不足則可能導致系統過載。
小結
- 節流是
TPL 數據流中的重要機制,通過限制并發性、控制緩沖區大小或顯式的時間控制,確保數據流管道在高負載的情況下依然能夠平穩運行。 - 背壓機制是數據流的自我調節方式,能夠自動控制數據的流動,防止下游塊過載。
- MaxDegreeOfParallelism 和 BoundedCapacity 是常用的節流參數,分別控制塊的并發能力和緩沖容量。
- 基于時間的節流模式可以用于限制數據流的發送速率,適用于需要顯式控制數據處理速度的場景。
6.5 塊的并行處理
在 TPL 數據流 中,塊的處理通常是獨立的。每個塊可以并行處理輸入數據,形成一種天然的并行機制。然而,默認情況下,塊的處理是串行的,即每次只能處理一個數據項。如果需要更高的并行度,尤其是在處理高強度 CPU 計算或 I/O 操作時,可以通過設置 MaxDegreeOfParallelism 選項來讓塊并行處理多個數據項。
6.5.1 MaxDegreeOfParallelism
MaxDegreeOfParallelism 是 ExecutionDataflowBlockOptions 中的一個重要參數,它決定了一個塊可以同時處理的任務數量。默認情況下,MaxDegreeOfParallelism 的值為 1,意味著塊一次只能處理一個數據項。如果將這個值設置為更高的數字,塊就可以并行處理多個數據項。
示例:讓塊并行處理輸入數據
var multiplyBlock = new TransformBlock<int, int>(
item =>
{
Console.WriteLine($"Processing {item} on thread {Thread.CurrentThread.ManagedThreadId}");
return item * 2;
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4 // 允許最多4個并行任務
});
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
// 將multiplyBlock連接到subtractBlock
multiplyBlock.LinkTo(subtractBlock);
// 向multiplyBlock發送數據
for (int i = 0; i < 10; i++)
{
multiplyBlock.Post(i);
}
// 讀取subtractBlock的輸出
for (int i = 0; i < 10; i++)
{
Console.WriteLine(await subtractBlock.ReceiveAsync());
}
解釋:
- 在這個例子中,
multiplyBlock的并行度被設置為4,因此它可以同時處理最多 4 個輸入數據項。 - 每個輸入項都會在不同的線程上并行處理,這是通過
MaxDegreeOfParallelism實現的。 - 輸出結果的順序可能與輸入順序不同,因為處理是并行的,線程調度的順序可能不一致。
6.5.2 并行處理的適用場景
并行處理適用于需要執行高強度計算或異步 I/O 操作的場景。例如,如果一個塊需要處理 CPU 密集型任務(如數據加密、解壓縮、復雜的數學計算等),或者執行異步操作(如訪問遠程 API、文件讀寫、數據庫操作等),通過增加并行度可以顯著提高數據流的處理性能。
CPU 密集型任務
對于 CPU 密集型任務,增加并行度能夠充分利用多核 CPU 的能力。這樣,多個任務可以同時在不同的核心上執行,減少任務的總執行時間。
異步 I/O 操作
對于異步 I/O 操作(如網絡請求、文件操作等),增加并行度可以提高系統吞吐量。由于 I/O 操作通常依賴外部資源(如磁盤、網絡),通過并發執行多個異步任務,可以更快地處理大量數據。
示例:異步任務的并行處理
var asyncBlock = new TransformBlock<int, int>(async item =>
{
Console.WriteLine($"Starting async processing for {item} on thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(1000); // 模擬異步操作
Console.WriteLine($"Finished async processing for {item}");
return item * 2;
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 3 // 同時處理3個異步任務
});
// 向asyncBlock發送數據
for (int i = 0; i < 5; i++)
{
asyncBlock.Post(i);
}
// 獲取處理結果
for (int i = 0; i < 5; i++)
{
Console.WriteLine($"Result: {await asyncBlock.ReceiveAsync()}");
}
解釋:
- 在這個示例中,
MaxDegreeOfParallelism設置為3,意味著塊可以同時處理 3 個異步任務。 - 每個任務會模擬 1 秒的異步操作,多個任務會并行執行。
6.5.3 BoundedCapacity 與 MaxDegreeOfParallelism 的配合使用
在并行處理的場景中,除了設置 MaxDegreeOfParallelism,還可以使用 BoundedCapacity 選項來控制塊的輸入緩沖區大小。BoundedCapacity 限制了塊可以接收的最大數據項數,當緩沖區滿時,上游塊會被背壓。
通過合理設置 MaxDegreeOfParallelism 和 BoundedCapacity,可以有效控制塊的并行處理能力和內存使用。
示例:配合使用 BoundedCapacity 和 MaxDegreeOfParallelism
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 3, // 最多并行處理3個任務
BoundedCapacity = 5 // 緩沖區最多存儲5個數據
};
var block = new TransformBlock<int, int>(async item =>
{
Console.WriteLine($"Processing {item} on thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(1000); // 模擬慢速處理
return item * 2;
}, options);
// 發送數據
for (int i = 0; i < 10; i++)
{
if (!block.Post(i))
{
Console.WriteLine($"Block is full, could not post {i}");
}
}
// 獲取處理結果
for (int i = 0; i < 10; i++)
{
Console.WriteLine($"Result: {await block.ReceiveAsync()}");
}
解釋:
MaxDegreeOfParallelism = 3:塊可以并行處理 3 個數據項。BoundedCapacity = 5:塊的緩沖區最多可以存儲 5 個數據項。如果緩沖區滿了,Post()操作會返回false,表明無法接收更多數據。- 當緩沖區滿時,塊會對上游塊施加背壓,限制上游塊繼續傳遞數據。
6.5.4 調試并行處理
在調試并行處理時,可以通過調試器查看塊中待處理數據的數量。如果某個塊的待處理數據項數量過多,可能意味著該塊成為了性能瓶頸。這時可以考慮增加 MaxDegreeOfParallelism 以提高并行處理能力,或者對數據流結構進行重構。
提示:
- 可以在調試器中暫停執行,查看塊的
InputCount屬性,它表示當前塊中待處理的數據項數量。 - 如果
InputCount非常高,說明塊的處理速度跟不上數據的產生速度,可能需要通過并行處理來改善。
6.5.5 異步塊的并行處理
對于異步操作,MaxDegreeOfParallelism 選項同樣適用。每當一個異步任務開始時,它會占用一個并行“槽”(slot)。任務完成后,槽會釋放,新的任務可以開始處理。
異步塊的并行處理可以顯著提高系統的吞吐量,特別是在執行 I/O 綁定操作時(例如 HTTP 請求、數據庫查詢等)。
示例:異步并行處理
var asyncBlock = new TransformBlock<int, int>(async item =>
{
Console.WriteLine($"Processing {item} on thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(1000); // 模擬異步操作
return item * 2;
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5 // 同時處理5個異步任務
});
// 向塊發送數據
for (int i = 0; i < 10; i++)
{
asyncBlock.Post(i);
}
// 讀取并輸出結果
for (int i = 0; i < 10; i++)
{
Console.WriteLine($"Result: {await asyncBlock.ReceiveAsync()}");
}
解釋:
- 該塊能夠同時處理 5 個異步任務。
- 異步任務會并發執行,并且每個任務都會在 1 秒后完成。
6.5.6 并行處理的設計考慮
并行處理雖然能提高性能,但并不是所有場景都適合并行化。在設計并行處理時,需要考慮以下因素:
-
任務的獨立性:
- 確保任務之間是獨立的,沒有競爭資源或依賴關系。如果多個任務并發執行時需要爭奪同一資源(如鎖定共享數據),并行化可能反而會導致性能下降。
-
上下游的平衡:
- 并行化某個塊的處理能力后,可能會導致下游塊成為系統的瓶頸。因此,整個數據流管道的并行處理能力需要平衡。
-
系統資源的限制:
- 并行處理會增加 CPU 和內存的使用量,尤其是在處理大量數據時。如果系統資源有限,過度并行化可能會導致資源競爭,反而降低系統的整體性能。
小結
MaxDegreeOfParallelism選項可以讓塊并行處理多個數據項,默認值為1(串行處理)。- 并行處理適用于高強度 CPU 計算或異步 I/O 操作的場景,能夠顯著提高系統吞吐量。
- 與
BoundedCapacity配合使用,可以控制塊的緩沖區大小,避免內存占用過多或系統過載。 - 在設計并行處理時,需考慮任務的獨立性、上下游塊的平衡,以及系統資源的限制。
6.6 創建自定義塊
在 TPL 數據流 中,可以通過組合多個塊來創建更復雜的自定義塊。通過封裝這些組合塊,可以將復雜的邏輯抽象為單一的塊,簡化數據流的設計和復用。為了方便創建自定義塊,TPL 數據流 提供了 DataflowBlock.Encapsulate 方法,它可以將一個輸入塊和一個輸出塊封裝成一個新的塊,并自動處理數據流的傳播和完成狀態的傳遞。
6.6.1 使用 DataflowBlock.Encapsulate 創建自定義塊
假設你有多個塊,它們執行不同的操作,例如乘法、加法、和除法。你希望將這些塊組合成一個自定義塊,將它們的復雜邏輯封裝起來,以后可以在數據流中像使用普通塊一樣使用這個自定義塊。
示例:創建自定義塊
IPropagatorBlock<int, int> CreateMyCustomBlock()
{
// 定義三個 TransformBlock,執行不同的計算
var multiplyBlock = new TransformBlock<int, int>(item => item * 2); // 乘以 2
var addBlock = new TransformBlock<int, int>(item => item + 2); // 加 2
var divideBlock = new TransformBlock<int, int>(item => item / 2); // 除以 2
// 設置 PropagateCompletion 為 true,以便完成狀態向下游傳播
var flowCompletion = new DataflowLinkOptions { PropagateCompletion = true };
// 將塊連接起來:multiplyBlock -> addBlock -> divideBlock
multiplyBlock.LinkTo(addBlock, flowCompletion);
addBlock.LinkTo(divideBlock, flowCompletion);
// 使用 Encapsulate 將第一個塊和最后一個塊封裝起來
return DataflowBlock.Encapsulate(multiplyBlock, divideBlock);
}
解釋:
multiplyBlock:將輸入數據乘以 2。addBlock:將數據加 2。divideBlock:將數據除以 2。- 使用
LinkTo方法將這些塊鏈接在一起,形成一個數據流處理管道。 DataflowBlock.Encapsulate方法接收兩個參數:輸入塊(multiplyBlock)和輸出塊(divideBlock),并將它們封裝成一個新的自定義塊。PropagateCompletion = true確保完成狀態從一個塊傳播到下游塊,這意味著當輸入塊完成時,整個自定義塊的處理會順利完成。
6.6.2 使用自定義塊
定義了自定義塊后,你可以像使用普通的塊一樣使用它。下面是如何使用這個自定義塊的示例。
var customBlock = CreateMyCustomBlock();
// 向自定義塊發送數據
for (int i = 0; i < 5; i++)
{
customBlock.Post(i);
}
// 標記自定義塊完成
customBlock.Complete();
// 從自定義塊讀取結果
while (await customBlock.OutputAvailableAsync())
{
Console.WriteLine(await customBlock.ReceiveAsync());
}
輸出示例:
1 -> (1 * 2 + 2) / 2 = 2
2 -> (2 * 2 + 2) / 2 = 3
3 -> (3 * 2 + 2) / 2 = 4
4 -> (4 * 2 + 2) / 2 = 5
5 -> (5 * 2 + 2) / 2 = 6
解釋:
customBlock.Post(i)向自定義塊發送數據。customBlock.Complete()標記輸入完成,最終會傳播到每個內部塊并完成整個處理管道。- 結果通過
customBlock.ReceiveAsync()從自定義塊中讀取,輸出的結果是輸入數據經過三個塊處理后的最終結果。
6.6.3 自定義塊的選項設計
當創建自定義塊時,應該考慮哪些選項需要暴露給用戶,哪些選項需要在內部塊上配置。例如,MaxDegreeOfParallelism、BoundedCapacity、EnsureOrdered 等數據流選項在某些自定義塊中可能很有用,而在另一些自定義塊中可能并不適用。
通常情況下,自定義塊的實現者可以根據需求設計適合的選項,而不是簡單地接受所有 DataflowBlockOptions 參數。例如,如果自定義塊內部包含多個塊,某些塊可能不支持并行處理,那么暴露 MaxDegreeOfParallelism 選項就沒有意義。
6.6.4 包含多個輸入或輸出的自定義塊
DataflowBlock.Encapsulate 只適用于將單個輸入塊和單個輸出塊封裝成一個自定義塊。如果需要創建包含多個輸入或多個輸出的自定義塊,可以將這些塊封裝在一個對象中,并通過 ITargetBlock<T> 和 ISourceBlock<T> 屬性分別暴露輸入和輸出。
示例:多個輸入或輸出的自定義塊
public class CustomMultiBlock
{
private TransformBlock<int, int> _multiplyBlock;
private TransformBlock<int, int> _addBlock;
public CustomMultiBlock()
{
_multiplyBlock = new TransformBlock<int, int>(item => item * 2);
_addBlock = new TransformBlock<int, int>(item => item + 2);
// 將兩個塊連接
_multiplyBlock.LinkTo(_addBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
// 暴露多個輸入和輸出
public ITargetBlock<int> Input => _multiplyBlock;
public ISourceBlock<int> Output => _addBlock;
}
使用示例:
var multiBlock = new CustomMultiBlock();
// 向多輸入塊發送數據
for (int i = 0; i < 5; i++)
{
multiBlock.Input.Post(i);
}
multiBlock.Input.Complete();
// 讀取輸出
while (await multiBlock.Output.OutputAvailableAsync())
{
Console.WriteLine(await multiBlock.Output.ReceiveAsync());
}
解釋:
CustomMultiBlock包含兩個塊,并通過Input和Output屬性暴露塊的輸入和輸出。- 這種方式適用于更復雜的自定義塊,特別是當塊可能包含多個輸入或輸出時。
6.6.5 討論
在創建自定義塊時,除了封裝多個塊并通過 Encapsulate 方法暴露輸入和輸出,還要考慮以下幾點:
-
選項的暴露:
- 自定義塊通常需要通過某種方式暴露配置選項(如并行度、容量限制等),但并不是所有選項都適合暴露。例如,某些塊可能不支持并行處理,因此
MaxDegreeOfParallelism選項對用戶來說就沒有意義。
- 自定義塊通常需要通過某種方式暴露配置選項(如并行度、容量限制等),但并不是所有選項都適合暴露。例如,某些塊可能不支持并行處理,因此
-
完成狀態的傳播:
- 完成狀態的傳播是數據流運行中的一個關鍵部分。確保所有封裝塊的完成狀態能夠正確地向下游傳播,避免出現部分塊未完成的情況。
- 在封裝多個塊時,通過
LinkTo的DataflowLinkOptions.PropagateCompletion參數可以確保完成狀態從上游塊向下游塊傳播。
-
多輸入和多輸出的自定義塊:
DataflowBlock.Encapsulate只支持封裝單輸入和單輸出的塊。如果需要創建多輸入或多輸出的自定義塊,應該將這些塊封裝在一個類中,并通過ITargetBlock<T>和ISourceBlock<T>屬性暴露輸入和輸出。
-
封裝復雜邏輯:
- 自定義塊可以封裝任意復雜的數據處理邏輯,包括組合多個塊、處理不同類型的數據等。這種靈活性使得
TPL 數據流能夠適應各種實際應用場景。
- 自定義塊可以封裝任意復雜的數據處理邏輯,包括組合多個塊、處理不同類型的數據等。這種靈活性使得

浙公網安備 33010602011771號