<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Redis Stream實現全部節點機器推送消息

      背景

      有時候,在微服務時代,我們需要對全部的機器節點進行通知。在常規情況下,一個請求經過負載均衡只有一個機器可以收到。那么,如何能讓全部的機器都收到同樣的請求呢?需要借助消息隊列的監聽機制,讓每個節點都監聽一個隊列,讓消息發送到所有的隊列中。

      rabbit MQ的fanout交換機可以實現這種功能。

      那么,如果想用redis去實現這個功能,有沒有什么好的選擇呢?畢竟僅僅為了一個全部節點的推送,就引入另外一個中間件,不是一個很經濟選擇。

      那么Redis的Stream結構就是一個可以選擇的了。

      實現

      對于Redis的Stream結構,誕生之初就是為了用作消息隊列的。具體用法如下:

      • 發送消息
      public void sendConfigMessage() {
              MapRecord<String, String, String> entries = StreamRecords.mapBacked(Collections.singletonMap("msg", "plsGet")).withStreamKey(RedisConfig.stream);
              // 將消息添加至消息隊列中
              redisTemplate.opsForStream().add(entries);
      }
      
      • 建立監聽
          @Bean
          public Subscription subscription(RedisConnectionFactory factory) {
              Set<String> keys = redisTemplate.keys(streamPattern);
              if (keys != null && keys.size() != 0) {
                  keys = keys.stream().filter(key -> !key.equals(stream)).collect(Collectors.toSet());
                  if (keys.size() != 0) {
                      redisTemplate.delete(keys);
                  }
              }
      
              group = UUID.randomUUID().toString();
              if (Boolean.FALSE.equals(redisTemplate.hasKey(stream))) {
                  StringRecord stringRecord = StreamRecords.string(Collections.singletonMap("msg", "init")).withStreamKey(RedisConfig.stream);
                  redisTemplate.opsForStream().add(stringRecord);
              }
      
              redisTemplate.opsForStream().createGroup(stream, group);
              StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer
                      .StreamMessageListenerContainerOptions
                      .builder()
                      .pollTimeout(Duration.ofSeconds(1))
                      .build();
              StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(factory, options);
              Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(group, "consumer1"), StreamOffset.create(configStream, ReadOffset.lastConsumed()), configStreamListener);
              listenerContainer.start();
              return subscription;
          }
      

      對于每一臺機器,都讓它關聯唯一的消費組,而這個功能關聯唯一的stream key。Redis的stream機制在于,每一條給stream key發送的消息都會推送給所有的消費組,這樣所有的機器都會收到這條消息。

      一些問題

      Redis的連接和MQ還是有一些區別。當Redis連接超時之后,之前建立的監聽就不能用了,因為之前的長連接斷開了。

      一個解決檔案就是手動維持一個Netty的心跳機制,不停輪訓判斷當前的訂閱是否還處于活躍狀態。一旦不處于活躍狀態就要重新建立長連接:

          @Autowired
          Subscription subscription;
      
          @Autowired
          RedisConnectionFactory factory;
      
      
          @Autowired
          StreamListener streamListener;
      
          @Autowired
          RedisTemplate<String, Object> redisTemplate;
      
          //當Redis連接超時,自動重置stream隊列。否則監聽失效
          @Bean
          public ClientResources clientResources(){
      
              NettyCustomizer nettyCustomizer = new NettyCustomizer() {
      
                  @Override
                  public void afterChannelInitialized(Channel channel) {
                      channel.pipeline().addLast(
                              new IdleStateHandler(0, 0, 10));
      
                      channel.pipeline().addLast(new ChannelDuplexHandler() {
                          @Override
                          public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
                              if (subscription != null && !subscription.isActive()) {
                                  synchronized ("resetStreamLock") {
                                      if (!subscription.isActive()) {
                                          StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer
                                                  .StreamMessageListenerContainerOptions
                                                  .builder()
                                                  .pollTimeout(Duration.ofSeconds(1))
                                                  .build();
                                          StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer0 = StreamMessageListenerContainer.create(factory, options);
                                          Subscription subscription0 = listenerContainer0.receiveAutoAck(Consumer.from(RedisConfig.group, "consumer1"), StreamOffset.create(RedisConfig.stream, ReadOffset.lastConsumed()), streamListener);
                                          listenerContainer0.start();
                                          subscription = subscription0;
                                          log.info("reset getStream!");
                                      }
                                  }
                              }
                              if (evt instanceof IdleStateEvent) {
                                  ctx.disconnect();
                              }
                          }
                      });
                  }
      
                  @Override
                  public void afterBootstrapInitialized(Bootstrap bootstrap) {
      
                  }
      
              };
      
              return ClientResources.builder().nettyCustomizer(nettyCustomizer ).build();
          }
      
      posted @ 2022-09-29 14:46  imissinstagram  Views(681)  Comments(0)    收藏  舉報
      主站蜘蛛池模板: 国产免费播放一区二区三区| 在线观看视频一区二区三区| 加勒比无码人妻东京热| 国产极品粉嫩福利姬萌白酱| 亚洲免费视频一区二区三区| 日本熟妇大乳| 精品粉嫩国产一区二区三区| 人妻少妇偷人无码视频| 国产精品一区在线蜜臀| 综合色一色综合久久网| 亚洲人亚洲人成电影网站色 | 色偷偷女人的天堂亚洲网| 日本高清中文字幕一区二区三区| 色偷偷亚洲女人天堂观看| 亚洲av片在线免费观看| 天堂va欧美ⅴa亚洲va在线| 志丹县| 九九热在线精品视频观看| 亚洲理论在线A中文字幕| 国产亚洲999精品AA片在线爽| 亚洲五月丁香综合视频| 精品人妻免费看一区二区三区| 亚洲sm另类一区二区三区| 吴桥县| 欧美和黑人xxxx猛交视频| 人妻夜夜爽天天爽三区麻豆av | 午夜福利国产区在线观看| 日本伊人色综合网| 色8久久人人97超碰香蕉987| 菏泽市| 成人一区二区不卡国产| 中文成人无字幕乱码精品区| 中文字幕有码免费视频| 亚洲综合精品一区二区三区| 亚洲国产精品无码av| 日韩精品亚洲不卡一区二区 | 中文字幕av国产精品| 成人国产精品日本在线观看| 国产线播放免费人成视频播放| 成人午夜在线观看刺激| 欧美人禽zozo动人物杂交|