Python復習
最近一直在搞Rust,時間一長,Python許多東西反倒不那么熟悉了。不過這正好是個把Python系統復習一遍的機會。
Python中一切皆對象
「一切皆對象」是好多面向對象的語言都會提的一句話,不過Python中是真的一切皆對象。包括常見的函數和類,也都是對象;作為Python中的一等公民,它們可以被賦值給一個變量、添加到集合對象中,也可以作為函數的參數或返回值。
type、object、class的關系
- 所有類都繼承object,包括type
- type實例化出了所有對象,包括object和它自己
魔法函數
class A:
    """Wrapper around a list that supports indexing but defines no ``__iter__``."""

    def __init__(self, names: list):
        self.names = names

    def __getitem__(self, item):
        # Delegate indexing (and slicing) to the wrapped list.
        return self.names[item]


# When an object has no __iter__, iteration falls back to calling
# __getitem__ with 0, 1, 2, ... until IndexError is raised.
for i in A([1, 2, 3, 4, 5, 6]):
    print(i)
常見魔法函數
-
字符串相關
# __repr__
# __str__
class A:
    """List wrapper demonstrating the string-conversion magic methods."""

    def __init__(self, names: list):
        self.names = names

    def __getitem__(self, item):
        return self.names[item]

    def __str__(self):
        # Used by print() and str().
        return ', '.join(str(n) for n in self.names)

    def __repr__(self):
        # Used by repr() and by the interactive prompt when echoing a value.
        return f'from __repr__ {str(self)}'


a = A([1, 2, 3, 4, 5, 6])
b = A([7, 8, 9, 10])
a.names.extend(b)  # iterates b through __getitem__
print(a)  # __str__
# A bare `a` only echoes in an interactive session; print the repr explicitly
# so the script shows the same thing.
print(repr(a))  # __repr__
集合、序列相關
# __len__ # __getitem__ # __setitem__ # __delitem__ # __contains__ -
迭代相關
# __iter__ # __next__ -
可調用
# __call__ -
with上下文管理器
# __enter__ # __exit__ -
數值相關
# __abs__ # __bool__ # __int__ # __float__ # __hash__ # __index__ -
元類相關
# __new__ # __init__ -
屬性相關
# __getattr__, __setattr__ # __getattribute__, __delattr__ # __dir__ -
屬性描述符
# __get__ # __set__ # __delete__ -
協程
# __await__ # __aiter__ # __anext__ # __aenter__ # __aexit__
Python中的類和對象
鴨子類型
即在分類時更關注對象的行為。Golang中的面向接口編程應該也借鑒了這種思想(Golang的錯誤處理太丑了)。我覺得這一點是克服面向對象編程中某些冗雜問題而保持靈活性的很關鍵的一點,但是我更傾向于直接移除繼承這個特性,繼承真的是多數混亂的開始,它帶來的問題簡直比它帶來的便捷更多。
抽象基類
import abc
import random
class Payment(metaclass=abc.ABCMeta):
    """Abstract payment channel; concrete channels must implement ``pay``."""

    @abc.abstractmethod
    def pay(self):
        """Perform the payment."""


class WeChatPay(Payment):
    """Payment channel that reports WeChatPay usage."""

    def pay(self):
        print('use WeChatPay')


class AliPay(Payment):
    """Payment channel that reports AliPay usage."""

    def pay(self):
        print('use AliPay')


def get_payment():
    """Return a ``WeChatPay`` or ``AliPay`` instance picked by a random 0/1 draw."""
    return WeChatPay() if random.randint(0, 1) else AliPay()


# Callers only rely on the abstract interface, not on the concrete class.
a = get_payment()
a.pay()
類變量和對象變量
class A:
    """Carries one class-level attribute and one per-instance attribute."""

    class_var = 'A'

    def __init__(self):
        self.inst_var = 'self'


a = A()
# Rebinding through the class is visible from every instance that has not
# shadowed the name.
A.class_var = 'class_var_changed_by_class'
print(a.inst_var, a.class_var, A.class_var)
# Reading a name the instance does not own falls back to the class (and its
# bases).  Assigning through the instance never touches the class: it creates
# a new instance attribute that shadows the class attribute.
a.class_var = 'self'
print(a.inst_var, a.class_var, A.class_var)
# Note: plain depth-first and breadth-first method resolution both have
# problems, so Python resolves methods with the C3 linearization algorithm.
class D:
    """Common root of the diamond."""


