Java-函數(shù)式編程-實(shí)現(xiàn)分布式鎖工具
該業(yè)務(wù)適用場景:
-
分布式環(huán)境下的資源互斥訪問
-
防止重復(fù)執(zhí)行(如定時(shí)任務(wù))
-
庫存扣減等需要強(qiáng)一致性的操作
一. 背景
在開發(fā)過程中需要使用到分布式鎖的時(shí)候(以redisson為例). 一般會通過初始化RedissonClient配置, 然后在需要的地方使用. 當(dāng)使用的多的時(shí)候會發(fā)現(xiàn), 代碼中充斥著相似的代碼結(jié)構(gòu), 例如
String lockKey = RedisBaseKeyEnum.APP_USER_REGISTRY_VERIFY_CODE_KEY_LOCK.buildKey(account);
RLock lock = redissonClient.getLock(lockKey);
try {
if (lock != null && !lock.isLocked() && lock.tryLock(0, 60L, TimeUnit.SECONDS)) {
// some biz
}
} catch (InterruptedException e) {
LOG.error("加鎖失敗,key:{}, account:{}", lockKey, account, e);
Thread.currentThread().interrupt();
throw new BizException(BizErrorCode.VERIFY_CODE_SENDER_ERROR);
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
簡單的加鎖過程類似于上面的代碼. 所以為了提升代碼的可復(fù)用性, 降低重復(fù)代碼, 提高系統(tǒng)可維護(hù)性. 我們可以通過Java中的函數(shù)式編程, 將重復(fù)的代碼塊抽象出一個(gè)模板. 然后具體的業(yè)務(wù)通過作為方法入?yún)?/strong> 來實(shí)現(xiàn).
二. 核心點(diǎn)
-
抽象分布式鎖業(yè)務(wù)
-
使用@FunctionalInterface注解將需要注入的業(yè)務(wù)抽象出來
-
注入使用
三. 實(shí)現(xiàn)方式
-
定義函數(shù)式接口 @FunctionalInterface
============================= 加鎖業(yè)務(wù) 有返回值 ===============================
package com.sj.utils.lock.func;
@FunctionalInterface
public interface LockTask {
void run() throws Exception; // 沒有入?yún)⒌姆椒?}
============================= 加鎖業(yè)務(wù)沒有返回值 ===============================
package com.sj.utils.lock.func;
@FunctionalInterface
public interface LockTask {
void run() throws Exception; // 沒有入?yún)⒌姆椒?
}
-
定義Redisson分布式鎖類型枚舉
package com.sj.utils.lock;
public enum RLockType {
REENTRANT, // 可重入鎖
FAIR, // 公平鎖
READ, // 讀鎖
WRITE, // 寫鎖
TRY, // 嘗試獲取鎖
TRY_WITH_FAIL // 嘗試失敗鎖 tryLock中waitTime為0
}
然后
===========================RLockSimpleUtil========================
public RLock getLock(String lockKey, RLockType lockType) {
switch (lockType) {
case TRY:
case REENTRANT:
case TRY_WITH_FAIL:
return redissonClient.getLock(lockKey);
case FAIR:
return redissonClient.getFairLock(lockKey);
case READ:
return redissonClient.getReadWriteLock(lockKey).readLock();
case WRITE:
return redissonClient.getReadWriteLock(lockKey).writeLock();
default:
throw new IllegalArgumentException("Unknown lock type: " + lockType);
}
}
-
定義RLockSimpleUtil
package com.sj.utils.lock;
import com.sj.utils.lock.func.LockException;
import com.sj.utils.lock.func.LockTask;
import com.sj.utils.lock.func.LockTaskWithResult;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class RLockSimpleUtil {
private final Logger log = LoggerFactory.getLogger(RLockSimpleUtil.class);
private final RedissonClient redissonClient;
// 鎖默認(rèn)參數(shù)
private final long DEFAULT_WAIT_TIME = 5; // 秒
private final long DEFAULT_LEASE_TIME = 30; // 秒
private final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
public RLockSimpleUtil(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
/**
* 執(zhí)行帶返回值的任務(wù),默認(rèn)使用可重入鎖
*
* @param lockKey 鎖鍵
* @param task 任務(wù)
* @param <T> 任務(wù)返回值類型
* @return 任務(wù)返回值
*/
public <T> T executeWithResult(String lockKey, LockTaskWithResult<T> task) {
return executeWithResult(lockKey, RLockType.REENTRANT, DEFAULT_WAIT_TIME, DEFAULT_LEASE_TIME, DEFAULT_TIME_UNIT, task);
}
/**
* 執(zhí)行帶返回值的任務(wù),支持指定鎖類型和超時(shí)時(shí)間
*
* @param lockKey 鎖鍵
* @param lockType 鎖類型
* @param waitTime 等待時(shí)間
* @param leaseTime 租約時(shí)間
* @param unit 時(shí)間單位
* @param lockTask 任務(wù)
* @param <T> 任務(wù)返回值類型
* @return 任務(wù)返回值
*/
public <T> T executeWithResult(String lockKey,
RLockType lockType,
long waitTime,
long leaseTime,
TimeUnit unit,
LockTaskWithResult<T> lockTask) {
RLock lock = getLock(lockKey, lockType);
boolean acquired = false;
try {
acquired = lock.tryLock(waitTime, leaseTime, unit);
if (!acquired) {
throw new LockException("Failed to acquire lock: " + lockKey);
}
return lockTask.runWithResult();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LockException("Lock acquisition interrupted", e);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
unlockQuietly(lock, acquired);
}
}
/**
* 執(zhí)行無返回值的任務(wù),默認(rèn)使用可重入鎖
*
* @param lockKey 鎖鍵
* @param lockTask 任務(wù)
*/
public void execute(String lockKey, LockTask lockTask) {
execute(lockKey, RLockType.REENTRANT, DEFAULT_WAIT_TIME, DEFAULT_LEASE_TIME, DEFAULT_TIME_UNIT, lockTask);
}
/**
* 執(zhí)行無返回值的任務(wù),支持指定鎖類型和超時(shí)時(shí)間
*
* @param lockKey 鎖鍵
* @param lockType 鎖類型
* @param waitTime 等待時(shí)間
* @param leaseTime 租約時(shí)間
* @param timeUnit 時(shí)間單位
* @param lockTask 任務(wù)
*/
public void execute(String lockKey, RLockType lockType, long waitTime, long leaseTime, TimeUnit timeUnit, LockTask lockTask) {
RLock lock = getLock(lockKey, lockType);
boolean acquired = false;
try {
acquired = lock.tryLock(waitTime, leaseTime, timeUnit);
if (!acquired) {
throw new LockException("Failed to acquire lock: " + lockKey);
}
lockTask.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LockException("Interrupted while trying to acquire lock: " + lockKey);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
unlockQuietly(lock, acquired);
}
}
/**
* 獲取指定鎖類型的鎖實(shí)例
*
* @param lockKey 鎖鍵
* @param lockType 鎖類型
* @return 鎖實(shí)例
*/
public RLock getLock(String lockKey, RLockType lockType) {
switch (lockType) {
case TRY:
case REENTRANT:
case TRY_WITH_FAIL:
return redissonClient.getLock(lockKey);
case FAIR:
return redissonClient.getFairLock(lockKey);
case READ:
return redissonClient.getReadWriteLock(lockKey).readLock();
case WRITE:
return redissonClient.getReadWriteLock(lockKey).writeLock();
default:
throw new IllegalArgumentException("Unknown lock type: " + lockType);
}
}
/**
* 安靜地解鎖鎖實(shí)例
*
* @param lock 鎖實(shí)例
* @param acquired 是否成功獲取鎖
*/
private void unlockQuietly(RLock lock, boolean acquired) {
try {
if (acquired && lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
} catch (Exception e) {
if (lock != null && lock.isHeldByCurrentThread()) {
log.warn("Error unlocking: {} ", e.getMessage(), e);
lock.unlock();
}
}
}
}
四. 使用方式
-
在具體的業(yè)務(wù)中注入RLockSimpleUtil
-
根據(jù)具體業(yè)務(wù), 直接調(diào)用util中的具體方法, 例如一個(gè)簡單的加鎖業(yè)務(wù):
@Override
public ShortAddressResult acquire(ShortAddressTypeEnum type, Long householdId, Integer count) {
String key = RedisBaseKeyEnum.DEVICE_SHORT_ADDRESS_LIST_CACHE_KEY_LOCK
.buildKey(householdId.toString());
return rLockSimpleUtil.executeWithResult(
key,
RLockType.FAIR,
30,
3,
TimeUnit.SECONDS,
// 下方執(zhí)行具體的加鎖業(yè)務(wù)
() -> {
String cacheKey = getCacheKey(type, householdId);
return shortAddressPoolService.acquire(type, cacheKey, householdId, count);
});
}
五. 補(bǔ)充
-
Java內(nèi)置的函數(shù)式接口:
-
Runnable - 無參數(shù)無返回值
-
Supplier
- 無參數(shù)有返回值 -
Consumer
- 有參數(shù)無返回值 -
Function<T,R> - 有參數(shù)有返回值
-
Predicate
- 有參數(shù)返回布爾值
-
潛在問題
-
Redis 單點(diǎn)依賴
-
依賴 Redis 可用性
-
Redis 故障會影響所有鎖操作
-
-
鎖超時(shí)風(fēng)險(xiǎn)
-
固定租約時(shí)間可能不適合所有場景
-
長時(shí)間任務(wù)可能導(dǎo)致鎖提前釋放
-
-
性能考慮
-
每次鎖操作都需要網(wǎng)絡(luò)通信
-
高并發(fā)場景下可能成為瓶頸
-
-
異常處理可能過于寬泛
-
缺少監(jiān)控指標(biāo)
-
沒有鎖獲取成功率統(tǒng)計(jì)
-
沒有鎖持有時(shí)間監(jiān)控
-
本文來自博客園,作者:你啊347,轉(zhuǎn)載請注明原文鏈接:http://www.rzrgm.cn/LinKinSJ/p/19178886

浙公網(wǎng)安備 33010602011771號