Python 使用非阻塞 IO
Python 的 IO 默認是阻塞 (blocking) 模式,並未開啟非阻塞 (Non-blocking) 功能:默認情況下,以任何方式調用 read,都可能會被阻塞。
subprocess 中的 stdout/stderr 流
場景描述
假設我們現在需要通過 subprocess 調用一個子程序(比如 aria2c),然後需要實時分析它的 stdout 內容。
那麼問題就來了:`readline()` 會一直阻塞,直到子程序輸出完整的一行或關閉 stdout,期間主程序無法做其他事情:
import time
import shlex
import subprocess
from subprocess import PIPE
# shlex.split() only tokenizes the command string; the pipe options belong on
# Popen (which, unlike subprocess.run, has no capture_output argument).
cmd = shlex.split("aria2c -x16 'http://host/file.zip'")
with subprocess.Popen(cmd, stdout=PIPE, text=True, encoding='utf-8') as aria2c:
    while aria2c.poll() is None:  # child is still running
        # The problem this example illustrates: readline() blocks until the
        # child writes a complete line (or closes stdout), so the loop can do
        # nothing else in the meantime.
        line = aria2c.stdout.readline()
        # ... analyse `line` here ...
        time.sleep(1)  # wait before polling again
解決辦法
- 使用新線程去調用 read() 並保存到一個 buffer 中,主線程直接讀 buffer。
- 缺點:每個流都需要一個額外的線程,開銷太大。
- 使用標準庫 select 檢查是否可讀,比較優雅(注意:在 Windows 上 select 只支持 socket,不支持管道)。
- 使用 fcntl 為 stdout 設置 O_NONBLOCK 標誌(fcntl 模塊僅適用於 Unix)。
socket 連接中的 IO 流
非阻塞讀取的實現:
import select
import socket
import time
from asyncio import IncompleteReadError
class SocketStreamReader:
    """Read lines and fixed-size records from a socket without blocking.

    Method names follow ``asyncio.StreamReader``. The wrapped socket is put
    into non-blocking mode, so instead of waiting for data the reader reports
    "not available yet": ``readuntil``/``readline`` return ``None`` and keep
    any partial record buffered for the next call.
    """

    def __init__(self, sock: socket.socket):
        # Side effect: switches the caller's socket to non-blocking mode, so
        # recv_into() raises BlockingIOError instead of waiting for data.
        sock.setblocking(False)
        self.sock = sock
        # Bytes received from the socket but not yet returned to the caller.
        self._recv_buffer = bytearray()
        # Set once recv_into() reports that the peer closed the connection.
        self._eof = False

    def read(self, num_bytes: int = -1) -> bytes:
        """Not implemented."""
        raise NotImplementedError

    def at_eof(self) -> bool:
        """Return True if the peer has closed the connection and nothing is buffered."""
        return self._eof and not self._recv_buffer

    def readexactly(self, num_bytes: int) -> bytes:
        """Return exactly *num_bytes* bytes.

        Raises:
            IncompleteReadError: if fewer than *num_bytes* bytes are available
                right now or the peer closed the connection. ``partial`` holds
                every byte consumed by this call.
        """
        buf = bytearray(num_bytes)
        pos = 0
        while pos < num_bytes:
            n = self._recv_into(memoryview(buf)[pos:])
            if n == 0:
                raise IncompleteReadError(bytes(buf[:pos]), num_bytes)
            pos += n
        return bytes(buf)

    def readline(self) -> "bytes | None":
        """Return the next line including its trailing newline, or None if incomplete.

        Raises:
            IncompleteReadError: see ``readuntil``.
        """
        return self.readuntil(b"\n")

    def readuntil(self, separator: bytes = b"\n") -> "bytes | None":
        """Return data up to and including *separator*, or None if it has not arrived.

        When the separator is not available yet, the bytes read so far are
        kept in the internal buffer, so a later call continues where this one
        stopped.

        Raises:
            ValueError: if *separator* is empty.
            IncompleteReadError: if the peer closed the connection before the
                separator arrived; ``partial`` holds the remaining bytes.
        """
        if not separator:
            raise ValueError("Separator should be at least one-byte string")
        # Take ownership of the buffered bytes; whatever is not returned is
        # put back before leaving.
        buf = bytearray(self._recv_buffer)
        self._recv_buffer = bytearray()
        chunk = bytearray(4096)
        start = 0
        while True:
            idx = buf.find(separator, start)
            if idx != -1:
                break
            # A match spanning old and new data starts at most
            # len(separator) - 1 bytes before the current end of buf.
            start = max(0, len(buf) - len(separator) + 1)
            bytes_read = self._recv_into(memoryview(chunk))
            if bytes_read == 0:
                if self._eof:
                    raise IncompleteReadError(bytes(buf), None)
                # Nothing more right now: keep the partial record for next time.
                self._recv_buffer = buf
                return None
            buf += memoryview(chunk)[:bytes_read]
        end = idx + len(separator)
        result = bytes(buf[:end])
        self._recv_buffer = buf[end:] + self._recv_buffer
        return result

    def _recv_into(self, view: memoryview) -> int:
        """Fill *view* from the internal buffer first, then from the socket.

        Returns the number of bytes written into *view*. A result of 0 means
        nothing is available right now, or the peer has closed (``self._eof``).
        """
        bytes_read = min(len(view), len(self._recv_buffer))
        view[:bytes_read] = self._recv_buffer[:bytes_read]
        self._recv_buffer = self._recv_buffer[bytes_read:]
        if bytes_read == len(view):
            return bytes_read
        try:
            received = self.sock.recv_into(view[bytes_read:])
        except BlockingIOError:  # socket has no data available right now
            # Still report the buffered bytes copied above; they are no longer
            # in _recv_buffer, so returning 0 here would lose them.
            return bytes_read
        if received == 0:
            # On a stream socket, recv_into() on a non-empty buffer returns 0
            # only when the peer has closed the connection.
            self._eof = True
        return bytes_read + received
# (host, port) of the server to read lines from.
socket_pair = "192.168.31.22", 8080
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(socket_pair)
    socket_reader = SocketStreamReader(sock)
    while True:
        try:
            line = socket_reader.readline()
        except IncompleteReadError as exc:
            # The peer closed the connection; print any unterminated last line.
            print(exc.partial.decode(), end="")
            break
        if line is None:
            # No complete line yet. Polling with time.sleep(0.0001) is a busy
            # wait that burns CPU; select() sleeps until the socket becomes
            # readable (checking at least once per second).
            select.select([sock], [], [], 1.0)
        else:
            # The line already ends with "\n"; don't let print() add another.
            print(line.decode(), end="")

浙公網安備 33010602011771號