.NET Core中RabbitMQ初識
.NET Core中RabbitMQ初識
前提
前段時間上班無事,上網沖浪看到了消息隊列RabbitMQ,就想著學習一下,網上看了點資料在嗶哩嗶哩上看的到codeman講的一個rabbitmq的視頻,就跟著仔細學習一下,敲一下代碼。視頻地址: rabbitmq視頻。
RabbitMq介紹
什么是消息隊列
MQ全稱為Message Queue,即消息隊列。“消息隊列”是在消息的傳輸過程中保存消息的容器。它是典型的:生產者、消費者模型。生產者不斷向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。因為消息的生產和消費都是異步的,而且只關心消息的發送和接收,沒有業務邏輯的侵入,這樣就實現了生產者和消費者的解耦。

應用場景
削峰填谷
在一個時間段很多用戶同時進行請求我們的A系統,我的MQ容器就可以用來存儲請求按照每秒多少的請求進行發送,減輕服務器的壓力。
? 
-
使用了MQ之后,限制消息消費的速度為3000,這樣一來,高峰就被“削”掉了,但是因為消息積壓,在高峰期過后一段時間內,消費消息的速度還是會維持在3000,直到消費完擠壓的消息,這就叫做“填谷”。
-
使用MQ后,可以提供系統穩定性。

異步提速
-
在不使用MQ的情況下我們正常用戶通過訂單系統進行下單,我們需要900多ms,這就會出現用戶的體驗不好。

-
在使用MQ的情況出現了總耗時只要25ms就給到了用戶回應
這樣提升了用戶體驗感

所有的問題當你解決一個問題就會出現另外的問題,外部依賴多系統的穩定性就越差,MQ但凡掛了,系統就會出問題,后面就會使用mq集群來解決這一問題。
消息模型
點對點模式


在上圖的模型中,有以下概念:
- Producer:生產者,也就是要發送消息的程序
- Consumer:消費者:消息的接受者,會一直等待消息到來。
- Queue:消息隊列。可以緩存消息;生產者向其中投遞消息,消費者從其中取出消息。
- 點對點模式只會有一個消費者進行消費
代碼附上

