使用線程池你應該知道的知識點
多線程編程是每一個開發(fā)必知必會的技能,在實際項目中,為了避免頻繁創(chuàng)建和銷毀線程,我們通常使用池化的思想,用線程池進行多線程開發(fā)。
線程池在開發(fā)中使用頻率非常高,也包含不少知識點,是一個高頻面試題,本篇總結線程池的使用經(jīng)驗和需要注意的問題,更好的應對日常開發(fā)和面試。
如有更多知識點,歡迎補充~
異常處理
正如我們在異常處理機制所講,如果你沒有對提交給線程池的任務進行異常捕獲,那么異常信息將會丟失,不利于問題排查。
通常異常處理要么是手動處理掉,要么是往上拋由全局異常處理器統(tǒng)一處理,切勿吃掉異常。
在實際開發(fā)中,我們可以使用裝飾器模式對TheradPoolExecutor進行封裝,重寫它的execute和submit方法,進行try-catch處理,打印日志,防止開發(fā)同學直接使用ThreadPoolExecutor提交任務而漏了異常處理。
traceid
完整的日志鏈路對日志分析,問題排查是至關重要的,否則拿到一堆日志沒有關聯(lián)性,根本無從下手。
一個完整的請求可能經(jīng)過很多個方法調用,服務調用,mq消息發(fā)送等,要串聯(lián)起來需要一個全局id,稱為traceid。
例如我們使用spring cloud sleuth鏈路跟蹤,它就會在上下文(MDC)塞一個traceid,并不斷傳遞下去。
很遺憾,如果你在請求過程使用線程池(直接new ThreadPoolExecutor),那么traceid將會丟失,例如你會看到如下日志:

很明顯,線程池里打印的日志跟外面的關聯(lián)不起來了,這會影響我們分析排查問題。
解決方案,可以使用spring提供的ThreadPoolTaskExecutor,它內部也包裝了ThreadPoolExecutor,提供更多功能。將其注冊到spring容器中,使用spring cloud sleuth時,它會判斷如果實現(xiàn)了ExecutorService接口的bean,就會進行動態(tài)代理為TraceableExecutorService,它會將當前上下文的traceid傳遞給線程池的線程,那么就可以關聯(lián)起來了。如:

當然你也可以像前面說的,封裝自己的ThreadPoolExecutor注冊到spring容器,也一樣會被代理。
關于traceid我們還在xxl這邊有寫到,可以參考給xxl新增traceId和spanId。
ThreadLocal
ThreadLocal是線程內一塊數(shù)據(jù)結構(Thread類內有一個ThreadLocal.ThreadLocalMap),線程間互不干擾,沒有并發(fā)問題。上面我們提到使用MDC可以在各個位置打印traceid,實際就是利用了ThreadLocal,如使用logback:

ThreadLocal在線程內傳遞數(shù)據(jù)是沒有問題的,但涉及到子線程怎么辦呢?這個時候就無法傳遞過去了,不過Thread類內還有一個ThreadLocal.ThreadLocalMap inheritableThreadLocals,當創(chuàng)建子線程的時候會把父線程的ThreadLocal“繼承”過來。

