<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      摘要:

      在了解celery的異步任務隊列之前我們來先了解下Node 似乎我與人聊過了node.js這個框架,他是一個簡單運行將js運行在服務器上的

      非阻塞的異步框架,一個線程就可以跑起整個項目,但是我想強調的是他的

      異步非阻塞和事件驅動!他是怎么把通過一個線程處理這么多請求的,處理一個請求不會阻塞嗎?請聽我一一道來!

      nodejs最大的優勢在于一個請求過來,他接入請求,并處理他,當事件遇到一些io操作的時候,就把他的事件丟在一個隊列里面,我們稱之為

      隊列(其實并不是棧區,棧是先進后出的概念,我們說他是隊列),然后依次排隊處理棧,等處理完之后又繼續處理請求,棧區發出一個完成信號的時候,繼續處理棧區內容,就做到異步非阻塞,而他的事件驅動就是這個隊列區完成io操作的信號驅使。

        Celery 是 Distributed Task Queue,分布式任務隊列,分布式決定了可以有多個 worker 的存在,隊列表示其是異步操作,即存在一個產生任務提出需求的工頭,和一群等著被分配工作的碼農。

              在 Python 中定義 Celery 的時候,我們要引入 Broker,中文翻譯過來就是“中間人”的意思,在這里 Broker 起到一個中間人的角色。在工頭提出任務的時候,把所有的任務放到 Broker 里面,在 Broker 的另外一頭,一群碼農等著取出一個個任務準備著手做。

       

              這種模式注定了整個系統會是個開環系統,工頭對于碼農們把任務做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務的結果。這個 Backend 有點像我們的 Broker,也是存儲任務的信息用的,只不過這里存的是那些任務的返回結果。我們可以選擇只讓錯誤執行的任務返回結果到 Backend,這樣我們取回結果,便可以知道有多少任務執行失敗了。

       

              Celery(芹菜)是一個異步任務隊列/基于分布式消息傳遞的作業隊列。它側重于實時操作但對調度支持也很好。Celery用于生產系統每天處理數以百萬計的任務。Celery是用Python編寫的,但該協議可以在任何語言實現。它也可以與其他語言通過webhooks實現。Celery建議的消息隊列是RabbitMQ,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, 和數據庫(使用SQLAlchemy的或Django的 ORM) 。Celery是易于集成Django, Pylons and Flask,使用 django-celery, celery-pylons and Flask-Celery 附加包即可。

       

      什么是celery

      目前不支持win平臺,但是我們處理后依舊可以使用

      聊完這個nodejs這個東西之后,我們引出了一個celery這個框架,他和nodejs相似但是他是一個簡單的可靠的分布式系統

      他專注于實時處理異步任務隊列 同時也支持任務調度

      celery三部分

      消息中間件   不是celery的自己創立而是第三方  redis(也支持在中間件里)

       

      broker是一個消息傳輸的中間件,可以理解為一個郵箱(概念)。每當應用程序調用celery的異步任務的時候,會向broker傳遞消息,而后celery的worker將會取到消息,進行對于的程序執行(怎么執行的)。好吧,這個郵箱可以看成是一個消息隊列。其中Broker的中文意思是 經紀人 ,其實就是一開始說的 消息隊列 ,用來發送和接受消息。這個Broker有幾個方案可供選擇:RabbitMQ (消息隊列),Redis(緩存數據庫),數據庫(不推薦),等等

       

      執行單元  工人 處理執行單元任務的函數

      消息結果存儲 可以理解數據庫 同樣的用第三方

       

      我們在哪里用?

      說實在的我們標題就能理解出 他是異步框架 所以適合大量io密集型的場景(或耗時任務

      已經他自己支持定時調度 比如每天凌晨把日志同步起來 或者更新下緩存

      好牛逼 在python咋用?

       

      pip install celery  下載這個框架 不建議使用Django的

      定義執行任務的函數

      #把我們的函數裝飾為異步執行任務
      import celery
      import time
      # broker='redis://127.0.0.1:6379/2' #版本不加密碼
      backend='redis://:123456@127.0.0.1:6379/1' #任務提交后放在這個庫里
      broker='redis://:123456@127.0.0.1:6379/2' #任務執行完后結果的db
      cel=celery.Celery('test',backend=backend,broker=broker) 
      #裝飾函數
      @cel.task
      def add(x,y):
          return x+y

       

       創造一個可以開啟任務功能的模塊

      from celery_app_task import add
      result = add.delay(4,5) #4,5為參數  #啟動任務
      print(result.id)  #拿到這個用于查看任務的CDK

      啟動我們的功能

      celery worker -A celery_app_task -l info  其他平臺
      celery worker -A celery_app_task -l info -P eventlet #window
      
      函數啟動
      from celery_app_task import cel
      if __name__ == '__main__':
          cel.worker_main()

      查看結果

      async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)#r任務序列號
      
      if async.successful():
          result = async.get()
          print(result)
          # result.forget() # 將結果刪除
      elif async.failed():
          print('執行失敗')
      elif async.status == 'PENDING':
          print('任務等待中被執行')
      elif async.status == 'RETRY':
          print('任務異常后正在重試')
      elif async.status == 'STARTED':
          print('任務已經開始被執行')

      多任務結果

      可能用戶任務 和訂單任務是分開的 所以我們需要多任務 ,之前一個肯定是不行了

      pro_cel
          ├── celery_task# celery相關文件夾
          │   ├── celery.py   # celery連接和配置相關文件,必須叫這個名字
          │   └── tasks1.py    #  所有任務函數
          │    └── tasks2.py    #  所有任務函數
          ├── check_result.py # 檢查結果
          └── send_task.py    # 觸發任務

      任務文件

      import time
      from celery_task.celery import cel
      
      #test1.py
      @cel.task
      def test_celery(res):
          time.sleep(5)
          return "test_celery任務結果:%s"%res
      
      #test2.py
      import time
      from celery_task.celery import cel
      @cel.task
      def test_celery2(res):
          time.sleep(5)
          return "test_celery2任務結果:%s"%res

       celery.py 任務分發作用

      from celery import Celery
      
      cel = Celery('celery_demo',
                   broker='redis://127.0.0.1:6379/1',
                   backend='redis://127.0.0.1:6379/2',
                   # 包含以下兩個任務文件,去相應的py文件中找任務,對多個任務做分類
                   include=['celery_task.tasks1',
                            'celery_task.tasks2'
                            ])
      
      # 時區
      cel.conf.timezone = 'Asia/Shanghai'
      # 是否使用UTC
      cel.conf.enable_utc = False

      拿到任務CDK

      from celery_task.tasks1 import test_celery
      from celery_task.tasks2 import test_celery2
      
      # 立即告知celery去執行test_celery任務,并傳入一個參數
      result = test_celery.delay('第一個的執行')
      print(result.id)
      result = test_celery2.delay('第二個的執行')
      print(result.id)

      查看結果

      from celery.result import AsyncResult
      from celery_task.celery import cel
      
      async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)  #任務序列號
      
      if async.successful():
          result = async.get()
          print(result)
          # result.forget() # 將結果刪除,執行完成,結果不會自動刪除
          # async.revoke(terminate=True)  # 無論現在是什么時候,都要終止
          # async.revoke(terminate=False) # 如果任務還沒有開始執行呢,那么就可以終止。
      elif async.failed():
          print('執行失敗')
      elif async.status == 'PENDING':
          print('任務等待中被執行')
      elif async.status == 'RETRY':
          print('任務異常后正在重試')
      elif async.status == 'STARTED':
          print('任務已經開始被執行')

      開啟work

      celery worker -A celery_task -l info  -P  eventlet

      開啟任務

      send_task.py

      檢查任務執行結果

      (執行check_result.py)

      定時任務

      本質:使用apply_async并設定時間    參數eta傳遞要執行的事件

      from celery_app_task import add
      from datetime import datetime
      
      # 方式一
      # v1 = datetime(2019, 2, 13, 18, 19, 56)
      # print(v1)
      # v2 = datetime.utcfromtimestamp(v1.timestamp())
      # print(v2)
      # result = add.apply_async(args=[1, 3], eta=v2)
      # print(result.id)
      
      # 方式二
      ctime = datetime.now()
      # 默認用utc時間
      utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
      from datetime import timedelta
      time_delay = timedelta(seconds=10)
      task_time = utc_ctime + time_delay
      
      # 使用apply_async并設定時間
      result = add.apply_async(args=[4, 3], eta=task_time)
      print(result.id)

      像Linux的contab任務

      本質cel.conf.beat_schedule = { ‘任務1’:{‘tas’:'任務',schedule:"執行頻率",args:“參數”  } ,‘任務二’:{} }

       

       

      from datetime import timedelta
      from celery import Celery
      from celery.schedules import crontab
      
      cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
          'celery_task.tasks1',
          'celery_task.tasks2',
      ])
      cel.conf.timezone = 'Asia/Shanghai'
      cel.conf.enable_utc = False
      
      cel.conf.beat_schedule = {
          # 名字隨意命名
          'add-every-10-seconds': {
              # 執行tasks1下的test_celery函數
              'task': 'celery_task.tasks1.test_celery',
              # 每隔2秒執行一次
              # 'schedule': 1.0,
              # 'schedule': crontab(minute="*/1"),
              'schedule': timedelta(seconds=2),
              # 傳遞參數
              'args': ('test',)
          },
          # 'add-every-12-seconds': {
          #     'task': 'celery_task.tasks1.test_celery',
          #     每年4月11號,8點42分執行
          #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
          #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
          #     'args': (16, 16)
          # },
      }

      啟動beat:

      celery beat -A celery_task -l info

      啟動work:

      celery worker -A celery_task -l info -P  eventlet

       

      Django 引入celeary

      在項目目錄下創建celeryconfig.py

      import djcelery
      djcelery.setup_loader()
      CELERY_IMPORTS=(
          'app01.tasks',
      )
      #有些情況可以防止死鎖
      CELERYD_FORCE_EXECV=True
      # 設置并發worker數量
      CELERYD_CONCURRENCY=4
      #允許重試
      CELERY_ACKS_LATE=True
      # 每個worker最多執行100個任務被銷毀,可以防止內存泄漏
      CELERYD_MAX_TASKS_PER_CHILD=100
      # 超時時間
      CELERYD_TASK_TIME_LIMIT=12*30

      settings.py

      INSTALLED_APPS = [
          ...
          'djcelery',
          'app01'
      ]
      from djagocele import celeryconfig
      BROKER_BACKEND='redis'
      BOOKER_URL='redis://127.0.0.1:6379/1'
      CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'

       任務.py

      from celery import task
      @task
      def sa():
          pass

       

      server/clery.py

      import celery
      import os
      
      os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings")
      
      import django
      django.setup()
      
      app = celery.Celery(main="celery_app", broker='redis://127.0.0.1:6379/2')
      
      app.autodiscover_tasks(['api'])

      celery -A server.celery  worker -l info

       

      from server.celery import app
      
      @app.task
      def Add(x, y):
          print("asdasda")
          return x + y
      

       

      posted on 2019-07-12 16:08  謝Rain  閱讀(617)  評論(1)    收藏  舉報
      主站蜘蛛池模板: 中文字幕一区二区久久综合| 亚洲嫩模喷白浆在线观看| 国产蜜臀久久av一区二区| 免费无码又爽又刺激高潮虎虎视频 | 亚洲成在人线AⅤ中文字幕| 唐人社导航福利精品| 久久婷婷综合色丁香五月| 国产午夜成人久久无码一区二区| 国产在线观看播放av| 成人午夜在线观看日韩| 久久精品久久电影免费理论片| 亚洲精品成人一二三专区| 99热精品毛片全部国产无缓冲| 成熟女人特级毛片www免费| 国产精品免费重口又黄又粗| 国产香蕉九九久久精品免费| 国产亚洲精品成人aa片新蒲金| 99精品视频在线观看免费蜜桃| 国产精品国产精品一区精品| 九九热视频在线免费观看 | 在线看无码的免费网站| 精品国产av无码一区二区三区| 不卡一区二区国产在线| 国产精品久久久久久影视| 欧美精品国产综合久久| 国产精品午夜福利在线观看| 亚洲欧美综合人成在线 | 少妇熟女高潮流白浆| 日韩成人一区二区二十六区| 护士张开腿被奷日出白浆| 无套中出极品少妇白浆| 成人精品视频一区二区三区| 五月天中文字幕mv在线| 強壮公弄得我次次高潮A片| 亚洲第一区二区快射影院| 亚洲成人精品综合在线| 国产精品十八禁一区二区| 东方四虎在线观看av| 性欧美大战久久久久久久| 吉川爱美一区二区三区视频| 不卡免费一区二区日韩av|