在說到異步前,先來理一下幾個容易混淆的概念,并行、多線程、異步。
并行,一般指并行計算,是說同一時刻有多條指令同時被執行,這些指令可能執行于同一CPU的多核上,或者多個CPU上,或者多個物理主機甚至多個網絡中。
多線程,一般指同一進程中多個線程(包含其數據結構、上下文與代碼片段)協作運行。在多核計算機中多個線程將有機會同時運行于多個核上,如果線程中進行的是計算,則行成并行計算。
異步,與同步相對應,是指呼叫另一操作后,不等待其結果,繼續執行之后的操作,若之后沒有其他操作,當前線程將進入睡眠狀態,而CPU時間將有機會切至其他線程。在異步操作完成后通過回調函數的方式獲取通知與結果。異步的實現方式有多種,如多線程與完成端口。多線程將異步操作放入另一線程中運行,通過輪詢或回調方法得到完成通知;完成端口,由操作系統接管異步操作的調度,通過硬件中斷,在完成時觸發回調方法,此方式不需要占用額外線程。
本文討論.NET下的異步,以及其進化過程中出現的多種異步模式。
首先看一下兩段需要花較長時間運行的代碼在同步方式下的情形。
public class ProgramClass
{
public static void Main()
{
using (var fs = new FileStream("Data.dat", FileMode.OpenOrCreate, FileAccess.Write, FileShare.None, 4096))
{
fs.Write(new byte[100], 0, 100);
}
DoSomething();
Console.WriteLine("END");
}
static string DoSomething()
{
Thread.Sleep(2000);
return "Finished";
}
}
同步方式運行時,所有操作會順序執行,當某方法被阻塞時,線程即進入阻塞狀態。該情形下,CPU時間無法得到充分利用,當前線程長時間處于阻塞狀態,任務總時間長。

