摘要:
在了解celery的異步任務隊列之前我們來先了解下Node 似乎我與人聊過了node.js這個框架,他是一個簡單運行將js運行在服務器上的
非阻塞的異步框架,一個線程就可以跑起整個項目,但是我想強調的是他的
異步非阻塞和事件驅動!他是怎么把通過一個線程處理這么多請求的,處理一個請求不會阻塞嗎?請聽我一一道來!
nodejs最大的優勢在于一個請求過來,他接入請求,并處理他,當事件遇到一些io操作的時候,就把他的事件丟在一個隊列里面,我們稱之為
隊列(其實并不是棧區,棧是先進后出的概念,我們說他是隊列),然后依次排隊處理棧,等處理完之后又繼續處理請求,棧區發出一個完成信號的時候,繼續處理棧區內容,就做到異步非阻塞,而他的事件驅動就是這個隊列區完成io操作的信號驅使。
Celery 是 Distributed Task Queue,分布式任務隊列,分布式決定了可以有多個 worker 的存在,隊列表示其是異步操作,即存在一個產生任務提出需求的工頭,和一群等著被分配工作的碼農。
在 Python 中定義 Celery 的時候,我們要引入 Broker,中文翻譯過來就是“中間人”的意思,在這里 Broker 起到一個中間人的角色。在工頭提出任務的時候,把所有的任務放到 Broker 里面,在 Broker 的另外一頭,一群碼農等著取出一個個任務準備著手做。
這種模式注定了整個系統會是個開環系統,工頭對于碼農們把任務做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務的結果。這個 Backend 有點像我們的 Broker,也是存儲任務的信息用的,只不過這里存的是那些任務的返回結果。我們可以選擇只讓錯誤執行的任務返回結果到 Backend,這樣我們取回結果,便可以知道有多少任務執行失敗了。
Celery(芹菜)是一個異步任務隊列/基于分布式消息傳遞的作業隊列。它側重于實時操作,但對調度支持也很好。Celery用于生產系統每天處理數以百萬計的任務。Celery是用Python編寫的,但該協議可以在任何語言實現。它也可以與其他語言通過webhooks實現。Celery建議的消息隊列是RabbitMQ,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, 和數據庫(使用SQLAlchemy的或Django的 ORM) 。Celery是易于集成Django, Pylons and Flask,使用 django-celery, celery-pylons and Flask-Celery 附加包即可。
什么是celery
目前不支持win平臺,但是我們處理后依舊可以使用
聊完這個nodejs這個東西之后,我們引出了一個celery這個框架,他和nodejs相似但是他是一個簡單的可靠的分布式系統
他專注于實時處理異步任務隊列 同時也支持任務調度
celery三部分
消息中間件 不是celery的自己創立而是第三方 redis(也支持在中間件里)
broker是一個消息傳輸的中間件,可以理解為一個郵箱(概念)。每當應用程序調用celery的異步任務的時候,會向broker傳遞消息,而后celery的worker將會取到消息,進行對于的程序執行(怎么執行的)。好吧,這個郵箱可以看成是一個消息隊列。其中Broker的中文意思是 經紀人 ,其實就是一開始說的 消息隊列 ,用來發送和接受消息。這個Broker有幾個方案可供選擇:RabbitMQ (消息隊列),Redis(緩存數據庫),數據庫(不推薦),等等
執行單元 工人 處理執行單元任務的函數
消息結果存儲 可以理解數據庫 同樣的用第三方
我們在哪里用?
說實在的我們標題就能理解出 他是異步框架 所以適合大量io密集型的場景(或耗時任務)
已經他自己支持定時調度 比如每天凌晨把日志同步起來 或者更新下緩存
好牛逼 在python咋用?

