本節內容

  1. 操作系統發展史介紹

  2. 進程與線程區別

  3. python GIL全局解釋器鎖

  4. 線程(語法,join,線程鎖,線程變守護進程,event,queue,線程池)

  5. 進程(語法,進程間通訊,進程池)

  6. 協程

操作系統發展史

手工操作(無操作系統)

1946年第一臺計算機誕生 ---- 20世紀50年代中期,還未出現操作系統,計算機采用手工操作方式。

手工操作

程序員將應用程序和帶有數據的已穿孔的紙帶或卡片裝入輸入機,然后啟動輸入機把程序和數據輸入計算機內存,接著通過控制臺開關啟動程序針對數據運行;計算完畢,打印機輸出計算結果;用戶取走結果并卸下紙帶或卡片后,下一個用戶上機。

 

 

 手工操作方式兩個特點:

  1. 用戶獨占全機。不會出現因資源已被其它用戶占用而等待的現象,但資源的利用率低

  2. CPU 等待手工操作。CPU 利用不充分。

20世紀50年代后期,出現了人機矛盾:手工操作的慢速度和計算機的高速度之間形成了尖銳矛盾,手工操作方式已嚴重損害了系統資源的利用率(使資源利用率降為百分之幾,甚至更低),不能容忍。唯一的解決辦法:只有擺脫人的手工操作,實現作業的自動過渡。這樣就出現了成批處理

批操作系統

批操作系統:加載在計算機上的一個系統軟件,在它的控制下,計算機能夠自動地、成批地處理一個或多個用戶的作業(包括程序,數據和命令)。

聯機批操作系統

首先出現的是聯機批處理系統,即作業的輸入/輸出由 CPU 來處理。主機與輸入機之間增加一個存儲設備 ---- 磁帶,在運行于主機上的監督程序的自動控制下,計算機可自動完成:成批地把輸入機上的用戶作業讀入磁帶,依次把磁帶上的用戶作業讀入主機內存并執行并把計算結果向輸出機輸出。完成了上一批作業后,監督程序又從輸入機上輸入另一批作業,保存在磁帶上,并按上述步驟重復處理。

 

 

 監督程序不停的處理各個作業,從而實現了作業到作業的自動連接,減少了作業建立時間和手工操作時間,有效克服了人機矛盾,提高了計算機的利用率。

但是,在作業輸入和輸出時,主機的高速 CPU 仍處于空閑狀態,等待慢速的輸入/輸出設備完成工作:主機處于“忙等”狀態。

脫機批處理系統

為克服與緩解高速主機與慢速外設的矛盾,提高 CPU 的利用率,又引入了脫機批處理系統,即輸入/輸出脫離主機控制。

這種方式的顯著特征是:增加一臺不與主機直接相連而專門用于與輸入/輸出設備打交道的衛星機。

其功能是:

  1. 從輸入機上讀取用戶作業并放到輸入磁帶上。

  2. 從輸出磁帶上讀取執行結果并傳給輸出機。

這樣,主機不是直接與慢速的輸入/輸出設備打交道,而是與速度相對較快的磁帶機發生關系,有效緩解了主機與設備的矛盾。主機與衛星機可并行工作,二者分工明確,可以充分發揮主機的高速計算能力。

脫機批處理系統:20世紀60年代應用十分廣泛,它極大緩解了人機矛盾及主機與外設的矛盾。IBM-7090/7094:配備的監督程序就是脫機批處理系統,是現代操作系統的原型。

不足:每次主機內存中僅存放一道作業,每當它運行期間發出輸入/輸出(I/O)請求后,高速的CPU便處于等待低速的I/O完成狀態,致使CPU空閑。

為改善CPU的利用率,又引入了多道程序系統。

多道程序系統

多道程序設計技術

所謂多道程序設計技術,就是指允許多個程序同時進入內存并運行。即同時把多個程序放入內存,并允許它們交替在CPU中運行,它們共享系統中的各種硬、軟件資源。當一道程序因I/O請求而暫停運行時,CPU便立即轉去運行另一道程序。

