<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Semaphore

      Semaphore介紹

      Semaphore,俗稱信號量,它是操作系統中PV操作的原語在java的實現,它也是基于AbstractQueuedSynchronizer實現的。

      Semaphore的功能非常強大,大小為1的信號量就類似于互斥鎖,通過同時只能有一個線程獲取信號量實現。大小為n(n>0)的信號量可以實現限流的功能,它可以實現只能有n個線程同時獲取信號量。

      ?
      image

      PV操作是操作系統一種實現進程互斥與同步的有效方法。PV操作與信號量(S)的處理相關,P表示通過的意思,V表示釋放的意思。用PV操作來管理共享資源時,首先要確保PV操作自身執行的正確性。
      
      P操作的主要動作是:
      
      ①S減1;
      
      ②若S減1后仍大于或等于0,則進程繼續執行;
      
      ③若S減1后小于0,則該進程被阻塞后放入等待該信號量的等待隊列中,然后轉進程調度。
      
      V操作的主要動作是:
      
      ①S加1; 
      
      ②若相加后結果大于0,則進程繼續執行;
      
      ③若相加后結果小于或等于0,則從該信號的等待隊列中釋放一個等待進程,然后再返回原進程繼續執行或轉進程調度。
      

      Semaphore 常用方法

      構造器

      默認為非公平鎖

      public Semaphore(int permits) {
          sync = new NonfairSync(permits);
      }
      
      
      public Semaphore(int permits, boolean fair) {
          sync = fair ? new FairSync(permits) : new NonfairSync(permits);
      }
      
      • permits 表示許可證的數量(資源數)
      • fair 表示公平性,如果這個設為 true 的話,下次執行的線程會是等待最久的線程

      常用方法

      public void acquire() throws InterruptedException
      public boolean tryAcquire()
      public void release()
      public int availablePermits()
      public final int getQueueLength() 
      public final boolean hasQueuedThreads()
      protected void reducePermits(int reduction)
      
      • acquire() 表示阻塞并獲取許可
      • tryAcquire() 方法在沒有許可的情況下會立即返回 false,要獲取許可的線程不會阻塞
      • release() 表示釋放許可
      • int availablePermits():返回此信號量中當前可用的許可證數。
      • int getQueueLength():返回正在等待獲取許可證的線程數。
      • boolean hasQueuedThreads():是否有線程正在等待獲取許可證。
      • void reducePermit(int reduction):減少 reduction 個許可證
      • Collection getQueuedThreads():返回所有等待獲取許可證的線程集合

      應用場景

      可以用于做流量控制,特別是公用資源有限的應用場景

      限流

      1

      /**
       * Semaphore是一個計數信號量,Semaphore經常用于限制獲取資源的線程數量
       *
       */
      public class SemaphoreTest {
      
          public static void main(String[] args) {
              // 聲明3個窗口  state:  資源數
              Semaphore windows = new Semaphore(3);
      
              for (int i = 0; i < 5; i++) {
                  new Thread(new Runnable() {
                      @Override
                      public void run() {
                          try {
                              // 占用窗口    加鎖
                              windows.acquire();
                              System.out.println(Thread.currentThread().getName() + ": 開始買票");
                              //模擬買票流程
                              Thread.sleep(5000);
                              System.out.println(Thread.currentThread().getName() + ": 購票成功");
      
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          } finally {
                              // 釋放窗口
                              windows.release();
                          }
                      }
                  }).start();
      
              }
          }
      }
      

      2

      import java.util.Date;
      import java.util.concurrent.LinkedBlockingDeque;
      import java.util.concurrent.Semaphore;
      import java.util.concurrent.ThreadPoolExecutor;
      import java.util.concurrent.TimeUnit;
      
      public class SemaphoneTest2 {
      
          /**
           * 實現一個同時只能處理5個請求的限流器
           */
          private static Semaphore semaphore = new Semaphore(5);
      
          /**
           * 定義一個線程池
           */
          private static ThreadPoolExecutor executor = new ThreadPoolExecutor
                  (10, 50, 60,
                          TimeUnit.SECONDS, new LinkedBlockingDeque<>(200));
      
          /**
           * 模擬執行方法
           */
          public static void exec() {
              try {
                  //占用1個資源
                  semaphore.acquire(1);
                  //TODO  模擬業務執行
                  System.out.println("執行exec方法");
                  Thread.sleep(2000);
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  //釋放一個資源
                  semaphore.release(1);
                  System.out.println("釋放一個資源"+new Date());
              }
          }
      
          public static void main(String[] args) throws InterruptedException {
              {
                  for (; ; ) {
                      Thread.sleep(100);
                      // 模擬請求以10個/s的速度
                      executor.execute(new Runnable() {
                          @Override
                          public void run() {
                              exec();
                          }
                      });
                  }
              }
          }
      }
      

      結果

      執行exec方法
      執行exec方法
      執行exec方法
      執行exec方法
      執行exec方法
      執行exec方法
      釋放一個資源Wed Jan 12 15:38:05 CST 2022
      執行exec方法
      釋放一個資源Wed Jan 12 15:38:05 CST 2022
      執行exec方法
      釋放一個資源Wed Jan 12 15:38:05 CST 2022
      執行exec方法
      釋放一個資源Wed Jan 12 15:38:05 CST 2022
      執行exec方法
      釋放一個資源Wed Jan 12 15:38:05 CST 2022
      執行exec方法
      釋放一個資源Wed Jan 12 15:38:07 CST 2022
      執行exec方法
      釋放一個資源Wed Jan 12 15:38:07 CST 2022
      執行exec方法
      釋放一個資源Wed Jan 12 15:38:07 CST 2022
      執行exec方法
      釋放一個資源Wed Jan 12 15:38:07 CST 2022
      執行exec方法
      釋放一個資源Wed Jan 12 15:38:07 CST 2022
      執行exec方法
      釋放一個資源Wed Jan 12 15:38:09 CST 2022
      執行exec方法
      釋放一個資源Wed Jan 12 15:38:09 CST 2022
      執行exec方法
      釋放一個資源Wed Jan 12 15:38:09 CST 2022
      執行exec方法
      釋放一個資源Wed Jan 12 15:38:09 CST 2022
      釋放一個資源Wed Jan 12 15:38:09 CST 2022
      執行exec方法
      ....
      

      Semaphore源碼分析

      關注點:

      1. Semaphore的加鎖解鎖(共享鎖)邏輯實現

      2. 線程競爭鎖失敗入隊阻塞邏輯和獲取鎖的線程釋放鎖喚醒阻塞線程競爭鎖的邏輯實現

      https://www.processon.com/view/link/61950f6e5653bb30803c5bd2

      ?
      image

      CountDownLatch

      CountDownLatch介紹

      CountDownLatch(閉鎖)是一個同步協助類,允許一個或多個線程等待,直到其他線程完成操作集。

      CountDownLatch使用給定的計數值(count)初始化。await方法會阻塞直到當前的計數值(count)由于countDown方法的調用達到0,count為0之后所有等待的線程都會被釋放,并且隨后對await方法的調用都會立即返回。這是一個一次性現象 —— count不會被重置。如果你需要一個重置count的版本,那么請考慮使用CyclicBarrier。

      ?
      image

      CountDownLatch的使用

      構造器

      public CountDownLatch(int count) {
          if (count < 0) throw new IllegalArgumentException("count < 0");
          this.sync = new Sync(count);
      }
      

      常用方法

       // 調用 await() 方法的線程會被掛起,它會等待直到 count 值為 0 才繼續執行
      public void await() throws InterruptedException { };  
      // 和 await() 類似,若等待 timeout 時長后,count 值還是沒有變為 0,不再等待,繼續執行
      public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  
      // 會將 count 減 1,直至為 0
       public void countDown() {
              sync.releaseShared(1);
          }
      

      CountDownLatch應用場景

      CountDownLatch一般用作多線程倒計時計數器,強制它們等待其他一組(CountDownLatch的初始化決定)任務執行完成。

      CountDownLatch的兩種使用場景:

      • 場景1:讓多個線程等待
      • 場景2:讓單個線程等待。

      場景1 讓多個線程等待:

      模擬并發,讓并發線程一起執行

      import java.util.Date;
      import java.util.concurrent.CountDownLatch;
      
      /**
       * 讓多個線程等待:模擬并發,讓并發線程一起執行
       */
      public class CountDownLatchTest {
          public static void main(String[] args) throws InterruptedException {
      
              CountDownLatch countDownLatch = new CountDownLatch(1);
              System.out.println(new Date());
              for (int i = 0; i < 5; i++) {
                  new Thread(() -> {
                      try {
      
                          //準備完畢……運動員都阻塞在這,等待號令
                          countDownLatch.await();
                          String parter = "【" + Thread.currentThread().getName() + "】";
                          System.out.println(parter + "開始執行……"+new Date());
                          System.out.println(new Date());
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }).start();
              }
      
              Thread.sleep(2000);// 裁判準備發令
              countDownLatch.countDown();// 發令槍:執行發令
      
          }
      }
      

      結果

      Wed Jan 12 16:12:54 CST 2022
      【Thread-1】開始執行……Wed Jan 12 16:12:56 CST 2022
      【Thread-0】開始執行……Wed Jan 12 16:12:56 CST 2022
      【Thread-3】開始執行……Wed Jan 12 16:12:56 CST 2022
      【Thread-2】開始執行……Wed Jan 12 16:12:56 CST 2022
      Wed Jan 12 16:12:56 CST 2022
      【Thread-4】開始執行……Wed Jan 12 16:12:56 CST 2022
      Wed Jan 12 16:12:56 CST 2022
      Wed Jan 12 16:12:56 CST 2022
      Wed Jan 12 16:12:56 CST 2022
      Wed Jan 12 16:12:56 CST 2022
      

      場景2 讓單個線程等待:

      多個線程(任務)完成后,進行匯總合并

      很多時候,我們的并發任務,存在前后依賴關系;比如數據詳情頁需要同時調用多個接口獲取數據,并發請求獲取到數據后、需要進行結果合并;或者多個數據操作完成后,需要數據check;這其實都是:在多個線程(任務)完成后,進行匯總合并的場景。

      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.ThreadLocalRandom;
      
      /**
       * 讓單個線程等待:多個線程(任務)完成后,進行匯總合并
       */
      public class CountDownLatchTest2 {
          public static void main(String[] args) throws Exception {
      
              CountDownLatch countDownLatch = new CountDownLatch(5);
              for (int i = 0; i < 5; i++) {
                  final int index = i;
                  new Thread(() -> {
                      try {
                          Thread.sleep(1000 +
                                  ThreadLocalRandom.current().nextInt(1000));
                          System.out.println(Thread.currentThread().getName()
                                  + " finish task" + index);
      
                          countDownLatch.countDown();
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }).start();
              }
      
              // 主線程在阻塞,當計數器==0,就喚醒主線程往下執行。
              countDownLatch.await();
              System.out.println("主線程:在所有任務運行完成后,進行結果匯總");
      
          }
      }
      

      結果

      Thread-2 finish task2
      Thread-0 finish task0
      Thread-1 finish task1
      Thread-3 finish task3
      Thread-4 finish task4
      主線程:在所有任務運行完成后,進行結果匯總
      

      CountDownLatch實現原理

      底層基于 AbstractQueuedSynchronizer 實現,CountDownLatch 構造函數中指定的count直接賦給AQS的state;每次countDown()則都是release(1)減1,最后減到0時unpark阻塞線程;這一步是由最后一個執行countdown方法的線程執行的。

      而調用await()方法時,當前線程就會判斷state屬性是否為0,如果為0,則繼續往下執行,如果不為0,則使當前線程進入等待狀態,直到某個線程將state屬性置為0,其就會喚醒在await()方法中等待的線程。

      CountDownLatch與Thread.join的區別

      • CountDownLatch的作用就是允許一個或多個線程等待其他線程完成操作,看起來有點類似join() 方法,但其提供了比 join() 更加靈活的API。
      • CountDownLatch可以手動控制在n個線程里調用n次countDown()方法使計數器進行減一操作,也可以在一個線程里調用n次執行減一操作。
      • 而 join() 的實現原理是不停檢查join線程是否存活,如果 join 線程存活則讓當前線程永遠等待。所以兩者之間相對來說還是CountDownLatch使用起來較為靈活。

      CyclicBarrier

      CyclicBarrier介紹

      字面意思回環柵欄,通過它可以實現讓一組線程等待至某個狀態(屏障點)之后再全部同時執行。叫做回環是因為當所有等待線程都被釋放以后,CyclicBarrier可以被重用。

      ?
      image

      CyclicBarrier的使用

      構造方法

      // parties表示屏障攔截的線程數量,每個線程調用 await 方法告訴 CyclicBarrier 我已經到達了屏障,然后當前線程被阻塞。
      public CyclicBarrier(int parties, Runnable barrierAction) {
          if (parties <= 0) throw new IllegalArgumentException();
          this.parties = parties;
          this.count = parties;
          this.barrierCommand = barrierAction;
      }
      
      // parties表示屏障攔截的線程數量,每個線程調用 await 方法告訴 CyclicBarrier 我已經到達了屏障,然后當前線程被阻塞。
      public CyclicBarrier(int parties) {
          this(parties, null);
      }
      

      CyclicBarrier應用場景

      CyclicBarrier 可以用于多線程計算數據,最后合并計算結果的場景。

      import java.util.Set;
      import java.util.concurrent.BrokenBarrierException;
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.concurrent.CyclicBarrier;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.TimeUnit;
      
      /**
       * 柵欄與閉鎖的關鍵區別在于,所有的線程必須同時到達柵欄位置,才能繼續執行。
       */
      public class CyclicBarrierTest2 {
      
          //保存每個學生的平均成績
          private ConcurrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>();
      
          private ExecutorService threadPool= Executors.newFixedThreadPool(3);
      
          private CyclicBarrier cb=new CyclicBarrier(3, new Runnable() {
              @Override
              public void run() {
                  int result = 0;
                  Set<String> set = map.keySet();
                  for (String s : set) {
                      result += map.get(s);
                  }
                  System.out.println("三人平均成績為:" + (result / 3) + "分");
              }
          });
      
      
          public void count(){
              for(int i=0;i<3;i++){
                  threadPool.execute(new Runnable(){
      
                      @Override
                      public void run() {
                          //獲取學生平均成績
                          int score=(int)(Math.random()*40+60);
                          map.put(Thread.currentThread().getName(), score);
                          System.out.println(Thread.currentThread().getName()
                                  +"同學的平均成績為:"+score);
                          try {
                              //執行完運行await(),等待所有學生平均成績都計算完畢
                              cb.await();
                          } catch (InterruptedException | BrokenBarrierException e) {
                              e.printStackTrace();
                          }
                      }
      
                  });
              }
          }
      
      
          public static void main(String[] args) {
              CyclicBarrierTest2 test2=new CyclicBarrierTest2();
              test2.count();
          }
      }
      

      結果

      pool-1-thread-1同學的平均成績為:69
      pool-1-thread-3同學的平均成績為:81
      pool-1-thread-2同學的平均成績為:72
      三人平均成績為:74分
      

      利用CyclicBarrier的計數器能夠重置,屏障可以重復使用的特性,可以支持類似“人滿發車”的場景

      import java.io.Writer;
      import java.util.concurrent.ArrayBlockingQueue;
      import java.util.concurrent.BrokenBarrierException;
      import java.util.concurrent.CyclicBarrier;
      import java.util.concurrent.Executor;
      import java.util.concurrent.Executors;
      import java.util.concurrent.ThreadLocalRandom;
      import java.util.concurrent.ThreadPoolExecutor;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicInteger;
      
      import lombok.extern.slf4j.Slf4j;
      
      /**
       *  利用CyclicBarrier的計數器能夠重置,屏障可以重復使用的特性,可以支持類似“人滿發車”的場景
       */
      @Slf4j
      public class CyclicBarrierTest3 {
      
          public static void main(String[] args) {
      
              AtomicInteger counter = new AtomicInteger();
              ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                      5, 5, 1000, TimeUnit.SECONDS,
                      new ArrayBlockingQueue<>(100),
                      (r) -> new Thread(r, counter.addAndGet(1) + " 號 "),
                      new ThreadPoolExecutor.AbortPolicy());
      
              CyclicBarrier cyclicBarrier = new CyclicBarrier(5,
                      () -> System.out.println("裁判:比賽開始~~"));
      
              for (int i = 0; i < 10; i++) {
                  threadPoolExecutor.submit(new Runner(cyclicBarrier));
              }
      
          }
          static class Runner extends Thread{
              private CyclicBarrier cyclicBarrier;
              public Runner (CyclicBarrier cyclicBarrier) {
                  this.cyclicBarrier = cyclicBarrier;
              }
      
              @Override
              public void run() {
                  try {
                      int sleepMills = ThreadLocalRandom.current().nextInt(1000);
                      Thread.sleep(sleepMills);
                      System.out.println(Thread.currentThread().getName() + " 選手已就位, 準備共用時: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
                      cyclicBarrier.await();
      
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }catch(BrokenBarrierException e){
                      e.printStackTrace();
                  }
              }
          }
      
      }
      

      結果

      5 號  選手已就位, 準備共用時: 55ms0
      3 號  選手已就位, 準備共用時: 71ms1
      2 號  選手已就位, 準備共用時: 105ms2
      1 號  選手已就位, 準備共用時: 449ms3
      4 號  選手已就位, 準備共用時: 872ms4
      裁判:比賽開始~~
      1 號  選手已就位, 準備共用時: 321ms0
      5 號  選手已就位, 準備共用時: 374ms1
      3 號  選手已就位, 準備共用時: 704ms2
      2 號  選手已就位, 準備共用時: 807ms3
      4 號  選手已就位, 準備共用時: 923ms4
      裁判:比賽開始~~
      

      CountDownLatch與CyclicBarrier的區別

      CountDownLatch和CyclicBarrier都能夠實現線程之間的等待,只不過它們側重點不同:

      • CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset() 方法重置。所以CyclicBarrier能處理更為復雜的業務場景,比如如果計算發生錯誤,可以重置計數器,并讓線程們重新執行一次
      • CyclicBarrier還提供getNumberWaiting(可以獲得CyclicBarrier阻塞的線程數量)、isBroken(用來知道阻塞的線程是否被中斷)等方法。
      • CountDownLatch會阻塞主線程,CyclicBarrier不會阻塞主線程,只會阻塞子線程。
      • CountDownLatch和CyclicBarrier都能夠實現線程之間的等待,只不過它們側重點不同。CountDownLatch一般用于一個或多個線程,等待其他線程執行完任務后,再執行。CyclicBarrier一般用于一組線程互相等待至某個狀態,然后這一組線程再同時執行。
      • CyclicBarrier 還可以提供一個 barrierAction,合并多線程計算結果。
      • CyclicBarrier是通過ReentrantLock的"獨占鎖"和Conditon來實現一組線程的阻塞喚醒的,而CountDownLatch則是通過AQS的“共享鎖”實現

      CyclicBarrier源碼分析

      關注點:

      1.一組線程在觸發屏障之前互相等待,最后一個線程到達屏障后喚醒邏輯是如何實現的

      2.刪欄循環使用是如何實現的

      3.條件隊列到同步隊列的轉換實現邏輯

      https://www.processon.com/view/link/6197b0aef346fb271b36a2bf

      image

      posted on 2022-03-09 00:23  路仁甲  閱讀(77)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲天堂亚洲天堂亚洲天堂| 婷婷五月综合激情| 一本精品中文字幕在线| 国产三级无码内射在线看| 精品亚洲国产成人av在线| 亚洲欧美自偷自拍视频图片| 久久久精品94久久精品| 1区2区3区4区产品不卡码网站| 青草精品国产福利在线视频| A级日本乱理伦片免费入口| 精品人妻中文无码av在线| 国产日韩精品免费二三氏| 久久精品国产亚洲综合av| 中文字幕日韩欧美就去鲁| 国产精品自在拍首页视频8| 精品少妇人妻av无码专区| 欧美乱强伦xxxx孕妇| 国产高清在线男人的天堂| 国产精品午夜福利免费看| 一级做a爰片久久毛片下载| 中文字幕人妻日韩精品| 日本久久一区二区免高清| 日韩亚洲中文图片小说| jk白丝喷浆| 波多野结衣久久一区二区| 亚洲丰满熟女一区二区v| 人妻教师痴汉电车波多野结衣| 亚洲国产成人久久精品不卡| 亚洲中文久久久精品无码| 午夜福利在线观看6080| 亚洲精品日韩在线丰满| 久热这里有精品视频在线| 国产精品午夜福利小视频| 国产免费久久精品99reswag| 韶山市| 4399理论片午午伦夜理片| 国产午夜亚洲精品国产成人 | 国产美女直播亚洲一区色| 国产精品免费看久久久| 亚洲中文久久久精品无码| 日日躁夜夜躁狠狠躁超碰97|