前言

並行計算是使用並行計算機來減少單個計算問題所需要的時間,我們可以通過利用編程語言顯式的説明計算中的不同部分如何再不同的處理器上同時執行來設計我們的並行程序,最終達到大幅度提升程序效率的目的。

眾所周知,Python中的GIL限制了Python多線程並行對多核CPU的利用,但是我們仍然可以通過各種其他的方式來讓Python真正利用多核資源, 例如通過C/C++擴展來實現多線程/多進程, 以及直接利用Python的多進程模塊multiprocessing來進行多進程編程。

本文主要嘗試僅僅通過python內置的multiprocessing模塊對自己的動力學計算程序來進行優化和效率提升,其中:

實現了單機利用多核資源來實現並行並進行加速對比

使用manager模塊實現了簡單的多機的分佈式計算

本文並不是對Python的multiprocessing模塊的接口進行翻譯介紹,需要熟悉multiprocessing的童鞋可以參考官方文檔https://docs.python.org/2/library/multiprocessing.html。

正文

最近想用自己的微觀動力學程序進行一系列的求解並將結果繪製成二維Map圖進行可視化,這樣就需要對二維圖上的多個點進行計算並將結果收集起來並進行繪製,由於每個點都需要進行一次ODE積分以及牛頓法求解方程組,因此要串行地繪製整張圖可能會遇到極低的效率問題尤其是對參數進行測試的時候,每畫一張圖都需要等很久的時間。其中繪製的二維圖中每個點都是獨立計算的,於是很自然而然的想到了進行並行化處理。

串行的原始版本

由於腳本比較長,而且實現均為自己的程序,腳本的大致結構如下, 本質是一個二重循環,循環的變量分別為反應物氣體(O2 和 CO)的分壓的值:


import time

import numpy as np

# 省略若干...

pCOs = np.linspace(1e-5, 0.5, 10)

pO2s = np.linspace(1e-5, 0.5, 10)

if "__main__" == __name__:

try:

start = time.time()

for i, pO2 in enumerate(pO2s):

# ...

for j, pCO in enumerate(pCOs):

# 針對當前的分壓值 pCO, pO2進行動力學求解

# 具體代碼略...

end = time.time()

t = end - start

finally:

# 收集計算的結果並進行處理繪圖

整體過程就這麼簡單,我需要做的就是使用multiprocessing的接口來對這個二重循環進行並行化。

使用單核串行繪製100個點所需要的時間如下, 總共花了240.76秒:

python學習交流羣:923414804,羣內每天分享乾貨,包括最新的企業級案例學習資料和零基礎入門教程,歡迎小夥伴入羣學習。

二維map圖繪製的效果如下:

進行多進程並行處理

multiprocessing模塊

multiprocessing模塊提供了類似threading模塊的接口,並對進程的各種操作進行了良好的封裝,提供了各種進程間通信的接口例如Pipe, Queue等等,可以幫助我們實現進程間的通信,同步等操作。

使用Process類來動態創建進程實現並行

multiprocessing模塊提供了Process能讓我們通過創建進程對象並執行該進程對象的start方法來創建一個真正的進程來執行任務,該接口類似threading模塊中的線程類Thread.

但是當被操作對象數目不大的時候可以使用Process動態生成多個進程,但是如果需要的進程數一旦很多的時候,手動限制進程的數量以及處理不同進程返回值會變得異常的繁瑣,因此這個時候我們需要使用進程池來簡化操作。

使用進程池來管理進程

multiprocessing模塊提供了一個進程池Pool類,負責創建進程池對象,並提供了一些方法來講運算任務offload到不同的子進程中執行,並很方便的獲取返回值。例如我們現在要進行的循環並行便很容易的將其實現。

對於這裏的單指令多數據流的並行,我們可以直接使用Pool.map()來將函數映射到參數列表中。Pool.map其實是map函數的並行版本,此函數將會阻塞直到所有進程全部結束,而且此函數返回的結果順序仍然不變。

首先,我先把針對每對分壓數據的處理過程封裝成一個函數,這樣可以將函數對象傳遞給子進程執行。


import time

from multiprocessing import Pool

import numpy as np

# 省略若干...

pCOs = np.linspace(1e-5, 0.5, 10)