單道程序的運行過程:
在A程序計算時,I/O空閑, A程序I/O操作時,CPU空閑(B程序也是同樣);必須A工作完成后,B才能進入內存中開始工作,兩者是串行的,全部完成共需時間=T1+T2。


多道程序的運行過程:
將A、B兩道程序同時存放在內存中,它們在系統的控制下,可相互穿插、交替地在CPU上運行:當A程序因請求I/O操作而放棄CPU時,B程序就可占用CPU運行,這樣 CPU不再空閑,而正進行A I/O操作的I/O設備也不空閑,顯然,CPU和I/O設備都處于“忙”狀態,大大提高了資源的利用率,從而也提高了系統的效率,A、B全部完成所需時間<<T1+T2。

多道程序設計技術不僅使CPU得到充分利用,同時改善I/O設備和內存的利用率,從而提高了整個系統的資源利用率和系統吞吐量(單位時間內處理作業(程序)的個數),最終提高了整個系統的效率。

單處理機系統中多道程序運行時的特點:
(1)多道:計算機內存中同時存放幾道相互獨立的程序;
(2)宏觀上并行:同時進入系統的幾道程序都處于運行過程中,即它們先后開始了各自的運行,但都未運行完畢;
(3)微觀上串行:實際上,各道程序輪流地用CPU,并交替運行。

多道程序系統的出現,標志著操作系統漸趨成熟的階段,先后出現了作業調度管理、處理機管理、存儲器管理、外部設備管理、文件系統管理等功能。
多道批處理系統
20世紀60年代中期,在前述的批處理系統中,引入多道程序設計技術后形成多道批處理系統(簡稱:批處理系統)。
它有兩個特點:
(1)多道:系統內可同時容納多個作業。這些作業放在外存中,組成一個后備隊列,系統按一定的調度原則每次從后備作業隊列中選取一個或多個作業進入內存運行,運行作業結束、退出運行和后備作業進入運行均由系統自動實現,從而在系統中形成一個自動轉接的、連續的作業流。
(2)成批:在系統運行過程中,不允許用戶與其作業發生交互作用,即:作業一旦進入系統,用戶就不能直接干預其作業的運行。

批處理系統的追求目標:提高系統資源利用率和系統吞吐量,以及作業流程的自動化。

批處理系統的一個重要缺點:不提供人機交互能力,給用戶使用計算機帶來不便。
雖然用戶獨占全機資源,并且直接控制程序的運行,可以隨時了解程序運行情況。但這種工作方式因獨占全機造成資源效率極低。

一種新的追求目標:既能保證計算機效率,又能方便用戶使用計算機。 20世紀60年代中期,計算機技術和軟件技術的發展使這種追求成為可能。

分時系統

由于CPU速度不斷提高和采用分時技術,一臺計算機可同時連接多個用戶終端,而每個用戶可在自己的終端上聯機使用計算機,好象自己獨占機器一樣。

分時技術:把處理機的運行時間分成很短的時間片,按時間片輪流把處理機分配給各聯機作業使用。

若某個作業在分配給它的時間片內不能完成其計算,則該作業暫時中斷,把處理機讓給另一作業使用,等待下一輪時再繼續其運行。由于計算機速度很快,作業運行輪轉得很快,給每個用戶的印象是,好象他獨占了一臺計算機。而每個用戶可以通過自己的終端向系統發出各種操作控制命令,在充分的人機交互情況下,完成作業的運行。

具有上述特征的計算機系統稱為分時系統,它允許多個用戶同時聯機使用計算機。


特點:
(1)多路性。若干個用戶同時使用一臺計算機。微觀上看是各用戶輪流使用計算機;宏觀上看是各用戶并行工作。
(2)交互性。用戶可根據系統對請求的響應結果,進一步向系統提出新的請求。這種能使用戶與系統進行人機對話的工作方式,明顯地有別于批處理系統,因而,分時系統又被稱為交互式系統。
(3)獨立性。用戶之間可以相互獨立操作,互不干擾。系統保證各用戶程序運行的完整性,不會發生相互混淆或破壞現象。
(4)及時性。系統可對用戶的輸入及時作出響應。分時系統性能的主要指標之一是響應時間,它是指:從終端發出命令到系統予以應答所需的時間。
分時系統的主要目標:對用戶響應的及時性,即不至于用戶等待每一個命令的處理時間過長。

