并發(fā)編程系列之如何正確使用線程池?
并發(fā)編程系列之如何正確使用線程池?在上一章節(jié)的學(xué)習(xí)中,我們掌握了線程的基本知識,接著本博客會繼續(xù)學(xué)習(xí)多線程中的線程池知識
1、線程是不是越多越好?
在學(xué)習(xí)多線程之前,讀者可能會有疑問?如果單線程跑得太慢,那么是否就能多創(chuàng)建多個線程來跑任務(wù)?并發(fā)的情況,線程是不是創(chuàng)建越多越好?這是一個很經(jīng)典的問題,畫圖表示一下創(chuàng)建很多線程的情況,然后進行情況分析。

- 創(chuàng)建線程和銷毀線程都是需要時間的,如果創(chuàng)建時間+銷毀時間>執(zhí)行任務(wù)時間就很不劃算
- 創(chuàng)建后的線程是需要內(nèi)存去存放的,創(chuàng)建的線程對應(yīng)一個Thread對象,對象是會占用JVM的堆內(nèi)存的,根據(jù)jvm規(guī)范,一個線程默認最大棧大小為1M,這個棧空間也是需要從系統(tǒng)內(nèi)存中分配的,所以線程越多,需要的內(nèi)存就越多
- 創(chuàng)建線程,操作系統(tǒng)是需要頻繁進行線程上下文切換的,所以線程創(chuàng)建太多,是會影響性能的
上下文切換(context switch):對于單核CPU來說,在一個時刻只能運行一個線程,對于并行來說,單核cpu也是可以支持多線程執(zhí)行代碼的,CPU是通過給線程分配時間片來解決的,所謂時間片是CPU給每個線程分配的時間,時間片的時間是非常短的,所以執(zhí)行完成一個時間片后,進行任務(wù)切換,切換之前先保存這個任務(wù)的狀態(tài),以便于下次換回來的時候,可以加載這個任務(wù)的狀態(tài),所以從保存任務(wù)狀態(tài)到再加載任務(wù)的過程稱為上下文切換,不僅在線程間可以上下文切換,進程也同樣可以
2、如何正確使用多線程?
- 如果是計算型任務(wù)?
CPU數(shù)量的1~2倍即可 - 如果是IO密集型任務(wù)?
就需要多一些線程,要根據(jù)具體的io阻塞時長來進行考量決定
3、Java線程池的工作原理

- 接收任務(wù),放入線程池的任務(wù)倉庫
- 工作線程從線程池的任務(wù)倉庫取,執(zhí)行
- 沒有任務(wù)時,線程阻塞,有任務(wù)時喚醒線程
4、掌握JUC線程池API
-
Executor : 接口類
-
ExecutorService:加入關(guān)閉方法和對Runnable、Callable、Future的支持

- shutdown:已經(jīng)提交的會執(zhí)行完成
- shutdownNow:正在執(zhí)行的會執(zhí)行完成,未來執(zhí)行的返回
- awaitTermination:阻塞等待任務(wù)關(guān)閉完成
- submit類型的:都是提交任務(wù)的,支持Runnable和Callable
- invokeAll類型的:執(zhí)行集合中所有任務(wù)
-
ScheduleExecutorService :加入對定時任務(wù)的支持

其中schedule(Runablle , long, Timeunit)和schedule(Callable<V> , long, TimeUnit)表示的是多久后執(zhí)行,而scheduleAtFixedRate方法和scheduleWithFixedDelay方法表示的都是周期性重復(fù)執(zhí)行的
再描述scheduleAtFixedRate方法和scheduleWithFixedDelay方法的區(qū)別:
scheduleAtFixedRate:以固定的時間頻率重復(fù)執(zhí)行任務(wù),如每10s ,也就是兩個任務(wù)直接以固定的時間間隔執(zhí)行,不管任務(wù)執(zhí)行完成與否

scheduleWithFixedDelay:以固定的任務(wù)時延遲來重復(fù)執(zhí)行任務(wù),這種任務(wù)不管任務(wù)執(zhí)行多久都執(zhí)行完成,然后隔預(yù)定的如3s,接著執(zhí)行下一個任務(wù),每個任務(wù)之間的間隔都是一樣的