class C(D):
    """Right-hand branch of the diamond."""


class B(D):
    """Left-hand branch of the diamond."""


class A(B, C):
    """Bottom of the diamond; inherits from both branches."""


print(A.__mro__)
數據封裝
python中沒有其他語言中那么多或者那么細分的權限修飾關鍵詞,但是仍可以通過語法約定私有屬性。
class A:
    """Stores a value in a double-underscore ("private") attribute."""

    def __init__(self, x):
        # Inside the class body ``__x`` is name-mangled to ``_A__x``.
        self.__x = x

    def get_x(self):
        """Return the stored value."""
        return self.__x


a = A(99)
print(a.get_x())
# Double-underscore attributes are not strictly inaccessible from outside;
# the mangled name still works, so this is closer to a convention.
print(a._A__x)
自省
python可以通過一定方式獲取對象當前的內部結構
class A:
    """Base class with a class-level ``x``."""

    x = 'A'


class B(A):
    """Subclass whose instances carry their own ``x``."""

    def __init__(self, x):
        self.x = x


b = B('self')
# The instance __dict__ only holds attributes owned by the instance.
print(b.__dict__)
# The class __dict__ holds class attributes plus bookkeeping entries.
print(A.__dict__)
# dir() lists every reachable name, including inherited ones.
dir(b)
super
class A:
    """Base initializer: announces itself and sets ``x`` to ``'A'``."""

    def __init__(self):
        print('A')
        self.x = 'A'


class B(A):
    """Sets ``x`` first, then chains to the next initializer."""

    def __init__(self):
        print('B')
        self.x = 'B'
        # The chained initializer runs last, so its assignment to x wins.
        super().__init__()


b = B()
print(b.x)
super()其實并非直接調用父類的方法,而是在MRO列表中,從給定類之后的下一個類開始查找要調用的方法。
class D:
    """Root of the diamond; end of the cooperative chain."""

    def __init__(self):
        print('D')


class B(D):
    """Left branch; forwards to whatever follows it in the MRO."""

    def __init__(self):
        print('B')
        super().__init__()


class C(D):
    """Right branch; forwards to whatever follows it in the MRO."""

    def __init__(self):
        print('C')
        super().__init__()


class A(B, C):
    """Bottom of the diamond; starts the cooperative chain."""

    def __init__(self):
        print('A')
        super().__init__()


# Each super() call continues along A's MRO, so every initializer runs once.
a = A()
mixin模式
這種模式和新興語言中的去繼承、使用組合或者嵌入這類思想有異曲同工之妙。每個mixin類只實現單一功能,擺脫和其他類的強關聯。這樣,使用者可以自由組合各種功能而無需擔心多繼承帶來的屬性混亂。
with上下文管理器協議
class A:
    """Minimal implementation of the context-manager protocol."""

    def __enter__(self):
        print('__enter__')
        # Demonstration only: instance attributes should normally be
        # defined in __init__, not here.
        if not hasattr(self, 'x'):
            self.x = 'x'
        return self

    def __exit__(self, *_args):
        # Returns None, so exceptions raised in the with-body propagate.
        print('__exit__')

    def say_x(self):
        """Print the ``x`` attribute."""
        print(self.x)


with A() as a:
    a.say_x()
還可以通過contextmanager來聚合上下文管理器協議的實現
from contextlib import contextmanager
@contextmanager
def open_source(source):
    """Context manager that announces opening and closing of ``source``.

    Nothing is yielded, so ``with open_source(...) as obj`` binds ``None``.
    The close message is emitted even when the with-body raises.
    """
    print(f'{source} open')
    try:
        yield
    finally:
        # Without try/finally an exception in the with-body would be thrown
        # in at the yield and skip the cleanup step entirely.
        print(f'{source} close')


with open_source('source') as obj:
    print('process with source')
元類編程
property動態屬性
class A:
    """Exposes the private ``_x`` slot through a read/write property."""

    def __init__(self, x):
        self._x = x

    @property
    def x(self):
        """Return the stored value."""
        return self._x

    @x.setter
    def x(self, val):
        self._x = val


