<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      [源碼分析] 消息隊列 Kombu 之 Consumer

      [源碼分析] 消息隊列 Kombu 之 Consumer

      0x00 摘要

      本系列我們介紹消息隊列 Kombu。Kombu 的定位是一個兼容 AMQP 協(xié)議的消息隊列抽象。通過本文,大家可以了解 Kombu 中的 Consumer 概念。

      0x01 綜述功能

      Consumer 的作用主要如下:

      • Exchange:MQ 路由,消息發(fā)送者將消息發(fā)至Exchange,Exchange負(fù)責(zé)將消息分發(fā)至隊列。
      • Queue:對應(yīng)的隊列抽象,存儲著即將被應(yīng)用消費掉的消息,Exchange負(fù)責(zé)將消息分發(fā)Queue,消費者從Queue接收消息;
      • Consumers : 是接受消息的抽象類,consumer需要聲明一個queue,并將queue與指定的exchange綁定,然后從queue里面接收消息。就是說,從用戶角度,知道了一個 exchange,就可以從中讀取消息,具體這個消息就是從 queue 中讀取的。

      在具體的實現(xiàn)中,Consumer 把 queue 與 channel 聯(lián)系起來。queue 里面有一個 channel,用來訪問redis。Queue 也有 Exchange,知道訪問具體 redis 哪個key(就是queue對應(yīng)的那個key)。即 Consumer 消費消息是通過 Queue 來消費,然后 Queue 又轉(zhuǎn)嫁給 Channel。

      所以服務(wù)端的邏輯大致為:

      1. 建立連接;
      2. 創(chuàng)建Exchange ;
      3. 創(chuàng)建Queue,并將Exchange與Queue綁定,Queue的名稱為routing_key ;
      4. 創(chuàng)建Consumer對Queue監(jiān)聽;

      0x02 示例代碼

      下面使用如下代碼來進(jìn)行說明。

      本示例來自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感謝。

      def main(arguments):
          hub = Hub()
          exchange = Exchange('asynt_exchange')
          queue = Queue('asynt_queue', exchange, 'asynt_routing_key')
      
          def send_message(conn):
              producer = Producer(conn)
              producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
              print('message sent')
      
          def on_message(message):
              print('received: {0!r}'.format(message.body))
              message.ack()
              # hub.stop()  # <-- exit after one message
      
          conn = Connection('redis://localhost:6379')
          conn.register_with_event_loop(hub)
      
          def p_message():
              print(' kombu ')
      
          with Consumer(conn, [queue], on_message=on_message):
              send_message(conn)
              hub.timer.call_repeatedly(3, p_message)
              hub.run_forever()
      
      if __name__ == '__main__':
          sys.exit(main(sys.argv[1:]))
      

      前文已經(jīng)完成了構(gòu)建部分,下面來到了Consumer部分,即如下代碼:

      with Consumer(conn, [queue], on_message=on_message):
          send_message(conn)
          hub.timer.call_repeatedly(
              3, p_message
          )
          hub.run_forever()
      

      0x03 定義

      3.1 定義

      Consumer主要成員變量如下:

      • channel:存在 (kombu.Connection, Channel) 這兩種可能,一個 Connection 就對應(yīng)一個 MQ 的連接,Channel可以理解成共享一個Connection的多個輕量化連接。
      • queues:(Sequence[kombu.Queue])類型。對應(yīng) queue 抽象,存儲著即將被應(yīng)用消費掉的消息,Exchange負(fù)責(zé)將消息分發(fā)Queue,消費者從Queue接收消息
      • on_message:消息響應(yīng)方法;

      這也是調(diào)用時傳入的變量。

      class Consumer:
          """Message consumer.
      
          Arguments:
              channel (kombu.Connection, ChannelT): see :attr:`channel`.
              queues (Sequence[kombu.Queue]): see :attr:`queues`.
              no_ack (bool): see :attr:`no_ack`.
              auto_declare (bool): see :attr:`auto_declare`
              callbacks (Sequence[Callable]): see :attr:`callbacks`.
              on_message (Callable): See :attr:`on_message`
              on_decode_error (Callable): see :attr:`on_decode_error`.
              prefetch_count (int): see :attr:`prefetch_count`.
          """
          
          #: The connection/channel to use for this consumer.
          channel = None
      
          #: A single :class:`~kombu.Queue`, or a list of queues to
          #: consume from.
          queues = None
      
          #: Flag for automatic message acknowledgment.
          no_ack = None
      
          #: List of callbacks called in order when a message is received.
          callbacks = None
      
          #: Optional function called whenever a message is received.
          on_message = None
      
          #: List of accepted content-types.
          accept = None
      
          #: Initial prefetch count
          prefetch_count = None
      
          #: Mapping of queues we consume from.
          _queues = None
      
          _tags = count(1)   # global
      

      3.2 Queue

      我們也給出 Queue 的定義,其中主要成員變量如下:

      • exchange (Exchange): 就是 queue 綁定的 Exchange;
      • routing_key (str): 就是 queue 對應(yīng)的 key;
      • channel :queue 綁定的 信道;

      具體定義如下:

      class Queue(MaybeChannelBound):
          """A Queue declaration.
      
              channel (ChannelT): The channel the Queue is bound to (if bound).
      
          """
      
          ContentDisallowed = ContentDisallowed
      
          name = ''
          exchange = Exchange('')
          routing_key = ''
      
          durable = True
          exclusive = False
          auto_delete = False
          no_ack = False
      
          attrs = (
              ('name', None),
              ('exchange', None),
              ('routing_key', None),
              ('queue_arguments', None),
              ('binding_arguments', None),
              ('consumer_arguments', None),
              ('durable', bool),
              ('exclusive', bool),
              ('auto_delete', bool),
              ('no_ack', None),
              ('alias', None),
              ('bindings', list),
              ('no_declare', bool),
              ('expires', float),
              ('message_ttl', float),
              ('max_length', int),
              ('max_length_bytes', int),
              ('max_priority', int)
          )
      

      0x04 Init

      在此方法中,先處理調(diào)用,隨之建立聯(lián)系。

      def __init__(self, channel, queues=None, no_ack=None, auto_declare=None,
                   callbacks=None, on_decode_error=None, on_message=None,
                   accept=None, prefetch_count=None, tag_prefix=None):
          self.channel = channel
          self.queues = maybe_list(queues or [])
          self.no_ack = self.no_ack if no_ack is None else no_ack
          self.callbacks = (self.callbacks or [] if callbacks is None
                            else callbacks)
          self.on_message = on_message
          self.tag_prefix = tag_prefix
          self._active_tags = {}
      
          self.accept = prepare_accept_content(accept)
          self.prefetch_count = prefetch_count
      
          if self.channel:
              self.revive(self.channel)
      

      4.1 處理調(diào)用

      4.1.1 queues

      傳入的參數(shù)queues被作為成員變量保存起來。

      self.queues = maybe_list(queues or [])
      

      4.1.2 channel

      傳入的參數(shù)Connection被作為成員變量保存起來。

      self.channel = channel
      

      4.1.3 on_message

      傳入的參數(shù)on_message 作為消息響應(yīng)方法保存起來。

      self.on_message = on_message
      

      4.2 建立聯(lián)系

      用如下方法把 Exchange,Queue 與 Connection 聯(lián)系起來

      def revive(self, channel):
          
          """Revive consumer after connection loss."""
          self._active_tags.clear()
          channel = self.channel = maybe_channel(channel)
          
          # modify dict size while iterating over it is not allowed
          for qname, queue in list(self._queues.items()):
              # name may have changed after declare
              self._queues.pop(qname, None)
              queue = self._queues[queue.name] = queue(self.channel)
              queue.revive(channel)
      
          if self.auto_declare:
              self.declare()
      
          if self.prefetch_count is not None:
              self.qos(prefetch_count=self.prefetch_count)
      

      進(jìn)一步調(diào)用:

      when_bound, entity.py:598
      maybe_bind, abstract.py:76
      bind, abstract.py:70
      bind, entity.py:590
      __call__, abstract.py:66
      revive, messaging.py:400
      __init__, messaging.py:382
      main, testUb.py:46
      <module>, testUb.py:55
      

      由此進(jìn)入到了Queue類。

      4.2.1 channel與queue

      這里用如下方法把queue與channel聯(lián)系起來。queue 里面有一個 channel,用來訪問redis,Queue 也有 Exchange,知道訪問具體 redis 哪里

      每一個 Consumer 初始化的時候都是和 Channel 綁定的,也就是說我們 Consumer 包含了 Queue 也就和 Connection 關(guān)聯(lián)起來了!

      Consumer 消費消息是通過 Queue 來消費,然后 Queue 又轉(zhuǎn)嫁給 Channel

      channel = {Channel} <kombu.transport.redis.Channel object at 0x7f9056a57278>
      
      self = {Queue} <Queue asynt -> <Exchange asynt(direct) bound to chan:1> -> asynt bound to chan:1>
      

      這樣,conneciton就是queue的成員變量。

      def revive(self, channel):
          """Revive channel after the connection has been re-established.
          """
          if self.is_bound:
              self._channel = channel
              self.when_bound()
      

      4.2.2 channel與exchange

      之前我們知道,Queue是包括了exchange成員變量,目前channel也是exchange的成員變量

      Exchange:交換機(jī),消息發(fā)送者將消息發(fā)至Exchange,Exchange負(fù)責(zé)將消息分發(fā)至隊列。

      于是經(jīng)由如下方法,準(zhǔn)備把channel與exchange聯(lián)系起來。

      def when_bound(self):
          if self.exchange:
              self.exchange = self.exchange(self.channel)
      

      此時變量如下:

      channel = {Channel} <kombu.transport.redis.Channel object at 0x7f9056a57278>
      
      self = {Exchange} Exchange asynt(direct)
      

      進(jìn)而直接在Exchange基類,使用方法maybe_bind把channel與exchange聯(lián)系起來。

      class MaybeChannelBound(Object):
          """Mixin for classes that can be bound to an AMQP channel."""
      
          _channel = None
      
          def __call__(self, channel):
              """`self(channel) -> self.bind(channel)`."""
              return self.bind(channel)
      
          def bind(self, channel):
              """Create copy of the instance that is bound to a channel."""
              return copy(self).maybe_bind(channel)
      
          def maybe_bind(self, channel):
              """Bind instance to channel if not already bound."""
              if not self.is_bound and channel:
                  self._channel = maybe_channel(channel)
                  self.when_bound()
                  self._is_bound = True
              return self
      

      4.2.3 Exchange & Binding

      這里會把 Exchange 和 queue 聯(lián)系。就是把 Exchange 和 routing_key 聯(lián)系起來,然后把這些聯(lián)系規(guī)則放到redis 之中

      堆棧如下:

      _queue_bind, redis.py:814
      queue_bind, base.py:568
      bind_to, entity.py:674
      queue_bind, entity.py:662
      _create_queue, entity.py:617
      declare, entity.py:606
      declare, messaging.py:417
      revive, messaging.py:404
      __init__, messaging.py:382
      

      具體為

      class Queue(MaybeChannelBound):
      
          def __init__(self, name='', exchange=None, routing_key='',
                       channel=None, bindings=None, on_declared=None,
                       **kwargs):
              super().__init__(**kwargs)
              self.name = name or self.name
              
              if isinstance(exchange, str):
                  self.exchange = Exchange(exchange)
              elif isinstance(exchange, Exchange):
                  self.exchange = exchange
                  
              self.routing_key = routing_key or self.routing_key
              self.bindings = set(bindings or [])
              self.on_declared = on_declared
      
              # allows Queue('name', [binding(...), binding(...), ...])
              if isinstance(exchange, (list, tuple, set)):
                  self.bindings |= set(exchange)
              if self.bindings:
                  self.exchange = None
      
              # exclusive implies auto-delete.
              if self.exclusive:
                  self.auto_delete = True
              self.maybe_bind(channel)
      
          def queue_bind(self, nowait=False, channel=None):
              """Create the queue binding on the server."""
              return self.bind_to(self.exchange, self.routing_key,
                                  self.binding_arguments,
                                  channel=channel, nowait=nowait)
      
          def bind_to(self, exchange='', routing_key='',
                      arguments=None, nowait=False, channel=None):
              if isinstance(exchange, Exchange):
                  exchange = exchange.name
      
              return (channel or self.channel).queue_bind(
                  queue=self.name,
                  exchange=exchange,
                  routing_key=routing_key,
                  arguments=arguments,
                  nowait=nowait,
              )
      
      4.2.3.1 Channel binding

      具體調(diào)用到Channel,代碼位于 kombu/transport/redis.py。

      def _queue_bind(self, exchange, routing_key, pattern, queue):
          if self.typeof(exchange).type == 'fanout':
              # Mark exchange as fanout.
              self._fanout_queues[queue] = (
                  exchange, routing_key.replace('#', '*'),
              )
          with self.conn_or_acquire() as client:
              client.sadd(self.keyprefix_queue % (exchange,),
                          self.sep.join([routing_key or '',
                                         pattern or '',
                                         queue or '']))
      

      代碼然后調(diào)用到redis client。

      # SET COMMANDS
      def sadd(self, name, *values):
          "Add ``value(s)`` to set ``name``"
          return self.execute_command('SADD', name, *values)
      

      具體變量如下,我們代碼中,exchange內(nèi)容為_kombu.binding.asynt_exchange。routing_key的是asynt_routing_key。

      name = {str} '_kombu.binding.asynt_exchange'
      self = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
      values = {tuple: 1} asynt_routing_keysynt_queue
      

      我們看看Redis內(nèi)容,發(fā)現(xiàn)新建內(nèi)容如下:

      127.0.0.1:6379> smembers _kombu.binding.asynt_exchange
      1) "asynt_routing_key\x06\x16\x06\x16asynt_queue"
      

      集合名字為:self.keyprefix_queue % (exchange,), 對于我們就為:_kombu.binding.asynt_exchange
      集合每個item為:routing_key + sep + pattern + sep + queue。我們這里sep = '\x06\x16'。

      4.2.3.2 使用

      當(dāng)發(fā)消息時候,Exchange的作用是將發(fā)送的 routing_key 轉(zhuǎn)化為 queue 的名字。這樣發(fā)送就知道發(fā)到哪個 queue 。這里的 exchange 內(nèi)容為 _kombu.binding.asynt_exchange。

      def get_table(self, exchange):
          key = self.keyprefix_queue % exchange
          with self.conn_or_acquire() as client:
              values = client.smembers(key)
              if not values:
                  raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
              return [tuple(bytes_to_str(val).split(self.sep)) for val in values]
      

      得到的集合內(nèi)容為:

      {b'asynt_routing_key\x06\x16\x06\x16asynt_queue'}

      即從 exchange 得到 routing_key ---> queue 的規(guī)則,然后再依據(jù) routing_key 得到 queue。就知道 Consumer 和 Producer 需要依據(jù)哪個 queue 交換消息。

      邏輯如下:

                                        +---------------------------------+
                                        |         exchange                |
                                        |                                 |
                       1 routing_key x  |                                 |
      +----------+                      |                                 |      +------------+
      | Producer |  +-----------------> |   routing_key x --->  queue x   |      |  Consumer  |
      +--------+-+                      |                                 |      +------------+
               |                        |   routing_key y --->  queue y   |
               |                        |                                 |           ^
               |                        |   routing_key z --->  queue z   |           |
               |                        |                                 |           |
               |                        +---------------------------------+           |
               |                                                                      |
               |                                                                      |
               |                                                                      |
               |                                                                      |
               |                                                                      |
               |                                                                      |
               |                                                                      |
               |                                                                      |
               |                                  +-----------+                       |
               |        2 message                 |           |        3 message      |
               +------------------------------->  |  queue X  |  +--------------------+
                                                  |           |
                                                  +-----------+
      
      

      因此,此時總體邏輯如下圖:

      +----------------------+               +-------------------+
      | Consumer             |               | Channel           |
      |                      |               |                   |        +-----------------------------------------------------------+
      |                      |               |    client  +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
      |      channel  +--------------------> |                   |        +-----------------------------------------------------------+
      |                      |               |    pool           |
      |                      |   +---------> |                   | <------------------------------------------------------------+
      |      queues          |   |           |                   |                                                              |
      |                      |   |    +----> |    connection +---------------+                                                  |
      |        |             |   |    |      |                   |           |                                                  |
      +----------------------+   |    |      +-------------------+           |                                                  |
               |                 |    |                                      v                                                  |
               |                 |    |      +-------------------+       +---+-----------------+       +--------------------+   |
               |                 |    |      | Connection        |       | redis.Transport     |       | MultiChannelPoller |   |
               |                 |    |      |                   |       |                     |       |                    |   |
               |                 |    |      |                   |       |                     |       |     _channels +--------+
               |                 |    |      |                   |       |        cycle +------------> |     _fd_to_chan    |
               |                 |    |      |     transport +---------> |                     |       |     _chan_to_sock  |
               |       +-------->+    |      |                   |       |                     |    +------+ poller         |
               |       |              |      +-------------------+       +---------------------+    |  |     after_read     |
               |       |              |                                                             |  |                    |
               |       |              |                                                             |  +--------------------+
               |       |              |      +------------------+                   +---------------+
               |       |              |      | Hub              |                   |
               |       |              |      |                  |                   v
               |       |              |      |                  |            +------+------+
               |       |              |      |      poller +---------------> | _poll       |
               |       |              |      |                  |            |             |         +-------+
               |       |              |      |                  |            |    _poller+---------> |  poll |
               v       |              |      +------------------+            |             |         +-------+
                       |              |                                      +-------------+
          +-------------------+       |      +----------------+
          | Queue      |      |       |      | Exchange       |
          |      _chann+l     |       +----+ |                |
          |                   |              |                |
          |      exchange +----------------> |     channel    |
          |                   |              |                |
          |                   |              |                |
          +-------------------+              +----------------+
      
      

      手機(jī)如下:

      現(xiàn)在我們知道:

      • Consumers:接受消息的抽象類,consumer需要聲明一個queue,并將queue與指定的exchange綁定,然后從queue里面接收消息。
      • Exchange:MQ 路由,消息發(fā)送者將消息發(fā)至Exchange,Exchange負(fù)責(zé)將消息分發(fā)至隊列。
      • Queue:對應(yīng)的 queue 抽象,存儲著即將被應(yīng)用消費掉的消息,Exchange負(fù)責(zé)將消息分發(fā)Queue,消費者從Queue接收消息;
      • Channel:與AMQP中概念類似,可以理解成共享一個Connection的多個輕量化連;

      于是邏輯鏈已經(jīng)形成,大約是這樣的,后文完善:

      • Producer發(fā)送消息到Exchange;
      • Exchange中有成員變量Channel,也有成員變量Queues。
      • 于是Exchange負(fù)責(zé)通過Channel將消息分發(fā)至Queue,Exchange的作用只是將發(fā)送的 routing_key 轉(zhuǎn)化為 queue 的名字。
      • Consumer去Queue取消息;

      邏輯大致通了,但是缺少動態(tài)操作完成此邏輯,我們將在后續(xù)完善動態(tài)邏輯。

      0x05 完善聯(lián)系

      在init之后,第二步會完善聯(lián)系。

      python的上下文管理。在python中實現(xiàn)了__enter__和__exit__方法,即支持上下文管理器協(xié)議。上下文管理器就是支持上下文管理器協(xié)議的對象,它是為了with而生。當(dāng)with語句在開始運行時,會在上下文管理器對象上調(diào)用 enter 方法。with語句運行結(jié)束后,會在上下文管理器對象上調(diào)用 exit 方法。

      所以這里是調(diào)用__enter__,即 consumer 函數(shù),其目的如下:

      • 調(diào)用Channel繼續(xù)處理,ChannelConsumer標(biāo)簽,Consumer要消費的隊列,以及標(biāo)簽與隊列的映射關(guān)系都記錄下來,等待循環(huán)調(diào)用。
      • 另外,還通過Transport將隊列與回調(diào)函數(shù)列表的映射關(guān)系記錄下來,以便于從隊列中取出消息后執(zhí)行回調(diào)函數(shù)。
      class Consumer:
          """Message consumer.
      
          Arguments:
              channel (kombu.Connection, ChannelT): see :attr:`channel`.
              queues (Sequence[kombu.Queue]): see :attr:`queues`.
              no_ack (bool): see :attr:`no_ack`.
              auto_declare (bool): see :attr:`auto_declare`
              callbacks (Sequence[Callable]): see :attr:`callbacks`.
              on_message (Callable): See :attr:`on_message`
              on_decode_error (Callable): see :attr:`on_decode_error`.
              prefetch_count (int): see :attr:`prefetch_count`.
          """
      
          def __enter__(self):
              self.consume()
              return self
      

      5.1 遍歷Queue

      使用_basic_consume方法處理Consumer相關(guān)的隊列列表中的每一項,其中處理最后一個Queue時設(shè)置標(biāo)志nowait=False

      def consume(self, no_ack=None):
          """Start consuming messages.
      
          Can be called multiple times, but note that while it
          will consume from new queues added since the last call,
          it will not cancel consuming from removed queues (
          use :meth:`cancel_by_queue`).
      
          Arguments:
              no_ack (bool): See :attr:`no_ack`.
          """
          queues = list(self._queues.values())
          if queues:
              no_ack = self.no_ack if no_ack is None else no_ack
      
              H, T = queues[:-1], queues[-1]
              for queue in H:
                  self._basic_consume(queue, no_ack=no_ack, nowait=True)
              self._basic_consume(T, no_ack=no_ack, nowait=False)
      

      _basic_consume方法代碼如下:

      是將消費者標(biāo)簽以及回調(diào)函數(shù)傳給Queueconsume方法。

      def _basic_consume(self, queue, consumer_tag=None,
                         no_ack=no_ack, nowait=True):
          tag = self._active_tags.get(queue.name)
          if tag is None:
              tag = self._add_tag(queue, consumer_tag)
              queue.consume(tag, self._receive_callback,
                            no_ack=no_ack, nowait=nowait)
          return tag
      

      5.2 consume in Queue

      對于每一個 queue,都會調(diào)用其 consume 函數(shù)。

      Queueconsume方法代碼:

      class Queue(MaybeChannelBound):
      
        def consume(self, consumer_tag='', callback=None,
                    no_ack=None, nowait=False):
            """Start a queue consumer.
      
            Consumers last as long as the channel they were created on, or
            until the client cancels them.
      
            Arguments:
                consumer_tag (str): Unique identifier for the consumer.
                    The consumer tag is local to a connection, so two clients
                    can use the same consumer tags. If this field is empty
                    the server will generate a unique tag.
      
                no_ack (bool): If enabled the broker will automatically
                    ack messages.
      
                nowait (bool): Do not wait for a reply.
      
                callback (Callable): callback called for each delivered message.
            """
            if no_ack is None:
                no_ack = self.no_ack
            return self.channel.basic_consume(
                queue=self.name,
                no_ack=no_ack,
                consumer_tag=consumer_tag or '',
                callback=callback,
                nowait=nowait,
                arguments=self.consumer_arguments)
      

      前面提到,queue與channel已經(jīng)聯(lián)系了起來。

      每一個 Consumer 初始化的時候都是和 Channel 綁定的,也就是說我們 Consumer 包含了 Queue 也就和 Connection 關(guān)聯(lián)起來了!

      Consumer 消費消息是通過 Queue 來消費,然后 Queue 又轉(zhuǎn)嫁給 Channel。

      5.3 consume in Channel

      因此又回到了Channel,就是Channelbasic_consume代碼:

      調(diào)用到基類basic_consume方法。

      class Channel(virtual.Channel):
      
          def basic_consume(self, queue, *args, **kwargs):
              
              if queue in self._fanout_queues:
                  exchange, _ = self._fanout_queues[queue]
                  self.active_fanout_queues.add(queue)
                  self._fanout_to_queue[exchange] = queue
                  
              ret = super().basic_consume(queue, *args, **kwargs)
      
              # Update fair cycle between queues.
              #
              # We cycle between queues fairly to make sure that
              # each queue is equally likely to be consumed from,
              # so that a very busy queue will not block others.
              #
              # This works by using Redis's `BRPOP` command and
              # by rotating the most recently used queue to the
              # and of the list.  See Kombu github issue #166 for
              # more discussion of this method.
              self._update_queue_cycle()
              return ret
      

      基類是 virtual.Channel,其作用是:

      ChannelConsumer標(biāo)簽,Consumer要消費的隊列,以及標(biāo)簽與隊列的映射關(guān)系都記錄下來,等待循環(huán)調(diào)用。另外,還通過Transport將隊列與回調(diào)函數(shù)列表的映射關(guān)系記錄下來,以便于從隊列中取出消息后執(zhí)行回調(diào)函數(shù)。

      變量是:

      • _tag_to_queue:標(biāo)簽與隊列的映射關(guān)系;
      • _active_queues:Consumer要消費的隊列;
      • _consumers:Consumer標(biāo)簽;
      • connection:Transport
      • connection._callbacks:隊列與回調(diào)函數(shù)列表的映射關(guān)系;

      數(shù)值如下:

      self._tag_to_queue = {dict: 1} {'None1': 'asynt'}
      self._active_queues = {list: 1} ['asynt']
      self._consumers = {set: 1} {'None1'}
      self.connection = {Transport} <kombu.transport.redis.Transport object at 0x7fb3ee0155f8>
      self.connection._callbacks = {dict: 1} {'asynt': <function Channel.basic_consume.<locals>._callback at 0x7fb3ecd4a2f0>}
      

      代碼如下:

      def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
          """Consume from `queue`."""
          self._tag_to_queue[consumer_tag] = queue
          self._active_queues.append(queue)
      
          def _callback(raw_message):
              message = self.Message(raw_message, channel=self)
              if not no_ack:
                  self.qos.append(message, message.delivery_tag)
              return callback(message)
      
          self.connection._callbacks[queue] = _callback
          self._consumers.add(consumer_tag)
      
          self._reset_cycle()
      

      _reset_cycle 代碼如下,看起來就是調(diào)用了 FairCycle,實際上沒有用到,因為cycle已經(jīng)有預(yù)設(shè)。cycle是一個MultiChannelPoller實例。

      def _reset_cycle(self):
          self._cycle = FairCycle(
              self._get_and_deliver, self._active_queues, Empty)
      

      具體如下圖:

      +----------+    +-------+     +---------+
      | Consumer |    | Queue |     | Channel |
      +----+-----+    +---+---+     +-----+---+
           |              |               |
           |              |               |
      __enter__           |               |
           |              |               |
           |              |               |
       consume            |               |
           |              |               |
           |              |               |
      _basic_consume      |               |
           |              |               |
           |              |               |
           |   consume    |               |
           +------------> |               |
           |              | basic_consume |
           |              |               |
           |              | +-----------> |
           |              |               |
           |              |               |
           |              |          _reset_cycle
           |              |               |
           |              |               |
           |              |               |
           |              |               |
           |              |               |
           v              v               v
      
      

      0x06 消費消息

      為了更好的分析,我們暫時注銷hub,使用drain_events消費消息,這樣更直觀。

      就是說,Consumer 已經(jīng)和 Channel 聯(lián)系起來,知道讀取redis 中的哪個key。但是現(xiàn)在缺少一個讀取消息的引擎。這個引擎可以驅(qū)動消息讀取,每次有消息,就調(diào)用 consumer 中的回調(diào)函數(shù)來處理消息

      在沒有引擎的情況下,drain_events 就可以起到引擎的作用

      with Consumer(conn, [queue], on_message=on_message):
          send_message(conn)
          # hub.timer.call_repeatedly(3, p_message)
          # hub.run_forever()
          conn.drain_events(timeout=1)
      

      6.1 drain_events in Connection

      drain_events 調(diào)用 Connection 的方法來進(jìn)行消費。

      def drain_events(self, **kwargs):
          """Wait for a single event from the server.
      
          Arguments:
              timeout (float): Timeout in seconds before we give up.
          """
          return self.transport.drain_events(self.connection, **kwargs)
      

      6.2 drain_events in Transport

      在 Transport中的drain_events ,是在無限執(zhí)行get(self._deliver, timeout=timeout)

      getself.cycle的一個方法,cycle是一個MultiChannelPoller實例:

      所以get<bound method MultiChannelPoller.get of <kombu.transport.redis.MultiChannelPoller object at 0x7feab312b358>>

      def drain_events(self, connection, timeout=None):
          time_start = monotonic()
          get = self.cycle.get
          polling_interval = self.polling_interval
          if timeout and polling_interval and polling_interval > timeout:
              polling_interval = timeout
          while 1:
              try:
                  get(self._deliver, timeout=timeout)
              except Empty:
                  if timeout is not None and monotonic() - time_start >= timeout:
                      raise socket.timeout()
                  if polling_interval is not None:
                      sleep(polling_interval)
              else:
                  break
      

      6.3 get in MultiChannelPoller

      Transport相關(guān)聯(lián)的每一個channel都要執(zhí)行drain_events。具體分兩步:

      • 對于每一個channel都注冊;

      • 進(jìn)行poll;

      代碼如下:

      def get(self, callback, timeout=None):
          self._in_protected_read = True
          try:
              for channel in self._channels:
                  if channel.active_queues:           # BRPOP mode?
                      if channel.qos.can_consume():
                          self._register_BRPOP(channel)
                  if channel.active_fanout_queues:    # LISTEN mode?
                      self._register_LISTEN(channel)
      
              events = self.poller.poll(timeout)
              if events:
                  for fileno, event in events:
                      ret = self.handle_event(fileno, event)
                      if ret:
                          return
              # - no new data, so try to restore messages.
              # - reset active redis commands.
              self.maybe_restore_messages()
              raise Empty()
          finally:
              self._in_protected_read = False
              while self.after_read:
                  try:
                      fun = self.after_read.pop()
                  except KeyError:
                      break
                  else:
                      fun()
      

      6.3.1 _register_BRPOP in MultiChannelPoller

      具體注冊如下,我們先來看看 _register_BRPOP,這里做了兩個判斷,第一個是判斷當(dāng)前的 channel 是否放進(jìn)了 epoll 模型里面,如果沒有,那么就放進(jìn)去;同時,如果之前這個 channel 不在 epoll 里面,那么這次放進(jìn)去了。

      def _register_BRPOP(self, channel):
          """Enable BRPOP mode for channel."""
          ident = channel, channel.client, 'BRPOP'
          if not self._client_registered(channel, channel.client, 'BRPOP'):
              channel._in_poll = False
              self._register(*ident)
          if not channel._in_poll:  # send BRPOP
              channel._brpop_start()
      

      6.3.2 register in _poll

      最終進(jìn)行Poll注冊,這樣當(dāng)redis的socket對應(yīng)的fd有消息,就會進(jìn)行處理。

      變量如下:<kombu.utils.eventio._poll object at 0x7feab2d7d780>

      def register(self, fd, events):
          fd = fileno(fd)
          poll_flags = 0
          if events & ERR:
              poll_flags |= POLLERR
          if events & WRITE:
              poll_flags |= POLLOUT
          if events & READ:
              poll_flags |= POLLIN
          self._quick_register(fd, poll_flags)
          return fd
      

      6.3.3 poll(timeout) in MultiChannelPoller

      當(dāng)poll有消息,則相應(yīng)處理。

      events = self.poller.poll(timeout)
      if events:
          for fileno, event in events:
              ret = self.handle_event(fileno, event)
              if ret:
                  return
      

      6.3.4 注冊到redis驅(qū)動,負(fù)載均衡

      但是,這個 connection 還沒有對 epoll 起效果,所以發(fā)送一個 _brpop_start

      這里可以看到,是對 asynt_queue 發(fā)起了監(jiān)聽請求,也就是說隊列有消息過來,會被響應(yīng)到。

      變量如下:

      keys = {list: 5} ['asynt_queue', 'asynt_queue\x06\x163', 'asynt_queue\x06\x166', 'asynt_queue\x06\x169', 1]
      queues = {list: 1} ['asynt_queue']
      

      代碼如下:

      def _brpop_start(self, timeout=1):
          queues = self._queue_cycle.consume(len(self.active_queues))
          if not queues:
              return
          keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
                  for queue in queues] + [timeout or 0]
          self._in_poll = self.client.connection
          self.client.connection.send_command('BRPOP', *keys)
      

      此處有一個負(fù)載均衡需要說明:

      _queue_cycle屬于均衡策略,就是選擇下一次哪個queue的策略,items就是具體queue列表。比如:

      class round_robin_cycle:
          """Iterator that cycles between items in round-robin."""
      
          def __init__(self, it=None):
              self.items = it if it is not None else []
      
          def update(self, it):
              """Update items from iterable."""
              self.items[:] = it
      
          def consume(self, n):
              """Consume n items."""
              return self.items[:n]
      

      _brpop_start就是啟動了下一次讀取,選擇哪一個queue

      consume, scheduling.py:79
      _brpop_start, redis.py:725
      _register_BRPOP, redis.py:314
      on_poll_start, redis.py:328
      on_poll_start, redis.py:1072
      create_loop, hub.py:294
      run_once, hub.py:193
      run_forever, hub.py:185
      main, testUb.py:49
      <module>, testUb.py:53
      

      6.3.4 handle_event in MultiChannelPoller

      因為已經(jīng)把 file 和 poll 聯(lián)系起來,所以對調(diào)用對應(yīng)的響應(yīng)方法,而響應(yīng)方法會進(jìn)行read消息。

      def handle_event(self, fileno, event):
          if event & READ:
              return self.on_readable(fileno), self
          elif event & ERR:
              chan, type = self._fd_to_chan[fileno]
              chan._poll_error(type)
      

      6.3.5 on_readable in MultiChannelPoller

      這里聽說 Redis 已經(jīng)準(zhǔn)備好了,所以就來獲取拿到的結(jié)果,然后就解析起來了,解析成功之后,自然要處理這個消息呀,于是乎又回到了這里 redis.py

      提取fd對應(yīng)的channel的響應(yīng)方法如下:

      def on_readable(self, fileno):
          chan, type = self._fd_to_chan[fileno]
          if chan.qos.can_consume():
              chan.handlers[type]()
      

      6.3.6 _brpop_read in Channel

      前面對chan.handlers已經(jīng)進(jìn)行了注冊。

      handlers = {dict: 2} 
       'BRPOP' = {method} <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fbad4170f28>>
       'LISTEN' = {method} <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7fbad4170f28>>
      

      因此調(diào)用_brpop_read。

      def _brpop_read(self, **options):
          try:
              try:
                  dest__item = self.client.parse_response(self.client.connection,
                                                          'BRPOP',
                                                          **options)
              except self.connection_errors:
                  # if there's a ConnectionError, disconnect so the next
                  # iteration will reconnect automatically.
                  self.client.connection.disconnect()
                  raise
              if dest__item:
                  dest, item = dest__item
                  dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
                  self._queue_cycle.rotate(dest)
                  self.connection._deliver(loads(bytes_to_str(item)), dest)
                  return True
              else:
                  raise Empty()
          finally:
              self._in_poll = None
      

      6.3.7 從redis讀取

      這里會從redis驅(qū)動讀取,文件/redis/connection.py,從SocketBuffer讀取。

      代碼為:

      def readline(self):
          buf = self._buffer
          buf.seek(self.bytes_read)
          data = buf.readline()
          while not data.endswith(SYM_CRLF):
              # there's more data in the socket that we need
              self._read_from_socket()
              buf.seek(self.bytes_read)
              data = buf.readline()
      
          self.bytes_read += len(data)
      
          # purge the buffer when we've consumed it all so it doesn't
          # grow forever
          if self.bytes_read == self.bytes_written:
              self.purge()
      
          return data[:-2]
      

      當(dāng)讀到 response 之后,調(diào)用 Redis驅(qū)動中對應(yīng)命令的 回調(diào)方法來處理。此處命令為BRPOP。回調(diào)方法為:string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None)

      代碼為:

      def parse_response(self, connection, command_name, **options):
          "Parses a response from the Redis server"
          try:
              response = connection.read_response()
          except ResponseError:
              if EMPTY_RESPONSE in options:
                  return options[EMPTY_RESPONSE]
              raise
          if command_name in self.response_callbacks:
              return self.response_callbacks[command_name](response, **options)
          return response
      

      變量為:

      command_name = {str} 'BRPOP'
      connection = {Connection} Connection<host=localhost,port=6379,db=0>
      options = {dict: 0} {}
      self = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
       connection = {Connection} Connection<host=localhost,port=6379,db=0>
       connection_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
       response_callbacks = {CaseInsensitiveDict: 179} {.
        'LPUSH' = {function} <function Redis.<lambda> at 0x7fbad4276ea0>
        'RPUSH' = {function} <function Redis.<lambda> at 0x7fbad4276ea0>
        'SORT' = {function} <function sort_return_tuples at 0x7fbad4275f28>
        'ZSCORE' = {function} <function float_or_none at 0x7fbad4276598>
        'ZINCRBY' = {function} <function float_or_none at 0x7fbad4276598>
        'BLPOP' = {function} <function Redis.<lambda> at 0x7fbad4276f28>
        'BRPOP' = {function} <function Redis.<lambda> at 0x7fbad4276f28>
         ....
      

      這些代碼堆棧如下:

      readline, connection.py:251
      read_response, connection.py:324
      read_response, connection.py:739
      parse_response, client.py:915
      _brpop_read, redis.py:738
      on_readable, redis.py:358
      handle_event, redis.py:362
      get, redis.py:380
      drain_events, base.py:960
      drain_events, connection.py:318
      main, testUb.py:50
      <module>, testUb.py:53
      

      6.3.8 回到_brpop_read

      從Redis驅(qū)動獲得消息后,回到了 _brpop_read,信息如下:

      dest__item = {tuple: 2} 
       0 = {bytes: 11} b'asynt_queue'
       1 = {bytes: 321} b'{"body": "aGVsbG8gd29ybGQ=", "content-encoding": "utf-8", "content-type": "text/plain", "headers": {}, "properties": {"delivery_mode": 2, "delivery_info": {"exchange": "asynt_exchange", "routing_key": "asynt_routing_key"}, "priority": 0, "body_encoding":
      

      6.3.9 _deliver in Transport

      當(dāng)獲得消息之后,會取出對應(yīng)queue的callback,進(jìn)行調(diào)用。

      變量如下:<kombu.transport.redis.Transport object at 0x7feab30f25c0>

      def _deliver(self, message, queue):
          try:
              callback = self._callbacks[queue]
          except KeyError:
              logger.warning(W_NO_CONSUMERS, queue)
              self._reject_inbound_message(message)
          else:
              callback(message)
      

      6.3.10 basic_consume in Channel

      代碼繼續(xù)走到 basic_consume

      <kombu.transport.redis.Channel object at 0x7feab3235f28>

      def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
          """Consume from `queue`."""
          self._tag_to_queue[consumer_tag] = queue
          self._active_queues.append(queue)
      
          def _callback(raw_message):
              message = self.Message(raw_message, channel=self)
              if not no_ack:
                  self.qos.append(message, message.delivery_tag)
              return callback(message)
      
          self.connection._callbacks[queue] = _callback
          self._consumers.add(consumer_tag)
      
          self._reset_cycle()
      

      6.3.11 _receive_callback in Consumer

      上文的 _callback 就是 _receive_callback in Consumer,所以這時候就調(diào)用過去。

      <Consumer: [<Queue asynt -> <Exchange asynt(direct) bound to chan:1> -> asynt bound to chan:1>]>

      def _receive_callback(self, message):
          accept = self.accept
          on_m, channel, decoded = self.on_message, self.channel, None
          try:
              m2p = getattr(channel, 'message_to_python', None)
              if m2p:
                  message = m2p(message)
              if accept is not None:
                  message.accept = accept
              if message.errors:
                  return message._reraise_error(self.on_decode_error)
              decoded = None if on_m else message.decode()
          except Exception as exc:
              if not self.on_decode_error:
                  raise
              self.on_decode_error(message, exc)
          else:
              return on_m(message) if on_m else self.receive(decoded, message)
      

      最終調(diào)用用戶方法。

      on_message, testUb.py:36
      _receive_callback, messaging.py:620
      _callback, base.py:630
      _deliver, base.py:980
      _brpop_read, redis.py:748
      on_readable, redis.py:358
      handle_event, redis.py:362
      get, redis.py:380
      drain_events, base.py:960
      drain_events, connection.py:318
      main, testUb.py:50
      <module>, testUb.py:53
      

      具體如下:

      +----------+ +---------+ +------------------+    +------+ +---------+    +-----+  +---------+
      |Connection| |Transport| |MultiChannelPoller|    |_poll | | Channel |    |redis|  |Consumer |
      +----+-----+ +------+--+ +------------+-----+    +----+-+ +-----+---+    +--+--+  +---+-----+
           |              |                 |               |         |           |         |
           +              |                 |               |         |           |         |
        drain_events      |                 |               |         |           |         |
           +              +                 |               |         |           |         |
           +------->  drain_events          |               |         |           |         |
           |              +                 +               |         |           |         |
           |              | +------------> get              |         |           |         |
           |              |                 +               |         |           |         |
           |              |                 +               |         |           |         |
           |              |              _register_BRPOP    |         |           |         |
           |              |                 +               +         |           |         |
           |              |                 | +-----------> register  |           |         |
           |              |                 |               +         |           |         |
           |              |                 +               |         |           |         |
           |              |               poll              |         |           |         |
           |              |                 +               |         |           |         |
           |              |                 +               |         |           |         |
           |              |             handle_event        |         |           |         |
           |              |                 +               |         |           |         |
           |              |                 +               |         |           |         |
           |              |            on_readable          |         |           |         |
           |              |                 +               |         +           |         |
           |              |                 |  +----------------->_brpop_read     |         |
           |              |                 |               |         +           |         |
           |              +                 |               |         +---------> |         |
           |          _deliver  <-------------------------------------+           |         |
           |              +                 |               |         |           |         |
           |              |                 |               |         |           |         |
           |              |                 |               |         |           |         |
           |              | +----------------------------------> basic|consume    |         |
           |              |                 |               |         |           |         |
           |              |                 |               |         +---------> |         |
           |              |                 |               |         |           |         |
           |              |                 |               |         |           |         |
           |              |                 |               |         |           v         |
           |              |                 |               |         |                     |
           |              |                 |               |         |          _receive_ca|lback
           |              |                 |               |         |                     |
           v              v                 v               v         v                     |
                                                                                            v
      
      

      從上圖可以看出模塊的用途。

      手機(jī)上如圖

      至此,我們已經(jīng)完成了 Consumer 的分析,下文我們進(jìn)行 Producer 的分析。

      0xFF 參考

      celery 7 優(yōu)秀開源項目kombu源碼分析之registry和entrypoint

      (二)放棄pika,選擇kombu

      kombu消息框架<二>

      AMQP中的概念

      AMQP的基本概念

      深入理解AMQP協(xié)議

      kombu和消息隊列總結(jié)

      關(guān)于epoll版服務(wù)器的理解(Python實現(xiàn))

      celery源碼解讀

      Kombu源碼分析(一)概述

      posted @ 2021-03-09 15:52  羅西的思考  閱讀(1205)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 国产成人啪精品视频免费软件| 影音先锋男人站| 波多野结衣久久一区二区| 精品亚洲国产成人| 国产欧亚州美日韩综合区| 类乌齐县| 成人3D动漫一区二区三区| 韩国午夜理伦三级| 少妇人妻精品无码专区视频| 国产l精品国产亚洲区| 亚洲人午夜射精精品日韩| 日夜啪啪一区二区三区| 国产综合色产在线视频欧美| 国产高清国产精品国产专区| 国产精品亚洲精品日韩已满十八小| 99精品国产在热久久婷婷| 国产无套内射又大又猛又粗又爽| 国产久久热这里只有精品| 中文字幕日韩有码av| 99网友自拍视频在线| 国产精品多p对白交换绿帽| 国产成人精品中文字幕| 亚洲精品岛国片在线观看| 国产JJIZZ女人多水喷水| 强奷漂亮雪白丰满少妇av| 蜜臀午夜一区二区在线播放| 天堂av网一区二区三区| 小嫩模无套内谢第一次| 国产欧美综合在线观看第十页| 蜜桃一区二区三区在线看| 国产一区二区三区禁18| 999精品全免费观看视频| 兴业县| 亚洲精品宾馆在线精品酒店| 亚洲第一国产综合| 成人无码午夜在线观看| 久久精品亚洲精品国产色婷 | 天堂在/线中文在线资源 官网| 久久综合国产精品一区二区| 国产不卡免费一区二区| 亚洲精品一区二区麻豆|