RabbitMQ 入門 (Go) - 5. 使用 Fanout Exchange 做服務發現(下)

到目前為止,我一直專注于如何讓消息進出消息代理,也就是RabbitMQ。
實際上,我們可以繼續使用 RabbitMQ 和它的 Exchanges 來連接這個應用程序的其他部分,但是我想探索一個稍微不同的模型:我想使用協調器來跟蹤哪些類型的消費者得到消息通知。
這樣的話,我斷開了傳感器數據生成器和數據使用者之間的連接。
同時為了處理這些數據通信,我決定使用事件(event)來通知用戶系統中正在發生的事情,并讓他們決定是否要處理數據。
其原理大致如下:
-
在協調器內部,我們有構建好的 QueueListener。
-
我還需要構建另外一個類型,我叫它 EventAggregator。
-
來自RabbitMQ 的消息,它將通過一個異步的goroutine 進入QueueListener
-
goroutine 將把消息傳輸到一個事件對象(event object)中,并通過事件聚合對象(event aggregation object)進行廣播。
-
該對象將維護任何對事件感興趣的使用者的注冊表,并向其發送事件對象的副本。
-
這使我們能夠通過將數據轉儲到下游的 Queue 來為這些事件注冊其他應用程序,但它也可以讓使用者能夠在協調器內部進行設置,例如日志系統。
-
最后,如果使用者最終要通過 Queue 將數據發送到另一個應用程序,則可以對其進行預處理,以添加有用的附加數據,而最終使用者不必知道這些附加信息是如何到達那里的。
編寫代碼
創建 EventAggregator
在 coordinator 目錄下添加 eventaggregator.go,代碼如下:

-
第 28 行,建立 EventData struct,目前它的字段碰巧和 SensorMessage 是一樣的,但是兩個 struct 的職責不同,所以我們不復用 SensorMessage,而是單獨建立 EventData,以便它們以后可以獨立的進化;
-
第 5 行,建立了 EventAggregator struct,也就是事件聚合,它只有一個 listeners 字段,是一個 map,它的 key 是事件的名稱,它的值是回調函數的集合。當事件發生的時候,EventAggregator 就輪流調用為該事件注冊的回調函數;
-
第 9 行,就是 EventAggregator 的構造函數;
-
第 16 行,AddListener 方法,使用者通過該方法可以向 EventAggregator 注冊回調函數;
-
第 20 行,PublishEvent 方法用來發布事件。它接收事件名稱和事件的數據作為參數。這里需要判斷 EventAggregator 里是否已經注冊了該事件,如果注冊了,那么遍歷其對應的回調函數,并使用事件數據進行調用。
-
調用回調函數時,使用的不是 EventData 的指針,而是 EventData 的副本,這可以保證使用者不會把事件數據搞亂,影響其它使用者
-
取消訂閱的功能我就不做了。
把 EventAggregator 連接到 QueueListener
打開 queuelistener.go,添加代碼:

-
第19 行,在QueueListener struct 里面添加字段ea,類型是 *EventAggregator;
-
第 25 行,在 QueueListener 的構造函數里為 ea 自讀賦初始值。

在 AddListener 方法里,原來只是把原始數據打印到控制臺。現在添加如下代碼:
-
創建一個 EventData,其字段內容目前和傳感器的消息內容一樣;
-
使用 QueueListener 上的 EventAggregator 發布事件:
-
事件的名稱是 MessageReceived_傳感器名稱
-
第二個參數就是事件數據
發現早已運行的傳感器
最后我們要做的就是如何讓協調器發現在協調器上線前就已經在運行的傳感器。