a = A(123)
print(a.x)
# Assignment is routed through the setter.
a.x = 456
print(a.x)
屬性相關魔法函數
class A:
    """Shows the ``__getattr__`` fallback hook."""

    def __init__(self, x, y):
        self.x = x
        self.y = y

    # Called only after normal attribute lookup has failed.
    def __getattr__(self, item):
        print(item)
        print('attribute not found')

    # __getattribute__, by contrast, is entered unconditionally on every
    # lookup; in most cases it should not be overridden.


a = A(123, 456)
a.z
屬性描述符
class CheckStr:
    """Data descriptor that accepts only ``int`` values.

    The value is kept in the owning instance's ``__dict__`` so that every
    instance has its own value; storing it on the descriptor would share one
    value between all instances of the owner class.

    Raises:
        ValueError: on assignment of a non-int value.
        AttributeError: on reading before any value has been assigned.
    """

    def __init__(self):
        # Fallback storage key for descriptors attached after class creation,
        # in which case __set_name__ is never invoked.
        self._name = f'_checkstr_{id(self)}'

    def __set_name__(self, owner, name):
        # A data descriptor takes precedence over the instance __dict__, so
        # reusing the public attribute name as the storage key is safe.
        self._name = name

    def __get__(self, instance, owner):
        if instance is None:
            # Accessed on the class itself: hand back the descriptor.
            return self
        try:
            return instance.__dict__[self._name]
        except KeyError:
            raise AttributeError(f'{self._name!r} has not been set') from None

    def __set__(self, instance, value):
        if not isinstance(value, int):
            raise ValueError(f'expected int instance, {type(value)} found')
        instance.__dict__[self._name] = value

    def __delete__(self, instance):
        instance.__dict__.pop(self._name, None)


class A:
    to_check = CheckStr()


a = A()
# Attribute lookup order to keep in mind:
# 1. A data descriptor found on the class (or a base class) wins: its
#    __get__ is called.
# 2. Otherwise, a name present in obj.__dict__ is returned directly.
# 3. Otherwise, a non-data descriptor found on the class (or a base class)
#    has its __get__ called.
# 4. Otherwise, a plain class attribute is returned from cls.__dict__.
# 5. Otherwise __getattr__ is called, if defined.
# 6. Otherwise AttributeError is raised.
a.to_check = 123
print(a.to_check)
自定義元類
class Field:
    """Base class for model column descriptors.

    Values are stored in the owning model instance's ``__dict__``.  Keeping
    them on the descriptor would make every model instance share one value
    per column.
    """

    # Name of the model attribute this field is bound to (set by __set_name__).
    _attr_name = None

    def __set_name__(self, owner, name):
        self._attr_name = name

    def _storage_key(self):
        # Fall back to an id-based key when the field was attached after class
        # creation and __set_name__ was therefore never called.
        return self._attr_name or f'_field_{id(self)}'

    def __get__(self, instance, owner):
        if instance is None:
            # Class-level access returns the descriptor itself.
            return self
        # Unset columns read as None.
        return instance.__dict__.get(self._storage_key())


class CHarField(Field):
    """String column with a maximum length.

    Args:
        db_column: column name; an empty value means "use the attribute name".
        max_length: positive int, maximum accepted string length.

    Raises:
        TypeError: if ``max_length`` is not an int, or a non-str is assigned.
        ValueError: if ``max_length`` is not positive, or the assigned string
            is too long.
    """

    def __init__(self, db_column='', max_length=10):
        if not isinstance(max_length, int):
            raise TypeError('max_length should be int')
        if max_length <= 0:
            raise ValueError('max_length should be positive')
        self.db_column = db_column
        self.max_length = max_length

    def __set__(self, instance, value):
        if not isinstance(value, str):
            raise TypeError(f'expected str instance, {type(value)} found')
        if len(value) > self.max_length:
            raise ValueError('exceed maximum length')
        instance.__dict__[self._storage_key()] = value


# Conventionally spelled alias; the original name stays available.
CharField = CHarField


class IntField(Field):
    """Integer column restricted to ``[min_value, max_value]``.

    Args:
        db_column: column name; an empty value means "use the attribute name".
        min_value: smallest accepted value.
        max_value: largest accepted value.

    Raises:
        TypeError: if a bound is not an int, or a non-int is assigned.
        ValueError: if ``max_value < min_value``, or the assigned value is out
            of range.
    """

    def __init__(self, db_column='', min_value=0, max_value=100):
        if not (isinstance(min_value, int) and isinstance(max_value, int)):
            raise TypeError('min_value and max_value should be both int')
        if max_value < min_value:
            raise ValueError('max_value < min_value')
        self.db_column = db_column
        self.min_value = min_value
        self.max_value = max_value

    def __set__(self, instance, value):
        if not isinstance(value, int):
            raise TypeError(f'expected int instance, {type(value)} found')
        if not self.min_value <= value <= self.max_value:
            raise ValueError(f'value should between {self.min_value} and {self.max_value}')
        instance.__dict__[self._storage_key()] = value


