消息隊列-RabbitMQ
一:RabbitMQ
組件原理圖:
生產(chǎn)者發(fā)送消息到 broker server(RabbitMQ), 在 Broker 內(nèi)部,用戶創(chuàng)建 Exchange/Queue,通過 Binding 規(guī)則將兩者聯(lián)系在一起,Exchange 分發(fā)消息, 根據(jù)類型/binding 的不同分發(fā)策略有區(qū)別,消息最后來到 Queue 中,等待消費者取走。

組件介紹:
Broker: 接收和分發(fā)消息的應(yīng)用,RabbitMQ Server 就是 Message Broker Virtual host: 出于多租戶和安全因素設(shè)計的,把 AMQP 的基本組件劃分到一個虛擬的分組中,類似于網(wǎng)絡(luò)中的 namespace 概念,當(dāng)多個不同的用戶使用同一個RabbitMQ server 提供的服務(wù)時,可以劃分出多個 vhost,每個用戶在自己的 vhost創(chuàng)建 exchange/queue 等。 Connection: publisher/consumer 和 broker 之間的 TCP 連接。 Channel: 如果每一次訪問 RabbitMQ 都建立一個 Connection,在消息量大的時候建立 TCP Connection 的開銷將是巨大的,效率也較低。Channel 是在 connection內(nèi)部建立的邏輯連接,如果應(yīng)用程序支持多線程,通常每個 thread 創(chuàng)建單獨的channel 進行通訊,AMQP method 包含了 channel id 幫助客戶端和 message broker識別 channel,
所以 channel 之間是完全隔離的。Channel 作為輕量級的 Connection極大減少了操作系統(tǒng)建立 TCP connection 的開銷。 Exchange: message 到達 broker 的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的 routing key,分發(fā)消息到 queue 中去。常用的類型有:direct (point-to-point), topic (publish subscribe) and fanout (multicast)。 Queue: 消息最終被送到這里等待 consumer 取走。 Binding: exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查詢表中,用于 message 的分發(fā)依據(jù)。
https://www.rabbitmq.com/download.html #官網(wǎng)下載地址
https://github.com/rabbitmq/rabbitmq-server/releases #github 下載地址
二、單機部署
安裝包:rabbitmq-server_3.7.22-1_all.deb
要求:
添加主機解析:hostname,運行之后不能修改域名

快速安裝:https://www.rabbitmq.com/install-debian.html
2.1:登陸 web 管理界面
默認(rèn)不允許:

允許方式:
#vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.3/ebin/rabbit.app
39 {loopback_users, []}, #刪除被禁止登陸的 guest 賬戶 # systemctl restart rabbitmq-server.service #重啟 rabbitmq 服務(wù)
三、RabbitMQ 集群部署
普通模式:創(chuàng)建好 RabbitMQ 之后的默認(rèn)模式。
(HA)鏡像模式:把需要的隊列做成鏡像隊列。
消息實體會主動在鏡像節(jié)點間同步,而不是在 consumer 取數(shù)據(jù)時臨時拉取。該模式帶來的副作用也很明顯,除了降低系統(tǒng)性能外,如果鏡像隊列數(shù)量過多,加之大量的消息進入,集群內(nèi)部的網(wǎng)絡(luò)帶寬將會被這種同步通訊大大消耗掉。
3.1 修改hosts主機名
192.168.134.192 rabbitmq-server-1 192.168.134.193 rabbitmq-server-2 192.168.134.194 rabbitmq-server-3
3.2 安裝rabbitmq
sudo apt-get update -y ## Install prerequisites sudo apt-get install curl gnupg -y ## Install RabbitMQ signing key curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo apt-key add - ## Install apt HTTPS transport sudo apt-get install apt-transport-https ## Add Bintray repositories that provision latest RabbitMQ and Erlang 21.x releases sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list <<EOF ## Installs the latest Erlang 22.x release. ## Change component to "erlang-21.x" to install the latest 21.x version. ## "bionic" as distribution name should work for any later Ubuntu or Debian release. ## See the release to distribution mapping table in RabbitMQ doc guides to learn more. deb https://dl.bintray.com/rabbitmq-erlang/debian bionic erlang deb https://dl.bintray.com/rabbitmq/debian bionic main EOF ## Update package indices sudo apt-get update -y ## Install rabbitmq-server and its dependencies sudo apt-get install rabbitmq-server -y --fix-missing
3.3:同步cookie文件
https://www.rabbitmq.com/clustering.html
scp /var/lib/rabbitmq/.erlang.cookie 192.168.134.193:/var/lib/rabbitmq/.erlang.cookie scp /var/lib/rabbitmq/.erlang.cookie 192.168.134.194:/var/lib/rabbitmq/.erlang.cookie
3.4:開啟插件管理
# vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.22/ebin/rabbit.app 39 {loopback_users, []}, #刪除被禁止登陸的 guest 賬戶 # systemctl restart rabbitmq-server.service #重啟 rabbitmq 服務(wù)
集群中有兩種節(jié)點類型:
內(nèi)存節(jié)點:只將數(shù)據(jù)保存到內(nèi)存
磁盤節(jié)點:保存數(shù)據(jù)到內(nèi)存和磁盤。
內(nèi)存節(jié)點雖然不寫入磁盤,但是它執(zhí)行比磁盤節(jié)點要好,集群中,只需要一個 磁盤節(jié)點來保存數(shù)據(jù)就足夠了如果集群中只有內(nèi)存節(jié)點,那么不能全部停止它
們,否則所有數(shù)據(jù)消息在服務(wù)器全部停機之后都會丟失。
3.5 添加節(jié)點
在一個 rabbitmq 集群里,有 3 臺。其中 1 臺使用磁盤模式,其它節(jié) 點使用內(nèi)存模式,內(nèi)存節(jié)點無訪問速度更快,由于磁盤 IO 相對較慢,因此可作 為數(shù)據(jù)備份使用。
在 rabbitmq-server-1 作為內(nèi)存節(jié)點連接起來,并作為內(nèi)存節(jié)點,在 rabbitmq-server-1 執(zhí)行以下命令:
rabbitmqctl stop_app #停止 app 服務(wù) rabbitmqctl reset #清空元數(shù)據(jù) #將rabbitmq-server-1 添加到集群當(dāng)中,并成為內(nèi)存節(jié)點,不加--ram 默認(rèn)是磁盤節(jié)點 root@rabbitmq-server-1:~# rabbitmqctl join_cluster rabbit@rabbitmq-server-3 --ram Clustering node rabbit@rabbitmq-server-1 with rabbit@rabbitmq-server-3 rabbitmqctl start_app #啟動 app 服務(wù)
在 rabbitmq-server-2 作為內(nèi)存節(jié)點連接起來,并作為內(nèi)存節(jié)點,在 rabbitmq-server-2 執(zhí)行以下命令:
root@rabbitmq-server-2:~# rabbitmqctl stop_app Stopping rabbit application on node rabbit@rabbitmq-server-2 ... root@rabbitmq-server-2:~# rabbitmqctl reset Resetting node rabbit@rabbitmq-server-2 ... root@rabbitmq-server-2:~# rabbitmqctl join_cluster rabbit@rabbitmq-server-3 --ram Clustering node rabbit@rabbitmq-server-2 with rabbit@rabbitmq-server-3 root@rabbitmq-server-2:~# rabbitmqctl start_app Starting node rabbit@rabbitmq-server-2 ... completed with 3 plugins.
驗證

3.6 將集群設(shè)置為鏡像模式:
只要在其中一臺節(jié)點執(zhí)行以下命令即可:
root@rabbitmq-server-1:~# rabbitmqctl set_policy ha-all "#" '{"ha-mode":"all"}'

四、RabbitMQ 常用命令
rabbitmqctl add_vhost xxx #創(chuàng)建 vhost rabbitmqctl list_vhosts #列出所有 vhost rabbitmqctl list_queues #列出所有隊列 rabbitmqctl add_user jack 123456 #添加賬戶 jack 密碼為 123456 rabbitmqctl change_password jack 654321 Changing #更改用戶密碼
五、使用python處理rabbitmq
監(jiān)控rabbitmq是否正常
import subprocess running_list = [] error_list = [] false = "false" true = "true" def get_status(): obj = subprocess.Popen(("curl -s -u guest:guest http://localhost:15672/api/nodes &> /dev/null"), shell=True, stdout=subprocess.PIPE) data = obj.stdout.read() data1 = eval(data) #print(data1) for i in data1: if i.get("running") == "true": running_list.append(i.get("name")) else: error_list.append(i.get("name")) def count_server(): if len(running_list) < 3: # 可以判斷錯誤列表大于0或者運行列表小于3,3未總計的節(jié)點數(shù)量 print(100) # 100就是集群內(nèi)有節(jié)點運行不正常了 else: print(50) # 50為所有節(jié)點全部運行正常 def main(): get_status() count_server() if __name__ == "__main__": main()
![]()
浙公網(wǎng)安備 33010602011771號