本文首發於公眾號:Hunter後端
原文鏈接:Python筆記四之協程
協程是一種運行在單線程下的併發編程模型,它的特點是能夠在一個線程內實現多個任務的併發操作,通過在執行任務時主動讓出執行權,讓其他任務繼續執行,從而實現併發。
以下所有的代碼都是在 Python 3.8 版本中運行。
本篇筆記目錄如下:
- asyncio
async
await -
併發運行協程任務
- 獲取協程返回結果
- asyncio.gather()
- 報錯處理
- 超時處理
- 用協程的方式訪問網絡接口
1、asyncio
在 Python 中,協程使用 asyncio 模塊來實現,asyncio 是用來編寫併發代碼的庫,使用的 async/await 語法。
async
我們使用 async 做前綴將普通函數變成異步函數,比如:
import asyncio
import time
async def say_after(delay, what):
now = time.time()
await asyncio.sleep(delay)
print(what, " 花時間:", time.time() - now)
return time.time(
async def main():
print("started at: ", time.strftime("%X"))
await say_after(1, "hello")
await say_after(2, "world")
print("finished at: ", time.strftime("%X"))
asyncio.run(main())
函數前加上 async 就將其變成了一個異步函數,在這裏我們通過 asyncio.run() 的方式在外層調用異步函數。
await
在 main() 函數裏,我們通過 await 的方式表示在異步函數,也就是 main 函數裏暫停當前的操作,等待後面跟着的 say_after() 異步函數執行完成。
await say_after() 就是我們前面説過的在執行任務的時候主動讓出執行權,讓其他任務執行。
2、併發運行協程任務
在上面 main() 函數的兩個 await say_after() 中,可以看到兩次 print() 出來的時間差約為 3s,因為我們兩次調用 say_after() 分別用了 1 秒和 2 秒時間,所以這兩次 await 操作是暫停當前任務的串行執行。
如果我們想要實現協程的併發操作,可以使用 asyncio.create_task()
async def main():
task1 = asyncio.create_task(say_after(1, "hello"))
task2 = asyncio.create_task(say_after(2, "hello"))
print("started at: ", time.strftime("%X"))
await task1
await task2
print("finished at: ", time.strftime("%X"))
asyncio.run(main())
# started at: 11:40:03
# hello 花時間: 1.0013182163238525
# hello 花時間: 2.001201868057251
# finished at: 11:40:05
在 say_after() 函數中,有一個 await asyncio.sleep() 的操作,它的作用是在協程中主動掛起當前任務一段時間,並將控制權返回給事件循環,允許其他協程繼續執行。
它模擬了在協程中等待一定時間的行為,比如在協程中發起網絡請求後,協程會掛起等待網絡請求的響應返回,或者異步 IO 操作中的等待 IO 操作完成等。
所以在上面這個函數操作中,我們通過 asyncio.create_task() 將協程函數 say_after() 添加到事件循環中進行自動調度,並在合適的時機執行。
所以在上面的操作中,程序檢測到 say_after() 中需要進行 sleep 的操作,就會自動對其進行調度,切換到事件循環的下一個任務執行,這樣就實現了協程任務的併發操作。
也因此,程序執行的整體時間會比前面的操作快 1 秒左右。
獲取協程返回結果
協程的返回結果直接在 await 前賦值即可:
result1 = await task1
print(result1)
asyncio.gather()
asyncio.gather() 也可以用於併發執行協程任務,但是與 asyncio.create_task() 略有不同。
在 create_task() 的操作是將協程函數添加到事件循環中進行調度,返回的是一個 Task 對象,而 gather() 則可以直接接收多個協程任務併發執行,並等待他們全部完成,返回 Future 對象表示任務結果。
gather() 的使用方法如下:
async def main():
results = await asyncio.gather(
say_after(1, "hello"),
say_after(2, "world"),
)
asyncio.gather() 除了可以接收異步函數,還可以接受 asyncio.create_task() 返回的結果,也就是返回的 task 對象,比如下面的操作也是合法的:
async def main():
task = asyncio.create_task(say_after(1, "hello"))
results = await asyncio.gather(
say_after(1, "hello"),
say_after(2, "world"),
task,
)
3、報錯處理
如果在併發操作中有一些報錯,比如下面的示例:
import asyncio
import time
async def say_after(delay, what):
now = time.time()
await asyncio.sleep(delay)
print(what, " 花時間:", time.time() - now)
return time.time()
async def say_error(delay, err_msg="error"):
await asyncio.sleep(delay)
raise Exception(err_msg
async def main():
results = await asyncio.gather(
say_after(1, "hello"),
say_error(2, "error"),
say_after(3, "world"),
)
print(results)
asyncio.run(main())
在上面的操作中,三個協程函數,在執行到第二個的時候,程序其實就直接返回報錯了,如果想要忽略報錯繼續執行之後的操作,可以加上 return_exceptions 參數,設為 True:
async def main():
results = await asyncio.gather(
say_after(1, "hello"),
say_error(2, "error"),
say_after(3, "world"),
return_exceptions=True,
)
print(result)
# [1691045418.774685, Exception('error'), 1691045420.774549]
這樣就會將報錯信息直接也返回,且執行之後的協程函數。
4、超時處理
我們可以為協程函數執行的時間預設一個時間,如果超出這個時間則返回報錯信息,我們可以使用 asyncio.wait_for(),比如:
async def main_4():
results = await asyncio.gather(
say_after(1, "hello"),
say_error(2, "error"),
asyncio.wait_for(say_after(30, "world"), timeout=3),
return_exceptions=True,
)
print(results)
# [1691045925.265661, Exception('error'), TimeoutError()]
在上面的操作中,我們給第三個任務加了個 3 秒的超時處理,但是該協程會執行 30 秒,所以返回的報錯裏是一個 TimeoutError()。
5、用協程的方式訪問網絡接口
接下來我們用協程的方式來訪問一個接口,與不用協程的方式進行比對。
首先我們建立一個服務端,用 Django、Flask都可以,只是提供一個訪問接口,以下是用 Flask 建立的示例:
from flask import Flask
import time
def create_app():
app = Flask(__name__)
@app.route("/test")
def test():
time.sleep(1)
return str(time.time())
return app
運行這段代碼就提供了我們需要的服務器接口。
使用協程的方式訪問接口我們這裏用到的是 aiohttp,是第三方庫,需要提前安裝:
pip3 install aiohttp==3.8.5
進行測試的腳本如下:
import asyncio
import aiohttp
import requests
import time
CALL_TIMES = 10000
def connect_url(url):
return requests.get(url)
def run_connect_url(url):
results = []
for i in range(CALL_TIMES):
result = connect_url(url)
results.append(result)
return results
async def connect_url_by_session(session, url):
async with session.get(url) as response:
return await response.text()
async def run_connect(url):
async with aiohttp.ClientSession() as session:
tasks = []
for i in range(CALL_TIMES):
tasks.append(connect_url_by_session(session, url))
results = await asyncio.gather(*tasks)
return results
if __name__ == "__main__":
url = "http://127.0.0.1:5000/test"
t1 = time.time()
run_connect_url(url)
print(f"串行調用次數: {CALL_TIMES},耗時:{time.time() - t1}")
t2 = time.time()
asyncio.run(run_connect(url))
print(f"協程調用次數:{CALL_TIMES},耗時:{time.time() - t2}")
在這裏,aiohttp 的具體用法看代碼即可,我們可以通過修改 CALL_TIMES 來修改調用次數,我這裏調用 1000 次和 10000 次的結果分別如下:
串行調用次數: 1000,耗時:3.2450389862060547
協程調用次數:1000,耗時:1.3642120361328125
串行調用次數: 10000,耗時:32.830286741256714
協程調用次數:10000,耗時:12.519049882888794
可以看到使用協程的方式對於接口的訪問效率有了明顯的提升。
如果想獲取更多相關文章,可掃碼關注閲讀: