前言
在異步方法中調用同步方法,會直接阻塞整個事件循環,導致應用在執行同步方法期間無法處理其他任何併發請求,從而拖垮整個服務的性能。
為了解決這個問題,核心思路是將同步方法交給外部線程池去執行。
方法1, 使用 to_thread
Python 3.9 後可以使用 asyncio.to_thread 方法,將同步函數跑在獨立的線程中,並返回一個協程供 await
import asyncio
import time
from fastapi import FastAPI
app = FastAPI()
def sync_task(name: str):
time.sleep(2)
return f"Hello {name}, sync task done!"
@app.get("/async-call")
async def async_endpoint():
result = await asyncio.to_thread(sync_task, "World")
return {"message": result}
方法2, 直接定義同步路由
FastAPI支持定義同步路由,FastAPI會自動在一個外部線程池中運行該函數。不過出於代碼整體設計的考慮,個人不建議這麼做。
方法3, 使用 run_in_threadpool
FastAPI 基於 Starlette, 而 Starlette 提供一個工具函數 run_in_threadpool,這種方式類似於 asyncio.to_thread,在某些老版本的 FastAPI 或特定的 contextvars 傳遞場景下更常用。
from fastapi.concurrency import run_in_threadpool
@app.get("/method3")
async def starlette_endpoint():
result = await run_in_threadpool(sync_task, "Starlette")
return {"message": result}
方法4, 使用進程池
對於CPU密集型任務,應該使用多進程ProcessPoolExecutor來操作
import concurrent.futures
import math
from fastapi import FastAPI
app = FastAPI()
# 創建一個全局進程池
executor = concurrent.futures.ProcessPoolExecutor()
def cpu_intensive_calculation(n: int):
# 模擬重度 CPU 計算
return sum(math.isqrt(i) for i in range(n))
@app.get("/cpu-bound-task")
async def cpu_task():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, cpu_intensive_calculation, 10**7)
return {"result": result}