介紹 Celery 分散式工作佇列排程系統細部功能的使用方式與範例。
模組檔案結構
建立一個 proj
模組,檔案結構如下:
proj/__init__.py /celery.py /tasks.py
其中 celery.py
的內容如下:
from celery import Celery # 建立 Celery Application app = Celery( 'proj', # 以 __main__ 執行時的名稱 broker='redis://localhost/', # Broker 位址 backend='redis://localhost/', # Backend 位址 include=['proj.tasks'] # 自動引入的模組 ) # Celery 設定(可自行調整) app.conf.update( result_expires=3600, # 計算結果保存時間 ) if __name__ == '__main__': app.start()
broker
參數可設定 broker 的位置,backend
參數可設定儲存計算結果的位置,可用的 broker 與 backend 可參考 Celery 的 Backends and Brokers 文件。
接著建立 tasks.py
,此檔案中包含了各種自行定義的工作函數,內容如下:
from .celery import app # 自訂相加工作函數 @app.task def add(x, y): return x + y # 自訂相乘工作函數 @app.task def mul(x, y): return x * y # 自訂加總工作函數 @app.task def xsum(numbers): return sum(numbers)
若想要忽略個別工作的計算結果,可以使用 @app.task(ignore_result=True)
這個選項。
執行 Celery Worker
建立好 proj
模組之後,我們可以用手動的方式執行 Celery 的 worker,並加上 -A
或 --app
參數將 application 的模組名稱指定為 proj
。在開發與測試階段,可以用 -l
或 --loglevel
參數指定輸出記錄的層級為 INFO
或 DEBUG
,獲取更詳細的資訊。
# 手動執行 Celery Worker celery -A proj worker --loglevel=INFO
-------------- celery@imgqc v5.2.3 (dawn-chorus) --- ***** ----- -- ******* ---- Linux-5.4.0-94-generic-x86_64-with-glibc2.29 2022-01-22 11:17:15 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: proj:0x7f83f9903a00 - ** ---------- .> transport: redis://localhost:6379// - ** ---------- .> results: redis://localhost/ - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . proj.tasks.add . proj.tasks.mul . proj.tasks.xsum [2022-01-22 11:17:15,431: INFO/MainProcess] Connected to redis://localhost:6379// [2022-01-22 11:17:15,433: INFO/MainProcess] mingle: searching for neighbors [2022-01-22 11:17:16,439: INFO/MainProcess] mingle: all alone [2022-01-22 11:17:16,447: INFO/MainProcess] celery@imgqc ready.
這裡的 transport
就是指 broker 的設定,我們也可以在執行 celery
指令時,以 -b
或 --broker
參數更改 broker 的設定;而 results
則是 backend 的設定,可用 --result-backend
參數更改。
concurrency
為 prefork 的 worker 行程數量,當所有的 worker 都處於忙碌狀態時,新增的工作就被需等待現有的工作被執行完之後,才能由空出的 worker 來執行。
預設的 concurrency
值為機器的 CPU 核心數量,我們也可以透過 celery
指令的 -c
或 --concurrency
參數來更改此設定值。concurrency
的值要設為多少並沒有絕對的標準,但對於 I/O-bound 的問題,可以嘗試提高 concurrency
的值,而根據實務上的經驗,若 concurrency
的值超過兩倍的 CPU 核心數量,效能通常不會比較好。
Celery 也支援 Eventlet 與 Gevent,詳細說明可參考 Celery 的 Concurrency 文件。
task events
是 Celery 用來傳送工作事件訊息的功能選項,主要用於監控工作的處理狀態(例如 celery events
或 Flower),詳細說明可參考 Celery 的 Monitoring and Management Guide 文件。
queues
則是 worker 取得工作的來源佇列,一個 worker 可以同時從多個來源佇列取得工作,透過佇列我們可以調配工作的配送方式,關於佇列的詳細操作與功能可以參考 Celery 的 Routing Tasks 文件。
若要停止 Celery worker 的執行,可按下 Ctrl + c。
背景執行 Celery Worker
在正式服務的環境中,通常會以背景執行的方式啟動 Celery worker,這部分可以參考 Celery 的 Daemonization 文件。
在 Celery 的 daemon 指令稿中,主要都是透過 celery multi
指令來控制背景執行的 Celery worker,以下是一些基本的指令綱要:
# 啟動背景執行 Celery Worker celery multi start w1 -A proj --loglevel=INFO \ --pidfile=celery-%n.pid --logfile=celery-%n%I.log # 重新啟動背景執行的 Celery Worker celery multi restart w1 -A proj --loglevel=INFO \ --pidfile=celery-%n.pid --logfile=celery-%n%I.log # 停止背景執行的 Celery Worker(非同步) celery multi stop w1 -A proj --loglevel=INFO \ --pidfile=celery-%n.pid --logfile=celery-%n%I.log # 停止背景執行的 Celery Worker(同步) celery multi stopwait w1 -A proj --loglevel=INFO \ --pidfile=celery-%n.pid --logfile=celery-%n%I.log
這裡的 multi stop
是非同步的呼叫,執行之後會馬上返回,若要等待 Celery worker 完全停止再返回,則可改用 multi stopwait
這個同步的版本。
--pidfile
與 --logfile
參數分別用於指定 PID 檔案與記錄檔的位置,若在正式服務環境,應該要建立標準的系統目錄,放在系統目錄之下,標準的路徑通常是 /var/run/celery/%n.pid
與 /var/log/celery/%n%I.log
。
celery multi
也可以一次啟動多個背景執行的 workers:
# 啟動 10 個背景執行的 Workers: # * 第 1-3 個 Workers 負責 images 與 video 佇列處理 # * 第 4,5 個 Workers 負責 data 佇列處理,並設定記錄輸出層級為 DEBUG # * 其餘 Workers 皆負責 default 佇列處理 celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data -Q default -L:4,5 DEBUG
關於背景執行 worker 的說明,請參考 Celery 的 celery.bin.multi
文件。
指定 Celery Application
celery
指令的 -A
或 --app
參數可以用來指定 Celery 所採用的 application 實體,標準格式為 module.path:attribute
。
除了標準格式之外,也可以採用簡略的格式,僅指明模組名稱。假設採用 --app=proj
參數,則 celery
會依據以下規則與順序尋找 application 實體:
proj.app
屬性。proj.celery
屬性。proj
模組中任何 Celery application 類型的屬性。proj.celery.app
屬性。proj.celery.celery
屬性。proj.celery
模組中任何 Celery application 類型的屬性。
派送工作
若要派送工作至 Celery 的 worker,最簡單的方式就是使用 delay()
函數:
from proj.tasks import add # 派送 add(2, 2) 工作至 Celery Worker add.delay(2, 2)
delay()
函數是比較簡略的派送工作方式,若需要更進階的功能,可以改用 apply_async()
函數,上面的 delay()
函數呼叫就相當於執行以下這一行 apply_async()
函數呼叫:
# 派送 add(2, 2) 工作至 Celery Worker add.apply_async((2, 2))
apply_async()
函數可以指定額外的參數,例如派送的佇列、預計執行時間(用於顯示剩餘時間):
# 派送至 lopri 佇列,預計執行 10 秒 add.apply_async((2, 2), queue='lopri', countdown=10)
若直接呼叫工作函數,則會在目前的行程中直接進行計算:
# 直接於目前行程中進行計算 add(2, 2)
關於工作的派送,可以參考 Celery 的 Calling Tasks 文件。
每項工作在派送時都會自動產生一組唯一的 UUID 代碼,作為工作識別碼(task id)。dealy()
與 apply_async()
函數傳回一個 AsyncResult
物件,在啟用 backend 的情況下,可透過此 AsyncResult
物件追蹤工作處理狀態。
由於不同的應用情境適合採用不同的 backend,甚至許多狀況下完全不需要 backend,因此在預設的情況下 backend 是停用的,另外對於工作與 worker 的監控,Celery 有專門的事件訊息(可參考 Celery 的 Monitoring and Management Guide 文件),並不需要使用到 backend。
在有設置 backend 的環境下,就可以透過 AsyncResult
物件取回計算結果:
# 派送 add(2, 2) 工作至 Celery Worker res = add.delay(2, 2) # 取得計算結果 print(res.get(timeout=1))
4
我們也可以經由 id
屬性查詢工作的 UUID 識別碼:
# 查詢工作的 UUID 識別碼 print(res.id)
b21dc45b-e9b8-42ce-9194-f7f1fbcdc5b4
若有例外(exception)發生時,也可以透過 AsyncResult
物件的 get()
函數查看 traceback:
# 產生錯誤的工作 res = add.delay(2, '2') # 查看例外與 traceback print(res.get(timeout=1))
Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/home/ubuntu/tmp/celery/lib/python3.8/site-packages/celery/result.py", line 224, in get return self.backend.wait_for_pending( File "/home/ubuntu/tmp/celery/lib/python3.8/site-packages/celery/backends/asynchronous.py", line 223, in wait_for_pending return result.maybe_throw(callback=callback, propagate=propagate) File "/home/ubuntu/tmp/celery/lib/python3.8/site-packages/celery/result.py", line 336, in maybe_throw self.throw(value, self._to_remote_traceback(tb)) File "/home/ubuntu/tmp/celery/lib/python3.8/site-packages/celery/result.py", line 329, in throw self.on_ready.throw(*args, **kwargs) File "/home/ubuntu/tmp/celery/lib/python3.8/site-packages/vine/promises.py", line 234, in throw reraise(type(exc), exc, tb) File "/home/ubuntu/tmp/celery/lib/python3.8/site-packages/vine/utils.py", line 30, in reraise raise value TypeError: unsupported operand type(s) for +: 'int' and 'str'
若不想將例外傳遞出來,可以加上 propagate=False
參數,僅由函數傳回例外物件:
# 不傳遞例外 print(res.get(propagate=False))
unsupported operand type(s) for +: 'int' and 'str'
在不傳遞例外的狀況下,可由 AsyncResult
物件的 failed()
或 successful()
函數來判斷執行是否成功:
# 執行是否失敗 print(res.failed())
True
# 執行是否成功 print(res.successful())
False
或是由 state
屬性查看工作狀態:
# 工作執行狀態 print(res.state)
FAILURE
工作在單一時間點僅會有一種狀態,正常的工作狀態的轉換為:
PENDING -> STARTED -> SUCCESS
STARTED
狀態只有在啟用 task_track_started
選項,或是工作函數加上 @task(track_started=True)
選項時才會產生,在預設的狀態之下,只會出現 PENDING
與 SUCCESS
兩種狀態。
PENDING
實際上只是一種預設的狀態,對於所有無法得知狀態的工作,預設都是 PENDING
:
from proj.celery import app # 不存在的工作 res = app.AsyncResult('this-id-does-not-exist') # 顯示工作狀態 print(res.state)
PENDING
若工作遇到例外狀況,並且會執行重試的動作,則狀態的變化就會類似這樣:
PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
關於工作的狀態,可以參考 Celery 的 Tasks 文件。
Canvas 設計工作流程
Signature
除了直接以 delay()
或 apply_async()
派送工作之外,我們也可以使用 signature()
將函數以及傳入的參數包裝起來,當成參數傳遞給其他函數或行程,在其他地方進行呼叫:
# 包裝函數呼叫 s1 = add.signature((2, 2), countdown=10) res = s1.delay()
對於單純的函數呼叫,也可以使用簡略版的包裝方式:
# 包裝函數呼叫 s2 = add.s(2, 2) res = s2.delay()
在使用 signature()
包裝函數呼叫時,也可以只包裝部分傳入的參數,其餘的參數由呼叫者自行輸入:
# 包裝 add(?, 2) 函數呼叫 s3 = add.s(2) # 呼叫 add(8, 2) res = s3.delay(8)
各種參數也可以在呼叫時再加上去,或是覆蓋原先的設定:
# 包裝函數呼叫 s3 = add.s(2, 2, debug=True) # 覆寫參數設定 s3.delay(debug=False)
Group
Celery 的 group()
含數可以將多個函數呼叫的 signature 包裝起來,以平行的方式執行:
from celery import group from proj.tasks import add # 建立 Group my_group = group([add.s(2, 2), add.s(4, 4)]) # 執行 Group res = my_group() # 取得執行結果 print(res.get())
[4, 8]
group()
也支援部分參數的包裝與呼叫:
# 建立 Group my_group = group([add.s(2), add.s(4)]) # 執行 Group res = my_group(10) # 取得執行結果 print(res.get())
[12, 14]
Chain
Celery 的 group()
含數可以將多個函數呼叫的 signature 串接起來,類似 Unix 管線的方式執行多個工作,同時亦支援資料傳遞:
from celery import chain from proj.tasks import add, mul # 建立 Chain,計算 (4 + 4) * 8 my_chain = chain(add.s(4, 4) | mul.s(8)) # 執行 Chain res = my_chain() # 取得執行結果 print(res.get())
64
chain()
也支援部分參數的包裝與呼叫:
# 建立 Chain,計算 (? + 4) * 8 my_chain = chain(add.s(4) | mul.s(8)) # 執行 Chain res = my_chain(4) # 取得執行結果 print(res.get())
我們也可以使用以下這樣的簡略寫法來建立一條 chain 並執行:
# Chain 簡略寫法 (add.s(4, 4) | mul.s(8))().get()
64
Chord
chord 就是一個含有回呼函數(callback function)的 group:
from celery import chord from proj.tasks import add, xsum # 建立 Chord my_chord = chord([add.s(2, 2), add.s(4, 4)], xsum.s()) # 執行 Chord res = my_chord() # 取得執行結果 print(res.get())
12
當一個 group 串接另一個工作函數時,也會會自動轉為 chord:
# 建立 Chord my_chord = group([add.s(2, 2), add.s(4, 4)]) | xsum.s() # 執行 Chord res = my_chord() # 取得執行結果 print(res.get())
12
我們可以利用 signature 的語法任意組合出各種處理流程,例如:
upload_document.s(file) | group(apply_filter.s() for filter in filters)
關於 Canvas 工作流程的語法,可以參考 Celery 的 Canvas: Designing Work-flows 文件。
路由
Celery 支援所有 AMQP 所提供的路由功能,除此之外亦可使用簡易的具名佇列來控制路由。
task_routes
可以用來設定工作的路由:
app.conf.update( # 設定工作路由 task_routes = { 'proj.tasks.add': {'queue': 'hipri'}, }, )
或是在派送工作時動態指定佇列:
# 動態指定佇列 add.apply_async((2, 2), queue='hipri')
而在執行 Celery worker 時可以使用 celery
指令的 -Q
或 --queues
參數來指定工作來源佇列:
# 從 hipri 佇列取得工作 celery -A proj worker -Q hipri
亦可同時從多個佇列獲取新工作,例如同時從 hipri
與預設的 celery
這兩個佇列獲取工作:
# 從 hipri 佇列取得工作 celery -A proj worker -Q hipri,celery
在指定多個佇列時,佇列的先後順序並不會影響取得工作的優先度。關於路由的方式,可以參考 Celery 的 Routing Tasks 文件。
遠端控制
若採用 RabbitMQ(AMQP)、Radis 或 Qpid 作為 broker,則可在執行期間動態監控 worker 的狀態。
例如我們可以查看 worker 目前正在執行的工作:
# 查看 Worker 目前正在執行的工作 celery -A proj inspect active
這裡 Celery 會以廣播的方式將訊息傳送給所有的 workers。
若只想對特定的 worker 進行查詢,可以使用 --destination
參數指定 worker:
# 查看 celery@example.com 這個 Worker 目前正在執行的工作 celery -A proj inspect active --destination=celery@example.com
celery
的 inspect
指令包含了許多查詢功能,若要查詢詳細用法可以加上 --help
參數顯示說明:
# 查詢 celery inspect 指令用法 celery -A proj inspect --help
而 celery
的 control
指令則是可以對 worker 進行操控:
# 查詢 celery control 指令用法 celery -A proj control --help
例如若要啟用事件訊息功能(用於監看 worker 與工作),可以執行:
# 啟用事件訊息功能 celery -A proj control enable_events
當事件訊息功能啟用之後,可以執行以下指令傾印 worker 的事件:
# 傾印 worker 的事件 celery -A proj events --dump
或是以 curses 介面查看:
# 查看 worker 的事件 celery -A proj events
當查看完畢之後,可再將事件訊息功能停用:
# 停用事件訊息功能 celery -A proj control disable_events
時區
Celery 所有內部的日期與時間資訊都是採 UTC 時區,而當 worker 接收到訊息時,會轉為當地時間,若想要採用跟系統不一樣的時區設定,可以調整 timezone
設定:
# 更改時區設定 app.conf.timezone = 'Asia/Taipei'