python進程、線程
一、基礎概念
二、multiprocessing 進程模塊
三、queue模塊
四、multiprocessing.dummy 線程模塊
五、threading 線程模塊
------------------------------------------------------------------
一、基礎概念
進程就是一個程序在一個數據集上的一次動態執行過程。進程一般由程序、數據集、進程控制塊三部分組成。一個程序至少有一個進程,一個進程至少有一個線程。多線程程序只要有一個線程死掉,整個進程也死掉了,而一個進程死掉并不會對另外一個進程造成影響,多進程程序更健壯。進程是資源分配的最小單位,進程有自己的獨立地址空間,每啟動一個進程,系統就會為它分配地址空間,建立數據表來維護代碼段、堆棧段和數據段。
線程是共享進程中的數據,使用相同的地址空間,因此CPU切換一個線程的花費遠比進程要小很多,同時創建一個線程的開銷也比進程要小很多。線程的引入減小了程序并發執行時的開銷。線程是程序執行的最小單位(資源調度的最小單位)。
所謂協程,可以簡單的理解為多個相互協作的子程序。在同一個線程中,當一個子程序阻塞時,我們可以讓程序馬上從一個子程序切換到另一個子程序,從而避免CPU因程序阻塞而閑置,這樣就可以提升CPU的利用率,相當于用一種協作的方式加速了程序的執行。Python中可以使用多線程和多進程來實現并發,協程也可以實現并發編程。協程實現了協作式并發。線程是系統級別的它們由操作系統調度,而協程則是程序級別的由程序根據需要自己調度。greenlet、gevent(第三方模塊)可以實現協程
Python中實現并發編程的三種方案:多線程、多進程、異步I/O。并發編程的好處在于可以提升程序的執行效率以及改善用戶體驗;壞處在于并發的程序不容易開發和調試,同時對其他程序來說它并不友好。異步任務通常通過多任務協作處理的方式來實現,由于執行時間和順序的不確定,因此需要通過回調式編程或者 `future`對象來獲取任務執行的結果。Python 3通過 `asyncio`模塊和 `await`和 `async`關鍵字(在Python 3.7中正式被列為關鍵字)來支持異步處理。當程序不需要真正的并發性或并行性,而是更多的依賴于異步處理和回調時,`asyncio`就是一種很好的選擇。如果程序中有大量的等待與休眠時,也應該考慮 `asyncio`,它很適合編寫沒有實時數據處理需求的Web應用服務器。
一般多線程要比多進程效率更高,因為進程間的切換需要的資源和開銷更大。
在python中計算密集型任務通常使用多進程。
IO密集型任務中,如讀寫文件,在網絡間通信等,適用于多線程。
線程之間的通信更方便,同一進程下的線程共享全局變量、靜態變量等數據,對于線程如何處理好同步與互斥是編寫多線程程序的難點。而進程之間的通信需要以通信的方式進行。
同步,異步,并行,并發,串行
同步:多任務開始執行,一個任務A執行結束,才可以執行另一個任務B。只存在一個線程。任務 A、B、C 全部執行完成后才算是結束。
異步:多任務開始執行,只需要主任務 A 執行完成就算結束,主任務執行的時候,可以同時執行異步任務 B、C,主任務 A 可以不需要等待異步任務 B、C 的結果。

并發:多個任務在同一個時間段內同時執行,如果是單核心計算機,CPU 會不斷地切換任務來完成并發操作。
并行:多任務在同一個時刻同時執行,計算機需要有多核心,每個核心獨立執行一個任務,多個任務同時執行,不需要切換。
串行:是同步的一種實現,就是沒有并發,所有任務一個一個執行完成。
并發不一定并行,但并行一定并發

并發和并行其實是異步線程實現的兩種形式。并行其實是真正的異步,多核CUP可以同時開啟多條線程供多個任務同時執行,互不干擾,如上圖的并行,其實和異步圖例一樣。但是并發就不一樣了,是一個偽異步。在單核CUP中只能有一條線程,但是又想執行多個任務。這個時候,只能在一條線程上不停的切換任務,比如任務A執行了20%,任務A停下里,線程讓給任務B,任務執行了30%停下,再讓任務A執行。這樣我們用的時候,由于CUP處理速度快,你看起來好像是同時執行,其實不是的,同一時間只會執行單個任務。
串行是同步的一種實現,就是沒有并發,所有任務一個一個執行完成。

