JAVA多線程(二)--線程池
JAVA多線程(二)--線程池
一、線程池概念
顧名思義,線程池是管理線程的池子。使用線程池有以下優(yōu)點:
- 降低線程創(chuàng)建和銷毀的開銷。
- 提高響應(yīng)速度。用到時創(chuàng)建和直接使用已創(chuàng)建好的線程,速度肯定是不一樣的。
- 提高線程可管理性。線程是稀缺資源,使用線程池可對線程進行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。
二、JUC架構(gòu)

1、Executor接口
Executor 接口是JUC架構(gòu)的頂級接口,它只包含一個void execute(Runnable command)方法,是一個執(zhí)行線程的工具。
public interface Executor {
void execute(Runnable command);
}
2、ExecutorService接口
ExecutorService接口繼承于Executor 接口,向外提供了接收異步任務(wù)的服務(wù)。
public interface ExecutorService extends Executor {
// 接收單個異步任務(wù)
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
// 批量接收異步任務(wù)
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
}
3、AbstractExecutorService抽象類
抽象類,實現(xiàn)了ExecutorService接口
4、ScheduledExecutorService接口
ScheduledExecutorService接口繼承了ExecutorService接口,是一個可以完成'延時'和'周期性'任務(wù)的調(diào)度線程池接口。
5、ThreadPoolExecutor類
ThreadPoolExecutor類繼承了AbstractExecutorService抽象類,是線程池中核心實現(xiàn)類。
6、ScheduledThreadPoolExecutor類
ScheduledThreadPoolExecutor類實現(xiàn)了ScheduledExecutorService接口,繼承了ThreadPoolExecutor類。拓展實現(xiàn)了延時執(zhí)行和周期執(zhí)行等抽象方法。
7、Executors
靜態(tài)工廠類,它通過靜態(tài)工廠方法返回ExecutorService、ScheduledExecutorService等線程池示例對象
三、使用Executors靜態(tài)工廠類創(chuàng)建線程池
1、newFixedThreadPool 固定線程數(shù)的線程池
創(chuàng)建一個可重用固定線程數(shù)的線程池,以共享的無界隊列方式來運行這些線程。如果所有線程處于活動狀態(tài)時提交附加任務(wù),則在有可用線程之前,附加任務(wù)將在隊列中等待。如果在執(zhí)行期間有由于失敗導(dǎo)致任何線程終止,那么一個新線程將代替它執(zhí)行后續(xù)任務(wù)。在某個線程被顯示的關(guān)閉之前,池中的線程將一直存在。
弊端:阻塞隊列是無界隊列,大量任務(wù)涌入時,會導(dǎo)致隊列很大,容易導(dǎo)致JVM出現(xiàn)OOM異常,即內(nèi)存溢出。
2、newSingleThreadExecutor 單個線程的線程池
創(chuàng)建一個只有一個線程的線程池。這個線程池可以在線程死后(或發(fā)生異常),重新啟動一個線程來替代原先的線程繼續(xù)執(zhí)行后續(xù)任務(wù)。
弊端:同固定數(shù)量線程池,阻塞隊列無界。
3、newCachedThreadPool 緩存線程池
創(chuàng)建一個可根據(jù)需要創(chuàng)建新線程的線程池,但是在以前線程可用時將重用它們。當(dāng)前線程池中有可用線程時將重用已有線程,當(dāng)線程池中無可用線程將創(chuàng)建新的線程加入到線程池中執(zhí)行新的任務(wù)。終止并移除那些已有60s未被使用的線程。
弊端:線程數(shù)沒有限制,由于其maximumPoolSize的值為Integer.MAX_VALUE(非常大),可以認為可以無限創(chuàng)建線程,如果任務(wù)提交較多,就會造成大量的線程被啟動,很有可能造成OOM異常,甚至導(dǎo)致CPU線程資源耗盡
4、newScheduledThreadPool 可調(diào)度線程池
創(chuàng)建一個可安排在給定延遲后執(zhí)行任務(wù)的線程池。線程池中包含最小限度固定數(shù)量的線程corePoolSize,即使空閑狀態(tài)也一直存在,除非設(shè)置了allowCoreThreadTimeOut。當(dāng)初始線程不夠時會創(chuàng)建新的線程加入線程池,這部分線程會因為限制狀態(tài)被釋放
弊端:線程數(shù)不設(shè)上限
5、newSingleThreadScheduledExecutor 單個線程的可調(diào)度線程池
創(chuàng)建一個corePoolSize 為1的可安排在給定延遲后執(zhí)行任務(wù)的線程池。
弊端:線程數(shù)不設(shè)上限
6、newWorkStealingPool 工作竊取式線程池
Java8新增的創(chuàng)建線程池方法。實際返回ForkJoinPool對象,創(chuàng)建時如果不設(shè)置任何參數(shù),則以當(dāng)前機器處理器個數(shù)作為線程個數(shù),此線程池會并行處理任務(wù),不能保證執(zhí)行順序。使用所有可用的處理器作為其目標并行度級別創(chuàng)建一個竊取工作的線程池。
使用場景:能夠合理的使用CPU進行對任務(wù)操作(并行操作),適合使用在很耗時的任務(wù)中。底層用的ForkJoinPool 來實現(xiàn)的。 ForkJoinPool的優(yōu)勢在于,可以充分利用多cpu,多核cpu的優(yōu)勢,把一個任務(wù)拆分成多個“小任務(wù)”分發(fā)到不同的cpu核心上執(zhí)行,執(zhí)行完后再把結(jié)果收集到一起返回。
四、使用ThreadPoolExecutor創(chuàng)建線程池
其實Executors中除去Java8新加的newWorkStealingPool方法外(調(diào)用的ForkJoinPool),其他的創(chuàng)建線程池的方法基本上都是調(diào)用了ThreadPoolExecutor的構(gòu)造函數(shù)。
ThreadPoolExecutor提供了一系列屬性,供我們根據(jù)實際業(yè)務(wù)來創(chuàng)建合適的線程池;
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
int corePoolSize: 核心線程最大數(shù)量,通俗點來講就是,線程池中常駐線程的最大數(shù)量int maximumPoolSize: 線程池中運行最大線程數(shù)(包括核心線程和非核心線程)long keepAliveTime: 線程池中空閑線程(僅適用于非核心線程)所能存活的最長時間TimeUnit unit: 存活時間單位,與keepAliveTime搭配使用BlockingQueue<Runnable> workQueue:存放任務(wù)的阻塞隊列ThreadFactory threadFactory:新線程的產(chǎn)生方式RejectedExecutionHandler handler: 線程池拒絕策略
注:若調(diào)用了allowCoreThreadTimeOut(boolean)方法,并且傳入了參數(shù)true,則keepAliveTime參數(shù)所設(shè)置的Idle超時策略也將被應(yīng)用于核心線程
五、向線程池提交任務(wù)
1、execute()
void execute(Runnable command)
Executor接口中的方法, ThreadPoolExecutor中實現(xiàn),ScheduledThreadPoolExecutor中重寫等。
只能接收Runnable,無返回值
2、submit()
<T> Future<T> submit(Callable<T> task)<T> Future<T> submit(Runnable task, T result)Future<?> submit(Runnable task)
這3個submit()方法都是ExecutorService接口中定義的方法, AbstractExecutorService中實現(xiàn),ScheduledThreadPoolExecutor中重寫等。
可以接收Callable``Runnable,有返回值。
3、invokeAny()
批量提交,返回第一個返回值。
4、invokeAll()
批量提交,返回所有返回值。
六、線程池執(zhí)行流程

