【RabbitMQ】路由(Routing)與直連交換機(Direct Exchange)
本章目標
-
理解直連交換機(Direct Exchange)的工作原理。
-
掌握基于路由鍵(Routing Key)的消息過濾機制。
-
學習多重綁定(Multiple Bindings)的概念。
-
實現一個可以根據日志級別進行過濾的智能日志系統。
一、理論部分
1. 直連交換機(Direct Exchange)
直連交換機是比扇形交換機更智能的路由器。它的工作邏輯很簡單:
-
消息帶著一個路由鍵(Routing Key) 被發送到直連交換機。
-
隊列通過一個綁定鍵(Binding Key) 綁定到交換機。
-
當綁定鍵與路由鍵完全匹配時,消息就會被路由到該隊列。
2. 路由鍵(Routing Key)與綁定鍵(Binding Key)
-
路由鍵(Routing Key):生產者發送消息時指定的一個字符串,用于描述消息的特征或類型。
-
綁定鍵(Binding Key):消費者在將隊列綁定到交換機時指定的字符串,用于聲明該隊列對哪些消息感興趣。
對于直連交換機,路由鍵和綁定鍵必須精確匹配(完全相等)。
3. 多重綁定(Multiple Bindings)
一個隊列可以綁定多個綁定鍵,同樣,多個隊列也可以用相同的綁定鍵綁定到同一個交換機。這提供了很大的靈活性:
-
一個隊列,多個興趣:隊列Q1可以同時綁定
error和warning鍵,接收所有錯誤和警告消息。 -
多個隊列,相同興趣:隊列Q1和Q2都可以綁定
info鍵,這樣它們都會收到所有信息級別的消息(類似于扇形交換機的效果,但僅限于特定路由鍵)。
4. 使用場景
直連交換機非常適合需要有選擇性接收消息的場景,比如:
-
日志級別過濾:只接收特定級別(error、warning、info)的日志。
-
業務類型路由:根據消息類型(order.created、payment.processed)路由到不同的處理服務。
-
優先級處理:將高優先級任務路由到專用隊列。
二、實操部分:構建智能日志系統
我們將改進第4章的日志系統,使其能夠根據日志級別(error、warning、info)進行智能路由。
第1步:創建項目
-
創建一個新的解決方案。
-
添加一個控制臺應用程序項目作為生產者:
EmitLogDirect。 -
添加三個控制臺應用程序項目作為消費者:
-
ReceiveLogsDirect- 接收所有日志 -
ReceiveWarningsAndErrors- 只接收警告和錯誤 -
ReceiveErrorsOnly- 只接收錯誤
-
-
為所有項目添加
RabbitMQ.ClientNuGet包。
第2步:編寫智能日志生產者(EmitLogDirect.cs)
using System.Text; using RabbitMQ.Client; var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { // 1. 聲明一個直連交換機(Direct Exchange) channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct); // 2. 從命令行參數獲取日志級別和消息內容 // 用法:dotnet run [severity] [message] // 示例:dotnet run error "Database connection failed" var severity = (args.Length > 0) ? args[0] : "info"; var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!"; var body = Encoding.UTF8.GetBytes(message); // 3. 發布消息到直連交換機,并指定路由鍵(日志級別) channel.BasicPublish(exchange: "direct_logs", routingKey: severity, // 關鍵:使用日志級別作為路由鍵 basicProperties: null, body: body); Console.WriteLine($" [x] Sent '{severity}':'{message}'"); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine();
關鍵點:
-
交換機類型改為
ExchangeType.Direct。 -
使用命令行第一個參數作為路由鍵(日志級別)。
-
在
BasicPublish中明確指定routingKey參數。
第3步:編寫接收所有日志的消費者(ReceiveLogsDirect.cs)
這個消費者會綁定三個不同的綁定鍵,從而接收所有級別的日志。
using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { // 聲明直連交換機 channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct); var queueName = channel.QueueDeclare().QueueName; // 綁定到三個日志級別:接收所有消息 // 注意:一個隊列可以綁定多個路由鍵 channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "info"); channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "warning"); channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "error"); Console.WriteLine($" [*] Waiting for ALL logs (info, warning, error). Queue: {queueName}"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($" [x] [ALL] Received '{ea.RoutingKey}':'{message}'"); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
關鍵點:
-
一個隊列通過多次調用
QueueBind綁定了三個不同的路由鍵。 -
通過
ea.RoutingKey可以獲取消息的實際路由鍵。
第4步:編寫接收警告和錯誤的消費者(ReceiveWarningsAndErrors.cs)
using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct); var queueName = channel.QueueDeclare().QueueName; // 只綁定到warning和error級別 channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "warning"); channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "error"); Console.WriteLine($" [*] Waiting for WARNINGS and ERRORS only. Queue: {queueName}"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($" [x] [WARN+ERR] Received '{ea.RoutingKey}':'{message}'"); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
第5步:編寫只接收錯誤的消費者(ReceiveErrorsOnly.cs)
using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct); var queueName = channel.QueueDeclare().QueueName; // 只綁定到error級別 channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "error"); Console.WriteLine($" [*] Waiting for ERRORS only. Queue: {queueName}"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($" [x] [ERRORS ONLY] Received '{ea.RoutingKey}':'{message}'"); // 模擬錯誤處理(比如發送警報郵件) Console.WriteLine(" -> Sending alert email to admin..."); Thread.Sleep(1000); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
第6步:運行與演示
-
啟動所有消費者
打開四個終端窗口,分別運行:# 終端1 - 接收所有日志 cd ReceiveLogsDirect dotnet run # 終端2 - 接收警告和錯誤 cd ReceiveWarningsAndErrors dotnet run # 終端3 - 只接收錯誤 cd ReceiveErrorsOnly dotnet run # 終端4 - 用于發送消息 cd EmitLogDirect
-
查看管理后臺
訪問 http://localhost:15672,查看direct_logs交換機的綁定情況:-
你應該看到三個隊列,每個隊列有不同的綁定鍵組合。
-
ReceiveLogsDirect的隊列綁定了三個鍵。 -
ReceiveWarningsAndErrors的隊列綁定了兩個鍵。 -
ReceiveErrorsOnly的隊列只綁定了一個鍵。
-
-
發送不同級別的日志消息
# 發送info消息 dotnet run info "User login successful" # 發送warning消息 dotnet run warning "Database connection slow" # 發送error消息 dotnet run error "Payment service unavailable" # 發送另一個info消息 dotnet run info "Cache updated successfully"
-
觀察路由結果
觀察各個消費者的輸出:-
信息級別(info)消息:
-
ReceiveLogsDirect:? 接收(因為它綁定了info) -
ReceiveWarningsAndErrors:? 不接收 -
ReceiveErrorsOnly:? 不接收
-
-
警告級別(warning)消息:
-
ReceiveLogsDirect:? 接收 -
ReceiveWarningsAndErrors:? 接收 -
ReceiveErrorsOnly:? 不接收
-
-
錯誤級別(error)消息:
-
ReceiveLogsDirect:? 接收 -
ReceiveWarningsAndErrors:? 接收 -
ReceiveErrorsOnly:? 接收(并模擬發送警報)
-
-
-
測試多重綁定效果
再啟動一個ReceiveErrorsOnly消費者(第二個實例):cd ReceiveErrorsOnly dotnet run發送一條error消息,你會發現兩個
ReceiveErrorsOnly實例都會收到消息,因為它們都使用相同的綁定鍵綁定到了同一個交換機。這展示了直連交換機也支持"一個路由鍵,多個隊列"的廣播式路由。
本章總結
在這一章中,我們學習了比扇形交換機更智能的直連交換機,并實現了精確的消息路由:
-
直連交換機(Direct Exchange):基于路由鍵和綁定鍵的精確匹配進行消息路由。
-
路由鍵與綁定鍵:理解了生產者指定路由鍵、消費者指定綁定鍵的分工。
-
多重綁定:掌握了隊列可以綁定多個路由鍵,以及多個隊列可以綁定相同路由鍵的靈活配置。
-
選擇性消息消費:實現了消費者只接收特定類型消息的智能日志系統。
直連交換機提供了精確的路由控制,但有時候我們需要更靈活的模式匹配。在下一章,我們將學習功能最強大的主題交換機(Topic Exchange),它支持通配符匹配,能夠實現極其復雜的消息路由規則。

浙公網安備 33010602011771號