但使用線程池場景又不太一樣了,因為線程池里的線程是只創(chuàng)建一次,后續(xù)復用的,而前面說的“繼承”是創(chuàng)建時一次性傳遞過來,后續(xù)就不會更改,很明顯不符合線程池的場景,使用線程池時希望線程每次都從父線程獲取最新的ThreadLocal。
解決方案,可以使用阿里的transmittable-thread-local,它的原理也不復雜,就是在每次提交任務給線程池的時候,拷貝ThreadLocal。 關于ThreadLocal我們之前有過介紹,有興趣可以看下面試再也不怕問ThreadLocal了,ThreadLocal擴展。
核心參數(shù)
這是入門級八股文了吧,基本爛到面試官都不問了,但有一些點我仍想發(fā)掘一下亮點。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
如上是參數(shù)最全的構造方法,參數(shù)解釋:
- corePoolSize
核心線程數(shù),默認是不初始化創(chuàng)建核心線程,也不回收。可以通過prestartCoreThread/prestartAllCoreThreads方法對線程池進行預熱,也可以通過allowCoreThreadTimeOut方法對核心線程進行回收。 - maximumPoolSize
最大線程數(shù),當核心線程開到最大,且任務隊列也滿了,還有任務提交,就會繼續(xù)開線程到直到最大線程數(shù)。 - keepAliveTime
當線程數(shù)大于核心線程數(shù),線程最大空閑時間,超過就會被銷毀回收。 - unit
keepAliveTime的時間單位。 - workQueue
隊列,核心線程滿了,任務會暫存到隊列,等待執(zhí)行。 - threadFactory
線程工廠,默認是Executors.defaultThreadFactory(),線程名稱由:pool-數(shù)字-thread 組成。 - RejectedExecutionHandler
拒絕策略,當隊列滿了,最大線程數(shù)也滿了,還提交任務就會觸發(fā)拒絕策略,jdk提供了4種拒絕策略,默認是AbortPolicy,也可以自定義拒絕策略。
![]()
需要提到的點是:
- 根據(jù)業(yè)務定義不同的線程池
有的人喜歡定義一個所謂“通用”的線程池來處理各種業(yè)務,這種做法不好,每種業(yè)務的核心參數(shù)需求不一樣,且會相互競爭資源,正確的做法應該是每種業(yè)務定義一個線程池。 - 有意義的線程名稱
從上面可以看到默認的線程名稱不是特別友好,我們可以根據(jù)業(yè)務取一個有意義的名稱。這里我用到guava里的ThreadFactoryBuilder,例如:
new ThreadFactoryBuilder()
.setNameFormat(“taskDispatchPool” + "-%d")
.build();
- 合適的隊列長度
過長的隊列長度可能導致應用OOM,實際要根據(jù)具體情況指定,不宜過大,禁止使用無界隊列。
jdk的Executors輔助類創(chuàng)建的線程池隊列長度很多都是無界的,稍有不慎就會導致內存溢出,這也是為什么阿里java開發(fā)規(guī)范明確禁止使用Executors創(chuàng)建線程池的原因。 - 與tomcat/hystirx線程池的區(qū)別
jdk的線程池是在核心線程滿了,任務先進入隊列,隊列滿了再繼續(xù)創(chuàng)建線程到最大線程數(shù)。
而tomcat/hystrix的線程池策略是,核心線程滿了,就繼續(xù)創(chuàng)建線程到最大線程數(shù),再有任務就進入隊列。 - 設置合適的核心參數(shù)
一般任務可以分為cpu密集型或io密集型,對于cpu密集型比較容易設置,一般設置為cpu核數(shù)即可,不宜過大,因為cpu密集型線程過大會有大量的線程切換,反而降低性能。
io密集型就不好估算了,而且大部分情況下都是io密集型,沒有萬能公式,只能根據(jù)經(jīng)驗,測試,生產運行觀察,調整,才能達到一個比較合適的值,這就要求我們的線程池要支持動態(tài)調整參數(shù),下面還會說到。
如果跟面試官提這些點,說明你不是單純背八股文,是有真的在思考總結~
動態(tài)線程池
上面我們說到線程池的核心參數(shù)(主要是線程數(shù)和隊列長度)不太好估算,設置過小可能任務處理不過來導致阻塞,設置過大可能影響整體服務或影響下游服務,所以生產環(huán)境的線程池要支持動態(tài)調整。幸運的是ThreadPoolExecutor提供了方法可以直接對核心參數(shù)進行修改,例如setCorePoolSize,setMaximumPoolSize,我們可以在運行過程進行設置,線程池內部就會根據(jù)參數(shù)調整線程了。
筆者所在的項目一份代碼會部署在各個國家(環(huán)境),每個環(huán)境的業(yè)務,數(shù)據(jù)量不一樣,機器配置也不一樣,所以需要根據(jù)環(huán)境設置不同的參數(shù)。在實際運行過程中,有時候為了提升處理效率設置比較大的線程數(shù),而忽略對下游服務的影響,導致下游服務被壓垮。也有時候開始估算太小,后面業(yè)務增長,導致處理不過來的情況。所以需要動態(tài)調整,我們把線程池參數(shù)配置接入到apollo,隨時可以調整生效,同時我們也把線程池的運行情況上報到grafana,可以進行監(jiān)控,告警。
關于動態(tài)線程池,之前也寫過,可以參考:
加強版ThreadPoolExecutor升級
Java線程池實現(xiàn)原理及其在美團業(yè)務中的實踐
hippo4j
如何設置線程池參數(shù)?美團給出了一個讓面試官虎軀一震的回答
任務統(tǒng)計
有時候我們需要對提交給線程池的任務進行統(tǒng)計,例如本次執(zhí)行成功多少,失敗多少,過濾多少。線程池就沒有提供這種實現(xiàn)了,因為它是一直運行的狀態(tài),區(qū)分不了業(yè)務上的東西,只能簡單獲取總體完成成功次數(shù)(getCompletedTaskCount),或觸發(fā)拒絕策略次數(shù)(getRejectCount)。
業(yè)務上的統(tǒng)計我們就可以結合CountDownLatch來進行計數(shù),主線程提交完要進行await等待線程池所有任務完成,每個線程完成一次任務就countDown一次,任務計數(shù)可以使用LongAdder統(tǒng)計,相比AtomicLong更加高效。這也回應了我們上面說要根據(jù)業(yè)務定義不同的線程池的原因,不同類型的統(tǒng)計不會相互影響。
后來我們有個同學提醒jdk還有個Phaser類,比CountDownLatch好用,也可以用它來完成,具體參見:并發(fā)工具類Phaser
默認線程池
你是否在你的團隊見過如下代碼:
@Test
public void test4() throws InterruptedException {
List<String> list = Lists.newArrayList();
list.parallelStream().forEach(item->{
//run async
});
CompletableFuture.runAsync(()->{
//run async
});
}
這些寫法確實都是異步的,但底層都是用了系統(tǒng)默認的線程池ForkJoinPool.commonPool(),默認線程數(shù)是Runtime.getRuntime().availableProcessors() - 1。
如果是cpu密集型任務,這么使用也沒啥問題。如果是io密集型,就會相互影響。如下,業(yè)務2的任務會卡住,知道業(yè)務1執(zhí)行完成。
//業(yè)務1
for (int i = 0; i < ForkJoinPool.commonPool().getParallelism(); i++) {
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
});
}
//業(yè)務2
for (int i = 0; i < ForkJoinPool.commonPool().getParallelism(); i++) {
CompletableFuture.runAsync(() -> {
System.out.println("running...");
});
}
筆者所在的項目,最開始有的使用上面的寫法,有的使用Executors創(chuàng)建,有的使用new ThreadPoolTaskExecutor,有的使用spring ThreadPoolTaskExecutor...,五花八門,直到后來我們統(tǒng)一使用自己封裝的線程池,這種情況才得以糾正。
父子線程公用一個線程池
這也是一個實際案例,在我們團隊有同學這么使用線程池,邏輯很簡單,想要查一批數(shù)據(jù)的時間和還款概率,實際情況還查了更多信息,做了簡化,這些查詢需要調用外部接口,為了提升接口性能,把這些查詢都丟到線程池里去并發(fā)執(zhí)行,通過Future.get獲取結果。同時主線程希望這兩個操作也可以并發(fā)執(zhí)行,所以通過CompletableFuture也提交到這個線程池里,最終通過CompletableFuture.join等待所有任務完成。
實際代碼比較復雜,簡化后如下:
//查一個時間
CompletableFuture<Void> timeFuture = CompletableFuture.runAsync(() -> {
initTime(pageResult);
}, executor);
//預測還款概率
CompletableFuture<Void> repayDesireFuture = CompletableFuture.runAsync(() -> {
initRepayDesire(pageResult);
}, executor);
//主線程等待
CompletableFuture.allOf(timeFuture, repayDesireFuture).join();
private void initTime(List<JobListVo> data) {
List<List<JobListVo>> lists = Lists.partition(data, CommonConst.PAGE_SIZE_50);
List<Future<Result<List<TimeInfo>>>> futures = new ArrayList<>();
lists.forEach(item -> futures.add(executor.submit(() -> client.getTimeList(ids))));
futures.forEach(
item -> {
try {
Result<List<TimeInfo>> result = item.get();
} catch (Exception e) {
log.error("fetch error", e);
}
}
);
}
private void initRepayDesire(List<JobListVo> data) {
List<List<JobListVo>> lists = Lists.partition(data, CommonConst.PAGE_SIZE_50);
List<Future<Result<List<RepayDesire>>>> futures = new ArrayList<>();
lists.forEach(item -> futures.add(executor.submit(() -> client.queryRepayDesire(ids))));
futures.forEach(
item -> {
try {
Result<List<RepayDesire>> result = item.get();
} catch (Exception e) {
log.error("fetch error", e);
}
}
);
}
看到這段代碼,可能有的人會說主線程那兩個操作并發(fā)的意義不大,串行執(zhí)行即可。但實際分析還是有意義的,假如只有一個請求,查詢時間的任務提交到線程池后,線程池資源還有剩余,這個時候并發(fā)執(zhí)行還款概率的任務,是可以加速整個查詢速度的。
筆者在review這段代碼的時候,感覺總是怪怪的,但邏輯上又好像說得通,直到我看到這邊文章線程池遇到父子任務,有大坑,要注意!,這不就跟我們的場景幾乎一樣嗎,確實可能會有問題。
根據(jù)文章所述,父子任務提交到同一個線程池可能導致父子任務相互等待,最終卡死。極端一點,假設線程池核心線程數(shù)是1,隊列長度是1,現(xiàn)在主線程提交了一個任務,使用了這個核心線程,開始執(zhí)行,并等待子線程執(zhí)行完成。它的執(zhí)行邏輯是在子線程內再提交一個任務給線程池,由于只有一個核心線程,所以這個任務進入隊列等待。等到什么時候呢,等到主線程那個任務完成線程才能釋放,主線程又什么時候完成呢,等待子線程執(zhí)行完成,死循環(huán)了...
總結:父子任務,不要公用一個線程池。
shutdown/shutdownNow
這兩個方法都是關閉線程池的,區(qū)別是shutdown是不再接收新任務,但提交的任務還會執(zhí)行,而shutdownNow除了不再接收新任務,已提交的任務也不會執(zhí)行,正在執(zhí)行中的任務會終止。
一般在線程池不再使用或應用下線前,就會調用線程關閉方法。如果關閉后再提交任務,就會觸發(fā)拒絕策略。
最好確保在關閉后不再有任務提交給線程池,否則可能會有問題,筆者之前就遇到線程池關閉后還有請求(服務下線前)進來,導致報TimeoutException錯誤,具體分析在這篇/線程池shutdown引發(fā)TimeoutException,有興趣的可以看下。
內存泄漏
將線程池變量定義為局部變量時,可能會發(fā)生內存泄漏。如下:
@GetMapping(value = "/test")
public Result<Void> info() {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> System.out.println("active thread"));
return Result.success();
}
在我們印象中,executorService作為一個局部變量,在方法返回時,生命周期就結束了,這個時候應該是可以被gc回收的,怎么會發(fā)生內存泄漏呢?
使用線程池時有點不同,因為這里有個隱含的條件是,雖然方法返回結束了,但線程仍存活這,而存活的線程是可以作為gc root的。
我們知道線程池里的線程會被包裝為Worker對象,這是定義在ThreadPoolExecutor里的非靜態(tài)內部類。如下代碼,Worker也實現(xiàn)了Runnable接口,把它作為參數(shù)傳遞給Thread對象。


