JAVA JUC干貨之線程池實(shí)現(xiàn)原理和源碼詳解(下)
摘要:分享JAVA JUC線程池干貨,首先描述線程池的基本概念,然后介紹線程工廠和拒絕策略,其次逐步深入線程池實(shí)現(xiàn)原理和線程池狀態(tài)機(jī),最后結(jié)合實(shí)戰(zhàn)講解源碼。
JUC干貨系列目錄:
??我在《JAVA JUC干貨之線程池實(shí)現(xiàn)原理和源碼詳解(上)》中介紹了線程池的基本功能,本文分享線程池實(shí)現(xiàn)原理,并結(jié)合案例詳解線程池源碼。
線程池設(shè)計(jì)思路
??本節(jié)主要參考文章《Java 線程池詳解,圖文并茂,還有誰(shuí)不會(huì)》,感興趣的讀者可以去看看。有句話叫做藝術(shù)來(lái)源于生活,編程語(yǔ)言也是如此,很多設(shè)計(jì)思想能映射到日常生活中,比如封裝、繼承和抽象等等。今天我們要說(shuō)的線程池,它同樣可以在現(xiàn)實(shí)世界找到對(duì)應(yīng)的實(shí)體——工廠。先假想一個(gè)工廠的生產(chǎn)流程:
??假如有一個(gè)工廠,工廠中有固定的一批工人,稱為正式員工,每個(gè)正式員工同一時(shí)刻只能做一個(gè)任務(wù),由這些正式員工協(xié)作完成工廠接收的訂單,每個(gè)訂單拆分為很多子任務(wù)。一般來(lái)說(shuō)工廠完成訂單的效率與工人的數(shù)量成正相關(guān),在訂單量激增時(shí),必定出現(xiàn)正式員工忙不過(guò)來(lái)的場(chǎng)景,工廠只能臨時(shí)將新訂單的生產(chǎn)原料存放在倉(cāng)庫(kù)中,等有空閑的正式員工時(shí)再處理。調(diào)度員負(fù)責(zé)實(shí)時(shí)調(diào)度空閑工人處理倉(cāng)庫(kù)中的生產(chǎn)任務(wù)。倉(cāng)庫(kù)爆倉(cāng)后,訂單還在增加怎么辦?只能臨時(shí)擴(kuò)招一批工人來(lái)應(yīng)對(duì)生產(chǎn)高峰,而這批工人在高峰結(jié)束后,由于訂單增長(zhǎng)的速度放緩慢了,是要辭掉的,所以稱為臨時(shí)工。當(dāng)臨時(shí)工也招滿后(受限于工位數(shù)量有上限),只能忍痛拒絕后面再來(lái)的訂單了。我們做如下一番映射:
| 工廠 | 線程池 |
|---|---|
| 訂單 | 任務(wù) |
| 正式員工 | 核心線程 |
| 臨時(shí)工 | 非核心線程 |
| 倉(cāng)庫(kù) | 阻塞隊(duì)列 |
| 工位數(shù)量 | 最大線程數(shù) |
| 調(diào)度員 | getTask(),將任務(wù)隊(duì)列中的任務(wù)調(diào)度給空閑線程 |
??強(qiáng)調(diào)一下,線程池的工作線程是沒(méi)有屬性記錄是核心線程還是非核心線程的,但根據(jù)核心線程數(shù)和最大線程數(shù)這兩個(gè)屬性歸類,方便大家理解;工作線程無(wú)論什么時(shí)候被創(chuàng)建,都存在被回收的可能,下文將詳細(xì)解釋。分析工廠-線程池映射表后,可以得到如下線程池流程圖,兩者是不是有異曲同工之妙?
??這樣,線程池的工作原理或者說(shuō)流程就很好理解了,在下一章詳細(xì)介紹。
線程池實(shí)現(xiàn)原理
??線程池的執(zhí)行原理是通過(guò)循環(huán)地從任務(wù)隊(duì)列中取出任務(wù),然后將任務(wù)分配給空閑的工作線程執(zhí)行。當(dāng)任務(wù)隊(duì)列為空時(shí),線程池會(huì)進(jìn)入等待狀態(tài),直到有新的任務(wù)到來(lái)。線程池還提供設(shè)置線程數(shù)和任務(wù)隊(duì)列長(zhǎng)度的能力,以控制并發(fā)線程的數(shù)量。
??下面我們進(jìn)入正題,看一下在線程池中一個(gè)任務(wù)從提交到最終執(zhí)行完畢經(jīng)歷了哪些過(guò)程。
??敲黑板劃重點(diǎn),用一句話簡(jiǎn)述ThreadPoolExecutor線程池實(shí)現(xiàn)原理:首先創(chuàng)建核心線程,再把任務(wù)放入阻塞隊(duì)列,其次創(chuàng)建非核心線程,最后拋棄任務(wù)。詳細(xì)流程圖如下:
??從執(zhí)行流程可知,屬性判斷順序如下:corePoolSize -> workQueue -> maxinumPoolSize。線程池執(zhí)行流程歸納為如下:
1.預(yù)熱核心線程:提交新任務(wù)后,如果線程數(shù)沒(méi)有達(dá)到核心線程數(shù) corePoolSize,則創(chuàng)建核心線程執(zhí)行新任務(wù),而且空閑的核心線程處于阻塞狀態(tài)不執(zhí)行新任務(wù)直到隊(duì)列中有任務(wù)。即線程池優(yōu)先填滿corePoolSize個(gè)核心線程,再?gòu)?fù)用核心線程處理任務(wù)。如果線程數(shù)達(dá)到核心線程數(shù),則進(jìn)入下個(gè)流程。
2.任務(wù)入隊(duì):如果線程數(shù)達(dá)到核心線程數(shù),且任務(wù)隊(duì)列未滿,則把新任務(wù)放入阻塞隊(duì)列末尾;這個(gè)場(chǎng)景下,核心線程執(zhí)行完當(dāng)前任務(wù)后自動(dòng)從任務(wù)隊(duì)列中獲取任務(wù)來(lái)執(zhí)行。如果任務(wù)隊(duì)列已滿,則進(jìn)入下個(gè)流程。
3.創(chuàng)建非核心線程:如果任務(wù)隊(duì)列爆滿而且最大線程數(shù) maximumPoolSize > 線程數(shù)poolSize > corePoolSize,則創(chuàng)建新的工作線程(一般視作非核心線程)并立刻執(zhí)行當(dāng)前的新任務(wù);溫馨提示,這個(gè)場(chǎng)景下新任務(wù)早于隊(duì)列中的任務(wù)執(zhí)行。如果線程數(shù)達(dá)到最大線程數(shù),則進(jìn)入執(zhí)行拒絕任務(wù)的流程。
非核心線程數(shù)為 maximumPoolSize - corePoolSize,即為最大線程數(shù)減去核心線程數(shù)。
4.拒絕任務(wù):如果線程數(shù)等于最大線程數(shù)且任務(wù)隊(duì)列已滿,則根據(jù)拒絕策略處理新任務(wù)。
5.復(fù)用線程:線程執(zhí)行完任務(wù)后去檢查任務(wù)隊(duì)列里是否有任務(wù)需要執(zhí)行,若有則馬上執(zhí)行;否則,進(jìn)入阻塞狀態(tài)。
6.銷毀線程:線程如果無(wú)事可做的時(shí)間超過(guò)存活時(shí)間keepAliveTime,那么就會(huì)被回收,以便減少內(nèi)存占用和資源消耗,實(shí)現(xiàn)對(duì)系統(tǒng)資源的調(diào)優(yōu)。如果調(diào)用了allowCoreThreadTimeOut(true)方法,則會(huì)給核心線程數(shù)設(shè)置存活時(shí)間,使得超過(guò)keepAliveTime的核心線程也會(huì)被銷毀,從而最終有可能導(dǎo)致線程池中的線程數(shù)為0。
??通常情況下,隨著阻塞隊(duì)列中任務(wù)數(shù)的減少,線程數(shù)最終會(huì)收縮到 corePoolSize 的大小,故線程數(shù)維持在corePoolSize和maximumPoolSize之間。這就是線程池動(dòng)態(tài)調(diào)整線程數(shù)的過(guò)程。一旦線程數(shù)降低到核心線程數(shù),就會(huì)暫停回收工作,這是為什么呢?因?yàn)榫€程池為了保證穩(wěn)定性,必須預(yù)留一定規(guī)模的“即戰(zhàn)力”。
??有些人認(rèn)為線程池會(huì)把每個(gè)工作線程標(biāo)記為核心線程或者非核心線程,然后空閑時(shí)定向回收非核心的工作線程。事實(shí)不是這樣的,線程沒(méi)有核心線程或者非核心線程的標(biāo)記,在線程池銷毀線程時(shí)對(duì)所有線程一視同仁,即無(wú)論哪個(gè)工作線程,只要同時(shí)符合如下兩個(gè)條件,都存在被銷毀的可能性:
+ 總線程數(shù)超過(guò)corePoolSize(工人多了,這是大前提)
+ 線程不處理任務(wù)的時(shí)間超過(guò)存活時(shí)間keepAliveTime(keepAliveTime時(shí)間內(nèi)都沒(méi)有產(chǎn)出)
??通俗地說(shuō),線程池“降本”時(shí)銷毀的一定是在【創(chuàng)建非核心線程】階段“新建”的第二批工作線程嗎?事實(shí)并非如此,線程池展現(xiàn)出了狼性般的敏銳與果斷,“裁員”的底層邏輯是只看上述兩條鐵律,誰(shuí)達(dá)標(biāo)就優(yōu)化誰(shuí)。
??乍一看這種實(shí)現(xiàn)原理似乎有點(diǎn)丈二和尚摸不著頭腦,但“為了快、想追求經(jīng)濟(jì)、最后發(fā)現(xiàn)還是要快”三個(gè)階段的構(gòu)思調(diào)整在我看來(lái)恰恰是一種鬼斧神工般的技藝,頗有點(diǎn)博弈的氣概:初次接業(yè)務(wù),果斷招聘;業(yè)務(wù)繁忙時(shí),如果隊(duì)列能頂住,勤儉持家點(diǎn)不好嗎?如果隊(duì)列被打爆頂不住了,說(shuō)明真到拼刺刀的時(shí)刻了,已顧不得太多,把全部家底都拿出來(lái)唄!
??這一系列轉(zhuǎn)變的底層支撐正是線程池的“臨時(shí)工機(jī)制”:業(yè)務(wù)太多忙不過(guò)來(lái)時(shí),就新增線程提效,等高峰過(guò)去以后,再銷毀線程降本。
??這里有個(gè)細(xì)節(jié)值得大家品味,在線程池里的線程數(shù)小于核心線程數(shù)的前提下提交任務(wù)的時(shí)候,雖然核心線程因任務(wù)隊(duì)列空空如也而可能處于閑置狀態(tài),但是依然會(huì)繼續(xù)創(chuàng)建核心線程,而非復(fù)用已有的、空閑的核心線程。
ThreadPoolExecutor 示例
??下面這個(gè)使用execute提交任務(wù)的示例用于演示線程池執(zhí)行流程:
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author 樓蘭胡楊
* @Date 2025-05-16
* @Description: 通過(guò)調(diào)整for循環(huán)次數(shù)驗(yàn)證線程池執(zhí)行機(jī)制
*/
public class ThreadPoolTest {
public static ThreadPoolExecutor executor;
public static void main(String[] args) {
ThreadFactory guavaFactory = new ThreadFactoryBuilder().setNameFormat("pool-Wiener-%d").build();
// 自定義線程工廠
ThreadFactory jucFactory = new ThreadFactory() {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Wiener-" + mThreadNum.getAndIncrement());
}
};
RejectionImpl rejection = new RejectionImpl();
executor = new ThreadPoolExecutor(2, 5, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10), guavaFactory, rejection);
for (int i = 0; i < 25; i++) {
MyPoolTask myTask = new MyPoolTask(i);
executor.execute(myTask);
}
executor.shutdown();
}
}
class MyPoolTask implements Runnable {
private int taskNum;
ThreadPoolExecutor myExe = ThreadPoolTest.executor;
public MyPoolTask(int num) {
this.taskNum = num;
}
@Override
public void run() {
// System.out.println("正在執(zhí)行task " + taskNum);
System.out.println(Thread.currentThread().getName() + ",活躍線程數(shù)目:" + myExe.getPoolSize()
+ ",隊(duì)列長(zhǎng)度:" +
myExe.getQueue().size()
+ ",當(dāng)前任務(wù)是" + taskNum + ",已執(zhí)行任務(wù)數(shù)目:" + myExe.getCompletedTaskCount());
try {
Thread.currentThread().sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("執(zhí)行完畢task " + taskNum);
}
@Override
public String toString() {
return taskNum + "";
}
}
class RejectionImpl implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("被拒絕任務(wù)是 " + r.toString());
}
}
??執(zhí)行結(jié)果如下:
pool-Wiener-1,活躍線程數(shù)目:3,隊(duì)列長(zhǎng)度:10,當(dāng)前任務(wù)是1,已執(zhí)行任務(wù)數(shù)目:0
pool-Wiener-0,活躍線程數(shù)目:2,隊(duì)列長(zhǎng)度:9,當(dāng)前任務(wù)是0,已執(zhí)行任務(wù)數(shù)目:0
pool-Wiener-2,活躍線程數(shù)目:3,隊(duì)列長(zhǎng)度:10,當(dāng)前任務(wù)是12,已執(zhí)行任務(wù)數(shù)目:0
pool-Wiener-4,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:10,當(dāng)前任務(wù)是14,已執(zhí)行任務(wù)數(shù)目:0
pool-Wiener-3,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:10,當(dāng)前任務(wù)是13,已執(zhí)行任務(wù)數(shù)目:0
被拒絕任務(wù)是 15
被拒絕任務(wù)是 16
被拒絕任務(wù)是 17
被拒絕任務(wù)是 18
被拒絕任務(wù)是 19
被拒絕任務(wù)是 20
被拒絕任務(wù)是 21
被拒絕任務(wù)是 22
被拒絕任務(wù)是 23
被拒絕任務(wù)是 24
執(zhí)行完畢task 0
執(zhí)行完畢task 14
pool-Wiener-0,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:9,當(dāng)前任務(wù)是2,已執(zhí)行任務(wù)數(shù)目:1
執(zhí)行完畢task 1
pool-Wiener-4,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:8,當(dāng)前任務(wù)是3,已執(zhí)行任務(wù)數(shù)目:2
執(zhí)行完畢task 12
pool-Wiener-2,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:6,當(dāng)前任務(wù)是5,已執(zhí)行任務(wù)數(shù)目:4
pool-Wiener-1,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:7,當(dāng)前任務(wù)是4,已執(zhí)行任務(wù)數(shù)目:3
執(zhí)行完畢task 13
pool-Wiener-3,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:5,當(dāng)前任務(wù)是6,已執(zhí)行任務(wù)數(shù)目:5
執(zhí)行完畢task 3
pool-Wiener-4,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:4,當(dāng)前任務(wù)是7,已執(zhí)行任務(wù)數(shù)目:6
執(zhí)行完畢task 2
執(zhí)行完畢task 4
pool-Wiener-0,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:3,當(dāng)前任務(wù)是8,已執(zhí)行任務(wù)數(shù)目:7
執(zhí)行完畢task 6
pool-Wiener-1,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:2,當(dāng)前任務(wù)是9,已執(zhí)行任務(wù)數(shù)目:8
執(zhí)行完畢task 5
pool-Wiener-3,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:1,當(dāng)前任務(wù)是10,已執(zhí)行任務(wù)數(shù)目:9
pool-Wiener-2,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:0,當(dāng)前任務(wù)是11,已執(zhí)行任務(wù)數(shù)目:10
執(zhí)行完畢task 9
執(zhí)行完畢task 8
執(zhí)行完畢task 10
執(zhí)行完畢task 11
執(zhí)行完畢task 7
??從執(zhí)行結(jié)果日志可以看出,當(dāng)線程池中線程的數(shù)目大于5時(shí),便將任務(wù)放入任務(wù)緩存隊(duì)列里面,當(dāng)任務(wù)緩存隊(duì)列滿了之后,便創(chuàng)建新的線程。如果上面程序for循環(huán)中,把創(chuàng)建的任務(wù)數(shù)從25個(gè)降低到15個(gè),就不會(huì)拋出任務(wù)被拒絕的異常了。
ThreadPoolExecutor 源碼詳解
??程序猿為什么需要讀源碼?閱讀源碼是程序猿成長(zhǎng)過(guò)程中非常重要的一個(gè)環(huán)節(jié)。這就像學(xué)習(xí)語(yǔ)言時(shí)需要閱讀經(jīng)典文學(xué)作品一樣,通過(guò)閱讀源碼,程序員可以學(xué)到很多優(yōu)秀的編程技巧和最佳實(shí)踐;可以了解多樣化的技術(shù)選型方案,培養(yǎng)架構(gòu)設(shè)計(jì)敏感度,為技術(shù)決策提供參考依據(jù)。具體來(lái)說(shuō),讀源碼有以下幾個(gè)好處:
??深入了解工具庫(kù)或框架:當(dāng)你使用第三方庫(kù)或者框架時(shí),深入其內(nèi)部結(jié)構(gòu)有助于更好地掌握它們的功能以及局限性,從而更高效地運(yùn)用到自己的項(xiàng)目中去。
??增強(qiáng)業(yè)務(wù)能力和團(tuán)隊(duì)協(xié)作能力:閱讀和理解其他同事敲的代碼可以使人深度認(rèn)識(shí)項(xiàng)目,精準(zhǔn)了解業(yè)務(wù)邏輯,還能促進(jìn)成員間的交流與合作。
??面試準(zhǔn)備:在求職過(guò)程中,了解常見(jiàn)開(kāi)源框架的源碼實(shí)現(xiàn)可以幫助你在面試環(huán)節(jié)中脫穎而出,因?yàn)樵S多公司會(huì)考察候選人對(duì)這些技術(shù)細(xì)節(jié)的理解程度。
??拓展技術(shù)視野:接觸不同項(xiàng)目的源碼(如微服務(wù)框架、分布式系統(tǒng))可了解多樣化的技術(shù)選型方案,培養(yǎng)架構(gòu)設(shè)計(jì)敏感度,為技術(shù)決策提供參考依據(jù)。
??總之,對(duì)于任何有志于提升自身技術(shù)水平的程序員而言,定期花時(shí)間去研究高質(zhì)量的源碼是一項(xiàng)不可或缺的學(xué)習(xí)活動(dòng)。你對(duì)哪些源碼感興趣?可以評(píng)論區(qū)留言,一起品嘗。下面淺談我對(duì) ThreadPoolExecutor 源碼的理解,不當(dāng)之處還請(qǐng)?jiān)谠u(píng)論區(qū)留言,一起翻越ThreadPoolExecutor這座山。
execute提交任務(wù)源碼分析
??在ThreadPoolExecutor類中,最核心的任務(wù)提交方法是execute(Runnable command)方法,雖然通過(guò)submit也可以提交任務(wù),但是submit方法底層實(shí)現(xiàn)中最終調(diào)用的還是execute,所以我們只需要分析execute的源碼即可。在JDK 21中,execute(Runnable command)的源代碼如下所示,已經(jīng)添加中文注釋,同時(shí)保留了原汁原味的英文注釋:
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@link RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
// 判斷提交的任務(wù)是否為null
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps(執(zhí)行過(guò)程可以分為如下三個(gè)步驟):
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 可以將 ctl 理解成保存了線程數(shù)和運(yùn)行狀態(tài)等信息的變量
int c = ctl.get();
// 判斷工作線程總數(shù)是否小于核心線程數(shù),小于的時(shí)候可以添加核心線程,并且將當(dāng)前任務(wù)作為此線程執(zhí)行的第一個(gè)任務(wù)
if (workerCountOf(c) < corePoolSize) {
//新增的核心線程成功執(zhí)行任務(wù),流程結(jié)束;addWorker第二個(gè)參數(shù)為true,表示根據(jù)核心線程數(shù)判斷線程數(shù)量
if (addWorker(command, true))
return;
// addWorker執(zhí)行報(bào)錯(cuò),需要再次檢查變量值
c = ctl.get();
}
//判斷線程池的狀態(tài)是否正常而且任務(wù)放入任務(wù)隊(duì)列是否正常
if (isRunning(c) && workQueue.offer(command)) {
// 同理,為了防止向任務(wù)隊(duì)列中又提交新的任務(wù)造成錯(cuò)誤,再次更新變量值
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//當(dāng)前線程池處于非運(yùn)行狀態(tài)且將剛添加的任務(wù)成功從任務(wù)隊(duì)列移除,執(zhí)行拒絕策略
reject(command);
else if (workerCountOf(recheck) == 0)
// 線程數(shù)為0,第一個(gè)參數(shù)為null,表示創(chuàng)建線程但不啟動(dòng)
addWorker(null, false);
//線程數(shù)超過(guò)核心線程數(shù)而且隊(duì)列已滿,需要新增非核心線程
} else if (!addWorker(command, false))
//工作線程已經(jīng)達(dá)到了最大線程數(shù)閾值,或者是調(diào)用了關(guān)閉線程池的方法,觸發(fā)拒絕策略
reject(command);
}
??假設(shè)線程數(shù)達(dá)到核心線程數(shù)且隊(duì)列未滿,這時(shí)新提交的任務(wù)可以直接復(fù)用空閑的核心線程嗎?從源碼來(lái)看,新提交的任務(wù)需要放入任務(wù)隊(duì)列,空閑的核心線程是從任務(wù)隊(duì)列拿任務(wù)。
??線程池的本質(zhì)是對(duì)任務(wù)和線程的管理,而做到這一點(diǎn)最關(guān)鍵的思想就是借助生產(chǎn)者消費(fèi)者模式將任務(wù)和線程兩者解耦,不讓兩者直接綁定,方便做后續(xù)的工作分配。在隊(duì)列為空時(shí),線程會(huì)等待任務(wù)進(jìn)入隊(duì)列;當(dāng)隊(duì)列不滿時(shí),任務(wù)會(huì)等待線程來(lái)執(zhí)行。
??從上述源碼不難發(fā)現(xiàn),execute 方法多次通過(guò) addWorker 方法來(lái)添加線程處理任務(wù),故我們下面來(lái)閱讀 addWorker 方法的源碼。
addWorker創(chuàng)建線程源碼詳解
/**
* Set containing all worker threads in pool. 線程池中存放線程的集合,維護(hù)一組Worker對(duì)象
* Accessed only when holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<>();
/*
* Methods for creating, running and cleaning up after workers
*/
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
// 外層循環(huán):判斷線程池狀態(tài)
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
// 線程池已經(jīng)被停止或者關(guān)閉,或者隊(duì)列為空,終止流程,返回 false
return false;
// 死循環(huán)
for (;;) {
// 檢查當(dāng)前工作線程數(shù)是否已經(jīng)達(dá)到核心線程數(shù)(core 為 true)或最大線程數(shù)(core 為 false)
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
// 達(dá)到核心線程數(shù)或最大線程數(shù),返回 false
return false;
//通過(guò)cas操作增加線程池的工作線程數(shù)量
if (compareAndIncrementWorkerCount(c))
// 成功原子地增加工作線程數(shù),跳出外層的 for 循環(huán)
break retry;
// 增加線程數(shù)量失敗,再次讀取 ctl 的值
c = ctl.get(); // Re-read ctl
// 如果當(dāng)前運(yùn)行狀態(tài)不等于
if (runStateAtLeast(c, SHUTDOWN))
// 線程池處于關(guān)閉狀態(tài),跳到外層循環(huán)重新執(zhí)行
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
/**
* 線程數(shù)量+1成功的后續(xù)操作:添加新工作線程到工作線程集合,啟動(dòng)工作線程執(zhí)行firstTask
*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//添加新的Worker對(duì)象來(lái)處理任務(wù)firstTask
w = new Worker(firstTask);
//獲取 Worker 對(duì)象中的線程。Worker類是任務(wù)線程的包裝類,內(nèi)部封裝了一個(gè)Thread類型的變量
final Thread t = w.thread;
if (t != null) {
// 開(kāi)始加內(nèi)置鎖
final ReentrantLock mainLock = this.mainLock;
// 基于ReentrantLock的同步塊
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 再次檢查狀態(tài),保證線程工廠沒(méi)有失敗或者在獲取鎖之前線程池沒(méi)有被關(guān)閉
int c = ctl.get();
//添加線程的前提條件:①線程處于運(yùn)行狀態(tài),②線程處于shutdown狀態(tài)并且firstTask為null
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 檢查線程狀態(tài),如果不是新建狀態(tài)就拋異常
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
//workers是一個(gè)HashSet<Woker> 集合,往里面新增Worker類型的工作線程對(duì)象
workers.add(w);
// 把工作線程添加標(biāo)志置為 true
workerAdded = true;
int s = workers.size();
// largestPoolSize 表示線程池中出現(xiàn)過(guò)的最大線程數(shù)
if (s > largestPoolSize)
// 更新線程池中線程數(shù)最大值
largestPoolSize = s;
}
} finally {
// 釋放鎖
mainLock.unlock();
}
if (workerAdded) {
// 啟動(dòng)新創(chuàng)建的線程,執(zhí)行當(dāng)前任務(wù)firstTask
container.start(t);
// 把線程啟動(dòng)標(biāo)志置為 true
workerStarted = true;
}
}
} finally {
//判斷線程有沒(méi)有啟動(dòng)成功,如果沒(méi)有則調(diào)用addWorkerFailed方法
if (! workerStarted)
addWorkerFailed(w);
}
// 返回線程啟動(dòng)標(biāo)志
return workerStarted;
}
??我們來(lái)看看函數(shù)addWorker(Runnable firstTask, boolean core) 的入?yún)ⅰ?strong>firstTask:添加的新線程需要執(zhí)行的第一任務(wù),執(zhí)行完成之后才能執(zhí)行任務(wù)隊(duì)列中其它任務(wù);如果沒(méi)有任務(wù),則需要設(shè)置為null。core的值如果是true,則使用corePoolSize與線程數(shù)進(jìn)行比較,判斷線程數(shù)是否已經(jīng)達(dá)到核心線程數(shù);否則,使用maximumPoolSize判斷線程數(shù)是否已經(jīng)達(dá)到最大線程數(shù)。
??Worker類繼承了AbstractQueuedSynchronizer,實(shí)現(xiàn)了Runnable接口。線程池中,每一個(gè)線程都被封裝成一個(gè)Worker對(duì)象,ThreadPool維護(hù)的其實(shí)就是存放在變量HashSet
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
/** Thread this worker is running in. Null if factory fails. 處理任務(wù)的線程 */
@SuppressWarnings("serial") // Unlikely to be serializable
final Thread thread;
/** Initial task to run. Possibly null.保存?zhèn)魅氲娜蝿?wù) */
@SuppressWarnings("serial") // Not statically typed as Serializable
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
// TODO: switch to AbstractQueuedLongSynchronizer and move
// completedTasks into the lock word.
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
//設(shè)置AQS的同步狀態(tài)
setState(-1); // inhibit interrupts until runWorker
// 封裝任務(wù)
this.firstTask = firstTask;
// 新增執(zhí)行任務(wù)的線程,this表示線程,故Worker對(duì)象在啟動(dòng)的時(shí)候就會(huì)調(diào)用函數(shù)run
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 使用AQS實(shí)現(xiàn)獨(dú)占鎖,不允許重入
protected boolean tryAcquire(int unused) {
// 使用CAS修改狀態(tài),不允許重入
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
??Worker類中的函數(shù)run()的實(shí)現(xiàn)都在 runWorker() 方法中,故我們看一下runWorker()的源碼。
runWorker執(zhí)行任務(wù)源碼詳解
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();// 獲取當(dāng)前線程
// 用創(chuàng)建線程時(shí)傳入的firstTask初始化任務(wù)
Runnable task = w.firstTask;
w.firstTask = null;// 清空 Worker 對(duì)象的第一個(gè)任務(wù)
// 解鎖,允許中斷
w.unlock(); // allow interrupts
// 標(biāo)記線程是否因?yàn)楫惓M顺鲅h(huán),默認(rèn)為 true
boolean completedAbruptly = true;
try {
//循環(huán)獲取任務(wù),在getTask()返回null時(shí)跳出while循環(huán),且回收線程
// 如果firstTask為null,則使用getTask()從任務(wù)隊(duì)列取任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//當(dāng)前線程被打上中斷標(biāo)志
wt.interrupt();
try {
//執(zhí)行任務(wù)前的鉤子方法,讓繼承類做一些統(tǒng)計(jì)之類的事情
beforeExecute(wt, task);
try {
// 執(zhí)行任務(wù)
task.run();
// 執(zhí)行任務(wù)后的鉤子方法
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
// 把執(zhí)行完的任務(wù)設(shè)置為null,從而觸發(fā)getTask()重新獲取任務(wù);增加任務(wù)數(shù),同時(shí)釋放鎖
task = null;
// 更新已完成的任務(wù)數(shù)
w.completedTasks++;
w.unlock();
}
}
// 標(biāo)記線程正常退出
completedAbruptly = false;
} finally {
// 回收/銷毀線程
processWorkerExit(w, completedAbruptly);
}
}
??只要一直有新增任務(wù)或者能夠循環(huán)地使用getTask獲取任務(wù),線程就永遠(yuǎn)死不了。但是,如果getTask結(jié)果為null,就需要跳出while循環(huán),執(zhí)行processWorkerExit,銷毀線程;如果執(zhí)行processWorkerExit的入?yún)ompletedAbruptly=true,表示線程意外退出,需要減少計(jì)數(shù)。
??線程池的核心功能就是實(shí)現(xiàn)了線程的重復(fù)利用,那么它是怎么實(shí)現(xiàn)線程復(fù)用的呢?在這個(gè)方法中,while (task != null || (task = getTask()) != null)說(shuō)明線程會(huì)無(wú)限循環(huán)地【使用getTask函數(shù)從任務(wù)隊(duì)列中取任務(wù)】、【執(zhí)行任務(wù)】和【清空?qǐng)?zhí)行結(jié)束的任務(wù)】,直到取出的任務(wù)為null,此時(shí)線程池已經(jīng)關(guān)閉或者任務(wù)隊(duì)列為空,這就是線程能夠復(fù)用的主要原因。接下來(lái)就會(huì)跳出 while 循環(huán)進(jìn)入 finally 語(yǔ)句塊執(zhí)行processWorkerExit(),嘗試回收線程。這行while循環(huán)代碼也說(shuō)明如果隊(duì)列中沒(méi)有任務(wù)且核心線程數(shù)小于corePoolSize,則來(lái)任務(wù)時(shí)一定會(huì)創(chuàng)建核心線程,而已經(jīng)創(chuàng)建的核心線程哪怕一直無(wú)所事事也不會(huì)執(zhí)行新任務(wù)。
??到這里就介紹完線程池腳踏實(shí)地干活的Worker類了,簡(jiǎn)單歸納一下:worker對(duì)象同時(shí)封裝了一個(gè)任務(wù)Runnable firstTask和一個(gè)線程final Thread thread,它的創(chuàng)建依賴于線程狀態(tài),任務(wù)的執(zhí)行也是在worker里去處理。這里有個(gè)小技巧值得大家細(xì)細(xì)品味——在函數(shù)runWorker中,會(huì)把執(zhí)行完的任務(wù)及時(shí)設(shè)置為null但保留線程,從而觸發(fā)函數(shù)getTask()重新獲取任務(wù),實(shí)現(xiàn)線程復(fù)用。
processWorkerExit回收線程源碼詳解
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 將線程引用移出線程池
workers.remove(w);
} finally {
mainLock.unlock();
}
// 回收線程
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
??事實(shí)上,在這個(gè)processWorkerExit函數(shù)中,將線程引用移出線程池線程集合且調(diào)用了tryTerminate()就已經(jīng)回收了線程。但由于引起線程銷毀的可能性有很多,故線程池要判斷是什么觸發(fā)了這次銷毀,是否要改變線程池的現(xiàn)階段狀態(tài),是否要根據(jù)新?tīng)顟B(tài)重新分配線程。
getTask獲取任務(wù)源碼詳解
??本節(jié),我們來(lái)分析線程怎么通過(guò) getTask() 方法從任務(wù)隊(duì)列中取出任務(wù)的。
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 死循環(huán),此方法要么返回 null,要么返回 Runnable 對(duì)象代表取到了任務(wù)
for (;;) {
int c = ctl.get();
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
//線程處于非運(yùn)行狀態(tài)時(shí),若滿足如下兩個(gè)條件之一,則線程數(shù)減1
// 1.線程處于 STOP 及以上(STOP、TIDYING、TERMINAL)的狀態(tài);2.任務(wù)隊(duì)列為空
decrementWorkerCount();
return null;
}
// 計(jì)算線程總數(shù)
int wc = workerCountOf(c);
// Are workers subject to culling? 可以銷毀線程嗎
// 變量timed用于判斷是否需要進(jìn)行超時(shí)控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如下場(chǎng)景可以銷毀線程:①線程數(shù)大于最大線程數(shù);
// ②允許核心線程被回收且空閑時(shí)間超過(guò)存活時(shí)間;
// ③非核心線程空閑時(shí)間超過(guò)存活時(shí)間
// TODO wc可以為0嗎?
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 線程數(shù)減一成功時(shí)返回null,表名當(dāng)前線程可以被回收
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/**
* 從阻塞隊(duì)列中取Runnable類型的任務(wù)對(duì)象,
*/
Runnable r = timed ?
// timed 為 true,調(diào)用poll方法,如果keepAliveTime納秒內(nèi)沒(méi)有拿到任務(wù),則返回null
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// timed 為 false,調(diào)用take方法,此時(shí)線程永久阻塞直到有任務(wù)對(duì)象返回
workQueue.take();
if (r != null)
// 返回從阻塞隊(duì)列中取到的任務(wù),終止循環(huán)
return r;
// 線程沒(méi)有拿到任務(wù),標(biāo)記為已過(guò)期等待被銷毀
timedOut = true;
} catch (InterruptedException retry) {
//當(dāng)前線程在獲取任務(wù)時(shí)發(fā)生中斷,則設(shè)置為不過(guò)期并返回循環(huán)重試
timedOut = false;
}
}
}
??如果不允許核心線程超時(shí)(allowCoreThreadTimeOutfalse)且線程數(shù)不大于核心線程數(shù)(wc <= corePoolSize),則workQueue.take()操作會(huì)導(dǎo)致線程持續(xù)阻塞,避免線程被銷毀。這就是在【預(yù)熱線程】階段不能復(fù)用線程的原因。如果支持核心線程超時(shí)(allowCoreThreadTimeOuttrue)或者線程數(shù)大于核心線程數(shù)(wc > corePoolSize),則執(zhí)行workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)操作,若在keepAliveTime納秒內(nèi)沒(méi)有拿到任務(wù),將返回null。
tryTerminate終止線程源碼詳解
在《Java線程池狀態(tài)和狀態(tài)切換》中已經(jīng)介紹過(guò),通過(guò) shutdownNow 與 shutdown可以觸發(fā)線程池關(guān)閉流程,當(dāng)方法執(zhí)行完畢后,線程池將會(huì)進(jìn)入 STOP 或者 SHUTDOWN 狀態(tài)。但是此時(shí)線程池并未真正的被關(guān)閉,在runWorker方法最后的finally塊中,調(diào)用了processWorkerExit方法,其邏輯實(shí)現(xiàn)中調(diào)用了一個(gè) tryTerminate 方法,這個(gè)才是正在關(guān)閉線程池的鉤子方法。樓蘭胡楊本節(jié)和各位一起看看函數(shù)tryTerminate的源碼。
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果線程池不處于預(yù)停機(jī)狀態(tài),則不進(jìn)行停機(jī)
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
// 當(dāng)前還有工作線程,不停機(jī)
interruptIdleWorkers(ONLY_ONE);
return;
}
// 線程處于預(yù)關(guān)閉狀態(tài),開(kāi)始關(guān)閉線程池
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 嘗試通過(guò) CAS 將線程池狀態(tài)修改為 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
// 嘗試通過(guò) CAS 將線程池狀態(tài)修改為 TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
container.close();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
??哦了,再來(lái)看看函數(shù) terminated(),是不是瞬間感覺(jué)很坑爹?它的方法體里面神!馬!也!沒(méi)!干!淡定,其實(shí)它是個(gè)鉤子方法,允許通過(guò)重寫(xiě)在線程池被終止時(shí)做一些特殊的業(yè)務(wù)邏輯,默認(rèn)的線程池沒(méi)有什么要做的事情,當(dāng)然也沒(méi)有必要寫(xiě)什么啦~
/**
* Method invoked when the Executor has terminated. Default
* implementation does nothing. Note: To properly nest multiple
* overridings, subclasses should generally invoke
* {@code super.terminated} within this method.
*/
protected void terminated() { }
創(chuàng)建多少線程比較合適
??要想合理的配置線程池線程數(shù),就必須首先分析任務(wù)特性,可以從以下幾個(gè)角度來(lái)進(jìn)行分析:
- 任務(wù)的性質(zhì):CPU 密集型任務(wù),IO 密集型任務(wù)和混合型任務(wù);
- 任務(wù)的優(yōu)先級(jí):高,中和低;
- 任務(wù)的執(zhí)行時(shí)間;
- 任務(wù)的依賴關(guān)系:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫(kù)連接;
??上面都是需要考慮的因素,至于應(yīng)該創(chuàng)建多個(gè)線程,還是需要壓測(cè)的,單純考慮CPU 密集型任務(wù)和IO 密集型不合適。另外,建議任務(wù)性質(zhì)不同的任務(wù)用不同規(guī)模的線程池分開(kāi)處理,保證業(yè)務(wù)解耦。
初始化線程池
??默認(rèn)情況下,創(chuàng)建線程池之后,線程池中是沒(méi)有線程的,需要提交任務(wù)之后才會(huì)創(chuàng)建線程。在實(shí)際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程,可以通過(guò)prestartCoreThreadhe 和 prestartAllCoreThreads兩個(gè)方法實(shí)現(xiàn),二者都通過(guò)調(diào)用函數(shù)boolean addWorker(Runnable firstTask, boolean core)來(lái)實(shí)現(xiàn)。
??prestartCoreThread()用于在創(chuàng)建線程池的時(shí)候初始化一個(gè)核心線程,實(shí)現(xiàn)源碼如下:
/**
* Starts a core thread, causing it to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed. This method will return {@code false}
* if all core threads have already been started.
*
* @return {@code true} if a thread was started
*/
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
//注意傳進(jìn)去的參數(shù)firstTask是null
addWorker(null, true);
}
??prestartAllCoreThreads():初始線程池時(shí)創(chuàng)建所有核心線程,實(shí)現(xiàn)源碼如下:
/**
* Starts all core threads, causing them to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed.
*
* @return the number of threads started
*/
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))//注意傳入的任務(wù)是null
++n;
return n;
}
??注意上面?zhèn)鬟M(jìn)boolean addWorker(Runnable firstTask, boolean core)的參數(shù)firstTask是null,創(chuàng)建的核心線程會(huì)被阻塞在getTask方法中,等待獲取任務(wù)隊(duì)列中的任務(wù)。
??八股文:當(dāng)任務(wù)數(shù)超過(guò)核心線程數(shù)時(shí),如何直接啟用最大線程數(shù)maximumPoolSize?
分析:題目中【任務(wù)數(shù)超過(guò)核心線程數(shù)】可以理解為【線程數(shù)超過(guò)核心線程數(shù)】。
答:綜合【線程池執(zhí)行流程】所述得知,既然我們的預(yù)期是任務(wù)數(shù)超過(guò)核心線程數(shù)時(shí)新提交的任務(wù)不進(jìn)入任務(wù)隊(duì)列,就需要人為干預(yù)第二步 任務(wù)入隊(duì)。這答案就顯而易見(jiàn)了——在創(chuàng)建線程池的時(shí)候,指定任務(wù)隊(duì)列使用SynchronousQueue。SynchronousQueue是不能存儲(chǔ)元素的一個(gè)隊(duì)列,它的特性是每生產(chǎn)一個(gè)任務(wù)就需要指定一個(gè)消費(fèi)者來(lái)處理這個(gè)任務(wù);否則,阻塞生產(chǎn)者。
結(jié)束語(yǔ)
??至此,已經(jīng)介紹完線程池核心知識(shí)點(diǎn),預(yù)祝各位讀者在工作中能夠迅速而準(zhǔn)確地處理線程池相關(guān)需求,就像運(yùn)斤成風(fēng)一樣。
??在編程這個(gè)復(fù)雜嚴(yán)峻的環(huán)境中,請(qǐng)活得優(yōu)雅坦然:也許你的錢包空空如也,也許你的工作不夠好,也許你正處在困境中,也許你被情所棄。不論什么原因,請(qǐng)你在出門(mén)時(shí),一定要把自己打扮地清清爽爽,昂起頭,挺起胸,面帶微笑,從容自若地面對(duì)生活和面對(duì)工作。人生就像蒲公英,沒(méi)事盡量少吹風(fēng);只要你自己真正撐起來(lái)了一片天地,別人無(wú)論如何是壓不垮你的,內(nèi)心的強(qiáng)大才是真正的強(qiáng)大。
Reference
Buy me a coffee. ?Get red packets.
浙公網(wǎng)安備 33010602011771號(hào)