第十二章:C#多線程同步
第十二章:C#多線程同步
12.1 簡介
在并發編程中,同步的核心任務是協調多個代碼段對共享資源的訪問,確保數據的一致性和完整性。隨著.NET平臺對并發編程的支持日益增強,大部分場景已經通過高效的庫或框架實現了隱式同步。然而,理解同步的基礎仍然是編寫健壯代碼的關鍵。
為什么需要同步?
同步通常用來解決以下問題:
- 數據保護:當多個代碼段并發運行且共享可變數據時,確保數據的一致性。
- 交互協調:在一個代碼段需要通知另一個代碼段某個事件或狀態變化時,確保信息傳遞的正確性。
只有代碼同時滿足以下三個條件,才需要同步機制:
- 并發執行:代碼段在不同線程或任務中存在并行運行。
- 共享數據:這些代碼段訪問同一個數據對象。
- 數據修改:至少有一個代碼段會更新(寫入)共享數據。
什么時候不需要同步?
并非所有的并發操作都需要同步,以下是常見的例外場景:
- 沒有并發執行:如果代碼是按順序運行的,雖然可能是異步的,但只要沒有同時執行的情況,就不需要同步。
- 獨立數據:每個線程或任務使用獨立的數據副本,彼此沒有交集。
- 只讀數據:即使數據是共享的,但沒有代碼修改它(例如,不可變類型)。
示例代碼說明這一點:
async Task ExampleAsync()
{
int value = 10;
await Task.Delay(1000);
value += 1;
await Task.Delay(1000);
value -= 1;
Console.WriteLine(value);
}
即使是異步代碼,但因為是順序執行,并沒有并發訪問,同一時間訪問value的線程或任務只有一個,雖然可能不是同一個,所以無需同步。
同步的常見應用
- 保護共享數據
當多個任務并發修改同一變量時,需要通過同步保證數據更新的正確性,以下代碼通過 Task.Run 啟動了并行任務,但它們共享同一個變量 value。
private int value;
async Task<int> SimpleParallelismAsync()
{
int value = 0;
Task task1 = Task.Run(() => { value = value + 1; });
Task task2 = Task.Run(() => { value = value + 1; });
Task task3 = Task.Run(() => { value = value + 1; });
await Task.WhenAll(task1, task2, task3);
return value;
}
- 線程安全集合
如 ConcurrentDictionary 等線程安全集合內置了同步機制,在并發場景中可以直接使用:
async Task<int> UseThreadSafeCollectionAsync()
{
var dictionary = new ConcurrentDictionary<int, int>();
Task task1 = Task.Run(() => dictionary.TryAdd(1, 10));
Task task2 = Task.Run(() => dictionary.TryAdd(2, 20));
Task task3 = Task.Run(() => dictionary.TryAdd(3, 30));
await Task.WhenAll(task1, task2, task3);
return dictionary.Count; // 始終返回 3
}
- 不可變數據結構
不可變集合(如 ImmutableStack)通過設計避免了同步問題,因為每次更新都會生成新的集合:
async Task<bool> UseImmutableStackAsync()
{
var stack = ImmutableStack<int>.Empty;
Task task1 = Task.Run(() => stack.Push(1));
Task task2 = Task.Run(() => stack.Push(2));
Task task3 = Task.Run(() => stack.Push(3));
await Task.WhenAll(task1, task2, task3);
return stack.IsEmpty; // 始終返回 true,因為原始 stack 未被修改
}
同步的重要性
同步的核心目標是為多線程環境中的代碼提供一致的數據視圖。如果缺乏適當的同步,可能導致:
- 數據競爭:多個線程同時修改數據,結果不可預測。
- 死鎖:多個線程因不當的鎖管理而相互等待,系統無法繼續執行。
- 性能瓶頸:過度同步可能導致線程阻塞,降低并發效率。
了解何時需要同步以及如何選擇合適的工具,是并發編程的基礎。后續小節將進一步探討常見的同步技術及其在不同場景中的應用。
12.2 原子操作
簡介
在多線程環境中,原子操作是不可分割的操作,要么完全執行成功,要么完全不執行,且在執行過程中不會被其他線程中斷或觀察到中間狀態。
原子操作提供了一種輕量級的線程安全方式,特別適用于高性能場景,避免了使用鎖引入的上下文切換和性能開銷。
.NET 提供了一些內置的原子操作支持,例如通過 Interlocked 類和某些原子性的集合操作,以此來方便地實現線程安全的數據更新。
使用場景
-
計數器或狀態管理
在多線程環境下安全地遞增、遞減或交換值,例如統計請求數、管理資源計數。 -
無鎖編程
在高性能并發場景下,減少鎖的使用,提高吞吐量。 -
簡化共享變量操作
對簡單變量執行原子操作,無需引入復雜的鎖機制。
代碼示例
使用 Interlocked 類
Interlocked 提供線程安全的方法,用于操作簡單的整數或引用類型。
1. 線程安全地遞增、遞減和累加
-
遞增 (
Interlocked.Increment) 和遞減 (Interlocked.Decrement)using System; using System.Threading; using System.Threading.Tasks; class Program { private static int _counter = 0; static async Task Main(string[] args) { // 啟動多個并發任務 Task[] tasks = new Task[10]; for (int i = 0; i < tasks.Length; i++) { tasks[i] = Task.Run(() => { for (int j = 0; j < 1000; j++) { Interlocked.Increment(ref _counter); // 線程安全遞增 } }); } await Task.WhenAll(tasks); Console.WriteLine($"Final Counter Value: {_counter}"); // 應該為 10000 } } -
累加 (
Interlocked.Add)class Program { private static int _sum = 0; static void AddValue(int value) { Interlocked.Add(ref _sum, value); // 原子累加 } static void Main() { Parallel.For(0, 10, i => AddValue(10)); // 并發累加 Console.WriteLine($"Total Sum: {_sum}"); // 應該為 100 } }
2. 線程安全地交換值
- 交換變量值 (
Interlocked.Exchange)
適用于在并發場景中重置或切換變量值。class Program { private static int _sharedValue = 42; static void UpdateValue(int newValue) { int originalValue = Interlocked.Exchange(ref _sharedValue, newValue); // 原子交換 Console.WriteLine($"Original Value: {originalValue}, New Value: {_sharedValue}"); } static void Main() { Parallel.For(0, 5, i => UpdateValue(i * 10)); // 并發更新 } }
3. 比較并交換值
Interlocked.CompareExchange
只有當當前值等于預期值時,才會更新為新值。private static int _state = 0; public static void SetState(int newState) { int originalState = Interlocked.CompareExchange(ref _state, newState, 0); // 如果 _state 是 0,則設置為 newState if (originalState == 0) { Console.WriteLine("State updated successfully."); } else { Console.WriteLine("State change failed. Current state: " + originalState); } }
4. 無鎖棧的簡單實現
利用原子操作實現無鎖的鏈表:
public class LockFreeStack<T>
{
private Node? _head;
private class Node
{
public T Value;
public Node? Next;
public Node(T value) => Value = value;
}
public void Push(T value)
{
var newNode = new Node(value);
Node? oldHead;
do
{
oldHead = _head; // 讀取當前頭節點
newNode.Next = oldHead;
} while (Interlocked.CompareExchange(ref _head, newNode, oldHead) != oldHead);
}
public bool TryPop(out T? value)
{
Node? oldHead;
do
{
oldHead = _head;
if (oldHead == null)
{
value = default;
return false;
}
} while (Interlocked.CompareExchange(ref _head, oldHead.Next, oldHead) != oldHead);
value = oldHead.Value;
return true;
}
}
原子操作的特點與限制
-
原子操作僅適用于簡單數據類型
Interlocked支持的類型包括int、long和引用類型等簡單變量。- 對于復雜的數據結構或多字段,可能需要其他線程安全方法。
-
原子性無法跨多個操作保證
原子操作僅在單次操作上提供保障,如果需要對多個變量或復雜邏輯實現原子性,需要借助鎖或其他同步機制。 -
低粒度但高性能
原子操作非常輕量級,尤其適合頻繁更新的場景。但對于復雜場景,可能會導致代碼難以理解和維護。
最佳實踐
-
優先考慮簡單場景
- 原子操作適合單一字段的更新,例如計數器或標志位。
- 避免在復雜邏輯中濫用原子操作。
-
使用合適的工具
- 對單變量更新,首選
Interlocked方法。 - 對集合操作,優先使用
ConcurrentDictionary、ConcurrentQueue等線程安全集合。
- 對單變量更新,首選
-
避免數據爭用
- 如果多個線程頻繁競爭同一個變量,即使使用原子操作,也可能導致性能下降。
- 通過分片或分區減少爭用熱點(如將計數分布在多個線程私有變量中,最后合并結果)。
-
小心無鎖編程的陷阱
- 無鎖編程雖然性能高,但邏輯難以驗證,容易引入隱藏的競爭條件。
- 在追求無鎖性能的同時,確保代碼正確性。
通過熟練使用原子操作,開發者可以在復雜的多線程環境中實現高性能且線程安全的代碼。
12.3 阻塞鎖lock
問題
當多個線程需要安全地訪問和修改共享數據時,如何確保數據的一致性和線程安全?阻塞鎖是解決這一問題的最簡單和常用工具。
解決方案
在.NET中,lock 語句提供了一種簡單易用的阻塞鎖實現方式。它通過鎖定一個引用對象,確保在某一時間點只有一個線程可以進入指定的代碼塊。
示例代碼:
class MyClass
{
// 鎖對象,用于保護共享字段 _value
private readonly object _mutex = new object();
private int _value;
public void Increment()
{
lock (_mutex) // 進入鎖
{
_value = _value + 1; // 僅允許一個線程修改 _value
} // 退出鎖
}
}
在上述代碼中,lock 語句使用 _mutex 對象作為鎖,確保 _value 的修改是線程安全的。
阻塞鎖的原理
lock 是 Monitor 的簡化語法糖。Monitor 提供了更細粒度的控制,但使用 lock 更直觀且便于維護。
lock 的工作機制:
- 嘗試進入鎖:線程嘗試獲取鎖對象(即
_mutex)。 - 阻塞其他線程:在鎖被占用時,其他線程會進入等待狀態,直到鎖釋放。
- 自動釋放:當控制流離開
lock塊,無論是否發生異常,鎖都會被自動釋放。
等價的 Monitor 寫法:
public void Increment()
{
Monitor.Enter(_mutex);
try
{
_value = _value + 1;
}
finally
{
Monitor.Exit(_mutex);
}
}
實際上,lock語句只是語法糖,編譯過后也是通過Monitor實現
常見陷阱
-
鎖嵌套導致死鎖
當線程 A 和線程 B 相互等待對方持有的鎖時,會導致程序永遠卡住。示例:
private readonly object lockA = new object(); private readonly object lockB = new object(); public void ThreadA() { lock (lockA) { // 需要 lockB,但 lockB 被 ThreadB 持有 lock (lockB) { // 執行代碼 } } } public void ThreadB() { lock (lockB) { // 需要 lockA,但 lockA 被 ThreadA 持有 lock (lockA) { // 執行代碼 } } }解決方法:始終按一致的順序獲取多個鎖。例如,確保線程總是先獲取 lockA,再獲取 lockB。
-
過度鎖定
- 鎖定不必要的代碼塊會降低并發性能,應僅鎖定需要保護的核心邏輯。
-
鎖的誤用
- 避免對公共對象(如
this)加鎖,否則可能引發外部代碼的競爭。
- 避免對公共對象(如
最佳實踐
-
限制鎖的可見性
- 鎖對象(如
_mutex)應為私有字段,且不能暴露給外部,避免潛在的死鎖風險。 - 雖然可以鎖定任何應用對象,但是避免鎖定
this、Type實例或string類型,因為它們可能被外部代碼共享。
- 鎖對象(如
-
記錄鎖的用途
- 在代碼注釋中明確鎖的作用范圍和保護的數據,便于日后維護。
-
最小化鎖內代碼
- 鎖定范圍越小,發生死鎖或性能瓶頸的可能性越低。
- 避免在鎖定狀態下執行阻塞操作(如 I/O 操作)或長時間運行的邏輯。
-
避免鎖定狀態下調用外部代碼
- 鎖定時不要調用委托、觸發事件或調用虛方法,因為這些代碼的行為可能難以預測,甚至引發死鎖。
其他鎖類型
除了基本的 lock 語句,.NET 提供了多個高級鎖類型,例如:
-
Monitor
提供比lock更細粒度的鎖控制(如超時等待等),但通常不需要直接使用。 -
SpinLock
適用于短時間鎖定的場景,可以避免線程掛起,但如果鎖持有時間較長會導致性能降低。 -
ReaderWriterLockSlim
支持多讀單寫的場景,對于讀多寫少的情況可以提高性能。但在多數情況下,lock已足夠簡單且高效。
建議:
在絕大多數應用程序中,lock 是最簡單且高效的選擇,能夠滿足 99% 的需求。僅在特殊場景下才需要考慮使用其他類型的鎖。
12.4 自旋鎖SpinLock(新手不推薦)
簡介
SpinLock 是一種輕量級的鎖,它通過不斷輪詢的方式(自旋)等待鎖的釋放,而不是阻塞線程或切換上下文。它的目標是減少線程上下文切換的開銷,因此適合鎖等待時間非常短的場景,例如高性能的多線程計算。
與阻塞鎖(如 lock 或 Monitor)不同,SpinLock 不會導致線程進入阻塞掛起狀態,而是持續占用 CPU 資源嘗試獲取鎖。
這使得 SpinLock 在短時間鎖競爭中具有極高的性能,但如果鎖競爭時間較長,會導致 CPU 資源的浪費,強烈不建議新手使用。
使用場景
-
高性能計算
用于短時間占用的鎖保護,如計數器更新、快速內存操作等。 -
避免線程切換開銷
在實時性較高的程序中,減少上下文切換和線程阻塞導致的延遲。 -
多線程并發編程
場景需要非常精細的性能優化,并且可以確保鎖爭用時間極短。
代碼示例
1. SpinLock 基本用法
using System;
using System.Threading;
class Program
{
private static SpinLock _spinLock = new SpinLock();
private static int _counter = 0;
static void IncrementCounter()
{
bool lockTaken = false;
try
{
_spinLock.Enter(ref lockTaken); // 獲取鎖
_counter++; // 保護的共享資源
}
finally
{
if (lockTaken)
{
_spinLock.Exit(); // 釋放鎖
}
}
}
static void Main()
{
const int threadCount = 10;
Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++)
{
threads[i] = new Thread(() =>
{
for (int j = 0; j < 1000; j++)
{
IncrementCounter();
}
});
threads[i].Start();
}
foreach (var thread in threads)
{
thread.Join(); // 等待線程完成
}
Console.WriteLine($"Final Counter Value: {_counter}"); // 輸出 10000
}
}
2. 使用 SpinLock 的遞歸陷阱
SpinLock 默認不支持在同一線程中遞歸調用 Enter 方法。
static void RecursiveLock(SpinLock spinLock, int depth)
{
if (depth == 0) return;
bool lockTaken = false;
try
{
spinLock.Enter(ref lockTaken); // 獲取鎖
Console.WriteLine($"Lock taken at depth {depth}");
RecursiveLock(spinLock, depth - 1); // 再次調用導致死鎖
}
finally
{
if (lockTaken)
{
spinLock.Exit(); // 釋放鎖
}
}
}
解決方法:允許線程內重入
SpinLock spinLock = new SpinLock(enableThreadOwnerTracking: true);
但是,啟用線程跟蹤會增加一些性能開銷,應根據需要選擇使用。
SpinLock 的特點
-
輕量級與高性能
- SpinLock 避免了線程阻塞帶來的上下文切換開銷,在短時間鎖競爭場景中效率極高。
- 自旋期間線程不會讓出 CPU,因此適合短時間操作。
-
可能導致忙等
- 如果鎖占用時間較長,SpinLock 會導致其他線程長時間自旋,浪費 CPU 資源。
-
線程跟蹤功能
SpinLock可以選擇啟用線程所有權跟蹤(enableThreadOwnerTracking)。- 開啟后,
SpinLock能檢測同一線程的重復鎖請求(支持重入),但會帶來額外性能開銷。
-
不適合 IO 密集場景
- 在需要等待 IO 或其他慢速操作的場景中,SpinLock 的忙等可能導致嚴重的性能問題。
最佳實踐
-
僅在短時間鎖定中使用
- 如果共享資源的鎖持有時間較長,應避免使用 SpinLock。
-
謹慎處理遞歸調用
- 默認情況下 SpinLock 不支持遞歸調用,啟用線程跟蹤功能可以解決,但會帶來性能開銷。
-
釋放鎖的正確性
- 確保所有獲取鎖的路徑都能正確釋放鎖,尤其是在異常處理中。推薦在
try...finally塊中管理鎖。
- 確保所有獲取鎖的路徑都能正確釋放鎖,尤其是在異常處理中。推薦在
-
避免死鎖風險
- 確保鎖獲取和釋放的順序始終一致。
- 盡量減少鎖內操作的復雜性,避免調用外部方法。
-
考慮 SpinWait 替代
- 如果鎖等待邏輯需要更精細的控制,可以使用
SpinWait,它允許自定義自旋行為并在必要時讓出 CPU。
- 如果鎖等待邏輯需要更精細的控制,可以使用
SpinLock vs 其他鎖
| 特性 | SpinLock |
lock / Monitor |
Interlocked |
|---|---|---|---|
| 適用場景 | 短時間鎖定 | 長時間鎖定 | 單變量原子操作 |
| 性能 | 高(短時間鎖定) | 較低(上下文切換開銷) | 最高(無鎖) |
| 阻塞 | 無阻塞(自旋) | 阻塞線程 | 無阻塞 |
| 資源消耗 | 高(占用 CPU) | 中等 | 最低 |
| 復雜性 | 較高 | 低 | 低 |
| 遞歸支持 | 不支持 | 支持 | 不適用 |
12.5 自旋等待SpinWait(新手不推薦)
SpinWait 是 .NET 提供的一種輕量級同步工具,與 SpinLock 類似,SpinWait 也利用了“忙等待”的概念,但它更加靈活,旨在通過自旋和適度的讓步(Yielding)來平衡 CPU 使用和上下文切換開銷。
核心思想
當一個線程需要等待某個條件滿足時,有兩種常見的策略:
-
阻塞等待(Blocking):
使用鎖或等待句柄(如Monitor.Wait、AutoResetEvent等)來掛起線程,直到條件滿足。這種方式會導致線程上下文切換,增加額外開銷。 -
自旋等待(Spinning):
線程在一個循環中反復檢查條件是否滿足,同時不掛起線程。自旋的優勢在于避免了上下文切換的開銷,但會消耗 CPU 資源。
SpinWait 是介于這兩者之間的一種策略:
- 它首先通過自旋等待一段時間(消耗 CPU),以避免線程掛起的開銷。
- 當自旋次數達到一定閾值后,它會通過調用
Thread.Yield或短暫睡眠讓出 CPU,避免長時間占用 CPU 核心。
基本用法
以下是一個使用 SpinWait 的簡單示例:
using System;
using System.Threading;
class SpinWaitExample
{
private static bool _isConditionMet = false;
public static void Main()
{
Thread workerThread = new Thread(() =>
{
Console.WriteLine("Worker: Performing work...");
Thread.Sleep(1000); // 模擬一些工作
_isConditionMet = true; // 設置條件
Console.WriteLine("Worker: Work completed.");
});
workerThread.Start();
// 主線程使用 SpinWait 等待條件滿足
SpinWait spinWait = new SpinWait();
while (!_isConditionMet)
{
spinWait.SpinOnce(); // 每次循環調用 SpinOnce
}
Console.WriteLine("Main: Condition met, proceeding with execution.");
}
}
代碼說明:
-
SpinWait實例:
可以創建一個SpinWait實例來控制自旋行為。 -
SpinOnce方法:
每次調用SpinOnce,線程會執行一次自旋操作,隨著調用次數的增加,它會逐步調整自旋策略(比如在高自旋次數時調用Thread.Yield讓出時間片)。 -
條件檢查:
在循環中反復檢查_isConditionMet,直到條件滿足。
SpinWait 的特點
-
自適應自旋策略:
SpinWait會根據調用SpinOnce的次數動態調整策略。在初期,它會持續自旋以避免線程上下文切換;當自旋次數較多時,它會嘗試讓出 CPU(通過Thread.Yield或短暫的睡眠)。 -
非阻塞:
與阻塞式等待相比,SpinWait不會直接掛起線程,因此在等待時間較短的情況下性能更高。 -
避免 CPU 過載:
在高自旋次數情況下,通過讓出 CPU 避免了長時間占用 CPU 資源。
SpinOnce 的行為
SpinOnce 是 SpinWait 的核心方法,每次調用它時,SpinWait 會決定下一步的行為:
-
短時間自旋:
在前幾次調用時,SpinWait會以忙等待的方式直接占用 CPU 核心。 -
讓出 CPU:
當自旋次數達到一定閾值時,它會調用Thread.Yield將 CPU 時間片讓給其他線程。 -
短暫睡眠:
如果自旋次數繼續增加,SpinWait會調用Thread.Sleep(1)進行短暫的線程休眠。
這種動態調整的特性使得 SpinWait 能在短時間等待中保持高性能,同時在長時間等待中避免 CPU 過載。
SpinWait 的常見屬性和方法
-
SpinOnce():
執行一次自旋操作,動態調整自旋策略。 -
NextSpinWillYield(屬性):
返回一個布爾值,表示下一次調用SpinOnce時是否會嘗試讓出 CPU。SpinWait spinWait = new SpinWait(); while (!condition) { spinWait.SpinOnce(); if (spinWait.NextSpinWillYield)// 判斷下次調用spinWait.SpinOnce()是否會讓出CPU { Console.WriteLine("Yielding CPU to other threads..."); } } -
Count(屬性):
返回調用SpinOnce的次數,可以用來調試或控制自旋行為。
使用場景
-
短時間等待:
如果等待的條件會在很短的時間內滿足,SpinWait可以避免線程掛起,提高性能。示例:線程池中的任務調度器可以使用
SpinWait來等待任務隊列的更新。 -
高性能并發場景:
在高并發場景中,SpinWait可以用來等待輕量級的標志位或狀態切換,而不引入鎖的開銷。示例:生產者-消費者模型中,消費者可以使用
SpinWait等待生產者的狀態更新。 -
替代
Thread.Sleep或Thread.Yield:
在需要微秒級別等待的場景中,SpinWait比直接使用Thread.Sleep或Thread.Yield更高效。
SpinWait vs SpinLock
| 特性 | SpinWait |
SpinLock |
|---|---|---|
| 目的 | 等待條件滿足(循環檢查) | 保護臨界區(鎖機制) |
| 是否提供鎖機制 | 否 | 是 |
| 動態調整策略 | 是(自適應自旋 + 讓出 CPU) | 否(純自旋) |
| 使用復雜性 | 較低 | 較高 |
| 適用場景 | 條件等待 | 臨界區保護 |
底層實現
SpinWait 的實現依賴硬件和操作系統的調度支持:
- 在自旋階段,
SpinWait會反復執行輕量級的 CPU 指令,避免線程掛起。 - 在達到高自旋次數后,
SpinWait會調用Thread.Yield或Thread.Sleep,讓出 CPU 時間片。 - 通過這種方式,
SpinWait平衡了性能和資源消耗。
12.6 輕量級讀寫鎖ReaderWriterLockSlim
ReaderWriterLockSlim 是 .NET 中的一種高級鎖機制,旨在通過區分“讀操作”和“寫操作”來提高并發性能。它允許多個線程同時執行讀操作(共享鎖),但在寫操作(獨占鎖)時會阻止其他線程的讀寫操作。
這種鎖的設計理念是:
- 讀操作的并發性高: 允許多個線程同時讀取共享資源。
- 寫操作的獨占性強: 寫操作需要獨占鎖,確保線程安全。
相較于傳統的 lock 或 Monitor,ReaderWriterLockSlim 在 讀多寫少 的場景下提供了更高的性能。
核心特點
-
讀寫分離:
- 允許多個線程同時讀取數據,只要沒有線程在寫入。
- 如果有線程正在寫入,所有其他線程(包括讀線程)都會被阻塞。
-
支持遞歸鎖:
同一線程可以多次獲取讀鎖或寫鎖(遞歸鎖)。 -
升級鎖功能:
允許線程從讀鎖升級為寫鎖,但需要注意升級鎖可能導致死鎖問題。 -
高性能:
相較于老式的ReaderWriterLock,ReaderWriterLockSlim更輕量、更高效,適合高并發場景。
基本用法
以下是使用 ReaderWriterLockSlim 的典型示例:
using System;
using System.Threading;
class ReaderWriterLockSlimExample
{
private static ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
private static int _sharedResource = 0;
public static void Main()
{
Thread writer = new Thread(WriteResource);
Thread reader1 = new Thread(ReadResource);
Thread reader2 = new Thread(ReadResource);
writer.Start();
reader1.Start();
reader2.Start();
writer.Join();
reader1.Join();
reader2.Join();
Console.WriteLine("All operations completed.");
}
private static void WriteResource()
{
_lock.EnterWriteLock(); // 獲取寫鎖
try
{
Console.WriteLine("Writer: Acquired write lock.");
_sharedResource++;
Thread.Sleep(1000); // 模擬寫操作
Console.WriteLine($"Writer: Updated shared resource to {_sharedResource}.");
}
finally
{
_lock.ExitWriteLock(); // 釋放寫鎖
Console.WriteLine("Writer: Released write lock.");
}
}
private static void ReadResource()
{
_lock.EnterReadLock(); // 獲取讀鎖
try
{
Console.WriteLine($"Reader: Acquired read lock. Shared resource = {_sharedResource}");
Thread.Sleep(500); // 模擬讀操作
}
finally
{
_lock.ExitReadLock(); // 釋放讀鎖
Console.WriteLine("Reader: Released read lock.");
}
}
}
代碼說明:
-
EnterWriteLock和ExitWriteLock:EnterWriteLock用于獲取寫鎖,確保只有一個線程可以寫入共享資源。- 操作完成后,必須調用
ExitWriteLock釋放鎖。
-
EnterReadLock和ExitReadLock:EnterReadLock用于獲取讀鎖,允許多個線程同時讀取共享資源。- 操作完成后,必須調用
ExitReadLock釋放鎖。
-
讀多寫少:
示例中,兩個讀線程可以在寫線程完成之前同時讀取數據,體現了讀寫分離的優勢。
常用方法
| 方法/屬性 | 描述 |
|---|---|
EnterReadLock |
獲取讀鎖,允許多個線程同時讀取。 |
ExitReadLock |
釋放讀鎖。 |
EnterWriteLock |
獲取寫鎖,阻止其他線程的讀寫操作。 |
ExitWriteLock |
釋放寫鎖。 |
EnterUpgradeableReadLock |
獲取可升級的讀鎖,允許從讀鎖升級為寫鎖。 |
ExitUpgradeableReadLock |
釋放可升級的讀鎖。 |
IsReadLockHeld |
當前線程是否持有讀鎖。 |
IsWriteLockHeld |
當前線程是否持有寫鎖。 |
IsUpgradeableReadLockHeld |
當前線程是否持有可升級的讀鎖。 |
可升級讀鎖
什么是可升級讀鎖?
EnterUpgradeableReadLock 允許線程在持有讀鎖的同時,有條件地升級為寫鎖。它適用于以下場景:
- 線程開始時需要讀取數據。
- 根據某些條件,決定是否需要修改(寫入)數據。
以下是一個可升級讀鎖的示例:
private static void UpgradeableReadExample()
{
_lock.EnterUpgradeableReadLock(); // 獲取可升級讀鎖
try
{
Console.WriteLine($"Thread: Reading shared resource = {_sharedResource}");
if (_sharedResource == 0) // 滿足條件時升級為寫鎖
{
_lock.EnterWriteLock();
try
{
Console.WriteLine("Thread: Upgraded to write lock.");
_sharedResource = 42;
Console.WriteLine("Thread: Updated shared resource.");
}
finally
{
_lock.ExitWriteLock(); // 釋放寫鎖
}
}
}
finally
{
_lock.ExitUpgradeableReadLock(); // 釋放可升級讀鎖
}
}
注意:
- 只能在單個線程中使用可升級讀鎖。
- 如果多個線程同時嘗試升級讀鎖到寫鎖,可能會導致死鎖。
- 在可升級讀鎖中,其他線程仍可以獲取讀鎖,但不能獲取寫鎖。
最佳實踐
-
選擇合適的鎖類型
如果讀操作占絕對主導地位,優先選擇ReaderWriterLockSlim;否則,可以考慮更輕量的lock。 -
合理使用升級讀鎖
- 僅在確實需要時才升級為寫鎖。
- 避免過長時間持有升級讀鎖以減少競爭。
-
減少鎖粒度
盡量縮小鎖的作用范圍,減少持有鎖的時間,避免潛在的死鎖或性能下降。 -
避免嵌套鎖操作
嵌套ReaderWriterLockSlim的讀寫鎖操作會引發復雜的死鎖問題,應盡量避免。 -
釋放鎖的順序
- 保證
ExitReadLock、ExitWriteLock和ExitUpgradeableReadLock與其對應的Enter方法嚴格匹配。 - 使用
try-finally確保鎖釋放。
- 保證
12.7 異步鎖
傳統的同步鎖(如 lock 或 Monitor)無法直接與 async/await 兼容,因為它們并未為異步操作設計。為此,.NET 提供了 SemaphoreSlim,并支持異步方法,同時第三方庫(如 Nito.AsyncEx)也引入了更優雅的異步鎖實現,例如 AsyncLock。
使用場景
- 在
async/await場景中,需要保護共享數據免受并發訪問。 - 希望避免阻塞線程,并保持程序的高性能和響應性。
- 多個異步任務同時訪問一個共享資源時需要協調。
代碼示例
使用 SemaphoreSlim
using System;
using System.Threading;
using System.Threading.Tasks;
class MyClass
{
private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1);
private int _value;
public async Task DelayAndIncrementAsync()
{
await _mutex.WaitAsync();
try
{
int oldValue = _value;
await Task.Delay(TimeSpan.FromSeconds(oldValue)); // 模擬異步操作
_value = oldValue + 1;
}
finally
{
_mutex.Release();
}
}
}
使用 AsyncLock(來自 Nito.AsyncEx)
using System;
using System.Threading.Tasks;
using Nito.AsyncEx;
class MyClass
{
private readonly AsyncLock _mutex = new AsyncLock();
private int _value;
public async Task DelayAndIncrementAsync()
{
using (await _mutex.LockAsync())
{
int oldValue = _value;
await Task.Delay(TimeSpan.FromSeconds(oldValue)); // 模擬異步操作
_value = oldValue + 1;
}
}
}
背后原理
-
SemaphoreSlim 的異步特性:
SemaphoreSlim.WaitAsync使用信號量限制同時訪問的線程數量。通過異步方式,它不會阻塞調用線程,而是返回一個等待的任務,直到資源可用。Release方法釋放鎖,使其他等待中的任務得以運行。
-
AsyncLock 的實現:
AsyncLock是一個簡化的異步鎖實現,通常基于SemaphoreSlim或類似的同步機制封裝。- 它通過
LockAsync提供了更易用的 API,同時內置了IDisposable模式,讓鎖的釋放更方便和安全。
-
避免阻塞線程:
- 傳統同步鎖可能阻塞線程,占用線程池資源。而異步鎖通過任務調度器讓出線程,從而提升系統的整體性能。
12.8 阻塞信號
問題
在多線程編程中,經常需要在線程間發送通知。例如,一個線程需要等待另一個線程完成某些初始化操作,然后再繼續執行。這種線程間的信號協調如何實現?
解決方案
.NET 提供了多種跨線程的信號機制,最常見的就是 ManualResetEventSlim。它是一種手動重置的事件信號,可以在線程間進行同步通知。ManualResetEventSlim 擁有兩種狀態:
-
有信號狀態(Signaled):
表示事件已經觸發,所有等待的線程都會被釋放。 -
無信號狀態(Non-Signaled):
表示事件未觸發,任何調用Wait的線程都會阻塞,直到事件被設置為有信號狀態。
基本用法
以下是一段示例代碼,展示如何使用 ManualResetEventSlim 在不同線程間發送信號:
using System;
using System.Threading;
class MyClass
{
// 手動重置事件
private readonly ManualResetEventSlim _initialized = new ManualResetEventSlim();
private int _value;
// 等待初始化完成
public int WaitForInitialization()
{
Console.WriteLine("Waiting for initialization...");
_initialized.Wait(); // 阻塞當前線程,直到事件被觸發
Console.WriteLine("Initialization complete.");
return _value;
}
// 另一個線程完成初始化并發送信號
public void InitializeFromAnotherThread()
{
Console.WriteLine("Initializing...");
_value = 13; // 設置共享數據
_initialized.Set(); // 觸發信號,釋放等待的線程
Console.WriteLine("Signal sent.");
}
}
class Program
{
static void Main()
{
var myClass = new MyClass();
// 啟動線程等待信號
var waitingThread = new Thread(() =>
{
int value = myClass.WaitForInitialization();
Console.WriteLine($"Value received: {value}");
});
waitingThread.Start();
// 主線程延遲后初始化
Thread.Sleep(2000); // 模擬一些延遲操作
myClass.InitializeFromAnotherThread();
waitingThread.Join(); // 等待線程結束
Console.WriteLine("Main thread exiting.");
}
}
輸出示例:
Waiting for initialization...
Initializing...
Signal sent.
Initialization complete.
Value received: 13
Main thread exiting.
代碼說明:
-
聲明事件:
ManualResetEventSlim _initialized是一個手動重置事件,用于線程間的信號同步。 -
等待信號:
WaitForInitialization方法在調用Wait()時會阻塞當前線程,直到事件變為有信號狀態。 -
觸發信號:
InitializeFromAnotherThread方法調用Set()方法將事件設置為有信號狀態,釋放所有等待中的線程。 -
線程間通信:
一個線程完成初始化后發送信號,另一個線程接收到信號后繼續執行。
ManualResetEventSlim 的核心方法
| 方法/屬性 | 描述 |
|---|---|
Wait() |
阻塞當前線程,直到事件進入有信號狀態。 |
Set() |
將事件設置為有信號狀態,釋放所有等待中的線程。 |
Reset() |
將事件重置為無信號狀態,阻止后續線程通過 Wait()。 |
IsSet |
返回事件當前是否處于有信號狀態。 |
Wait(TimeSpan) |
帶超時的等待。在指定時間內如果事件未觸發,返回 false。 |
與其他同步信號的對比
如果 ManualResetEventSlim 無法滿足需求,還可以考慮以下同步信號類型,由于不常用,僅僅給出簡單示例:
1. AutoResetEvent
自動重置事件,每次調用 Set() 只釋放一個等待的線程,然后自動重置為無信號狀態。
適用場景: 需要按順序逐個釋放線程,比如生產者/消費者模型。
代碼示例:
using System;
using System.Threading;
class AutoResetEventExample
{
private static AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
static void Main()
{
new Thread(Worker).Start();
Thread.Sleep(1000); // 模擬其他操作
Console.WriteLine("主線程發送信號...");
_autoResetEvent.Set(); // 釋放一個等待線程
}
private static void Worker()
{
Console.WriteLine("子線程等待信號...");
_autoResetEvent.WaitOne(); // 等待信號
Console.WriteLine("子線程收到信號,繼續執行!");
}
}
2. CountdownEvent
倒計數事件,初始化時設置一個計數器值,每調用一次 Signal() 計數減一,當計數降為 0 時,所有等待線程被釋放。
適用場景: 等待多個線程完成任務后再繼續執行,比如多線程下載完成后合并結果。
代碼示例:
using System;
using System.Threading;
class CountdownEventExample
{
private static CountdownEvent _countdown = new CountdownEvent(3); // 初始計數器為 3
static void Main()
{
for (int i = 0; i < 3; i++)
{
new Thread(Worker).Start(i);
}
Console.WriteLine("主線程等待所有子線程完成...");
_countdown.Wait(); // 等待計數器歸零
Console.WriteLine("所有子線程已完成!");
}
private static void Worker(object id)
{
Console.WriteLine($"子線程 {id} 正在執行...");
Thread.Sleep(1000); // 模擬工作
Console.WriteLine($"子線程 {id} 完成!");
_countdown.Signal(); // 計數器減一
}
}
3. Barrier
障礙同步機制,多個線程在每個階段完成時等待其他線程,所有線程完成該階段后自動進入下一個階段。
適用場景: 線程需要分階段協作,比如并行計算中的階段性數據處理。
代碼示例:
using System;
using System.Threading;
class BarrierExample
{
private static Barrier _barrier = new Barrier(3, (b) =>
{
Console.WriteLine($"所有線程已到達階段 {b.CurrentPhaseNumber + 1},進入下一階段...");
});
static void Main()
{
for (int i = 0; i < 3; i++)
{
new Thread(Worker).Start(i);
}
}
private static void Worker(object id)
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine($"線程 {id} 正在完成階段 {i + 1}...");
Thread.Sleep(1000); // 模擬工作
_barrier.SignalAndWait(); // 等待其他線程到達障礙
}
}
}
總結
ManualResetEventSlim:通用信號機制,適合跨線程協調,手動控制信號狀態。AutoResetEvent:自動信號機制,每次觸發僅釋放一個線程,適合逐一釋放線程的場景。CountdownEvent:倒計數信號,適合等待一組線程完成任務。Barrier:階段性同步信號,適合線程分階段協作。
等待多個線程示例
以下示例展示如何使用多個 ManualResetEventSlim 同步多個線程:
using System;
using System.Threading;
class Program
{
static void Main()
{
var events = new ManualResetEventSlim[3]; // 創建三個信號
for (int i = 0; i < events.Length; i++)
{
events[i] = new ManualResetEventSlim(false);
}
for (int i = 0; i < events.Length; i++)
{
int index = i; // 避免閉包問題
new Thread(() =>
{
Console.WriteLine($"Thread {index + 1} starting...");
Thread.Sleep(1000 * (index + 1)); // 模擬工作
Console.WriteLine($"Thread {index + 1} completed.");
events[index].Set(); // 觸發對應的信號
}).Start();
}
// 等待所有線程完成
foreach (var e in events)
{
e.Wait();
}
Console.WriteLine("All threads completed. Main thread exiting.");
}
}
輸出示例:
Thread 1 starting...
Thread 2 starting...
Thread 3 starting...
Thread 1 completed.
Thread 2 completed.
Thread 3 completed.
All threads completed. Main thread exiting.
ManualResetEventSlim 的注意事項
-
避免信號丟失:
如果一個線程在另一個線程調用Wait()之前就已經調用了Set(),等待線程可能會錯過信號。這種情況下,應確保信號的觸發順序符合邏輯。 -
線程阻塞:
Wait()會阻塞線程,因此應避免長時間等待,或者考慮使用帶有超時的Wait(TimeSpan)方法。 -
信號的正確使用:
在某些場景下,如果信號只是用來協調訪問共享數據(而不是線程間通知),更合適的方式可能是使用鎖(如Monitor或ReaderWriterLockSlim)。 -
輕量級場景:
ManualResetEventSlim適用于輕量級同步。如果需要更復雜的線程協調機制,可以考慮其他同步方式(如AutoResetEvent或Barrier)。
12.9 異步信號
異步信號是用于異步代碼中的一種信號機制,允許任務在非阻塞的情況下等待某種條件滿足。不同于傳統的阻塞信號(如 ManualResetEventSlim),異步信號不會阻塞線程,而是返回一個可等待的任務,當條件滿足時繼續執行。
在 .NET 中,可以通過 SemaphoreSlim、TaskCompletionSource<T> 或第三方庫(如 Nito.AsyncEx 的 AsyncManualResetEvent)來實現異步信號。
代碼示例
示例 1:使用 SemaphoreSlim
using System;
using System.Threading;
using System.Threading.Tasks;
class AsyncSignalExample
{
private readonly SemaphoreSlim _signal = new SemaphoreSlim(0); // 初始無信號狀態
private int _sharedData;
// 等待信號的異步方法
public async Task<int> WaitForSignalAsync()
{
await _signal.WaitAsync(); // 異步等待信號
return _sharedData;
}
// 觸發信號的方法
public async Task SetSignalAsync()
{
await Task.Delay(1000); // 模擬異步操作
_sharedData = 42; // 更新共享數據
_signal.Release(); // 觸發信號
Console.WriteLine("信號已發送");
}
}
class Program
{
static async Task Main()
{
var example = new AsyncSignalExample();
// 同時啟動等待和觸發任務
var waitTask = example.WaitForSignalAsync();
var setTask = example.SetSignalAsync();
int result = await waitTask; // 等待信號
await setTask;
Console.WriteLine($"收到信號,數據為:{result}");
}
}
示例 2:使用 TaskCompletionSource<T> 實現單次信號
當一個通知僅需發送一次時,可以使用 TaskCompletionSource<T>。
using System;
using System.Threading.Tasks;
class AsyncSignalExample
{
private readonly TaskCompletionSource<object> _initialized = new TaskCompletionSource<object>();
private int _value1;
private int _value2;
// 異步等待信號的任務
public async Task<int> WaitForInitializationAsync()
{
await _initialized.Task; // 等待信號
return _value1 + _value2;
}
// 觸發信號的方法
public void Initialize()
{
_value1 = 13;
_value2 = 17;
_initialized.TrySetResult(null); // 觸發信號
Console.WriteLine("信號已發送");
}
}
class Program
{
static async Task Main()
{
var example = new AsyncSignalExample();
// 啟動等待任務
var waitTask = example.WaitForInitializationAsync();
// 模擬異步觸發
await Task.Delay(1000);
example.Initialize();
// 獲取結果
int result = await waitTask;
Console.WriteLine($"收到信號,計算結果為:{result}");
}
}
示例 3:使用 AsyncManualResetEvent 實現多次信號
對于需要多次設置和重置信號的場景,可以使用 Nito.AsyncEx 提供的 AsyncManualResetEvent。
using System;
using System.Threading.Tasks;
using Nito.AsyncEx;
class AsyncSignalExample
{
private readonly AsyncManualResetEvent _connected = new AsyncManualResetEvent();
// 異步等待信號的任務
public async Task WaitForConnectedAsync()
{
await _connected.WaitAsync(); // 等待信號
Console.WriteLine("已連接");
}
// 設置或重置信號的方法
public void ConnectedChanged(bool connected)
{
if (connected)
{
_connected.Set(); // 設置信號
Console.WriteLine("信號已設置");
}
else
{
_connected.Reset(); // 重置信號
Console.WriteLine("信號已重置");
}
}
}
class Program
{
static async Task Main()
{
var example = new AsyncSignalExample();
// 啟動等待任務
var waitTask = example.WaitForConnectedAsync();
// 模擬信號變化
example.ConnectedChanged(true);
await Task.Delay(1000); // 等待信號響應
example.ConnectedChanged(false);
}
}
背后原理
-
TaskCompletionSource<T>的實現:TaskCompletionSource<T>創建了一個Task,通過TrySetResult或TrySetException等方法控制任務的完成狀態。- 一旦信號被觸發,所有等待任務都會被解除掛起。
-
AsyncManualResetEvent的機制:- 使用內部的
TaskCompletionSource來管理信號狀態。 Set方法觸發信號,完成等待的任務;Reset方法創建新的未完成任務,等待新的信號觸發。
- 使用內部的
-
SemaphoreSlim的WaitAsync本質上是對一個信號量計數器的操作,當計數器大于 0 時,不阻塞線程。 -
與阻塞信號的區別:
- 阻塞信號:線程掛起,直到信號釋放。
- 異步信號:線程釋放,任務被掛起,資源利用率更高。
常見陷阱
-
不正確的信號狀態管理:
- 在需要重復使用信號時,忘記調用
Reset,可能導致等待的任務無法再次被觸發。
- 在需要重復使用信號時,忘記調用
-
誤用阻塞操作:
- 在異步代碼中使用阻塞信號(如
ManualResetEventSlim),可能導致線程被不必要地占用,降低性能。
- 在異步代碼中使用阻塞信號(如
-
資源泄漏:
- 如果使用
SemaphoreSlim,忘記調用Release會導致信號量計數不一致,可能阻止等待任務完成。
- 如果使用
-
誤用異步和同步混合操作:
-
示例(問題代碼):
_signal.WaitAsync().Wait(); // 同步等待異步信號,可能導致死鎖
-
最佳實踐
-
根據需求選擇合適的信號工具:
- 單次信號:使用
TaskCompletionSource<T>。 - 多次信號:使用
AsyncManualResetEvent。
- 單次信號:使用
-
清晰管理信號的生命周期:
- 在
Set后,若需要再次等待,應調用Reset明確恢復到初始狀態。
- 在
-
避免混用同步和異步:
- 在異步代碼中,始終使用異步信號機制,避免混合同步等待。
-
結合超時與取消令牌:
-
在
WaitAsync中傳遞CancellationToken,避免信號等待無限期掛起。 -
示例:
await _signal.WaitAsync(cancellationToken); // await _signal.WaitAsync(TimeSpan.FromSeconds(5));// 或者
-
12.10 節流
問題
在高并發場景中,代碼可能會生成大量并發任務或線程,導致以下問題:
- 資源耗盡: 過多的并發操作可能導致 CPU、內存、網絡連接等資源被耗盡。
- 性能下降: 由于資源競爭,任務處理效率可能反而降低。
- 系統不穩定: 如果無法有效限制并發數,可能導致應用程序崩潰或響應變慢。
為了解決上述問題,需要引入節流機制,通過限制并發操作的數量,平衡性能與資源消耗。
解決方案
節流機制的核心是限制并發操作的數量。根據代碼的并發類型,可以采用不同的節流方法。
以下是常見的節流方案:
- 數據流(Dataflow)
- PLINQ(并行 LINQ)
Parallel類- 異步代碼:
SemaphoreSlim和Channel
1. 數據流(Dataflow)
數據流(Dataflow)是 .NET 提供的一種用于并發處理的編程模型,它內置了節流機制,可以通過設置 MaxDegreeOfParallelism 來限制并發任務的數量。
示例:TransformBlock 的節流
以下代碼使用 TPL 數據流的 TransformBlock 實現節流:
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
static async Task Main(string[] args)
{
// 創建一個 TransformBlock,用于執行并發的乘法操作
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10 // 最大并發數為 10
};
var block = new TransformBlock<int, int>(data =>
{
Console.WriteLine($"Processing {data} on Task {Task.CurrentId}");
Task.Delay(100).Wait(); // 模擬耗時操作
return data * 2;
}, options);
// 向塊中發送數據
for (int i = 0; i < 20; i++)
{
block.Post(i);
}
block.Complete(); // 表示不再有更多的數據發送到塊中
// 獲取處理結果
while (await block.OutputAvailableAsync())
{
Console.WriteLine(await block.ReceiveAsync());
}
Console.WriteLine("Processing complete.");
}
}
代碼說明:
-
TransformBlock:
用于將輸入映射到輸出的異步數據塊。 -
ExecutionDataflowBlockOptions:
設置塊的最大并發數為 10,限制同時處理的數據量。 -
節流效果:
即使發送了 20 條數據,只有 10 個任務會同時運行。
2. PLINQ(并行 LINQ)
PLINQ 是 LINQ 的并行版本,支持對集合中的元素進行并行處理。同樣可以通過 WithDegreeOfParallelism 限制并發數。
示例:PLINQ 實現節流
using System;
using System.Linq;
class Program
{
static void Main(string[] args)
{
var values = Enumerable.Range(1, 20);
// 使用 PLINQ 對數據進行并行處理,并限制并發數
var results = values
.AsParallel()
.WithDegreeOfParallelism(5) // 最大并發數為 5
.Select(item =>
{
Console.WriteLine($"Processing {item} on Task {Task.CurrentId}");
Task.Delay(100).Wait(); // 模擬耗時操作
return item * 2;
})
.ToList();
Console.WriteLine("Results: " + string.Join(", ", results));
}
}
代碼說明:
-
AsParallel:
將集合轉為并行查詢。 -
WithDegreeOfParallelism:
限制并發任務的數量為 5。 -
節流效果:
即使集合有 20 個元素,最多只有 5 個任務會同時執行。
3. Parallel 類
Parallel 類提供了多種方法(如 Parallel.ForEach)用于并行處理集合中的數據。同樣可以通過 ParallelOptions 設置最大并發數。
示例:Parallel.ForEach 的節流
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var matrices = new List<int>(Enumerable.Range(1, 20));
// 使用 Parallel.ForEach 并限制最大并發數
var options = new ParallelOptions
{
MaxDegreeOfParallelism = 4 // 最大并發數為 4
};
Parallel.ForEach(matrices, options, matrix =>
{
Console.WriteLine($"Processing {matrix} on Task {Task.CurrentId}");
Task.Delay(100).Wait(); // 模擬耗時操作
});
Console.WriteLine("Processing complete.");
}
}
代碼說明:
-
ParallelOptions:
設置最大并發數為 4。 -
節流效果:
即使集合有 20 個元素,最多只有 4 個任務會同時執行。
4. 異步代碼的節流
在異步編程中,可以使用 SemaphoreSlim 或 Channel 來限制并發任務的數量。這種方法適用于網絡請求或 I/O 操作等異步任務。
示例 1:使用 SemaphoreSlim
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
var urls = new List<string>
{
"https://example.com",
"https://example.org",
"https://example.net"
};
using var client = new HttpClient();
using var semaphore = new SemaphoreSlim(2); // 最大并發數為 2
var tasks = urls.Select(async url =>
{
await semaphore.WaitAsync(); // 獲取信號
try
{
Console.WriteLine($"Downloading {url}...");
string content = await client.GetStringAsync(url);
Console.WriteLine($"Downloaded {url}");
return content;
}
finally
{
semaphore.Release(); // 釋放信號
}
});
var results = await Task.WhenAll(tasks);
Console.WriteLine("All downloads complete.");
}
}
代碼說明:
-
SemaphoreSlim:
控制同時執行的異步任務數量。 -
節流效果:
即使有多個 URL 需要下載,最多只有 2 個請求會同時發出。
示例 2:使用 Channel
Channel 是 .NET 提供的高性能生產者-消費者模型,同樣可以實現節流。
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
var data = new List<int>(Enumerable.Range(1, 20));
var channel = Channel.CreateBounded<int>(5); // 最大并發數為 5
// 寫入數據到通道
_ = Task.Run(async () =>
{
foreach (var item in data)
{
await channel.Writer.WriteAsync(item);
Console.WriteLine($"Produced {item}");
}
channel.Writer.Complete();
});
// 從通道中讀取數據并處理
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Processing {item} on Task {Task.CurrentId}");
await Task.Delay(100); // 模擬耗時操作
}
Console.WriteLine("Processing complete.");
}
}
代碼說明:
-
Channel.CreateBounded:
創建一個容量為 5 的通道,用于限制同時處理的數據量。 -
節流效果:
生產者可以連續寫入數據,但消費者最多只能處理 5 個數據項。
節流的適用場景
-
高并發任務:
限制同時執行的任務數量,防止資源耗盡。 -
網絡請求:
控制并發的 HTTP 請求數量,防止服務器過載。 -
CPU 密集型任務:
避免任務過度占用 CPU,確保其他線程也能獲得資源。 -
生產者-消費者模型:
限制消費者的處理速度,防止緩沖區過度填充。 -
最佳實踐:
- 根據硬件性能設置合理的并發限制。
- 測試實際負載,避免限制過于寬松或過于保守。

浙公網安備 33010602011771號