分時系統可以同時接納數十個甚至上百個用戶,由于內存空間有限,往往采用對換(又稱交換)方式的存儲方法。即將未“輪到”的作業放入磁盤,一旦“輪到”,再將其調入內存;而時間片用完后,又將作業存回磁盤(俗稱“滾進”、“滾出“法),使同一存儲區域輪流為多個用戶服務。

多用戶分時系統是當今計算機操作系統中最普遍使用的一類操作系統。

實時系統

雖然多道批處理系統和分時系統能獲得較令人滿意的資源利用率和系統響應時間,但卻不能滿足實時控制與實時信息處理兩個應用領域的需求。于是就產生了實時系統,即系統能夠及時響應隨機發生的外部事件,并在嚴格的時間范圍內完成對該事件的處理。
實時系統在一個特定的應用中常作為一種控制設備來使用。
實時系統可分成兩類:
(1)實時控制系統。當用于飛機飛行、導彈發射等的自動控制時,要求計算機能盡快處理測量系統測得的數據,及時地對飛機或導彈進行控制,或將有關信息通過顯示終端提供給決策人員。當用于軋鋼、石化等工業生產過程控制時,也要求計算機能及時處理由各類傳感器送來的數據,然后控制相應的執行機構。
(2)實時信息處理系統。當用于預定飛機票、查詢有關航班、航線、票價等事宜時,或當用于銀行系統、情報檢索系統時,都要求計算機能對終端設備發來的服務請求及時予以正確的回答。此類對響應及時性的要求稍弱于第一類。
實時操作系統的主要特點:
(1)及時響應。每一個信息接收、分析處理和發送的過程必須在嚴格的時間限制內完成。
(2)高可靠性。需采取冗余措施,雙機系統前后臺工作,也包括必要的保密措施等。

操作系統發展圖譜

進程與線程

什么是進程(process)?

進程(Process)是計算機中的程序關于某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操作系統結構的基礎。在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。

程序并不能單獨運行,只有將程序裝載到內存中,系統為它分配資源才能運行,而這種執行的程序就稱之為進程。程序和進程的區別就在于:程序是指令的集合,它是進程運行的靜態描述文本;進程是程序的一次執行活動,屬于動態概念。

在多道編程中,我們允許多個程序同時加載到內存中,在操作系統的調度下,可以實現并發地執行。這是這樣的設計,大大提高了CPU的利用率。進程的出現讓每個用戶感覺到自己獨享CPU,因此,進程就是為了在CPU上實現多道編程而提出的。

有了進程為什么還要線程?

進程有很多優點,它提供了多道編程,讓我們感覺我們每個人都擁有自己的CPU和其他資源,可以提高計算機的利用率。很多人就不理解了,既然進程這么優秀,為什么還要線程呢?其實,仔細觀察就會發現進程還是有很多缺陷的,主要體現在兩點上:

  • 進程只能在一個時間干一件事,如果想同時干兩件事或多件事,進程就無能為力了。

  • 進程在執行的過程中如果阻塞,例如等待輸入,整個進程就會掛起,即使進程中有些工作不依賴于輸入的數據,也將無法執行。

例如,我們在使用qq聊天, qq做為一個獨立進程如果同一時間只能干一件事,那他如何實現在同一時刻 即能監聽鍵盤輸入、又能監聽其它人給你發的消息、同時還能把別人發的消息顯示在屏幕上呢?你會說,操作系統不是有分時么?但我的親,分時是指在不同進程間的分時呀, 即操作系統處理一會你的qq任務,又切換到word文檔任務上了,每個cpu時間片分給你的qq程序時,你的qq還是只能同時干一件事呀。

