Categories: Python

Celery 分散式工作佇列排程系統入門教學(二)

介紹 Celery 分散式工作佇列排程系統細部功能的使用方式與範例。

模組檔案結構

建立一個 proj 模組,檔案結構如下:

proj/__init__.py
    /celery.py
    /tasks.py

其中 celery.py 的內容如下:

from celery import Celery

# 建立 Celery Application
app = Celery(
    'proj',                       # 以 __main__ 執行時的名稱
    broker='redis://localhost/',  # Broker 位址
    backend='redis://localhost/', # Backend 位址
    include=['proj.tasks']        # 自動引入的模組
)

# Celery 設定(可自行調整)
app.conf.update(
    result_expires=3600, # 計算結果保存時間
)

if __name__ == '__main__':
    app.start()

broker 參數可設定 broker 的位置,backend 參數可設定儲存計算結果的位置,可用的 broker 與 backend 可參考 Celery 的 Backends and Brokers 文件

接著建立 tasks.py,此檔案中包含了各種自行定義的工作函數,內容如下:

from .celery import app

# 自訂相加工作函數
@app.task
def add(x, y):
    return x + y

# 自訂相乘工作函數
@app.task
def mul(x, y):
    return x * y

# 自訂加總工作函數
@app.task
def xsum(numbers):
    return sum(numbers)

若想要忽略個別工作的計算結果,可以使用 @app.task(ignore_result=True) 這個選項。

執行 Celery Worker

建立好 proj 模組之後,我們可以用手動的方式執行 Celery 的 worker,並加上 -A--app 參數將 application 的模組名稱指定為 proj。在開發與測試階段,可以用 -l--loglevel 參數指定輸出記錄的層級為 INFODEBUG,獲取更詳細的資訊。

# 手動執行 Celery Worker
celery -A proj worker --loglevel=INFO
 -------------- celery@imgqc v5.2.3 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-5.4.0-94-generic-x86_64-with-glibc2.29 2022-01-22 11:17:15
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         proj:0x7f83f9903a00
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . proj.tasks.add
  . proj.tasks.mul
  . proj.tasks.xsum

[2022-01-22 11:17:15,431: INFO/MainProcess] Connected to redis://localhost:6379//
[2022-01-22 11:17:15,433: INFO/MainProcess] mingle: searching for neighbors
[2022-01-22 11:17:16,439: INFO/MainProcess] mingle: all alone
[2022-01-22 11:17:16,447: INFO/MainProcess] celery@imgqc ready.

這裡的 transport 就是指 broker 的設定,我們也可以在執行 celery 指令時,以 -b--broker 參數更改 broker 的設定;而 results 則是 backend 的設定,可用 --result-backend 參數更改。

concurrency 為 prefork 的 worker 行程數量,當所有的 worker 都處於忙碌狀態時,新增的工作就被需等待現有的工作被執行完之後,才能由空出的 worker 來執行。

預設的 concurrency 值為機器的 CPU 核心數量,我們也可以透過 celery 指令的 -c--concurrency 參數來更改此設定值。concurrency 的值要設為多少並沒有絕對的標準,但對於 I/O-bound 的問題,可以嘗試提高 concurrency 的值,而根據實務上的經驗,若 concurrency 的值超過兩倍的 CPU 核心數量,效能通常不會比較好。

Celery 也支援 Eventlet 與 Gevent,詳細說明可參考 Celery 的 Concurrency 文件

task events 是 Celery 用來傳送工作事件訊息的功能選項,主要用於監控工作的處理狀態(例如 celery eventsFlower),詳細說明可參考 Celery 的 Monitoring and Management Guide 文件

queues 則是 worker 取得工作的來源佇列,一個 worker 可以同時從多個來源佇列取得工作,透過佇列我們可以調配工作的配送方式,關於佇列的詳細操作與功能可以參考 Celery 的 Routing Tasks 文件

若要停止 Celery worker 的執行,可按下 Ctrl + c

背景執行 Celery Worker

在正式服務的環境中,通常會以背景執行的方式啟動 Celery worker,這部分可以參考 Celery 的 Daemonization 文件

在 Celery 的 daemon 指令稿中,主要都是透過 celery multi 指令來控制背景執行的 Celery worker,以下是一些基本的指令綱要:

# 啟動背景執行 Celery Worker
celery multi start w1 -A proj --loglevel=INFO \
  --pidfile=celery-%n.pid --logfile=celery-%n%I.log