新增兩個項目一個生產者 Z.RabbitMq.Producer,一個消費者Z.RabbitMQ.Consumer01
- 項目 Z.RabbitMq.Producer新增
HelloProducer類
-
public class HelloProducer { public static void HelloWorldShow() { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.Port = 5672; factory.UserName = "admin"; factory.Password = "admin"; factory.VirtualHost = "my_vhost"; // 獲取TCP 長連接 using (var connection = factory.CreateConnection()) { // 創建通信“通道”,相當于TCP中的虛擬連接 using (var channel = connection.CreateModel()) { /* * 創建隊列,聲明并創建一個隊列,如果隊列已存在,則使用這個隊列 * 第一個參數:隊列名稱ID * 第二個參數:是否持久化,false對應不持久化數據,MQ停掉數據就會丟失 * 第三個參數:是否隊列私有化,false則代表所有的消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用 * 第四個:是否自動刪除,false代表連接停掉后不自動刪除這個隊列 * 其他額外參數為null */ channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null); Console.ForegroundColor = ConsoleColor.Red; string message = "hello CodeMan 666"; var body = Encoding.UTF8.GetBytes(message); /* * exchange:交換機,暫時用不到,在進行發布訂閱時才會用到 * 路由key * 額外的設置屬性 * 最后一個參數是要傳遞的消息字節數組 */ channel.BasicPublish("", RabbitConstant.QUEUE_HELLO_WORLD, null, body); Console.WriteLine($"producer消息:{message}已發送"); } } } }
- 項目 Z.RabbitMQ.Consumer01新增
HelloConsumer類
-
public class HelloConsumer { public static void HelloWorldShow() { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.Port = 5672;//5672是RabbitMQ默認的端口號 factory.UserName = "admin"; factory.Password = "admin"; factory.VirtualHost = "my_vhost"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { /* * 創建隊列,聲明并創建一個隊列,如果隊列已存在,則使用這個隊列 * 第一個參數:隊列名稱ID * 第二個參數:是否持久化,false對應不持久化數據,MQ停掉數據就會丟失 * 第三個參數:是否隊列私有化,false則代表所有的消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用 * 第四個:是否自動刪除,false代表連接停掉后不自動刪除這個隊列 * 其他額外參數為null */ //RabbitConstant.QUEUE_HELLO_WORLD 對應的生產者一樣名稱 "helloworld.queue" channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null); Console.ForegroundColor = ConsoleColor.Cyan; EventingBasicConsumer consumers = new EventingBasicConsumer(channel); // 觸發事件 consumers.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); // false只是確認簽收當前的消息,設置為true的時候則代表簽收該消費者所有未簽收的消息 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"Consumer01接收消息:{message}"); }; /* * 從MQ服務器中獲取數據 * 創建一個消息消費者 * 第一個參數:隊列名 * 第二個參數:是否自動確認收到消息,false代表手動確認消息,這是MQ推薦的做法 * 第三個參數:要傳入的IBasicConsumer接口 * */ //RabbitConstant.QUEUE_HELLO_WORLD == helloworld.queue channel.BasicConsume(RabbitConstant.QUEUE_HELLO_WORLD, false, consumers); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } }
work消息模型
工作隊列或者競爭消費者模式

work queues與入門程序相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息,但是一個消息只能被一個消費者獲取。
接下來我們來模擬這個流程:
P:生產者:任務的發布者
C1:消費者1:領取任務并且完成任務,假設完成速度較慢(模擬耗時)
C2:消費者2:領取任務并且完成任務,假設完成速度較快
代碼附上
新增一個工具類用來獲取rabbitmq的連接信息
public class RabbitUtils
{
public static ConnectionFactory GetConnection()
{
var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";
factory.Port = 5672;//5672是RabbitMQ默認的端口號
factory.UserName = "admin";
factory.Password = "admin";
factory.VirtualHost = "my_vhost";
return factory;
}
}
-
消費者1(C1)在剛剛的 Z.RabbitMQ.Consumer01新增
SmsReceive類在
Program.cs中的main函數中進行調用SmsReceive.Sender();消費者1 延遲30ms接受到信息
public class SmsReceive { public static void Sender() { //使用工具類創建連接 var connection = RabbitUtils.GetConnection().CreateConnection(); var channel = connection.CreateModel(); channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null); // 如果不寫basicQos(1),則自動MQ會將所有請求平均發送給所有消費者 // basicQos,MQ不再對消費者一次發送多個請求,而是消費者處理完一個消息后(確認后),在從隊列中獲取一個新的 channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Thread.Sleep(30); Console.WriteLine($"SmsSender-發送短信成功:{message}"); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } -
消費者2(C2)在剛剛的 Z.RabbitMQ.Consumer02新增
SmsReceive類
消費者1 延遲60ms接受到信息
public class SmsReceive { public static void Sender() { var connection = RabbitUtils.GetConnection().CreateConnection(); var channel = connection.CreateModel(); channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null); // 如果不寫basicQos(1),則自動MQ會將所有請求平均發送給所有消費者 // basicQos,MQ不再對消費者一次發送多個請求,而是消費者處理完一個消息后(確認后),在從隊列中獲取一個新的 channel.BasicQos(0, 1, false);//處理完一個取一個 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Thread.Sleep(60); Console.WriteLine($"SmsSender-發送短信成功:{message}"); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } -
生產者Z.RabbitMq.Producer中創建SmsSender類在main函數進行調用
- 發送100條車票訂閱的消息
public class SmsSender { public static void Sender() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null); for (int i = 0; i < 100; i++) { Sms sms = new Sms("乘客" + i, "139000000" + i, "您的車票已預定成功"); string jsonSms = JsonConvert.SerializeObject(sms); var body = Encoding.UTF8.GetBytes(jsonSms); channel.BasicPublish("", RabbitConstant.QUEUE_SMS, null, body); Console.WriteLine($"正在發送內容:{jsonSms}"); } Console.WriteLine("發送數據成功"); } } } }
運行結構如下

能者多勞
- 消費者1比消費者2的效率要快,一次任務的耗時較短
- 消費者2大量時間處于空閑狀態,消費者1一直忙碌
通過channel.BasicAck(ea.DeliveryTag, false);來完成能者多勞的效果,在完成上一次請求之后再去取下一條消息,這就會出現服務器快的消費的更多,慢的消費的更少。
發布訂閱模式
Publish/subscribe(交換機類型:Fanout,也稱為廣播 )


和前面兩種模式不同:
- 聲明Exchange,不再聲明Queue
- 發送消息到Exchange,不再發送到Queue,通過exchange發送到queue上
消費者1收到的天氣
項目.RabbitMq.Consumer01 創建WeatherFanout使用exchange(交換機)
public class WeatherFanout
{
public static void Weather()
{
using (var connection = RabbitUtils.GetConnection().CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout);
// 聲明隊列信息
channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
/*
* queueBind 用于將隊列與交換機綁定
* 參數1:隊列名
* 參數2:交換機名
* 參數3:路由Key(暫時用不到)
*/
channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += ((model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"百度收到的氣象信息:{message}");
channel.BasicAck(ea.DeliveryTag, false);
});
channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
Console.WriteLine("Press [Enter] to exit");
Console.Read();
}
}
}
}
消費者2收到的天氣
項目.RabbitMq.Consumer02 創建WeatherFanout使用exchange(交換機)
代碼與消費者01一樣
生產者發送天氣
生產者把消息推送到交換機上
public class WeatherFanout
{
public static void Weather()
{
using (var connection = RabbitUtils.GetConnection().CreateConnection())
{
using (var channel = connection.CreateModel())
{
string message = "20度";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, body);
Console.WriteLine("天氣信息發送成功!");
}
}
}
}
最后得到效果