開始異步化
為提高CPU使用率,從而減少任務時間,采用多線程方式實現異步調用。
public class ProgramClass
{
public static void Main()
{
Thread writeThread = new Thread(new ThreadStart(WriteWapper));
Thread doSomethingThread = new Thread(new ParameterizedThreadStart(DoSomethingWapper));
ClosureClass closure = new ClosureClass();
writeThread.Start();
doSomethingThread.Start(closure);//閉包對象,用于變量穿越
writeThread.Join();
doSomethingThread.Join();
Console.WriteLine(closure.Result);
}
//將方法包裝成適于線程調用的簽名
private static void WriteWapper()
{
using (var fs = new FileStream("Data.dat", FileMode.OpenOrCreate, FileAccess.Write, FileShare.None, 4096))
{
fs.Write(new byte[100], 0, 100);
}
}
//將方法包裝成適于線程調用的簽名
static void DoSomethingWapper(object state)
{
ClosureClass closure = state as ClosureClass;
var result = DoSomething();
if (closure != null)
{
closure.Result = result;
}
}
static string DoSomething()
{
Thread.Sleep(2000);
return "Finished";
}
//閉包輔助類,用于存儲在方法間傳遞內部變量與參數
class ClosureClass
{
//存儲方法返回值
public string Result { get; set; }
}
}
利用多線程將耗時操作放入其他線程中進行處理,主線程繼續做自己的事(本例中,主線程進行等待其他線程完成)。從而減少任務處理時間。
【注意】本例中,write與dosomething操作內部均有線程等待,在單核中依然可以通過操作系統的線程切換提高CPU使用率,但是如果操作是需要大量CPU計算,則在單核情況下并不一定能夠提高CPU使用率,并且可能增加線程調試的開銷,因此單核情況下此種方式不適合用于密集型運算。
【提示】對于線程的入口方法,我們往往會對其進行包裝,形成一致的方法簽名、處理異常、攔截請求等。在本例中,由于被調用的方法有輸入與輸出,困此采用輔助對象進行傳遞,在C#2開始引入的閉包,采用類似的原理實現,從而減少大量的代碼,并提高程序可讀性。
public class ProgramClass
{
public static void Main()
{
string result = null;
Thread writeThread = new Thread(new ThreadStart(WriteWapper));
Thread doSomethingThread = new Thread(new ThreadStart(() =>
{
result = DoSomething();//跨方法訪問臨時變量,形成閉包
}));
writeThread.Start();
doSomethingThread.Start();
writeThread.Join();
doSomethingThread.Join();
Console.WriteLine(result);
}
//將方法包裝成適于線程調用的簽名
private static void WriteWapper()
{
using (var fs = new FileStream("Data.dat", FileMode.OpenOrCreate, FileAccess.Write, FileShare.None, 4096))
{
fs.Write(new byte[100], 0, 100);
}
}
static string DoSomething()
{
Thread.Sleep(2000);
return "Finished";
}
}
開啟一個新線程將帶來可觀的開銷,因此我們希望能夠重用線程,在.NET中,可以采用線程池達到這一目的,同時簡化線程的操作。
public class ProgramClass
{
public static void Main()
{
string result = null;
AutoResetEvent resetEvent = new AutoResetEvent(false);
ThreadPool.QueueUserWorkItem(new WaitCallback(state =>
{
result = DoSomething();
resetEvent.Set();
}));
resetEvent.WaitOne();
Console.WriteLine(result);
}
static string DoSomething()
{
Thread.Sleep(2000);
return "Finished";
}
}
由于線程池中,我們無法對線程進行更為細致的操作,為得到操作完成的通知,我們需要在包裝方法中,在操作完成后加入適當的代碼,本例中我們采用ResetEvent進行線程的同步。
【注意】在ASP.NET中,所有的WEB線程均運行于線程池,因此線程池中的線程是非常寶貴的資源,耗盡線程池中的線程將可能引起所有的請求進入等待隊列,從而無法提供服務,在ASP.NET中的線程池操作應該更為謹慎。
完成端口與異步模型
到這里為止,都是采用多線程的方式手動實現了異步,正如前面所說,多線程不適用于單核密集運算,在非密集運算下也會產生線程調度的開銷,在需要大量線程的應用中會浪費寶貴資源。考察需要阻塞等待的場景,往往是與系統外部數據交換有關,如大量內存數據的復制、讀寫磁盤文件、訪問網絡等,這種情況下,在硬件完成操作前CPU無能為力,因此只能等待,更完美的方案是發出指令后不進入等待,當操作完畢后通過某種方式得到通知并執行相關代碼,稱為完成端口。

