Redis 高階應用
生成全局唯一 ID
-
全局唯一 ID 需要滿足以下要求:
-
唯一性:在分布式環境中,要全局唯一
-
高可用:在高并發情況下保證可用性
-
高性能:在高并發情況下生成 ID 的速度必須要快,不能花費太長時間
-
遞增性:要確保整體遞增的,以便于數據庫創建索引
-
安全性:ID 的規律性不能太明顯,以免信息泄露
從上面的要求可以看出,全局 ID 生成器的條件還是比較苛刻的,而 Redis 恰巧可以滿足以上要求。
Redis 本身就是就是以性能著稱,因此完全符合高性能的要求,其次使用 Redis 的 incr 命令可以保證遞增性,配合相應的分布式 ID 生成算法便可以實現唯一性和安全性,Redis 可以通過哨兵、主從等集群方案來保證可用性。因此 Redis 是一個不錯的選擇。
下面我們就寫一個簡單的示例,來讓大家感受一下,實際工作中大家可以根據需要進行調整:
@Component
public class IDUtil{
//開始時間戳(單位:秒) 2000-01-01 00:00:00
private static final long START_TIMESTAMP = 946656000L;
//Spring Data Redis 提供的 Redis 操作模板
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 獲取 ID 格式:時間戳+序列號
* @param keyPrefix Redis 序列號前綴
* @return 生成的 ID
*/
public long getNextId(String keyPrefix){
//獲取當前時間戳
LocalDateTime now = LocalDateTime.now();
long nowTimestamp = now.toEpochSecond(ZoneOffset.UTC);
//獲取 ID 時間戳
long timestamp = nowSecond - START_TIMESTAMP;
//獲取當前日期
String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
//生成 key
String key = "incr:" + keyPrefix + ":" + date;
//獲取序列號
long count = stringRedisTemplate.opsForValue().increment(key);
//生成 ID 并返回
return timestamp << 32 | count;
}
}
分布式鎖
在 JVM 內部會有一個鎖監視器來控制線程間的互斥,但在分布式的環境下會有多臺機器部署同樣的服務,也就是說每臺機器都會有自己的鎖監視器。而 JVM 的鎖監視器只能保證自己內部線程的安全執行,并不能保證不同機器間的線程安全執行,因此也很難避免高并發帶來的線程安全問題。因此就需要分布式鎖來保證整個集群的線程的安全,而分布式鎖需要滿足 5 點要求:多進程可見、互斥性、高可用、高性能、安全性
其中核心要求就是多進程之間互斥,而滿足這一點的方式有很多,最常見的有三種:mysql、Redis、Zookeeper。

通過對比我們發現,其中 Redis 的效果最理想,所以下面就用 Redis 來實現一個簡單的分布式鎖。
public class DistributedLockUtil {
//分布式鎖前綴
private static final String KEY_PREFIX = "distributed:lock:";
//業務名
private String business;
//分布式鎖的值
private String value;
//Spring Data Redis 提供的 Redis 操作模板
private StringRedisTemplate stringRedisTemplate;
//私有化無參構造
private DistributedLockUtil(){}
//有參構造
public DistributedLockUtil(String business,StringRedisTemplate stringRedisTemplate){
this.business = business;
this.stringRedisTemplate = stringRedisTemplate;
this.value = UUID.randomUUID().toString();
}
/**
* 嘗試獲取鎖
* @param timeout 超時時間(單位:秒)
* @return 鎖是否獲取成功
*/
public boolean tryLock(long timeout){
//生成分布式鎖的 key
StringBuffer keyBuffer = new StringBuffer(KEY_PREFIX);
keyBuffer.append(business);
Boolean success = stringRedisTemplate.opsForValue().setIsAbsent(keyBuffer.toString(),value,timeout, TimeUnit.SECONDS);
//返回結果 注意:為了防止自動拆箱時出現空指針,所以這里用了 equals 判斷
return Boolean.TRUE.equals(success);
}
/**
* 釋放鎖(不安全版)
*/
public void unLock(){
//生成分布式鎖的 key
StringBuffer keyBuffer = new StringBuffer(KEY_PREFIX);
keyBuffer.append(business);
//獲取分布式鎖的值
String redisValue = stringRedisTemplate.opsForValue().get(keyBuffer.toString());
//判斷值是否一致,防止誤刪
if (value.equals(redisValue)) {
//當代碼執行到這里時,如果 JVM 恰巧執行了垃圾回收(雖然幾率極低),就會導致所有線程阻塞等待,因此這里仍然會有線程安全的問題
stringRedisTemplate.delete(keyBuffer.toString());
}
}
/**
* 通過腳本釋放鎖(徹底解決線程安全問題)
*/
public void unLockWithScript(){
//加載 lua 腳本,實際工作中我們可以將腳本設置為常量,并在靜態代碼塊中初始化(腳本內容在下文)
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setLocation(new ClassPathResource("unlock.lua"));
script.setResultType(Long.class);
//生成分布式鎖的 key
StringBuffer keyBuffer = new StringBuffer(KEY_PREFIX);
keyBuffer.append(business);
//調用 lua 腳本釋放鎖
stringRedisTemplate.execute(script,
Collections.singletonList(keyBuffer.toString()),
value);
}
}
lua 腳本內容如下:
-- 判斷值是否一致,防止誤刪
if(redis.call('get',KEYS[1]) == VRGV[1]) then
-- 判斷通過,釋放鎖
return redis.call('del',KEYS[1])
end
-- 判斷不通過,返回 0
return 0
雖然通過 lua 腳本解決了線程不安全的問題,但是仍然存在以下問題:
- 不可重入:同一個線程無法多次獲取同一把鎖
- 不可重試:獲取鎖只能嘗試一次,失敗就返回 false,沒有重試機制
- 超時釋放:鎖超時釋放雖然可以避免死鎖,但如果業務執行耗時較長,也會導致鎖釋放,存在安全隱患
- 主從一致性:如果 Redis 提供了主從集群,主從同步存在延遲,當主機宕機時,如果從機還沒來得及同步主機的鎖數據,則會出現鎖失效。
要解決以上問題也非常簡單,只需要利用 Redis 的 hash 結構記錄線程標識和重入次數就可以解決不可重入的問題。利用信號量和 PubSub 功能實現等待、喚醒,獲取鎖失敗的重試機制即可解決不可重試的問題。而超時釋放的問題則可以通過獲取鎖時為鎖添加一個定時任務(俗稱看門狗),定期刷新鎖的超時時間即可。至于主從一致性問題,我們只需要利用多個獨立的 Redis 節點(非主從),必須在所有節點都獲取重入鎖,才算獲取鎖成功。