class ModelMetaClass(type):
    """Metaclass that collects ``Field`` attributes and table metadata.

    Every class other than ``BaseModel`` gets two extra attributes:
    ``field`` (attribute name -> Field object) and ``_meta`` (currently only
    ``db_table``, taken from an inner ``Meta`` class or defaulting to the
    lower-cased class name).
    """

    def __new__(cls, *args, **kwargs):
        name, bases, attrs = args
        if name == 'BaseModel':
            # The abstract base has no columns of its own.
            return super().__new__(cls, *args, **kwargs)
        columns = {k: v for k, v in attrs.items() if isinstance(v, Field)}
        declared_table = getattr(attrs.get('Meta'), 'db_table', None)
        attrs['_meta'] = {'db_table': declared_table or name.lower()}
        attrs['field'] = columns
        return super().__new__(cls, name, bases, attrs, **kwargs)


class BaseModel(metaclass=ModelMetaClass):
    """Base class of all models; keyword arguments become attribute values."""

    def __init__(self, *args, **kwargs):
        for k, v in kwargs.items():
            # Goes through the Field descriptors, so values are validated.
            setattr(self, k, v)
        super().__init__()

    def save(self):
        """Build and print the INSERT statement for this instance.

        Values are passed as parameters rather than interpolated into the SQL
        text; interpolating them left string values unquoted and open to SQL
        injection.

        Returns:
            ``(sql, params)``: the statement with ``%s`` placeholders and the
            list of values in column order.
        """
        columns = []
        params = []
        for attr_name, column in self.field.items():
            columns.append(getattr(column, 'db_column', None) or attr_name.lower())
            params.append(getattr(self, attr_name))
        placeholders = ', '.join(['%s'] * len(columns))
        sql = f'insert into {self._meta["db_table"]}({", ".join(columns)}) values({placeholders})'
        print(f'"{sql}"', params)
        return sql, params


class User(BaseModel):
    name = CHarField(db_column='', max_length=10)
    age = IntField(db_column='', min_value=0, max_value=100)

    class Meta:
        db_table = 'user'


if __name__ == '__main__':
    user = User(name='abc', age=18)
    user.save()
迭代器和生成器
可迭代對象和迭代器
# NOTE(review): this looks like an excerpt quoted from the typing stubs;
# Protocol, abstractmethod and _T_co are not defined in this snippet, so it
# is illustrative rather than runnable.

# Iterable implements __iter__
class Iterable(Protocol[_T_co]):
    @abstractmethod
    def __iter__(self) -> Iterator[_T_co]: ...
# Iterator additionally implements __next__
class Iterator(Iterable[_T_co], Protocol[_T_co]):
    @abstractmethod
    def __next__(self) -> _T_co: ...
    def __iter__(self) -> Iterator[_T_co]: ...
from collections.abc import Iterable, Iterator
class IteratorA:
    """Iterator over an indexable sequence, tracking its own position."""

    def __init__(self, li):
        self.li = li
        self.index = 0

    def __iter__(self):
        # An iterator must be iterable itself; without this method
        # ``for x in iter(a)`` and ``list(iter(a))`` raise TypeError.
        return self

    def __next__(self):
        try:
            res = self.li[self.index]
        except IndexError:
            # The IndexError is an implementation detail; hide it from the
            # traceback context.
            raise StopIteration from None
        self.index += 1
        return res


class A:
    """Iterable that hands out a fresh ``IteratorA`` for every ``iter()`` call."""

    def __init__(self, a):
        self.a = a

    def __iter__(self):
        return IteratorA(self.a)


if __name__ == '__main__':
    a = A([1, 2, 3])
    iter_a = iter(a)
    # Drive the iterator by hand: this is what a for-loop does internally.
    while True:
        try:
            print(next(iter_a))
        except StopIteration:
            break
