Rabbit MQ
AMQP簡介
介紹rabbitMQ之前。先介紹一下AMQP協議,因為rabbitMQ是基于AMQP協議實現的一個服務程序。(目前為止應該也是唯一實現了AMQP協議的服務)
AMQP定義
AMQP(高級消息隊列協議)是一個網絡協議。它支持符合要求的客戶端應用(application)和消息中間件代理(messaging middleware brockers)之間進行通信。
消息代理和他們所扮演的角色
消息代理(message brokers)從發布者(publishers)亦稱生產者(producers)那兒接收消息,并根據既定的路由規則把接收到的消息發送給處理消息的消費者(consumers)。
由于AMQP是一個網絡協議,所以這個過程中的發布者,消費者,消息代理 可以存在于不同的設備上。
AMQP 0-9-1 模型簡介
AMQP 0-9-1的工作過程如下圖:消息(message)被發布者(publisher)發送給交換機(exchange),交換機常常被比喻成郵局或者郵箱。然后交換機將收到的消息根據路由規則分發給綁定的隊列(queue)。最后AMQP代理會將消息投遞給訂閱了此隊列的消費者,或者消費者按照需求自行獲取。

發布者(publisher)發布消息時可以給消息指定各種消息屬性(message meta-data)。有些屬性有可能會被消息代理(brokers)使用,然而其他的屬性則是完全不透明的,它們只能被接收消息的應用所使用。
從安全角度考慮,網絡是不可靠的,接收消息的應用也有可能在處理消息的時候失敗。基于此原因,AMQP模塊包含了一個消息確認(message acknowledgements)的概念:當一個消息從隊列中投遞給消費者后(consumer),消費者會通知一下消息代理(broker),這個可以是自動的也可以由處理消息的應用的開發者執行。當“消息確認”被啟用的時候,消息代理不會完全將消息從隊列中刪除,直到它收到來自消費者的確認回執(acknowledgement)。
在某些情況下,例如當一個消息無法被成功路由時,消息或許會被返回給發布者并被丟棄。或者,如果消息代理執行了延期操作,消息會被放入一個所謂的死信隊列中。此時,消息發布者可以選擇某些參數來處理這些特殊情況。
隊列,交換機和綁定統稱為AMQP實體(AMQP entities)。
AMQP是一個可編的程協議
AMQP 0-9-1是一個可編程協議,某種意義上說AMQP的實體和路由規則是由應用本身定義的,而不是由消息代理定義。包括像聲明隊列和交換機,定義他們之間的綁定,訂閱隊列等等關于協議本身的操作。
這雖然能讓開發人員自由發揮,但也需要他們注意潛在的定義沖突。當然這在實踐中很少會發生,如果發生,會以配置錯誤(misconfiguration)的形式表現出來。
應用程序(Applications)聲明AMQP實體,定義需要的路由方案,或者刪除不再需要的AMQP實體。
交換機和交換機類型
交換機是用來發送消息的AMQP實體。交換機拿到一個消息之后將它路由給一個或零個隊列。它使用哪種路由算法是由交換機類型和被稱作綁定(bindings)的規則所決定的。AMQP 0-9-1的代理提供了四種交換機
| Name(交換機類型) | Default pre-declared names(預聲明的默認名稱) |
|---|---|
| Direct exchange(直連交換機) | (Empty string) and amq.direct |
| Fanout exchange(扇型交換機) | amq.fanout |
| Topic exchange(主題交換機) | amq.topic |
| Headers exchange(頭交換機) | amq.match (and amq.headers in RabbitMQ) |
除交換機類型外,在聲明交換機時還可以附帶許多其他的屬性,其中最重要的幾個分別是:
- Name (交換機的名字)
- Type (交換機的種類)
- Passive (被動模式,布爾值)
- Durable (消息代理重啟后,交換機是否還存在)
- Auto_delete (當所有與之綁定的消息隊列都完成了對此交換機的使用后,刪掉它)
- Arguments(依賴代理本身)
交換機可以有兩個狀態:持久(durable)和暫存(transient)。持久化的交換機會在消息代理(broker)重啟后依舊存在,而暫存的交換機則不會(它們需要在代理再次上線后重新被聲明)。然而并不是所有的應用場景都需要持久化的交換機。
默認交換機
默認交換機(default exchange)實際上是一個由消息代理預先聲明好的沒有名字(名字為空字符串)的直連交換機(direct exchange)。它有一個特殊的屬性使得它對于簡單應用特別有用處:那就是每個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同。
舉個栗子:當你聲明了一個名為"search-indexing-online"的隊列,AMQP代理會自動將其綁定到默認交換機上,綁定(binding)的路由鍵名稱也是"search-indexing-online"。因此,當攜帶著名為"search-indexing-online"的路由鍵的消息被發送到默認交換機的時候,此消息會被默認交換機路由至名為"search-indexing-online"的隊列中。換句話說,默認交換機貌似能夠直接將消息投遞給隊列,盡管技術上并沒有做相關的操作。
直連交換機
直連型交換機(direct exchange)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的。直連交換機用來處理消息的單播路由(unicast routing)(盡管它也可以處理多播路由)。下邊介紹它是如何工作的:
- 將一個隊列綁定到某個交換機上,同時賦予該綁定一個路由鍵(routing key)
- 當一個攜帶著路由鍵為
R的消息被發送給直連交換機時,交換機會把它路由給綁定值同樣為R的隊列。
直連交換機經常用來循環分發任務給多個工作者(workers)。當這樣做的時候,我們需要明白一點,在AMQP 0-9-1中,消息的負載均衡是發生在消費者(consumer)之間的,而不是隊列(queue)之間。
直連型交換機圖例:

