【RabbitMQ】核心模型簡介,以及消息的生產(chǎn)與消費(fèi)
本章學(xué)習(xí)目標(biāo)
-
理解AMQP模型中的核心概念:Connection, Channel, Producer, Consumer, Queue。
-
創(chuàng)建一個(gè).NET項(xiàng)目并添加RabbitMQ客戶端庫。
-
使用C#編寫代碼發(fā)送一條消息("Hello World")。
-
使用C#編寫代碼接收并處理這條消息。
一、理論部分
1. AMQP 0-9-1 核心模型簡介
在編寫代碼前,我們需要理解幾個(gè)核心概念,它們構(gòu)成了RabbitMQ一切功能的基礎(chǔ):
-
生產(chǎn)者 (Producer):發(fā)送消息的應(yīng)用程序。
-
消費(fèi)者 (Consumer):接收消息的應(yīng)用程序。
-
隊(duì)列 (Queue):一個(gè)類似于郵箱的存儲(chǔ)結(jié)構(gòu),位于RabbitMQ內(nèi)部,用于存儲(chǔ)消息。多個(gè)生產(chǎn)者可以向同一個(gè)隊(duì)列發(fā)送消息,多個(gè)消費(fèi)者也可以從同一個(gè)隊(duì)列接收消息。消息只能存儲(chǔ)在隊(duì)列中。
-
連接 (Connection):一個(gè)TCP連接,應(yīng)用程序通過它與RabbitMQ服務(wù)器建立網(wǎng)絡(luò)連接。創(chuàng)建連接的開銷較大。
-
通道 (Channel):建立在連接之上的虛擬連接。幾乎所有的操作都在通道中進(jìn)行。使用通道的原因是為了避免頻繁創(chuàng)建和銷毀TCP連接帶來的巨大開銷。一個(gè)連接可以包含多個(gè)通道。
簡單工作流:Producer -> (Connection -> Channel) -> Queue -> (Channel -> Connection) -> Consumer
2. RabbitMQ.Client 庫
這是RabbitMQ官方提供的.NET客戶端庫,它實(shí)現(xiàn)了AMQP協(xié)議,是我們與RabbitMQ服務(wù)器交互的橋梁。我們將通過NuGet包管理器來安裝它。
二、實(shí)操部分:創(chuàng)建"Hello World"
我們將創(chuàng)建兩個(gè)控制臺應(yīng)用程序:一個(gè)生產(chǎn)者(Send)和一個(gè)消費(fèi)者(Receive)。
第1步:創(chuàng)建項(xiàng)目并添加NuGet包
-
打開IDE(如Visual Studio或VS Code),創(chuàng)建一個(gè)新的解決方案(Solution)。
-
在該解決方案中,創(chuàng)建兩個(gè)新的控制臺應(yīng)用程序項(xiàng)目,分別命名為
Send和Receive。 -
為兩個(gè)項(xiàng)目添加
RabbitMQ.ClientNuGet包。-
Visual Studio:右鍵點(diǎn)擊項(xiàng)目 -> "Manage NuGet Packages..." -> 瀏覽 -> 搜索
RabbitMQ.Client-> 安裝。 -
.NET CLI:
cd /path/to/Send dotnet add package RabbitMQ.Client cd /path/to/Receive dotnet add package RabbitMQ.Client
-
第2步:編寫生產(chǎn)者(Send.cs)
將 Send 項(xiàng)目中的 Program.cs 替換為以下代碼。請務(wù)必將 hostName、userName 和 password 替換為在上1章中設(shè)置的值(如果按教程做,應(yīng)該是 localhost, myuser, mypassword)。
using System.Text; using RabbitMQ.Client; // 1. 創(chuàng)建連接工廠(ConnectionFactory)并設(shè)置連接參數(shù) var factory = new ConnectionFactory() { HostName = "localhost", // RabbitMQ服務(wù)器地址 UserName = "myuser", // 用戶名 Password = "mypassword" // 密碼 }; // 2. 使用工廠創(chuàng)建一個(gè)連接(Connection)和一個(gè)通道(Channel) // 'using' 語句確保在代碼塊結(jié)束時(shí),連接和通道會(huì)被正確關(guān)閉和釋放,這是很重要的。 using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { // 3. 聲明一個(gè)隊(duì)列。如果隊(duì)列不存在,則創(chuàng)建它。 // 參數(shù)說明: // queue: "hello" - 隊(duì)列的名稱 // durable: false - 隊(duì)列是否持久化(服務(wù)器重啟后是否存在) // exclusive: false - 是否為當(dāng)前連接的專用隊(duì)列(其他連接不能訪問) // autoDelete: false - 當(dāng)最后一個(gè)消費(fèi)者斷開后,隊(duì)列是否自動(dòng)刪除 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); // 4. 準(zhǔn)備要發(fā)送的消息 string message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); // 將消息轉(zhuǎn)換為字節(jié)數(shù)組 // 5. 發(fā)布消息到隊(duì)列 // 參數(shù)說明: // exchange: "" - 使用默認(rèn)的(無名)交換機(jī) // routingKey: "hello" - 路由鍵,對于默認(rèn)交換機(jī),它指定了消息要發(fā)送到的隊(duì)列名稱 // basicProperties: null - 消息屬性(如持久化) // body: body - 消息體 channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine($" [x] Sent {message}"); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine();
代碼解析:
-
我們首先創(chuàng)建了一個(gè)連接工廠,并配置了連接到我們本地RabbitMQ服務(wù)器所需的參數(shù)。
-
然后,我們建立了連接和通道。這是與RabbitMQ交互的標(biāo)準(zhǔn)方式。
-
QueueDeclare是冪等的——它只會(huì)在隊(duì)列不存在時(shí)創(chuàng)建它。 -
默認(rèn)交換機(jī)(
"")是一個(gè)直連交換機(jī)(Direct Exchange),它會(huì)把消息路由到routingKey完全匹配的隊(duì)列中。所以這里routingKey: "hello"意味著消息會(huì)被投遞到名為hello的隊(duì)列。
第3步:運(yùn)行生產(chǎn)者并查看管理后臺
-
運(yùn)行
Send項(xiàng)目(在Visual Studio中按F5,或使用CLI命令dotnet run)。 -
會(huì)在控制臺看到
[x] Sent Hello World!。 -
現(xiàn)在,打開RabbitMQ管理后臺 (http://localhost:15672),登錄后點(diǎn)擊 Queues 標(biāo)簽頁。應(yīng)該能看到一個(gè)名為
hello的隊(duì)列,并且它下面有 1 條消息正準(zhǔn)備被消費(fèi)("Ready"狀態(tài))!
第4步:編寫消費(fèi)者(Receive.cs)
現(xiàn)在我們來編寫消費(fèi)者程序,從 hello 隊(duì)列中取出消息。
將 Receive 項(xiàng)目中的 Program.cs 替換為以下代碼:
using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; // 同生產(chǎn)者一樣,創(chuàng)建連接和通道 var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { // 聲明隊(duì)列。這一步也是必需的,以防我們先啟動(dòng)消費(fèi)者,而隊(duì)列還不存在。 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); // 創(chuàng)建一個(gè)事件消費(fèi)者(EventingBasicConsumer)對象,并關(guān)聯(lián)到我們的通道 var consumer = new EventingBasicConsumer(channel); // 當(dāng)消費(fèi)者收到消息時(shí),觸發(fā)這個(gè)事件 consumer.Received += (model, ea) => { // 消息體是字節(jié)數(shù)組,我們需要將其轉(zhuǎn)換回字符串 var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($" [x] Received {message}"); }; // 開始消費(fèi)隊(duì)列中的消息 // 參數(shù)說明: // queue: "hello" - 要消費(fèi)的隊(duì)列名稱 // autoAck: true - 自動(dòng)確認(rèn)模式。如果為true,消息一旦被送達(dá),RabbitMQ會(huì)立即將其從隊(duì)列中標(biāo)記為刪除。 // consumer: consumer - 我們上面定義的消費(fèi)者對象 channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); // 保持程序運(yùn)行,持續(xù)監(jiān)聽消息 }
代碼解析:
-
前面的連接、通道、隊(duì)列聲明步驟與生產(chǎn)者完全一致,這確保了所需的隊(duì)列存在。
-
我們創(chuàng)建了一個(gè)
EventingBasicConsumer對象,并為它的Received事件訂閱了一個(gè)處理方法。每當(dāng)有消息到達(dá)時(shí),這個(gè)匿名方法就會(huì)被調(diào)用。 -
BasicConsume方法啟動(dòng)消費(fèi)過程。它將我們的消費(fèi)者注冊到指定的隊(duì)列。 -
autoAck: true表示自動(dòng)確認(rèn)模式。這意味著消費(fèi)者一收到消息,RabbitMQ就認(rèn)為它已成功處理并立即從隊(duì)列中刪除該消息。這是一種簡單的模式,但如果消費(fèi)者在處理消息過程中崩潰,消息就會(huì)丟失。我們將在下一章學(xué)習(xí)更可靠的手動(dòng)確認(rèn)模式。
第5步:運(yùn)行消費(fèi)者
-
運(yùn)行
Receive項(xiàng)目。 -
會(huì)立刻在控制臺看到
[x] Received Hello World!。消費(fèi)者程序取走了我們之前發(fā)送的消息并打印了出來。 -
再次查看管理后臺的 Queues 頁面,會(huì)發(fā)現(xiàn)
hello隊(duì)列中的消息數(shù)又變回了 0。
嘗試一下:
-
先運(yùn)行
Receive程序,讓它保持運(yùn)行并監(jiān)聽消息。 -
然后再運(yùn)行
Send程序多次。 -
觀察
Receive的控制臺,它會(huì)實(shí)時(shí)地打印出每一條新收到的消息。
總結(jié)
我們已經(jīng)成功實(shí)現(xiàn)了一個(gè)RabbitMQ應(yīng)用程序。在本文中,我們:
-
創(chuàng)建了.NET項(xiàng)目并引入了
RabbitMQ.Client庫。 -
理解了AMQP模型中的
Connection、Channel、Queue、Producer和Consumer等核心概念。 -
編寫了生產(chǎn)者代碼,成功向名為
hello的隊(duì)列發(fā)送了一條消息。 -
編寫了消費(fèi)者代碼,成功從隊(duì)列中取出了消息并進(jìn)行處理。
-
使用了管理后臺來驗(yàn)證消息的流動(dòng)。
這是一個(gè)最簡單的模型,生產(chǎn)者和消費(fèi)者直接與隊(duì)列打交道。在下一章,我們將學(xué)習(xí)如何實(shí)現(xiàn)工作隊(duì)列(Work Queue)與消息確認(rèn)(Ack),讓多個(gè)消費(fèi)者共同處理任務(wù),這將使我們的應(yīng)用變得更加實(shí)用和強(qiáng)大。

浙公網(wǎng)安備 33010602011771號