介紹如何使用 Python 的 multiprocessing
模組,開發多核心平行運算程式。
Python 的
multiprocessing
是一個多行程模組,功能類似多執行緒的 threading
模組,可以讓開發者使用多核心的 CPU 進行平行化程式設計,加速程式處理速度。
Pool
資源池
Pool
是由多個工作者行程(worker process)所組成的資源池,可以將工作分配給其內部的各個工作者行程進行平行處理。
map
函數
Pool
最簡單的使用方式就是將資料以列表的形式打包好,透過其 map
函數將實際的工作處理函數與列表資料傳入其中。
假設我們想要以 Pool
的 map
函數平行計算多個數值的平方,就可以這樣寫:
from multiprocessing import Pool # 計算數值平方的函數 def f(x): return x*x # 建立含有 4 個工作者行程的 Pool with Pool(processes=4) as p: # 以 map 平行計算數值的平方 print(p.map(f, [1, 2, 3]))
Pool
的 map
函數可以視為 Python 內建 map
函數的平行化版本,呼叫 Pool
的 map
函數可以將資料分割之後,再以 Pool
中的工作者行程平行處理,等到所有資料都處理完成之後,再一次傳回所有的結果。
Pool
的 map
函數有一個 chunksize
參數可以調整資料分割的大小,當資料列表的長度很長的時候,可以將 chunksize
設為較大的值,讓 map
每次切多一點資料給各個工作者行程,這樣可以減少資料傳遞的次數,提升程式執行效能。以下是比較不同 chunksize
對程式執行速度的範例程式:
from multiprocessing import Pool import time def f(x): return x*x with Pool(processes=4) as p: start = time.time() # 開始測量執行時間 # 設定 chunksize 為 1 result1 = p.map(f, range(1000000), chunksize=1) end = time.time() # 結束測量執行時間 print("chunksize 為 1,執行時間為 %f 秒" % (end - start)) start = time.time() # 開始測量執行時間 # 設定 chunksize 為 1000 result2 = p.map(f, range(1000000), chunksize=1000) end = time.time() # 結束測量執行時間 print("chunksize 為 1000,執行時間為 %f 秒" % (end - start)) if result1 == result2: print("結果相同")
chunksize 為 1,執行時間為 24.855215 秒 chunksize 為 1000,執行時間為 0.257618 秒 結果相同
調整 chunksize
並不會影響程式結構或計算結果,但是卻對執行時間影響很大。
imap
函數
imap
函數是 map
函數的可迭代(iterable)版本,其參數的用法皆與 map
函數相同:
from multiprocessing import Pool def f(x): return x*x with Pool(processes=4) as p: # 以 imap 平行計算數值的平方 for i in p.imap(f, [1, 2, 3]): print(i)
1 4 9
imap
的 chunksize
設定方式與概念皆與 map
相同:
from multiprocessing import Pool def f(x): return x*x with Pool(processes=4) as p: # 設定 chunksize 為 1000 for i in p.imap(f, range(1000000), chunksize=1000): print(i)
imap_unordered
函數
imap_unordered
函數跟 imap
函數非常類似,只不過 imap_unordered
所傳回的計算結果不會完全依照輸入資料的順序傳回,而是將先處理完成的資料先傳回,可增進整體程式的執行效率:
from multiprocessing import Pool def f(x): return x*x with Pool(processes=4) as p: # 以 imap_unordered 平行計算數值的平方 for i in p.imap_unordered(f, [1, 2, 3]): print(i)
1 9 4
imap_unordered
的 chunksize
設定方式也都相同。
以下是一個用來比較 imap_unordered
與 imap
差異的範例,imap_unordered
會將先處理完成的資料先傳回,而 imap
則是為了要保持原有的資料順序,所以會有潛在的等待時間,這就會造成程式執行效率上的差異:
from multiprocessing import Pool import time def f(x): time.sleep(x) with Pool(processes=4) as p: start = time.time() # 開始測量執行時間 # 以 imap 平行處理 for i in p.imap(f, [3, 2, 1]): time.sleep(1) end = time.time() # 結束測量執行時間 print("imap 執行時間為 %f 秒" % (end - start)) start = time.time() # 開始測量執行時間 # 以 imap_unordered 平行處理 for i in p.imap_unordered(f, [3, 2, 1]): time.sleep(1) end = time.time() # 結束測量執行時間 print("imap_unordered 執行時間為 %f 秒" % (end - start))
imap 執行時間為 6.007628 秒 imap_unordered 執行時間為 4.004911 秒
所以如果我們不在意計算結果的傳回順序,就可以採用效率較好的 imap_unordered
函數。
starmap
函數
starmap
函數可以用來處理多輸入參數的狀況,以下是每次傳遞兩個輸入的數值,平行計算總合的範例。
from multiprocessing import Pool # 計算總和 def f(x, y): return x+y with Pool(processes=4) as p: # 以 starmap 傳遞多個輸入參數資料 print(p.starmap(f, [(1, 2), (3, 4)]))
[3, 7]
apply_async
函數
apply_async
函數可以將指定的函數放進 Pool
資源池中執行,以下是平行執行多個函數的範例:
from multiprocessing import Pool import time def f1(x): time.sleep(1) return x*x def f2(x): time.sleep(1) return x*x*x with Pool(processes=4) as p: start = time.time() # 開始測量執行時間 # 平行呼叫多個函數 r1 = p.apply_async(f1, (2,)) r2 = p.apply_async(f2, (2,)) # 取得計算結果 print(r1.get(timeout=3)) print(r2.get(timeout=3)) end = time.time() # 結束測量執行時間 print("執行時間:%f 秒" % (end - start))
4 8 執行時間:1.002772 秒
map_async
函數
map_async
函數是 map
函數的非同步版本,可以將工作放進 Pool 資源池中執行,等執行完成後再一次取回:
from multiprocessing import Pool import time def f1(x): time.sleep(1) return x*x def f2(x): time.sleep(1) return x*x*x with Pool(processes=4) as p: start = time.time() # 開始測量執行時間 # 平行呼叫多個函數 r1 = p.map_async(f1, [1, 2, 3]) r2 = p.map_async(f2, [1, 2, 3]) # 取得計算結果 print(r1.get(timeout=3)) print(r2.get(timeout=3)) end = time.time() # 結束測量執行時間 print("執行時間:%f 秒" % (end - start))
[1, 4, 9] [1, 8, 27] 執行時間:2.003540 秒
starmap_async
函數
starmap_async
函數是 starmap
函數的非同步版本,可以將工作放進 Pool 資源池中執行,等執行完成後再一次取回:
from multiprocessing import Pool import time def f1(x, y): time.sleep(1) return x+y def f2(x, y): time.sleep(1) return x*y with Pool(processes=4) as p: start = time.time() # 開始測量執行時間 # 平行呼叫多個函數 r1 = p.starmap_async(f1, [(1, 2), (3, 4)]) r2 = p.starmap_async(f2, [(1, 2), (3, 4)]) # 取得計算結果 print(r1.get(timeout=3)) print(r2.get(timeout=3)) end = time.time() # 結束測量執行時間 print("執行時間:%f 秒" % (end - start))
[3, 7] [2, 12] 執行時間:1.002267 秒
Process
行程
multiprocessing
模組中的 Process
可以用來建立獨立的行程,平行處理不同的工作。
from multiprocessing import Process # 實際要執行的工作 def f(name): print('Hello', name) # 建立獨立行程 p = Process(target=f, args=('Bob',)) # 開始執行 p.start() # 原行程可處理其他工作... # 等待獨立行程結束 p.join()
Hello Bob
共享記憶體
如果需要在行程之間共享資料,可以使用 Value
或 Array
來儲存資料,讓多個行程可以同時存取。以下是透過 Value
與 Array
取回計算結果的範例。
from multiprocessing import Process, Value, Array def f(n, a): n.value = 1.24 for i in range(len(a)): a[i] = a[i] * 2 # 建立共享記憶體的 Value 與 Array num = Value('d', 0.0) arr = Array('i', range(10)) # 以獨立行程執行 p = Process(target=f, args=(num, arr)) p.start() p.join() # 取得計算結果 print(num.value) print(arr[:])
1.24 [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
如果有多個行程都會同時更改到共享記憶體中的資料,就必須搭配 Lock 鎖定記憶體中的資料之後再進行修改,才不會因為競爭條件(race condition)造成資料錯誤:
from multiprocessing import Process, Value def f(c): for i in range(1000): # 取得 Lock 之後更改內容 with counter.get_lock(): c.value += 1 # 建立共享記憶體的 Value counter = Value('i', 0) # 以獨立行程執行 p1 = Process(target=f, args=(counter,)) p2 = Process(target=f, args=(counter,)) p1.start() p2.start() p1.join() p2.join() # 取得計算結果 print(counter.value)
2000
Queue
Queue
是另外一種在行程之間遞送資料的方法,以下是一個簡單的使用範例。
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) # 建立共享佇列 q = Queue() p = Process(target=f, args=(q,)) p.start() # 從佇列取得資料 print(q.get()) p.join()
[42, None, 'hello']
Pipe
Pipe
可以讓兩個行程之間進行雙向的資料傳輸,以下是一個簡單的範例。
from multiprocessing import Process, Pipe def f(conn): # 透過 Pipe 傳送資料 conn.send([42, None, 'hello']) # 關閉 Pipe 連線 conn.close() # 建立可雙向傳輸的 Pipe parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() # 從 Pipe 取得資料 print(parent_conn.recv()) p.join()
[42, None, 'hello']
在預設的情況下,建立 Pipe
的時候所產生兩個 Connection
物件都具有發送與接收資料的能力(send
與 recv
函數)。