Java 線程池中的線程復(fù)用是如何實(shí)現(xiàn)的?
前幾天,技術(shù)群里有個(gè)群友問(wèn)了一個(gè)關(guān)于線程池的問(wèn)題,內(nèi)容如圖所示:

關(guān)于線程池相關(guān)知識(shí)可以先看下這篇:為什么阿里巴巴Java開發(fā)手冊(cè)中強(qiáng)制要求線程池不允許使用Executors創(chuàng)建?
那么就來(lái)和大家探討下這個(gè)問(wèn)題,在線程池中,線程會(huì)從 workQueue 中讀取任務(wù)來(lái)執(zhí)行,最小的執(zhí)行單位就是 Worker,Worker 實(shí)現(xiàn)了 Runnable 接口,重寫了 run 方法,這個(gè) run 方法是讓每個(gè)線程去執(zhí)行一個(gè)循環(huán),在這個(gè)循環(huán)代碼中,去判斷是否有任務(wù)待執(zhí)行,若有則直接去執(zhí)行這個(gè)任務(wù),因此線程數(shù)不會(huì)增加。
如下是線程池創(chuàng)建線程的整體流程圖:

首先會(huì)判斷線程池的狀態(tài),也就是是否在運(yùn)行,若線程為非運(yùn)行狀態(tài),則會(huì)拒絕。接下來(lái)會(huì)判斷線程數(shù)是否小于核心線程數(shù),若小于核心線程數(shù),會(huì)新建工作線程并執(zhí)行任務(wù),隨著任務(wù)的增多,線程數(shù)會(huì)慢慢增加至核心線程數(shù),如果此時(shí)還有任務(wù)提交,就會(huì)判斷阻塞隊(duì)列 workQueue 是否已滿,若沒(méi)滿,則會(huì)將任務(wù)放入到阻塞隊(duì)列中,等待工作線程獲得并執(zhí)行,如果任務(wù)提交非常多,使得阻塞隊(duì)列達(dá)到上限,會(huì)去判斷線程數(shù)是否小于最大線程數(shù) maximumPoolSize,若小于最大線程數(shù),線程池會(huì)添加工作線程并執(zhí)行任務(wù),如果仍然有大量任務(wù)提交,使得線程數(shù)等于最大線程數(shù),如果此時(shí)還有任務(wù)提交,就會(huì)被拒絕。
現(xiàn)在我們對(duì)這個(gè)流程大致有所了解,那么讓我們?nèi)タ纯丛创a是如何實(shí)現(xiàn)的吧!
線程池的任務(wù)提交從 submit 方法來(lái)說(shuō),submit 方法是 AbstractExecutorService 抽象類定義的,主要做了兩件事情:
- 把 Runnable 和 Callable 都轉(zhuǎn)化成 FutureTask
- 使用 execute 方法執(zhí)行 FutureTask
execute 方法是 ThreadPoolExecutor 中的方法,源碼如下:
public void execute(Runnable command) {
// 若任務(wù)為空,則拋 NPE,不能執(zhí)行空任務(wù)
if (command == null) {
throw new NullPointerException();
}
int c = ctl.get();
// 若工作線程數(shù)小于核心線程數(shù),則創(chuàng)建新的線程,并把當(dāng)前任務(wù) command 作為這個(gè)線程的第一個(gè)任務(wù)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) {
return;
}
c = ctl.get();
}
/**
* 至此,有以下兩種情況:
* 1.當(dāng)前工作線程數(shù)大于等于核心線程數(shù)
* 2.新建線程失敗
* 此時(shí)會(huì)嘗試將任務(wù)添加到阻塞隊(duì)列 workQueue
*/
// 若線程池處于 RUNNING 狀態(tài),將任務(wù)添加到阻塞隊(duì)列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
// 再次檢查線程池標(biāo)記
int recheck = ctl.get();
// 如果線程池已不處于 RUNNING 狀態(tài),那么移除已入隊(duì)的任務(wù),并且執(zhí)行拒絕策略
if (!isRunning(recheck) && remove(command)) {
// 任務(wù)添加到阻塞隊(duì)列失敗,執(zhí)行拒絕策略
reject(command);
}
// 如果線程池還是 RUNNING 的,并且線程數(shù)為 0,那么開啟新的線程
else if (workerCountOf(recheck) == 0) {
addWorker(null, false);
}
}
/**
* 至此,有以下兩種情況:
* 1.線程池處于非運(yùn)行狀態(tài),線程池不再接受新的線程
* 2.線程處于運(yùn)行狀態(tài),但是阻塞隊(duì)列已滿,無(wú)法加入到阻塞隊(duì)列
* 此時(shí)會(huì)嘗試以最大線程數(shù)為界創(chuàng)建新的工作線程
*/
else if (!addWorker(command, false)) {
// 任務(wù)進(jìn)入線程池失敗,執(zhí)行拒絕策略
reject(command);
}
}
可以看到 execute 方法中的的核心方法為 addWorker,再去看 addWorker 方法之前,先看下 Worker 的初始化方法:
Worker(Runnable firstTask) {
// 每個(gè)任務(wù)的鎖狀態(tài)初始化為-1,這樣工作線程在運(yùn)行之前禁止中斷
setState(-1);
this.firstTask = firstTask;
// 把 Worker 作為 thread 運(yùn)行的任務(wù)
this.thread = getThreadFactory().newThread(this);
}
在 Worker 初始化時(shí)把當(dāng)前 Worker 作為線程的構(gòu)造器入?yún)ⅲ酉聛?lái)從 addWorker 方法中可以找到如下代碼:
final Thread t = w.thread;
// 如果成功添加了 Worker,就可以啟動(dòng) Worker 了
if (workerAdded) {
t.start();
workerStarted = true;
}
這塊代碼是添加 worker 成功,調(diào)用 start 方法啟動(dòng)線程,Thread t = w.thread; 此時(shí)的 w 是 Worker 的引用,那么t.start();實(shí)際上執(zhí)行的就是 Worker 的 run 方法。
Worker 的 run 方法中調(diào)用了 runWorker 方法,簡(jiǎn)化后的 runWorker 源碼如下:
final void runWorker(Worker w) {
Runnable task = w.firstTask;
while (task != null || (task = getTask()) != null) {
try {
task.run();
} finally {
task = null;
}
}
}
這個(gè) while 循環(huán)有個(gè) getTask 方法,getTask 的主要作用是阻塞從隊(duì)列中拿任務(wù)出來(lái),如果隊(duì)列中有任務(wù),那么就可以拿出來(lái)執(zhí)行,如果隊(duì)列中沒(méi)有任務(wù),這個(gè)線程會(huì)一直阻塞到有任務(wù)為止(或者超時(shí)阻塞),其中 getTask 方法的時(shí)序圖如下:

