<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      【萬字圖文-原創】 | 學會Java中的線程池,這一篇也許就夠了!

      碎碎念

      關于JDK源碼相關的文章這已經是第四篇了,原創不易,粉絲從幾十人到昨天的666人,真的很感謝之前幫我轉發文章的一些朋友們。

      關注人數.png

      從16年開始寫技術文章,到現在博客園已經發表了222篇文章,大多數都是原創,共有800多粉絲,基本上每個月都會有文章的產出。

      博客園信息.png

      博客園文章月份記錄.png

      回顧這幾年以來寫作的心路歷程,一直都是偷偷的寫,偷偷的發,害怕被人知道,怕被人罵文章寫的太水(之前心理太脆弱了,哈哈)。后面和cxuan聊過后,他建議我給他投稿試試,于是就有了那一篇的萬字的AQS文章。

      最近也有好多讀者加到我的微信,問一些文章中的問題,我也都會認真解答,看到有人閱讀我的文章并有所收獲,我真的挺欣慰,這就是寫作的動力吧。

      幫助別人的同時也是在幫助自己,自己學的技術和理解的內容都是有局限性的。通過寫文章結識到了很多朋友,聽聽別人的分析和見解,我也能學到很多。

      答疑.png

      每次看到博客中有人留言都很激動,也會第一時間去回復。感謝下面的公眾號大佬們之前無私的幫助,大家也可以關注一下他們,都是很nice的大佬:

      Java建設者、Java團長、程序猿石頭、碼象全棧、Java3y、JAVA小咖秀、Bella的技術輪子、石杉的架構筆記、武培軒、程序通事

      前言

      Java中的線程池已經不是什么神秘的技術了,相信在看的讀者在項目中也都有使用過。關于線程池的文章也是數不勝數,我們站在巨人的肩膀上來再次梳理一下。

      本文還是保持原有的風格,圖文解析,盡量做到多畫圖!全文共20000+字,建議收藏后細細品讀,閱讀期間搭配源碼食用效果更佳!

      讀完此文你將學到:

      1. ThreadPoolExecutor中常用參數有哪些?
      2. ThreadPoolExecutor中線程池狀態和線程數量如何存儲的?
      3. ThreadPoolExecutor有哪些狀態,狀態之間流轉是什么樣子的?
      4. ThreadPoolExecutor任務處理策略?
      5. ThreadPoolExecutor常用的拒絕策略有哪些?
      6. Executors工具類提供的線程池有哪些?有哪些缺陷?
      7. ThreadPoolExecutor核心線程池中線程預熱功能?
      8. ThreadPoolExecutor中創建的線程如何被復用的?
      9. ThreadPoolExecutor中關閉線程池的方法shutdownshutdownNow的區別?
      10. ThreadPoolExecutor中存在的一些擴展點?
      11. ThreadPoolExecutor支持動態調整核心線程數、最大線程數、隊列長度等一些列參數嗎?怎么操作?

      本文源碼基于JDK1.8

      線程池基本概念

      線程池是一種池化思想的產物,如同我們數據庫有連接池、Java中的常量池。線程池可以幫助我們管理線程、復用線程,減少線程頻繁新建、銷毀等帶來的開銷。

      在Java中是通過ThreadPoolExecutor類來創建一個線程池的,一般我們建議項目中自己去定義線程池,不推薦使用JDK提供的工具類Executors去構建線程池。

      查看阿里巴巴開發手冊中也有對線程池的一些建議:

      【強制】創建線程或線程池時請指定有意義的線程名稱,方便出錯時回溯。
      正例:自定義線程工廠,并且根據外部特征進行分組,比如,來自同一機房的調用,把機房編號賦值給whatFeaturOfGroup

      public class UserThreadFactory implements ThreadFactory {
      
      	private final String namePrefix;
      	private final AtomicInteger nextId = new AtomicInteger(1);
      
      	UserThreadFactory(String whatFeaturOfGroup) {
      		namePrefix = "From UserThreadFactory's " + whatFeaturOfGroup + "-Worker-";
      	}
      
      	@Override
      	public Thread newThread(Runnable task) {
      		String name = namePrefix + nextId.getAndIncrement();
      		Thread thread = new Thread(null, task, name, 0, false);
      		System.out.println(thread.getName());
      		return thread;
      	}
      }
      

      【強制】線程資源必須通過線程池提供,不允許在應用中自行顯式創建線程。

      說明:線程池的好處是減少在創建和銷毀線程上所消耗的時間以及系統資源的開銷,解決資源不足的問題。
      如果不使用線程池,有可能造成系統創建大量同類線程而導致消耗完內存或者“過度切換”的問題。

      【強制】線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這
      樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。

      說明:Executors 返回的線程池對象的弊端如下:
      1) FixedThreadPool 和 SingleThreadPool:
      允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
      2) CachedThreadPool:
      允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。

      線程池使用示例

      下面是一個自定義的線程池,這是之前公司在用的一個線程池,修改其中部分屬性和備注做脫敏處理:

      public class MyThreadPool {
      	static final Logger LOGGER = LoggerFactory.getLogger(MyThreadPool.class);
      
      	private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;
      
      	private static final String THREAD_POOL_NAME = "MyThreadPool-%d";
      
      	private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME)
      			.daemon(true).build();
      
      	private static final int DEFAULT_SIZE = 500;
      
      	private static final long DEFAULT_KEEP_ALIVE = 60L;
      
      	private static ExecutorService executor;
      
      	private static BlockingQueue<Runnable> executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE);
      
      	static {
      		try {
      			executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT + 2, DEFAULT_KEEP_ALIVE,
      					TimeUnit.SECONDS, executeQueue, FACTORY);
      
      			Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
      				@Override
      				public void run() {
      					LOGGER.info("MyThreadPool shutting down.");
      					executor.shutdown();
      
      					try {
      						if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
      							LOGGER.error("MyThreadPool shutdown immediately due to wait timeout.");
      							executor.shutdownNow();
      						}
      					} catch (InterruptedException e) {
      						LOGGER.error("MyThreadPool shutdown interrupted.");
      						executor.shutdownNow();
      					}
      
      					LOGGER.info("MyThreadPool shutdown complete.");
      				}
      			}));
      		} catch (Exception e) {
      			LOGGER.error("MyThreadPool init error.", e);
      			throw new ExceptionInInitializerError(e);
      		}
      	}
      
      	private MyThreadPool() {
      	}
      
      	public static boolean execute(Runnable task) {
      
      		try {
      			executor.execute(task);
      		} catch (RejectedExecutionException e) {
      			LOGGER.error("Task executing was rejected.", e);
      			return false;
      		}
      
      		return true;
      	}
      
      	public static <T> Future<T> submitTask(Callable<T> task) {
      
      		try {
      			return executor.submit(task);
      		} catch (RejectedExecutionException e) {
      			LOGGER.error("Task executing was rejected.", e);
      			throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
      		}
      	}
      }
      

      這里主要就是使用調用ThreadPoolExecutor構造函數來構造一個線程池,指定自定義的ThreadFactory,里面包含我們自己線程池的poolName等信息。重寫里面的execute()submitTask()方法。 添加了系統關閉時的鉤子函數shutDownHook(),在里面調用線程池的shutdown()方法,使得系統在退出(使用ctrl c或者kill -15 pid)時能夠優雅的關閉線程池。

      如果有看不懂的小伙伴也沒有關系,后面會詳細分析ThreadPoolExecutor中的源碼,相信看完后面的代碼再回頭來看這個用例 就完全是小菜一碟了。

      線程池實現原理

      通過上面的示例代碼,我們需要知道創建線程池時幾個重要的屬性:

      ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
      

      corePoolSize: 線程池核心線程數量
      maximumPoolSize: 線程池最大線程數量
      workQueue: 線程池中阻塞隊列,一般指定隊列大小

      線程池中數據模型可以簡化成下圖所示,其中Thread應該是添加的一個個Worker,這里標注的Thread是為了方便理解:

      線程池中數據模型.png

      線程池中提交一個任務具體執行流程如下圖:

      執行流程.png

      提交任務時,比較當前線程池中線程數量和核心線程數的大小,根據比較結果走不同的任務處理策略,這個下面會有詳細說明。

      線程池中核心方法調用鏈路:

      方法調用鏈路.png

      TheadPoolExecutor源碼初探

      TheadPoolExecutor中常用屬性和方法較多,我們可以先分析下這些,然后一步步往下深入,常用屬性和方法如下:

      線程池常見屬性和方法.png

      具體代碼如下:

      public class ThreadPoolExecutor extends AbstractExecutorService {
      
      	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
      	private static final int COUNT_BITS = Integer.SIZE - 3;
      	private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
      
      	private static final int RUNNING    = -1 << COUNT_BITS;
      	private static final int SHUTDOWN   =  0 << COUNT_BITS;
      	private static final int STOP       =  1 << COUNT_BITS;
      	private static final int TIDYING    =  2 << COUNT_BITS;
      	private static final int TERMINATED =  3 << COUNT_BITS;
      
      	private static int runStateOf(int c)     { return c & ~CAPACITY; }
      	private static int workerCountOf(int c)  { return c & CAPACITY; }
      	private static int ctlOf(int rs, int wc) { return rs | wc; }
      
      	private static boolean runStateLessThan(int c, int s) {
              return c < s;
          }
      
          private static boolean runStateAtLeast(int c, int s) {
              return c >= s;
          }
      
          private static boolean isRunning(int c) {
              return c < SHUTDOWN;
          }
      
          private boolean compareAndIncrementWorkerCount(int expect) {
              return ctl.compareAndSet(expect, expect + 1);
          }
      
          private boolean compareAndDecrementWorkerCount(int expect) {
              return ctl.compareAndSet(expect, expect - 1);
          }
      
           private void decrementWorkerCount() {
              do {} while (! compareAndDecrementWorkerCount(ctl.get()));
          }
      
          public ThreadPoolExecutor(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
                                    TimeUnit unit,
                                    BlockingQueue<Runnable> workQueue) {
              this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                      Executors.defaultThreadFactory(), defaultHandler);
          }
      }
      
      1. ctl

      ctl代表當前線程池狀態和線程池線程數量的結合體,高3位標識當前線程池運行狀態,后29位標識線程數量。ctlOf方法就是rs(線程池運行狀態)和wc(線程數量)按位或操作

      1. COUNT_BITS

      COUNT_BITS = Integer.SIZE - 3 = 29,在ctl中,低29位用于存放當前線程池中線程的數量

      1. CAPACITY

      CAPACITY = (1 << COUNT_BITS) - 1
      我們來計算一下:
      1 << 29 = 0010 0000 0000 0000 0000 0000 0000 0000
      (1 << 29) - 1 = 0001 1111 1111 1111 1111 1111 1111 1111
      這個屬性是用來線程池能裝載線程的最大數量,也可以用來做一些位運算操作。

      1. 線程池幾種狀態

      RUNNING:

      (1) 狀態說明:線程池處在RUNNING狀態時,能夠接收新任務,以及對已添加的任務進行處理。
      (2) 狀態切換:線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被創建,就處于RUNNING狀態,并且線程池中的任務數為0

      SHUTDOWN:

      (1) 狀態說明:線程池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務。
      (2) 狀態切換:調用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN

      STOP:

      (1) 狀態說明:線程池處在STOP狀態時,不接收新任務,不處理已添加的任務,并且會中斷正在處理的任務。
      (2) 狀態切換:調用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP

      TIDYING:

      (1) 狀態說明:當所有的任務已終止,ctl記錄的"任務數量"為0,線程池會變為TIDYING狀態。當線程池變為TIDYING狀態時,會執行鉤子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變為TIDYING時,進行相應的處理;可以通過重載terminated()函數來實現。
      (2) 狀態切換:當線程池在SHUTDOWN狀態下,阻塞隊列為空并且線程池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING。
      當線程池在STOP狀態下,線程池中執行的任務為空時,就會由STOP -> TIDYING

      TERMINATED:

      (1) 狀態說明:線程池徹底終止,就變成TERMINATED狀態。
      (2) 狀態切換:線程池處在TIDYING狀態時,執行完terminated()之后,就會由 TIDYING -> TERMINATED

      狀態的變化流轉:

      線程池的狀態流轉.png

      1. runStateOf()

      計算線程池運行狀態的,就是計算ctl前三位的數值。`unStateOf() = c & ~CAPACITY,CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111,那么~CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000,它與任何數按位與的話都是只看這個數前三位

      1. workerCountOf()

      計算線程池的線程數量,就是看ctl的后29位,workerCountOf() = c & CAPACITY, CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111與任何數按位與,就是看這個數的后29位

      1. ctlOf(int rs, int wt)

      在獲取當前線程池ctl的時候會用到,在后面源碼中會有很多地方調用, 傳遞的參數rs代表線程池狀態,wt代表當前線程次線程(worker)的數量

      1. runStateLessThan(int c, int s)

      return c < s,c一般傳遞的是當前線程池的ctl值。比較當前線程池ctl所表示的狀態是否小于某個狀態s

      1. runStateAtLeast(int c, int s)

      return c >= s,c一般傳遞的是當前線程池的ctl值。比較當前線程池ctl所表示的狀態,是否大于等于某個狀態s

      1. isRunning(int c)

      c < SHUTDOWN, 判斷當前線程池是否是RUNNING狀態,因為只有RUNNING的值小于SHUTDOWN

      1. compareAndIncrementWorkerCount()/compareAndDecrementWorkerCount()

      使用CAS方式 讓ctl值分別加一減一 ,成功返回true, 失敗返回false

      1. decrementWorkerCount()

      將ctl值減一,這個方法用了do...while循環,直到成功為止

      1. completedTaskCount

      記錄線程池所完成任務總數 ,當worker退出時會將 worker完成的任務累積到completedTaskCount

      1. Worker

      線程池內部類,繼承自AQS且實現Runnable接口。Worker內部有一個Thread thread是worker內部封裝的工作線程。Runnable firstTask用來接收用戶提交的任務數據。在初始化Worker時候會設置state為-1(初始化狀態),通過threadFactory創建一個線程。

      1. ThreadPoolExecutor初始化參數

      corePoolSize: 核心線程數限制
      maximumPoolSize: 最大線程限制
      keepAliveTime: 非核心的空閑線程等待新任務的時間 unit: 時間單位。配合allowCoreThreadTimeOut也會清理核心線程池中的線程。
      workQueue: 任務隊列,最好選用有界隊列,指定隊列長度
      threadFactory: 線程工廠,最好自定義線程工廠,可以自定義每個線程的名稱
      handler: 拒絕策略,默認是AbortPolicy

      execute()源碼分析

      當有任務提交到線程池時,就會直接調用ThreadPoolExecutor.execute()方法,執行流程如下:

      執行流程.png

      從流程圖可看,添加任務會有三個分支判斷,源碼如下:

      java.util.concurrent.ThreadPoolExecutor.execute()

      public void execute(Runnable command) {
          if (command == null)
              throw new NullPointerException();
          
          int c = ctl.get();
          if (workerCountOf(c) < corePoolSize) {
              if (addWorker(command, true))
                  return;
              c = ctl.get();
          }
          if (isRunning(c) && workQueue.offer(command)) {
              int recheck = ctl.get();
              if (! isRunning(recheck) && remove(command))
                  reject(command);
              else if (workerCountOf(recheck) == 0)
                  addWorker(null, false);
          }
          else if (!addWorker(command, false))
              reject(command);
      }
      

      c在這里代表線程池ctl的值,包含工作任務數量以及線程池的狀態,上面有解釋過。

      接著看下面幾個分支代碼:

      分支一: if (workerCountOf(c) < corePoolSize) ,條件成立表示當前線程數量小于核心線程數,此次提交任務,直接創建一個新的worker。

      if (workerCountOf(c) < corePoolSize) {
          if (addWorker(command, true))
              return;
          c = ctl.get();
      }
      

      如果線程數小于核心線程數,執行addWorker操作,這個后面會講這個方法的細節,如果添加成功則直接返回,失敗后會重新計算ctl的值,然后執行分支二。

      針對addWorker()執行失敗的情況,有以下幾種可能:

      1. 存在并發情況,execute()方法是可能有多個線程同時調用的,當多個線程同時workerCountOf(c) < corePoolSize成立后,就會向線程池中創建worker,這個時候線程池的核心線程數可能已經達到,在addWorker中還會再次判斷,所以會有任務添加失敗。

      addWorker()失敗場景一.png

      1. 當前線程池狀態發生改變,例如線程A執行addWorker()方法時,線程B修改線程池狀態,導致線程池不是RUNNING狀態,此時線程A執行addWorker()就有可能失敗。

      addWorker()失敗場景二.png

      分支二: if (isRunning(c) && workQueue.offer(command)) {}

      通過分支一流程的分析,我們可以知道執行到這個分支說明**當前線程數量已經達到corePoolSize或者addWorker()執行失敗,我們先看看分支二執行流程:

      超過核心線程數往隊列中添加任務流程圖.png

      首先判斷當前線程池是否處于RUNNING狀態,如果是則嘗試將task放入到workQueue中,workQueue是我們在初始化ThreadPoolExecutor時傳入進來的阻塞隊列。

      如果當前任務成功添加到阻塞隊列中,再次獲取ctl賦值給recheck變量,然后執行:

      if (!isRunning(recheck) && remove(command))
          reject(command);
      

      再次判斷當前線程池是否為RUNNINT狀態,如果不是則說明提交任務到隊列之后,線程池狀態被其他線程給修改了,比如調用shutdown()/shutdownNow()等。這種情況就需要把剛剛提交到隊列中的的任務刪除掉。

      再看下remove()方法:

      public boolean remove(Runnable task) {
          boolean removed = workQueue.remove(task);
          tryTerminate();
          return removed;
      }
      

      如果任務提交到隊列之后,線程池中的線程還未將這個任務消費,那么就可以remove成功,調用reject()方法來執行拒絕策略。
      如果在改變線程池狀態之前,隊列中的數據已經被消費了,此時remove()就會失敗。

      移除隊列中Task任務.png

      接著走else if中的邏輯:

      else if (workerCountOf(recheck) == 0)
          addWorker(null, false);
      

      走這個else if邏輯有兩種可能,線程池是RUNNING狀態或者線程池狀態被改變且workQueue中添加的任務已經被消費導致remove()失敗。
      如果是RUNNING狀態,線程池中的線程數量是0,此時workQueue中還有待執行的任務,就需要新增一個worker(addWorker里面會有創建線程的操作),繼續消費workqueue中的任務。

      添加新任務.png

      這里要注意一下addWorker(null, false),也就是創建一個線程,但并沒有傳入任務,因為任務已經被添加到workQueue中了,所以worker在執行的時候,會直接從workQueue中獲取任務。在workerCountOf(recheck) == 0時執行addWorker(null, false)也是為了保證線程池在RUNNING狀態下必須要有一個線程來執行任務,可以理解為一種擔保兜底機制。

      至于線程池中線程為何可以為0?這個如果我們設置了allowCoreThreadTimeOut=true,那么核心線程也是允許被回收的,后面getTask()中代碼有提及。

      分支三: else if (!addWorker(command, false)) {}

      通過分支一和分之二的分析,進入這個分支的前置條件:線程數超過核心線程數且workQueue中數據已滿。

      else if (!addWorker(command, false)),執行添加worker操作,如果執行失敗就直接走reject()拒絕策略。這里添加失敗可能是線程數已經超過了maximumPoolSize。

      分支三執行流程.png

      addWorker()源碼分析

      上面分析提交任務的方法execute()時多次用到addWorker方法,接收任務后將任務添加到Worker中。

      WorkerThreadPoolExecutor中的內部類,繼承自AQS且實現了Runnable接口。 類中包含Thread thread,它是worker內部封裝的工作線程,還有firstTask屬性,它是一個可執行的Runnable對象。在Worker的構造函數中,使用線程工廠創建了一個線程,當thread啟動的時候,會以worker.run()為入口啟動線程,這里會直接調用到runWorker()中。

      private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
      
          private static final long serialVersionUID = 6138294804551838833L;
      
          final Thread thread;
          Runnable firstTask;
          volatile long completedTasks;
      
          Worker(Runnable firstTask) {
              setState(-1);
              this.firstTask = firstTask;
              this.thread = getThreadFactory().newThread(this);
          }
      
          public void run() {
              runWorker(this);
          }
      }
      

      流程如下圖:

      添加Worker.png

      這里再回頭看下addWorker(Runnable firstTask, boolean core) 方法,這個方法主要是添加一個Worker到線程池中并執行,firstTask參數用于指定新增的線程執行的第一個任務,core參數為true表示在新增線程時會判斷當前活動線程數是否少于corePoolSizefalse表示在新增線程時會判斷當前活動線程數是否少于maximumPoolSize

      addWorker方法整體執行流程圖如下:
      addWorker流程圖.png

      接著看下源碼:

      java.util.concurrent.ThreadPoolExecutor.addWorker()

      private boolean addWorker(Runnable firstTask, boolean core) {
          retry:
          for (;;) {
              int c = ctl.get();
              int rs = runStateOf(c);
      
              if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
                  return false;
      
              for (;;) {
                  int wc = workerCountOf(c);
                  if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                      return false;
                  if (compareAndIncrementWorkerCount(c))
                      break retry;
                  c = ctl.get();
                  if (runStateOf(c) != rs)
                      continue retry;
              }
          }
      
          boolean workerStarted = false;
          boolean workerAdded = false;
          Worker w = null;
          try {
              w = new Worker(firstTask);
              final Thread t = w.thread;
              if (t != null) {
                  final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  try {
                      int rs = runStateOf(ctl.get());
      
                      if (rs < SHUTDOWN ||
                          (rs == SHUTDOWN && firstTask == null)) {
                          if (t.isAlive())
                              throw new IllegalThreadStateException();
                          workers.add(w);
                          int s = workers.size();
                          if (s > largestPoolSize)
                              largestPoolSize = s;
                          workerAdded = true;
                      }
                  } finally {
                      mainLock.unlock();
                  }
                  if (workerAdded) {
                      t.start();
                      workerStarted = true;
                  }
              }
          } finally {
              if (!workerStarted)
                  addWorkerFailed(w);
          }
          return workerStarted;
      }
      

      這里是有兩層for循環,外層循環主要是判斷線程池的狀態,如果狀態不合法就直接返回false.

      只有兩種情況屬于合法狀態:

      1. RUNNING狀態
      2. SHUTDOWN狀態時,隊列中還有未處理的任務,且提交的任務為空。SHUTDOWN含義就是不再接收新任務,可以繼續處理阻塞隊列的任務。

      第二層循環是通過CAS操作更新workCount數量,如果更新成功則往線程池中中添加線程,這個所謂的線程池就是一個HashSet數組。添加失敗時判斷失敗原因,CAS失敗有兩種原因:線程池狀態被改變或者并發情況修改線程池中workCount數量,這兩種情況都會導致ctl值被修改。如果是第二種原因導致的失敗,繼續自旋更新workCount數量。

      接著繼續分析循環內部的實現,先看看第一層循環:c代表線程池ctl值,rs代表線程池運行狀態。

      if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
          return false;
      

      條件一:rs >= SHUTDOWN 成立, 說明當前線程池狀態不是RUNNING狀態

      條件二: !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())

      我們之前提到過,創建任務有兩種情況:
      1)RUNNING狀態可以提交任務,
      2)SHUTDOWN狀態下如果傳遞的任務是空且阻塞隊列中還有任務未處理的情況才是允許創建任務繼續處理的,因為阻塞隊列中的任務仍然需要繼續處理。

      上面的條件一和條件二就是處理SHUTDOWN狀態下任務創建操作的判斷。

      接著分析第二層循環,先是判斷線程池workCount數量是否大于可創建的最大值,或者是否超過了核心線程數/最大線程數,如果是則直接返回,addWorker()操作失敗。

      接著使用compareAndIncrementWorkerCount(c)將線程池中workCount+1,這里使用的是CAS操作,如果成功則直接跳出最外層循環。

      for (;;) {
          int wc = workerCountOf(c);
          if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
              return false;
          if (compareAndIncrementWorkerCount(c))
              break retry;
          c = ctl.get();
          if (runStateOf(c) != rs)
              continue retry;
      }
      

      如果CAS失敗,說明此時有競爭,會重新獲取ctl的值,判斷競爭失敗的原因是添加workCount數量還是修改線程池狀態導致的,如果線程池狀態未發生改變,就繼續循環嘗試CAS增加workCount數量,接著看循環結束后邏輯:

      boolean workerStarted = false;
      boolean workerAdded = false;
      Worker w = null;
      try {
          w = new Worker(firstTask);
          final Thread t = w.thread;
          if (t != null) {
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  int rs = runStateOf(ctl.get());
      
                  if (rs < SHUTDOWN ||
                      (rs == SHUTDOWN && firstTask == null)) {
                      if (t.isAlive())
                          throw new IllegalThreadStateException();
                      workers.add(w);
                      int s = workers.size();
                      if (s > largestPoolSize)
                          largestPoolSize = s;
                      workerAdded = true;
                  }
              } finally {
                  mainLock.unlock();
              }
              if (workerAdded) {
                  t.start();
                  workerStarted = true;
              }
          }
      } finally {
          if (!workerStarted)
              addWorkerFailed(w);
      }
      

      這里workerStarted代表worker是否已經啟動,workerAdded代表創建的worker是否添加到池子中,這里所謂的池子就是全局定義的一個HashSet結構的workers變量。

      接著根據傳遞的firstTask來構建一個Worker,在Worker的構造方法中也會通過ThreadFactory創建一個線程,這里判斷t != null是因為用戶可以自定義ThreadFactory,如果這里用戶不是創建線程而是直接返回null則會出現一些問題,所以需要判斷一下。

      w = new Worker(firstTask);
      final Thread t = w.thread;
      
      if (t != null) {
      
      }
      

      在往池子中添加Worker的時候,是需要先加鎖的,因為針對全局的workers操作并不是線程安全的。

      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      

      繼續看下面代碼,rs代表當前線程池的狀態,這里還是判斷線程池的狀態,如果rs < SHUTDOWN代表線程池狀態是RUNNING狀態,此時可以直接操作。
      如果是SHUTDOWN狀態,需要滿足firstTask == null才可以繼續操作。因為在SHUTDOWN狀態時不會再添加新的任務,但還是可以繼續處理workQueue中的任務。

      t.isAlive() 當線程start后,線程isAlive會返回true,這里還是防止自定義的ThreadFactory創建線程返回給外部之前,將線程start了,由此可見Doug lea考慮問題真的很全面。

      int rs = runStateOf(ctl.get());
      if (rs < SHUTDOWN ||
          (rs == SHUTDOWN && firstTask == null)) {
          if (t.isAlive())
              throw new IllegalThreadStateException();
          workers.add(w);
      }
      

      接著將創建的Worker添加到workers集合中,設置largestPoolSize,這個屬性是線程池生命周期內線程數最大值,一般是做統計數據用的。 最后修改workerAdded = true,代表當前提交的任務所創建的Worker已經添加到池子中了。

      添加worker成功后,調用線程的start()方法啟動線程,因為Worker中重寫了run()方法,最后會執行Worker.run()。最后設置workerStarted = true后釋放全局鎖。

      int rs = runStateOf(ctl.get());
      if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
      	
      	workers.add(w);
      	int s = workers.size();
      	if (s > largestPoolSize)
              largestPoolSize = s;
              
          orkerAdded = true;
      }
      

      這里再回頭看看workerAdded = false的情形,如果線程池在lock之前,狀態發生了變化,導致添加失敗。此時workerAdded也會為false,最后執行addWorkerFailed(work)操作,這個方法是將Workworkers中移除掉,然后將workCount數量減一,最后執行tryTerminate()來嘗試關閉線程池,這個方法后面會細說。

      runWorker()源碼分析

      Worker類中的run方法調用了runWorker來執行任務。上面addWorker()方法正常的執行邏輯會創建一個Worker,然后啟動Worker中的線程,這里其實就會執行到runWorker方法。

      方法調用關系.png

      runWorker的執行邏輯很簡單,啟動一個線程,執行當前傳遞的task任務,執行完后又不斷的從workQueue中獲取任務繼續執行,如果當前workCount數量小于核心線程數且隊列中沒有了任務,當前線程會被阻塞,這個就是getTask()的邏輯,一會會講到。

      如果當前線程數大于核心線程數且隊列中沒有任務,就會返回null,在runWorker這邊退出循環,回收多余的worker數據。

      runWorker流程.png

      源碼如下:

      final void runWorker(Worker w) {
          Thread wt = Thread.currentThread();
          Runnable task = w.firstTask;
          w.firstTask = null;
          w.unlock();
          boolean completedAbruptly = true;
          try {
              while (task != null || (task = getTask()) != null) {
                  w.lock();
                  if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                      !wt.isInterrupted())
                      wt.interrupt();
                  try {
                      beforeExecute(wt, task);
                      Throwable thrown = null;
                      try {
                          task.run();
                      } catch (RuntimeException x) {
                          thrown = x; throw x;
                      } catch (Error x) {
                          thrown = x; throw x;
                      } catch (Throwable x) {
                          thrown = x; throw new Error(x);
                      } finally {
                          afterExecute(task, thrown);
                      }
                  } finally {
                      task = null;
                      w.completedTasks++;
                      w.unlock();
                  }
              }
              completedAbruptly = false;
          } finally {
              processWorkerExit(w, completedAbruptly);
          }
      }
      

      這里w.unlock()是為了初始化當前Workstate==0,然后設置獨占線程為null,因為在shutDown()方法中會嘗試獲取Worker中的鎖,如果獲取成功代表當前線程沒有被加鎖處于空閑狀態,給當前線程一個中斷信號。所以這里在執行線程任務的時候需要加鎖,防止調用shutDown()的時候給當前worker線程一個中斷信號。

      判斷task是否為空,如果是一個空任務,那么就去workQueue中獲取任務,如果兩者都為空就會退出循環。

      while (task != null || (task = getTask()) != null) {}
      

      最核心的就是調用task.run()啟動當前任務,這里面還有兩個可擴展的方法,分別是beforeExecute()/afterExecute(),我們可以在任務執行前和執行后分別自定義一些操作,其中afterExecute()可以接收到任務拋出的異常信息,方便我們做后續處理。

      while (task != null || (task = getTask()) != null) {
          try {
              beforeExecute(wt, task);
              Throwable thrown = null;
              try {
                  task.run();
              } catch (RuntimeException x) {
                  thrown = x; throw x;
              } catch (Error x) {
                  thrown = x; throw x;
              } catch (Throwable x) {
                  thrown = x; throw new Error(x);
              } finally {
                  afterExecute(task, thrown);
              }
          } finally {
              task = null;
              w.completedTasks++;
              w.unlock();
          }
      }
      

      如果退出循環,說明getTask()方法返回null。會執行到finally中的processWorkerExit(w, completedAbruptly)方法,此方法是用來清理線程池中添加的work數據,completedAbruptly=true代表是異常情況下退出。

      try {
          while (task != null || (task = getTask()) != null) {
              
          }
          completedAbruptly = false;
      } finally {
          processWorkerExit(w, completedAbruptly);
      }
      

      runWorker()中只是啟動了當前線程工作,還需要源源不斷通過getTask()方法從workQueue來獲取任務執行。在workQueue沒有任務的時候,根據線程池workCount和核心線程數的對比結果來使用processWorkerExit()執行清理工作。

      getTask()源碼分析

      getTask方法用于從阻塞隊列中獲取任務,如果當前線程小于核心線程,那么當阻塞隊列中沒有任務時就會阻塞,反之會等待keepAliveTime后返回。

      這個就是keepAliveTime的使用含義:非核心的空閑線程等待新任務的時間,當然如果這里設置了allowCoreThreadTimeOut=true也會回收核心線程。

      具體代碼如下:

      private Runnable getTask() {
          boolean timedOut = false;
      
          for (;;) {
              int c = ctl.get();
              int rs = runStateOf(c);
      
              if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                  decrementWorkerCount();
                  return null;
              }
      
              int wc = workerCountOf(c);
      
              boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
      
              if ((wc > maximumPoolSize || (timed && timedOut))
                  && (wc > 1 || workQueue.isEmpty())) {
                  if (compareAndDecrementWorkerCount(c))
                      return null;
                  continue;
              }
      
              try {
                  Runnable r = timed ?
                      workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                      workQueue.take();
                  if (r != null)
                      return r;
                  timedOut = true;
              } catch (InterruptedException retry) {
                  timedOut = false;
              }
          }
      }
      

      這里核心代碼就是從workQueue中取任務,采用poll還是take取決于allowCoreThreadTimeOut和線程數量,allowCoreThreadTimeOut在構造ThreadLocalExecutor后設置的,默認為false。如果設置為true則代表核心線程數下的線程也是可以被回收的。如果使用take則表明workQueue中沒有任務當前線程就會被阻塞掛起,直到有了新的任務才會被喚醒。

      workQueue數據取出流程.png

      在這里擴展下阻塞隊列的部分方法的含義,這里主要是看poll()take()的使用區別:
      阻塞隊列插入方法:

      boolean add(E e):隊列沒有滿,則插入數據并返回true;隊列滿時,拋出異常 java.lang.IllegalStateException: Queue full。
      boolean offer(E e):隊列沒有滿,則插入數據并返回true;隊列滿時,返回false。
      void put(E e):隊列沒有滿,則插入數據;隊列滿時,阻塞調用此方法線程,直到隊列有空閑空間時此線程進入就緒狀態。
      boolean offer(E e, long timeout, TimeUnit unit):隊列沒有滿,插入數據并返回true;隊列滿時,阻塞調用此方法線程,若指定等待的時間內還不能往隊列中插入數據,返回false。

      阻塞隊列移除(獲取)方法:

      E remove():隊列非空,則以FIFO原則移除數據,并返回該數據的值;隊列為空,拋出異常 java.util.NoSuchElementException。

      E poll(): 隊列非空,移除數據,并返回該數據的值;隊列為空,返回null。

      E take(): 隊列非空,移除數據,并返回該數據的值;隊列為空,阻塞調用此方法線程,直到隊列為非空時此線程進入就緒狀態。

      E poll(long timeout, TimeUnit unit):隊列非空,移除數據,并返回該數據的值;隊列為空,阻塞調用此方法線程,若指定等待的時間內隊列都沒有數據可取,返回null。

      阻塞隊列檢查方法:

      E element(): 隊列非空,則返回隊首元素;隊列為空,拋出異常 java.util.NoSuchElementException。
      E peek(): 隊列非空,則返回隊首元素;隊列為空,返回null。

      processWorkerExit()源碼分析

      此方法的含義是清理當前線程,從線程池中移除掉剛剛添加的worker對象。

      processWorkerExit執行前置條件.png

      執行processWorkerExit()代表在runWorker()線程跳出了當前循環,一般有兩種情況:

      1. task.run()內部拋出異常,直接結束循環,然后執行processWorkerExit()
      2. getTask()返回為空,代表線程數量大于核心數量且workQueue中沒有任務,此時需要執行processWorkerExit()來清理多余的Worker對象
      private void processWorkerExit(Worker w, boolean completedAbruptly) {
          if (completedAbruptly)、
              decrementWorkerCount();
      
          final ReentrantLock mainLock = this.mainLock;
          mainLock.lock();
          try {
              completedTaskCount += w.completedTasks;
              workers.remove(w);
          } finally {
              mainLock.unlock();
          }
      
          tryTerminate();
      
          int c = ctl.get();
          if (runStateLessThan(c, STOP)) {
              if (!completedAbruptly) {
                  int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                  if (min == 0 && ! workQueue.isEmpty())
                      min = 1;
                  if (workerCountOf(c) >= min)
                      return;
              }
              addWorker(null, false);
          }
      }
      

      針對于線程池workers的操作都會進行加鎖處理,然后將當前Worker從池子中移除,累加當前線程池完成的任務總數completedTaskCount

      接著調用tryTerminate()嘗試關閉線程池,這個方法后面有詳細說明。

      接著判斷if (runStateLessThan(c, STOP)) {},含義是當前線程池狀態小于STOP,即當前線程池狀態當前線程池狀態為 RUNNINGSHUTDOWN,判斷當前線程是否是正常退出。如果當前線程是正常退出,那么completedAbruptly=false,接著判斷線程池中是否還擁有足夠多的的線程,因為異常退出可能導致線程池中線程數量不足,此時就要執行addWorker()為線程池添加新的worker數據,看下面的詳細分析:

      執行最后的addWorke()有三種可能:
      1)當前線程在執行task時 發生異常,這里一定要創建一個新worker頂上去。
      2)如果!workQueue.isEmpty()說明任務隊列中還有任務,這種情況下最起碼要留一個線程,因為當前狀態為 RUNNING || SHUTDOWN這是前提條件。
      3)當前線程數量 < corePoolSize值,此時會創建線程,維護線程池數量在corePoolSize個水平。

      tryTerminate()源碼分析

      上面移除Worker的方法中有一個tryTerminate()方法的調用,這個方法是根據線程池狀態嘗試關閉線程池。

      執行流程如下:

      tryTerminate()流程.png

      實現源碼如下:

      final void tryTerminate() {
          for (;;) {
              int c = ctl.get();
              if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                  return;
              if (workerCountOf(c) != 0) {
                  interruptIdleWorkers(ONLY_ONE);
                  return;
              }
      
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                      try {
                          terminated();
                      } finally {
                          ctl.set(ctlOf(TERMINATED, 0));
                          termination.signalAll();
                      }
                      return;
                  }
              } finally {
                  mainLock.unlock();
              }
          }
      }
      

      首先是判斷線程池狀態:
      條件一:isRunning(c) 成立,直接返回就行,線程池很正常!
      條件二:runStateAtLeast(c, TIDYING) 說明 已經有其它線程 在執行 TIDYING -> TERMINATED狀態了,當前線程直接回去。
      條件三:(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())
      SHUTDOWN特殊情況,如果是這種情況,直接回去。得等隊列中的任務處理完畢后,再轉化狀態。

      接著執行:

      if (workerCountOf(c) != 0) {
          interruptIdleWorkers(ONLY_ONE);
          return;
      }
      

      走到這個邏輯,說明線程池狀態 >= STOP或者線程池狀態為SHUTDOWN且隊列已經空了

      當前線程池中的線程數量 > 0,調用interruptIdleWorkers()中斷一個空閑線程,然后返回。我們來分析下,在getTask()返回為空時會執行退出邏輯processWorkerExit(),這里就會調用tryTerminate()方法嘗試關閉線程池。

      如果此時線程池狀態滿足線程池狀態 >= STOP或者線程池狀態為SHUTDOWN且隊列已經空了,如果此時線程池中線程數不為0,就會中斷一個空閑線程。
      為什么這里只中斷一個線程呢?這里的設計思想是,如果線程數量特別多的話,只有一個線程去做喚醒空閑worker的任務可能會比較吃力,所以,就給了每個 被喚醒的worker線程 ,在真正退出之前協助 喚醒一個空閑線程的任務,提供吞吐量的一種常用手段。

      我們順便看下interruptIdleWorkers()源碼:

      private void interruptIdleWorkers(boolean onlyOne) {
          final ReentrantLock mainLock = this.mainLock;
          mainLock.lock();
          try {
              for (Worker w : workers) {
                  Thread t = w.thread;
                  if (!t.isInterrupted() && w.tryLock()) {
                      try {
                          t.interrupt();
                      } catch (SecurityException ignore) {
                      } finally {
                          w.unlock();
                      }
                  }
                  if (onlyOne)
                      break;
              }
          } finally {
              mainLock.unlock();
          }
      }
      

      遍歷workers,如果線程是空閑狀態(空閑狀態:queue.take()和queue.poll()返回空),則給其一個中斷信號,如果是處于workQueue阻塞的線程,會被喚醒,喚醒后,進入下一次自旋時,可能會return null執行退出相關的邏輯,接著又會調用processWorkerExit()->tryTerminate(),回到上面場景,當前線程退出的時候還是會繼續喚醒下一個空現線程。

      接著往下看tryTerminate的剩余邏輯:

      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
          if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
              try {
                  terminated();
              } finally {
                  ctl.set(ctlOf(TERMINATED, 0));
                  termination.signalAll();
              }
              return;
          }
      } finally {
          mainLock.unlock();
      }
      

      執行到這里的線程是誰?
      workerCountOf(c) == 0 時,會來到這里。
      最后一個退出的線程。 在 (線程池狀態 >= STOP || 線程池狀態為 SHUTDOWN 且 隊列已經空了)
      線程喚醒后,都會執行退出邏輯,退出過程中 會 先將 workerCount計數 -1 => ctl -1。
      調用tryTerminate 方法之前,已經減過了,所以0時,表示這是最后一個退出的線程了。

      獲取全局鎖,進行加鎖操作,通過CAS設置線程池狀態為TIDYING狀態,設置成功則執行terminated()方法,這也是一個自定義擴展的方法,當線程池中止的時候會調用此方法。

      最后設置線程池狀態為TERMINATED狀態,喚醒調用awaitTermination()方法的線程。

      awaitTermination()源碼分析

      該方法是判斷線程池狀態是否達到TERMINATED,如果達到了則直接返回true,沒有達到則會await掛起當前線程指定的時間。

      public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
          long nanos = unit.toNanos(timeout);
          final ReentrantLock mainLock = this.mainLock;
          mainLock.lock();
          try {
              for (;;) {
                  if (runStateAtLeast(ctl.get(), TERMINATED))
                      return true;
                  if (nanos <= 0)
                      return false;
                  nanos = termination.awaitNanos(nanos);
              }
          } finally {
              mainLock.unlock();
          }
      }
      

      在每次執行tryTerminate()后會喚醒所有被await的線程,繼續判斷線程池狀態。

      shutDown()/shutDownNow()源碼分析

      shutDownshutDown()方法都是直接改變線程池狀態的方法,一般我們在系統關閉之前會調用此方法優雅的關閉線程池。

      public void shutdown() {
          final ReentrantLock mainLock = this.mainLock;
          mainLock.lock();
          try {
              checkShutdownAccess();
              advanceRunState(SHUTDOWN);
              interruptIdleWorkers();
              onShutdown();
          } finally {
              mainLock.unlock();
          }
          tryTerminate();
      }
      
      public List<Runnable> shutdownNow() {
          List<Runnable> tasks;
          final ReentrantLock mainLock = this.mainLock;
          mainLock.lock();
          try {
              checkShutdownAccess();
              advanceRunState(STOP);
              interruptWorkers();
              tasks = drainQueue();
          } finally {
              mainLock.unlock();
          }
          tryTerminate();
          return tasks;
      }
      

      shutdownshutdownNow方法調用差不多,只是shutdown是將線程池狀態設置為SHUTDOWN,shutdownNow是將線程池狀態設置為STOP。
      shutdownNow會返回所有未處理的task集合。

      來看看它們共同調用的一些方法:

      private void advanceRunState(int targetState) {
          for (;;) {
              int c = ctl.get();
              if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                  break;
          }
      }
      

      這個方法是設置線程池狀態為指定狀態,runStateAtLeast(c, targetState),判斷當前線程池ctl值,如果小于targetState則會往后執行。
      ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))),通過CAS指令,修改ctl中線程池狀態為傳入的targetState

      private void interruptIdleWorkers() {
          interruptIdleWorkers(false);
      }
      
      private void interruptIdleWorkers(boolean onlyOne) {
          final ReentrantLock mainLock = this.mainLock;
          mainLock.lock();
          try {
              for (Worker w : workers) {
                  Thread t = w.thread;
                  if (!t.isInterrupted() && w.tryLock()) {
                      try {
                          t.interrupt();
                      } catch (SecurityException ignore) {
                      } finally {
                          w.unlock();
                      }
                  }
                  if (onlyOne)
                      break;
              }
          } finally {
              mainLock.unlock();
          }
      }
      

      interruptIdleWorkers含義是為空閑的線程設置中斷標識,這里要清楚worker什么時候空閑?我們在上面講解runWorker()方法時,執行task.run()之前,要針對Worker對象加鎖,設置Worker中的state值為1,防止運行的worker被添加中斷標識。接著執行getTask()方法,獲取阻塞隊列中的任務,如果是queue.take()則會阻塞掛起當前線程,釋放鎖,此時線程處于空閑狀態。如果是queue.pool()返回為空,runWorker()會釋放鎖,此時線程也是空閑狀態。

      執行interrupt()后處于queue阻塞的線程,會被喚醒,喚醒后,進入下一次自旋判斷線程池狀態是否改變,如果改變可能直接返回空,這里具體參看runWorker()getTask()方法。

      onShutdown()也是一個擴展方法,需要子類去重寫,這里代表當線程池關閉后需要做的事情。drainQueue()方法是獲取workQueue中現有的的任務列表。

      問題回顧

      1. ThreadPoolExecutor中常用參數有哪些?
        上面介紹過了,參見的參數是指ThreadPoolExecutor的構造參數,一般面試的時候都會先問這個,要解釋每個參數的含義及作用。

      2. ThreadPoolExecutor中線程池狀態和線程數量如何存儲的?
        通過AtomicInteger類型的變量ctl來存儲,前3位代表線程池狀態,后29位代表線程池中線程數量。

      3. ThreadPoolExecutor有哪些狀態,狀態之間流轉是什么樣子的?
        RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED
        線程池的狀態流轉.png

      4. ThreadPoolExecutor任務處理策略?
        這個問題就是考察execute()的執行過程,只要看過源碼就不會有問題。
        執行流程.png

      5. ThreadPoolExecutor常用的拒絕策略有哪些?
        策略處理該任務,線程池提供了4種策略:
        1)AbortPolicy:直接拋出異常,默認策略
        2)CallerRunsPolicy:用調用者所在的線程來執行任務
        3)DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,并執行當前任務
        4)DiscardPolicy:直接丟棄任務
        當然線程池是支持自定義拒絕策略的,需要實現RejectedExecutionHandler接口中rejectedExecution()方法即可。

      6. Executors工具類提供的線程池有哪些?有哪些缺陷?
        1) FixedThreadPool 和 SingleThreadPool:允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
        2) CachedThreadPool:允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。
        所以阿里巴巴也建議我們要自定義線程池核心線程數以及阻塞隊列的長度。

      7. ThreadPoolExecutor核心線程池中線程預熱功能?
        在創建線程池后,可以使用prestartAllCoreThreads()來預熱核心線程池。

        public int prestartAllCoreThreads() {
            int n = 0;
            while (addWorker(null, true))
                ++n;
            return n;
        }
        
      8. ThreadPoolExecutor中創建的線程如何被復用的?
        這個主要是看runWorker()和getTask()兩個方法的執行流程,當執行任務時調用runWorker()方法,執行完成后會繼續從workQueue中獲取任務繼續執行,已達到線程復用的效果,當然這里還有一些細節,可以回頭看看上面的源碼解析。

      9. ThreadPoolExecutor中關閉線程池的方法shutdownshutdownNow的區別?
        最大的區別就是shutdown()會將線程池狀態變為SHUTDOWN,此時新任務不能被提交,workQueue中還存有的任務可以繼續執行,同時會像線程池中空閑的狀態發出中斷信號。
        shutdownNow()方法是將線程池的狀態設置為STOP,此時新任務不能被提交,線程池中所有線程都會收到中斷的信號。如果線程處于wait狀態,那么中斷狀態會被清除,同時拋出InterruptedException。

      10. ThreadPoolExecutor中存在的一些擴展點?
        鉤子方法:
        1)beforeExecute()/afterExecute():runWorker()中線程執行前和執行后會調用的鉤子方法
        2)terminated:線程池的狀態從TIDYING狀態流轉為TERMINATED狀態時terminated方法會被調用的鉤子方法。
        3)onShutdown:當我們執行shutdown()方法時預留的鉤子方法。

      11. ThreadPoolExecutor支持動態調整核心線程數、最大線程數、隊列長度等一些列參數嗎?怎么操作?
        運行期間可動態調整參數的方法:
        1)setCorePoolSize():動態調整線程池核心線程數
        2)setMaximumPoolSize():動態調整線程池最大線程數
        3)setKeepAliveTime(): 空閑線程存活時間,如果設置了allowsCoreThreadTimeOut=true,核心線程也會被回收,默認只回收非核心線程
        4)allowsCoreThreadTimeOut():是否允許回收核心線程,如果是true,在getTask()方法中,獲取workQueue就采用workQueue.poll(keepAliveTime),如果超過等待時間就會被回收。

      總結

      這篇線程池源碼覆蓋到了ThreadPoolExecutor中大部分代碼,我相信認真閱讀完后肯定會對線程池有更深刻的理解。如有疑問或者建議可關注公眾號給我私信,我都會一一為大家解答。

      另外推薦一個我的up主朋友,他自己錄制了好多學習視頻并分享在B站上了,大家有時間可以看一下(PS:非恰飯非利益相關,良心推薦):小劉講源碼-B站UP主

      原創干貨分享.png

      posted @ 2020-05-24 07:48  一枝花算不算浪漫  閱讀(2022)  評論(4)    收藏  舉報
      主站蜘蛛池模板: 亚洲乱妇老熟女爽到高潮的片| 巨爆乳中文字幕爆乳区| 无码AV中文字幕久久专区| 1精品啪国产在线观看免费牛牛| 成人精品区| 亚洲色帝国综合婷婷久久| 丰满少妇在线观看网站| 国产精品一区二区三区污| 亚洲最大有声小说AV网| 久久久天堂国产精品女人| 荡乳尤物h| 国产欧美另类久久久精品不卡| 无码AV无码天堂资源网影音先锋| 国产欧美日韩视频一区二区三区 | 猫咪AV成人永久网站在线观看| 亚洲欧美偷国产日韩| 国产精品亚洲一区二区z| 中文字幕无码免费久久9一区9 | 久久久久成人精品免费播放动漫| 欧美高清一区三区在线专区| 亚洲色婷婷久久精品av蜜桃久久 | 午夜成年男人免费网站| 青青草一区二区免费精品| 国产精品入口麻豆| 亚洲熟女精品一区二区| 亚洲综合无码一区二区三区不卡| 亚洲中文字幕一区二区| 精品久久久久久无码不卡| AV在线亚洲欧洲日产一区二区| 狠狠色丁香婷婷综合久久来来去 | 最新的国产成人精品2022| 综合色综合色综合色综合| 国产亚洲精品自在久久vr| 无码免费大香伊蕉在人线国产| 福利一区二区1000| 粗壮挺进人妻水蜜桃成熟| 中文字幕免费不卡二区| 日韩精品永久免费播放平台| 精品素人AV无码不卡在线观看| 99re6这里有精品热视频 | 国产精品天干天干综合网|