<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      [源碼分析] 消息隊列 Kombu 之 Hub

      [源碼分析] 消息隊列 Kombu 之 Hub

      0x00 摘要

      本系列我們介紹消息隊列 Kombu。Kombu 的定位是一個兼容 AMQP 協議的消息隊列抽象。通過本文,大家可以了解 Kombu 中的 Hub 概念。

      0x01 示例代碼

      下面使用如下代碼來進行說明。

      本示例來自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感謝。

      def main(arguments):
          hub = Hub()
          exchange = Exchange('asynt_exchange')
          queue = Queue('asynt_queue', exchange, 'asynt_routing_key')
      
          def send_message(conn):
              producer = Producer(conn)
              producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
              print('message sent')
      
          def on_message(message):
              print('received: {0!r}'.format(message.body))
              message.ack()
              # hub.stop()  # <-- exit after one message
      
          conn = Connection('redis://localhost:6379')
          conn.register_with_event_loop(hub)
      
          def p_message():
              print(' kombu ')
      
          with Consumer(conn, [queue], on_message=on_message):
              send_message(conn)
              hub.timer.call_repeatedly(3, p_message)
              hub.run_forever()
      
      if __name__ == '__main__':
          sys.exit(main(sys.argv[1:]))
      

      0x02 來由

      前文中,Consumer部分有一句代碼沒有分析:

      hub.run_forever()
      

      此時,hub與Connection已經聯系起來,具體如下:

      具體如下圖:

      +----------------------+               +-------------------+
      | Consumer             |               | Channel           |
      |                      |               |                   |        +-----------------------------------------------------------+
      |                      |               |    client  +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
      |      channel  +--------------------> |                   |        +-----------------------------------------------------------+
      |                      |               |    pool           |
      |                      |   +---------> |                   | <------------------------------------------------------------+
      |      queues          |   |           |                   |                                                              |
      |                      |   |    +----> |    connection +---------------+                                                  |
      |        |             |   |    |      |                   |           |                                                  |
      +----------------------+   |    |      +-------------------+           |                                                  |
               |                 |    |                                      v                                                  |
               |                 |    |      +-------------------+       +---+-----------------+       +--------------------+   |
               |                 |    |      | Connection        |       | redis.Transport     |       | MultiChannelPoller |   |
               |                 |    |      |                   |       |                     |       |                    |   |
               |                 |    |      |                   |       |                     |       |     _channels +--------+
               |                 |    |      |                   |       |        cycle +------------> |     _fd_to_chan    |
               |                 |    |      |     transport +---------> |                     |       |     _chan_to_sock  |
               |       +-------->+    |      |                   |       |                     |    +------+ poller         |
               |       |              |      +-------------------+       +---------------------+    |  |     after_read     |
               |       |              |                                                             |  |                    |
               |       |              |                                                             |  +--------------------+
               |       |              |      +------------------+                   +---------------+
               |       |              |      | Hub              |                   |
               |       |              |      |                  |                   v
               |       |              |      |                  |            +------+------+
               |       |              |      |      poller +---------------> | _poll       |
               |       |              |      |                  |            |             |         +-------+
               |       |              |      |                  |            |    _poller+---------> |  poll |
               v       |              |      +------------------+            |             |         +-------+
                       |              |                                      +-------------+
          +-------------------+       |      +----------------+
          | Queue      |      |       |      | Exchange       |
          |      _chann+l     |       +----+ |                |
          |                   |              |                |
          |      exchange +----------------> |     channel    |
          |                   |              |                |
          |                   |              |                |
          +-------------------+              +----------------+
      
      

      手機如下:

      現在我們知道:

      • Consumers:接受消息的抽象類,consumer需要聲明一個queue,并將queue與指定的exchange綁定,然后從queue里面接收消息。
      • Exchange:MQ 路由,消息發送者將消息發至Exchange,Exchange負責將消息分發至隊列。
      • Queue:對應的 queue 抽象,存儲著即將被應用消費掉的消息,Exchange負責將消息分發Queue,消費者從Queue接收消息;
      • Channel:與AMQP中概念類似,可以理解成共享一個Connection的多個輕量化連接,是操作的抽象;

      但是,我們只是大致知道 poll 是用來做什么的,但是不知道consumer,poll 究竟如何與Hub交互。我們本文就接著分析。

      0x03 Poll一般步驟

      在linux系統中,使用Poll的一般步驟如下:

      1. Create an epoll object——創建1個epoll對象;
      2. Tell the epoll object to monitor specific events on specific sockets——告訴epoll對象,在指定的socket上監聽指定的事件;
      3. Ask the epoll object which sockets may have had the specified event since the last query——詢問epoll對象,從上次查詢以來,哪些socket發生了哪些指定的事件;
      4. Perform some action on those sockets——在這些socket上執行一些操作;
      5. Tell the epoll object to modify the list of sockets and/or events to monitor——告訴epoll對象,修改socket列表和(或)事件,并監控;
      6. Repeat steps 3 through 5 until finished——重復步驟3-5,直到完成;
      7. Destroy the epoll object——銷毀epoll對象;

      所以我們就需要在 Hub 代碼中看看 kombu 如何使用 Poll。

      0x04 建立 Hub

      在建立 Hub 這里會建立 Hub 內部的 Poller。

      _get_poller, eventio.py:312
      poll, eventio.py:328
      _create_poller, hub.py:113
      __init__, hub.py:96
      main, hub_receive.py:23
      <module>, hub_receive.py:46
      

      具體代碼是:

      def _get_poller():
          if detect_environment() != 'default':
              # greenlet
              return _select
          elif epoll:
              # Py2.6+ Linux
              return _epoll
          elif kqueue and 'netbsd' in sys.platform:
              return _kqueue
          elif xpoll:
              return _poll
          else:
              return _select
      

      這樣,在 Hub內部就建立了 poller。

      class Hub:
          """Event loop object.
      
          Arguments:
              timer (kombu.asynchronous.Timer): Specify custom timer instance.
          """
          def __init__(self, timer=None):
              self.timer = timer if timer is not None else Timer()
      
              self.readers = {}
              self.writers = {}
              self.on_tick = set()
              self.on_close = set()
              self._ready = set()
      
              self._running = False
              self._loop = None
      
              self._create_poller()
      
          @property
          def poller(self):
              if not self._poller:
                  self._create_poller()
              return self._poller
      
          @poller.setter
          def poller(self, value):
              self._poller = value
      
          def _create_poller(self):
              self._poller = poll()
              self._register_fd = self._poller.register
              self._unregister_fd = self._poller.unregister
      

      這里需要注意的是:

      在 MultiChannelPoller 之中,也會生成一個 poller,但是在注冊時候,Transport 會使用 hub 的 poller,而非 MultiChannelPoller 內部的 poller

      on_poll_init, redis.py:333
      register_with_event_loop, redis.py:1061
      register_with_event_loop, connection.py:266
      main, hub_receive.py:38
      <module>, hub_receive.py:46
      

      在 kombu.transport.redis.Transport 代碼如下:

      def register_with_event_loop(self, connection, loop):
          cycle = self.cycle
          cycle.on_poll_init(loop.poller) # 這里賦值。
          cycle_poll_start = cycle.on_poll_start
          add_reader = loop.add_reader
          on_readable = self.on_readable   
      

      繼續深入,看到進一步賦值:

      def on_poll_init(self, poller):
          self.poller = poller # 這里賦值
          for channel in self._channels:
              return channel.qos.restore_visible(
                  num=channel.unacked_restore_limit,
              )
      

      0x05 Forever in Hub

      hub.run_forever() 主要作用是:

      • 建立loop
      • 因為Hub里面有Channel,有poll,所以現在就把Channel與poll聯系起來,包括socket,socket的file等待。
      • 進行poll,有消息就相應處理;

      比如維護如下變量:

      self._fd_to_chan[sock.fileno()] = (channel, type)
      self._chan_to_sock[(channel, client, type)] = sock
      self.poller.register(sock, self.eventflags)
      

      具體 run_forever 如下:

      def run_forever(self):
          self._running = True
          try:
              while 1:
                  try:
                      self.run_once()
                  except Stop:
                      break
          finally:
              self._running = False
      

      于是又有調用如下,這里就進入了loop:

      def run_once(self):
          try:
              next(self.loop)
          except StopIteration:
              self._loop = None
      

      5.1 建立loop

      next(self.loop) 繼續調用,建立loop。這就是Hub的作用

      調用stack如下:

      create_loop, hub.py:279
      run_once, hub.py:193
      run_forever, hub.py:185
      main, testUb.py:51
      <module>, testUb.py:55
      

      簡化版代碼如下:

      def create_loop(self, ...):
      
          while 1:
              todo = self._ready
              self._ready = set()
      
              for tick_callback in on_tick:
                  tick_callback() # 這里回調用戶方法
      
              for item in todo:
                  if item:
                      item()
      
              poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
      
              if readers or writers:
                  to_consolidate = []
                  events = poll(poll_timeout) # 等待消息
      
                  for fd, event in events or ():
                      if fd in consolidate and \
                              writers.get(fd) is None:
                          to_consolidate.append(fd)
                          continue
                      cb = cbargs = None
      
                      if event & READ:
                          cb, cbargs = readers[fd] # 讀取redis
                      elif event & WRITE:
                          cb, cbargs = writers[fd] # 處理redis
      
                      if isinstance(cb, generator):
                          try:
                              next(cb) 
                      else:
                          cb(*cbargs) # 調用用戶代碼
                  if to_consolidate:
                      consolidate_callback(to_consolidate)
              else:
                  # no sockets yet, startup is probably not done.
                  sleep(min(poll_timeout, 0.1))
              yield
      

      下面我們逐步分析。

      0x06 啟動Poll

      循環最開始將啟動 Poll。 tick_callback 的作用就是啟動 Poll。就是建立一個機制,當 redis 有消息時候,得到通知

      while 1:
          todo = self._ready
          self._ready = set()
      
          for tick_callback in on_tick:
              tick_callback()
      

      此時:tick_callback的數值為:<function Transport.register_with_event_loop.<locals>.on_poll_start >,所以 tick_callback就調用到 Transport.register_with_event_loop.<locals>.on_poll_start

      6.1 回顧如何注冊回調

      Transport方法如何注冊,我們需要回顧,在前面代碼這里會注冊回調方法。

      conn.register_with_event_loop(hub)
      

      具體注冊如下:

      def register_with_event_loop(self, connection, loop):
      
          cycle_poll_start = cycle.on_poll_start
          add_reader = loop.add_reader
          on_readable = self.on_readable
      
          def _on_disconnect(connection):
              if connection._sock:
                  loop.remove(connection._sock)
          cycle._on_connection_disconnect = _on_disconnect
      
          def on_poll_start():
              cycle_poll_start()
              [add_reader(fd, on_readable, fd) for fd in cycle.fds]
              
          loop.on_tick.add(on_poll_start)
      

      on_poll_start就是在這里注冊的,就是把 on_poll_start 注冊到 hub 的 on_tick 回調之中

      loop.on_tick.add(on_poll_start)
      

      所以前面的如下代碼就調用到了 on_poll_start。

      for tick_callback in on_tick:
          tick_callback()
      

      6.2 Transport啟動

      所以,我們回到on_poll_start。

      def on_poll_start():
          cycle_poll_start()
          [add_reader(fd, on_readable, fd) for fd in cycle.fds]
      

      可以看到,有兩部分代碼:

      • poll_start : 這部分是 把 Channel 對應的 socket 同poll聯系起來,一個 socket 在 linux 系統中就是一個file,就可以進行 poll 操作;
      • add_reader :這部分是 把 poll 對應的 fd 添加到 MultiChannelPoller 這里,這樣 MultiChannelPoller 就可以 打通 redis queue ----> Channel ---> socket ---> poll ---> fd ---> 讀取 redis 這條通路了,就是如果 redis 有數據來了,MultiChannelPoller 就馬上通過 poll 得到通知,就去 redis 讀取;

      讓我們逐一看看。

      6.3 poll_start in MultiChannelPoller

      這里就是把Channel對應的 socket 同poll聯系起來,一個 socket 在 linux 系統中就是一個file,就可以進行 poll 操作

      此時代碼進入到MultiChannelPoller,數據如下:

      self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f84e7928940>
       after_read = {set: 0} set()
       eventflags = {int} 25
       fds = {dict: 0} {}
       poller = {_poll} <kombu.utils.eventio._poll object at 0x7f84e75f4d68>
      

      可以看出來,此處就是針對channel來進行注冊,把所有的channel注冊到 poll上。

      def on_poll_start(self):
          for channel in self._channels:
              if channel.active_queues:           # BRPOP mode?
                  if channel.qos.can_consume():
                      self._register_BRPOP(channel)
              if channel.active_fanout_queues:    # LISTEN mode?
                  self._register_LISTEN(channel)
      

      對于 redis 的使用,有兩種方法:BRPOP mode 和 LISTEN mode。分別對應 list 和 subscribe。

      6.3.1 _register_BRPOP

      我們先來看看 _register_BRPOP,這里做了兩個判斷,第一個是判斷當前的 channel 是否放進了 epoll 模型里面,如果沒有,那么就放進去;同時,如果之前這個 channel 不在 epoll 里面,那么這次放進去了,但是,這個 connection 還沒有對 epoll 其效果,所以發送一個 _brpop_start

      def _register_BRPOP(self, channel):
          """Enable BRPOP mode for channel."""
          ident = channel, channel.client, 'BRPOP'
          if not self._client_registered(channel, channel.client, 'BRPOP'):
              channel._in_poll = False
              self._register(*ident)
          if not channel._in_poll:  # send BRPOP
              channel._brpop_start()
      
      6.3.1.1 注冊到MultiChannelPoller

      一個 Connection 對應一個 Hub它們之間的樞紐是 MultiChannelPoller,它負責找出哪個 Channel 是可用的,這些 Channel 都是來自同一個 Connection。具體注冊代碼如下:

      def _register(self, channel, client, type):
          if (channel, client, type) in self._chan_to_sock:
              self._unregister(channel, client, type)
          if client.connection._sock is None:   # not connected yet.
              client.connection.connect()
              
          sock = client.connection._sock
          self._fd_to_chan[sock.fileno()] = (channel, type)
          self._chan_to_sock[(channel, client, type)] = sock
          self.poller.register(sock, self.eventflags)
      

      這里的client是Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>

      注意到這里client.connection._sock的數值是socket。

      client.connection._sock = {socket} <socket.socket fd=8, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 52353), raddr=('127.0.0.1', 6379)>
       family = {AddressFamily} AddressFamily.AF_INET
       proto = {int} 6
       timeout = {NoneType} None
       type = {SocketKind} SocketKind.SOCK_STREAM
      

      經過此階段之后。

      _fd_to_chan有意義,具體fd是 chanel 對應的 redis socket的fd

      def _register(self, channel, client, type):
          if (channel, client, type) in self._chan_to_sock:
              self._unregister(channel, client, type)
          if client.connection._sock is None:   # not connected yet.
              client.connection.connect()
          sock = client.connection._sock
          self._fd_to_chan[sock.fileno()] = (channel, type)
          self._chan_to_sock[(channel, client, type)] = sock
          self.poller.register(sock, self.eventflags)
      

      這里就是把channel與自己對應的socket聯系起來,也把channel與socket的file聯系起來

      變量如下:

      self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f9056a436a0>
       after_read = {set: 0} set()
       eventflags = {int} 25
       fds = {dict: 1} 
        8 = {tuple: 2} (<kombu.transport.redis.Channel object at 0x7f9056a57278>, 'BRPOP')
        __len__ = {int} 1
       poller = {_poll} <kombu.utils.eventio._poll object at 0x7f9056583048>
      

      這樣,從 socket fd 可以找到 對應的 channel,也能從 channel 找到 對應的 socket fd 。

      如下圖:

      +----------------------------------------------------------------------------------+
      |                                                                                  |
      |   MultiChannelPoller                                                             |
      |                                                                                  |
      |                                       +---------------------------------------+  |
      |                                       |  socket fd 1 : [ Channel 1, 'BRPOP']  |  |
      |           fds   +------------------>  |                                       |  |
      |                                       |  socket fd 2 : [ Channel 2, 'BRPOP']  |  |
      |                                       |                                       |  |
      |                                       |             ......                    |  |
      |                                       |                                       |  |
      |                                       |  socket fd 3 : [ Channel 3, 'BRPOP']  |  |
      |                                       +---------------------------------------+  |
      |                                                                                  |
      |                                                                                  |
      |                                                                                  |
      +----------------------------------------------------------------------------------+
      
      6.3.1.2 注冊到Poll

      繼續處理register,就是把socket注冊到poll

      class _poll:
      
          def __init__(self):
              self._poller = xpoll()
              self._quick_poll = self._poller.poll
              self._quick_register = self._poller.register
              self._quick_unregister = self._poller.unregister
      
          def register(self, fd, events):
              fd = fileno(fd)
              poll_flags = 0
              if events & ERR:
                  poll_flags |= POLLERR
              if events & WRITE:
                  poll_flags |= POLLOUT
              if events & READ:
                  poll_flags |= POLLIN
              self._quick_register(fd, poll_flags)
              return fd
      

      此時如下,我們僅僅以 fd 3 為例:

      下面就是 Channel ---> socket ---> poll ---> fd 這條通路。

      +----------------------------------------------------------------------------------+
      |                                                                                  |
      |   MultiChannelPoller                                                             |
      |                                                                                  |
      |                                       +---------------------------------------+  |
      |                                       |  socket fd 1 : [ Channel 1, 'BRPOP']  |  |
      |           fds   +------------------>  |                                       |  |
      |                                       |  socket fd 2 : [ Channel 2, 'BRPOP']  |  |
      |                                       |                                       |  |
      |                                       |             ......                    |  |
      |                                       |                                       |  |
      |                                       |  socket fd 3 : [ Channel 3, 'BRPOP']  |  |
      |                                       |      +                                |  |
      |                                       |      |                                |  |
      |                                       +---------------------------------------+  |
      |                                              |                                   |
      +----------------------------------------------------------------------------------+
                                                     |
                                                     |
                                                     v
      
                                                  poll with OS
      
      6.3.1.3 _brpop_start

      若這個 connection 還沒有對 epoll 其效果,就發送一個 _brpop_start作用為選擇下一次讀取的queue

      _brpop_start如下:

      def _brpop_start(self, timeout=1):
          queues = self._queue_cycle.consume(len(self.active_queues))
          if not queues:
              return
          keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
                  for queue in queues] + [timeout or 0]
          self._in_poll = self.client.connection
          self.client.connection.send_command('BRPOP', *keys)
      

      此時stack如下:

      _register, redis.py:296
      _register_BRPOP, redis.py:312
      on_poll_start, redis.py:328
      on_poll_start, redis.py:1072
      create_loop, hub.py:294
      run_once, hub.py:193
      run_forever, hub.py:185
      main, testUb.py:51
      <module>, testUb.py:55
      

      此時如下,現在我們有兩條通路:

      • Channel ---> socket ---> poll ---> fd 這條通路;
      • MultiChannelPoller ---> 讀取 redis 這條通路;
      • 因為這個時候 下一次 讀取的 queue 已經確定了,所以已經 打通 Redis queue ----> Channel ---> socket ---> poll ---> fd 這條通路了。
      +----------------------------------------------------------------------------------+
      |                                                                                  |
      |   MultiChannelPoller                                                             |
      |                                                                                  |
      |                                       +---------------------------------------+  |
      |                                       |  socket fd 1 : [ Channel 1, 'BRPOP']  |  |
      |           fds   +------------------>  |                                       |  |
      |                                       |  socket fd 2 : [ Channel 2, 'BRPOP']  |  |
      |                                       |                                       |  |
      |                                       |             ......                    |  |
      |                                       |                                       |  |
      |                                       |  socket fd 3 : [ Channel 3, 'BRPOP']  |  |
      |          connection                   |      +                                |  |
      |              +                        |      |                                |  |
      |              |                        +---------------------------------------+  |
      |              |                               |                                   |
      +----------------------------------------------------------------------------------+
                     |                               |
                     |                               |
                     v                               v
      
              Redis Queue   +----------------->   poll with OS
      
      

      6.3.2 _register_LISTEN

      本文沒有相關部分,如果有topic 相關則會調用這里。Celery event 就利用了這種方法

      def _register_LISTEN(self, channel):
          """Enable LISTEN mode for channel."""
          if not self._client_registered(channel, channel.subclient, 'LISTEN'):
              channel._in_listen = False
              self._register(channel, channel.subclient, 'LISTEN')
          if not channel._in_listen:
              channel._subscribe()  # send SUBSCRIBE
      

      注冊如下:

      _subscribe, redis.py:656
      _register_LISTEN, redis.py:322
      on_poll_start, redis.py:330
      on_poll_start, redis.py:1072
      create_loop, hub.py:294
      asynloop, loops.py:81
      start, consumer.py:592
      start, bootsteps.py:116
      start, consumer.py:311
      start, bootsteps.py:365
      start, bootsteps.py:116
      start, worker.py:204
      worker, worker.py:327
      

      此時變量如下:

      c = {PubSub} <redis.client.PubSub object at 0x7fb09e750400>
      keys = {list: 1}
       0 = {str} '/0.celery.pidbox'
           
      self = {Channel} <kombu.transport.redis.Channel object at 0x7fb09e6c8c88>
      

      6.4 注冊 reader in MultiChannelPoller

      上面可以看到,把所有的 channel 注冊到 poll上,對所有的 queue 都發起了監聽請求,也就是說任一個隊列有消息過來,那么都會被響應到,那么響應給誰呢?需要看看 add_reader 這個函數做了啥:

      就是說,前面那些注冊到 poll,其實沒有注冊響應方法,現在需要注冊

      復習下,add_reader 在 on_poll_start 這里。

      def on_poll_start():
          cycle_poll_start()
          [add_reader(fd, on_readable, fd) for fd in cycle.fds]
      

      cycle.fds 具體是得到了所有fd。

      @property
      def fds(self):
          return self._fd_to_chan
      

      具體添加是在 Hub 類中。

      • 這里會再次嘗試添加。
      • 然后會把 fd 與 callback 聯系起來。
      class Hub:
          def add_reader(self, fds, callback, *args):
              return self.add(fds, callback, READ | ERR, args)
      
          def add(self, fd, callback, flags, args=(), consolidate=False):
              fd = fileno(fd)
              try:
                  self.poller.register(fd, flags)
              except ValueError:
                  self._remove_from_loop(fd)
                  raise
              else:
                  dest = self.readers if flags & READ else self.writers
                  if consolidate:
                      self.consolidate.add(fd)
                      dest[fd] = None
                  else:
                      dest[fd] = callback, args
      
      

      注意,這里設置的是:hub 的成員變量,self.readers ,其在后續 poll 消息產生的就用到了,就調用這些callback,就是 Transport.on_readable。

      readers = {dict: 1} 
       8 = {tuple: 2} (<bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7faee4128f98>>, (8,))
        0 = {method} <bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7faee4128f98>>
        1 = {tuple: 1} 8
      

      stack為:

      register, eventio.py:187
      add, hub.py:164
      add_reader, hub.py:213
      <listcomp>, redis.py:1073
      on_poll_start, redis.py:1073
      create_loop, hub.py:294
      run_once, hub.py:193
      run_forever, hub.py:185
      main, testUb.py:51
      <module>, testUb.py:55
      

      所以此時為如下,依然不知道響應給誰

      +----------------------------------------------------------------------------------+
      |                                                                                  |
      |   MultiChannelPoller                                                             |
      |                                                                                  |
      |                                       +---------------------------------------+  |
      |                                       |  socket fd 1 : [ Channel 1, 'BRPOP']  |  |
      |           fds   +------------------>  |                                       |  |
      |                                       |  socket fd 2 : [ Channel 2, 'BRPOP']  |  |
      |                                       |                                       |  |
      |                                       |             ......                    |  |
      |                                       |                                       |  |
      |                                       |  socket fd 3 : [ Channel 3, 'BRPOP']  |  |
      |          connection                   |      +                                |  |
      |              +                        |      |                                |  |
      |              |                        +---------------------------------------+  |
      |              |                               |                                   |
      +----------------------------------------------------------------------------------+
                     |                               |
                     |                               |
                     v                               v
      
              Redis Queue   +------------------>  poll with OS
      
      
      
       +---------------------------------------------------------------------------------+
       |                                                                                 |
       |    Hub                                                                          |
       |                                     +--------------------------------------+    |
       |                                     |fd 3 : [ Transport.on_readable, fd 3] |    |
       |                                     |                                      |    |
       |       readers  +------------------> |       ......                         |    |
       |                                     |                                      |    |
       |                                     |fd 1 : [ Transport.on_readable, fd 1] |    |
       |                                     +--------------------------------------+    |
       |                                                                                 |
       +---------------------------------------------------------------------------------+
      
      

      因為這個流程十分復雜,為了簡化,我們這里提前劇透,在 消費函數時候,Transport 會設置 自己的 _callbacks[queue] 為一個回調函數,所以 MultiChannelPoller 讀取 queue 這部分也可以聯系起來

          def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
              """Consume from `queue`."""
              self._tag_to_queue[consumer_tag] = queue
              self._active_queues.append(queue)
      
              def _callback(raw_message):
                  message = self.Message(raw_message, channel=self)
                  if not no_ack:
                      self.qos.append(message, message.delivery_tag)
                  return callback(message)
      
              self.connection._callbacks[queue] = _callback # 這里設置
              
              self._consumers.add(consumer_tag)
      
              self._reset_cycle()
      

      6.5 啟動timer

      然后是啟動poll的timer,定期做業務操作。

      poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
      #  print('[[[HUB]]]: %s' % (self.repr_active(),))
      if readers or writers:
          to_consolidate = []
          try:
              events = poll(poll_timeout)
              #  print('[EVENTS]: %s' % (self.repr_events(events),))
          except ValueError:  # Issue celery/#882
              return
      

      6.6 poll

      然后是進行poll,若對應的file有消息,就處理(讀取redis中的內容),然后進行下一次poll。

      對于我們例子,下面簡略版代碼就是進行Poll:

      poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
      
      if readers or writers:
          to_consolidate = []
          try:
              events = poll(poll_timeout)
          except ValueError:  # Issue celery/#882
              return
      
          for fd, event in events or ():
              cb = cbargs = None
      
              if event & READ:
                  try:
                      cb, cbargs = readers[fd]
              elif event & WRITE:
                  try:
                      cb, cbargs = writers[fd]
      
              if isinstance(cb, generator):
                  next(cb)
      
              else:
                  try:
                      cb(*cbargs)
                  except Empty:
                      pass
      else:
          # no sockets yet, startup is probably not done.
          sleep(min(poll_timeout, 0.1))
      yield
      

      6.6.1 poll方法

      具體的poll方法如下,就是調用系統的方法來進行poll:

      def poll(self, timeout, round=math.ceil,
               POLLIN=POLLIN, POLLOUT=POLLOUT, POLLERR=POLLERR,
               READ=READ, WRITE=WRITE, ERR=ERR, Integral=Integral):
          timeout = 0 if timeout and timeout < 0 else round((timeout or 0) * 1e3)
          event_list = self._quick_poll(timeout)
      
          ready = []
          for fd, event in event_list:
              events = 0
              if event & POLLIN:
                  events |= READ
              if event & POLLOUT:
                  events |= WRITE
              if event & POLLERR or event & POLLNVAL or event & POLLHUP:
                  events |= ERR
              assert events
              if not isinstance(fd, Integral):
                  fd = fd.fileno()
              ready.append((fd, events))
          return ready
      

      6.6.2 callback

      在 create_loop 代碼中可以看到

      def create_loop(self,
                      generator=generator, sleep=sleep, min=min, next=next,
                      Empty=Empty, StopIteration=StopIteration,
                      KeyError=KeyError, READ=READ, WRITE=WRITE, ERR=ERR):
          readers, writers = self.readers, self.writers
          
          cb, cbargs = readers[fd]
          cb(*cbargs)
          
      

      這就是說,poll回調的時候,會調用reader中對應fd的回調函數來處理。

      readers就是在之前 6.4 那節 設定的。

      其內容是,就是 8 這個fd 對應的回調函數是Transport.on_readable:

      readers = {dict: 1} 
       8 = {tuple: 2} (<bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7ffe7482ddd8>>, (8,))
        0 = {method} <bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7ffe7482ddd8>>
        1 = {tuple: 1} 8
        __len__ = {int} 2
      

      因此回調到<kombu.transport.redis.Transport object at 0x7ffe7482ddd8>。

      def on_readable(self, fileno):
          """Handle AIO event for one of our file descriptors."""
          self.cycle.on_readable(fileno)
      

      進而調用到

      <kombu.transport.redis.MultiChannelPoller object at 0x7faee4166d68>

      def on_readable(self, fileno):
          chan, type = self._fd_to_chan[fileno]
          if chan.qos.can_consume():
              chan.handlers[type]()
      

      從 socket fd 可以找到 對應的 channel,也能從 channel 找到 對應的 socket fd 。從 channel 找到 channel 的 callback。

      對應 self._fd_to_chan[fileno],取出 socket fd 對應 callback,進行處理。這里的callback如下:

      handlers = {dict: 2}
       'BRPOP' = {method} <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7faee418dfd0>>
       'LISTEN' = {method} <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7faee418dfd0>>
      

      于是調用 Channel._brpop_read 或者 Channel._receive 從redis 中 讀取消息。

      具體調用堆棧如下:

      _brpop_read, redis.py:734
      on_readable, redis.py:358
      on_readable, redis.py:1087
      create_loop, hub.py:361
      run_once, hub.py:193
      run_forever, hub.py:185
      main, testUb.py:51
      <module>, testUb.py:55
      

      邏輯如下:

      +--------------+    socket
      |     redis    | <------------> port +-->  fd +--->+                    +--->  channel +--> handlers  'BRPOP' = Channel._brpop_read
      |              |                                   |                    |                             'LISTEN' = Channel._receive
      |              |    socket                         |                    |
      |              | <------------> port +-->  fd +--->---> _fd_to_chan +------->  channel +--> handlers  'BRPOP' = Channel._brpop_read
      |  port=6379   |                                   |                    |                             'LISTEN' = Channel._receive
      |              |    socket                         |                    |
      |              | <------------> port +-->  fd +--->+                    +--->  channel +--> handlers  'BRPOP' = Channel._brpop_read
      +--------------+                                                                                      'LISTEN' = Channel._receive
      
      

      此時手機為:

      如果加入poll,則如下:

                  +---------------------------------------------------------------------------------------------------------------------------------------+
                  |                                     +--------------+                                   6                       parse_response         |
                  |                                +--> | Linux Kernel | +---+                                                                            |
                  |                                |    +--------------+     |                                                                            |
                  |                                |                         |                                                                            |
                  |                                |                         |  event                                                                     |
                  |                                |  1                      |                                                                            |
                  |                                |                         |  2                                                                         |
                  |                                |                         |                                                                            |
          +-------+---+    socket                  +                         |                                                                            |
          |   redis   | <------------> port +-->  fd +--->+                  v                                                                            |
          |           |                                   |           +------+--------+                                                                   |
          |           |    socket                         |           |  Hub          |                                                                   |
          |           | <------------> port +-->  fd +--->----------> |               |                                                                   |
          | port=6379 |                                   |           |               |                                                                   |
          |           |    socket                         |           |     readers +----->  Transport.on_readable                                        |
          |           | <------------> port +-->  fd +--->+           |               |                     +                                             |
          +-----------+                                               +---------------+                     |                                             |
                                                                                                            |                                             |
                                                              3                                             |                                             |
                   +----------------------------------------------------------------------------------------+                                             |
                   |                                                                                                                                      v
                   |                                                                                                                                                  _receive_callback
                   |                                                                                                                            5    +-------------+                      +-----------+
      +------------+------+                     +-------------------------+                                    'BRPOP' = Channel._brpop_read +-----> | Channel     | +------------------> | Consumer  |
      |       Transport   |                     |  MultiChannelPoller     |      +--->  channel +--> handlers  'LISTEN' = Channel._receive           +-------------+                      +---+-------+
      |                   |                     |                         |      |                                                                                                            |
      |                   | on_readable(fileno) |                         |      |                                                                         ^                                  |
      |           cycle +---------------------> |          _fd_to_chan +------------->  channel +--> handlers  'BRPOP' = Channel._brpop_read               |                                  |
      |                   |        4            |                         |      |                             'LISTEN' = Channel._receive                 |                                  |
      |  _callbacks[queue]|                     |                         |      |                                                                         |                            on_m  |
      |          +        |                     +-------------------------+      +--->  channel +--> handlers  'BRPOP' = Channel._brpop_read               |                                  |
      +-------------------+                                                                                    'LISTEN' = Channel._receive                 |                                  |
                 |                                                                                                                                         |                                  v
                 |                                                7           _callback                                                                    |
                 +-----------------------------------------------------------------------------------------------------------------------------------------+                            User Function
      
      

      此時手機為:

      0x07 接收消息

      現在消息已經被放置于redis 隊列中,那么消息又被如何使用呢?

      從上節得知,當poll提示有消息時候,會通過 Channel._brpop_read 或者 Channel._receive 從 redis 中 讀取消息。

      具體堆棧如下:

      _brpop_read, redis.py:734
      on_readable, redis.py:358
      on_readable, redis.py:1087
      create_loop, hub.py:361
      run_once, hub.py:193
      run_forever, hub.py:185
      main, testUb.py:51
      <module>, testUb.py:55
      

      即:在 hub 的 loop中,通過 redis 驅動代碼 從 redis 隊列中取出消息,然后調用Transport傳遞過來的_deliver方法,最后調用userfunction

      def _brpop_read(self, **options):
          try:
              try:
                  dest__item = self.client.parse_response(self.client.connection,
                                                          'BRPOP',
                                                          **options)
              except self.connection_errors:
                  # if there's a ConnectionError, disconnect so the next
                  # iteration will reconnect automatically.
                  self.client.connection.disconnect()
                  raise
              if dest__item:
                  dest, item = dest__item
                  dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
                  self._queue_cycle.rotate(dest)
                  self.connection._deliver(loads(bytes_to_str(item)), dest) #調用用戶function
                  return True
              else:
                  raise Empty()
          finally:
              self._in_poll = None
      

      7.1 從驅動讀取

      7.1.1 從redis讀取

      這里會從redis驅動讀取,文件是 redis/connection.py,具體就是通過 SocketBuffer 類從 redis 對應的 socket 讀取。代碼為:

      def readline(self):
          buf = self._buffer
          buf.seek(self.bytes_read)
          data = buf.readline()
          while not data.endswith(SYM_CRLF):
              # there's more data in the socket that we need
              self._read_from_socket()
              buf.seek(self.bytes_read)
              data = buf.readline()
      
          self.bytes_read += len(data)
      
          # purge the buffer when we've consumed it all so it doesn't
          # grow forever
          if self.bytes_read == self.bytes_written:
              self.purge()
      
          return data[:-2]
      

      當讀到 response 之后,調用 Redis驅動中對應命令的 回調方法來處理。此處命令為BRPOP。回調方法為:string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None)

      代碼為:

      def parse_response(self, connection, command_name, **options):
          "Parses a response from the Redis server"
          try:
              response = connection.read_response()
          except ResponseError:
              if EMPTY_RESPONSE in options:
                  return options[EMPTY_RESPONSE]
              raise
          if command_name in self.response_callbacks:
              return self.response_callbacks[command_name](response, **options)
          return response
      

      此時上下文相關變量為:

      command_name = {str} 'BRPOP'
      connection = {Connection} Connection<host=localhost,port=6379,db=0>
      options = {dict: 0} {}
      self = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
       connection = {Connection} Connection<host=localhost,port=6379,db=0>
       connection_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
       response_callbacks = {CaseInsensitiveDict: 179} {.
        'AUTH' = {type} <class 'bool'>
        'EXPIRE' = {type} <class 'bool'>
      	.....
        'LLEN' = {type} <class 'int'>
        'LPUSHX' = {type} <class 'int'>
        'PFADD' = {type} <class 'int'>
        'PFCOUNT' = {type} <class 'int'>
      		......
        'SWAPDB' = {function} <function bool_ok at 0x7fbad4276620>
        'WATCH' = {function} <function bool_ok at 0x7fbad4276620>
        'UNWATCH' = {function} <function bool_ok at 0x7fbad4276620>
        'BLPOP' = {function} <function Redis.<lambda> at 0x7fbad4276f28>
        'BRPOP' = {function} <function Redis.<lambda> at 0x7fbad4276f28>
         ....
      

      這些代碼堆棧如下:

      readline, connection.py:251
      read_response, connection.py:324
      read_response, connection.py:739
      parse_response, client.py:915
      _brpop_read, redis.py:738
      on_readable, redis.py:358
      handle_event, redis.py:362
      get, redis.py:380
      drain_events, base.py:960
      drain_events, connection.py:318
      main, testUb.py:50
      <module>, testUb.py:53
      

      7.2 分發消息

      loop從驅動得到消息之后,進行 deliver 分發。

      self.connection._deliver(loads(bytes_to_str(item)), dest)
      

      所做的事情是根據隊列取出注冊到此隊列的回調函數列表,然后對消息執行列表中的所有回調函數

      def _deliver(self, message, queue):
          try:
              callback = self._callbacks[queue]
          except KeyError:
              logger.warning(W_NO_CONSUMERS, queue)
              self._reject_inbound_message(message)
          else:
              callback(message)
      

      7.2.1 找到callback

      此時 self是

      <kombu.transport.redis.Transport object at 0x7faee4128f98>

      callback如下:

      self._callbacks = {dict: 1} 
       'asynt_queue' = {function} <function Channel.basic_consume.<locals>._callback at 0x7faee244a2f0>
      

      這里意味著 asynt_queue 這個 queue 對應的 callback 是 Channel.basic_consume。

      7.2.2 何時設定callback

      調用的 callback 是 Channel 這里定義的。basic_consume就是把傳入的參數 callback 數值,實際這個傳入的參數 callback就是 Consumer. _receive_callback。

      def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
          """Consume from `queue`."""
          self._tag_to_queue[consumer_tag] = queue
          self._active_queues.append(queue)
      
          def _callback(raw_message):
              message = self.Message(raw_message, channel=self)
              if not no_ack:
                  self.qos.append(message, message.delivery_tag)
              return callback(message)
      
          self.connection._callbacks[queue] = _callback
          self._consumers.add(consumer_tag)
      
          self._reset_cycle()
      

      設置是在上面函數里面這句,

          self.connection._callbacks[queue] = _callback
      

      stack如下:

      basic_consume, base.py:632
      basic_consume, redis.py:598
      consume, entity.py:738
      _basic_consume, messaging.py:594
      consume, messaging.py:473
      __enter__, messaging.py:430
      main, testUb.py:46
      <module>, testUb.py:55
      

      7.2.3 調用到用戶方法

      Consumer的函數定義如下:

      def _receive_callback(self, message):
          accept = self.accept
          on_m, channel, decoded = self.on_message, self.channel, None
          try:
              m2p = getattr(channel, 'message_to_python', None)
              if m2p:
                  message = m2p(message)
              if accept is not None:
                  message.accept = accept
              if message.errors:
                  return message._reraise_error(self.on_decode_error)
              decoded = None if on_m else message.decode()
          except Exception as exc:
              if not self.on_decode_error:
                  raise
              self.on_decode_error(message, exc)
          else:
              return on_m(message) if on_m else self.receive(decoded, message)
      

      self.on_message就是用戶方法,所以最終調用到用戶方法

      on_message, testUb.py:36
      _receive_callback, messaging.py:620
      _callback, base.py:630
      _deliver, base.py:980
      _brpop_read, redis.py:748
      on_readable, redis.py:358
      on_readable, redis.py:1087
      create_loop, hub.py:361
      run_once, hub.py:193
      run_forever, hub.py:185
      main, testUb.py:51
      <module>, testUb.py:55
      

      此時如下:

      +----------------------+               +-------------------+
      | Producer             |               | Channel           |
      |                      |               |                   |        +-----------------------------------------------------------+
      |                      |               |    client  +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
      |      channel   +------------------>  |                   |        +-----------------------------------------------------------+
      |                      |               |    pool           |
      |      exchange        |   +---------> |                   | <------------------------------------------------------------+
      |                      |   |           |                   |                                                              |
      |      connection      |   |    +----> |    connection +---------------+                                                  |
      |             +        |   |    |      |                   |           |                                                  |
      |             |        |   |    |      +-------------------+   +----------------------------------------------------------------+
      +--+-------------------+   |    |                              |       v                                                  |     |
         |          |            |    |      +-------------------+   |   +---+-----------------+       +--------------------+   |     |
         |          |            |    |      | Connection        |   |   | redis.Transport     |       | MultiChannelPoller |   |     |
         |          +----------------------> |                   |   |   |                     |       |                    |   |     |
         |                       |    |      |     _sock  <----------+   |                     |       |     _channels +--------+     |
         |                       |    |      |                   |       |        cycle +------------> |     _fd_to_chan    |         |
         |                       |    |      |     transport +---------> |                     |       |     _chan_to_sock+-----------+
         |             +-------->+    |      |                   |       |                     |    +------+ poller         |
         |             |              |      +-------------------+       +---------------------+    |  |     after_read     |
         |             |              |                                                             |  |                    |
         |             |              |                                                             |  +--------------------+
         |             |              |      +------------------+                   +---------------+
         |             |              |      | Hub              |                   |
         |             |              |      |                  |                   v
         |             |              |      |                  |            +------+------+
         |             |              |      |      poller +---------------> | _poll       |
         | publish     |              |      |                  |            |             |         +-------+
         +--------------------------------+  |                  |            |    _poller+---------> |  poll |
                       |              |   |  +------------------+            |             |         +-------+
                       |              |   |                                  +-------------+
          +-------------------+       |   +-----> +----------------+
          | Queue      |      |       |           | Exchange       |
          |      _channel     |       +---------+ |                |
          |                   |                   |                |
          |      exchange +-------------------->  |     channel    |
          |                   |                   |                |
          |                   |                   |                |
          +-------------------+                   +----------------+
      
      

      手機如下:

      動態邏輯如下:

       +-----+           +-----------+     +--------------------+ +---------+ +-------+ +--------+
       | Hub |           | Transport |     | MultiChannelPoller | |  _poll  | |Channel| |Consumer|
       +--+--+           +----+------+     +------------+-------+ +----+----+ +---+---+ +------+-+
          |                   |                         |              |          |            |
          v                   |                         |              |          |            |
      create_loop             |                         |              |          |            |
          +                   |                         |              |          |            |
          |   on_poll_start   |                         |              |          |            |
          |                   |                         |              |          |            |
          | +---------------> |     on_poll_start       |              |          |            |
          |                   |                         |              |          |            |
          |                   | +-------------------->  |              |          |            |
          |                   |                         |              |          |            |
          |                   |                  _register_BRPOP       |          |            |
          |                   |                         |              |          |            |
          |               add_reader                    |   register   |          |            |
          |                   +                         | +----------> |          |            |
          |                   |         register        |              |          |            |
       fire_timers            | +------------------------------------> |          |            |
          |                   |                         |              |          |            |
          |    poll           |                         |              |          |            |
          | +--------------------------------------------------------> |          |            |
          |                   |                         |              |          |            |
          |                   |                         |              |          |            |
          +                   |                         |              |          |            |
      for fd, event in events |                         |              |          |            |
          |                   |                         |              |          |            |
          |                   |                         |              |          |            |
      cb, cbargs = readers[fd]|                         |              |          |            |
          +                   |                         |              |          |            |
          |                   |                         |              |          |            |
          |                   |                         |              |          |            |
       cb(*cbargs             |                         |              |          |            |
          +                   |                         |              |          |            |
          |   on_readable     |                         |              |          |            |
          |                   |                         |              |          |            |
          | +-------------->  |    on_readable          |              |          |            |
          |                   |                         |              |          |            |
          |                   +-----------------------> |              |          |            |
          |                   |                         |              |          |            |
          |                   |                         |              |          |            |
          |                   |        chan, type = _fd_to_chan[fileno]|          |            |
          |                   |                         |              |          |            |
          |                   |                         |  _brpop_read |          |            |
          |                   |                         |              |          |            |
          |                   |                         | +---------------------> |            |
          |                   |     _deliver            |              |          |            |
          |                   |                         |              |          |            |
          |                   |  <----------------------------------------------+ |            |
          |                   |                         |              |          |            |
          |                   |                         |              |          |            |
          |                   |     _callback           |              |          |            |
          |                   |                         |              |          |            |
          |                   |  +----------------------------------------------> |            |
          |                   |                         |              |          +            +
          |                   |                         |              |         _receive_callback
          |                   |                         |              |          |            +
          |                   |                         |              |          | +--------->+
          |                   |                         |              |          |            |
          v                   v                         v              v          v            v
      
      

      手機如下:

      0xFF 參考

      celery 7 優秀開源項目kombu源碼分析之registry和entrypoint

      (二)放棄pika,選擇kombu

      kombu消息框架<二>

      AMQP中的概念

      AMQP的基本概念

      深入理解AMQP協議

      kombu和消息隊列總結

      關于epoll版服務器的理解(Python實現)

      celery源碼解讀

      Kombu源碼分析(一)概述

      posted @ 2021-03-16 06:37  羅西的思考  閱讀(833)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 精品视频福利| 亚洲乱码日产精品bd在线看| 无码日韩精品一区二区三区免费 | 蕲春县| 图片区 小说区 区 亚洲五月| 正在播放酒店约少妇高潮| 精品一区二区三区四区色| 亚洲色av天天天天天天| 亚洲永久精品日本久精品| 日本中文字幕有码在线视频| 国产午夜福利免费入口| 国内精品久久人妻无码妲| 欧美一区二区三区欧美日韩亚洲| 人妻精品久久无码区| 亚洲av日韩在线资源| 日韩欧美亚洲综合久久| 亚洲国产成人AⅤ毛片奶水| 狠狠婷婷综合久久久久久| 九九热免费精品在线视频| 青青青爽在线视频观看| 四虎成人精品无码| 国产丰满乱子伦无码专区 | 夜夜添无码一区二区三区| 丰满少妇人妻久久久久久| 久久人妻精品大屁股一区| 朔州市| 国产一区二区在线有码| 狠狠色综合久久狠狠色综合| 野外做受三级视频| 亚洲综合无码日韩国产加勒比| 亚洲伊人久久精品影院| 国产亚洲精品久久久久婷婷图片 | 国产午夜精品无码一区二区 | 日韩免费码中文在线观看| 国产成人精品1024免费下载| 国产精品久久久久aaaa| 亚洲va在线∨a天堂va欧美va| 午夜福利一区二区在线看| 深田えいみ禁欲后被隔壁人妻 | 亚洲一区二区三区激情在线| 国产精品福利自产拍在线观看 |