<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Java并發之線程池ThreadPoolExecutor源碼分析學習

      線程池學習

      以下所有內容以及源碼分析都是基于JDK1.8的,請知悉。

      線程池的優勢

      ? 既然說到線程池了,而且大多數的大牛也都會建議我們使用池化技術來管理一些資源,那線程池肯定也是有它的好處的,要不然怎么會那么出名并且讓大家使用呢?

      ? 我們就來看看它究竟有什么優勢?

      • 資源可控性:使用線程池可以避免創建大量線程而導致內存的消耗

      • 提高響應速度:線程池地創建實際上是很消耗時間和性能的,由線程池創建好有任務就運行,提升響應速度。

      • 便于管理:池化技術最突出的一個特點就是可以幫助我們對池子里的資源進行管理。由線程池統一分配和管理。

      線程池的創建

      ? 我們要用線程池來統一分配和管理我們的線程,那首先我們要創建一個線程池出來,還是有很多大牛已經幫我們寫好了很多方面的代碼的,Executors的工廠方法就給我們提供了創建多種不同線程池的方法。因為這個類只是一個創建對象的工廠,并沒有涉及到很多的具體實現,所以我不會過于詳細地去說明。

      ? 老規矩,還是直接上代碼吧。

      public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

      這里也就舉出一個方法的例子來進行之后的講解吧,我們可以看出,Executors只是個工廠而已,方法也只是來實例化不同的對象,實際上實例化出來的關鍵類就是ThreadPoolExecutor。現在我們就先來簡單地對ThreadPoolExecutor構造函數內的每個參數進行解釋一下吧。

      • corePoolSize(核心線程池大小):當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會創建線程,當任務數大于核心線程數的時候就不會再創建。在這里要注意一點,線程池剛創建的時候,其中并沒有創建任何線程,而是等任務來才去創建線程,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法 ,這樣才會預先創建好corePoolSize個線程或者一個線程。

      • maximumPoolSize(線程池最大線程數):線程池允許創建的最大線程數,如果隊列滿了,并且已創建的線程數小于最大線程數,則線程池會再創建新的線程執行任務。值得注意的是,如果使用了無界隊列,此參數就沒有意義了。

      • keepAliveTime(線程活動保持時間):此參數默認在線程數大于corePoolSize的情況下才會起作用, 當線程的空閑時間達到keepAliveTime的時候就會終止,直至線程數目小于corePoolSize。不過如果調用了allowCoreThreadTimeOut方法,則當線程數目小于corePoolSize的時候也會起作用.

      • unit(keelAliveTime的時間單位):keelAliveTime的時間單位,一共有7種,在這里就不列舉了。

      • workQueue(阻塞隊列):阻塞隊列,用來存儲等待執行的任務,這個參數也是非常重要的,在這里簡單介紹一下幾個阻塞隊列。

        • ArrayBlockingQueue:這是一個基于數組結構的有界阻塞隊列,此隊列按照FIFO的原則對元素進行排序。

        • LinkedBlockingQueue:一個基于鏈表結構的阻塞隊列,此隊列按照FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()就是使用了這個隊列。

        • SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態。吞吐量通常要高于LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool()就使用了這個隊列。

        • PriorityBlockingQueue:一個具有優先級的無阻塞隊列。

      • handler(飽和策略);當線程池和隊列都滿了,說明線程池已經處于飽和狀態了,那么必須采取一種策略來處理還在提交過來的新任務。這個飽和策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。共有四種飽和策略提供,當然我們也可以選擇自己實現飽和策略。

        • AbortPolicy:直接丟棄并且拋出RejectedExecutionException異常

        • CallerRunsPolicy:只用調用者所在線程來運行任務。

        • DiscardOldestPolicy:丟棄隊列里最近的一個任務,并執行當前任務。

        • DiscardPolicy:丟棄任務并且不拋出異常。

      ?

      線程池的執行流程就用參考資料里的圖介紹一下了,具體我們還是通過代碼去講解。
      線程池流程.jpg

      在上面我們簡單的講解了一下Executors這個工廠類里的工廠方法,并且講述了一下創建線程池的一些參數以及它們的作用,當然上面的講解并不是很深入,因為想要弄懂的話是需要持續地花時間去看去理解的,而博主自己也還是沒有完全弄懂,不過博主的學習方法是先學了個大概,再回頭來看看之前的知識點,可能會更加好理解,所以我們接著往下面講吧。

      ThreadPoolExecutor源碼分析

      ? 在上面我們就發現了,Executors的工廠方法主要就返回了ThreadPoolExecutor對象,至于另一個在這里暫時不講,也就是說,要學習線程池,其實關鍵的還是得學會分析ThreadPoolExecutor這個對象里面的源碼,我們接下來就會對ThreadPoolExecutor里的關鍵代碼進行分析。

      AtomicInteger ctl

      ctl是主要的控制狀態,是一個復合類型的變量,其中包括了兩個概念。

      • workerCount:表示有效的線程數目

      • runState:線程池里線程的運行狀態


      我們來分析一下跟ctl有關的一些源代碼吧,直接上代碼

           private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
      
           //用來表示線程池數量的位數,很明顯是29,Integer.SIZE=32
           private static final int COUNT_BITS = Integer.SIZE - 3;
           //線程池最大數量,2^29 - 1
           private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
      
           // runState is stored in the high-order bits
           //我們可以看出有5種runState狀態,證明至少需要3位來表示runState狀態
           //所以高三位就是表示runState了
           private static final int RUNNING    = -1 << COUNT_BITS;
           private static final int SHUTDOWN   =  0 << COUNT_BITS;
           private static final int STOP       =  1 << COUNT_BITS;
           private static final int TIDYING    =  2 << COUNT_BITS;
           private static final int TERMINATED =  3 << COUNT_BITS;
      
           // Packing and unpacking ctl
           private static int runStateOf(int c)     { return c & ~CAPACITY; }
           private static int workerCountOf(int c)  { return c & CAPACITY; }
           private static int ctlOf(int rs, int wc) { return rs | wc; }
      
           //用于存放線程任務的阻塞隊列
           private final BlockingQueue<Runnable> workQueue;
      
           //重入鎖
           private final ReentrantLock mainLock = new ReentrantLock();
      
           //線程池當中的線程集合,只有當擁有mainLock鎖的時候,才可以進行訪問
           private final HashSet<Worker> workers = new HashSet<Worker>();
      
           //等待條件支持終止
           private final Condition termination = mainLock.newCondition();
      
           //創建新線程的線程工廠
           private volatile ThreadFactory threadFactory;
      
           //飽和策略
           private volatile RejectedExecutionHandler handler;
      1. CAPACITY

        在這里我們講一下這個線程池最大數量的計算吧,因為這里涉及到源碼以及位移之類的操作,我感覺大多數人都還是不太會這個,因為我一開始看的時候也是不太會的。

      private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

      從代碼我們可以看出,是需要1往左移29位,然后再減去1,那個1往左移29位是怎么計算的呢?

           1 << COUNT_BITS
            ?
            1的32位2進制是
            00000000 00000000 00000000 00000001
            ?
            左移29位的話就是
            00100000 00000000 00000000 00000000
            ?
            再進行減一的操作
            000 11111 11111111 11111111 11111111
            ?
            也就是說線程池最大數目就是
            000 11111 11111111 11111111 11111111

      2.runState

      正數的原碼、反碼、補碼都是一樣的
      在計算機底層,是用補碼來表示的

      private static final int RUNNING    = -1 << COUNT_BITS;
      private static final int SHUTDOWN = 0 << COUNT_BITS;
      private static final int STOP = 1 << COUNT_BITS;
      private static final int TIDYING    = 2 << COUNT_BITS;
      private static final int TERMINATED = 3 << COUNT_BITS;
      • RUNNING

        可以接受新任務并且處理已經在阻塞隊列的任務
        高3位全部是1的話,就是RUNNING狀態

      -1 << COUNT_BITS
      
      這里是-1往左移29位,稍微有點不一樣,-1的話需要我們自己算出補碼來
                ?
      -1的原碼
      10000000 00000000 00000000 00000001
                ?
      -1的反碼,負數的反碼是將原碼除符號位以外全部取反
      11111111 11111111 11111111 11111110
                ?
      -1的補碼,負數的補碼就是將反碼+1
      11111111 11111111 11111111 11111111
                ?
      關鍵了,往左移29位,所以高3位全是1就是RUNNING狀態
      111 00000 00000000 00000000 00000000
      • SHUTDOWN

        不接受新任務,但是處理已經在阻塞隊列的任務
        高3位全是0,就是SHUTDOWN狀態

      0 << COUNT_BITS
                ?
      0的表示
      00000000 00000000 00000000 00000000
                ?
      往左移29位
      00000000 00000000 00000000 00000000
      • STOP

        不接受新任務,也不處理阻塞隊列里的任務,并且會中斷正在處理的任務
        所以高3位是001,就是STOP狀態

      1 << COUNT_BITS
                ?
      1的表示
      00000000 00000000 00000000 00000001
                ?
      往左移29位
      00100000 00000000 00000000 00000000
      • TIDYING

        所有任務都被中止,workerCount是0,線程狀態轉化為TIDYING并且調用terminated()鉤子方法
        所以高3位是010,就是TIDYING狀態

      2 << COUNT_BITS
                ?
      2的32位2進制
      00000000 00000000 00000000 00000010
                ?
      往左移29位
      01000000 00000000 00000000 00000000
      • TERMINATED

        terminated()鉤子方法已經完成
        所以高3位是110,就是TERMINATED狀態

      3 << COUNT_BITS
                ?
      3的32位2進制
      00000000 00000000 00000000 00000011
                ?
      往左移29位
      11000000 00000000 00000000 00000000

      3.部分方法介紹

      • runStateOf(int c)

      實時獲取runState的方法

      private static int runStateOf(int c)     { return c & ~CAPACITY; }
      ~CAPACITY
      ~是按位取反的意思
      &是按位與的意思
                ?
      而CAPACITY是,高位3個0,低29位都是1,所以是
      000 11111 11111111 11111111 11111111
                ?
      取反的話就是
      111 00000 00000000 00000000 00000000
                ?
      傳進來的c參數與取反的CAPACITY進行按位與操作
      1、低位29個0進行按位與,還是29個0
      2、高位3個1,既保持c參數的高3位
      既高位保持原樣,低29位都是0,這也就獲得了線程池的運行狀態runState
      • workerCountOf(int c)

      獲取線程池的當前有效線程數目

      private static int workerCountOf(int c)  { return c & CAPACITY; }
      CAPACITY的32位2進制是
      000 11111 11111111 11111111 11111111
                ?
      用入參c跟CAPACITY進行按位與操作
      1、低29位都是1,所以保留c的低29位,也就是有效線程數
      2、高3位都是0,所以c的高3位也是0
                ?
      這樣獲取出來的便是workerCount的值
      • ctlOf(int rs, int wc)

        原子整型變量ctl的初始化方法

      //結合這幾句代碼來看
      private static final int RUNNING    = -1 << COUNT_BITS;
      private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
                ?
      private static int ctlOf(int rs, int wc) { return rs | wc; }
      RUNNING是
      111 00000 00000000 00000000 00000000
                ?
      ctlOf是將rs和wc進行按位或的操作
                ?
      初始化的時候是將RUNNING和0進行按位或
      0的32位2進制是
      00000000 00000000 00000000 00000000
                ?
      所以初始化的ctl是
      111 00000 00000000 00000000 00000000

      核心方法源碼分析

      1. execute(Runnable command)方法
            public void execute(Runnable command) {
                //需要執行的任務command為空,拋出空指針異常
                if (command == null)  // 1
                    throw new NullPointerException();
      
                /*
                *執行的流程實際上分為三步
                *1、如果運行的線程小于corePoolSize,以用戶給定的Runable對象新開一個線程去執行
                *  并且執行addWorker方法會以原子性操作去檢查runState和workerCount,以防止當返回false的
                *  時候添加了不應該添加的線程
                *2、 如果任務能夠成功添加到隊列當中,我們仍需要對添加的線程進行雙重檢查,有可能添加的線程在前
                *  一次檢查時已經死亡,又或者在進入該方法的時候線程池關閉了。所以我們需要復查狀態,并有有必
                *  要的話需要在停止時回滾入列操作,或者在沒有線程的時候新開一個線程
                *3、如果任務無法入列,那我們需要嘗試新增一個線程,如果新建線程失敗了,我們就知道線程可能關閉了
                *  或者飽和了,就需要拒絕這個任務
                *
                */
      
                //獲取線程池的控制狀態
                int c = ctl.get();  // 2
      
                //通過workCountOf方法算workerCount值,小于corePoolSize
                if (workerCountOf(c) < corePoolSize) {
                    //添加任務到worker集合當中
            if (addWorker(command, true)) 
                        return;  //成功返回
                    //失敗的話再次獲取線程池的控制狀態
                    c = ctl.get();
                }
      
                /*
                *判斷線程池是否正處于RUNNING狀態
                *是的話添加Runnable對象到workQueue隊列當中
                */
                if (isRunning(c) && workQueue.offer(command)) {  // 3
      
                    //再次獲取線程池的狀態
                    int recheck = ctl.get();
      
                    //再次檢查狀態
                    //線程池不處于RUNNING狀態,將任務從workQueue隊列中移除
                    if (! isRunning(recheck) && remove(command))
                        //拒絕任務
                        reject(command);
                    //workerCount等于0
                    else if (workerCountOf(recheck) == 0)  // 4
                        //添加worker
                        addWorker(null, false);
                }
                //加入阻塞隊列失敗,則嘗試以線程池最大線程數新開線程去執行該任務
            else if (!addWorker(command, false))  // 5 
                    //執行失敗則拒絕任務
                    reject(command);
            }

      我們來說一下上面這個代碼的流程:

      1、首先判斷任務是否為空,空則拋出空指針異常
      2、不為空則獲取線程池控制狀態,判斷小于corePoolSize,添加到worker集合當中執行,

      • 如成功,則返回
      • 失敗的話再接著獲取線程池控制狀態,因為只有狀態變了才會失敗,所以重新獲取
        3、判斷線程池是否處于運行狀態,是的話則添加command到阻塞隊列,加入時也會再次獲取狀態并且檢測
        ? 狀態是否不處于運行狀態,不處于的話則將command從阻塞隊列移除,并且拒絕任務
        4、如果線程池里沒有了線程,則創建新的線程去執行獲取阻塞隊列的任務執行
        5、如果以上都沒執行成功,則需要開啟最大線程池里的線程來執行任務,失敗的話就丟棄

      有時候再多的文字也不如一個流程圖來的明白,所以還是畫了個execute的流程圖給大家方便理解。
      execute執行流程.jpg

      2.addWorker(Runnable firstTask, boolean core)

            private boolean addWorker(Runnable firstTask, boolean core) {
                //外部循環標記
                retry:
                //外層死循環
                for (;;) {
                    //獲取線程池控制狀態
                    int c = ctl.get();
                    //獲取runState
                    int rs = runStateOf(c);
            ?
                    // Check if queue empty only if necessary.
      
                    /**
                    *1.如果線程池runState至少已經是SHUTDOWN
                    *2\. 有一個是false則addWorker失敗,看false的情況
                    * - runState==SHUTDOWN,即狀態已經大于SHUTDOWN了
                    * - firstTask為null,即傳進來的任務為空,結合上面就是runState是SHUTDOWN,但是
                    *  firstTask不為空,代表線程池已經關閉了還在傳任務進來
                    * - 隊列為空,既然任務已經為空,隊列為空,就不需要往線程池添加任務了
                    */
                    if (rs >= SHUTDOWN &&  //runState大于等于SHUTDOWN,初始位RUNNING
                        ! (rs == SHUTDOWN &&  //runState等于SHUTDOWN
                           firstTask == null &&  //firstTask為null
                           ! workQueue.isEmpty()))  //workQueue隊列不為空
                        return false;
            ?
                    //內層死循環
                    for (;;) {
                        //獲取線程池的workerCount數量
                        int wc = workerCountOf(c);
                        //如果workerCount超出最大值或者大于corePoolSize/maximumPoolSize
                        //返回false
                        if (wc >= CAPACITY ||
                            wc >= (core ? corePoolSize : maximumPoolSize))
                            return false;
                        //通過CAS操作,使workerCount數量+1,成功則跳出循環,回到retry標記
                        if (compareAndIncrementWorkerCount(c))
                            break retry;
      
                        //CAS操作失敗,再次獲取線程池的控制狀態
                        c = ctl.get();  // Re-read ctl
                        //如果當前runState不等于剛開始獲取的runState,則跳出內層循環,繼續外層循環
                        if (runStateOf(c) != rs)
                            continue retry;
                        // else CAS failed due to workerCount change; retry inner loop
                        //CAS由于更改workerCount而失敗,繼續內層循環
                    }
                }
            ?
                //通過以上循環,能執行到這是workerCount成功+1了
      
                //worker開始標記
                boolean workerStarted = false;
                //worker添加標記
                boolean workerAdded = false;
                //初始化worker為null
                Worker w = null;
                try {
                    //初始化一個當前Runnable對象的worker對象
                    w = new Worker(firstTask);
                    //獲取該worker對應的線程
                    final Thread t = w.thread;
                    //如果線程不為null
                    if (t != null) {
                        //初始線程池的鎖
                        final ReentrantLock mainLock = this.mainLock;
                        //獲取鎖
                        mainLock.lock();
                        try {
                            // Recheck while holding lock.
                            // Back out on ThreadFactory failure or if
                            // shut down before lock acquired.
                            //獲取鎖后再次檢查,獲取線程池runState
                            int rs = runStateOf(ctl.get());
            ?
                            //當runState小于SHUTDOWN或者runState等于SHUTDOWN并且firstTask為null
                            if (rs < SHUTDOWN ||
                                (rs == SHUTDOWN && firstTask == null)) {
      
                                //線程已存活
                                if (t.isAlive()) // precheck that t is startable
                                    //線程未啟動就存活,拋出IllegalThreadStateException異常
                                    throw new IllegalThreadStateException();
      
                                //將worker對象添加到workers集合當中
                                workers.add(w);
                                //獲取workers集合的大小
                                int s = workers.size();
                                //如果大小超過largestPoolSize
                                if (s > largestPoolSize)
                                    //重新設置largestPoolSize
                                    largestPoolSize = s;
                                //標記worker已經被添加
                                workerAdded = true;
                            }
                        } finally {
                            //釋放鎖
                            mainLock.unlock();
                        }
                        //如果worker添加成功
                        if (workerAdded) {
                            //啟動線程
                            t.start();
                            //標記worker已經啟動
                            workerStarted = true;
                        }
                    }
                } finally {
                    //如果worker沒有啟動成功
                    if (! workerStarted)
                        //workerCount-1的操作
                        addWorkerFailed(w);
                }
                //返回worker是否啟動的標記
                return workerStarted;
            }

      我們也簡單說一下這個代碼的流程吧,還真的是挺難的,博主寫的時候都停了好多次,想砸鍵盤的說:

      1、獲取線程池的控制狀態,進行判斷,不符合則返回false,符合則下一步
      2、死循環,判斷workerCount是否大于上限,或者大于corePoolSize/maximumPoolSize,沒有的話則對workerCount+1操作,
      3、如果不符合上述判斷或+1操作失敗,再次獲取線程池的控制狀態,獲取runState與剛開始獲取的runState相比,不一致則跳出內層循環繼續外層循環,否則繼續內層循環
      4、+1操作成功后,使用重入鎖ReentrantLock來保證往workers當中添加worker實例,添加成功就啟動該實例。

      接下來看看流程圖來理解一下上面代碼的一個執行流程
      addWorker.jpg

      3.addWorkerFailed(Worker w)

      addWorker方法添加worker失敗,并且沒有成功啟動任務的時候,就會調用此方法,將任務從workers中移除,并且workerCount做-1操作。

            private void addWorkerFailed(Worker w) {
                //重入鎖
                final ReentrantLock mainLock = this.mainLock;
                //獲取鎖
                mainLock.lock();
                try {
                    //如果worker不為null
                    if (w != null)
                        //workers移除worker
                        workers.remove(w);
                    //通過CAS操作,workerCount-1
                    decrementWorkerCount();
                    tryTerminate();
                } finally {
                    //釋放鎖
                    mainLock.unlock();
                }
            }

      4.tryTerminate()

      當對線程池執行了非正常成功邏輯的操作時,都會需要執行tryTerminate嘗試終止線程池

            final void tryTerminate() {
                //死循環
                for (;;) {
                    //獲取線程池控制狀態
                    int c = ctl.get();
      
                    /*
                    *線程池處于RUNNING狀態
                    *線程池狀態最小大于TIDYING
                    *線程池==SHUTDOWN并且workQUeue不為空
                    *直接return,不能終止
                    */
                    if (isRunning(c) ||
                        runStateAtLeast(c, TIDYING) ||
                        (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                        return;
      
                    //如果workerCount不為0
                    if (workerCountOf(c) != 0) { // Eligible to terminate
                        interruptIdleWorkers(ONLY_ONE);
                        return;
                    }
            ?
                    //獲取線程池的鎖
                    final ReentrantLock mainLock = this.mainLock;
                    //獲取鎖
                    mainLock.lock();
                    try {
                        //通過CAS操作,設置線程池狀態為TIDYING
                        if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                            try {
                                terminated();
                            } finally {
                                //設置線程池的狀態為TERMINATED
                                ctl.set(ctlOf(TERMINATED, 0));
                                //發送釋放信號給在termination條件上等待的線程
                                termination.signalAll();
                            }
                            return;
                        }
                    } finally {
                        //釋放鎖
                        mainLock.unlock();
                    }
                    // else retry on failed CAS
                }
            }

      5.runWorker(Worker w)

      該方法的作用就是去執行任務

      final void runWorker(Worker w) {
            //獲取當前線程
            Thread wt = Thread.currentThread();
            //獲取worker里的任務
            Runnable task = w.firstTask;
            //將worker實例的任務賦值為null
            w.firstTask = null;
      
            /*
            *unlock方法會調用AQS的release方法
            *release方法會調用具體實現類也就是Worker的tryRelease方法
            *也就是將AQS狀態置為0,允許中斷
            */
            w.unlock(); // allow interrupts
            //是否突然完成
            boolean completedAbruptly = true;
            try {
                //worker實例的task不為空,或者通過getTask獲取的不為空
                while (task != null || (task = getTask()) != null) {
                    //獲取鎖
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    /*
                    *獲取線程池的控制狀態,至少要大于STOP狀態
                    *如果狀態不對,檢查當前線程是否中斷并清除中斷狀態,并且再次檢查線程池狀態是否大于STOP
                    *如果上述滿足,檢查該對象是否處于中斷狀態,不清除中斷標記
                    */
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        //中斷改對象
                        wt.interrupt();
                    try {
                        //執行前的方法,由子類具體實現
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            //執行任務
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            //執行完后調用的方法,也是由子類具體實現
                            afterExecute(task, thrown);
                        }
                    } finally {//執行完后
                        //task設置為null
                        task = null;
                        //已完成任務數+1
                        w.completedTasks++;
                        //釋放鎖
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                //處理并退出當前worker
                processWorkerExit(w, completedAbruptly);
            }
        }

      接下來我們用文字來說明一下執行任務這個方法的具體邏輯和流程。

      1. 首先在方法一進來,就執行了w.unlock(),這是為了將AQS的狀態改為0,因為只有getState() >= 0的時候,線程才可以被中斷;
      2. 判斷firstTask是否為空,為空則通過getTask()獲取任務,不為空接著往下執行
      3. 判斷是否符合中斷狀態,符合的話設置中斷標記
      4. 執行beforeExecute(),task.run(),afterExecute()方法
      5. 任何一個出異常都會導致任務執行的終止;進入processWorkerExit來退出任務
      6. 正常執行的話會接著回到步驟2

      附上一副簡單的流程圖:
      runWorker.jpg

      6.getTask()

      在上面的runWorker方法當中我們可以看出,當firstTask為空的時候,會通過該方法來接著獲取任務去執行,那我們就看看獲取任務這個方法到底是怎么樣的?

            private Runnable getTask() {
                //標志是否獲取任務超時
                boolean timedOut = false; // Did the last poll() time out?
            ?
                //死循環
                for (;;) {
                    //獲取線程池的控制狀態
                    int c = ctl.get();
                    //獲取線程池的runState
                    int rs = runStateOf(c);
            ?
                    // Check if queue empty only if necessary.
                    /*
                    *判斷線程池的狀態,出現以下兩種情況
                    *1、runState大于等于SHUTDOWN狀態
                    *2、runState大于等于STOP或者阻塞隊列為空
                    *將會通過CAS操作,進行workerCount-1并返回null
                    */
                    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                        decrementWorkerCount();
                        return null;
                    }
            ?
                    //獲取線程池的workerCount
                    int wc = workerCountOf(c);
            ?
                    // Are workers subject to culling?
      
                    /*
                    *allowCoreThreadTimeOut:是否允許core Thread超時,默認false
                    *workerCount是否大于核心核心線程池
                    */
                    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            ?
                    /*
                    *1、wc大于maximumPoolSize或者已超時
                    *2、隊列不為空時保證至少有一個任務
                    */
                    if ((wc > maximumPoolSize || (timed && timedOut))
                        && (wc > 1 || workQueue.isEmpty())) {
                        /*
                        *通過CAS操作,workerCount-1
                        *能進行-1操作,證明wc大于maximumPoolSize或者已經超時
                        */
                        if (compareAndDecrementWorkerCount(c))
                            //-1操作成功,返回null
                            return null;
                        //-1操作失敗,繼續循環
                        continue;
                    }
            ?
                    try {
                        /*
                        *wc大于核心線程池
                        *執行poll方法
                        *小于核心線程池
                        *執行take方法
                        */
                        Runnable r = timed ?
                            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                        //判斷任務不為空返回任務
                        if (r != null)
                            return r;
                        //獲取一段時間沒有獲取到,獲取超時
                        timedOut = true;
                    } catch (InterruptedException retry) {
                        timedOut = false;
                    }
                }
            }

      還是文字解說一下上面的代碼邏輯和流程:

      1. 獲取線程池控制狀態和runState,判斷線程池是否已經關閉或者正在關閉,是的話則workerCount-1操作返回null
      2. 獲取workerCount判斷是否大于核心線程池
      3. 判斷workerCount是否大于最大線程池數目或者已經超時,是的話workerCount-1,-1成功則返回null,不成功則回到步驟1重新繼續
      4. 判斷workerCount是否大于核心線程池,大于則用poll方法從隊列獲取任務,否則用take方法從隊列獲取任務
      5. 判斷任務是否為空,不為空則返回獲取的任務,否則回到步驟1重新繼續

      接下來依然有一副流程圖:
      getTask.jpg

      7.processWorkerExit

      明顯的,在執行任務當中,會去獲取任務進行執行,那既然是執行任務,肯定就會有執行完或者出現異常中斷執行的時候,那這時候肯定也會有相對應的操作,至于具體操作是怎么樣的,我們還是直接去看源碼最實際。

           private void processWorkerExit(Worker w, boolean completedAbruptly) {
                /*
                *completedAbruptly:在runWorker出現,代表是否突然完成的意思
                *也就是在執行任務過程當中出現異常,就會突然完成,傳true
                *
                *如果是突然完成,需要通過CAS操作,workerCount-1
                *不是突然完成,則不需要-1,因為getTask方法當中已經-1
                *
                *下面的代碼注釋貌似與代碼意思相反了
                */
                if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                    decrementWorkerCount();
            ?
                //生成重入鎖
                final ReentrantLock mainLock = this.mainLock;
                //獲取鎖
                mainLock.lock();
                try {
                    //線程池統計的完成任務數completedTaskCount加上worker當中完成的任務數
                    completedTaskCount += w.completedTasks;
                    //從HashSet<Worker>中移除
                    workers.remove(w);
                } finally {
                    //釋放鎖
                    mainLock.unlock();
                }
            ?
                //因為上述操作是釋放任務或線程,所以會判斷線程池狀態,嘗試終止線程池
                tryTerminate();
            ?
                //獲取線程池的控制狀態
                int c = ctl.get();
      
                //判斷runState是否小魚STOP,即是RUNNING或者SHUTDOWN
                //如果是RUNNING或者SHUTDOWN,代表沒有成功終止線程池
                if (runStateLessThan(c, STOP)) {
                    /*
                    *是否突然完成
                    *如若不是,代表已經沒有任務可獲取完成,因為getTask當中是while循環
                    */
                    if (!completedAbruptly) {
                        /*
                        *allowCoreThreadTimeOut:是否允許core thread超時,默認false
                        *min-默認是corePoolSize
                        */
                        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                        //允許core thread超時并且隊列不為空
                        //min為0,即允許core thread超時,這樣就不需要維護核心核心線程池了
                        //如果workQueue不為空,則至少保持一個線程存活
                        if (min == 0 && ! workQueue.isEmpty())
                            min = 1;
                        //如果workerCount大于min,則表示滿足所需,可以直接返回
                        if (workerCountOf(c) >= min)
                            return; // replacement not needed
                    }
                    //如果是突然完成,添加一個空任務的worker線程--這里我也不太理解
                    addWorker(null, false);
                }
            }
      1. 首先判斷線程是否突然終止,如果是突然終止,通過CAS,workerCount-1
      2. 統計線程池完成任務數,并將worker從workers當中移除
      3. 判斷線程池狀態,嘗試終止線程池
      4. 線程池沒有成功終止
        • 判斷是否突然完成任務,不是則進行下一步,是則進行第三步
        • 如允許核心線程超時,隊列不為空,則至少保證一個線程存活
        • 添加一個空任務的worker線程

      Worker內部類

      ? 我們在上面已經算是挺詳細地講了線程池執行任務execute的執行流程和一些細節,在上面頻繁地出現了一個字眼,那就是worker實例,那么這個worker究竟是什么呢?里面都包含了一些什么信息,以及worker這個任務究竟是怎么執行的呢?

      ? 我們就在這個部分來介紹一下吧,還是直接上源碼:

      我們可以看到Worker內部類繼承AQS同步器并且實現了Runnable接口,所以Worker很明顯就是一個可執行任務并且又可以控制中斷、起到鎖效果的類。

        private final class Worker
                extends AbstractQueuedSynchronizer
                implements Runnable
            {
                /**
                 * This class will never be serialized, but we provide a
                 * serialVersionUID to suppress a javac warning.
                 */
                private static final long serialVersionUID = 6138294804551838833L;
        ?
                /** 工作線程,如果工廠失敗則為空. */
                final Thread thread;
                /** 初始化任務,有可能為空 */
                Runnable firstTask;
                /** 已完成的任務計數 */
                volatile long completedTasks;
        ?
                /**
                 * 創建并初始化第一個任務,使用線程工廠來創建線程
                 * 初始化有3步
                 *1、設置AQS的同步狀態為-1,表示該對象需要被喚醒
                 *2、初始化第一個任務
                 *3、調用ThreadFactory來使自身創建一個線程,并賦值給worker的成員變量thread
                 */
                Worker(Runnable firstTask) {
                    setState(-1); // inhibit interrupts until runWorker
                    this.firstTask = firstTask;
                    this.thread = getThreadFactory().newThread(this);
                }
        ?
          //重寫Runnable的run方法
                /** Delegates main run loop to outer runWorker  */
                public void run() {
                    //調用ThreadPoolExecutor的runWorker方法
                    runWorker(this);
                }
        ?
                // Lock methods
                //
                // The value 0 represents the unlocked state.
                // The value 1 represents the locked state.
          //代表是否獨占鎖,0-非獨占  1-獨占
                protected boolean isHeldExclusively() {
                    return getState() != 0;
                }
      
          //重寫AQS的tryAcquire方法嘗試獲取鎖
                protected boolean tryAcquire(int unused) {
                 //嘗試將AQS的同步狀態從0改為1
                    if (compareAndSetState(0, 1)) {
                     //如果改變成,則將當前獨占模式的線程設置為當前線程并返回true
                        setExclusiveOwnerThread(Thread.currentThread());
                        return true;
                    }
                    //否則返回false
                    return false;
                }
        ?
          //重寫AQS的tryRelease嘗試釋放鎖
                protected boolean tryRelease(int unused) {
                 //設置當前獨占模式的線程為null
                    setExclusiveOwnerThread(null);
                    //設置AQS同步狀態為0
                    setState(0);
                    //返回true
                    return true;
                }
        ?
          //獲取鎖
                public void lock()        { acquire(1); }
                //嘗試獲取鎖
                public boolean tryLock()  { return tryAcquire(1); }
                //釋放鎖
                public void unlock()      { release(1); }
                //是否被獨占
                public boolean isLocked() { return isHeldExclusively(); }
        ?
                void interruptIfStarted() {
                    Thread t;
                    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        }
                    }
                }
        }

       

      posted @ 2019-07-26 10:06  豆腐魚  閱讀(234)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 国产精一区二区黑人巨大| 熟妇人妻激情偷爽文| 额尔古纳市| 精品无码国产污污污免费| 亚洲狠狠婷婷综合久久久| 亚洲成av人片无码天堂下载| 色情无码一区二区三区| 最新国产精品好看的精品| 1024你懂的国产精品| 亚洲精品男男一区二区| 香港经典a毛片免费观看播放| 亚洲成av人片无码迅雷下载| 伊人色综合九久久天天蜜桃| 久久亚洲精品中文字幕波多野结衣| 夜夜添狠狠添高潮出水| 国产成人精彩在线视频| 中国国产免费毛卡片| 99九九视频高清在线| 中文字幕无码专区一VA亚洲V专| 四虎永久精品免费视频| 亚洲老熟女一区二区三区| 肥臀浪妇太爽了快点再快点| 阿巴嘎旗| 蜜桃成熟色综合久久av| 成年午夜免费韩国做受视频| 亚洲欧美牲交| 日本免费一区二区三区日本| 国产午夜福利小视频合集| 日韩人妻无码精品久久| 精品国产综合成人亚洲区| 午夜av高清在线观看| 国产在线精品无码二区| 国产地址二永久伊甸园| 综合色一色综合久久网| 亚洲日韩在线中文字幕第一页 | 男人的天堂av社区在线| 国产麻豆精品一区一区三区| 国产在线精品国偷产拍| 爆乳2把你榨干哦ova在线观看| 亚洲av永久无码精品网站| 亚洲图片自拍偷图区|