轉:基于Redis實現(xiàn)延時隊列
摘要:使用 sortedset,拿時間戳作為score,消息內(nèi)容作為 key 調用 zadd 來生產(chǎn)消息,消費者用 zrangebyscore 指令獲取 N 秒之前的數(shù)據(jù)輪詢進行處理。
?? 前段時間做一個項目,需要各種定時任務處理會話狀態(tài),處理訂單狀態(tài),然后需求不停的變,修修改改就覺得很麻煩,就去了解了一下有沒有什么便捷的方式取代繁瑣的定時任務,于是就找到了延遲隊列的這種實現(xiàn)方式。
一、應用場景
- 訂單超過 30 分鐘未支付,則自動取消。
- 訂單一些評論,如果48h用戶未對商家評論,系統(tǒng)會自動產(chǎn)生一條默認評論。
- 外賣商家超時未接單,則自動取消訂單。
- 醫(yī)生搶單電話點診,超過 30 分鐘未打電話,則自動退款。
?? 如上場景都可以用定時任務去輪詢實現(xiàn),但是當數(shù)據(jù)量過大的時候,高頻輪詢數(shù)據(jù)庫會消耗大量的資源,此時用延遲隊列來應對這類場景比較好。
二、需求
?? 根據(jù)自身業(yè)務和公司情況,如果實現(xiàn)一個自己的延時隊列服務需要考慮一下幾點:
- 消息存儲
- 過期延時消息實時獲取
- 高可用性
三、為什么使用 Redis 實現(xiàn)?
?? 延時隊列就是一種帶有延遲功能的消息隊列。下面會介紹幾種目前已有的延時隊列。
3.1、RabbitMQ 延時隊列
?? 優(yōu)點:消息持久化,分布式。
?? 缺點:延時相同的消息必須扔在同一個隊列,每一種延時就需要建立一個隊列。因為當后面的消息比前面的消息先過期,還是只能等待前面的消息過期,這里的過期檢測是惰性的。
?? 使用: RabbitMQ 可以針對 Queue 設置 x-expires 或者針對 Message 設置 x-message-ttl ,來控制消息的生存時間(可以根據(jù) Queue 來設置,也可以根據(jù) message 設置), Queue 還可以配置 x-dead-letter-exchange 和 x-dead-letter-routing-key(可選)兩個參數(shù),如果隊列內(nèi)出現(xiàn)了 dead letter ,則按照這兩個參數(shù)重新路由轉發(fā)到指定的隊列,此時就可以實現(xiàn)延時隊列了。
RabbitMQ天然具備分布式的特性,可以很好的用在多服務。
3.2、DelayQueue 延時隊列
?? 優(yōu)點:無界、延遲、阻塞隊列
?? 缺點:非持久化
?? 介紹:JDK 自帶的延時隊列,沒有過期元素的話,使用 poll() 方法會返回 null 值,超時判定是通過getDelay(TimeUnit.NANOSECONDS) 方法的返回值小于等于0來判斷,并且不能存放空元素。
?? 使用:getDelay 方法定義了剩余到期時間,compareTo 方法定義了元素排序規(guī)則。poll() 是非阻塞的獲取數(shù)據(jù),take() 是阻塞形式獲取數(shù)據(jù)。實現(xiàn) Delayed 接口即可使用延時隊列。
?? 注意事項:DelayQueue 實現(xiàn)了 Iterator 接口,但 iterator() 遍歷順序不保證是元素的實際存放順序。
/**
* 實現(xiàn) Delayed 定義延時隊列
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Sequence implements Delayed {
private Long time;
private String name;
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
}
3.3、Scala 的 Await & Future
?? 優(yōu)點:消息實時性
?? 缺點:非持久化
?? 介紹:Scala 的 ExecutionContext 中使用Await 的 result(awaitable: Awaitable[T], atMost: Duration)方法可以根據(jù)傳入的 atMost 間隔時間異步執(zhí)行 awaitable。
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
object test extends App {
val task = Future{ doSomething() }
Await.result(task, 5 seconds)
}
3.4、Redis 延遲隊列
- 消息持久化,至少被消費一次。
- 實時性:存在一定的時間誤差(定時任務間隔)。
- 支持指定消息 remove。
- 高可用性。
- Redis 的特殊數(shù)據(jù)結構 ZSet 滿足延遲的特性。
四、Redis 的使用
4.1、使用 sortedset 操作元素
?? 賦值:zadd key score1 value1 score2 value2... (把全部的元素添加到sorted set中,并且每個元素有其對應的分數(shù),返回值是新增的元素個數(shù)。)
?? 獲取元素:
- zscore key value:返回指定成員的分數(shù)
- zcard key : 獲取集合中的成員數(shù)量
?? 刪除元素:zrem key value1 value2 … 刪除指定元素
- zremrangebyrank key start stop:按照排名范圍刪除元素。
- zremrangebyscore key min max:按照分數(shù)范圍刪除元素。
?? 查詢元素:
- zrange key start end withscores:查詢start到end之間的成員。
- zrevrange key start end withscores:查詢成員分數(shù)從大到小順序的索引 start 到 end 的所有成員。
- zrangebyscore key min max withscores limit offset count:返回分數(shù) min 到 max 的成員并按照分數(shù)從小到大排序, limit 是從 offset 開始展示幾個元素。
4.2、Redis 實現(xiàn)方式
?? 使用sortedset,用時間戳作為score,使用zadd key score1 value1
命令生產(chǎn)消息,使用zrangebysocre key min max withscores limit 0 1消費消息最早的一條消息。
?? 這里選用 Redis 主要的原因就是其支持高性能的 score 排序,同時 Redis 的持久化 bgsave 特性,保證了消息的消費和存貯問題。bgsave 的原理是 fork 和 cow。fork 是指 Redis 通過創(chuàng)建子進程來進行 bgsave 操作, cow 指的是copy on write, 子進程創(chuàng)建后, 父進程通過共享數(shù)據(jù)段, 父進程繼續(xù)提供讀寫服務, 寫臟的頁面數(shù)據(jù)會逐漸和子進程分離開來。
4.3、ACK
?? 隊列最重要的就是保證消息被成功消費,這里也不可避免的需要考慮這個問題。
?? RabbitMQ 的 ACK機制:Publisher 把消息發(fā)送到 Consumer,如果 Consumer 已處理完任務,那么它將向 Broker 發(fā)送 ACK 消息,告知某條消息已被成功處理,可以從隊列中移除。如果 Consumer 沒有發(fā)送回 ACK 消息,那么 Broker 會認為消息處理失敗,會將此消息及后續(xù)消息分發(fā)給其它 Consumer 進行處理 ( redeliver flag 置為 true )。
?? 這種確認機制和 TCP/IP 協(xié)議確立連接類似。不同的是,TCP/IP 確立連接需要經(jīng)過三次握手,而 RabbitMQ 只需要一次 ACK。還有一個重要的是,RabbitMQ 當且僅當檢測到 ACK 消息未發(fā)出且 Consumer 的連接終止時才會將消息重新分發(fā)給其他 Consumer ,因此不需要擔心消息處理時間過長而被重新分發(fā)的情況。
Redis 實現(xiàn) ACK
- 需要在業(yè)務代碼中處理消息失敗的情況,回滾消息到原始等待隊列。
- Consumer 掛掉,仍然需要回滾消息到等待隊列中。
- 前者只需要在業(yè)務中處理消費異常的情況,后者則需要維護兩個隊列。
Redis ACK 實現(xiàn)方案
?? 維護一個消息記錄表,存貯消息的消費記錄,用于失敗時回滾消息。表中記錄消息ID、消息內(nèi)容、消息時間、消息狀態(tài)。
?? 定時任務輪詢該消息表,處理消費記錄表中消費狀態(tài)未成功的記錄,重新放入等待隊列。
4.4、多實例問題
?? 多實例是指同一個服務部署在不同的地方,發(fā)揮相同的作用,此時就會導致同時消費同一個消息的問題。
?? 一般情況下解決此類問題就需要考慮接入外部應用的輔助。常見的分布式鎖的方案有:?基于數(shù)據(jù)庫實現(xiàn)分布式鎖、?基于緩存實現(xiàn)分布式鎖、?基于 Zookeeper 實現(xiàn)分布式鎖。這里使用基于Redis的緩存來解決問題。
?? 利用 Redis 的 setnx 的互斥特性,把 key 當作鎖存在 Redis 中,但是用 setnx 需要解決死鎖和正確解鎖的問題。
?? 死鎖:設置 key-value 的過期時間,并且使用 lua 腳本保證加鎖和設置過期時間的原子性。
?? 解鎖:解鎖需要保證是加鎖客戶端進行解鎖操作。將 value 設置為 UUID,用對應的 UUID 去解鎖保證是加鎖客戶端進行對應的解鎖操作。
?? 利用 Redis 的 List 實現(xiàn)一個 Publisher 推送消費保證只被消費一次,這種不用考慮死鎖問題,但是需要額外維護一個隊列。
五、Jedis實現(xiàn)簡單延時隊列
??Zset本質就是Set結構上加了個排序的功能,除了添加數(shù)據(jù)value之外,還提供另一屬性score,這一屬性在添加修改元素的時候可以指定,每次指定后,Zset會自動重新排序。可以理解為有兩列字段的數(shù)據(jù)表,一列存value,一列存次序編號。操作中key理解為zset的名字。那么這個特性對延時隊列又有何用呢?試想如果score代表的是任務想要執(zhí)行的時間戳,zset便會按照score的時間戳大小進行排序,也就是對執(zhí)行時間進行排序。這樣的話,起一個死循環(huán)線程不斷地進行取第一個key值,如果當前時間戳不小于該socre,就將它取出來消費并刪除,從而達到延時執(zhí)行的目的。注意不需要遍歷整個zset集合,以免造成性能浪費。
package cn.chinotan.service.delayQueueRedis;
import org.apache.commons.lang3.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @program: test
* @description: 單實例redis實現(xiàn)延時隊列
**/
public class AppTest {
private static final String ADDR = "127.0.0.1";
private static final int PORT = 6379;
private static JedisPool jedisPool = new JedisPool(ADDR, PORT);
private static CountDownLatch cdl = new CountDownLatch(10);
public static Jedis getJedis() {
return jedisPool.getResource();
}
/**
* 生產(chǎn)者,生成5個訂單
*/
public void productionDelayMessage() {
for (int i = 0; i < 5; i++) {
Calendar instance = Calendar.getInstance();
// 3秒后執(zhí)行
instance.add(Calendar.SECOND, 3 + i);
AppTest.getJedis().zadd("orderId", (instance.getTimeInMillis()) / 1000, StringUtils.join("000000000", i + 1));
System.out.println("生產(chǎn)訂單: " + StringUtils.join("000000000", i + 1) + " 當前時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
System.out.println((3 + i) + "秒后執(zhí)行");
}
}
//消費者,取訂單
public static void consumerDelayMessage() {
Jedis jedis = AppTest.getJedis();
while (true) {
Set<Tuple> order = jedis.zrangeWithScores("orderId", 0, 0);
if (order == null || order.isEmpty()) {
System.out.println("當前沒有等待的任務");
try {
TimeUnit.MICROSECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
Tuple tuple = (Tuple) order.toArray()[0];
double score = tuple.getScore();
Calendar instance = Calendar.getInstance();
long nowTime = instance.getTimeInMillis() / 1000;
if (nowTime >= score) {
String element = tuple.getElement();
Long orderId = jedis.zrem("orderId", element);
if (orderId > 0) {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":redis消費了一個任務:消費的訂單OrderId為" + element);
}
}
}
}
static class DelayMessage implements Runnable{
@Override
public void run() {
try {
cdl.await();
consumerDelayMessage();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
AppTest appTest = new AppTest();
appTest.productionDelayMessage();
for (int i = 0; i < 10; i++) {
new Thread(new DelayMessage()).start();
cdl.countDown();
}
}
}
??實現(xiàn)效果如下:

六、小結
?? 使用 Redis 實現(xiàn)的隊列具有很好的擴展性,可以很便捷的應對需求的變更和業(yè)務的擴展,但是對于簡單的場景直接使用定時任務會更加容易。在有大量的定時任務需要實現(xiàn)的時候,就可以考慮使用延遲隊列去實現(xiàn),讓代碼更具有擴展性。
?? Redis 作為消息隊列的局限性很大,實現(xiàn) ack 機制的成本相對較高,然而他的輕量級的特性以及兼容很多的數(shù)據(jù)結構,Redis 成熟的分布式、持久化、集群等技術體系,讓他可以實現(xiàn)一些輕量級的隊列。總之沒有最好的技術,只有最好的 developer。
Reference
Buy me a coffee. ?Get red packets.
浙公網(wǎng)安備 33010602011771號