动态

详情 返回 返回

Python筆記四之協程 - 动态 详情

本文首發於公眾號:Hunter後端

原文鏈接:Python筆記四之協程

協程是一種運行在單線程下的併發編程模型,它的特點是能夠在一個線程內實現多個任務的併發操作,通過在執行任務時主動讓出執行權,讓其他任務繼續執行,從而實現併發。

以下所有的代碼都是在 Python 3.8 版本中運行。

本篇筆記目錄如下:

  1. asyncio
    async
    await
  2. 併發運行協程任務

    1. 獲取協程返回結果
    2. asyncio.gather()
  3. 報錯處理
  4. 超時處理
  5. 用協程的方式訪問網絡接口

1、asyncio

在 Python 中,協程使用 asyncio 模塊來實現,asyncio 是用來編寫併發代碼的庫,使用的 async/await 語法。

async

我們使用 async 做前綴將普通函數變成異步函數,比如:

import asyncio 
import time

async def say_after(delay, what):
    now = time.time()
    await asyncio.sleep(delay)  
    print(what, " 花時間:", time.time() - now)
    return time.time(

async def main():
    print("started at: ", time.strftime("%X"))
    await say_after(1, "hello")
    await say_after(2, "world")
    print("finished at: ", time.strftime("%X"))

asyncio.run(main())

函數前加上 async 就將其變成了一個異步函數,在這裏我們通過 asyncio.run() 的方式在外層調用異步函數。

await

在 main() 函數裏,我們通過 await 的方式表示在異步函數,也就是 main 函數裏暫停當前的操作,等待後面跟着的 say_after() 異步函數執行完成。

await say_after() 就是我們前面説過的在執行任務的時候主動讓出執行權,讓其他任務執行。

2、併發運行協程任務

在上面 main() 函數的兩個 await say_after() 中,可以看到兩次 print() 出來的時間差約為 3s,因為我們兩次調用 say_after() 分別用了 1 秒和 2 秒時間,所以這兩次 await 操作是暫停當前任務的串行執行。

如果我們想要實現協程的併發操作,可以使用 asyncio.create_task()

async def main():

    task1 = asyncio.create_task(say_after(1, "hello"))
    task2 = asyncio.create_task(say_after(2, "hello"))
    print("started at: ", time.strftime("%X"))
    await task1
    await task2
    print("finished at: ", time.strftime("%X"))

asyncio.run(main())

# started at:  11:40:03
# hello  花時間: 1.0013182163238525
# hello  花時間: 2.001201868057251
# finished at:  11:40:05

say_after() 函數中,有一個 await asyncio.sleep() 的操作,它的作用是在協程中主動掛起當前任務一段時間,並將控制權返回給事件循環,允許其他協程繼續執行。

它模擬了在協程中等待一定時間的行為,比如在協程中發起網絡請求後,協程會掛起等待網絡請求的響應返回,或者異步 IO 操作中的等待 IO 操作完成等。

所以在上面這個函數操作中,我們通過 asyncio.create_task() 將協程函數 say_after() 添加到事件循環中進行自動調度,並在合適的時機執行。

所以在上面的操作中,程序檢測到 say_after() 中需要進行 sleep 的操作,就會自動對其進行調度,切換到事件循環的下一個任務執行,這樣就實現了協程任務的併發操作。

也因此,程序執行的整體時間會比前面的操作快 1 秒左右。

獲取協程返回結果

協程的返回結果直接在 await 前賦值即可:

result1 = await task1
print(result1)

asyncio.gather()

asyncio.gather() 也可以用於併發執行協程任務,但是與 asyncio.create_task() 略有不同。

create_task() 的操作是將協程函數添加到事件循環中進行調度,返回的是一個 Task 對象,而 gather() 則可以直接接收多個協程任務併發執行,並等待他們全部完成,返回 Future 對象表示任務結果。

gather() 的使用方法如下:

async def main():
    results = await asyncio.gather(
        say_after(1, "hello"),
        say_after(2, "world"),
    )

asyncio.gather() 除了可以接收異步函數,還可以接受 asyncio.create_task() 返回的結果,也就是返回的 task 對象,比如下面的操作也是合法的:

async def main():
    task = asyncio.create_task(say_after(1, "hello"))
    results = await asyncio.gather(
        say_after(1, "hello"),
        say_after(2, "world"),
        task,
    )

3、報錯處理

如果在併發操作中有一些報錯,比如下面的示例:

import asyncio
import time

async def say_after(delay, what):
    now = time.time()
    await asyncio.sleep(delay)
    print(what, " 花時間:", time.time() - now)
    return time.time()

async def say_error(delay, err_msg="error"):
    await asyncio.sleep(delay)
    raise Exception(err_msg


async def main():
    results = await asyncio.gather(
        say_after(1, "hello"),
        say_error(2, "error"),
        say_after(3, "world"),
    )

    print(results)

asyncio.run(main())

在上面的操作中,三個協程函數,在執行到第二個的時候,程序其實就直接返回報錯了,如果想要忽略報錯繼續執行之後的操作,可以加上 return_exceptions 參數,設為 True

async def main():
    results = await asyncio.gather(
        say_after(1, "hello"),
        say_error(2, "error"),
        say_after(3, "world"),
        return_exceptions=True,
    )
    
    print(result)

# [1691045418.774685, Exception('error'), 1691045420.774549]

這樣就會將報錯信息直接也返回,且執行之後的協程函數。

4、超時處理

我們可以為協程函數執行的時間預設一個時間,如果超出這個時間則返回報錯信息,我們可以使用 asyncio.wait_for(),比如:

async def main_4():
    results = await asyncio.gather(
        say_after(1, "hello"),
        say_error(2, "error"),
        asyncio.wait_for(say_after(30, "world"), timeout=3),
        return_exceptions=True,
    )

    print(results)
# [1691045925.265661, Exception('error'), TimeoutError()]

在上面的操作中,我們給第三個任務加了個 3 秒的超時處理,但是該協程會執行 30 秒,所以返回的報錯裏是一個 TimeoutError()

5、用協程的方式訪問網絡接口

接下來我們用協程的方式來訪問一個接口,與不用協程的方式進行比對。

首先我們建立一個服務端,用 Django、Flask都可以,只是提供一個訪問接口,以下是用 Flask 建立的示例:

from flask import Flask
import time

def create_app():
    app = Flask(__name__)
    
    @app.route("/test")
    def test():
        time.sleep(1)
        return str(time.time())
    
    return app

運行這段代碼就提供了我們需要的服務器接口。

使用協程的方式訪問接口我們這裏用到的是 aiohttp,是第三方庫,需要提前安裝:

pip3 install aiohttp==3.8.5

進行測試的腳本如下:

import asyncio
import aiohttp
import requests
import time

CALL_TIMES = 10000


def connect_url(url):
    return requests.get(url)


def run_connect_url(url):
    results = []
    for i in range(CALL_TIMES):
        result = connect_url(url)
        results.append(result)
    return results


async def connect_url_by_session(session, url):
    async with session.get(url) as response:
        return await response.text()


async def run_connect(url):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i in range(CALL_TIMES):
            tasks.append(connect_url_by_session(session, url))
        results = await asyncio.gather(*tasks)
    return results


if __name__ == "__main__":
    url = "http://127.0.0.1:5000/test"

    t1 = time.time()
    run_connect_url(url)
    print(f"串行調用次數: {CALL_TIMES},耗時:{time.time() - t1}")

    t2 = time.time()
    asyncio.run(run_connect(url))
    print(f"協程調用次數:{CALL_TIMES},耗時:{time.time() - t2}")

在這裏,aiohttp 的具體用法看代碼即可,我們可以通過修改 CALL_TIMES 來修改調用次數,我這裏調用 1000 次和 10000 次的結果分別如下:

串行調用次數: 1000,耗時:3.2450389862060547
協程調用次數:1000,耗時:1.3642120361328125

串行調用次數: 10000,耗時:32.830286741256714
協程調用次數:10000,耗時:12.519049882888794

可以看到使用協程的方式對於接口的訪問效率有了明顯的提升。

如果想獲取更多相關文章,可掃碼關注閲讀:
image.png

user avatar jilodream 头像 chunzhendegaoshan 头像 eolink 头像 kubeexplorer 头像 runyubingxue 头像 youyudeshangpu_cny857 头像 jianghushinian 头像 banxiazhimo 头像 aipaobudehoutao 头像 fanjiapeng 头像 lintp 头像 liutos 头像
点赞 30 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.