<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Celery分布式系統

      1.什么是Celery

      Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統

      專注于實時處理的異步任務隊列

      同時也支持任務調度

      2.Celery架構

      img

      Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

      2.1消息中間件

      Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等

      2.2任務執行單元

      Worker是Celery提供的任務執行的單元,worker并發的運行在分布式的系統節點中。

      2.3任務結果存儲

      Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, redis等

      2.4版本支持情況

      Celery version 4.0 runs on
              Python ?2.7, 3.4, 3.5?
              PyPy ?5.4, 5.5?
          This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
      
          If you’re running an older version of Python, you need to be running an older version of Celery:
      
              Python 2.6: Celery series 3.1 or earlier.
              Python 2.5: Celery series 3.0 or earlier.
              Python 2.4 was Celery series 2.2 or earlier.
      
          Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
      

      3.使用場景

      異步任務:將耗時操作任務提交給Celery去異步執行,比如發送短信/郵件、消息推送、音視頻處理等等

      定時任務:定時執行某件事情,比如每天數據統計

      4.Celery的安裝配置

      pip install celery

      消息中間件:RabbitMQ/Redis

      app=Celery('任務名',backend='xxx',broker='xxx')

      5.Celery執行異步任務

      5.1基本使用

      創建項目celerytest

      創建py文件:celery_app_task.py

      import celery
      import time
      # broker='redis://127.0.0.1:6379/2' 不加密碼
      backend='redis://:123456@127.0.0.1:6379/1'
      broker='redis://:123456@127.0.0.1:6379/2'
      cel=celery.Celery('test',backend=backend,broker=broker)
      @cel.task
      def add(x,y):
          return x+y
      

      創建py文件:add_task.py,添加任務

      from celery_app_task import add
      result = add.delay(4,5)
      print(result.id)
      

      創建py文件:run.py,執行任務,或者使用命令執行:celery worker -A celery_app_task -l info

      注:windows下:celery worker -A celery_app_task -l info -P eventlet

      from celery_app_task import cel
      if __name__ == '__main__':
          cel.worker_main()
          # cel.worker_main(argv=['--loglevel=info')
      

      創建py文件:result.py,查看任務執行結果

      from celery.result import AsyncResult
      from celery_app_task import cel
      
      async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)
      
      if async.successful():
          result = async.get()
          print(result)
          # result.forget() # 將結果刪除
      elif async.failed():
          print('執行失敗')
      elif async.status == 'PENDING':
          print('任務等待中被執行')
      elif async.status == 'RETRY':
          print('任務異常后正在重試')
      elif async.status == 'STARTED':
          print('任務已經開始被執行')
      

      執行 add_task.py,添加任務,并獲取任務ID

      執行 run.py ,或者執行命令:celery worker -A celery_app_task -l info

      執行 result.py,檢查任務狀態并獲取結果

      5.2多任務結構

      pro_cel
          ├── celery_task# celery相關文件夾
          │   ├── celery.py   # celery連接和配置相關文件,必須叫這個名字
          │   └── tasks1.py    #  所有任務函數
          │   └── tasks2.py    #  所有任務函數
          ├── check_result.py # 檢查結果
          └── send_task.py    # 觸發任務
      

      celery.py

      from celery import Celery
      
      cel = Celery('celery_demo',
                   broker='redis://127.0.0.1:6379/1',
                   backend='redis://127.0.0.1:6379/2',
                   # 包含以下兩個任務文件,去相應的py文件中找任務,對多個任務做分類
                   include=['celery_task.tasks1',
                            'celery_task.tasks2'
                            ])
      
      # 時區
      cel.conf.timezone = 'Asia/Shanghai'
      # 是否使用UTC
      cel.conf.enable_utc = False
      

      tasks1.py

      import time
      from celery_task.celery import cel
      
      @cel.task
      def test_celery(res):
          time.sleep(5)
          return "test_celery任務結果:%s"%res
      

      tasks2.py

      import time
      from celery_task.celery import cel
      @cel.task
      def test_celery2(res):
          time.sleep(5)
          return "test_celery2任務結果:%s"%res
      

      check_result.py

      from celery.result import AsyncResult
      from celery_task.celery import cel
      
      async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)
      
      if async.successful():
          result = async.get()
          print(result)
          # result.forget() # 將結果刪除,執行完成,結果不會自動刪除
          # async.revoke(terminate=True)  # 無論現在是什么時候,都要終止
          # async.revoke(terminate=False) # 如果任務還沒有開始執行呢,那么就可以終止。
      elif async.failed():
          print('執行失敗')
      elif async.status == 'PENDING':
          print('任務等待中被執行')
      elif async.status == 'RETRY':
          print('任務異常后正在重試')
      elif async.status == 'STARTED':
          print('任務已經開始被執行')
      

      send_task.py

      from celery_task.tasks1 import test_celery
      from celery_task.tasks2 import test_celery2
      
      # 立即告知celery去執行test_celery任務,并傳入一個參數
      result = test_celery.delay('第一個的執行')
      print(result.id)
      result = test_celery2.delay('第二個的執行')
      print(result.id)
      

      添加任務(執行send_task.py),開啟work:celery worker -A celery_task -l info -P eventlet,檢查任務執行結果(執行check_result.py)

      6.Celery執行定時任務

      6.1設定時間讓celery執行一個任務

      add_task.py

      from celery_app_task import add
      from datetime import datetime
      
      # 方式一
      # v1 = datetime(2019, 2, 13, 18, 19, 56)
      # print(v1)
      # v2 = datetime.utcfromtimestamp(v1.timestamp())
      # print(v2)
      # result = add.apply_async(args=[1, 3], eta=v2)
      # print(result.id)
      
      # 方式二
      ctime = datetime.now()
      # 默認用utc時間
      utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
      from datetime import timedelta
      time_delay = timedelta(seconds=10) # 10秒
      task_time = utc_ctime + time_delay # utc時間 + 延時時間10秒
      
      # 使用apply_async并設定時間
      result = add.apply_async(args=[4, 3], eta=task_time)
      print(result.id)
      

      6.2類似于contab的定時任務

      多任務結構中celery.py修改如下

      from datetime import timedelta
      from celery import Celery
      from celery.schedules import crontab
      
      cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
          'celery_task.tasks1',
          'celery_task.tasks2',
      ])
      cel.conf.timezone = 'Asia/Shanghai'
      cel.conf.enable_utc = False
      
      cel.conf.beat_schedule = {
          # 名字隨意命名
          'add-every-10-seconds': {
              # 執行tasks1下的test_celery函數
              'task': 'celery_task.tasks1.test_celery',
              # 每隔2秒執行一次
              # 'schedule': 1.0,
              # 'schedule': crontab(minute="*/1"),
              'schedule': timedelta(seconds=2),
              # 傳遞參數
              'args': ('test',)
          },
          # 'add-every-12-seconds': {
          #     'task': 'celery_task.tasks1.test_celery',
          #     每年4月11號,8點42分執行
          #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
          #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
          #     'args': (16, 16)
          # },
      }
      

      啟動一個beat:celery beat -A celery_task -l info

      啟動work執行:celery worker -A celery_task -l info -P eventlet

      7.Django中使用Celery

      安裝包

      celery==3.1.25
      django-celery==3.1.20
      

      7.1在項目目錄下創建celeryconfig.py

      import djcelery
      djcelery.setup_loader()
      CELERY_IMPORTS=(
          'app01.tasks',
      )
      #有些情況可以防止死鎖
      CELERYD_FORCE_EXECV=True
      # 設置并發worker數量
      CELERYD_CONCURRENCY=4
      #允許重試
      CELERY_ACKS_LATE=True
      # 每個worker最多執行100個任務被銷毀,可以防止內存泄漏
      CELERYD_MAX_TASKS_PER_CHILD=100
      # 超時時間
      CELERYD_TASK_TIME_LIMIT=12*30
      

      在app01目錄下創建tasks.py

      from celery import task
      @task
      def add(a,b):
          with open('a.text', 'a', encoding='utf-8') as f:
              f.write('a')
          print(a+b)
      

      視圖函數views.py

      from django.shortcuts import render,HttpResponse
      from app01.tasks import add
      from datetime import datetime
      def test(request):
          # result=add.delay(2,3)
          ctime = datetime.now()
          # 默認用utc時間
          utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
          from datetime import timedelta
          time_delay = timedelta(seconds=5)
          task_time = utc_ctime + time_delay
          result = add.apply_async(args=[4, 3], eta=task_time)
          print(result.id)
          return HttpResponse('ok')
      

      settings.py

      INSTALLED_APPS = [
          ...
          'djcelery',
          'app01'
      ]
      
      ...
      
      from djagocele import celeryconfig
      BROKER_BACKEND='redis'
      BOOKER_URL='redis://127.0.0.1:6379/1'
      CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'
      

      參考:https://home.cnblogs.com/u/liuqingzheng/

      posted @ 2019-08-05 13:52  丶小白吖  閱讀(454)  評論(1)    收藏  舉報
      主站蜘蛛池模板: 久久精品国产99国产精品亚洲 | 鲁丝片一区二区三区免费| 女女互揉吃奶揉到高潮视频| 津市市| 亚洲精品成人老司机影视| 国产伦精品一区二区三区| 欧洲无码一区二区三区在线观看 | 国产精品高清中文字幕| 亚洲无线看天堂av| 国产精品播放一区二区三区| 欧美丝袜高跟鞋一区二区| 少妇人妻偷人一区二区| 国产三级精品三级在线看| 国产精品午夜av福利| 真人抽搐一进一出视频| 男人一天堂精品国产乱码| 国产一区二区三区18禁| 国产麻豆剧传媒精品国产av| 欧美国产成人久久精品| 亚洲精品中文字幕二区| 亚洲a∨无码无在线观看| 国产成人啪精品视频免费网 | 精品一区二区久久久久久久网站| 成人爽A毛片在线视频淮北| 131mm少妇做爰视频| 亚洲第一成人网站| 四虎库影成人在线播放| 春色校园综合人妻av| 99久久免费精品国产色| 中文字幕国产原创国产| 阿瓦提县| 丝袜美腿视频一区二区三区 | 久久高清超碰AV热热久久| 国产免费高清69式视频在线观看| 成人综合婷婷国产精品久久蜜臀 | 精品人妻伦九区久久aaa片| 久热色精品在线观看视频| 久热这里只有精品12| 男人下部进女人下部视频| 国产精品毛片在线完整版| 人妻少妇偷人无码视频|