Categories: Python

Django 網頁架構整合 Celery 工作佇列排程系統教學與範例

介紹如何在 Ubuntu Linux 環境中整合 Django 網頁架構與 Celery 分散式工作佇列排程系統,讓網頁應用程式處理大量運算工作。

建立 Django 應用程式

這裡我們採用 Ubuntu Linux 以 Django 搭配 PostgreSQL、Nginx、Gunicorn 開發布署教學與範例中所敘述的作法,建立基礎的 Django 應用程式。

安裝 Celery 分散式工作佇列排程系統

這裡我們採用 Redis 作為 Celery 的 broker,先依照 Ubuntu Linux 安裝、設定 Redis 資料庫教學與範例安裝好 Redis 資料庫。

接著在原來 Django 應用程式的 virtualenv 環境之下,安裝 Python 的 Celery 與 Redis 相關模組:

# 安裝 Celery 與 Redis 相關模組
pip install -U "celery[redis]"

整合 Django 與 Celery

在建立好的基本 Django 應用程式中,目錄結構大致上為:

manage.py
myproject
  - __init__.py
  - asgi.py
  - settings.py
  - urls.py
  - wsgi.py

首先在 myproject 目錄之下建立一個 celery.py 指令稿,內容如下:

# myproject/celery.py 指令稿
import os
from celery import Celery

# Django 預設的 settings 模組(Celery 用的)
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# 建立 Celery Application
app = Celery('myproject')

# 從 celeryconfig.py 讀取 Celery 的各種設定
app.config_from_object('myproject.celeryconfig')

# 自動從已註冊 Django Application 目錄下的 tasks.py 載入工作
app.autodiscover_tasks()

# 除錯用的工作(正式環境可省略)
@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

這裡設定的 DJANGO_SETTINGS_MODULE 環境變數是為了讓 Celery 可以找的到 Django 的設定檔案(必須在建立 Celery Application 之前設定)。

此處的 debug_task() 只是除錯用的工作函數,其加上 bind=True 之後,就可以取得工作物件(self),輸出工作物件本身的相關訊息。

這裡我們設計讓 Celery 從 celeryconfig.py 這個獨立的設定檔讀取各種設定值,所以還要在 myproject 目錄之下再建立一個 celeryconfig.py 指令稿,放置各種 Celery 的設定,設定檔的語法說明可以參考 Celery 的 Configuration and defaults 文件,以下是一個基本的設定檔範例:

# myproject/celeryconfig.py 指令稿
# Broker 位址
broker_url = 'redis://localhost/'

# Backend 位址
result_backend = 'redis://localhost/'

Celery application 的 autodiscover_tasks() 函數會自動從每個 Django application 目錄下的 tasks.py 載入工作,若要另外加上其他的工作,可以在 celeryconfig.py 設定檔內將個別的模組加入 imports 中。

修改 __init__.py 指令稿,加入以下內容,讓 Django 應用程式可以自動載入 Celery 的 app,讓 shared_task 可以正常使用此 app

# myproject/__init__.py 指令稿
# 確保此 Celery app 會在 Django 啟動時被載入,
# 讓 shared_task 可以使用此 app
from .celery import app as celery_app

__all__ = ('celery_app',)

建立 Django App

參考 Django 的官方教學,建立一個名稱為 demoapp 的 Django app:

# 建立 Django app
python manage.py startapp demoapp

修改 demoapp/urls.py,設定 URL 對應的 view:

from django.urls import path
from . import views

# 設定 URL 對應的 View
urlpatterns = [
    path('', views.IndexView.as_view(), name='index'),
    path('submit_job', views.SubmitJobView.as_view(), name='submit_job'),
]

修改 demoapp/models.py,定義工作類別:

from django.db import models

class Job(models.Model):
    # 工作名稱
    name = models.CharField(max_length=32)

    # 工作接收時間
    receive_time = models.DateTimeField()

    # 工作完成時間(可為空值)
    finish_time = models.DateTimeField(null=True, blank=True)

    # 預設輸出格式
    def __str__(self):
        return f"{self.name}({self.receive_time})"