pip install celery 下載這個框架 不建議使用Django的
定義執行任務的函數
#把我們的函數裝飾為異步執行任務 import celery import time # broker='redis://127.0.0.1:6379/2' #版本不加密碼 backend='redis://:123456@127.0.0.1:6379/1' #任務提交后放在這個庫里 broker='redis://:123456@127.0.0.1:6379/2' #任務執行完后結果的db cel=celery.Celery('test',backend=backend,broker=broker) #裝飾函數 @cel.task def add(x,y): return x+y
創造一個可以開啟任務功能的模塊
from celery_app_task import add result = add.delay(4,5) #4,5為參數 #啟動任務 print(result.id) #拿到這個用于查看任務的CDK
啟動我們的功能
celery worker -A celery_app_task -l info 其他平臺 celery worker -A celery_app_task -l info -P eventlet #window 函數啟動 from celery_app_task import cel if __name__ == '__main__': cel.worker_main()
查看結果
async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)#r任務序列號 if async.successful(): result = async.get() print(result) # result.forget() # 將結果刪除 elif async.failed(): print('執行失敗') elif async.status == 'PENDING': print('任務等待中被執行') elif async.status == 'RETRY': print('任務異常后正在重試') elif async.status == 'STARTED': print('任務已經開始被執行')
多任務結果
可能用戶任務 和訂單任務是分開的 所以我們需要多任務 ,之前一個肯定是不行了
pro_cel ├── celery_task# celery相關文件夾 │ ├── celery.py # celery連接和配置相關文件,必須叫這個名字 │ └── tasks1.py # 所有任務函數 │ └── tasks2.py # 所有任務函數 ├── check_result.py # 檢查結果 └── send_task.py # 觸發任務
任務文件
import time from celery_task.celery import cel #test1.py @cel.task def test_celery(res): time.sleep(5) return "test_celery任務結果:%s"%res #test2.py import time from celery_task.celery import cel @cel.task def test_celery2(res): time.sleep(5) return "test_celery2任務結果:%s"%res
celery.py 任務分發作用
from celery import Celery cel = Celery('celery_demo', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', # 包含以下兩個任務文件,去相應的py文件中找任務,對多個任務做分類 include=['celery_task.tasks1', 'celery_task.tasks2' ]) # 時區 cel.conf.timezone = 'Asia/Shanghai' # 是否使用UTC cel.conf.enable_utc = False
拿到任務CDK
from celery_task.tasks1 import test_celery from celery_task.tasks2 import test_celery2 # 立即告知celery去執行test_celery任務,并傳入一個參數 result = test_celery.delay('第一個的執行') print(result.id) result = test_celery2.delay('第二個的執行') print(result.id)
查看結果
from celery.result import AsyncResult from celery_task.celery import cel async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel) #任務序列號 if async.successful(): result = async.get() print(result) # result.forget() # 將結果刪除,執行完成,結果不會自動刪除 # async.revoke(terminate=True) # 無論現在是什么時候,都要終止 # async.revoke(terminate=False) # 如果任務還沒有開始執行呢,那么就可以終止。 elif async.failed(): print('執行失敗') elif async.status == 'PENDING': print('任務等待中被執行') elif async.status == 'RETRY': print('任務異常后正在重試') elif async.status == 'STARTED': print('任務已經開始被執行')
開啟work
celery worker -A celery_task -l info -P eventlet
開啟任務
send_task.py
檢查任務執行結果
(執行check_result.py)
定時任務
本質:使用apply_async并設定時間 參數eta傳遞要執行的事件
from celery_app_task import add from datetime import datetime # 方式一 # v1 = datetime(2019, 2, 13, 18, 19, 56) # print(v1) # v2 = datetime.utcfromtimestamp(v1.timestamp()) # print(v2) # result = add.apply_async(args=[1, 3], eta=v2) # print(result.id) # 方式二 ctime = datetime.now() # 默認用utc時間 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(seconds=10) task_time = utc_ctime + time_delay # 使用apply_async并設定時間 result = add.apply_async(args=[4, 3], eta=task_time) print(result.id)
像Linux的contab任務
本質cel.conf.beat_schedule = { ‘任務1’:{‘tas’:'任務',schedule:"執行頻率",args:“參數” } ,‘任務二’:{} }
from datetime import timedelta from celery import Celery from celery.schedules import crontab cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[ 'celery_task.tasks1', 'celery_task.tasks2', ]) cel.conf.timezone = 'Asia/Shanghai' cel.conf.enable_utc = False cel.conf.beat_schedule = { # 名字隨意命名 'add-every-10-seconds': { # 執行tasks1下的test_celery函數 'task': 'celery_task.tasks1.test_celery', # 每隔2秒執行一次 # 'schedule': 1.0, # 'schedule': crontab(minute="*/1"), 'schedule': timedelta(seconds=2), # 傳遞參數 'args': ('test',) }, # 'add-every-12-seconds': { # 'task': 'celery_task.tasks1.test_celery', # 每年4月11號,8點42分執行 # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # 'args': (16, 16) # }, }
啟動beat:
celery beat -A celery_task -l info
啟動work:
celery worker -A celery_task -l info -P eventlet
Django 引入celeary
在項目目錄下創建celeryconfig.py
import djcelery djcelery.setup_loader() CELERY_IMPORTS=( 'app01.tasks', ) #有些情況可以防止死鎖 CELERYD_FORCE_EXECV=True # 設置并發worker數量 CELERYD_CONCURRENCY=4 #允許重試 CELERY_ACKS_LATE=True # 每個worker最多執行100個任務被銷毀,可以防止內存泄漏 CELERYD_MAX_TASKS_PER_CHILD=100 # 超時時間 CELERYD_TASK_TIME_LIMIT=12*30
settings.py
INSTALLED_APPS = [ ... 'djcelery', 'app01' ] from djagocele import celeryconfig BROKER_BACKEND='redis' BOOKER_URL='redis://127.0.0.1:6379/1' CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'
任務.py
from celery import task @task def sa(): pass
server/clery.py
import celery import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings") import django django.setup() app = celery.Celery(main="celery_app", broker='redis://127.0.0.1:6379/2') app.autodiscover_tasks(['api'])
celery -A server.celery worker -l info
from server.celery import app
@app.task
def Add(x, y):
print("asdasda")
return x + y

浙公網安備 33010602011771號