[源碼分析] 消息隊列 Kombu 之 Consumer
[源碼分析] 消息隊列 Kombu 之 Consumer
- [源碼分析] 消息隊列 Kombu 之 Consumer
- 0x00 摘要
- 0x01 綜述功能
- 0x02 示例代碼
- 0x03 定義
- 0x04 Init
- 0x05 完善聯(lián)系
- 0x06 消費消息
- 6.1 drain_events in Connection
- 6.2 drain_events in Transport
- 6.3 get in MultiChannelPoller
- 6.3.1 _register_BRPOP in MultiChannelPoller
- 6.3.2 register in _poll
- 6.3.3 poll(timeout) in MultiChannelPoller
- 6.3.4 注冊到redis驅(qū)動,負(fù)載均衡
- 6.3.4 handle_event in MultiChannelPoller
- 6.3.5 on_readable in MultiChannelPoller
- 6.3.6 _brpop_read in Channel
- 6.3.7 從redis讀取
- 6.3.8 回到_brpop_read
- 6.3.9 _deliver in Transport
- 6.3.10 basic_consume in Channel
- 6.3.11 _receive_callback in Consumer
- 0xFF 參考
0x00 摘要
本系列我們介紹消息隊列 Kombu。Kombu 的定位是一個兼容 AMQP 協(xié)議的消息隊列抽象。通過本文,大家可以了解 Kombu 中的 Consumer 概念。
0x01 綜述功能
Consumer 的作用主要如下:
- Exchange:MQ 路由,消息發(fā)送者將消息發(fā)至Exchange,Exchange負(fù)責(zé)將消息分發(fā)至隊列。
- Queue:對應(yīng)的隊列抽象,存儲著即將被應(yīng)用消費掉的消息,Exchange負(fù)責(zé)將消息分發(fā)Queue,消費者從Queue接收消息;
- Consumers : 是接受消息的抽象類,consumer需要聲明一個queue,并將queue與指定的exchange綁定,然后從queue里面接收消息。就是說,從用戶角度,知道了一個 exchange,就可以從中讀取消息,具體這個消息就是從 queue 中讀取的。
在具體的實現(xiàn)中,Consumer 把 queue 與 channel 聯(lián)系起來。queue 里面有一個 channel,用來訪問redis。Queue 也有 Exchange,知道訪問具體 redis 哪個key(就是queue對應(yīng)的那個key)。即 Consumer 消費消息是通過 Queue 來消費,然后 Queue 又轉(zhuǎn)嫁給 Channel。
所以服務(wù)端的邏輯大致為:
- 建立連接;
- 創(chuàng)建Exchange ;
- 創(chuàng)建Queue,并將Exchange與Queue綁定,Queue的名稱為routing_key ;
- 創(chuàng)建Consumer對Queue監(jiān)聽;
0x02 示例代碼
下面使用如下代碼來進(jìn)行說明。
本示例來自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感謝。
def main(arguments):
hub = Hub()
exchange = Exchange('asynt_exchange')
queue = Queue('asynt_queue', exchange, 'asynt_routing_key')
def send_message(conn):
producer = Producer(conn)
producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
print('message sent')
def on_message(message):
print('received: {0!r}'.format(message.body))
message.ack()
# hub.stop() # <-- exit after one message
conn = Connection('redis://localhost:6379')
conn.register_with_event_loop(hub)
def p_message():
print(' kombu ')
with Consumer(conn, [queue], on_message=on_message):
send_message(conn)
hub.timer.call_repeatedly(3, p_message)
hub.run_forever()
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
前文已經(jīng)完成了構(gòu)建部分,下面來到了Consumer部分,即如下代碼:
with Consumer(conn, [queue], on_message=on_message):
send_message(conn)
hub.timer.call_repeatedly(
3, p_message
)
hub.run_forever()
0x03 定義
3.1 定義
Consumer主要成員變量如下:
- channel:存在 (kombu.Connection, Channel) 這兩種可能,一個 Connection 就對應(yīng)一個 MQ 的連接,Channel可以理解成共享一個Connection的多個輕量化連接。
- queues:(Sequence[kombu.Queue])類型。對應(yīng) queue 抽象,存儲著即將被應(yīng)用消費掉的消息,Exchange負(fù)責(zé)將消息分發(fā)Queue,消費者從Queue接收消息
- on_message:消息響應(yīng)方法;
這也是調(diào)用時傳入的變量。
class Consumer:
"""Message consumer.
Arguments:
channel (kombu.Connection, ChannelT): see :attr:`channel`.
queues (Sequence[kombu.Queue]): see :attr:`queues`.
no_ack (bool): see :attr:`no_ack`.
auto_declare (bool): see :attr:`auto_declare`
callbacks (Sequence[Callable]): see :attr:`callbacks`.
on_message (Callable): See :attr:`on_message`
on_decode_error (Callable): see :attr:`on_decode_error`.
prefetch_count (int): see :attr:`prefetch_count`.
"""
#: The connection/channel to use for this consumer.
channel = None
#: A single :class:`~kombu.Queue`, or a list of queues to
#: consume from.
queues = None
#: Flag for automatic message acknowledgment.
no_ack = None
#: List of callbacks called in order when a message is received.
callbacks = None
#: Optional function called whenever a message is received.
on_message = None
#: List of accepted content-types.
accept = None
#: Initial prefetch count
prefetch_count = None
#: Mapping of queues we consume from.
_queues = None
_tags = count(1) # global
3.2 Queue
我們也給出 Queue 的定義,其中主要成員變量如下:
- exchange (Exchange): 就是 queue 綁定的 Exchange;
- routing_key (str): 就是 queue 對應(yīng)的 key;
- channel :queue 綁定的 信道;
具體定義如下:
class Queue(MaybeChannelBound):
"""A Queue declaration.
channel (ChannelT): The channel the Queue is bound to (if bound).
"""
ContentDisallowed = ContentDisallowed
name = ''
exchange = Exchange('')
routing_key = ''
durable = True
exclusive = False
auto_delete = False
no_ack = False
attrs = (
('name', None),
('exchange', None),
('routing_key', None),
('queue_arguments', None),
('binding_arguments', None),
('consumer_arguments', None),
('durable', bool),
('exclusive', bool),
('auto_delete', bool),
('no_ack', None),
('alias', None),
('bindings', list),
('no_declare', bool),
('expires', float),
('message_ttl', float),
('max_length', int),
('max_length_bytes', int),
('max_priority', int)
)
0x04 Init
在此方法中,先處理調(diào)用,隨之建立聯(lián)系。
def __init__(self, channel, queues=None, no_ack=None, auto_declare=None,
callbacks=None, on_decode_error=None, on_message=None,
accept=None, prefetch_count=None, tag_prefix=None):
self.channel = channel
self.queues = maybe_list(queues or [])
self.no_ack = self.no_ack if no_ack is None else no_ack
self.callbacks = (self.callbacks or [] if callbacks is None
else callbacks)
self.on_message = on_message
self.tag_prefix = tag_prefix
self._active_tags = {}
self.accept = prepare_accept_content(accept)
self.prefetch_count = prefetch_count
if self.channel:
self.revive(self.channel)
4.1 處理調(diào)用
4.1.1 queues
傳入的參數(shù)queues被作為成員變量保存起來。
self.queues = maybe_list(queues or [])
4.1.2 channel
傳入的參數(shù)Connection被作為成員變量保存起來。
self.channel = channel
4.1.3 on_message
傳入的參數(shù)on_message 作為消息響應(yīng)方法保存起來。
self.on_message = on_message
4.2 建立聯(lián)系
用如下方法把 Exchange,Queue 與 Connection 聯(lián)系起來。
def revive(self, channel):
"""Revive consumer after connection loss."""
self._active_tags.clear()
channel = self.channel = maybe_channel(channel)
# modify dict size while iterating over it is not allowed
for qname, queue in list(self._queues.items()):
# name may have changed after declare
self._queues.pop(qname, None)
queue = self._queues[queue.name] = queue(self.channel)
queue.revive(channel)
if self.auto_declare:
self.declare()
if self.prefetch_count is not None:
self.qos(prefetch_count=self.prefetch_count)
進(jìn)一步調(diào)用:
when_bound, entity.py:598
maybe_bind, abstract.py:76
bind, abstract.py:70
bind, entity.py:590
__call__, abstract.py:66
revive, messaging.py:400
__init__, messaging.py:382
main, testUb.py:46
<module>, testUb.py:55
由此進(jìn)入到了Queue類。
4.2.1 channel與queue
這里用如下方法把queue與channel聯(lián)系起來。queue 里面有一個 channel,用來訪問redis,Queue 也有 Exchange,知道訪問具體 redis 哪里。
每一個 Consumer 初始化的時候都是和 Channel 綁定的,也就是說我們 Consumer 包含了 Queue 也就和 Connection 關(guān)聯(lián)起來了!
Consumer 消費消息是通過 Queue 來消費,然后 Queue 又轉(zhuǎn)嫁給 Channel。
channel = {Channel} <kombu.transport.redis.Channel object at 0x7f9056a57278>
self = {Queue} <Queue asynt -> <Exchange asynt(direct) bound to chan:1> -> asynt bound to chan:1>
這樣,conneciton就是queue的成員變量。
def revive(self, channel):
"""Revive channel after the connection has been re-established.
"""
if self.is_bound:
self._channel = channel
self.when_bound()
4.2.2 channel與exchange
之前我們知道,Queue是包括了exchange成員變量,目前channel也是exchange的成員變量。
Exchange:交換機(jī),消息發(fā)送者將消息發(fā)至Exchange,Exchange負(fù)責(zé)將消息分發(fā)至隊列。
于是經(jīng)由如下方法,準(zhǔn)備把channel與exchange聯(lián)系起來。
def when_bound(self):
if self.exchange:
self.exchange = self.exchange(self.channel)
此時變量如下:
channel = {Channel} <kombu.transport.redis.Channel object at 0x7f9056a57278>
self = {Exchange} Exchange asynt(direct)
進(jìn)而直接在Exchange基類,使用方法maybe_bind把channel與exchange聯(lián)系起來。
class MaybeChannelBound(Object):
"""Mixin for classes that can be bound to an AMQP channel."""
_channel = None
def __call__(self, channel):
"""`self(channel) -> self.bind(channel)`."""
return self.bind(channel)
def bind(self, channel):
"""Create copy of the instance that is bound to a channel."""
return copy(self).maybe_bind(channel)
def maybe_bind(self, channel):
"""Bind instance to channel if not already bound."""
if not self.is_bound and channel:
self._channel = maybe_channel(channel)
self.when_bound()
self._is_bound = True
return self
4.2.3 Exchange & Binding
這里會把 Exchange 和 queue 聯(lián)系。就是把 Exchange 和 routing_key 聯(lián)系起來,然后把這些聯(lián)系規(guī)則放到redis 之中。
堆棧如下:
_queue_bind, redis.py:814
queue_bind, base.py:568
bind_to, entity.py:674
queue_bind, entity.py:662
_create_queue, entity.py:617
declare, entity.py:606
declare, messaging.py:417
revive, messaging.py:404
__init__, messaging.py:382
具體為
class Queue(MaybeChannelBound):
def __init__(self, name='', exchange=None, routing_key='',
channel=None, bindings=None, on_declared=None,
**kwargs):
super().__init__(**kwargs)
self.name = name or self.name
if isinstance(exchange, str):
self.exchange = Exchange(exchange)
elif isinstance(exchange, Exchange):
self.exchange = exchange
self.routing_key = routing_key or self.routing_key
self.bindings = set(bindings or [])
self.on_declared = on_declared
# allows Queue('name', [binding(...), binding(...), ...])
if isinstance(exchange, (list, tuple, set)):
self.bindings |= set(exchange)
if self.bindings:
self.exchange = None
# exclusive implies auto-delete.
if self.exclusive:
self.auto_delete = True
self.maybe_bind(channel)
def queue_bind(self, nowait=False, channel=None):
"""Create the queue binding on the server."""
return self.bind_to(self.exchange, self.routing_key,
self.binding_arguments,
channel=channel, nowait=nowait)
def bind_to(self, exchange='', routing_key='',
arguments=None, nowait=False, channel=None):
if isinstance(exchange, Exchange):
exchange = exchange.name
return (channel or self.channel).queue_bind(
queue=self.name,
exchange=exchange,
routing_key=routing_key,
arguments=arguments,
nowait=nowait,
)
4.2.3.1 Channel binding
具體調(diào)用到Channel,代碼位于 kombu/transport/redis.py。
def _queue_bind(self, exchange, routing_key, pattern, queue):
if self.typeof(exchange).type == 'fanout':
# Mark exchange as fanout.
self._fanout_queues[queue] = (
exchange, routing_key.replace('#', '*'),
)
with self.conn_or_acquire() as client:
client.sadd(self.keyprefix_queue % (exchange,),
self.sep.join([routing_key or '',
pattern or '',
queue or '']))
代碼然后調(diào)用到redis client。
# SET COMMANDS
def sadd(self, name, *values):
"Add ``value(s)`` to set ``name``"
return self.execute_command('SADD', name, *values)
具體變量如下,我們代碼中,exchange內(nèi)容為_kombu.binding.asynt_exchange。routing_key的是asynt_routing_key。
name = {str} '_kombu.binding.asynt_exchange'
self = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
values = {tuple: 1} asynt_routing_keysynt_queue
我們看看Redis內(nèi)容,發(fā)現(xiàn)新建內(nèi)容如下:
127.0.0.1:6379> smembers _kombu.binding.asynt_exchange
1) "asynt_routing_key\x06\x16\x06\x16asynt_queue"
集合名字為:self.keyprefix_queue % (exchange,), 對于我們就為:_kombu.binding.asynt_exchange。
集合每個item為:routing_key + sep + pattern + sep + queue。我們這里sep = '\x06\x16'。
4.2.3.2 使用
當(dāng)發(fā)消息時候,Exchange的作用是將發(fā)送的 routing_key 轉(zhuǎn)化為 queue 的名字。這樣發(fā)送就知道發(fā)到哪個 queue 。這里的 exchange 內(nèi)容為 _kombu.binding.asynt_exchange。
def get_table(self, exchange):
key = self.keyprefix_queue % exchange
with self.conn_or_acquire() as client:
values = client.smembers(key)
if not values:
raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
return [tuple(bytes_to_str(val).split(self.sep)) for val in values]
得到的集合內(nèi)容為:
{b'asynt_routing_key\x06\x16\x06\x16asynt_queue'}
即從 exchange 得到 routing_key ---> queue 的規(guī)則,然后再依據(jù) routing_key 得到 queue。就知道 Consumer 和 Producer 需要依據(jù)哪個 queue 交換消息。
邏輯如下:
+---------------------------------+
| exchange |
| |
1 routing_key x | |
+----------+ | | +------------+
| Producer | +-----------------> | routing_key x ---> queue x | | Consumer |
+--------+-+ | | +------------+
| | routing_key y ---> queue y |
| | | ^
| | routing_key z ---> queue z | |
| | | |
| +---------------------------------+ |
| |
| |
| |
| |
| |
| |
| |
| |
| +-----------+ |
| 2 message | | 3 message |
+-------------------------------> | queue X | +--------------------+
| |
+-----------+
因此,此時總體邏輯如下圖:
+----------------------+ +-------------------+
| Consumer | | Channel |
| | | | +-----------------------------------------------------------+
| | | client +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
| channel +--------------------> | | +-----------------------------------------------------------+
| | | pool |
| | +---------> | | <------------------------------------------------------------+
| queues | | | | |
| | | +----> | connection +---------------+ |
| | | | | | | | |
+----------------------+ | | +-------------------+ | |
| | | v |
| | | +-------------------+ +---+-----------------+ +--------------------+ |
| | | | Connection | | redis.Transport | | MultiChannelPoller | |
| | | | | | | | | |
| | | | | | | | _channels +--------+
| | | | | | cycle +------------> | _fd_to_chan |
| | | | transport +---------> | | | _chan_to_sock |
| +-------->+ | | | | | +------+ poller |
| | | +-------------------+ +---------------------+ | | after_read |
| | | | | |
| | | | +--------------------+
| | | +------------------+ +---------------+
| | | | Hub | |
| | | | | v
| | | | | +------+------+
| | | | poller +---------------> | _poll |
| | | | | | | +-------+
| | | | | | _poller+---------> | poll |
v | | +------------------+ | | +-------+
| | +-------------+
+-------------------+ | +----------------+
| Queue | | | | Exchange |
| _chann+l | +----+ | |
| | | |
| exchange +----------------> | channel |
| | | |
| | | |
+-------------------+ +----------------+
手機(jī)如下:

現(xiàn)在我們知道:
- Consumers:接受消息的抽象類,consumer需要聲明一個queue,并將queue與指定的exchange綁定,然后從queue里面接收消息。
- Exchange:MQ 路由,消息發(fā)送者將消息發(fā)至Exchange,Exchange負(fù)責(zé)將消息分發(fā)至隊列。
- Queue:對應(yīng)的 queue 抽象,存儲著即將被應(yīng)用消費掉的消息,Exchange負(fù)責(zé)將消息分發(fā)Queue,消費者從Queue接收消息;
- Channel:與AMQP中概念類似,可以理解成共享一個Connection的多個輕量化連;
于是邏輯鏈已經(jīng)形成,大約是這樣的,后文完善:
- Producer發(fā)送消息到Exchange;
- Exchange中有成員變量Channel,也有成員變量Queues。
- 于是Exchange負(fù)責(zé)通過Channel將消息分發(fā)至Queue,Exchange的作用只是將發(fā)送的
routing_key轉(zhuǎn)化為queue的名字。 - Consumer去Queue取消息;
邏輯大致通了,但是缺少動態(tài)操作完成此邏輯,我們將在后續(xù)完善動態(tài)邏輯。
0x05 完善聯(lián)系
在init之后,第二步會完善聯(lián)系。
python的上下文管理。在python中實現(xiàn)了__enter__和__exit__方法,即支持上下文管理器協(xié)議。上下文管理器就是支持上下文管理器協(xié)議的對象,它是為了with而生。當(dāng)with語句在開始運行時,會在上下文管理器對象上調(diào)用 enter 方法。with語句運行結(jié)束后,會在上下文管理器對象上調(diào)用 exit 方法。
所以這里是調(diào)用__enter__,即 consumer 函數(shù),其目的如下:
- 調(diào)用Channel繼續(xù)處理,
Channel將Consumer標(biāo)簽,Consumer要消費的隊列,以及標(biāo)簽與隊列的映射關(guān)系都記錄下來,等待循環(huán)調(diào)用。 - 另外,還通過
Transport將隊列與回調(diào)函數(shù)列表的映射關(guān)系記錄下來,以便于從隊列中取出消息后執(zhí)行回調(diào)函數(shù)。
class Consumer:
"""Message consumer.
Arguments:
channel (kombu.Connection, ChannelT): see :attr:`channel`.
queues (Sequence[kombu.Queue]): see :attr:`queues`.
no_ack (bool): see :attr:`no_ack`.
auto_declare (bool): see :attr:`auto_declare`
callbacks (Sequence[Callable]): see :attr:`callbacks`.
on_message (Callable): See :attr:`on_message`
on_decode_error (Callable): see :attr:`on_decode_error`.
prefetch_count (int): see :attr:`prefetch_count`.
"""
def __enter__(self):
self.consume()
return self
5.1 遍歷Queue
使用_basic_consume方法處理Consumer相關(guān)的隊列列表中的每一項,其中處理最后一個Queue時設(shè)置標(biāo)志nowait=False。
def consume(self, no_ack=None):
"""Start consuming messages.
Can be called multiple times, but note that while it
will consume from new queues added since the last call,
it will not cancel consuming from removed queues (
use :meth:`cancel_by_queue`).
Arguments:
no_ack (bool): See :attr:`no_ack`.
"""
queues = list(self._queues.values())
if queues:
no_ack = self.no_ack if no_ack is None else no_ack
H, T = queues[:-1], queues[-1]
for queue in H:
self._basic_consume(queue, no_ack=no_ack, nowait=True)
self._basic_consume(T, no_ack=no_ack, nowait=False)
_basic_consume方法代碼如下:
是將消費者標(biāo)簽以及回調(diào)函數(shù)傳給Queue的consume方法。
def _basic_consume(self, queue, consumer_tag=None,
no_ack=no_ack, nowait=True):
tag = self._active_tags.get(queue.name)
if tag is None:
tag = self._add_tag(queue, consumer_tag)
queue.consume(tag, self._receive_callback,
no_ack=no_ack, nowait=nowait)
return tag
5.2 consume in Queue
對于每一個 queue,都會調(diào)用其 consume 函數(shù)。
Queue的consume方法代碼:
class Queue(MaybeChannelBound):
def consume(self, consumer_tag='', callback=None,
no_ack=None, nowait=False):
"""Start a queue consumer.
Consumers last as long as the channel they were created on, or
until the client cancels them.
Arguments:
consumer_tag (str): Unique identifier for the consumer.
The consumer tag is local to a connection, so two clients
can use the same consumer tags. If this field is empty
the server will generate a unique tag.
no_ack (bool): If enabled the broker will automatically
ack messages.
nowait (bool): Do not wait for a reply.
callback (Callable): callback called for each delivered message.
"""
if no_ack is None:
no_ack = self.no_ack
return self.channel.basic_consume(
queue=self.name,
no_ack=no_ack,
consumer_tag=consumer_tag or '',
callback=callback,
nowait=nowait,
arguments=self.consumer_arguments)
前面提到,queue與channel已經(jīng)聯(lián)系了起來。
每一個 Consumer 初始化的時候都是和 Channel 綁定的,也就是說我們 Consumer 包含了 Queue 也就和 Connection 關(guān)聯(lián)起來了!
Consumer 消費消息是通過 Queue 來消費,然后 Queue 又轉(zhuǎn)嫁給 Channel。
5.3 consume in Channel
因此又回到了Channel,就是Channel的basic_consume代碼:
調(diào)用到基類basic_consume方法。
class Channel(virtual.Channel):
def basic_consume(self, queue, *args, **kwargs):
if queue in self._fanout_queues:
exchange, _ = self._fanout_queues[queue]
self.active_fanout_queues.add(queue)
self._fanout_to_queue[exchange] = queue
ret = super().basic_consume(queue, *args, **kwargs)
# Update fair cycle between queues.
#
# We cycle between queues fairly to make sure that
# each queue is equally likely to be consumed from,
# so that a very busy queue will not block others.
#
# This works by using Redis's `BRPOP` command and
# by rotating the most recently used queue to the
# and of the list. See Kombu github issue #166 for
# more discussion of this method.
self._update_queue_cycle()
return ret
基類是 virtual.Channel,其作用是:
Channel將Consumer標(biāo)簽,Consumer要消費的隊列,以及標(biāo)簽與隊列的映射關(guān)系都記錄下來,等待循環(huán)調(diào)用。另外,還通過Transport將隊列與回調(diào)函數(shù)列表的映射關(guān)系記錄下來,以便于從隊列中取出消息后執(zhí)行回調(diào)函數(shù)。
變量是:
- _tag_to_queue:標(biāo)簽與隊列的映射關(guān)系;
- _active_queues:
Consumer要消費的隊列; - _consumers:
Consumer標(biāo)簽; - connection:
Transport; - connection._callbacks:隊列與回調(diào)函數(shù)列表的映射關(guān)系;
數(shù)值如下:
self._tag_to_queue = {dict: 1} {'None1': 'asynt'}
self._active_queues = {list: 1} ['asynt']
self._consumers = {set: 1} {'None1'}
self.connection = {Transport} <kombu.transport.redis.Transport object at 0x7fb3ee0155f8>
self.connection._callbacks = {dict: 1} {'asynt': <function Channel.basic_consume.<locals>._callback at 0x7fb3ecd4a2f0>}
代碼如下:
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
"""Consume from `queue`."""
self._tag_to_queue[consumer_tag] = queue
self._active_queues.append(queue)
def _callback(raw_message):
message = self.Message(raw_message, channel=self)
if not no_ack:
self.qos.append(message, message.delivery_tag)
return callback(message)
self.connection._callbacks[queue] = _callback
self._consumers.add(consumer_tag)
self._reset_cycle()
_reset_cycle 代碼如下,看起來就是調(diào)用了 FairCycle,實際上沒有用到,因為cycle已經(jīng)有預(yù)設(shè)。cycle是一個MultiChannelPoller實例。
def _reset_cycle(self):
self._cycle = FairCycle(
self._get_and_deliver, self._active_queues, Empty)
具體如下圖:
+----------+ +-------+ +---------+
| Consumer | | Queue | | Channel |
+----+-----+ +---+---+ +-----+---+
| | |
| | |
__enter__ | |
| | |
| | |
consume | |
| | |
| | |
_basic_consume | |
| | |
| | |
| consume | |
+------------> | |
| | basic_consume |
| | |
| | +-----------> |
| | |
| | |
| | _reset_cycle
| | |
| | |
| | |
| | |
| | |
v v v
0x06 消費消息
為了更好的分析,我們暫時注銷hub,使用drain_events消費消息,這樣更直觀。
就是說,Consumer 已經(jīng)和 Channel 聯(lián)系起來,知道讀取redis 中的哪個key。但是現(xiàn)在缺少一個讀取消息的引擎。這個引擎可以驅(qū)動消息讀取,每次有消息,就調(diào)用 consumer 中的回調(diào)函數(shù)來處理消息。
在沒有引擎的情況下,drain_events 就可以起到引擎的作用。
with Consumer(conn, [queue], on_message=on_message):
send_message(conn)
# hub.timer.call_repeatedly(3, p_message)
# hub.run_forever()
conn.drain_events(timeout=1)
6.1 drain_events in Connection
drain_events 調(diào)用 Connection 的方法來進(jìn)行消費。
def drain_events(self, **kwargs):
"""Wait for a single event from the server.
Arguments:
timeout (float): Timeout in seconds before we give up.
"""
return self.transport.drain_events(self.connection, **kwargs)
6.2 drain_events in Transport
在 Transport中的drain_events ,是在無限執(zhí)行get(self._deliver, timeout=timeout)
get是self.cycle的一個方法,cycle是一個MultiChannelPoller實例:
所以get是<bound method MultiChannelPoller.get of <kombu.transport.redis.MultiChannelPoller object at 0x7feab312b358>>
def drain_events(self, connection, timeout=None):
time_start = monotonic()
get = self.cycle.get
polling_interval = self.polling_interval
if timeout and polling_interval and polling_interval > timeout:
polling_interval = timeout
while 1:
try:
get(self._deliver, timeout=timeout)
except Empty:
if timeout is not None and monotonic() - time_start >= timeout:
raise socket.timeout()
if polling_interval is not None:
sleep(polling_interval)
else:
break
6.3 get in MultiChannelPoller
Transport相關(guān)聯(lián)的每一個channel都要執(zhí)行drain_events。具體分兩步:
-
對于每一個channel都注冊;
-
進(jìn)行poll;
代碼如下:
def get(self, callback, timeout=None):
self._in_protected_read = True
try:
for channel in self._channels:
if channel.active_queues: # BRPOP mode?
if channel.qos.can_consume():
self._register_BRPOP(channel)
if channel.active_fanout_queues: # LISTEN mode?
self._register_LISTEN(channel)
events = self.poller.poll(timeout)
if events:
for fileno, event in events:
ret = self.handle_event(fileno, event)
if ret:
return
# - no new data, so try to restore messages.
# - reset active redis commands.
self.maybe_restore_messages()
raise Empty()
finally:
self._in_protected_read = False
while self.after_read:
try:
fun = self.after_read.pop()
except KeyError:
break
else:
fun()
6.3.1 _register_BRPOP in MultiChannelPoller
具體注冊如下,我們先來看看 _register_BRPOP,這里做了兩個判斷,第一個是判斷當(dāng)前的 channel 是否放進(jìn)了 epoll 模型里面,如果沒有,那么就放進(jìn)去;同時,如果之前這個 channel 不在 epoll 里面,那么這次放進(jìn)去了。
def _register_BRPOP(self, channel):
"""Enable BRPOP mode for channel."""
ident = channel, channel.client, 'BRPOP'
if not self._client_registered(channel, channel.client, 'BRPOP'):
channel._in_poll = False
self._register(*ident)
if not channel._in_poll: # send BRPOP
channel._brpop_start()
6.3.2 register in _poll
最終進(jìn)行Poll注冊,這樣當(dāng)redis的socket對應(yīng)的fd有消息,就會進(jìn)行處理。
變量如下:<kombu.utils.eventio._poll object at 0x7feab2d7d780>
def register(self, fd, events):
fd = fileno(fd)
poll_flags = 0
if events & ERR:
poll_flags |= POLLERR
if events & WRITE:
poll_flags |= POLLOUT
if events & READ:
poll_flags |= POLLIN
self._quick_register(fd, poll_flags)
return fd
6.3.3 poll(timeout) in MultiChannelPoller
當(dāng)poll有消息,則相應(yīng)處理。
events = self.poller.poll(timeout)
if events:
for fileno, event in events:
ret = self.handle_event(fileno, event)
if ret:
return
6.3.4 注冊到redis驅(qū)動,負(fù)載均衡
但是,這個 connection 還沒有對 epoll 起效果,所以發(fā)送一個 _brpop_start。
這里可以看到,是對 asynt_queue 發(fā)起了監(jiān)聽請求,也就是說隊列有消息過來,會被響應(yīng)到。
變量如下:
keys = {list: 5} ['asynt_queue', 'asynt_queue\x06\x163', 'asynt_queue\x06\x166', 'asynt_queue\x06\x169', 1]
queues = {list: 1} ['asynt_queue']
代碼如下:
def _brpop_start(self, timeout=1):
queues = self._queue_cycle.consume(len(self.active_queues))
if not queues:
return
keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
for queue in queues] + [timeout or 0]
self._in_poll = self.client.connection
self.client.connection.send_command('BRPOP', *keys)
此處有一個負(fù)載均衡需要說明:
_queue_cycle屬于均衡策略,就是選擇下一次哪個queue的策略,items就是具體queue列表。比如:
class round_robin_cycle:
"""Iterator that cycles between items in round-robin."""
def __init__(self, it=None):
self.items = it if it is not None else []
def update(self, it):
"""Update items from iterable."""
self.items[:] = it
def consume(self, n):
"""Consume n items."""
return self.items[:n]
_brpop_start就是啟動了下一次讀取,選擇哪一個queue。
consume, scheduling.py:79
_brpop_start, redis.py:725
_register_BRPOP, redis.py:314
on_poll_start, redis.py:328
on_poll_start, redis.py:1072
create_loop, hub.py:294
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:49
<module>, testUb.py:53
6.3.4 handle_event in MultiChannelPoller
因為已經(jīng)把 file 和 poll 聯(lián)系起來,所以對調(diào)用對應(yīng)的響應(yīng)方法,而響應(yīng)方法會進(jìn)行read消息。
def handle_event(self, fileno, event):
if event & READ:
return self.on_readable(fileno), self
elif event & ERR:
chan, type = self._fd_to_chan[fileno]
chan._poll_error(type)
6.3.5 on_readable in MultiChannelPoller
這里聽說 Redis 已經(jīng)準(zhǔn)備好了,所以就來獲取拿到的結(jié)果,然后就解析起來了,解析成功之后,自然要處理這個消息呀,于是乎又回到了這里 redis.py:
提取fd對應(yīng)的channel的響應(yīng)方法如下:
def on_readable(self, fileno):
chan, type = self._fd_to_chan[fileno]
if chan.qos.can_consume():
chan.handlers[type]()
6.3.6 _brpop_read in Channel
前面對chan.handlers已經(jīng)進(jìn)行了注冊。
handlers = {dict: 2}
'BRPOP' = {method} <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fbad4170f28>>
'LISTEN' = {method} <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7fbad4170f28>>
因此調(diào)用_brpop_read。
def _brpop_read(self, **options):
try:
try:
dest__item = self.client.parse_response(self.client.connection,
'BRPOP',
**options)
except self.connection_errors:
# if there's a ConnectionError, disconnect so the next
# iteration will reconnect automatically.
self.client.connection.disconnect()
raise
if dest__item:
dest, item = dest__item
dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
self._queue_cycle.rotate(dest)
self.connection._deliver(loads(bytes_to_str(item)), dest)
return True
else:
raise Empty()
finally:
self._in_poll = None
6.3.7 從redis讀取
這里會從redis驅(qū)動讀取,文件/redis/connection.py,從SocketBuffer讀取。
代碼為:
def readline(self):
buf = self._buffer
buf.seek(self.bytes_read)
data = buf.readline()
while not data.endswith(SYM_CRLF):
# there's more data in the socket that we need
self._read_from_socket()
buf.seek(self.bytes_read)
data = buf.readline()
self.bytes_read += len(data)
# purge the buffer when we've consumed it all so it doesn't
# grow forever
if self.bytes_read == self.bytes_written:
self.purge()
return data[:-2]
當(dāng)讀到 response 之后,調(diào)用 Redis驅(qū)動中對應(yīng)命令的 回調(diào)方法來處理。此處命令為BRPOP。回調(diào)方法為:string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None)。
代碼為:
def parse_response(self, connection, command_name, **options):
"Parses a response from the Redis server"
try:
response = connection.read_response()
except ResponseError:
if EMPTY_RESPONSE in options:
return options[EMPTY_RESPONSE]
raise
if command_name in self.response_callbacks:
return self.response_callbacks[command_name](response, **options)
return response
變量為:
command_name = {str} 'BRPOP'
connection = {Connection} Connection<host=localhost,port=6379,db=0>
options = {dict: 0} {}
self = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
connection = {Connection} Connection<host=localhost,port=6379,db=0>
connection_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
response_callbacks = {CaseInsensitiveDict: 179} {.
'LPUSH' = {function} <function Redis.<lambda> at 0x7fbad4276ea0>
'RPUSH' = {function} <function Redis.<lambda> at 0x7fbad4276ea0>
'SORT' = {function} <function sort_return_tuples at 0x7fbad4275f28>
'ZSCORE' = {function} <function float_or_none at 0x7fbad4276598>
'ZINCRBY' = {function} <function float_or_none at 0x7fbad4276598>
'BLPOP' = {function} <function Redis.<lambda> at 0x7fbad4276f28>
'BRPOP' = {function} <function Redis.<lambda> at 0x7fbad4276f28>
....
這些代碼堆棧如下:
readline, connection.py:251
read_response, connection.py:324
read_response, connection.py:739
parse_response, client.py:915
_brpop_read, redis.py:738
on_readable, redis.py:358
handle_event, redis.py:362
get, redis.py:380
drain_events, base.py:960
drain_events, connection.py:318
main, testUb.py:50
<module>, testUb.py:53
6.3.8 回到_brpop_read
從Redis驅(qū)動獲得消息后,回到了 _brpop_read,信息如下:
dest__item = {tuple: 2}
0 = {bytes: 11} b'asynt_queue'
1 = {bytes: 321} b'{"body": "aGVsbG8gd29ybGQ=", "content-encoding": "utf-8", "content-type": "text/plain", "headers": {}, "properties": {"delivery_mode": 2, "delivery_info": {"exchange": "asynt_exchange", "routing_key": "asynt_routing_key"}, "priority": 0, "body_encoding":
6.3.9 _deliver in Transport
當(dāng)獲得消息之后,會取出對應(yīng)queue的callback,進(jìn)行調(diào)用。
變量如下:<kombu.transport.redis.Transport object at 0x7feab30f25c0>
def _deliver(self, message, queue):
try:
callback = self._callbacks[queue]
except KeyError:
logger.warning(W_NO_CONSUMERS, queue)
self._reject_inbound_message(message)
else:
callback(message)
6.3.10 basic_consume in Channel
代碼繼續(xù)走到 basic_consume
<kombu.transport.redis.Channel object at 0x7feab3235f28>
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
"""Consume from `queue`."""
self._tag_to_queue[consumer_tag] = queue
self._active_queues.append(queue)
def _callback(raw_message):
message = self.Message(raw_message, channel=self)
if not no_ack:
self.qos.append(message, message.delivery_tag)
return callback(message)
self.connection._callbacks[queue] = _callback
self._consumers.add(consumer_tag)
self._reset_cycle()
6.3.11 _receive_callback in Consumer
上文的 _callback 就是 _receive_callback in Consumer,所以這時候就調(diào)用過去。
<Consumer: [<Queue asynt -> <Exchange asynt(direct) bound to chan:1> -> asynt bound to chan:1>]>
def _receive_callback(self, message):
accept = self.accept
on_m, channel, decoded = self.on_message, self.channel, None
try:
m2p = getattr(channel, 'message_to_python', None)
if m2p:
message = m2p(message)
if accept is not None:
message.accept = accept
if message.errors:
return message._reraise_error(self.on_decode_error)
decoded = None if on_m else message.decode()
except Exception as exc:
if not self.on_decode_error:
raise
self.on_decode_error(message, exc)
else:
return on_m(message) if on_m else self.receive(decoded, message)
最終調(diào)用用戶方法。
on_message, testUb.py:36
_receive_callback, messaging.py:620
_callback, base.py:630
_deliver, base.py:980
_brpop_read, redis.py:748
on_readable, redis.py:358
handle_event, redis.py:362
get, redis.py:380
drain_events, base.py:960
drain_events, connection.py:318
main, testUb.py:50
<module>, testUb.py:53
具體如下:
+----------+ +---------+ +------------------+ +------+ +---------+ +-----+ +---------+
|Connection| |Transport| |MultiChannelPoller| |_poll | | Channel | |redis| |Consumer |
+----+-----+ +------+--+ +------------+-----+ +----+-+ +-----+---+ +--+--+ +---+-----+
| | | | | | |
+ | | | | | |
drain_events | | | | | |
+ + | | | | |
+-------> drain_events | | | | |
| + + | | | |
| | +------------> get | | | |
| | + | | | |
| | + | | | |
| | _register_BRPOP | | | |
| | + + | | |
| | | +-----------> register | | |
| | | + | | |
| | + | | | |
| | poll | | | |
| | + | | | |
| | + | | | |
| | handle_event | | | |
| | + | | | |
| | + | | | |
| | on_readable | | | |
| | + | + | |
| | | +----------------->_brpop_read | |
| | | | + | |
| + | | +---------> | |
| _deliver <-------------------------------------+ | |
| + | | | | |
| | | | | | |
| | | | | | |
| | +----------------------------------> basic|consume | |
| | | | | | |
| | | | +---------> | |
| | | | | | |
| | | | | | |
| | | | | v |
| | | | | |
| | | | | _receive_ca|lback
| | | | | |
v v v v v |
v
從上圖可以看出模塊的用途。
手機(jī)上如圖

至此,我們已經(jīng)完成了 Consumer 的分析,下文我們進(jìn)行 Producer 的分析。
0xFF 參考
celery 7 優(yōu)秀開源項目kombu源碼分析之registry和entrypoint
浙公網(wǎng)安備 33010602011771號