并發控制流程
控制并發流程
- 控制并發流程的工具類,作用就是更容易得讓線程之間合作
- 讓線程之間相互配合,來滿足業務邏輯
- 比如讓線程A等待線程B執行完畢后再執行等合作策略
類 | 作用 | 說明 |
Semaphore | 信號量,可以通過控制“許 | 線程只有在拿到“許可證”后 |
CyclicBarrier | 線程會等待,直到足夠多 | 適用于線程之間相互等待 |
Phaser | 和CyclicBarrier類似,但是 | Java 7加入的 |
CountDownLatch | 和CyclicBarrier類似,數量 | 不可重復使用 |
Exchanger | 讓兩個線程在合適時交換 | 適用場景:當兩個線程工 |
Condition | 可以控制線程的“等待”和 | 是Object.wait()的升級版 |
CountDownLatch
數量遞減到0,觸發動作。latch:門閂
- 流程:倒數結束之前,一直處于等待狀態,直到倒計時結束了,此線程才繼續工作。
- CountDownLatch(int count):僅有一個構造函數,參數count為需要倒數的數值。
- await():調用await()方法的線程會被掛起,它會等待直到count值為0才繼續執行。
- countDown():將count值減1,直到為0時,等待的線程會被喚起。

用法一:一個線程等待多個線程都執行完畢,再繼續自己的工作。
public class CountDownLatchDemo1 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("No." + no + "完成了檢查。");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
};
service.submit(runnable);
}
System.out.println("等待5個人檢查完.....");
latch.await();
System.out.println("所有人都完成了工作,進入下一個環節。");
}
}
用法二:多個線程等待某一個線程的信號,同時開始執行。
public class CountDownLatchDemo2 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch begin = new CountDownLatch(1);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("No." + no + "準備完畢,等待發令槍");
try {
begin.await();
System.out.println("No." + no + "開始跑步了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
service.submit(runnable);
}
//裁判員檢查發令槍...
Thread.sleep(5000);
System.out.println("發令槍響,比賽開始!");
begin.countDown();
}
}
- 擴展用法:多個線程等多個線程完成執行后,再同時執行
- CountDownLatch是不能夠重用的,如果需要重新計數,可以考慮使用CyclicBarrier或者創建新的CountDownLatch實例。
Semaphore信號量
- Semaphore可以用來限制或管理數量有限的資源的使用情況。
- 信號量的作用是維護一個“許可證”的計數,線程可以“獲取”許可證,那信號量剩余的許可證就減一,線程也可以“釋放”一個許可證,那信號量剩余的許可證就加一,當信號量所擁有的許可證數量為0,那么下一個還想要獲取許可證的線程,就需要等待,直到有另外的線程釋放了許可證
信號量使用流程
1初始化Semaphore并指定許可證的數量
2.在需要被現在的代碼前加acquire()或者acquireUninterruptibly()方法
3.在任務執行結束后,調用release()來釋放許可證
信號量主要方法
- new Semaphore(int permits,boolean fair):這里可以設置是否要使用公平策略,如果傳入true,那么Semaphore會把之前等待的線程放到FIFO的隊列里,以便于當有了新的許可證可以分發給之前等了最長時間的線程。
- tryAcquire():看看現在有沒有空閑的許可證,如果有的話就獲取,如果沒有的話也沒關系,我不必陷入阻塞,我可以去做別的事,過一會再來查看許可證的空閑情況。
- tryAcquire(timeout):和tryAcquire()一樣,但是多了一個超時時間,比如“在3秒內獲取不到許可證,我就去做別的事”。
特殊用法
- 一次性獲取或釋放多個許可證
比如TaskA會調用很消耗資源的method1(),而TaskB調用的是不太消耗資源的method2(),假設我們一共有5個許可證。那么我們就可以要求TaskA獲取5個許可證才能執行,一個許可證就能執行,這樣就避免而TaskB只需要獲取到
了A和B同時運行的情況,我們可以根據自己的需求合理分配資源。
注意
- 獲取和釋放的許可證數量必須一致,否則比如每次都獲取2個但是只釋放1個甚至不釋放,隨著時間的推移,到最后許可證數量不夠用,會導致程序卡死。(雖然信號量類并不對是否和獲取的數量做規定,但是這是我們的編程規范,否則容易出錯)
- 注意在初始化Semaphore的時候設置公平性,一般設置》為true會更合理
- 并不是必須由獲取許可證的線程釋放那個許可證,事實上,獲取和釋放許可證對線程并無要求,也許是A獲取了,然后由B釋放,只要邏輯合理即可。
- 信號量的作用,除了控制臨界區最多同時有N個線程訪問外,另一個作用是可以實現“條件等待”,例如線程1需要在線程2完成準備工作后才能開始工作,那么就線程lacquire(),而線這樣的話,相當于是輕量級的程2完成任務后release(),CountDownLatch。
public class SemaphoreDemo {
static Semaphore semaphore = new Semaphore(5, true);
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(50);
for (int i = 0; i < 100; i++) {
service.submit(new Task());
}
service.shutdown();
}
static class Task implements Runnable {
@Override
public void run() {
try {
semaphore.acquire(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "拿到了許可證");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "釋放了許可證");
semaphore.release(2);
}
}
}
Condition接口(條件對象)
- 當線程1需要等待某個條件的時候,它就去執行condition.await()方法,一旦執行了await()方法,線程就會進入阻塞狀態
- 然后通常會有另外一個線程,假設是線程2,去執行對應的條件,直到這個條件達成的時候,線程2就會去執行condition.signal()方法,這時JVM就會從被阻塞的線程里找,找到那些等待該condition的線程,當線程1就會收到可執行信號的時候,它的線程狀態就會變成Runnable可執行狀態

