線程池的運行邏輯與你想象的不一樣,它是池族中的異類
只要是 web 項目,程序都會直接或間接使用到線程池,它的使用是如此頻繁,以至于像空氣一樣,大多數時候被我們無視了。但有時候,我們會相當然地認為線程池與其它對象池(如:數據庫連接池)一樣,要用的時候向池子索取,用完后歸還給它即可。然后事實上,線程池獨樹一幟、鶴立雞群,它與普通的對象池就是不同。本文本將先闡述這種差異,接著用最簡單的代碼實現一個線程池,最后再對 JDK 中與線程池相關的 Executor 體系做一個全面介紹。
線程池與普通資源池的差異
提到 pool 這個設計思想,第一反映是這樣的:從一個資源容器中獲取空閑的資源對象。如果容器中有空閑的,就直接從空閑資源中取出一個返回,如果容器中沒有空閑資源,且容器空間未用盡,就新創建一個資源對象,然后再返回給調用方。這個容器就是資源池,它看起來就像這樣:

圖中的工人隊伍里,有3人是空閑的,工頭(資源池的管理者)可以任選兩人來提供勞務服務。同時,隊隊伍尚未飽和,還可以容納一名工人。如果雇主要求一次性提供4名勞工服務,則工頭需要再招納一名工人加入隊伍,然后再向雇主提供服務。此時,這個團隊(資源池)已達到飽和,不能再對外提供勞務服務了,除非某些工人完成了工作。
以上是一個典型資源池的基本特點,那么線程池是否也同樣如此呢。至少第一感覺是沒問題的,大概應該也是這樣吧,畢竟拿從池中取出一個線程,再讓它執行對應的代碼,這聽上去很科學嘛。等等,總感覺哪里不對呢,線程這東西能像普通方法調用那樣,讓我們在主程序里隨意支配嗎?沒錯,問題就在這里,線程一旦運行起來,就完全閉關鎖國了,除了按照運行前約定好的方式進行數據通信外,再也不能去打擾它老人家了。因此,線程池有點像發動機,池中的各個線程就對應發動機的各個汽缸。整個發動機一旦啟動(線程池激活),各個汽缸中的活塞便按照預定的設計,不停地來回運動,永遠也不停止,直到燃油耗盡,或人為地關閉油門。在此期間,我們是不能控制單個汽缸的活動方向的。就如同我們不能控制正在運行的線程,讓其停止正在執行的代碼,轉而去執行其它代碼一樣(利用 Thread.interrpt() 方法也達不到此目的,而 Thread.stop() 更是直接終止了線程)①。

