介紹如何使用 Celery 分散式工作佇列排程系統,派送大量計算工作,監控排程、並於計算完成後取回結果。
Celery 是一個以 Python 開發的分散式工作佇列排程系統,可以結合各類型的資料庫與 broker 處理即時或預定的排程工作,監控工作的排程狀況,等待計算完成之後再取回計算結果。
安裝 Celery
Celery 可以透過 pip
或 easy_install
安裝:
# 安裝 Celery
pip install celery
選擇 Broker
Celery 支援的 broker 有很多種,包含 RabbitMQ、Redis、Amazon SQS 等,這裡我們採用 Redis,首先在系統上安裝好 Redis 資料庫,關於 Redis 資料庫的安裝與設定,可以參考 Ubuntu Linux 安裝、設定 Redis 資料庫教學與範例。
若不想在系統內安裝原生的 Redis 資料庫,亦可使用 Docker 運行 Redis 資料庫:
# 以 Docker 執行 Redis 資料庫 docker run -d -p 6379:6379 redis
Celery 入門
若要使用 Celery 進行工作的排程,最簡易的方式就是先建立一個 Celery application 物件,並自行定義工作的函數,再透過 Celery 來管理工作排程。
建立 Celery Application
首先建立一個 tasks.py
指令稿,內容如下:
from celery import Celery # 建立 Celery Application app = Celery('tasks', broker='redis://localhost/') # 定義工作函數 @app.task def add(x, y): return x + y
這裡我們的 broker 採用剛剛上面安裝好的 Redis 資料庫,亦可採用其他的 broker,例如 RabbitMQ。
執行 Celery Worker
建立好含有 application 的 tasks
模組之後,我們可以用手動的方式執行 Celery 的 worker,並加上 -A
參數將 application 的模組名稱指定為 tasks
。另外由於在開發階段,我們希望查看較詳細的記錄,所以加上 --loglevel=INFO
參數。
# 手動執行 Celery Worker celery -A tasks worker --loglevel=INFO
-------------- celery@imgqc v5.2.3 (dawn-chorus) --- ***** ----- -- ******* ---- Linux-5.4.0-94-generic-x86_64-with-glibc2.29 2022-01-18 16:13:45 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: __main__:0x7f0da7ee17c0 - ** ---------- .> transport: redis://localhost:6379// - ** ---------- .> results: disabled:// - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . tasks.add [2022-01-18 16:13:45,920: INFO/MainProcess] Connected to redis://localhost:6379// [2022-01-18 16:13:45,923: INFO/MainProcess] mingle: searching for neighbors [2022-01-18 16:13:46,929: INFO/MainProcess] mingle: all alone [2022-01-18 16:13:46,937: INFO/MainProcess] celery@imgqc ready.
在 Celery worker 執行之後,就會處於等待工作的狀態,它會負責接收工作,指派給 worker 進行處理。
這裡我們為了示範 Celery 的運作方式,所以採用手動啟動 worker 的方式,在正式的服務環境下,通常都會採用背景 daemon(例如 systemd
)的方式來啟動 Celery worker。
若要查看 celery
指令的各種參數用法,可以加上 --help
查看其說明:
# 查看 Celery 指令參數用法 celery --help celery worker --help
派送工作
若要派送工作至 Celery 的 worker,最簡單的方式就是呼叫工作函數的 delay()
函數,它是 apply_async()
函數的簡易版本:
from tasks import add # 透過 Celery 派送工作 add.delay(4, 4)
在派送工作之後,之前我們執行的 Celery worker 輸出訊息應該就會出現類似這樣的資訊:
[2022-01-18 19:52:32,374: INFO/ForkPoolWorker-1] Task tasks.add[220b79ac-87d8-409b-99b0-fe831030474a] succeeded in 0.0005772580007032957s: 8
delay()
函數會傳回一個 AsyncResult
物件,可用於追蹤工作的狀態以及取得計算結果,不過這些功能必須在 Celery application 有設置 backend 的情況下才能使用,請參考下面的說明。
取得計算結果
如果需要追蹤工作的狀態,Celery 就必須有地方存放這些狀態的資訊,而 backend 的作用就是儲存這些資訊。我們可以修改 tasks.py
指令稿,在建立 Celery application 時加上 backend 的設定,最簡單的方式就是透過 RPC 的方式將工作狀態送回 Python 程式中:
# 建立 Celery Application(以 RPC 送回工作狀態) app = Celery('tasks', backend='rpc://', broker='redis://localhost/')
可以利用其他的資料庫來存放工作狀態,例如 Redis 資料庫:
# 建立 Celery Application(以 Redis 儲存工作狀態) app = Celery('tasks', backend='redis://localhost/', broker='redis://localhost/')
設置了 backend 之後,按照同樣的方式派送工作:
from tasks import add # 透過 Celery 派送工作 result = add.delay(4, 4)
此時我們就可以經由傳回的 AsyncResult
物件來追蹤工作狀態,例如檢查工作是否已經執行完成:
# 檢查工作是否已經完成
result.ready()
True
以及取得計算結果:
# 取得計算結果
result.get()
8
若不需要查看計算結果,則可呼叫 forget()
函數釋放儲存空間:
# 釋放儲存空間
result.forget()
由於 backend 在儲存計算結果時也會需要一定的空間,所以對於每一個 AsyncResult
物件都一定要呼叫 get()
或是 forget()
函數,這樣這些計算結果所占用的儲存資源才會被釋放掉。
Celery 設定檔
在大部分的情況下,Celery 不太需要調整設定就可以正常運作,但若進一步了解各種細部的設定,可以讓 Celery 的運作更符合自己的需要,可用的 Celery 設定相當多,可參考 Celery 的 Configuration and defaults 說明網頁。
調整設定最直接的方式就是更改 Celery application 物件的屬性值,例如將 task_serializer
選項設定為 json
:
# 設定 task_serializer 選項設定為 json app.conf.task_serializer = 'json'
若要更改多項設定,可以使用 update()
函數:
# 更改多項設定 app.conf.update( task_serializer='json', accept_content=['json'], result_serializer='json', timezone='Asia/Taipei', enable_utc=False, )
在大型的專案中,建議將所有的設定統一集中在一個模組檔案中(通常命名為 celeryconfig.py
),以下是一個簡單的範例:
# celeryconfig.py 設定檔內容 # Broker 與 Backend 設定 broker_url = 'redis://localhost/' result_backend = 'rpc://' # 其他設定 task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] timezone = 'Asia/Taipei' enable_utc = False
建立好 celeryconfig.py
設定檔案之後,可以使用 config_from_object()
函數載入設定:
# 從 celeryconfig.py 載入設定 app.config_from_object('celeryconfig')
若要檢測 celeryconfig.py
設定檔中是否有語法錯誤,可以執行以下指令進行測試:
# 檢測 celeryconfig.py 設定是否有語法錯誤 python -m celeryconfig
Celery 的設定有相當豐富的用途,例如設定工作派送路由,將指定工作派送至特定的佇列:
# 設定工作派送路由 task_routes = { 'tasks.add': 'low-priority', }
或是指定特定工作派送的速度上限:
# 設定工作派送速度 task_annotations = { 'tasks.add': {'rate_limit': '10/m'} }
如果使用 RabbitMQ 或 Redis 作為 broker,甚至可以在執行階段動態改變工作派送速度:
# 動態設定 tasks.add 工作派送速度(RabbitMQ 或 Redis) celery -A tasks control rate_limit tasks.add 10/m
-> celery@imgqc: OK new rate limit set successfully
執行之後,就會通知 Celery worker 伺服器更改指定工作的派送速度:
[2022-01-19 08:13:22,111: INFO/MainProcess] New rate limit for tasks of type tasks.add: 10/m.