再直白一點, 一個操作系統就像是一個工廠,工廠里面有很多個生產車間,不同的車間生產不同的產品,每個車間就相當于一個進程,且你的工廠又窮,供電不足,同一時間只能給一個車間供電,為了能讓所有車間都能同時生產,你的工廠的電工只能給不同的車間分時供電,但是輪到你的qq車間時,發現只有一個干活的工人,結果生產效率極低,為了解決這個問題,應該怎么辦呢?。。。。沒錯,你肯定想到了,就是多加幾個工人,讓幾個人工人并行工作,這每個工人,就是線程!

什么是線程(thread)?

 線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以并發多個線程,每條線程并行執行不同的任務。

線程是執行上下文,它是CPU執行指令流所需的所有信息。假設你在讀一本書,你現在想休息一下,但是你想回到你停下來的地方繼續閱讀。實現這一點的一種方法是記下頁碼、行號和單詞編號。所以閱讀一本書的執行上下文是這3個數字。如果你有一個室友,而她也在使用同樣的方法,她可以在你不使用的時候拿著這本書,從她停下來的地方繼續閱讀。然后你可以把它拿回來,從你原來的地方繼續。線程的工作方式相同。CPU給你一種錯覺,認為它同時進行多個計算。它通過在每次計算上花費一點時間來做到這一點。它可以這樣做,因為它為每個計算都有一個執行上下文。就像你可以和你的朋友共享一本書一樣,許多任務也可以共享一個CPU。在技術層面上,執行上下文(因此是線程)由CPU寄存器的值組成。最后:線程與進程不同。線程是執行的上下文,而進程是與計算關聯的一組資源。一個進程可以有一個或多個線程。說明:與進程關聯的資源包括內存頁(進程中的所有線程都具有相同的內存視圖)、文件描述符(例如,開放套接字)和安全憑據(例如,啟動進程的用戶的ID)。

進程與線程的區別

   1. 線程共享創建它的進程的地址空間;進程有自己的地址空間。
   2. 線程可以直接訪問其進程的數據段;進程有自己的父進程數據段副本。
   3. 線程可以直接與其進程的其他線程通信;進程必須使用進程間通信與兄弟進程通信。
   4. 新線程很容易創建;新進程需要父進程的重復。
   5. 線程可以對同一進程的線程進行相當大的控制;進程只能對子進程進行控制。
   6. 對主線程的更改(取消、優先級更改等)可能會影響進程的其他線程的行為;對父進程的更改不會影響子進程。

 python GIL

 首先需要明確的一點是GIL并不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標準,但是可以用不同的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行。像其中的JPython就沒有GIL。然而因為CPython是大部分環境下默認的Python執行環境。所以在很多人的概念里CPython就是Python,也就想當然的把GIL歸結為Python語言的缺陷。所以這里要先明確一點:GIL并不是Python的特性,Python完全可以不依賴于GIL

無論你啟多少個線程,有多少個 CPU ,python 執行的時候在同一時刻下只允許一個線程運行。

線程

python threading 模塊

 線程有2種調用方式

直接調用

 1 import threading
 2 import time
 3  
 4 def sayhi(num): #定義每個線程要運行的函數
 5  
 6     print("running on number:%s" %num)
 7  
 8     time.sleep(3)
 9  
10 if __name__ == '__main__':
11  
12     t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個線程實例
13     t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一個線程實例
14  
15     t1.start() #啟動線程
16     t2.start() #啟動另一個線程
17  
18     print(t1.getName()) #獲取線程名
19     print(t2.getName())

 

繼承式調用

 1 import threading
 2 import time
 3  
 4  
 5 class MyThread(threading.Thread):
 6     def __init__(self,num):
 7         threading.Thread.__init__(self)
 8         self.num = num
 9  
10     def run(self):#定義每個線程要運行的函數
11  
12         print("running on number:%s" %self.num)
13  
14         time.sleep(3)
15  
16 if __name__ == '__main__':
17  
18     t1 = MyThread(1)
19     t2 = MyThread(2)
20     t1.start()
21     t2.start()

 

 join & daemon

 一些線程執行后臺任務,比如發送保持連接的數據包,或者執行定期的垃圾收集,等等。這些只在主程序運行時有用,一旦其他非守護進程的線程退出,就可以將它們殺死。