建立 demoapp/tasks.py,定義工作函數:

from demoapp.models import Job
from celery import shared_task
from celery.utils.log import get_task_logger
import time, datetime

# 取得 logger
logger = get_task_logger(__name__)

# 工作函數
@shared_task
def run_job(job_id):
    # 取得工作詳細資訊
    j = Job.objects.get(id=job_id)

    # 執行工作
    time.sleep(5)

    # 記錄訊息
    logger.info(f'Job ID: {job_id}')

    # 寫入工作執行結果
    j.finish_time = datetime.datetime.now()

    # 儲存
    j.save()

修改 demoapp/views.py,定義工作列表與派送工作的 view 類別:

from django.shortcuts import render
from django.http import HttpResponse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views import View, generic
from .models import Job
import json, datetime

from .tasks import run_job

# 工作列表
class IndexView(generic.ListView):
    model = Job
    template_name = 'demoapp/index.html'
    context_object_name = 'job_list'

# 派送工作
@method_decorator(csrf_exempt, name='dispatch')
class SubmitJobView(View):

    def get(self, request):
        return HttpResponse("Please use POST method.")

    def post(self, request):
        if request.body:
            try:
                # 解析 JSON
                json_data = json.loads(request.body)

                # 建立 Job 物件
                job = Job(
                    name = json_data['name'],
                    receive_time = datetime.datetime.now(),
                )

                # 存入資料庫
                job.save()

                # 派送工作至 Celery
                run_job.delay(job.id)

            except json.JSONDecodeError:
                return HttpResponse(json.dumps(
                    {'status': 'FAIL', 'message': 'JSON decoding error'}))

            return HttpResponse(json.dumps({'status': 'OK', 'message': ''}))

建立 templates/demoapp/index.html,定義工作列表的樣板:

{% if job_list %}
    <ul>
        {% for job in job_list %}
        <li>{{job.name}}:
            receive:{{job.receive_time|date:"Y-m-d H:i:s"}},
            finish:{{job.finish_time|date:"Y-m-d H:i:s"}}</li>
        {% endfor %}
    </ul>
{% else %}
    <p>No jobs are available.</p>
{% endif %}

建立 Django 的資料庫遷移(migration)指令稿,並執行資料庫遷移:

# 建立資料庫遷移(migration)指令稿
python manage.py makemigrations

# 執行資料庫遷移
python manage.py migrate

測試工作派送

啟動 Django 測試用伺服器:

# 啟動 Django 測試用伺服器
python manage.py runserver

啟動 Celery worker:

# 手動執行 Celery Worker
celery -A myproject worker --loglevel=INFO

建立一個測試用的工作描述 JSON 檔案 test_job.json

{
    "name":"Test Job"
}

使用 httpie 以 POST 的方式送出工作的 JSON 資料:

# 送出工作的 JSON 資料
http http://127.0.0.1:8000/demoapp/submit_job < test_job.json
HTTP/1.1 200 OK
Content-Length: 31
Content-Type: text/html; charset=utf-8
Cross-Origin-Opener-Policy: same-origin
Date: Thu, 17 Feb 2022 02:36:20 GMT
Referrer-Policy: same-origin
Server: WSGIServer/0.2 CPython/3.9.5
X-Content-Type-Options: nosniff
X-Frame-Options: DENY

{
    "message": "",
    "status": "OK"
}

此時 Celery worker 的輸出應該會有類似的訊息:

[2022-02-17 10:36:25,330: INFO/ForkPoolWorker-2] Task demoapp.tasks.run_job[077bf15c-f2b4-4714-aa6c-987cd217d0dc] succeeded in 5.059055116958916s: None

以 systemd 控管 Celery

新增 /etc/systemd/system/celery.service 服務設定檔,內容如下:

[Unit]
Description=Celery Service
After=network.target

[Service]
Type=forking

# 執行服務的使用者與群組
User=myuser
Group=www-data

