Fastapi項(xiàng)目,在接口中調(diào)用同步方法,如果該同步方法,耗時(shí)較長(zhǎng)(比如連接redis超時(shí)),會(huì)造成整個(gè)項(xiàng)目接口的阻塞,這是任何接口的訪問(wèn)都會(huì)被阻塞超時(shí)
一、為什么會(huì)阻塞
FastAPI 是基于異步框架(如 asyncio 或 anyio)構(gòu)建的,它的核心是一個(gè)事件循環(huán)(Event Loop)。事件循環(huán)負(fù)責(zé)調(diào)度和執(zhí)行所有的異步任務(wù)。當(dāng)你在異步函數(shù)中直接調(diào)用同步阻塞代碼時(shí),事件循環(huán)會(huì)被阻塞,無(wú)法繼續(xù)處理其他任務(wù),直到同步代碼執(zhí)行完畢。
from fastapi import FastAPI import time app = FastAPI() def sync_function(): time.sleep(5) # 模擬一個(gè)耗時(shí)的同步操作 return "Done" @app.get("/") async def root(): result = sync_function() # 直接調(diào)用同步函數(shù) return {"result": result}
在這個(gè)例子中time.sleep(5) 是一個(gè)同步阻塞操作,它會(huì)阻塞事件循環(huán) 5 秒鐘。在這期間,F(xiàn)astAPI 無(wú)法處理其他請(qǐng)求,整個(gè)應(yīng)用的并發(fā)性能會(huì)大幅下降。
二、如何避免阻塞
將同步代碼放到線程池或進(jìn)程池中執(zhí)行,將同步代碼改為異步實(shí)現(xiàn)。有以下幾種常用解決方案
1. 使用 run_in_threadpool 或 asyncio.to_thread
將同步代碼放到線程池中執(zhí)行,避免阻塞事件循環(huán)。
from fastapi import FastAPI from fastapi.concurrency import run_in_threadpool import time app = FastAPI() def sync_function(): time.sleep(5) # 模擬一個(gè)耗時(shí)的同步操作 return "Done" @app.get("/") async def root(): result = await run_in_threadpool(sync_function) # 在線程池中運(yùn)行 return {"result": result}
2 使用 run_in_processpool
對(duì)于 CPU 密集型的同步代碼,可以使用進(jìn)程池來(lái)避免阻塞事件循環(huán)。
from fastapi import FastAPI from fastapi.concurrency import run_in_processpool import time app = FastAPI() def cpu_intensive_function(): # 模擬一個(gè) CPU 密集型的操作 result = sum(i * i for i in range(10**6)) return result @app.get("/") async def root(): result = await run_in_processpool(cpu_intensive_function) # 在進(jìn)程池中運(yùn)行 return {"result": result}
3. 將同步代碼改為異步實(shí)現(xiàn)
如果可能,盡量將同步代碼改為異步實(shí)現(xiàn)。例如,使用 asyncio.sleep 代替 time.sleep,或者使用異步庫(kù)代替同步庫(kù)。
from fastapi import FastAPI import asyncio app = FastAPI() async def async_function(): await asyncio.sleep(5) # 異步等待 return "Done" @app.get("/") async def root(): result = await async_function() # 直接調(diào)用異步函數(shù) return {"result": result}
4. 使用 anyio.to_thread.run_sync
如果你使用的是 anyio 庫(kù),可以使用 anyio.to_thread.run_sync 來(lái)運(yùn)行同步代碼。
from fastapi import FastAPI import anyio import time app = FastAPI() def sync_function(): time.sleep(5) # 模擬一個(gè)耗時(shí)的同步操作 return "Done" @app.get("/") async def root(): result = await anyio.to_thread.run_sync(sync_function) # 在線程池中運(yùn)行 return {"result": result}
總結(jié)
-
直接調(diào)用同步代碼會(huì)阻塞事件循環(huán),導(dǎo)致整個(gè)應(yīng)用的性能下降。
-
解決方案:
-
對(duì)于 I/O 密集型任務(wù),使用
run_in_threadpool或asyncio.to_thread。 -
對(duì)于 CPU 密集型任務(wù),使用
run_in_processpool。 -
盡量將同步代碼改為異步實(shí)現(xiàn)。
-
-
最佳實(shí)踐:在 FastAPI 中,盡量避免直接調(diào)用同步阻塞代碼,始終使用異步或線程池/進(jìn)程池來(lái)處理同步任務(wù)。
浙公網(wǎng)安備 33010602011771號(hào)