完成端口編程復雜,并且需要操作系統支持,使用中需要先判斷是否支持,再采用不同的方式去實現,并且實現的方法多樣,在異步使用頻繁的今天,為簡化異步操作,往往會制訂一種統一的異步模型,并且這類模型也在不斷進化中。
在介紹異步模型時,我們會用不同的方法先將一個普通方法異步調用,再調用類庫中提供的異步方法,然后實現一個自己的異步方法,最后將多個異步方法按順序調用包裝成新的異步方法。
在早期的.NET中,采用 BeginXXX/EndXXX 方式實現異步。
對于普通的方法,可以采用委托的 BeginInvoke / EndInvoke 實現異步化。
public class ProgramClass
{
public static void Main()
{
string result = null;
var doSomgthingDelegate = new Func<string>(DoSomething);
var asyncResult = doSomgthingDelegate.BeginInvoke(new AsyncCallback(aresult =>
{
result = doSomgthingDelegate.EndInvoke(aresult);
}), null);
asyncResult.AsyncWaitHandle.WaitOne();
Console.WriteLine(result);
}
static string DoSomething()
{
Thread.Sleep(2000);
return "Finished";
}
}
委托的異步內部采用線程池實現。
有些類庫中的方法,實現了異步版本。
public static void Main()
{
using (var fs = new FileStream("Data.dat", FileMode.OpenOrCreate, FileAccess.Write, FileShare.None, 4096
, FileOptions.Asynchronous))
{
fs.Write(new byte[100], 0, 100);
var asyncResult = fs.BeginWrite(new byte[100], 0, 100, new AsyncCallback(aresult => {
fs.EndWrite(aresult);//執行完畢后的回調方法
}), null);
asyncResult.AsyncWaitHandle.WaitOne();
}
}
對于類庫的方法的異步版本,內部會進行判斷決定采用何種方式實現。
【注意】對于FileStream,必須加上FileOptions.Asynchronous才會有機會使用完成端口。
現在我們可以根據這個模型來實現自己的異步方法。
public class ProgramClass
{
public static void Main()
{
DoSomeThing();
var result = BeginDoSomeThing(1, new AsyncCallback(aresult =>
{
ProgramClass.EndDoSomeThing(aresult);
}), null);
result.AsyncWaitHandle.WaitOne();
}
//同步版本
public static string DoSomeThing()
{
Thread.Sleep(2000);
return "Finished";
}
//異步版本開始
public static IAsyncResult BeginDoSomeThing(int arg1, AsyncCallback callback, object state)
{
var asyncResult = new DoSomethingAsyncResult(callback, state);
Timer timer = null;
timer = new Timer(new TimerCallback(s =>
{
timer.Dispose();
asyncResult.SetComplete("Finished");
}), state, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(2));
return asyncResult;
}
//異步版本結束
public static string EndDoSomeThing(IAsyncResult asyncResult)
{
DoSomethingAsyncResult result = asyncResult as DoSomethingAsyncResult;
if (result != null)
{
return result.Result;
}
return null;
}
//AsyncResult對象
public class DoSomethingAsyncResult : IAsyncResult
{
private AsyncCallback _asyncCallback;
private AutoResetEvent _asyncWaitHandle;
public DoSomethingAsyncResult(AsyncCallback asyncCallback, object state)
{
AsyncState = state;
_asyncCallback = asyncCallback;
_asyncWaitHandle = new AutoResetEvent(false);
}
//設置結果
public void SetComplete(string result)
{
Result = result;
IsCompleted = true;
if (_asyncCallback != null)
{
_asyncCallback(this);
}
_asyncWaitHandle.Set();
}
public string Result
{
get;
private set;
}
public object AsyncState
{
get;
private set;
}
public WaitHandle AsyncWaitHandle
{
get { return _asyncWaitHandle; }
}
public bool CompletedSynchronously
{
get { return false; }
}
public bool IsCompleted
{
get;
private set;
}
}
}
本例中,采用定時器觸發完成動作,實際中,可以在需要的時候觸發完成。
對于BeginXXX/EndXXX模式,調用BeginXXX表示開始一個異步方法,前面的參數表示方法所需的參數(可無),倒數第二個參數為回調方法(可空),最后一個參數用于穿越整個過程的相關對象(可空)。返回的IAsyncResult存儲了異步方法的相關狀態信息,一般來說我們自己的異步方法需要一個實現了該接口的類,類中包含了回調方法、等待對象、相關參數與結果等。
異步方法的協作有三種方法,第一種,通過輪詢 IsCompleted 屬性,直到為true時,觸發完成動作。
第二種,通過回調方法,當異步方法完成時,由異步方法調用回調方法。

第三種,通過WaitHandler等待異步方法完成,當異步方法完成時,由異步方法發出完成信號,使等待結束。