活躍的線程作為gc root的,也就是不會被垃圾回收,而這個線程又引用著這個Worker對象,所以它也不會被回收。
那個線程池對象又有什么關系呢?上面說到Worker是作為ThreadPoolExecutor里的非靜態(tài)內部類,非靜態(tài)內部類有一個規(guī)則就是持有外部類的引用,例如我們可以在InnerClass里調用外部類的方法。
或者通過編譯后查看class文件也可以看到InnerClass持有外部類的this引用。
public class OuterClass {
public OuterClass(Runnable runnable) {
}
public void outterFunction() {
}
class InnerClass {
public InnerClass() {
outterFunction();
}
}
}
所以,由于Worker持有ThreadPoolExecutor的引用,所以它也不會被回收,用一張圖表示就是:

解決方案,1.不要使用局部線程池變量,定義為全局變量。2.調用shutdown/shutdownNow關閉線程池,關閉后線程就會被回收,線程池也可以被回收。
附chatgpt:jdk8中,哪些對象可以作為gc root?

虛擬線程
在jdk21以前,我們在java里使用的線程都稱為平臺線程,與內核線程是一對一的關系,開篇就說到,平臺線程的使用成本比較高,所以才使用線程線程池來緩存復用它。

jdk21虛擬線程成為正式功能,可以投入生產使用。java里的虛擬線程類似于goland中的協(xié)程,虛擬線程與內核線程不再是一對一的關系,而是多對一,在jvm層面進行調度,可以大大內核線程的數(shù)量。
如圖,可以看到多個虛擬線程在底層還是公用一個內核線程,它們之間的執(zhí)行調度由jvm自動完成,虛擬線程的創(chuàng)建成本非常低,可以創(chuàng)建的虛擬線程數(shù)量可以遠大于平臺線程。
當我們的程序遇到io的時候,以往的方式是將當前線程掛起,cpu進行線程切換,執(zhí)行另外的任務。使用虛擬線程則不需要,任務的調度執(zhí)行是在jvm層面完成的,cpu還是一直在執(zhí)行同一個線程。

代碼示例:
Thread.ofVirtual().start(()->{});
springboot3.2 tomcat開啟虛擬線程:
spring.threads.virtual.enabled = true
那么使用虛擬線程,還需要線程池嗎?答案是不需要的,我們之所以池化是因為對象的創(chuàng)建、銷毀成本較高,每次使用都創(chuàng)建,用完就丟棄,太浪費了,但虛擬線程的使用成本很低,所以它不需要池化了。
虛擬線程的設計雖然不是為了取代傳統(tǒng)線程和編程方式,可以看到jdk是通過擴展支持虛擬線程的,我們依然可以像以前一樣編程開發(fā),但在高并發(fā)場景虛擬線程是更好的選擇,這也是大勢所趨,說不定以后哪一天線程池也會被無情的標記上@Deprecated。
更多分享,歡迎關注我的github:https://github.com/jmilktea/jtea


浙公網(wǎng)安備 33010602011771號