python必坑知識點(并發(fā)編程)
1 進程和線程
線程,是計算機中可以被cpu調(diào)度的最小單元(真正在工作)。
進程,是計算機資源分配的最小單元(進程為線程提供資源)。
一個進程中可以有多個線程,同一個進程中的線程可以共享此進程中的資源。
通過 進程 和 線程 都可以將 串行 的程序變?yōu)?并發(fā)
2 多進程和多線程
多進程的開銷要比多線程的開銷大
from multiprocessing import Process
from threading import Thread
2.1 線程常見方法
`t.start()`,當前線程準備就緒(等待CPU調(diào)度,具體時間是由CPU來決定)。
`t.join()`,等待當前線程的任務(wù)執(zhí)行完畢后再向下繼續(xù)執(zhí)行。
`t.setDaemon(布爾值)` ,守護線程(必須放在start之前)
- `t.setDaemon(True)`,設(shè)置為守護線程,主線程執(zhí)行完畢后,子線程也自動關(guān)閉。
- `t.setDaemon(False)`,設(shè)置為非守護線程,主線程等待子線程,子線程執(zhí)行完畢后,主線程才結(jié)束。(默認)
線程名稱的設(shè)置和獲取
import threading
def task(arg):
# 獲取當前執(zhí)行此代碼的線程
name = threading.current_thread().getName()
print(name)
for i in range(10):
t = threading.Thread(target=task, args=(11,))
t.setName('日魔-{}'.format(i))
t.start()
自定義線程類,直接將線程需要做的事寫到run方法中。
import threading
class MyThread(threading.Thread):
def run(self):
print('執(zhí)行此線程', self._args) # self._args獲取傳入的參數(shù)
t = MyThread(args=(100,))
t.start()
3 GIL全局鎖
全局解釋器鎖(Global Interpreter Lock),CPython特有,讓一個進程中同一個時刻只能有一個線程可以被CPU調(diào)用
如果程序想利用 計算機的多核優(yōu)勢,讓CPU同時處理一些任務(wù),適合用多進程開發(fā)(即使資源開銷大)。
如果程序不利用 計算機的多核優(yōu)勢,適合用多線程開發(fā)。
- 計算密集型,用多進程,例如:大量的數(shù)據(jù)計算【累加計算示例】。
- IO密集型,用多線程,例如:文件讀寫、網(wǎng)絡(luò)數(shù)據(jù)傳輸【下載抖音視頻示例】。
4 線程安全
一個進程中可以有多個線程,且線程共享所有進程中的資源。
多個線程同時去操作一個"東西",可能會存在數(shù)據(jù)混亂的情況
5 線程鎖
Lock和RLock
RLock支持多次申請鎖和多次釋放;Lock不支持。開發(fā)中常用RLock。
-
Lock,同步鎖。import threading num = 0 lock_object = threading.Lock() def task(): print("開始") lock_object.acquire() # 第1個抵達的線程進入并上鎖,其他線程就需要再此等待。 global num for i in range(1000000): num += 1 lock_object.release() # 線程出去,并解開鎖,其他線程就可以進入并執(zhí)行了 print(num) for i in range(2): t = threading.Thread(target=task) t.start() -
RLock,遞歸鎖。import threading num = 0 lock_object = threading.RLock() def task(): print("開始") lock_object.acquire() # 第1個抵達的線程進入并上鎖,其他線程就需要再此等待。 global num for i in range(1000000): num += 1 lock_object.release() # 線程出去,并解開鎖,其他線程就可以進入并執(zhí)行了 print(num) for i in range(2): t = threading.Thread(target=task) t.start()
6 死鎖
死鎖,由于競爭資源或者由于彼此通信而造成的一種阻塞的現(xiàn)象。
兩種情況
1.單個線程中連續(xù)的獲取鎖
2.兩個或多個線程手上拿著對方需要的鎖
7 線程池
Python3中官方才正式提供線程池。
線程不是開的越多越好,開的多了可能會導(dǎo)致系統(tǒng)的性能更低了。
不建議:無限制的創(chuàng)建線程。
建議:使用線程池
# 等待線程池的任務(wù)執(zhí)行完畢
pool.shutdown(True)
# 任務(wù)執(zhí)行完后,在干點其他事
def done(response):
print("任務(wù)執(zhí)行后的返回值", response.result())
future = pool.submit(task, url)
future.add_done_callback(done)
# 或者
future_list = []
future = pool.submit(task, url)
future_list.append(future)
pool.shutdown(True)
for fu in future_list:
print(fu.result())
8 單例模式
面試題:手寫單例模式
單例模式:每次實例化類的對象時,都是最開始創(chuàng)建的那個對象,不再重復(fù)創(chuàng)建對象。
-
簡單實現(xiàn)
class Singleton: instance = None def __init__(self, name): self.name = name def __new__(cls, *args, **kwargs): # 返回空對象 if cls.instance: return cls.instance cls.instance = object.__new__(cls) return cls.instance obj1 = Singleton('alex') obj2 = Singleton('SB') print(obj1,obj2) -
多線程實現(xiàn)(加鎖)
import threading class Singleton: instance = None lock = threading.RLock() def __init__(self, name): self.name = name def __new__(cls, *args, **kwargs): # 加性能 if cls.instance: return cls.instance with cls.lock: if cls.instance: return cls.instance cls.instance = object.__new__(cls) return cls.instance def task(): obj = Singleton('x') print(obj) for i in range(10): t = threading.Thread(target=task) t.start()
9 進程
進程相關(guān)必須在if __name__ == '__main__':下運行
關(guān)于在Python中基于multiprocessiong模塊操作的進程:
Depending on the platform, multiprocessing supports three ways to start a process. These start methods are
fork,【“拷貝”幾乎所有資源】【支持文件對象/線程鎖等傳參】【unix】【任意位置開始】【快】
The parent process uses
os.fork()to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.Available on Unix only. The default on Unix.spawn,【run參數(shù)傳必備資源】【不支持文件對象/線程鎖等傳參】【unix、win】【main代碼塊開始】【慢】
The parent process starts a fresh python interpreter process. The child process will only inherit those resources necessary to run the process object’s
run()method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited. Starting a process using this method is rather slow compared to using fork or forkserver.Available on Unix and Windows. The default on Windows and macOS.forkserver,【run參數(shù)傳必備資源】【不支持文件對象/線程鎖等傳參】【部分unix】【main代碼塊開始】
When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use
os.fork(). No unnecessary resources are inherited.Available on Unix platforms which support passing file descriptors over Unix pipes.
import multiprocessing
multiprocessing.set_start_method("spawn")
官方文檔:https://docs.python.org/3/library/multiprocessing.html
9.1 案例
import multiprocessing
def task():
print(name)
file_object.write("alex\n")
file_object.flush()
if __name__ == '__main__':
multiprocessing.set_start_method("fork")
name = []
file_object = open('x1.txt', mode='a+', encoding='utf-8')
file_object.write("武沛齊\n")
p1 = multiprocessing.Process(target=task)
p1.start()
# 文件內(nèi)容
"""
武沛齊
alex
武沛齊
"""
import multiprocessing
def task():
print(name)
file_object.write("alex\n")
file_object.flush()
if __name__ == '__main__':
multiprocessing.set_start_method("fork")
name = []
file_object = open('x1.txt', mode='a+', encoding='utf-8')
file_object.write("武沛齊\n")
file_object.flush()
p1 = multiprocessing.Process(target=task)
p1.start()
# 文件內(nèi)容
"""
武沛齊
alex
"""
import multiprocessing
import threading
import time
def func():
print("來了") # 阻塞,等待主線程task釋放鎖
with lock:
print(666)
time.sleep(1)
def task():
# 拷貝的鎖也是被申請走的狀態(tài)
# 被誰申請走了? 被子進程中的主線程申請走了
for i in range(10):
t = threading.Thread(target=func)
t.start()
time.sleep(2)
lock.release()
if __name__ == '__main__':
multiprocessing.set_start_method("fork")
name = []
lock = threading.RLock()
lock.acquire()
# print(lock)
# lock.acquire() # 申請鎖
# print(lock)
# lock.release()
# print(lock)
# lock.acquire() # 申請鎖
# print(lock)
p1 = multiprocessing.Process(target=task)
p1.start()
進程的名稱的設(shè)置和獲取
import os
import time
import threading
import multiprocessing
def func():
time.sleep(3)
def task(arg):
for i in range(10):
t = threading.Thread(target=func)
t.start()
print(os.getpid(), os.getppid())
print("線程個數(shù)", len(threading.enumerate()))
time.sleep(2)
print("當前進程的名稱:", multiprocessing.current_process().name)
if __name__ == '__main__':
print(os.getpid())
multiprocessing.set_start_method("spawn")
p = multiprocessing.Process(target=task, args=('xxx',))
p.name = "哈哈哈哈"
p.start()
print("繼續(xù)執(zhí)行...")
自定義進程類,直接將線程需要做的事寫到run方法中。
import multiprocessing
class MyProcess(multiprocessing.Process):
def run(self):
print('執(zhí)行此進程', self._args)
if __name__ == '__main__':
multiprocessing.set_start_method("spawn")
p = MyProcess(args=('xxx',))
p.start()
print("繼續(xù)執(zhí)行...")
CPU個數(shù),程序一般創(chuàng)建多少個進程?(利用CPU多核優(yōu)勢)。
import multiprocessing
multiprocessing.cpu_count()
import multiprocessing
if __name__ == '__main__':
count = multiprocessing.cpu_count()
for i in range(count - 1):
p = multiprocessing.Process(target=xxxx)
p.start()
9.2 數(shù)據(jù)共享
Shared memory
Data can be stored in a shared memory map using Value or Array. For example, the following code
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint, (其u表示無符號)
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double
from multiprocessing import Process, Value, Array
def func(n, m1, m2):
n.value = 888
m1.value = 'a'.encode('utf-8')
m2.value = "武"
if __name__ == '__main__':
num = Value('i', 666)
v1 = Value('c')
v2 = Value('u')
p = Process(target=func, args=(num, v1, v2))
p.start()
p.join()
print(num.value) # 888
print(v1.value) # a
print(v2.value) # 武
from multiprocessing import Process, Value, Array
def f(data_array):
data_array[0] = 666
if __name__ == '__main__':
arr = Array('i', [11, 22, 33, 44]) # 數(shù)組:元素類型必須是int; 只能是這么幾個數(shù)據(jù)。
p = Process(target=f, args=(arr,))
p.start()
p.join()
print(arr[:])
Server process
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.append(666)
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list()
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d)
print(l)
9.3 交換
multiprocessing supports two types of communication channel between processes
Queues

