<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      [源碼分析] 消息隊列 Kombu 之 Producer

      [源碼分析] 消息隊列 Kombu 之 Producer

      0x00 摘要

      本系列我們介紹消息隊列 Kombu。Kombu 的定位是一個兼容 AMQP 協(xié)議的消息隊列抽象。通過本文,大家可以了解 Kombu 中的 Producer 概念。

      0x01 示例代碼

      下面使用如下代碼來進行說明。

      本示例來自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感謝。

      def main(arguments):
          hub = Hub()
          exchange = Exchange('asynt_exchange')
          queue = Queue('asynt_queue', exchange, 'asynt_routing_key')
      
          def send_message(conn):
              producer = Producer(conn)
              producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
              print('message sent')
      
          def on_message(message):
              print('received: {0!r}'.format(message.body))
              message.ack()
              # hub.stop()  # <-- exit after one message
      
          conn = Connection('redis://localhost:6379')
          conn.register_with_event_loop(hub)
      
          def p_message():
              print(' kombu ')
      
          with Consumer(conn, [queue], on_message=on_message):
              send_message(conn)
              hub.timer.call_repeatedly(3, p_message)
              hub.run_forever()
      
      if __name__ == '__main__':
          sys.exit(main(sys.argv[1:]))
      

      0x02 來由

      前文已經(jīng)完成了構(gòu)建部分,Consumer部分,下面來到了Producer部分,即如下代碼:

      def send_message(conn):
      		producer = Producer(conn)
          producer.publish('hello world', exchange=exchange, routing_key='asynt')
         	print('message sent')
      

      我們知道,Transport需要把Channel與文件信息聯(lián)系起來,但是此時Transport信息如下,文件信息依然沒有,這是我們以后需要留意的

      transport = {Transport} <kombu.transport.redis.Transport object at 0x7f9056a26f98>
       Channel = {type} <class 'kombu.transport.redis.Channel'>
       Cycle = {type} <class 'kombu.utils.scheduling.FairCycle'>
       Management = {type} <class 'kombu.transport.virtual.base.Management'>
       channel_max = {int} 65535
       channels = {list: 2} [<kombu.transport.redis.Channel object at 0x7f9056a57278>, <kombu.transport.redis.Channel object at 0x7f9056b79cc0>]
       client = {Connection} <Connection: redis://localhost:6379// at 0x7f9056a26cc0>
       cycle = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f9056a436a0>
        after_read = {set: 0} set()
        eventflags = {int} 25
        fds = {dict: 0} {}
        poller = {_poll} <kombu.utils.eventio._poll object at 0x7f9056583048>
       default_connection_params = {dict: 2} {'port': 6379, 'hostname': 'localhost'}
       default_port = {int} 6379
       driver_name = {str} 'redis'
       driver_type = {str} 'redis'
       implements = {Implements: 3} {'asynchronous': True, 'exchange_type': frozenset({'direct', 'topic', 'fanout'}), 'heartbeats': False}
       manager = {Management} <kombu.transport.virtual.base.Management object at 0x7f9056b79be0>
       polling_interval = {NoneType} None
       state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7f9056a9ec50>
      

      0x03 建立

      3.1 定義

      Producer中,主要變量是:

      • _channel :就是channel;
      • exchange :exchange;

      但是本文示例沒有傳入exchange,這就有些奇怪,我們需要繼續(xù)看看

      class Producer:
          """Message Producer.
      
          Arguments:
              channel (kombu.Connection, ChannelT): Connection or channel.
              exchange (kombu.entity.Exchange, str): Optional default exchange.
              routing_key (str): Optional default routing key.
          """
      
          #: Default exchange
          exchange = None
      
          #: Default routing key.
          routing_key = ''
      
          #: Default serializer to use. Default is JSON.
          serializer = None
      
          #: Default compression method.  Disabled by default.
          compression = None
      
          #: By default, if a defualt exchange is set,
          #: that exchange will be declare when publishing a message.
          auto_declare = True
      
          #: Basic return callback.
          on_return = None
      
          #: Set if channel argument was a Connection instance (using
          #: default_channel).
          __connection__ = None
      

      3.2 init

      init代碼如下。

      def __init__(self, channel, exchange=None, routing_key=None,
                   serializer=None, auto_declare=None, compression=None,
                   on_return=None):
          self._channel = channel
          self.exchange = exchange
          self.routing_key = routing_key or self.routing_key
          self.serializer = serializer or self.serializer
          self.compression = compression or self.compression
          self.on_return = on_return or self.on_return
          self._channel_promise = None
          if self.exchange is None:
              self.exchange = Exchange('')
          if auto_declare is not None:
              self.auto_declare = auto_declare
      
          if self._channel:
              self.revive(self._channel)
      

      3.2.1 轉(zhuǎn)換channel

      這里有個重要轉(zhuǎn)換。

      • 最開始是把輸入?yún)?shù) Connection 賦值到 self._channel。
      • 然后 revive 方法做了轉(zhuǎn)換為 channel,即 self._channel 最終是 channel 類型。

      但是 exchange 依然沒有意義,是 direct 類型。

      代碼如下:

      def revive(self, channel):
          """Revive the producer after connection loss."""
          if is_connection(channel):
              connection = channel
              self.__connection__ = connection
              channel = ChannelPromise(lambda: connection.default_channel)
          if isinstance(channel, ChannelPromise):
              self._channel = channel
              self.exchange = self.exchange(channel)
          else:
              # Channel already concrete
              self._channel = channel
              if self.on_return:
                  self._channel.events['basic_return'].add(self.on_return)
              self.exchange = self.exchange(channel)
      

      此時變量為:

      producer = {Producer} 
       auto_declare = {bool} True
       channel = {Channel} <kombu.transport.redis.Channel object at 0x7f9056a57278>
       compression = {NoneType} None
       connection = {Connection} <Connection: redis://localhost:6379// at 0x7f9056a26cc0>
       exchange = {Exchange} Exchange ''(direct)
       on_return = {NoneType} None
       routing_key = {str} ''
       serializer = {NoneType} None
      

      邏輯如圖:

      +----------------------+               +-------------------+
      | Producer             |               | Channel           |
      |                      |               |                   |        +-----------------------------------------------------------+
      |                      |               |    client  +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
      |      channel   +------------------>  |                   |        +-----------------------------------------------------------+
      |                      |               |    pool           |
      |      exchange        |   +---------> |                   | <------------------------------------------------------------+
      |                      |   |           |                   |                                                              |
      |      connection      |   |    +----> |    connection +---------------+                                                  |
      |             +        |   |    |      |                   |           |                                                  |
      +--+-------------------+   |    |      +-------------------+           |                                                  |
         |          |            |    |                                      v                                                  |
         |          |            |    |      +-------------------+       +---+-----------------+       +--------------------+   |
         |          |            |    |      | Connection        |       | redis.Transport     |       | MultiChannelPoller |   |
         |          +----------------------> |                   |       |                     |       |                    |   |
         |                       |    |      |                   |       |                     |       |     _channels +--------+
         |                       |    |      |                   |       |        cycle +------------> |     _fd_to_chan    |
         |                       |    |      |     transport +---------> |                     |       |     _chan_to_sock  |
         |             +-------->+    |      |                   |       |                     |    +------+ poller         |
         |             |              |      +-------------------+       +---------------------+    |  |     after_read     |
         |             |              |                                                             |  |                    |
         |             |              |                                                             |  +--------------------+
         |             |              |      +------------------+                   +---------------+
         |             |              |      | Hub              |                   |
         |             |              |      |                  |                   v
         |             |              |      |                  |            +------+------+
         |             |              |      |      poller +---------------> | _poll       |
         | publish     |              |      |                  |            |             |         +-------+
         +--------------------------------+  |                  |            |    _poller+---------> |  poll |
                       |              |   |  +------------------+            |             |         +-------+
                       |              |   |                                  +-------------+
          +-------------------+       |   +-----> +----------------+
          | Queue      |      |       |           | Exchange       |
          |      _channel     |       +---------+ |                |
          |                   |                   |                |
          |      exchange +-------------------->  |     channel    |
          |                   |                   |                |
          |                   |                   |                |
          +-------------------+                   +----------------+
      
      

      手機如圖:

      0x04 發(fā)送

      發(fā)送消息是通過producer.publish完成。

      def send_message(conn):
          producer = Producer(conn)
          producer.publish('hello world', exchange=exchange, routing_key='asynt')
          print('message sent')
      

      此時傳入exchange作為參數(shù)。原來如果沒有 Exchange,是可以在這里進行補救

      producer.publish繼續(xù)調(diào)用到如下,可以看到分為兩步:

      • 調(diào)用channel的組裝消息函數(shù)prepare_message
      • 調(diào)用channel的發(fā)送消息basic_publish

      因此,最終發(fā)送消息還是通過channel完成。

      def _publish(self, body, priority, content_type, content_encoding,
                   headers, properties, routing_key, mandatory,
                   immediate, exchange, declare):
          channel = self.channel
          message = channel.prepare_message(
              body, priority, content_type,
              content_encoding, headers, properties,
          )
          if declare:
              maybe_declare = self.maybe_declare
              [maybe_declare(entity) for entity in declare]
      
          # handle autogenerated queue names for reply_to
          reply_to = properties.get('reply_to')
          if isinstance(reply_to, Queue):
              properties['reply_to'] = reply_to.name
          return channel.basic_publish(
              message,
              exchange=exchange, routing_key=routing_key,
              mandatory=mandatory, immediate=immediate,
          )
      

      4.1 組裝消息 in channel

      channel 的組裝消息函數(shù)prepare_message完成組裝功能,基本上是為消息添加各種屬性。

      def prepare_message(self, body, priority=None, content_type=None,
                          content_encoding=None, headers=None, properties=None):
          """Prepare message data."""
          properties = properties or {}
          properties.setdefault('delivery_info', {})
          properties.setdefault('priority', priority or self.default_priority)
      
          return {'body': body,
                  'content-encoding': content_encoding,
                  'content-type': content_type,
                  'headers': headers or {},
                  'properties': properties or {}}
      

      消息如下:

      message = {dict: 5} 
       'body' = {str} 'aGVsbG8gd29ybGQ='
       'content-encoding' = {str} 'utf-8'
       'content-type' = {str} 'text/plain'
       'headers' = {dict: 0} {}
        __len__ = {int} 0
       'properties' = {dict: 5} 
        'delivery_mode' = {int} 2
        'delivery_info' = {dict: 2} {'exchange': 'asynt_exchange', 'routing_key': 'asynt_routing_key'}
        'priority' = {int} 0
        'body_encoding' = {str} 'base64'
        'delivery_tag' = {str} '1b03590e-501c-471f-a5f9-f4fdcbe3379a'
        __len__ = {int} 5
      

      4.2 發(fā)送消息 in channel

      channel的發(fā)送消息basic_publish完成發(fā)送功能。此時使用了傳入的參數(shù)exchange。

      發(fā)送消息basic_publish方法是調(diào)用_put方法:

      def basic_publish(self, message, exchange, routing_key, **kwargs):
          """Publish message."""
          self._inplace_augment_message(message, exchange, routing_key)
          if exchange:
              return self.typeof(exchange).deliver(
                  message, exchange, routing_key, **kwargs
              )
          # anon exchange: routing_key is the destination queue
          return self._put(routing_key, message, **kwargs)
      

      4.3 deliver in exchange

      self.typeof(exchange).deliver代碼接著來到exchange。本文是DirectExchange。

      注意,這里用到了self.channel._put。就是Exchange的成員變量channel。

      class DirectExchange(ExchangeType):
          """Direct exchange.
      
          The `direct` exchange routes based on exact routing keys.
          """
      
          type = 'direct'
      
          def lookup(self, table, exchange, routing_key, default):
              return {
                  queue for rkey, _, queue in table
                  if rkey == routing_key
              }
      
          def deliver(self, message, exchange, routing_key, **kwargs):
              _lookup = self.channel._lookup
              _put = self.channel._put
              for queue in _lookup(exchange, routing_key):
                  _put(queue, message, **kwargs)
      

      4.4 binding 轉(zhuǎn)換

      我們知道,Exchange的作用只是將發(fā)送的 routing_key 轉(zhuǎn)化為 queue 的名字。這樣發(fā)送就知道發(fā)到哪個 queue

      因此依據(jù)_lookup方法得到對應的queue

      def _lookup(self, exchange, routing_key, default=None):
          """Find all queues matching `routing_key` for the given `exchange`.
      
          Returns:
              str: queue name -- must return the string `default`
                  if no queues matched.
          """
          if default is None:
              default = self.deadletter_queue
          if not exchange:  # anon exchange
              return [routing_key or default]
      
          try:
              R = self.typeof(exchange).lookup(
                  self.get_table(exchange),
                  exchange, routing_key, default,
              )
          except KeyError:
              R = []
      
          if not R and default is not None:
              warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT.format(
                  exchange=exchange, routing_key=routing_key)),
              )
              self._new_queue(default)
              R = [default]
          return R
      

      此處具體邏輯為:

      第一,調(diào)用到channel的方法。這里的 exchange 名字為 asynt_exchange。

      def get_table(self, exchange):
          key = self.keyprefix_queue % exchange
          with self.conn_or_acquire() as client:
              values = client.smembers(key)
              if not values:
                  raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
              return [tuple(bytes_to_str(val).split(self.sep)) for val in values]
      

      我們看看Redis內(nèi)容,發(fā)現(xiàn)集合內(nèi)容如下:

      127.0.0.1:6379> smembers _kombu.binding.asynt_exchange
      1) "asynt_routing_key\x06\x16\x06\x16asynt_queue"
      

      第二,因此得到對應binding為:

      {b'asynt_routing_key\x06\x16\x06\x16asynt_queue'}

      即從 exchange 得到 routing_key ---> queue 的規(guī)則,然后再依據(jù) routing_key 得到 queue。就知道 Consumer 和 Producer 需要依據(jù)哪個 queue 交換消息。

      邏輯如下:

                                        +---------------------------------+
                                        |         exchange                |
                                        |                                 |
                       1 routing_key x  |                                 |
      +----------+                      |                                 |      +------------+
      | Producer |  +-----------------> |   routing_key x --->  queue x   |      |  Consumer  |
      +--------+-+                      |                                 |      +------------+
               |                        |   routing_key y --->  queue y   |
               |                        |                                 |           ^
               |                        |   routing_key z --->  queue z   |           |
               |                        |                                 |           |
               |                        +---------------------------------+           |
               |                                                                      |
               |                                                                      |
               |                                                                      |
               |                                                                      |
               |                                                                      |
               |                                                                      |
               |                                                                      |
               |                                                                      |
               |                                  +-----------+                       |
               |        2 message                 |           |        3 message      |
               +------------------------------->  |  queue X  |  +--------------------+
                                                  |           |
                                                  +-----------+
      
      

      4.5 _put in channel

      channel的_put 方法被用來繼續(xù)處理,可以看到其最終調(diào)用到了client.lpush。

      client為:

      Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
      

      代碼為:

      def _put(self, queue, message, **kwargs):
          """Deliver message."""
          pri = self._get_message_priority(message, reverse=False)
      
          with self.conn_or_acquire() as client:
              client.lpush(self._q_for_pri(queue, pri), dumps(message))
      

      redis怎么區(qū)別不同的queue?

      實際是每個 queue 被賦予一個字符串 name,這個 name 就是 redis 對應的 list 的 key。知道應該向哪個 list 放消息,后續(xù)就是向此 list 中 lpush 消息。

      如下方法完成轉(zhuǎn)換功能。

      def _q_for_pri(self, queue, pri):
          pri = self.priority(pri)
          if pri:
              return f"{queue}{self.sep}{pri}"
          return queue
      

      現(xiàn)在發(fā)消息之后,redis內(nèi)容如下,我們可以看出來,消息作為list 的item,放入到之中。

      127.0.0.1:6379> lrange asynt_queue 0 -1
      1) "{\"body\": \"aGVsbG8gd29ybGQ=\", \"content-encoding\": \"utf-8\", \"content-type\": \"text/plain\", \"headers\": {}, \"properties\": {\"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"asynt_exchange\", \"routing_key\": \"asynt_routing_key\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"df7af424-e1ab-4c08-84b5-1cd5c97ed25d\"}}"
      127.0.0.1:6379> 
      

      0x05 總結(jié)

      現(xiàn)在我們總結(jié)如下:

      • Producers: 發(fā)送消息的抽象類;
      • Consumers:接受消息的抽象類,consumer需要聲明一個queue,并將queue與指定的exchange綁定,然后從queue里面接收消息;
      • Exchange:MQ 路由,消息發(fā)送者將消息發(fā)至Exchange,Exchange負責將消息分發(fā)至隊列;
      • Queue:對應的 queue 抽象,存儲著即將被應用消費掉的消息,Exchange負責將消息分發(fā)Queue,消費者從Queue接收消息;
      • Channel:與AMQP中概念類似,可以理解成共享一個Connection的多個輕量化連,就是真實redis連接;

      于是邏輯鏈已經(jīng)形成,大約是這樣的:

      • Producer的publish方法接受參數(shù)Exchange,于是就發(fā)送消息到此Exchange;
      • Producer調(diào)用channel的組裝消息函數(shù)prepare_message為消息添加各種屬性;
      • Producer調(diào)用channel的發(fā)送消息basic_publish發(fā)送消息,此時使用了傳入的參數(shù)exchange。
      • basic_publish方法調(diào)用exchange.deliver(exchange, routing_key)來發(fā)送消息;
      • Exchange中有成員變量Channel,也有成員變量Queues,每個queue對應一個routing_key;
      • deliver使用_lookup方法依據(jù)key得到對應的queue;
      • deliver使用Exchange成員變量Channel的_put方法來向queue中投放消息;
      • Channel拿到自己的redis連接池,即client為Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>;于是可以基于此進行redis操作;
      • redis怎么區(qū)別不同的queue,實際是每個queue被賦予一個字符串name,這就是redis對應的list的key;
      • 既然得到了名字為queue的list,則向此list中l(wèi)push消息。
      • Consumer去Queue取消息;

      動態(tài)邏輯如下:

             +------------+                        +------------+               +------------+      +-----------------------+
             |  producer  |                        |  channel   |               |  exchange  |      | Redis<ConnectionPool> |
             +---+--------+                        +----+-------+               +-------+----+      +----------+------------+
                 |                                      |                               |                      |
                 |                                      |                               |                      |
      publish('', exchange, routing_key)                |                               |                      |
                 |                                      |                               |                      |
                 |   prepare_message                    |                               |                      |
                 |                                      |                               |                      |
                 | +----------------------------------> |                               |                      |
                 |                                      |                               |                      |
                 | basic_publish (exchange, routing_key)|                               |                      |
                 |                                      |                               |                      |
                 | +----------------------------------> |                               |                      |
                 |                                      |                               |                      |
                 |                                      | deliver(exchange, routing_key)|                      |
                 |                                      |                               |                      |
                 |                                      +-----------------------------> |                      |
                 |                                      |                               |                      |
                 |                                      |                               |                      |
                 |                                      |                _lookup(exchange, routing_key)        |
                 |                                      |                               |                      |
                 |                                      |                               |                      |
                 |                                      |    _put(queue, message)       |                      |
                 |                                      v                               |                      |
                 |                                      | <---------------------------+ |                      |
                 |                                      |                               |                      |
                 |                                _q_for_pri(queue, pri)                |                      |
                 |                                      +                               |                      |
                 v                                      |                               |                      |
                 |                                      |     client.lpush              |                      |
                 |                                      | +--------------------------------------------------> |
                 |                                      |                               |                      |
                 v                                      v                               v                      v
      
      

      手機如下:

      0xFF 參考

      celery 7 優(yōu)秀開源項目kombu源碼分析之registry和entrypoint

      放棄pika,選擇kombu

      kombu消息框架<二>

      AMQP中的概念

      AMQP的基本概念

      深入理解AMQP協(xié)議

      kombu和消息隊列總結(jié)

      關(guān)于epoll版服務器的理解(Python實現(xiàn))

      posted @ 2021-03-12 22:27  羅西的思考  閱讀(681)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲一级特黄大片一级特黄| 夜夜躁狠狠躁日日躁| 国产精品三级黄色小视频| 亚洲av本道一区二区| 华池县| 女同亚洲精品一区二区三| 亚洲中文字幕亚洲中文精| 国产成人一区二区三区在线| 亚洲欧美人成网站在线观看看| 久久91精品牛牛| 成人福利国产午夜AV免费不卡在线| 久久这里只有精品首页| 日韩av一区二区精品不卡| 美女爽到高潮嗷嗷嗷叫免费网站| 婷婷六月色| 四虎影院176| 精品人妻一区二区三区蜜臀| 成人午夜电影福利免费| 精品少妇av蜜臀av| 妓女妓女一区二区三区在线观看| 国产精品无码一区二区三区电影 | 亚洲国产亚洲国产路线久久| 国产精品一区二区不卡91| 在线看片免费人成视频久网| 亚洲综合网国产精品一区| 久久精品免视看成人国产| 自偷自拍亚洲综合精品| 国产日韩av二区三区| 香港日本三级亚洲三级| 国产成人人综合亚洲欧美丁香花| 777米奇影视第四色| 精品一二三四区在线观看| 国产中文字幕在线一区| 亚洲18禁私人影院| 国产亚洲中文字幕久久网| 亚洲精品日韩中文字幕| 久久综合伊人77777| 极品无码国模国产在线观看| 一卡2卡三卡4卡免费网站| 大化| 国产人妻大战黑人20p|