signalAll()和signal()區別
- signalAll()會喚起所有的正在等待的線程
- 但是signal()是公平的,只會喚起那個等待時間最長的線程
public class ConditionDemo1 {
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
void method1() throws InterruptedException {
lock.lock();
try{
System.out.println("條件不滿足,開始await");
condition.await();
System.out.println("條件滿足了,開始執行后續的任務");
}finally {
lock.unlock();
}
}
void method2() {
lock.lock();
try{
System.out.println("準備工作完成,喚醒其他的線程");
condition.signal();
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ConditionDemo1 conditionDemo1 = new ConditionDemo1();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
conditionDemo1.method2();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
conditionDemo1.method1();
}
}
實現生產模式
public class ConditionDemo2 {
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
ConditionDemo2 conditionDemo2 = new ConditionDemo2();
Producer producer = conditionDemo2.new Producer();
Consumer consumer = conditionDemo2.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread {
@Override
public void run() {
consume();
}
private void consume() {
while (true) {
lock.lock();
try {
while (queue.size() == 0) {
System.out.println("隊列空,等待數據");
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
notFull.signalAll();
System.out.println("從隊列里取走了一個數據,隊列剩余" + queue.size() + "個元素");
} finally {
lock.unlock();
}
}
}
}
class Producer extends Thread {
@Override
public void run() {
produce();
}
private void produce() {
while (true) {
lock.lock();
try {
while (queue.size() == queueSize) {
System.out.println("隊列滿,等待有空余");
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(1);
notEmpty.signalAll();
System.out.println("向隊列插入了一個元素,隊列剩余空間" + (queueSize - queue.size()));
} finally {
lock.unlock();
}
}
}
}
}
CyclicBarrier(循環柵欄)
- CyclicBarrier循環柵欄和CountDownLatch很類似,都能阻塞一組線程
- 當有大量線程相互配合,分別計算不同任務,并且需要最后統一匯總的時候,我們可以使用CyclicBarrier。CyclicBarrier可以構造一個集結點,當某一個一個線程執行完畢,它就會到集結點等待,直到所有線程都到'了集結點,那么該柵欄就被撤銷,所有線程再統一出發,繼續執行剩下的任務。
- 與CountDownLatch不同
- 作用不同:CyclicBarrier要等固定數量的線程都到達了柵欄位置才能繼續執行,而CountDownLatch只需等待數字到0,也就是說,CountDownLatch用于事件,但是CyclicBarrier是用于線程的。
- 可重用性不同:CountDownLatch在倒數到0并觸發門閂打開后,就不能再次使用了,除非新建新的實例;而CyclicBarrier可以重復使用。
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("所有人都到場了, 大家統一出發!");
}
});
for (int i = 0; i < 10; i++) {
new Thread(new Task(i, cyclicBarrier)).start();
}
}
static class Task implements Runnable{
private int id;
private CyclicBarrier cyclicBarrier;
public Task(int id, CyclicBarrier cyclicBarrier) {
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("線程" + id + "現在前往集合地點");
try {
Thread.sleep((long) (Math.random()*10000));
System.out.println("線程"+id+"到了集合地點,開始等待其他人到達");
cyclicBarrier.await();
System.out.println("線程"+id+"出發了");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}

浙公網安備 33010602011771號