Django 實戰:Celery 異步任務從環境搭建到調用全掌握
一、Celery入門
介紹
Celery 是一個簡單、靈活且可靠的分布式任務隊列系統,專注于實時處理的異步任務隊列,同時也支持任務調度。Celery是實現異步任務、定時任務的一種工具。
Celery 的核心功能
- 異步任務處理:將耗時的任務異步執行,不阻塞主程序,從而提高系統的響應速度和擴展性。例如郵件發送、消息推送等。
- 定時任務調度:可以按照預設的時間間隔或特定時間點執行任務,例如定時清理日志、定時統計數據等。
- 分布式任務執行:支持在多臺服務器上運行 worker 進程,擴展到分布式環境中。
- 任務狀態跟蹤和結果存儲:可以跟蹤任務的執行狀態,并將任務的執行結果存儲。
Celery 的架構
- 消息中間件 (Broker):負責接收任務生產者發送的消息并將任務存入隊列。常用的消息中間件有 Redis 和 RabbitMQ
- 任務執行單元 (Worker):執行任務的實際工作進程,會從消息隊列中取出任務并執行 。
- 任務結果存儲 (Backend):用于存儲任務執行結果,可以是 Redis、RabbitMQ 或數據庫 。
- 任務調度器 (Beat):用于調度定時任務,會周期性地將到期需要執行的任務發送給消息隊列 。
Celery 的工作流程
- 任務生產者將任務發送到消息隊列。
- 消息隊列存儲任務,直到任務消費者獲取它們。
- 任務消費者從消息隊列中獲取任務,并在本地執行。
- 執行完成后,任務結果存儲到結果存儲后端。
- 任務生產者可以通過
AsyncResult查詢任務的狀態和結果。
生產環境建議
- Windows 平臺上安裝Celery,只能用于開發環境或測試環境。生產環境,建議使用 Linux 平臺。
安裝
安裝Redis 作為消息中間件(過程略)
安裝 Celery 和 Redis客戶端
pip install redis
pip install celery
二、Celery與Django集成實戰
配置Celery實例
Django項目結構示例
- mysite/
- manage.py
- mysite/
- __init__.py
- settings.py
- urls.py
定義 Celery 實例:創建文件mysite\mysite\celery.py
"""定義和配置 Celery 實例"""
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
# 創建 Celery 實例
app = Celery("mysite")
# 加載配置文件中的 Celery 配置
app.config_from_object("django.conf:settings", namespace="CELERY")
# 自動發現并加載任務
app.autodiscover_tasks(["myapp_infra", "myapp_system"] + settings.MY_APPS, force=True)
配置 Django 啟動時會加載應用:修改文件mysite\mysite\__init__.py
"""Django 啟動時加載Celery實例"""
from .celery import app as celery_app
__all__ = ("celery_app",)
配置Celery:修改Django項目文件settings.py
### Celery配置
CELERY_RESULT_BACKEND = "redis://localhost:6379/2"
CELERY_BROKER_URL = "redis://localhost:6379/3"
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = True
CELERY_RESULT_EXTENDED = True # 啟用后才會記錄 task_name、date_started 等字段
CELERY_TASK_TRACK_STARTED = True # 記錄任務開始時間
定義任務
發現任務:Celery 將自動從所有已安裝的應用APP中發現任務,需要遵守以下目錄結構
- myapp_system/
- tasks.py
- models.py
- myapp_infra/
- tasks.py
- models.py
定義任務:創建文件myapp_infra/tasks.py,使用@shared_task裝飾器定義 Celery 任務
"""定義 Celery 任務"""
from celery import shared_task
from time import sleep
@shared_task
def send_email_task(subject, message, recipient_list):
"""
發送電子郵件的任務
"""
print("發送郵件任務開始執行...")
sleep(3)
print(f"主題: {subject}")
print(f"內容: {message}")
print(f"收件人: {recipient_list}")
print("#" * 10, "\n")
調用任務
調用任務:在視圖或其他代碼中,使用 .delay() 方法將任務發送到 Celery 隊列中。
from rest_framework.views import APIView
from rest_framework.response import Response
from .tasks import send_email_task
class SendEmailView(APIView):
def get(self, request):
subject = "Hello from Celery"
message = "This is a test email sent using Celery."
recipient_list = ["user@example.com"]
# 異步發送郵件
send_email_task.delay(subject, message, recipient_list)
return Response({"message": "Email sent successfully"})
啟動
啟動Celery工作進程:在開發和測試環境中,使用下面命令啟動Celery工作進程
# 進入Django項目目錄(包含manage.py的目錄)
celery -A mysite worker -l INFO -P solo
啟動Django項目
# 進入Django項目目錄(包含manage.py的目錄)
python manage.py runserver
實戰效果
當訪問SendEmailView 視圖,會看到日志中顯示任務已觸發和執行。
[2025-04-14 09:11:53,151: INFO/MainProcess] Task mybooks.tasks.send_email_task[c37a8725-aa59-43b4-9949-74a753c019a2] received
[2025-04-14 09:11:55,185: WARNING/MainProcess] 發送郵件任務開始執行...
[2025-04-14 09:11:58,186: WARNING/MainProcess] 主題: Hello from Celery
[2025-04-14 09:11:58,186: WARNING/MainProcess] 內容: This is a test email sent using Celery.
[2025-04-14 09:11:58,186: WARNING/MainProcess] 收件人: ['user@example.com']
[2025-04-14 09:11:58,187: WARNING/MainProcess] ##########
[2025-04-14 09:11:58,187: WARNING/MainProcess]
三、delay_on_commit
使用場景
delay_on_commit 是 Celery 提供的一個用于確保任務在數據庫事務提交后執行的機制。考慮以下場景:
- send_email任務可能會在視圖將事務提交到數據庫之前啟動,因此任務可能無法找到用戶。
- 如果事務回滾,任務仍然會執行,處理一個不存在或無效的訂單。
# views.py
def create_user(request):
user = User.objects.create(username=request.POST['username'])
send_email.delay(user.pk)
return HttpResponse('User created')
# task.py
@shared_task
def send_email(user_pk):
user = User.objects.get(pk=user_pk)
# send email ...
使用方法
delay_on_commit 會將任務調度延遲到當前事務成功提交后執行。如果事務回滾,任務也不會被調度。
# views.py
from django.db import transaction
from .tasks import send_email_task
@transaction.atomic
def create_user(request):
user = User.objects.create(username="zhangsan") # 數據庫操作
# 正確:事務提交后才會觸發任務
send_email_task.delay_on_commit(user.id)
# 如果此處拋出異常導致事務回滾,任務不會被執行
return HttpResponse("OK")
您正在閱讀的是《Django從入門到實戰》專欄!關注不迷路~

本文詳解 Celery 核心概念、架構組成及工作流程,并實戰演示如何在 Django 項目中集成 Celery,實現異步任務調用與事務提交控制,助你掌握從配置到部署的全流程開發技巧。
浙公網安備 33010602011771號