Redis Stream實現全部節點機器推送消息
背景
有時候,在微服務時代,我們需要對全部的機器節點進行通知。在常規情況下,一個請求經過負載均衡只有一個機器可以收到。那么,如何能讓全部的機器都收到同樣的請求呢?需要借助消息隊列的監聽機制,讓每個節點都監聽一個隊列,讓消息發送到所有的隊列中。
rabbit MQ的fanout交換機可以實現這種功能。
那么,如果想用redis去實現這個功能,有沒有什么好的選擇呢?畢竟僅僅為了一個全部節點的推送,就引入另外一個中間件,不是一個很經濟選擇。
那么Redis的Stream結構就是一個可以選擇的了。
實現
對于Redis的Stream結構,誕生之初就是為了用作消息隊列的。具體用法如下:
- 發送消息
public void sendConfigMessage() {
MapRecord<String, String, String> entries = StreamRecords.mapBacked(Collections.singletonMap("msg", "plsGet")).withStreamKey(RedisConfig.stream);
// 將消息添加至消息隊列中
redisTemplate.opsForStream().add(entries);
}
- 建立監聽
@Bean
public Subscription subscription(RedisConnectionFactory factory) {
Set<String> keys = redisTemplate.keys(streamPattern);
if (keys != null && keys.size() != 0) {
keys = keys.stream().filter(key -> !key.equals(stream)).collect(Collectors.toSet());
if (keys.size() != 0) {
redisTemplate.delete(keys);
}
}
group = UUID.randomUUID().toString();
if (Boolean.FALSE.equals(redisTemplate.hasKey(stream))) {
StringRecord stringRecord = StreamRecords.string(Collections.singletonMap("msg", "init")).withStreamKey(RedisConfig.stream);
redisTemplate.opsForStream().add(stringRecord);
}
redisTemplate.opsForStream().createGroup(stream, group);
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(factory, options);
Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(group, "consumer1"), StreamOffset.create(configStream, ReadOffset.lastConsumed()), configStreamListener);
listenerContainer.start();
return subscription;
}
對于每一臺機器,都讓它關聯唯一的消費組,而這個功能關聯唯一的stream key。Redis的stream機制在于,每一條給stream key發送的消息都會推送給所有的消費組,這樣所有的機器都會收到這條消息。
一些問題
Redis的連接和MQ還是有一些區別。當Redis連接超時之后,之前建立的監聽就不能用了,因為之前的長連接斷開了。
一個解決檔案就是手動維持一個Netty的心跳機制,不停輪訓判斷當前的訂閱是否還處于活躍狀態。一旦不處于活躍狀態就要重新建立長連接:
@Autowired
Subscription subscription;
@Autowired
RedisConnectionFactory factory;
@Autowired
StreamListener streamListener;
@Autowired
RedisTemplate<String, Object> redisTemplate;
//當Redis連接超時,自動重置stream隊列。否則監聽失效
@Bean
public ClientResources clientResources(){
NettyCustomizer nettyCustomizer = new NettyCustomizer() {
@Override
public void afterChannelInitialized(Channel channel) {
channel.pipeline().addLast(
new IdleStateHandler(0, 0, 10));
channel.pipeline().addLast(new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (subscription != null && !subscription.isActive()) {
synchronized ("resetStreamLock") {
if (!subscription.isActive()) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer0 = StreamMessageListenerContainer.create(factory, options);
Subscription subscription0 = listenerContainer0.receiveAutoAck(Consumer.from(RedisConfig.group, "consumer1"), StreamOffset.create(RedisConfig.stream, ReadOffset.lastConsumed()), streamListener);
listenerContainer0.start();
subscription = subscription0;
log.info("reset getStream!");
}
}
}
if (evt instanceof IdleStateEvent) {
ctx.disconnect();
}
}
});
}
@Override
public void afterBootstrapInitialized(Bootstrap bootstrap) {
}
};
return ClientResources.builder().nettyCustomizer(nettyCustomizer ).build();
}
本文來自博客園,作者:imissinstagram,轉載請注明原文鏈接:http://www.rzrgm.cn/LostSecretGarden/p/16741552.html

浙公網安備 33010602011771號