
      [Source code analysis] The parallel distributed framework Celery: worker startup (1)


      0x00 Abstract

      Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling. Celery delegates the actual task processing to its Worker component, which can be started in several ways, for example:

      $ celery --app=proj worker -l INFO
      $ celery -A proj worker -l INFO -Q hipri,lopri
      $ celery -A proj worker --concurrency=4
      $ celery -A proj worker --concurrency=1000 -P eventlet
      $ celery worker --autoscale=10,0
      

      This article therefore walks through the worker startup process.

      0x01 The Celery Architecture

      In previous articles we analyzed Kombu, which laid the groundwork for analyzing Celery:

      [Source code analysis] Message queue Kombu: mailbox

      [Source code analysis] Message queue Kombu: Hub

      [Source code analysis] Message queue Kombu: Consumer

      [Source code analysis] Message queue Kombu: Producer

      [Source code analysis] Message queue Kombu: the startup process

      [Source code analysis] Message queue Kombu: basic architecture

      as well as:


      [Source code analysis] The parallel distributed framework Celery: architecture (2)

      Now let's review Celery's structure again. Its architecture diagram is shown below:

       +-----------+            +--------------+
       | Producer  |            |  Celery Beat |
       +-------+---+            +----+---------+
               |                     |
               |                     |
               v                     v
      
             +-------------------------+
             |          Broker         |
             +------------+------------+
                          |
                          |
                          |
           +-------------------------------+
           |              |                |
           v              v                v
      +----+-----+   +----+------+   +-----+----+
      | Exchange |   |  Exchange |   | Exchange |
      +----+-----+   +----+------+   +----+-----+
           |              |               |
           v              v               v
      
        +-----+       +-------+       +-------+
        |queue|       | queue |       | queue |
        +--+--+       +---+---+       +---+---+
           |              |               |
           |              |               |
           v              v               v
      
      +---------+     +--------+     +----------+
      | worker  |     | Worker |     |  Worker  |
      +-----+---+     +---+----+     +----+-----+
            |             |               |
            |             |               |
            +-----------------------------+
                          |
                          |
                          v
                      +---+-----+
                      | backend |
                      +---------+
      
      

      0x02 Sample Code

      It is actually hard to find a way to debug a Celery worker online. Looking into its source code, we find the following (commented-out) test:

      # def test_worker_main(self):
      #     from celery.bin import worker as worker_bin
      #
      #     class worker(worker_bin.worker):
      #
      #         def execute_from_commandline(self, argv):
      #             return argv
      #
      #     prev, worker_bin.worker = worker_bin.worker, worker
      #     try:
      #         ret = self.app.worker_main(argv=['--version'])
      #         assert ret == ['--version']
      #     finally:
      #         worker_bin.worker = prev
      

      So we can imitate it, starting the worker as follows for debugging:

      from celery import Celery
      
      app = Celery('tasks', broker='redis://localhost:6379')
      
      @app.task()
      def add(x, y):
          return x + y
      
      if __name__ == '__main__':
          app.worker_main(argv=['worker'])
      

      0x03 Logic Overview

      When a worker starts, it establishes a connection (a long-lived TCP connection) with the broker. When data needs to be transferred, channels are created on top of it; one connection can carry multiple channels. The worker then fetches tasks from the broker's queues and consumes them: a classic producer-consumer pattern.
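      The producer-consumer pattern mentioned above can be sketched with the standard library alone; here `queue.Queue` is a stand-in for the broker queue, not Celery's actual transport:

```python
import queue
import threading

task_queue = queue.Queue()  # stands in for a broker queue

def producer():
    for i in range(5):
        task_queue.put(('add', (i, i)))   # publish a "task message"
    task_queue.put(None)                  # sentinel: no more work

results = []

def worker():
    while True:
        msg = task_queue.get()
        if msg is None:                   # shut down on the sentinel
            break
        name, args = msg
        results.append(sum(args))         # "execute" the task

t_prod = threading.Thread(target=producer)
t_work = threading.Thread(target=worker)
t_prod.start(); t_work.start()
t_prod.join(); t_work.join()
print(results)  # [0, 2, 4, 6, 8]
```

      A real worker differs mainly in that the queue lives in the broker (Redis, RabbitMQ, ...) and the "execution" is dispatched to a pool of processes.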

      The worker consists mainly of four parts: task_pool, consumer, scheduler, and mediator. task_pool holds the worker processes: when a worker is started with a concurrency argument, the spawned worker processes are kept in this pool.

      Celery's default concurrency mode is prefork, i.e. multiple processes. It is a light adaptation of a multiprocessing pool, renamed prefork; the difference from an ordinary process pool is that this task_pool merely keeps track of the running worker processes.

      The consumer receives messages from the broker and converts each message into an instance of celery.worker.request.Request.

      At the appropriate time, Celery wraps this request into a Task. A Task is the class generated from a function decorated with @app.task(), so the request can be accessed inside a user-defined task function to obtain key information. At this point we have covered task_pool and consumer.

      In addition, the worker maintains two data structures that operate in parallel: the ETA schedule and the ready queue.

      ETA schedule: tasks submitted with an eta or countdown are held here until their scheduled time arrives, at which point they are moved to the ready queue.

      Ready queue: tasks that need to run immediately; when such a task arrives at the worker it is placed in this queue to wait for the consumer to execute it.
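      A minimal sketch of how these two structures cooperate (the names here are hypothetical; Celery's real implementation uses a timer inside the event loop):

```python
import heapq
from collections import deque

ready_queue = deque()   # tasks to run immediately
eta_schedule = []       # min-heap of (eta_timestamp, task)

def submit(task, eta=None):
    if eta is None:
        ready_queue.append(task)              # run as soon as possible
    else:
        heapq.heappush(eta_schedule, (eta, task))

def move_due_tasks(now):
    # move tasks whose ETA has passed onto the ready queue
    while eta_schedule and eta_schedule[0][0] <= now:
        _, task = heapq.heappop(eta_schedule)
        ready_queue.append(task)

submit('a')
submit('c', eta=200.0)
submit('b', eta=100.0)
move_due_tasks(now=150.0)
print(list(ready_queue))  # ['a', 'b']  -- 'c' is still waiting for its ETA
```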

      Next, let's see how Celery starts.

      0x04 The Celery Application

      Execution first reaches the Celery class, which is the Celery application.

      As we can see, it mainly consists of the names of the various component classes, thread-local storage (TLS), and the signals emitted around initialization.

      It is located in celery/app/base.py and defined as follows:

      class Celery:
          """Celery application."""
      
          amqp_cls = 'celery.app.amqp:AMQP'
          backend_cls = None
          events_cls = 'celery.app.events:Events'
          loader_cls = None
          log_cls = 'celery.app.log:Logging'
          control_cls = 'celery.app.control:Control'
          task_cls = 'celery.app.task:Task'
          registry_cls = 'celery.app.registry:TaskRegistry'
      
          #: Thread local storage.
          _local = None
          _fixups = None
          _pool = None
          _conf = None
          _after_fork_registered = False
      
          #: Signal sent when app is loading configuration.
          on_configure = None
      
          #: Signal sent after app has prepared the configuration.
          on_after_configure = None
      
          #: Signal sent after app has been finalized.
          on_after_finalize = None
      
          #: Signal sent by every new process after fork.
          on_after_fork = None
      

      For our sample code, the entry point is:

      def worker_main(self, argv=None):
          if argv is None:
              argv = sys.argv
      
          if 'worker' not in argv:
              raise ValueError(
                  "The worker sub-command must be specified in argv.\n"
                  "Use app.start() to programmatically start other commands."
              )
      
          self.start(argv=argv)
      

      4.1 Adding Sub-commands

      As we can see below, celery/bin/celery.py registers the sub-commands.

      These commands can be used directly as sub-commands on the command line:

      celery.add_command(purge)
      celery.add_command(call)
      celery.add_command(beat)
      celery.add_command(list_)
      celery.add_command(result)
      celery.add_command(migrate)
      celery.add_command(status)
      celery.add_command(worker)
      celery.add_command(events)
      celery.add_command(inspect)
      celery.add_command(control)
      celery.add_command(graph)
      celery.add_command(upgrade)
      celery.add_command(logtool)
      celery.add_command(amqp)
      celery.add_command(shell)
      celery.add_command(multi)
      

      Each of these is a command. Taking worker as an example:

      worker = {CeleryDaemonCommand} <CeleryDaemonCommand worker>
       add_help_option = {bool} True
       allow_extra_args = {bool} False
       allow_interspersed_args = {bool} True
       context_settings = {dict: 1} {'allow_extra_args': True}
       epilog = {NoneType} None
       name = {str} 'worker'
       options_metavar = {str} '[OPTIONS]'
       params = {list: 32} [<CeleryOption hostname>, ...... , <CeleryOption executable>]
      

      4.2 The Entry Point

      Then the celery command entry point is imported and invoked:

      def start(self, argv=None):
          from celery.bin.celery import celery
      
          celery.params[0].default = self
      
          try:
              celery.main(args=argv, standalone_mode=False)
          except Exit as e:
              return e.exit_code
          finally:
              celery.params[0].default = None
      

      4.3 The cached_property Decorator

      In Celery, many member attributes are decorated with cached_property.

      A function decorated with cached_property becomes an attribute of the object: the first time the attribute is accessed, the function is called; on subsequent accesses the value is fetched directly from the instance dictionary. In other words, it caches the return value of the getter on the first call.

      Many well-known Python projects have implemented their own cached_property, for example Werkzeug and Django.

      Because it is so useful, Python 3.8 added a cached_property class to the functools module, giving us an official implementation.
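      A small demonstration of the behavior, using the stdlib functools.cached_property (Python 3.8+); the Config class here is just an illustration, not part of Celery:

```python
from functools import cached_property

class Config:
    calls = 0

    @cached_property
    def settings(self):
        # this "expensive" computation runs only once per instance
        Config.calls += 1
        return {'broker_url': 'redis://localhost:6379'}

cfg = Config()
first = cfg.settings    # computes and caches in cfg.__dict__
second = cfg.settings   # served straight from cfg.__dict__
print(Config.calls)     # 1
print(first is second)  # True
```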

      Examples from Celery's code:

          @cached_property
          def Worker(self):
              """Worker application.
              """
              return self.subclass_with_self('celery.apps.worker:Worker')
      
          @cached_property
          def Task(self):
              """Base task class for this app."""
              return self.create_task_cls()
      
          @property
          def pool(self):
              """Broker connection pool: :class:`~@pool`.
              """
              if self._pool is None:
                  self._ensure_after_fork()
                  limit = self.conf.broker_pool_limit
                  pools.set_limit(limit)
                  self._pool = pools.connections[self.connection_for_write()]
              return self._pool
      

      So, in the end, the contents of the Celery application look like this:

      app = {Celery} <Celery tasks at 0x7fb8e1538400>
       AsyncResult = {type} <class 'celery.result.AsyncResult'>
       Beat = {type} <class 'celery.apps.beat.Beat'>
       GroupResult = {type} <class 'celery.result.GroupResult'>
       Pickler = {type} <class 'celery.app.utils.AppPickler'>
       ResultSet = {type} <class 'celery.result.ResultSet'>
       Task = {type} <class 'celery.app.task.Task'>
       WorkController = {type} <class 'celery.worker.worker.WorkController'>
       Worker = {type} <class 'celery.apps.worker.Worker'>
       amqp = {AMQP} <celery.app.amqp.AMQP object at 0x7fb8e2444860>
       annotations = {tuple: 0} ()
       autofinalize = {bool} True
       backend = {DisabledBackend} <celery.backends.base.DisabledBackend object at 0x7fb8e25fd668>
       builtin_fixups = {set: 1} {'celery.fixups.django:fixup'}
       clock = {LamportClock} 1
       conf = {Settings: 163} Settings({'broker_url': 'redis://localhost:6379', 'deprecated_settings': set(), 'cache_...
       configured = {bool} True
       control = {Control} <celery.app.control.Control object at 0x7fb8e2585f98>
       current_task = {NoneType} None
       current_worker_task = {NoneType} None
       events = {Events} <celery.app.events.Events object at 0x7fb8e25ecb70>
       loader = {AppLoader} <celery.loaders.app.AppLoader object at 0x7fb8e237a4a8>
       main = {str} 'tasks'
       on_after_configure = {Signal} <Signal: app.on_after_configure providing_args={'source'}>
       on_after_finalize = {Signal} <Signal: app.on_after_finalize providing_args=set()>
       on_after_fork = {Signal} <Signal: app.on_after_fork providing_args=set()>
       on_configure = {Signal} <Signal: app.on_configure providing_args=set()>
       pool = {ConnectionPool} <kombu.connection.ConnectionPool object at 0x7fb8e26e9e80>
       producer_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x7fb8e26f02b0>
       registry_cls = {type} <class 'celery.app.registry.TaskRegistry'>
       set_as_current = {bool} True
       steps = {defaultdict: 2} defaultdict(<class 'set'>, {'worker': set(), 'consumer': set()})
       tasks = {TaskRegistry: 10} {'celery.chain': <@task: celery.chain of tasks at 0x7fb8e1538400>, 'celery.starmap': <@task: celery.starmap of tasks at 0x7fb8e1538400>, 'celery.chord': <@task: celery.chord of tasks at 0x7fb8e1538400>, 'celery.backend_cleanup': <@task: celery.backend_clea
       user_options = {defaultdict: 0} defaultdict(<class 'set'>, {})
      

      Some of these member attributes are illustrated in the following diagram:

      +---------------------------------------+
      |  Celery                               |
      |                                       |
      |                              Beat+-----------> celery.apps.beat.Beat
      |                                       |
      |                              Task+-----------> celery.app.task.Task
      |                                       |
      |                     WorkController+----------> celery.worker.worker.WorkController
      |                                       |
      |                            Worker+-----------> celery.apps.worker.Worker
      |                                       |
      |                              amqp +----------> celery.app.amqp.AMQP
      |                                       |
      |                           control +----------> celery.app.control.Control
      |                                       |
      |                            events  +---------> celery.app.events.Events
      |                                       |
      |                            loader +----------> celery.loaders.app.AppLoader
      |                                       |
      |                              pool +----------> kombu.connection.ConnectionPool
      |                                       |
      |                     producer_pool +----------> kombu.pools.ProducerPool
      |                                       |
      |                             tasks +----------> TaskRegistry
      |                                       |
      |                                       |
      +---------------------------------------+
      

      0x05 The celery Command

      The overall entry point for Celery's command line is the celery function, located in celery/bin/celery.py.

      An abridged version of the code is as follows:

      @click.pass_context
      def celery(ctx, app, broker, result_backend, loader, config, workdir,
                 no_color, quiet, version):
          """Celery command entrypoint."""
      
          if loader:
              # Default app takes loader from this env (Issue #1066).
              os.environ['CELERY_LOADER'] = loader
          if broker:
              os.environ['CELERY_BROKER_URL'] = broker
          if result_backend:
              os.environ['CELERY_RESULT_BACKEND'] = result_backend
          if config:
              os.environ['CELERY_CONFIG_MODULE'] = config
          ctx.obj = CLIContext(app=app, no_color=no_color, workdir=workdir,
                               quiet=quiet)
      
          # User options
          worker.params.extend(ctx.obj.app.user_options.get('worker', []))
          beat.params.extend(ctx.obj.app.user_options.get('beat', []))
          events.params.extend(ctx.obj.app.user_options.get('events', []))
      
          for command in celery.commands.values():
              command.params.extend(ctx.obj.app.user_options.get('preload', []))
      

      This function iterates over celery.commands and extends each command's params, as shown below. These commands are exactly the sub-commands registered earlier:

      celery.commands = 
       'report' = {CeleryCommand} <CeleryCommand report>
       'purge' = {CeleryCommand} <CeleryCommand purge>
       'call' = {CeleryCommand} <CeleryCommand call>
       'beat' = {CeleryDaemonCommand} <CeleryDaemonCommand beat>
       'list' = {Group} <Group list>
       'result' = {CeleryCommand} <CeleryCommand result>
       'migrate' = {CeleryCommand} <CeleryCommand migrate>
       'status' = {CeleryCommand} <CeleryCommand status>
       'worker' = {CeleryDaemonCommand} <CeleryDaemonCommand worker>
       'events' = {CeleryDaemonCommand} <CeleryDaemonCommand events>
       'inspect' = {CeleryCommand} <CeleryCommand inspect>
       'control' = {CeleryCommand} <CeleryCommand control>
       'graph' = {Group} <Group graph>
       'upgrade' = {Group} <Group upgrade>
       'logtool' = {Group} <Group logtool>
       'amqp' = {Group} <Group amqp>
       'shell' = {CeleryCommand} <CeleryCommand shell>
       'multi' = {CeleryCommand} <CeleryCommand multi>
      

      0x06 The worker Sub-command

      The worker sub-command is one of the registered sub-commands; it is the one invoked when we pass worker on the command line:

      $ celery -A proj worker -l INFO -Q hipri,lopri
      

      The worker sub-command is built on Click; its command class ultimately derives from click's BaseCommand.

      It is defined in celery/bin/worker.py.

      The following code therefore indirectly invokes the worker command:

      celery.main(args=argv, standalone_mode=False)
      

      Its definition is as follows:

      def worker(ctx, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
                 loglevel=None, logfile=None, pidfile=None, statedb=None,
                 **kwargs):
          """Start worker instance.
      
          Examples
          --------
          $ celery --app=proj worker -l INFO
          $ celery -A proj worker -l INFO -Q hipri,lopri
          $ celery -A proj worker --concurrency=4
          $ celery -A proj worker --concurrency=1000 -P eventlet
          $ celery worker --autoscale=10,0
      
          """
          app = ctx.obj.app
          maybe_drop_privileges(uid=uid, gid=gid)
          worker = app.Worker(
              hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
              logfile=logfile,  # node format handled by celery.app.log.setup
              pidfile=node_format(pidfile, hostname),
              statedb=node_format(statedb, hostname),
              no_color=ctx.obj.no_color,
              **kwargs)
          worker.start()
          return worker.exitcode
      

      The flow so far is shown below; from the Celery application we have reached the concrete worker command:

            +----------+
            |   User   |
            +----+-----+
                 |
                 |  worker_main
                 |
                 v
       +---------+------------+
       |        Celery        |
       |                      |
       |  Celery application  |
       |  celery/app/base.py  |
       |                      |
       +---------+------------+
                 |
                 |  celery.main
                 |
                 v
       +---------+------------+
       |  @click.pass_context |
       |       celery         |
       |                      |
       |                      |
       |    CeleryCommand     |
       | celery/bin/celery.py |
       |                      |
       +---------+------------+
                 |
                 |
                 |
                 v
      +----------+------------+
      |   @click.pass_context |
      |        worker         |
      |                       |
      |                       |
      |     WorkerCommand     |
      | celery/bin/worker.py  |
      +-----------------------+
      

      0x07 Worker application

      At this point the function instantiates the app's Worker. The Worker application is the worker instance, and app here is the instance of the Celery class defined earlier.

      It is defined in celery/app/base.py:

      @cached_property
      def Worker(self):
          """Worker application.
      
          See Also:
              :class:`~@Worker`.
          """
          return self.subclass_with_self('celery.apps.worker:Worker')
      

      subclass_with_self uses Python's type() to generate a subclass dynamically and set its attributes.

      def subclass_with_self(self, Class, name=None, attribute='app',
                             reverse=None, keep_reduce=False, **kw):
          """Subclass an app-compatible class.
          """
          Class = symbol_by_name(Class)                               # import the class
          reverse = reverse if reverse else Class.__name__            # fall back to the class name if not given

          def __reduce__(self):                                       # called during pickling
              return _unpickle_appattr, (reverse, self.__reduce_args__())

          attrs = dict(
              {attribute: self},                                      # by default, set the 'app' attribute to self
              __module__=Class.__module__,
              __doc__=Class.__doc__,
              **kw)                                                   # fill in the attributes
          if not keep_reduce:
              attrs['__reduce__'] = __reduce__                        # install __reduce__ on the generated class by default

          return type(bytes_if_py2(name or Class.__name__), (Class,), attrs)  # create the subclass with type()
      

      At this point the worker command has obtained an instance of celery.apps.worker:Worker and calls its start method. Let's first analyze how the Worker class is instantiated.

      Let's recap:

      Starting from the worker_main entry point, execution reached the Celery application, then the celery command, and then the worker sub-command, as shown below:

                                           +----------------------+
            +----------+                   |  @cached_property    |
            |   User   |                   |      Worker          |
            +----+-----+            +--->  |                      |
                 |                  |      |                      |
                 |  worker_main     |      |  Worker application  |
                 |                  |      |  celery/app/base.py  |
                 v                  |      +----------------------+
       +---------+------------+     |
       |        Celery        |     |
       |                      |     |
       |  Celery application  |     |
       |  celery/app/base.py  |     |
       |                      |     |
       +---------+------------+     |
                 |                  |
                 |  celery.main     |
                 |                  |
                 v                  |
       +---------+------------+     |
       |  @click.pass_context |     |
       |       celery         |     |
       |                      |     |
       |                      |     |
       |    CeleryCommand     |     |
       | celery/bin/celery.py |     |
       |                      |     |
       +---------+------------+     |
                 |                  |
                 |                  |
                 |                  |
                 v                  |
      +----------+------------+     |
      |   @click.pass_context |     |
      |        worker         |     |
      |                       |     |
      |                       |     |
      |     WorkerCommand     |     |
      | celery/bin/worker.py  |     |
      +-----------+-----------+     |
                  |                 |
                  +-----------------+
      
      

      From here we formally enter the worker. Celery refers to the worker's core logic as "work as a program".

      In the next article we will continue with the startup process of "work as a program".

      0xFF References

      Celery source code study (2): the multiprocessing model

      A first look at how Celery works

      Celery source code analysis: worker initialization (part 1)

      Celery source code analysis: worker initialization (part 2)

      Celery worker initialization: the DAG implementation

      Python Celery: multiple workers, multiple queues, and periodic tasks

      A detailed Celery tutorial: the Worker

      Using Celery

      Celery source code walkthrough 1: overview of the Worker startup flow

      Celery source code walkthrough 2: the Worker execution engine

      Celery source code walkthrough 3: the implementation of the Task object

      Celery source code walkthrough 4: the implementation of periodic tasks

      Celery source code walkthrough 5: remote control and management

      Celery source code walkthrough 6: the implementation of Events

      Celery source code walkthrough 7: interaction between Workers

      Celery source code walkthrough 8: State and Result

      posted @ 2021-03-29 21:27  羅西的思考