celery
一. celery 簡介
Celery 是一個專注于實時處理和任務調度的分布式任務隊列, 同時提供操作和維護分布式系統所需的工具.. 所謂任務就是消息, 消息中的有效載荷中包含要執行任務需要的全部數據.
Celery 是一個分布式隊列的管理工具, 可以用 Celery 提供的接口快速實現并管理一個分布式的任務隊列.
Celery 本身不是任務隊列, 是管理分布式任務隊列的工具. 它封裝了操作常見任務隊列的各種操作, 我們使用它可以快速進行任務隊列的使用與管理.
1.Celery 特性 :
- 方便查看定時任務的執行情況, 如 是否成功, 當前狀態, 執行任務花費的時間等.
- 使用功能齊備的管理后臺或命令行添加,更新,刪除任務.
- 方便把任務和配置管理相關聯.
- 可選 多進程, Eventlet 和 Gevent 三種模型并發執行.
- 提供錯誤處理機制.
- 提供多種任務原語, 方便實現任務分組,拆分,和調用鏈.
- 支持多種消息代理和存儲后端.
- Celery 是語言無關的.它提供了python 等常見語言的接口支持
2.Celery 組件
1. Celery 扮演生產者和消費者的角色,
Celery Beat : 任務調度器. Beat 進程會讀取配置文件的內容, 周期性的將配置中到期需要執行的任務發送給任務隊列.
Celery Worker : 執行任務的消費者, 通常會在多臺服務器運行多個消費者, 提高運行效率.
Broker : 消息代理, 隊列本身. 也稱為消息中間件. 接受任務生產者發送過來的任務消息, 存進隊列再按序分發給任務消費方(通常是消息隊列或者數據庫).
Producer : 任務生產者. 調用 Celery API , 函數或者裝飾器, 而產生任務并交給任務隊列處理的都是任務生產者.
Result Backend : 任務處理完成之后保存狀態信息和結果, 以供查詢.
2. 產生任務的方式 :
發布者發布任務(WEB 應用)
任務調度按期發布任務(定時任務)
3. celery 依賴三個庫: 這三個庫, 都由 Celery 的開發者開發和維護.
billiard : 基于 Python2.7 的 multisuprocessing 而改進的庫, 主要用來提高性能和穩定性.
librabbitmp : C 語言實現的 Python 客戶端,
kombu : Celery 自帶的用來收發消息的庫, 提供了符合 Python 語言習慣的, 使用 AMQP 協議的高級接口.
3.選擇消息代理
使用于生產環境的消息代理有 RabbitMQ 和 Redis, 官方推薦 RabbitMQ.
4.Celery 序列化
在客戶端和消費者之間傳輸數據需要 序列化和反序列化. Celery 支出的序列化方案如下所示:
| 方案 | 說明 |
|---|---|
| pickle | pickle 是Python 標準庫中的一個模塊, 支持 Pyuthon 內置的數據結構, 但他是 Python 的專有協議. Celery 官方不推薦. |
| json | json 支持多種語言, 可用于跨語言方案. |
| yaml | yaml 表達能力更強, 支持的數據類型較 json 多, 但是 python 客戶端的性能不如 json |
| msgpack | 二進制的類 json 序列化方案, 但比 json 的數據結構更小, 更快. |
二. 安裝,配置與簡單示例
1.安裝
pip install celery, redis, msgpack
Celery 配置參數匯總
| 配置項 | 說明 |
|---|---|
| CELERY_DEFAULT_QUEUE | 默認隊列 |
| CELERY_BROKER_URL | Broker 地址 |
| CELERY_RESULT_BACKEND | 結果存儲地址 |
| CELERY_TASK_SERIALIZER | 任務序列化方式 |
| CELERY_RESULT_SERIALIZER | 任務執行結果序列化方式 |
| CELERY_TASK_RESULT_EXPIRES | 任務過期時間 |
| CELERY_ACCEPT_CONTENT | 指定任務接受的內容類型(序列化) |
詳情鏈接 https://blog.csdn.net/libing_thinking/article/details/78812472
2.初始demo
1.目錄