扇型交換機
扇型交換機(funout exchange)將消息路由給綁定到它身上的所有隊列,而不理會綁定的路由鍵。如果N個隊列綁定到某個扇型交換機上,當有消息發送給此扇型交換機時,交換機會將消息的拷貝分別發送給這所有的N個隊列。扇型用來交換機處理消息的廣播路由(broadcast routing)。
因為扇型交換機投遞消息的拷貝到所有綁定到它的隊列,所以他的應用案例都極其相似:
- 大規模多用戶在線(MMO)游戲可以使用它來處理排行榜更新等全局事件
- 體育新聞網站可以用它來近乎實時地將比分更新分發給移動客戶端
- 分發系統使用它來廣播各種狀態和配置更新
- 在群聊的時候,它被用來分發消息給參與群聊的用戶。(AMQP沒有內置presence的概念,因此XMPP可能會是個更好的選擇)
扇型交換機圖例:

主題交換機
主題交換機(topic exchanges)通過對消息的路由鍵和隊列到交換機的綁定模式之間的匹配,將消息路由給一個或多個隊列。主題交換機經常用來實現各種分發/訂閱模式及其變種。主題交換機通常用來實現消息的多播路由(multicast routing)。
主題交換機擁有非常廣泛的用戶案例。無論何時,當一個問題涉及到那些想要有針對性的選擇需要接收消息的 多消費者/多應用(multiple consumers/applications) 的時候,主題交換機都可以被列入考慮范圍。
使用案例:
- 分發有關于特定地理位置的數據,例如銷售點
- 由多個工作者(workers)完成的后臺任務,每個工作者負責處理某些特定的任務
- 股票價格更新(以及其他類型的金融數據更新)
- 涉及到分類或者標簽的新聞更新(例如,針對特定的運動項目或者隊伍)
- 云端的不同種類服務的協調
- 分布式架構/基于系統的軟件封裝,其中每個構建者僅能處理一個特定的架構或者系統。
頭交換機
有時消息的路由操作會涉及到多個屬性,此時使用消息頭就比用路由鍵更容易表達,頭交換機(headers exchange)就是為此而生的。頭交換機使用多個消息屬性來代替路由鍵建立路由規則。通過判斷消息頭的值能否與指定的綁定相匹配來確立路由規則。
我們可以綁定一個隊列到頭交換機上,并給他們之間的綁定使用多個用于匹配的頭(header)。這個案例中,消息代理得從應用開發者那兒取到更多一段信息,換句話說,它需要考慮某條消息(message)是需要部分匹配還是全部匹配。上邊說的“更多一段消息”就是"x-match"參數。當"x-match"設置為“any”時,消息頭的任意一個值被匹配就可以滿足條件,而當"x-match"設置為“all”的時候,就需要消息頭的所有值都匹配成功。
頭交換機可以視為直連交換機的另一種表現形式。頭交換機能夠像直連交換機一樣工作,不同之處在于頭交換機的路由規則是建立在頭屬性值之上,而不是路由鍵。路由鍵必須是一個字符串,而頭屬性值則沒有這個約束,它們甚至可以是整數或者哈希值(字典)等。
隊列
AMQP中的隊列(queue)跟其他消息隊列或任務隊列中的隊列是很相似的:它們存儲著即將被應用消費掉的消息。隊列跟交換機共享某些屬性,但是隊列也有一些另外的屬性。
- Name
- Durable(消息代理重啟后,隊列依舊存在)
- Exclusive(只被一個連接(connection)使用,而且當連接關閉后隊列即被刪除)
- Auto_delete(當最后一個消費者退訂后即被刪除)
- Arguments(一些消息代理用他來完成類似與TTL的某些額外功能)
隊列在聲明(declare)后才能被使用。如果一個隊列尚不存在,聲明一個隊列會創建它。如果聲明的隊列已經存在,并且屬性完全相同,那么此次聲明不會對原有隊列產生任何影響。如果聲明中的屬性與已存在隊列的屬性有差異,那么一個錯誤代碼為406的通道級異常就會被拋出。
隊列名稱
隊列的名字可以由應用(application)來取,也可以讓消息代理(broker)直接生成一個。隊列的名字可以是最多255字節的一個utf-8字符串。若希望AMQP消息代理生成隊列名,需要給隊列的name參數賦值一個空字符串:在同一個通道(channel)的后續的方法(method)中,我們可以使用空字符串來表示之前生成的隊列名稱。之所以之后的方法可以獲取正確的隊列名是因為通道可以默默地記住消息代理最后一次生成的隊列名稱。
以"amq."開始的隊列名稱被預留做消息代理內部使用。如果試圖在隊列聲明時打破這一規則的話,一個通道級的403 (ACCESS_REFUSED)錯誤會被拋出。
隊列持久化
持久化隊列(Durable queues)會被存儲在磁盤上,當消息代理(broker)重啟的時候,它依舊存在。沒有被持久化的隊列稱作暫存隊列(Transient queues)。并不是所有的場景和案例都需要將隊列持久化。
持久化的隊列并不會使得路由到它的消息也具有持久性。倘若消息代理掛掉了,重新啟動,那么在重啟的過程中持久化隊列會被重新聲明,無論怎樣,只有經過持久化的消息才能被重新恢復。
綁定
綁定(Binding)是交換機(exchange)將消息(message)路由給隊列(queue)所需遵循的規則。如果要指示交換機“E”將消息路由給隊列“Q”,那么“Q”就需要與“E”進行綁定。綁定操作需要定義一個可選的路由鍵(routing key)屬性給某些類型的交換機。路由鍵的意義在于從發送給交換機的眾多消息中選擇出某些消息,將其路由給綁定的隊列。
打個比方:
- 隊列(queue)是我們想要去的位于紐約的目的地
- 交換機(exchange)是JFK機場
- 綁定(binding)就是JFK機場到目的地的路線。能夠到達目的地的路線可以是一條或者多條
擁有了交換機這個中間層,很多由發布者直接到隊列難以實現的路由方案能夠得以實現,并且避免了應用開發者的許多重復勞動。
如果AMQP的消息無法路由到隊列(例如,發送到的交換機沒有綁定隊列),消息會被就地銷毀或者返還給發布者。如何處理取決于發布者設置的消息屬性。
消費者
消息如果只是存儲在隊列里是沒有任何用處的。被應用消費掉,消息的價值才能夠體現。在AMQP 0-9-1 模型中,有兩種途徑可以達到此目的:
- 將消息投遞給應用 ("push API")
- 應用根據需要主動獲取消息 ("pull API")
使用push API,應用(application)需要明確表示出它在某個特定隊列里所感興趣的,想要消費的消息。如是,我們可以說應用注冊了一個消費者,或者說訂閱了一個隊列。一個隊列可以注冊多個消費者,也可以注冊一個獨享的消費者(當獨享消費者存在時,其他消費者即被排除在外)。
每個消費者(訂閱者)都有一個叫做消費者標簽的標識符。它可以被用來退訂消息。消費者標簽實際上是一個字符串。
消息確認
消費者應用(Consumer applications) - 用來接受和處理消息的應用 - 在處理消息的時候偶爾會失敗或者有時會直接崩潰掉。而且網絡原因也有可能引起各種問題。這就給我們出了個難題,AMQP代理在什么時候刪除消息才是正確的?AMQP 0-9-1 規范給我們兩種建議:
- 當消息代理(broker)將消息發送給應用后立即刪除。(使用AMQP方法:basic.deliver或basic.get-ok)
- 待應用(application)發送一個確認回執(acknowledgement)后再刪除消息。(使用AMQP方法:basic.ack)
前者被稱作自動確認模式(automatic acknowledgement model),后者被稱作顯式確認模式(explicit acknowledgement model)。在顯式模式下,由消費者應用來選擇什么時候發送確認回執(acknowledgement)。應用可以在收到消息后立即發送,或將未處理的消息存儲后發送,或等到消息被處理完畢后再發送確認回執(例如,成功獲取一個網頁內容并將其存儲之后)。
如果一個消費者在尚未發送確認回執的情況下掛掉了,那AMQP代理會將消息重新投遞給另一個消費者。如果當時沒有可用的消費者了,消息代理會死等下一個注冊到此隊列的消費者,然后再次嘗試投遞。
拒絕消息
當一個消費者接收到某條消息后,處理過程有可能成功,有可能失敗。應用可以向消息代理表明,本條消息由于“拒絕消息(Rejecting Messages)”的原因處理失敗了(或者未能在此時完成)。當拒絕某條消息時,應用可以告訴消息代理如何處理這條消息——銷毀它或者重新放入隊列。當此隊列只有一個消費者時,請確認不要由于拒絕消息并且選擇了重新放入隊列的行為而引起消息在同一個消費者身上無限循環的情況發生。
Negative Acknowledgements
在AMQP中,basic.reject方法用來執行拒絕消息的操作。但basic.reject有個限制:你不能使用它決絕多個帶有確認回執(acknowledgements)的消息。但是如果你使用的是RabbitMQ,那么你可以使用被稱作negative acknowledgements(也叫nacks)的AMQP 0-9-1擴展來解決這個問題。更多的信息請參考幫助頁面
預取消息
在多個消費者共享一個隊列的案例中,明確指定在收到下一個確認回執前每個消費者一次可以接受多少條消息是非常有用的。這可以在試圖批量發布消息的時候起到簡單的負載均衡和提高消息吞吐量的作用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,如果生產應用每分鐘才發送一條消息,這說明處理工作尚在運行。)
注意,RabbitMQ只支持通道級的預取計數,而不是連接級的或者基于大小的預取。
消息屬性和有效載荷(消息主體)
AMQP模型中的消息(Message)對象是帶有屬性(Attributes)的。有些屬性及其常見,以至于AMQP 0-9-1 明確的定義了它們,并且應用開發者們無需費心思思考這些屬性名字所代表的具體含義。例如:
- Content type(內容類型)
- Content encoding(內容編碼)
- Routing key(路由鍵)
- Delivery mode (persistent or not)
投遞模式(持久化 或 非持久化) - Message priority(消息優先權)
- Message publishing timestamp(消息發布的時間戳)
- Expiration period(消息有效期)
- Publisher application id(發布應用的ID)
有些屬性是被AMQP代理所使用的,但是大多數是開放給接收它們的應用解釋器用的。有些屬性是可選的也被稱作消息頭(headers)。他們跟HTTP協議的X-Headers很相似。消息屬性需要在消息被發布的時候定義。
AMQP的消息除屬性外,也含有一個有效載荷 - Payload(消息實際攜帶的數據),它被AMQP代理當作不透明的字節數組來對待。消息代理不會檢查或者修改有效載荷。消息可以只包含屬性而不攜帶有效載荷。它通常會使用類似JSON這種序列化的格式數據,為了節省,協議緩沖器和MessagePack將結構化數據序列化,以便以消息的有效載荷的形式發布。AMQP及其同行者們通常使用"content-type" 和 "content-encoding" 這兩個字段來與消息溝通進行有效載荷的辨識工作,但這僅僅是基于約定而已。
消息能夠以持久化的方式發布,AMQP代理會將此消息存儲在磁盤上。如果服務器重啟,系統會確認收到的持久化消息未丟失。簡單地將消息發送給一個持久化的交換機或者路由給一個持久化的隊列,并不會使得此消息具有持久化性質:它完全取決與消息本身的持久模式(persistence mode)。將消息以持久化方式發布時,會對性能造成一定的影響(就像數據庫操作一樣,健壯性的存在必定造成一些性能犧牲)。
消息確認
由于網絡的不確定性和應用失敗的可能性,處理確認回執(acknowledgement)就變的十分重要。有時我們確認消費者收到消息就可以了,有時確認回執意味著消息已被驗證并且處理完畢,例如對某些數據已經驗證完畢并且進行了數據存儲或者索引操作。
這種情形很常見,所以 AMQP 0-9-1 內置了一個功能叫做 消息確認(message acknowledgements),消費者用它來確認消息已經被接收或者處理。如果一個應用崩潰掉(此時連接會斷掉,所以AMQP代理亦會得知),而且消息的確認回執功能已經被開啟,但是消息代理尚未獲得確認回執,那么消息會被從新放入隊列(并且在還有還有其他消費者存在于此隊列的前提下,立即投遞給另外一個消費者)。
協議內置的消息確認功能將幫助開發者建立強大的軟件。
AMQP 0-9-1 方法
AMQP 0-9-1由許多方法(methods)構成。方法即是操作,這跟面向對象編程中的方法沒半毛錢關系。AMQP的方法被分組在類(class)中。這里的類僅僅是對AMQP方法的邏輯分組而已。在 AMQP 0-9-1參考中有對AMQP方法的詳細介紹。
讓我們來看看交換機類,有一組方法被關聯到了交換機的操作上。這些方法如下所示:
- exchange.declare
- exchange.declare-ok
- exchange.delete
- exchange.delete-ok
(請注意,RabbitMQ網站參考中包含了特用于RabbitMQ的交換機類的擴展,這里我們不對其進行討論)
以上的操作來自邏輯上的配對:exchange.declare 和 exchange.declare-ok,exchange.delete 和 exchange.delete-ok. 這些操作分為“請求 - requests”(由客戶端發送)和“響應 - responses”(由代理發送,用來回應之前提到的“請求”操作)。
如下的例子:客戶端要求消息代理使用exchange.declare方法聲明一個新的交換機:

如上圖所示,exchange.declare方法攜帶了好幾個參數。這些參數可以允許客戶端指定交換機名稱、類型、是否持久化等等。
操作成功后,消息代理使用exchange.declare-ok方法進行回應:

exchange.declare-ok方法除了通道號之外沒有攜帶任何其他參數(通道-channel 會在本指南稍后章節進行介紹)。
AMQP隊列類的配對方法 - queue.declare方法 和 queue.declare-ok有著與其他配對方法非常相似的一系列事件:


不是所有的AMQP方法都有與其配對的“另一半”。許多(basic.publish是最被廣泛使用的)都沒有相對應的“響應”方法,另外一些(如basic.get)有著一種以上與之對應的“響應”方法。
連接
AMQP連接通常是長連接。AMQP是一個使用TCP提供可靠投遞的應用層協議。AMQP使用認證機制并且提供TLS(SSL)保護。當一個應用不再需要連接到AMQP代理的時候,需要優雅的釋放掉AMQP連接,而不是直接將TCP連接關閉。
通道
有些應用需要與AMQP代理建立多個連接。無論怎樣,同時開啟多個TCP連接都是不合適的,因為這樣做會消耗掉過多的系統資源并且使得防火墻的配置更加困難。AMQP 0-9-1提供了通道(channels)來處理多連接,可以把通道理解成共享一個TCP連接的多個輕量化連接。
在涉及多線程/進程的應用中,為每個線程/進程開啟一個通道(channel)是很常見的,并且這些通道不能被線程/進程共享。
一個特定通道上的通訊與其他通道上的通訊是完全隔離的,因此每個AMQP方法都需要攜帶一個通道號,這樣客戶端就可以指定此方法是為哪個通道準備的。
虛擬主機
為了在一個單獨的代理上實現多個隔離的環境(用戶、用戶組、交換機、隊列 等),AMQP提供了一個虛擬主機(virtual hosts - vhosts)的概念。這跟Web servers虛擬主機概念非常相似,這為AMQP實體提供了完全隔離的環境。當連接被建立的時候,AMQP客戶端來指定使用哪個虛擬主機。
AMQP是可擴展的
AMQP 0-9-1 擁有多個擴展點:
- 定制化交換機類型 可以讓開發者們實現一些開箱即用的交換機類型尚未很好覆蓋的路由方案。例如 geodata-based routing。
- 交換機和隊列的聲明中可以包含一些消息代理能夠用到的額外屬性。例如RabbitMQ中的per-queue message TTL即是使用該方式實現。
- 特定消息代理的協議擴展。例如RabbitMQ所實現的擴展。
- 新的 AMQP 0-9-1 方法類可被引入。
- 消息代理可以被其他的插件擴展,例如RabbitMQ的管理前端 和 已經被插件化的HTTP API。
這些特性使得AMQP 0-9-1模型更加靈活,并且能夠適用于解決更加寬泛的問題。
AMQP 0-9-1 客戶端生態系統
AMQP 0-9-1 擁有眾多的適用于各種流行語言和框架的客戶端。其中一部分嚴格遵循AMQP規范,提供AMQP方法的實現。另一部分提供了額外的技術,方便使用的方法和抽象。有些客戶端是異步的(非阻塞的),有些是同步的(阻塞的),有些將這兩者同時實現。有些客戶端支持“供應商的特定擴展”(例如RabbitMQ的特定擴展)。
因為AMQP的主要目標之一就是實現交互性,所以對于開發者來講,了解協議的操作方法而不是只停留在弄懂特定客戶端的庫就顯得十分重要。這樣一來,開發者使用不同類型的庫與協議進行溝通時就會容易的多。
RabbitMQ
安裝
- 使用
sudo apt-get install rabbitmq-server安裝rabbitmq,之后開啟rabbitmq的web管理頁面 - 使用
sudo /usr/sbin/rabbitmq-plugins enable rabbitmq_management開啟rabbitmq的web管理頁面插件 - 使用
sudo /etc/init.d/rabbitmq-server start啟動rabbitmq服務 - 打開瀏覽器訪問服務器的web頁面:http://ipaddr:15672,默認登錄賬號/密碼:guest/guest(為安全起見,建議生產環境不使用該賬號)
- rabbitmq的連接端口默認是5672端口,可以使用
sudo netstat -ntlp查看,如果開啟了web管理頁面的話,應該能看到5672,15672,25672幾個端口被打開
實現最簡單的隊列通信
在實現通訊之前需要注意幾個點,當我們要使用遠程來連接rabbitMQ的時候需要增加一個用戶并設置權限:
在rabbitMQ的主機終端執行以下命令:
# 啟動rabbitMQ服務端<br>sudo /etc.init.d/rabbitmq-server start<br># 創建一個用戶sudo rabbitmqctl add_user gyc 123123# 設置用戶為administrator角色dudo rabbitmqctl set_user_tags gyc administrator# 設置權限sudo rabbitmqctl set_permissions -p "/" gyc '.''.''.'# 然后重啟rabbiMQ服務sudo /etc/init.d/rabbitmq-server restart# 然后可以使用剛才的用戶遠程連接rabbitmq server了。

