練習:自己動手實現一個輕量級的信號量(一)
信號量歷史悠久,折磨死了一代又一代的計算機專業學生,但是不得不承認其在Multi-thread環境下的巨大作用。最經典的案例莫過于管理一個環狀緩沖區。.NET 中的Semaphore對象等同于Win32中的Semaphore。屬于內核級對象,因此使用它的代價就比較大了。并且Semaphore對象每次僅僅能夠等待一個Count,這有的時候讓事情變得有些煩,例如你可能不得不將環狀緩沖區分割為一個個的Chunk(實際上這是一個好方法,因為我們應該對于Cache進行優化)。Qt中的信號量可以一次獲得多個Count,感覺很方便。綜上,我們希望自己動手實現一個輕量級的,支持一次獲得多個資源的信號量。
首先看看信號量重要維護哪些信息。第一,當前還有多少剩余的資源(_currentResources);第二,系統初始化的時候有多少資源(_initResources);第三,當前有多少個線程被阻塞(_waitingThreadCount):
2 {
3 private int _currentResources;
4 private int _initResources;
5 private volatile int _waitingThreadCount;
6 }
我們知道,如果信號量持有的資源數目沒完沒了的減少,那么降到0就會阻塞,但是沒完沒了的增加呢?厄~你說會溢出,所以,我們還需要進行一個上限的檢查。于是我們添加了一個上限值,這個上限值應該由用戶指定,并且一旦指定就不能更改了。于是我們的代碼中添加了一個新的Field。
2 {
3 private int _currentResources;
4 private int _initResources;
5 private volatile int _waitingThreadCount;
6 // 資源數目的上限
7 private readonly int _maxResourceCount;
8 }
好的,你發現了,我們的需要管理的Fields不止一個,那么~~對了,為了保證某些操作的一致性,我們肯定需要一把鎖。
2 {
3 private int _currentResources;
4 private int _initResources;
5 private volatile int _waitingThreadCount;
6 private readonly int _maxResourceCount;
7 // 信號量對象的鎖
8 private object _globalLock;
9 }
好了,下面我們可以開始我們的工作了。下一步我們干什么呢?厄~咱們解決掉關鍵問題之后剩下的工作肯定輕松愉快,因此,我們先考慮Wait和Release吧。
首先解決Release的問題,想想Release要做哪些事情。
(1)看看當前的資源數目加上釋放的數目會不會超過上限
(2)如果不超過上限就增加我們的資源數目
(3)如果發現有的線程在等待資源則激活那些線程,讓他們看看現有的資源是不是夠他們使用了
(4)將之前的資源數目返回給用戶。還有關鍵的一點,這些操作必須保證是Atomic的,于是這些操作都需要lock!
寫代碼了~~~
2 {
3 // 參數需要檢查一下
4 if (releaseResCount < 1)
5 throw new ArgumentOutOfRangeException("releaseCount");
6 // 這個函數需要保證其結果的一致性
7 lock (this._globalLock)
8 {
9 // 已經超過了上限了
10 if (this._maxResourceCount - this._currentResources < releaseResCount)
11 {
12 throw new System.Threading.SemaphoreFullException();
13 }
14 // 很好,現在我們在做第(2)步
15 this._currentResources += releaseResCount;
16 // 我們現在可以做第(3)步了
17 // 為了性能考慮,我們應當分兩種情況進行討論。
18 // 當然了,如果沒有線程等待就不管了~
19 // 什么?萬一你在判斷的時候等待的線程增加了怎么辦?有鎖呢,不可能~
20 if (this._waitingThreadCount == 1)
21 {
22 System.Threading.Monitor.Pulse(this._globalLock);
23 }
24 else if (this._waitingThreadCount > 1)
25 {
26 System.Threading.Monitor.PulseAll(this._globalLock);
27 }
28 // 這就是第(4)步
29 return (this._currentResources - releaseResCount);
30 }
31 }
以上的代碼正確么?答案是錯誤的!錯誤在了第(3)步,最終導致第4步結果可能是錯誤的。
問題出在了Pulse上。如果你發現看MSDN上的解釋有些頭暈,那么我們就用偽代碼解釋一下。
Monitor.Pulse(...)
等價于
if(_globalLock.Owner == Thread.CurrentThread)
{
Monitor.Exit(_globalLock);
}
else
{
throw new ...;
}
InternalPulse(...);
Monitor.Enter(_globalLock);
問題已經出來了,實際上我們喚醒線程時釋放了鎖,喚醒之后只有重新獲得鎖之后才能繼續,而這個時候信號量擁有的資源數目很可能已經改變了!我們說,Release返回給用戶的數據是調用Release之前信號量所保有的資源數目,那么,我們就應當提前保存這個值。修改一下:
2 {
3 // 參數需要檢查一下
4 if (releaseResCount < 1)
5 throw new ArgumentOutOfRangeException("releaseCount");
6 // 這個函數需要保證其結果的一致性
7 lock (this._globalLock)
8 {
9 // 已經超過了上限了
10 if (this._maxResourceCount - this._currentResources < releaseResCount)
11 {
12 throw new System.Threading.SemaphoreFullException();
13 }
14 // 很好,現在我們在做第(2)步
15 int old = this._currentResources;
16 this._currentResources += releaseResCount;
17 // 我們現在可以做第(3)步了
18 // 為了性能考慮,我們應當分兩種情況進行討論。
19 // 當然了,如果沒有線程等待就不管了~
20 // 什么?萬一你在判斷的時候等待的線程增加了怎么辦?有鎖呢,不可能~
21 if (this._waitingThreadCount == 1)
22 {
23 System.Threading.Monitor.Pulse(this._globalLock);
24 }
25 else if (this._waitingThreadCount > 1)
26 {
27 System.Threading.Monitor.PulseAll(this._globalLock);
28 }
29 // 這就是第(4)步
30 return old;
31 }
32 }
好,目前我們輕松的解決了Release問題。但是Wait就不是那么好辦的了。想這個花掉很多腦細胞。首先一個問題,Wait可能阻塞,所以必須支持超時操作。其次就是必須考慮線程喚醒之后的工作。
我們還是先理一下思路。Wait試圖減少資源數目。如果失敗就需要不停的等待,直到他被別人喚醒。首先用文字書寫一下:
(1)試圖獲得鎖!這是必須的因為沒有鎖的話我們根本沒有辦法保證操作的一致性。這個時候我們不能夠使用lock{...}因為我們必須處理超時的問題。
(2)好樣的,鎖在我們手里了,現在我們要試圖減少資源數目:
while(當前的資源數還不夠我們減少的)
{
// 也就是說我們必須等待了
if (已經超時了)
{
返回;
}
釋放掉鎖并讓線程處于等待狀態。如果超時就返回;
// 好了,執行到這一句說明我們已經持有鎖了,有人把我們喚醒了
// 但是喚醒了不代表我們就有我們需要的資源了所以循環回去檢查
}
// 能執行到這一句說明:(a)我們擁有鎖,(b)我們有足夠的資源
減少資源數目;
(3)釋放鎖。
我們來寫代碼!
2 {
3 // 參數檢查工作是必須的
4 if (millisecondsTimeout < -1)
5 throw new ArgumentOutOfRangeException("millisecondsTimeout");
6 if (waitResNumber < 1)
7 throw new ArgumentOutOfRangeException("waitResNumber");
8 // 我們在后面要兩次處理超時,因此我們需要一個計時器
9 System.Diagnostics.StopWatch watch = null;
10 if (millisecondsTimeout != -1)
11 {
12 watch = System.Diagnostics.Stopwatch.StartNew();
13 }
14 // 現在我們來做第(1)步
15 if (!System.Threading.Monitor.TryEnter(this._globalLock))
16 {
17 if (millisecondsTimeout == 0)
18 return false;
19 if (!System.Threading.Monitor.TryEnter(this._globalLock, millisecondsTimeout))
20 {
21 return false;
22 }
23 }
24 // 現在我們已經獲得鎖了,我們要增加阻塞的線程數目,以便將來有人能夠喚醒我們
25 ++this._waitingThreadCount;
26 try
27 {
28 // 如果當前的資源數目不夠用的,我們就得等等了
29 while (this._currentResources - waitResNumber < 0)
30 {
31 if (millisecondsTimeout != -1)
32 {
33 // 看看是不是超時了
34 timeoutNum = UpdateTimeout(watch, millisecondsTimeout);
35 if (timeoutNum <= 0)
36 {
37 return false;
38 }
39 }
40 // 如果沒有超時我們就再等一下
41 if (!System.Threading.Monitor.Wait(this._globalLock, timeoutNum))
42 {
43 return false;
44 }
45 // 好的我們被喚醒了,趕快回去檢查檢查有沒有足夠的資源了
46 }
47 // 很好,我們現在有足夠的資源了,
48 // 并且沒有什么人能夠再更改資源數目了,因為鎖在我們手里!
49 this._currentResources -= waitResNumber;
50 }
51 finally
52 {
53 // 很好,我們安全的退出了,減少阻塞的線程數目并且釋放鎖
54 --this._waitingThreadCount;
55 System.Threading.Monitor.Exit(this._globalLock);
56 }
57 return true;
58 }
好的!我們基本上實現了一個信號量的精華部分了。但是這是不夠的。尤其是在Wait的過程中,我們直接就使用了Monitor,實際上,對于多核心處理器來說,一開始進行短暫的spin是更好的選擇!我們的代碼還有優化的空間!下一回,我們就在Wait中添加Spin優化,并最終實現一個完整的信號量。
信號量歷史悠久,折磨死了一代又一代的計算機專業學生,但是不得不承認其在Multi-thread環境下的巨大作用。最經典的案例莫過于管理一個環狀緩沖區。.NET 中的Semaphore對象等同于Win32中的Semaphore。屬于內核級對象,因此使用它的代價就比較大了。并且Semaphore對象每次僅僅能夠等待一個Count,這有的時候讓事情變得有些煩,例如你可能不得不將環狀緩沖區分割為一個個的Chunk(實際上這是一個好方法,因為我們應該對于Cache進行優化)。Qt中的信號量可以一次獲得多個Count,感覺很方便。綜上,我們希望自己動手實現一個輕量級的,支持一次獲得多個資源的信號量。
浙公網安備 33010602011771號