CountDownLatch和FutureTask類使用方法解析
摘要:使用CountDownLatch和FutureTask解決主線程需要拿到多個子線程任務的執行結果之后再進行執行的問題。
綜述
??我們在工作中,經常遇到有些業務場景需要使用多線程異步執行任務,從而加快任務執行速度。本文探討的業務場景如下:某一個業務接口,需要處理幾百個請求,之后再由一個線程匯總每個請求的執行結果。解決辦法是基于并發包中的FutureTask,用輪詢方式判斷 Future.isDone 任務是否結束,再獲取結果。
??FutureTask是什么?FutureTask表示一個異步運算的任務,是Java 5 新增的Future接口的一個基礎實現,我們可以將它同Executors一起使用處理異步任務。
??FutureTask里面可以傳入一個Callable的具體實現類,可以對這個異步運算任務的結果進行等待獲取、判斷是否已經完成、取消任務等操作。當然,由于FutureTask也是Runnable接口的實現類,所以FutureTask也可以放入線程池中。
??FutureTask表示的計算是通過Callable來實現的,相當于一種可生產結果的Runnable,并且可以處于以下3種狀態:等待運行,正在運行和運行完成。運行完成表示計算的所有可能結束方式,包括正常結束、由于取消而結束和由于異常而結束等。當FutureTask進入完成狀態后,它會永遠停止在這個狀態上。Future.get的行為取決于任務的狀態,如果任務已經完成,那么get會立刻返回結果,否則get將阻塞直到任務進入完成狀態,然后返回結果或者異常。
CountDownLatch方法詳解
??CountDownLatch是一個同步工具類,它通過一個計數器來實現,初始值為線程的數量。每當一個線程完成了自己的任務,計數器的值就務必減1;當計數器遞減至0時,表示所有的線程都已執行完畢,然后在等待的線程就被喚醒。
CountDownLatch(int count):count為計數器的初始值(一般初始化為線程個數)。
countDown():每調用一次計數器值減去1,直到計數器的值被減為0,代表所有線程全部執行完畢。
getCount():獲取當前計數器的值。
await(): 等待計數器變為0,即等待所有異步線程執行完畢。
boolean await(long timeout, TimeUnit unit):無論計數器的值是否遞減到0,只等待timeout時間就喚醒。它與await()區別:
① 至多會等待指定的時間,超時后自動喚醒,若 timeout 小于等于零,則不會等待;
② boolean 類型返回值:若計數器變為零了,則返回 true;若指定的等待時間過去了且計數器的值大于零,則返回 false。
CountDownLatch應用場景
??常見應用場景如下:
- 某個線程需要在其它N個線程執行完畢后再執行。
- 某個線程需要等待其它N個線程執行 T 毫秒后再執行。
- 多個線程并行執行同一個任務,提高并發量。
Future類
??Future類提供了方法來檢查異步調用是否完成、等待異步調用完成并獲取異步調用返回結果。get()方法可以對線程進行阻塞,直到異步調用完成并返回結果。cancel()方法可以取消異步方法的執行。
??Future是一個接口,定義了異步線程執行結果的獲取方法,以及異步線程執行的取消方法。下面看源碼:
boolean cancel(boolean mayInterruptIfRunning);
??嘗試取消此任務的執行。如果任務已完成、已取消或由于其他原因無法取消,則此嘗試將失敗。如果成功,并且在調用cancel時此任務尚未啟動,則此任務不應運行。如果任務已經啟動,則mayInterruptIfRunning參數確定執行此任務的線程是否應該中斷以嘗試停止任務。
此方法返回后,對isDone的后續調用將始終返回true。如果此方法返回true,則對isCancelled的后續調用將始終返回true。
V get() throws InterruptedException, ExecutionException;
??阻塞調用線程直到任務執行完成或者取消,然后檢索其結果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
??如果需要,最多等待給定時間以完成任務,然后檢索其結果(如果可用),此時如果任務未結束,則拋出TimeoutException異常。
isDone()
??是否執行結束,true 已結束,false 未結束。
夯實基礎
??學習并發部分的Future與CountDownLatch時有個疑惑:既然Future對象的get方法會掛起等待到該線程執行完并返回結果時才執行,那么有時候為什么還需配合使用CountDownLatch呢?接下來通過一個例子來說明為什么需要,本例模擬并行獲取接口(工人工作時長)數據,最后Boss線程統計優秀員工工作時間。
??基本程序基于帖子 java latch閉鎖基本使用(結合future) 做了改動。我們在該程序上做進一步的小測試——某個線程需要等待其它N個線程執行M 毫秒后再執行。定義worker線程,它會執行工作并返回工作時長:
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @Author Wiener
* @Date 2022-12-25
* @Description: 實現Callable的工人
*/
public class WorkerWithResult implements Callable<Worker> {
private CountDownLatch downLatch;
private int workerId;
public WorkerWithResult(CountDownLatch downLatch, int workerId) {
this.downLatch = downLatch;
this.workerId = workerId;
}
@Override
public Worker call() {
int workTime = -1;
try {
workTime = new Random().nextInt(3000);
// 降低4號員工的工作效率
if (workerId == 4) {
workTime = workTime + 3000;
}
//通過睡眠時長模擬在處理復雜業務,如果不睡,效率剛剛的
TimeUnit.MILLISECONDS.sleep(workTime);
System.out.println(workerId + " used time: " + workTime);
} finally {
Worker oneWorker = new Worker();
oneWorker.setWorkerId(this.workerId);
oneWorker.setWorkTime(workTime);
downLatch.countDown();
return oneWorker;
}
}
}
??boss線程統計所有優秀worker的工作時長,并在控制臺打印優秀員工的工時情況。接收時以list傳入它的構造器:
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* Boss Bean
*
* @author Wiener
*/
@Slf4j
public class BossWithResult implements Runnable {
private static Long givenWorkTime = 2000L;
private CountDownLatch downLatch;
//保存每個工人的工作時間
private List<Future> taskList;
public BossWithResult(CountDownLatch downLatch, List<Future> workTimeUseList) {
this.downLatch = downLatch;
this.taskList = workTimeUseList;
}
@Override
public void run() {
Integer totalTime = 0;
// 任務開始前預創建變量
List<Future> doneTasks = new ArrayList<>();
try {
// 等待指定時間后,查看工作完成情況,這期間完成工作說明工作效率杠杠的,優秀員工
Boolean finished = downLatch.await(givenWorkTime, TimeUnit.MILLISECONDS);
if (!finished) {
// 響應超時
log.warn("響應超時: {} ms", givenWorkTime);
}
} catch (InterruptedException e) {
log.error("任務失敗,", e);
}
// 保留已完成的任務
for (Future<Worker> workerTask : taskList) {
if (workerTask.isDone()) {
doneTasks.add(workerTask);
}
}
log.info("已完成task:{}", doneTasks.size());
// 分析已經執行完的任務
int workTime = 0;
for (Future<Worker> excellentWorker : doneTasks) {
try {
Worker oneWorker = excellentWorker.get(1, TimeUnit.MILLISECONDS);
workTime = oneWorker.getWorkTime();
if (workTime > givenWorkTime) {
log.info(" ****** Worker id {} uses {}", oneWorker.getWorkerId(), workTime);
} else {
log.info("The work time of worker id {} is {}", oneWorker.getWorkerId(), workTime);
}
totalTime = workTime + totalTime;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("-----------------", e);
}
}
log.info("工人總的工作時長是:" + totalTime);
}
}
??Worker Bean定義如下:
/**
* @Author Wiener
* @Date 2022-12-25
* @Description: 工人 Bean
*/
@Getter
@Setter
@ToString
public class Worker implements Serializable {
private static final long serialVersionUID = 8461989391177768538L;
/**
* 工人ID
*/
private Integer workerId;
/**
* 工作時長
*/
private Integer workTime;
}
??基于線程池、CountDownLatch和FutureTask,定義boss線程與多個worker線程,并且使它們并發執行:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class LatchDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 使用future統計每個worker工作的時長,最后通過latch的await函數通知boss統計工時。
* 溫馨提示,在給boss傳遞參數的時候,可以讓boss直接獲得future中的值,但是如果使用
* 這種方式,就沒有必要使用latch了,因為在獲取每個worker的值時需要使用future.get(),能夠創建完成
* 參數的時候,worker線程應該已經結束了。所以就沒有必要使用latch了。
*
* 如果像下面程序傳遞的是future,然后在boss的線程中對future進行取值,就是需要latch的。因為在boss線程
* 開始的時候future沒有執行完成,需要latch等待2000ms后,才能保證有的future已經執行結束。
*
* 定義工人個數,數量越大,模擬的越逼真
*/
int workerNum = 21;
CountDownLatch downLatch = new CountDownLatch(workerNum);
ExecutorService executor = Executors.newFixedThreadPool(workerNum);
List<Future> workTimeList = new ArrayList<>();
for (int i = 0; i < workerNum; i ++) {
FutureTask<Worker> loopTask = new FutureTask<>(new WorkerWithResult(downLatch, i));
workTimeList.add(loopTask);
executor.submit(loopTask);
}
executor.submit(new BossWithResult(downLatch, workTimeList));
// 關閉線程池
executor.shutdown();
}
}
??如果把每個worker線程視作一個微服務API調用,這就變成了分布式環境中調度不同微服務,是不是更加貼近實戰?
結果分析
??隨機抽取執行一次的結果,控制臺打印信息如下:
18 used time: 115
14 used time: 208
20 used time: 327
16 used time: 411
15 used time: 437
17 used time: 759
6 used time: 867
7 used time: 937
13 used time: 1025
5 used time: 1029
19 used time: 1113
10 used time: 1125
1 used time: 1313
8 used time: 1316
0 used time: 1390
9 used time: 1581
3 used time: 2082
11 used time: 2137
18:08:56.125 [pool-1-thread-19] WARN com.swagger.demo.service.latch.BossWithResult - 響應超時: 2000 ms
18:08:56.139 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - 已完成task:18
18:08:56.140 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 0 is 1390
18:08:56.141 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 1 is 1313
18:08:56.141 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - ****** Worker id 3 uses 2082
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 5 is 1029
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 6 is 867
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 7 is 937
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 8 is 1316
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 9 is 1581
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 10 is 1125
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - ****** Worker id 11 uses 2137
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 13 is 1025
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 14 is 208
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 15 is 437
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 16 is 411
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 17 is 759
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 18 is 115
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 19 is 1113
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - The work time of worker id 20 is 327
18:08:56.142 [pool-1-thread-19] INFO com.swagger.demo.service.latch.BossWithResult - 工人總的工作時長是:18172
2 used time: 2491
12 used time: 2919
4 used time: 4190
??通過分析執行結果發現,批量執行的時候,由于需要處理之前的task,導致通過isDone()判斷是否已經完成是不準確的,一部分超過超時時間才完成工作的工人也被定義為優秀員工。這一點需要各位老鐵注意。如果你有好的規避方法,請不吝賜教,留言評論!
結束語
??我確實最近比較忙,每個月擠不出幾篇文章,雖然有點手癢癢,涂了皮炎平也抑制不住。
??擋風玻璃為什么比后視鏡大?因為前面的路比過去的更重要。你可以回頭看,但別忘了前行。
Buy me a coffee. ?Get red packets.
浙公網安備 33010602011771號