# 重新啟動背景執行的 Celery Worker
celery multi restart w1 -A proj --loglevel=INFO \
  --pidfile=celery-%n.pid --logfile=celery-%n%I.log

# 停止背景執行的 Celery Worker(非同步)
celery multi stop w1 -A proj --loglevel=INFO \
  --pidfile=celery-%n.pid --logfile=celery-%n%I.log

# 停止背景執行的 Celery Worker(同步)
celery multi stopwait w1 -A proj --loglevel=INFO \
  --pidfile=celery-%n.pid --logfile=celery-%n%I.log

這裡的 multi stop 是非同步的呼叫,執行之後會馬上返回,若要等待 Celery worker 完全停止再返回,則可改用 multi stopwait 這個同步的版本。

--pidfile--logfile 參數分別用於指定 PID 檔案與記錄檔的位置,若在正式服務環境,應該要建立標準的系統目錄,放在系統目錄之下,標準的路徑通常是 /var/run/celery/%n.pid/var/log/celery/%n%I.log
celery multi 也可以一次啟動多個背景執行的 workers:

# 啟動 10 個背景執行的 Workers:
#   * 第 1-3 個 Workers 負責 images 與 video 佇列處理
#   * 第 4,5 個 Workers 負責 data 佇列處理,並設定記錄輸出層級為 DEBUG
#   * 其餘 Workers 皆負責 default 佇列處理
celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data
    -Q default -L:4,5 DEBUG

關於背景執行 worker 的說明,請參考 Celery 的 celery.bin.multi 文件

指定 Celery Application

celery 指令的 -A--app 參數可以用來指定 Celery 所採用的 application 實體,標準格式為 module.path:attribute

除了標準格式之外,也可以採用簡略的格式,僅指明模組名稱。假設採用 --app=proj 參數,則 celery 會依據以下規則與順序尋找 application 實體:

  1. proj.app 屬性。
  2. proj.celery 屬性。
  3. proj 模組中任何 Celery application 類型的屬性。
  4. proj.celery.app 屬性。
  5. proj.celery.celery 屬性。
  6. proj.celery 模組中任何 Celery application 類型的屬性。

派送工作

若要派送工作至 Celery 的 worker,最簡單的方式就是使用 delay() 函數:

from proj.tasks import add

# 派送 add(2, 2) 工作至 Celery Worker
add.delay(2, 2)

delay() 函數是比較簡略的派送工作方式,若需要更進階的功能,可以改用 apply_async() 函數,上面的 delay() 函數呼叫就相當於執行以下這一行 apply_async() 函數呼叫:

# 派送 add(2, 2) 工作至 Celery Worker
add.apply_async((2, 2))

apply_async() 函數可以指定額外的參數,例如派送的佇列、預計執行時間(用於顯示剩餘時間):

# 派送至 lopri 佇列,預計執行 10 秒
add.apply_async((2, 2), queue='lopri', countdown=10)

若直接呼叫工作函數,則會在目前的行程中直接進行計算:

# 直接於目前行程中進行計算
add(2, 2)

關於工作的派送,可以參考 Celery 的 Calling Tasks 文件

每項工作在派送時都會自動產生一組唯一的 UUID 代碼,作為工作識別碼(task id)。dealy()apply_async() 函數傳回一個 AsyncResult 物件,在啟用 backend 的情況下,可透過此 AsyncResult 物件追蹤工作處理狀態。

由於不同的應用情境適合採用不同的 backend,甚至許多狀況下完全不需要 backend,因此在預設的情況下 backend 是停用的,另外對於工作與 worker 的監控,Celery 有專門的事件訊息(可參考 Celery 的 Monitoring and Management Guide 文件),並不需要使用到 backend。

在有設置 backend 的環境下,就可以透過 AsyncResult 物件取回計算結果:

# 派送 add(2, 2) 工作至 Celery Worker
res = add.delay(2, 2)

# 取得計算結果
print(res.get(timeout=1))
4

我們也可以經由 id 屬性查詢工作的 UUID 識別碼:

# 查詢工作的 UUID 識別碼
print(res.id)
b21dc45b-e9b8-42ce-9194-f7f1fbcdc5b4

若有例外(exception)發生時,也可以透過 AsyncResult 物件的 get() 函數查看 traceback:

# 產生錯誤的工作
res = add.delay(2, '2')

