技術面:Java并發(線程池、ForkJoinPool)
線程池是如何實現的?
線程池
線程池,就是提前創建好一批線程,然后存儲在線程池中,當有任務需要執行的時候,從線程池中選一個線程來執行。可以頻繁的避免線程的創建和銷毀的開銷。
線程池是基于池化思想的一種實現,本質就是提前準備好一批資源,以備不時之需,在資源有限的情況下,可以大大的提高資源的利用率,提升性能。
還有一些其他基于池化思想的實現:
- 連接池
- 內存池
- 對象池
Java中線程池各接口與實現類之間的關系

線程池的實現原理
除了ForkJoinPool以外,上圖中,無論是通過接口還是實現類來創建線程池,最終都是通過ThreadPoolExecutor的構造方法來實現的。
在構造方法中參數,可以反應出這個對象的數據結構,就是下面這些參數

- corePoolSize,核心線程數數量,線程池中正式員工的數量。
- maximumPoolSize,最大線程數數量,線程池中,正式員工與臨時工(非核心線程)兩者總共最大的數量
- workQueue,任務等待隊列,當核心線程數量的線程任務處理不過來的時候,會先將任務放到這個隊列里面進行等待,直到隊列滿了,然后再有任務就繼續創建線程,直到創建線程的數量到達
maximumPoolSize數量。 - keepAliveTime,非核心線程的最大空閑時間,就是當沒有任務需要處理的時候,臨時工可以待多久,超過這個時間就會被解雇
- threadFactory,創建線程的工程,可以統一處理創建線程的屬性。可以理解為每個公司對員工的要求都不一樣,可以在這里指定員工手冊。
- handler,線程池拒絕策略,當核心線程數,處理不過來任務,等待隊列里也滿了,算上臨時工線程數量也已經到了
maxmumPoolSize了,還有任務提交過來,這個時候可以配置的拒絕任務的策略。默認情況下是拋出異常,告訴任務提交者,“忙不過來了,老子不干了!”
拒絕策略JDK提供的有這么幾種:
- AbortPolicy(默認策略)
拋出RejectedExecutionException異常,立即拒絕任務。
適用場景:任務必須被處理,拒絕后需人工干預。 - DiscardPolicy
靜默丟棄任務,不拋異常。
適用場景:非關鍵任務(如日志記錄、統計)。 - DiscardOldestPolicy
丟棄隊列中最舊的任務,再嘗試提交新任務。
適用場景:實時性要求高的任務(如實時計算)。 - CallerRunsPolicy
由調用線程(提交任務的線程)直接執行任務。
適用場景:降低任務提交速度,緩沖系統壓力。

Worker
ThreadPoolExecutor里面還有一個重要的內部類Worker,這個Worker的概念也是比較重要的。它實現了Runnable接口,并且每個Worker對象包含一個任務和一個線程。
- 任務(
Runnable firstTask),這個任務就是我們提交給線程池要執行的那個任務(Runnable類型),就是說一個任務想要被線程池執行就必須變成一個Worker - 線程(
Thread thead),每個Worker會有一個線程來執行,這個線程是有ThreadPoolExecutor來進行管理的。

