詳細分析Redisson的分布式鎖
在Redisson中,鎖的續期是一個關鍵特性,用于確保在鎖的持有者仍在執行任務期間,鎖不會被意外釋放。

看門狗什么時間被啟用
Redisson中的看門狗(watchdog)機制的行為確實與是否顯式指定鎖的超時時間有關。
- lock() 方法與看門狗:
- 當您使用 lock() 方法而不傳遞任何參數時,Redisson默認會啟動看門狗機制。這是因為沒有指定具體的鎖超時時間,Redisson會認為需要自動續期鎖,以防止因客戶端崩潰或其他原因導致鎖未被釋放而造成的死鎖問題。
- lock(long leaseTime, TimeUnit unit) 方法與看門狗:
- 如果您在調用 lock() 方法時顯式指定了鎖的超時時間(例如 lock(5000, TimeUnit.SECONDS)),則Redisson不會啟動看門狗機制。這是因為您已經指定了鎖的確切過期時間,Redisson會認為您希望在指定的時間內持有鎖,而不希望自動續期。
- tryLock() 方法與看門狗:
- 使用 tryLock() 方法時,如果不傳遞 leaseTime 參數或者傳遞的 leaseTime 不大于0,Redisson會啟動看門狗機制。這是因為看門狗機制用于在鎖的持有期間自動續期,確保業務邏輯能夠在鎖釋放前完成。
renewExpiration方法
鎖的續期機制在Redisson中是自動管理的,鎖的續期是基于一個定時任務的機制,定期檢查鎖的狀態并決定是否需要續期。具體實現為:
private void renewExpiration() {
// 1、首先會從EXPIRATION_RENEWAL_MAP中獲取一個值,如果為null說明鎖可能已經被釋放或過期,因此不需要進行續期,直接返回
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 2、基于TimerTask實現一個定時任務,設置internalLockLeaseTime / 3的時長進行一次鎖續期,也就是每10s進行一次續期。
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 從EXPIRATION_RENEWAL_MAP里獲取一個值,檢查鎖是否被釋放
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
// 如果為null則說明鎖也被釋放了,不需要續期
if (ent == null) {
return;
}
// 如果不為null,則獲取第一個thread(也就是持有鎖的線程)
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 如果threadId 不為null,說明需要續期,它會異步調用renewExpirationAsync(threadId)方法來實現續期
RFuture<Boolean> future = renewExpirationAsync(threadId);
// 處理結果
future.onComplete((res, e) -> {
// 如果有異常
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
// 如果續期成功,則會重新調用renewExpiration()方法進行下一次續期
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
具體步驟和邏輯分析
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
首先,從 EXPIRATION_RENEWAL_MAP 中獲取當前鎖的 ExpirationEntry 對象。如果該對象為null,說明鎖可能已經被釋放或過期,因此不需要進行續期,直接返回。
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
...
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
如果當前鎖的 ExpirationEntry 對象不是null,就會繼續往下執行,創建一個定時任務。這個定時任務的代碼實現了一個鎖的續期機制,具體步驟和邏輯分析如下:
在代碼中,定時任務是通過 commandExecutor.getConnectionManager().newTimeout(...) 方法創建的,該任務的延遲時間設置為 internalLockLeaseTime / 3 毫秒,即每次續期的時間間隔。
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
在定時任務的 run 方法中,首先嘗試從 EXPIRATION_RENEWAL_MAP 中獲取與當前鎖對應的 ExpirationEntry 實例。如果獲取到的 ExpirationEntry 為 null,則說明鎖已經被釋放,此時無需續期,直接返回。
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
如果獲取到的 ExpirationEntry 不為 null,說明如果鎖仍然有效,繼續往下走,接下來獲取持有該鎖的線程 ID。如果 threadId 為 null,也說明鎖可能已經被釋放,直接返回。
RFuture<Boolean> future = renewExpirationAsync(threadId);
如果持有鎖的線程 ID 不為 null,繼續往下走,則調用 renewExpirationAsync(threadId) 方法異步續期鎖的有效期。
繼續進入這個renewExpirationAsync()方法,可以看到,方法的主要功能是延長鎖的有效期。下面是對這段代碼的詳細分析:
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
renewExpiration()函數內部的RFuture
- 返回類型:RFuture
表示該方法返回一個表示異步操作結果的未來對象,最終會得到一個布爾值,指示續期操作是否成功。 - 參數:long threadId 是持有鎖的線程 ID,用于標識當前續期操作是否適用于該線程。
這個renewExpirationAsync()是一個異步刷新有效期的函數,它主要是用evaLWriteAsync()方法來異步執行一段Lua腳本,重置當前threadId線程持有的鎖的有效期。也就是說該方法負責執行給定的Lua腳本,以實現分布式鎖的續期。
- KEYS[1]:代表鎖的名稱,即 Redis 鍵。
- ARGV[1]:引用傳入的第一個非鍵參數,表示希望設置的新過期時間(毫秒),鎖的默認租約時間為internalLockLeaseTime。
- ARGV[2]:引用傳入的第二個非鍵參數,表示通過getLockName(threadId)根據線程ID生成特定的鎖標識符,確保操作的是特定線程的鎖。簡單說就是持有鎖的線程id。
- getName():獲取當前鎖的名稱,用于作為Redis中的鍵。
- LongCodec.INSTANCE:編碼器,指示如何處理數據的序列化與反序列化。
- RedisCommands.EVAL_BOOLEAN:表示執行的命令類型,這里是執行一個返回布爾值的Lua腳本。
Lua腳本中,首先執行redis.call('hexists', KEYS[1], ARGV[2]) == 1,該命令檢查鎖的名稱KEYS[1]下是否存在持有該鎖的線程ID(ARGV[1])。如果存在,說明該線程仍然是鎖的持有者,則調用pexpire命令redis.call('pexpire', KEYS[1], ARGV[1])更新鎖的過期時間。如果續期成功,返回1,否則返回0。
因此,Lua腳本中的整體邏輯是如果當前key存在,說明當前鎖還被該線程持有,那么就重置過期時間為30s,并返回true表示續期成功,反之返回false。
這段代碼的設計充分利用了Redis的Lua腳本特性,實現了高效且原子化的鎖續期邏輯,減少了并發操作中的 race condition 問題,同時提供了異步執行的能力,提升了系統的響應性和性能。
然后,我們退回到renewExpiration()方法中,繼續往下走,
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
renewExpiration();
}
});
通過 onComplete 方法處理續期操作的結果,如果e 不為 null,說明有異常則記錄錯誤日志。如果res 為 true,說明續期成功則調用 renewExpiration() 方法,安排下一次的續期操作。
總結一下,整體流程就是,在代碼中,定時任務是通過 commandExecutor.getConnectionManager().newTimeout(...) 方法創建的。該任務會在指定的時間(internalLockLeaseTime / 3 毫秒)后執行一次。每當任務執行時,都會檢查當前鎖的狀態,并嘗試續期。如果需要續期(即鎖仍然有效),則會調用 renewExpiration() 方法。
為什么需要遞歸調用?
在鎖的實現中,為了確保鎖在持有者處理任務期間保持有效,通常會設置一個有效期(lease time)。在有效期內,如果持有鎖的線程仍然在執行任務,那么它需要定期續期,以防止在任務完成前鎖過期,從而導致其他線程獲取鎖。
遞歸調用的機制:在 run 方法的最后,如果續期成功,調用 renewExpiration() 方法。這通常意味著該方法會重新安排另一個定時任務,相當于在每次續期后再次創建一個新的定時任務,使得續期操作可以持續進行。這種遞歸調用的方式確保了只要鎖仍然被持有,續期操作就會不斷地被調度,從而保持鎖的有效性。
定時任務的生命周期?
每個定時任務的生命周期是短暫的,完成一次 run 方法的執行后,該任務就結束了。然后,通過遞歸調用,可能會創建新的定時任務,從而繼續續期。
(1)任務通過 newTimeout 被創建,并且首次執行會在 internalLockLeaseTime / 3 毫秒后觸發。這個時間間隔確保了任務在鎖的生命周期的早期進行檢查和續期。此時,任務進入其生命周期,準備執行。
(2)當定時任務第一次執行時,run() 方法被調用。它主要的任務是:
- 從 EXPIRATION_RENEWAL_MAP 獲取鎖的狀態。
- 如果鎖被釋放(ent == null),任務直接返回,不再進行續期。
- 如果鎖仍然存在并且當前線程持有鎖(threadId != null),則異步調用 renewExpirationAsync(threadId) 來續期鎖。
- 在續期的異步任務完成后,如果續期成功(res == true),會重新調用 renewExpiration() 進行下一次續期。
(3)續期條件:如果任務成功續期,它會在異步任務的 onComplete 回調中再次調用 renewExpiration() 方法。renewExpiration() 負責創建一個新的定時任務,這意味著每次任務續期成功后,系統會重新調度一個新的定時任務,以確保鎖的有效期能夠持續。
這個 renewExpiration() 方法的調用實際上是遞歸調用新的定時任務,續期繼續進行下去。每次任務執行后,都可能會創建一個新的任務,直到鎖被釋放。
(3)定時任務的生命周期可能在以下情況下終止:
- 鎖被釋放:當 EXPIRATION_RENEWAL_MAP.get(getEntryName()) 返回 null,表示鎖已經被釋放,定時任務會停止續期,不再創建新的定時任務。
- 無持有鎖的線程:如果沒有線程持有鎖(即 threadId == null),任務也會停止續期。
- 異步任務失敗:如果續期的異步任務失敗(例如網絡問題、數據庫問題等),則可能無法繼續續期。不過在代碼中,如果發生異常,它只會記錄錯誤,并不會立即停止整個續期機制,但最終續期將會失敗并終止。
定時任務的生命周期從它的創建開始,通過定期執行檢查和續期,直到鎖被釋放或沒有線程持有鎖時,任務才會停止。每次續期成功后,新的定時任務會繼續執行,確保鎖的有效期在持鎖線程存在時不會過期。
因此,雖然定時任務會被創建并執行,但它的執行是基于持鎖狀態的,只有在鎖有效且持有者仍在執行任務的情況下才會持續進行續期。這個設計確保了資源的有效管理,避免不必要的續期操作。
gpt總結
在 Redisson 中,renewExpiration() 方法是分布式鎖續期機制的核心部分。該方法的主要作用是延長鎖的過期時間,確保持有鎖的線程在執行任務期間不會因為鎖的超時而被其他線程搶占。下面將詳細分析 renewExpiration() 方法的實現及其工作原理。
1. 方法概述
renewExpiration() 方法通常是在鎖被持有時定期調用的,用于更新 Redis 中存儲的鎖狀態,以防止鎖過期。該方法的基本邏輯是:
- 檢查當前線程是否仍然持有鎖。
- 如果是,則更新鎖的過期時間。
2. 實現細節
以下是 renewExpiration() 方法的一些關鍵點和步驟(具體實現可能會因版本而異):
2.1 鎖的狀態檢查
在調用 renewExpiration() 方法之前,Redisson 會首先確認當前線程是否持有鎖。這通常通過檢查與鎖相關的標識符(如線程 ID 或 UUID)來實現。
if (!isHeldByCurrentThread()) {
return;
}
2.2 獲取當前鎖的過期時間
如果當前線程確實持有鎖,接下來會獲取當前鎖的過期時間。這個時間通常是通過 Redis 中存儲的鍵值對來讀取的。
2.3 更新過期時間
一旦確認當前線程持有鎖并獲取了過期時間,Redisson 將使用 Redis 的命令(如 EXPIRE 或 SETEX)來更新鎖的過期時間。例如:
redisCommands.expire(lockKey, newExpirationTime);
這里的 newExpirationTime 是根據配置或策略計算得出的新的過期時間。
2.4 異常處理
在執行續期操作時,Redisson 還需要處理可能發生的異常,例如網絡問題或 Redis 服務不可用等。適當的異常處理可以確保系統的健壯性和穩定性。
3. 續期策略
Redisson 通常會在后臺啟動一個線程定期調用 renewExpiration() 方法。續期的頻率和鎖的過期時間可以通過配置進行調整,常見的做法是設置續期時間為鎖過期時間的一半,以確保續期操作在鎖到期之前完成。
4. 總結
renewExpiration() 方法在 Redisson 分布式鎖中起到了至關重要的作用,它通過持續更新鎖的過期時間,避免了由于任務執行時間過長導致的鎖自動釋放。合理地配置續期策略可以大幅提升分布式系統的穩定性和數據一致性。
示例代碼
以下是一個簡化的示例,展示了如何在 Redisson 中使用分布式鎖以及續期機制:
RLock lock = redisson.getLock("myLock");
try {
// 嘗試獲取鎖,最多等待 10 秒,鎖自動過期時間為 30 秒
if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
// 執行任務
// ...
// 定期續期
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(() -> {
lock.renewExpiration(); // 續期
}, 10, 10, TimeUnit.SECONDS); // 每 10 秒續期一次
}
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock(); // 確保釋放鎖
}
}
注意事項
- 使用分布式鎖時,要注意鎖的使用場景,避免產生死鎖或性能瓶頸。
- 配置合理的過期時間和續期策略,以適應不同的業務需求。
浙公網安備 33010602011771號