# 查看例外與 traceback
print(res.get(timeout=1))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ubuntu/tmp/celery/lib/python3.8/site-packages/celery/result.py", line 224, in get
    return self.backend.wait_for_pending(
  File "/home/ubuntu/tmp/celery/lib/python3.8/site-packages/celery/backends/asynchronous.py", line 223, in wait_for_pending
    return result.maybe_throw(callback=callback, propagate=propagate)
  File "/home/ubuntu/tmp/celery/lib/python3.8/site-packages/celery/result.py", line 336, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/home/ubuntu/tmp/celery/lib/python3.8/site-packages/celery/result.py", line 329, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/home/ubuntu/tmp/celery/lib/python3.8/site-packages/vine/promises.py", line 234, in throw
    reraise(type(exc), exc, tb)
  File "/home/ubuntu/tmp/celery/lib/python3.8/site-packages/vine/utils.py", line 30, in reraise
    raise value
TypeError: unsupported operand type(s) for +: 'int' and 'str'

若不想將例外傳遞出來,可以加上 propagate=False 參數,僅由函數傳回例外物件:

# 不傳遞例外
print(res.get(propagate=False))
unsupported operand type(s) for +: 'int' and 'str'

在不傳遞例外的狀況下,可由 AsyncResult 物件的 failed()successful() 函數來判斷執行是否成功:

# 執行是否失敗
print(res.failed())
True
# 執行是否成功
print(res.successful())
False

或是由 state 屬性查看工作狀態:

# 工作執行狀態
print(res.state)
FAILURE

工作在單一時間點僅會有一種狀態,正常的工作狀態的轉換為:

PENDING -> STARTED -> SUCCESS

STARTED 狀態只有在啟用 task_track_started 選項,或是工作函數加上 @task(track_started=True) 選項時才會產生,在預設的狀態之下,只會出現 PENDINGSUCCESS 兩種狀態。

PENDING 實際上只是一種預設的狀態,對於所有無法得知狀態的工作,預設都是 PENDING

from proj.celery import app

# 不存在的工作
res = app.AsyncResult('this-id-does-not-exist')

# 顯示工作狀態
print(res.state)
PENDING

若工作遇到例外狀況,並且會執行重試的動作,則狀態的變化就會類似這樣:

PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

關於工作的狀態,可以參考 Celery 的 Tasks 文件

Canvas 設計工作流程

Signature

除了直接以 delay()apply_async() 派送工作之外,我們也可以使用 signature() 將函數以及傳入的參數包裝起來,當成參數傳遞給其他函數或行程,在其他地方進行呼叫:

# 包裝函數呼叫
s1 = add.signature((2, 2), countdown=10)
res = s1.delay()

對於單純的函數呼叫,也可以使用簡略版的包裝方式:

# 包裝函數呼叫
s2 = add.s(2, 2)
res = s2.delay()

在使用 signature() 包裝函數呼叫時,也可以只包裝部分傳入的參數,其餘的參數由呼叫者自行輸入:

# 包裝 add(?, 2) 函數呼叫
s3 = add.s(2)

# 呼叫 add(8, 2)
res = s3.delay(8)

各種參數也可以在呼叫時再加上去,或是覆蓋原先的設定:

# 包裝函數呼叫
s3 = add.s(2, 2, debug=True)

# 覆寫參數設定
s3.delay(debug=False)

Group

Celery 的 group() 含數可以將多個函數呼叫的 signature 包裝起來,以平行的方式執行:

from celery import group
from proj.tasks import add

# 建立 Group
my_group = group([add.s(2, 2), add.s(4, 4)])

# 執行 Group
res = my_group()

# 取得執行結果
print(res.get())
[4, 8]

group() 也支援部分參數的包裝與呼叫:

# 建立 Group
my_group = group([add.s(2), add.s(4)])

# 執行 Group
res = my_group(10)

# 取得執行結果
print(res.get())
[12, 14]

Chain

Celery 的 group() 含數可以將多個函數呼叫的 signature 串接起來,類似 Unix 管線的方式執行多個工作,同時亦支援資料傳遞:

from celery import chain
from proj.tasks import add, mul

# 建立 Chain,計算 (4 + 4) * 8
my_chain = chain(add.s(4, 4) | mul.s(8))

# 執行 Chain
res = my_chain()

# 取得執行結果
print(res.get())
64

chain() 也支援部分參數的包裝與呼叫:

# 建立 Chain,計算 (? + 4) * 8
my_chain = chain(add.s(4) | mul.s(8))

# 執行 Chain
res = my_chain(4)

# 取得執行結果
print(res.get())

我們也可以使用以下這樣的簡略寫法來建立一條 chain 並執行:

# Chain 簡略寫法
(add.s(4, 4) | mul.s(8))().get()
64

Chord

chord 就是一個含有回呼函數(callback function)的 group:

from celery import chord
from proj.tasks import add, xsum

# 建立 Chord
my_chord = chord([add.s(2, 2), add.s(4, 4)], xsum.s())

# 執行 Chord
res = my_chord()

# 取得執行結果
print(res.get())
12

當一個 group 串接另一個工作函數時,也會會自動轉為 chord:

# 建立 Chord
my_chord = group([add.s(2, 2), add.s(4, 4)]) | xsum.s()

# 執行 Chord
res = my_chord()

# 取得執行結果
print(res.get())
12

我們可以利用 signature 的語法任意組合出各種處理流程,例如:

upload_document.s(file) | group(apply_filter.s() for filter in filters)

關於 Canvas 工作流程的語法,可以參考 Celery 的 Canvas: Designing Work-flows 文件

路由

Celery 支援所有 AMQP 所提供的路由功能,除此之外亦可使用簡易的具名佇列來控制路由。

task_routes 可以用來設定工作的路由:

app.conf.update(
    # 設定工作路由
    task_routes = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)

或是在派送工作時動態指定佇列:

# 動態指定佇列
add.apply_async((2, 2), queue='hipri')

而在執行 Celery worker 時可以使用 celery 指令的 -Q--queues 參數來指定工作來源佇列:

# 從 hipri 佇列取得工作
celery -A proj worker -Q hipri

亦可同時從多個佇列獲取新工作,例如同時從 hipri 與預設的 celery 這兩個佇列獲取工作:

# 從 hipri 佇列取得工作
celery -A proj worker -Q hipri,celery

在指定多個佇列時,佇列的先後順序並不會影響取得工作的優先度。關於路由的方式,可以參考 Celery 的 Routing Tasks 文件

遠端控制

若採用 RabbitMQ(AMQP)、Radis 或 Qpid 作為 broker,則可在執行期間動態監控 worker 的狀態。

例如我們可以查看 worker 目前正在執行的工作:

# 查看 Worker 目前正在執行的工作
celery -A proj inspect active

這裡 Celery 會以廣播的方式將訊息傳送給所有的 workers。

若只想對特定的 worker 進行查詢,可以使用 --destination 參數指定 worker:

# 查看 celery@example.com 這個 Worker 目前正在執行的工作
celery -A proj inspect active --destination=celery@example.com

celeryinspect 指令包含了許多查詢功能,若要查詢詳細用法可以加上 --help 參數顯示說明:

# 查詢 celery inspect 指令用法
celery -A proj inspect --help

celerycontrol 指令則是可以對 worker 進行操控:

# 查詢 celery control 指令用法
celery -A proj control --help

例如若要啟用事件訊息功能(用於監看 worker 與工作),可以執行:

# 啟用事件訊息功能
celery -A proj control enable_events

當事件訊息功能啟用之後,可以執行以下指令傾印 worker 的事件:

# 傾印 worker 的事件
celery -A proj events --dump

或是以 curses 介面查看:

# 查看 worker 的事件
celery -A proj events

當查看完畢之後,可再將事件訊息功能停用:

# 停用事件訊息功能
celery -A proj control disable_events

時區

Celery 所有內部的日期與時間資訊都是採 UTC 時區,而當 worker 接收到訊息時,會轉為當地時間,若想要採用跟系統不一樣的時區設定,可以調整 timezone 設定:

# 更改時區設定
app.conf.timezone = 'Asia/Taipei'

參考資料

Share
Published by
Office Guide

Recent Posts

Python 使用 PyAutoGUI 自動操作滑鼠與鍵盤

本篇介紹如何在 Python ...

1 年 ago

Ubuntu Linux 以 WireGuard 架設 VPN 伺服器教學與範例

本篇介紹如何在 Ubuntu ...

1 年 ago

Linux 網路設定 ip 指令用法教學與範例

本篇介紹如何在 Linux 系...

1 年 ago

Linux 以 Cryptsetup、LUKS 加密 USB 隨身碟教學與範例

介紹如何在 Linux 系統中...

1 年 ago