The Queue class is a near clone of queue.Queue. For example
import multiprocessing
def task(q):
for i in range(10):
q.put(i)
if __name__ == '__main__':
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=task, args=(queue,))
p.start()
p.join()
print("主進程")
print(queue.get())
print(queue.get())
print(queue.get())
print(queue.get())
print(queue.get())
Pipes

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
import time
import multiprocessing
def task(conn):
time.sleep(1)
conn.send([111, 22, 33, 44])
data = conn.recv() # 阻塞
print("子進程接收:", data)
time.sleep(2)
if __name__ == '__main__':
parent_conn, child_conn = multiprocessing.Pipe()
p = multiprocessing.Process(target=task, args=(child_conn,))
p.start()
info = parent_conn.recv() # 阻塞
print("主進程接收:", info)
parent_conn.send(666)
9.4 進程鎖
import time
import multiprocessing
import os
def task(lock):
print("開始")
lock.acquire()
# 假設(shè)文件中保存的內(nèi)容就是一個值:10
with open('f1.txt', mode='r', encoding='utf-8') as f:
current_num = int(f.read())
print(os.getpid(), "排隊搶票了")
time.sleep(0.5)
current_num -= 1
with open('f1.txt', mode='w', encoding='utf-8') as f:
f.write(str(current_num))
lock.release()
if __name__ == '__main__':
multiprocessing.set_start_method("spawn")
lock = multiprocessing.RLock()
process_list = []
for i in range(10):
p = multiprocessing.Process(target=task, args=(lock,))
p.start()
process_list.append(p)
# spawn模式,需要特殊處理。
for item in process_list:
item.join()
9.5 進程池
進程回調(diào)函數(shù)跟線程不同,進程回調(diào)是由主進程調(diào)用的
import time
from concurrent.futures import ProcessPoolExecutor
def task(num):
print("執(zhí)行", num)
time.sleep(2)
if __name__ == '__main__':
pool = ProcessPoolExecutor(4)
for i in range(10):
pool.submit(task, i)
# 等待進程池中的任務(wù)都執(zhí)行完畢后,再繼續(xù)往后執(zhí)行。
pool.shutdown(True)
print(1)
import time
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
def task(num):
print("執(zhí)行", num)
time.sleep(2)
return num
def done(res):
print(multiprocessing.current_process())
time.sleep(1)
print(res.result())
time.sleep(1)
if __name__ == '__main__':
pool = ProcessPoolExecutor(4)
for i in range(50):
fur = pool.submit(task, i)
fur.add_done_callback(done) # done的調(diào)用由主進程處理(與線程池不同)
print(multiprocessing.current_process())
pool.shutdown(True)
注意:如果在進程池中要使用進程鎖,則需要基于Manager中的Lock和RLock來實現(xiàn)。
import time
import multiprocessing
from concurrent.futures.process import ProcessPoolExecutor
def task(lock):
print("開始")
with lock:
# 假設(shè)文件中保存的內(nèi)容就是一個值:10
with open('f1.txt', mode='r', encoding='utf-8') as f:
current_num = int(f.read())
print("排隊搶票了")
time.sleep(1)
current_num -= 1
with open('f1.txt', mode='w', encoding='utf-8') as f:
f.write(str(current_num))
if __name__ == '__main__':
pool = ProcessPoolExecutor()
# lock_object = multiprocessing.RLock() # 不能使用
manager = multiprocessing.Manager()
lock_object = manager.RLock() # Lock
for i in range(10):
pool.submit(task, lock_object)
10 協(xié)程
計算機中提供了:線程、進程 用于實現(xiàn)并發(fā)編程(真實存在)。
協(xié)程(Coroutine),是程序員通過代碼搞出來的一個東西(非真實存在)。
協(xié)程也可以被稱為微線程,是一種用戶態(tài)內(nèi)的上下文切換技術(shù)。
簡而言之,其實就是通過一個線程實現(xiàn)代碼塊相互切換執(zhí)行(來回跳著執(zhí)行)。
協(xié)程、線程、進程的區(qū)別?
線程,是計算機中可以被cpu調(diào)度的最小單元。
進程,是計算機資源分配的最小單元(進程為線程提供資源)。
一個進程中可以有多個線程,同一個進程中的線程可以共享此進程中的資源。
由于CPython中GIL的存在:
- 線程,適用于IO密集型操作。
- 進程,適用于計算密集型操作。
協(xié)程,協(xié)程也可以被稱為微線程,是一種用戶態(tài)內(nèi)的上下文切換技術(shù),在開發(fā)中結(jié)合遇到IO自動切換,就可以通過一個線程實現(xiàn)并發(fā)操作。
可以實現(xiàn)在線程中遇到IO操作時,自動在代碼塊中相互切換執(zhí)行
所以,在處理IO操作時,協(xié)程比線程更加節(jié)省開銷(協(xié)程的開發(fā)難度大一些)。
11 總結(jié)
案例:多線程socket服務(wù)端
-
服務(wù)端
import socket import threading def task(conn): while True: client_data = conn.recv(1024) data = client_data.decode('utf-8') print("收到客戶端發(fā)來的消息:", data) if data.upper() == "Q": break conn.sendall("收到收到".encode('utf-8')) conn.close() def run(): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('127.0.0.1', 8001)) sock.listen(5) while True: # 等待客戶端來連接(主線程) conn, addr = sock.accept() # 創(chuàng)建子線程 t = threading.Thread(target=task, args=(conn,)) t.start() sock.close() if __name__ == '__main__': run() -
客戶端
import socket # 1. 向指定IP發(fā)送連接請求 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(('127.0.0.1', 8001)) while True: txt = input(">>>") client.sendall(txt.encode('utf-8')) if txt.upper() == 'Q': break reply = client.recv(1024) print(reply.decode("utf-8")) # 關(guān)閉連接,關(guān)閉連接時會向服務(wù)端發(fā)送空數(shù)據(jù)。 client.close()
11.2 并發(fā)和并行
-
串行,多個任務(wù)排隊按照先后順序逐一去執(zhí)行。
-
并發(fā),假設(shè)有多個任務(wù),只有一個CPU,那么在同一時刻只能處理一個任務(wù),為了避免串行,可以讓將任務(wù)切換運行(每個任務(wù)運行一點,然后再切換),達到并發(fā)效果(看似都在同時運行)。
并發(fā)在Python代碼中體現(xiàn):協(xié)程、多線程(由于CPython的GIL鎖限制,多個線程無法被CPU調(diào)度)。 -
并行,假設(shè)有多個任務(wù),有多個CPU,那么同一時刻每個CPU都是執(zhí)行一個任務(wù),任務(wù)就可以真正的同時運行。
并行在Python代碼中的體現(xiàn):多進程。
11.3 單例模式
在python開發(fā)和源碼中關(guān)于單例模式有兩種最常見的編寫方式,分別是:
-
基于
__new__方法實現(xiàn)import threading import time class Singleton: instance = None lock = threading.RLock() def __init__(self): self.name = "武沛齊" def __new__(cls, *args, **kwargs): if cls.instance: return cls.instance with cls.lock: if cls.instance: return cls.instance cls.instance = object.__new__(cls) return cls.instance obj1 = Singleton() obj2 = Singleton() print(obj1 is obj2) # True -
基于模塊導(dǎo)入方式
# utils.py class Singleton: def __init__(self): self.name = "武沛齊" ... single = Singleton()from xx import single print(single) from xx import single print(single)

浙公網(wǎng)安備 33010602011771號