生成器
函數中出現yield關鍵字即稱該函數為生成器函數。
python中調用函數創建的棧幀對象分配在堆內存上,即棧幀對象不依賴于調用行為,在回收前可以獨立存在。
def gen():
    """Generator that yields 1, 2 and 3, then returns 3.

    Written as straight-line statements on purpose: the frame inspection
    below records the bytecode offset and locals after each ``yield``.
    """
    a = 1
    yield a
    a += 1
    yield a
    a += 1
    yield a
    return a


if __name__ == '__main__':
    import dis
    g = gen()
    # dis.dis() prints the disassembly itself and returns None, which is why
    # the captured output below ends with "None".
    print(dis.dis(g))
    # Output recorded by the author.
    # NOTE(review): opcode names and offsets differ between CPython versions;
    # treat the listing and the f_lasti values below as illustrative.
    '''
2 0 LOAD_CONST 1 (1)
2 STORE_FAST 0 (a)
3 4 LOAD_FAST 0 (a)
6 YIELD_VALUE
8 POP_TOP
4 10 LOAD_FAST 0 (a)
12 LOAD_CONST 1 (1)
14 INPLACE_ADD
16 STORE_FAST 0 (a)
5 18 LOAD_FAST 0 (a)
20 YIELD_VALUE
22 POP_TOP
6 24 LOAD_FAST 0 (a)
26 LOAD_CONST 1 (1)
28 INPLACE_ADD
30 STORE_FAST 0 (a)
7 32 LOAD_FAST 0 (a)
34 YIELD_VALUE
36 POP_TOP
8 38 LOAD_FAST 0 (a)
40 RETURN_VALUE
None
'''
    # gi_frame is the generator's frame object; f_lasti is the offset of the
    # last executed instruction and f_locals the frame's local variables.
    # Before the first next() nothing has run yet.
    print(g.gi_frame.f_lasti) # -1
    print(g.gi_frame.f_locals) # {}
    # Each next() resumes the same frame and stops at the following yield.
    next(g)
    print(g.gi_frame.f_lasti) # 6
    print(g.gi_frame.f_locals) # {'a': 1}
    next(g)
    print(g.gi_frame.f_lasti) # 20
    print(g.gi_frame.f_locals) # {'a': 2}
    next(g)
    print(g.gi_frame.f_lasti) # 34
    print(g.gi_frame.f_locals) # {'a': 3}
def read_file(f, sep, length):
    """Lazily split the text read from ``f`` on ``sep``.

    The file is read ``length`` characters at a time, so arbitrarily large
    inputs (including single-line files) can be processed in bounded memory.

    Args:
        f: object with a ``read(size)`` method returning ``str``.
        sep: non-empty separator string.
        length: number of characters requested per ``read`` call.

    Yields:
        Each piece between separators.  The remainder after the last
        separator is always yielded last, even when it is empty.

    Raises:
        ValueError: if ``sep`` is empty (it would never advance the buffer).
    """
    if not sep:
        raise ValueError('sep must be a non-empty string')
    buf = ''
    while True:
        # Everything except the last piece is complete; the last piece may
        # still be continued by the next chunk.
        *complete, buf = buf.split(sep)
        yield from complete
        chunk = f.read(length)
        if not chunk:
            yield buf
            return
        buf += chunk
多線程、多進程
多線程通信
import time
import threading
import wx
# Shared stop flag: the GUI side clears it when the user aborts.
keep_going = True
# Guards reads and writes of keep_going across the two threads.
lock = threading.Lock()


class A(threading.Thread):
    """Daemon worker thread that processes ``tasks`` until told to stop.

    Args:
        name: thread name.
        tasks: iterable of tasks, processed in order.
    """

    def __init__(self, name, tasks):
        # Pass ``daemon`` to the constructor; Thread.setDaemon() is deprecated.
        super().__init__(name=name, daemon=True)
        self.tasks = tasks

    @staticmethod
    def handle_task(task):
        """Simulate one second of work on ``task``."""
        print(f'開始處理任務{task}')
        time.sleep(1)
        print(f'任務{task}處理完成')

    def run(self) -> None:
        task = None
        for task in self.tasks:
            # Hold the lock only while reading the flag; ``with`` guarantees
            # it is released on every path.
            with lock:
                should_stop = not keep_going
            if should_stop:
                break
            self.handle_task(task)
        print(f'任務處理已停止/結束,當前任務{task}')
