| 云間錄 |
|
||
RabbitMQ簡介各大主流中間件對比
ActiveMQ 是 Apache 出品,最流行的,能力強勁的開源消息總線,并且它一 個完全支持 J M S 規范的消息中間件。 其豐富的 API 、多種集群構建模式使得他成為業界老牌消息中間件,在中 小型企業中應用廣泛! MQ 衡量指標:服務性能、數據存儲、集群架構
Kafka
RocketMQ是阿里開源的消息中間件,目前也已經孵化為Apache頂級項目, 它是純java開發,具有高吞吐量、高可用性、適合大規模分布式系統 應用的特點。 RocketMQ思路起源于Kafka,它對消息的可靠傳輸及事務 性做了優化, 目前在阿里集團被廣泛應用于交易、充值、流計算、消息推 送、日志流式處理、binglog分發等場景
RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基于AMQP協議 來實現。 AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布 /訂閱)、可靠性、安全。AMQP協議更多用在企業系統內, 對數據_致 性、穩定性和可靠性要求很髙的場景,對性能和吞吐量的要求還在其次。
初識RabbitMQ RabbitMQ是—個開源的消息代理和隊列服務器,用來通過普通協議 在完全不同的應用之間共享數據,RabbitMQ是使用Erlang語言來編寫 的,并且RabbitMQ是基于AMQP協議的。
RabbitMQ高性能的原因? Erlang語言最初在于交換機領域的架構模式,這樣使得 RabbitMQ在Broker之間進行數據交互的性能是非常優秀的 Erlang的優點:Erlang有著和原生Socket—樣的延遲
AMQP定義: 是具有現代特征的二進制協議; 是一個提供統一消息服務的應用層標準高級消息隊列協議; 是應用層協議的一個開放標準,為面向消息的中間件設計; AMQP核心概念 Server:又稱Broker,接受客戶端的連接,實現AMQP實體服務 Connection:連接:應用程序與Broker的網絡連接 Channel:網絡通道,幾乎所有的操作都在Channel中進行,Channel是進行消息讀寫的通道;客戶端可建立多個Channel,每個Channel代表一個會話任務; Message:消息,服務器與應用程序之間傳遞的數據,由Properties和Body組成。Properties可以對消息進行裝飾,比如消息的優先級、延遲等高級特性;Body則就是消息體內容; Virtual host:虛擬地址,用于進行邏輯隔離,最上層的消息路由;一個Virtual Host里面可以有若干個Exchange和Queue,同一個Virtual Host里面不能有相同名稱的Exchange或Queue; Exchange:交換機,交換消息,根據路由鍵轉發消息到綁定的隊列; Binding:Exchange和Queue之間的虛擬連接,binding中可以包含routing key; Routing key:一個路由規則,虛擬機可用它來確定如何路由一個特定消息 Queue:也稱為Message Queue,消息隊列,保存消息并將它們轉發給消費者
RabbitMQ安裝及使用
Centos安裝方式
RabbitMQ安裝與使用
官網地址:https://www.rabbitmq.com/
提前準備:安裝Linux必要依賴包
下載RabbitMQ必須安裝包
配置文件修改
服務的啟動:rabbitmq-server start &
服務的停止:rabbitmqctl stop_app
管理插件:rabbitmq-plugins enable rabbitmq_management
訪問地址:http://ip:15672/
Docker安裝方式注意獲取鏡像的時候要獲取management版本的,不要獲取last版本的,management版本的才帶有管理界面 1.查詢鏡像
docker search rabbitmq:management
2.獲取鏡像
docker pull rabbitmq:management
3.運行鏡像
方式一:默認guest用戶,密碼也是guest
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management 方式二:設置用戶名和密碼
docker run -d \
--name my-rabbitmq \
-p 5672:5672 -p 15672:15672 \
-v /data:/var/lib/rabbitmq \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--restart=always \
rabbitmq:management 參數說明: -d:后臺運行容器 -name:指定容器名 -p:指定服務運行的端口(5672:應用訪問端口;15672:控制臺Web端口號) -v:映射目錄或文件,啟動了一個數據卷容器,數據卷路徑為:/var/lib/rabbitmq,再將此數據卷映射到住宿主機的/data目錄 --hostname:主機名(RabbitMQ的一個重要注意事項是它根據所謂的 “節點名稱” 存儲數據,默認為主機名) -e:指定環境變量;(RABBITMQ_DEFAULT_VHOST:默認虛擬機名;RABBITMQ_DEFAULT_USER:默認的用戶名;RABBITMQ_DEFAULT_PASS:默認用戶名的密碼) --restart=always:當Docker重啟時,容器能自動啟動 rabbitmq:management:鏡像名
4、進入RabbitMQ管理平臺進行相關操作
注1:容器啟動后,可以通過docker logs 窗口ID/容器名字 查看日志
docker logs my-rabbitmq
注2:停止并刪除所有容器
docker stop $(docker ps -aq) && docker rm $(docker ps -aq)
常用操作命令 命令行與管控臺-基礎操作 rabbitmqctl stop_app:關閉應用 rabbitmqctl start_app:啟動應用 rabbitmqctl status:節點狀態 rabbitmqctl add_user username password:添加用戶 rabbitmqctl list_users:列出所有用戶 rabbitmqctl delete_user username:刪除用戶 rabbitmqctl clear_permissions -p vhostpath username:清除用戶權限 rabbitmqctl list_user_permissions username:列出用戶權限 rabbitmqctl change_password username newpassword:修改密碼 rabbitmqctl set_permissions -p vhostpath username “.*” “.*” “.*” rabbitmqctl add_vhost vhostpath:創建虛擬主機 rabbitmqctl list_vhosts:列出所有虛擬主機 rabbitmqctl list_permissions -p vhostpath:列出虛擬主機上所有權限 rabbitmqctl delete_vhost vhostpath:刪除虛擬主機 rabbitmqctl list_queues:查看所有隊列信息 rabbitmqctl -p vhostpath purge_queue blue:清除隊列里的消息
命令行與管控臺-高級操作
rabbitmqctl reset:移除所有數據,要在rabbitmqctl stop_app之后使用
rabbitmqctl join_cluster <clustermode> [--ram]:組成集群命令
rabbitmqctl cluster_status:查看集群狀態
rabbitmqctl change_cluster_node_type disc | ram:修改集群節點的存儲形式
rabbitmqctl forget_cluster_node {--offline} 忘記節點 (摘除節點)
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2...] (修改節點名稱)
RabbitMQ快速入門
極速入門-消息生產與消費
ConnectionFactory:獲取連接工廠
Connection:一個鏈接
Channel:數據通信通道,課發送和接收消息
Queue:具體的消息存儲隊列
Producer & Consumer:生產和消費者
創建一個springboot項目: rabbitmq-api
導入pom依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
消費端代碼: package com.lingerqi.rabbitmqapi.quickstart;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
/**
* @author xyls
* @blog name & blog address 027@0030
* @create 2019-12-21 10:54
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//1 創建一個ConnectionFactory, 并進行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.43.232");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 通過連接工廠創建連接
Connection connection = connectionFactory.newConnection();
//3 通過connection創建一個Channel
Channel channel = connection.createChannel();
//4 聲明(創建)一個隊列
String queueName = "test001";
// 參數:隊列名稱、持久化與否、獨占與否、無消息隊列是否自動刪除、消息參數
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
channel.queueDeclare(queueName, true, false, false, null);
//5 創建消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6 設置Channel
// 參數:隊列名稱、自動簽收、消費者回調
// basicConsume(String queue, boolean autoAck, Consumer callback)
channel.basicConsume(queueName, true, queueingConsumer);
while(true){
//7 獲取消息(Delivery:傳送)
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消費端: " + msg);
//Envelope envelope = delivery.getEnvelope();
}
}
}
生產端代碼: package com.lingerqi.rabbitmqapi.quickstart;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author xyls
* @blog name & blog address 027@0030
* @create 2019-12-21 10:54
*/
public class Procuder {
public static void main(String[] args) throws Exception {
//1 創建一個ConnectionFactory, 并進行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.43.232");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 通過連接工廠創建連接
Connection connection = connectionFactory.newConnection();
//3 通過connection創建一個Channel
Channel channel = connection.createChannel();
//4 通過Channel發送數據
for(int i=0; i < 5; i++){
String msg = "Hello RabbitMQ!";
//1 exchange 2 routingKey
channel.basicPublish("", "test001", null, msg.getBytes());
}
//5 記得要關閉相關的連接
channel.close();
connection.close();
}
}
交換機
交換機屬性: Name:交換機名稱 Type:交換機類型 direct、topic、fanout、headers Durability:是否需要持久化,true為持久化 Auto Delete:當最后一個綁定到Exchange上的隊列刪除后,自動刪除該Exchange Internal:當前Exchange是否用于RabbitMQ內部使用,默認為False Arguments:擴展參數,用于擴展AMQP協議,定制化使用
直流交換機直連交換機Direct Exchange(完全匹配路由key) 所有發送到Direct Exchange的消息會被轉發到RouteKey中指定的Queue 注意:Direct模式可以使用RabbitMQ自帶的Exchange:default Exchange,所以不需要將Exchange進行任何綁定(binding)操作, 消息傳遞時,RouteKey必須完全匹配才會被隊列接收,否則該消息會被拋棄;
消費端代碼 package com.lingerqi.rabbitmqapi.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* @author xyls
* @blog name & blog address 027@0030
* @create 2019-12-21 10:54
*/
public class Consumer4DirectExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.43.232");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";
//表示聲明了一個交換機
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
//表示聲明了一個隊列
channel.queueDeclare(queueName, false, false, false, null);
//建立一個綁定關系:
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循環獲取消息
while(true){
//獲取消息,如果沒有消息,這一步將會一直阻塞
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
生產端代碼: package com.lingerqi.rabbitmqapi.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author xyls
* @blog name & blog address 027@0030
* @create 2019-12-21 10:54
*/
public class Producer4DirectExchange {
public static void main(String[] args) throws Exception {
//1 創建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.43.232");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 創建Connection
Connection connection = connectionFactory.newConnection();
//3 創建Channel
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct";
// String routingKey = "test.direct111"; //收不到
//5 發送
String msg = "Hello World RabbitMQ 4 Direct Exchange Message 111 ... ";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
}
}
主題交換機主題交換機Topic Exchange(匹配路由規則的交換機) 所有發送到Topic Exchange的消息被轉發到所有關系RouteKey中指定Topic的Queue上; Exchange將RouteKey和某Topic進行模糊匹配,此時隊列需要綁定一個Topic; 注意:可以使用通配符進行模糊匹配 符號:“#” 匹配一個或者多個詞 符號:“*” 匹配不多不少一個詞 列如: “log.#” 能夠匹配到 “log.info.oa” “log.*” 能夠匹配到 “log.err”
消費端代碼: package com.lingerqi.rabbitmqapi.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* @author xyls
* @blog name & blog address 027@0030
* @create 2019-12-21 10:54
*/
public class Consumer4TopicExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.43.232");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
String routingKey = "user.#";
// String routingKey = "user.*";
// 1 聲明交換機
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 2 聲明隊列
channel.queueDeclare(queueName, false, false, false, null);
// 3 建立交換機和隊列的綁定關系:
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循環獲取消息
while(true){
//獲取消息,如果沒有消息,這一步將會一直阻塞
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
生產端代碼:
package com.lingerqi.rabbitmqapi.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author xyls
* @blog name & blog address 027@0030
* @create 2019-12-21 10:54
*/
public class Producer4TopicExchange {
public static void main(String[] args) throws Exception {
//1 創建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.43.232");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 創建Connection
Connection connection = connectionFactory.newConnection();
//3 創建Channel
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//5 發送
String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
channel.close();
connection.close();
}
}
輸出交換機輸出交換機Fanout Exchange(不做路由)
不處理路由鍵,只需要簡單的將隊列綁定到交換機上; 發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上; Fanout交換機轉發消息是最快的
消費端代碼: package com.lingerqi.rabbitmqapi.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* @author xyls
* @blog name & blog address 027@0030
* @create 2019-12-21 10:54
*/
public class Consumer4FanoutExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.43.232");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = ""; //不設置路由鍵
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循環獲取消息
while(true){
//獲取消息,如果沒有消息,這一步將會一直阻塞
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
生產端代碼: package com.lingerqi.rabbitmqapi.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author xyls
* @blog name & blog address 027@0030
* @create 2019-12-21 10:54
*/
public class Producer4FanoutExchange {
public static void main(String[] args) throws Exception {
//1 創建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.43.232");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 創建Connection
Connection connection = connectionFactory.newConnection();
//3 創建Channel
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_fanout_exchange";
//5 發送
for(int i = 0; i < 10; i ++) {
String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
channel.basicPublish(exchangeName, "", null , msg.getBytes());
}
channel.close();
connection.close();
}
}
Binding-綁定 Exchange和Exchange、Queue之間的連接關系; Binding中可以包含RoutingKey或者參數
Queue-消息隊列 消息隊列,實際存儲消息數據 Durability:是否持久化 Durable:是,Transient:否 Auto delete:如選yes,代表當最后一個監聽被移除之后,該Queue會自動被刪除
Message-消息 服務器和應用程序之間傳遞的數據 本質上就是一段數據,由Properties和Payload(Body)組成 常用屬性:delivery model、headers(自定義屬性)
Message-其他屬性 content_type、content_encoding、priority correlation_id、reply_to、expiration、message_id Timestamp、type、user_id、app_id、cluster_id
Virtual host-虛擬主機 虛擬地址,用于進行邏輯隔離,最上層的消息路由 一個Virtual Host里面可以有若干個Exchange和Queue 同一個Virtual Host里面不能有相同名稱的Exchange或Queue |
![]() |
|
|
博客園
|
|