<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      [源碼解析] 并行分布式框架 Celery 之架構 (1)

      [源碼解析] 并行分布式框架 Celery 之架構 (1)

      0x00 摘要

      Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統,專注于實時處理的異步任務隊列,同時也支持任務調度。

      前面我們用幾篇文章分析了 Kombu,為 Celery 的分析打下了基礎。

      [源碼分析] 消息隊列 Kombu 之 mailbox

      [源碼分析] 消息隊列 Kombu 之 Hub

      [源碼分析] 消息隊列 Kombu 之 Consumer

      [源碼分析] 消息隊列 Kombu 之 Producer

      [源碼分析] 消息隊列 Kombu 之 啟動過程

      [源碼解析] 消息隊列 Kombu 之 基本架構

      本系列將繼續通過源碼分析,和大家一起深入學習 Celery。本文是系列第一篇,借鑒了幾位網友的大作,按照自己的理解再重新整理,遂得此文。

      0x01 Celery 簡介

      1.1 什么是 Celery

      Celery是Python世界中最受歡迎的后臺工作管理者之一。它是一個簡單、靈活且可靠的,處理大量消息的分布式系統,專注于實時處理的異步任務隊列,同時也支持任務調度。

      利用多線程,如Eventlet,gevent等,Celery的任務能被并發地執行在單個或多個工作服務器(worker servers)上。任務能異步執行(后臺運行)或同步執行(等待任務完成)。Celery用于生產系統時候每天可以處理數以百萬計的任務。

      Celery是用Python編寫的,但該協議可以在任何語言實現。它也可以與其他語言通過webhooks實現。

      Celery建議的消息隊列是RabbitMQ,但也支持Redis, Beanstalk, MongoDB, CouchDB, 和數據庫(使用SQLAlchemy的或Django的 ORM) 。并且可以同時充當生產者和消費者。

      1.2 場景

      使用Celery的常見場景如下:

      • Web應用。當用戶觸發的一個操作需要較長時間才能執行完成時,可以把它作為任務交給Celery去異步執行,執行完再返回給用戶。這段時間用戶不需要等待,提高了網站的整體吞吐量和響應時間。

      • 定時任務。生產環境經常會跑一些定時任務。假如你有上千臺的服務器、上千種任務,定時任務的管理很困難,Celery可以幫助我們快速在不同的機器設定不同種任務。

      • 同步完成的附加工作都可以異步完成。比如發送短信/郵件、推送消息、清理/設置緩存等。

      1.3 特性

      Celery提供了如下的特性:

      • 方便地查看定時任務的執行情況,比如執行是否成功、當前狀態、執行任務花費的時間等。

      • 可以使用功能齊備的管理后臺或者命令行添加、更新、刪除任務。

      • 方便把任務和配置管理相關聯。

      • 可選多進程、Eventlet 和 Gevent 三種模式并發執行。

      • 提供錯誤處理機制。

      • 提供多種任務原語,方便實現任務分組、拆分和調用鏈。

      • 支持多種消息代理和存儲后端。

      1.4 區別

      消息隊列和任務隊列,最大的不同之處就在于理念的不同 -- 消息隊列傳遞的是“消息”,任務隊列傳遞的是“任務”

      • 消息隊列用來快速消費隊列中的消息。消息隊列更側重于消息的吞吐、處理,具有有處理海量信息的能力。另外利用消息隊列的生長者和消費者的概念,也可以實現任務隊列的功能,但是還需要進行額外的開發。
      • 任務隊列是用來執行一個耗時任務。任務隊列則提供了執行任務所需的功能,比如任務的重試,結果的返回,任務狀態記錄等。雖然也有并發的處理能力,但一般不適用于高吞吐量快速消費的場景。

      0x02 Celery的架構

      Celery 的基本邏輯為:分布式異步消息任務隊列。

      在 Celery 中,采用的是分布式的管理方式,每個節點之間都是通過廣播/單播進行通信,從而達到協同效果。實際上,只有部分輔助管理功能才會協同,基礎業務功能反而沒有借助協同

      2.1 組件

      Celery包含如下組件:

      • Celery Beat:任務調度器,Beat進程會讀取配置文件的內容,周期性地將配置中到期需要執行的任務發送給任務隊列。

      • Celery Worker:執行任務的消費者,通常會在多臺服務器運行多個消費者來提高執行效率。

      • Broker:消息代理,或者叫作消息中間件,接受任務生產者發送過來的任務消息,存進隊列再按序分發給任務消費方(通常是消息隊列或者數據庫)。

      • Producer:調用了Celery提供的API、函數或者裝飾器而產生任務并交給任務隊列處理的都是任務生產者。

      • Result Backend:任務處理完后保存狀態信息和結果,以供查詢。Celery默認已支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。

      再理解一下:

      • 系統可以有多個"消息隊列"(message Queue),不同的消息可以指定發送給不同的Message Queue。
      • 上述功能是通過Exchange來實現的,發送消息到"消息隊列"中時,可以指定 routing_key,Exchange 通過routing_key 來把消息路由(routes)到不同的"消息隊列"中去(Celery的底層依賴Kombu,里面涉及Exchange)。
      • exchange 對應 一個消息隊列(queue),即:通過 "消息路由" 的機制使exchange對應queue,每個queue對應每個worker。

      2.2 任務流程

      Celery 通過消息機制進行通信,通常使用中間人(Broker)作為客戶端和職程(Worker)調節。啟動一個任務的流程是:

      • 客戶端向消息隊列發送一條消息;
      • 然后中間人(Broker)將消息傳遞給一個職程(Worker),支持RabbitMQ、Redis等作為Broker。;
      • 最后由職程(Worker)進行執行中間人(Broker)分配的任務;

      2.3 架構圖

      Celery的架構圖如下所示:

       +-----------+            +--------------+
       | Producer  |            |  Celery Beat |
       +-------+---+            +----+---------+
               |                     |
               |                     |
               v                     v
      
             +-------------------------+
             |          Broker         |
             +------------+------------+
                          |
                          |
                          |
           +-------------------------------+
           |              |                |
           v              v                v
      +----+-----+   +----+------+   +-----+----+
      | Exchange |   |  Exchange |   | Exchange |
      +----+-----+   +----+------+   +----+-----+
           |              |               |
           v              v               v
      
        +-----+       +-------+       +-------+
        |queue|       | queue |       | queue |
        +--+--+       +---+---+       +---+---+
           |              |               |
           |              |               |
           v              v               v
      
      +---------+     +--------+     +----------+
      | worker  |     | Worker |     |  Worker  |
      +-----+---+     +---+----+     +----+-----+
            |             |               |
            |             |               |
            +-----------------------------+
                          |
                          |
                          v
                      +---+-----+
                      | backend |
                      +---------+
      
      

      0x03 Celery 設計推理

      目前我們得到如下信息:

      • Celery 的基本邏輯為:分布式異步消息任務隊列;
      • Celery底層依賴 Kombu,基于 Kombu 完成基本功能;
      • 之前我們通過若干文章,基本了解了 Kombu 的大致邏輯;

      下面我們就需要依據 Kombu來推論 Celery 應該如何設計。

      3.1 Celery 基本功能

      首先,我們看看為了完成基本功能,Celery 應該具備哪些組件(模塊),我們會提出一些問題,這些問題將在后續的分析中陸續得到解答

      因為Celery 的基本邏輯為:分布式異步消息任務隊列,所以Celery包含如下基礎組件:

      • Producer:需要有一個組件完成如下功能 :把用戶定義的代碼打包整合成任務提交給任務隊列處理。問題就在于:
        • 對于任務,也就是task如何處理?
        • task的本質是什么?
        • task 應該包括哪些功能?
        • 如果task是函數,如何把task函數傳遞給服務端?如果task函數內容很大怎么辦?
        • 如何把task相關信息從客戶端傳遞到服務端?
      • Broker:為了解耦合,需要有一個中間組件來緩存消息。這就是 消息代理,或者叫作消息中間件。其作用是接受任務生產者發送過來的任務消息,存進隊列再按序分發給任務消費方(通常是消息隊列或者數據庫)。問題在于:
        • 如何區分不同的消息來源,即如何路由?
        • 是否有容錯機制?
      • Worker:需要有一個組件來執行任務,這就是 Worker:
        • Worker 需要從 broker 接受任務。這就需要一個consumer,問題就是:Consumer 如何從 broker 獲取消息
        • 接受任務之后,Worker 需要了解任務,知道怎么執行任務,執行任務。所以有一個問題:Worker 怎么知道 client 端的任務?
        • 通常會在多臺服務器運行多個 worker 來提高執行效率。這就涉及到一個問題:多個 worker 之間如何協調?如何在多個 Worker 之間分配任務?
      • Result Backend:任務處理完后保存狀態信息和結果,以供查詢。Celery默認已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

      3.2 Celery 輔助功能

      以上為基礎功能,但是作為分布式異步消息任務隊列,我們還需要輔助功能(以及相關問題),比如。

      • 用于執行定時任務的timer;

      • 需要處理監控事件;

      • 如何通過遠程命令管理;

      • worker 出現問題,如何處理;

      • 如何提高并發?

      • 如何封裝amqp?

      • 如何進行消息循環引擎?

      • 以上功能哪些屬于帶有分布式特點的?

      3.3 如何劃分

      進一步問題是:這些輔助功能是作為基礎功能模塊的一部分?還是獨立出來成為一個功能模塊?

      這其實是一個哲學問題,每種實現都有其道理,或者說,很多決定其實就是作者靈光一現(臨時拍腦袋)的產物。

      比如我們后面提到的 Consumer 組件,表面上看,就是一個從broker獲取消息的功能模塊,直接使用 kombu 的 consumer 就可以做到。

      但是實際上,celery Consumer 組件的概念遠遠要大于Kombu的Consumer,不只是利用了Kombu的Consumer從broker取得消息。也包括消息的消費,分發,監控,心跳等一系列功能。可以說,除了消息循環引擎 被 hub 承擔,多進程被 Pool,Autoscaler 承擔,定時任務被 timer,beat 承擔之外,其他主要功能都被 Consumer 承擔。

      因此,我們需要看看:

      • 哪些組件可以利用 Kombu直接完成,哪些需要Celery自己重新設計。

      • 若重新設計,哪些可以基于Kombu設計,如何調用相應Kombu模塊。

      • 若使用Kombu模塊作為Celery模塊的變量,這些Kombu模塊分別屬于哪些Celery模塊。

      0x04 對 AMQP / Kombu 的封裝

      Celery如果想成為消息處理系統,首先需要解決消息協議和消息傳輸問題。

      • 消息協議由 AMQP(Advanced Message Queuing Protocol:高級消息隊列協議)解決。Celery 支持所有AMQP路由機制,可以通過配置的方式,執行相關的消息路由。
      • 消息實現和傳輸由 Kombu 解決。由之前對 Kombu 的分析我們知道,Kombu 的定位是一個兼容 AMQP 協議的消息隊列抽象,是一個把消息傳遞封裝成統一接口的庫。

      所以我們首先看看如何封裝 AMQP / Kombu。

      具體封裝是在 celery/app/amqp.py 文件中,其中主要有兩個類:AMQP 和 Queues。

      4.1 封裝

      AMQP類的功能是 發送/接受消息,是對amqp協議實現的再一次封裝,在這里其實就是對 kombu 類的再一次封裝。

      我們可以看到,其內部成員變量都是來自于 Kombu。比如 Connection, Consumer, Exchange, Producer, Queue, pools。

      from kombu import Connection, Consumer, Exchange, Producer, Queue, pools
      
      class AMQP:
          """App AMQP API: app.amqp."""
      
          Connection = Connection
          Consumer = Consumer
          Producer = Producer
      
          #: compat alias to Connection
          BrokerConnection = Connection
      
          queues_cls = Queues
      
          #: Cached and prepared routing table.
          _rtable = None
      
          #: Underlying producer pool instance automatically
          #: set by the :attr:`producer_pool`.
          _producer_pool = None
      
          # Exchange class/function used when defining automatic queues.
          # For example, you can use ``autoexchange = lambda n: None`` to use the
          # AMQP default exchange: a shortcut to bypass routing
          # and instead send directly to the queue named in the routing key.
          autoexchange = None
      

      為了更好的理解,我們打印出amqp類的具體內容來看看。

      amqp = {AMQP}  
       BrokerConnection = {type} <class 'kombu.connection.Connection'>
       Connection = {type} <class 'kombu.connection.Connection'>
       Consumer = {type} <class 'kombu.messaging.Consumer'>
       Producer = {type} <class 'kombu.messaging.Producer'>
       app = {Celery} <Celery myTest at 0x252bd2903c8>
       autoexchange = {NoneType} None
       default_exchange = {Exchange} Exchange celery(direct)
       default_queue = {Queue} <unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>
       producer_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x00000252BDC8F408>
       publisher_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x00000252BDC8F408>
       queues = {Queues: 1} {'celery': <unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>}
       queues_cls = {type} <class 'celery.app.amqp.Queues'>
       router = {Router} <celery.app.routes.Router object at 0x00000252BDC6B248>
       routes = {tuple: 0} ()
       task_protocols = {dict: 2} {1: <bound method AMQP.as_task_v1 of <celery.app.amqp.AMQP object at 0x00000252BDC74148>>, 2: <bound method AMQP.as_task_v2 of <celery.app.amqp.AMQP object at 0x00000252BDC74148>>}
        _event_dispatcher = {EventDispatcher} <celery.events.dispatcher.EventDispatcher object at 0x00000252BE750348>
        _producer_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x00000252BDC8F408>
        _rtable = {tuple: 0} ()
      

      具體邏輯如下:

      +---------+
      | Celery  |    +----------------------------+
      |         |    |   celery.app.amqp.AMQP     |
      |         |    |                            |
      |         |    |                            |
      |         |    |          BrokerConnection +----->  kombu.connection.Connection
      |         |    |                            |
      |   amqp+----->+          Connection       +----->  kombu.connection.Connection
      |         |    |                            |
      +---------+    |          Consumer         +----->  kombu.messaging.Consumer
                     |                            |
                     |          Producer         +----->  kombu.messaging.Producer
                     |                            |
                     |          producer_pool    +----->  kombu.pools.ProducerPool
                     |                            |
                     |          queues           +----->  celery.app.amqp.Queues
                     |                            |
                     |          router           +----->  celery.app.routes.Router
                     +----------------------------+
      

      4.2 Queues

      Queues 則是一個擴展,一個邏輯概念,可以認為是 Broker 概念的進一步縮減版

      Producer 把任務發送給 Queues,Worker 從 Queues 獲取任務,進行消費。

      app.amqp.queues 就是 Queues 的一個實例,在其中存儲了本 Worker 可以讀取的所有 kombu.Queue。

      class Queues(dict):
          """Queue name? declaration mapping.
      
          Arguments:
              queues (Iterable): Initial list/tuple or dict of queues.
              create_missing (bool): By default any unknown queues will be
                  added automatically, but if this flag is disabled the occurrence
                  of unknown queues in `wanted` will raise :exc:`KeyError`.
              max_priority (int): Default x-max-priority for queues with none set.
          """
      
          #: If set, this is a subset of queues to consume from.
          #: The rest of the queues are then used for routing only.
          _consume_from = None
      
          def __init__(self, queues=None, default_exchange=None,
                       create_missing=True, autoexchange=None,
                       max_priority=None, default_routing_key=None):
              dict.__init__(self)
              self.aliases = WeakValueDictionary()
              self.default_exchange = default_exchange
              self.default_routing_key = default_routing_key
              self.create_missing = create_missing
              self.autoexchange = Exchange if autoexchange is None else autoexchange
              self.max_priority = max_priority
              if queues is not None and not isinstance(queues, Mapping):
                  queues = {q.name: q for q in queues}
              queues = queues or {}
              for name, q in queues.items():
                  self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
      

      對于一個 Consumer,可以配置其 queue,一個 Consumer 可以有多個queue,比如:

      def add_consumer(state, queue, exchange=None, exchange_type=None,
                       routing_key=None, **options):
          """Tell worker(s) to consume from task queue by name."""
          state.consumer.call_soon(
              state.consumer.add_task_queue,
              queue, exchange, exchange_type or 'direct', routing_key, **options)
          return ok(f'add consumer {queue}')
      

      add_consumer 名字個人認為有一定誤導,其實是添加 queue,但是名字看起來像添加 Consumer。

      而在 Consumer 之中,會對 queues 進行具體配置。

      def add_task_queue(self, queue, exchange=None, exchange_type=None,
                         routing_key=None, **options):
          cset = self.task_consumer
          queues = self.app.amqp.queues
          if queue in queues:
              q = queues[queue]
          else:
              exchange = queue if exchange is None else exchange
              exchange_type = ('direct' if exchange_type is None
                               else exchange_type)
              q = queues.select_add(queue,
                                    exchange=exchange,
                                    exchange_type=exchange_type,
                                    routing_key=routing_key, **options)
          if not cset.consuming_from(queue):
              cset.add_queue(q)
              cset.consume()
              info('Started consuming from %s', queue)
      

      0x05 TBC

      通過以上的分析,大家應該對 Celery 的架構有了初步的了解。在下篇文章中,我們將從幾個方面做進一步思考,敬請期待。

      0xFF 參考

      Nginx資料之Master與Worker基礎概念

      1: Worker 啟動流程概述

      2: Worker 的執行引擎

      3: Task 對象的實現

      4: 定時任務的實現

      5: 遠程控制管理

      6: Events 的實現

      7: Worker 之間的交互

      8: State 和 Result

      Spark分布式計算引擎的應用

      mfc 消息消息隊列概念_消息隊列和任務隊列到底有什么不同?

      posted @ 2021-03-26 22:40  羅西的思考  閱讀(2880)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 性欧美vr高清极品| 亚洲熟妇自偷自拍另类| 亚洲高清aⅴ日本欧美视频| 国产欧美久久一区二区| 亚洲色欲或者高潮影院| 亚洲成人av综合一区| 久久88香港三级台湾三级播放| 国产乱码精品一区二三区| 任我爽精品视频在线播放| 377P欧洲日本亚洲大胆| 内射老妇bbwx0c0ck| 国产色悠悠综合在线观看 | 美女18禁一区二区三区视频| 国产精品老熟女一区二区| 漂亮人妻中文字幕丝袜| 中文字幕人妻精品在线| 亚洲国产一区二区三区| 国产尤物精品自在拍视频首页| 欧美性猛交xxxx黑人猛交| 真实国产老熟女无套内射| 精品国产亚洲午夜精品a| 久久人人97超碰精品| 蜜臀av性久久久久蜜臀aⅴ麻豆| 色噜噜在线视频免费观看| 男人天堂亚洲天堂女人天堂| 国产精品久久中文字幕| 精品精品久久宅男的天堂| 一区二区中文字幕av| 性按摩玩人妻hd中文字幕| 免费久久人人香蕉av| 人妻一区二区三区三区| 久久视频在线视频| 中文字幕无码av不卡一区| 东京热人妻无码一区二区av| 色噜噜亚洲精品中文字幕| 国产精品自拍视频免费看| 欧美老少配性行为| 亚洲男人av香蕉爽爽爽爽| 国产仑乱无码内谢| 国产国产午夜福利视频| 国产精品一区二区香蕉|