<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      如何實現一個分布式鎖

      如何實現一個分布式鎖

      本篇內容主要介紹如何使用 Java 語言實現一個注解式的分布式鎖,主要是通過注解+AOP 環繞通知來實現。

      1. 鎖注解

      我們首先寫一個鎖的注解

      /**
       * 分布式鎖注解
       */
      @Retention(RetentionPolicy.RUNTIME)
      @Target({ElementType.METHOD})
      @Documented
      public @interface RedisLock {
      
      	long DEFAULT_TIMEOUT_FOR_LOCK = 5L;
      	long DEFAULT_EXPIRE_TIME = 60L;
      
      	String key() default "your-biz-key";
      
      	long expiredTime() default DEFAULT_EXPIRE_TIME;
      
      	long timeoutForLock() default DEFAULT_TIMEOUT_FOR_LOCK;
      
      }
      

      expiredTime 是設置鎖的過期時間,timeoutForLock 是設置等待鎖的超時時間。如果沒有等待獲得鎖的超時時間這個功能,那么其他線程在獲取鎖失敗時只能直接失敗,無法進行排隊等待。

      我們如何使用這個注解呢,很容易,在需要加鎖的業務方法上直接用就行.如下,我們有一個庫存服務類,它有一個扣減庫存方法,該方法將數據庫中的一個庫存商品的數量減一。在并發場景下,如果我們沒有對其進行資源控制,必然會發生庫存扣減不一致現象。

      public class StockServiceImpl {
      	@RedisLock(key = "stock-lock", expiredTime = 10L, timeoutForLock = 5L)
      	public void deduct(Long stockId) {
      		Stock stock = this.getById(1L);
      		Integer count = stock.getCount();
      		stock.setCount(count - 1);
      		this.updateById(stock);
      	}
      }
      

      2. 在 AOP 切面中進行加鎖處理

      我們需要使用 AOP 來處理什么?自然是處理使用@RedisLock的方法,因此我們寫一個切點表達式,它匹配所有標有 @RedisLock 注解的方法。
      接著,我們將此切點表達式與 @Around 注解結合使用,以創建環繞通知,在目標方法執行前后執行我們的加鎖解鎖邏輯。

      因此,基本的邏輯我們就理清了,代碼大致長下面這個樣子:

      public class RedisLockAspect {
      
      	private final RedisTemplate<String, Object> redisTemplate;
      
      	// 鎖的redis key前綴
      	private static final String DEFAULT_KEY_PREFIX = "lock:";
      
      	// 匹配所有標有 @RedisLock 注解的方法
      	@Pointcut("@annotation(com.kelton.lock.annotation.RedisLock)")
      	public void lockAnno() {
      	}
      
      
      	@Around("lockAnno()")
      	public void invoke(ProceedingJoinPoint joinPoint) throws Exception {
      		// 獲取攔截方法上的RedisLock注解
      		RedisLock annotation = getLockAnnotationOnMethod(joinPoint);
      		// 獲取鎖key
      		String key = getKey(annotation);
      		// 鎖過期時間
      		long expireTime = annotation.expiredTime();
      		// 獲取鎖的等待時間
      		long timeoutForLock = annotation.timeoutForLock();
      		// 在這里加鎖
      		someCodeForLock...
      		// 執行業務
      		joinPoint.proceed();
      		// 在這里解鎖
      		someCodeForUnLock...
      	}
      

      我們在加鎖的時候,需要用上 timeoutForLock 這個屬性,我們通過自旋加線程休眠的方式,來達到在一段時間內等待獲取鎖的目的。如果自旋時間結束后,還沒獲取鎖,則拋出異常,這里可以根據自己情況而定。自旋加鎖代碼如下:

          // 自旋獲取鎖
          long endTime = System.currentTimeMillis() + timeoutForLock * 1000;
          boolean acquired = false;
          String uuid = UUID.randomUUID().toString();
          while(System.currentTimeMillis() < endTime) {
              Boolean absent = redisTemplate.opsForValue()
                      .setIfAbsent(key, uuid, expireTime, TimeUnit.SECONDS);
      
              if (Boolean.TRUE.equals(absent)) {
                  acquired = true;
                  break;
              } else {
                  // 獲取不到鎖,嘗試休眠100毫秒后重試
                  Thread.sleep(100);
              }
          }
          // 超時未獲取到鎖, 拋出異常,可根據自己業務而定
          if (!acquired) {
              throw new RuntimeException("獲取鎖異常");
          }
      

      我們發現上面加鎖的時候設置了一個 uuid 作為 value 值,這是為了在鎖釋放的時候,不誤刪其他線程上的鎖,隨后,我們就可以執行被 AOP 切中的方法,執行結束釋放鎖。代碼如下:

          try {
              // 執行業務
              joinPoint.proceed();
          } catch (Throwable e) {
              log.error("業務執行出錯!");
          } finally {
              // 解鎖時進行校驗,只刪除自己線程加的鎖
              String value = (String) redisTemplate.opsForValue().get(key);
              if (uuid.equals(value)) {
                  redisTemplate.delete(key);
              } else {
                  log.warn("鎖已過期!");
              }
          }
      

      到這里,我們就以注解+AOP 的方式實現了分布式鎖的功能。當然,以上只實現了分布式鎖的簡單功能,還缺少了分布式鎖的 key 自動續約防止鎖過期功能,以及鎖重入功能。

      目前,RedisLockAspect的完整代碼如下:

      @Component
      @Aspect
      @Slf4j
      @AllArgsConstructor
      public class RedisLockAspect {
      
      	// 匹配所有標有 @RedisLock 注解的方法
      	@Pointcut("@annotation(com.kelton.lock.annotation.RedisLock)")
      	public void lockAnno() {
      	}
      
      
      	@Around("lockAnno()")
      	public void invoke(ProceedingJoinPoint joinPoint) throws Exception {
      		// 獲取攔截方法上的RedisLock注解
      		RedisLock annotation = getLockAnnotationOnMethod(joinPoint);
      
      		String key = getKey(annotation);
      		// 鎖過期時間
      		long expireTime = annotation.expiredTime();
      		// 獲取鎖的等待時間
      		long timeoutForLock = annotation.timeoutForLock();
      		// 自旋獲取鎖
      		long endTime = System.currentTimeMillis() + timeoutForLock * 1000;
      		boolean acquired = false;
      		String uuid = UUID.randomUUID().toString();
      		while(System.currentTimeMillis() < endTime) {
      			Boolean absent = redisTemplate.opsForValue()
      					.setIfAbsent(key, uuid, expireTime, TimeUnit.SECONDS);
      
      			if (Boolean.TRUE.equals(absent)) {
      				acquired = true;
      				break;
      			} else {
      				// 獲取不到鎖,嘗試休眠100毫秒后重試
      				Thread.sleep(100);
      			}
      		}
      		// 超時未獲取到鎖, 拋出異常,可根據自己業務而定
      		if (!acquired) {
      			throw new RuntimeException("獲取鎖異常");
      		}
      		try {
      			// 執行業務
      			joinPoint.proceed();
      		} catch (Throwable e) {
      			log.error("業務執行出錯!");
      		} finally {
      			// 解鎖時進行校驗,只刪除自己線程加的鎖
      			String value = (String) redisTemplate.opsForValue().get(key);
      			if (uuid.equals(value)) {
      				redisTemplate.delete(key);
      			} else {
      				log.warn("鎖已過期!");
      			}
      		}
      	}
      
      	private String getKey(RedisLock redisLock) {
      		if (Objects.isNull(redisLock)) {
      			return DEFAULT_KEY_PREFIX + "default";
      		}
      		return DEFAULT_KEY_PREFIX + redisLock.key();
      	}
      
      	private RedisLock getLockAnnotationOnMethod(ProceedingJoinPoint joinPoint) {
      		MethodSignature signature = (MethodSignature) joinPoint.getSignature();
      		Method method = signature.getMethod();
      		return method.getAnnotation(RedisLock.class);
      	}
      
      }
      

      3. key 自動續約防止鎖過期

      我們接著完善該分布式鎖,為其添加 key 自動續約防止鎖過期的功能。我們的思路與Redission的watch dog類似,開啟一個后臺線程,來定時檢查需要續約的鎖。我們如何判斷一個鎖是否需要續約呢,我們可以簡單定義一個續約分界線,比如在鎖過期時間的三分之二的時間點及之后,對鎖進行續約。

      3.1 定義一個續約任務4

      我們來定義一個鎖續約任務,那我們需要什么信息呢?
      我們至少需要鎖的 key,鎖要設置的過期時間。這是兩個最基本的信息。
      要判斷在鎖過期時間的三分之二的時間點及之后進行續約,那么我們還需要記錄鎖上次續約的時間點。
      此外,我們還可以為鎖續約任務添加最大續約次數限制,這可以避免某些執行時間特別久的任務不斷占用鎖。所以我們還需要記錄當前鎖續約次數和最大續約次數。
      對超過最大續約次數的鎖的線程,我們直接將其停止,因此我們也記錄一下該鎖的線程。
      結合上面的分析,我們定義的鎖續約任務類如下:

      public class LockRenewTask {
      
      	/**
      	 * key
      	 */
      	private final String key;
      	/**
      	 * 過期時間。單位:秒
      	 */
      	private final long expiredTime;
      	/**
      	 * 鎖的最大續約次數
      	 */
      	private final int maxRenewCount;
      	/**
      	 * 鎖的當前續約次數
      	 */
      	private int currentRenewCount;
      	/**
      	 * 最新更新時間
      	 */
      	private LocalDateTime latestRenewTime;
      	/**
      	 * 業務線程
      	 */
      	private final Thread thread;
      
      	public LockRenewTask(String key, long expiredTime, int maxRenewCount, Thread thread) {
      		this.key = key;
      		this.expiredTime = expiredTime;
      		this.maxRenewCount = maxRenewCount;
      		this.thread = thread;
      		this.latestRenewTime = LocalDateTime.now();
      	}
      	/**
      	 * 是否到達續約時間
      	 * @return
      	 */
      	public boolean isTimeToRenew() {
      		LocalDateTime now = LocalDateTime.now();
      		Duration duration = Duration.between(latestRenewTime, now);
      
      		return duration.toSeconds() >= ((double)(this.expiredTime / 3) * 2);
      	}
      	/**
      	 * 是否達到最大續約次數
      	 * @return
      	 */
      	public boolean exceedMaxRenewCount() {
      		return this.currentRenewCount >= this.maxRenewCount;
      	}
      	public synchronized void renew() {
      		this.currentRenewCount++;
      		this.latestRenewTime = LocalDateTime.now();
      	}
      	// 取消業務方法
      	public void cancel() {
      		thread.interrupt();
      	}
      	public String getKey() {
      		return key;
      	}
      	public long getExpiredTime() {
      		return expiredTime;
      	}
      }
      

      我們添??了一些關于鎖續約的方法:

      • isTimeToRenew(): 判斷是否可以對鎖進行續約
      • exceedMaxRenewCount(): 判斷是否達到最大續約次數
      • renew(): 來標記一次續約操作
      • cancel(): 取消業務方法

      3.2 定義一個鎖續約任務處理器

      接著,我們定義一個定時執行該續約任務的 handler。該 handler 也比較簡答,核心邏輯是持有一個類型為 List<LockRenewTask>taskList 來添加續約任務,且使用一個 ScheduledExecutorService 來定時遍歷該 taskList 來執行續約任務。該 handler 再對外暴露一個 addRenewTask 方法,方便外部調用來添加續約任務到 taskList 中。

      @Slf4j
      @Component
      public class LockRenewHandler {
      
      	@Autowired
      	private RedisTemplate<String, Object> redisTemplate;
      
      	/**
      	 * 保障對 taskList的添加刪除操作是線程安全的
      	 */
      	private final ReentrantLock taskListLock = new ReentrantLock();
      
      	private final List<LockRenewTask> taskList = new ArrayList<>();
      
      	private final ScheduledExecutorService taskExecutorService;
      
      	{
      		taskExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
      		taskExecutorService.scheduleAtFixedRate(() -> {
      			try {
      				executeRenewTask();
      			} catch (Exception e) {
      				//錯誤處理
      			}
      		}, 1, 2, TimeUnit.SECONDS);
      
      	}
      	/**
      	 * 添加續約任務
      	 */
      	public void addRenewTask(LockRenewTask task) {
      		taskListLock.lock();
      		try {
      			taskList.add(task);
      		} finally {
      			taskListLock.unlock();
      		}
      	}
      	/**
      	 * 執行續約任務
      	 */
      	private void executeRenewTask() {
      		log.info("開始執行續約任務");
      		if (CollectionUtils.isEmpty(taskList)) {
      			return;
      		}
      		// 需要刪除的任務,暫存這個集合中  取消
      		List<LockRenewTask> cancelTask = new ArrayList<>();
      		// 獲取任務副本
      		List<LockRenewTask> copyTaskList = new ArrayList<>(taskList);
      		for (LockRenewTask task : copyTaskList) {
      			try {
      				// 判斷 Redis 中是否存在 key
      				if (!redisTemplate.hasKey(task.getKey())) {
      					cancelTask.add(task);
      					continue;
      				}
      				// 大于等于最大續約次數
      				if (task.exceedMaxRenewCount()) {
      					// 停止續約任務
      					task.cancel();
      					cancelTask.add(task);
      					continue;
      				}
      				// 到達續約時間
      				if (task.isTimeToRenew()) {
      					log.info("續約任務:{}", task.getKey());
      					redisTemplate.expire(task.getKey(), task.getExpiredTime(), TimeUnit.SECONDS);
      					task.renew();
      				}
      			} catch (Exception e) {
      				//錯誤處理
      				log.error("處理任務出錯:{}", task);
      			}
      		}
      		// 加鎖,刪除 taskList 中需要移除的任務
      		taskListLock.lock();
      		try {
      			taskList.removeAll(cancelTask);
      			// 清理cancelTask,避免堆積,產生內存泄露
      			cancelTask.clear();
      		} finally {
      			taskListLock.unlock();
      		}
      	}
      }
      

      總結一下 LockRenewHandler的主要作用:它負責管理和執行續約任務,以延長 Redis 中鍵的過期時間。

      • 添加續約任務:addRenewTask() 方法允許添加新的續約任務到內部列表 taskList 中。
      • 執行續約任務:executeRenewTask() 方法定期執行續約任務。它檢查每個任務的狀態,并根據需要續約 Redis 中的鍵。
      • 移除完成的任務:維護一個 cancelTask 列表,用于存儲需要從 taskList 中移除的任務。在 executeRenewTask() 方法中,它會將完成的任務添加到 cancelTask 列表中,并在之后將其從 taskList 中移除。

      大概的工作流程如下:

      • 續約任務被添加到 taskList 中。

      • executeRenewTask() 方法定期執行,它檢查每個任務的狀態:

        • 如果 Redis 中不再存在該鍵,則取消任務。
        • 如果任務的續約次數達到上限,則取消任務。
        • 如果是時候續約了,則續約 Redis 中的鍵并更新任務的續約次數,記錄續約時間點。
      • 完成的任務被添加到 cancelTask 列表中。

      • executeRenewTask() 方法獲取 taskList 的副本,并從副本中移除 cancelTask 中的任務,并且在完成移除任務操作后清空cancelTask

      • 更新后的 taskList 被保存回類中。

      兩個需要注意的點

      • 我們遍歷taskList時拷貝了一份副本進行遍歷,因為taskList是可變的,這樣可以避免在遍歷的時候產生并發修改問題。
      • cancelTask需要清理,避免產生內存泄漏。

      通過這種方式,LockRenewHandler 可以確保 Redis 中的鍵在需要時得到續約,并自動移除完成或失敗的任務。

      3.3 添加鎖續約任務

      在上面 3.1 節和 3.2 節我們定義好了鎖續約任務和處理鎖續約任務的核心代碼,接下來我們需要在第 2 節加鎖解鎖的 AOP 處理邏輯上進行一點小小的修改,主要就是在執行加鎖之后,執行業務代碼之前,添加上鎖續約任務。修改位置如下:

      public void invoke(ProceedingJoinPoint joinPoint) throws Exception {
          ... // 省略代碼
          try {
              // 添加鎖續約任務
              LockRenewTask task = new LockRenewTask(key, annotation.expiredTime(), annotation.maxRenew(), Thread.currentThread());
              lockRenewHandler.addRenewTask(task);
              log.info("添加續約任務, key:{}", key);
              // 執行業務
              joinPoint.proceed();
          } catch (Throwable e) {
              log.error("業務執行出錯!");
          } finally {
              // 解鎖時進行校驗,只刪除自己線程加的鎖
              String value = (String) redisTemplate.opsForValue().get(key);
              if (uuid.equals(value)) {
                  redisTemplate.delete(key);
              } else {
                  log.warn("鎖已過期!");
              }
          }
          ... // 省略代碼
      }
      

      到這里,我們的分布式鎖已經相當完善了,把鎖自動續約的功能也加上了。當然,還沒有實現鎖的可重入性。

      posted @ 2024-07-13 21:57  二價亞鐵  閱讀(852)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 日本一区二区三区免费播放视频站| 国产亚洲AV电影院之毛片| 精品国产乱码久久久久乱码| 九九热精品免费视频| 成人av一区二区亚洲精| 精品国产中文字幕av| 亚洲 校园 欧美 国产 另类| 喀什市| 色噜噜狠狠成人综合| 99精品久久免费精品久久| 九九综合va免费看| 国产va免费精品观看精品| 美女午夜福利视频一区二区| 精品一区二区三区四区五区| 国产mv在线天堂mv免费观看| 国产亚洲一区二区三区四区| 女人腿张开让男人桶爽| 国产女人18毛片水真多1| 不卡一区二区三区视频播放 | 久久精品一本到99热免费| 中文字幕有码高清日韩| 久久精品国产最新地址| 日韩中文免费一区二区| 国产成人综合色视频精品| 欧美成人精品手机在线| 男女做aj视频免费的网站| 国产精品男女午夜福利片| 五月婷久久麻豆国产| 国产午夜精品福利91| 久久夜色精品亚洲国产av| 99久久国产宗和精品1上映| 亚洲精品麻豆一区二区| 国产黄色一区二区三区四区 | 日本一区不卡高清更新二区| 永定县| 国产成人综合在线观看不卡| 亚洲欧美人成人让影院| 日本特黄特黄刺激大片| 国产精品黄色精品黄色大片| 最新国产麻豆AⅤ精品无码| 精品国产一区二区三区av性色|