Semaphore
Semaphore介紹
Semaphore,俗稱信號量,它是操作系統中PV操作的原語在java的實現,它也是基于AbstractQueuedSynchronizer實現的。
Semaphore的功能非常強大,大小為1的信號量就類似于互斥鎖,通過同時只能有一個線程獲取信號量實現。大小為n(n>0)的信號量可以實現限流的功能,它可以實現只能有n個線程同時獲取信號量。
?

PV操作是操作系統一種實現進程互斥與同步的有效方法。PV操作與信號量(S)的處理相關,P表示通過的意思,V表示釋放的意思。用PV操作來管理共享資源時,首先要確保PV操作自身執行的正確性。
P操作的主要動作是:
①S減1;
②若S減1后仍大于或等于0,則進程繼續執行;
③若S減1后小于0,則該進程被阻塞后放入等待該信號量的等待隊列中,然后轉進程調度。
V操作的主要動作是:
①S加1;
②若相加后結果大于0,則進程繼續執行;
③若相加后結果小于或等于0,則從該信號的等待隊列中釋放一個等待進程,然后再返回原進程繼續執行或轉進程調度。
Semaphore 常用方法
構造器
默認為非公平鎖
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
- permits 表示許可證的數量(資源數)
- fair 表示公平性,如果這個設為 true 的話,下次執行的線程會是等待最久的線程
常用方法
public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength()
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)
- acquire() 表示阻塞并獲取許可
- tryAcquire() 方法在沒有許可的情況下會立即返回 false,要獲取許可的線程不會阻塞
- release() 表示釋放許可
- int availablePermits():返回此信號量中當前可用的許可證數。
- int getQueueLength():返回正在等待獲取許可證的線程數。
- boolean hasQueuedThreads():是否有線程正在等待獲取許可證。
- void reducePermit(int reduction):減少 reduction 個許可證
- Collection getQueuedThreads():返回所有等待獲取許可證的線程集合
應用場景
可以用于做流量控制,特別是公用資源有限的應用場景
限流
1
/**
* Semaphore是一個計數信號量,Semaphore經常用于限制獲取資源的線程數量
*
*/
public class SemaphoreTest {
public static void main(String[] args) {
// 聲明3個窗口 state: 資源數
Semaphore windows = new Semaphore(3);
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
// 占用窗口 加鎖
windows.acquire();
System.out.println(Thread.currentThread().getName() + ": 開始買票");
//模擬買票流程
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + ": 購票成功");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 釋放窗口
windows.release();
}
}
}).start();
}
}
}
2
import java.util.Date;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class SemaphoneTest2 {
/**
* 實現一個同時只能處理5個請求的限流器
*/
private static Semaphore semaphore = new Semaphore(5);
/**
* 定義一個線程池
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor
(10, 50, 60,
TimeUnit.SECONDS, new LinkedBlockingDeque<>(200));
/**
* 模擬執行方法
*/
public static void exec() {
try {
//占用1個資源
semaphore.acquire(1);
//TODO 模擬業務執行
System.out.println("執行exec方法");
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
//釋放一個資源
semaphore.release(1);
System.out.println("釋放一個資源"+new Date());
}
}
public static void main(String[] args) throws InterruptedException {
{
for (; ; ) {
Thread.sleep(100);
// 模擬請求以10個/s的速度
executor.execute(new Runnable() {
@Override
public void run() {
exec();
}
});
}
}
}
}
結果
執行exec方法
執行exec方法
執行exec方法
執行exec方法
執行exec方法
執行exec方法
釋放一個資源Wed Jan 12 15:38:05 CST 2022
執行exec方法
釋放一個資源Wed Jan 12 15:38:05 CST 2022
執行exec方法
釋放一個資源Wed Jan 12 15:38:05 CST 2022
執行exec方法
釋放一個資源Wed Jan 12 15:38:05 CST 2022
執行exec方法
釋放一個資源Wed Jan 12 15:38:05 CST 2022
執行exec方法
釋放一個資源Wed Jan 12 15:38:07 CST 2022
執行exec方法
釋放一個資源Wed Jan 12 15:38:07 CST 2022
執行exec方法
釋放一個資源Wed Jan 12 15:38:07 CST 2022
執行exec方法
釋放一個資源Wed Jan 12 15:38:07 CST 2022
執行exec方法
釋放一個資源Wed Jan 12 15:38:07 CST 2022
執行exec方法
釋放一個資源Wed Jan 12 15:38:09 CST 2022
執行exec方法
釋放一個資源Wed Jan 12 15:38:09 CST 2022
執行exec方法
釋放一個資源Wed Jan 12 15:38:09 CST 2022
執行exec方法
釋放一個資源Wed Jan 12 15:38:09 CST 2022
釋放一個資源Wed Jan 12 15:38:09 CST 2022
執行exec方法
....
Semaphore源碼分析
關注點:
-
Semaphore的加鎖解鎖(共享鎖)邏輯實現
-
線程競爭鎖失敗入隊阻塞邏輯和獲取鎖的線程釋放鎖喚醒阻塞線程競爭鎖的邏輯實現
https://www.processon.com/view/link/61950f6e5653bb30803c5bd2
?