class TestFrame(wx.Frame):
    """Frame that shows a progress dialog while a worker thread runs.

    Args:
        parent: parent window passed to ``wx.Frame``.
        max_seconds: time budget in seconds (the dialog ticks four times per
            second, so the internal maximum is ``max_seconds * 4``).
        task_handler: not-yet-started worker thread; started by ``run``.
    """

    def __init__(self, parent, max_seconds, task_handler: A):
        wx.Frame.__init__(self, parent, -1)
        self.max_seconds = max_seconds * 4
        self.task_handler = task_handler
        self.run()

    def run(self):
        """Start the worker and tick the dialog until it is aborted or the time is up."""
        global keep_going
        dlg = wx.ProgressDialog("實例", "內容", maximum=self.max_seconds, parent=self,
                                style=0 | wx.PD_APP_MODAL | wx.PD_CAN_ABORT | wx.PD_ESTIMATED_TIME |
                                wx.PD_REMAINING_TIME)
        count = 0
        self.task_handler.start()
        while keep_going and count < self.max_seconds:
            count += 1
            wx.MilliSleep(250)
            wx.Yield()
            # ``with`` releases the lock even if Update() raises; a manual
            # acquire/release pair would leave the worker blocked forever.
            with lock:
                if count >= self.max_seconds / 2:
                    keep_going, _skip = dlg.Update(count, "時間過半")
                else:
                    keep_going, _skip = dlg.Update(count)
        dlg.Destroy()
        print(f'當前剩余時間{(self.max_seconds-count) // 4}秒')
        self.Destroy()
class App(wx.App):
    """Application object that wires the worker thread to the progress frame."""

    def OnInit(self):
        # Twenty numbered tasks, handled by a single worker thread.
        worker = A('task_handler', list(range(1, 21)))
        window = TestFrame(None, 20, worker)
        window.Show()
        return True


if __name__ == '__main__':
    app = App()
    app.MainLoop()
線程同步
存在數據競爭的場景或者臟讀敏感的場景應該加鎖,對于典型的生產者消費者模型,可以用Queue實現線程間安全通信。
在同一個線程中有多次acquire的需求,可以用RLock。
Condition
復雜的線程間(尤其是線程交替邏輯比較多的)通訊可以使用Condition,注意start順序。
Condition的鎖:調用wait()時會釋放Condition層的鎖,并在內部的deque中放入一把臨時鎖;其他拿到Condition層鎖的線程可以繼續執行,并在調用notify()時取出deque中的臨時鎖并釋放,從而喚醒等待的線程。
import threading
# One Condition shared by both threads; it owns the underlying lock.
cond = threading.Condition()


class A(threading.Thread):
    """Prints the even numbers 0..8, taking turns with thread B."""

    def __init__(self, cond: threading.Condition):
        super().__init__(name='A')
        self.cond = cond
        self.rang = iter(range(0, 10, 2))

    def run(self) -> None:
        with self.cond:
            # A opens the exchange: print the first value, then notify.
            print(f'{self.name}: {next(self.rang)}')
            self.cond.notify()
            for i in self.rang:
                # wait() releases the lock while blocked and re-acquires it
                # before returning.
                self.cond.wait()
                print(f'{self.name}: {i}')
                self.cond.notify()


class B(threading.Thread):
    """Prints the odd numbers 1..9, always after being notified."""

    def __init__(self, cond):
        super().__init__(name='B')
        self.cond = cond
        self.rang = iter(range(1, 10, 2))

    def run(self) -> None:
        with self.cond:
            for i in self.rang:
                self.cond.wait()
                print(f'{self.name}: {i}')
                self.cond.notify()


if __name__ == '__main__':
    a = A(cond)
    b = B(cond)
    # B is started first, presumably so that it is already waiting when A
    # issues its first notify().
    # NOTE(review): nothing enforces that ordering; a notify() sent before B
    # reaches wait() is lost and both threads would block -- confirm.
    b.start()
    a.start()
    a.join()
    b.join()
Semaphore
import threading
import time
cond = threading.Condition()
class A(threading.Thread):
    """Worker that handles one task and then returns its semaphore permit.

    Args:
        task: task identifier (only used in the log messages).
        semaphore: semaphore whose permit was acquired on this worker's
            behalf before it was started.
    """

    def __init__(self, task, semaphore: threading.Semaphore):
        super().__init__()
        self.task = task
        self.semaphore = semaphore

    def run(self) -> None:
        print(f'開始處理任務{self.task}')
        try:
            time.sleep(2)
        finally:
            # Always give the permit back; otherwise a failing task would
            # permanently reduce the available concurrency.
            self.semaphore.release()
        print(f'任務{self.task}處理結束')


