第九章:C#并發集合
第九章:并發集合
- 第九章:并發集合
- 簡介
- 9.1 不可變的棧和隊列
- 9.2 不可變列表 (
ImmutableList<T>) - 9.3 不可變
Set - 9.4 不可變字典 (
ImmutableDictionary<TKey, TValue>) - 9.5 并發字典 (
ConcurrentDictionary<TKey, TValue>) - 9.6 并發隊列(
Concurrent Queue<T>) - 9.7 并發棧(
Concurrent Stack<T>) - 9.8 并發背包(
ConcurrentBag<T>) - 9.9 阻塞隊列(
Blocking Queue<T>) - 9.10 阻塞棧與阻塞背包
- 9.11 異步隊列
- 9.12 節流隊列
- 9.13 采樣隊列
- 9.14 異步棧和異步背包
- 9.15 同步與異步混合隊列
在并發應用程序中,選擇合適的集合類型至關重要。本章將介紹一些專門為并發或異步場景設計的集合,幫助你在多線程環境中有效管理數據。
簡介
不可變集合
不可變集合是 只讀 的數據結構,任何修改操作都會返回新的集合實例,而不會改變現有的集合。這種設計不僅減少了內存浪費,還具有天然的線程安全性,因為不可變對象在多線程環境下無需加鎖。你可以通過 System.Collections.Immutable NuGet 包獲取這些集合,并在多線程或單線程應用中使用它們。不可變集合是未來開發中推薦的默認選擇,尤其在需要安全并發訪問時。
線程安全集合
這些集合允許多個線程同時修改數據,它們通過混合使用 細粒度鎖 和 無鎖技術 來最小化阻塞時間,甚至完全避免阻塞。線程安全集合的一個特點是,它們的枚舉操作會創建集合的快照,從而確保枚舉過程的安全。
生產者與消費者集合
這類集合專為 生產者-消費者 模式設計,允許多個生產者向集合中添加數據,多個消費者從集合中取出數據。它們支持 阻塞 和 異步 API,適用于不同的并發場景。例如,當集合為空時,阻塞集合會阻塞調用線程,而異步集合則允許線程異步等待,直至有數據可用。
生產者-消費者集合類型
本章將介紹多種生產者-消費者集合,每種集合適用于不同的并發需求:
- 通道(
Channel):支持隊列語義和異步 API,適用于大多數并發場景。 BlockingCollection<T>:提供阻塞 API,適合同步的生產者-消費者模型。BufferBlock<T>:基于數據流模型,適用于異步的場景。AsyncProducerConsumerQueue<T>和AsyncCollection<T>:支持異步操作,但適用場景較為特殊。
這些集合都可以通過相關的 NuGet 包獲取,例如 System.Threading.Channels 和 System.Threading.Tasks.Dataflow。
9.1 不可變的棧和隊列
問題
假設你需要一個 多線程安全 的棧或隊列,這些集合不會頻繁改變,但可以安全地被多個線程讀取和操作。例如,隊列可以用于管理待處理任務,??梢杂糜诠芾沓蜂N操作。
解決方案
不可變棧 和 不可變隊列 是最簡單的不可變集合。它們的行為類似于標準的 Stack<T> 和 Queue<T>,但每次修改后都會生成一個新的集合實例,相當于是一個快照,保留原來的集合不變。由于不可變集合的特性,它們天然是線程安全的。
不可變棧(ImmutableStack<T>)
棧是 先進后出(LIFO)的數據結構。每次 Push 操作會創建一個新的棧實例,保留舊的棧不變。以下是不可變棧的示例:
ImmutableStack<int> stack = ImmutableStack<int>.Empty;
stack = stack.Push(13);
stack = stack.Push(7); // 棧中現在是 [7, 13]
// 枚舉棧元素,顯示順序 [7, 13]
foreach (int item in stack)
Trace.WriteLine(item);
int lastItem;
stack = stack.Pop(out lastItem); // 彈出棧頂的7,棧中剩下 [13]
每次修改(如 Push 和 Pop 操作),都會返回一個新的棧實例,而原始棧保持不變。因此,不同的棧實例可以共享相同的內存部分。例如:
ImmutableStack<int> stack = ImmutableStack<int>.Empty;
stack = stack.Push(13);
ImmutableStack<int> biggerStack = stack.Push(7);
// biggerStack 包含 [7, 13],而 stack 仍然是 [13]
foreach (int item in biggerStack)
Trace.WriteLine(item); // 輸出 7, 13
foreach (int item in stack)
Trace.WriteLine(item); // 輸出 13
這種共享內存的機制使得不可變棧非常高效,尤其在需要保存多個狀態快照時。
不可變隊列(ImmutableQueue<T>)
與棧類似,不可變隊列是 先進先出(FIFO)的數據結構。每次 Enqueue 操作會生成一個新的隊列實例,而舊的隊列保持不變。以下是不可變隊列的示例:
ImmutableQueue<int> queue = ImmutableQueue<int>.Empty;
queue = queue.Enqueue(13);
queue = queue.Enqueue(7); // 隊列中現在是 [13, 7]
// 枚舉隊列元素,顯示順序 [13, 7]
foreach (int item in queue)
Trace.WriteLine(item);
queue = queue.Dequeue(out int firstItem); // 彈出隊首的13,隊列中剩下 [7]
討論
- 不可變棧和隊列 適合需要多線程訪問且修改較少的場景。因為它們是不可變的,每個實例都是線程安全的。
- 修改操作(如
Push、Pop和Enqueue)都會返回新的實例,原有集合保持不變,可以輕松創建數據的快照。 - 共享內存的機制提高了內存使用效率,尤其適合需要頻繁保存狀態的場景。
不可變集合不僅適用于并發編程場景,也同樣適合單線程應用,尤其在函數式編程風格或需要頻繁存儲快照的情況下。
9.2 不可變列表 (ImmutableList<T>)
問題
當需要一個支持索引訪問、且不會頻繁修改的數據結構時,不可變列表(ImmutableList<T>)是一個合適的選擇。它可以安全地被多個線程讀取和操作,但需要注意其性能特性。
解決方案
ImmutableList<T> 提供類似于 List<T> 的方法,如 Insert、RemoveAt 和 Index 操作,但其表現背后基于樹形結構,允許盡可能多的內存共享。以下是一個使用不可變列表的示例:
ImmutableList<int> list = ImmutableList<int>.Empty;
list = list.Insert(0, 13);
list = list.Insert(0, 7); // 在13之前插入7
foreach (int item in list)
Trace.WriteLine(item); // 輸出 7, 13
list = list.RemoveAt(1); // 移除索引1的元素,剩下 [7]
性能差異
ImmutableList<T> 與 List<T> 在某些操作上的性能差異顯著。以下是常見操作的復雜度對比:
| 操作 | List<T> |
ImmutableList<T> |
|---|---|---|
| 添加 | 均攤 O(1) | O(log N) |
| 插入 | O(N) | O(log N) |
| 移除 | O(N) | O(log N) |
| 索引訪問 | O(1) | O(log N) |
特別地,ImmutableList<T> 的索引操作是 O(log N),而不是 List<T> 的 O(1)。因此,遍歷時應盡量使用 foreach 而非 for,以避免性能問題:
// 推薦的遍歷方式
foreach (var item in list)
Trace.WriteLine(item);
// 遍歷效率較低
for (int i = 0; i < list.Count; i++)
Trace.WriteLine(list[i]);
討論
ImmutableList<T> 是一個強大的數據結構,尤其適合需要多線程安全和低頻修改的場景。然而,由于其性能特性,不能盲目替換所有的 List<T>。在大多數情況下,List<T> 仍是默認選擇,除非明確需要不可變集合。
ImmutableList<T> 及其他不可變集合可以通過 System.Collections.Immutable NuGet 包獲取。
9.3 不可變 Set
問題
在某些場景下,我們需要一個不會存儲重復項、且可以安全地被多個線程讀取的數據結構。例如,索引文件中的單詞集合是一個典型的用例。為了滿足這些需求,可以使用不可變 Set,這種集合不會頻繁變化,并且能確保線程安全。
解決方案
在 .NET 中,有兩種主要的不可變 Set 實現:
ImmutableHashSet<T>:一個無序的唯一項集合。ImmutableSortedSet<T>:一個通過比較器排序的唯一項集合。
它們提供類似的接口,但在元素存儲上有所不同。
ImmutableHashSet<T>
ImmutableHashSet<T> 是一個無序的集合,不保證元素的順序,但確保不存儲重復項。以下是一個示例:
ImmutableHashSet<int> hashSet = ImmutableHashSet<int>.Empty;
hashSet = hashSet.Add(13);
hashSet = hashSet.Add(7); // 以不可預知的順序顯示7和13
foreach (int item in hashSet)
Trace.WriteLine(item); // 輸出 7, 13 或 13, 7
hashSet = hashSet.Remove(7); // 移除元素7
ImmutableSortedSet<T>
ImmutableSortedSet<T> 是一個有序集合,元素按照某種排序規則排列,可以通過索引訪問最小或最大元素。示例如下:
ImmutableSortedSet<int> sortedSet = ImmutableSortedSet<int>.Empty;
sortedSet = sortedSet.Add(13);
sortedSet = sortedSet.Add(7); // 7 在 13 之前
foreach (int item in sortedSet)
Trace.WriteLine(item); // 輸出 7, 13
int smallestItem = sortedSet[0]; // smallestItem == 7
sortedSet = sortedSet.Remove(7); // 移除元素7
性能差異
盡管 ImmutableHashSet<T> 和 ImmutableSortedSet<T> 的結構不同,其大多數操作的時間復雜度相似。以下是兩者的典型性能對比:
| 操作 | ImmutableHashSet<T> |
ImmutableSortedSet<T> |
|---|---|---|
| 添加 | O(log N) | O(log N) |
| 移除 | O(log N) | O(log N) |
| 索引訪問 | 不適用 | O(log N) |
對于大多數應用,如果不需要元素排序,建議優先選擇 ImmutableHashSet<T>,因為它適用于更多類型。ImmutableSortedSet<T> 需要類型支持比較器,且索引訪問的效率較低(O(log N)),這意味著在遍歷時應盡量使用 foreach 而非 for 循環。
討論
雖然不可變 Set 是線程安全且實用的數據結構,但對于大規模數據的填充,性能可能較慢。為優化性能,可以先使用可變集合進行批量操作,最后轉換為不可變集合。許多不可變集合,包括 ImmutableHashSet<T> 和 ImmutableSortedSet<T>,都提供了這種方式的構造器。
你可以通過 System.Collections.Immutable NuGet 包獲取這些不可變集合。
9.4 不可變字典 (ImmutableDictionary<TKey, TValue>)
問題
在某些場景下,需要一個鍵–值對集合,它不會頻繁更改,并且能夠安全地被多個線程訪問。例如,引用數據可以存儲在這樣的集合中,供不同線程在不加鎖的情況下讀取。
解決方案
ImmutableDictionary<TKey, TValue> 和 ImmutableSortedDictionary<TKey, TValue> 是兩種不可變字典:
ImmutableDictionary<TKey, TValue>:無序字典,鍵–值對的順序不可預知。ImmutableSortedDictionary<TKey, TValue>:有序字典,鍵–值對按鍵排序。
它們的接口非常相似。以下是使用 ImmutableDictionary 的示例:
ImmutableDictionary<int, string> dictionary = ImmutableDictionary<int, string>.Empty;
dictionary = dictionary.Add(10, "Ten");
dictionary = dictionary.Add(21, "Twenty-One");
dictionary = dictionary.SetItem(10, "Diez"); // 更新鍵10的值
foreach (KeyValuePair<int, string> item in dictionary)
Trace.WriteLine($"{item.Key}: {item.Value}"); // 輸出 "10: Diez" 和 "21: Twenty-One"
string ten = dictionary[10]; // ten == "Diez"
dictionary = dictionary.Remove(21); // 移除鍵21
使用 ImmutableSortedDictionary 時,鍵會按順序排列:
ImmutableSortedDictionary<int, string> sortedDictionary = ImmutableSortedDictionary<int, string>.Empty;
sortedDictionary = sortedDictionary.Add(10, "Ten");
sortedDictionary = sortedDictionary.Add(21, "Twenty-One");
sortedDictionary = sortedDictionary.SetItem(10, "Diez");
foreach (KeyValuePair<int, string> item in sortedDictionary)
Trace.WriteLine($"{item.Key}: {item.Value}"); // 先輸出 "10: Diez",再輸出 "21: Twenty-One"
string ten = sortedDictionary[10]; // ten == "Diez"
sortedDictionary = sortedDictionary.Remove(21); // 移除鍵21
性能對比
ImmutableDictionary 和 ImmutableSortedDictionary 的性能相似,通常的操作耗時如下表所示:
| 操作 | ImmutableDictionary<TKey, TValue> |
ImmutableSortedDictionary<TKey, TValue> |
|---|---|---|
| 添加 | O(log N) | O(log N) |
| 設置項 | O(log N) | O(log N) |
| 訪問項 | O(log N) | O(log N) |
| 移除 | O(log N) | O(log N) |
盡管性能相近,除非明確需要元素排序,通常推薦使用無序的 ImmutableDictionary,因為它可以應用于更多類型的鍵,并且整體運行得更快。
討論
字典是一種常用的工具,尤其在應用程序狀態管理和查找場景中。與其他不可變集合類似,不可變字典也提供了高效的構造器,可以在需要初始化大量元素時快速構建字典。如果字典中的數據是在應用程序啟動時加載的,通常應使用構造器來構建初始字典;如果數據是逐步構建的,Add 方法則更合適。
你可以通過 System.Collections.Immutable NuGet 包獲取 ImmutableDictionary<TKey, TValue> 和 ImmutableSortedDictionary<TKey, TValue>。
9.5 并發字典 (ConcurrentDictionary<TKey, TValue>)
ConcurrentDictionary<TKey, TValue> 是 C# 中一種線程安全的字典集合,位于 System.Collections.Concurrent 命名空間中。它是為了解決多線程場景下的字典操作而設計的,提供高效且線程安全的字典操作,無需顯式使用鎖 (lock)。
使用場景
- 多線程或異步環境中,多個線程需要共享并訪問相同的字典數據。
- 需要在并發操作下維護字典的正確性和一致性。
- 替代傳統的字典 (
Dictionary<TKey, TValue>) 在并發場景下避免手動使用鎖,提高性能和簡化代碼。
代碼示例
以下示例展示了 ConcurrentDictionary<TKey, TValue> 的常見用法:
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
class Program
{
static void Main()
{
var dict = new ConcurrentDictionary<string, int>();
// 添加元素
dict.TryAdd("key1", 1);
dict.TryAdd("key2", 2);
// 更新元素:如果鍵存在則更新,否則添加
dict.AddOrUpdate("key1", 10, (key, oldValue) => oldValue + 10);
// 獲取元素
if (dict.TryGetValue("key1", out int value))
{
Console.WriteLine($"key1 的值為:{value}");
}
// 并行操作
Parallel.For(0, 1000, i =>
{
dict.AddOrUpdate("counter", 1, (key, oldValue) => oldValue + 1);
});
Console.WriteLine($"并行更新后的 counter 值為:{dict["counter"]}");
// 移除元素
dict.TryRemove("key2", out int removedValue);
Console.WriteLine($"已移除的 key2 值為:{removedValue}");
}
}
輸出:
key1 的值為:11
并行更新后的 counter 值為:1000
已移除的 key2 值為:2
背后原理
ConcurrentDictionary<TKey, TValue> 使用了一種 分段鎖定 (Lock Stripping) 的策略來實現線程安全性。它將內部存儲劃分為多個獨立的桶 (bucket),每個桶可以獨立加鎖,從而減少了鎖沖突,提高了并發性能。
具體來說:
-
分段鎖定:
ConcurrentDictionary會根據鍵的哈希值將其映射到不同的分段中,不同的分段互不影響。- 當對字典進行寫操作(例如
AddOrUpdate或TryRemove)時,只會鎖定涉及的分段,而不是整個字典。
-
樂觀并發控制:
- 在某些情況下,
ConcurrentDictionary使用了 樂觀并發控制,例如在讀取數據時盡量避免加鎖,而是先嘗試讀取,然后在發現沖突時才加鎖重試。
- 在某些情況下,
-
線程安全的操作:
ConcurrentDictionary提供了多種原子操作,如AddOrUpdate、TryAdd、TryRemove和TryGetValue,確保在并發情況下不會發生競態條件。
常用方法
| 方法 | 說明 |
|---|---|
TryAdd(key, value) |
嘗試添加鍵值對,如果鍵已存在則返回 false。 |
AddOrUpdate(key, addValue, updateValueFactory) |
如果鍵存在則更新值,不存在則添加。 |
TryGetValue(key, out value) |
嘗試獲取鍵對應的值,獲取成功返回 true。 |
TryRemove(key, out value) |
嘗試移除鍵值對,移除成功返回 true。 |
ContainsKey(key) |
檢查字典中是否包含指定的鍵。 |
Count |
獲取字典中鍵值對的數量。 |
Clear() |
清空字典中的所有元素。 |
最佳實踐
-
避免過度競爭:
- 盡量減少字典操作的頻率,尤其是在高并發場景下。雖然
ConcurrentDictionary是線程安全的,但過多的寫操作仍會引起性能瓶頸。
- 盡量減少字典操作的頻率,尤其是在高并發場景下。雖然
-
選擇合適的數據結構:
- 在單線程環境下,使用普通的
Dictionary<TKey, TValue>會有更好的性能。 - 在大多數并發讀多于寫的情況下,
ConcurrentDictionary的性能優于手動加鎖的字典。
- 在單線程環境下,使用普通的
-
利用原子操作:
- 使用
AddOrUpdate和GetOrAdd等原子操作,而不是先檢查再更新或添加,避免競態條件。
- 使用
與其他線程安全集合的比較
| 集合類型 | 線程安全性 | 使用場景 |
|---|---|---|
Dictionary<TKey, TValue> |
非線程安全 | 單線程或手動加鎖控制的場景。 |
ConcurrentDictionary<TKey, TValue> |
線程安全 | 多線程并發訪問,特別是需要頻繁的讀寫操作。 |
ImmutableDictionary<TKey, TValue> |
線程安全(不可變) | 多線程讀多于寫,不需要修改字典內容的場景。 |
9.6 并發隊列(Concurrent Queue<T>)
在現代并發編程中,隊列是一種常用的數據結構,它以先進先出(FIFO)的方式處理數據。標準的 Queue<T> 在多線程環境下并不安全,C# 提供了 ConcurrentQueue<T> 作為線程安全的替代品,能夠在高并發環境下進行無鎖(lock-free)操作。
使用場景
- 日志記錄系統:日志通常是以隊列的形式存儲,多個線程可能同時記錄日志,而日志處理線程則依次從隊列中讀取并保存。
- 任務調度器:在任務調度中,多個生產者線程可以將任務排入隊列,而工作線程則依次處理這些任務。
- 事件處理系統:多個事件源可能同時產生事件,事件處理器從隊列中逐一獲取并處理這些事件。
代碼示例
下面的示例展示了如何使用 ConcurrentQueue<T> 實現簡單的多線程任務隊列。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
class Program
{
private static readonly ConcurrentQueue<int> _queue = new ConcurrentQueue<int>();
static async Task Main()
{
// 啟動生產者任務
var producerTask = Task.Run(() => Producer());
// 啟動消費者任務
var consumerTask1 = Task.Run(() => Consumer("消費者1"));
var consumerTask2 = Task.Run(() => Consumer("消費者2"));
await Task.WhenAll(producerTask, consumerTask1, consumerTask2);
Console.WriteLine("所有任務已完成");
}
// 生產者線程
static void Producer()
{
for (int i = 0; i < 10; i++)
{
_queue.Enqueue(i);
Console.WriteLine($"生產者添加:{i}");
Thread.Sleep(200); // 模擬生產過程的延遲
}
}
// 消費者線程
static void Consumer(string name)
{
while (true)
{
if (_queue.TryDequeue(out int item))
{
Console.WriteLine($"{name} 消費了:{item}");
Thread.Sleep(300); // 模擬處理過程的延遲
}
else
{
break;
}
}
Console.WriteLine($"{name} 處理完畢");
}
}
輸出示例:
生產者添加:0
消費者1 消費了:0
生產者添加:1
消費者2 消費了:1
生產者添加:2
消費者1 消費了:2
生產者添加:3
消費者2 消費了:3
...
消費者1 處理完畢
消費者2 處理完畢
所有任務已完成
背后原理
-
無鎖設計:
ConcurrentQueue<T>是基于無鎖算法實現的,內部使用了比較并交換(CAS, Compare-And-Swap)操作。這種設計可以在高并發環境下避免鎖的開銷,提高性能。ConcurrentQueue<T>使用鏈表結構作為底層存儲,支持高效的入隊和出隊操作。
-
線程安全性:
ConcurrentQueue<T>的所有公共方法都是線程安全的。例如,Enqueue方法可以被多個線程同時調用,不會產生數據競態問題。- 讀取和修改操作是原子的,即操作之間不會互相干擾,保證了數據一致性。
常用方法
| 方法 | 說明 |
|---|---|
Enqueue(item) |
將元素添加到隊列末尾。 |
TryDequeue(out T result) |
嘗試從隊列開頭移除并返回一個元素,如果隊列為空則返回 false。 |
TryPeek(out T result) |
嘗試查看隊列開頭的元素而不移除它,如果隊列為空則返回 false。 |
IsEmpty |
檢查隊列是否為空。 |
Count |
返回隊列中的元素數量(注意:在高并發下,Count 可能不準確,僅用于參考)。 |
最佳實踐
-
避免在高并發環境下頻繁調用
Count:Count方法會遍歷隊列來計算元素數量,因此在高并發情況下性能較低,盡量避免頻繁調用。- 可以通過
IsEmpty來檢查隊列是否為空,而不是使用Count == 0。
-
優先使用
TryDequeue和TryPeek:- 這些方法都是線程安全的,不會拋出異常,也不會阻塞線程。
- 在并發環境中,使用這些非阻塞方法可以避免不必要的鎖等待。
-
適用于無需嚴格順序保證的場景:
ConcurrentQueue<T>是無鎖設計,適用于高吞吐量的場景,但不適合需要嚴格順序控制的場景。
-
避免在 UI 線程中直接訪問:
- 在 UI 應用程序中,避免在 UI 線程上直接調用可能阻塞的操作,尤其是在高并發下頻繁入隊和出隊的場景。
總結
ConcurrentQueue<T> 是 C# 中常用的并發集合,提供了無鎖的隊列操作,能夠在高并發環境下高效、安全地執行入隊和出隊操作。它適用于生產者-消費者模式,但需要注意在高并發情況下避免頻繁調用 Count 方法。在更復雜的場景下,如需要阻塞行為或異步操作,可以考慮使用 BlockingCollection<T> 或 TPL 數據流 (BufferBlock<T>) 作為替代方案。
9.7 并發棧(Concurrent Stack<T>)
ConcurrentStack<T> 是 C# 中的一種線程安全的棧結構,遵循后進先出(LIFO)的原則,適用于高并發環境下的快速入棧和出棧操作。
使用場景
- 任務回滾:需要按逆序撤銷操作時,將任務入棧以便后續回退。
- 遞歸問題迭代化:在某些算法中,棧結構可以代替遞歸實現。
- 高并發緩存:存儲和快速回收對象實例。
代碼示例
以下示例展示多個線程如何安全地使用 ConcurrentStack<T>。
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
class Program
{
private static readonly ConcurrentStack<int> _stack = new ConcurrentStack<int>();
static async Task Main()
{
// 生產者任務:入棧
var producer = Task.Run(() =>
{
for (int i = 0; i < 5; i++)
{
_stack.Push(i);
Console.WriteLine($"生產者入棧:{i}");
}
});
// 消費者任務:出棧
var consumer = Task.Run(() =>
{
while (_stack.TryPop(out int item))
{
Console.WriteLine($"消費者出棧:{item}");
}
});
await Task.WhenAll(producer, consumer);
Console.WriteLine("任務完成");
}
}
主要方法
| 方法 | 說明 |
|---|---|
Push(item) |
將元素壓入棧頂。 |
TryPop(out T) |
嘗試從棧頂移除并返回元素,若為空則返回 false。 |
TryPeek(out T) |
查看棧頂元素而不移除,若為空則返回 false。 |
PushRange(items) |
批量入棧。 |
TryPopRange() |
批量出棧,返回移除的元素數量。 |
背后原理
- 無鎖實現:
ConcurrentStack<T>基于無鎖算法設計,使用Interlocked和 CAS(比較并交換)確保線程安全。 - 鏈表結構:內部采用鏈表存儲,支持高效的入棧和出棧操作。
最佳實踐
- 適合無序任務:使用
ConcurrentStack<T>時,不需要關心元素的順序(除了 LIFO 的操作順序)。 - 批量操作優化性能:對大量數據使用
PushRange或TryPopRange可以顯著提高性能。 - 避免頻繁調用
Count:Count的計算可能在高并發下開銷較大,不推薦用于實時判斷集合大小。
總結
ConcurrentStack<T> 提供了線程安全的 LIFO 操作,適用于需要按逆序處理任務的場景。它簡單高效,特別適合任務撤銷或遞歸替代問題。在需要更復雜的行為(如阻塞、容量限制等)時,可以選擇其他并發集合,如 BlockingCollection<T>。
9.8 并發背包(ConcurrentBag<T>)
ConcurrentBag<T> 是 C# 提供的線程安全集合,適用于高并發場景下的無序數據存取,允許多個線程同時添加和移除元素。
使用場景
- 對象池:用于存儲重復使用的對象,避免頻繁的分配和回收。
- 無序任務分配:當任務處理順序不重要時,多個線程可以并行從集合中取任務。
- 高并發數據處理:適用于并發情況下不關心元素順序的任務,如計數器匯總等。
代碼示例
以下示例展示了多個線程如何安全地在 ConcurrentBag<T> 中并發添加和移除元素。
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
class Program
{
private static readonly ConcurrentBag<int> _bag = new ConcurrentBag<int>();
static async Task Main()
{
// 生產者任務:向背包中添加元素
var producer = Task.Run(() =>
{
for (int i = 0; i < 5; i++)
{
_bag.Add(i);
Console.WriteLine($"生產者添加:{i}");
}
});
// 消費者任務:從背包中取出元素
var consumer = Task.Run(() =>
{
while (!_bag.IsEmpty)
{
if (_bag.TryTake(out int item))
{
Console.WriteLine($"消費者取出:{item}");
}
}
});
await Task.WhenAll(producer, consumer);
Console.WriteLine("任務完成");
}
}
主要方法
| 方法 | 說明 |
|---|---|
Add(item) |
向背包中添加一個元素。 |
TryTake(out T) |
嘗試從背包中取出一個元素,若背包為空返回 false。 |
TryPeek(out T) |
查看一個元素但不移除,若背包為空返回 false。 |
ToArray() |
返回背包中的所有元素組成的數組。 |
IsEmpty |
檢查背包是否為空。 |
背后原理
- 無序存儲:
ConcurrentBag<T>不保證元素的順序,多個線程可以同時添加和移除元素,但移除的順序是不可預測的。 - 桶式設計:內部采用多個桶(bucket)來存儲數據,減少鎖的爭用,提高并發性能。
- 高效的并發訪問:通過線程局部存儲(TLS)和分段鎖優化,避免頻繁的全局鎖爭用,提升性能。
最佳實踐
- 避免依賴順序:
ConcurrentBag<T>適合無序數據的場景。若順序重要,考慮使用其他并發集合。 - 適用于大量臨時數據:對于短生命周期且無序的數據,
ConcurrentBag<T>是一個高效選擇。 - 適用于工作竊取模式:如果任務的順序不重要,可以讓多個消費者線程從同一
ConcurrentBag<T>中工作竊取任務。
總結
ConcurrentBag<T> 是一種高效的線程安全集合,適用于無序任務分配和對象池等場景。它支持并發的元素添加和移除,且通過內部分段優化提高性能。在處理不關心順序的任務時,ConcurrentBag<T> 是理想的選擇,但對于需要順序或限制容量的場景,建議選擇其他并發集合。
9.9 阻塞隊列(Blocking Queue<T>)
在并發編程中,阻塞隊列是一種用于在生產者線程和消費者線程之間安全傳遞數據的集合。
BlockingCollection<T> 是 .NET 提供的通用阻塞集合,在默認情況下是包裝的ConcurrentQueue<T>來充當線程安全的阻塞隊列,但它可以通過包裝不同的線程安全集合(如 ConcurrentStack<T> 或 ConcurrentBag<T>),實現后進先出(LIFO)的棧行為或無序存儲的背包行為。BlockingCollection<T>主要用于解決生產者-消費者問題。
使用場景
- 在多線程程序中,需要在不同線程之間傳遞數據。
- 生產者線程不斷產生數據,而消費者線程不斷處理數據。
- 需要在生產者速度和消費者速度不匹配時進行調節和控制。
- 當線程池或后臺線程負責處理數據時,阻塞隊列可以有效避免數據丟失和線程競爭。
代碼示例
下面的示例展示了使用 BlockingCollection<T> 實現生產者-消費者模型:
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
class Program
{
private static readonly BlockingCollection<int> _blockingQueue = new BlockingCollection<int>();
static async Task Main()
{
// 啟動生產者線程
var producerTask = Task.Run(() => Producer());
// 啟動消費者線程
var consumerTask = Task.Run(() => Consumer());
await Task.WhenAll(producerTask, consumerTask);
Console.WriteLine("所有任務已完成");
}
// 生產者線程
static void Producer()
{
for (int i = 0; i < 5; i++)
{
Console.WriteLine($"生產數據:{i}");
_blockingQueue.Add(i);
Thread.Sleep(500); // 模擬生產過程的延遲
}
// 標記生產完成
_blockingQueue.CompleteAdding();
Console.WriteLine("生產者完成所有數據的添加");
}
// 消費者線程
static void Consumer()
{
foreach (var item in _blockingQueue.GetConsumingEnumerable())
{
Console.WriteLine($"消費數據:{item}");
Thread.Sleep(1000); // 模擬處理過程的延遲
}
Console.WriteLine("消費者完成所有數據的處理");
}
}
輸出示例:
生產數據:0
消費數據:0
生產數據:1
消費數據:1
生產數據:2
消費數據:2
生產數據:3
消費數據:3
生產數據:4
消費數據:4
生產者完成所有數據的添加
消費者完成所有數據的處理
所有任務已完成
背后原理
-
線程安全性:
BlockingCollection<T>是基于IProducerConsumerCollection<T>接口實現的,并且默認使用ConcurrentQueue<T>作為內部存儲容器。- 它使用了內部鎖或信號量(SemaphoreSlim)來保證線程安全,使得多個線程能夠同時訪問集合而不會出現競態條件。
-
阻塞機制:
BlockingCollection<T>在獲取或添加元素時,如果隊列為空或達到容量上限,它會自動阻塞當前線程,直到有數據可供消費或有空間可供添加為止。
-
并發控制:
BlockingCollection<T>支持容量限制,可以通過構造函數指定最大容量。這種方式可以在生產者生產過快時進行節流,防止隊列無限增長導致內存溢出。
常用方法
| 方法 | 說明 |
|---|---|
Add(item) |
向集合中添加元素,如果達到容量上限會阻塞。 |
Take() |
從集合中移除并返回一個元素,如果集合為空會阻塞。 |
TryTake(out T) |
非阻塞地嘗試獲取一個元素,若無元素返回 false。 |
CompleteAdding() |
標記集合不再接受新元素,消費者線程會收到結束信號。 |
GetConsumingEnumerable() |
返回可枚舉的集合,消費者線程可以使用此方法在數據消費完畢后自動退出循環。 |
IsCompleted |
檢查集合是否已完成添加且沒有可消費的元素。 |
最佳實踐
-
優先使用
GetConsumingEnumerable():GetConsumingEnumerable()能自動處理阻塞和結束信號,代碼更為簡潔和安全,避免手動處理循環和異常。
-
適當設置容量限制:
- 使用構造函數
BlockingCollection<T>(boundedCapacity)設置容量上限,防止生產者過快生產導致內存問題。
var boundedQueue = new BlockingCollection<int>(100); // 容量上限為 100 - 使用構造函數
-
在適當時使用
CompleteAdding():- 當生產者線程完成數據生產后,調用
CompleteAdding()通知消費者不會再有新數據,避免消費者線程無限等待。
- 當生產者線程完成數據生產后,調用
-
避免在 UI 線程中使用阻塞操作:
- 在 UI 應用程序中使用時,避免直接調用阻塞方法(如
Add和Take),否則可能導致 UI 卡頓。可以使用Task或async/await實現異步操作。
- 在 UI 應用程序中使用時,避免直接調用阻塞方法(如
總結
BlockingCollection<T> 是一種適合在多線程環境下使用的阻塞隊列,能夠簡化生產者-消費者模型的實現。它不僅線程安全,還支持容量限制和自動阻塞機制,適用于后臺線程和線程池場景。然而,在需要異步操作或復雜數據流的場合,TPL 數據流 (BufferBlock<T>) 或異步生產者-消費者隊列 (AsyncProducerConsumerQueue<T>) 可能是更好的選擇。
9.10 阻塞棧與阻塞背包
BlockingCollection<T> 是 .NET 提供的通用阻塞集合,在默認情況下充當線程安全的阻塞隊列,但它可以通過包裝不同的線程安全集合(如 ConcurrentStack<T> 或 ConcurrentBag<T>),實現后進先出(LIFO)的棧行為或無序存儲的背包行為。
使用場景
-
阻塞棧(LIFO):
適用于后進先出的任務處理場景,如撤銷操作隊列、遞歸任務處理等。 -
阻塞背包(無序):
適用于無序任務分配,如對象池、無序的并發數據處理。
代碼示例
以下示例展示了阻塞棧和阻塞背包的基本用法:
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;
class Program
{
private static readonly BlockingCollection<int> _blockingStack =
new BlockingCollection<int>(new ConcurrentStack<int>());
private static readonly BlockingCollection<int> _blockingBag =
new BlockingCollection<int>(new ConcurrentBag<int>());
static async Task Main()
{
// 阻塞棧的生產者與消費者
var stackProducer = Task.Run(() =>
{
_blockingStack.Add(1);
_blockingStack.Add(2);
_blockingStack.CompleteAdding(); // 標記為完成
});
var stackConsumer = Task.Run(() =>
{
foreach (var item in _blockingStack.GetConsumingEnumerable())
{
Trace.WriteLine($"阻塞棧消費者處理:{item}");
}
});
await Task.WhenAll(stackProducer, stackConsumer);
// 阻塞背包的生產者與消費者
var bagProducer = Task.Run(() =>
{
_blockingBag.Add(3);
_blockingBag.Add(4);
_blockingBag.CompleteAdding();
});
var bagConsumer = Task.Run(() =>
{
foreach (var item in _blockingBag.GetConsumingEnumerable())
{
Trace.WriteLine($"阻塞背包消費者處理:{item}");
}
});
await Task.WhenAll(bagProducer, bagConsumer);
}
}
阻塞棧與背包的關鍵特性
-
阻塞行為:
當消費者嘗試獲取項時,若集合為空,會阻塞直到有項可用,或者集合標記為完成。 -
包裝線程安全集合:
- 阻塞棧:通過
ConcurrentStack<T>實現 LIFO 行為。 - 阻塞背包:通過
ConcurrentBag<T>實現無序存儲。
- 阻塞棧:通過
-
競爭條件的影響:
當生產者和消費者并發運行時,項的順序可能與預期略有不同,例如消費者可能立即獲取最新項,而不是等待CompleteAdding。
主要方法
| 方法 | 說明 |
|---|---|
Add(item) |
添加元素到集合。 |
CompleteAdding() |
標記集合不再接受新元素。 |
GetConsumingEnumerable() |
以阻塞方式逐個獲取集合中的元素。 |
TryTake(out T) |
非阻塞地嘗試獲取一個元素,若無元素返回 false。 |
注意事項
-
節流支持:
可以通過設置BlockingCollection<T>的容量限制內存使用,防止生產者過快填充。 -
生產者與消費者平衡:
在多線程場景中,確保生產者和消費者處理能力相匹配,避免積壓。 -
替代方案:
對于需要異步操作的場景,考慮使用BufferBlock<T>或其他異步隊列工具。
總結
BlockingCollection<T>是一種多功能阻塞集合,適用于需要線程安全數據傳遞的場景。- 阻塞棧和阻塞背包擴展了
BlockingCollection<T>的適用范圍,分別提供了 LIFO 和無序的存取特性。 - 在復雜并發場景中,配合容量限制與消費者模式,能夠有效提高程序的穩定性和吞吐量。
9.11 異步隊列
異步隊列是一種支持異步生產者-消費者模式的數據結構,適用于需要在不同代碼片段間傳遞數據且避免線程阻塞的場景,例如,數據加載任務在后臺進行時,主線程(如 UI 線程)需要異步地顯示數據,而不能因為等待數據而被阻塞。。
使用場景
- 異步數據流:在不阻塞線程的情況下實現先進先出的異步數據傳遞。
- 跨線程數據共享:在生產者和消費者間傳遞數據,例如后臺任務生成數據,主線程更新 UI。
- 高性能高并發:適用于高吞吐量的異步數據生產與消費場景。
實現方案
核心 .NET 框架沒有原生的異步隊列類型,但可以通過外部庫來實現異步 API 的隊列。這些庫不僅允許生產者和消費者以非阻塞的方式交互,還提供了高性能和高容量的解決方案。
以下是常用的三種異步隊列實現方式:
1. Channels 庫
System.Threading.Channels 是一種現代化的異步生產者-消費者實現,支持高容量場景并具有高性能。核心概念是 Channel<T>,由生產者通過 WriteAsync 寫入數據,消費者通過 ReadAllAsync 異步讀取數據。
代碼示例:
using System;
using System.Diagnostics;
using System.Threading.Channels;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
var channel = Channel.CreateUnbounded<int>(); // 創建無界通道
// 生產者
var producer = Task.Run(async () =>
{
var writer = channel.Writer;
await writer.WriteAsync(7);
await writer.WriteAsync(13);
writer.Complete(); // 通知通道生產完成
});
// 消費者
var consumer = Task.Run(async () =>
{
var reader = channel.Reader;
await foreach (var item in reader.ReadAllAsync())
{
Trace.WriteLine($"消費:{item}");
}
});
await Task.WhenAll(producer, consumer);
}
}
優點:
- 支持異步流(
ReadAllAsync)。 - 提供多種節流和采樣策略。
- 高效、靈活,適合現代 .NET 平臺。
在 較舊的平臺 上,異步流可能不受支持,消費者代碼可以使用 WaitToReadAsync 和 TryRead 來替代異步流:
ChannelReader<int> reader = queue.Reader;
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out int value))
{
Trace.WriteLine(value); // 輸出 7 和 13
}
}
WaitToReadAsync:異步等待通道中有可讀取的項。TryRead:嘗試讀取項,成功時返回true,否則返回false。
2. BufferBlock(TPL 數據流)
BufferBlock<T> 是 TPL 數據流的一部分,提供類似隊列的行為。生產者使用 SendAsync 發送數據,消費者通過 ReceiveAsync 接收數據。
代碼示例:
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
static async Task Main()
{
var buffer = new BufferBlock<int>(); // 創建 BufferBlock
// 生產者
var producer = Task.Run(async () =>
{
await buffer.SendAsync(7);
await buffer.SendAsync(13);
buffer.Complete(); // 標記完成
});
// 消費者
var consumer = Task.Run(async () =>
{
while (await buffer.OutputAvailableAsync())
{
var item = await buffer.ReceiveAsync();
Trace.WriteLine($"消費:{item}");
}
});
await Task.WhenAll(producer, consumer);
}
}
SendAsync:異步地將數據發送到隊列。OutputAvailableAsync:異步地檢查隊列中是否有可用的數據。ReceiveAsync:異步地從隊列中獲取數據。
注意:當有多個消費者時,OutputAvailableAsync 可能會為多個消費者返回 true,即使隊列中只有一個可用項。因此,如果有多個消費者,建議使用如下代碼模式來處理可能的異常:
while (true)
{
try
{
int item = await _asyncQueue.ReceiveAsync();
Trace.WriteLine(item); // 輸出隊列中的數據
}
catch (InvalidOperationException)
{
break; // 隊列已完成,退出循環
}
}
3. AsyncProducerConsumerQueue
Nito.AsyncEx 提供了 AsyncProducerConsumerQueue<T>,類似 BufferBlock<T>,但適合使用該庫的項目。
代碼示例:
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using Nito.AsyncEx;
class Program
{
static async Task Main()
{
var queue = new AsyncProducerConsumerQueue<int>(); // 創建隊列
// 生產者
var producer = Task.Run(async () =>
{
await queue.EnqueueAsync(7);
await queue.EnqueueAsync(13);
queue.CompleteAdding(); // 標記完成
});
// 消費者
var consumer = Task.Run(async () =>
{
while (await queue.OutputAvailableAsync())
{
var item = await queue.DequeueAsync();
Trace.WriteLine($"消費:{item}");
}
});
await Task.WhenAll(producer, consumer);
}
}
EnqueueAsync:異步地將數據添加到隊列。DequeueAsync:異步地從隊列中獲取數據。OutputAvailableAsync:異步地檢查隊列中是否有可用數據。
與 BufferBlock<T> 類似,如果有多個消費者,代碼結構可以如下:
while (true)
{
try
{
int item = await _asyncQueue.DequeueAsync();
Trace.WriteLine(item); // 輸出隊列中的數據
}
catch (InvalidOperationException)
{
break; // 隊列已完成,退出循環
}
}
對比
| 特性 | Channels | BufferBlock |
AsyncProducerConsumerQueue |
|---|---|---|---|
| 性能 | 高 | 中 | 中 |
| 靈活性 | 高 | 中 | 低 |
| 支持異步流 | 是 | 否 | 否 |
| 適用場景 | 高吞吐量場景 | 數據流管道處理 | 簡單生產者-消費者模式 |
| 庫來源 | System.Threading.Channels | System.Threading.Tasks.Dataflow | Nito.AsyncEx |
選擇
-
Channels 庫:這是官方推薦的方案,適合高性能、高容量的異步生產者-消費者模式。支持異步流(
IAsyncEnumerable),其代碼更為簡潔自然,特別是對于較新版本的 .NET 平臺。 -
BufferBlock<T>(TPL 數據流):提供類似的功能,適合已有TPL 數據流經驗的開發者。它的 API 稍微復雜一些,特別是在處理多個消費者時。 -
AsyncProducerConsumerQueue<T>(AsyncEx 庫):與BufferBlock<T>類似,API 設計更加簡潔,但功能上基本一致。適合需要簡化異步隊列實現的場景。
9.12 節流隊列
節流隊列通過限制隊列的最大容量,防止生產者速度超出消費者處理能力,從而避免內存過度使用。它通過對生產者施加“背壓”機制,確保系統運行在可控的資源范圍內。
使用場景
- 生產者快于消費者:例如,數據生成器快于處理器,可能導致內存占用過高。
- 跨環境兼容:在硬件性能未知的環境或云實例中運行時,需確保生產與消費的平衡。
- 避免資源爭搶:在高并發系統中限制資源使用,避免系統因超載而崩潰。
實現方案
1. Channels (限界通道)
通過 Channel.CreateBounded<T> 創建限制容量的通道,異步對生產者進行節流。
代碼示例:
using System;
using System.Diagnostics;
using System.Threading.Channels;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
var queue = Channel.CreateBounded<int>(1); // 限制隊列容量為 1
// 生產者
var producer = Task.Run(async () =>
{
var writer = queue.Writer;
await writer.WriteAsync(7); // 成功寫入
await writer.WriteAsync(13); // 等待 7 被消費后寫入
writer.Complete();
});
// 消費者
var consumer = Task.Run(async () =>
{
var reader = queue.Reader;
await foreach (var item in reader.ReadAllAsync())
{
Trace.WriteLine($"消費:{item}");
await Task.Delay(500); // 模擬處理時間
}
});
await Task.WhenAll(producer, consumer);
}
}
2. BufferBlock(TPL 數據流)
通過設置 BoundedCapacity,限制隊列容量。
代碼示例:
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
static async Task Main()
{
var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });
// 生產者
var producer = Task.Run(async () =>
{
await queue.SendAsync(7); // 成功發送
await queue.SendAsync(13); // 等待 7 被消費后發送
queue.Complete();
});
// 消費者
var consumer = Task.Run(async () =>
{
while (await queue.OutputAvailableAsync())
{
var item = await queue.ReceiveAsync();
Trace.WriteLine($"消費:{item}");
await Task.Delay(500); // 模擬處理時間
}
});
await Task.WhenAll(producer, consumer);
}
}
3. AsyncProducerConsumerQueue
Nito.AsyncEx 提供的 AsyncProducerConsumerQueue<T> 也支持節流,通過 maxCount 參數限制隊列大小。
代碼示例:
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using Nito.AsyncEx;
class Program
{
static async Task Main()
{
var queue = new AsyncProducerConsumerQueue<int>(maxCount: 1); // 限制隊列容量為 1
// 生產者
var producer = Task.Run(async () =>
{
await queue.EnqueueAsync(7); // 成功入隊
await queue.EnqueueAsync(13); // 等待 7 被消費后入隊
queue.CompleteAdding();
});
// 消費者
var consumer = Task.Run(async () =>
{
while (await queue.OutputAvailableAsync())
{
var item = await queue.DequeueAsync();
Trace.WriteLine($"消費:{item}");
await Task.Delay(500); // 模擬處理時間
}
});
await Task.WhenAll(producer, consumer);
}
}
4. BlockingCollection
BlockingCollection<T> 是線程安全的集合,通過 boundedCapacity 參數實現節流。
代碼示例:
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
var queue = new BlockingCollection<int>(boundedCapacity: 1); // 限制容量為 1
// 生產者
var producer = Task.Run(() =>
{
queue.Add(7); // 成功添加
queue.Add(13); // 等待 7 被消費后添加
queue.CompleteAdding();
});
// 消費者
var consumer = Task.Run(() =>
{
foreach (var item in queue.GetConsumingEnumerable())
{
Trace.WriteLine($"消費:{item}");
Task.Delay(500).Wait(); // 模擬處理時間
}
});
await Task.WhenAll(producer, consumer);
}
}
最佳實踐
-
適配異步 API:
- 優先使用
Channel或BufferBlock<T>,便于與異步操作集成。 - 對于傳統同步場景,
BlockingCollection<T>是一個簡單且可靠的選擇。
- 優先使用
-
限制容量:
- 根據系統負載合理設置隊列容量,避免不必要的內存消耗。
- 生產者和消費者速度不可控時,節流是必需的。
-
考慮環境變化:
- 在云實例或資源有限的設備上,節流機制可以避免程序崩潰或性能下降。
-
采樣與節流結合:
- 如果并不需要處理所有項,可以結合采樣策略(詳見下一節)進一步優化。
9.13 采樣隊列
采樣隊列通過限制隊列容量并對超出范圍的項進行丟棄,從而過濾不必要的隊列項。這種方式在生產者速度快于消費者且不需要保留所有數據的場景中尤為有效。
使用場景
- 高頻輸入:例如,從實時傳感器或日志數據中采樣關鍵數據點。
- 消費者性能受限:消費者無法處理生產的全部數據,只需處理最新或關鍵數據。
- 降低資源使用:避免因為處理所有項而導致內存過度消耗。
實現方案
1. 使用 Channels 采樣隊列
通過 BoundedChannelFullMode 設置通道的滿載行為,常見選項包括:
DropOldest:當隊列滿員時丟棄最老的項,保留最新項。DropWrite:當隊列滿員時丟棄新寫入的項,保留最老項。
代碼示例:
using System;
using System.Diagnostics;
using System.Threading.Channels;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
// 丟棄最老的項(默認保留最新的項)
var queue = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
FullMode = BoundedChannelFullMode.DropOldest
});
var writer = queue.Writer;
// 生產者
var producer = Task.Run(async () =>
{
await writer.WriteAsync(7); // 添加 7
await writer.WriteAsync(13); // 隊列滿員,7 被丟棄
await writer.WriteAsync(42); // 隊列滿員,13 被丟棄
writer.Complete();
});
// 消費者
var consumer = Task.Run(async () =>
{
var reader = queue.Reader;
await foreach (var item in reader.ReadAllAsync())
{
Trace.WriteLine($"消費:{item}"); // 只消費最新的項:42
}
});
await Task.WhenAll(producer, consumer);
}
}
2. 基于時間的采樣(System.Reactive)
使用 System.Reactive 提供的時間運算符來限制數據流速率,例如“每秒最多處理10項”。
代碼示例:
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
var observable = Observable.Interval(TimeSpan.FromMilliseconds(100)) // 模擬高頻生產
.Select(x => (int)x) // 轉換為整型
.Sample(TimeSpan.FromSeconds(1)); // 每秒只保留最新項
var subscription = observable.Subscribe(item =>
{
Trace.WriteLine($"消費:{item}");
});
await Task.Delay(5000); // 運行 5 秒
subscription.Dispose();
}
}
3. 自定義采樣邏輯
更復雜的采樣需求(如基于權重或特定條件)可在消費者代碼中實現。例如,消費者只處理偶數項:
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
var queue = new BlockingCollection<int>(boundedCapacity: 10); // 無需特殊采樣支持
// 生產者
var producer = Task.Run(() =>
{
for (int i = 0; i < 100; i++) queue.Add(i);
queue.CompleteAdding();
});
// 消費者
var consumer = Task.Run(() =>
{
foreach (var item in queue.GetConsumingEnumerable())
{
if (item % 2 == 0) // 自定義采樣:僅保留偶數項
{
Trace.WriteLine($"消費:{item}");
}
}
});
await Task.WhenAll(producer, consumer);
}
}
最佳實踐
-
優先使用
DropOldest模式:- 在高頻場景下,丟棄最老的項通常是最符合需求的選擇,保留最新數據以供消費。
-
基于時間采樣優選
System.Reactive:- 適用于需要對數據流按時間窗口限速的場景,如日志采樣或實時監控數據處理。
-
避免數據丟失:
- 如果所有數據均不可丟棄,采樣可能不合適,應改用節流隊列(參見前節)。
-
根據場景優化性能:
- 對于低延遲要求,可結合采樣策略與輕量級的數據過濾邏輯。
-
測試不同采樣策略:
- 根據生產和消費模式,嘗試不同的
BoundedChannelFullMode或時間窗口大小,優化數據流的吞吐與延遲。
- 根據生產和消費模式,嘗試不同的
9.14 異步棧和異步背包
問題
異步棧(LIFO,后進先出)和異步背包(無序集合)是 Nito.AsyncEx 庫中提供的功能,允許開發者在并發環境下使用異步集合,而不局限于隊列的先進先出(FIFO)行為。
解決方案
Nito.AsyncEx 庫提供了 AsyncCollection<T> 類型,它支持異步的生產者-消費者模式。AsyncCollection<T> 默認行為類似異步隊列,但可以通過傳入不同的集合類型來實現異步棧或異步背包,與BlockingCollection<T>是一個道理,可以說AsyncCollection<T> 是 BlockingCollection<T> 的異步版本。
- 異步棧:使用
ConcurrentStack<T>實現后進先出(LIFO)。 - 異步背包:使用
ConcurrentBag<T>實現無序行為。
示例代碼
使用 AsyncCollection<T> 實現異步棧和異步背包
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;
using Nito.AsyncEx;
class Program
{
static async Task Main()
{
// 創建異步棧(LIFO行為)
var asyncStack = new AsyncCollection<int>(new ConcurrentStack<int>());
// 創建異步背包(無序行為)
var asyncBag = new AsyncCollection<int>(new ConcurrentBag<int>());
// 示例:異步棧生產與消費
await AsyncStackExample(asyncStack);
// 示例:異步背包生產與消費
await AsyncBagExample(asyncBag);
}
private static async Task AsyncStackExample(AsyncCollection<int> asyncStack)
{
// 生產者
var producer = Task.Run(async () =>
{
await asyncStack.AddAsync(7);
await asyncStack.AddAsync(13);
asyncStack.CompleteAdding();
});
// 消費者
var consumer = Task.Run(async () =>
{
while (await asyncStack.OutputAvailableAsync())
{
int item = await asyncStack.TakeAsync();
Trace.WriteLine($"消費(棧):{item}");
}
});
await Task.WhenAll(producer, consumer);
}
private static async Task AsyncBagExample(AsyncCollection<int> asyncBag)
{
// 生產者
var producer = Task.Run(async () =>
{
await asyncBag.AddAsync(7);
await asyncBag.AddAsync(13);
asyncBag.CompleteAdding();
});
// 消費者
var consumer = Task.Run(async () =>
{
while (await asyncBag.OutputAvailableAsync())
{
int item = await asyncBag.TakeAsync();
Trace.WriteLine($"消費(背包):{item}");
}
});
await Task.WhenAll(producer, consumer);
}
}
注意事項
- 對于異步棧來說,如果生產者在消費者啟動前完成,棧會按預期后進先出的順序工作(先添加的后取出)。但如果生產者和消費者并發運行,消費者總是會優先獲取最近添加的項,因此行為可能與普通棧略有不同。
- 異步背包沒有順序,消費者獲取數據的順序是無序的。
支持節流
AsyncCollection<T> 支持節流,可以通過設置 maxCount 參數來限制集合的最大容量。當集合滿時,生產者會異步等待,直到有空間。
節流示例
var _asyncStack = new AsyncCollection<int>(new ConcurrentStack<int>(), maxCount: 1);
// 生產者代碼
await _asyncStack.AddAsync(7); // 立即完成
await _asyncStack.AddAsync(13); // 等待7被移除后才會添加13
_asyncStack.CompleteAdding();
多消費者處理
在多消費者場景下,推薦使用如下模式來處理可能的異常:
while (true)
{
try
{
int item = await _asyncStack.TakeAsync();
Trace.WriteLine(item);
}
catch (InvalidOperationException)
{
break; // 集合已完成,退出循環
}
}
最佳實踐
-
選擇適合的集合類型:
- 使用
ConcurrentStack<int>構造異步棧。 - 使用
ConcurrentBag<int>構造異步背包。
- 使用
-
考慮并發影響:
- 在高并發場景中,異步棧的行為可能偏離嚴格的 LIFO,需評估是否符合業務需求。
-
優先考慮節流:
- 即使消費者速度通常較快,建議添加合理的容量限制,確保應用程序能夠在不同硬件或高負載場景下運行。
-
多消費者場景:
- 若多個消費者需要讀取異步棧/背包,需注意分配邏輯,以防止數據遺漏或重復處理。
9.15 同步與異步混合隊列
問題
在某些情況下,你可能需要一個隊列,既能同步地處理生產者端或消費者端,也能異步地處理另一端。比如,后臺線程需要同步阻塞地推送數據,而 UI 線程則需要異步地從隊列中拉取數據,以保持響應性。
解決方案
可以使用支持同步和異步 API 的隊列類型,如 BufferBlock<T>、ActionBlock<T>,或 AsyncProducerConsumerQueue<T>。
BufferBlock<T>
BufferBlock<T> 是 TPL 數據流(System.Threading.Tasks.Dataflow)的一部分,既支持同步方法,也支持異步方法。
示例
異步生產者與異步消費者:
var queue = new BufferBlock<int>();
// 異步生產者代碼
await queue.SendAsync(7);
await queue.SendAsync(13);
queue.Complete();
// 異步消費者代碼
while (await queue.OutputAvailableAsync())
Trace.WriteLine(await queue.ReceiveAsync());
同步生產者與同步消費者:
var queue = new BufferBlock<int>();
// 同步生產者代碼
queue.Post(7);
queue.Post(13);
queue.Complete();
// 同步消費者代碼
while (true)
{
try
{
int item = queue.Receive();
Trace.WriteLine(item);
}
catch (InvalidOperationException)
{
break; // 隊列已完成
}
}
ActionBlock<T>
ActionBlock<T> 是 TPL 數據流 的另一種塊結構,適合定義響應式消費者。它也支持同步和異步的生產者。
示例
消費者代碼:
var queue = new ActionBlock<int>(item => Trace.WriteLine(item));
// 異步生產者代碼
await queue.SendAsync(7);
await queue.SendAsync(13);
// 同步生產者代碼
queue.Post(7);
queue.Post(13);
queue.Complete();
AsyncProducerConsumerQueue<T>
Nito.AsyncEx 提供了 AsyncProducerConsumerQueue<T>,一個支持同步和異步 API 的生產者-消費者隊列。
示例
異步生產者與異步消費者:
var queue = new AsyncProducerConsumerQueue<int>();
// 異步生產者代碼
await queue.EnqueueAsync(7);
await queue.EnqueueAsync(13);
queue.CompleteAdding();
// 異步消費者代碼
while (await queue.OutputAvailableAsync())
Trace.WriteLine(await queue.DequeueAsync());
同步生產者與同步消費者:
var queue = new AsyncProducerConsumerQueue<int>();
// 同步生產者代碼
queue.Enqueue(7);
queue.Enqueue(13);
queue.CompleteAdding();
// 同步消費者代碼
foreach (int item in queue.GetConsumingEnumerable())
Trace.WriteLine(item);
Channel<T>
雖然 Channel<T> 的 API 是異步的,但你可以通過 Task.Run 包裝同步代碼,來強制同步生產和消費。
示例
var queue = Channel.CreateBounded<int>(10);
// 同步生產者代碼(通過 Task.Run)
Task.Run(async () => {
await queue.Writer.WriteAsync(7);
await queue.Writer.WriteAsync(13);
queue.Writer.Complete();
}).GetAwaiter().GetResult();
// 同步消費者代碼(通過 Task.Run)
Task.Run(async () => {
while (await queue.Reader.WaitToReadAsync())
{
while (queue.Reader.TryRead(out int value))
Trace.WriteLine(value);
}
}).GetAwaiter().GetResult();
討論
- 推薦使用
BufferBlock<T>或ActionBlock<T>,它們是TPL 數據流的一部分,經過廣泛測試和優化。 - 如果你的項目已經使用了
Nito.AsyncEx庫,AsyncProducerConsumerQueue<T>也是一個不錯的選擇。 Channel<T>本質上是異步的,但可以通過Task.Run包裝同步代碼來工作。

浙公網安備 33010602011771號