【RabbitMQ】主題(Topics)與主題交換機(Topic Exchange)
本章目標
-
理解主題交換機(Topic Exchange)的強大路由能力。
-
掌握通配符
*和#的使用規則。 -
學習基于模式匹配的復雜消息路由。
-
實現一個支持多維度過濾的智能消息系統。
一、理論部分
1. 主題交換機(Topic Exchange)簡介
主題交換機是RabbitMQ中最靈活也是最強大的交換機類型。它結合了扇形交換機的廣播能力和直連交換機的精確匹配能力,同時引入了模式匹配的概念。
主題交換機的工作方式:
-
消息仍然帶有路由鍵(Routing Key),但路由鍵必須是由點號分隔的單詞列表(如:
usa.news、europe.weather.alert)。 -
隊列通過綁定鍵(Binding Key) 綁定到交換機,綁定鍵也使用相同的點號分隔格式。
-
綁定鍵支持兩種通配符進行模式匹配。
2. 通配符規則
主題交換機的強大之處在于綁定鍵支持通配符:
-
*(星號):匹配恰好一個單詞-
示例:
*.orange.*可以匹配quick.orange.rabbit,但不能匹配quick.orange.fox.jumps
-
-
#(井號):匹配零個或多個單詞-
示例:
lazy.#可以匹配lazy、lazy.fox、lazy.brown.fox、lazy.pink.fox.jumps.over
-
3. 路由鍵格式最佳實踐
路由鍵通常采用層次結構,便于模式匹配:
-
<facility>.<severity>:auth.info、kernel.error -
<region>.<service>.<event>:usa.payment.success、europe.order.cancelled -
<category>.<subcategory>.<action>:news.sports.update、weather.alert.severe
4. 使用場景
主題交換機適用于需要復雜、靈活的消息路由場景:
-
新聞訂閱系統:用戶可以根據興趣訂閱特定主題(如
sports.*、*.finance) -
物聯網設備監控:按設備類型、地理位置、告警級別路由消息
-
微服務事件總線:基于事件類型和來源進行精細路由
二、實操部分:構建智能新聞分發系統
我們將構建一個新聞分發系統,其中:
-
生產者發送帶有分類路由鍵的新聞消息
-
消費者可以根據興趣訂閱特定模式的新聞
第1步:創建項目
-
創建一個新的解決方案。
-
添加一個控制臺應用程序項目作為生產者:
EmitLogTopic。 -
添加多個消費者項目:
-
ReceiveNewsAll- 接收所有新聞 -
ReceiveSportsNews- 接收所有體育新聞 -
ReceiveUSNews- 接收所有美國新聞 -
ReceiveCriticalAlerts- 接收所有緊急警報 -
ReceiveWeatherUpdates- 接收所有天氣更新
-
-
為所有項目添加
RabbitMQ.ClientNuGet包。
第2步:編寫新聞生產者(EmitLogTopic.cs)
using System.Text; using RabbitMQ.Client; var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { // 聲明主題交換機 channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic); // 路由鍵格式:<category>.<region>.<severity> // 示例:news.usa.info, sports.europe.alert, weather.asia.critical var routingKey = (args.Length > 0) ? args[0] : "anonymous.info"; var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "topic_logs", routingKey: routingKey, basicProperties: null, body: body); Console.WriteLine($" [x] Sent '{routingKey}':'{message}'"); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine();
第3步:編寫接收所有新聞的消費者(ReceiveNewsAll.cs)
using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic); var queueName = channel.QueueDeclare().QueueName; // 使用 # 匹配所有消息 channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "#"); Console.WriteLine($" [*] Waiting for ALL news. Queue: {queueName}"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($" [x] [ALL] Received '{ea.RoutingKey}':'{message}'"); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
第4步:編寫接收體育新聞的消費者(ReceiveSportsNews.cs)
using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic); var queueName = channel.QueueDeclare().QueueName; // 匹配所有體育相關的新聞:sports.*.* channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "sports.#"); Console.WriteLine($" [*] Waiting for SPORTS news. Queue: {queueName}"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($" [x] [SPORTS] Received '{ea.RoutingKey}':'{message}'"); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
第5步:編寫接收美國新聞的消費者(ReceiveUSNews.cs)
using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic); var queueName = channel.QueueDeclare().QueueName; // 匹配所有美國相關的新聞:*.usa.* channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.usa.*"); Console.WriteLine($" [*] Waiting for USA news. Queue: {queueName}"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($" [x] [USA] Received '{ea.RoutingKey}':'{message}'"); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
第6步:編寫接收緊急警報的消費者(ReceiveCriticalAlerts.cs)
using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic); var queueName = channel.QueueDeclare().QueueName; // 匹配所有緊急級別的消息:*.*.critical channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.*.critical"); Console.WriteLine($" [*] Waiting for CRITICAL alerts. Queue: {queueName}"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($" [x] [CRITICAL] Received '{ea.RoutingKey}':'{message}'"); Console.WriteLine(" -> Sending emergency notification!"); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
第7步:編寫接收天氣更新的消費者(ReceiveWeatherUpdates.cs)
using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic); var queueName = channel.QueueDeclare().QueueName; // 匹配所有天氣相關的更新:weather.* // 一個隊列可以綁定多個模式 channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "weather.#"); channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.alert"); // 也接收所有警報 Console.WriteLine($" [*] Waiting for WEATHER updates and ALERTS. Queue: {queueName}"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($" [x] [WEATHER/ALERT] Received '{ea.RoutingKey}':'{message}'"); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
第8步:運行與演示
-
啟動所有消費者
打開六個終端窗口,分別運行所有消費者程序。 -
發送各種類型的新聞消息
cd EmitLogTopic # 發送體育新聞 dotnet run "sports.usa.score" "Team USA wins gold medal" dotnet run "sports.europe.update" "Champions League finals scheduled" # 發送美國相關新聞 dotnet run "news.usa.politics" "Election results announced" dotnet run "tech.usa.innovation" "Silicon Valley startup raises $10M" # 發送緊急警報 dotnet run "weather.usa.critical" "Tornado warning for Midwest" dotnet run "safety.europe.critical" "Security alert: System maintenance" # 發送天氣更新 dotnet run "weather.asia.update" "Monsoon season begins" dotnet run "news.europe.alert" "Breaking: Major announcement" # 發送其他消息 dotnet run "entertainment.hollywood.gossip" "Celebrity wedding announced"
-
觀察路由結果并分析模式匹配
消息路由鍵 ALL SPORTS USA CRITICAL WEATHER/ALERT sports.usa.score? ? ? ? ? sports.europe.update? ? ? ? ? news.usa.politics? ? ? ? ? tech.usa.innovation? ? ? ? ? weather.usa.critical? ? ? ? ? safety.europe.critical? ? ? ? ? ( *.alert)weather.asia.update? ? ? ? ? news.europe.alert? ? ? ? ? ( *.alert)entertainment.hollywood.gossip? ? ? ? ? -
測試復雜場景
-
發送
weather.alert.severe.critical- 觀察哪些消費者能收到 -
發送
sports.alert- 測試多個模式的匹配 -
在管理后臺查看綁定關系,理解通配符的實際效果
-
第9步:通配符規則詳解示例
為了更好理解通配符,讓我們看一些匹配示例:
綁定鍵 *.orange.* 的匹配情況:
-
?
quick.orange.rabbit(匹配) -
?
lazy.orange.elephant(匹配) -
?
quick.orange.fox.lazy(不匹配 - 四個單詞) -
?
orange(不匹配 - 只有一個單詞) -
?
quick.brown.fox(不匹配 - 中間不是orange)
綁定鍵 lazy.# 的匹配情況:
-
?
lazy(匹配) -
?
lazy.fox(匹配) -
?
lazy.brown.fox(匹配) -
?
lazy.pink.fox.jumps.over(匹配) -
?
quick.lazy.fox(不匹配 - 第一個單詞不是lazy)
本章總結
在這一章中,我們深入學習了RabbitMQ中最強大的主題交換機,掌握了基于模式匹配的復雜消息路由:
-
主題交換機(Topic Exchange):理解了基于通配符的模式匹配路由機制。
-
通配符規則:掌握了
*(匹配一個單詞)和#(匹配零個或多個單詞)的使用方法。 -
路由鍵設計:學習了使用點號分隔的層次化路由鍵設計最佳實踐。
-
復雜路由場景:實現了支持多維度過濾的智能新聞分發系統。
-
多重模式綁定:掌握了單個隊列綁定多個模式的高級用法。
主題交換機提供了無與倫比的靈活性,是構建復雜事件驅動系統的理想選擇。在下一章,我們將轉向另一個重要主題:消息可靠性保障,學習如何確保消息在復雜的分布式環境中絕不丟失。

浙公網安備 33010602011771號