1、基本概念
除了順序執(zhí)行和并行執(zhí)行的模型以外,還有異步模型,這是事件驅(qū)動模型的基礎(chǔ)。異步活動的執(zhí)行模型可以只有一個單一的主控制流,能在單核心系統(tǒng)和多核心系統(tǒng)中運行。
在并發(fā)執(zhí)行的異步模型中,許多任務(wù)被穿插在同一時間線上,所有的任務(wù)都由一個控制流執(zhí)行(單一線程)。任務(wù)的執(zhí)行可能被暫停或恢復(fù),中間的這段時間線程將會執(zhí)行其他任務(wù)。大致如下:

如上圖所示,任務(wù)(不同的顏色表示不同的任務(wù))可能被其他任務(wù)插入,但是都處在同一個線程下。這表明當某一個任務(wù)執(zhí)行的時候,其他任務(wù)都暫停了。與多線程編程模型很大的一點不同是,多線程的某個任務(wù)在時間線上什么時候掛起某個活動或恢復(fù)某個活動由系統(tǒng)決定,而在異步中,程序員必須假設(shè)線程可能在任何時間被掛起和替換。
程序員可以將任務(wù)編寫成許多可以間隔執(zhí)行的小步驟,如果一個任務(wù)需要另一個任務(wù)的輸出,那么被依賴的任務(wù)必須接收它的輸入。
2、使用Python的concurrent.futures模塊
這個模塊具有線程池和進程池、管理并行編程任務(wù)、處理非確定性的執(zhí)行流程、進程/線程同步等功能。
此模塊由一下部分組成:
- concurrent.futures.Executor:這是一個虛擬基類,提供了異步執(zhí)行的方法。
- submit(function, argument):調(diào)度函數(shù)(可調(diào)用的對象)的執(zhí)行,將argument作為參數(shù)傳入。
- map(function, argument):將argument作為參數(shù)執(zhí)行函數(shù),以異步的方式。
- shutdown(Wait=True):發(fā)出讓執(zhí)行者釋放所有資源的信號。
- concurrent.futures.Future:其中包括函數(shù)的異步執(zhí)行。Future對象是submit任務(wù)(即帶有參數(shù)的functions)到executor的實例。
Executor是抽象類,可以通過子類訪問,即線程或進程的ExecutorPools。因為線程或進程的實例是依賴于資源的任務(wù),所以最好以池的形式將他們組織在一起,作為可以重用的launcher和executor。
線程池和進程池是用于在程序中優(yōu)化和簡化線程/進程的使用。通過池可以提交任務(wù)給executor。池由兩部分組成,一部分是內(nèi)部的隊列,存放著待執(zhí)行的任務(wù);另一部分是一系列的進程或線程,用于執(zhí)行這些任務(wù)。池的概念主要目的是為了重用:讓線程或進程在生命周期內(nèi)可以多次使用。他減少了創(chuàng)建線程和進程的開銷,提高了程序性能。重用不是必須的規(guī)則,但它是程序員在應(yīng)用中使用池的主要原因。

