<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      redis實現延時隊列的兩種方式

      背景

      項目中的流程監控,有幾種節點,需要監控每一個節點是否超時。按傳統的做法,肯定是通過定時任務,去掃描然后判斷,但是定時任務有缺點:1,數據量大會慢;2,時間不好控制,太短,怕一次處理不完,太長狀態就會有延遲。所以就想到用延遲隊列的方式去實現。

      一,redis的過期key監控

      1,開啟過期key監聽

      在redis的配置里把這個注釋去掉

      notify-keyspace-events Ex

      然后重啟redis

      2,使用redis過期監聽實現延遲隊列

      繼承KeyExpirationEventMessageListener類,實現父類的方法,就可以監聽key過期時間了。當有key過期,就會執行這里。這里就把需要的key過濾出來,然后發送給kafka隊列。

      @Component
      @Slf4j
      public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
      
        @Autowired
        private KafkaProducerService kafkaProducerService;
      
        public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
          super(listenerContainer);
        }
      
        /**
         * 針對 redis 數據失效事件,進行數據處理
         * @param message
         * @param pattern
         */
        @Override
        public void onMessage(Message message, byte[] pattern){
          if(message == null || StringUtils.isEmpty(message.toString())){
            return;
          }
          String content = message.toString();
          //key的格式為  flag:時效類型:運單號 示例如下
          try {
            if(content.startsWith(AbnConstant.EMS)){
              kafkaProducerService.sendMessageSync(TopicConstant.EMS_WAYBILL_ABN_QUEUE,content);
            }else if(content.startsWith(AbnConstant.YUNDA)){
              kafkaProducerService.sendMessageSync(TopicConstant.YUNDA_WAYBILL_ABN_QUEUE,content);
            }
          } catch (Exception e) {
            log.error("監控過期key,發送kafka異常,",e);
          }
        }
      }

      可以看的出來,這種方式其實是很簡單的,但是有幾個問題需要注意,一是,這個盡量單機運行,因為多臺機器都會執行,浪費cpu,增加數據庫負擔。二是,機器頻繁部署的時候,如果有時間間隔,會出現數據的漏處理。

      二,redis的zset實現延遲隊列

      1,生產者實現

      可以看到生產者很簡單,其實就是利用zset的特性,給一個zset添加元素而已,而時間就是它的score。

      public void produce(Integer taskId, long exeTime) {
        System.out.println("加入任務, taskId: " + taskId + ", exeTime: " + exeTime + ", 當前時間:" + LocalDateTime.now());
        RedisOps.getJedis().zadd(RedisOps.key, exeTime, String.valueOf(taskId));
      }
      
      

      2,消費者實現

      消費者的代碼也不難,就是把已經過期的zset中的元素給刪除掉,然后處理數據。

      public void consumer() {
        Executors.newSingleThreadExecutor().submit(new Runnable() {
          @Override
          public void run() {
            while (true) {
              Set<String> taskIdSet = RedisOps.getJedis().zrangeByScore(RedisOps.key, 0, System.currentTimeMillis(), 0, 1);
              if (taskIdSet == null || taskIdSet.isEmpty()) {
                System.out.println("沒有任務");
       
              } else {
                taskIdSet.forEach(id -> {
                  long result = RedisOps.getJedis().zrem(RedisOps.key, id);
                  if (result == 1L) {
                    System.out.println("從延時隊列中獲取到任務,taskId:" + id + " , 當前時間:" + LocalDateTime.now());
                  }
                });
              }
              try {
                TimeUnit.MILLISECONDS.sleep(100);
              } catch (InterruptedException e) {
                e.printStackTrace();
              }
            }
          }
        });
      }

      可以看到這種方式其實是比上個方式要好的。因為,他的那兩個缺點都被克服掉了。多臺機器也沒事兒,也不用再擔心部署時間間隔長的問題。

      posted @ 2021-08-03 08:26  不撞南墻  閱讀(1209)  評論(0)    收藏  舉報
      Live2D
      主站蜘蛛池模板: 精品一区二区不卡无码AV| 国产影片AV级毛片特别刺激| 亚洲天堂亚洲天堂亚洲天堂| 潍坊市| 国产精品99久久久久久www| 国产免费又黄又爽又色毛| 国产精品白浆在线观看免费| 国产成人免费高清激情视频| 高淳县| 日韩国产欧美精品在线| 亚洲午夜福利AV一区二区无码| 抚顺县| 99久久99这里只有免费费精品 | 99re6这里有精品热视频 | 国产性色的免费视频网站| 欧美成人性色一区欧美成人性色区| 富宁县| 一个色综合亚洲热色综合| 国产精品无码无需播放器| 国产中文字幕一区二区| 人妻少妇精品视频三区二区| 欧美精品一产区二产区| 国产精品日韩av在线播放| 方正县| 久久精品免费自拍视频| 国产鲁鲁视频在线观看| 旅游| 亚洲区一区二区激情文学| 麻豆国产成人AV在线播放| 国产又色又爽又高潮免费| 亚洲欧美日韩综合在线丁香| 亚洲精品美女久久7777777| 日韩欧美国产aⅴ另类| 亚洲国产精品成人av网| 国99久9在线 | 免费| 亚洲AV永久无码一区| 色爱综合激情五月激情| 国产成人亚洲精品在线看| 毛片亚洲AV无码精品国产午夜| 青青狠狠噜天天噜日日噜| 少妇高潮水多太爽了动态图|