2.配置文件
CELERY_BROKER_URL = 'redis://localhost:6379/0' CELERY_RESULT_BACKEND = 'redis://localhost:6379/1' CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_RESULT_EXPIRES = 60 * 5 # 任務過期時間 CELERY_ACCEPT_CONTENT = ["json"] # 指定任務接受的內容類型.
3.task 文件
import time
from celery import Celery
from celery_test import celery_setting as CelerySetting
# 創建一個Celery實例,這就是我們用戶的應用app
app = Celery('celery_test.s1')
app.config_from_object(CelerySetting)
# 為應用創建任務,my_first_task
@app.task
def task_1(x, y):
# time.sleep(3)
return x + y
# celery worker -A celery_test.s1 -n task_1 -c 1 -l info -P gevent
@app.task
def tsum(numbers):
return sum(numbers)
# celery worker -A celery_test.s1 -n tsum -c 1 -l info -P gevent
@app.task
def mul(x, y):
return x * y
# celery worker -A celery_test.s1 -n mul -c 1 -l info -P gevent
4.調用任務及回調文件
from celery_test.s1 import task_1, tsum, app, mul
from celery import chain, chord, group
from celery.result import AsyncResult
# 1.將任務交給Celery的Worker執行
res = task_1.delay(1, 4)
print(res.id) # 返回任務ID
print(res.state) # 狀態 如果任務未完成 PENDING
print(res.status) # 狀態 如果任務未完成 none
print(res.successful()) # 是否成功 如果任務未完成 none
print(res.ready()) # 返回布爾值, 任務執行完成, 返回 True, 否則返回 False.
print(res.wait()) # 等待任務完成, 返回任務執行結果
print(res.get()) # 獲取任務執行結果
print(res.result) # 任務執行結果
# 2.查看結果 異步獲取任務返回值
async_task = AsyncResult(id="030df922-b7c3-44c9-b137-bf8e3d5e8eb5", app=app)
# 判斷異步任務是否執行成功
print(async_task.successful())
if async_task.successful():
print(async_task.get()) # 獲取異步任務的返回值
3.定義任務隊列

Celery 默認使用名為 celery 的隊列 (可以通過 CELERY_DEFAULT_QUEUE 修改) 來存放任務. 我們可以使用 優先級不同的隊列 來確保高優先級的任務優先執行
# coding:utf-8
from __future__ import absolute_import
from celery import Celery
from celery_queue import celery_setting as CelerySetting
from kombu import Queue
app = Celery('celery_queue.s1')
app.config_from_object(CelerySetting)
app.conf.update({
"CELERY_QUEUES": ( # 設置add隊列,綁定routing_key
Queue('queue_1', routing_key='queue_1', durable=False, max_priority=10), # durable:持久化 max_priorty:優先級
Queue('queue_2', routing_key='queue_2', durable=False),
Queue('queue_3', routing_key='queue_3'),
),
"CELERY_ROUTES": { # projq.tasks.add這個任務進去add隊列并routeing_key為queue_1
'celery_queue.s1.add': {
'queue': 'queue_1',
'routing_key': 'queue_1',
},
'celery_queue.s1.tsum': {
'queue': 'queue_2',
'routing_key': 'queue_2',
},
'celery_queue.s1.mul': {
'queue': 'queue_3',
'routing_key': 'queue_3',
}
}}
)
@app.task()
def add(x, y):
return x + y
# celery worker -A celery_queue.s1 -n add -c 1 -l info -P gevent
@app.task
def tsum(numbers):
return sum(numbers)
# celery worker -A celery_queue.s1 -n tsum -c 1 -l info -P gevent
@app.task
def mul(x, y):
return x * y
# celery worker -A celery_queue.s1 -n mul -c 1 -l info -P gevent
閱后即焚模式
from kombu import Queue
Queue('transient', routing_key='transient', delivery_mode=1)
4.簡單實際應用的例子