當 Worker被創建時,它會通過構造函數接收一個 Runnable 類型的任務。但是Worker并不是執行完這個任務就結束了,而是會繼續從任務隊列中取任務并執行,直到線程池關閉或任務隊列為空。
Worker 中的Thread 對象,表示實際執行任務的工作線程。
每個 Worker都會擁有一個工作線程,工作線程會執行run()方法中的任務。
在 run()方法中,Worker 反復執行 runTask(firstTask)來執行任務。執行完一個任務后, Worker 會繼續檢查線程池的狀態
(runStateAtLeast(ctl.get(),SHUTDOWN))并獲取新的任務,直到線程池關閉。
在ThreadPoolExecutor中有一個字段,workers類型是HashSet<Worker>,專門用來存儲工作線程集合,負責管理所有工作線程的生命周期,無論是想停止線程池還是說結束線程池,都會檢查workers集合中是否還有正在運行的工作線程。
線程池執行任務
下面我們來看一下線程池是如何執行任務的,直接貼源碼,因為用線程池執行任務,無論是使用execute方法還是使用submit方法,最終都是會調用execute方法,所以直接貼出execute方法的源碼
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 這段注釋就是介紹的線程池的執行流程,后面有翻譯成中文的說明。
*/
int c = ctl.get();
// 1. 如果當前運行的線程數少于 corePoolSize,嘗試啟動一個新線程并將其給定的任務作為第一個任務。
if (workerCountOf(c) < corePoolSize) {
// 調用 addWorker 方法會原子性地檢查 runState 和 workerCount,通過返回 false 來防止在不應該添加線程時的誤報。
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 如果任務可以成功排隊,那么我們仍然需要再次檢查是否應該添加一個線程(因為自上次檢查以來已有線程死亡)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 或者在此方法進入后線程池已關閉。因此我們需要重新檢查狀態,如果停止則回滾入隊操作,或者在沒有線程的情況下啟動新線程。
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 如果我們無法將任務加入隊列,則嘗試添加一個新線程。如果失敗,我們知道線程池已經關閉或飽和,因此拒絕該任務。
else if (!addWorker(command, false))
reject(command);
}
執行步驟
- 如果當前運行的線程數少于
corePoolSize,嘗試啟動一個新線程并將其給定的任務作為第一個任務。調用addWorker方法會原子性地檢查runState和workerCount,通過返回false來防止在不應該添加線程時的誤報。 - 如果任務可以成功排隊,那么我們仍然需要再次檢查是否應該添加一個線程(因為自上次檢查以來已有線程死亡)或者在此方法進入后線程池已關閉。因此我們需要重新檢查狀態,如果停止則回滾入隊操作,或者在沒有線程的情況下啟動新線程。
- 如果我們無法將任務加入隊列,則嘗試添加一個新線程。如果失敗,我們知道線程池已經關閉或飽和,因此拒絕該任務。
通過上面這段源碼,我們可以看出來,最核心的,用來執行任務的方法就是addWorker那就也看看addWorker是如何執行的。
private boolean addWorker(Runnable firstTask, boolean core) {
// 檢查代碼邏輯省略
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//1. 創建一個worker對象,firstTask作為傳遞給worker的任務。
w = new Worker(firstTask);
// 2. 創建完worker對象后,會從線程池里面拿出一個線程用來執行worker
final Thread t = w.thread;
if (t != null) {
// 3. 由于線程池需要保持對工作線程集合(workers)的同步訪問,線程池會用一個鎖來保護執行任務的邏輯。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
// 4. 先判斷線程池是否處于運行狀態,
// 若線程池沒有關閉且任務有效,則允許添加工作線程。
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 5. 確保新創建出來的線程狀態是NEW,即尚未開始執行。
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
// 6. 將worker對象添加到工作線程集合(workers)中。
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
// 7. 更新largestPoolSize,
// 記錄線程池中最大線程數,方便監控線程池負載情況。
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
// 8. worker對象添加到工作線程集合成功,開始啟動工作線程執行worker。
if (workerAdded) {
container.start(t);
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- 創建一個
worker對象,firstTask作為傳遞給worker的任務。 - 創建完
worker對象后,會從線程池里面拿出一個線程用來執行worker,如果能從線程池中拿到線程,接下來就用這個線程開始執行worker。 - 由于線程池需要保持對工作線程集合(
workers)的同步訪問,線程池會用一個鎖來保護執行任務的邏輯。 - 先判斷線程池是否處于運行狀態,若線程池沒有關閉且任務有效,則允許添加工作線程。
- 確保新創建出來的線程狀態是
NEW,即尚未開始執行。 - 將
worker對象添加到工作線程集合(workers)中。 - 更新
largestPoolSize,記錄線程池中最大線程數,方便監控線程池負載情況。 worker對象添加到工作線程集合成功,開始啟動工作線程執行worker。
那么線程池的線程數具體應該設置成多少呢?
這個問題,面試官一般不是想聽到你給出一個具體的數值,而是想聽到的是你的一個思考過程,就算你回答出來了一個具體數值,也會問你為什么是這個值。
影響線程池線程數量的因素
- CPU核數,多核處理器當然是每個CPU運行一個線程最高效,但是隨著技術的發展現在很多的CPU都有了超線程技術,也就是利用特殊的硬件指令,將兩個邏輯內核模擬成物理處理器,單核處理器可以讓線程并行執行,所以會看到有“4核8線程的CPU”。
- 任務類型,
- CPU密集型,這種任務的核心線程數最好設置成cpu數的1至1.5倍
- I/O密集型,有阻塞有等待的任務,例如:數據庫連接,文件操作,網絡傳輸等,可以將核心線程數量設置成cpu數量的2倍,利用阻塞時間讓其他CPU去干更多的事情。
- JVM和系統資源
- 內存限制,每個線程占用一定的內存,線程過多有內存溢出的風險。
- 操作系統限制,通常操作系統對單個進程可創建的線程也是有數量限制的,數量過多會降低系統效率。
- 并發量與響應時間
- 高并發場景:增加線程數,但需避免資源競爭。
- 快速響應需求:減少任務等待時間,適當增加線程數或隊列容量。
具體該怎么設置線程數量呢?
網上流傳著一些固定的公式來告訴大家如何配置核心線程數量。
就是基于簡單因素考慮,在主要參考CPU和任務類型時:
- CPU密集型任務,線程池的線程數量配置為(CPU數量+1);
- I/O密集型任務,線程池的線程數量配置為(2*CPU數量)+1;
由于無法根據具體的指標判斷任務類型到底是CPU密集型還是I/O密集型,所以又有了,下面一個公式:

等段時間,線程執行過程中等待外部操作完成的時間。在等待時間內,線程通常不占用CPU資源。
計算時間,通常指線程實際計算處理的時間。
不建議直接套用公式
雖然網上流傳了這些公式,但是并不是這個公式就是萬能呢,很多時候我們的任務在執行的時候要考慮的因素有很多。而且現在很多服務器都是虛擬機,并不能真正的發揮出物理機的全部能力,所以很多依賴因素也是不準確的。
所以建議用以下的方式來進行配置:
- 可以在剛上線的時候,先根據公式大致的設置一個數值,然后再根據你自己的實際業務情況,以及不斷的壓測結果,再不斷調整,最終達到一個相對合理的值。
- 也可以結合監控工具(如
Prometheus、Grafana)實時檢測線程池的線程數量,然后再通過ThreadPoolExecutor.setCorePoolSize()和setMaximumPoolSize()動態修改參數。一些成熟的動態線程池框架,比如dynamicTp,不僅支持線程數調整,還支持隊列容量和拒絕策略的調整。
ForkJoinPool和ThreadPoolExecutor有什么區別?
ForkJoinPool是基于工作竊取(Work-Stealing)算法實現的線程池,ForkJoinPool 中每個線程都有自己的工作隊列,用于存儲待執行的任務。當一個線程執行完自己的任務之后,會從其他線程的工作隊列中竊取任務執行,以此來實現任務的動態均衡和線程的利用率最大化。
ThreadPoolExecutor 是基于任務分配(Task-Assignment)算法實現的線程池,ThreadPoolExecutor 中線程池中有一個共享的工作隊列,所有任務都將提交到這個隊列中。線程池中的線程會從隊列中獲取任務執行,如果隊列為空,則線程會等待,直到隊列中有任務為止。
ForkJoinPool的任務調度是通過fork()拆分,再通過join() 合并結果,支持遞歸分治。
默認線程數等于 CPU 核心數(Runtime.getRuntime().availableProcessors()),支持動態調整。
通過 ForkJoinTask 的異常傳播機制處理子任務異常。

ForkJoinPool 中的工作線程是一種特殊的線程,與普通線程池中的工作線程有所不同。
它們會自動地創建和銷毀,以及自動地管理線程的數量和調度。
這種方式可以降低線程池的管理成本,提高線程的利用率和并行度。
提交任務方式與使用場景
提交任務
| 特性 | ForkJoinPool | ThreadPoolExecutor |
|---|---|---|
| 任務類型 | 必須繼承 ForkJoinTask 的子類(如 RecursiveAction 或 RecursiveTask)。 |
提交普通 Runnable 或 Callable 任務。 |
| 任務提交方法 | 使用 submit(ForkJoinTask) 或 invoke(ForkJoinTask)。 |
使用 execute(Runnable) 或 submit(Callable/Runnable)。 |
| 任務依賴性 | 任務間存在依賴關系 (需合并子任務結果)。 |
任務間獨立,無依賴關系。 |
使用場景
| ForkJoinPool | ThreadPoolExecutor |
|---|---|
| 并行計算(如數組求和、歸并排序); 分治算法(如矩陣乘法); - Java 并行流( parallelStream()); |
網絡請求處理; -文件批量處理; 定時任務(如 ScheduledThreadPoolExecutor); |
CompletableFuture底層就是用ForkJoinPool來實現。
代碼示例
public class SumTask extends RecursiveTask<Long> {
private final long[] array;
private final int start, end;
private static final int THRESHOLD = 1000;
public SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) sum += array[i];
return sum;
} else {
int mid = (start + end) / 2;
SumTask left = new SumTask(array, start, mid);
SumTask right = new SumTask(array, mid, end);
left.fork(); // 異步執行左子任務
return left.join() + right.compute(); // 合并結果
}
}
}
public static void main(String[] args) {
// 使用 ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
long[] data = new long[1000000];
// 初始化 data
for(int i=0;i<data.length;i++){
data[i] = i;
}
// 執行任務
Long result = pool.invoke(new SumTask(data, 0, data.length));
}
作者:紀莫
歡迎任何形式的轉載,但請務必注明出處。
限于本人水平,如果文章和代碼有表述不當之處,還請不吝賜教。
歡迎掃描二維碼關注公眾號:Jimoer
文章會同步到公眾號上面,大家一起成長,共同提升技術能力。
聲援博主:如果您覺得文章對您有幫助,可以點擊文章右下角【推薦】一下。
您的鼓勵是博主的最大動力!


線程池的原理?線程池是怎么執行任務的?線程池的核心線程數量應該設置成多少?ForkJoinPool和ThreadPoolExecutor有什么區別?
浙公網安備 33010602011771號