Routing 路由模型

P:生產者,向Exchange發送消息,發送消息時,會指定一個routing key。
X:Exchange(交換機),接收生產者的消息,然后把消息遞交給 與routing key完全匹配的隊列
C1:消費者,其所在隊列指定了需要routing key 為 error 的消息
C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息
-
隊列與交換機的綁定,不能是任意綁定,而是要指定一個RoutingKey
-
消息的發送方在向Exchange發送消息時,也必須指定消息的RoutingKey
-
Exchange不再把消息交給每一個綁定的隊列,而是根據消息的RoutingKey進行判斷,只有隊列的RoutingKey與消息的RoutingKey完全一致,才會接收消息
生產者
public class WeatherDirect
{
public static void Weather()
{
Dictionary<string, string> area = new Dictionary<string, string>();
area.Add("china.hunan.changsha.20210525", "中國湖南長沙20210525天氣數據");
area.Add("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據");
area.Add("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據");
area.Add("us.cal.lsj.20210525", "美國加州洛杉磯20210525天氣數據");
using (var connection = RabbitUtils.GetConnection().CreateConnection())
{
using (var channel = connection.CreateModel())
{
foreach (var item in area)
{
channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, item.Key,
null, Encoding.UTF8.GetBytes(item.Value));
}
Console.WriteLine("氣象信息發送成功!");
}
}
}
}
消費者1
接受百度路由的路由消息
public class WeatherDirect
{
public static void Weather()
{
using (var connection = RabbitUtils.GetConnection().CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct);
channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
/*
* queueBind 用于將隊列與交換機綁定
* 參數1:隊列名
* 參數2:交換機名
* 參數3:路由Key(暫時用不到)
*/
channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20210525");
channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525");
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += ((model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"百度收到的氣象信息:{message}");
channel.BasicAck(ea.DeliveryTag, false);
});
channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
Console.WriteLine("Press [Enter] to exit");
Console.Read();
}
}
}
}
消費者2
接受新浪的路由信息
public class WeatherDirect
{
public static void Weather()
{
using (var connection = RabbitUtils.GetConnection().CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct);
// 聲明隊列信息
channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
/*
* queueBind 用于將隊列與交換機綁定
* 參數1:隊列名
* 參數2:交換機名
* 參數3:路由Key
*/
channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.xiangyang.20210525");
channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20210525");
channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525");
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += ((model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"新浪收到的氣象信息:{message}");
channel.BasicAck(ea.DeliveryTag, false);
});
channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);
Console.WriteLine("Press [Enter] to exit");
Console.Read();
}
}
}
}
最后得到的效果
- 新浪接收對應新浪的routingkey的信息
- 百度接收對應百度的routingkey的信息

Topics 通配符模式