并發、并行,是邏輯結構的設計模式。
同步、異步,是邏輯調用方式。
二、queue模塊
queue(隊列)是python的標準庫,在python2中模塊名為Queue,python3中改為了queue。
在python中,多個線程之間的數據是共享的,多個線程進行數據交換的時候不能夠保證數據的安全性和一致性,所以當多個線程需要進行數據交換的時候,隊列就出現了,隊列可以完美解決線程間的數據交換,保證線程間數據的安全性和一致性(簡單的來說就是多線程需要加鎖,很可能會造成死鎖,而queue自帶鎖。所以多線程結合queue會好的很多)。所以多線程下queue可以隨意使用,不會出現寫沖突。
multiprocessing中有一個Queue類(multiprocessing.Queue), queue中也有一個Queue類(queue.Queue)。queue.Queue是進程內非阻塞隊列。隊列是多個進程各自私有的。multiprocess.Queue是跨進程通信隊列,隊列是是各子進程共有,它支持多進程之間的交互,比如master-worker模式下,master進程寫入,work進程消費的模式,支持進程之間的通信。
1、常見隊列
""" queue.Queue(maxsize) 先進先出隊列 queue.LifoQueue(maxsize) 后進先出隊列 queue.PriorityQueue(maxsize) 優先級隊列 queue.deque雙線隊列
maxsize是個整數,指明了隊列中能存放的數據個數的上限。一旦達到上限,插入會導致阻塞,直到隊列中的數據被消費掉。如果maxsize小于或者等于0,隊列大小沒有限制。 """
#單向隊列 ,先進先出 import queue q = queue.Queue() q.put(123) q.put(456) q.put(789) print(q.get()) # 123 #單向隊列,后進先出 import queue q = queue.LifoQueue() q.put(123) q.put(456) q.put(789) print(q.get()) # 789 #優先級隊列 ,參數是元組,值越小優先級越高 import queue q = queue.PriorityQueue() q.put((4,123)) q.put((3,456)) q.put((2,789)) q.put((1,111)) q.put((0,666)) print(q.get()) # (0,666) #雙向隊列 import queue q = queue.deque() q.append(123) q.append(456) q.appendleft(666) print(q.pop()) # 456 print(q.popleft()) # 666
2、Queue 常見方法
import queue q = queue.Queue(maxsize=0) # 構造一個先進顯出隊列,maxsize指定隊列長度,為0時,表示隊列長度無限制。
q.join() # 等到隊列為空的時候,在執行別的操作 q.qsize() # 返回隊列的大小 (不可靠) q.empty() # 當隊列為空的時候,返回True 否則返回False (不可靠) q.full() # 當隊列滿的時候,返回True,否則返回False (不可靠),與 maxsize 大小對應 q.put(item, block=True, timeout=None) # 將item放入Queue尾部,item必須存在,可以參數block默認為True,表示當隊列滿時,會等待隊列給出可用位置, #為False時為非阻塞,此時如果隊列已滿,會引發queue.Full 異常。 可選參數timeout,表示 會阻塞設置的時間,過后, #如果隊列無法給出放入item的位置,則引發 queue.Full 異常 q.get(block=True, timeout=None) # 移除并返回隊列頭部的一個值,可選參數block默認為True,表示獲取值的時候,如果隊列為空,則阻塞,為False時,不阻塞, #若此時隊列為空,則引發 queue.Empty異常。 可選參數timeout,表示會阻塞設置的時候,過后,如果隊列為空,則引發Empty異常。 q.put_nowait(item) # 等效于 put(item,block=False) q.get_nowait() # 等效于 get(item,block=False)
3、生產者與消費者
import threading
import time
import queue
def producer(i):
q.put(i)
def consumer():
print(q.get())
time.sleep(1)
q = queue.Queue()
for i in range(10):
t = threading.Thread(target=producer,args=(i,))
t.start()
for i in range(10):
t = threading.Thread(target=consumer)
t.start()
三、multiprocessing模塊
python中的多線程無法利用多核優勢,如果想要充分的使用CPU資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。multiprocess中幾乎包含了和進程有關的所有子模塊。大致分為四個部分:創建進程部分、進程同步部分、進程池部分、進程之間數據共享。multiprocessing常用組件及功能:
管理進程模塊:
- Process(用于創建進程模塊)
- Pool(用于創建管理進程池)
- Queue(用于進程通信,資源共享)
- Value,Array(用于進程通信,資源共享)
- Pipe(用于管道通信)
- Manager(用于資源共享,Manager模塊常與Pool模塊一起使用)
1、運行方式
windows系統下,想要啟動一個子進程,必須加上*if __name__=="__main__":*,linux則不需要。
2、父進程中的全局變量能被子進程共享嗎?
不行,因為每個進程享有獨立的內存數據,如果想要共享資源,可以使用Manage類,或者Queue等模塊。
3、子進程中還能再創建子進程嗎?
可以,子進程可以再創建進程,線程中也可以創建進程。
1、multiprocess.Process模塊
Process模塊是一個創建進程的模塊,借助這個模塊,就可以完成進程的創建。
Process([group [, target [, name [, args [, kwargs]]]]]) 1 group參數未使用,值始終為None 2 target表示調用對象,即子進程要執行的任務 3 args為傳給target函數的位置參數,是一個元組形式,必須有逗號,args=(1,2,'egon',) 4 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
5 name為子進程的名稱
方法: 1 p.start():啟動進程,并調用該子進程中的p.run() 2 p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法 3 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵尸進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會
被釋放,進而導致死鎖 4 p.is_alive():如果p仍然運行,返回True 5 p.join([timeout]):主線程等待p終止。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程
屬性: 1 p.daemon:默認值為False,如果設為True,代表p為后臺運行的守護進程。設定為True后,p不能創建自己的新進程,必須在p.start()之前設置 2 p.name:進程的名稱 3 p.pid:進程的pid 4 p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可) 5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性(了解即可)
Windows操作系統中由于沒有fork(linux操作系統中創建進程的機制),因此用process()直接創建子進程會無限遞歸創建。所以必須在創建子進程的部分使用
if __name__ ==‘__main__’ 判斷保護起來,import 的時候就不會遞歸運行了。
Process創建子進程
#單獨創建子進程 1;
import os
from multiprocessing import Process
def func1(name):
print('hello', name)
print(f"我是子進程:{name},我的ID是:{os.getpid()},我的父進程id是:{os.getppid()}")
def func2():
print('hello')
if __name__ == '__main__':
p1 = Process(target=func1, args=('測試',)) # 此處傳參必須是元組數據類型
p1.start()
print(f"我是父進程:{os.getpid()}")
p2 = Process(target=func2)
p2.start()
# 執行結果
'''
我是父進程:52164
hello
我是子進程:測試,我的ID是:70088,我的父進程id是:52164
'''
#單獨創建子進程 2:
# 通過繼承Process類的形式開啟進程的方式
import os
from multiprocessing import Process
class MyProcess(Process):
#需要的傳參必須寫入到__init__方法里面且必須加上super().__init__();因為父類Process里面也有__init__方法。
def __init__(self, name):
super().__init__()
self.name = name
def run(self): #固定名字run !
print(os.getpid())
print('%s 正在聊天' % self.name)
if __name__ == '__main__':
p1 = MyProcess('xiaobai_1')
p2 = MyProcess('xiaohei_2')
p1.start()
p2.start()
#結果
'''
10688
xiaobai_1 正在聊天
72912
xiaohei_2 正在聊天
'''
#多進程 1
#默認情況下,創建好的子進程是隨機啟動的,子進程的執行順序不是根據自動順序決定的
import time
from multiprocessing import Process
def func(name):
print("hello 進程 %d" % name )
time.sleep(1)
if __name__ == '__main__':
for i in range(10):
p = Process(target=func, args=(i,))
p.start()
#結果
'''
hello 進程 0
hello 進程 5
hello 進程 3
hello 進程 8
hello 進程 1
hello 進程 7
hello 進程 6
hello 進程 2
hello 進程 9
hello 進程 4
'''
#多進程 2
#創建好子進程后將其放入鏈表中,按順序啟動
import time
from multiprocessing import Process
def func(name):
print("hello 進程 %d" % name )
time.sleep(0.1)
if __name__ == '__main__':
p_lst = []
for i in range(10):
p = Process(target=func, args=(i,))
p_lst.append(p)
p.start()
p.join() # 加上join方法后,父進程就會阻塞等待子進程結束而結束。
print("父進程執行中")
#結果
'''
hello 進程 0
hello 進程 1
hello 進程 2
hello 進程 3
hello 進程 4
hello 進程 5
hello 進程 6
hello 進程 7
hello 進程 8
hello 進程 9
父進程執行中
'''
2、multiprocessing.Pool模塊(進程池)
Pool類描述了一個工作進程池,進程池內部維護一個進程序列,當使用時就去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。
如果需要多個子進程時可以考慮使用進程池(pool)來管理。pool創建子進程的方法與Process不同,是通過p.apply_async(func,args=(args))實現,一個池子里能同時運行的任務是取決你電腦的cpu數量,如我的電腦現在是有4個cpu,那會子進程task0,task1,task2,task3可以同時啟動,task4則在之前的一個某個進程結束后才開始。
Pool(processes: Optional[int] = ...,initializer: Optional[Callable[..., Any]] = ..., initargs: Iterable[Any] = ..., maxtasksperchild: Optional[int] = ...) processes :使用的工作進程的數量,如果processes是None那么使用 os.cpu_count()返回的數量。 initializer: 如果initializer是None,那么每一個工作進程在開始的時候會調用initializer(*initargs)。 maxtasksperchild:工作進程退出之前可以完成的任務數,完成后用一個心的工作進程來替代原進程,來讓閑置的資源被釋放。maxtasksperchild默認是None,意味著只 要Pool存在工作進程就會一直存活。 方法: apply(func[, args[, kwds]]) :使用arg和kwds參數調用func函數,結果返回前會一直阻塞,apply_async()更適合并發執行。 apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :會返回一個結果對象。callback可以接收一個參數然后被調用,調用失敗時則用error_callback替換callback。 close() : 阻止更多的任務提交到pool,待任務完成后,工作進程會退出。 terminate() : 不管任務是否完成,立即停止工作進程。在對pool對象進程垃圾回收的時候,會立即調用terminate()。 join() : wait工作線程的退出,在調用join()前,必須調用close() or terminate()。這樣是因為被終止的進程需要被父進程調用wait(join等價與wait),否則進程會成為僵尸進程。 map(func, iterable[, chunksize]) map_async(func, iterable[, chunksize[, callback[, error_callback]]]) imap(func, iterable[, chunksize]) imap_unordered(func, iterable[, chunksize]) starmap(func, iterable[, chunksize]) starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
#apply同步執行:阻塞式
from multiprocessing import Pool
import time,os
def f1(i):
time.sleep(0.5)
print("%s開始執行,進程號為%d"%(i,os.getpid()))
return i + 100
if __name__ == "__main__":
pool = Pool(2) #開啟2個進程
for i in range(1,10): #會循環調用創建的2個進程來執行任務
pool.apply(func=f1,args=(i,))
pool.close()
pool.join()
'''
2開始執行,進程號為5832
4開始執行,進程號為5832
6開始執行,進程號為5832
8開始執行,進程號為5832
1開始執行,進程號為6848
3開始執行,進程號為6848
5開始執行,進程號為6848
7開始執行,進程號為6848
9開始執行,進程號為6848
'''
#apply_async異步執行:非阻塞式
from multiprocessing import Pool
import os,time,random
def worker(msg):
print("%s開始執行,進程號為%d"%(msg,os.getpid()))
time.sleep(random.random()*2) # random.random()隨機成0-1的浮點數
print(msg,"執行完畢")
return 'worker'+str(msg)
def f2(arg):
print(arg)
if __name__ == '__main__':
po = Pool(3) # 最大的進程數為3
for i in range(0,10): #每次循環將會用空閑出來的子進程去調用目標
po.apply_async(worker,(i,),callback=f2)
print("----start----")
po.close() # 關閉進程池,關閉后po不再接受新的請求
po.join() # 等待po中的所有子進程執行完成,必須放在close語句之后,如果沒有添加join(),會導致有的代碼沒有運行就已經結束了
print("-----end-----")
'''
----start----
worker2
worker1
worker0
worker4
worker6
worker3
worker5
worker8
worker9
worker7
2開始執行,進程號為3156
2 執行完畢
3開始執行,進程號為3156
3 執行完畢
8開始執行,進程號為3156
8 執行完畢
0開始執行,進程號為716
0 執行完畢
5開始執行,進程號為716
5 執行完畢
9開始執行,進程號為716
9 執行完畢
1開始執行,進程號為6896
1 執行完畢
4開始執行,進程號為6896
4 執行完畢
6開始執行,進程號為6896
6 執行完畢
7開始執行,進程號為6896
7 執行完畢
-----end-----
'''
from multiprocessing.pool import Pool
def hhh(i):
return i * 2
def job1(z):
return z[0]* z[1]
if __name__ == '__main__':
pool = Pool(2)
hh = pool.map(hhh, [i for i in range(16)]) #map()會將第二個參數列表元素一個個的傳入第一個參數的函數中
res = pool.map(job1, [(2, 3), (3, 4)])
print(hh)
print(res)
from multiprocessing.pool import Pool
import time
def add_test (i):
time.sleep(1)
print(i * i)
if __name__ == "__main__":
pool = Pool(3)
pool.map_async(add_test, [i for i in range(16)])
pool.close()
pool.join()
3、守護進程
主進程創建守護進程,守護進程會隨著主進程的結束而結束。被置為守護進程的子進程加了join()(起到阻塞作用),那么主進程會等子進程都運行完。
守護進程是一種在系統后臺執行的程序,它獨立于控制終端并且執行一些周期任務或觸發事件。假如你編寫了一個python服務程序,并且在命令行下啟動,而你的命令行會話又被終端所控制,python服務成了終端程序的一個子進程。因此如果你關閉了終端,這個命令行程序也會隨之關閉。要使你的python服務不受終端影響而常駐系統,就需要將它變成守護進程。
import time
from multiprocessing import Process
def foo():
print(123)
time.sleep(3)
print("end123")
def bar():
print(456)
time.sleep(2)
print("end456")
if __name__ == '__main__':
p1=Process(target=foo)
p2=Process(target=bar)
p1.daemon=True #守護進程
p1.start()
# p1.join()
p2.start()
p2.join()
#打印該行則主進程代碼結束,則守護進程p1應該被終止.可能p1執行的打印信息任務會因為主進程打?。╩ain----)被終止.
print("-----main-----")
#結果:
#如果foo的用時小于bar,那么foo可以正常完成執行
"""
456
end456
-----main-----
"""
四、進程間通信/同步
1、為什么進程之間需要通信?
- 數據傳輸。一個進程需要將它的數據發送給另一個進程;
- 資源共享。多個進程之間共享同樣的資源;
- 事件通知。一個進程需要向另一個或一組進程發送消息,通知它們發生了某種事件;
- 進程控制。有些進程希望完全控制另一個進程的執行(如Debug進程),該控制進程希望能夠攔截另一個進程的所有操作,并能夠及時知道它的狀態改變。
2、進程間通信的幾種主要手段:
- 管道(Pipe):管道可用于具有親緣關系進程間的通信,有名管道克服了管道沒有名字的限制,因此,除具有管道所具有的功能外,它還允許無親緣關系進程間的通信;
- 信號(Signal):信號是比較復雜的通信方式,用于通知接受進程有某種事件發生,除了用于進程間通信外,進程還可以發送信號給進程本身;
- 消息隊列(Message):消息隊列是消息的鏈接表,有足夠權限的進程可以向隊列中添加消息,被賦予讀權限的進程則可以讀走隊列中的消息。消息隊列克服了信號承載信息量少,管道只能承載無格式字節流以及緩沖區大小受限等缺點。
- 共享內存:使得多個進程可以訪問同一塊內存空間,是最快的可用IPC形式。是針對其他通信機制運行效率較低而設計的。往往與其它通信機制,如信號量結合使用,來達到進程間的同步及互斥。
- 信號量(semaphore):主要作為進程間以及同一進程不同線程之間的同步手段。
- 套接口(Socket):更為一般的進程間通信機制,可用于不同機器之間的進程間通信。起初是由Unix系統的BSD分支開發出來的,但現在一般可以移植到其它類Unix系統上:Linux和System V的變種都支持套接字。

