分布式鎖之一:Redis實現分布式鎖
分布式鎖之一:Redis實現分布式鎖
分布式鎖一般有三種實現方式:1、基于數據庫樂觀鎖;2、基于Redis的分布式鎖;3、基于Zookeeper的分布式鎖。本文檔主要介紹基于Redis實現分布式鎖的方法。
1、加鎖
// redis加鎖
public boolean getLock(Jedis jedis,String key,int expire){
String uuid =UUID.randomUUID().toString();
String result = jedis.set(key,uuid,"NX","PX",expire);
if(Objects.equals("OK",result)){
logger.info("redis加鎖成功");
return true;
}else{
logger.info("redis鎖已存在,加鎖失敗");
return false;
}
}
- 第一個為key,我們使用key來當鎖,因為key是唯一的。
- 第二個為value,使用uuid便于解鎖的時候確保解的是同一把鎖。
- 第三個為nxxx,這個參數我們填的是NX,
- 第四個為nxxx,這個參數我們填的是PX,意思是我們要給這個key加一個過期的設置,具體時間由第五個參數決定。
- 第五個為time,與第四個參數相呼應,代表key的過期時間。
2.執行業務流程
public void doTask(){
//處理業務邏輯
}
3.解鎖
//redis解鎖
public boolean unLock(Jedis jedis,String key,String uuid){
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
long res = redisClient.executeCmd( jedis -> jedis.eval(script,Collections.singleonList(RedisConstans.CLOSE_LEAVE_USER),Collections.singletonList(uuid)));
if(Objects.equals(1L,res)){
logger.info("redis解鎖成功");
return true;
}else{
logger.info("redis解鎖失敗")
}
}
- 使用lua腳本解鎖,把它變成原子操作,保證鎖的釋放正確。
但是以上代碼還是存在問題的,會產生續約和集群同步延遲問題。
續約問題
假想這樣一個場景,如果過期時間為30S,A線程超過30S還沒執行完,但是自動過期了。這時候B線程就會再拿到鎖,造成了同時有兩個線程持有鎖
這時候就要考慮延長鎖的過期時間了??梢栽O置一個合理的過期時間,保證業務能處理完?;蛘呤褂肦edisson。
集群同步延遲問題
用于redis的服務肯定不能是單機,因為單機就不是高可用了,一量掛掉整個分布式鎖就沒用了。
在集群場景下,如果A在master拿到了鎖,在沒有把數據同步到slave時,master掛掉了。B再拿鎖就會從slave拿鎖,而且會拿到。又出現了兩個線程同時拿到鎖。
Redisson
Redisson是架設在Redis基礎上的一個Java駐內存數據網格(In-Memory Data Grid)。充分的利用了Redis鍵值數據庫提供的一系列優勢,基于Java實用工具包中常用接口,為使用者提供了一系列具有分布式特性的常用工具類。
Redisson通過lua腳本解決了上面的原子性問題,通過“看門狗”解決了續約問題,但是它應該解決不了集群中的同步延遲問題。
總結
redis分布式鎖的方案,無論用何種方式實現都會有續約問題與集群同步延遲問題??偟膩碚f,是一個不太靠譜的方案。如果追求高正確率,不能采用這種方案。
但是它也有優點,就是比較簡單,在某些非嚴格要求的場景是可以使用的,比如社交系統一類,交易系統一類不能出現重復交易則不建議用。
在Spring Boot中使用Redis實現分布式鎖
這種機制可以有效地控制對共享資源的訪問,避免數據競爭和不一致的問題。
步驟 1: 添加依賴
確保在你的pom.xml中添加了Spring Data Redis相關依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>io.lettuce.core</groupId>
<artifactId>lettuce-core</artifactId>
</dependency>
步驟 2: 配置Redis連接
在application.properties或application.yml文件中配置Redis的連接信息:
spring.redis.host=localhost
spring.redis.port=6379
步驟 3: 創建分布式鎖工具類
創建一個工具類來處理分布式鎖的獲取和釋放:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class RedisLock {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public boolean tryLock(String lockKey, String requestId, long timeout) {
Boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, timeout, TimeUnit.SECONDS);
return success != null && success;
}
public void unlock(String lockKey, String requestId) {
String value = redisTemplate.opsForValue().get(lockKey);
if (requestId.equals(value)) {
redisTemplate.delete(lockKey);
}
}
}
步驟 4: 使用分布式鎖
在你的服務或控制器中,可以通過RedisLock類來獲取和釋放鎖。以下是一個示例:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MyService {
@Autowired
private RedisLock redisLock;
private static final String LOCK_KEY = "myLock";
public void performTask() {
String requestId = String.valueOf(System.currentTimeMillis()); // 使用當前時間戳作為請求ID
// 嘗試獲取鎖
if (redisLock.tryLock(LOCK_KEY, requestId, 10)) { // 10秒超時
try {
// 執行需要加鎖的操作
System.out.println("Lock acquired! Performing task...");
Thread.sleep(5000); // 模擬任務執行
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 釋放鎖
redisLock.unlock(LOCK_KEY, requestId);
System.out.println("Lock released!");
}
} else {
System.out.println("Could not acquire lock, please try again later.");
}
}
}
步驟 5: 測試分布式鎖
你可以在控制器中調用MyService的performTask方法進行測試:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MyController {
@Autowired
private MyService myService;
@GetMapping("/test-lock")
public String testLock() {
myService.performTask();
return "Task initiated!";
}
}
步驟 6: 啟動應用并測試
啟動你的Spring Boot應用,并訪問以下URL來測試分布式鎖功能:
http://localhost:8080/test-lock
注意事項
- 過期時間:為了避免死鎖,確保在設置鎖時指定合理的過期時間。
- 鎖的唯一性:在分布式環境中,鎖的唯一性非常重要,建議使用唯一的lockKey和requestId。
- 異常處理:確保在任務執行過程中正確處理異常,并在finally塊中釋放鎖。
為了避免死鎖,如何確保在設置鎖時指定合理的過期時間
為了避免死鎖,確保在設置鎖時指定合理的過期時間,可以遵循以下幾個原則:
1. 分析任務執行時間
在設置鎖的過期時間時,首先應分析被鎖定操作的最長執行時間??梢酝ㄟ^歷史監控數據或日志分析來獲取這個信息。
2. 設置適當的過期時間
將鎖的過期時間設置為預計的最大執行時間加上一些安全邊際。例如,如果你預計某個操作通常需要3秒,設置鎖的過期時間為5秒或6秒。
3. 使用可重入鎖
如果你的操作可能會重入(即同一個線程可能會多次嘗試獲取同一把鎖),考慮使用可重入鎖。這種鎖允許同一個請求在持有鎖的情況下再次獲取鎖,而不會導致死鎖。
4.動態調整過期時間
在執行任務的過程中,可以定期檢查鎖的狀態,并在必要時動態延長鎖的過期時間。這樣可以防止因為任務執行時間超過預期而導致的鎖過期。
5. 監控與報警
實施監控措施,跟蹤鎖的使用情況,并在發現鎖頻繁超時或未釋放的情況時報警。這有助于及時發現問題并進行處理。
6. 日志記錄與分析
7.動態調整過期時間示例代碼
我們可以添加動態調整過期時間的邏輯。以下是一個簡單的實現:
public boolean tryLock(String lockKey, String requestId, long timeout) {
Boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, timeout, TimeUnit.SECONDS);
return success != null && success;
}
// 動態調整鎖的過期時間
public void extendLock(String lockKey, String requestId, long extraTime) {
String value = redisTemplate.opsForValue().get(lockKey);
if (requestId.equals(value)) {
redisTemplate.expire(lockKey, extraTime, TimeUnit.SECONDS);
}
}
8.總結
通過以上策略,你可以有效地減少死鎖的發生概率,并提高系統的穩定性。確保在設計鎖機制時充分考慮執行時間和任務特性,以制定合理的過期時間。

浙公網安備 33010602011771號