class B(threading.Thread):
    """Dispatcher that starts one worker per task, throttled by the semaphore."""

    def __init__(self, semaphore: threading.Semaphore):
        super().__init__()
        self.task = range(0, 12)
        self.semaphore = semaphore

    def run(self) -> None:
        workers = []
        for i in self.task:
            # Blocks while the maximum number of workers is already running.
            self.semaphore.acquire()
            worker = A(i, self.semaphore)
            worker.start()
            workers.append(worker)
        # join() returns immediately for workers that have already finished.
        for worker in workers:
            worker.join()
        print('全部結束')


if __name__ == '__main__':
    semaphore = threading.Semaphore(4)
    b = B(semaphore)
    b.start()
線程池
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def handle_task(task):
    """Simulate a two-second job and return its completion message."""
    print(f'開始處理任務{task}')
    time.sleep(2)
    done_message = f'任務{task}處理結束'
    print(done_message)
    return done_message


if __name__ == '__main__':
    tasks = range(10)
    # The with-block shuts the pool down and joins its worker threads, even
    # if collecting a result raises.
    with ThreadPoolExecutor(max_workers=3) as executor:
        handle_threads = [executor.submit(handle_task, task) for task in tasks]
        # Calling .result() on each future in list order would return the
        # results in submission order instead.
        # as_completed() is a generator that yields each future as soon as it
        # has finished, regardless of submission order.
        for handle_thread in as_completed(handle_threads):
            print(f'全局已知:{handle_thread.result()}')
    print('全局退出')
多進程
對于IO操作(即壓力在硬盤、內存、網絡等外部瓶頸上的情況)更密集的場景,多進程是沒有優勢的,反而可能因為切換或者調度的成本而顯劣勢,此時使用多線程會更合適。而對于CPU計算密集的任務,多進程會有優勢。
import time
from concurrent.futures import ThreadPoolExecutor, as_completed, ProcessPoolExecutor
def fib(n):
    """Return the n-th Fibonacci number (1 for every n <= 2), computed recursively.

    Deliberately left as plain recursion: it serves as the CPU-bound workload.
    """
    return 1 if n <= 2 else fib(n - 2) + fib(n - 1)


if __name__ == '__main__':
    tasks = range(30, 40)

    # Round 1: a thread pool running the CPU-bound workload.
    start_time = time.time()
    with ThreadPoolExecutor() as executor:
        handle_threads = [executor.submit(fib, task) for task in tasks]
        for handle_thread in as_completed(handle_threads):
            print(f'thread:{handle_thread.result()}')
    thread_time = time.time()
    print(f'ThreadPoolExecutor time: {thread_time - start_time}')

    # Round 2: the same workload on a process pool.
    with ProcessPoolExecutor() as executor:
        handle_processes = [executor.submit(fib, task) for task in tasks]
        for handle_process in as_completed(handle_processes):
            print(f'process:{handle_process.result()}')
    print(f'ProcessPoolExecutor time: {time.time() - thread_time}')
multiprocessing
import multiprocessing
def fib(n):
    """Return the n-th Fibonacci number (1 for every n <= 2), computed recursively."""
    if n <= 2:
        return 1
    return fib(n - 2) + fib(n - 1)


if __name__ == '__main__':
    # Leave one core free, but never ask for fewer than one worker:
    # cpu_count() - 1 is 0 on a single-core host and Pool(0) raises ValueError.
    worker_count = max(1, multiprocessing.cpu_count() - 1)
    # The with-block tears the pool down once all results are consumed.
    with multiprocessing.Pool(worker_count) as pool:
        # imap_unordered yields results as they finish, not in input order.
        for result in pool.imap_unordered(fib, [36, 22, 40, 16]):
            print(result)
    print('全局退出')
Manager
multiprocessing里實現了可以進程間通信的Queue(不能用于pool進程池),可以使用Manager().Queue()來用于進程池通信。Manager中也維護了Python中常見的值和容器結構,可以用于進程間的安全通信。
import time
from multiprocessing import Pool, Manager
def put(q):
    """Producer: put one item on the queue, then idle for two seconds."""
    q.put(123)
    time.sleep(2)


