RabbitMQ集群部署篇
一:本文在上一篇<<消息中間件之一:RabbitMQ基礎(chǔ)原理篇>> 的基礎(chǔ)之上繼續(xù)對(duì)rabbitMQ完成安裝部署,命令介紹,rabbitMQ的集群部署,以及通過(guò)使用keepalived+haproxy作為反向代理訪問(wèn)rabbitMQ,從而實(shí)現(xiàn)負(fù)載均衡及高可用,具體配置如下:
192.168.10.101 rabbitmq-server1
192.168.10.102 rabbitmq-server2
192.168.10.103 rabbitmq-server3
1.1.2:各rabbitMQ服務(wù)器更改hosts文件:
[root@rabbitmq-server1 ~]# vim /etc/hosts
192.168.10.101 rabbitmq-server1
192.168.10.102 rabbitmq-server2
192.168.10.103 rabbitmq-server3
1.1.3:各rabbitMQ服務(wù)器安裝并啟動(dòng)rabbitMQ服務(wù):
[root@rabbitmq-server1 ~]# yum install rabbitmq-server
[root@rabbitmq-server1 ~]# systemctl start rabbitmq-server
[root@rabbitmq-server1 ~]# rabbitmq-plugins enable rabbitmq_management #開啟web管理界面插件
[root@rabbitmq-server1 ~]# systemctl restart rabbitmq-server #重啟rabbitMQ服務(wù)
1.1.4:常用命令:
[root@rabbitmq-server1 ~]# rabbitmqctl add_vhost xxx #創(chuàng)建vhost
Creating vhost "xxx" ...
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl list_vhosts #遍歷所有vhost信息
Listing vhosts ...
/
xxx
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl delete_vhost xxx #刪除vhost
Deleting vhost "xxx" ...
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl add_user jack 123456 #添加用戶jack密碼為123456
Creating user "jack" ...
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl change_password jack 654321 #更改jack用戶的密碼為654321
Changing password for user "jack" ...
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl set_permissions -p xxx jack ".*" ".*" ".*" #綁定jack用戶對(duì)xxx的vhost的讀寫權(quán)限
Setting permissions for user "jack" in vhost "xxx" ...
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl list_queues #列出所有隊(duì)列
Listing queues ...
...done.
1.1.5:訪問(wèn)web界面:默認(rèn)用戶名為guest密碼為guest