既然不能直接給線程池里的單個線程明確指派任務,那線程池的意義何在呢?意義就在于,雖然不能一對一精確指派任務,但可以給整個線程池提交任務,至于這些任務由池中的哪個線程來執行,則是不可控的。此時,可以把線程池看作是生產流水線上的單個工序。這里以給「老干媽香辣醬」的玻璃瓶加蓋子為例,給瓶子加蓋就是要執行的任務,最初該工序上只設置了一個機械臂,加蓋子也順序操作的。但單個機械臂忙不過來,后來又加了一個機械臂,這樣效率就提高了。瓶子被加蓋的順序也是不確定的,但最終所有瓶子都會被加蓋。
手動編寫一個簡易的線程池
如上小節所述,線程池與其它池類組件不一樣,調用方不可能直接從池中取出一個線程,然后讓它執行一段任務代碼。因為線程一旦啟動起來,就會在自己的頻軌道內獨立運行,不受外部控制。要讓這些線程執行外部提交的任務,需要提供一個數據通道,將任務打包成一個數據結構傳遞過去。而這些運行起來的線程,他們都執行一個相同的循環操作:讀取任務 → 執行任務 → 讀取任務 → ...... ②
┌──────────┐ ┌──────────────┐
┌─→ │Take Task │ -→ │ Execute Task │ ─┐
│ └──────────┘ └──────────────┘ │
└─────────────────────────────────────┘
這個讀取任務的數據通道就是隊列,池中的所有線程都不斷地執行 ② 處的循環邏輯,這便是線程池運行的基本原理。
相對于線程池這個叫法,實際上「執行器 Executor」這個術語在實踐中使用得要更多些。因為在 jdk 的 java.util.concurrent 包下,有一個 Executor 接口,它只有一個方法:
public interface Executor {
void execute(Runnable command);
}
這便是執行器接口,顧名思義,它接受一個 Runnable 對象,并能夠執行它。至于如何執行,交由具體的實現類負責,目前至少有以下四種執行方式 ③
- 在當前線程中同步執行
- 總是新開線程來異步執行
- 只使用一個線程來異步串行執行
- 使用多個線程來并發執行
本小節將以一個簡易的線程池方式來實現 Executor。
編寫只有一個線程的線程池
這是線程池的最簡形式,實現代碼也非常簡單,如下所示
public class SingleThreadPoolExecutor implements Executor {
// 任務隊列
private final Queue<Runnable> tasks = new LinkedBlockingDeque<>();
// 直接將任務添加到隊列中
@Override
public void execute(Runnable task) {
tasks.offer(task);
}
public SingleThreadPoolExecutor() {
// 在構造函數中,直接創建一個線程,作為為線程池的唯一任務執行線程
// 它將在被創建后立即執行,執行邏輯為:
// 1. 從隊列中獲取任務
// 2. 如果獲取到任務,則執行它,執行完后,返回第1步
// 3. 如果未獲取到任務,則簡短休息,繼續第1步
Thread taskRunner = new Thread(() -> {
Runnable task;
while (true) {
task = tasks.poll();
if (task != null) {
task.run();
continue;
}
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
});
taskRunner.start();
}
}
上述的單線程執行器實現中,執行任務的線程是永遠不會停止的,獲取到任務時,就執行它,沒有獲取到,就一直不斷的獲取。下面是這個執行器的測試代碼:
public class SingleThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
SingleThreadPoolExecutorstp stp= new SingleThreadPoolExecutor();
// 連續添加 5 個任務
for (int i = 1; i <= 5; i++) {
stp.execute(new SpeakNameTask("Coding Change The World " + i));
}
System.out.println("主線程已結束");
}
// 一個模擬的任務:簡單地輸出名稱
static class SpeakNameTask implements Runnable {
private String name;
public SpeakNameTask(String name) {
this.name = name;
}
@Override
public void run() {
Random random = new Random();
int milliseconds = 500 + random.nextInt(1000);
try {
TimeUnit.MILLISECONDS.sleep(milliseconds);
System.out.println("["+Thread.currentThread().getName()+"]: I believe " + name);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
下面是輸出結果:
主線程已結束
[Thread-0]: I believe Coding Change The World 1
[Thread-0]: I believe Coding Change The World 2
[Thread-0]: I believe Coding Change The World 3
[Thread-0]: I believe Coding Change The World 4
[Thread-0]: I believe Coding Change The World 5
可以看到:作為測試程序的主線程,已經先執行結束了,而線程池還在順序地執行主線程添加的任務。并且線程池在執行完所有任務后,并沒有退出,jvm 進程會一直存在。
改進為擁有多個線程的線程池
多線程版本的線程池執任務執行器,只是在單線程版本上,增加了執行線程的數量,其它的變化不是很大。但為了更好的組織代碼,需要將任務執行線程的邏輯單獨抽取出來。另外,為了模擬得更像一個池,本示例代碼還增加了以下特性
-
支持核心線程數功能
核心線程數在執行器創建時,一起創建,并永不結束 -
支持最大線程數功能
當核心線程執行任務效率變慢時,增加執行線程 -
支持空閑線程移除功能
當非核心線程空閑時長超過限定值時,結束該線程,并從池中移除
主要代碼如下:
MultiThreadPoolExecutor.java (點擊查看代碼)
public class MultiThreadPoolExecutor implements Executor {
// 線程池
private final Set<TaskRunner> runnerPool = new HashSet<>();
// 任務隊列
private final Queue<Runnable> tasks = new LinkedBlockingDeque<>();
// 單個線程最大空閑毫秒數
private int maxIdleMilliSeconds = 3000;
// 核心線程數
private int coreThreadCount = 1;
// 最大線程數
private int maxThreadCount = 3;
public MultiThreadPoolExecutor() {
// 初始化核心線程
for (int i = 0; i < coreThreadCount; i++) {
addRunner(true);
}
}
private void addRunner(boolean isCoreRunner) {
TaskRunner runner = new TaskRunner(isCoreRunner);
runnerPool.add(runner);
runner.start();
}
@Override
public void execute(Runnable task) {
tasks.add(task);
addRunnerIfRequired();
}
// 視情況增加線程數,這里簡化為當任務數超過線程數的兩倍時,就增加線程
private void addRunnerIfRequired() {
if (tasks.size() <= 2 * runnerPool.size()) {
return;
}
// 未達到最大線程數時,可增加執行線程
if (runnerPool.size() < maxThreadCount) {
synchronized (this) {
if (runnerPool.size() < maxThreadCount) {
addRunner(false);
}
}
}
}
class TaskRunner extends Thread {
// 是否為核心線程
private final boolean coreRunner;
// 已空閑的毫秒數
private long idleMilliseconds = 0;
TaskRunner(boolean coreRunner) {
this.coreRunner = coreRunner;
}
@Override
public void run() {
Runnable task;
while (true) {
task = tasks.poll();
if (task != null) {
task.run();
continue;
}
try {
TimeUnit.MILLISECONDS.sleep(10);
idleMilliseconds += 10;
if(coreRunner) {
continue;
}
if (idleMilliseconds > maxIdleMilliSeconds) {
// 超過最大空間時間,線程結束,并從池中移徐本線程
runnerPool.remove(this);
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
}
}
完整代碼已上傳至 thread-pool-sample
其實多線程版本的主要難點,是判定增加新線程來執行任務的算法,即如何確定當前需要添加新線程,而不是保持當前的線程數量來執行任務,以保證最高的效率。以這個粗糙的原始版本為基準,不斷豐富細節和增強健壯性,就可以慢慢演進出 Jdk 中的 Executor 體系。
JDK 線程池任務執行器淺析
Executor 體系類結構
Executor 接口是任務執行器的頂級接口,它僅定義了一個方法,但并未限制如何執行傳遞過來的任務。正如第③處所述,「線程池執行」也只是多種方式中的一種,也是用得最多的一種。由于 Executor 接口定義的功能過于單一,于是在 JDK 的并發包下,又對它進行了擴展,這個擴展就是 ExecutorService,如下所示:
public interface ExecutorService extends Executor {
Future<?> submit(Runnable task);
<T> Future<T> submit(Callable<T> task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
void shutdown();
List<Runnable> shutdownNow();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
boolean isShutdown();
boolean isTerminated();
}
這些擴展方法共分為三組,分別是:任務提交類、狀態控制類、狀態檢查類。從分類上可以看出,ExecutorService 增加了「提交任務」的概念(相對于 Executor 的「執行任務」)。另外,還有「關閉」操作,以及檢測執行器當前的狀態,這些都是 Exector 不具備的。下面這個分類列表更加清晰:
-
任務提交
方法 異步提交 批量提交 超時等待 submit(Runnable task) √ submit(Callable task) √ invokeAll(Collection<? extends Callable > tasks) √ invokeAll(Collection<? extends Callable > tasks,long timeout, TimeUnit unit) √ √ invokeAll(Collection<? extends Callable > tasks) √ invokeAny(Collection<? extends Callable > tasks,long timeout, TimeUnit unit) √ √ -
狀態控制
- shutdown()
- shutdownNow()
- awaitTermination(long timeout, TimeUnit unit)
-
狀態檢查
- isShutdown()
- isTerminated()
除了增加了新的方法外,還新增加了一種任務類型,即:java.util.concurrent.Callable,而 Executor 接口定義的任務接口是 java.lang.Runnable。二者的區別是,Callable#call() 方法有返回值,而后者沒有。一般而言,任務提交給執行器后,通常都會異步執行。提交任務的線程是拿不到這個 call() 方法執行完畢后的返回值的,既然這樣,那定義這個有返回值的方法還有什么意義呢?
為了拿到返回值,引入了 java.util.concurrent.Future 接口,它定義了獲取單個異步任務執行結果的方法,不僅如此,它還定義了其它一些訪問和控制單個任務的方法,見下表:
| 方法 | 解釋 |
|---|---|
| get() | 阻塞調用線程,直到所關聯的任務執行結束,拿到返回值,或任務執行結束(取消操作和發生異常均會導致結) |
| get(long timeout, TimeUnit unit) | 同上,但會有一個最大等待時長,若超過該時長后,任務依然未執行結束,則結束等待,并拋出 TimeoutException |
| cancel(boolean mayInterruptIfRunning) | 嘗試取消關聯的任務,只是嘗試,遇到以下情況,均無法取消 · 任務已經取消 · 任務已完成 · 其它原因 通常任務一旦開始執行,就無法取消, 除非是極其特定的任務,這類任務的代碼本身會與外界通信,判斷是否應該取消自己的執行。 因此本方法提供了一個 mayInterruptIfRunning 參數,用來做這種信息傳達, 但也僅僅是一個信息傳達,表達了期望已運行的任務能自我終止, 但能否真的終止,取決于任務本身的代碼邏輯 |
| isCancelled() | 檢測關聯的任務是否已「取消」 |
| isDone() | 檢測關聯的任務是否已「結束」,任務正常執行完畢、遭遇異常和被取消均視為任務已「結束」 |
?? 特別說明
Future#cancel() 方法只是從執行角度上講,取消了任務的執行。它沒有 “回滾” 這種業務上的含義。對于接受 mayInterruptIfRunning 參數的任務,若要實現 “回滾”效果,需要任務自身代碼來實現
Future 只是一個接口,要怎么來實現接口的這些功能呢,以 get() 方法為例,大致分為以下3步:
- 在 Future 實現類的內部持有它要訪問和控制的 Callable 任務實例、執行該任務的線程以及任務執行結果。
- Future 實現類自己要實現 Runnable 接口, 并在 Runnable#run() 方法實現中,調用真實任務 Callable 的 run 方法并獲取返回值,然后將返回值寫入到 Future 實現類的「任務執行結果」字段中。這樣一來,Executor 直接要執行的方法就從原始的 Callable 實例,變成了 Future 實例。
- 有了上面兩步,get() 方法實現就簡單了,一直獲取「任務執行結果」這個字段的值就可以了。
下而是 get() 方法的簡化版(非線程安全)實現樣例:
public class MyFutureImpl<T> implements Future<T>, Runnable {
// 是否運行結束了
private volatile complete;
// Callable 任務執行的結果
private volatile T result;
// 實際執行 Callable 任務的線程
private volatile Thread runner;
private Callable<T> task;
public MyFutureImpl(Callable<T> task) {
this.task = task; // ⑴ 持有真實任務實例
}
@Override
public void run() {
this.runner = Thread.currentThread(); // ?? ⑴ 持有實際執行此任務的線程
T result = task.call(); // ⑵ 調用真實任務的 call 方法,并在實際執行線程中獲得返回值
this.complete = true; // ?? Future 對象的狀態設置為「完成」
this.result = result; // ?? ⑵ 將實際執行線程中獲得的返回值,回寫到 Future 實例的字段中
}
@Override
public T get(long timeout, TimeUnit unit) {
long remainsMillis = unit.toMillis(timeout);
while( !complete ) { // ?? ⑶ 檢查任務是否執行完畢,未執行完畢則一直檢查(更好的辦法是阻塞自己)
TimeUnit.MILLISECONDS.sleep(10);
remainsMillis -= 10;
if( remainsMillis <=0 ) { // 超時檢查
throw new TimeoutException();
}
}
return this.result; // 任務已執行完畢,直接返回結果
}
}
以上這個一點也不線程安全的 Future 實現類,由于去除了復雜的同步操作代碼,核心邏輯反而更加清晰了。代碼中有感嘆號 ?? 的地方,都是存在線程同步問題的。感興趣的碼友,在對這個基本的核心邏輯有了認知后,再去看 JDK 的源碼就更加容易了(JDK 源碼中,使用了 sun.misc.Unsafe 中的相關原子方法來處理并發問題)。
JDK 的并發包下,真實的 Future 實現類是 FutureTask, 它沒有直接實現 Future, 因為根據實現步驟的第 2 步,實現類自身還需要實現 Runnable 接口, 因此,又增加了一個中間接口 RunnableFuture,該接口繼承了 Runnable,而 FutureTask 直接實現的接口正是 RunnableFuture,如下圖所示:

ExecutorService 的直接實現類是 AbstractExecutorService,這是一個抽象類,最終的實現類是 ThreadPoolExecutor。至此終于是回到本文的主題了,即線程池任務執行器,不過 JDK 并發包在此基礎上還提供了一個擴展: ScheduledExeuctorService,所謂 Scheduled(可調度的),即可以安排提交的任務在什么時候執行,也就是經常提到的定時任務。OK,至此我們可以看到整個 Executor 體系的類繼承結構了,如下圖所示:

Executor 核心實現類
從 Executor 的繼承類圖中可以看出,最終實現類只有 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor, 但實際上,大部分邏輯都在 AbstractExecutorService 這個抽象類中實現了。這三個類各自實現了整個 Executor 體系中的一部分方法,為更清晰地呈現它們之間的分工,我整理了這個體系下各個接口方法與對應實現類間的關系表,如下所示:
| Method | AbstractExecutorService | ThreadPoolExecutor | ScheduledThreadPoolExecutor |
|---|---|---|---|
| Executor#execute(Runnable command) | √ | ||
| ExecutorService#submit(Runnable task) | √ | ||
| ExecutorService#submit(Callable task) | √ | ||
| ExecutorService#invokeAll(Collection<? extends Callable> tasks) | √ | ||
| ExecutorService#invokeAll(Collection |
√ | ||
| ExecutorService#invokeAny(Collection<? extends Callable> tasks) | √ | ||
| ExecutorService#invokeAny(Collection |
√ | ||
| ExecutorService#shutdown() | √ | ||
| ExecutorService#shutdownNow() | √ | ||
| ExecutorService#awaitTermination(long timeout, TimeUnit unit) | √ | ||
| ExecutorService#isShutdown() | √ | ||
| ExecutorService#isTerminated() | √ | ||
| ScheduledExecutorService#schedule(Runnable command,long delay, TimeUnit unit) | √ | ||
| ScheduledExecutorService#schedule(Callable command,long delay, TimeUnit unit) | √ | ||
| ScheduledExecutorService#scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) | √ | ||
| ScheduledExecutorService#scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) | √ |
程序分層設計的基本準則是:一個層級只負責一件事,無論是 jdk 的基準庫還是 spring 這樣的框架,它們都遵循這樣的理念。上面這個三個類,就是很好的貫徹了這一原則,三者各自負責的內容為:
-
AbstractExecutorService
只關注如何提交任務,至于提交任務后,如何去執行它,交由子類去處理。之所以要把提交任務的邏輯寫在一個抽象類里邊,是因為這些提交任務的邏輯具有通用性,不需要有多種實現,子類直接復用就好了。 -
ThreadPoolExecutor
關注如何執行任務,這也是執行器的核心。同時,由于它直接負責任務的執行,因此,整個執行器的控制和狀態檢測,也理應由它負責。 -
ScheduledThreadPoolExecutor
關注如何讓任務在指定的時間執行,即所謂的「調度」。它也不關注如何執行任務,所謂在指定的時間執行,其實是在指定的時間提交任務。至于提交后是否會被立刻執行,則取決于真正負責處理執行任務的組件, 這個組件就是ThreadPoolExecutor。
下面是 ThreadPoolExecutor 中最核心的 execute() 方法源碼
public class ThreadPoolExecutor {
public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
 ̄ ̄ ̄ ̄ ̄ ̄ ?
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
 ̄ ̄ ̄ ̄ ̄ ̄ ̄ ̄ ̄ ?
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
 ̄ ̄ ̄ ̄ ̄ ̄ ?
}
else if (!addWorker(command, false))
reject(command);
}
}
要徹底讀懂源碼,還需要掌握并發包下的 Lock 體系,這個體系比 Executor 體系更難。不過有了第二小節「手寫簡易線程池」的經驗,即使我們對 Lock 體系沒有全面掌握,也能從上述源碼中梳理出核心邏輯。比如 ? 處,就是向任務隊列里添加任務,? 處就是在嘗試增加執行線程,其它地方都是做各種并發控制與內部狀態的控制。
現在可以來看看 AbstractExecutorService 實現的 submit(Callable task) 方法,其底層的邏輯是什么了。所謂提交,其實就是調用父接口 Executor 的 execute(Runnable command) 方法,最簡單的實現是將 Callable 對象包裝成一個 Runnable,然后直接調用 execute() 方法,將包裝出來的 Runnable 對象作為參數傳遞過去即可。事實上,AbstractExecutorService 的源碼就是這么做的,以下是 submit 方法的源碼:
public class AbstractExecutorService {
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task); // 將 Callable 包裝成 Runnable
 ̄ ̄ ̄ ̄ ̄ ̄ ̄ ̄ ̄ ̄
execute(ftask); // 以上一步包裝的 Runnable 對象為參數,調用父接口的 execute 方法
 ̄ ̄ ̄ ̄ ̄ ̄ ̄ ̄ ̄
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
}
源碼中的行為與我們預想的一致,現在問題的是:將 Callable 包裝成 Runnable 的核心邏輯是什么?從上述源碼看,這個包裝過程極其簡單,只是簡單的用 Callable 作為參數,新創建了一個 FutureTask 實例。這個 FutureTask 正是在「Executor 體系類結構」小節中提到的 FutrueTask,它的核心邏輯,我們已用簡易的非線程安全代碼演示過了。
有了 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor,我們就可以方便的處理任務了。不過 Jdk 并發包的設計大師為我們考慮得更周到,她還提供一個任務執行器的工廠類 Excutors。Executors 提供的都是靜態方法,通過這些靜態方法,可以創建擁有不同特性的 ExecutorService 對象。比如 Executors#newFixedThreadPool(int threadCount, ThreadFactory threadFactory) 方法,就可以快速創建一個擁有固定線程數量的 ThreadPoolExecutor 實例。
小結
- 線程池不是普通的對象池,池中的線程不受外界控制,也不存在 borrow(借出)與 return(歸還) 一說。這些線程會不斷地從內部的任務隊列里提取任務,然后執行它。
- JDK 并發包構成了一個 Excutor 體系,核心方法的實現有層次地分攤到了 AbstractExecutorService、ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 三個類中。
- Executor 體系提供了一 Executors 工廠類,使得可以快速創建 ExecutorService 實例。
- Executor 體系的實現代碼,還非常依賴并發包下的 Lock 體系,需要該體系來提供線程安全保障。

浙公網安備 33010602011771號