CountDownLatch
CountDownLatch介紹
CountDownLatch(閉鎖)是一個同步協助類,允許一個或多個線程等待,直到其他線程完成操作集。
CountDownLatch使用給定的計數值(count)初始化。await方法會阻塞直到當前的計數值(count)由于countDown方法的調用達到0,count為0之后所有等待的線程都會被釋放,并且隨后對await方法的調用都會立即返回。這是一個一次性現象 —— count不會被重置。如果你需要一個重置count的版本,那么請考慮使用CyclicBarrier。
?

CountDownLatch的使用
構造器
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
常用方法
// 調用 await() 方法的線程會被掛起,它會等待直到 count 值為 0 才繼續執行
public void await() throws InterruptedException { };
// 和 await() 類似,若等待 timeout 時長后,count 值還是沒有變為 0,不再等待,繼續執行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
// 會將 count 減 1,直至為 0
public void countDown() {
sync.releaseShared(1);
}
CountDownLatch應用場景
CountDownLatch一般用作多線程倒計時計數器,強制它們等待其他一組(CountDownLatch的初始化決定)任務執行完成。
CountDownLatch的兩種使用場景:
- 場景1:讓多個線程等待
- 場景2:讓單個線程等待。
場景1 讓多個線程等待:
模擬并發,讓并發線程一起執行
import java.util.Date;
import java.util.concurrent.CountDownLatch;
/**
* 讓多個線程等待:模擬并發,讓并發線程一起執行
*/
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
System.out.println(new Date());
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
//準備完畢……運動員都阻塞在這,等待號令
countDownLatch.await();
String parter = "【" + Thread.currentThread().getName() + "】";
System.out.println(parter + "開始執行……"+new Date());
System.out.println(new Date());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(2000);// 裁判準備發令
countDownLatch.countDown();// 發令槍:執行發令
}
}
結果
Wed Jan 12 16:12:54 CST 2022
【Thread-1】開始執行……Wed Jan 12 16:12:56 CST 2022
【Thread-0】開始執行……Wed Jan 12 16:12:56 CST 2022
【Thread-3】開始執行……Wed Jan 12 16:12:56 CST 2022
【Thread-2】開始執行……Wed Jan 12 16:12:56 CST 2022
Wed Jan 12 16:12:56 CST 2022
【Thread-4】開始執行……Wed Jan 12 16:12:56 CST 2022
Wed Jan 12 16:12:56 CST 2022
Wed Jan 12 16:12:56 CST 2022
Wed Jan 12 16:12:56 CST 2022
Wed Jan 12 16:12:56 CST 2022
場景2 讓單個線程等待:
多個線程(任務)完成后,進行匯總合并
很多時候,我們的并發任務,存在前后依賴關系;比如數據詳情頁需要同時調用多個接口獲取數據,并發請求獲取到數據后、需要進行結果合并;或者多個數據操作完成后,需要數據check;這其實都是:在多個線程(任務)完成后,進行匯總合并的場景。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
/**
* 讓單個線程等待:多個線程(任務)完成后,進行匯總合并
*/
public class CountDownLatchTest2 {
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
final int index = i;
new Thread(() -> {
try {
Thread.sleep(1000 +
ThreadLocalRandom.current().nextInt(1000));
System.out.println(Thread.currentThread().getName()
+ " finish task" + index);
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 主線程在阻塞,當計數器==0,就喚醒主線程往下執行。
countDownLatch.await();
System.out.println("主線程:在所有任務運行完成后,進行結果匯總");
}
}
結果
Thread-2 finish task2
Thread-0 finish task0
Thread-1 finish task1
Thread-3 finish task3
Thread-4 finish task4
主線程:在所有任務運行完成后,進行結果匯總
CountDownLatch實現原理
底層基于 AbstractQueuedSynchronizer 實現,CountDownLatch 構造函數中指定的count直接賦給AQS的state;每次countDown()則都是release(1)減1,最后減到0時unpark阻塞線程;這一步是由最后一個執行countdown方法的線程執行的。
而調用await()方法時,當前線程就會判斷state屬性是否為0,如果為0,則繼續往下執行,如果不為0,則使當前線程進入等待狀態,直到某個線程將state屬性置為0,其就會喚醒在await()方法中等待的線程。
CountDownLatch與Thread.join的區別
- CountDownLatch的作用就是允許一個或多個線程等待其他線程完成操作,看起來有點類似join() 方法,但其提供了比 join() 更加靈活的API。
- CountDownLatch可以手動控制在n個線程里調用n次countDown()方法使計數器進行減一操作,也可以在一個線程里調用n次執行減一操作。
- 而 join() 的實現原理是不停檢查join線程是否存活,如果 join 線程存活則讓當前線程永遠等待。所以兩者之間相對來說還是CountDownLatch使用起來較為靈活。
CyclicBarrier
CyclicBarrier介紹
字面意思回環柵欄,通過它可以實現讓一組線程等待至某個狀態(屏障點)之后再全部同時執行。叫做回環是因為當所有等待線程都被釋放以后,CyclicBarrier可以被重用。
?

CyclicBarrier的使用
構造方法
// parties表示屏障攔截的線程數量,每個線程調用 await 方法告訴 CyclicBarrier 我已經到達了屏障,然后當前線程被阻塞。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
// parties表示屏障攔截的線程數量,每個線程調用 await 方法告訴 CyclicBarrier 我已經到達了屏障,然后當前線程被阻塞。
public CyclicBarrier(int parties) {
this(parties, null);
}
CyclicBarrier應用場景
CyclicBarrier 可以用于多線程計算數據,最后合并計算結果的場景。
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 柵欄與閉鎖的關鍵區別在于,所有的線程必須同時到達柵欄位置,才能繼續執行。
*/
public class CyclicBarrierTest2 {
//保存每個學生的平均成績
private ConcurrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>();
private ExecutorService threadPool= Executors.newFixedThreadPool(3);
private CyclicBarrier cb=new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
int result = 0;
Set<String> set = map.keySet();
for (String s : set) {
result += map.get(s);
}
System.out.println("三人平均成績為:" + (result / 3) + "分");
}
});
public void count(){
for(int i=0;i<3;i++){
threadPool.execute(new Runnable(){
@Override
public void run() {
//獲取學生平均成績
int score=(int)(Math.random()*40+60);
map.put(Thread.currentThread().getName(), score);
System.out.println(Thread.currentThread().getName()
+"同學的平均成績為:"+score);
try {
//執行完運行await(),等待所有學生平均成績都計算完畢
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
public static void main(String[] args) {
CyclicBarrierTest2 test2=new CyclicBarrierTest2();
test2.count();
}
}
結果
pool-1-thread-1同學的平均成績為:69
pool-1-thread-3同學的平均成績為:81
pool-1-thread-2同學的平均成績為:72
三人平均成績為:74分
利用CyclicBarrier的計數器能夠重置,屏障可以重復使用的特性,可以支持類似“人滿發車”的場景
import java.io.Writer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
/**
* 利用CyclicBarrier的計數器能夠重置,屏障可以重復使用的特性,可以支持類似“人滿發車”的場景
*/
@Slf4j
public class CyclicBarrierTest3 {
public static void main(String[] args) {
AtomicInteger counter = new AtomicInteger();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5, 5, 1000, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
(r) -> new Thread(r, counter.addAndGet(1) + " 號 "),
new ThreadPoolExecutor.AbortPolicy());
CyclicBarrier cyclicBarrier = new CyclicBarrier(5,
() -> System.out.println("裁判:比賽開始~~"));
for (int i = 0; i < 10; i++) {
threadPoolExecutor.submit(new Runner(cyclicBarrier));
}
}
static class Runner extends Thread{
private CyclicBarrier cyclicBarrier;
public Runner (CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
int sleepMills = ThreadLocalRandom.current().nextInt(1000);
Thread.sleep(sleepMills);
System.out.println(Thread.currentThread().getName() + " 選手已就位, 準備共用時: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
}
}
}
結果
5 號 選手已就位, 準備共用時: 55ms0
3 號 選手已就位, 準備共用時: 71ms1
2 號 選手已就位, 準備共用時: 105ms2
1 號 選手已就位, 準備共用時: 449ms3
4 號 選手已就位, 準備共用時: 872ms4
裁判:比賽開始~~
1 號 選手已就位, 準備共用時: 321ms0
5 號 選手已就位, 準備共用時: 374ms1
3 號 選手已就位, 準備共用時: 704ms2
2 號 選手已就位, 準備共用時: 807ms3
4 號 選手已就位, 準備共用時: 923ms4
裁判:比賽開始~~
CountDownLatch與CyclicBarrier的區別
CountDownLatch和CyclicBarrier都能夠實現線程之間的等待,只不過它們側重點不同:
- CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset() 方法重置。所以CyclicBarrier能處理更為復雜的業務場景,比如如果計算發生錯誤,可以重置計數器,并讓線程們重新執行一次
- CyclicBarrier還提供getNumberWaiting(可以獲得CyclicBarrier阻塞的線程數量)、isBroken(用來知道阻塞的線程是否被中斷)等方法。
- CountDownLatch會阻塞主線程,CyclicBarrier不會阻塞主線程,只會阻塞子線程。
- CountDownLatch和CyclicBarrier都能夠實現線程之間的等待,只不過它們側重點不同。CountDownLatch一般用于一個或多個線程,等待其他線程執行完任務后,再執行。CyclicBarrier一般用于一組線程互相等待至某個狀態,然后這一組線程再同時執行。
- CyclicBarrier 還可以提供一個 barrierAction,合并多線程計算結果。
- CyclicBarrier是通過ReentrantLock的"獨占鎖"和Conditon來實現一組線程的阻塞喚醒的,而CountDownLatch則是通過AQS的“共享鎖”實現
CyclicBarrier源碼分析
關注點:
1.一組線程在觸發屏障之前互相等待,最后一個線程到達屏障后喚醒邏輯是如何實現的
2.刪欄循環使用是如何實現的
3.條件隊列到同步隊列的轉換實現邏輯
https://www.processon.com/view/link/6197b0aef346fb271b36a2bf

浙公網安備 33010602011771號