應用
python連接rabbitmq需要安裝一個包,pika,安裝命令:pip3 install pika。
例1:direct交換機demo 單播
rabbit_direct_publish.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import pika # 鏈接rabbitMQimport sys# 建立TCP鏈接credentials = pika.PlainCredentials("gyc", "123123")connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.14.63", credentials=credentials))# 建立一個通道channel = connection.channel()# 聲明交換機名稱和類型channel.exchange_declare(exchange="direct_logs", type="direct")serverity = sys.argv[1] if len(sys.argv) > 1 else "info" # 獲取routing_keymessage = " ".join(sys.argv[2:]) or "info: Hello World" # 獲取消息內容# RabbitMQ消息不能直接發送到隊列,它需要經歷一個交換的過程。channel.basic_publish( exchange="direct_logs", # 指定消息發送到的交換機名稱 routing_key=serverity, # 指定消息的routing_key body=message # 消息體)print("[x] Send 'Hello World!'")connection.close() |
server端
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
import pikaimport sysprint(sys.argv)credentials = pika.PlainCredentials("gyc", "123123")connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.14.63", credentials=credentials))channel = connection.channel()channel.exchange_declare(exchange="direct_logs", type="direct")result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會再使用此queue的消費者# 斷開后,自動將queue刪除queue_name = result.method.queue # 取rabbitmq-server返回的queue名稱serverities = sys.argv[1:] # 獲取參數if not serverities: # 如果沒有參數會拋出異常,提示應該填寫什么參數 sys.stderr.write("Usage: %s[info] [warning] [error]\n" % sys.argv[0]) sys.exit(1)for serverity in serverities: # 將該隊列通過不同的routing_key綁定到交換機direct_logs上 channel.queue_bind( exchange="direct_logs", queue=queue_name, routing_key=serverity )print("[*] Waiting for logs. To exit press CTRL+C")def callback(ch, method, properties, body): # 定義回調函數 print("[x] %r:%r" % (method.routing_key, body))channel.basic_consume( callback, queue=queue_name,)channel.start_consuming() # 啟動偵聽消息隊列<br><br># 注:sys參數需要到終端上執行該py文件填寫 |
例子2:扇形交換機demo:廣播
rabbit_fanout_publish.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import pikaimport syscredentials = pika.PlainCredentials("gyc", "123123")connection = pika.BlockingConnection(pika.ConnectionParameters( host="192.168.14.63", credentials=credentials))channel = connection.channel()channel.exchange_declare(exchange="logs", # 聲明交換機logs 類型為fanout type="fanout")message = "".join(sys.argv[1:]) or "info: Hello World!"channel.basic_publish( exchange="logs", # 指定消息發送到交換機的名稱 routing_key="", # fanout類型為廣播,所以不需要指定routing_key,所有連接到該交換機的消息隊列都將能收到發過來消息 body=message)print(" [x] Send %r" % message)connection.close() |
rabbit_fanout_consumer.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
import pikacredentials = pika.PlainCredentials("gyc", "3778627")connection = pika.BlockingConnection(pika.ConnectionParameters( host="192.168.14.63", credentials=credentials))channel = connection.channel()channel.exchange_declare(exchange="logs", type="fanout")result = channel.queue_declare(exclusive=True)queue_name = result.method.queuechannel.queue_bind(exchange="logs", queue=queue_name)print("[*] Waiting for logs. To exit press CTRL+C")def callback(ch, method, properties, body): print("[x] %r" % body)channel.basic_consume( callback, queue=queue_name, no_ack=True)channel.start_consuming() |
例子3:topic交換機demo:頭交換機
rabbit_topic_publish.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import pikaimport syscredentials = pika.PlainCredentials("gyc", "123123")connection = pika.BlockingConnection(pika.ConnectionParameters( host="192.168.14.63", credentials=credentials))channel = connection.channel()channel.exchange_declare(exchange="topic_logs", type="topic")routing_key = sys.argv[1] if len(sys.argv) > 1 else "anonymous.info"message = "".join(sys.argv[2:]) or "Hello World!"channel.basic_publish( exchange="topic_logs", routing_key=routing_key, body=message)print(" [x] Send %r:%r" % (routing_key, message))connection.close() |
rabbit_topic_consumer.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
import pikaimport syscredentials = pika.PlainCredentials("gyc", "3778627")connection = pika.BlockingConnection(pika.ConnectionParameters( host="192.168.14.63", credentials=credentials))channel = connection.channel()channel.exchange_declare(exchange="topic_logs", type="topic")result = channel.queue_declare(exclusive=True)queue_name = result.method.queuebinding_keys = sys.argv[1:]if not binding_keys: sys.stderr.write("Usage: %s [binding_keys]\n" % sys.argv[0]) sys.exit()for binding_key in binding_keys: channel.queue_bind(exchange="topic_logs", queue=queue_name, routing_key=binding_key)print("[*] Waiting for logs. To exit press CTRL+C")def callback(ch, method, properties, body): print("[x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback, queue=queue_name)channel.start_consuming()# 注:binding_keys就是你要設置的消息的鍵,需要在publish側的routing_key能匹配到 |
例子4:rpc調用demo:
rpc_client.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
import pikaimport uuidclass FibonacciRpcClient: def __init__(self): self.credentials = pika.PlainCredentials("gyc", "123123") self.connection = pika.BlockingConnection(pika.ConnectionParameters( host="192.168.14.63", credentials=self.credentials )) # 初始化, 建立TCP連接,前面加self是因為我們要在其他地方進行調用 self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) # 綁定回調函數和消息隊列 def on_response(self, ch, method, props, body): # 定義回調函數 if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) # 獲取UUID, 隨機唯一值,用來標識返回的數據是客戶端所需要的 self.channel.basic_publish(exchange="", # 以routing_key為rpc_queue,在消息頭中定義reply_to告訴server消息回給client生成的消息隊列,并且消息的correlation_id為client自己生成的uuid,server回消息時也將會帶上這個uuid routing_key="rpc_queue", properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), # 告訴server端回消息的隊列,返回的時候帶上這個uuid body=str(n)) # 消息體 while self.response is None: # 循環檢測self.response是否有返回值 self.connection.process_data_events() '''Will make sure that data events are processed. Dispatches timer and channel callbacks if not called from the scope of BlockingConnection or BlockingChannel callback. Your app can block on this method. :param float time_limit:suggested upper bound on processing time in seconds. The actual blocking time depends on the granularity of the underlying ioloop. Zero means return as soon as possible. None means there is no limit on processing time and the function will block until I/O produces actionalable events. Defaults to 0 for backward compatibility. This parameter is NEW in pika 0.10.0. 該方法可以傳遞參數time_limit=0,默認為0,即為不阻塞的檢測channel是否有消息回來,如果 有消息接收到,則執行回調函數,當time_limit不為0時,將每次檢測阻塞time_limit秒。該方法 在pika0.10.0中新加。 ''' return int(self.response)fibonacci_rpc = FibonacciRpcClient()print("[x] Requesting fib(30)")response = fibonacci_rpc.call(30)print("[.] Got %r" % response) |
rpc_server.py
import pikacredentials = pika.PlainCredentials("gyc", "123123")connection = pika.BlockingConnection(pika.ConnectionParameters( host="192.168.14.63", credentials=credentials))channel = connection.channel()channel.queue_declare(queue="rpc_queue") # 聲明queue名稱為rpc_queuedef fib(n): # 斐波那契計算函數 if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2)def on_request(ch, method, props, body): n = int(body) print("[.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange="", routing_key=props.reply_to, properties=pika.BasicProperties( correlation_id=props.correlation_id ), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) # 給rabbitmq_server發送ack確認消息channel.basic_qos(prefetch_count=1)channel.basic_consume(on_request, queue="rpc_queue")print("[x] Waiting RPC requests")channel.start_consuming()轉自 http://www.rzrgm.cn/sxzwj/p/6422870.html
Buy me a coffee. ?Get red packets.
浙公網安備 33010602011771號