目前我們的做法是這樣的:首先協調器先運行,然后傳感器在上線的時候立即把它們的數據Queue 發送過去,使用的是 Fanout Exchange,這樣多個協調器都可以被通知到。
但是,如果傳感器先運行,協調器后運行,那么協調器就無法知道傳感器的存在,為了解決這個問題,我這樣做:
-
我在消息代理中也就是 RabbitMQ 里,建立一個新的 Exchange,它是一個 Fanout Exchange,它和其它信息流的方向正好相反。
-
在這里,協調器將會向這個 Fanout Exchange 發出一個“發現”請求,這個信息將會發送給所有的傳感器。
-
傳感器接收到這個“發現”請求信息后,將會響應,將它們的數據 Queue 的名稱發送給我們以前建立的那個 Fanout Exchange(中間黃色的)。
-
這里會出現一些冗余的信息,但協調器里有過濾機制,所以就這樣吧。
我們首先測試一下先運行傳感器項目,再運行協調器項目的效果:

可以看到,協調器運行起來以后,沒有接收到該傳感器的數據。
修改 queuetools
我們要解決的就是這個問題,下面看代碼,首先看 queuetools.go:

這里改動不多,就是把要新建立的 Fanout Exchange 的名稱作為常量存在這里。
注意之前在這里定義的 SensorListQueue 已經不需要了,可以刪掉。
修改 queuelistener
然后看 queuelistener.go,在這里為 QueueListener 添加一個DiscoverSensors 方法:

該方法中首先我使用了 ExchangeDeclare 方法來聲明這個新的 Exchange,并進行設置。
雖然項目中還沒用過這個方法,但是里面大多數參數的作用你應該能夠猜得出來:
-
name:Exchange 的名稱
-
kind:Exchange 的類型,可以是 direct、topic、header 或者 fanout,這里使用 fanout
-
durable:表示這個 Exchange 是否可持久
-
autoDelete:表示在沒有綁定的情況下是否刪除 Exchange
-
internal:這個參數我們還沒見過,如果想拒絕外部的發布請求,就把這個設為 true。這可以在高級場景中使用,在高級場景中,Exchange 綁定在一起,在消息代理中形成更復雜的拓撲。
-
noWait 和 args 就不介紹了。
現在,協調器可以向這個 Exchange 發布消息了。而我們只需要向它發送一個消息即可,并沒有什么具體的內容要發送,所以我發布了一個空的 Publishing,這就可以告訴瀏覽器我在尋找它們了。
修改傳感器
下面我們讓傳感器(sensor.go)對上面發布的“發現”請求進行響應,不過首先,需要重構一下。
把 main 函數里面當傳感器上面時,發布數據 Queue 名稱那部分代碼提取出來放在單獨的一個函數里面:

然后在 main 函數相應的位置進行調用:

-
第 39 行,對重構的函數進行調用。
-
第 41 行,創建一個 Queue
-
第 42 行,使用 QueueBind 方法將這個 Queue 和 SensorDiscovery Exchange
-
第 48 行,創建goroutine 運行一個將要新建的函數 listenForDiscoveryRequests。通過使用 goroutine,無論當請求什么時候進來,這部分邏輯都將可用,而且不會阻塞系統的其余部分。這里需要傳入 Queue 的名稱和 Channel。
然后看一下 listenForDiscoveryRequests 函數:

這里使用 Channel 的 Consume 方法對 Channel進行設置以便能接收“發現”請求。
然后用 for range 來接收“發現”請求。這里忽略消息本身即可,因為該消息就是一個觸發而已。當消息進來時,調用剛剛重構出來的 publishQueueName 函數即可。
在 queuelistener 里調用發現方法
在 queuelistener.go 的 ListenForNewSource 方法里,在如下位置調用 DiscoverSensors 方法:

為什么在這里調用?因為這是可以保證協調器正在監聽傳感器路由的消息的第一個地方。
運行測試
先運行一個傳感器,然后在運行協調器:

傳感器這里我使用了 freq 參數,讓其每兩秒鐘生成一個數據。
可以看到,在這種情況下協調器也可以發現已經運行的傳感器并接收數據了。
你可以運行多個傳感器和多個協調器,應該也會好用的。
這也是一種非常簡單的分布式應用吧。


浙公網安備 33010602011771號