二: 部署rabbitMQ集群:
2.1:Rabbitmq的集群是依賴于erlang的集群來(lái)工作的,所以必須先構(gòu)建起erlang的集群環(huán)境。而Erlang的集群中各節(jié)點(diǎn)是通過(guò)一個(gè)magic cookie來(lái)實(shí)現(xiàn)的,這個(gè)cookie存放在 /var/lib/rabbitmq/.erlang.cookie 中,文件是400的權(quán)限。所以必須保證各節(jié)點(diǎn)cookie保持一致,否則節(jié)點(diǎn)之間就無(wú)法通信。
[root@rabbitmq-server1 ~]# scp /var/lib/rabbitmq/.erlang.cookie 192.168.10.102:/var/lib/rabbitmq/.erlang.cookie
root@192.168.10.102's password:
.erlang.cookie 100% 20 0.0KB/s 00:00
[root@rabbitmq-server1 ~]# scp /var/lib/rabbitmq/.erlang.cookie 192.168.10.103:/var/lib/rabbitmq/.erlang.cookie
root@192.168.10.103's password:
.erlang.cookie 100% 20 0.0KB/s 00:00
2.2:各節(jié)點(diǎn)重啟rabbitMQ服務(wù)并驗(yàn)證端口:
2.2.1:各節(jié)點(diǎn)啟動(dòng)服務(wù):
[root@rabbitmq-server1 ~]# systemctl restart rabbitmq-server
[root@rabbitmq-server2 ~]# systemctl restart rabbitmq-server
[root@rabbitmq-server3 ~]# systemctl restart rabbitmq-server
2.2.2:驗(yàn)證端口:
5672:消費(fèi)者訪問(wèn)的 端口
15672:web管理端口
25672:集群狀態(tài)通信端口
2.3:停止所有節(jié)點(diǎn)RabbitMq服務(wù),然后使用detached參數(shù)獨(dú)立運(yùn)行,這步很關(guān)鍵,尤其增加節(jié)點(diǎn)停止節(jié)點(diǎn)后再次啟動(dòng)遇到無(wú)法啟動(dòng)都可以參照這個(gè)順序
[root@rabbitmq-server1 ~]# systemctl stop rabbitmq-server
[root@rabbitmq-server2 ~]# systemctl stop rabbitmq-server
[root@rabbitmq-server3 ~]# systemctl stop rabbitmq-server
[root@rabbitmq-server1 ~]# rabbitmq-server -detached
[root@rabbitmq-server2 ~]# rabbitmq-server -detached
[root@rabbitmq-server3 ~]# rabbitmq-server -detached
2.4:查看各rabbitMQ服務(wù)的集群狀態(tài):
[root@rabbitmq-server1 ~]# rabbitmqctl cluster_status
Cluster status of node 'rabbit@rabbitmq-server1' ...
[{nodes,[{disc,['rabbit@rabbitmq-server1']}]},
{running_nodes,['rabbit@rabbitmq-server1']},
{cluster_name,<<"rabbit@rabbitmq-server1">>},
{partitions,[]}]
...done.
[root@rabbitmq-server2 ~]# rabbitmqctl cluster_status
Cluster status of node 'rabbit@rabbitmq-server2' ...
[{nodes,[{disc,['rabbit@rabbitmq-server2']}]},
{running_nodes,['rabbit@rabbitmq-server2']},
{cluster_name,<<"rabbit@rabbitmq-server2">>},
{partitions,[]}]
...done.
[root@rabbitmq-server3 rabbitmq]# rabbitmqctl cluster_status
Cluster status of node 'rabbit@rabbitmq-server3' ...
[{nodes,[{disc,['rabbit@rabbitmq-server3']}]},
{running_nodes,['rabbit@rabbitmq-server3']},
{cluster_name,<<"rabbit@rabbitmq-server3">>},
{partitions,[]}]
...done.
2.5:將rabbitMQ的節(jié)點(diǎn)添加到集群:
2.5.1:在rabbitmq-server1作為內(nèi)存節(jié)點(diǎn)連接起來(lái),并作為內(nèi)存節(jié)點(diǎn),在rabbitmq-server1執(zhí)行以下命令:
[root@rabbitmq-server1 ~]# rabbitmqctl cluster_status
Cluster status of node 'rabbit@rabbitmq-server1' ...
[{nodes,[{disc,['rabbit@rabbitmq-server1']}]},
{running_nodes,['rabbit@rabbitmq-server1']},
{cluster_name,<<"rabbit@rabbitmq-server1">>}, #未添加到集群之前只有自己一臺(tái)節(jié)點(diǎn)
{partitions,[]}]
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl stop_app #停止應(yīng)程序
[root@rabbitmq-server1 ~]# rabbitmqctl reset #清空元數(shù)據(jù)
[root@rabbitmq-server1 ~]# rabbitmqctl join_cluster rabbit@rabbitmq-server2 --ram #將rabbitmq-server1添加到集群當(dāng)中,并成為內(nèi)存節(jié)點(diǎn),不加--ram默認(rèn)是磁盤節(jié)點(diǎn)
Clustering node 'rabbit@rabbitmq-server1' with 'rabbit@rabbitmq-server2' ...
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl cluster_status
Cluster status of node 'rabbit@rabbitmq-server1' ...
[{nodes,[{disc,['rabbit@rabbitmq-server2']}, #默認(rèn)是磁盤節(jié)點(diǎn)
{ram,['rabbit@rabbitmq-server1']}]}] #內(nèi)存節(jié)點(diǎn)
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl start_app #不要忘記啟動(dòng)應(yīng)用程序
Starting node 'rabbit@rabbitmq-server1' ...
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl cluster_status #添加之后的集群狀態(tài)
Cluster status of node 'rabbit@rabbitmq-server1' ...
[{nodes,[{disc,['rabbit@rabbitmq-server2']}, #默認(rèn)的是磁盤節(jié)點(diǎn)
{ram,['rabbit@rabbitmq-server1']}]}] #自己被添加為內(nèi)存節(jié)點(diǎn)
...done.
2.5.2:將rabbitmq-server3添加到集群:
[root@rabbitmq-server3 rabbitmq]# rabbitmqctl cluster_status #未添加到集群之后只有自己一個(gè)節(jié)點(diǎn)
Cluster status of node 'rabbit@rabbitmq-server3' ...
[{nodes,[{disc,['rabbit@rabbitmq-server3']}]},
{running_nodes,['rabbit@rabbitmq-server3']},
{cluster_name,<<"rabbit@rabbitmq-server3">>},
{partitions,[]}]
...done.
[root@rabbitmq-server3 rabbitmq]# rabbitmqctl stop_app #停止應(yīng)用程序
Stopping node 'rabbit@rabbitmq-server3' ...
...done.
[root@rabbitmq-server3 rabbitmq]# rabbitmqctl reset #重設(shè)元數(shù)據(jù)
Resetting node 'rabbit@rabbitmq-server3' ...
...done.
[root@rabbitmq-server3 rabbitmq]# rabbitmqctl join_cluster rabbit@rabbitmq-server2 --ram #添加到集群當(dāng)中并設(shè)置為內(nèi)存節(jié)點(diǎn)
Clustering node 'rabbit@rabbitmq-server3' with 'rabbit@rabbitmq-server2' ...
...done.
[root@rabbitmq-server3 rabbitmq]# rabbitmqctl start_app #驅(qū)動(dòng)應(yīng)用程序
Starting node 'rabbit@rabbitmq-server3' ...
...done.
[root@rabbitmq-server3 rabbitmq]# rabbitmqctl cluster_status #查看集群狀態(tài)
Cluster status of node 'rabbit@rabbitmq-server3' ...
[{nodes,[{disc,['rabbit@rabbitmq-server2']}, #一個(gè)磁盤節(jié)點(diǎn),集群當(dāng)中最少有一個(gè)磁盤節(jié)點(diǎn)用于消息的持久化
{ram,['rabbit@rabbitmq-server3','rabbit@rabbitmq-server1']}]}, #兩個(gè)內(nèi)存節(jié)點(diǎn)
{running_nodes,['rabbit@rabbitmq-server1','rabbit@rabbitmq-server2', #當(dāng)前正在運(yùn)行的節(jié)點(diǎn)
'rabbit@rabbitmq-server3']},
{cluster_name,<<"rabbit@rabbitmq-server2">>}, #集群名稱
{partitions,[]}]
...done.
2.6:更改為鏡像模式:
2.6.1:只要在其中一臺(tái)節(jié)點(diǎn)執(zhí)行以下命令即可:
[root@rabbitmq-server1 ~]# rabbitmqctl set_policy ha-all "#" '{"ha-mode":"all"}' #"#"為任意0個(gè)或多個(gè)即為所有,也可以使用"^test"匹配開頭,還可以使用其他正則匹配
Setting policy "ha-all" for pattern "#" to "{\"ha-mode\":\"all\"}" with priority "0" ...
...done.
2.6.2:在任意一臺(tái)節(jié)點(diǎn)的管理界面創(chuàng)建一個(gè)queue,驗(yàn)證是可以同步到其他節(jié)點(diǎn):

