<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Redis下篇--分布式鎖

      Redis下篇--分布式鎖

      本文示例代碼見GITEE倉庫中的【redis-analysis】

      地址:https://gitee.com/quercus-sp204/new-technology/tree/master/redis-analysis

      1.基本介紹

      Redis分布式鎖是一種基于Redis實現的跨進程互斥機制,用于在分布式系統中控制多個服務/節點對共享資源的并發訪問,確保同一時刻只有一個客戶端能執行關鍵操作(如修改共享數據、執行任務等)。

      在單機系統中,我們可以用線程鎖(如Java的synchronizedReentrantLock)保證并發安全。但在分布式系統中【項目集群部署】,服務部署在多臺機器上,跨進程的共享資源無法通過本地鎖保護。如下圖1所示:

      此時就需要分布式鎖。我們需要用它來:

      1. 解決分布式競爭條件:當多個服務實例同時操作共享資源(如庫存扣減、訂單狀態更新)時,可能因并發寫入導致數據錯誤(例如超賣)。分布式鎖確保同一時刻只有一個實例能執行操作。

      2. 替代低效方案:傳統方案(如數據庫行鎖)在并發高時性能差,而Redis作為內存數據庫,高性能(10萬+ QPS)原子操作特性使其成為理想的分布式鎖實現基礎。

      3. 保證操作原子性:分布式鎖將臨界區操作(如“查詢+修改”)封裝為原子操作,避免多個客戶端交叉執行引發的邏輯錯誤。

      大致示意圖如下圖2所示:(就是多加了一層!)

      上面說了,用于分布式系統的,我在這里拋出一個小問題:現在也有很多應用不是微服務架構的,我們是不是沒有必要上分布式鎖呢?熟悉java的人都知道,我們可以用java自帶的synchronized、ReentrantLock鎖呀,我們為什么要這個分布式鎖呢?好像確實有些許道理。這個問題先放一放。

      介紹完了其基本信息,下面給出其在實際生產中的應用場景:

      1. 防止重復操作:用戶重復提交訂單:方案:用訂單ID+操作類型作為鎖key,確保同一訂單的支付操作只執行一次。
      2. 高并發庫存扣減
      # 偽代碼示例【大致流程】
      lock_key = "stock_lock:product_123"
      if redis.set(lock_key, "1", nx=True, ex=5):  # 獲取鎖
          try:
              stock = db.query("SELECT stock FROM products WHERE id=123")
              if stock > 0:
                  db.execute("UPDATE products SET stock = stock - 1 WHERE id=123")
          finally:
              redis.del(lock_key)  # 釋放鎖
      
      1. 分布式任務調度:多個節點競爭執行定時任務(如每天0點統計報表,或者是每天定時數據庫做一些數據修改之類的),在要執行定時任務的時候,我們可以用任務名作為key,搶到鎖的節點執行任務。

      等等場景都可以用到分布式鎖。其實分布式鎖實現方式不只有Redis可以實現,還可以用其他方案來實現,比如說

      我們可以用數據庫來實現分布式鎖,通過SELECT ... FOR UPDATE對數據庫記錄加行鎖,亦或者是在表中增加版本號字段,更新時校驗版本號(CAS機制);但是頻繁加鎖導致數據庫IO壓力大,高并發下延遲顯著,同時事務未提交或超時可能引發死鎖。

      還可以使用Zookeeper來創建臨時結點來達到類似的效果,創建臨時順序節點,最小序號節點獲鎖,其他節點監聽前序節點刪除事件,節點宕機時臨時節點自動刪除,避免死鎖。但是相比于redis來說,節點創建/刪除及事件通知的開銷較大(低于Redis),同時可能需維護ZooKeeper集群,開發成本較高。

      實際生產要將成本和系統復雜度綜合起來考慮,最后再決定采用哪種方案,但是Redis這個中間件對于現在的很多系統來說,我認為是用得非常普遍了吧。在綜合考慮性能、系統復雜度、以及業界普遍方案,本文就以Redis實現分布式鎖展開了。

      2. Redis實現分布式鎖

      2.1 基礎版setnx

      SETNX lock_key 1  # 嘗試獲取鎖
      DEL lock_key      # 釋放鎖
          
      @Component
      public class BaseLock {
          @Resource
          private RedisTemplate<String, Object> redisTemplate;
      
          public boolean tryLock(String lockName) {
              Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockName, "lock");
              return lock != null && lock;
          }
      
          public void unLock(String lockName) {
              redisTemplate.delete(lockName);
          }
      }
      
      // 測試使用
      public void test() {
          // 1.加鎖
          try {
              bool res = baseLock.tryLock(key);
              if ( res ) {
                  do......
              } else {
                  獲取鎖失敗
              }
          } catch( e ) {
              ........
          } finally{
              baseLock.unLock(key) // 如果這一行死活執行不了
          }
      }
      

      這絕對是有一點問題的,如果有異常原因,在執行baseLock.unLock(key) 這行代碼的時候出現異常怎么辦,可能這個時候網絡異常?Redis客戶端斷開了?等等原因,導致該key沒有被刪掉,那么就會有死鎖問題。下面為了避免這個問題,我們把自動刪除key的操作,加一個過期時間,讓redis-server為我們兜一下底。

      2.2 過期時間

      在設置過期時間這一點上,需要提前說明一下,Redis是有EXPIRE這個命令的,它為鎖設置過期時間,避免死鎖(如 EXPIRE lock_key 10),如果是先setnx,再expire,這就是兩條命令了,所以我們需要保證兩條命令的原子性。但是Redis如今都發展到版本7.x了,Redis早就提供了原子命令:

      SET lock_key unique_value NX EX 10 #同時完成鎖設置和過期時間,避免非原子性問題
      

      本文就直接用上面這個了。

      // 方案二:set + nx + ex
      public boolean tryLock2(String lockName, int t) {
          Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockName, "lock", Duration.of(t, ChronoUnit.SECONDS));
          return lock != null && lock;
      }
      public void unLock2(String lockName) {
          redisTemplate.delete(lockName);
      }
      
      // 測試使用
      public void test() {
          // 1.加鎖
          try {
              // 指定key的有效時間 假設time=5
              bool res = baseLock.tryLock2(key, time);
              if ( res ) {
                  do...... // 【但是這里執行6秒喔】
              } else {
                  獲取鎖失敗
              }
          } catch( e ) {
              ........
          } finally{
              baseLock.unLock2(key) //【釋放鎖處】
          }
      }
      

      上面這個方案,就算是【釋放鎖處】死活執行不了,有過期時間為我們兜底,不用擔心死鎖問題,過期時間到了就行了。上面仍然會有問題,且聽細細道來。

      看上面測試代碼的注釋 do...... // 【但是這里執行6秒喔】,就是說業務執行時間超過了鎖的過期時間,就會導致如下問題:

      請按照順序看上面的流程,我們可以知道發生了這樣一件事情:那就是線程1把線程2的鎖給刪了,出現誤刪的現象,然后后面就可能會引發一連串的錯誤了。

      下面來看方案3,前面兩個方案的value我們都沒有用到,現在要用到了:

      // 方案三:set + uuid
      public boolean tryLock3(String lockName, String value, int t) {
          Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockName, value, Duration.of(t, ChronoUnit.SECONDS));
          return lock != null && lock;
      }
      public void unLock3(String lockName, String value) {
          String lock = (String) redisTemplate.opsForValue().get(lockName);
          if ( lock != null && lock.equals(value)) { // 比較一下value,是不是自己的
              redisTemplate.delete(lockName);
          }
      }
      
      public void test() {
          String lockName = "lock";
          String uuid = UUID.randomUUID().toString();
          try {
              boolean b = tryLock3(lockName, uuid, 5);
              if (b) {
                  // 獲取鎖成功
                  // 邏輯代碼
                  Thread.sleep(5000);
              } else {
                  // 獲取鎖失敗
              }
          } catch (Exception e ) {
              // ...
          } finally {
              unLock3(lockName, uuid);
          }
      }
      

      雖然這樣看起來不會釋放別人的鎖了,但是如果業務超時,還是會出現有多個線程進入臨界區的情況,這是不希望看到的。還有一個小問題,就是unLock3的那兩步不是原子操作,極端情況也會出現誤刪問題:下面給出deepseek的極端情況推演圖

      2.3 lua腳本

      所以要保證原子性。我們使用redis lua腳本來保證這兩部操作的原子性

      private static final String UNLOCK_SCRIPT =
                  "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                          "   return redis.call('del', KEYS[1]) " +
                          "else " +
                          "   return 0 " +
                          "end";
      // 刪除鎖的時候用lua腳本 unLock3優化版
      public void unLock3(String lockName, String value) {
          DefaultRedisScript<Long> script = new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class);
          redisTemplate.execute(script, Collections.singletonList(lockName), value);
      }
      

      這里順帶提一嘴,Redis不是還有事務嗎?我們可以用事務嗎:

      public void unsafeUnlockWithTransaction(String lockName, String value) {
          redisTemplate.execute(new SessionCallback<>() {
              @Override
              public Object execute(RedisOperations operations) {
                  operations.watch(lockName);
                  String lockValue = (String) operations.opsForValue().get(lockName);
                  if (value.equals(lockValue)) {
                      operations.multi();
                      operations.delete(lockName);
                      operations.exec(); // 事務提交
                  } else {
                      operations.unwatch();
                  }
                  return null;
              }
          });
      }
      

      那么,這個Redis事務在第三章補充一下,在此處先放一下。再次回到2.2小節的問題,那就是業務時間 > 鎖持有時間,還是會導致多個線程進入臨界區的情況,所以這個過期時間是我們要著重考慮的問題了。

      2.4 鎖續期

      對于這個問題,我們可以有這樣的實現思路,啟動一個線程在后臺對key進行續期不就好了?可以我們可以利用Java的周期任務線程來完成這個效果,簡單示例代碼如下:

      @Component
      @Slf4j
      public class AutoExtensionLock {
          @Resource
          private RedisTemplate<String, Object> redisTemplate;
          // 鎖的隊列
          private static ConcurrentLinkedQueue<RedisLockHolder> keys = new ConcurrentLinkedQueue<>();
      
          private static final ScheduledExecutorService SCHEDULER =
                  new ScheduledThreadPoolExecutor(1,
                      new BasicThreadFactory.Builder().daemon(true).namingPattern("redis-AutoExtensionLock-%d").build());
      
          @PostConstruct
          public void init() {
              // 定時檢查鎖的到期時間
              SCHEDULER.scheduleAtFixedRate(() -> {
                  Iterator<RedisLockHolder> iterator = keys.iterator();
                  while (iterator.hasNext()) {
                      RedisLockHolder redisLockHolder = iterator.next();
                      log.info("redis-AutoExtensionLock-info, {}", redisLockHolder.toString());
                      try { //try-catch起來否者報錯后定時任務將不會再運行
                          if (redisLockHolder.getEndTime() >= System.currentTimeMillis()) {
                              Object v = redisTemplate.opsForValue().get(redisLockHolder.getKey());
                              if ( v == null ) { // 鎖不存在
                                  iterator.remove();
                              } else {
                                  long holderExpireTime = redisLockHolder.getEndTime();
                                  long now = System.currentTimeMillis();
                                  long during = redisLockHolder.getDuring();
                                  int maxRetry = redisLockHolder.getMaxRetry();
                                  int nowCount = redisLockHolder.getNowCount();
                                  long l = holderExpireTime - now; // 鎖的剩余時間
                                  if ( nowCount == maxRetry ) {
                                      log.info("redis-AutoExtensionLock-error, {}", "重試次數已滿");
                                      iterator.remove();
                                  } else if ( l <= 0 ) {
                                      log.info("redis-AutoExtensionLock-error, {}", "鎖已過期");
                                      iterator.remove();
                                  }
                                  // 如果 l <= during / 3
                                  else if ( l <= during / 3.0 ) {
                                      log.info("redis-AutoExtensionLock-info, {}", "鎖即將過期,開始自動續期");
                                      redisTemplate.expire(redisLockHolder.getKey(), redisLockHolder.getDuring(), TimeUnit.SECONDS);
                                      redisLockHolder.setNowCount(nowCount + 1);
                                      redisLockHolder.setEndTime(now + during);
                                  }
                              }
                          }
                          else {
                              iterator.remove();
                          }
                      } catch (Exception e) {
                          // ....
                          log.info("redis-AutoExtensionLock-error, {}", e.toString());
                          iterator.remove();
                      }
                  }
      
              }, 0, 3000, TimeUnit.MILLISECONDS);
          }
      
          // 嘗試獲取鎖
          public boolean tryLock4(String lockName, String value, long t ) {
              Boolean b = redisTemplate.opsForValue().setIfAbsent(lockName, value, Duration.of(t, ChronoUnit.SECONDS));
              if (b != null && b) {
                  keys.add(new RedisLockHolder(lockName, value, t * 1000,System.currentTimeMillis() + (t * 1000), 0, 5));
                  return true;
              }
              return false;
          }
      
          // 釋放鎖
          private static final String UNLOCK_SCRIPT =
                  "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                          "   return redis.call('del', KEYS[1]) " +
                          "else " +
                          "   return 0 " +
                          "end";
          public void unlock4(String lockName, String value) {
              DefaultRedisScript<Long> script = new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class);
              redisTemplate.execute(script, Collections.singletonList(lockName), value);
          }
      
          @Data
          @AllArgsConstructor
          @NoArgsConstructor
          private static class RedisLockHolder {
              public static final int MAX_RETRY = 5;
      
              private String key;
              private String value;
              private long during; // 鎖的持續時間 --- 毫秒
              private long endTime; // 到期時間
              private int nowCount; // 當前重試次數
              private int maxRetry; // 重試次數
          }
      }
      

      像這種守護線程去續期鎖的機制,叫做“看門狗機制”。

      2.5 Redisson

      2.5.1 基本介紹

      如果自行去實現各種api,相比是有點麻煩的,所以Redisson 是一個基于 Redis 的高性能 Java 客戶端庫,專為分布式系統設計。它不僅封裝了 Redis 的基礎操作(如鍵值存儲),還提供了豐富的分布式數據結構和服務,簡化了分布式環境下的開發復雜度。Redisson 的核心價值在于 將分布式系統的通用能力(鎖、隊列、緩存等)抽象為易用的 Java API,開發者無需重復造輪子即可構建高可靠分布式應用。Redisson的宗旨是促進使用者對 Redis 的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上

      如果我們的項目中已使用 Redis,那么Redisson可能是比原生客戶端更高效的選擇。

      • 分布式鎖

      它在分布式鎖的場景下,實現了可重入鎖(Reentrant Lock):同一線程多次獲取同一鎖不會死鎖,通過 Hash 結構存儲線程 ID 和重入計數器;還有公平鎖(Fair Lock)【默認情況下redisson分布式鎖是非公平的,即任意時刻任意一個請求都可以在鎖釋放后爭搶分布式鎖】:基于 Redis 的 BRPOPLPUSH 命令,按請求順序分配鎖,避免饑餓問題;它也有自帶的看門狗喔(自動續期(Watchdog)):后臺線程自動延長鎖超時時間,防止業務未完成時鎖過期。

      Redisson的看門狗機制提供的默認超時時間是30*1000毫秒,也就是30秒

      如果一個線程獲取鎖后,運行程序到釋放鎖所花費的時間大于鎖自動釋放時間(也就是看門狗機制提供的超時時間30s),那么Redission會自動給redis中的目標鎖延長超時時間。在Redission中想要啟動看門狗機制,那么我們就不用獲取鎖的時候自己定義leaseTime(鎖自動釋放時間)。如果自己定義了鎖自動釋放時間的話,無論是通過lock還是tryLock方法,都無法啟用看門狗機制。但是,如果傳入的leaseTime為-1,也是會開啟看門狗機制的。

      這里拋出一個小問題:如果 業務一直沒執行完,那豈不是一直續期,相當于死鎖了嗎,這種情況怎么處理?

      乍一看,對喔,有道理啊,但是仔細一想,這是問題嗎?業務代碼怎么可能一直沒執行完(一直沒執行完難道不是業務程序的邏輯有問題嗎)?而且,看門狗是給拿到鎖的客戶端續期(有限時間),在看門狗或拿到鎖的客戶端宕機后(或是其它異常)就會停止續期,最后一次續期的時間就是他的有效期,到期自動釋放。這個鍋中間件不能背喔。

      RLock lock = redisson.getLock("orderLock");
      lock.lock(); // 獲取鎖(自動續期)
      try {
          // 業務操作
      } finally {
          lock.unlock(); // 原子化釋放
      }
      

      在分布式鎖的基礎上還提供了聯鎖(MultiLock),讀寫鎖(ReadWriteLock),公平鎖(Fair Lock),紅鎖(RedLock),信號量(Semaphore),可過期性信號量(PermitExpirableSemaphore)和閉鎖(CountDownLatch)這些實際當中對多線程高并發應用至關重要的基本部件

      • 分布式數據結構
      類型 實現類示例 功能說明
      Map RMap 分布式 HashMap,支持本地緩存優化
      Queue RBlockingQueueRDelayedQueue 阻塞隊列,支持任務調度;延遲隊列
      AtomicLong RAtomicLong 分布式原子計數器
      Bloom Filter RBloomFilter 高效大數據去重

      布隆過濾器見:

      CSDN:Redis中篇

      微信公眾號https://mp.weixin.qq.com/s/tcUQeX1PFOrW_ifSBytefQ

      在普通數據結構上,將原生的Redis Hash,List,Set,String,Geo,HyperLogLog等數據結構封裝為Java里大家最熟悉的映射(Map),列表(List),集(Set),通用對象桶(Object Bucket),地理空間對象桶(Geospatial Bucket),基數估計算法(HyperLogLog)等結構,

      • 網絡通信上

      Redisson采用了基于NIO的Netty框架,不僅能作為Redis底層驅動客戶端,具備提供對Redis各種組態形式的連接功能,對Redis命令能以同步發送、異步形式發送、異步流形式發送或管道形式發送的功能,LUA腳本執行處理,以及處理返回結果的功能

      可以說,Redisson算是Redis的一款開發利器了!怪不得叫他“瑞士軍刀”。。

      2.5.2 分布式鎖相關原理分析

      <dependency>
          <groupId>org.redisson</groupId>
          <artifactId>redisson-spring-boot-starter</artifactId>
          <version>3.45.1</version>
      </dependency>
      

      首先是這個版本。

      加鎖大致過程:指定一個 key 作為鎖標記,存入 Redis 中,指定一個 唯一的用戶標識 作為 value;當 key 不存在時才能設置值,確保同一時間只有一個客戶端進程獲得鎖,滿足 互斥性 特性;設置一個過期時間,防止因系統異常導致沒能刪除這個 key,滿足 防死鎖 特性;當處理完業務之后需要清除這個 key 來釋放鎖,清除 key 時需要校驗 value 值,需要滿足 只有加鎖的人才能釋放鎖

      現在看這個tryLock方法【無參數的】

      public boolean tryLock(String lockName) {
          return redissonClient.getLock(lockName).tryLock();
      }
      
      public void unLock(String lockName) {
          redissonClient.getLock(lockName).unlock();
      }
      
      // 帶超時的鎖
      public boolean tryLock(String lockName, long waitTime, long leaseTime) {
          try {
              return redissonClient.getLock(lockName).tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
          } catch (InterruptedException e) {
              log.error("tryLock 出現異常", e);
              return false;
          }
      }
      

      可以看到使用起來非常簡單吧。那我們就從加鎖、釋放鎖看起。

      我們先看加鎖,也就是這里:redissonClient.getLock(lockName).tryLock();

      // 首先是getLock
      // 【Redisson.java】
      public final class Redisson implements RedissonClient {
          @Override
          public RLock getLock(String name) {
              // 返回的是RLock類型【具體類型是RedissonLock】
              return new RedissonLock(commandExecutor, name);
          }
      }
      // 然后就是tryLock方法了
      // 【RedissonLock.java】
      public class RedissonLock extends RedissonBaseLock {
          @Override
          public boolean tryLock() {
              // 1.tryLockAsync()
              // 2.get( RFuture<Boolean> )
              // get( xxx ) 阻塞到tryLockAsync()返回
              return get(tryLockAsync());
          }
         
      }
      
      // 【RedissonBaseLock.java】
      public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
          @Override
          public RFuture<Boolean> tryLockAsync() {
              // 這里調用的tryLockAsync()方法,具體實現在【RedissonLock.java】
              // 這個方法是RLockAsync接口里面定義的
              // RLock接口繼承了RLockAsync接口,所以這里可以看到這個方法
              return tryLockAsync(Thread.currentThread().getId());
          }
      }
      
      // 現在又回到 【RedissonLock.java】
      @Override
      public RFuture<Boolean> tryLockAsync(long threadId) {
          // 先 () -> tryAcquireOnceAsync(-1, -1, null, threadId)
          // 返回RFuture<Boolean>是execute方法返回的
          return getServiceManager().execute(() -> tryAcquireOnceAsync(-1, -1, null, threadId));
      }
      private <T> void execute(AtomicInteger attempts, CompletableFuture<T> result, Supplier<CompletionStage<T>> supplier) {
          // 這里一行執行的是傳進來的supplier.....也就是() -> tryAcquireOnceAsync(-1, -1, null, threadId)
          CompletionStage<T> future = supplier.get(); 
          future.whenComplete((r, e) -> { 
              if (e != null) { // 有異常
                  if (.....) {
                      ....
      				// 重試
                      newTimeout(t -> execute(attempts, result, supplier),
                                 config.getRetryInterval(), TimeUnit.MILLISECONDS);
                      return;
                  }
                  result.completeExceptionally(e);
                  return;
              }
      		// 沒有異常,ok,future設置為完成
              result.complete(r);
          });
      }
      // () -> tryAcquireOnceAsync(-1, -1, null, threadId)
      private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
          CompletionStage<Boolean> acquiredFuture;
          // 如果給的所持有時間 > 0
          if (leaseTime > 0) {
              //場景:用戶顯式指定鎖超時時間(如 lock.lock(10, TimeUnit.SECONDS))
              acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
          } else { 
              // 如果 <= 0, 由于我們最外層是調用的tryLock(),沒有帶任何參數,走到這里的話,leaseTime傳過來的是-1
              // 在該方法下面看tryLockInnerAsync方法
              acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                   TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
          }
      
          acquiredFuture = handleNoSync(threadId, acquiredFuture);
      
          CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
              // lock acquired
              if (acquired) {
                  if (leaseTime > 0) { // 如果傳進來的鎖持有時間>0
                      internalLockLeaseTime = unit.toMillis(leaseTime);
                  } else { 
                      // 如果傳進來的鎖持有時間 <= 0
                      // 會啟動看門狗喔
                      scheduleExpirationRenewal(threadId);
                  }
              }
              return acquired;
          });
          return new CompletableFutureWrapper<>(f);
      }
      // 可以看到底層是用了lua腳本喔
      <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
          return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, command,
              "if ((redis.call('exists', KEYS[1]) == 0) " + // 情況1:鎖不存在
                        "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " + // 情況2:鎖已被當前線程持有
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 重入計數+1
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 設置/刷新過期時間
                    "return nil; " + // 返回nil表示成功
               "end; " +
               "return redis.call('pttl', KEYS[1]);", // 返回鎖剩余時間(毫秒)
             Collections.singletonList(getRawName()), // KEYS[1] = 鎖名稱
             unit.toMillis(leaseTime),  // ARGV[1] = 鎖持有時間(毫秒)
             getLockName(threadId));  // ARGV[2] = 線程標識符(UUID:threadId)
      }
      

      到這里我們知道,Redisson不是用的string數據類型的,而是hash類型的。在Redisson里面是這樣的:【可以看到是一個可重入鎖】

      key: test [hash類型的]
      value: 
      ----field: 285475da-9152-4c83-822a-67ee2f116a79:52 [線程ID]
      ----val: 1 [重入次數]
      

      看得有點兒長了,我們接著看看門狗是怎么運行的:

      // 接上文
      // 會啟動看門狗喔
      // 【RedissonLock.java】
      scheduleExpirationRenewal(threadId);
      // 這個方法其實是 【RedissonBaseLock.java】里面的
      protected void scheduleExpirationRenewal(long threadId) {
          renewalScheduler.renewLock(getRawName(), threadId, getLockName(threadId));
      }
      // 【LockRenewalScheduler.java】
      public void renewLock(String name, Long threadId, String lockName) {
          reference.compareAndSet(null, new LockTask(internalLockLeaseTime, executor, batchSize));
          LockTask task = reference.get();
          task.add(name, lockName, threadId);
      }
      // ...
      // 到這里
      // RenewalTask.java
      final void add(String rawName, String lockName, long threadId, LockEntry entry) {
          addSlotName(rawName);
          LockEntry oldEntry = name2entry.putIfAbsent(rawName, entry);
          if (oldEntry != null) {
              oldEntry.addThreadId(threadId, lockName);
          } else {
              if (tryRun()) {
                  schedule(); // 這里
              }
          }
      }
      public void schedule() {
          if (!running.get()) {
              return;
          }
          long internalLockLeaseTime = executor.getServiceManager().getCfg().getLockWatchdogTimeout();
          //定時任務 是 lockWatchdogTimeout 的1/3時間去執行 renewExpirationAsync【默認就是10秒嘛】
          executor.getServiceManager().newTimeout(this, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
      }
      // ServiceManager.java
      public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
          try {
              return timer.newTimeout(task, delay, unit);
          } catch (IllegalStateException e) {
              ....
          }
      }
      // 然后我們發現,來到netty包下面了 io.netty.util
      // public class HashedWheelTimer implements Timer {....}
      @Override
      public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
          // delay參數是internalLockLeaseTime / 3
      }
      

      我發現這個Netty時間輪算法,我還不會,在這里就不繼續深究了。見后續文章吧。

      上面是tryLock無參數的,下面來看tryLock帶時間參數的,方法有一點長,這里僅給出關鍵部分:

      // 【RedissonLock.java】
      @Override
      public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
          // 階段1:初始化計時
          long time = unit.toMillis(waitTime);  // 總等待時間(ms)
          long current = System.currentTimeMillis();  // 起始時間戳
          long threadId = Thread.currentThread().getId();  // 線程唯一ID
          // 階段2:首次嘗試獲取鎖
          // 最終會走到上面的見過的方法 tryAcquireOnceAsync -- 執行Lua腳本
          Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
          if (ttl == null) {  // 加鎖成功
              return true;
          }
          // 階段3:檢查剩余等待時間
          time -= System.currentTimeMillis() - current;  // 扣除已用時間
          if (time <= 0) {  // 已超時
              acquireFailed(waitTime, unit, threadId);  // 記錄失敗指標
              return false;
          }
          // 階段4:訂閱鎖釋放事件
          // 通過 Redis Pub/Sub 監聽鎖釋放事件(通道名:redisson_lock__channel:{lockName}
          current = System.currentTimeMillis();
          CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
          try {
              subscribeFuture.get(time, TimeUnit.MILLISECONDS);
          } catch (TimeoutException e) {
              // 訂閱超時處理(關鍵點2)
              if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(...))) {
                  subscribeFuture.whenComplete((res, ex) -> {
                      if (ex == null) unsubscribe(res, threadId);  // 異步取消訂閱
                  });
              }
              // 若訂閱操作超時(TimeoutException),取消訂閱并返回失敗
              acquireFailed(waitTime, unit, threadId);
              return false;
          } catch (ExecutionException e) {
              // 訂閱異常處理
              acquireFailed(waitTime, unit, threadId);
              return false;
          }
          try {
              // 階段5:二次時間檢查
              time -= System.currentTimeMillis() - current;
              if (time <= 0) {
                  acquireFailed(waitTime, unit, threadId);
                  return false;
              }
              // 階段6:循環嘗試獲取鎖(核心邏輯)
              while (true) {
                  long currentTime = System.currentTimeMillis();
                  ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                  if (ttl == null) {  // 獲取成功
                      return true;
                  }
                  // 更新時間余額
                  time -= System.currentTimeMillis() - currentTime;
                  if (time <= 0) {  // 總等待超時
                      acquireFailed(waitTime, unit, threadId);
                      return false;
                  }
                  // 階段7:精準阻塞等待(設計精髓)
                  currentTime = System.currentTimeMillis();
                  if (ttl >= 0 && ttl < time) {  // 鎖將先于等待時間過期
                      getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                  } else {  // 等待時間先結束
                      getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                  }
                  // 最終時間檢查
                  time -= System.currentTimeMillis() - currentTime;
                  if (time <= 0) {
                      acquireFailed(waitTime, unit, threadId);
                      return false;
                  }
              }
          } finally {
              // 階段8:資源清理
              unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
          }
      }
      

      接下來看一下釋放鎖的:redissonClient.getLock(lockName).unlock();

      // 【RedissonBaseLock.java】
      @Override
      public void unlock() {
          try {
              // 同理哦
              // 看這個unlockAsync(Thread.currentThread().getId())
              get(unlockAsync(Thread.currentThread().getId()));
          } ...
      }
      @Override
      public RFuture<Void> unlockAsync(long threadId) {
          // 通過上面的加鎖源碼分析,這一步也很能理解了
          // () -> unlockAsync0(threadId)
          return getServiceManager().execute(() -> unlockAsync0(threadId));
      }
      
      private RFuture<Void> unlockAsync0(long threadId) {
          // 這個
          CompletionStage<Boolean> future = unlockInnerAsync(threadId);
          CompletionStage<Void> f = future.handle((res, e) -> {
              cancelExpirationRenewal(threadId, res);
              if (e != null) {
                  // 拋異常
              }
              if (res == null) {
                  。。。。
              }
              return null;
          });
      
          return new CompletableFutureWrapper<>(f);
      }
      // 然后繞進去,最后會看到這里
      protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
          return evalWriteSyncedNoRetryAsync(
              getRawName(),  // 鎖名稱(Redis Key)
              LongCodec.INSTANCE, // 編解碼器
              RedisCommands.EVAL_BOOLEAN, // 期望返回布爾值
              // 以下是關鍵 Lua 腳本
               "local val = redis.call('get', KEYS[3]); " +
               "if val ~= false then " +
               	  "return tonumber(val);" +
               "end; " +
               "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
               	  "return nil;" +
               "end; " +
               "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 減少重入計數
               "if (counter > 0) then " + // 重入計數 > 0
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " + // 更新鎖過期時間
                    "redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +
                    "return 0; " +
               "else " + // 重入計數 = 0
                   "redis.call('del', KEYS[1]); " +  // 刪除鎖
                   "redis.call(ARGV[4], KEYS[2], ARGV[1]); " +  // 發布解鎖消息
                   "redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +
                   "return 1; " +
               "end; ",
               // KEYS 參數列表
               // KEYS[1] = 鎖名稱(如 myLock) 
               // KEYS[2] = 發布訂閱頻道(如 redisson_lock__channel:myLock)
               // KEYS[3] = key(如 redisson_unlock_latch:{UUID})
               Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),
               LockPubSub.UNLOCK_MESSAGE,  // ARGV[1] = 解鎖消息(固定為0)
               internalLockLeaseTime,// ARGV[2] = 鎖租約時間(毫秒)
               getLockName(threadId), // ARGV[3] = 鎖持有者ID(UUID:threadId)
               getSubscribeService().getPublishCommand(),  // ARGV[4] = 發布命令(PUBLISH)
               timeout);// ARGV[5] = 超時時間
      }
      

      3. 補充點

      3.1 redis事務

      上文說到了Redis的事務問題,本章探討一下Redis的事務:

      事務本質是一組命令的集合,支持一次執行多個命令,一個事務中所有命令都會被序列化。在redis事務的執行過程,會按照順序串行化執行隊列中的命令,其他客戶端提交的命令請求不會插入到事務執行命令序列中。

      redis的事務指令有3個關鍵字,分別是:

      1. multi:開啟事務
      2. exec:執行事務
      3. discard:取消事務
      4. watch:監視Key改變,用于實現樂觀鎖。如果監視的Key的值改變,事務最終會執行失敗。在事務開啟前使用
      5. unwatch:放棄監視。

      通過multi,當前客戶端就會開啟事務,后續用戶鍵入的都指令都會保證到隊列中暫不執行,當用戶鍵入exec后,這些指令都會按順序執行。 需要注意的是,若開啟multi后輸入若干指令,客戶端輸入discard,則之前的指令通通取消執行。

      總結說:Redis事務就是一次性、順序性、排他性的執行一個隊列中的一系列命令

      127.0.0.1:6379[2]> keys *
      (empty array)
      127.0.0.1:6379[2]> MULTI # 開啟事務
      OK
      127.0.0.1:6379[2](TX)> set test1 1
      QUEUED
      127.0.0.1:6379[2](TX)> set test2 2qwe
      QUEUED
      127.0.0.1:6379[2](TX)> EXEC # 執行
      1) OK
      2) OK
      127.0.0.1:6379[2]> keys *
      1) "test2"
      2) "test1"
      

      那么Redis事務滿足我們熟悉的事務四大特性嗎?

      對于隔離性來說,Redis是單線程執行命令的,并且執行事務時是對事務隊列中的命令依次執行,因此Redis不會出現隔離性問題。

      持久性那就不必多說了。

      重點看一下原子性:

      # 情況一:語法錯誤【編譯器就可以檢查到】,可以看到還是可以保證原子性的
      127.0.0.1:6379[2]> keys *
      (empty array)
      127.0.0.1:6379[2]> multi
      OK
      127.0.0.1:6379[2](TX)> set test1 1
      QUEUED
      127.0.0.1:6379[2](TX)> seeqw test2 2
      (error) ERR unknown command 'seeqw', with args beginning with: 'test2' '2' 
      127.0.0.1:6379[2](TX)> exec
      (error) EXECABORT Transaction discarded because of previous errors.
      127.0.0.1:6379[2]> keys *
      (empty array)
      
      # 情況二:運行時錯誤
      # 這種情況并沒有保證原子性
      # 除了運行出錯的命令,其他命令都會執行喔
      127.0.0.1:6379[2]> keys *
      (empty array)
      127.0.0.1:6379[2]> MULTI
      OK
      127.0.0.1:6379[2](TX)> set test1 1
      QUEUED
      127.0.0.1:6379[2](TX)> set test2 2qwe # 不是純數字
      QUEUED
      127.0.0.1:6379[2](TX)> incr test2 # 這一步肯定是出錯
      QUEUED
      127.0.0.1:6379[2](TX)> incr test1
      QUEUED
      127.0.0.1:6379[2](TX)> EXEC # 執行事務
      1) OK
      2) OK
      3) (error) ERR value is not an integer or out of range
      4) (integer) 2
      127.0.0.1:6379[2]> keys * # 發現有倆key
      1) "test2"
      2) "test1"
      127.0.0.1:6379[2]> get test1 # 對test1的自增成功了
      "2"
      

      當命令輸入錯誤會在執行時直接報錯,這種情況下能夠滿足原子性

      當運行時出現錯誤時,會執行到具體命令時才報錯,這種情況下除了報錯的命令不執行,事務中其他正常的命令會執行,不能滿足原子性

      為什么這么做?

      • 使用Redis命令語法錯誤,或是將命令運用在錯誤的數據類型鍵上(如對字符串進行加減乘除等),從而導致業務數據有問題,這種情況認為是編程導致的錯誤,應該在開發過程中解決,避免在生產環境中發生;
      • 由于不用支持回滾功能,Redis內部簡單化,而且還比較快;

      多數事務失敗是由語法錯誤或者數據結構類型錯誤導致的,語法錯誤說明在命令入隊前就進行檢測的,而類型錯誤是在執行時檢測的,Redis為提升性能而采用這種簡單的事務,這是不同于關系型數據庫的,特別要注意區分。Redis之所以保持這樣簡易的事務,完全是為了保證高并發下的核心問題——性能

      接下來看一下watch命令:【最前面的數字,表示命令執行順序】

      客戶端一:

      【6】127.0.0.1:6379[2]> keys *
      1) "test1"
      【7】127.0.0.1:6379[2]> get test1
      "100"
      【8】127.0.0.1:6379[2]> set test1 150
      OK
      

      客戶端二:

      【1】127.0.0.1:6379[2]> keys *
      (empty array)
      【2】127.0.0.1:6379[2]> set test1 100
      OK
      【3】127.0.0.1:6379[2]> watch test1
      OK
      【4】127.0.0.1:6379[2]> multi
      OK
      【5】127.0.0.1:6379[2](TX)> set test1 1000
      QUEUEDime-seconds|PXAT unix-time-milliseconds|KEEPTTL]
      【9】127.0.0.1:6379[2](TX)> exec
      (nil) # 在執行前被其他客戶端修改了
      【10】127.0.0.1:6379[2]> get test1
      "150"
      

      上面通過watch監視指定Redis Key ( test1 ),如果在事務執行之前沒有改變,就執行成功,如果發現對應值發生改變,事務就會執行失敗

      看到這里,在2.3節可以用事務嗎?我認為在邏輯不出錯的情況下,是可以用的,需要我們編程人員編碼的時候,來確認每一條命令都確保邏輯上都是正確的。

      但是我們能用Lua腳本解決的原子性問題,優先用Lua

      3.2 tryLock 和 lock

      上面源碼分析的都是tryLock的,其實還有lock方法,那么這二者有什么區別呢?

      (1)返回值: lock() 是沒有返回值的;tryLock() 的返回值是 boolean。

      (2)時機:lock() 一直等鎖釋放;tryLock() 獲取到鎖返回true,獲取不到鎖并直接返回false

      (3)tryLock() 是可以被打斷的,被中斷的;lock是不可以。

      private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
      	// 獲取當前線程 ID
          long threadId = Thread.currentThread().getId();
          // 獲取鎖,正常獲取鎖則ttl為null,競爭鎖時返回鎖的過期時間
          Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
          if (ttl == null) {
              return;
          }
          // 訂閱鎖釋放事件
          // 如果當前線程通過 Redis 的 channel 訂閱鎖的釋放事件獲取得知已經被釋放,則會發消息通知待等待的線程進行競爭
          CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
          pubSub.timeout(future);
          RedissonLockEntry entry;
          if (interruptibly) {
              entry = commandExecutor.getInterrupted(future);
          } else {
              entry = commandExecutor.get(future);
          }
          try {
              while (true) {
                  // 循環重試獲取鎖,直至重新獲取鎖成功才跳出循環
                  // 此種做法阻塞進程,一直處于等待鎖手動釋放或者超時才繼續線程    
                  ttl = tryAcquire(-1, leaseTime, unit, threadId);
                  if (ttl == null) {
                      break;
                  }
                  ....
              }
          } finally {
              // 最后釋放訂閱事件
              unsubscribe(future, threadId);
          }
      }
      

      實際二者使用

      RLock lock = redisson.getLock("myLock");
      boolean isLocked = lock.tryLock(); // 不會阻塞
      if (isLocked) { // 可以根據返回結果作不同的操作
          try {
              ...邏輯
          } finally {
              lock.unlock();
          }
      } else {
          獲取鎖失敗
      }
      
      RLock lock = redisson.getLock("myLock");
      lock.lock(); // 如果獲取不到鎖,在這里阻塞住了
      try {
          ....do
      } finally {
          lock.unlock();
      }
      
      

      end. 參考

      1. https://mp.weixin.qq.com/s/nrCO8GZBJrLQis98bMaRhg
      2. https://mp.weixin.qq.com/s/UzMTAqVy5MXxmV9rXdgyPg
      3. http://www.rzrgm.cn/jackson0714/p/redisson.html
      4. https://mp.weixin.qq.com/s/qVPT-e-gOXsqQ3pMVIEK7A 【Redis事務】
      5. https://segmentfault.com/a/1190000044686369 【思否- redis事務
      6. https://blog.csdn.net/jiayi_yao/article/details/124689937 【Redis事務】
      posted @ 2025-06-22 14:39  別來無恙?  閱讀(76)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 人妻无码| 亚洲精品国产精品乱码不卡| 男人狂桶女人高潮嗷嗷| 日本精品极品视频在线| 乱码精品一区二区三区| 国产精品自在拍首页视频8| 亚洲色最新高清AV网站| 人妻日韩人妻中文字幕| 欧美级特黄aaaaaa片| 最新国产精品亚洲| 99在线精品免费视频| 欧美日本一区二区视频在线观看| 国产精品香港三级国产av| 92精品国产自产在线观看481页| av天堂午夜精品一区| 中文字幕人妻中文AV不卡专区| 亚洲永久精品日韩成人av| 国产女人看国产在线女人| 奇米四色7777中文字幕| 免费一本色道久久一区| 美女内射福利大全在线看| 亚洲中文字幕综合网在线| 亚洲毛片不卡AV在线播放一区| 两个人日本www免费版| 国产免费午夜福利在线播放| 在线天堂最新版资源| 精品成在人线av无码免费看| 99精品国产高清一区二区麻豆| 欧美大胆老熟妇乱子伦视频| 久久人人爽人人爽人人av| 农村欧美丰满熟妇xxxx| 欧美奶涨边摸边做爰视频| 亚洲国产天堂久久综合226114| 国产亚洲视频在线播放香蕉| 中文字幕第一页国产| 亚洲精品国产一二三区| 亚洲色大成网站www永久一区 | 少妇人妻偷人免费观看| 亚洲国产成人va在线观看天堂 | 九九在线精品国产| 澜沧|