面試官:如何實現動態線程池的任務編排?
在開始聊動態線程池如何實現任務編排前,咱們先給大家聊聊什么是動態線程池?以及為什么需要任務編排?
1.動態線程池
定義:動態線程池是在程序運行期間,動態調整線程池參數而無需重啟程序的技術。
1.1 特性分析
動態線程池主要有以下三個特點:
- 可配置:支持運行時動態調整線程池參數,如核心線程數、最大線程數,并且修改后無需重啟服務即可生效。
- 可監控:動態線程池內置了全面的運行時監控能力,能夠定時采集并暴露線程池的多維度指標,幫助運維和開發人員實時掌握線程池的健康狀況。監控指標主要有以下幾個:
- 線程維度:當前線程數、活躍線程數、最大線程數、任務完成數、任務執行異常數等。
- 隊列維度:隊列當前大小、隊列剩余容量等。
- 任務維度:任務提交速率、任務執行耗時(TP99、TP999等)、任務等待耗時、任務拒絕次數等。
- 可預警:動態線程池提供了豐富且及時的預警機制,能夠在線程池出現潛在風險或異常行為時,第一時間通知到相關負責人。
- 預警維度:
- 配置變更通知:當線程池配置項在配置中心被修改時,會發送通知確認變更。
- 活性報警:當線程池的活躍度(活躍線程數 / 最大線程數)超過設定閾值時觸發。
- 隊列容量報警:當任務隊列的使用率(當前大小 / 隊列容量)超過設定閾值時觸發。
- 拒絕策略觸發報警:當線程池因隊列滿和線程滿而觸發拒絕策略,拒絕新任務時立即報警。
- 任務執行/等待超時報警:當任務的執行時間或等待時間超過設定的超時時間時觸發。
- 通知渠道:
- 原生支持:企業微信、釘釘、飛書、郵件等多種主流辦公通訊工具。
- 高擴展性:提供 SPI 接口,允許用戶接入自定義的報警通知平臺。
- 預警維度:
1.2 動態線程池實現
目前國內最知名的動態線程池開源實現技術是美團的 DynamicTP,官方地址:https://dynamictp.cn/
2.任務編排
定義:任務編排(Task Orchestration)是指管理和控制多個任務的執行流程,確保它們按照預定的順序正確執行。
在復雜的業務場景中,任務間通常存在依賴關系,也就是某個任務會依賴另一個任務的執行結果,在這種情況下,我們需要通過任務編排,來確保任務按照正確的順序進行執行。
例如,以下任務的執行順序:

其中,任務二要等任務一執行完才能執行,而任務四要等任務二和任務三全部執行完才能執行。
3.動態線程池任務編排
動態線程池的任務編排最靈活、也最推薦的是使用:CompletableFuture + DynamicTP 實現動態線程池的任務編排。
具體實現
我們可以直接將 DynamicTP 結合 CompletableFutrue 進行使用,從而實現任務編排。
CompletableFutrue 提供的方法有很多,但最常用和最實用的核心方法只有以下幾個:

接下來,使用 CompletableFuture 實現上述 4 個任務的編排(任務二要等任務一執行完才能執行,而任務四要等任務二和任務三全部執行完才能執行):
// 動態線程池
@Autowired
@Qualifier("dtpExecutor1")
private DtpExecutor dtpExecutor;
@RequestMapping("/dtp")
public String dtp() {
// 任務一:返回 "Task 1 result"
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
try {
// 模擬耗時操作
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "Task 1 result";
}, dtpExecutor);
// 任務二:依賴任務一,返回 "Task 2 result" + 任務一的結果
CompletableFuture<String> task2 = task1.handleAsync((result1, throwable) -> {
try {
// 模擬耗時操作
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "Task 2 result " + result1;
}, dtpExecutor);
// 任務三:和任務一、任務二并行執行,返回 "Task 3 result"
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
try {
// 模擬耗時操作
Thread.sleep(800); // 任務三可能比任務二先完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "Task 3 result";
}, dtpExecutor);
// 任務四:依賴任務二和任務三,等待它們都完成后執行,返回 "Task 4 result" + 任務二和任務三的結果
CompletableFuture<String> task4 = CompletableFuture.allOf(task2, task3)
.handleAsync((res, throwable) -> {
try {
// 這里不需要顯式等待,因為 allOf 已經保證了它們完成
return "Task 4 result with " + task2.get() + " and " + task3.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, dtpExecutor);
// 獲取任務四的結果并打印
String finalResult = task4.join();
System.out.println(finalResult);
}
小結
日常項目開發中,一定會使用到線程池,而動態線程池具備可配置、可觀測、可告警等功能是項目開發的首選。但在使用動態線程池時就會有任務執行順序的問題,此時就可以借助 CompletableFuture 一起執行來保證程序執行的正確性。
本文已收錄到我的面試小站 www.javacn.site,其中包含的內容有:場景題、SpringAI、SpringAIAlibaba、并發編程、MySQL、Redis、Spring、Spring MVC、Spring Boot、Spring Cloud、MyBatis、JVM、設計模式、消息隊列、Dify、Coze、AI常見面試題等。

浙公網安備 33010602011771號