current.Futures提供了兩種Executor的子類,各自獨立操作一個線程池和一個進程池。這兩個子類分別是:
- concurrent.futures.ThreadPoolExecutor(max_workers)
- concurrent.futures.ProcessPoolExecutor(max_workers)
max_workers參數(shù)表示最多有多少個worker并行執(zhí)行任務(wù)
代碼測試:
import concurrent.futures import time number_list = [1,2,3,4,5,6,7,8,9,10] def evaluate_item(x): #For time consuming result_item = count(x) return result_item def count(number): for i in range(0, 10000000): i = i + 1 return i * number if __name__ == "__main__": # Sequential execution start_time = time.time() for item in number_list: print(evaluate_item(item)) print("Sequential execution in %s seconds" %(str(time.time() - start_time))) # Thread pool execution start_time_1 = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: futures = [executor.submit(evaluate_item, item) for item in number_list] for future in concurrent.futures.as_completed(futures): print(future.result()) print("Thread pool execution in %s seconds" %(str(time.time() - start_time_1))) # Process pool execution start_time_2 = time.time() with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: futures = [executor.submit(evaluate_item, item) for item in number_list] print("Process pool execution in %s seconds" %(str(time.time() - start_time_2)))
運行結(jié)果:
10000000 20000000 30000000 40000000 50000000 60000000 70000000 80000000 90000000 100000000 Sequential execution in 8.975373029708862 seconds 10000000 20000000 30000000 40000000 60000000 70000000 50000000 80000000 90000000 100000000 Thread pool execution in 8.699156045913696 seconds Process pool execution in 5.916198968887329 seconds
創(chuàng)建一個list存放10個數(shù)字,然后使用一個循環(huán)計算從1加到10000000,打印出和與number_list的乘積。
number_list = [1,2,3,4,5,6,7,8,9,10] def evaluate_item(x): #For time consuming result_item = count(x) return result_item def count(number): for i in range(0, 10000000): i = i + 1 return i * number
在主程序中,首先順序執(zhí)行了一次程序并打印其執(zhí)行時間:
start_time = time.time() for item in number_list: print(evaluate_item(item)) print("Sequential execution in %s seconds" %(str(time.time() - start_time)))
其次使用futures.ThreadPoolExecutor模塊的線程池并打印其時間:
start_time_1 = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: futures = [executor.submit(evaluate_item, item) for item in number_list] for future in concurrent.futures.as_completed(futures): print(future.result()) print("Thread pool execution in %s seconds" %(str(time.time() - start_time_1)))
ThreadPoolExecutor使用線程池中的一個線程執(zhí)行給定任務(wù)。池中一共有5個線程,每一個線程從池中取得一個任務(wù)然后執(zhí)行它,當任務(wù)執(zhí)行完成,再從池中拿到另一個任務(wù)。
最后是使用進程池:
start_time_2 = time.time() with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: futures = [executor.submit(evaluate_item, item) for item in number_list] print("Process pool execution in %s seconds" %(str(time.time() - start_time_2)))
和ThreadPoolExecutor一樣,ProcessPoolExecutor是一個executor,使用一個線程池來并行執(zhí)行任務(wù)。因為ProcessPoolExecutor使用了多核處理的模塊,讓我們可以不受GIL的限制,大大縮短執(zhí)行時間。
幾乎所有需要處理多個客戶端請求的服務(wù)應(yīng)用都會使用池。也有應(yīng)用要求需要立即執(zhí)行,或者要求對任務(wù)的線程有更多的控制器,這種情況下,池不是一個最佳選擇。
3、使用Asyncio管理事件循環(huán)
先入為主:
import asyncio import datetime import time def function_1(end_time, loop): print("function_1 called") if (loop.time() + 1.0) < end_time: loop.call_later(1, function_2, end_time, loop) else: loop.stop() def function_2(end_time, loop): print("function_2 called") if (loop.time() + 1.0) < end_time: loop.call_later(1, function_3, end_time, loop) else: loop.stop() def function_3(end_time, loop): print("function_3 called") if (loop.time() + 1.0) < end_time: loop.call_later(1, function_1, end_time, loop) else: loop.stop() def function_4(end_time, loop): print("function_4 called") if (loop.time() + 1.0) < end_time: loop.call_later(1, function_4, end_time, loop) else: loop.stop() loop = asyncio.get_event_loop() print(loop.time()) end_loop = loop.time() + 9.0 print(end_loop) loop.call_soon(function_1, end_loop, loop) #loop.call_soon(function_4, end_loop, loop) loop.run_forever() loop.close()
執(zhí)行結(jié)果:

上述例子定義了三個異步任務(wù),相繼執(zhí)行,如圖所示:

