Python

Python 使用 multiprocessing 模組開發多核心平行運算程式教學與範例

介紹如何使用 Python 的 multiprocessing 模組,開發多核心平行運算程式。

Python 的 multiprocessing 是一個多行程模組,功能類似多執行緒的 threading 模組,可以讓開發者使用多核心的 CPU 進行平行化程式設計,加速程式處理速度。

Pool 資源池

Pool 是由多個工作者行程(worker process)所組成的資源池,可以將工作分配給其內部的各個工作者行程進行平行處理。

map 函數

Pool 最簡單的使用方式就是將資料以列表的形式打包好,透過其 map 函數將實際的工作處理函數與列表資料傳入其中。

假設我們想要以 Poolmap 函數平行計算多個數值的平方,就可以這樣寫:

from multiprocessing import Pool

# 計算數值平方的函數
def f(x):
    return x*x

# 建立含有 4 個工作者行程的 Pool
with Pool(processes=4) as p:
    # 以 map 平行計算數值的平方
    print(p.map(f, [1, 2, 3]))

Poolmap 函數可以視為 Python 內建 map 函數的平行化版本,呼叫 Poolmap 函數可以將資料分割之後,再以 Pool 中的工作者行程平行處理,等到所有資料都處理完成之後,再一次傳回所有的結果。

Poolmap 函數有一個 chunksize 參數可以調整資料分割的大小,當資料列表的長度很長的時候,可以將 chunksize 設為較大的值,讓 map 每次切多一點資料給各個工作者行程,這樣可以減少資料傳遞的次數,提升程式執行效能。以下是比較不同 chunksize 對程式執行速度的範例程式:

from multiprocessing import Pool
import time

def f(x):
    return x*x

with Pool(processes=4) as p:

    start = time.time() # 開始測量執行時間

    # 設定 chunksize 為 1
    result1 = p.map(f, range(1000000), chunksize=1)

    end = time.time() # 結束測量執行時間
    print("chunksize 為 1,執行時間為 %f 秒" % (end - start))


    start = time.time() # 開始測量執行時間

    # 設定 chunksize 為 1000
    result2 = p.map(f, range(1000000), chunksize=1000)

    end = time.time() # 結束測量執行時間
    print("chunksize 為 1000,執行時間為 %f 秒" % (end - start))

    if result1 == result2:
        print("結果相同")
chunksize 為 1,執行時間為 24.855215 秒
chunksize 為 1000,執行時間為 0.257618 秒
結果相同

調整 chunksize 並不會影響程式結構或計算結果,但是卻對執行時間影響很大。

imap 函數

imap 函數是 map 函數的可迭代(iterable)版本,其參數的用法皆與 map 函數相同:

from multiprocessing import Pool

def f(x):
    return x*x

with Pool(processes=4) as p:
    # 以 imap 平行計算數值的平方
    for i in p.imap(f, [1, 2, 3]):
        print(i)
1
4
9

imapchunksize 設定方式與概念皆與 map 相同:

from multiprocessing import Pool

def f(x):
    return x*x

with Pool(processes=4) as p:
    # 設定 chunksize 為 1000
    for i in p.imap(f, range(1000000), chunksize=1000):
        print(i)

imap_unordered 函數

imap_unordered 函數跟 imap 函數非常類似,只不過 imap_unordered 所傳回的計算結果不會完全依照輸入資料的順序傳回,而是將先處理完成的資料先傳回,可增進整體程式的執行效率:

from multiprocessing import Pool

def f(x):
    return x*x

with Pool(processes=4) as p:
    # 以 imap_unordered 平行計算數值的平方
    for i in p.imap_unordered(f, [1, 2, 3]):
        print(i)
1
9
4

imap_unorderedchunksize 設定方式也都相同。

以下是一個用來比較 imap_unorderedimap 差異的範例,imap_unordered 會將先處理完成的資料先傳回,而 imap 則是為了要保持原有的資料順序,所以會有潛在的等待時間,這就會造成程式執行效率上的差異:

from multiprocessing import Pool
import time

def f(x):
    time.sleep(x)

with Pool(processes=4) as p:

    start = time.time() # 開始測量執行時間

    # 以 imap 平行處理
    for i in p.imap(f, [3, 2, 1]):
        time.sleep(1)

    end = time.time() # 結束測量執行時間
    print("imap 執行時間為 %f 秒" % (end - start))


    start = time.time() # 開始測量執行時間

    # 以 imap_unordered 平行處理
    for i in p.imap_unordered(f, [3, 2, 1]):
        time.sleep(1)

    end = time.time() # 結束測量執行時間
    print("imap_unordered 執行時間為 %f 秒" % (end - start))
imap 執行時間為 6.007628 秒
imap_unordered 執行時間為 4.004911 秒

所以如果我們不在意計算結果的傳回順序,就可以採用效率較好的 imap_unordered 函數。

starmap 函數

starmap 函數可以用來處理多輸入參數的狀況,以下是每次傳遞兩個輸入的數值,平行計算總合的範例。

from multiprocessing import Pool

# 計算總和
def f(x, y):
    return x+y

with Pool(processes=4) as p:
    # 以 starmap 傳遞多個輸入參數資料
    print(p.starmap(f, [(1, 2), (3, 4)]))
[3, 7]

apply_async 函數

apply_async 函數可以將指定的函數放進 Pool 資源池中執行,以下是平行執行多個函數的範例:

from multiprocessing import Pool
import time

def f1(x):
    time.sleep(1)
    return x*x