3、Semaphore
信號量,是在進程同步過程中一個比較重要的角色??梢钥刂婆R界資源的數量,保證各個進程之間的互斥和同步??梢杂脕砜刂茖蚕碣Y源的訪問數量,例如池的最大連接數。
實例:演示一下進程之間利用Semaphore做到同步和互斥,以及控制臨界資源數量
from multiprocessing import Process, Semaphore, Lock, Queue
import time
buffer = Queue(10)
empty = Semaphore(2)
full = Semaphore(0)
lock = Lock()
class Consumer(Process):
def run(self):
global buffer, empty, full, lock
while True:
full.acquire()
lock.acquire()
buffer.get()
print('Consumer pop an element')
time.sleep(1)
lock.release()
empty.release()
class Producer(Process):
def run(self):
global buffer, empty, full, lock
while True:
empty.acquire()
lock.acquire()
buffer.put(1)
print('Producer append an element')
time.sleep(1)
lock.release()
full.release()
if __name__ == '__main__':
p = Producer()
c = Consumer()
p.daemon = c.daemon = True
p.start()
c.start()
p.join()
c.join()
print 'Ended!'
如上代碼實現了注明的生產者和消費者問題,定義了兩個進程類,一個是消費者,一個是生產者。
定義了一個共享隊列,利用了Queue數據結構,然后定義了兩個信號量,一個代表緩沖區空余數,一個表示緩沖區占用數。
生產者Producer使用empty.acquire()方法來占用一個緩沖區位置,然后緩沖區空閑區大小減小1,接下來進行加鎖,對緩沖區進行操作。然后釋放鎖,然后讓代表占用的緩沖區位置數量+1,消費者則相反。
運行結果如下:
Producer append an element Producer append an element Consumer pop an element Consumer pop an element Producer append an element Producer append an element Consumer pop an element Consumer pop an element Producer append an element Producer append an element Consumer pop an element Consumer pop an element Producer append an element Producer append an element
可以發現兩個進程在交替運行,生產者先放入緩沖區物品,然后消費者取出,不停地進行循環。
4、Lock
當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。
具體場景:所有的任務在打印的時候都會向同一個標準輸出(stdout)輸出。這樣輸出的字符會混合在一起,無法閱讀。使用Lock同步,在一個任務輸出完成之后,再允許另一個任務輸出,可以避免多個任務同時向終端輸出。
# 多進程搶占輸出資源
import os
import time
import random
from multiprocessing import Process
def work(n):
print('%s: %s is running' % (n, os.getpid()))
time.sleep(random.random())
print('%s: %s is done' % (n, os.getpid()))
if __name__ == '__main__':
for i in range(3):
p = Process(target=work, args=(i,))
p.start()
# 執行結果
"""
0: 14316 is running
1: 9900 is running
2: 10056 is running
1: 9900 is done
2: 10056 is done
0: 14316 is done
"""
# 使用鎖維護執行順序
# 執行結果 """ 0: 15276 is running 0: 15276 is done 1: 6360 is running 1: 6360 is done 2: 14776 is running 2: 14776 is done """
上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程序又重新變成串行了。加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行修改,犧牲了速度卻保證了數據的安全性。
我們一種能夠兼顧效率高和幫我們處理好鎖問題,這就是mutiprocessing模塊為我們提供的基于消息的進程間通信機制:隊列和管道。隊列和管道都是將數據存放于內存中,隊列又是基于(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來。
與多線程不同,多進程之間不會共享全局變量,所以多進程通信需要借助“外力”。這些常用的外力有:Pipe,Queue、Value/Array,Manager。多個進程時,通常使用消息傳遞來進行進程之間的通信,對于傳遞消息可以使用Pipe()(用于兩個進程之間的連接)或隊列Queue(允許多個生產者和消費者)。
我們應該盡可能使用管道和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可擴展性。
5、Pipe通信
Pipe常用來在兩個進程間通信,兩個進程分別位于管道的兩端。管道是可以同時發送和接受消息的。Queue適用于絕大多數場景,為滿足普遍性而不得不多方考慮,它因此顯得“重”。Pipe更為輕巧,速度更快
multiprocessing.Pipe([duplex])
(con1, con2) = Pipe()
con1管道的一端,負責存儲,也可以理解為發送信息
con2管道的另一端,負責讀取,也可以理解為接受信息
from multiprocessing import Process, Pipe
def send(pipe):
pipe.send(['spam'] + [42, 'egg']) # send 傳輸一個列表
pipe.close()
if __name__ == '__main__':
(con1, con2) = Pipe() # 創建兩個 Pipe 實例
sender = Process(target=send, args=(con1, )) # 函數的參數,args 一定是實例化之后的 Pipe 變量,不能直接寫 args=(Pip(),)
sender.start() # Process 類啟動進程
print(f"con2 got: {con2.recv()}") # 管道的另一端 con2 從send收到消息
con2.close()
from multiprocessing import Process, Pipe
import time
def consumer(p, name):
left, right = p
left.close()
while True:
try:
baozi = right.recv()
print('%s 收到包子:%s' % (name, baozi))
except EOFError:
right.close()
break
def producer(seq, p):
left, right = p
right.close()
for i in seq:
left.send(i)
time.sleep(1)
else:
left.close()
if __name__ == '__main__':
left, right = Pipe()
c1 = Process(target=consumer, args=((left, right), 'c1'))
c1.start()
seq = (i for i in range(10))
producer(seq, (left, right))
right.close()
left.close()
c1.join()
print('進程間通信-管道-主進程')
管道是可以同時發送和接受消息的:
from multiprocessing import Process, Pipe
def talk(pipe):
pipe.send(dict(name='Bob', spam=42)) # 傳輸一個字典
reply = pipe.recv() # 接收傳輸的數據
print('talker got:', reply)
if __name__ == '__main__':
(parentEnd, childEnd) = Pipe() # 創建兩個 Pipe() 實例,也可以改成 conf1, conf2
child = Process(target=talk, args=(childEnd,)) # 創建一個 Process 進程,名稱為 child
child.start() # 啟動進程
print('parent got:', parentEnd.recv()) # parentEnd 是一個 Pip() 管道,可以接收 child Process 進程傳輸的數據
parentEnd.send({x * 2 for x in 'spam'}) # parentEnd 是一個 Pip() 管道,可以使用 send 方法來傳輸數據
child.join() # 傳輸的數據被 talk 函數內的 pip 管道接收,并賦值給 reply
print('parent exit')
'''
parent got: {'name': 'Bob', 'spam': 42}
talker got: {'ss', 'mm', 'pp', 'aa'}
parent exit
'''
6、Queue
1)、Queue
Queue.put()放數據,默認有block=True和timeout兩個參數。當block=True時,寫入是阻塞式的,阻塞時間由timeout確定。當隊列q被(其他線程)寫滿后,這段代碼就會阻塞,直至其他線程取走數據。Queue.put()方法加上 block=False 的參數,即可解決這個隱蔽的問題。但要注意,非阻塞方式寫隊列,當隊列滿時會拋出 exception Queue.Full 的異常
Queue.get():取出隊列中目前排在最前面的數據(默認阻塞),Queue.get([block[, timeout]])獲取隊列,timeout等待時間
Queue.full() #判斷隊列是否滿了
Queue.empty() #判斷隊列是否為空
Queue.qsize() 返回隊列的大小 ,不過在 Mac OS 上沒法運行。
from multiprocessing import Queue
from multiprocessing import Process
def new_put(q):
q.put('你好')
def new_get(q):
ret = q.get()
print(ret)
if __name__ == '__main__':
q = Queue()
p = Process(target=new_put, args=(q, ))
p.start()
print(p.name)
g = Process(target=new_get, args=(q, ))
g.start()
print(g.name)
from multiprocessing import Process, Queue
def producer(q): # 生產
for i in range(1, 6):
q.put(i) # 添加一個任務
print("生產%s饅頭" % i)
def consumer(q): # 消費
while 1:
sth = q.get()
print("消費%s饅頭" % sth)
if __name__ == '__main__':
q = Queue(4)
p1 = Process(target=producer, args=(q, ))
p2 = Process(target=producer, args=(q, ))
p3 = Process(target=producer, args=(q, ))
c1 = Process(target=consumer, args=(q, ))
c2 = Process(target=consumer, args=(q, ))
# 將消費者設置為守護進程,因為消費者里面是死循環
c1.daemon = True
c2.daemon = True
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
print("stop")
import time
from multiprocessing import Pool,Manager
def producer(queue):
queue.put("a")
time.sleep(2)
def consumer(queue):
time.sleep(2)
data = queue.get()
print(data)
if __name__ == "__main__":
queue = Manager().Queue()
pool = Pool() #pool中的進程間通信需要使用Manager
pool.apply_async(producer,args=(queue,))
pool.apply_async(consumer, args=(queue,))
pool.close()
2)、JoinableQueue
JoinableQueue比Queue多了task_done和join方法
q.task_done() 使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。也就是put取出了,計數-1。
q.join() 生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。
from multiprocessing import JoinableQueue, Queue
q = JoinableQueue()# 用法和Queue相似
q.put("ocean") # 隊列放入一個任務,內存在一個計數機制,+1
print(q.get())
q.task_done() # 完成一次任務,計數機制-1
q.join() # 計數機制不為0的時候,阻塞等待計數器為0后通過
from multiprocessing import Process, JoinableQueue
def producer(q): # 生產
for i in range(1, 6):
q.put(i) # 添加一個任務
print("生產%s饅頭" % i)
def consumer(q): # 消費
while 1:
sth = q.get()
print("消費%s饅頭" % sth)
name__ == '__main__':
# 繼承了Queue,多了兩個功能,join() task_done()
q = JoinableQueue(4)
p1 = Process(target=producer, args=(q, ))
p2 = Process(target=producer, args=(q, ))
p3 = Process(target=producer, args=(q, ))
c1 = Process(target=consumer, args=(q, ))
c2 = Process(target=consumer, args=(q, ))
# 將消費者設置為守護進程,因為消費者里面是死循環
c1.daemon = True
c2.daemon = True
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
print("stop")
7、Array,Value
無論是Value()還是Array(),第一個參數都是typecode或type。typecode表示類型碼,在Python中已經預先設計好了,如”c“表示char類型,“i”表示singed int類型等等。這種方式不易
記憶,建議用type表達類型,這里需要借助ctypes模塊。
# typecode int_typecode = Value("i", 512) float_typecode = Value("f", 1024.0) char_typecode = Value("c", b"a") # 第二個參數是byte型 # type import ctypes int_type = Value(ctypes.c_int, 512) float_type = Value(ctypes.c_float, 1024.0) char_type = Value(ctypes.c_char, b"a") # 第二個參數是byte型
import multiprocessing
def func(num):
num.value=10.78 #子進程改變數值的值,主進程跟著改變
if __name__=="__main__":
num=multiprocessing.Value("d",10.0) # d表示數值,主進程與子進程共享這個value。(主進程與子進程都是用的同一個value)
print(num.value) #10.0
p=multiprocessing.Process(target=func,args=(num,))
p.start()
p.join()
print(num.value) #10.78
import multiprocessing
def func(num):
num[2]=9999 #子進程改變數組,主進程跟著改變
if __name__=="__main__":
num=multiprocessing.Array("i",[1,2,3,4,5]) #主進程與子進程共享這個數組
print(num[:]) #[1, 2, 3, 4, 5]
p=multiprocessing.Process(target=func,args=(num,))
p.start()
p.join()
print(num[:]) #[1, 2, 9999, 4, 5]
from multiprocessing import Process,Array
def agk(i,arr):
arr[i]=i+100
if __name__=="__main__":
target=Array("i",10)
#target=Array('i',rang(10)) #[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(target[:]) #[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
for i in range(10):
t=Process(target=agk,args=(i,target))
t.start()
t.join()
print(target[:]) #[100, 101, 102, 103, 104, 105, 106, 107, 108, 109] 主進程可以取到子進程運行的數據
from multiprocessing import Process, Value, Array
import ctypes
def producer(num, string):
num.value = 1024 #int型
string[0] = b"z" # 只能一個一個的賦值,byte型
string[1] = b"t"
string[2] = b"y"
def consumer(num, string):
print(num.value)
print(b"".join(string))
if __name__ == "__main__":
num = Value(ctypes.c_int, 512)
string = Array(ctypes.c_char, 3) # 設置一個長度為3的數組,字符串需要通過Array實現,而不是Value。
proProcess = Process(target=producer, args=(num, string))
conProcess = Process(target=consumer, args=(num, string))
proProcess.start()
conProcess.start()
8、Manager
Python中進程間共享數據除了基本的pipe、queue、value+array外,還提供了更高層次的封裝。Queue和管道Pipe都僅能在進程之間傳遞數據,不能修改數據,multiprocessing.Manager可以實現在進程之間同時修改一份數據。Manager()返回的manager對象控制了一個server進程,此進程包含的python對象可以被其他的進程通過proxies來訪問。從而達到多進程間數據通信且安全。Manager支持的類型有:
dict= Manager().dict() # 字典對象
list=Manager().list() #列表
queue = Manager().Queue() # 隊列
lock = Manager().Lock() # 普通鎖
rlock = Manager().RLock() # 可沖入鎖
cond = Manager().Condition() # 條件鎖
semaphore = Manager().Semaphore() # 信號鎖
event = Manager().Event() # 事件鎖
namespace = Manager().Namespace() # 命名空間
from multiprocessing import Process,Manager
def f(d,n):
d["name"] = "zhangyanlin"
d["age"] = 18
d["Job"] = "pythoner"
n.reverse()
if __name__ == "__main__":
with Manager() as man:
d = man.dict()
n = man.list(range(10))
p = Process(target=f,args=(d,n))
p.start()
p.join()
print(d)
print(n)
#結果
'''
{'name': 'zhangyanlin', 'age': 18, 'Job': 'pythoner'}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
'''
from multiprocessing import Process,Pool,Manager
#通過Queue獲取子進程的值,并將他們組成列表
def new_put(q): #放入隊列
q.put('你好')
#j_list=[]
def new_get(q): #從隊列中取出
ret = q.get()
#在此處加入列表是沒用的,每個進程都單獨把值加入了列表的拷貝中,主進程中j_list還是[]
#j_list.append(ret)
return ret
j_list=[]
def ru():
manager = Manager()
q = manager.Queue()
for i in range(3):
p = Process(target=new_put, args=(q, ))
p.start()
p = Pool(3)
for j in range(3):
g = p.apply_async(func=new_get, args=(q, ))
j_list.append(g.get())#通過get()獲取進程函數的返回值并加入列表
if __name__ == '__main__':
ru()
print(j_list)
'''
['你好', '你好', '你好']
'''
#獲取進程函數返回值
from multiprocessing import Pool
def sayHi(num):
print('hi')
return num
def mycallback(x):
print(x)
list1.append(x)
if __name__ == '__main__':
pool = Pool(4)
list1 = []
for i in range(4):
pool.apply_async(sayHi, (i,), callback=mycallback)
pool.close()
pool.join()
print(list1)#[0, 1, 2, 3]
from multiprocessing import Pool
def test(p):
return p
if __name__=="__main__":
pool = Pool(processes=5)
result=[]
for i in range(10):
pp=pool.apply_async(test, args=(i,))#維持執行的進程總數為10,當一個進程執行完后添加新進程
result.append(pp)
pool.close()
pool.join()
result_r=[]
for i in result:
result_r.append(i.get())
print(result_r) #[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
import multiprocessing
from multiprocessing import Manager
def worker(procnum, returns):
print(str(procnum) + ' represent!')
returns.append(procnum)
return returns
if __name__ == '__main__':
manager = Manager()
return_list = manager.list() #也可以使用dict
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i, return_list))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print(return_list)
'''
3 represent!
2 represent!
1 represent!
0 represent!
4 represent!
[2, 3, 1, 0, 4]
'''
from multiprocessing import Process,Manager
import os
def func1(shareList,shareValue,shareDict,lock,i):
with lock:
shareValue.value+=1
shareDict[1]='1'
shareDict[2]='2'
shareList[i%5]=i+12
print("%s開始執行,進程號為%d"%(i,os.getpid()))
if __name__ == '__main__':
manager=Manager()
list1=manager.list([1,2,3,4,5])
dict1=manager.dict()
array1=manager.Array('i',range(10))
value1=manager.Value('i',1)
lock=manager.Lock()
proc=[]
for i in range(10): #生成10個進程
m=Process(target=func1,args=(list1,value1,dict1,lock,i))
m.start()
proc.append(m)
for p in proc:
p.join()
print(list1)
print(dict1)
print(array1)
print(value1)
'''
6開始執行,進程號為11532
2開始執行,進程號為7604
4開始執行,進程號為11596
0開始執行,進程號為8392
1開始執行,進程號為16564
3開始執行,進程號為2904
5開始執行,進程號為14280
7開始執行,進程號為12152
8開始執行,進程號為13252
9開始執行,進程號為13120
[17, 13, 19, 20, 21]
{1: '1', 2: '2'}
array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
Value('i', 11)
'''
from selenium import webdriver
from multiprocessing.dummy import Process, Manager
import time
def worker(i, return_list):
browser=webdriver.Chrome()
return_list.append(browser)
time.sleep(1)
print(i)
return_list[i].get('http://selenium-python.readthedocs.io')
if __name__ == '__main__':
manager = Manager()
return_list = manager.list() #也可以使用dict
jobs = []
for i in range(3):
p = Process(target=worker, args=(i, return_list))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print(return_list)
五、multiprocessing.dummy模塊(線程)
multiprocessing.dummy 是multiprocessing的一個子庫,可以實現多線程,用法與multiprocessing基本相同。multiprocessing.dummy是對threading的封裝。. threading 和 multiprocess.dummy的功能一樣,都是多線程。只是multiprocess.dummy的接口與multiprocess的接口相同,使得很方便在多進程與多線程間切換。python的多線程只會使用一個cpu,大多用在處理IO密集型程序中;如果希望處理計算密集型程序,應該使用多進程。
getpid打印自身進程號,getppid打印父進程進程號。
multiprocessing.current_process().name 獲取進程名
multiprocessing.current_process().ident 獲取進程號
threading.currentThread().name 獲取線程名
threading.currentThread().ident 獲取線程號
multiprocessing.dummy.current_process().name 獲取線程名
multiprocessing.dummy.current_process().ident 獲取線程號
1、子線程無返回值
Multiprocessing.dummy.Pool() 與Multiprocessing.Pool() 的用法一樣
非阻塞方法--線程并發執行
multiprocessing.dummy.Pool.apply_async() 和 multiprocessing.dummy.Pool.imap()
阻塞方法---線程順序執行
multiprocessing.dummy.Pool.apply()和 multiprocessing.dummy.Pool.map()
from multiprocessing.dummy import Pool as Pool
import time
def func(msg):
print('msg:', msg)
time.sleep(2)
print('end:')
pool = Pool(processes=3)
for i in range(1, 5):
msg = 'hello %d' % (i)
pool.apply_async(func, (msg,)) # 非阻塞
# pool.apply(func,(msg,)) # 阻塞,apply()源自內建函數,用于間接的調用函數,并且按位置把元祖或字典作為參數傳入。
# pool.imap(func,[msg,]) # 非阻塞, 注意與apply傳的參數的區別
# pool.map(func, [msg, ]) # 阻塞
print('Mark~~~~~~~~~~~~~~~')
pool.close()
pool.join() # 調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束
print('sub-process done')
2、子線程有返回值
與多進程一樣,只有multiprocessing.dummy.Pool.apply_async()可以有返回值,apply,map,imap不可以設置返回值.
六、threading模塊(線程)
threading 模塊通過對 thread 進行二次封裝,提供了更方便的 api 來處理線程。
- threading.currentThread(): 返回當前的線程變量。
- threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
- threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
1、threading.Thread
Thread(group: None = ..., target: (*args: Any, **kwargs: Any) -> Any | None = ..., name: str | None = ..., args: Iterable = ..., kwargs:
Mapping[str, Any] | None = ..., *, daemon: bool | None = ...) Thread的參數: * group *應該為None; 當實現ThreadGroup類時保留給以后的擴展。 * target *是要由run()方法調用的可調用對象。 默認為無,表示什么都不會被調用。 * name *是線程名稱。 默認情況下,唯一名稱的格式為“ Thread-N”,其中N是一個小十進制數字。 * args *是目標調用的參數元組。 默認為()。 * kwargs *是用于目標調用的關鍵字參數字典。 默認為{}。 如果子類覆蓋了該構造函數,則必須確保在對線程執行其他任何操作之前,先調用基類的構造函數(Thread .__ init __())。 Thread的方法: t.start() : 激活線程, t.getName() : 獲取線程的名稱 t.setName() : 設置線程的名稱 t.name : 獲取或設置線程的名稱 t.is_alive() : 判斷線程是否為激活狀態 t.isAlive() :判斷線程是否為激活狀態 t.setDaemon() 設置為后臺線程或前臺線程(即是否為守護線程,默認為False)必須在執行start()方法之后才可以使用。 t.isDaemon() : 判斷是否為守護線程 t.ident :獲取線程的標識符。線程標識符是一個非零整數,只有在調用了start()方法之后該屬性才有效,否則它只返回None。 t.join() :逐個執行每個線程,執行完畢后繼續往下執行,該方法使得多線程變得無意義 t.run() :線程被cpu調度后自動執行線程對象的run方法
#線程開啟方法1 from threading import Thread # 創建線程的模塊 def task(name): print(name) if __name__ == '__main__': # 開啟線程 參數1:方法名(不要帶括號) 參數2:參數(元祖) 返回對象 p = Thread(target=task, args=('線程1',)) p.start() # 只是給操作系統發送了一個就緒信號,并不是執行。操作系統接收信號后安排cpu運行 print('主') #線程開啟方法2 - 類的方法 from threading import Thread # 創建線程的模塊 class MyThread(Thread): def __init__(self, name): super().__init__() self.name = name def run(self): # 固定名字run !??!必須用固定名 print(self.name) if __name__ == '__main__': # 必須要這樣啟動 p = MyThread('子線程1') p.start() print('主)
七、線程通信/同步
python多線程中提供Lock 、Rlock 、Semaphore 、Event 、Condition 用來保證線程之間的同步。
- Lock & RLock:互斥鎖,用來保證多線程訪問共享變量的問題
- Semaphore對象:Lock互斥鎖的加強版,可以被多個線程同時擁有,而Lock只能被某一個線程同時擁有。
- Event對象:它是線程間通信的方式,相當于信號,一個線程可以給另外一個線程發送信號后讓其執行操作。
- Condition對象:其可以在某些事件觸發或者達到特定的條件后才處理數據
多個線程競爭一個資源 - 保護臨界資源 - 鎖(Lock/RLock)
多個線程競爭多個資源(線程數>資源數) - 信號量(Semaphore)
多個線程的調度 - 暫停線程執行/喚醒等待中的線程 - Condition
2、threading.RLock和threading.Lock
Lock鎖是Python的原始鎖,在鎖定時不屬于任何一個線程。在調用了 lock.acquire() 方法后進入鎖定狀態,lock.release()方法可以解鎖。一個線程上的Lock鎖,可以由另外線程解鎖。如果是RLock則報錯。RLock允許在同一線程中被多次acquire。而Lock卻不允許這種情況。
RLock被稱為重入鎖,RLock鎖是一個可以被同一個線程多次 acquire 的鎖,不論同一個線程調用了多少次的acquire,最后它都必須調用相同次數的 release 才能完全釋放鎖,這個時候其他的線程才能獲取這個鎖。重入鎖必須由獲取它的線程釋放,一旦線程獲得了重入鎖,同一個線程再次獲取它將不阻塞。
#未使用鎖
import threading
import time
num = 0
def show():
global num
num +=1
time.sleep(1)
print(f'num={num}')
def dos():
for i in range(5):
t = threading.Thread(target=show)
t.start()
print('main thread stop')
if __name__ == '__main__':
dos()
'''
main thread stop
num=5num=5num=5
num=5
num=5
'''
import threading
import time
num = 0
lock = threading.RLock()
def Func():
lock.acquire() # 獲得鎖
global num
num += 1
time.sleep(1)
print(f'num={num}')
lock.release() #釋放鎖
def dos():
for i in range(5):
t = threading.Thread(target=Func)
t.start()
print('main thread stop')
if __name__ == '__main__':
dos()
'''
main thread stop
num=1
num=2
num=3
num=4
num=5
'''
3、threading.Event
python線程的事件用于主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。
event = threading.Event()
event.clear() # 重置event,將“Flag”置為 False,使得所有該event事件都處于待命狀態
event.wait() # 等待接收event的指令,決定是否阻塞程序執行
event.set()# 發送event指令,將“Flag”置為True,使所有設置該event事件的線程執行
event.isSet() #判斷標識位是否為Ture。
#當線程執行的時候,如果flag為False,則線程會阻塞,當flag為True的時候,線程不會阻塞。它提供了本地和遠程的并發性。
from threading import Event,Thread
def dosomthing(event):
print('start')
event.wait()
print('execute')
event_obj = Event()
def des():
for i in range(5):
t = Thread(target=dosomthing, args=(event_obj,))
t.start()
if input('input:') == 'True':
event_obj.clear() #重置event,使得event.wait()起到阻塞作用
des()
event_obj.set()
else:
event_obj.clear()# 重置event,使得event.wait()起到阻塞作用
des()
'''
input:True
start
start
start
start
start
execute
execute
execute
execute
execute
input:i
start
start
start
start
start
'''
4、threading.Condition
Condition類實現了一個conditon變量,這個變量允許一個或多個線程等待,直到他們被另一個線程通知。
cond = threading.Condition()
cond.acquire() #進入鎖定狀態
cond.release() #解鎖
cond.wait() # 等待指定觸發,同時會釋放對鎖的獲取,直到被notify才重新占有瑣。
cond.notify() # 發送指定,觸發執行
import threading
import time
con = threading.Condition()
num = 0
# 生產者
class Producer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global num
con.acquire() # 鎖定線程
print ("開始添加!")
while num<=5:
num += 1
print(f"鍋里有{num}個魚丸")
time.sleep(1)
if num >= 5:
print ("火鍋滿了,快吃魚丸!")
con.notify() # 喚醒等待的線程
con.wait()# 等待通知
con.release()# 釋放鎖
# 消費者
class Consumers(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
con.acquire()
global num
print ("開始吃啦!")
for i in range(10):
num -= 1
print (f"剩余{num}個魚丸")
time.sleep(1)
if num <= 0:
print ("鍋里沒貨了,趕緊加魚丸!")
con.notify() # 喚醒其它線程
con.wait()# 等待通知
con.release()# 釋放鎖
p = Producer()
c = Consumers()
p.start()
c.start()
5、threading與queue模塊
from queue import Queue
q = Queue(maxsize=0)
# 默認阻塞程序,等待隊列消息,可設置超時時間
q.get(block=True, timeout=None)
# 發送消息:默認會阻塞程序至隊列中有空閑位置放入數據
q.put(item, block=True, timeout=None)
# 等待所有的消息都被消費完
q.join()
# 通知隊列任務處理已經完成,當所有任務都處理完成時,join() 阻塞將會解除
q.task_done()
# 查詢當前隊列的消息個數
q.qsize()
# 隊列消息是否都被消費完,返回 True/False
q.empty()
# 檢測隊列里消息是否已滿
q.full()
import time
import queue
# 下面來通過多線程來處理Queue里面的任務:
def work(q):
while True:
if q.empty():
return
else:
t = q.get()
print(f"當前線程sleep {t} 秒")
time.sleep(t)
def main():
q = queue.Queue()
for i in range(5):
q.put(i) # 往隊列里生成消息
# 單線程
work(q)
if __name__ == "__main__":
start = time.time()
main()
print('耗時:', time.time() - start)
'''
當前線程sleep 0 秒
當前線程sleep 1 秒
當前線程sleep 2 秒
當前線程sleep 3 秒
當前線程sleep 4 秒
耗時: 10.023889064788818
'''
import threading
import time
import queue
# 下面來通過多線程來處理Queue里面的任務:
def work(q):
while True:
if q.empty():
return
else:
t = q.get()
print(f"當前線程sleep {t} 秒")
time.sleep(t)
def main():
q = queue.Queue()
for i in range(5):
q.put(i) # 往隊列里生成消息
#多線程
thread_num = 5
threads = []
for i in range(thread_num):
t = threading.Thread(target=work, args=(q,))
threads.append(t)
for i in range(thread_num):
threads[i].start()
for i in range(thread_num):
threads[i].join()
if __name__ == "__main__":
start = time.time()
main()
print('耗時:', time.time() - start)
'''
當前線程sleep 0 秒
當前線程sleep 1 秒
當前線程sleep 2 秒當前線程sleep 3 秒
當前線程sleep 4 秒
耗時: 4.0168352127075195
'''
6、默認線程、守護線程、阻塞線程
守護線程:子線程會隨著主線程的結束而結束,無論子線程是否執行完畢
阻塞線程:主線程會等待子線程的執行結束,才繼續執行
#在Python中,默認情況下主線程執行完自己的任務后就退出了,此時子線程會繼續執行自己的任務,直到自己的任務結束
import threading
import time
def thread():
time.sleep(2)
print('---子線程結束---')
def main():
t1 = threading.Thread(target=thread)
t1.start()
print('---主線程結束---')
if __name__ =='__main__':
main()
'''
---主線程結束---
---子線程結束---
'''
#當我們使用setDaemon(True)時,這時子線程為守護線程,主線程一旦執行結束,則全部子線程被強制終止
import threading
import time
def test_thread():
time.sleep(3)
print('---子線程結束---')
def main():
t1 = threading.Thread(target=test_thread,daemon=True) #設置子線程守護主線程
t1.start()
print('---主線程結束---')
if __name__ =='__main__':
main() #主線程結束,子線程來不及執行就被強制結束
'''
---主線程結束---
'''
#線程阻塞,即主線程任務結束以后,進入堵塞狀態,一直等待所有的子線程結束以后,主線程再終止
import threading
import time
def thread():
time.sleep(5)
print('---子線程結束---')
def main():
t1 = threading.Thread(target=thread)
t1.start()
t1.join()
print('---主線程結束---')
if __name__ =='__main__':
main() #主線程任務結束以后,進入堵塞狀態,一直等待所有的子線程結束以后,主線程再終止
'''
---子線程結束---
---主線程結束---
'''
在Python中,線程阻塞和線程同步是多線程編程中的兩個重要概念,它們之間有著密切的關系但又有所區別。 線程阻塞 線程阻塞指的是線程在執行過程中,因為某些條件未滿足而暫停執行,等待條件滿足后再繼續執行的過程。阻塞通常發生在以下幾種情況: 等待資源:如等待I/O操作完成(讀寫文件、網絡通信等)。 等待鎖:在嘗試獲取一個已經被其他線程持有的鎖時,線程會被阻塞,直到該鎖被釋放。 調用sleep()函數:主動讓出CPU控制權,等待指定時間后恢復執行。 線程同步 線程同步則是指在多線程環境中,為了防止多個線程同時訪問同一份資源造成數據的不一致性或沖突,引入的一種機制來控制線程的執行順序。常見的線程同步工具包括互斥鎖(Lock)、信號量(Semaphore)、條件變量(Condition)等。 關系 線程阻塞往往是線程同步機制的一個結果或者實現手段。例如,當一個線程試圖獲取一個已被其他線程鎖定的資源時,該線程會被阻塞,直到鎖被釋放,這個過程實際上就是一種線程同步的體現。通過這種阻塞和喚醒機制,可以確保多個線程安全地訪問共享資源,避免了諸如競態條件等問題。 簡而言之,線程同步關注的是如何協調多個線程對共享資源的訪問,以保證數據的一致性和完整性;而線程阻塞是實現這一目標過程中,線程因等待某種條件而暫停執行的一種狀態。二者相輔相成,共同維護多線程程序的正確執行。


浙公網安備 33010602011771號