Categories: Python

Celery 分散式工作佇列排程系統入門教學(一)

介紹如何使用 Celery 分散式工作佇列排程系統,派送大量計算工作,監控排程、並於計算完成後取回結果。

Celery 是一個以 Python 開發的分散式工作佇列排程系統,可以結合各類型的資料庫與 broker 處理即時或預定的排程工作,監控工作的排程狀況,等待計算完成之後再取回計算結果。

安裝 Celery

Celery 可以透過 pipeasy_install 安裝:

# 安裝 Celery
pip install celery

選擇 Broker

Celery 支援的 broker 有很多種,包含 RabbitMQ、Redis、Amazon SQS 等,這裡我們採用 Redis,首先在系統上安裝好 Redis 資料庫,關於 Redis 資料庫的安裝與設定,可以參考 Ubuntu Linux 安裝、設定 Redis 資料庫教學與範例

若不想在系統內安裝原生的 Redis 資料庫,亦可使用 Docker 運行 Redis 資料庫:

# 以 Docker 執行 Redis 資料庫
docker run -d -p 6379:6379 redis

Celery 入門

若要使用 Celery 進行工作的排程,最簡易的方式就是先建立一個 Celery application 物件,並自行定義工作的函數,再透過 Celery 來管理工作排程。

建立 Celery Application

首先建立一個 tasks.py 指令稿,內容如下:

from celery import Celery

# 建立 Celery Application
app = Celery('tasks', broker='redis://localhost/')

# 定義工作函數
@app.task
def add(x, y):
    return x + y

這裡我們的 broker 採用剛剛上面安裝好的 Redis 資料庫,亦可採用其他的 broker,例如 RabbitMQ。

執行 Celery Worker

建立好含有 application 的 tasks 模組之後,我們可以用手動的方式執行 Celery 的 worker,並加上 -A 參數將 application 的模組名稱指定為 tasks。另外由於在開發階段,我們希望查看較詳細的記錄,所以加上 --loglevel=INFO 參數。

# 手動執行 Celery Worker
celery -A tasks worker --loglevel=INFO
 -------------- celery@imgqc v5.2.3 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-5.4.0-94-generic-x86_64-with-glibc2.29 2022-01-18 16:13:45
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         __main__:0x7f0da7ee17c0
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2022-01-18 16:13:45,920: INFO/MainProcess] Connected to redis://localhost:6379//
[2022-01-18 16:13:45,923: INFO/MainProcess] mingle: searching for neighbors
[2022-01-18 16:13:46,929: INFO/MainProcess] mingle: all alone
[2022-01-18 16:13:46,937: INFO/MainProcess] celery@imgqc ready.

在 Celery worker 執行之後,就會處於等待工作的狀態,它會負責接收工作,指派給 worker 進行處理。

這裡我們為了示範 Celery 的運作方式,所以採用手動啟動 worker 的方式,在正式的服務環境下,通常都會採用背景 daemon(例如 systemd)的方式來啟動 Celery worker。

若要查看 celery 指令的各種參數用法,可以加上 --help 查看其說明:

# 查看 Celery 指令參數用法
celery --help
celery worker --help

派送工作

若要派送工作至 Celery 的 worker,最簡單的方式就是呼叫工作函數的 delay() 函數,它是 apply_async() 函數的簡易版本:

from tasks import add

# 透過 Celery 派送工作
add.delay(4, 4)

在派送工作之後,之前我們執行的 Celery worker 輸出訊息應該就會出現類似這樣的資訊:

[2022-01-18 19:52:32,374: INFO/ForkPoolWorker-1] Task tasks.add[220b79ac-87d8-409b-99b0-fe831030474a] succeeded in 0.0005772580007032957s: 8

delay() 函數會傳回一個 AsyncResult 物件,可用於追蹤工作的狀態以及取得計算結果,不過這些功能必須在 Celery application 有設置 backend 的情況下才能使用,請參考下面的說明。

取得計算結果

如果需要追蹤工作的狀態,Celery 就必須有地方存放這些狀態的資訊,而 backend 的作用就是儲存這些資訊。我們可以修改 tasks.py 指令稿,在建立 Celery application 時加上 backend 的設定,最簡單的方式就是透過 RPC 的方式將工作狀態送回 Python 程式中:

# 建立 Celery Application(以 RPC 送回工作狀態)
app = Celery('tasks', backend='rpc://', broker='redis://localhost/')

可以利用其他的資料庫來存放工作狀態,例如 Redis 資料庫:

# 建立 Celery Application(以 Redis 儲存工作狀態)
app = Celery('tasks', backend='redis://localhost/', broker='redis://localhost/')

設置了 backend 之後,按照同樣的方式派送工作:

from tasks import add

# 透過 Celery 派送工作
result = add.delay(4, 4)

此時我們就可以經由傳回的 AsyncResult 物件來追蹤工作狀態,例如檢查工作是否已經執行完成:

# 檢查工作是否已經完成
result.ready()
True

以及取得計算結果:

# 取得計算結果
result.get()
8

若不需要查看計算結果,則可呼叫 forget() 函數釋放儲存空間:

# 釋放儲存空間
result.forget()

由於 backend 在儲存計算結果時也會需要一定的空間,所以對於每一個 AsyncResult 物件都一定要呼叫 get() 或是 forget() 函數,這樣這些計算結果所占用的儲存資源才會被釋放掉。

Celery 設定檔

在大部分的情況下,Celery 不太需要調整設定就可以正常運作,但若進一步了解各種細部的設定,可以讓 Celery 的運作更符合自己的需要,可用的 Celery 設定相當多,可參考 Celery 的 Configuration and defaults 說明網頁

調整設定最直接的方式就是更改 Celery application 物件的屬性值,例如將 task_serializer 選項設定為 json

# 設定 task_serializer 選項設定為 json
app.conf.task_serializer = 'json'

若要更改多項設定,可以使用 update() 函數:

# 更改多項設定
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Taipei',
    enable_utc=False,
)

在大型的專案中,建議將所有的設定統一集中在一個模組檔案中(通常命名為 celeryconfig.py),以下是一個簡單的範例:

# celeryconfig.py 設定檔內容

# Broker 與 Backend 設定
broker_url = 'redis://localhost/'
result_backend = 'rpc://'

# 其他設定
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Taipei'
enable_utc = False

建立好 celeryconfig.py 設定檔案之後,可以使用 config_from_object() 函數載入設定:

# 從 celeryconfig.py 載入設定
app.config_from_object('celeryconfig')

若要檢測 celeryconfig.py 設定檔中是否有語法錯誤,可以執行以下指令進行測試:

# 檢測 celeryconfig.py 設定是否有語法錯誤
python -m celeryconfig

Celery 的設定有相當豐富的用途,例如設定工作派送路由,將指定工作派送至特定的佇列:

# 設定工作派送路由
task_routes = {
    'tasks.add': 'low-priority',
}

或是指定特定工作派送的速度上限:

# 設定工作派送速度
task_annotations = {
    'tasks.add': {'rate_limit': '10/m'}
}

如果使用 RabbitMQ 或 Redis 作為 broker,甚至可以在執行階段動態改變工作派送速度:

# 動態設定 tasks.add 工作派送速度(RabbitMQ 或 Redis)
celery -A tasks control rate_limit tasks.add 10/m
->  celery@imgqc: OK
        new rate limit set successfully

執行之後,就會通知 Celery worker 伺服器更改指定工作的派送速度:

[2022-01-19 08:13:22,111: INFO/MainProcess] New rate limit for tasks of type tasks.add: 10/m.

參考資料

Share
Published by
Office Guide

Recent Posts

Python 使用 PyAutoGUI 自動操作滑鼠與鍵盤

本篇介紹如何在 Python ...

1 年 ago

Ubuntu Linux 以 WireGuard 架設 VPN 伺服器教學與範例

本篇介紹如何在 Ubuntu ...

1 年 ago

Linux 網路設定 ip 指令用法教學與範例

本篇介紹如何在 Linux 系...

1 年 ago

Linux 以 Cryptsetup、LUKS 加密 USB 隨身碟教學與範例

介紹如何在 Linux 系統中...

1 年 ago