Python的多線程
1、進程和線程
現代操作系統(Windows,macOS,Linux)都可以執行多任務,多任務就是同時運行多個任務。
現在,多核CPU已經非常普及了,但是,即使過去的單核CPU,也可以執行多任務。由于CPU執行代碼都是順序執行的,操作系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反復執行下去。表面上看,每個任務都是交替執行的,但是,由于CPU的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣。
真正的并行執行多任務只能在多核CPU上實現,但是,由于任務數量遠遠多于CPU的核心數量,所以,操作系統也會自動把很多任務輪流調度到每個核心上執行。
線程是最小的執行單元,而進程由至少一個線程組成。如何調度進程和線程,完全由操作系統決定,程序自己不能決定什么時候執行,執行多長時間。
如果要同時執行多個任務,有三種解決方案:
一種是啟動多個進程,每個進程雖然只有一個線程,但多個進程可以一塊執行多個任務。還有一種方法是啟動一個進程,在一個進程內啟動多個線程,這樣,多個線程也可以一塊執行多個任務。當然還有第三種方法,就是啟動多個進程,每個進程再啟動多個線程,這樣同時執行的任務就更多了,當然這種模型更復雜,實際很少采用。
總結一下就是,多任務的實現有3種方式:
- 多進程模式;
- 多線程模式;
- 多進程+多線程模式。
多線程類似于同時執行多個不同程序,多線程運行有如下優點:
- 使用線程可以把占據長時間的程序中的任務放到后臺去處理。
- 用戶界面可以更加吸引人,比如用戶點擊了一個按鈕去觸發某些事件的處理,可以彈出一個進度條來顯示處理的進度。
- 程序的運行速度可能加快。
- 在一些等待的任務實現上如用戶輸入、文件讀寫和網絡收發數據等,線程就比較有用了。在這種情況下我們可以釋放一些珍貴的資源如內存占用等等。
2、多進程
2.1、創建新進程(Process)
通過 OS 模塊的 fork 調用可以在 Linux 和 Unix 系統上 生成多個進程,但只能在 Linux、Unix系統上運行,詳情可參考:https://www.liaoxuefeng.com/wiki/1016959663602400/1017628290184064
模塊我們可以實現跨平臺使用多進程。multiprocessing模塊是跨平臺版本的多進程模塊,通過該
multiprocessing模塊提供了一個進程類Process來創建子進程。創建子進程時,只需要傳入一個執行函數和函數的參數。進程實例用start()方法啟動,join()方法可以等待子進程結束后再繼續往下運行,通常用于進程間的同步。
from multiprocessing import Process import os # 子進程要執行的代碼 def run_proc(name): print('Run child process %s (%s)...' % (name, os.getpid())) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Process(target=run_proc, args=('test',)) print('Child process will start.') p.start() p.join() print('Child process end.') #上面將輸出: Parent process 9204. Child process will start. Run child process test (2172)... Child process end.
2.2、進程池
如果要啟動大量的子進程,可以用進程池的方式批量創建子進程:
from multiprocessing import Pool import os, time, random def long_time_task(name): print('第 %s (%s) 個進程正在執行...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('第 %s 個進程的執行時間為: %0.2f 秒.' % (name, (end - start))) if __name__=='__main__': print('父進程 %s.' % os.getpid()) p = Pool(4) #Pool(4)表示最多同時執行4個進程 for i in range(5): p.apply_async(long_time_task, args=(i,)) print('等待所有的進程執行完畢') p.close() p.join() #對進程池實例對象調用join()方法會阻塞父進程,知道所有的子進程都執行完畢 print('所有的進程都已經執行完畢')
上述代碼的執行結果可能如下:
父進程 3016. 等待所有的進程執行完畢 第 0 (10420) 個進程正在執行... 第 1 (10224) 個進程正在執行... 第 2 (9020) 個進程正在執行... 第 3 (8672) 個進程正在執行... 第 0 個進程的執行時間為: 0.92 秒. 第 4 (10420) 個進程正在執行... 第 2 個進程的執行時間為: 1.79 秒. 第 4 個進程的執行時間為: 1.28 秒. 第 3 個進程的執行時間為: 2.72 秒. 第 1 個進程的執行時間為: 2.96 秒. 所有的進程都已經執行完畢
執行結果當中,第幾個進程先執行完畢是不確定的。可以看到,有多個進程是同時執行的。
對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),調用close()之后就不能繼續添加新的Process了。Pool(n)表示最多同時執行n個進程,所以從上述執行結果來看,可以看到執行到第3個進程時,剛好有4個進程在執行,此時第4個進程必須等到前面的進程有一個執行完畢后才能執行。Pool的默認大小是CPU的核數。
2.3、進程間的通信
Process之間肯定是需要通信的,操作系統提供了很多機制來實現進程間的通信。Python的multiprocessing模塊包裝了底層的機制,提供了Queue、Pipes等多種方式來交換數據。
我們以Queue為例,在父進程中創建兩個子進程,一個往Queue里寫數據,一個從Queue里讀數據:
from multiprocessing import Process, Queue import os, time, random # 寫數據進程執行的代碼: def write(q): print('進程開始寫操作。。 %s' % os.getpid()) for value in ['A', 'B', 'C']: print('插入值 %s 到 queue 當中...' % value) q.put(value) time.sleep(random.random()) # 讀數據進程執行的代碼: def read(q): print('進程開始讀操作。。 %s' % os.getpid()) while True: value = q.get(True) print('從queue當中讀取值 %s from.' % value) if __name__=='__main__': # 父進程創建Queue,并傳給各個子進程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,))
# 啟動子進程pw,進行寫入操作 pw.start() # 啟動子進程pr,進行讀取操作 pr.start()
# 等待pw結束 pw.join()
# pr進程里是死循環,無法等待其結束,只能強行終止: pr.terminate()
上述代碼的執行結果如下:
進程開始寫操作。。 1940 插入值 A 到 queue 當中... 進程開始讀操作。。 5892 從queue當中讀取值 A from. 插入值 B 到 queue 當中... 從queue當中讀取值 B from. 插入值 C 到 queue 當中... 從queue當中讀取值 C from.
2、多線程
多任務可以由多進程完成,也可以由一個進程內的多線程完成。
Python的標準庫提供了兩個模塊:_thread和threading,_thread是低級模塊,threading是高級模塊,對_thread進行了封裝。絕大多數情況下,我們只需要使用threading這個高級模塊。
2.1、使用threading模塊創建線程
啟動一個線程就是把一個函數傳入并創建Thread實例,然后調用start()開始執行:
import time, threading # 新線程執行的代碼: def loop(): print('線程 %s 正在執行...' % threading.current_thread().name) n = 0 while n < 5: n = n + 1 print('線程 %s >>> %s' % (threading.current_thread().name, n)) time.sleep(1) print('線程 %s 執行完畢.' % threading.current_thread().name) print('線程 %s 正在執行...' % threading.current_thread().name) t = threading.Thread(target=loop, name='新線程AA') #給線程傳遞函數參數且命名 t.start() t.join() print('線程 %s 執行完畢.' % threading.current_thread().name)
上述代碼的執行結果如下:
線程 MainThread 正在執行... 線程 新線程AA 正在執行... 線程 新線程AA >>> 1 線程 新線程AA >>> 2 線程 新線程AA >>> 3 線程 新線程AA >>> 4 線程 新線程AA >>> 5 線程 新線程AA 執行完畢. 線程 MainThread 執行完畢.
由于任何進程默認就會啟動一個線程,我們把該線程稱為主線程,主線程又可以啟動新的線程,Python的threading模塊有個current_thread()函數,它永遠返回當前線程的實例。
主線程實例的名字叫MainThread,子線程的名字在創建時指定,我們可以隨意給子線程命名。名字僅僅在打印時用來顯示,完全沒有其他意義,如果不起名字Python就自動給線程命名為Thread-1,Thread-2……
通過繼承來創建線程:
我們可以通過直接從 threading.Thread 繼承創建一個新的子類,并實例化后調用 start() 方法啟動新線程,即它調用了線程的 run() 方法:
#!/usr/bin/python3 import threading import time exitFlag = 0 class myThread (threading.Thread): def __init__(self, threadID, name, counter): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter def run(self): print ("開始線程:" + self.name) print_time(self.name, self.counter, 5) print ("退出線程:" + self.name) def print_time(threadName, delay, counter): while counter: if exitFlag: threadName.exit() time.sleep(delay) print ("%s: %s" % (threadName, time.ctime(time.time()))) counter -= 1 # 創建新線程 thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) # 開啟新線程 thread1.start() thread2.start() thread1.join() thread2.join() print ("退出主線程")
上述代碼的執行結果:
開始線程:Thread-1 開始線程:Thread-2 Thread-1: Tue Jan 26 23:26:17 2021 Thread-2: Tue Jan 26 23:26:18 2021 Thread-1: Tue Jan 26 23:26:18 2021 Thread-1: Tue Jan 26 23:26:19 2021 Thread-2: Tue Jan 26 23:26:20 2021 Thread-1: Tue Jan 26 23:26:20 2021 Thread-1: Tue Jan 26 23:26:21 2021 退出線程:Thread-1 Thread-2: Tue Jan 26 23:26:22 2021 Thread-2: Tue Jan 26 23:26:24 2021 Thread-2: Tue Jan 26 23:26:26 2021 退出線程:Thread-2 退出主線程
2.1.1、threading模塊的常見方法
threading模塊的常見方法方法:
- threading.currentThread(): 返回當前的線程變量。
- threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
- threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
線程類實例的常見方法:除了使用方法外,線程模塊同樣提供了Thread類來處理線程,Thread類提供了以下方法:
- run(): 用以表示線程活動的方法。
- start():啟動線程活動
- join([time]): 等待至線程中止。這阻塞調用線程直至線程的join() 方法被調用中止-正常退出或者拋出未處理的異常-或者是可選的超時發生。
- isAlive(): 返回線程是否活動的。
- getName(): 返回線程名。
- setName(): 設置線程名。
2.2、給線程加鎖
多線程和多進程最大的不同在于,多進程中,同一個變量,各自有一份拷貝存在于每個進程中,互不影響,而多線程中,所有變量都由所有線程共享,所以,任何一個變量都可以被任何一個線程修改,因此,線程之間共享數據最大的危險在于多個線程同時改一個變量,把內容給改亂了。
要想避免多線程執行時,導致共享變量發生數據混亂的問題,可以給多個線程共享的修改變量的方法加鎖。python中創建一個鎖通過threading.Lock()來實現。
代碼實例:
import time, threading # 假定這是你的銀行存款: balance = 0 lock = threading.Lock() def change_it(n): # 先存后取,結果應該為0: global balance balance = balance + n balance = balance - n def run_thread(n): for i in range(100000): # 先要獲取鎖: lock.acquire() try: # 放心地改吧: change_it(n) finally: # 改完了一定要釋放鎖: lock.release() t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join()
上述代碼,當多個線程同時執行lock.acquire()時,只有一個線程能成功地獲取鎖,然后繼續執行代碼,其他線程就繼續等待直到獲得鎖為止。獲得鎖的線程用完后一定要釋放鎖,否則那些苦苦等待鎖的線程將永遠等待下去,成為死線程。所以我們用try...finally來確保鎖一定會被釋放。
鎖的好處就是確保了某段關鍵代碼只能由一個線程從頭到尾完整地執行,壞處當然也很多,首先是阻止了多線程并發執行,包含鎖的某段代碼實際上只能以單線程模式執行,效率就大大地下降了。其次,由于可以存在多個鎖,不同的線程持有不同的鎖,并試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個線程全部掛起,既不能執行,也無法結束,只能靠操作系統強制終止。
參考:https://www.liaoxuefeng.com/wiki/1016959663602400/1017629247922688

浙公網安備 33010602011771號