介紹如何在 Ubuntu Linux 環境中整合 Django 網頁架構與 Celery 分散式工作佇列排程系統,讓網頁應用程式處理大量運算工作。
建立 Django 應用程式
這裡我們採用 Ubuntu Linux 以 Django 搭配 PostgreSQL、Nginx、Gunicorn 開發布署教學與範例中所敘述的作法,建立基礎的 Django 應用程式。
安裝 Celery 分散式工作佇列排程系統
這裡我們採用 Redis 作為 Celery 的 broker,先依照 Ubuntu Linux 安裝、設定 Redis 資料庫教學與範例安裝好 Redis 資料庫。
接著在原來 Django 應用程式的 virtualenv 環境之下,安裝 Python 的 Celery 與 Redis 相關模組:
# 安裝 Celery 與 Redis 相關模組 pip install -U "celery[redis]"
整合 Django 與 Celery
在建立好的基本 Django 應用程式中,目錄結構大致上為:
manage.py myproject - __init__.py - asgi.py - settings.py - urls.py - wsgi.py
首先在 myproject
目錄之下建立一個 celery.py
指令稿,內容如下:
# myproject/celery.py 指令稿 import os from celery import Celery # Django 預設的 settings 模組(Celery 用的) os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings') # 建立 Celery Application app = Celery('myproject') # 從 celeryconfig.py 讀取 Celery 的各種設定 app.config_from_object('myproject.celeryconfig') # 自動從已註冊 Django Application 目錄下的 tasks.py 載入工作 app.autodiscover_tasks() # 除錯用的工作(正式環境可省略) @app.task(bind=True) def debug_task(self): print(f'Request: {self.request!r}')
這裡設定的 DJANGO_SETTINGS_MODULE
環境變數是為了讓 Celery 可以找的到 Django 的設定檔案(必須在建立 Celery Application 之前設定)。
此處的 debug_task()
只是除錯用的工作函數,其加上 bind=True
之後,就可以取得工作物件(self
),輸出工作物件本身的相關訊息。
這裡我們設計讓 Celery 從 celeryconfig.py
這個獨立的設定檔讀取各種設定值,所以還要在 myproject
目錄之下再建立一個 celeryconfig.py
指令稿,放置各種 Celery 的設定,設定檔的語法說明可以參考 Celery 的 Configuration and defaults 文件,以下是一個基本的設定檔範例:
# myproject/celeryconfig.py 指令稿 # Broker 位址 broker_url = 'redis://localhost/' # Backend 位址 result_backend = 'redis://localhost/'
Celery application 的 autodiscover_tasks()
函數會自動從每個 Django application 目錄下的 tasks.py
載入工作,若要另外加上其他的工作,可以在 celeryconfig.py
設定檔內將個別的模組加入 imports
中。
修改 __init__.py
指令稿,加入以下內容,讓 Django 應用程式可以自動載入 Celery 的 app
,讓 shared_task
可以正常使用此 app
:
# myproject/__init__.py 指令稿 # 確保此 Celery app 會在 Django 啟動時被載入, # 讓 shared_task 可以使用此 app from .celery import app as celery_app __all__ = ('celery_app',)
建立 Django App
參考 Django 的官方教學,建立一個名稱為 demoapp
的 Django app:
# 建立 Django app
python manage.py startapp demoapp
修改 demoapp/urls.py
,設定 URL 對應的 view:
from django.urls import path from . import views # 設定 URL 對應的 View urlpatterns = [ path('', views.IndexView.as_view(), name='index'), path('submit_job', views.SubmitJobView.as_view(), name='submit_job'), ]
修改 demoapp/models.py
,定義工作類別:
from django.db import models class Job(models.Model): # 工作名稱 name = models.CharField(max_length=32) # 工作接收時間 receive_time = models.DateTimeField() # 工作完成時間(可為空值) finish_time = models.DateTimeField(null=True, blank=True) # 預設輸出格式 def __str__(self): return f"{self.name}({self.receive_time})"
建立 demoapp/tasks.py
,定義工作函數:
from demoapp.models import Job from celery import shared_task from celery.utils.log import get_task_logger import time, datetime # 取得 logger logger = get_task_logger(__name__) # 工作函數 @shared_task def run_job(job_id): # 取得工作詳細資訊 j = Job.objects.get(id=job_id) # 執行工作 time.sleep(5) # 記錄訊息 logger.info(f'Job ID: {job_id}') # 寫入工作執行結果 j.finish_time = datetime.datetime.now() # 儲存 j.save()
修改 demoapp/views.py
,定義工作列表與派送工作的 view 類別:
from django.shortcuts import render from django.http import HttpResponse from django.utils.decorators import method_decorator from django.views.decorators.csrf import csrf_exempt from django.views import View, generic from .models import Job import json, datetime from .tasks import run_job # 工作列表 class IndexView(generic.ListView): model = Job template_name = 'demoapp/index.html' context_object_name = 'job_list' # 派送工作 @method_decorator(csrf_exempt, name='dispatch') class SubmitJobView(View): def get(self, request): return HttpResponse("Please use POST method.") def post(self, request): if request.body: try: # 解析 JSON json_data = json.loads(request.body) # 建立 Job 物件 job = Job( name = json_data['name'], receive_time = datetime.datetime.now(), ) # 存入資料庫 job.save() # 派送工作至 Celery run_job.delay(job.id) except json.JSONDecodeError: return HttpResponse(json.dumps( {'status': 'FAIL', 'message': 'JSON decoding error'})) return HttpResponse(json.dumps({'status': 'OK', 'message': ''}))
建立 templates/demoapp/index.html
,定義工作列表的樣板:
{% if job_list %} <ul> {% for job in job_list %} <li>{{job.name}}: receive:{{job.receive_time|date:"Y-m-d H:i:s"}}, finish:{{job.finish_time|date:"Y-m-d H:i:s"}}</li> {% endfor %} </ul> {% else %} <p>No jobs are available.</p> {% endif %}
建立 Django 的資料庫遷移(migration)指令稿,並執行資料庫遷移:
# 建立資料庫遷移(migration)指令稿 python manage.py makemigrations # 執行資料庫遷移 python manage.py migrate
測試工作派送
啟動 Django 測試用伺服器:
# 啟動 Django 測試用伺服器
python manage.py runserver
啟動 Celery worker:
# 手動執行 Celery Worker celery -A myproject worker --loglevel=INFO
建立一個測試用的工作描述 JSON 檔案 test_job.json
:
{ "name":"Test Job" }
使用 httpie 以 POST 的方式送出工作的 JSON 資料:
# 送出工作的 JSON 資料 http http://127.0.0.1:8000/demoapp/submit_job < test_job.json
HTTP/1.1 200 OK Content-Length: 31 Content-Type: text/html; charset=utf-8 Cross-Origin-Opener-Policy: same-origin Date: Thu, 17 Feb 2022 02:36:20 GMT Referrer-Policy: same-origin Server: WSGIServer/0.2 CPython/3.9.5 X-Content-Type-Options: nosniff X-Frame-Options: DENY { "message": "", "status": "OK" }
此時 Celery worker 的輸出應該會有類似的訊息:
[2022-02-17 10:36:25,330: INFO/ForkPoolWorker-2] Task demoapp.tasks.run_job[077bf15c-f2b4-4714-aa6c-987cd217d0dc] succeeded in 5.059055116958916s: None
以 systemd 控管 Celery
新增 /etc/systemd/system/celery.service
服務設定檔,內容如下:
[Unit] Description=Celery Service After=network.target [Service] Type=forking # 執行服務的使用者與群組 User=myuser Group=www-data # 設定檔 EnvironmentFile=/etc/conf.d/celery # 工作目錄 WorkingDirectory=/home/myuser/myprojectdir # 可寫入的執行期間目錄(位於 /run/ 之下) RuntimeDirectory=celery # 啟動服務執行指令 ExecStart=/bin/sh -c '${CELERY_BIN} -A $CELERY_APP multi start $CELERYD_NODES \ --pidfile=${CELERYD_PID_FILE} --logfile=${CELERYD_LOG_FILE} \ --loglevel="${CELERYD_LOG_LEVEL}" $CELERYD_OPTS' # 停止服務執行指令 ExecStop=/bin/sh -c '${CELERY_BIN} multi stopwait $CELERYD_NODES \ --pidfile=${CELERYD_PID_FILE} --logfile=${CELERYD_LOG_FILE} \ --loglevel="${CELERYD_LOG_LEVEL}"' # 重新啟動服務執行指令 ExecReload=/bin/sh -c '${CELERY_BIN} -A $CELERY_APP multi restart $CELERYD_NODES \ --pidfile=${CELERYD_PID_FILE} --logfile=${CELERYD_LOG_FILE} \ --loglevel="${CELERYD_LOG_LEVEL}" $CELERYD_OPTS' # 自動重新啟動 Restart=always [Install] WantedBy=multi-user.target
建立 /etc/conf.d/celery
設定檔,內容如下:
# 設定節點名稱(單一節點) CELERYD_NODES="w1" # 設定節點名稱(多節點) #CELERYD_NODES="w1 w2 w3" # celery 指令路徑 CELERY_BIN="/home/myuser/myprojectdir/myprojectenv/bin/celery" # Celery App CELERY_APP="myproject" # celery 指令額外參數 CELERYD_OPTS="--time-limit=300 --concurrency=4" # 記錄檔設定 # - %n: 節點名稱 # - %I: 子行程編號 CELERYD_PID_FILE="/var/run/celery/%n.pid" CELERYD_LOG_FILE="/var/log/celery/%n%I.log" CELERYD_LOG_LEVEL="INFO"
建立放置記錄檔的目錄 /var/log/celery
,並設定好權限:
# 建立目錄 sudo mkdir /var/log/celery # 設定權限 sudo chown myuser:www-data /var/log/celery
讓 systemd
重新載入設定檔,立即啟動、同時設定開機自動啟動 celery.service
服務:
# 讓 systemd 重新載入設定檔 sudo systemctl daemon-reload # 立即啟動、同時設定開機自動啟動 celery.service sudo systemctl enable --now celery.service
logrotate 設定
新增 Celery 的 logrotate 設定檔 /etc/logrotate.d/celery
,內容如下:
/var/log/celery/*.log { daily missingok rotate 14 compress delaycompress notifempty create 0640 health www-data copytruncate }
布署 Django
Django 應用程式的布署,請參考 Ubuntu Linux 以 Django 搭配 PostgreSQL、Nginx、Gunicorn 開發布署教學與範例。