如果沒有守護進程線程,在程序完全退出之前,您必須跟蹤它們,并告訴它們退出。通過將它們設置為守護進程線程,可以讓它們運行并忽略它們,當程序退出時,任何守護進程線程都會自動終止。

 

 1 #_*_coding:utf-8_*_
 2 
 3  
 4 import time
 5 import threading
 6  
 7  
 8 def run(n):
 9  
10     print('[%s]------running----\n' % n)
11     time.sleep(2)
12     print('--done--')
13  
14 def main():
15     for i in range(5):
16         t = threading.Thread(target=run,args=[i,])
17         t.start()
18         t.join(1)
19         print('starting thread', t.getName())
20  
21  
22 m = threading.Thread(target=main,args=[])
23 m.setDaemon(True) #將main線程設置為Daemon線程,它做為程序主線程的守護線程,當主線程退出時,m線程也會退出,由m啟動的其它子線程會同時退出,不管是否執行完任務
24 m.start()
25 m.join(timeout=2)
26 print("---main thread done----")

 

 注意:守護進程所在線程在關閉時突然停止。它們的資源(如打開的文件、數據庫事務等)可能無法正確釋放。如果您希望線程正常停止,請將其設置為非守護進程,并使用適當的信號機制(如事件)。

 線程鎖(互斥鎖mutex)

 1 import time
 2 import threading
 3  
 4 def addNum():
 5     global num #在每個線程中都獲取這個全局變量
 6     print('--get num:',num )
 7     time.sleep(1)
 8     num  -=1 #對此公共變量進行-1操作
 9  
10 num = 100  #設定一個共享變量
11 thread_list = []
12 for i in range(100):
13     t = threading.Thread(target=addNum)
14     t.start()
15     thread_list.append(t)
16  
17 for t in thread_list: #等待所有線程執行完畢
18     t.join()
19  
20  
21 print('final num:', num )

正常來講,這個num結果應該是0, 但在python 2.7上多運行幾次,會發現,最后打印出來的num結果不總是0,為什么每次運行的結果不一樣呢? 哈,很簡單,假設你有A,B兩個線程,此時都 要對num 進行減1操作, 由于2個線程是并發同時運行的,所以2個線程很有可能同時拿走了num=100這個初始變量交給cpu去運算,當A線程去處完的結果是99,但此時B線程運算完的結果也是99,兩個線程同時CPU運算的結果再賦值給num變量后,結果就都是99。那怎么辦呢? 很簡單,每個線程在要修改公共數據時,為了避免自己在還沒改完的時候別人也來修改此數據,可以給這個數據加一把鎖, 這樣其它線程想修改此數據時就必須等待你修改完畢并把鎖釋放掉后才能再訪問此數據。 

*注:不要在3.x上運行,不知為什么,3.x上的結果總是正確的,可能是自動加了鎖

加鎖:

 1 import time
 2 import threading
 3  
 4 def addNum():
 5     global num #在每個線程中都獲取這個全局變量
 6     print('--get num:',num )
 7     time.sleep(1)
 8     lock.acquire() #修改數據前加鎖
 9     num  -=1 #對此公共變量進行-1操作
10     lock.release() #修改后釋放
11  
12 num = 100  #設定一個共享變量
13 thread_list = []
14 lock = threading.Lock() #生成全局鎖
15 for i in range(100):
16     t = threading.Thread(target=addNum)
17     t.start()
18     thread_list.append(t)
19  
20 for t in thread_list: #等待所有線程執行完畢
21     t.join()
22  
23 print('final num:', num )

 

 GUL vs Lock

 Python已經有一個GIL來保證同一時間只能有一個線程來執行了,為什么這里還需要lock? 注意,這里的lock是用戶級的lock,跟那個GIL沒關系。

 

 既然用戶程序已經自己有鎖了,那為什么C python還需要GIL呢?加入GIL主要的原因是為了降低程序的開發的復雜度,比如現在的你寫python不需要關心內存回收的問題,因為Python解釋器幫你自動定期進行內存回收,你可以理解為python解釋器里有一個獨立的線程,每過一段時間它起wake up做一次全局輪詢看看哪些內存數據是可以被清空的,此時你自己的程序 里的線程和 py解釋器自己的線程是并發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程中的clearing時刻,可能一個其它線程正好又重新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,為了解決類似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題,  這可以說是Python早期版本的遺留問題。