pO2s = np.linspace(1e-5, 0.5, 10)

def task(pO2):

'''接受一個O2分壓,根據當前的CO分壓進行動力學求解'''

# 代碼細節省略...

if "__main__" == __name__:

try:

start = time.time()

pool = Pool()                # 創建進程池對象,進程數與multiprocessing.cpu_count()相同

tofs = pool.map(task, pCOs)  # 並行執行函數

end = time.time()

t = end - start

finally:

# 收集計算的結果並進行處理繪圖

使用兩個核心進行計算,計算時間從240.76s降到了148.61秒, 加速比為1.62

對不同核心的加速效果進行測試

為了查看使用不同核心數對程序效率的改善,我對不同的核心數和加速比進行了測試繪圖,效果如下:

運行核心數與程序運行時間:

運行核心數與加速比:

可見,由於我外層循環只循環了10次因此使用的核心數超過10以後核心數的增加並不能對程序進行加速,也就是多餘的核心都浪費掉了。

使用manager實現簡單的分佈式計算

前面使用了multiprocessing包提供的接口我們使用了再一台機器上進行多核心計算的並行處理,但是multiprocessing的用處還有更多,通過multiprocessing.managers模塊,我們可以實現簡單的多機分佈式並行計算,將計算任務分佈到不同的計算機中運行。

Managers提供了另外的多進程通信工具,他提供了在多台計算機之間共享數據的接口和數據對象,這些數據對象全部都是通過代理類實現的,比如ListProxy和DictProxy等等,他們都實現了與原生list和dict相同的接口,但是他們可以通過網絡在不同計算機中的進程中進行共享。

好了現在我們開始嘗試將繪圖程序改造成可以在多台計算機中分佈式並行的程序。改造的主要思想是:

使用一台計算機作為服務端(server),此台計算機通過一個Manager對象來管理共享對象,任務分配以及結果的接收,並再收集結果以後進行後處理(繪製二維map圖)。

其他多台計算機可以作為客户端來接收server的數據進行計算,並將結果傳到共享數據中,讓server可以收集。同時再client端可以同時進行上文所實現的多進程並行來充分利用計算機的多核優勢。

大致可總結為下圖:

服務進程

首先服務端需要一個manager對象來管理共享對象


def get_manager():

'''創建服務端manager對象.

'''

# 自定義manager類

class JobManager(BaseManager):

pass

# 創建任務隊列,並將此數據對象共享在網絡中

jobid_queue = Queue()

JobManager.register('get_jobid_queue', callable=lambda: jobid_queue)

# 創建列表代理類,並將其共享再網絡中

tofs = [None]*N

JobManager.register('get_tofs_list', callable=lambda: tofs, proxytype=ListProxy)

# 將分壓參數共享到網絡中

JobManager.register('get_pCOs', callable=lambda: pCOs, proxytype=ListProxy)

JobManager.register('get_pO2s', callable=lambda: pCOs, proxytype=ListProxy)

# 創建manager對象並返回

manager = JobManager(address=(ADDR, PORT), authkey=AUTHKEY)

return manager

BaseManager.register是一個類方法,它可以將某種類型或者可調用的對象綁定到manager對象並共享到網絡中,使得其他在網絡中的計算機能夠獲取相應的對象。

例如,

1

JobManager.register('get_jobid_queue', callable=lambda: jobid_queue)

我就將一個返回任務隊列的函數對象同manager對象綁定並共享到網絡中,這樣在網絡中的進程就可以通過自己的manager對象的get_jobid_queue方法得到相同的隊列,這樣便實現了數據的共享.

2. 創建manager對象的時候需要兩個參數,