首先,我們要得到這個事件循環(huán):
loop = asyncio.get_event_loop()
然后我們通過call_soon方法調(diào)用了function_1()函數(shù)。
end_loop = loop.time() + 9.0
loop.call_soon(function_1, end_loop, loop)
function_1:
def function_1(end_time, loop): print("function_1 called") if (loop.time() + 1.0) < end_time: loop.call_later(1, function_2, end_time, loop) else: loop.stop()
- end_time定義了function_1可以運行的最長時間,并通過call_later方法傳入到function_2中作為參數(shù)
- loop通過get_event_loop()方法得到的事件循環(huán)
任務(wù)執(zhí)行結(jié)束之后,它會比較loop.time() + 1s和設(shè)定的運行時間,如果沒有超過,使用call_later在1秒之后執(zhí)行function_2(),function_2和3作用類似
如果運行時間超過了設(shè)定,時間循環(huán)終止。
概念解釋:
Python的Asyncio模塊提供了管理事件、協(xié)程、任務(wù)和線程方法,以及編寫并發(fā)代碼的原語。主要組件和概念包括:
- 事件循環(huán):在Asyncio模塊中,每一個進程都有一個事件循環(huán)。
- 協(xié)程:這是子程序的泛化概念。協(xié)程可以在執(zhí)行期間暫停,這樣就可以等待外部的處理(例如IO)完成之后,從之前暫停的地方恢復(fù)執(zhí)行。
- Futures:定義了Future對象,和concurrent.futures模塊一樣,表示尚未完成的計算。
- Tasks:這是Asyncio的子類,用于封裝和管理并行模式下的協(xié)程。
事件循環(huán):
在計算機系統(tǒng)中,可以產(chǎn)生事件的實體叫做事件源,能處理事件的實體叫做事件處理者,還有一些第三方實體叫做事件循環(huán)。它的作用是管理所有的事件,在整個程序運行過程中不斷循環(huán)執(zhí)行,追蹤事件發(fā)生的順序?qū)⑺麄兎诺疥犃兄校斨骶€程空閑的時候,調(diào)用相應(yīng)的事件處理者處理事件。
Asyncio管理事件循環(huán)的方法:
- loop = get_event_loop():得到當前上下文的事件循環(huán)。
- loop.call_later(time_delay, callback, argument):延后time_delay秒再執(zhí)行callback方法。
- loop.call_soon(callback, argument):盡可能快的調(diào)用callback。call_soon()函數(shù)結(jié)束,主線程回到事件循環(huán)之后就會馬上調(diào)用callback。
- loop.time():以float類型返回當前時間循環(huán)的內(nèi)部時間。
- asyncio.set_event_loop():為當前上下文設(shè)置時間循環(huán)。
- asyncio.new_event_loop():根據(jù)此策略創(chuàng)建一個新的時間循環(huán)并返回。
- loop.run_forever():在調(diào)用stop()之前將一直運行。run_forever真正開始執(zhí)行函數(shù)。
4、使用Asyncio管理協(xié)程
上述例子中一個程序變得很大而且復(fù)雜時,將其劃分為子程序,每一部分實現(xiàn)特定的任務(wù)。子程序不能單獨執(zhí)行,只能在主程序的請求下執(zhí)行,主程序負責(zé)協(xié)調(diào)使用各個子程序。協(xié)程是子程序的泛化,和子程序一樣的是,協(xié)程只負責(zé)計算任務(wù)的一步;不同的是協(xié)程沒有主程序來進行調(diào)度。因為協(xié)程通過管道連接在一起,沒有監(jiān)視函數(shù)負責(zé)順序調(diào)用他們。在協(xié)程中,執(zhí)行點可以被掛起,可以被之前掛起的點恢復(fù)執(zhí)行。通過協(xié)程池就可以插入到計算中:運行第一個任務(wù),直到它返回yield執(zhí)行權(quán),然后運行下一個,這樣順著執(zhí)行下去。
這種插入的控制組件就是前文提到的事件循環(huán),它持續(xù)追蹤所有的協(xié)程并執(zhí)行它們。
協(xié)程的另外一些重要特性如下:
- 協(xié)程可以有多個入口點,并可以yield多次
- 協(xié)程可以將執(zhí)行權(quán)交給其他協(xié)程
yield表示協(xié)程在此暫停,并且將執(zhí)行權(quán)交給其他協(xié)程,因為協(xié)程可以將值與控制權(quán)一起傳遞給另一個協(xié)程,所以yield一個值就表示將值傳給下一個執(zhí)行的協(xié)程。
測試用例:
import asyncio import time from random import randint @asyncio.coroutine def StartState(): print("Start State called \n") input_value = randint(0,1) time.sleep(1) print("I am StartState.input_value is %s" %input_value) if (input_value == 0): result = yield from State2(input_value) else: result = yield from State1(input_value) print("Resume of the Transition : \nStart State calling %s" %result) @asyncio.coroutine def State1(transition_value): outputValue = str("State 1 with transition value = %s \n" %transition_value) input_value = randint(0,1) time.sleep(1) print("...Evaluation...") print("I am State1.input_value is %s" %input_value) if input_value == 0: result = yield from State3(input_value) else: result = yield from State2(input_value) result = "State 1 calling %s" %result return outputValue + str(result) @asyncio.coroutine def State2(transition_value): outputValue = str("State 2 with transition value = %s \n" %transition_value) input_value = randint(0,1) time.sleep(1) print("...Evaluation...") print("I am State2.input_value is %s" %input_value) if input_value == 0: result = yield from State1(input_value) else: result = yield from State3(input_value) result = "State 2 calling %s" %result return outputValue + str(result) @asyncio.coroutine def State3(transition_value): outputValue = str("State 3 with transition value = %s \n" %transition_value) input_value = randint(0,1) time.sleep(1) print("...Evaluation...") print("I am State3.input_value is %s" %input_value) if input_value == 0: result = yield from State1(input_value) else: result = yield from EndState(input_value) result = "State 1 calling %s" %result return outputValue + str(result) @asyncio.coroutine def EndState(transition_value): outputValue = str("End State with transition value = %s \n" %transition_value) print("I am EndState.outputValue is %s" %outputValue) print("...Stop Computation...") return outputValue if __name__ == "__main__": print("Finite State Machine simulation with Asyncio Coroutine") loop = asyncio.get_event_loop() loop.run_until_complete(StartState())
上述代碼為使用Asyncio的協(xié)程來模擬有限狀態(tài)機(一個數(shù)學(xué)模型,不僅在工程領(lǐng)域應(yīng)用廣泛,在科學(xué)領(lǐng)域也很著名)。模擬的狀態(tài)機如下:

