<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      線程池續:你必須要知道的線程池submit()實現原理之FutureTask!

      FutureTask思維導圖.png

      前言

      上一篇內容寫了Java中線程池的實現原理及源碼分析,說好的是實實在在的大滿足,想通過一篇文章讓大家對線程池有個透徹的了解,但是文章寫完總覺得還缺點什么?

      上篇文章只提到線程提交的execute()方法,并沒有講解線程提交的submit()方法,submit()有一個返回值,可以獲取線程執行的結果Future<T>,這一講就那深入學習下submit()FutureTask實現原理。

      使用場景&示例

      使用場景

      我能想到的使用場景就是在并行計算的時候,例如一個方法中調用methodA()、methodB(),我們可以通過線程池異步去提交方法A、B,然后在主線程中獲取組裝方法A、B計算后的結果,能夠大大提升方法的吞吐量。

      FutureTask使用場景.png

      使用示例

      /**
       * @author wangmeng
       * @date 2020/5/28 15:30
       */
      public class FutureTaskTest {
          public static void main(String[] args) throws InterruptedException, ExecutionException {
              ExecutorService threadPool = Executors.newCachedThreadPool();
      
              System.out.println("====執行FutureTask線程任務====");
              Future<String> futureTask = threadPool.submit(new Callable<String>() {
                  @Override
                  public String call() throws Exception {
                      System.out.println("FutureTask執行業務邏輯");
                      Thread.sleep(2000);
                      System.out.println("FutureTask業務邏輯執行完畢!");
                      return "歡迎關注: 一枝花算不算浪漫!";
                  }
              });
      
              System.out.println("====執行主線程任務====");
              Thread.sleep(1000);
              boolean flag = true;
              while(flag){
                  if(futureTask.isDone() && !futureTask.isCancelled()){
                      System.out.println("FutureTask異步任務執行結果:" + futureTask.get());
                      flag = false;
                  }
              }
      
              threadPool.shutdown();
          }
      }
      

      上面的使用很簡單,submit()內部傳遞的實際上是個Callable接口,我們自己實現其中的call()方法,我們通過futureTask既可以獲取到具體的返回值。

      submit()實現原理

      submit() 是也是提交任務到線程池,只是它可以獲取任務返回結果,返回結果是通過FutureTask來實現的,先看下ThreadPoolExecutor中代碼實現:

      public class ThreadPoolExecutor extends AbstractExecutorService {
          public <T> Future<T> submit(Callable<T> task) {
              if (task == null) throw new NullPointerException();
              RunnableFuture<T> ftask = newTaskFor(task);
              execute(ftask);
              return ftask;
          }
      }
      
      public abstract class AbstractExecutorService implements ExecutorService {
          protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
              return new FutureTask<T>(callable);
          }
      }
      

      提交任務還是執行execute()方法,只是task被包裝成了FutureTask ,也就是在excute()中啟動線程后會執行FutureTask.run()方法。

      再來具體看下它執行的完整鏈路圖:

      submit()全鏈路圖.png

      上圖可以看到,執行任務并返回執行結果的核心邏輯實在FutureTask中,我們以FutureTask.run/get 兩個方法為突破口,一點點剖析FutureTask的實現原理。

      FutureTask源碼初探

      先看下FutureTask中部分屬性:

      FutureTask屬性.png

      public class FutureTask<V> implements RunnableFuture<V> {
          private volatile int state;
          private static final int NEW          = 0;
          private static final int COMPLETING   = 1;
          private static final int NORMAL       = 2;
          private static final int EXCEPTIONAL  = 3;
          private static final int CANCELLED    = 4;
          private static final int INTERRUPTING = 5;
          private static final int INTERRUPTED  = 6;
      
          private Callable<V> callable;
          private Object outcome;
          private volatile Thread runner;
          private volatile WaitNode waiters;
      }
      
      1. state

      當前task狀態,共有7中類型。
      NEW: 當前任務尚未執行
      COMPLETING: 當前任務正在結束,尚未完全結束,一種臨界狀態
      NORMAL:當前任務正常結束
      EXCEPTIONAL: 當前任務執行過程中發生了異常。
      CANCELLED: 當前任務被取消
      INTERRUPTING: 當前任務中斷中..
      INTERRUPTED: 當前任務已中斷

      1. callble

      用戶提交任務傳遞的Callable,自定義call方法,實現業務邏輯

      1. outcome

      任務結束時,outcome保存執行結果或者異常信息。

      1. runner

      當前任務被線程執行期間,保存當前任務的線程對象引用

      1. waiters

      因為會有很多線程去get當前任務的結果,所以這里使用了一種stack數據結構來保存

      FutureTask.run()實現原理

      我們已經知道在線程池runWorker()中最終會調用到FutureTask.run()方法中,我們就來看下它的執行原理吧:

      run()執行邏輯.png

      具體代碼如下:

      public class FutureTask<V> implements RunnableFuture<V> {
          public void run() {
              if (state != NEW ||
                  !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
                  return;
              try {
                  Callable<V> c = callable;
                  if (c != null && state == NEW) {
                      V result;
                      boolean ran;
                      try {
                          result = c.call();
                          ran = true;
                      } catch (Throwable ex) {
                          result = null;
                          ran = false;
                          setException(ex);
                      }
                      if (ran)
                          set(result);
                  }
              } finally {
                  runner = null;
                  int s = state;
                  if (s >= INTERRUPTING)
                      handlePossibleCancellationInterrupt(s);
              }
          }
      }
      

      首先是判斷FutureTaskstate狀態,必須是NEW才可以繼續執行。

      然后通過CAS修改runner引用為當前線程。

      接著執行用戶自定義的call()方法,將返回結果設置到result中,result可能為正常返回也可能為異常信息。這里主要是調用set()/setException()

      FutureTask.set()實現原理

      set()方法的實現很簡單,直接看下代碼:

      public class FutureTask<V> implements RunnableFuture<V> {
          protected void set(V v) {
              if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                  outcome = v;
                  UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
                  finishCompletion();
              }
          }
      }
      

      call()返回的數據賦值給全局變量outcome上,然后修改state狀態為NORMAL,最后調用finishCompletion()來做掛起線程的喚醒操作,這個方法等到get()后面再來講解。

      FutureTask.get()實現原理

      get().png

      接著看下代碼:

      public class FutureTask<V> implements RunnableFuture<V> {
          public V get() throws InterruptedException, ExecutionException {
              int s = state;
              if (s <= COMPLETING)
                  s = awaitDone(false, 0L);
              return report(s);
          }
      }
      

      如果FutureTaskstateNORMAL或者COMPLETING,說明當前任務并沒有執行完成,調用get()方法會被阻塞,具體的阻塞邏輯在awaitDone()方法:

      private int awaitDone(boolean timed, long nanos) throws InterruptedException {
      
              final long deadline = timed ? System.nanoTime() + nanos : 0L;
              WaitNode q = null;
              boolean queued = false;
              for (;;) {
                  if (Thread.interrupted()) {
                      removeWaiter(q);
                      throw new InterruptedException();
                  }
      
                  int s = state;
                  if (s > COMPLETING) {
                      if (q != null)
                          q.thread = null;
                      return s;
                  }
                  else if (s == COMPLETING)
                      Thread.yield();
                  else if (q == null)
                      q = new WaitNode();
                  else if (!queued)
                      queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
                  else if (timed) {
                      nanos = deadline - System.nanoTime();
                      if (nanos <= 0L) {
                          removeWaiter(q);
                          return state;
                      }
                      LockSupport.parkNanos(this, nanos);
                  }
                  else
                      LockSupport.park(this);
              }
          }
      

      這個方法可以說是FutureTask中最核心的方法了,一步步來分析:

      如果timed不為空,這說明指定nanos時間還未返回結果,線程就會退出。

      q是一個WaitNode對象,是將當前引用線程封裝在一個stack數據結構中,WaitNode對象屬性如下:

       static final class WaitNode {
          volatile Thread thread;
          volatile WaitNode next;
          WaitNode() { thread = Thread.currentThread(); }
      }
      

      接著判斷當前線程是否中斷,如果中斷則拋出中斷異常。

      下面就進入一輪輪的if... else if...判斷邏輯,我們還是采用分支的方式去分析。

      分支一:if (s > COMPLETING) {

      此時get()方法已經有結果了,無論是正常返回的結果,還是異常、中斷、取消等,此時直接返回state狀態,然后執行report()方法。

      分支二:else if (s == COMPLETING)

      條件成立,說明當前任務接近完成狀態,這里讓當前線程再釋放cpu,進行下一輪搶占cpu

      分支三:else if (q == null)

      第一次自旋執行,WaitNode還沒有初始化,初始化q=new WaitNode();

      分支四:else if (!queued){

      queued代表當前線程是否入棧,如果沒有入棧則進行入棧操作,順便將全局變量waiters指向棧頂元素。

      分支五/六:LockSupport.park

      如果設置了超時時間,則使用parkNanos來掛起當前線程,否則使用park()

      經過這么一輪自旋循環后,如果執行call()還沒有返回結果,那么調用get()方法的線程都會被掛起。

      被掛起的線程會等待run()返回結果后依次喚醒,具體的執行邏輯在finishCompletion()中。

      最終stack結構中數據如下:

      WaitNode.png

      FutureTask.finishCompletion()實現原理

      finishCompletion().png

      具體實現代碼如下:

      private void finishCompletion() {
          for (WaitNode q; (q = waiters) != null;) {
              if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                  for (;;) {
                      Thread t = q.thread;
                      if (t != null) {
                          q.thread = null;
                          LockSupport.unpark(t);
                      }
                      WaitNode next = q.next;
                      if (next == null)
                          break;
                      q.next = null;
                      q = next;
                  }
                  break;
              }
          }
      
          done();
      
          callable = null;
      }
      

      代碼實現很簡單,看過get()方法后,我們知道所有調用get()方法的線程,在run()還沒有返回結果前,都會保存到一個有WaitNode構成的statck數據結構中,而且每個線程都會被掛起。

      這里是遍歷waiters棧頂元素,然后依次查詢起next節點進行喚醒,喚醒后的節點接著會往后調用report()方法。

      FutureTask.report()實現原理

      report().png

      具體代碼如下:

      private V report(int s) throws ExecutionException {
          Object x = outcome;
          if (s == NORMAL)
              return (V)x;
          if (s >= CANCELLED)
              throw new CancellationException();
          throw new ExecutionException((Throwable)x);
      }
      

      這個方法很簡單,因為執行到了這里,說明當前state狀態肯定大于COMPLETING,判斷如果是正常返回,那么返回outcome數據。

      如果state是取消狀態,拋出CancellationException異常。

      如果狀態都不滿足,則說明執行中出現了差錯,直接拋出ExecutionException

      FutureTask.cancel()實現原理

      cancel().png

      public boolean cancel(boolean mayInterruptIfRunning) {
          if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
              return false;
          try {
              if (mayInterruptIfRunning) {
                  try {
                      Thread t = runner;
                      if (t != null)
                          t.interrupt();
                  } finally {
                      UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                  }
              }
          } finally {
              finishCompletion();
          }
          return true;
      }
      

      cancel()方法的邏輯很簡單,就是修改state狀態為CANCELLED,然后調用finishCompletion()來喚醒等待的線程。

      這里如果mayInterruptIfRunning,就會先中斷當前線程,然后再去喚醒等待的線程。

      總結

      FutureTask的實現原理其實很簡單,每個方法基本上都畫了一個簡單的流程圖來方便立即。

      后面還打算分享一個BlockingQueue相關的源碼解讀,這樣線程池也可以算是完結了。

      在這之前可能會先分享一個SpringCloud常見配置代碼分析、最佳實踐等手冊,方便工作中使用,也是對之前看過的源碼一種總結。敬請期待!
      歡迎關注:
      原創干貨分享.png

      posted @ 2020-06-01 10:15  一枝花算不算浪漫  閱讀(2169)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 久久se精品一区二区三区| 国产亚洲精品第一综合另类| 最近中文字幕日韩有码| 黑人巨茎大战白人美女| 亚洲www永久成人网站| 亚洲天堂在线观看完整版| 日本久久久久久久做爰片日本| 伊人色综合久久天天| 国产精品福利片在线观看| 一本久久a久久精品综合| 国产初高中生粉嫩无套第一次| av午夜福利一片免费看久久| 中文无码av一区二区三区| 黄色免费在线网址| 国产成人无码区免费内射一片色欲| 精品欧美一区二区三区久久久| 国产精品一码在线播放| 久久国产乱子精品免费女| 在线视频中文字幕二区| 日本亚洲一区二区精品久久| 精品久久精品久久精品九九| 国产成人剧情AV麻豆果冻| 国产精品高清一区二区三区| 99久久亚洲综合精品成人| 亚洲av激情综合在线| 99热门精品一区二区三区无码| 国产精品午夜福利合集| 日日噜噜夜夜爽爽| 久久99精品国产99久久6尤物| 国精产品自偷自偷ym使用方法| 亚洲欧美在线观看品| 国产成人亚洲精品狼色在线| 丝袜国产一区av在线观看| 亚洲精品一区二区三区大桥未久 | 亚洲韩国精品无码一区二区三区 | 十八禁午夜福利免费网站| 国产极品美女高潮抽搐免费网站| 亚洲欧美成人综合久久久| 国产欧美综合在线观看第十页| 先锋影音av最新资源| 乱码中文字幕|