address, 便是manager所在的ip以及用於監聽與服務端連接的端口號,例如我如果是在內網中的192.168.0.1地址的5000端口進行監聽,那麼此參數可以是('192.169.0.1, 5000)`

authkey, 顧名思義,就是一個認證碼,用於驗證客户端時候可以連接到服務端,此參數必須是一個字符串對象.

進行任務分配

上面我們將一個任務隊列綁定到了manager對象中,現在我需要將隊列進行填充,這樣才能將任務發放到不同的客户端來進行並行執行。


def fill_jobid_queue(manager, nclient):

indices = range(N)

interval = N/nclient

jobid_queue = manager.get_jobid_queue()

start = 0

for i in range(nclient):

jobid_queue.put(indices[start: start+interval])

start += interval

if N % nclient > 0:

jobid_queue.put(indices[start:])

這裏所謂的任務其實就是相應參數在list中的index值,這樣不同計算機中得到的結果可以按照相應的index將結果填入到結果列表中,這樣服務端就能在共享的網絡中收集各個計算機計算的結果。

啓動服務端進行監聽


def run_server():

# 獲取manager

manager = get_manager()

print "Start manager at {}:{}...".format(ADDR, PORT)

# 創建一個子進程來啓動manager

manager.start()

# 填充任務隊列

fill_jobid_queue(manager, NNODE)

shared_job_queue = manager.get_jobid_queue()

shared_tofs_list = manager.get_tofs_list()

queue_size = shared_job_queue.qsize()

# 循環進行監聽,直到結果列表被填滿

while None in shared_tofs_list:

if shared_job_queue.qsize() < queue_size:

queue_size = shared_job_queue.qsize()

print "Job picked..."

return manager

任務進程

服務進程負責進行簡單的任務分配和調度,任務進程則只負責獲取任務並進行計算處理。

在任務進程(客户端)中基本代碼與我們上面單機中的多核運行的腳本基本相同(因為都是同一個函數處理不同的數據),但是我們也需要為客户端創建一個manager來進行任務的獲取和返回。


def get_manager():

class WorkManager(BaseManager):

pass

# 由於只是從共享網絡中獲取,因此只需要註冊名字即可

WorkManager.register('get_jobid_queue')

WorkManager.register('get_tofs_list')

WorkManager.register('get_pCOs')

WorkManager.register('get_pO2s')

# 這裏的地址和驗證碼要與服務端相同才可以進行數據共享

manager = WorkManager(address=(ADDR, PORT), authkey=AUTHKEY)

return manager

在客户端我們仍然可以多進程利用多核資源來加速計算。


if "__main__" == __name__:

manager = get_manager()

print "work manager connect to {}:{}...".format(ADDR, PORT)

# 將客户端本地的manager連接到相應的服務端manager

manager.connect()

# 獲取共享的結果收集列表

shared_tofs_list = manager.get_tofs_list()

# 獲取共享的任務隊列

shared_jobid_queue = manager.get_jobid_queue()

# 從服務端獲取計算參數

pCOs = manager.get_pCOs()

shared_pO2s = manager.get_pO2s()

# 創建進程池在本地計算機進行多核並行

pool = Pool()

while 1:

try:

indices = shared_jobid_queue.get_nowait()

pO2s = [shared_pO2s[i] for i in indices]

print "Run {}".format(str(pO2s))

tofs_2d = pool.map(task, pO2s)

# Update shared tofs list.

for idx, tofs_1d in zip(indices, tofs_2d):

shared_tofs_list[idx] = tofs_1d

# 直到將任務隊列中的任務全部取完,結束任務進程

except Queue.Empty:

break

下面我將在3台在同一局域網中的電腦來進行簡單的分佈式計算測試,

其中一台是實驗室器羣中的管理節點, 內網ip為10.10.10.245

另一台為集羣中的一個節點, 共有12個核心

最後一台為自己的本本,4個核心

先在服務端運行服務腳本進行任務分配和監聽:

1

python server.py

2. 在兩個客户端運行任務腳本來獲取任務隊列中的任務並執行

1

python worker.py

當任務隊列為空且任務完成時,任務進程終止; 當結果列表中的結果收集完畢時,服務進程也會終止。

執行過程如圖:

執行結果如下圖:

上面的panel為服務端監聽,左下為自己的筆記本運行結果,右下panel為集羣中的其中一個節點。

可見運行時間為56.86s,無奈,是我的本子脱了後腿(-_-!)

總結

本文通過python內置模塊multiprocessing實現了單機內多核並行以及簡單的多台計算機的分佈式並行計算,multiprocessing為我們提供了封裝良好並且友好的接口來使我們的Python程序更方面利用多核資源加速自己的計算程序,希望能對使用python實現並行話的童鞋有所幫助。

參考