<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      [源碼分析] 消息隊列 Kombu 之 啟動過程

      [源碼分析] 消息隊列 Kombu 之 啟動過程

      0x00 摘要

      本系列我們介紹消息隊列 Kombu。Kombu 的定位是一個兼容 AMQP 協議的消息隊列抽象。通過本文,大家可以了解 Kombu 是如何啟動,以及如何搭建一個基本的架子。

      因為之前有一個綜述,所以大家會發現,一些概念講解文字會同時出現在后續文章和綜述之中。

      0x01 示例

      下面使用如下代碼來進行說明。

      本示例來自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感謝。

      def main(arguments):
          hub = Hub()
          exchange = Exchange('asynt_exchange')
          queue = Queue('asynt_queue', exchange, 'asynt_routing_key')
      
          def send_message(conn):
              producer = Producer(conn)
              producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
              print('message sent')
      
          def on_message(message):
              print('received: {0!r}'.format(message.body))
              message.ack()
              # hub.stop()  # <-- exit after one message
      
          conn = Connection('redis://localhost:6379')
          conn.register_with_event_loop(hub)
      
          def p_message():
              print(' kombu ')
      
          with Consumer(conn, [queue], on_message=on_message):
              send_message(conn)
              hub.timer.call_repeatedly(3, p_message)
              hub.run_forever()
      
      if __name__ == '__main__':
          sys.exit(main(sys.argv[1:]))
      

      0x02 啟動

      讓我們順著程序流程看看Kombu都做了些什么,也可以對 Kombu 內部有所了解。

      本文關注的重點是:Connection,Channel 和 Hub 是如何聯系在一起的。

      2.1 Hub

      在程序開始,我們建立了Hub。

      Hub的作用是建立消息Loop,但是此時尚未建立,因此只是一個靜態實例。

      hub = Hub()
      

      其定義如下:

      class Hub:
          """Event loop object.
          Arguments:
              timer (kombu.asynchronous.Timer): Specify custom timer instance.
          """
          def __init__(self, timer=None):
              self.timer = timer if timer is not None else Timer()
      
              self.readers = {}
              self.writers = {}
              self.on_tick = set()
              self.on_close = set()
              self._ready = set()
      
              self._running = False
              self._loop = None
      
              self.consolidate = set()
              self.consolidate_callback = None
      
              self.propagate_errors = ()
              self._create_poller()
      

      因為此時沒有建立loop,所以目前重要的步驟是建立Poll,其Stack如下:

      _get_poller, eventio.py:321
      poll, eventio.py:328
      _create_poller, hub.py:113
      __init__, hub.py:96
      main, testUb.py:22
      <module>, testUb.py:55
      

      在eventio.py中有如下,我們可以看到Kombu可以使用多種模型來進行內核消息處理:

      def _get_poller():
          if detect_environment() != 'default':
              # greenlet
              return _select
          elif epoll:
              # Py2.6+ Linux
              return _epoll
          elif kqueue and 'netbsd' in sys.platform:
              return _kqueue
          elif xpoll:
              return _poll
          else:
              return _select
      

      因為本機情況,這里選擇的是:_poll。

      +------------------+
      | Hub              |
      |                  |
      |                  |            +-------------+
      |      poller +---------------> | _poll       |
      |                  |            |             |         +-------+
      |                  |            |    _poller+---------> |  poll |
      +------------------+            |             |         +-------+
                                      +-------------+
      

      2.2 Exchange與Queue

      其次建立了Exchange與Queue。

      • Exchange:交換機,消息發送者將消息發至 Exchange,Exchange 負責將消息分發至 Queue;
      • Queue:消息隊列,存儲著即將被應用消費掉的消息,Exchange 負責將消息分發 Queue,消費者從 Queue 接收消息;

      因為此時也沒有具體消息,所以我們暫且無法探究Exchange機制。

      exchange = Exchange('asynt')
      queue = Queue('asynt', exchange, 'asynt')
      

      此時將把Exchange與Queue聯系起來。圖示如下:

      +------------------+
      | Hub              |
      |                  |
      |                  |            +-------------+
      |      poller +---------------> | _poll       |
      |                  |            |             |         +-------+
      |                  |            |    _poller+---------> |  poll |
      +------------------+            |             |         +-------+
                                      +-------------+
      
      
      +----------------+         +-------------------+
      | Exchange       |         | Queue             |
      |                |         |                   |
      |                |         |                   |
      |     channel    | <------------+ exchange     |
      |                |         |                   |
      |                |         |                   |
      +----------------+         +-------------------+
      

      2.3 Connection

      第三步就是建立Connection。

      Connection是對 MQ 連接的抽象,一個 Connection 就對應一個 MQ 的連接。現在就是對'redis://localhost:6379'連接進行抽象。

      conn = Connection('redis://localhost:6379')
      

      2.3.1 定義

      由定義注釋可知,Connection是到broker的連接。從具體代碼可以看出,Connection更接近是一個邏輯概念,具體功能都委托給別人完成。

      消息從來不直接發送給隊列,甚至 Producers 都可能不知道隊列的存在。 Producer如何才能將消息發送給Consumer呢?這中間需要經過 Message Broker 的處理和傳遞。

      AMQP中,承擔 Message Broker 功能的就是 AMQP Server。也正是從這個角度講,AMQP 的 Producer 和Consumer 都是 AMQP Client。

      在Kombu 體系中,用 transport 對所有的 broker 進行了抽象,為不同的 broker 提供了一致的解決方案。通過Kombu,開發者可以根據實際需求靈活的選擇或更換broker。

      Connection主要成員變量是,但是此時沒有賦值:

      • _connection:
      • _transport:就是上面提到的對 broker 的抽象。
      • cycle:與broker交互的調度策略。
      • failover_strategy:在連接失效時,選取其他hosts的策略。
      • heartbeat:用來實施心跳。

      代碼如下:

      class Connection:
          """A connection to the broker"""
      
          port = None
          virtual_host = '/'
          connect_timeout = 5
      
          _connection = None
          _default_channel = None
          _transport = None
          uri_prefix = None
      
          #: The cache of declared entities is per connection,
          #: in case the server loses data.
          declared_entities = None
      
          #: Iterator returning the next broker URL to try in the event
          #: of connection failure (initialized by :attr:`failover_strategy`).
          cycle = None
      
          #: Additional transport specific options,
          #: passed on to the transport instance.
          transport_options = None
      
          #: Strategy used to select new hosts when reconnecting after connection
          #: failure.  One of "round-robin", "shuffle" or any custom iterator
          #: constantly yielding new URLs to try.
          failover_strategy = 'round-robin'
      
          #: Heartbeat value, currently only supported by the py-amqp transport.
          heartbeat = None
      
          resolve_aliases = resolve_aliases
          failover_strategies = failover_strategies
      
          hostname = userid = password = ssl = login_method = None
      

      2.3.2 init 與 transport

      Connection內部主要任務是建立了transport。

      Stack大致如下:

      Transport, redis.py:1039
      <module>, redis.py:1031
      import_module, __init__.py:126
      symbol_by_name, imports.py:56
      resolve_transport, __init__.py:70
      get_transport_cls, __init__.py:85
      __init__, connection.py:183
      main, testUb.py:40
      <module>, testUb.py:55
      

      2.4 Transport

      在Kombu體系中,用transport對所有的broker進行了抽象,為不同的broker提供了一致的解決方案。通過Kombu,開發者可以根據實際需求靈活的選擇或更換broker。

      Transport:真實的 MQ 連接,也是真正連接到 MQ(redis/rabbitmq) 的實例。就是存儲和發送消息的實體,用來區分底層消息隊列是用amqp、Redis還是其它實現的。

      Transport負責具體操作,但是很多操作移交給 loop 與 MultiChannelPoller 進行。

      2.4.1 定義

      其主要成員變量為:

      • 本transport的驅動類型,名字;
      • 對應的 Channel;
      • cycle:MultiChannelPoller,具體下文提到;

      定義如下:

      class Transport(virtual.Transport):
          """Redis Transport."""
      
          Channel = Channel
      
          polling_interval = None  # disable sleep between unsuccessful polls.
          default_port = DEFAULT_PORT
          driver_type = 'redis'
          driver_name = 'redis'
      
          implements = virtual.Transport.implements.extend(
              asynchronous=True,
              exchange_type=frozenset(['direct', 'topic', 'fanout'])
          )
      
          def __init__(self, *args, **kwargs):
              if redis is None:
                  raise ImportError('Missing redis library (pip install redis)')
              super().__init__(*args, **kwargs)
      
              # Get redis-py exceptions.
              self.connection_errors, self.channel_errors = self._get_errors()
              # All channels share the same poller.
              self.cycle = MultiChannelPoller()
      

      2.4.2 移交操作

      Transport負責具體操作,但是很多操作移交給 loop 與 MultiChannelPoller 進行,具體從下面代碼可見。

      def register_with_event_loop(self, connection, loop):
          cycle = self.cycle
          cycle.on_poll_init(loop.poller)
          cycle_poll_start = cycle.on_poll_start
          add_reader = loop.add_reader
          on_readable = self.on_readable
      
          def _on_disconnect(connection):
              if connection._sock:
                  loop.remove(connection._sock)
          cycle._on_connection_disconnect = _on_disconnect
      
          def on_poll_start():
              cycle_poll_start()
              [add_reader(fd, on_readable, fd) for fd in cycle.fds]
              
          loop.on_tick.add(on_poll_start)
          loop.call_repeatedly(10, cycle.maybe_restore_messages)
          
          health_check_interval = connection.client.transport_options.get(
              'health_check_interval',
              DEFAULT_HEALTH_CHECK_INTERVAL
          )
          
          loop.call_repeatedly(
              health_check_interval,
              cycle.maybe_check_subclient_health
          )
      

      其中重點是MultiChannelPoller。一個Connection有一個Transport, 一個Transport有一個MultiChannelPoller,對poll操作都是由MultiChannelPoller完成,redis操作由channel完成

      2.4.3 MultiChannelPoller

      定義如下,可以理解為執行engine,主要作用是:

      • 收集channel;
      • 建立fd到channel的映射;
      • 建立channel到socks的映射;
      • 使用poll;
      class MultiChannelPoller:
          """Async I/O poller for Redis transport."""
      
          eventflags = READ | ERR
      
          def __init__(self):
              # active channels
              self._channels = set()
              # file descriptor -> channel map.
              self._fd_to_chan = {}
              # channel -> socket map
              self._chan_to_sock = {}
              # poll implementation (epoll/kqueue/select)
              self.poller = poll()
              # one-shot callbacks called after reading from socket.
              self.after_read = set()
      

      2.4.4 獲取

      Transport是預先生成的,若需要,則依據名字取得。

      TRANSPORT_ALIASES = {
          'amqp': 'kombu.transport.pyamqp:Transport',
          'amqps': 'kombu.transport.pyamqp:SSLTransport',
          'pyamqp': 'kombu.transport.pyamqp:Transport',
          'librabbitmq': 'kombu.transport.librabbitmq:Transport',
          'memory': 'kombu.transport.memory:Transport',
          'redis': 'kombu.transport.redis:Transport',
      	......
          'pyro': 'kombu.transport.pyro:Transport'
      }
      
      _transport_cache = {}
      
      
      def resolve_transport(transport=None):
          """Get transport by name. """
          if isinstance(transport, str):
              try:
                  transport = TRANSPORT_ALIASES[transport]
              except KeyError:
                  if '.' not in transport and ':' not in transport:
                      from kombu.utils.text import fmatch_best
                      alt = fmatch_best(transport, TRANSPORT_ALIASES)
              else:
                  if callable(transport):
                      transport = transport()
              return symbol_by_name(transport)
          return transport
      
      def get_transport_cls(transport=None):
          """Get transport class by name.
          """
          if transport not in _transport_cache:
              _transport_cache[transport] = resolve_transport(transport)
          return _transport_cache[transport]
      

      此時Connection數據如下,注意其部分成員變量尚且沒有意義:

      conn = {Connection} <Connection: redis://localhost:6379// at 0x7faa910cbd68>
       alt = {list: 0} []
       connect_timeout = {int} 5
       connection = {Transport} <kombu.transport.redis.Transport object at 0x7faa91277710>
       cycle = {NoneType} None
       declared_entities = {set: 0} set()
       default_channel = {Channel} <kombu.transport.redis.Channel object at 0x7faa912700b8>
       failover_strategies = {dict: 2} {'round-robin': <class 'itertools.cycle'>, 'shuffle': <function shufflecycle at 0x7faa9109a0d0>}
       failover_strategy = {type} <class 'itertools.cycle'>
       heartbeat = {int} 0
       host = {str} 'localhost:6379'
       hostname = {str} 'localhost'
       manager = {Management} <kombu.transport.virtual.base.Management object at 0x7faa91270160>
       port = {int} 6379
       recoverable_channel_errors = {tuple: 0} ()
       resolve_aliases = {dict: 2} {'pyamqp': 'amqp', 'librabbitmq': 'amqp'}
       transport = {Transport} <kombu.transport.redis.Transport object at 0x7faa91277710>
       transport_cls = {str} 'redis'
       uri_prefix = {NoneType} None
       userid = {NoneType} None
       virtual_host = {str} '/'
      

      至此,Kombu的基本就建立完成,但是彼此之間沒有建立邏輯聯系。

      所以此時示例如下,注意此時三者沒有聯系:

      +-------------------+       +---------------------+       +--------------------+
      | Connection        |       | redis.Transport     |       | MultiChannelPoller |
      |                   |       |                     |       |                    |
      |                   |       |                     |       |     _channels      |
      |                   |       |        cycle +------------> |     _fd_to_chan    |
      |     transport +---------> |                     |       |     _chan_to_sock  |
      |                   |       |                     |       |     poller         |
      +-------------------+       +---------------------+       |     after_read     |
                                                                |                    |
                                                                +--------------------+
      +------------------+
      | Hub              |
      |                  |
      |                  |            +-------------+
      |      poller +---------------> | _poll       |
      |                  |            |             |         +-------+
      |                  |            |    _poller+---------> |  poll |
      +------------------+            |             |         +-------+
                                      +-------------+
      +----------------+         +-------------------+
      | Exchange       |         | Queue             |
      |                |         |                   |
      |                |         |                   |
      |     channel    | <------------+ exchange     |
      |                |         |                   |
      |                |         |                   |
      +----------------+         +-------------------+
      

      0x03 Connection注冊hub

      之前我們提到,基本架子已經建立起來,但是各個模塊之間彼此沒有聯系,下面我們就看看如何建立聯系。

      示例代碼來到:

      conn.register_with_event_loop(hub)
      

      這里進行了注冊,此時作用是把hub與Connection聯系起來。隨之調用到:

      def register_with_event_loop(self, loop):
          self.transport.register_with_event_loop(self.connection, loop)
      

      進而調用到transport類:<kombu.transport.redis.Transport object at 0x7fd23e962dd8>

      具體代碼如下:

      def register_with_event_loop(self, connection, loop):
          cycle = self.cycle
          cycle.on_poll_init(loop.poller)# 這里建立聯系,loop就是hub
          cycle_poll_start = cycle.on_poll_start
          add_reader = loop.add_reader
          on_readable = self.on_readable
      
          def _on_disconnect(connection):
              if connection._sock:
                  loop.remove(connection._sock)
          cycle._on_connection_disconnect = _on_disconnect
      
          def on_poll_start():
              cycle_poll_start()
              [add_reader(fd, on_readable, fd) for fd in cycle.fds]
              
          loop.on_tick.add(on_poll_start)
          loop.call_repeatedly(10, cycle.maybe_restore_messages)
          
          health_check_interval = connection.client.transport_options.get(
              'health_check_interval',
              DEFAULT_HEALTH_CHECK_INTERVAL
          )
          
          loop.call_repeatedly(
              health_check_interval,
              cycle.maybe_check_subclient_health
          )
      

      3.1 建立Channel

      注冊最初是建立Channel。這里有一個連接的動作,就是在這里,建立了Channel。

      @property
      def connection(self):
          """The underlying connection object"""
          if not self._closed:
              if not self.connected:
                  return self._ensure_connection(
                      max_retries=1, reraise_as_library_errors=False
                  )
              return self._connection
      

      具體建立是在 base.py 中完成,這是 Transport 基類。Stack 如下:

      create_channel, base.py:920
      establish_connection, base.py:938
      _establish_connection, connection.py:801
      _connection_factory, connection.py:866
      retry_over_time, functional.py:325
      _ensure_connection, connection.py:439
      connection, connection.py:859
      register_with_event_loop, connection.py:266
      main, testUb.py:41
      <module>, testUb.py:55
      

      3.2 Channel

      Channel:與AMQP中概念類似,可以理解成共享一個Connection的多個輕量化連接。就是真正的連接。

      可以認為是 redis 操作和連接的封裝。每個 Channel 都可以與 redis 建立一個連接,在此連接之上對 redis 進行操作,每個連接都有一個 socket,每個 socket 都有一個 file,從這個 file 可以進行 poll。

      為了更好的說明,我們提前給出這個通訊流程大約如下:

                  +---------------------------------------------------------------------------------------------------------------------------------------+
                  |                                     +--------------+                                   6                       parse_response         |
                  |                                +--> | Linux Kernel | +---+                                                                            |
                  |                                |    +--------------+     |                                                                            |
                  |                                |                         |                                                                            |
                  |                                |                         |  event                                                                     |
                  |                                |  1                      |                                                                            |
                  |                                |                         |  2                                                                         |
                  |                                |                         |                                                                            |
          +-------+---+    socket                  +                         |                                                                            |
          |   redis   | <------------> port +-->  fd +--->+                  v                                                                            |
          |           |                                   |           +------+--------+                                                                   |
          |           |    socket                         |           |  Hub          |                                                                   |
          |           | <------------> port +-->  fd +--->----------> |               |                                                                   |
          | port=6379 |                                   |           |               |                                                                   |
          |           |    socket                         |           |     readers +----->  Transport.on_readable                                        |
          |           | <------------> port +-->  fd +--->+           |               |                     +                                             |
          +-----------+                                               +---------------+                     |                                             |
                                                                                                            |                                             |
                                                              3                                             |                                             |
                   +----------------------------------------------------------------------------------------+                                             |
                   |                                                                                                                                      v
                   |                                                                                                                                                  _receive_callback
                   |                                                                                                                            5    +-------------+                      +-----------+
      +------------+------+                     +-------------------------+                                    'BRPOP' = Channel._brpop_read +-----> | Channel     | +------------------> | Consumer  |
      |       Transport   |                     |  MultiChannelPoller     |      +------>  channel . handlers  'LISTEN' = Channel._receive           +-------------+                      +---+-------+
      |                   |                     |                         |      |                                                                                           8                |
      |                   | on_readable(fileno) |                         |      |                                                                         ^                                  |
      |           cycle +---------------------> |          _fd_to_chan +---------------->  channel . handlers  'BRPOP' = Channel._brpop_read               |                                  |
      |                   |        4            |                         |      |                             'LISTEN' = Channel._receive                 |                                  |
      |  _callbacks[queue]|                     |                         |      |                                                                         |                            on_m  |  9
      |          +        |                     +-------------------------+      +------>  channel . handlers  'BRPOP' = Channel._brpop_read               |                                  |
      +-------------------+                                                                                    'LISTEN' = Channel._receive                 |                                  |
                 |                                                                                                                                         |                                  v
                 |                                                7           _callback                                                                    |
                 +-----------------------------------------------------------------------------------------------------------------------------------------+                            User Function
      
      

      手機上如下:

      3.2.1 定義

      Channel 主要成員是:

      • async_pool :redis異步連接池;
      • pool :redis連接池;
      • channel_id :Channel ID;
      • client :就是StrictRedis之類的driver;
      • connection :對應的Transport;
      • cycle = {FairCycle} <FairCycle: 0/0 []>
      • queue_order_strategy :獲取queue的策略;
      • state :BrokerState狀態;
      • subclient :PubSub所用的client;
        keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX) :bing用到的key;

      比如_get_client可以看出來client。

      def _get_client(self):
          if redis.VERSION < (3, 2, 0):
              raise VersionMismatch(
                  'Redis transport requires redis-py versions 3.2.0 or later. '
                  'You have {0.__version__}'.format(redis))
          return redis.StrictRedis
      

      簡化版定義如下:

      class Channel(virtual.Channel):
          """Redis Channel."""
      
          QoS = QoS
      
          _client = None
          _subclient = None
          keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX)
          keyprefix_fanout = '/{db}.'
          sep = '\x06\x16'
          _fanout_queues = {}
          unacked_key = '{p}unacked'.format(p=KEY_PREFIX)
          unacked_index_key = '{p}unacked_index'.format(p=KEY_PREFIX)
          unacked_mutex_key = '{p}unacked_mutex'.format(p=KEY_PREFIX)
          unacked_mutex_expire = 300  # 5 minutes
          unacked_restore_limit = None
          visibility_timeout = 3600   # 1 hour
          max_connections = 10
          queue_order_strategy = 'round_robin'
      
          _async_pool = None
          _pool = None
      
          from_transport_options = (
              virtual.Channel.from_transport_options +
              ('sep',
               'ack_emulation',
               'unacked_key',
      		 ......
               'max_connections',
               'health_check_interval',
               'retry_on_timeout',
               'priority_steps')  # <-- do not add comma here!
          )
      
          connection_class = redis.Connection if redis else None
      

      3.2.2 基類

      基類定義如下:

      class Channel(AbstractChannel, base.StdChannel):
          """Virtual channel.
      
          Arguments:
              connection (ConnectionT): The transport instance this
                  channel is part of.
          """
      
          #: message class used.
          Message = Message
      
          #: QoS class used.
          QoS = QoS
      
          #: flag to restore unacked messages when channel
          #: goes out of scope.
          do_restore = True
      
          #: mapping of exchange types and corresponding classes.
          exchange_types = dict(STANDARD_EXCHANGE_TYPES)
      
          #: flag set if the channel supports fanout exchanges.
          supports_fanout = False
      
          #: Binary <-> ASCII codecs.
          codecs = {'base64': Base64()}
      
          #: Default body encoding.
          #: NOTE: ``transport_options['body_encoding']`` will override this value.
          body_encoding = 'base64'
      
          #: counter used to generate delivery tags for this channel.
          _delivery_tags = count(1)
      
          #: Optional queue where messages with no route is delivered.
          #: Set by ``transport_options['deadletter_queue']``.
          deadletter_queue = None
      
          # List of options to transfer from :attr:`transport_options`.
          from_transport_options = ('body_encoding', 'deadletter_queue')
      
          # Priority defaults
          default_priority = 0
          min_priority = 0
          max_priority = 9
      

      最終具體舉例如下:

      self = {Channel} <kombu.transport.redis.Channel object at 0x7fe61aa88cc0>
       Client = {type} <class 'redis.client.Redis'>
       Message = {type} <class 'kombu.transport.virtual.base.Message'>
       QoS = {type} <class 'kombu.transport.redis.QoS'>
       active_fanout_queues = {set: 0} set()
       active_queues = {set: 0} set()
       async_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
       auto_delete_queues = {set: 0} set()
       channel_id = {int} 1
       client = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
       codecs = {dict: 1} {'base64': <kombu.transport.virtual.base.Base64 object at 0x7fe61a987668>}
       connection = {Transport} <kombu.transport.redis.Transport object at 0x7fe61aa399b0>
       connection_class = {type} <class 'redis.connection.Connection'>
       cycle = {FairCycle} <FairCycle: 0/0 []>
       deadletter_queue = {NoneType} None
       exchange_types = {dict: 3} {'direct': <kombu.transport.virtual.exchange.DirectExchange object at 0x7fe61aa53588>, 'topic': <kombu.transport.virtual.exchange.TopicExchange object at 0x7fe61aa53550>, 
       handlers = {dict: 2} {'BRPOP': <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fe61aa88cc0>>, 'LISTEN': <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7fe61aa88cc0>>}
       pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
       qos = {QoS} <kombu.transport.redis.QoS object at 0x7fe61aa88e48>
       queue_order_strategy = {str} 'round_robin'
       state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7fe61a987748>
       subclient = {PubSub} <redis.client.PubSub object at 0x7fe61aa39cc0>
      

      3.2.3 redis消息回調函數

      關于上面成員變量,這里需要說明的是

       handlers = {dict: 2} 
        {
          'BRPOP': <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fe61aa88cc0>>, 
          'LISTEN': <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7fe61aa88cc0>>
        }
      

      這是redis有消息時的回調函數,即:

      • BPROP 有消息時候,調用 Channel._brpop_read;
      • LISTEN 有消息時候,調用 Channel._receive;

      3.2.4 Redis 直接相關的主要成員

      與Redis 直接相關的成員定義在:redis/client.py。

      與 Redis 直接相關的主要成員是如下,會利用如下變量進行具體 redis操作:

      • async_pool :redis異步連接池;
      • pool :redis連接池;
      • client :就是StrictRedis之類的driver;
      • subclient :PubSub所用的client;

      分別對應如下類型:

      channel = {Channel} <kombu.transport.redis.Channel object at 0x7fabeea23b00>
       Client = {type} <class 'redis.client.Redis'>
       async_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
       client = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
       connection = {Transport} <kombu.transport.redis.Transport object at 0x7fabeea23940>
       connection_class = {type} <class 'redis.connection.Connection'>
       connection_class_ssl = {type} <class 'redis.connection.SSLConnection'>
       pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
       subclient = {PubSub} <redis.client.PubSub object at 0x7fabeea97198>
      

      具體代碼如下:

      def _create_client(self, asynchronous=False):
          if asynchronous:
              return self.Client(connection_pool=self.async_pool)
          return self.Client(connection_pool=self.pool)
      
      def _get_pool(self, asynchronous=False):
          params = self._connparams(asynchronous=asynchronous)
          self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db'])
          return redis.ConnectionPool(**params)
      
      def _get_client(self):
          if redis.VERSION < (3, 2, 0):
              raise VersionMismatch(
                  'Redis transport requires redis-py versions 3.2.0 or later. '
                  'You have {0.__version__}'.format(redis))
          return redis.StrictRedis
      
      @property
      def pool(self):
          if self._pool is None:
              self._pool = self._get_pool()
          return self._pool
      
      @property
      def async_pool(self):
          if self._async_pool is None:
              self._async_pool = self._get_pool(asynchronous=True)
          return self._async_pool
      
      @cached_property
      def client(self):
          """Client used to publish messages, BRPOP etc."""
          return self._create_client(asynchronous=True)
      
      @cached_property
      def subclient(self):
          """Pub/Sub connection used to consume fanout queues."""
          client = self._create_client(asynchronous=True)
          return client.pubsub()
      

      因為添加了Channel,所以此時如下:

      +-----------------+
      | Channel         |
      |                 |      +-----------------------------------------------------------+
      |    client  +---------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
      |                 |      +-----------------------------------------------------------+
      |                 |
      |                 |      +---------------------------------------------------+-+
      |    pool  +---------->  |ConnectionPool<Connection<host=localhost,port=6379 > |
      |                 |      +---------------------------------------------------+-+
      |                 |
      |                 |
      |                 |
      |    connection   |
      |                 |
      +-----------------+
      
      +-------------------+       +---------------------+       +--------------------+
      | Connection        |       | redis.Transport     |       | MultiChannelPoller |
      |                   |       |                     |       |                    |
      |                   |       |                     |       |     _channels      |
      |                   |       |        cycle +------------> |     _fd_to_chan    |
      |     transport +---------> |                     |       |     _chan_to_sock  |
      |                   |       |                     |       |     poller         |
      +-------------------+       +---------------------+       |     after_read     |
                                                                |                    |
      +------------------+                                      +--------------------+
      | Hub              |
      |                  |
      |                  |            +-------------+
      |      poller +---------------> | _poll       |
      |                  |            |             |         +-------+
      |                  |            |    _poller+---------> |  poll |
      +------------------+            |             |         +-------+
                                      +-------------+
      +----------------+         +-------------------+
      | Exchange       |         | Queue             |
      |                |         |                   |
      |                |         |                   |
      |     channel    | <------------+ exchange     |
      |                |         |                   |
      |                |         |                   |
      +----------------+         +-------------------+
      
      

      3.3 channel 與 Connection 聯系

      講到這里,基本道理大家都懂,但是具體兩者之間如何聯系,我們需要再剖析下。

      3.3.1 從Connection得到channel

      在Connection定義中有如下,原來 Connection 是通過 transport 來得到 channel:

      def channel(self):
          """Create and return a new channel."""
          self._debug('create channel')
          chan = self.transport.create_channel(self.connection)
          return chan
      

      3.3.2 Transport具體創建

      在Transport之中有:

      def create_channel(self, connection):
          try:
              return self._avail_channels.pop()
          except IndexError:
              channel = self.Channel(connection)
              self.channels.append(channel)
              return channel
      

      原來在 Transport 有兩個channels 列表:

      self._avail_channels
      self.channels
      

      如果_avail_channels 有內容則直接獲取,否則生成一個新的Channel。

      在真正連接時候,會調用 establish_connection 放入self._avail_channels。

      def establish_connection(self):
          # creates channel to verify connection.
          # this channel is then used as the next requested channel.
          # (returned by ``create_channel``).
          self._avail_channels.append(self.create_channel(self))
          return self     # for drain events
      

      其堆棧如下:

      __init__, redis.py:557
      create_channel, base.py:921
      establish_connection, base.py:939
      _establish_connection, connection.py:801
      _connection_factory, connection.py:866
      retry_over_time, functional.py:313
      _ensure_connection, connection.py:439
      connection, connection.py:859
      channel, connection.py:283
      <module>, node.py:11
      

      3.3.3 建立聯系

      在init中有:

      def __init__(self, *args, **kwargs):
          super().__init__(*args, **kwargs)
      
          if not self.ack_emulation:  # disable visibility timeout
              self.QoS = virtual.QoS
      
          self._queue_cycle = cycle_by_name(self.queue_order_strategy)()
          self.Client = self._get_client()
          self.ResponseError = self._get_response_error()
          self.active_fanout_queues = set()
          self.auto_delete_queues = set()
          self._fanout_to_queue = {}
          self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}
       
          ......
      
          self.connection.cycle.add(self)  # add to channel poller.
      
          if register_after_fork is not None:
              register_after_fork(self, _after_fork_cleanup_channel)
      

      重點是:

      self.connection.cycle.add(self)  # add to channel poller.
      

      這就是把 Channel與Transport 中的 poller 聯系起來,這樣Transport可以利用Channel去與真實的redis進行交互。

      堆棧如下:

      add, redis.py:277
      __init__, redis.py:531
      create_channel, base.py:920
      establish_connection, base.py:938
      _establish_connection, connection.py:801
      _connection_factory, connection.py:866
      retry_over_time, functional.py:325
      _ensure_connection, connection.py:439
      connection, connection.py:859
      register_with_event_loop, connection.py:266
      main, testUb.py:41
      

      因為已經聯系起來,所以此時如下:

      +-----------------+
      | Channel         |
      |                 |      +-----------------------------------------------------------+
      |    client  +---------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
      |                 |      +-----------------------------------------------------------+
      |                 |
      |                 |      +---------------------------------------------------+-+
      |    pool  +---------->  |ConnectionPool<Connection<host=localhost,port=6379 > |
      |                 |      +---------------------------------------------------+-+
      |                 |
      |                 |   <------------------------------------------------------------+
      |                 |                                                                |
      |    connection +---------------+                                                  |
      |                 |             |                                                  |
      +-----------------+             |                                                  |
                                      v                                                  |
      +-------------------+       +---+-----------------+       +--------------------+   |
      | Connection        |       | redis.Transport     |       | MultiChannelPoller |   |
      |                   |       |                     |       |                    |   |
      |                   |       |                     |       |     _channels +--------+
      |                   |       |        cycle +------------> |     _fd_to_chan    |
      |     transport +---------> |                     |       |     _chan_to_sock  |
      |                   |       |                     |       |     poller         |
      +-------------------+       +---------------------+       |     after_read     |
                                                                |                    |
      +------------------+                                      +--------------------+
      | Hub              |
      |                  |
      |                  |            +-------------+
      |      poller +---------------> | _poll       |
      |                  |            |             |         +-------+
      |                  |            |    _poller+---------> |  poll |
      +------------------+            |             |         +-------+
                                      +-------------+
      +----------------+         +-------------------+
      | Exchange       |         | Queue             |
      |                |         |                   |
      |                |         |                   |
      |     channel    | <------------+ exchange     |
      |                |         |                   |
      |                |         |                   |
      +----------------+         +-------------------+
      
      

      3.3 Transport 與 Hub 聯系

      on_poll_init 這里就是把 kombu.transport.redis.Transport 與 Hub 聯系起來。

      self.poller = poller把Transport與Hub的poll聯系起來。這樣 Transport 就可以利用 poll。

      def on_poll_init(self, poller):
          self.poller = poller
          for channel in self._channels:
              return channel.qos.restore_visible(
                  num=channel.unacked_restore_limit,
              )
      

      此時變量如下:

      poller = {_poll} <kombu.utils.eventio._poll object at 0x7fb9bcd87240>
      self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7fb9bcdd6a90>
       after_read = {set: 0} set()
       eventflags = {int} 25
       fds = {dict: 0} {}
       poller = {_poll} <kombu.utils.eventio._poll object at 0x7fb9bcd87240>
      

      因此,我們最終如下:

      +-----------------+
      | Channel         |
      |                 |      +-----------------------------------------------------------+
      |    client  +---------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
      |                 |      +-----------------------------------------------------------+
      |                 |
      |                 |      +---------------------------------------------------+-+
      |    pool  +---------->  |ConnectionPool<Connection<host=localhost,port=6379 > |
      |                 |      +---------------------------------------------------+-+
      |                 |
      |                 |   <------------------------------------------------------------+
      |                 |                                                                |
      |    connection +---------------+                                                  |
      |                 |             |                                                  |
      +-----------------+             |                                                  |
                                      v                                                  |
      +-------------------+       +---+-----------------+       +--------------------+   |
      | Connection        |       | redis.Transport     |       | MultiChannelPoller |   |
      |                   |       |                     |       |                    |   |
      |                   |       |                     |       |     _channels +--------+
      |                   |       |        cycle +------------> |     _fd_to_chan    |
      |     transport +---------> |                     |       |     _chan_to_sock  |
      |                   |       |                     |    +<----+  poller         |
      +-------------------+       +---------------------+    |  |     after_read     |
                                                             |  |                    |
      +------------------+                    +--------------+  +--------------------+
      | Hub              |                    |
      |                  |                    v
      |                  |            +-------+-----+
      |      poller +---------------> | _poll       |
      |                  |            |             |         +-------+
      |                  |            |    _poller+---------> |  poll |
      +------------------+            |             |         +-------+
                                      +-------------+
      +----------------+         +-------------------+
      | Exchange       |         | Queue             |
      |                |         |                   |
      |                |         |                   |
      |     channel    | <------------+ exchange     |
      |                |         |                   |
      |                |         |                   |
      +----------------+         +-------------------+
      
      

      0x04 總結

      具體如圖,可以看出來,上面三個基本模塊已經聯系到了一起。

      可以看到,

      • 目前是以Transport為中心,把 Channel代表的真實 redis 與 Hub其中的poll聯系起來,但是具體如何使用則尚未得知。
      • 用戶是通過Connection來作為API入口,connection可以得到Transport。

      既然基本架構已經搭好,所以從下文開始,我們看看 Consumer 是如何運作的。

      0xFF 參考

      celery 7 優秀開源項目kombu源碼分析之registry和entrypoint

      (二)放棄pika,選擇kombu

      kombu消息框架<二>

      AMQP中的概念

      AMQP的基本概念

      深入理解AMQP協議

      kombu和消息隊列總結

      關于epoll版服務器的理解(Python實現)

      celery源碼解讀

      posted @ 2021-03-04 16:47  羅西的思考  閱讀(901)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 一区二区三区激情免费视频| 91久久性奴调教国产免费| 国产中文字幕精品喷潮| 国产婷婷综合在线视频| 欧美亚洲色综久久精品国产| 国产精品亚洲二区亚瑟| 亚洲国产另类久久久精品网站| 国产精品嫩草99av在线| 国产一区二区三中文字幕| 国产精品毛片一区二区| 樱花草视频www日本韩国| 亚洲熟女乱综合一区二区| 精品国产第一国产综合精品| 在线观看成人永久免费网站| 广汉市| 国产精品自偷一区在线观看| 国产中文字幕一区二区| 论坛| 蜜臀av午夜精品福利| free性开放小少妇| 青青草国产精品日韩欧美| 无码人妻一区二区三区AV| 乱色欧美激惰| 国产综合精品一区二区三区| 精品久久精品午夜精品久久| 久久综合激情网| 亚洲综合在线日韩av| 门国产乱子视频观看| 97se亚洲综合自在线| 亚洲乱妇老熟女爽到高潮的片| 视频一区二区三区刚刚碰| 亚洲熟妇精品一区二区| 丰满少妇内射一区| 99国产欧美另类久久久精品| 国产农村老熟女国产老熟女| 国产亚洲精品成人无码精品网站| 欧美z0zo人禽交另类视频| 久久热在线视频精品视频| 亚洲国产无线乱码在线观看| 久久精品国产色蜜蜜麻豆| 香港日本三级亚洲三级|