<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      JAVA JUC干貨之線程池實(shí)現(xiàn)原理和源碼詳解(下)

      摘要:分享JAVA JUC線程池干貨,首先描述線程池的基本概念,然后介紹線程工廠和拒絕策略,其次逐步深入線程池實(shí)現(xiàn)原理和線程池狀態(tài)機(jī),最后結(jié)合實(shí)戰(zhàn)講解源碼。

      JUC干貨系列目錄:

      1. JAVA JUC干貨之線程池狀態(tài)和狀態(tài)切換

      2. JAVA JUC干貨之線程池實(shí)現(xiàn)原理和源碼詳解(上)

      3. JAVA JUC干貨之線程池實(shí)現(xiàn)原理和源碼詳解(下)

      ??我在《JAVA JUC干貨之線程池實(shí)現(xiàn)原理和源碼詳解(上)》中介紹了線程池的基本功能,本文分享線程池實(shí)現(xiàn)原理,并結(jié)合案例詳解線程池源碼。

      線程池設(shè)計(jì)思路

      ??本節(jié)主要參考文章《Java 線程池詳解,圖文并茂,還有誰(shuí)不會(huì)》,感興趣的讀者可以去看看。有句話叫做藝術(shù)來(lái)源于生活,編程語(yǔ)言也是如此,很多設(shè)計(jì)思想能映射到日常生活中,比如封裝、繼承和抽象等等。今天我們要說(shuō)的線程池,它同樣可以在現(xiàn)實(shí)世界找到對(duì)應(yīng)的實(shí)體——工廠。先假想一個(gè)工廠的生產(chǎn)流程:


      工廠生產(chǎn)流程

      ??假如有一個(gè)工廠,工廠中有固定的一批工人,稱為正式員工,每個(gè)正式員工同一時(shí)刻只能做一個(gè)任務(wù),由這些正式員工協(xié)作完成工廠接收的訂單,每個(gè)訂單拆分為很多子任務(wù)。一般來(lái)說(shuō)工廠完成訂單的效率與工人的數(shù)量成正相關(guān),在訂單量激增時(shí),必定出現(xiàn)正式員工忙不過(guò)來(lái)的場(chǎng)景,工廠只能臨時(shí)將新訂單的生產(chǎn)原料存放在倉(cāng)庫(kù)中,等有空閑的正式員工時(shí)再處理。調(diào)度員負(fù)責(zé)實(shí)時(shí)調(diào)度空閑工人處理倉(cāng)庫(kù)中的生產(chǎn)任務(wù)。倉(cāng)庫(kù)爆倉(cāng)后,訂單還在增加怎么辦?只能臨時(shí)擴(kuò)招一批工人來(lái)應(yīng)對(duì)生產(chǎn)高峰,而這批工人在高峰結(jié)束后,由于訂單增長(zhǎng)的速度放緩慢了,是要辭掉的,所以稱為臨時(shí)工。當(dāng)臨時(shí)工也招滿后(受限于工位數(shù)量有上限),只能忍痛拒絕后面再來(lái)的訂單了。我們做如下一番映射:

      工廠 線程池
      訂單 任務(wù)
      正式員工 核心線程
      臨時(shí)工 非核心線程
      倉(cāng)庫(kù) 阻塞隊(duì)列
      工位數(shù)量 最大線程數(shù)
      調(diào)度員 getTask(),將任務(wù)隊(duì)列中的任務(wù)調(diào)度給空閑線程

      ??強(qiáng)調(diào)一下,線程池的工作線程是沒(méi)有屬性記錄是核心線程還是非核心線程的,但根據(jù)核心線程數(shù)和最大線程數(shù)這兩個(gè)屬性歸類,方便大家理解;工作線程無(wú)論什么時(shí)候被創(chuàng)建,都存在被回收的可能,下文將詳細(xì)解釋。分析工廠-線程池映射表后,可以得到如下線程池流程圖,兩者是不是有異曲同工之妙?


      工廠-線程池流程映射圖

      ??這樣,線程池的工作原理或者說(shuō)流程就很好理解了,在下一章詳細(xì)介紹。

      線程池實(shí)現(xiàn)原理

      ??線程池的執(zhí)行原理是通過(guò)循環(huán)地從任務(wù)隊(duì)列中取出任務(wù),然后將任務(wù)分配給空閑的工作線程執(zhí)行。當(dāng)任務(wù)隊(duì)列為空時(shí),線程池會(huì)進(jìn)入等待狀態(tài),直到有新的任務(wù)到來(lái)。線程池還提供設(shè)置線程數(shù)和任務(wù)隊(duì)列長(zhǎng)度的能力,以控制并發(fā)線程的數(shù)量。

      ??下面我們進(jìn)入正題,看一下在線程池中一個(gè)任務(wù)從提交到最終執(zhí)行完畢經(jīng)歷了哪些過(guò)程。

      ??敲黑板劃重點(diǎn),用一句話簡(jiǎn)述ThreadPoolExecutor線程池實(shí)現(xiàn)原理:首先創(chuàng)建核心線程,再把任務(wù)放入阻塞隊(duì)列,其次創(chuàng)建非核心線程,最后拋棄任務(wù)。詳細(xì)流程圖如下:


      線程池實(shí)現(xiàn)原理

      ??從執(zhí)行流程可知,屬性判斷順序如下:corePoolSize -> workQueue -> maxinumPoolSize線程池執(zhí)行流程歸納為如下:

      1.預(yù)熱核心線程:提交新任務(wù)后,如果線程數(shù)沒(méi)有達(dá)到核心線程數(shù) corePoolSize,則創(chuàng)建核心線程執(zhí)行新任務(wù),而且空閑的核心線程處于阻塞狀態(tài)不執(zhí)行新任務(wù)直到隊(duì)列中有任務(wù)。即線程池優(yōu)先填滿corePoolSize個(gè)核心線程,再?gòu)?fù)用核心線程處理任務(wù)。如果線程數(shù)達(dá)到核心線程數(shù),則進(jìn)入下個(gè)流程。

      2.任務(wù)入隊(duì):如果線程數(shù)達(dá)到核心線程數(shù),且任務(wù)隊(duì)列未滿,則把新任務(wù)放入阻塞隊(duì)列末尾;這個(gè)場(chǎng)景下,核心線程執(zhí)行完當(dāng)前任務(wù)后自動(dòng)從任務(wù)隊(duì)列中獲取任務(wù)來(lái)執(zhí)行。如果任務(wù)隊(duì)列已滿,則進(jìn)入下個(gè)流程。

      3.創(chuàng)建非核心線程:如果任務(wù)隊(duì)列爆滿而且最大線程數(shù) maximumPoolSize > 線程數(shù)poolSize > corePoolSize,則創(chuàng)建新的工作線程(一般視作非核心線程)并立刻執(zhí)行當(dāng)前的新任務(wù);溫馨提示,這個(gè)場(chǎng)景下新任務(wù)早于隊(duì)列中的任務(wù)執(zhí)行。如果線程數(shù)達(dá)到最大線程數(shù),則進(jìn)入執(zhí)行拒絕任務(wù)的流程。

      非核心線程數(shù)為 maximumPoolSize - corePoolSize,即為最大線程數(shù)減去核心線程數(shù)。

      4.拒絕任務(wù):如果線程數(shù)等于最大線程數(shù)且任務(wù)隊(duì)列已滿,則根據(jù)拒絕策略處理新任務(wù)。

      5.復(fù)用線程:線程執(zhí)行完任務(wù)后去檢查任務(wù)隊(duì)列里是否有任務(wù)需要執(zhí)行,若有則馬上執(zhí)行;否則,進(jìn)入阻塞狀態(tài)。

      6.銷毀線程:線程如果無(wú)事可做的時(shí)間超過(guò)存活時(shí)間keepAliveTime,那么就會(huì)被回收,以便減少內(nèi)存占用和資源消耗,實(shí)現(xiàn)對(duì)系統(tǒng)資源的調(diào)優(yōu)。如果調(diào)用了allowCoreThreadTimeOut(true)方法,則會(huì)給核心線程數(shù)設(shè)置存活時(shí)間,使得超過(guò)keepAliveTime的核心線程也會(huì)被銷毀,從而最終有可能導(dǎo)致線程池中的線程數(shù)為0。

      ??通常情況下,隨著阻塞隊(duì)列中任務(wù)數(shù)的減少,線程數(shù)最終會(huì)收縮到 corePoolSize 的大小,故線程數(shù)維持在corePoolSize和maximumPoolSize之間。這就是線程池動(dòng)態(tài)調(diào)整線程數(shù)的過(guò)程。一旦線程數(shù)降低到核心線程數(shù),就會(huì)暫停回收工作,這是為什么呢?因?yàn)榫€程池為了保證穩(wěn)定性,必須預(yù)留一定規(guī)模的“即戰(zhàn)力”。

      ??有些人認(rèn)為線程池會(huì)把每個(gè)工作線程標(biāo)記為核心線程或者非核心線程,然后空閑時(shí)定向回收非核心的工作線程。事實(shí)不是這樣的,線程沒(méi)有核心線程或者非核心線程的標(biāo)記,在線程池銷毀線程時(shí)對(duì)所有線程一視同仁,即無(wú)論哪個(gè)工作線程,只要同時(shí)符合如下兩個(gè)條件,都存在被銷毀的可能性:

      + 總線程數(shù)超過(guò)corePoolSize(工人多了,這是大前提)
      + 線程不處理任務(wù)的時(shí)間超過(guò)存活時(shí)間keepAliveTime(keepAliveTime時(shí)間內(nèi)都沒(méi)有產(chǎn)出)
      

      ??通俗地說(shuō),線程池“降本”時(shí)銷毀的一定是在【創(chuàng)建非核心線程】階段“新建”的第二批工作線程嗎?事實(shí)并非如此,線程池展現(xiàn)出了狼性般的敏銳與果斷,“裁員”的底層邏輯是只看上述兩條鐵律,誰(shuí)達(dá)標(biāo)就優(yōu)化誰(shuí)。

      ??乍一看這種實(shí)現(xiàn)原理似乎有點(diǎn)丈二和尚摸不著頭腦,但“為了快、想追求經(jīng)濟(jì)、最后發(fā)現(xiàn)還是要快”三個(gè)階段的構(gòu)思調(diào)整在我看來(lái)恰恰是一種鬼斧神工般的技藝,頗有點(diǎn)博弈的氣概:初次接業(yè)務(wù),果斷招聘;業(yè)務(wù)繁忙時(shí),如果隊(duì)列能頂住,勤儉持家點(diǎn)不好嗎?如果隊(duì)列被打爆頂不住了,說(shuō)明真到拼刺刀的時(shí)刻了,已顧不得太多,把全部家底都拿出來(lái)唄!

      ??這一系列轉(zhuǎn)變的底層支撐正是線程池的“臨時(shí)工機(jī)制”:業(yè)務(wù)太多忙不過(guò)來(lái)時(shí),就新增線程提效,等高峰過(guò)去以后,再銷毀線程降本。

      ??這里有個(gè)細(xì)節(jié)值得大家品味,在線程池里的線程數(shù)小于核心線程數(shù)的前提下提交任務(wù)的時(shí)候,雖然核心線程因任務(wù)隊(duì)列空空如也而可能處于閑置狀態(tài),但是依然會(huì)繼續(xù)創(chuàng)建核心線程,而非復(fù)用已有的、空閑的核心線程。

      ThreadPoolExecutor 示例

      ??下面這個(gè)使用execute提交任務(wù)的示例用于演示線程池執(zhí)行流程:

      import com.google.common.util.concurrent.ThreadFactoryBuilder;
      
      import java.util.concurrent.*;
      import java.util.concurrent.atomic.AtomicInteger;
      
      /**
       * @Author 樓蘭胡楊
       * @Date 2025-05-16
       * @Description: 通過(guò)調(diào)整for循環(huán)次數(shù)驗(yàn)證線程池執(zhí)行機(jī)制
       */
      public class ThreadPoolTest {
          public static ThreadPoolExecutor executor;
      
          public static void main(String[] args) {
              ThreadFactory guavaFactory = new ThreadFactoryBuilder().setNameFormat("pool-Wiener-%d").build();
              // 自定義線程工廠
              ThreadFactory jucFactory = new ThreadFactory() {
                  private final AtomicInteger mThreadNum = new AtomicInteger(1);
      
                  @Override
                  public Thread newThread(Runnable r) {
                      return new Thread(r, "Wiener-" + mThreadNum.getAndIncrement());
                  }
              };
              RejectionImpl rejection = new RejectionImpl();
              executor = new ThreadPoolExecutor(2, 5, 200, TimeUnit.MILLISECONDS,
                      new ArrayBlockingQueue<>(10), guavaFactory, rejection);
              for (int i = 0; i < 25; i++) {
                  MyPoolTask myTask = new MyPoolTask(i);
                  executor.execute(myTask);
      
              }
              executor.shutdown();
          }
      
      }
      
      class MyPoolTask implements Runnable {
          private int taskNum;
          ThreadPoolExecutor myExe = ThreadPoolTest.executor;
      
          public MyPoolTask(int num) {
              this.taskNum = num;
          }
      
          @Override
          public void run() {
      //        System.out.println("正在執(zhí)行task " + taskNum);
              System.out.println(Thread.currentThread().getName() + ",活躍線程數(shù)目:" + myExe.getPoolSize()
                      + ",隊(duì)列長(zhǎng)度:" +
                      myExe.getQueue().size()
                      + ",當(dāng)前任務(wù)是" + taskNum + ",已執(zhí)行任務(wù)數(shù)目:" + myExe.getCompletedTaskCount());
              try {
                  Thread.currentThread().sleep(300);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              System.out.println("執(zhí)行完畢task " + taskNum);
          }
      
          @Override
          public String toString() {
              return taskNum + "";
          }
      }
      
      class RejectionImpl implements RejectedExecutionHandler {
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
              System.out.println("被拒絕任務(wù)是 " + r.toString());
          }
      }
      

      ??執(zhí)行結(jié)果如下:

      pool-Wiener-1,活躍線程數(shù)目:3,隊(duì)列長(zhǎng)度:10,當(dāng)前任務(wù)是1,已執(zhí)行任務(wù)數(shù)目:0
      pool-Wiener-0,活躍線程數(shù)目:2,隊(duì)列長(zhǎng)度:9,當(dāng)前任務(wù)是0,已執(zhí)行任務(wù)數(shù)目:0
      pool-Wiener-2,活躍線程數(shù)目:3,隊(duì)列長(zhǎng)度:10,當(dāng)前任務(wù)是12,已執(zhí)行任務(wù)數(shù)目:0
      pool-Wiener-4,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:10,當(dāng)前任務(wù)是14,已執(zhí)行任務(wù)數(shù)目:0
      pool-Wiener-3,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:10,當(dāng)前任務(wù)是13,已執(zhí)行任務(wù)數(shù)目:0
      被拒絕任務(wù)是 15
      被拒絕任務(wù)是 16
      被拒絕任務(wù)是 17
      被拒絕任務(wù)是 18
      被拒絕任務(wù)是 19
      被拒絕任務(wù)是 20
      被拒絕任務(wù)是 21
      被拒絕任務(wù)是 22
      被拒絕任務(wù)是 23
      被拒絕任務(wù)是 24
      執(zhí)行完畢task 0
      執(zhí)行完畢task 14
      pool-Wiener-0,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:9,當(dāng)前任務(wù)是2,已執(zhí)行任務(wù)數(shù)目:1
      執(zhí)行完畢task 1
      pool-Wiener-4,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:8,當(dāng)前任務(wù)是3,已執(zhí)行任務(wù)數(shù)目:2
      執(zhí)行完畢task 12
      pool-Wiener-2,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:6,當(dāng)前任務(wù)是5,已執(zhí)行任務(wù)數(shù)目:4
      pool-Wiener-1,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:7,當(dāng)前任務(wù)是4,已執(zhí)行任務(wù)數(shù)目:3
      執(zhí)行完畢task 13
      pool-Wiener-3,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:5,當(dāng)前任務(wù)是6,已執(zhí)行任務(wù)數(shù)目:5
      執(zhí)行完畢task 3
      pool-Wiener-4,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:4,當(dāng)前任務(wù)是7,已執(zhí)行任務(wù)數(shù)目:6
      執(zhí)行完畢task 2
      執(zhí)行完畢task 4
      pool-Wiener-0,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:3,當(dāng)前任務(wù)是8,已執(zhí)行任務(wù)數(shù)目:7
      執(zhí)行完畢task 6
      pool-Wiener-1,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:2,當(dāng)前任務(wù)是9,已執(zhí)行任務(wù)數(shù)目:8
      執(zhí)行完畢task 5
      pool-Wiener-3,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:1,當(dāng)前任務(wù)是10,已執(zhí)行任務(wù)數(shù)目:9
      pool-Wiener-2,活躍線程數(shù)目:5,隊(duì)列長(zhǎng)度:0,當(dāng)前任務(wù)是11,已執(zhí)行任務(wù)數(shù)目:10
      執(zhí)行完畢task 9
      執(zhí)行完畢task 8
      執(zhí)行完畢task 10
      執(zhí)行完畢task 11
      執(zhí)行完畢task 7
      

      ??從執(zhí)行結(jié)果日志可以看出,當(dāng)線程池中線程的數(shù)目大于5時(shí),便將任務(wù)放入任務(wù)緩存隊(duì)列里面,當(dāng)任務(wù)緩存隊(duì)列滿了之后,便創(chuàng)建新的線程。如果上面程序for循環(huán)中,把創(chuàng)建的任務(wù)數(shù)從25個(gè)降低到15個(gè),就不會(huì)拋出任務(wù)被拒絕的異常了。

      ThreadPoolExecutor 源碼詳解

      ??程序猿為什么需要讀源碼?閱讀源碼是程序猿成長(zhǎng)過(guò)程中非常重要的一個(gè)環(huán)節(jié)。這就像學(xué)習(xí)語(yǔ)言時(shí)需要閱讀經(jīng)典文學(xué)作品一樣,通過(guò)閱讀源碼,程序員可以學(xué)到很多優(yōu)秀的編程技巧和最佳實(shí)踐;可以了解多樣化的技術(shù)選型方案,培養(yǎng)架構(gòu)設(shè)計(jì)敏感度,為技術(shù)決策提供參考依據(jù)。具體來(lái)說(shuō),讀源碼有以下幾個(gè)好處:

      ??深入了解工具庫(kù)或框架:當(dāng)你使用第三方庫(kù)或者框架時(shí),深入其內(nèi)部結(jié)構(gòu)有助于更好地掌握它們的功能以及局限性,從而更高效地運(yùn)用到自己的項(xiàng)目中去。

      ??增強(qiáng)業(yè)務(wù)能力和團(tuán)隊(duì)協(xié)作能力:閱讀和理解其他同事敲的代碼可以使人深度認(rèn)識(shí)項(xiàng)目,精準(zhǔn)了解業(yè)務(wù)邏輯,還能促進(jìn)成員間的交流與合作。

      ??面試準(zhǔn)備:在求職過(guò)程中,了解常見(jiàn)開(kāi)源框架的源碼實(shí)現(xiàn)可以幫助你在面試環(huán)節(jié)中脫穎而出,因?yàn)樵S多公司會(huì)考察候選人對(duì)這些技術(shù)細(xì)節(jié)的理解程度。

      ??拓展技術(shù)視野:接觸不同項(xiàng)目的源碼(如微服務(wù)框架、分布式系統(tǒng))可了解多樣化的技術(shù)選型方案,培養(yǎng)架構(gòu)設(shè)計(jì)敏感度,為技術(shù)決策提供參考依據(jù)。

      ??總之,對(duì)于任何有志于提升自身技術(shù)水平的程序員而言,定期花時(shí)間去研究高質(zhì)量的源碼是一項(xiàng)不可或缺的學(xué)習(xí)活動(dòng)。你對(duì)哪些源碼感興趣?可以評(píng)論區(qū)留言,一起品嘗。下面淺談我對(duì) ThreadPoolExecutor 源碼的理解,不當(dāng)之處還請(qǐng)?jiān)谠u(píng)論區(qū)留言,一起翻越ThreadPoolExecutor這座山。

      execute提交任務(wù)源碼分析

      ??在ThreadPoolExecutor類中,最核心的任務(wù)提交方法是execute(Runnable command)方法,雖然通過(guò)submit也可以提交任務(wù),但是submit方法底層實(shí)現(xiàn)中最終調(diào)用的還是execute,所以我們只需要分析execute的源碼即可。在JDK 21中,execute(Runnable command)的源代碼如下所示,已經(jīng)添加中文注釋,同時(shí)保留了原汁原味的英文注釋:

          /**
           * Executes the given task sometime in the future.  The task
           * may execute in a new thread or in an existing pooled thread.
           *
           * If the task cannot be submitted for execution, either because this
           * executor has been shutdown or because its capacity has been reached,
           * the task is handled by the current {@link RejectedExecutionHandler}.
           *
           * @param command the task to execute
           * @throws RejectedExecutionException at discretion of
           *         {@code RejectedExecutionHandler}, if the task
           *         cannot be accepted for execution
           * @throws NullPointerException if {@code command} is null
           */
          public void execute(Runnable command) {
              // 判斷提交的任務(wù)是否為null
              if (command == null)
                  throw new NullPointerException();
              /*
               * Proceed in 3 steps(執(zhí)行過(guò)程可以分為如下三個(gè)步驟):
               *
               * 1. If fewer than corePoolSize threads are running, try to
               * start a new thread with the given command as its first
               * task.  The call to addWorker atomically checks runState and
               * workerCount, and so prevents false alarms that would add
               * threads when it shouldn't, by returning false.
               *
               * 2. If a task can be successfully queued, then we still need
               * to double-check whether we should have added a thread
               * (because existing ones died since last checking) or that
               * the pool shut down since entry into this method. So we
               * recheck state and if necessary roll back the enqueuing if
               * stopped, or start a new thread if there are none.
               *
               * 3. If we cannot queue task, then we try to add a new
               * thread.  If it fails, we know we are shut down or saturated
               * and so reject the task.
               */
              // 可以將 ctl 理解成保存了線程數(shù)和運(yùn)行狀態(tài)等信息的變量
              int c = ctl.get();
              // 判斷工作線程總數(shù)是否小于核心線程數(shù),小于的時(shí)候可以添加核心線程,并且將當(dāng)前任務(wù)作為此線程執(zhí)行的第一個(gè)任務(wù)
              if (workerCountOf(c) < corePoolSize) {
                 //新增的核心線程成功執(zhí)行任務(wù),流程結(jié)束;addWorker第二個(gè)參數(shù)為true,表示根據(jù)核心線程數(shù)判斷線程數(shù)量
                  if (addWorker(command, true))
                      return;
                // addWorker執(zhí)行報(bào)錯(cuò),需要再次檢查變量值
                  c = ctl.get();
              }
              //判斷線程池的狀態(tài)是否正常而且任務(wù)放入任務(wù)隊(duì)列是否正常
              if (isRunning(c) && workQueue.offer(command)) {
                 // 同理,為了防止向任務(wù)隊(duì)列中又提交新的任務(wù)造成錯(cuò)誤,再次更新變量值
                  int recheck = ctl.get();
                  if (! isRunning(recheck) && remove(command))
                     //當(dāng)前線程池處于非運(yùn)行狀態(tài)且將剛添加的任務(wù)成功從任務(wù)隊(duì)列移除,執(zhí)行拒絕策略
                      reject(command);
                  else if (workerCountOf(recheck) == 0)
                     // 線程數(shù)為0,第一個(gè)參數(shù)為null,表示創(chuàng)建線程但不啟動(dòng)
                      addWorker(null, false);
             //線程數(shù)超過(guò)核心線程數(shù)而且隊(duì)列已滿,需要新增非核心線程
              } else if (!addWorker(command, false))
                  //工作線程已經(jīng)達(dá)到了最大線程數(shù)閾值,或者是調(diào)用了關(guān)閉線程池的方法,觸發(fā)拒絕策略
                  reject(command);
          }
      

      ??假設(shè)線程數(shù)達(dá)到核心線程數(shù)且隊(duì)列未滿,這時(shí)新提交的任務(wù)可以直接復(fù)用空閑的核心線程嗎?從源碼來(lái)看,新提交的任務(wù)需要放入任務(wù)隊(duì)列,空閑的核心線程是從任務(wù)隊(duì)列拿任務(wù)。

      ??線程池的本質(zhì)是對(duì)任務(wù)和線程的管理,而做到這一點(diǎn)最關(guān)鍵的思想就是借助生產(chǎn)者消費(fèi)者模式將任務(wù)和線程兩者解耦,不讓兩者直接綁定,方便做后續(xù)的工作分配。在隊(duì)列為空時(shí),線程會(huì)等待任務(wù)進(jìn)入隊(duì)列;當(dāng)隊(duì)列不滿時(shí),任務(wù)會(huì)等待線程來(lái)執(zhí)行。

      ??從上述源碼不難發(fā)現(xiàn),execute 方法多次通過(guò) addWorker 方法來(lái)添加線程處理任務(wù),故我們下面來(lái)閱讀 addWorker 方法的源碼。

      addWorker創(chuàng)建線程源碼詳解

          /**
           * Set containing all worker threads in pool. 線程池中存放線程的集合,維護(hù)一組Worker對(duì)象
           * Accessed only when holding mainLock.
           */
          private final HashSet<Worker> workers = new HashSet<>();
          
          /*
           * Methods for creating, running and cleaning up after workers
           */
      
          /**
           * Checks if a new worker can be added with respect to current
           * pool state and the given bound (either core or maximum). If so,
           * the worker count is adjusted accordingly, and, if possible, a
           * new worker is created and started, running firstTask as its
           * first task. This method returns false if the pool is stopped or
           * eligible to shut down. It also returns false if the thread
           * factory fails to create a thread when asked.  If the thread
           * creation fails, either due to the thread factory returning
           * null, or due to an exception (typically OutOfMemoryError in
           * Thread.start()), we roll back cleanly.
           *
           * @param firstTask the task the new thread should run first (or
           * null if none). Workers are created with an initial first task
           * (in method execute()) to bypass queuing when there are fewer
           * than corePoolSize threads (in which case we always start one),
           * or when the queue is full (in which case we must bypass queue).
           * Initially idle threads are usually created via
           * prestartCoreThread or to replace other dying workers.
           *
           * @param core if true use corePoolSize as bound, else
           * maximumPoolSize. (A boolean indicator is used here rather than a
           * value to ensure reads of fresh values after checking other pool
           * state).
           * @return true if successful
           */
          private boolean addWorker(Runnable firstTask, boolean core) {
              // 外層循環(huán):判斷線程池狀態(tài)
              retry:
              for (int c = ctl.get();;) {
                  // Check if queue empty only if necessary.
                  if (runStateAtLeast(c, SHUTDOWN)
                      && (runStateAtLeast(c, STOP)
                          || firstTask != null
                          || workQueue.isEmpty()))
                      // 線程池已經(jīng)被停止或者關(guān)閉,或者隊(duì)列為空,終止流程,返回 false
                      return false;
                  // 死循環(huán)
                  for (;;) {
                      // 檢查當(dāng)前工作線程數(shù)是否已經(jīng)達(dá)到核心線程數(shù)(core 為 true)或最大線程數(shù)(core 為 false)
                      if (workerCountOf(c)
                          >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                          // 達(dá)到核心線程數(shù)或最大線程數(shù),返回 false
                          return false;
                       //通過(guò)cas操作增加線程池的工作線程數(shù)量
                      if (compareAndIncrementWorkerCount(c))
                          // 成功原子地增加工作線程數(shù),跳出外層的 for 循環(huán)
                          break retry;
                      // 增加線程數(shù)量失敗,再次讀取 ctl 的值
                      c = ctl.get();  // Re-read ctl
                      // 如果當(dāng)前運(yùn)行狀態(tài)不等于
                      if (runStateAtLeast(c, SHUTDOWN))
                          // 線程池處于關(guān)閉狀態(tài),跳到外層循環(huán)重新執(zhí)行
                          continue retry;
                      // else CAS failed due to workerCount change; retry inner loop
                  }
              }
              /**
               * 線程數(shù)量+1成功的后續(xù)操作:添加新工作線程到工作線程集合,啟動(dòng)工作線程執(zhí)行firstTask
               */
              boolean workerStarted = false;
              boolean workerAdded = false;
              Worker w = null;
              try {
                  //添加新的Worker對(duì)象來(lái)處理任務(wù)firstTask
                  w = new Worker(firstTask);
                  //獲取 Worker 對(duì)象中的線程。Worker類是任務(wù)線程的包裝類,內(nèi)部封裝了一個(gè)Thread類型的變量
                  final Thread t = w.thread;
                  if (t != null) {
                      // 開(kāi)始加內(nèi)置鎖
                      final ReentrantLock mainLock = this.mainLock;
                      // 基于ReentrantLock的同步塊
                      mainLock.lock();
                      try {
                          // Recheck while holding lock.
                          // Back out on ThreadFactory failure or if
                          // shut down before lock acquired.
                          // 再次檢查狀態(tài),保證線程工廠沒(méi)有失敗或者在獲取鎖之前線程池沒(méi)有被關(guān)閉
                          int c = ctl.get();
                          //添加線程的前提條件:①線程處于運(yùn)行狀態(tài),②線程處于shutdown狀態(tài)并且firstTask為null
                          if (isRunning(c) ||
                              (runStateLessThan(c, STOP) && firstTask == null)) {
                              // 檢查線程狀態(tài),如果不是新建狀態(tài)就拋異常
                              if (t.getState() != Thread.State.NEW)
                                  throw new IllegalThreadStateException();
                              //workers是一個(gè)HashSet<Woker> 集合,往里面新增Worker類型的工作線程對(duì)象
                              workers.add(w);
                              // 把工作線程添加標(biāo)志置為 true
                              workerAdded = true;
                              int s = workers.size();
                              // largestPoolSize 表示線程池中出現(xiàn)過(guò)的最大線程數(shù)
                              if (s > largestPoolSize)
                                  // 更新線程池中線程數(shù)最大值
                                  largestPoolSize = s;
                          }
                      } finally {
                          // 釋放鎖
                          mainLock.unlock();
                      }
                      if (workerAdded) {
                          // 啟動(dòng)新創(chuàng)建的線程,執(zhí)行當(dāng)前任務(wù)firstTask
                          container.start(t); 
                          // 把線程啟動(dòng)標(biāo)志置為 true
                          workerStarted = true;
                      }
                  }
              } finally {
                  //判斷線程有沒(méi)有啟動(dòng)成功,如果沒(méi)有則調(diào)用addWorkerFailed方法
                  if (! workerStarted)
                      addWorkerFailed(w);
              }
              // 返回線程啟動(dòng)標(biāo)志
              return workerStarted;
          }
      

      ??我們來(lái)看看函數(shù)addWorker(Runnable firstTask, boolean core) 的入?yún)ⅰ?strong>firstTask:添加的新線程需要執(zhí)行的第一任務(wù),執(zhí)行完成之后才能執(zhí)行任務(wù)隊(duì)列中其它任務(wù);如果沒(méi)有任務(wù),則需要設(shè)置為null。core的值如果是true,則使用corePoolSize與線程數(shù)進(jìn)行比較,判斷線程數(shù)是否已經(jīng)達(dá)到核心線程數(shù);否則,使用maximumPoolSize判斷線程數(shù)是否已經(jīng)達(dá)到最大線程數(shù)。

      ??Worker類繼承了AbstractQueuedSynchronizer,實(shí)現(xiàn)了Runnable接口。線程池中,每一個(gè)線程都被封裝成一個(gè)Worker對(duì)象,ThreadPool維護(hù)的其實(shí)就是存放在變量HashSet workers中的一組Worker對(duì)象。Worker 在執(zhí)行完任務(wù)后,還會(huì)無(wú)限循環(huán)獲取工作隊(duì)列里的任務(wù)來(lái)執(zhí)行。我們選擇關(guān)鍵源碼簡(jiǎn)單看一下 Worker 類,它是 ThreadPoolExecutor 類的一個(gè)內(nèi)部類:

          private final class Worker extends AbstractQueuedSynchronizer implements Runnable
          {
              /** Thread this worker is running in.  Null if factory fails. 處理任務(wù)的線程 */
              @SuppressWarnings("serial") // Unlikely to be serializable
              final Thread thread;
              /** Initial task to run.  Possibly null.保存?zhèn)魅氲娜蝿?wù) */
              @SuppressWarnings("serial") // Not statically typed as Serializable
              Runnable firstTask;
              /** Per-thread task counter */
              volatile long completedTasks;
      
              // TODO: switch to AbstractQueuedLongSynchronizer and move
              // completedTasks into the lock word.
      
              /**
               * Creates with given first task and thread from ThreadFactory.
               * @param firstTask the first task (null if none)
               */
              Worker(Runnable firstTask) {
                  //設(shè)置AQS的同步狀態(tài)
                  setState(-1); // inhibit interrupts until runWorker
                  // 封裝任務(wù)
                  this.firstTask = firstTask;
                  // 新增執(zhí)行任務(wù)的線程,this表示線程,故Worker對(duì)象在啟動(dòng)的時(shí)候就會(huì)調(diào)用函數(shù)run
                  this.thread = getThreadFactory().newThread(this);
              }
              /** Delegates main run loop to outer runWorker. */
              public void run() {
                  runWorker(this);
              }
              protected boolean isHeldExclusively() {
                  return getState() != 0;
              }
              // 使用AQS實(shí)現(xiàn)獨(dú)占鎖,不允許重入
              protected boolean tryAcquire(int unused) {
                  // 使用CAS修改狀態(tài),不允許重入
                  if (compareAndSetState(0, 1)) {
                      setExclusiveOwnerThread(Thread.currentThread());
                      return true;
                  }
                  return false;
              }
          }
      

      ??Worker類中的函數(shù)run()的實(shí)現(xiàn)都在 runWorker() 方法中,故我們看一下runWorker()的源碼。

      runWorker執(zhí)行任務(wù)源碼詳解

         /**
           * Main worker run loop.  Repeatedly gets tasks from queue and
           * executes them, while coping with a number of issues:
           *
           * 1. We may start out with an initial task, in which case we
           * don't need to get the first one. Otherwise, as long as pool is
           * running, we get tasks from getTask. If it returns null then the
           * worker exits due to changed pool state or configuration
           * parameters.  Other exits result from exception throws in
           * external code, in which case completedAbruptly holds, which
           * usually leads processWorkerExit to replace this thread.
           *
           * 2. Before running any task, the lock is acquired to prevent
           * other pool interrupts while the task is executing, and then we
           * ensure that unless pool is stopping, this thread does not have
           * its interrupt set.
           *
           * 3. Each task run is preceded by a call to beforeExecute, which
           * might throw an exception, in which case we cause thread to die
           * (breaking loop with completedAbruptly true) without processing
           * the task.
           *
           * 4. Assuming beforeExecute completes normally, we run the task,
           * gathering any of its thrown exceptions to send to afterExecute.
           * We separately handle RuntimeException, Error (both of which the
           * specs guarantee that we trap) and arbitrary Throwables.
           * Because we cannot rethrow Throwables within Runnable.run, we
           * wrap them within Errors on the way out (to the thread's
           * UncaughtExceptionHandler).  Any thrown exception also
           * conservatively causes thread to die.
           *
           * 5. After task.run completes, we call afterExecute, which may
           * also throw an exception, which will also cause thread to
           * die. According to JLS Sec 14.20, this exception is the one that
           * will be in effect even if task.run throws.
           *
           * The net effect of the exception mechanics is that afterExecute
           * and the thread's UncaughtExceptionHandler have as accurate
           * information as we can provide about any problems encountered by
           * user code.
           *
           * @param w the worker
           */
          final void runWorker(Worker w) {
              Thread wt = Thread.currentThread();// 獲取當(dāng)前線程
              // 用創(chuàng)建線程時(shí)傳入的firstTask初始化任務(wù)
              Runnable task = w.firstTask;
              w.firstTask = null;// 清空 Worker 對(duì)象的第一個(gè)任務(wù)
              // 解鎖,允許中斷
              w.unlock(); // allow interrupts
              // 標(biāo)記線程是否因?yàn)楫惓M顺鲅h(huán),默認(rèn)為 true
              boolean completedAbruptly = true;
              try {
                  //循環(huán)獲取任務(wù),在getTask()返回null時(shí)跳出while循環(huán),且回收線程
                  // 如果firstTask為null,則使用getTask()從任務(wù)隊(duì)列取任務(wù)
                  while (task != null || (task = getTask()) != null) {
                      w.lock();
                      // If pool is stopping, ensure thread is interrupted;
                      // if not, ensure thread is not interrupted.  This
                      // requires a recheck in second case to deal with
                      // shutdownNow race while clearing interrupt
                      if ((runStateAtLeast(ctl.get(), STOP) ||
                           (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                          !wt.isInterrupted())
                          //當(dāng)前線程被打上中斷標(biāo)志
                          wt.interrupt();
                      try {
                          //執(zhí)行任務(wù)前的鉤子方法,讓繼承類做一些統(tǒng)計(jì)之類的事情
                          beforeExecute(wt, task);
                          try {
                              // 執(zhí)行任務(wù)
                              task.run();
                              // 執(zhí)行任務(wù)后的鉤子方法
                              afterExecute(task, null);
                          } catch (Throwable ex) {
                              afterExecute(task, ex);
                              throw ex;
                          }
                      } finally {
                          // 把執(zhí)行完的任務(wù)設(shè)置為null,從而觸發(fā)getTask()重新獲取任務(wù);增加任務(wù)數(shù),同時(shí)釋放鎖
                          task = null;
                          // 更新已完成的任務(wù)數(shù)
                          w.completedTasks++;
                          w.unlock();
                      }
                  }
                  // 標(biāo)記線程正常退出
                  completedAbruptly = false;
              } finally {
                  // 回收/銷毀線程
                  processWorkerExit(w, completedAbruptly);
              }
          }
      

      ??只要一直有新增任務(wù)或者能夠循環(huán)地使用getTask獲取任務(wù),線程就永遠(yuǎn)死不了。但是,如果getTask結(jié)果為null,就需要跳出while循環(huán),執(zhí)行processWorkerExit,銷毀線程;如果執(zhí)行processWorkerExit的入?yún)ompletedAbruptly=true,表示線程意外退出,需要減少計(jì)數(shù)。

      ??線程池的核心功能就是實(shí)現(xiàn)了線程的重復(fù)利用,那么它是怎么實(shí)現(xiàn)線程復(fù)用的呢?在這個(gè)方法中,while (task != null || (task = getTask()) != null)說(shuō)明線程會(huì)無(wú)限循環(huán)地【使用getTask函數(shù)從任務(wù)隊(duì)列中取任務(wù)】、【執(zhí)行任務(wù)】和【清空?qǐng)?zhí)行結(jié)束的任務(wù)】,直到取出的任務(wù)為null,此時(shí)線程池已經(jīng)關(guān)閉或者任務(wù)隊(duì)列為空,這就是線程能夠復(fù)用的主要原因。接下來(lái)就會(huì)跳出 while 循環(huán)進(jìn)入 finally 語(yǔ)句塊執(zhí)行processWorkerExit(),嘗試回收線程。這行while循環(huán)代碼也說(shuō)明如果隊(duì)列中沒(méi)有任務(wù)且核心線程數(shù)小于corePoolSize,則來(lái)任務(wù)時(shí)一定會(huì)創(chuàng)建核心線程,而已經(jīng)創(chuàng)建的核心線程哪怕一直無(wú)所事事也不會(huì)執(zhí)行新任務(wù)。

      ??到這里就介紹完線程池腳踏實(shí)地干活的Worker類了,簡(jiǎn)單歸納一下:worker對(duì)象同時(shí)封裝了一個(gè)任務(wù)Runnable firstTask和一個(gè)線程final Thread thread,它的創(chuàng)建依賴于線程狀態(tài),任務(wù)的執(zhí)行也是在worker里去處理。這里有個(gè)小技巧值得大家細(xì)細(xì)品味——在函數(shù)runWorker中,會(huì)把執(zhí)行完的任務(wù)及時(shí)設(shè)置為null但保留線程,從而觸發(fā)函數(shù)getTask()重新獲取任務(wù),實(shí)現(xiàn)線程復(fù)用。

      processWorkerExit回收線程源碼詳解

         /**
           * Performs cleanup and bookkeeping for a dying worker. Called
           * only from worker threads. Unless completedAbruptly is set,
           * assumes that workerCount has already been adjusted to account
           * for exit.  This method removes thread from worker set, and
           * possibly terminates the pool or replaces the worker if either
           * it exited due to user task exception or if fewer than
           * corePoolSize workers are running or queue is non-empty but
           * there are no workers.
           *
           * @param w the worker
           * @param completedAbruptly if the worker died due to user exception
           */
          private void processWorkerExit(Worker w, boolean completedAbruptly) {
              if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                  decrementWorkerCount();
      
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  completedTaskCount += w.completedTasks;
                  // 將線程引用移出線程池
                  workers.remove(w);
              } finally {
                  mainLock.unlock();
              }
              // 回收線程
              tryTerminate();
      
              int c = ctl.get();
              if (runStateLessThan(c, STOP)) {
                  if (!completedAbruptly) {
                      int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                      if (min == 0 && ! workQueue.isEmpty())
                          min = 1;
                      if (workerCountOf(c) >= min)
                          return; // replacement not needed
                  }
                  addWorker(null, false);
              }
          }
      

      ??事實(shí)上,在這個(gè)processWorkerExit函數(shù)中,將線程引用移出線程池線程集合且調(diào)用了tryTerminate()就已經(jīng)回收了線程。但由于引起線程銷毀的可能性有很多,故線程池要判斷是什么觸發(fā)了這次銷毀,是否要改變線程池的現(xiàn)階段狀態(tài),是否要根據(jù)新?tīng)顟B(tài)重新分配線程。

      getTask獲取任務(wù)源碼詳解

      ??本節(jié),我們來(lái)分析線程怎么通過(guò) getTask() 方法從任務(wù)隊(duì)列中取出任務(wù)的。

          /**
           * Performs blocking or timed wait for a task, depending on
           * current configuration settings, or returns null if this worker
           * must exit because of any of:
           * 1. There are more than maximumPoolSize workers (due to
           *    a call to setMaximumPoolSize).
           * 2. The pool is stopped.
           * 3. The pool is shutdown and the queue is empty.
           * 4. This worker timed out waiting for a task, and timed-out
           *    workers are subject to termination (that is,
           *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
           *    both before and after the timed wait, and if the queue is
           *    non-empty, this worker is not the last thread in the pool.
           *
           * @return task, or null if the worker must exit, in which case
           *         workerCount is decremented
           */
          private Runnable getTask() {
              boolean timedOut = false; // Did the last poll() time out?
              // 死循環(huán),此方法要么返回 null,要么返回 Runnable 對(duì)象代表取到了任務(wù)
              for (;;) {
                  int c = ctl.get();
      
                  // Check if queue empty only if necessary.
                  if (runStateAtLeast(c, SHUTDOWN)
                      && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                    //線程處于非運(yùn)行狀態(tài)時(shí),若滿足如下兩個(gè)條件之一,則線程數(shù)減1
                    // 1.線程處于 STOP 及以上(STOP、TIDYING、TERMINAL)的狀態(tài);2.任務(wù)隊(duì)列為空
                      decrementWorkerCount();
                      return null;
                  }
                  // 計(jì)算線程總數(shù)
                  int wc = workerCountOf(c);
      
                  // Are workers subject to culling? 可以銷毀線程嗎
                  // 變量timed用于判斷是否需要進(jìn)行超時(shí)控制
                  boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                 // 如下場(chǎng)景可以銷毀線程:①線程數(shù)大于最大線程數(shù);
                 // ②允許核心線程被回收且空閑時(shí)間超過(guò)存活時(shí)間;
                 // ③非核心線程空閑時(shí)間超過(guò)存活時(shí)間
                 // TODO wc可以為0嗎?
                  if ((wc > maximumPoolSize || (timed && timedOut))
                      && (wc > 1 || workQueue.isEmpty())) {
                      // 線程數(shù)減一成功時(shí)返回null,表名當(dāng)前線程可以被回收
                      if (compareAndDecrementWorkerCount(c))
                          return null;
                      continue;
                  }
      
                  try {
                  /**
                   *  從阻塞隊(duì)列中取Runnable類型的任務(wù)對(duì)象,
                   */
                      Runnable r = timed ?
                      // timed 為 true,調(diào)用poll方法,如果keepAliveTime納秒內(nèi)沒(méi)有拿到任務(wù),則返回null
                          workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                      // timed 為 false,調(diào)用take方法,此時(shí)線程永久阻塞直到有任務(wù)對(duì)象返回
                          workQueue.take();
                      if (r != null)
                          // 返回從阻塞隊(duì)列中取到的任務(wù),終止循環(huán)
                          return r;
                      // 線程沒(méi)有拿到任務(wù),標(biāo)記為已過(guò)期等待被銷毀
                      timedOut = true;
                  } catch (InterruptedException retry) {
                      //當(dāng)前線程在獲取任務(wù)時(shí)發(fā)生中斷,則設(shè)置為不過(guò)期并返回循環(huán)重試
                      timedOut = false;
                  }
              }
          }
      

      ??如果不允許核心線程超時(shí)(allowCoreThreadTimeOutfalse)且線程數(shù)不大于核心線程數(shù)(wc <= corePoolSize),則workQueue.take()操作會(huì)導(dǎo)致線程持續(xù)阻塞,避免線程被銷毀。這就是在【預(yù)熱線程】階段不能復(fù)用線程的原因。如果支持核心線程超時(shí)(allowCoreThreadTimeOuttrue)或者線程數(shù)大于核心線程數(shù)(wc > corePoolSize),則執(zhí)行workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)操作,若在keepAliveTime納秒內(nèi)沒(méi)有拿到任務(wù),將返回null。

      tryTerminate終止線程源碼詳解

      在《Java線程池狀態(tài)和狀態(tài)切換》中已經(jīng)介紹過(guò),通過(guò) shutdownNow 與 shutdown可以觸發(fā)線程池關(guān)閉流程,當(dāng)方法執(zhí)行完畢后,線程池將會(huì)進(jìn)入 STOP 或者 SHUTDOWN 狀態(tài)。但是此時(shí)線程池并未真正的被關(guān)閉,在runWorker方法最后的finally塊中,調(diào)用了processWorkerExit方法,其邏輯實(shí)現(xiàn)中調(diào)用了一個(gè) tryTerminate 方法,這個(gè)才是正在關(guān)閉線程池的鉤子方法。樓蘭胡楊本節(jié)和各位一起看看函數(shù)tryTerminate的源碼。

          /**
           * Transitions to TERMINATED state if either (SHUTDOWN and pool
           * and queue empty) or (STOP and pool empty).  If otherwise
           * eligible to terminate but workerCount is nonzero, interrupts an
           * idle worker to ensure that shutdown signals propagate. This
           * method must be called following any action that might make
           * termination possible -- reducing worker count or removing tasks
           * from the queue during shutdown. The method is non-private to
           * allow access from ScheduledThreadPoolExecutor.
           */
          final void tryTerminate() {
              for (;;) {
                  int c = ctl.get();
                  // 如果線程池不處于預(yù)停機(jī)狀態(tài),則不進(jìn)行停機(jī)
                  if (isRunning(c) ||
                      runStateAtLeast(c, TIDYING) ||
                      (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
                      return;
                  if (workerCountOf(c) != 0) { // Eligible to terminate
                      // 當(dāng)前還有工作線程,不停機(jī)
                      interruptIdleWorkers(ONLY_ONE);
                      return;
                  }
                  // 線程處于預(yù)關(guān)閉狀態(tài),開(kāi)始關(guān)閉線程池
                  final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  try {
                      // 嘗試通過(guò) CAS 將線程池狀態(tài)修改為 TIDYING
                      if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                          try {
                              terminated();
                          } finally {
                              // 嘗試通過(guò) CAS 將線程池狀態(tài)修改為 TERMINATED
                              ctl.set(ctlOf(TERMINATED, 0));
                              termination.signalAll();
                              container.close();
                          }
                          return;
                      }
                  } finally {
                      mainLock.unlock();
                  }
                  // else retry on failed CAS
              }
          }
      

      ??哦了,再來(lái)看看函數(shù) terminated(),是不是瞬間感覺(jué)很坑爹?它的方法體里面神!馬!也!沒(méi)!干!淡定,其實(shí)它是個(gè)鉤子方法,允許通過(guò)重寫(xiě)在線程池被終止時(shí)做一些特殊的業(yè)務(wù)邏輯,默認(rèn)的線程池沒(méi)有什么要做的事情,當(dāng)然也沒(méi)有必要寫(xiě)什么啦~

          /**
           * Method invoked when the Executor has terminated.  Default
           * implementation does nothing. Note: To properly nest multiple
           * overridings, subclasses should generally invoke
           * {@code super.terminated} within this method.
           */
          protected void terminated() { }
      

      創(chuàng)建多少線程比較合適

      ??要想合理的配置線程池線程數(shù),就必須首先分析任務(wù)特性,可以從以下幾個(gè)角度來(lái)進(jìn)行分析:

      • 任務(wù)的性質(zhì):CPU 密集型任務(wù),IO 密集型任務(wù)和混合型任務(wù);
      • 任務(wù)的優(yōu)先級(jí):高,中和低;
      • 任務(wù)的執(zhí)行時(shí)間
      • 任務(wù)的依賴關(guān)系:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫(kù)連接;

      ??上面都是需要考慮的因素,至于應(yīng)該創(chuàng)建多個(gè)線程,還是需要壓測(cè)的,單純考慮CPU 密集型任務(wù)和IO 密集型不合適。另外,建議任務(wù)性質(zhì)不同的任務(wù)用不同規(guī)模的線程池分開(kāi)處理,保證業(yè)務(wù)解耦。

      初始化線程池

      ??默認(rèn)情況下,創(chuàng)建線程池之后,線程池中是沒(méi)有線程的,需要提交任務(wù)之后才會(huì)創(chuàng)建線程。在實(shí)際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程,可以通過(guò)prestartCoreThreadhe 和 prestartAllCoreThreads兩個(gè)方法實(shí)現(xiàn),二者都通過(guò)調(diào)用函數(shù)boolean addWorker(Runnable firstTask, boolean core)來(lái)實(shí)現(xiàn)。

      ??prestartCoreThread()用于在創(chuàng)建線程池的時(shí)候初始化一個(gè)核心線程,實(shí)現(xiàn)源碼如下:

          /**
           * Starts a core thread, causing it to idly wait for work. This
           * overrides the default policy of starting core threads only when
           * new tasks are executed. This method will return {@code false}
           * if all core threads have already been started.
           *
           * @return {@code true} if a thread was started
           */
          public boolean prestartCoreThread() {
              return workerCountOf(ctl.get()) < corePoolSize &&
                  //注意傳進(jìn)去的參數(shù)firstTask是null
                  addWorker(null, true);
          }
      

      ??prestartAllCoreThreads():初始線程池時(shí)創(chuàng)建所有核心線程,實(shí)現(xiàn)源碼如下:

          /**
           * Starts all core threads, causing them to idly wait for work. This
           * overrides the default policy of starting core threads only when
           * new tasks are executed.
           *
           * @return the number of threads started
           */
          public int prestartAllCoreThreads() {
              int n = 0;
              while (addWorker(null, true))//注意傳入的任務(wù)是null
                  ++n;
              return n;
          }
      

      ??注意上面?zhèn)鬟M(jìn)boolean addWorker(Runnable firstTask, boolean core)的參數(shù)firstTask是null,創(chuàng)建的核心線程會(huì)被阻塞在getTask方法中,等待獲取任務(wù)隊(duì)列中的任務(wù)。

      ??八股文:當(dāng)任務(wù)數(shù)超過(guò)核心線程數(shù)時(shí),如何直接啟用最大線程數(shù)maximumPoolSize?

      分析:題目中【任務(wù)數(shù)超過(guò)核心線程數(shù)】可以理解為【線程數(shù)超過(guò)核心線程數(shù)】。

      :綜合【線程池執(zhí)行流程】所述得知,既然我們的預(yù)期是任務(wù)數(shù)超過(guò)核心線程數(shù)時(shí)新提交的任務(wù)不進(jìn)入任務(wù)隊(duì)列,就需要人為干預(yù)第二步 任務(wù)入隊(duì)。這答案就顯而易見(jiàn)了——在創(chuàng)建線程池的時(shí)候,指定任務(wù)隊(duì)列使用SynchronousQueue。SynchronousQueue是不能存儲(chǔ)元素的一個(gè)隊(duì)列,它的特性是每生產(chǎn)一個(gè)任務(wù)就需要指定一個(gè)消費(fèi)者來(lái)處理這個(gè)任務(wù);否則,阻塞生產(chǎn)者。

      結(jié)束語(yǔ)

      ??至此,已經(jīng)介紹完線程池核心知識(shí)點(diǎn),預(yù)祝各位讀者在工作中能夠迅速而準(zhǔn)確地處理線程池相關(guān)需求,就像運(yùn)斤成風(fēng)一樣。

      ??在編程這個(gè)復(fù)雜嚴(yán)峻的環(huán)境中,請(qǐng)活得優(yōu)雅坦然:也許你的錢包空空如也,也許你的工作不夠好,也許你正處在困境中,也許你被情所棄。不論什么原因,請(qǐng)你在出門(mén)時(shí),一定要把自己打扮地清清爽爽,昂起頭,挺起胸,面帶微笑,從容自若地面對(duì)生活和面對(duì)工作。人生就像蒲公英,沒(méi)事盡量少吹風(fēng);只要你自己真正撐起來(lái)了一片天地,別人無(wú)論如何是壓不垮你的,內(nèi)心的強(qiáng)大才是真正的強(qiáng)大。

      Reference

      posted @ 2025-05-30 10:22  樓蘭胡楊  閱讀(119)  評(píng)論(1)    收藏  舉報(bào)
      主站蜘蛛池模板: av新版天堂在线观看| 国产精品福利自产拍在线观看| 国产高清自产拍av在线| 激情亚洲内射一区二区三区| 好男人社区影视在线WWW| 国产精品天天看天天狠| 蜜臀av午夜精品福利| 亚洲欧美中文日韩v在线97| av鲁丝一区鲁丝二区鲁丝三区| 色综合视频一区二区三区| 97欧美精品系列一区二区| 国产又大又黑又粗免费视频 | 久青草视频在线视频在线| 国产乱人伦av在线无码| 97精品国产91久久久久久久| 日韩女同一区二区三区久久| 国产亚洲tv在线观看| 在线看国产精品自拍内射| 亚洲精品一区二区三天美| 撕开奶罩揉吮奶头高潮av| 亚洲精品成人老司机影视| 亚洲a∨国产av综合av| 成人天堂资源www在线| 国产精品色哟哟成人av| 国产成人综合在线女婷五月99播放| 阿拉善盟| 九九热精品在线观看| 九九热视频在线免费观看| 亚洲精品欧美综合二区| 久久亚洲日本激情战少妇| 人妻少妇精品性色av蜜桃| 中文字幕亚洲制服在线看| 日韩中文字幕一区二区不卡| 综合无码一区二区三区| 国产极品尤物粉嫩在线观看| japanese丰满奶水| 日韩人妻精品中文字幕| 亚洲综合一区二区三区| 黑人av无码一区| 欧美人与动牲交精品| 亚洲AV永久无码嘿嘿嘿嘿|