當我們需要將多個異步方法包裝成一個異步方法時,方法內部將充斥著大量的回調方法。
類似于這樣:
public static IAsyncResult BeginDoSomeThing(int arg1, AsyncCallback callback, object state)
{
var asyncResult = new DoSomethingAsyncResult(callback, state);
Timer timer = null;
timer = new Timer(new TimerCallback(s =>
{
timer.Dispose();
using (var fs = new FileStream("Data.dat", FileMode.OpenOrCreate, FileAccess.Write, FileShare.None, 4096, FileOptions.Asynchronous))
{
var writeresult = fs.BeginWrite(new byte[100], 0, 100, new AsyncCallback(wresult =>
{
fs.EndWrite(wresult);
asyncResult.SetComplete("Finished");
}), null);
}
}), state, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(2));
return asyncResult;
}
呼~幸好還有匿名方法與閉包,否則將是一件多么恐怖的事啊。
新的異步模型
我們清醒的看到,當需要多個異步方法需要協作時,代碼將顯得十分復雜,無法表現清晰的邏輯,于是,我們需要一個更好的異步模型。
從.NET4開始,引入了新的異步模型。
首先引入一個新概念:Task。
Task代表一個可以被執行的任務,我們可以讓他運行,關聯其他任務,等待他,獲取他的結果。值得注意的是,這里的Task可以是一個異步的任務,也可以是同步的任務,在沒有特別說明的情況下都指異步任務。而返回一個Task對象的方法,我們一般認為這是一個異步方法。new Task或者Task.Run將生成一個在線程池中運行的異步任務。
的按照慣例,我們看一下如何把一個普通的方法異步執行。
public static void Main()
{
var t1 = Task<int>.Run(() =>
{
Thread.Sleep(2000);
return 100;
}).ContinueWith(new Action<Task<int>>(t =>
{
Console.WriteLine(t.Result);
}));
t1.Wait();
}
對于類庫中提供的異步方法,也有了新版本,XXXAsync。
public static void Main()
{
using (var fs = new FileStream("Data.dat", FileMode.OpenOrCreate, FileAccess.Write, FileShare.None, 4096,
FileOptions.Asynchronous))
{
var task = fs.WriteAsync(new byte[100], 0, 100)
.ContinueWith(new Action<Task>(t => {
Console.WriteLine("Finished");
}));
task.Wait();
}
}
我們不再關心如何去開始,何時會結束,一切變成了一些有關或無關的任務。
讓我們自己寫一個異步方法吧。
public static Task<string> DoSomethingAsync(int value)
{
return Task<string>.Run(() =>
{
Thread.Sleep(2000);
return value.ToString();
}); ;
}
好吧,你肯定是以我在偷懶,為什么不像BeginXXX/EndXXX一樣從底層開始實現一個呢,那是因為Task的封裝比較嚴,我們無法直接對其擴展。為了達到獲取一個Task,在需要的時候設置完成與結果,可以借助 AsyncTaskMethodBuilder 來實現。
public class ProgramClass
{
public static void Main()
{
var task = ProcessAsync();
task.Wait();
var r = task.Result;
}
static Task<string> ProcessAsync()
{
//輔助工具
AsyncTaskMethodBuilder<string> builder = AsyncTaskMethodBuilder<string>.Create();
Timer timer = null;
timer = new Timer(s =>
{
timer.Dispose();
builder.SetResult("Finished");//在需要時設置結果
}, null, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(2));
return builder.Task;//獲取需要的Task
}
}
類似的方法,我們封裝一個由多個異步方法組合成的異步方法。
public class ProgramClass
{
public static void Main()
{
var task = ProcessAsync();
task.Wait();
var r = task.Result;
}
static Task<string> ProcessAsync()
{
//輔助工具
AsyncTaskMethodBuilder<string> builder = AsyncTaskMethodBuilder<string>.Create();
DoSomethingAync1().GetAwaiter().OnCompleted(() =>
{
DoSomethingAync2().GetAwaiter().OnCompleted(() =>
{
DoSomethingAync2().GetAwaiter().OnCompleted(() =>
{
builder.SetResult("Finished");
});
});
});
return builder.Task;//獲取需要的Task
}
static Task<string> DoSomethingAync1() { ... }
static Task<string> DoSomethingAync2() { ... }
static Task<string> DoSomethingAync3() { ... }
}
組合異步方法調用后,按順序調用第一個異步方法,緊接著,產生需要的結果Task后返回。異步方法完成時回調指定的方法,并按順序繼續調用,所有方法完成后,把運行的最終結果設置給結果Task,那么整個任務即完成。