-
Executors:快速得到線程池的工具類,創(chuàng)建線程池的工廠類
newFixedThreadPool(int nThreads):創(chuàng)建一個固定大小、任務(wù)隊列 無界的線程池。線程池的核心線程數(shù)=最大線程池=nThreadsnewCachedThreadPool():創(chuàng)建的是一個大小無界的緩沖線程池。它的任務(wù)隊列是一個同步隊列。如果隊列中有空閑的線程,則用空閑線程執(zhí)行,如果沒有就創(chuàng)建新線程執(zhí)行。池中線程空閑超過60s,就會被釋放。緩沖線程池使用于執(zhí)行耗時比較小的異步任務(wù)。線程池的核心線程數(shù)=0,最大線程池=Integer.MAX_VALUEnewSingleThreadExecutor():創(chuàng)建的是只有一個線程來執(zhí)行無界任務(wù)隊列的單一線程池。該線程池按順序執(zhí)行一個一個加入的任務(wù),任何時刻都只有一個線程在執(zhí)行。單一線程池和newFixedThreadPool(1)的區(qū)別在于,單一線程池的池大小是不能再改變的newScheduleThreadPool(int corePoolSize): 能定時執(zhí)行任務(wù)的線程池,該池的核心線程數(shù)由參數(shù)corePoolSize指定,最大線程數(shù)=Integer.MAX_VALUEnewWorkStealingPool():以當前系統(tǒng)可用的處理器數(shù)作為并行級別創(chuàng)建的work-stealing thread pool(ForkJoinPool)newWorkStealingPool(int parallelism):以指定的parallelism并行級別創(chuàng)建的work-stealing thread pool(ForkJoinPool)
-
ThreadPoolExecutor:線程池的標準實現(xiàn)

下面列舉出ThreadPoolExecutor的主要參數(shù):
| 參數(shù) | 描述 |
|---|---|
| corePoolSize | 核心線程數(shù)量 |
| maxPoolSize | 最大線程數(shù)量 |
| keepAliveTime+時間單位 | 空閑線程的存活時間 |
| ThreadFactory | 線程工廠,用于創(chuàng)建線程 |
| workQueue | 用于存放任務(wù)的隊列,可以稱之為工作隊列 |
| Handler | 用于處理被拒絕的任務(wù) |
雖然Executors使用起來很方便,不過在阿里編程規(guī)范里是強調(diào)了慎用Executors創(chuàng)建線程池,下面摘錄自阿里編程規(guī)范手冊:
【強制】線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,
這樣的處理方式讓寫的同學(xué)更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風險。
說明:Executors各個方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
??主要問題是堆積的請求處理隊列可能會耗費非常大的內(nèi)存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
??主要問題是線程數(shù)最大數(shù)是Integer.MAX_VALUE,可能會創(chuàng)建數(shù)量非常多的線程,甚至OOM。
ThreadPoolExecutor的基本參數(shù):
new ThreadPoolExecutor(
2, // 核心線程數(shù)
5, // 最大線程數(shù)
60L, // keepAliveTime,線程空閑超過這個數(shù),就會被銷毀釋放
TimeUnit.SECONDS, // keepAliveTime的時間單位
new ArrayBlockingQueue(5)); // 傳入邊界為5的工作隊列
畫流程圖表示,線程池的核心參數(shù)是corePoolSize、maxPoolSize、workQueue(工作隊列)

線程池工作原理示意圖:任務(wù)可以一直放,直到線程池滿了的情況,才會拒絕,然后除了核心線程,其它的線程會被合理回收。所以正常情況下,線程池中的線程數(shù)量會處在 corePoolSize 與 maximumPoolSize 的閉區(qū)間內(nèi)

ThreadPoolExecutor基本實例:
ExecutorService service = new ThreadPoolExecutor(2, 5,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue(5));
service.execute(() ->{
System.out.println(String.format("thread name:%s",Thread.currentThread().getName()));
});
// 避免內(nèi)存泄露,記得關(guān)閉線程池
service.shutdown();
ThreadPoolExecutor加上Callable、Future使用的例子:
public static void main(String[] args) {
ExecutorService service = new ThreadPoolExecutor(2, 5,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue(5));
Future<Integer> future = service.submit(new CallableTask());
Thread.sleep(3000);
System.out.println("future is done?" + future.isDone());
if (future.isDone()) {
System.out.println("callableTask返回參數(shù):"+future.get());
}
service.shutdown();
}
static class CallableTask implements Callable<Integer>{
@Override
public Integer call() {
return ThreadLocalRandom.current().ints(0, (99 + 1)).limit(1).findFirst().getAsInt();
}
}

浙公網(wǎng)安備 33010602011771號