def get(q):
    """Consumer: wait two seconds, then print one item taken from the queue."""
    time.sleep(2)
    print(q.get())


if __name__ == '__main__':
    # Entering the Manager as a context manager shuts its server process
    # down deterministically instead of leaving that to garbage collection.
    with Manager() as manager, Pool(2) as pool:
        q = manager.Queue(6)
        pending = [
            pool.apply_async(put, (q, )),
            pool.apply_async(get, (q, )),
        ]
        pool.close()
        pool.join()
        # apply_async swallows worker exceptions until the result is
        # requested; fetch each result so failures surface here.
        for outcome in pending:
            outcome.get()
    print('全局退出')
對于兩個進程間的簡單通信可以考慮下性能更高的Pipe。
協程和異步IO
概念
并發即通過一定調度策略使一段時間內多個程序在一個CPU上運行,來模擬并行的效果,但某一瞬間,只有一個程序在運行。
并行是真同時多個程序運行在多個CPU上。
同步即調用IO操作時,等待IO操作完成再返回的方式。
異步即調用IO操作時,不等待IO操作完成就返回的方式。
阻塞即調用函數時當前線程被掛起。
非阻塞即調用函數時當前線程不被掛起,立即返回。
IO多路復用
select、poll、epoll都是IO多路復用的機制,本質上都是同步IO。
在并發高、連接活躍度不高的情況下,epoll比select更合適,比如web網站。
在并發不高、連接活躍度高的情況下,select比epoll更合適,比如游戲服務。
from selectors import DefaultSelector
這種select代碼不好維護,DEBUG也不好做。
協程
為了可以以同步方式編寫更加高效的代碼,可以使用函數切換代替線程切換。而實現函數執行中的暫停、切換、傳值需要生成器的某些特性。
def gen():
    """Yield ``'123'``, print whatever is sent back in, then yield ``'456'``."""
    received = yield '123'
    print(received)
    yield '456'


if __name__ == '__main__':
    g = gen()
    # Runs to the first yield; the generator then pauses, waiting for a value.
    first = next(g)
    print(first)
    # send() delivers the value to the paused yield expression, resumes the
    # generator and returns what the next yield produces.
    second = g.send('x')
    print(second)
yield from
data = {'a': [1, 2, 3], 'b': [4, 5, 6], 'c': [7, 8, 9]}


def gen():
    """Sub-generator: collect sent numbers until ``None`` is sent.

    Returns:
        ``(total, values)`` via ``StopIteration.value``, i.e. as the value of
        the delegating ``yield from`` expression.
    """
    li = list()
    s = 0
    while True:
        tmp = yield
        # Only None terminates; a truthiness test would also stop on a
        # legitimate 0 and silently drop the rest of the input.
        if tmp is None:
            break
        li.append(tmp)
        s += tmp
    return s, li


def shell(data_new, k):
    """Delegating generator: store each finished ``gen()`` result under ``data_new[k]``."""
    while True:
        # yield from forwards send() to the sub-generator and evaluates to
        # its return value once it finishes.
        data_new[k] = yield from gen()


if __name__ == '__main__':
    data_new = dict()
    for k, v in data.items():
        sh = shell(data_new, k)
        sh.send(None)  # prime: advance to the first yield inside gen()
        for num in v:
            sh.send(num)
        sh.send(None)  # sentinel: finish the current gen() and store its result
    print(data_new)
asyncio
import asyncio
import time
async def handle(task):
    """Simulate a two-second asynchronous job for ``task``."""
    print(f'處理任務{task}開始')
    await asyncio.sleep(2)
    print(f'處理任務{task}結束')


async def main():
    """Run tasks 1..19 concurrently and wait for all of them.

    gather() accepts grouped awaitables and the groups can be cancelled, so it
    is usually the better default than asyncio.wait() in scenarios like this.
    """
    await asyncio.gather(*(handle(task) for task in range(1, 20)))


if __name__ == '__main__':
    start_time = time.time()
    # asyncio.run() creates, runs and closes the event loop; calling
    # get_event_loop() with no running loop is deprecated.
    asyncio.run(main())
    print(f'用時:{time.time() - start_time}秒')
asyncio內容有點多,這個筆記字數有點多了,還是在另外一篇筆記中體現吧。
浙公網安備 33010602011771號