<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      轉:基于Redis實現(xiàn)延時隊列

      摘要:使用 sortedset,拿時間戳作為score,消息內(nèi)容作為 key 調用 zadd 來生產(chǎn)消息,消費者用 zrangebyscore 指令獲取 N 秒之前的數(shù)據(jù)輪詢進行處理。

      ?? 前段時間做一個項目,需要各種定時任務處理會話狀態(tài),處理訂單狀態(tài),然后需求不停的變,修修改改就覺得很麻煩,就去了解了一下有沒有什么便捷的方式取代繁瑣的定時任務,于是就找到了延遲隊列的這種實現(xiàn)方式。

      一、應用場景

      • 訂單超過 30 分鐘未支付,則自動取消。
      • 訂單一些評論,如果48h用戶未對商家評論,系統(tǒng)會自動產(chǎn)生一條默認評論。
      • 外賣商家超時未接單,則自動取消訂單。
      • 醫(yī)生搶單電話點診,超過 30 分鐘未打電話,則自動退款。

      ?? 如上場景都可以用定時任務去輪詢實現(xiàn),但是當數(shù)據(jù)量過大的時候,高頻輪詢數(shù)據(jù)庫會消耗大量的資源,此時用延遲隊列來應對這類場景比較好。

      二、需求

      ?? 根據(jù)自身業(yè)務和公司情況,如果實現(xiàn)一個自己的延時隊列服務需要考慮一下幾點:

      • 消息存儲
      • 過期延時消息實時獲取
      • 高可用性

      三、為什么使用 Redis 實現(xiàn)?

      ?? 延時隊列就是一種帶有延遲功能的消息隊列。下面會介紹幾種目前已有的延時隊列。

      3.1、RabbitMQ 延時隊列

      ?? 優(yōu)點:消息持久化,分布式。
      ?? 缺點:延時相同的消息必須扔在同一個隊列,每一種延時就需要建立一個隊列。因為當后面的消息比前面的消息先過期,還是只能等待前面的消息過期,這里的過期檢測是惰性的。
      ?? 使用: RabbitMQ 可以針對 Queue 設置 x-expires 或者針對 Message 設置 x-message-ttl ,來控制消息的生存時間(可以根據(jù) Queue 來設置,也可以根據(jù) message 設置), Queue 還可以配置 x-dead-letter-exchange 和 x-dead-letter-routing-key(可選)兩個參數(shù),如果隊列內(nèi)出現(xiàn)了 dead letter ,則按照這兩個參數(shù)重新路由轉發(fā)到指定的隊列,此時就可以實現(xiàn)延時隊列了。

      RabbitMQ天然具備分布式的特性,可以很好的用在多服務。

      3.2、DelayQueue 延時隊列

      ?? 優(yōu)點:無界、延遲、阻塞隊列
      ?? 缺點:非持久化
      ?? 介紹:JDK 自帶的延時隊列,沒有過期元素的話,使用 poll() 方法會返回 null 值,超時判定是通過getDelay(TimeUnit.NANOSECONDS) 方法的返回值小于等于0來判斷,并且不能存放空元素。
      ?? 使用:getDelay 方法定義了剩余到期時間,compareTo 方法定義了元素排序規(guī)則。poll() 是非阻塞的獲取數(shù)據(jù),take() 是阻塞形式獲取數(shù)據(jù)。實現(xiàn) Delayed 接口即可使用延時隊列。
      ?? 注意事項:DelayQueue 實現(xiàn)了 Iterator 接口,但 iterator() 遍歷順序不保證是元素的實際存放順序。

      /**
       * 實現(xiàn) Delayed 定義延時隊列
       */
      @Data
      @NoArgsConstructor
      @AllArgsConstructor
      public class Sequence implements Delayed {
      
          private Long time;
          private String name;
      
          @Override
          public long getDelay(TimeUnit unit) {
              return time - System.currentTimeMillis();
          }
      
          @Override
          public int compareTo(Delayed o) {
              if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                  return 1;
              } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
                  return -1;
              } else {
                  return 0;
              }
          }
      }
      

      3.3、Scala 的 Await & Future

      ?? 優(yōu)點:消息實時性
      ?? 缺點:非持久化
      ?? 介紹:Scala 的 ExecutionContext 中使用Await 的 result(awaitable: Awaitable[T], atMost: Duration)方法可以根據(jù)傳入的 atMost 間隔時間異步執(zhí)行 awaitable。

      import scala.concurrent.ExecutionContext.Implicits.global
      import scala.concurrent.{Await, Future}
      object test extends App {
          val task = Future{ doSomething() }
          Await.result(task, 5 seconds)
      }
      

      3.4、Redis 延遲隊列

      • 消息持久化,至少被消費一次。
      • 實時性:存在一定的時間誤差(定時任務間隔)。
      • 支持指定消息 remove。
      • 高可用性。
      • Redis 的特殊數(shù)據(jù)結構 ZSet 滿足延遲的特性。

      四、Redis 的使用

      4.1、使用 sortedset 操作元素

      ?? 賦值:zadd key score1 value1 score2 value2... (把全部的元素添加到sorted set中,并且每個元素有其對應的分數(shù),返回值是新增的元素個數(shù)。)

      ?? 獲取元素:

      • zscore key value:返回指定成員的分數(shù)
      • zcard key : 獲取集合中的成員數(shù)量

      ?? 刪除元素:zrem key value1 value2 … 刪除指定元素

      • zremrangebyrank key start stop:按照排名范圍刪除元素。
      • zremrangebyscore key min max:按照分數(shù)范圍刪除元素。

      ?? 查詢元素:

      • zrange key start end withscores:查詢start到end之間的成員。
      • zrevrange key start end withscores:查詢成員分數(shù)從大到小順序的索引 start 到 end 的所有成員。
      • zrangebyscore key min max withscores limit offset count:返回分數(shù) min 到 max 的成員并按照分數(shù)從小到大排序, limit 是從 offset 開始展示幾個元素。

      4.2、Redis 實現(xiàn)方式

      ?? 使用sortedset,用時間戳作為score,使用zadd key score1 value1
      命令生產(chǎn)消息,使用zrangebysocre key min max withscores limit 0 1消費消息最早的一條消息。
      ?? 這里選用 Redis 主要的原因就是其支持高性能的 score 排序,同時 Redis 的持久化 bgsave 特性,保證了消息的消費和存貯問題。bgsave 的原理是 fork 和 cow。fork 是指 Redis 通過創(chuàng)建子進程來進行 bgsave 操作, cow 指的是copy on write, 子進程創(chuàng)建后, 父進程通過共享數(shù)據(jù)段, 父進程繼續(xù)提供讀寫服務, 寫臟的頁面數(shù)據(jù)會逐漸和子進程分離開來。

      4.3、ACK

      ?? 隊列最重要的就是保證消息被成功消費,這里也不可避免的需要考慮這個問題。

      ?? RabbitMQ 的 ACK機制:Publisher 把消息發(fā)送到 Consumer,如果 Consumer 已處理完任務,那么它將向 Broker 發(fā)送 ACK 消息,告知某條消息已被成功處理,可以從隊列中移除。如果 Consumer 沒有發(fā)送回 ACK 消息,那么 Broker 會認為消息處理失敗,會將此消息及后續(xù)消息分發(fā)給其它 Consumer 進行處理 ( redeliver flag 置為 true )。

      ?? 這種確認機制和 TCP/IP 協(xié)議確立連接類似。不同的是,TCP/IP 確立連接需要經(jīng)過三次握手,而 RabbitMQ 只需要一次 ACK。還有一個重要的是,RabbitMQ 當且僅當檢測到 ACK 消息未發(fā)出且 Consumer 的連接終止時才會將消息重新分發(fā)給其他 Consumer ,因此不需要擔心消息處理時間過長而被重新分發(fā)的情況。

      Redis 實現(xiàn) ACK

      • 需要在業(yè)務代碼中處理消息失敗的情況,回滾消息到原始等待隊列。
      • Consumer 掛掉,仍然需要回滾消息到等待隊列中。
      • 前者只需要在業(yè)務中處理消費異常的情況,后者則需要維護兩個隊列。
      Redis ACK 實現(xiàn)方案

      ?? 維護一個消息記錄表,存貯消息的消費記錄,用于失敗時回滾消息。表中記錄消息ID、消息內(nèi)容、消息時間、消息狀態(tài)。

      ?? 定時任務輪詢該消息表,處理消費記錄表中消費狀態(tài)未成功的記錄,重新放入等待隊列。

      4.4、多實例問題

      ?? 多實例是指同一個服務部署在不同的地方,發(fā)揮相同的作用,此時就會導致同時消費同一個消息的問題。

      ?? 一般情況下解決此類問題就需要考慮接入外部應用的輔助。常見的分布式鎖的方案有:?基于數(shù)據(jù)庫實現(xiàn)分布式鎖、?基于緩存實現(xiàn)分布式鎖、?基于 Zookeeper 實現(xiàn)分布式鎖。這里使用基于Redis的緩存來解決問題。

      ?? 利用 Redis 的 setnx 的互斥特性,把 key 當作鎖存在 Redis 中,但是用 setnx 需要解決死鎖和正確解鎖的問題。

      ?? 死鎖:設置 key-value 的過期時間,并且使用 lua 腳本保證加鎖和設置過期時間的原子性。

      ?? 解鎖:解鎖需要保證是加鎖客戶端進行解鎖操作。將 value 設置為 UUID,用對應的 UUID 去解鎖保證是加鎖客戶端進行對應的解鎖操作。

      ?? 利用 Redis 的 List 實現(xiàn)一個 Publisher 推送消費保證只被消費一次,這種不用考慮死鎖問題,但是需要額外維護一個隊列。

      五、Jedis實現(xiàn)簡單延時隊列

      ??Zset本質就是Set結構上加了個排序的功能,除了添加數(shù)據(jù)value之外,還提供另一屬性score,這一屬性在添加修改元素的時候可以指定,每次指定后,Zset會自動重新排序。可以理解為有兩列字段的數(shù)據(jù)表,一列存value,一列存次序編號。操作中key理解為zset的名字。那么這個特性對延時隊列又有何用呢?試想如果score代表的是任務想要執(zhí)行的時間戳,zset便會按照score的時間戳大小進行排序,也就是對執(zhí)行時間進行排序。這樣的話,起一個死循環(huán)線程不斷地進行取第一個key值,如果當前時間戳不小于該socre,就將它取出來消費并刪除,從而達到延時執(zhí)行的目的。注意不需要遍歷整個zset集合,以免造成性能浪費。

      package cn.chinotan.service.delayQueueRedis;
      
      import org.apache.commons.lang3.StringUtils;
      import redis.clients.jedis.Jedis;
      import redis.clients.jedis.JedisPool;
      import redis.clients.jedis.Tuple;
      
      import java.text.SimpleDateFormat;
      import java.util.Calendar;
      import java.util.Date;
      import java.util.Set;
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.TimeUnit;
      
      /**
       * @program: test
       * @description: 單實例redis實現(xiàn)延時隊列
       **/
      public class AppTest {
      
          private static final String ADDR = "127.0.0.1";
          private static final int PORT = 6379;
          private static JedisPool jedisPool = new JedisPool(ADDR, PORT);
          private static CountDownLatch cdl = new CountDownLatch(10);
      
          public static Jedis getJedis() {
              return jedisPool.getResource();
          }
      
          /**
           * 生產(chǎn)者,生成5個訂單
           */
          public void productionDelayMessage() {
              for (int i = 0; i < 5; i++) {
                  Calendar instance = Calendar.getInstance();
                  // 3秒后執(zhí)行
                  instance.add(Calendar.SECOND, 3 + i);
                  AppTest.getJedis().zadd("orderId", (instance.getTimeInMillis()) / 1000, StringUtils.join("000000000", i + 1));
                  System.out.println("生產(chǎn)訂單: " + StringUtils.join("000000000", i + 1) + " 當前時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                  System.out.println((3 + i) + "秒后執(zhí)行");
              }
          }
      
          //消費者,取訂單
          public static void consumerDelayMessage() {
              Jedis jedis = AppTest.getJedis();
              while (true) {
                  Set<Tuple> order = jedis.zrangeWithScores("orderId", 0, 0);
                  if (order == null || order.isEmpty()) {
                      System.out.println("當前沒有等待的任務");
                      try {
                          TimeUnit.MICROSECONDS.sleep(500);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      continue;
                  }
                  Tuple tuple = (Tuple) order.toArray()[0];
                  double score = tuple.getScore();
                  Calendar instance = Calendar.getInstance();
                  long nowTime = instance.getTimeInMillis() / 1000;
                  if (nowTime >= score) {
                      String element = tuple.getElement();
                      Long orderId = jedis.zrem("orderId", element);
                      if (orderId > 0) {
                          System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":redis消費了一個任務:消費的訂單OrderId為" + element);
                      }
                  }
              }
          }
      
          static class DelayMessage implements Runnable{
              @Override
              public void run() {
                  try {
                      cdl.await();
                      consumerDelayMessage();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          }
          
          public static void main(String[] args) {
              AppTest appTest = new AppTest();
              appTest.productionDelayMessage();
              for (int i = 0; i < 10; i++) {
                  new Thread(new DelayMessage()).start();
                  cdl.countDown();
              }
          }
      }
      

      ??實現(xiàn)效果如下:

      六、小結

      ?? 使用 Redis 實現(xiàn)的隊列具有很好的擴展性,可以很便捷的應對需求的變更和業(yè)務的擴展,但是對于簡單的場景直接使用定時任務會更加容易。在有大量的定時任務需要實現(xiàn)的時候,就可以考慮使用延遲隊列去實現(xiàn),讓代碼更具有擴展性。

      ?? Redis 作為消息隊列的局限性很大,實現(xiàn) ack 機制的成本相對較高,然而他的輕量級的特性以及兼容很多的數(shù)據(jù)結構,Redis 成熟的分布式、持久化、集群等技術體系,讓他可以實現(xiàn)一些輕量級的隊列。總之沒有最好的技術,只有最好的 developer。

      Reference

      posted @ 2021-04-17 13:16  樓蘭胡楊  閱讀(1601)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 1精品啪国产在线观看免费牛牛| 国产精品一二三入口播放| 97无码人妻福利免费公开在线视频 | 小伙无套内射老熟女精品| 日韩av综合中文字幕| 中文人妻av高清一区二区| 无码专区视频精品老司机| 国产福利高颜值在线观看| 国产在线视频精品视频| 午夜精品视频在线看| 人妻聚色窝窝人体WWW一区| 精品精品亚洲高清a毛片| 国产精品国产三级国av| 洞头县| 蜜芽久久人人超碰爱香蕉| 2020国产欧洲精品网站| 99精品久久久中文字幕| 久久熟女| 丁香婷婷综合激情五月色 | 国产精品色一区二区三区| 亚洲欧美综合人成在线| 亚洲国产欧美一区二区好看电影| 欧美色丁香| 国产成AV人片久青草影院| 色五月丁香五月综合五月4438| 成人亚洲性情网站www在线观看| 国产免费午夜福利在线观看| 亚洲免费成人av一区| 欧美成本人视频免费播放| 亚洲区一区二区激情文学| 国产精品高清国产三级囯产AV| 国产最大成人亚洲精品| 337p西西人体大胆瓣开下部| 亚洲国产精品一二三四区| 国产永久免费高清在线| 久久天堂综合亚洲伊人HD妓女 | 国产精品一区二区三区性色| 日韩精品中文字幕有码| 人妻日韩人妻中文字幕| 四虎国产精品永久在线看| 亚洲毛片多多影院|