2.6.3:在其他rabbitMQ節(jié)點(diǎn)進(jìn)行驗(yàn)證queue是否存在:

2.6.4:使用命令也可以驗(yàn)證:
[root@rabbitmq-server1 ~]# rabbitmqctl list_queues
Listing queues ...
test 0
test1 0
test2 0
...done.
2.7:python 操作rabbitMQ:
2.7.1:安裝pika模塊:
# pip3 install pika
# easy_install pika
2.7.2:生產(chǎn)者代碼:
#!/bin/env python
#coding:utf-8
#Author: ZhangJie
import pika
#用戶名密碼
cert = pika.PlainCredentials("guest","guest")
#連接到rabbitMQ服務(wù)器
conn = pika.BlockingConnection(pika.ConnectionParameters("192.168.10.101",5672,'/',cert))
#創(chuàng)建頻道
chanel = conn.channel()
#聲明消息隊(duì)列,如果隊(duì)列不存在就創(chuàng)建,存在就將消息在此隊(duì)隊(duì)列中創(chuàng)建,如果將消息發(fā)送到不存在的隊(duì)列,則rabbitMQ會(huì)自動(dòng)清除這些消息
chanel.queue_declare(queue="hello")
#exchange告訴消息去往的隊(duì)列,routing_key是隊(duì)列名,body是要傳遞的消息內(nèi)容
chanel.basic_publish(exchange="",
routing_key="hello",
body="hello world!")
print("開始隊(duì)列")
#消息寫入完成,關(guān)閉連接
conn.close()
2.7.3:消費(fèi)者代碼:
#!/bin/env python
#coding:utf-8
#Author: ZhangJie
import pika
#用戶名密碼
cert = pika.PlainCredentials("guest","guest")
#連接到rabbitMQ服務(wù)器
conn = pika.BlockingConnection(pika.ConnectionParameters("192.168.10.101",5672,'/',cert))
#創(chuàng)建頻道
chanel = conn.channel()
#聲明消息隊(duì)列,如果隊(duì)列不存在就創(chuàng)建,存在就將消息在此隊(duì)隊(duì)列中創(chuàng)建,如果將消息發(fā)送到不存在的隊(duì)列,則rabbitMQ會(huì)自動(dòng)清除這些消息
chanel.queue_declare(queue="hello")
#exchange告訴消息去往的隊(duì)列,routing_key是隊(duì)列名,body是要傳遞的消息內(nèi)容
chanel.basic_publish(exchange="",
routing_key="hello",
body="hello world1!")
print("開始隊(duì)列")
#消息寫入完成,關(guān)閉連接
conn.close()
2.7.4:執(zhí)行結(jié)果:

2.7.5:最終的queue截圖:

三:通過(guò)haproxy 實(shí)現(xiàn)反向代理:
3.1:haproxy安裝過(guò)程省略。主要配置如下:
[root@rabbitmq-server3 ~]# cat /etc/haproxy/haproxy.cfg
global
maxconn 100000
uid 99
gid 99
daemon
nbproc 1
log 127.0.0.1 local0 info
defaults
option http-keep-alive
#option forwardfor
maxconn 100000
mode tcp
timeout connect 300000ms
timeout client 300000ms
timeout server 300000ms
listen rabbitmq 0.0.0.0:5671
mode tcp
log global
balance roundrobin
server rabbitmq1 192.168.10.101:5672 check inter 2000 rise 3 fall 3
server rabbitmq2 192.168.10.102:5672 check inter 2000 rise 3 fall 3
server rabbitmq3 192.168.10.103:5672 check inter 2000 rise 3 fall 3
listen rabbitmq_cluster 0.0.0.0:15671
mode tcp
log global
balance roundrobin
server rqslave1 192.168.10.101:15672 check inter 2000 rise 3 fall 3
server rqslave2 192.168.10.102:15672 check inter 2000 rise 3 fall 3
server rqslave3 192.168.10.102:15672 check inter 2000 rise 3 fall 3
3.2:訪問(wèn)5671端口,驗(yàn)證可以訪問(wèn)到后端真實(shí)的rabbitMQ服務(wù)器:

3.3:使用python通過(guò)haproxy向rabbitMQ寫入消息:
3.3.1:寫入消息的的生產(chǎn)者代碼:
#!/bin/env python
#coding:utf-8
#Author: ZhangJie
import pika
#用戶名密碼
cert = pika.PlainCredentials("guest","guest")
#連接到rabbitMQ服務(wù)器
conn = pika.BlockingConnection(pika.ConnectionParameters("192.168.10.103",5671,'/',cert))
#創(chuàng)建頻道
chanel = conn.channel()
#聲明消息隊(duì)列,如果隊(duì)列不存在就創(chuàng)建,存在就將消息在此隊(duì)隊(duì)列中創(chuàng)建,如果將消息發(fā)送到不存在的隊(duì)列,則rabbitMQ會(huì)自動(dòng)清除這些消息
chanel.queue_declare(queue="hello")
#exchange告訴消息去往的隊(duì)列,routing_key是隊(duì)列名,body是要傳遞的消息內(nèi)容
chanel.basic_publish(exchange="",
routing_key="test2",
body="hello world1!")
print("消息寫入成功!")
#消息寫入完成,關(guān)閉連接
conn.close()
3.3.2:執(zhí)行結(jié)果:

3.4:通過(guò)haproxy反向代理想rabbitMQ服務(wù)器獲取消息:
3.4.1:獲取消息的消費(fèi)者腳本內(nèi)容:
#!/bin/env python
#coding:utf-8
#Author: ZhangJie
import pika
#用戶名
cert = pika.PlainCredentials("guest","guest")
#連接到服務(wù)器
conn = pika.BlockingConnection(pika.ConnectionParameters("192.168.10.103",5671,"/",cert))
#創(chuàng)建頻道
channel = conn.channel()
#聲明消息隊(duì)列,如果不存在就創(chuàng)建
channel.queue_declare(queue="hello")
# 定義一個(gè)回調(diào)函數(shù)來(lái)處理,這邊的回調(diào)函數(shù)就是將信息打印出來(lái)。
def callback(ch,method,properties,body):
print("[x] Received %r" % body)
channel.basic_consume(callback,
queue="test2",
no_ack=True) # no_ack=True表示在回調(diào)函數(shù)中不需要發(fā)送確認(rèn)標(biāo)識(shí)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 開始接收信息,并進(jìn)入阻塞狀態(tài),隊(duì)列里有信息才會(huì)調(diào)用callback進(jìn)行處理。按ctrl+c退出。
channel.start_consuming()
3.4.2:執(zhí)行結(jié)果:

浙公網(wǎng)安備 33010602011771號(hào)