RLock (遞歸鎖)

 就是在一個大鎖中還要再包含子鎖

 1 import threading,time
 2  
 3 def run1():
 4     print("grab the first part data")
 5     lock.acquire()
 6     global num
 7     num +=1
 8     lock.release()
 9     return num
10 def run2():
11     print("grab the second part data")
12     lock.acquire()
13     global  num2
14     num2+=1
15     lock.release()
16     return num2
17 def run3():
18     lock.acquire()
19     res = run1()
20     print('--------between run1 and run2-----')
21     res2 = run2()
22     lock.release()
23     print(res,res2)
24  
25  
26 if __name__ == '__main__':
27  
28     num,num2 = 0,0
29     lock = threading.RLock()
30     for i in range(10):
31         t = threading.Thread(target=run3)
32         t.start()
33  
34 while threading.active_count() != 1:
35     print(threading.active_count())
36 else:
37     print('----all threads done---')
38     print(num,num2)

 

 信號量(Semaphore)

 互斥鎖 同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 ,比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進去。

 1 import threading,time
 2  
 3 def run(n):
 4     semaphore.acquire()
 5     time.sleep(1)
 6     print("run the thread: %s\n" %n)
 7     semaphore.release()
 8  
 9 if __name__ == '__main__':
10  
11     num= 0
12     semaphore  = threading.BoundedSemaphore(5) #最多允許5個線程同時運行
13     for i in range(20):
14         t = threading.Thread(target=run,args=(i,))
15         t.start()
16  
17 while threading.active_count() != 1:
18     pass #print threading.active_count()
19 else:
20     print('----all threads done---')
21     print(num)

 

 計時器(Timer)

 此類表示僅在經過一定時間后才應運行的操作。與線程一樣,計時器通過調用其start()方法來啟動??梢酝ㄟ^調用cancel()方法停止計時器(在其操作開始之前)。計時器在執行其操作之前等待的間隔可能與用戶指定的間隔不完全相同。

1 def hello():
2     print("hello, world")
3  
4 t = Timer(30.0, hello)
5 t.start()  # after 30 seconds, "hello, world" will be printed

 

 events

 通過Event來實現兩個或多個線程間的交互,下面是一個紅綠燈的例子,即起動一個線程做交通指揮燈,生成幾個線程做車輛,車輛行駛按紅燈停,綠燈行的規則。

 1 import threading,time
 2 import random
 3 def light():
 4     if not event.isSet():
 5         event.set() #wait就不阻塞 #綠燈狀態
 6     count = 0
 7     while True:
 8         if count < 10:
 9             print('\033[42;1m--green light on---\033[0m')
10         elif count <13:
11             print('\033[43;1m--yellow light on---\033[0m')
12         elif count <20:
13             if event.isSet():
14                 event.clear()
15             print('\033[41;1m--red light on---\033[0m')
16         else:
17             count = 0
18             event.set() #打開綠燈
19         time.sleep(1)
20         count +=1
21 def car(n):
22     while 1:
23         time.sleep(random.randrange(10))
24         if  event.isSet(): #綠燈
25             print("car [%s] is running.." % n)
26         else:
27             print("car [%s] is waiting for the red light.." %n)
28 if __name__ == '__main__':
29     event = threading.Event()
30     Light = threading.Thread(target=light)
31     Light.start()
32     for i in range(3):
33         t = threading.Thread(target=car,args=(i,))
34         t.start()

 

 一個event使用的例子,員工進公司門要刷卡, 我們這里設置一個線程是“門”, 再設置幾個線程為“員工”,員工看到門沒打開,就刷卡,刷完卡,門開了,員工就可以通過。

 1 #_*_coding:utf-8_*_
 2 import threading
 3 import time
 4 import random
 5 
 6 def door():
 7     door_open_time_counter = 0
 8     while True:
 9         if door_swiping_event.is_set():