其中線程復(fù)用的關(guān)鍵是 1.6 和 1.7 部分,這部分源碼如下:
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
使用隊(duì)列的 poll 或 take 方法從隊(duì)列中拿數(shù)據(jù),根據(jù)隊(duì)列的特性,隊(duì)列中有任務(wù)可以返回,隊(duì)列中無(wú)任務(wù)會(huì)阻塞。
線程池的線程復(fù)用就是通過(guò)取 Worker 的 firstTask 或者通過(guò) getTask 方法從 workQueue 中不停地取任務(wù),并直接調(diào)用 Runnable 的 run 方法來(lái)執(zhí)行任務(wù),這樣就保證了每個(gè)線程都始終在一個(gè)循環(huán)中,反復(fù)獲取任務(wù),然后執(zhí)行任務(wù),從而實(shí)現(xiàn)了線程的復(fù)用。
總結(jié)
本文主要從源碼的角度解析了 Java 線程池中的線程復(fù)用是如何實(shí)現(xiàn)的。歡迎大家留言交流討論。
最好的關(guān)系就是互相成就,大家的在看、轉(zhuǎn)發(fā)、留言三連就是我創(chuàng)作的最大動(dòng)力。
更詳細(xì)的源碼解析可以點(diǎn)擊鏈接查看:https://github.com/wupeixuan/JDKSourceCode1.8
參考
https://github.com/wupeixuan/JDKSourceCode1.8
面試官系統(tǒng)精講Java源碼及大廠真題
Java并發(fā)編程學(xué)習(xí)寶典
Java 并發(fā)面試 78 講

浙公網(wǎng)安備 33010602011771號(hào)