有的人可能說了,雖然說起來簡單,但真正實現起來也不是很容易呀。對于這種問題,大家不用擔心,俗話說得好想要看的更遠,需要站在巨人的肩膀上。對于上述的需求,早就有了成熟的開源方案 Redisson ,我們直接拿來用就可以了,無需重復造輪子,具體使用方法可以查看官方文檔。
輕量化消息隊列
雖然市面上有很多優秀的消息中間件如 RocketMQ、Kafka 等,但對于應用場景較為簡單,只需要簡單的消息傳遞,比如任務調度、簡單的通知系統等,不需要復雜的消息路由、事務支持的業務來說,用那些專門的消息中間件成本就顯得過高。因此我們就可以使用 Redis 來做消息隊列。
Redis 提供了三種不同的方式來實現消息隊列:
- list 結構:可以使用 list 來模擬消息隊列,可以使用 BRPOP 或 BLPOP 命令來實現類似 JVM 阻塞隊列的消息隊列。
- PubSub:基于發布/訂閱的消息模型,但不支持數據持久化,且消息堆積有上限,超出時數據丟失。
- Stream:Redis 5.0 新增的數據類型,可以實現一個功能非常完善的消息隊列,也是我們實現消息隊列的首選。

下面我就采用 Redis 的 Stream 實現一個簡單的案例來讓大家感受一下,實際工作中大家可以根據需要進行調整:
public class RedisQueueUtil{
//Spring Data Redis 提供的 Redis 操作模板
private StringRedisTemplate stringRedisTemplate;
/**
* 獲取消息隊列中的數據,執行該方法前,一定要確保消費者組已經創建
* @param queueName 隊列名
* @param groupName 消費者組名
* @param consumerName 消費者名
* @param type 返回值類型
* @return 消息隊列中的數據
*/
public <T> T getQueueData(String queueName, String groupName, String consumerName, Class<T> type){
while (true){
try {
//獲取消息隊列中的信息
List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from(groupName,consumerName),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//判斷消息是否獲取成功
if (list == null || list.isEmpty()){
//如果獲取失敗,說明沒有消息,繼續下一次循環
continue;
}
//如果獲取成功,則解析消息中的數據
MapRecord<String,Object,Object> record = list.get(0);
Map<Object,Object> values = record.getValue();
String jsonString = JSON.toJSONString(values);
T result = JSON.parseObject(jsonString, type);
// ACK
stringRedisTemplate.opsForStream().acknowledge(queueName,groupName,record.getId());
//返回結果
return result;
}catch (Exception e){
while (true){
try {
//獲取 pending-list 隊列中的信息
List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from(groupName,consumerName),
StreamReadOptions.empty().count(1)),
StreamOffset.create(queueName,ReadOffset.from("0")
);
//判斷消息是否獲取成功
if (list == null || list.isEmpty()){
//如果獲取失敗,說明 pending-list 沒有異常消息,結束循環
break;
}
//如果獲取成功,則解析消息中的數據
MapRecord<String,Object,Object> record = list.get(0);
Map<Object,Object> values = record.getValue();
String jsonString = JSON.toJSONString(values);
T result = JSON.parseObject(jsonString, type);
// ACK
stringRedisTemplate.opsForStream().acknowledge(queueName,groupName,record.getId());
//返回結果
return result;
}catch (Exception ex){
log.error("處理 pending-list 訂單異常",ex);
try {
Thread.sleep(50);
}catch (InterruptedException err){
err.printStackTrace();
}
}
}
}
}
}
/**
* 向消息隊列中發送數據
* @param queueName 消息隊列名
* @param map 要發送數據的集合
*/
public void sendQueueData(String queueName, Map<String,Object> map){
StringBuilder builder = new StringBuilder("redis.call('xadd','");
builder.append(queueName).append("','*','");
Set<String> keys = map.keySet();
for(String key:keys){
builder.append(key).append("','").append(map.get(key)).append("','");
}
String script = builder.substring(0, builder.length() - 2);
script += ")";
stringRedisTemplate.execute(new DefaultRedisScript<Long>(script,Long.class),Collections.emptyList());
}
}

浙公網安備 33010602011771號