10             print("\033[32;1mdoor opening....\033[0m")
11             door_open_time_counter +=1
12 
13         else:
14             print("\033[31;1mdoor closed...., swipe to open.\033[0m")
15             door_open_time_counter = 0 #清空計時器
16             door_swiping_event.wait()
17 
18 
19         if door_open_time_counter > 3:#門開了已經3s了,該關了
20             door_swiping_event.clear()
21 
22         time.sleep(0.5)
23 
24 
25 def staff(n):
26 
27     print("staff [%s] is comming..." % n )
28     while True:
29         if door_swiping_event.is_set():
30             print("\033[34;1mdoor is opened, passing.....\033[0m")
31             break
32         else:
33             print("staff [%s] sees door got closed, swipping the card....." % n)
34             print(door_swiping_event.set())
35             door_swiping_event.set()
36             print("after set ",door_swiping_event.set())
37         time.sleep(0.5)
38 door_swiping_event  = threading.Event() #設置事件
39 
40 
41 door_thread = threading.Thread(target=door)
42 door_thread.start()
43 
44 
45 
46 for i in range(5):
47     p = threading.Thread(target=staff,args=(i,))
48     time.sleep(random.randrange(3))
49     p.start()

 

 queue 隊列

  • class queue.Queue(maxsize=0) #先入先出
  • class queue.LifoQueue(maxsize=0) #last in fisrt out 
  • class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列
  • exception queue.Empty
    exception queue.Full
    Queue.qsize()
    Queue.empty() #return True if empty  
    Queue.full() # return True if full 
    Queue.put(itemblock=Truetimeout=None)
    Queue.put_nowait(item)
    Queue.get(block=Truetimeout=None)
  • Queue.get_nowait()
  • Queue.task_done()
  • Queue.join()

 生產者消費者模型

 1 import threading
 2 import queue
 3  
 4 def producer():
 5     for i in range(10):
 6         q.put("骨頭 %s" % i )
 7  
 8     print("開始等待所有的骨頭被取走...")
 9     q.join()
10     print("所有的骨頭被取完了...")
11  
12  
13 def consumer(n):
14  
15     while q.qsize() >0:
16  
17         print("%s 取到" %n  , q.get())
18         q.task_done() #告知這個任務執行完了
19  
20  
21 q = queue.Queue()
22  
23  
24  
25 p = threading.Thread(target=producer,)
26 p.start()
27  
28 c1 = consumer("李闖")
 1 import time,random
 2 import queue,threading
 3 q = queue.Queue()
 4 def Producer(name):
 5   count = 0
 6   while count <20:
 7     time.sleep(random.randrange(3))
 8     q.put(count)
 9     print('Producer %s has produced %s baozi..' %(name, count))
10     count +=1
11 def Consumer(name):
12   count = 0
13   while count <20:
14     time.sleep(random.randrange(4))
15     if not q.empty():
16         data = q.get()
17         print(data)
18         print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
19     else:
20         print("-----no baozi anymore----")
21     count +=1
22 p1 = threading.Thread(target=Producer, args=('A',))
23 c1 = threading.Thread(target=Consumer, args=('B',))
24 p1.start()
25 c1.start()

 

 多線程 multiprocessing

 1 from multiprocessing import Process
 2 import time
 3 def f(name):
 4     time.sleep(2)
 5     print('hello', name)
 6  
 7 if __name__ == '__main__':
 8     p = Process(target=f, args=('bob',))
 9     p.start()
10     p.join()
 1 from multiprocessing import Process
 2 import os
 3  
 4 def info(title):
 5     print(title)
 6     print('module name:', __name__)
 7     print('parent process:', os.getppid())
 8     print('process id:', os.getpid())
 9     print("\n\n")
10  
11 def f(name):
12     info('\033[31;1mfunction f\033[0m')
13     print('hello', name)
14  
15 if __name__ == '__main__':
16     info('\033[32;1mmain process line\033[0m')
17     p = Process(target=f, args=('bob',))
18     p.start()
19     p.join()

 

 進程間通訊

 不同進程間內存是共享的,要想實現兩個進程間的數據交換,可以用以下方法:

queue

 使用方法跟 threading 里的 queue 差不多

 1 from multiprocessing import Process, Queue
 2  
 3 def f(q):
 4     q.put([42, None, 'hello'])
 5  
 6 if __name__ == '__main__':
 7     q = Queue()
 8     p = Process(target=f, args=(q,))
 9     p.start()
10     print(q.get())    # prints "[42, None, 'hello']"
11     p.join()

 

pipes

 1 from multiprocessing import Process, Pipe
 2  
 3 def f(conn):
 4     conn.send([42, None, 'hello'])
 5     conn.close()
 6  
 7 if __name__ == '__main__':
 8     parent_conn, child_conn = Pipe()
 9     p = Process(target=f, args=(child_conn,))
10     p.start()
11     print(parent_conn.recv())   # prints "[42, None, 'hello']"
12     p.join()

 

 managers

 1 from multiprocessing import Process, Manager
 2  
 3 def f(d, l):
 4     d[1] = '1'
 5     d['2'] = 2
 6     d[0.25] = None
 7     l.append(1)
 8     print(l)
 9  
10 if __name__ == '__main__':
11     with Manager() as manager:
12         d = manager.dict()
13  
14         l = manager.list(range(5))
15         p_list = []
16         for i in range(10):
17             p = Process(target=f, args=(d, l))
18             p.start()
19             p_list.append(p)
20         for res in p_list:
21             res.join()
22  
23         print(d)
24         print(l)

 

 進程同步

 1 from multiprocessing import Process, Lock
 2  
 3 def f(l, i):
 4     l.acquire()
 5     try:
 6         print('hello world', i)
 7     finally:
 8         l.release()
 9  
10 if __name__ == '__main__':
11     lock = Lock()
12  
13     for num in range(10):
14         Process(target=f, args=(lock, num)).start()

 

 進程池

 進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程,那么程序就會等待,直到進程池中有可用進程為止。

進程池中有兩個方法:
  apply

  apply_asyns

 1 from  multiprocessing import Process,Pool
 2 import time
 3  
 4 def Foo(i):
 5     time.sleep(2)
 6     return i+100
 7  
 8 def Bar(arg):
 9     print('-->exec done:',arg)
10  
11 pool = Pool(5)
12  
13 for i in range(10):
14     pool.apply_async(func=Foo, args=(i,),callback=Bar)
15     #pool.apply(func=Foo, args=(i,))
16  
17 print('end')
18 pool.close()
19 pool.join()#進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。

 協程

協程,又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程

協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:

協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當于進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

協程的好處:

  • 無需線程上下文切換的開銷
  • 無需原子操作鎖定及同步的開銷
  • "原子操作(atomic operation)是不需要synchronized",所謂原子操作是指不會被線程調度機制打斷的操作;這種操作一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另一個線程)。原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉只執行部分。視作整體是原子性的核心。
  • 方便切換控制流,簡化編程模型
  • 高并發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用于高并發處理。

缺點:

  • 無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
  • 進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序
 1 import time
 2 import queue
 3 def consumer(name):
 4     print("--->starting eating baozi...")
 5     while True:
 6         new_baozi = yield
 7         print("[%s] is eating baozi %s" % (name,new_baozi))
 8         #time.sleep(1)
 9  
10 def producer():
11  
12     r = con.__next__()
13     r = con2.__next__()
14     n = 0
15     while n < 5:
16         n +=1
17         con.send(n)
18         con2.send(n)
19         print("\033[32;1m[producer]\033[0m is making baozi %s" %n )
20  
21  
22 if __name__ == '__main__':
23     con = consumer("c1")
24     con2 = consumer("c2")
25     p = producer()

 

符合下面的條件就是協程:

  1. 必須在只有一個單線程里實現并發

  2. 修改共享數據不需加鎖

  3. 用戶程序里自己保存多個控制流的上下文棧

  4. 一個協程遇到 IO 操作自動切換到其它協程