如果異步方法有回返值,那么組合的異步方法看上去會復雜一點。
static Task<string> ProcessAsync()
{
//輔助工具
AsyncTaskMethodBuilder<string> builder = AsyncTaskMethodBuilder<string>.Create();
string r1, r2, r3;//用于存儲每一個任務的結構
var awaitor1 = DoSomethingAync1().GetAwaiter();
awaitor1.OnCompleted(() =>
{
r1 = awaitor1.GetResult();
var awaitor2 = DoSomethingAync2().GetAwaiter();
awaitor2.OnCompleted(() =>
{
r2 = awaitor2.GetResult();
var awaitor3 = DoSomethingAync3().GetAwaiter();
awaitor3.OnCompleted(() =>
{
r3 = awaitor3.GetResult();
builder.SetResult(r1 + r2 + r3);//計算最終結構并設置結果
});
});
});
return builder.Task;//獲取需要的Task
}
代碼雖然復雜了一點,但還能夠接受,這里的每個異步方法的返回值需要臨時變量來存儲,包括每個異步方法的TaskAwaiter對象,需要跨越多個方法,這里將形成閉包,使得這些對象無法盡快釋放,同時,每一個異步方法都將附加一個OnComplete回訪方法的委托對象,這些都是使用上述方法的代價,這些代價在理論上是可以被優化的,但是帶來的是更為復雜的代碼結果,暫且放下吧,因為,解決方案就在后面。
重口味語法糖
在C#5中,添加了 async/await 關鍵字,使得上面遺留的問題得以解決,而且重點是,用起來非常簡單!
上面的代碼在C#5時代可以寫成下面的樣子:
static async Task<string> ProcessAsync()
{
var r1 = await DoSomethingAync1();
var r2 = await DoSomethingAync2();
var r3 = await DoSomethingAync3();
return r1 + r2 + r3;
}
是不是震驚了。
他幾乎和同步方法寫法一致。程序的邏輯完全沒有因為異步而打亂,并且減少了代碼量,這就是語法糖的魅力。
語法糖的背后隱藏了不為人知的內部實現,特別重口味語法糖,我們需要知道他背后的實現,才不致于消化不良。
先看一下語法,async關鍵字告訴編譯器,對本方法使用語法糖,對于這類方法只能返回 void/Task/Task<T>,返回void/Task代表異步方法不返回任何結果,返回void在調用方看來是一個同步方法,沒有機會獲取異步回調。返回Task<T>代表返回結果為T的異步方法,在該方法內部,可以直接返回類型為T的結果。在返回結果前可以使用await關鍵字調用其他異步方法,并且可以直接獲取該異步方法的返回值。無需處理任何Task相關的內容。當async方法內部沒有任何await時,該方法效果與同步相同,僅僅是簡單包裝后Task而已。
該方法的執行順序與前面我們自己實現的相同,內部實現也有一些類似,同樣采用AsyncTaskBuilder構建Task對象,在我們自己實現的方法中,在方法內部(一個或多個匿名方法與閉包對象)實現多個異步方法的調度,而async/await語法糖則采用一個狀態機對象作為媒介進行多個異步方法的調度。

編譯后,async異步方法將執行過程委托給狀態機,自己則向AsyncTaskBuilder獲取Task返回,狀態機內部存儲方法內部參與計算的臨時變量(閉包),維護當前執行狀態,-1代表開始與中間狀態,-2代表結束,0-n代表正在執行第n個異步方法,狀態機的MoveNext方法按順序去調用其他的異步方法,如異步方法已執行完畢則繼續往下執行,如未完畢,則設置當前狀態,存儲任務的Awaiter對象,并關聯完成動作(狀態機方法本身的單例委托對象)后結束,當異步方法執行完畢,繼續調用狀態機MoveNext方法,按照狀態找到執行入口點,找到上次執行的Awaiter對象,并獲取執行結果,然后繼續找到下一個異步方法執行,重復以上的步驟,如果異步方法間有其他代碼,照本執行,當所有異步方法與內部代碼執行完畢后,通過AsyncTaskBuilder向異步方法的結果Task設置結果值,該Task即完成。

從編譯后的結果可以看到,這里不再存在閉包對象與多個回調方法及其委托對象,全部合在狀態機對象當中,而每一次異步方法調用后的Awaiter對象也可以在異步方法完成后釋放引用,在狀態機對象中根據簽名的種類提供必要的字段位置,狀態機本身也是結構體,最大限度上減少了空間的開銷與GC的壓力,而所有的這一切,編譯器通通搞定,而程序員,只需要關注邏輯的順序與結果的處理即可。
浙公網安備 33010602011771號