關於協程與RxPY
協程(coroutine)是一個有很長曆史的概念,它是計算機程序的一類組件,推廣了協作式多任務的子程序。其詳細的概念和歷史請參照維基百科中的條目:https://en.wikipedia.org/wiki/Coroutine 。
Python天生支持的生成器(generator)其實就是協程的一種實現,生成器允許執行被掛起與被恢復。但是由於缺乏更多語法上的支持,以及缺乏利用生成器實現異步編程的成熟模式,限制了生成器作為協程參與協作式多任務編程的用途。不過現在情況發生了改變,Python自3.6版本開始添加了async/await的語法直接支持協程的異步編程,同時在asyncio庫中提供了協程編程的接口以及必要的基礎實現。在Python裏,多個協程是通過消息循環(Evet Loop)的調度從而異步執行的。請參見筆者的另一篇文章,感受一下協程與傳統的順序執行以及多線程之間的聯繫和區別:Python 協程(Coroutine)體驗
RxPY是響應式編程(Reactive programming)在Python中的實現。響應式編程或反應式編程是一種面向數據流和變化傳播的聲明式編程範式。其詳細的信息請參照維基百科中的條目:https://en.wikipedia.org/wiki/Reactive_programming。在RxPY中,數據流事實上是以同步的方式執行的。組成數據處理管道(Data Pipe)的所有函數均以同步的運行棧的方式調度。只是通過全面的依賴反轉使得開發者能以方便靈活的聲明方式來組裝和更改數據處理管道。從而獲得極大的靈活性和可擴展性。
在一些特定的情況下,如何結合這兩種編程範式,使得它們能夠相互協作就變得非常有趣。這樣做既能利用Reactive聲明編程的方式來簡化程序,又能利用協程異步運行的方式來提高IO的並行程度,從而提高程序的執行速度和效率。例如我們有數據處理管道,在其數據源或是數據目標支持異步IO的情況下,就可以考慮二者的結合運用。
這個帖子 https://blog.oakbits.com/rxpy-and-asyncio.html 提供了一些關於二者結合使用的用例和代碼實現。在 RxPY 文檔 裏也提供了一個關於在RxPY中使用asyncio作為數據源的例子。
關於協程池
跟線程相比,協程本身以及其調度都要輕量很多。但這並不意味着協程可以不受限制地部署和運行。其限制主要來自於內外兩個方向。內部的限制主要是有限的運行資源例如內存,處理器等。我們知道一個協程對應於一個內存對象,過多的活動協程將有可能會耗盡內存,也可能會導致等待處理器的時間超過IO的等待時間,從而不再能提高程序運行的速度,使得增加的內存資源消耗變得毫無意義。外部的限制主要來自於數據源或數據目標的並行性限制。例如我們要通過數據API分頁地獲取一個大數據集,而作為數據源的API很可能對能夠安全訪問的並行度有着或明或暗的限制。超過限制數目的並行請求將有可能被拒絕,甚至會導致API本身運行/數據發生混亂,直至停止服務。協程池是一種在協程環境中限制並行度的簡單有效的方法。跟多線程環境下的線程池的概念相似,協程池擁有一定數量的協程,每個協程獨立地領取並執行任務。當某個協程完成一個任務時,該協程將繼續領取下一個任務直至所有任務完成。除此以外,asynio中提供的同步原語也可以被用來限制並行的程度和活動協程的數量,同步原語的使用不在本文的討論範圍。
程序實列
以下程序可以在Python 3.6+上運行。
import asyncio
import random
import rx
import functools
import selectors
import time
def job_generator(): # 1
job_id = 1
while True:
yield job_id
job_id += 1
async def worker(worker_ame, job_gen, observer): # 2
for job_id in job_gen:
if job_id >= 200: # 3
job_gen.close()
break
await asyncio.sleep(random.uniform(0.01, 0.1)) # 4
observer.on_next(f'{worker_ame}: {job_id}') # 5
def data_source(): # 6
def on_subscribe(observer, scheduler): # 7
async def _aio_sub(loop): # 8
tasks = [] # 9
job_gen = job_generator()
for i in range(3):
task = asyncio.create_task(worker(f'Worker-{i}', job_gen, observer))
tasks.append(task)
# Wait until all worker tasks are finished/cancelled.
try:
await asyncio.gather(*tasks) # 10
loop.call_soon(observer.on_completed)
except Exception as e: # 11
loop.call_soon(functools.partial(observer.on_error, e))
for task in tasks:
task.cancel()
raise e
selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)
loop.run_until_complete(_aio_sub(loop))
return rx.create(on_subscribe)
if __name__ == '__main__':
started_at = time.monotonic()
source = data_source() # 12
source.subscribe( # 13
on_next = lambda i: print("Received {0}".format(i)),
on_error = lambda e: print("Error Occurred: {0}".format(e)),
on_completed = lambda: print("Done!"),
)
total_time = time.monotonic() - started_at # 14
print('====')
print(f'Used {total_time:.2f} seconds')
一個可能的輸出看起來像是這個樣子:
c:\PortableApps>python RxPYWithAsyncIO.py
Received Worker-0: 1
Received Worker-0: 4
Received Worker-0: 5
Received Worker-1: 2
Received Worker-2: 3
Received Worker-0: 6
Received Worker-2: 8
Received Worker-1: 7
Received Worker-1: 11
Received Worker-0: 9
..........
Received Worker-1: 188
Received Worker-0: 184
Received Worker-2: 190
Received Worker-0: 192
Received Worker-1: 191
Received Worker-1: 195
Received Worker-0: 194
Received Worker-2: 193
Received Worker-1: 196
Received Worker-1: 199
Received Worker-2: 198
Received Worker-0: 197
Done!
====
Used 3.52 seconds
注意在輸出中我們期望看到的幾處關鍵內容:
- 200行形如 Recieved <CoroutineName>: <Job_Id> 的輸出。其中CoroutineName為Worker-0,1,2之一。Job_Id的值為0-199。特別的,輸出的Job_Id不會是按照順序的,這是因為在代碼中,每個Job的運行時間是一個隨機值。不同job的運行時間有長有短。另外可以觀察到所有的協程均參與了任務的處理,它們在輸出中交替出現。
- 所有Job完成後輸出的“Done!”。這是在所有協程完成以後,Observable發送on_complete到註冊(Subscribe)的Observer,由Observer打印出的消息。
- 最後輸出的總的消耗的時間,通常在使用3個協程的情況下是3秒多。可以看到本次運行的時間是3.52秒
下面讓我們沿着代碼中標註的序號來做詳細的解讀。 - job_generator是任務生成器,其實例為所有協程共享。工作協程worker從這裏領取任務並完成。之後領取下一個任務。從一個唯一的任務生成器領取任務保證了任務不會被重複分發。在訪問數據API並且分頁獲取數據的應用中,任務生成器所產生的內容可能是帶有頁號的URL;在網絡爬蟲中,其產生的內容可能是目標網站的地址。注意到這裏使用了job而不是task作為其名稱,這是為了跟asyncio中的task(對應於一個可並行運行的協程)有所區別。
- worker是一個異步函數,運行起來就是一個工作協程。多個worker組成了本例中數據源的協程池。Worker接受一個名字用於輸出日誌,一個任務生成器job_generator的實例,以及作為數據目標的observer。Worker持續地從任務生成器中領取任務,完成任務,將結果發送到observer。由於協程都是輪流調度的,不會發生多個協程同時運行的情況,因此在協程間共享的數據資源例如job_generator和observer不需要做任何的保護。
- 選擇在worker中判斷結束條件是有現實意義的。很多時候我們不太能夠在一開始就確定所有的任務,很可能需要在一個或者多個任務的結果中去判斷是否還有餘下的工作。例如在分頁獲取數據的應用中,常常需要在返回的數據集中查看是否有下一頁的URL或是判斷當前頁號是否已經達到或超過了數據集的總頁數。當發現結束條件達到的時候,除了結束本協程外,還要通知其他協程儘快結束工作,為此關閉任務生成器是一個好方法。當其他協程完成手上的任務再次嘗試領取時會發現已經沒有更多的任務了。
- 這行代碼模擬了一個異步的IO,等待一段隨機的時間。在現實環境中通常會使用異步IO獲取數據,例如使用aiohttp訪問http數據API。
- 通過調用observer的on_next方法將任務的結果放入數據處理管道。這裏只是簡單地將job_id加上處理協程的名字作為結果。現實環境中大多需要處理和轉換IO獲取的結果數據。注意這裏的on_next方法是同步方法,如果管道中的數據處理過於耗時的話,會嚴重阻塞整個協程池的運行。有必要的話,這裏需要使用並行化的方法來保證on_next調用盡快返回,這部分內容不在本文的討論範圍之內。
- 一個簡單的factory函數,使用RxPY的create方法將函數on_subscribe包裝成observable並返回。這是RxPY中常用的手法,詳情請參見其文檔和實例。
- on_subscribe函數是跨越同步和異步世界的橋樑,它本身實現了RxPY關於Observable的接口協議,是一個傳統的同步函數。在其內部定義了頂層的異步函數_aio_sub,被驅動運行時首先創建並啓動消息循環(Event Loop),使用loop.run_until_complete將異步函數_aio_sub放置並運行在消息循環上。這一步同時也將自己的線程動力(就是活動的運行棧)交到消息循環,並等待_aio_sub運行結束,最後以傳統同步的方式結束並退出。
- _aio_sub是異步世界的頂層入口。當被消息循環驅動運行的時候,它首先創建協程池,然後等待所有在協程池中的協程運行結束,收集運行結果並返回。
- 在協程池中,每個協程worker都將被創建成一個asyncio的task,這使得它們能夠被消息循環交替地調度運行,並由創建者等待收集運行結果。本例地代碼中可以看出我們部署了3個協程。讀者可以自行調整協程和任務的數量,觀察總的運行時間。
- asyncio.gather有三個作用。等待所有task運行結束;收集所有task的返回值,本例中由於worker本身沒有返回值,所以當一切運行順利的話,收集到的返回值將會全部是空值None;當協程運行中有任何異常拋出時,將會拋出第一個產生的異常。
- 本例中當第一個異常被捕獲時,程序將異常傳遞到數據流上從而終止數據流。然後取消所有協程的運行並拋出異常,這事實上終止了數據管道的運行。這樣的行為對於分頁獲取數據的應用是合理的。對於需要爬大量網站的網絡爬蟲來説,少數網站拋出異常是正常的,在這種情況下應該只將捕獲的異常記入日誌,然後繼續等待協程池的運行直至結束。這需要首先改動協程worker的運行邏輯。
- 創建observable作為數據源。RxPY的常見代碼。
- 在數據源上註冊一個最簡單的observer。該observer僅僅將收到的數據和事件打印出來。在這一步,複雜的數據處理管道也可以被組裝。其最後的註冊調用source.subscribe實際上通過運行on_subscribe函數而啓動了整個數據管道的運行。我們已經知道RxPY的調用是同步的,當source.subscribe運行結束返回的時候,我們能夠確定的是數據管道的運行已經結束,要麼所有期望的數據都已經通過管道順利處理,要麼在處理中發生並拋出了異常。
- 計算並輸出運行時間的簡單方法,沒有什麼需要多解釋的。
相關鏈接
- Python asyncio官方文檔:https://docs.python.org/3/lib...
- RxPY:https://pypi.org/project/Rx/