1、當(dāng)核心線程數(shù)未滿時,即使當(dāng)前有空閑線程,也會優(yōu)先創(chuàng)建新的線程。
2、如果當(dāng)前核心線程數(shù)達到上限時,新的任務(wù)會被分配到阻塞隊列中,一直到阻塞隊列已滿。
3、當(dāng)一個任務(wù)被完成時,執(zhí)行器優(yōu)先從阻塞隊列中獲取任務(wù)執(zhí)行,一直到阻塞隊列為空。
4、當(dāng)核心線程數(shù)已滿,而且阻塞隊列也滿了時,接收到新的任務(wù),將會創(chuàng)建新的線程(非核心線程),并馬上執(zhí)行新任務(wù)。
5、當(dāng)核心線程和阻塞隊列都已滿時,會一直創(chuàng)建新的線程執(zhí)行新任務(wù),直達線程數(shù)超出maximumPoolSize。如果超出maximumPoolSize,線程池會拒絕接收任務(wù)。當(dāng)新任務(wù)過來時,執(zhí)行拒絕策略。
注:
corePoolSizemaximumPoolSizeBlockingQueue等參數(shù)如果配置的不合理,可能會造成異步任務(wù)得不到預(yù)期的執(zhí)行效果,造成嚴重的排隊現(xiàn)象;- 創(chuàng)建新線程的順序:
corePoolSize已滿后,在BlockingQueue也滿之后,才會創(chuàng)建新的線程,直到超出maximumPoolSize;
七、阻塞隊列
當(dāng)一個線程從空的阻塞隊列中獲取任務(wù)時,會被阻塞,直到隊列中有了元素。當(dāng)隊列中有了元素,隊列會被自動喚醒。
常見的阻塞隊列:
ArrayBlockingQueue: 使用數(shù)組實現(xiàn)的有界阻塞隊列,特性先進先出,創(chuàng)建時必須設(shè)置大小;LinkedBlockingQueue: 使用鏈表實現(xiàn)的阻塞隊列,特性先進先出,可以設(shè)置其容量,默認為Interger.MAX_VALUE,特性先進先出;PriorityBlockingQueue: 使用平衡二叉樹堆,實現(xiàn)的具有優(yōu)先級的無界阻塞隊列;DelayQueue: 無界阻塞延遲隊列,隊列中每個元素均有過期時間,當(dāng)從隊列獲取元素時,只有過期元素才會出隊列。隊列頭元素是最塊要過期的元素;SynchronousQueue: 一個不存儲元素的阻塞隊列,每個插入操作,必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài);
八、線程工廠
ThreadFactory是Java線程工廠接口,只有一個方法Thread newThread(Runnable r),調(diào)用它創(chuàng)建新線程時可以更改所創(chuàng)建的新線程的名稱、線程組、優(yōu)先級、守護進程狀態(tài)等。
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
ThreadFactory是線程工廠,用于創(chuàng)建線程;Executors是線程池工廠類,用于便捷創(chuàng)建線程池。
九、拒絕策略
觸發(fā)情況:
- 線程池已經(jīng)關(guān)閉
corePool已滿,阻塞隊列已滿,且maximumPoolSize已滿
![image]()
ThreadPoolExecutor中主要提供了四種拒絕策略:
1、AbortPolicy
拒絕策略。新任務(wù)進入會被拒絕,并拋出 RejectedExecutionException 異常。該策略是線程池默認的拒絕策略。
2、DiscardPolicy
拋棄策略。新任務(wù)被直接丟掉,且不會拋出任何異常。
3、DiscardOldestPolicy
拋棄最老任務(wù)策略。將最早加入隊列的任務(wù)拋棄,并嘗試加入隊列。
4、CallerRunsPolicy
調(diào)用者執(zhí)行策略。新任務(wù)被添加到線程池時,如果添加失敗,那么提交任務(wù)線程會自己去執(zhí)行該任務(wù),不會使用線程池中的線程去執(zhí)行新任務(wù)。
十、調(diào)度器的鉤子方法
調(diào)度器的鉤子方法定義在ThreadPoolExecutor中,三個方法都是空方法,一般在子類中重寫。
public class ThreadPoolExecutor extends AbstractExecutorService {
// ...
// 任務(wù)執(zhí)行之前的鉤子方法
protected void afterExecute(Runnable r, Throwable t) { }
// 任務(wù)執(zhí)行之后的鉤子方法
protected void beforeExecute(Thread t, Runnable r) { }
// 線程池終止時的鉤子方法
protected void terminated() { }
// ...
public class MyThreadPoolTest {
public static final int SLEEP_TIME = 1000;
static class MyThreadFactory implements ThreadFactory {
// 自增對象
static AtomicInteger threadNo=new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
String threadName = "MyThread-"+ threadNo;
System.out.println("創(chuàng)建一條新線程," + threadName);
threadNo.incrementAndGet();
Thread thread=new Thread(r, threadName);
thread.setDaemon(true);
return thread;
}
}
static class TargetTask implements Runnable{
static AtomicInteger taskNo=new AtomicInteger(1);
public String taskName;
public TargetTask(){
taskName="task-"+taskNo;
taskNo.incrementAndGet();
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+": "+taskName+" is running...");
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(taskName+" end...");
}
}
public static void main(String[] arg) throws InterruptedException {
ExecutorService pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2)){
@Override
protected void terminated() {
System.out.println("調(diào)度器已停止...");
}
@Override
protected void beforeExecute(Thread t,Runnable target) {
System.out.println(t.getName() + " 前鉤執(zhí)行...");
super.beforeExecute(t, target);
}
@Override
protected void afterExecute(Runnable target,Throwable t) {
System.out.println("后鉤執(zhí)行...");
super.afterExecute(target, t);
}
};
for(int i=0;i<5;i++){
pool.submit(new TargetTask());
}
Thread.sleep(5000);
pool.shutdown();
}
}
/**
pool-1-thread-1 前鉤執(zhí)行...
pool-1-thread-3 前鉤執(zhí)行...
pool-1-thread-2 前鉤執(zhí)行...
pool-1-thread-2: task-2 is running...
pool-1-thread-3: task-5 is running...
pool-1-thread-1: task-1 is running...
task-1 end...
后鉤執(zhí)行...
pool-1-thread-1 前鉤執(zhí)行...
pool-1-thread-1: task-3 is running...
task-2 end...
后鉤執(zhí)行...
pool-1-thread-2 前鉤執(zhí)行...
pool-1-thread-2: task-4 is running...
task-5 end...
后鉤執(zhí)行...
task-4 end...
task-3 end...
后鉤執(zhí)行...
后鉤執(zhí)行...
調(diào)度器已停止...
Process finished with exit code 0
**/
十一、關(guān)閉線程池
1、線程池的5種狀態(tài)
- RUNNING: 線程池創(chuàng)建之后的初始狀態(tài),這種狀態(tài)下可以執(zhí)行任務(wù)
- SHUTDOWN:該狀態(tài)下線程池不再接受新任務(wù),但是會將工作隊列中的任務(wù)執(zhí)行完畢
- STOP:該狀態(tài)下線程池不再接受新任務(wù),也不會處理工作隊列中的剩余任務(wù),并且將會中斷所有工作線程
- TIDYING:該狀態(tài)下所有任務(wù)都已終止或者處理完成,將會執(zhí)行terminated()鉤子方法
- TERMINATED:執(zhí)行完terminated()鉤子方法之后的狀態(tài)
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
2、線程池狀態(tài)流轉(zhuǎn)

3、幾種關(guān)閉線程池的方法
1、shutdown()
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
等待當(dāng)前工作隊列中的剩余任務(wù)全部執(zhí)行完成之后,才會執(zhí)行關(guān)閉,但是此方法被調(diào)用之后線程池的狀態(tài)轉(zhuǎn)為SHUTDOWN,線程池不會再接收新的任務(wù)
2、shutdownNow()
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
立即關(guān)閉線程池的方法,此方法會打斷正在執(zhí)行的工作線程,并且會清空當(dāng)前工作隊列中的剩余任務(wù),返回的是尚未執(zhí)行的任務(wù)
3、awaitTermination()
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
等待線程池完成關(guān)閉, shutdown()與shutdownNow()方法之后,用戶程序都不會主動等待線程池關(guān)閉完成
在設(shè)置的時間timeout內(nèi)如果線程池完成關(guān)閉,返回true, 否則返回false


浙公網(wǎng)安備 33010602011771號