系統(tǒng)有四個狀態(tài),0和1是狀態(tài)機可以從一個狀態(tài)到另一個狀態(tài)的值,這個過程叫轉(zhuǎn)換。
運行結(jié)果(結(jié)果不唯一):

每一個狀態(tài)都由一個裝飾器裝飾:@asyncio.coroutine
通過yield from命令調(diào)用下一個協(xié)程。
啟動事件循環(huán):
if __name__ == "__main__": print("Finite State Machine simulation with Asyncio Coroutine") loop = asyncio.get_event_loop() loop.run_until_complete(StartState())
5、使用Asyncio控制任務(wù)
Asyncio是用來處理事件循環(huán)中的異步進程和并發(fā)任務(wù)執(zhí)行的。它還提供了asyncio.Task()類,可以在任務(wù)中使用協(xié)程。它的作用是在同一事件循環(huán)中,運行某一個任務(wù)的同時可以并發(fā)地運行多個任務(wù)。當協(xié)程被包在任務(wù)中,它會自動將任務(wù)和事件循環(huán)連接起來,當事件循環(huán)啟動的時候,任務(wù)自動運行。這樣就提供了一個可以自動驅(qū)動協(xié)程的機制。
Asyncio模塊為我們提供了asyncio.Task(coroutine)方法來處理計算任務(wù),它可以調(diào)度協(xié)程的執(zhí)行。任務(wù)對協(xié)程對象在事件循環(huán)的執(zhí)行負責(zé)。如果被包裹的協(xié)程要從future yield,那么任務(wù)會被掛起,等待future的計算結(jié)果。
當future計算完成,被包裹的協(xié)程將會拿到future返回的結(jié)果或異常(exception)繼續(xù)執(zhí)行。另外,需要注意的是事件循環(huán)一次只能運行一個任務(wù),除非還有其它事件循環(huán)在不同的線程并行運行,此任務(wù)才有可能和其他任務(wù)并行。當一個任務(wù)在等待future執(zhí)行的期間,事件循環(huán)會運行一個新的任務(wù)。
測試用例:
import asyncio @asyncio.coroutine def factorial(number): f = 1 for i in range(2, number + 1): print("Asyncio.Task: Compute factorial(%s)" %i) yield from asyncio.sleep(0.5) f *= i print("Asyncio.Task - factorial(%s) = %s" %(number, f)) @asyncio.coroutine def fibonacci(number): a,b = 0,1 for i in range(number): print("Asyncio.Task: Compute fibonacci(%s)" %i) yield from asyncio.sleep(0.5) a, b = b, a+b print("Asyncio.Task - fibonacci(%s) = %s" %(number, a)) @asyncio.coroutine def binomialCoeff(n, k): result = 1 for i in range(1, k+1): result = result * (n-i+1)/i print("Asyncio.Task:Compute binomialCoeff(%s)" %i) yield from asyncio.sleep(0.5) print("Asyncio.Task - binomialCoeff(%s, %s) = %s" %(n, k, result)) if __name__ == "__main__": tasks = [asyncio.Task(factorial(10)), asyncio.Task(fibonacci(10)), asyncio.Task(binomialCoeff(20, 10))] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) loop.close()
執(zhí)行結(jié)果:
Asyncio.Task: Compute factorial(2) Asyncio.Task: Compute fibonacci(0) Asyncio.Task:Compute binomialCoeff(1) Asyncio.Task: Compute factorial(3) Asyncio.Task: Compute fibonacci(1) Asyncio.Task:Compute binomialCoeff(2) Asyncio.Task: Compute factorial(4) Asyncio.Task: Compute fibonacci(2) Asyncio.Task:Compute binomialCoeff(3) Asyncio.Task: Compute factorial(5) Asyncio.Task: Compute fibonacci(3) Asyncio.Task:Compute binomialCoeff(4) Asyncio.Task: Compute factorial(6) Asyncio.Task: Compute fibonacci(4) Asyncio.Task:Compute binomialCoeff(5) Asyncio.Task: Compute factorial(7) Asyncio.Task: Compute fibonacci(5) Asyncio.Task:Compute binomialCoeff(6) Asyncio.Task: Compute factorial(8) Asyncio.Task: Compute fibonacci(6) Asyncio.Task:Compute binomialCoeff(7) Asyncio.Task: Compute factorial(9) Asyncio.Task: Compute fibonacci(7) Asyncio.Task:Compute binomialCoeff(8) Asyncio.Task: Compute factorial(10) Asyncio.Task: Compute fibonacci(8) Asyncio.Task:Compute binomialCoeff(9) Asyncio.Task - factorial(10) = 3628800 Asyncio.Task: Compute fibonacci(9) Asyncio.Task:Compute binomialCoeff(10) Asyncio.Task - fibonacci(10) = 55 Asyncio.Task - binomialCoeff(20, 10) = 184756.0
上述例子定義了三個線程,factorial,fibonacci,binomialCoeff,每一個都帶有asyncio.coroutine裝飾器:
將三個task放入到一個list中:
tasks = [asyncio.Task(factorial(10)), asyncio.Task(fibonacci(10)), asyncio.Task(binomialCoeff(20, 10))]
通過run_until_complete并行運行三個協(xié)程,asyncio.wait(tasks)表示運行直到所有給定的協(xié)程都完成。
最后關(guān)閉事件循環(huán):loop.close()
6、使用Asyncio和Futures
Asyncio模塊的另一個重要的組件是Futures。它和concurrent.futures.Futures很像,但是針對Asyncio的事件循環(huán)做了很多定制。asyncio.Futures類代表還未完成的結(jié)果,有可能是一個Exception,所以綜合來說,它是一種抽象的代表還沒有做完的事情。
實際上,必須處理一些結(jié)果的回調(diào)函數(shù)被加入到了這個類的實例中。
基本方法:
- cancel():取消future的執(zhí)行,調(diào)度回調(diào)函數(shù)
- result():返回future代表的結(jié)果
- exception():返回future中的Exception
- add_done_callback(fn):添加一個回調(diào)函數(shù),當future執(zhí)行的時候會調(diào)用這個回調(diào)函數(shù)
- remove_done_callback(fn):從call when done列表中移除所有的callback的實例
- set_result(result):將future標為執(zhí)行完成,并且設(shè)置result的值
- set_exception(exception):將future標為執(zhí)行完成,并設(shè)置Exception
測試用例:
# coding : utf-8 import asyncio import sys @asyncio.coroutine def first_coroutine(future, n): # 計算前n個數(shù)的和 count = 0 for i in range(1, n+1): count = count + i print("first yield") yield from asyncio.sleep(2) print("first_coroutine finished") # 將future標記為已完成,并設(shè)置result的值 future.set_result("first coroutine (sum of n integers) result = %s" %str(count)) @asyncio.coroutine def second_coroutine(future, n): count = 1 for i in range(2, n+1): count *= i print("second yield") yield from asyncio.sleep(1) print("second_coroutine finished") future.set_result("second coroutine (factorial) result = %s" %str(count)) def got_result(future): # 獲取future的set_result結(jié)果 print(future.result()) if __name__ == "__main__": N1 = int(sys.argv[1]) N2 = int(sys.argv[2]) loop = asyncio.get_event_loop() future1 = asyncio.Future() future2 = asyncio.Future() tasks = [first_coroutine(future1, N1), second_coroutine(future2, N2)] # 添加回調(diào)函數(shù) future1.add_done_callback(got_result) future2.add_done_callback(got_result) loop.run_until_complete(asyncio.wait(tasks)) loop.close()
運行結(jié)果:

浙公網(wǎng)安備 33010602011771號