def f2(x):
    time.sleep(1)
    return x*x*x

with Pool(processes=4) as p:

    start = time.time() # 開始測量執行時間

    # 平行呼叫多個函數
    r1 = p.apply_async(f1, (2,))
    r2 = p.apply_async(f2, (2,))

    # 取得計算結果
    print(r1.get(timeout=3))
    print(r2.get(timeout=3))

    end = time.time() # 結束測量執行時間
    print("執行時間:%f 秒" % (end - start))
4
8
執行時間:1.002772 秒

map_async 函數

map_async 函數是 map 函數的非同步版本,可以將工作放進 Pool 資源池中執行,等執行完成後再一次取回:

from multiprocessing import Pool
import time

def f1(x):
    time.sleep(1)
    return x*x

def f2(x):
    time.sleep(1)
    return x*x*x

with Pool(processes=4) as p:

    start = time.time() # 開始測量執行時間

    # 平行呼叫多個函數
    r1 = p.map_async(f1, [1, 2, 3])
    r2 = p.map_async(f2, [1, 2, 3])

    # 取得計算結果
    print(r1.get(timeout=3))
    print(r2.get(timeout=3))

    end = time.time() # 結束測量執行時間
    print("執行時間:%f 秒" % (end - start))
[1, 4, 9]
[1, 8, 27]
執行時間:2.003540 秒

starmap_async 函數

starmap_async 函數是 starmap 函數的非同步版本,可以將工作放進 Pool 資源池中執行,等執行完成後再一次取回:

from multiprocessing import Pool
import time

def f1(x, y):
    time.sleep(1)
    return x+y

def f2(x, y):
    time.sleep(1)
    return x*y

with Pool(processes=4) as p:

    start = time.time() # 開始測量執行時間

    # 平行呼叫多個函數
    r1 = p.starmap_async(f1, [(1, 2), (3, 4)])
    r2 = p.starmap_async(f2, [(1, 2), (3, 4)])

    # 取得計算結果
    print(r1.get(timeout=3))
    print(r2.get(timeout=3))

    end = time.time() # 結束測量執行時間
    print("執行時間:%f 秒" % (end - start))
[3, 7]
[2, 12]
執行時間:1.002267 秒

Process 行程

multiprocessing 模組中的 Process 可以用來建立獨立的行程,平行處理不同的工作。

from multiprocessing import Process

# 實際要執行的工作
def f(name):
    print('Hello', name)

# 建立獨立行程
p = Process(target=f, args=('Bob',))

# 開始執行
p.start()

# 原行程可處理其他工作...

# 等待獨立行程結束
p.join()
Hello Bob

共享記憶體

如果需要在行程之間共享資料,可以使用 ValueArray 來儲存資料,讓多個行程可以同時存取。以下是透過 ValueArray 取回計算結果的範例。

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 1.24
    for i in range(len(a)):
        a[i] = a[i] * 2

# 建立共享記憶體的 Value 與 Array
num = Value('d', 0.0)
arr = Array('i', range(10))

# 以獨立行程執行
p = Process(target=f, args=(num, arr))
p.start()
p.join()

# 取得計算結果
print(num.value)
print(arr[:])
1.24
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

如果有多個行程都會同時更改到共享記憶體中的資料,就必須搭配 Lock 鎖定記憶體中的資料之後再進行修改,才不會因為競爭條件(race condition)造成資料錯誤:

from multiprocessing import Process, Value

def f(c):
    for i in range(1000):
        # 取得 Lock 之後更改內容
        with counter.get_lock():
            c.value += 1

# 建立共享記憶體的 Value
counter = Value('i', 0)

# 以獨立行程執行
p1 = Process(target=f, args=(counter,))
p2 = Process(target=f, args=(counter,))
p1.start()
p2.start()
p1.join()
p2.join()

# 取得計算結果
print(counter.value)
2000

Queue

Queue 是另外一種在行程之間遞送資料的方法,以下是一個簡單的使用範例。

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

# 建立共享佇列
q = Queue()

p = Process(target=f, args=(q,))
p.start()

# 從佇列取得資料
print(q.get())

p.join()
[42, None, 'hello']

Pipe

Pipe 可以讓兩個行程之間進行雙向的資料傳輸,以下是一個簡單的範例。

from multiprocessing import Process, Pipe

def f(conn):
    # 透過 Pipe 傳送資料
    conn.send([42, None, 'hello'])

    # 關閉 Pipe 連線
    conn.close()

# 建立可雙向傳輸的 Pipe
parent_conn, child_conn = Pipe()

p = Process(target=f, args=(child_conn,))
p.start()

# 從 Pipe 取得資料
print(parent_conn.recv())

p.join()
[42, None, 'hello']

在預設的情況下,建立 Pipe 的時候所產生兩個 Connection 物件都具有發送與接收資料的能力(sendrecv 函數)。

參考資料

Share
Published by
Office Guide

Recent Posts

Python 使用 PyAutoGUI 自動操作滑鼠與鍵盤

本篇介紹如何在 Python ...

1 年 ago

Ubuntu Linux 以 WireGuard 架設 VPN 伺服器教學與範例

本篇介紹如何在 Ubuntu ...

1 年 ago

Linux 網路設定 ip 指令用法教學與範例

本篇介紹如何在 Linux 系...

1 年 ago

Linux 以 Cryptsetup、LUKS 加密 USB 隨身碟教學與範例

介紹如何在 Linux 系統中...

1 年 ago