1. celery_setting.py
'''
celery配置
'''
task_acks_late = True
worker_prefetch_multiplier = 1
# 限制最大使用內存,限制celery執行10個任務,就銷毀重建
worker_max_memory_per_child = 150000
task_reject_on_worker_lost = True
broker_pool_limit = 300
timezone = "Asia/Shanghai"
broker_url = 'amqp://guest:guest@localhost:5672/{vhost}?heartbeat=0'
# 優先級參數必須加
celery_acks_late = True
celeryd_prefetch_multiplier = 1
2. my_task.py
import time
from celery import Celery
from my_celery import celery_setting
from kombu import Exchange, Queue
app = Celery('celery1.my_task')
app.config_from_object(celery_setting)
app.conf.update(
broker_url="amqp://guest:guest@localhost:5672/{vhost}?heartbeat=0".format(vhost="test")
)
app.conf.task_queues = [
Queue('priority_test_1', Exchange('default', type='direct'), routing_key='default',
queue_arguments={'x-max-priority': 10}),
Queue('priority_test_2', Exchange('default', type='direct'), routing_key='default',
queue_arguments={'x-max-priority': 10}),
]
@app.task(bind=True,
queue='priority_test_1', # 指定隊列名
# max_retries=10, # 最大重試
# default_retry_delay=600, # 重試間隔時間
autoretry_for=(TypeError, KeyError) # 指定重試錯誤
)
def priority_test_1(self, data):
# print(self)
print(data)
@app.task(bind=True,
queue='priority_test_2',
)
def priority_test_2(self, data):
try:
print(data["1"])
time.sleep(2)
except (TypeError, KeyError) as exc:
raise self.retry(exc=exc, countdown=60 * 5, max_retries=5)
3. add_task.py
from my_celery.my_task import priority_test_1, priority_test_2
# for i in range(1, 10):
# priority_test_1.delay({"name": f"{i}"})
# priority_test_2.s({"name": f"{i}"}).apply_async(priority=i)
priority_test_1.delay({"name": f"{123}"})
priority_test_2.delay({"name": f"{123}"})
5.任務調度
1. celery_setting.py
# coding:utf-8
from kombu import Queue
BROKER_URL = 'amqp://guest:guest@127.0.0.1:5672/test' # 使用RabbitMQ作為消息代理
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 把任務結果存在了Redis
# CELERY_TASK_SERIALIZER = 'msgpack' # 任務序列化和反序列化使用msgpack方案
CELERY_TASK_SERIALIZER = 'json' # 任務序列化和反序列化使用json方案
CELERY_RESULT_SERIALIZER = 'json' # 讀取任務結果一般性能要求不高,所以使用了可讀性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間,不建議直接寫86400,應該讓這樣的magic數字表述更明顯
CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的內容類型
CELERY_QUEUES = ( # 設置add隊列,綁定routing_key
Queue('queue_1', routing_key='queue_1', durable=False, max_priority=10),
Queue('queue_2', routing_key='queue_2', durable=False),
Queue('queue_3', routing_key='queue_3'),
)
CELERY_ROUTES = { # projq.tasks.add這個任務進去add隊列并routeing_key為queue_1
'crontab_celery.s1.add': {
'queue': 'queue_1',
'routing_key': 'queue_1',
},
'crontab_celery.s1.tsum': {
'queue': 'queue_2',
'routing_key': 'queue_2',
},
'crontab_celery.s1.mul': {
'queue': 'queue_3',
'routing_key': 'queue_3',
}
}
from celery.schedules import crontab
from datetime import timedelta
# imports = [
# 'crontab_celery.s1.add',
# 'crontab_celery.s1.tsum'
# 'crontab_celery.s1.mul'
# ]
# schedules定時任務
CELERYBEAT_SCHEDULE = {
'crontab_celery.s1.add': {
'task': 'crontab_celery.s1.add',
'schedule': timedelta(seconds=1), # 每 3 秒執行一次
'args': (2, 2) # 任務函數參數
},
'crontab_celery.s1.tsum': {
'task': 'crontab_celery.s1.tsum',
# 'schedule': crontab(minute="*/1"), # 每分執行一次
'schedule': timedelta(seconds=2), # 每 3 秒執行一次
'args': ([1, 2, 3, 4, 5],) # 任務函數參數
},
'crontab_celery.s1.mul': {
'task': 'crontab_celery.s1.mul',
'schedule': crontab(minute="*/1"), # 每分執行一次
'args': (3, 3) # 任務函數參數
}
}
# celery -A crontab_celery.s1 beat # 定時任務啟動
2.s1.py
# coding:utf-8
from __future__ import absolute_import
from celery import Celery
from crontab_celery import celery_setting
app = Celery('crontab_celery.s1')
app.config_from_object(celery_setting)
@app.task
def add(x, y):
return x + y
# celery worker -A crontab_celery.s1 -n add -Q queue_1 -c 1 -l info -P gevent
@app.task
def tsum(numbers):
return sum(numbers)
# celery worker -A crontab_celery.s1 -n tsum -Q queue_2 -c 1 -l info -P gevent
@app.task
def mul(x, y):
return x * y
# celery worker -A crontab_celery.s1 -n mul -Q queue_3 -c 1 -l info -P gevent
3.命令
# celery -A crontab_celery.s1 beat # 使用 Beat 進程自動生成任務 # celery worker -A crontab_celery.s1 -n add -Q queue_1 -c 1 -l info -P gevent # 指定任務 add 和隊列queue_1 # celery worker -A crontab_celery.s1 -n tsum -Q queue_2 -c 1 -l info -P gevent # 指定 tsum 和 queue_2 # celery worker -A crontab_celery.s1 -n mul -Q queue_3 -c 1 -l info -P gevent # 指定 mul 和queue_3
6.任務綁定, 記錄日志, 重試
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task(bind=True)
def div(self, x, y):
logger.info(('Executing task id {0.id}, args: {0.args!r}'
'kwargs: {0.kwargs!r}').format(self.request))
try:
result = x/y
except ZeroDivisionError as e:
raise self.retry(exc=e, countdown=5, max_retries=3) # 發生 ZeroDivisionError 錯誤時, 每 5s 重試一次, 最多重試 3 次.
return result
# 當使用 bind=True 參數之后, 函數的參數發生變化, 多出了參數 self, 這這相當于把 div 編程了一個已綁定的方法, 通過 self 可以獲得任務的上下文.
7.信號系統 :
信號可以幫助我們了解任務執行情況, 分析任務運行的瓶頸. Celery 支持 7 種信號類型.
- 任務信號
- before_task_publish : 任務發布前
- after_task_publish : 任務發布后
- task_prerun : 任務執行前
- task_postrun : 任務執行后
- task_retry : 任務重試時
- task_success : 任務成功時
- task_failure : 任務失敗時
- task_revoked : 任務被撤銷或終止時
- 應用信號
- Worker 信號
- Beat 信號
- Eventlet 信號
- 日志信號
- 命令信號
不同的信號參數格式不同, 具體格式參見官方文檔
代碼示例 :
from celery.signals import after_task_publish, task_success
@after_task_publish.connect(sender="crontab_celery.s1.add", )
def task_sent_2(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
info = headers if 'task' in headers else body
print(1111111111)
print('crontab_celery.s1.add for task id {info[id]}'.format(
info=info,
))
@after_task_publish.connect(sender="crontab_celery.s1.tsum", )
def task_sent_1(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
info = headers if 'task' in headers else body
print(22222222222)
print('crontab_celery.s1.tsum for task id {info[id]}'.format(
info=info,
))
重新啟動 app時候會發現加任務打印
8.子任務與工作流:
1.子任務
from crontab_celery.s1 import add
from celery import signature
# 1.通過簽名的方法傳給其他任務, 成為一個子任務
task = add.subtask((2, 2), countdown=10) # 快捷方式 add.s((2,2), countdown-10)
task.apply_async()
# 2.通過如下方式生成子任務
task = signature('crontab_celery.s1.mul', args=(2, 2), countdown=10) # 快捷方式 add.s((2,2), countdown-10)
task.apply_async()
# 3.實現偏函數的方式非常有用, 這種方式可以讓任務在傳遞過程中財傳入參數.
partial = add.s(2)
partial.apply_async((4,))
2.工作流
子任務支持如下 5 種原語,實現工作流. 原語表示由若干指令組成的, 用于完成一定功能的過程
from crontab_celery.s1 import add, tsum
from celery import chain
from celery import group
from celery import chord
# 1.chain : 調用連, 前面的執行結果, 作為參數傳給后面的任務, 直到全部完成, 類似管道.
res = chain(add.s(2, 3), add.s(5), add.s(10))()
res.get()
print(res.get(), "result") # 20
print(res.parent.get(), "result.parent") # 10
print(res.parent.parent.get(), "result.parent.parent") # 5
# 2.管道式:
res = (add.s(2, 2) | add.s(4) | add.s(8))().get()
# 3.group 組任務
print(group(add.s(i, i) for i in range(10))().get())
# [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
# 4.chord 復合任務函數 任務全部完成時添加一個回調任務
res = chord((add.s(i, i) for i in range(10)), tsum.s())()
print(res.get()) # 90
# 5.map/starmap : 每個參數都作為任務的參數執行一遍, map 的參數只有一個, starmap 支持多個參數.
res = add.starmap(zip(range(10), range(10)))()
print(res)
# 相當于
# @app.task
# def temp():
# return [add(i, i) for i in range(10)]
# 5.chunks : 將任務分塊 按10個分塊去處理
add_chunks = add.chunks(zip(range(100), range(100)), 10)
result = add_chunks.delay()
print(result.get())
res = add.chunks(zip(range(100), range(100)), 10)()
print(res.get())
# 使用rabbitmq隊列沒成功!
9. 其他
1.關閉不想要的功能
@app.task(ignore_result=True) # 關閉任務執行結果.
def func():
pass
CELERY_DISABLE_RATE_LIMITS=True # 關閉限速
2.根據任務狀態執行不同操作
# s1.py
class MyTask(Task):
def on_success(self, retval, task_id, args, kwargs):
print 'task done: {0}'.format(retval)
return super(MyTask, self).on_success(retval, task_id, args, kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
print 'task fail, reason: {0}'.format(exc)
return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
# 正確函數, 執行 MyTask.on_success() :
@app.task(base=MyTask)
def add(x, y):
return x + y
# 錯誤函數, 執行 MyTask.on_failure() :
@app.task #普通函數裝飾為 celery task
def add(x, y):
raise KeyError
return x + y
10. 管理命令
任務狀態回調 :
| 參數 | 說明 |
|---|---|
| PENDING | 任務等待中 |
| STARTED | 任務已開始 |
| SUCCESS | 任務執行成功 |
| FAILURE | 任務執行失敗 |
| RETRY | 任務將被重試 |
| REVOKED | 任務取消 |
| PROGRESS | 任務進行中 |
1.普通啟動命令
celery -A proj worker -l info
2.使用deamon方式 multi
$ celery multi start web -A proj -l info --pidfile=/path/to/celery_%n.pid --logfile=/path/to/celery_%n.log
# web 是對項目啟動的標識,
# %n 是對節點的格式化用法.
%n : 只包含主機名
%h : 包含域名的主機
%d : 只包含域名
%i : Prefork 類型的進程索引,如果是主進程, 則為 0.
%I : 帶分隔符的 Prefork 類型的進程索引. 假設主進程為 worker1, 那么進程池的第一個進程則為 worker1-1
3.常用 multi 相關命令:
$ celery multi show web # 查看 web 啟動時的命令 $ celery multi names web # 獲取 web 的節點名字 $ celery multi stop web # 停止 web 進程 $ celery multi restart web # 重啟 web $ celery multi kill web # 殺掉 web 進程
4.常用監控和管理命令
# shell : 交互時環境, 內置了 Celery 應用實例和全部已注冊的任務, 支持 默認解釋器,IPython,BPython # celery shell -A proj # result : 通過 task_id 在命令行獲得任務執行結果 # celery -A proj result TASK_ID # inspect active : 列出當前正在執行的任務 # celery -A proj inspect active # inspect stats : 列出 worker 的統計數據, 常用來查看配置是否正確以及系統的使用情況. # celery -A proj inspect stats
5.Flower web 監控工具
查看任務歷史,任務具體參數,開始時間等信息;
提供圖表和統計數據
實現全面的遠程控制功能, 包括但不限于 撤銷/終止任務, 關閉重啟 worker, 查看正在運行任務
提供一個 HTTP API , 方便集成.
6.Flower 的 supervisor 管理配置文件:
[program:flower]
command=/opt/PyProjects/venv/bin/flower -A celery_worker:celery --broker="redis://localhost:6379/2" --address=0.0.0.0 --port=5555
directory=/opt/PyProjects/app
autostart=true
autorestart=true
startretries=3
user=derby
stdout_logfile=/var/logs/%(program_name)s.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=30
stderr_logfile=/var/logs/%(program_name)s-error.log
stderr_logfile_maxbytes=50MB
stderr_logfile_backups=3
7.Celery 自帶的事件監控工具顯示任務歷史等信息
celery -A proj event
** 需要把 CELERY_SEND_TASK_SEND_EVENT = True 設置, 才可以獲取時間
8.使用自動擴展
celery -A proj worker -l info --autoscale=6,3 # 平時保持 3 個進程, 最大時可以達到 6 個.
9.celery命令匯總
celery --help Usage: celery <command> [options] 用法:celery 命令 選項 Show help screen and exit. 顯示幫助信息并退出。 Options(選項): -A APP, --app=APP app instance to use (e.g. module.attr_name)-本次操作使用的App實例。 -b BROKER, --broker=BROKER -消息代理(服務器),用于傳遞數據的URL。 url to broker. default is 'amqp://guest@localhost//' --loader=LOADER name of custom loader class to use. - 自定義載入類的名稱。 --config=CONFIG Name of the configuration module-配置模塊的名稱。 --workdir=WORKING_DIRECTORY- 工作目錄。 Optional directory to change to after detaching. -C, --no-color -非彩色顯示。 -q, --quiet -靜默執行。 --version show program's version number and exit-顯示版本號。 -h, --help show this help message and exit -顯示本幫助。 ---- -- - - ---- Commands-命令列表 -------------- --- ------------ + Main-主要命令: | celery worker | celery events | celery beat | celery shell | celery multi | celery amqp + Remote Control-遠程控制: | celery status | celery inspect --help | celery inspect active | celery inspect active_queues | celery inspect clock | celery inspect conf None | celery inspect memdump | celery inspect memsample | celery inspect objgraph None | celery inspect ping | celery inspect registered | celery inspect report | celery inspect reserved | celery inspect revoked | celery inspect scheduled | celery inspect stats | celery control --help | celery control add_consumer <queue> [exchange [type [routing_key]]] | celery control autoscale [max] [min] | celery control cancel_consumer <queue> | celery control disable_events | celery control enable_events | celery control pool_grow [N=1] | celery control pool_shrink [N=1] | celery control rate_limit <task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)> | celery control time_limit <task_name> <soft_secs> [hard_secs] + Utils-使用命令: | celery purge | celery list | celery migrate | celery call | celery result | celery report + Extensions: | celery flower ---- -- - - --------- -- - -------------- --- ------------ Type 'celery <command> --help' for help using a specific command. 鍵入'celery <command> --help'可以獲得指定的命令的更詳細的幫助信息。
11. 在falsk中使用
Flask 文檔: 基于 Celery 的后臺任務
在 Flask 中使用 Celery

浙公網安備 33010602011771號