<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      celery源碼解讀

       

      Celery啟動的入口:

      文件:Celery/bin/celery.py

       

      看下main函數做了什么事

      可以看到主要做了幾個事根據-P參數判斷是否需要打patch,如果是gevent或者eventlet則要打對應的補丁。

      然后執行命令行邏輯

       

       

      可以看到,這邊取出系統參數

      然后執行基類Command的execute_from_commandline

      文件:celery/bin/base.py

       

      setup_app_from_commandline是核心函數,作用是獲得我們的app對象和獲得我們的配置參數

      文件:Celery/bin/celery.py

       

      這邊主要獲取啟動類別及啟動參數,我們的類別是worker所以:

       

      這邊是開始準備啟動對應類別的對象,worker、beat等。

      self.commands是支持的命令:

       

      上面我們知道,我們的類型是worker,即celery.bin.worker.worker,初始化該類,然后執行run_from_argv函數

      文件:celery/bin/worker.py

       

      最后一行會執行到父類的__call__函數,

      文件:celery/bin/base.py

       

      這邊主要執行的是run函數

       

      這個函數主要是啟動worker

       

      終于進入worker了,現在這里涉及一些比較關鍵的東西了,

      文件:celery/worker/__init__.py  

      WorkController類里,是worker的基類

       

       

      這是worker的藍圖,這邊會形成一個依賴圖,是啟動的必要組件,分別負責worker的一部分任務,比較重要的幾個:

      Timer用于執行定時任務的 Timer,和 Consumer 那里的 timer 不同

      HubEvent loop 的封裝對象

      Pool構造各種執行池(線程/進程/協程)的

      Beat創建Beat進程,不過是以子進程的形式運行(不同于命令行中以beat參數運行)

       

      文件:celery/apps/worker.py

       

      文件:celery/apps/trace.py

       

      文件:celery/app/base.py

       

      init_before開始,這邊是最主要的,即綁定所有的task到我們的app,注冊task在下面

       

       

       

       

      每個task都有delayapply_async函數,這個可以用來幫我們啟動任務。

       

      文件:celery/worker/__init__.py

       

      這邊是設置關注及不關注的隊列,可以看到,celery支持ampq協議。

       

      調用setup_includes安裝一些通過CELERY_INCLUDE配置的模塊,保證所有的任務模塊都導入了

      最后初始化藍圖,并進行apply完成藍圖各個step的依賴關系圖的構建,并進行各個組件的初始化,依賴在component中已經標出

       

      這個requires就是依賴,說明hub依賴timer,上面藍圖聲明的組件都有互相依賴關系。

      回到文件:celery/worker/__init__.py執行start

       

      執行的是藍圖的start

       

      分別執行各個步驟的start,在apply時,會判斷step是否需要start,不start但是仍要create

       

      通過啟動日志看,worker啟動的stepPool,和Consumer

      如果換成prefork方式起,worker會多起hubautoscaler兩個step

       

      Hub依賴Timer,我們用gevent,所以include_iffalse,這個不需要start

      Hub創建時候引用的kombuHub組件,Connection會注冊到HubConnection是各種類型連接的封裝,對外提供統一接口

      Queue依賴Hub,這邊是基于Hub創建任務隊列

      下面是我們的worker啟動的step其中的一個,重點進行說明

      初始化線程/協程池,是否彈縮,最大和最小并發數

       

      Celery支持的幾種TaskPool

       

      我們是gevent,所以這邊直接找gevent的代碼。

       

      這邊直接引用geventPool

       

      下面看worker啟動的第二個step

      可以看到,這邊啟動的是celery.worker.consumer.Consumer,這邊就會涉及另一個重要的藍圖了。

      文件:celery/worker/consumerConsumer

       

      這是Consumer的藍圖,

       

      Consumer啟動的stepConnectioneventsmingleGossipTasksContorlHeartevent loop

       

      __init__初始化一些必要的組件,很多都是之前worker創建的。

      然后執行blueprintapply,做的事我worker之前是一樣的。

       

      執行Consumerstart,也就是執行blueprintstart

      啟動的step的基本功能:

      Connection:管理和brokerConnection連接

      Mingle:不同worker之間同步狀態用的

      Tasks:啟動消息Consumer

      Gossip:消費來自其他worker的事件

      Heart:發送心跳事件(consumer的心跳)

      Control:遠程命令管理服務

      其中ConnectionTasksHeartevent loop是最重要的幾個。

      先看Connection

       

      使用了consumerconnect()

       

      Conn引用了ampqconnectionampqConnection是直接使用的kombuConnection,上面說過,這個Connection是各種支持的類型(如redisrabbitMQ等)的抽象,對外提供統一接口。

      如果hub存在,會將連接注冊到event loop

      再看Tasks

       

      這邊引用的ampqTaskConsumerampqTaskConsumer繼承了kombuConsumer

      可以看到,在關鍵的幾個地方,celery都引用了kombuKombu對所有的MQ進行抽象,然后通過接口對外暴露出一致的APIRedis/RabbitMQ/MongoDB),KombuMQ的抽象如下:

      Message:生產消費的基本單位,就是一條條消息

      Connection:對 MQ 連接的抽象,一個 Connection 就對應一個 MQ 的連接

      Transport:真實的 MQ 連接,也是真正連接到 MQ(redis/rabbitmq) 的實例

      Producers: 發送消息的抽象類

      Consumers:接受消息的抽象類

      ExchangeMQ 路由,這個和 RabbitMQ 差不多,支持 5種 類型

      Queue:對應的 queue 抽象,其實就是一個字符串的封裝

      Hub是一個eventloopConnection注冊到Hub,一個Connection對應一個HubConsumer綁定了消息的處理函數,每一個Consumer初始化的時候都是和Channel綁定的,也就是說我們Consumer包含了Queue也就和Connection關聯起來了,Consumer消費消息是通過Queue來消費,然后Queue又轉嫁給Channel再轉給connectionChannelAMQPMQ的操作的封裝,ConnectionAMQP對連接的封裝那么兩者的關系就是對MQ的操作必然離不開連接,但是,Kombu并不直接讓Channel使用Connection來發送/接受請求,而是引入了一個新的抽象TransportTransport負責具體的MQ的操作,也就是說Channel的操作都會落到Transport上執行

      再看下event loop

       

      上面我們有了connection以及綁定connectionconsumer,下面看看消費者怎么消費消息,如果是帶hub的情況:

       

      先對consumer進行一些設置,

       

      然后開始進行循環。loopkombu創建的event loop,啟用事件循環機制,然后next這邊就開始不停的循環獲取消息并執行。

       

      這個是kombu里的部分實現,是對從池里取到的消息進行處理。

       

      看下同步代碼,register_callback將回調注冊consumer,然后執行consume

       

      再看消息循環那幾行,

      獲取到消息后,調用回調函數進行處理。

       

      回調函數使用的是create_task_handler()strategies是在上面的update_strategies里進行的更新,該函數是在Task里調用的

       

      打印一下strategies里的信息,只截部分圖:

       

      下面看下我們怎么啟動任務的,

       

      調用到appsend_task

       

      再調用到ampqpublish_task

       

      最終又交給kombupublish

      關于pool的選擇:

       

      使用的是apppool,即

       

      通過connection又走到了ampq再轉到kombu里。

       

       

      Workerconsumer基本大框架就是上面的流程,下面看下beat是怎么實現的。

      Beat起動的時候是celery beat,根據我們上面的分析,首先進入的應該是celey/bin/beat.py,然后調用該文件中的Beatrun函數:

       

      然后在指向appsBeat

       

      在apps里的Beat調用run

       

      主要執行了三個函數,init_loader主要初始化并綁定task,第二步設置一些頭信息之類的,關鍵是第三步,主干代碼

       

      主要是初始化servicestart

       

      Start最關鍵的部分是那個while循環體,只要不被shutdown,就會一直調用schedulertick

       

      這邊這個self.schedule就是我們準備調度的任務:

       

      下面看對這些任務的處理:

       

      這是判斷是否要執行任務的邏輯,如果要執行,則執行apply_async

       

      如果發現任務該執行了,則去tasks里獲取任務,并執行,這邊的apply_asyncworker那邊的沒區別,如果沒找到task,則將task注冊到broker

       

       

       

      怎樣將consumerconcurrency聯系起來

      這邊調用了_process_task,調用的是worker里的

      這邊調用各種池的啟動函數:

      但是queue里只是引用,后面還有別的處理

      在初始化consumer時候將調用池的操作傳了進去,成為了Consumer里的on_task_request

      在Tasks調用start的時候會更新strategies

      然后在這邊調用start_strategy

      然后就進入

      然后走入strategy的default

      這里取了consumeron_task_request,就是我們傳入的池執行的邏輯,_limit_task是這樣的:

      做了一些判斷,符合條件再執行。

      這個文件是strategydefault的下半個文件,做了一些流量控制,然后執行limit_task或者直接執行handler

      這邊因為使用的gevent,所以就走到geventapply_async

      這邊是起一個協程處理,這樣就將任務交給了gevent

      具體上面是執行流程,具體在哪里執行的呢?

      這邊注冊了callbackcreate_task_handlerstrategy這邊取值取值執行

       

      Qosack的處理部分:

      Kombutransportredis.py里的額basic_consume,調用channelbasic_consume

       

      在Kombu.transport.virtual.__init__.py文件中

       

      這里維護了一個dictself._delivered,一個setself._dirty和一個intprefetch_count

      如果no_ackFalse在執行consume后會向self._delivered中添加一條數據,

      ack后會向self._dirty中添加一條數據,然后,后面會將self._dirty逐條刪除,并同時刪除self._delivered中的數據,如果沒有ack,則不會刪除:

       

       

      每次拉任務的時候會調用can_consume

       

      比較prefetch_countself._deliveredself._dirty的值,如果小于預取限制,則允許,否則不允許。

       

      posted @ 2019-03-14 19:17  Small_office  閱讀(2773)  評論(1)    收藏  舉報
      主站蜘蛛池模板: 九九热精品在线观看| 精品国产成人午夜福利| 亚洲男人天堂东京热加勒比| 精品无码人妻一区二区三区| 久久国产成人午夜av影院| 国模精品视频一区二区三区| 在线观看亚洲欧美日本| 内射极品少妇xxxxxhd| 亚洲中文字幕久久精品码| 久久精品av国产一区二区| 国产精品熟妇视频国产偷人| 伊人久久大香线蕉AV网禁呦| 久久99日本免费国产精品| 久久综合伊人77777| 欧美最猛性xxxxx大叫 | 国产精品偷伦费观看一次| 欧美大胆老熟妇乱子伦视频| 天堂V亚洲国产V第一次| 人妻少妇偷人精品免费看| 亚洲色成人网站www永久四虎| 日本亚洲一区二区精品久久| 国内精品久久人妻无码不卡| 亚洲一区二区三区水蜜桃| 国产中年熟女大集合| 欧洲精品色在线观看| 久久精品国产精品第一区| 精品国产免费一区二区三区香蕉| 欧美性猛交xxxx免费看| 人妻色综合网站| 亚洲人成人网站色www| 人妻18毛片A级毛片免费看| 人妻少妇偷人精品一区| 久热这里只有精品视频3| 起碰免费公开97在线视频| 亚洲AV成人片在线观看| 国产精品久久久久久av| free性开放小少妇| 伊人狠狠色丁香婷婷综合| 韩国免费a级毛片久久| 亚洲电影天堂av2017| 国产精品av免费观看|