routingkey支持通配符匹配格式
- 通配符格式
- Topic類型與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過
- Topic類型Exchange可以讓隊列在綁定RoutingKey的時候使用通配符
- RoutingKey一般都是由一個或多個單詞組成,多個單詞之間以“.”分隔,例如:item.insert
- 通配符規則:#匹配一個或多個詞,*恰好匹配一個詞,例如item.#能夠匹配item.insert.user或者item.insert,item.只能匹配item.insert或者item.user
生產者
public class WeatherTopic
{
public static void Weather()
{
Dictionary<string, string> area = new Dictionary<string, string>();
area.Add("china.hunan.changsha.20210525", "中國湖南長沙20210525天氣數據");
area.Add("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據");
area.Add("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據");
area.Add("us.cal.lsj.20210525", "美國加州洛杉磯20210525天氣數據");
using (var connection = RabbitUtils.GetConnection().CreateConnection())
{
using (var channel = connection.CreateModel())
{
foreach (var item in area)
{
channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, item.Key,
null, Encoding.UTF8.GetBytes(item.Value));
}
Console.WriteLine("氣象信息發送成功!");
}
}
}
}
消費者1
獲取交換機中通配符為china.#的信息
- ("china.hunan.changsha.20210525", "中國湖南長沙20210525天氣數據");
- ("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據");
- ("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據");
public class WeatherTopic
{
public static void Weather()
{
using (var connection = RabbitUtils.GetConnection().CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic);
// 聲明隊列信息
channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
/*
* queueBind 用于將隊列與交換機綁定
* 參數1:隊列名
* 參數2:交換機名
* 參數3:路由Key(暫時用不到)
*/
channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.#");
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += ((model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"百度收到的氣象信息:{message}");
channel.BasicAck(ea.DeliveryTag, false);
});
channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
Console.WriteLine("Press [Enter] to exit");
Console.Read();
}
}
}
}
消費者2
獲取交換機中通配符為china.hubei.*.20210525的信息
- ("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據")
- ("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據")
public class WeatherTopic
{
public static void Weather()
{
using (var connection = RabbitUtils.GetConnection().CreateConnection())
{
using (var channel = connection.CreateModel())
{
/*
* 生產者發送消息
* 隊列名稱
* 交換機名稱
* 路由key
*
*/
channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic);
// 聲明隊列信息
channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
/*
* queueBind 用于將隊列與交換機綁定
* 參數1:隊列名
* 參數2:交換機名
* 參數3:路由Key(暫時用不到)
*/
channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.hubei.*.20210525");
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += ((model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"新浪收到的氣象信息:{message}");
channel.BasicAck(ea.DeliveryTag, false);
});
channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);
Console.WriteLine("Press [Enter] to exit");
Console.Read();
}
}
}
}
最后得到的效果
- 百度獲取
china.#的信息 - 新浪獲取
china.hubei.*.20210525的信息

RPC

基本概念:
-
Callback queue 回調隊列,客戶端向服務器發送請求,服務器端處理請求后,將其處理結果保存在一個存儲體中。而客戶端為了獲得處理結果,那么客戶在向服務器發送請求時,同時發送一個回調隊列地址reply_to。
-
Correlation id 關聯標識,客戶端可能會發送多個請求給服務器,當服務器處理完后,客戶端無法辨別在回調隊列中的響應具體和那個請求時對應的。為了處理這種情況,客戶端在發送每個請求時,同時會附帶一個獨有correlation_id屬性,這樣客戶端在回調隊列中根據correlation_id字段的值就可以分辨此響應屬于哪個請求。
流程說明:
- 當客戶端啟動的時候,它創建一個匿名獨享的回調隊列。
- 在 RPC 請求中,客戶端發送帶有兩個屬性的消息:一個是設置回調隊列的 reply_to 屬性,另一個是設置唯一值的 correlation_id 屬性。
- 將請求發送到一個 rpc_queue 隊列中。
- 服務器等待請求發送到這個隊列中來。當請求出現的時候,它執行他的工作并且將帶有執行結果的消息發送給 reply_to 字段指定的隊列。
- 客戶端等待回調隊列里的數據。當有消息出現的時候,它會檢查 correlation_id 屬性。如果此屬性的值與請求匹配,將它返回給應用
分享幾題面試題
RabbitMQ中消息可能有的幾種狀態?
-
alpha: 消息內容(包括消息體、屬性和 headers) 和消息索引都存儲在內存中 。
- beta: 消息內容保存在磁盤中,消息索引保存在內存中。
- gamma: 消息內容保存在磁盤中,消息索引在磁盤和內存中都有 。
- delta: 消息內容和索引都在磁盤中 。
-
死信隊列?
DLX,全稱為 Dead-Letter-Exchange,死信交換器,死信郵箱。當消息在一個隊列中變成死信 (dead message) 之后,它能被重新被發送到另一個交換器中,這個交換器就是 DLX,綁定 DLX 的隊列就稱之 為死信隊列。
-
導致的死信的幾種原因?
- 消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false。
- 消息TTL過期。
- 隊列滿了
到這里就結束,大家如果需要看視頻學習就是點最上面的鏈接就行了
源碼:github

浙公網安備 33010602011771號