# 設定檔
EnvironmentFile=/etc/conf.d/celery

# 工作目錄
WorkingDirectory=/home/myuser/myprojectdir

# 可寫入的執行期間目錄(位於 /run/ 之下)
RuntimeDirectory=celery

# 啟動服務執行指令
ExecStart=/bin/sh -c '${CELERY_BIN} -A $CELERY_APP multi start $CELERYD_NODES \
    --pidfile=${CELERYD_PID_FILE} --logfile=${CELERYD_LOG_FILE} \
    --loglevel="${CELERYD_LOG_LEVEL}" $CELERYD_OPTS'

# 停止服務執行指令
ExecStop=/bin/sh -c '${CELERY_BIN} multi stopwait $CELERYD_NODES \
    --pidfile=${CELERYD_PID_FILE} --logfile=${CELERYD_LOG_FILE} \
    --loglevel="${CELERYD_LOG_LEVEL}"'

# 重新啟動服務執行指令
ExecReload=/bin/sh -c '${CELERY_BIN} -A $CELERY_APP multi restart $CELERYD_NODES \
    --pidfile=${CELERYD_PID_FILE} --logfile=${CELERYD_LOG_FILE} \
    --loglevel="${CELERYD_LOG_LEVEL}" $CELERYD_OPTS'

# 自動重新啟動
Restart=always

[Install]
WantedBy=multi-user.target

建立 /etc/conf.d/celery 設定檔,內容如下:

# 設定節點名稱(單一節點)
CELERYD_NODES="w1"

# 設定節點名稱(多節點)
#CELERYD_NODES="w1 w2 w3"

# celery 指令路徑
CELERY_BIN="/home/myuser/myprojectdir/myprojectenv/bin/celery"

# Celery App
CELERY_APP="myproject"

# celery 指令額外參數
CELERYD_OPTS="--time-limit=300 --concurrency=4"

# 記錄檔設定
# - %n: 節點名稱
# - %I: 子行程編號
CELERYD_PID_FILE="/var/run/celery/%n.pid"
CELERYD_LOG_FILE="/var/log/celery/%n%I.log"
CELERYD_LOG_LEVEL="INFO"

建立放置記錄檔的目錄 /var/log/celery,並設定好權限:

# 建立目錄
sudo mkdir /var/log/celery

# 設定權限
sudo chown myuser:www-data /var/log/celery

systemd 重新載入設定檔,立即啟動、同時設定開機自動啟動 celery.service 服務:

# 讓 systemd 重新載入設定檔
sudo systemctl daemon-reload

# 立即啟動、同時設定開機自動啟動 celery.service
sudo systemctl enable --now celery.service

logrotate 設定

新增 Celery 的 logrotate 設定檔 /etc/logrotate.d/celery,內容如下:

/var/log/celery/*.log {
    daily
    missingok
    rotate 14
    compress
    delaycompress
    notifempty
    create 0640 health www-data
    copytruncate
}

布署 Django

Django 應用程式的布署,請參考 Ubuntu Linux 以 Django 搭配 PostgreSQL、Nginx、Gunicorn 開發布署教學與範例

參考資料

Share
Published by
Office Guide

Recent Posts

Python 使用 PyAutoGUI 自動操作滑鼠與鍵盤

本篇介紹如何在 Python ...

9 個月 ago

Ubuntu Linux 以 WireGuard 架設 VPN 伺服器教學與範例

本篇介紹如何在 Ubuntu ...

9 個月 ago

Linux 網路設定 ip 指令用法教學與範例

本篇介紹如何在 Linux 系...

10 個月 ago

Windows 使用 TPM 虛擬智慧卡保護 SSH 金鑰教學與範例

本篇介紹如何在 Windows...

11 個月 ago

Linux 以 Shamir’s Secret Sharing 分割保存金鑰教學與範例

介紹如何在 Linux 中使用...

11 個月 ago

Linux 以 Cryptsetup、LUKS 加密 USB 隨身碟教學與範例

介紹如何在 Linux 系統中...

11 個月 ago