如何實現一個分布式鎖
如何實現一個分布式鎖
本篇內容主要介紹如何使用 Java 語言實現一個注解式的分布式鎖,主要是通過注解+AOP 環繞通知來實現。
1. 鎖注解
我們首先寫一個鎖的注解
/**
* 分布式鎖注解
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Documented
public @interface RedisLock {
long DEFAULT_TIMEOUT_FOR_LOCK = 5L;
long DEFAULT_EXPIRE_TIME = 60L;
String key() default "your-biz-key";
long expiredTime() default DEFAULT_EXPIRE_TIME;
long timeoutForLock() default DEFAULT_TIMEOUT_FOR_LOCK;
}
expiredTime 是設置鎖的過期時間,timeoutForLock 是設置等待鎖的超時時間。如果沒有等待獲得鎖的超時時間這個功能,那么其他線程在獲取鎖失敗時只能直接失敗,無法進行排隊等待。
我們如何使用這個注解呢,很容易,在需要加鎖的業務方法上直接用就行.如下,我們有一個庫存服務類,它有一個扣減庫存方法,該方法將數據庫中的一個庫存商品的數量減一。在并發場景下,如果我們沒有對其進行資源控制,必然會發生庫存扣減不一致現象。
public class StockServiceImpl {
@RedisLock(key = "stock-lock", expiredTime = 10L, timeoutForLock = 5L)
public void deduct(Long stockId) {
Stock stock = this.getById(1L);
Integer count = stock.getCount();
stock.setCount(count - 1);
this.updateById(stock);
}
}
2. 在 AOP 切面中進行加鎖處理
我們需要使用 AOP 來處理什么?自然是處理使用@RedisLock的方法,因此我們寫一個切點表達式,它匹配所有標有 @RedisLock 注解的方法。
接著,我們將此切點表達式與 @Around 注解結合使用,以創建環繞通知,在目標方法執行前后執行我們的加鎖解鎖邏輯。
因此,基本的邏輯我們就理清了,代碼大致長下面這個樣子:
public class RedisLockAspect {
private final RedisTemplate<String, Object> redisTemplate;
// 鎖的redis key前綴
private static final String DEFAULT_KEY_PREFIX = "lock:";
// 匹配所有標有 @RedisLock 注解的方法
@Pointcut("@annotation(com.kelton.lock.annotation.RedisLock)")
public void lockAnno() {
}
@Around("lockAnno()")
public void invoke(ProceedingJoinPoint joinPoint) throws Exception {
// 獲取攔截方法上的RedisLock注解
RedisLock annotation = getLockAnnotationOnMethod(joinPoint);
// 獲取鎖key
String key = getKey(annotation);
// 鎖過期時間
long expireTime = annotation.expiredTime();
// 獲取鎖的等待時間
long timeoutForLock = annotation.timeoutForLock();
// 在這里加鎖
someCodeForLock...
// 執行業務
joinPoint.proceed();
// 在這里解鎖
someCodeForUnLock...
}
我們在加鎖的時候,需要用上 timeoutForLock 這個屬性,我們通過自旋加線程休眠的方式,來達到在一段時間內等待獲取鎖的目的。如果自旋時間結束后,還沒獲取鎖,則拋出異常,這里可以根據自己情況而定。自旋加鎖代碼如下:
// 自旋獲取鎖
long endTime = System.currentTimeMillis() + timeoutForLock * 1000;
boolean acquired = false;
String uuid = UUID.randomUUID().toString();
while(System.currentTimeMillis() < endTime) {
Boolean absent = redisTemplate.opsForValue()
.setIfAbsent(key, uuid, expireTime, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(absent)) {
acquired = true;
break;
} else {
// 獲取不到鎖,嘗試休眠100毫秒后重試
Thread.sleep(100);
}
}
// 超時未獲取到鎖, 拋出異常,可根據自己業務而定
if (!acquired) {
throw new RuntimeException("獲取鎖異常");
}
我們發現上面加鎖的時候設置了一個 uuid 作為 value 值,這是為了在鎖釋放的時候,不誤刪其他線程上的鎖,隨后,我們就可以執行被 AOP 切中的方法,執行結束釋放鎖。代碼如下:
try {
// 執行業務
joinPoint.proceed();
} catch (Throwable e) {
log.error("業務執行出錯!");
} finally {
// 解鎖時進行校驗,只刪除自己線程加的鎖
String value = (String) redisTemplate.opsForValue().get(key);
if (uuid.equals(value)) {
redisTemplate.delete(key);
} else {
log.warn("鎖已過期!");
}
}
到這里,我們就以注解+AOP 的方式實現了分布式鎖的功能。當然,以上只實現了分布式鎖的簡單功能,還缺少了分布式鎖的 key 自動續約防止鎖過期功能,以及鎖重入功能。
目前,RedisLockAspect的完整代碼如下:
@Component
@Aspect
@Slf4j
@AllArgsConstructor
public class RedisLockAspect {
// 匹配所有標有 @RedisLock 注解的方法
@Pointcut("@annotation(com.kelton.lock.annotation.RedisLock)")
public void lockAnno() {
}
@Around("lockAnno()")
public void invoke(ProceedingJoinPoint joinPoint) throws Exception {
// 獲取攔截方法上的RedisLock注解
RedisLock annotation = getLockAnnotationOnMethod(joinPoint);
String key = getKey(annotation);
// 鎖過期時間
long expireTime = annotation.expiredTime();
// 獲取鎖的等待時間
long timeoutForLock = annotation.timeoutForLock();
// 自旋獲取鎖
long endTime = System.currentTimeMillis() + timeoutForLock * 1000;
boolean acquired = false;
String uuid = UUID.randomUUID().toString();
while(System.currentTimeMillis() < endTime) {
Boolean absent = redisTemplate.opsForValue()
.setIfAbsent(key, uuid, expireTime, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(absent)) {
acquired = true;
break;
} else {
// 獲取不到鎖,嘗試休眠100毫秒后重試
Thread.sleep(100);
}
}
// 超時未獲取到鎖, 拋出異常,可根據自己業務而定
if (!acquired) {
throw new RuntimeException("獲取鎖異常");
}
try {
// 執行業務
joinPoint.proceed();
} catch (Throwable e) {
log.error("業務執行出錯!");
} finally {
// 解鎖時進行校驗,只刪除自己線程加的鎖
String value = (String) redisTemplate.opsForValue().get(key);
if (uuid.equals(value)) {
redisTemplate.delete(key);
} else {
log.warn("鎖已過期!");
}
}
}
private String getKey(RedisLock redisLock) {
if (Objects.isNull(redisLock)) {
return DEFAULT_KEY_PREFIX + "default";
}
return DEFAULT_KEY_PREFIX + redisLock.key();
}
private RedisLock getLockAnnotationOnMethod(ProceedingJoinPoint joinPoint) {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
return method.getAnnotation(RedisLock.class);
}
}
3. key 自動續約防止鎖過期
我們接著完善該分布式鎖,為其添加 key 自動續約防止鎖過期的功能。我們的思路與Redission的watch dog類似,開啟一個后臺線程,來定時檢查需要續約的鎖。我們如何判斷一個鎖是否需要續約呢,我們可以簡單定義一個續約分界線,比如在鎖過期時間的三分之二的時間點及之后,對鎖進行續約。
3.1 定義一個續約任務4
我們來定義一個鎖續約任務,那我們需要什么信息呢?
我們至少需要鎖的 key,鎖要設置的過期時間。這是兩個最基本的信息。
要判斷在鎖過期時間的三分之二的時間點及之后進行續約,那么我們還需要記錄鎖上次續約的時間點。
此外,我們還可以為鎖續約任務添加最大續約次數限制,這可以避免某些執行時間特別久的任務不斷占用鎖。所以我們還需要記錄當前鎖續約次數和最大續約次數。
對超過最大續約次數的鎖的線程,我們直接將其停止,因此我們也記錄一下該鎖的線程。
結合上面的分析,我們定義的鎖續約任務類如下:
public class LockRenewTask {
/**
* key
*/
private final String key;
/**
* 過期時間。單位:秒
*/
private final long expiredTime;
/**
* 鎖的最大續約次數
*/
private final int maxRenewCount;
/**
* 鎖的當前續約次數
*/
private int currentRenewCount;
/**
* 最新更新時間
*/
private LocalDateTime latestRenewTime;
/**
* 業務線程
*/
private final Thread thread;
public LockRenewTask(String key, long expiredTime, int maxRenewCount, Thread thread) {
this.key = key;
this.expiredTime = expiredTime;
this.maxRenewCount = maxRenewCount;
this.thread = thread;
this.latestRenewTime = LocalDateTime.now();
}
/**
* 是否到達續約時間
* @return
*/
public boolean isTimeToRenew() {
LocalDateTime now = LocalDateTime.now();
Duration duration = Duration.between(latestRenewTime, now);
return duration.toSeconds() >= ((double)(this.expiredTime / 3) * 2);
}
/**
* 是否達到最大續約次數
* @return
*/
public boolean exceedMaxRenewCount() {
return this.currentRenewCount >= this.maxRenewCount;
}
public synchronized void renew() {
this.currentRenewCount++;
this.latestRenewTime = LocalDateTime.now();
}
// 取消業務方法
public void cancel() {
thread.interrupt();
}
public String getKey() {
return key;
}
public long getExpiredTime() {
return expiredTime;
}
}
我們添??了一些關于鎖續約的方法:
isTimeToRenew(): 判斷是否可以對鎖進行續約exceedMaxRenewCount(): 判斷是否達到最大續約次數renew(): 來標記一次續約操作cancel(): 取消業務方法
3.2 定義一個鎖續約任務處理器
接著,我們定義一個定時執行該續約任務的 handler。該 handler 也比較簡答,核心邏輯是持有一個類型為 List<LockRenewTask>的 taskList 來添加續約任務,且使用一個 ScheduledExecutorService 來定時遍歷該 taskList 來執行續約任務。該 handler 再對外暴露一個 addRenewTask 方法,方便外部調用來添加續約任務到 taskList 中。
@Slf4j
@Component
public class LockRenewHandler {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 保障對 taskList的添加刪除操作是線程安全的
*/
private final ReentrantLock taskListLock = new ReentrantLock();
private final List<LockRenewTask> taskList = new ArrayList<>();
private final ScheduledExecutorService taskExecutorService;
{
taskExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
taskExecutorService.scheduleAtFixedRate(() -> {
try {
executeRenewTask();
} catch (Exception e) {
//錯誤處理
}
}, 1, 2, TimeUnit.SECONDS);
}
/**
* 添加續約任務
*/
public void addRenewTask(LockRenewTask task) {
taskListLock.lock();
try {
taskList.add(task);
} finally {
taskListLock.unlock();
}
}
/**
* 執行續約任務
*/
private void executeRenewTask() {
log.info("開始執行續約任務");
if (CollectionUtils.isEmpty(taskList)) {
return;
}
// 需要刪除的任務,暫存這個集合中 取消
List<LockRenewTask> cancelTask = new ArrayList<>();
// 獲取任務副本
List<LockRenewTask> copyTaskList = new ArrayList<>(taskList);
for (LockRenewTask task : copyTaskList) {
try {
// 判斷 Redis 中是否存在 key
if (!redisTemplate.hasKey(task.getKey())) {
cancelTask.add(task);
continue;
}
// 大于等于最大續約次數
if (task.exceedMaxRenewCount()) {
// 停止續約任務
task.cancel();
cancelTask.add(task);
continue;
}
// 到達續約時間
if (task.isTimeToRenew()) {
log.info("續約任務:{}", task.getKey());
redisTemplate.expire(task.getKey(), task.getExpiredTime(), TimeUnit.SECONDS);
task.renew();
}
} catch (Exception e) {
//錯誤處理
log.error("處理任務出錯:{}", task);
}
}
// 加鎖,刪除 taskList 中需要移除的任務
taskListLock.lock();
try {
taskList.removeAll(cancelTask);
// 清理cancelTask,避免堆積,產生內存泄露
cancelTask.clear();
} finally {
taskListLock.unlock();
}
}
}
總結一下 LockRenewHandler的主要作用:它負責管理和執行續約任務,以延長 Redis 中鍵的過期時間。
- 添加續約任務:
addRenewTask()方法允許添加新的續約任務到內部列表taskList中。 - 執行續約任務:
executeRenewTask()方法定期執行續約任務。它檢查每個任務的狀態,并根據需要續約Redis中的鍵。 - 移除完成的任務:維護一個
cancelTask列表,用于存儲需要從taskList中移除的任務。在executeRenewTask()方法中,它會將完成的任務添加到cancelTask列表中,并在之后將其從taskList中移除。
大概的工作流程如下:
-
續約任務被添加到
taskList中。 -
executeRenewTask()方法定期執行,它檢查每個任務的狀態:- 如果
Redis中不再存在該鍵,則取消任務。 - 如果任務的續約次數達到上限,則取消任務。
- 如果是時候續約了,則續約 Redis 中的鍵并更新任務的續約次數,記錄續約時間點。
- 如果
-
完成的任務被添加到
cancelTask列表中。 -
executeRenewTask()方法獲取taskList的副本,并從副本中移除cancelTask中的任務,并且在完成移除任務操作后清空cancelTask。 -
更新后的
taskList被保存回類中。
兩個需要注意的點
- 我們遍歷
taskList時拷貝了一份副本進行遍歷,因為taskList是可變的,這樣可以避免在遍歷的時候產生并發修改問題。 cancelTask需要清理,避免產生內存泄漏。
通過這種方式,LockRenewHandler 可以確保 Redis 中的鍵在需要時得到續約,并自動移除完成或失敗的任務。
3.3 添加鎖續約任務
在上面 3.1 節和 3.2 節我們定義好了鎖續約任務和處理鎖續約任務的核心代碼,接下來我們需要在第 2 節加鎖解鎖的 AOP 處理邏輯上進行一點小小的修改,主要就是在執行加鎖之后,執行業務代碼之前,添加上鎖續約任務。修改位置如下:
public void invoke(ProceedingJoinPoint joinPoint) throws Exception {
... // 省略代碼
try {
// 添加鎖續約任務
LockRenewTask task = new LockRenewTask(key, annotation.expiredTime(), annotation.maxRenew(), Thread.currentThread());
lockRenewHandler.addRenewTask(task);
log.info("添加續約任務, key:{}", key);
// 執行業務
joinPoint.proceed();
} catch (Throwable e) {
log.error("業務執行出錯!");
} finally {
// 解鎖時進行校驗,只刪除自己線程加的鎖
String value = (String) redisTemplate.opsForValue().get(key);
if (uuid.equals(value)) {
redisTemplate.delete(key);
} else {
log.warn("鎖已過期!");
}
}
... // 省略代碼
}
到這里,我們的分布式鎖已經相當完善了,把鎖自動續約的功能也加上了。當然,還沒有實現鎖的可重入性。

浙公網安備 33010602011771號