<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Loading

      虛擬線程 - VirtualThread源碼透視

      前提

      JDK192022-09-20發布GA版本,該版本提供了虛擬線程的預覽功能。下載JDK19之后翻看了一下有關虛擬線程的一些源碼,跟早些時候的Loom項目構建版本基本并沒有很大出入,也跟第三方JDK如鵝廠的Kona虛擬線程實現方式基本一致,這里分析一下虛擬線程設計與源碼實現。

      Platform Thread與Virtual Thread

      因為引入了虛擬線程,原來JDK存在java.lang.Thread類,俗稱線程,為了更好地區分虛擬線程和原有的線程類,引入了一個全新類java.lang.VirtualThreadThread類的一個子類型),直譯過來就是"虛擬線程"。

      • 題外話:在Loom項目早期規劃里面,核心API其實命名為Fiber,直譯過來就是"纖程"或者"協程",后來成為了廢案,在一些歷史提交的Test類或者文檔中還能看到類似于下面的代碼:
      // java.lang.Fiber
      Fiber f = Fiber.execute({
          out.println("Good morning");
          readLock.lock();
          try{
              out.println("Good night");
          } finally{
              readLock.unlock();
          }
          out.println("Good night");
      });
      

      Thread在此基礎上做了不少兼容性工作。此外,還應用了建造者模式引入了線程建造器,提供了靜態工廠方法Thread#ofPlatform()Thread#ofVirtual()分別用于實例化Thread(工廠)建造器和VirtualThread(工廠)建造器,顧名思義,兩種建造器分別用于創建Thread或者VirtualThread,例如:

      // demo-1 build platform thread
      Thread platformThread = Thread.ofPlatform().daemon().name("worker").unstarted(runnable);
      
      // demo-2 create platform thread factory
      ThreadFactory platformThreadFactory = Thread.ofPlatform().daemon().name("worker-", 0).factory();
      
      // demo-3 build virtual thread
      Thread virtualThread = Thread.ofVirtual().name("virtual-worker").unstarted(runnable);
      
      // demo-4 create virtual thread factory
      ThreadFactory virtualThreadFactory = Thread.ofVirtual().name("virtual-worker-", 0).factory();
      

      更新的JDK文檔中也把原來的Thread稱為Platform Thread,可以更明晰地與Virtual Thread區分開來。這里Platform Thread直譯為"平臺線程",其實就是"虛擬線程"出現之前的老生常談的"線程"。

      后文會把Platform Thread稱為平臺線程,Virtual Thread稱為虛擬線程,或者直接用其英文名稱

      那么平臺線程與虛擬線程的聯系和區別是什么?JDK中的每個java.lang.Thread實例也就是每個平臺線程實例都在底層操作系統線程上運行Java代碼,并且平臺線程在運行代碼的整個生命周期內捕獲系統線程。可以得出一個結論,平臺線程與底層系統線程是一一對應的,平臺線程實例本質是由系統內核的線程調度程序進行調度,并且平臺線程的總數量受限于系統線程的總數量。

      vt-source-code-1

      總的來說,平臺線程有下面的一些特點或者說限制:

      • 資源有限導致系統線程總量有限,進而導致與系統線程一一對應的平臺線程有限
      • 平臺線程的調度依賴于系統的線程調度程序,當平臺線程創建過多,會消耗大量資源用于處理線程上下文切換
      • 每個平臺線程都會開辟一塊私有的棧空間,大量平臺線程會占據大量內存

      這些限制導致開發者不能極大量地創建平臺線程,為了滿足性能需要,需要引入池化技術、添加任務隊列構建消費者-生產者模式等方案去讓平臺線程適配多變的現實場景。顯然,開發者們迫切需要一種輕量級線程實現,剛好可以彌補上面提到的平臺線程的限制,這種輕量級線程可以滿足:

      • 可以大量創建,例如十萬級別、百萬級別,而不會占據大量內存
      • JVM進行調度和狀態切換,并且與系統線程"松綁"
      • 用法與原來平臺線程差不多,或者說盡量兼容平臺線程現存的API

      Loom項目中開發的虛擬線程就是為了解決這個問題,看起來它的運行示意圖如下:

      vt-source-code-2

      當然,平臺線程不是簡單地與虛擬線程進行1:N的綁定,后面的章節會深入分析虛擬線程的運行原理。

      虛擬線程實現原理

      虛擬線程是一種輕量級(用戶模式)線程,這種線程是由Java虛擬機調度,而不是操作系統。虛擬線程占用空間小,任務切換開銷幾乎可以忽略不計,因此可以極大量地創建和使用??傮w來看,虛擬線程實現如下:

      virtual thread = continuation + scheduler
      

      虛擬線程會把任務(一般是java.lang.Runnable)包裝到一個Continuation實例中:

      • 當任務需要阻塞掛起的時候,會調用Continuationyield操作進行阻塞
      • 當任務需要解除阻塞繼續執行的時候,Continuation會被繼續執行

      Scheduler也就是執行器,會把任務提交到一個載體線程池中執行:

      • 執行器是java.util.concurrent.Executor的子類
      • 虛擬線程框架提供了一個默認的ForkJoinPool用于執行虛擬線程任務

      下文會把carrier thread稱為"載體線程",指的是負責執行虛擬線程中任務的平臺線程,或者說運行虛擬線程的平臺線程稱為它的載體線程

      操作系統調度系統線程,而Java平臺線程與系統線程一一映射,所以平臺線程被操作系統調度,但是虛擬線程是由JVM調度。JVM把虛擬線程分配給平臺線程的操作稱為mount(掛載),反過來取消分配平臺線程的操作稱為unmount(卸載):

      • mount操作:虛擬線程掛載到平臺線程,虛擬線程中包裝的Continuation棧數據幀或者引用棧數據會被拷貝到平臺線程的線程棧,這是一個從堆復制到棧的過程
      • unmount操作:虛擬線程從平臺線程卸載,大多數虛擬線程中包裝的Continuation棧數據幀會留在堆內存中

      這個mount -> run -> unmount過程用偽代碼表示如下:

      mount();
      try {
          Continuation.run();
      } finally {
          unmount();
      }
      

      Java代碼的角度來看,虛擬線程和它的載體線程暫時共享一個OS線程實例這個事實是不可見,因為虛擬線程的堆棧跟蹤和線程本地變量與平臺線程是完全隔離的。JDK中專門是用了一個FIFO模式的ForkJoinPool作為虛擬線程的調度程序,從這個調度程序看虛擬線程任務的執行流程大致如下:

      • 調度器(線程池)中的平臺線程等待處理任務

      vt-source-code-5

      • 一個虛擬線程被分配平臺線程,該平臺線程作為運載線程執行虛擬線程中的任務

      vt-source-code-6

      • 虛擬線程運行其Continuation,從而執行基于Runnable包裝的用戶任務

      vt-source-code-7

      • 虛擬線程任務執行完成,標記Continuation終結,標記虛擬線程為終結狀態,清空一些上下文變量,運載線程"返還"到調度器(線程池)中作為平臺線程等待處理下一個任務

      vt-source-code-5

      上面是描述一般的虛擬線程任務執行情況,在執行任務時候首次調用Continuation#run()獲取鎖(ReentrantLock)的時候會觸發Continuationyield操作讓出控制權,等待虛擬線程重新分配運載線程并且執行,見下面的代碼:

      public class VirtualThreadLock {
      
          public static void main(String[] args) throws Exception {
              ReentrantLock lock = new ReentrantLock();
              Thread.startVirtualThread(() -> {
                  lock.lock();     // <------ 這里確保鎖已經被另一個虛擬線程持有
              });
              Thread.sleep(1000);
              Thread.startVirtualThread(() -> {
                  System.out.println("first");
                  lock.lock();
                  try {
                      System.out.println("second");
                  } finally {
                      lock.unlock();
                  }
                  System.out.println("third");
              });
              Thread.sleep(Long.MAX_VALUE);
          }
      }
      
      • 虛擬線程中任務執行時候首次調用Continuation#run()執行了部分任務代碼,然后嘗試獲取鎖,會導致Continuationyield操作讓出控制權(任務切換),也就是unmount,運載線程棧數據會移動到Continuation棧的數據幀中,保存在堆內存,虛擬線程任務完成(但是虛擬線程沒有終結,同時其Continuation也沒有終結和釋放),運載線程被釋放到執行器中等待新的任務;如果Continuationyield操作失敗,則會對運載線程進行park調用,阻塞在運載線程上

      vt-source-code-8

      • 當鎖持有者釋放鎖之后,會喚醒虛擬線程獲取鎖(成功后),虛擬線程會重新進行mount,讓虛擬線程任務再次執行,有可能是分配到另一個運載線程中執行,Continuation棧會的數據幀會被恢復到運載線程棧中,然后再次調用Continuation#run()恢復任務執行:

      vt-source-code-9

      • 最終虛擬線程任務執行完成,標記Continuation終結,標記虛擬線程為終結狀態,清空一些上下文變量,運載線程"返還"到調度器(線程池)中作為平臺線程等待處理下一個任務

      Continuation組件十分重要,它既是用戶真實任務的包裝器,也是任務切換虛擬線程與平臺線程之間數據轉移的一個句柄,它提供的yield操作可以實現任務上下文的中斷和恢復。由于Continuation被封閉在java.base/jdk.internal.vm下,可以通過增加編譯參數--add-exports java.base/jdk.internal.vm=ALL-UNNAMED暴露對應的功能,從而編寫實驗性案例,IDEA中可以按下圖進行編譯參數添加:

      vt-source-code-10

      然后編寫和運行下面的例子:

      import jdk.internal.vm.Continuation;
      import jdk.internal.vm.ContinuationScope;
      
      public class ContinuationDemo {
      
          public static void main(String[] args) {
              ContinuationScope scope = new ContinuationScope("scope");
              Continuation continuation = new Continuation(scope, () -> {
                  System.out.println("Running before yield");
                  Continuation.yield(scope);
                  System.out.println("Running after yield");
              });
              System.out.println("First run");
              // 第一次執行Continuation.run
              continuation.run();
              System.out.println("Second run");
              // 第二次執行Continuation.run
              continuation.run();
              System.out.println("Done");
          }
      }
      
      // 運行代碼,神奇的結果出現了
      First run
      Running before yield
      Second run
      Running after yield
      Done
      

      這里可以看出Continuation的奇妙之處,Continuation實例進行yield調用后,再次調用其run方法就可以從yield的調用之處往下執行,從而實現了程序的中斷和恢復。

      源碼分析

      主要包括:

      • Continuation
      • VirtualThread
      • 線程建造器

      Continuation

      Continuation直譯為"連續",一般來說表示一種語言構造,使語言可以在任意點保存執行狀態并且在之后的某個點返回。在JDK中對應類jdk.internal.vm.Continuation,這個類只有一句類注釋A one-shot delimited continuation,直譯為一個只能執行一次的回調函數。由于Continuation的成員和方法缺少詳細的注釋,并且大部分功能由JVM實現,這里只能閱讀其一些骨干源碼和上一小節編寫的Continuation相關例子去了解其實現(筆者C語言比較薄弱,有興趣的可以翻閱JVM的源碼)。先看成員變量和構造函數:

      // 判斷是否需要保留當前線程的本地緩存,由系統參數jdk.preserveExtentLocalCache決定
      private static final boolean PRESERVE_EXTENT_LOCAL_CACHE;
      
      // 真正要被執行的任務實例
      private final Runnable target;
      
      // 標識Continuation的范圍,
      private final ContinuationScope scope;
      
      // Continuation的父節點,如果為空的時候則為本地線程棧
      private Continuation parent;
      
      // Continuation的子節點,非空時候說明在子Continuation中進行了yield操作
      private Continuation child;
      
      // 猜測為Continuation棧結構,由JVM管理,無法得知其真實作用
      private StackChunk tail;
      
      // 標記Continuation是否已經完成
      private boolean done;
      
      // 標記是否進行了mount操作
      private volatile boolean mounted = false;
      
      // yield操作時候設置的信息
      private Object yieldInfo;
      
      // 標記一個未掛載的Continuation是否通過強制搶占式卸載
      private boolean preempted;
      
      // 保留當前線程的本地緩存的副本
      private Object[] extentLocalCache;
      
      // 構造函數,要求傳入范圍和任務包裝實例
      public Continuation(ContinuationScope scope, Runnable target) {
          this.scope = scope;
          this.target = target;
      }
      

      Continuation是一個雙向鏈表設計,它的唯一一組構造參數是ContinuationScopeRunnable

      vt-source-code-11

      這里不深入研究內部StackChunk、Pinned等實現,直接看run、enter系列方法和yield方法:

      // Continuation.run()
      public final void run() {
          // 設置死循環
          while (true) {
              // 進行mount操作
              mount();
              JLA.setExtentLocalCache(extentLocalCache);
              // 如果Continuation已完成則拋出異常
              if (done)
                  throw new IllegalStateException("Continuation terminated");
              // 獲取當前虛擬線程分配的運載線程
              Thread t = currentCarrierThread();
              if (parent != null) {
                  if (parent != JLA.getContinuation(t))
                      throw new IllegalStateException();
              } else
                  this.parent = JLA.getContinuation(t);
              // 運載線程設置當前Continuation實例
              JLA.setContinuation(t, this);
      
              try {
                  // 判斷ContinuationScope是否虛擬線程范圍
                  boolean isVirtualThread = (scope == JLA.virtualThreadContinuationScope());
                  if (!isStarted()) { // is this the first run? (at this point we know !done)
                      // 激活enter系列方法,標記isContinue為false,標記是否虛擬線程范圍
                      enterSpecial(this, false, isVirtualThread);
                  } else {
                      assert !isEmpty();
                      // 激活enter系列方法,標記isContinue為true,標記是否虛擬線程范圍
                      enterSpecial(this, true, isVirtualThread);
                  }
              } finally {
                  // 設置內存屏障
                  fence();
                  try {
                      assert isEmpty() == done : "empty: " + isEmpty() + " done: " + done + " cont: " + Integer.toHexString(System.identityHashCode(this));
                      // 當前Continuation執行完成后,把運載線程的Continuation指向父Continuation
                      JLA.setContinuation(currentCarrierThread(), this.parent);
                      if (parent != null)
                          parent.child = null;
                      // 進行后置的yield清理工作
                      postYieldCleanup();
                      // 進行unmount操作
                      unmount();
                      // 判斷是否需要保留當前線程的本地緩存并處理
                      if (PRESERVE_EXTENT_LOCAL_CACHE) {
                          extentLocalCache = JLA.extentLocalCache();
                      } else {
                          extentLocalCache = null;
                      }
                      JLA.setExtentLocalCache(null);
                  } catch (Throwable e) { e.printStackTrace(); System.exit(1); }
              }
              // we're now in the parent continuation
      
              assert yieldInfo == null || yieldInfo instanceof ContinuationScope;
              // 父Continuation的yieldInfo緩存當前的scope實例,清空當前Continuation的父節點和yieldInfo
              if (yieldInfo == null || yieldInfo == scope) {
                  this.parent = null;
                  this.yieldInfo = null;
                  // 這個位置是死循環的唯一跳出點
                  return;
              } else {
                  // 執行到這個位置說明在當前是子Continuation并且進行了yield操作,需要跳轉到父Continuation進行yield操作
                  parent.child = this;
                  parent.yield0((ContinuationScope)yieldInfo, this);
                  parent.child = null;
              }
          }
      }
      
      // Continuation.enter()系列方法
      
      // 這是一個native方法,它最終會根據判斷回調到enter()方法
      private native static void enterSpecial(Continuation c, boolean isContinue, boolean isVirtualThread);
      
      // Continuation的入口方法,用戶任務回調的入口
      @DontInline
      @IntrinsicCandidate
      private static void enter(Continuation c, boolean isContinue) {
          // This method runs in the "entry frame".
          // A yield jumps to this method's caller as if returning from this method.
          try {
              c.enter0();
          } finally {
              c.finish();
          }
      }
      
      // 真正任務包裝器執行的回調方法
      private void enter0() {
          target.run();
      }
      
      // Continuation完成,標記done為true
      private void finish() {
          done = true;
          assert isEmpty();
      }
      
      
      // Continuation.yield()方法,靜態方法
      public static boolean yield(ContinuationScope scope) {
          // 獲取當前運載線程的Continuation實例
          Continuation cont = JLA.getContinuation(currentCarrierThread());
          Continuation c;
          // 基于Continuation實例當前向父節點遍歷,直到匹配虛擬線程類型的ContinuationScope的Continuation,如果沒有匹配的Continuation會拋出異常中斷流程
          for (c = cont; c != null && c.scope != scope; c = c.parent)
              ;
          if (c == null)
              throw new IllegalStateException("Not in scope " + scope);
          // 把當前的Continuation掛起到給定的ContinuationScope
          return cont.yield0(scope, null);
      }
      
      // 透過上下文猜測是當前的Continuation實例掛起到給定的ContinuationScope
      private boolean yield0(ContinuationScope scope, Continuation child) {
          // 強制搶占式卸載標記為false
          preempted = false;
          // 如果當前Continuation實例的yieldInfo不等于傳入的ContinuationScope實例,則進行更新,相等的情況下yieldInfo會保持是一個空值
          if (scope != this.scope)
              this.yieldInfo = scope;
          // 最終的yield調用,最終當前Continuation就是阻塞在此方法,從下文源碼猜測,當該方法喚醒后,res值為0的時候,當前Continuation實例會繼續執行,返回其他值的時候則會打印pined線程棧
          int res = doYield();
          // 放置內存屏障防止指令重排,后面注釋提到是防止編譯器進行某些轉換
          U.storeFence(); // needed to prevent certain transformations by the compiler
      
          assert scope != this.scope || yieldInfo == null : "scope: " + scope + " this.scope: " + this.scope + " yieldInfo: " + yieldInfo + " res: " + res;
          assert yieldInfo == null || scope == this.scope || yieldInfo instanceof Integer : "scope: " + scope + " this.scope: " + this.scope + " yieldInfo: " + yieldInfo + " res: " + res;
      
          if (child != null) { // TODO: ugly <----- 這個位置還有一句吐槽的代碼注釋:丑陋的代碼
              if (res != 0) {
                  child.yieldInfo = res;
              } else if (yieldInfo != null) {
                  assert yieldInfo instanceof Integer;
                  child.yieldInfo = yieldInfo;
              } else {
                  child.yieldInfo = res;
              }
              this.yieldInfo = null;
          } else {
              if (res == 0 && yieldInfo != null) {
                  res = (Integer)yieldInfo;
              }
              this.yieldInfo = null;
      
              if (res == 0)
                  // Continuation實例繼續執行前回調
                  onContinue();
              else
                  // Continuation固定在運載線程前回調,res是pined的級別
                  onPinned0(res);
          }
          assert yieldInfo == null;
          // 返回布爾值結果表示當前Continuation實例是否會繼續執行
          return res == 0;
      }
      
      // 最終的yield調用,看實現是拋出異常,猜測是由JVM實現
      @IntrinsicCandidate
      private static int doYield() { throw new Error("Intrinsic not installed"); }
      

      說實話,Continuation源碼的可讀性比想象中低,連代碼注釋也留下了"丑陋的"這句吐槽。通過上面源碼分析和上一節Continuation的一個例子,可以得知Continuation#yield()可以讓程序代碼中斷,然后再次調用Continuation#run()可以從上一個中斷位置繼續執行,JVM在這個過程中為使用者屏蔽了Continuation和運行此Continuation的平臺線程之間的交互細節,讓使用者可以專注實際的任務開發即可。

      VirtualThread

      前面花了不少篇幅介紹Continuation,它是一個全新的API。已有的JUC類庫已經十分完善,如果可以把Continuation融入到已有的JUC體系,那么就可以通過線程池技術去管理運載線程,原有的大多數并發相關API也能直接在協程體系中使用。從這個背景來看,創造一個Thread類的全新子類用于融合JUCContinuation是十分合適的,這樣通過很小的改造成本就能通過Java繼承特性把這個全新子類適配JUC體系,也能擴展一些API讓它適配協程新引入的特性,這個全新的子類就是java.lang.VirtualThread

      vt-source-code-12

      VirtualThread類的繼承體系如下:

      package java.lang;
      
      final class VirtualThread extends BaseVirtualThread {
        // ...
      }
      
      package java.lang;
      
      sealed abstract class BaseVirtualThread extends Thread
              permits VirtualThread, ThreadBuilders.BoundVirtualThread {
        // ... 
      }
      

      VirtualThreadBaseVirtualThread的子類,而BaseVirtualThread是一個"密封類",它是Thread的子類,只對VirtualThreadThreadBuilders.BoundVirtualThread開放,并且VirtualThread包私有訪問權限的同時用final關鍵字修飾,無法被繼承。接著看VirtualThread的成員變量和構造函數:

      // java.lang.VirtualThread
      
      // Unsafe實例
      private static final Unsafe U = Unsafe.getUnsafe();
      
      // 虛擬線程的ContinuationScope靜態常量
      private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
      
      // 調度器,或者說執行器,默認就是用此調度器運行虛擬線程
      private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
      
      // 調度線程池實例,用于喚醒帶超時阻塞的虛擬線程實例,主要用于sleep的喚醒
      private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();
      
      // pin模式,也就是pined thread的跟蹤模式,決定打印堆棧的詳細程度,來自于系統參數jdk.tracePinnedThreads,full表示詳細,short表示簡略
      private static final int TRACE_PINNING_MODE = tracePinningMode();
      
      // 下面幾個都是成員地址,用于Unsafe直接操作成員
      private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
      private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
      private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
      private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
      
      // 調度器實例
      private final Executor scheduler;
      
      // Continuation實例
      private final Continuation cont;
      
      // Continuation實例的Runnable包裝實例
      private final Runnable runContinuation;
      
      // 虛擬線程狀態,這個值由JVM訪問和修改
      private volatile int state;
      
      // 下面的狀態集合
      private static final int NEW      = 0;
      private static final int STARTED  = 1;
      private static final int RUNNABLE = 2;     // runnable-unmounted
      private static final int RUNNING  = 3;     // runnable-mounted
      private static final int PARKING  = 4;
      private static final int PARKED   = 5;     // unmounted
      private static final int PINNED   = 6;     // mounted
      private static final int YIELDING = 7;     // Thread.yield
      private static final int TERMINATED = 99;  // final state
      
      // 虛擬線程unmount后可以從調度過程中掛起的狀態
      private static final int SUSPENDED = 1 << 8;
      private static final int RUNNABLE_SUSPENDED = (RUNNABLE | SUSPENDED);
      private static final int PARKED_SUSPENDED   = (PARKED | SUSPENDED);
      
      // park操作許可
      private volatile boolean parkPermit;
      
      // 運載線程實例
      private volatile Thread carrierThread;
      
      // 終結倒數柵欄實例,主要用于join操作
      private volatile CountDownLatch termination;
      
      // 唯一構造函數
      VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
          // 默認標記bound為false,當bound為true的時候標記為綁定到系統線程
          super(name, characteristics, /*bound*/ false);
          Objects.requireNonNull(task);
          // 如果傳入的調度器實例非空則直接使用
          // 否則,如果父線程是虛擬線程,則使用父虛擬線程的調度器實例
          // 如果傳入的調度器實例為空,父線程為平臺線程,那么使用默認的調度器
          // choose scheduler if not specified
          if (scheduler == null) {
              Thread parent = Thread.currentThread();
              if (parent instanceof VirtualThread vparent) {
                  scheduler = vparent.scheduler;
              } else {
                  scheduler = DEFAULT_SCHEDULER;
              }
          }
          // 賦值調度器
          this.scheduler = scheduler;
          // 封裝和初始化Continuation
          this.cont = new VThreadContinuation(this, task);
          // 初始化Continuation的Runnable包裝器,最終提交到調度器中執行
          this.runContinuation = this::runContinuation;
      }
      
      // 虛擬線程Continuation的專有子類,默認為ContinuationScope("VirtualThreads"),從而實現Continuation.enter()執行時候實際上執行的是VirtualThread.run()方法
      // 也就是 Runnable.run()[runContinuation by carrier thread from executor] --> Continuation.run() --> Continuation.enter() --> VirtualThread.run() --> Runnable.run()[user task]
      private static class VThreadContinuation extends Continuation {
      
          VThreadContinuation(VirtualThread vthread, Runnable task) {
              super(VTHREAD_SCOPE, () -> vthread.run(task));
          }
      
          // pin之前回調的方法,基于TRACE_PINNING_MODE的返回值決定pinned線程棧的打印詳略
          @Override
          protected void onPinned(Continuation.Pinned reason) {
              if (TRACE_PINNING_MODE > 0) {
                  boolean printAll = (TRACE_PINNING_MODE == 1);
                  PinnedThreadPrinter.printStackTrace(System.out, printAll);
              }
          }
      }
      
      // 在當前線程上運行或繼續Continuation的執行,必須由平臺線程運行此方法,最終會封裝為Runnble包裝器提交到執行器中運行
      private void runContinuation() {
          // the carrier must be a platform thread
          if (Thread.currentThread().isVirtual()) {
              throw new WrongThreadException();
          }
      
          // set state to RUNNING
          boolean firstRun;
          int initialState = state();
          // 當前為STARTED狀態并且CAS更新為RUNNING狀態則標記首次運行為true
          if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) {
              // first run
              firstRun = true;
          } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) {
              // 當前為RUNNABLE狀態并且CAS更新為RUNNING狀態則標記首次運行為false,并且設置park許可為false
              // consume parking permit
              setParkPermit(false);
              firstRun = false;
          } else {
              // not runnable
              return;
          }
      
          // notify JVMTI before mount
          if (notifyJvmtiEvents) notifyJvmtiMountBegin(firstRun);
      
          try {
              // 執行Continuation.run()
              cont.run();
          } finally {
              // Continuation執行完成,回調鉤子方法afterTerminate
              if (cont.isDone()) {
                  afterTerminate(/*executed*/ true);
              } else {
                  // Continuation沒有執行完成,說明調用了Continuation.yield或者pin到運載線程中進行了park操作
                  afterYield();
              }
          }
      }
      
      // Continuation執行完成回調的鉤子方法
      private void afterTerminate(boolean executed) {
          assert (state() == TERMINATED) && (carrierThread == null);
      
          if (executed) {
              if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(true);
          }
      
          // 如果有其他線程阻塞等待虛擬線程的返回,例如調用了join方法,那么在這里解除阻塞
          CountDownLatch termination = this.termination;
          if (termination != null) {
              assert termination.getCount() == 1;
              termination.countDown();
          }
          
          // 如果執行成功則通知線程容器當前線程實例退出,清空線程本地變量引用
          if (executed) {
              // notify container if thread executed
              threadContainer().onExit(this);
      
              // clear references to thread locals
              clearReferences();
          }
      }
      
      // 由于Continuation的yield操作或者調用了Thread.yield()導致Continuation掛起,需要重新把Continuation的包裝器"懶提交"到調度器中
      private void afterYield() {
          int s = state();
          assert (s == PARKING || s == YIELDING) && (carrierThread == null);
          // 如果是PARKING狀態,這種對應于Continuation的yield操作調用
          if (s == PARKING) {
              // 更變為PARKED狀態
              setState(PARKED);
      
              // notify JVMTI that unmount has completed, thread is parked
              if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);
      
              // 得到park許可,并且CAS為RUNNABLE狀態
              if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) {
                  // 進行懶提交,如果可能的話,用當前線程作為運載線程繼續執行任務
                  lazySubmitRunContinuation();
              }
          } else if (s == YIELDING) {   // 如果是YIELDING狀態,這種對應于調用了Thread.yield
              // 更變為RUNNABLE狀態
              setState(RUNNABLE);
      
              // notify JVMTI that unmount has completed, thread is runnable
              if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);
      
              // 進行懶提交,如果可能的話,用當前線程作為運載線程繼續執行任
              lazySubmitRunContinuation();
          }
      }
      

      這里唯一的構造函數是比較復雜的,拋開一些鉤子接口,最終想達到的效果就是:

      Runnable.run()[runContinuation by carrier thread from executor] --> Continuation.run() --> Continuation.enter() --> VirtualThread.run() --> Runnable.run()[user task]
      

      用戶任務實際被包裹了很多層,在最里面一層才會回調。VirtualThread中提供了兩個靜態全局的線程池實例,一個用于調度,一個用于喚醒,這里看看兩個線程池是如何構造的:

      // java.lang.VirtualThread
      
      private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
      private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();
      
      // 創建默認的調度器
      private static ForkJoinPool createDefaultScheduler() {
          // 線程工廠,默認創建CarrierThread實例,CarrierThread是ForkJoinWorkerThread的一個子類
          ForkJoinWorkerThreadFactory factory = pool -> {
              PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
              return AccessController.doPrivileged(pa);
          };
          PrivilegedAction<ForkJoinPool> pa = () -> {
              int parallelism, maxPoolSize, minRunnable;
              String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
              String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
              String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
              if (parallelismValue != null) {
                  parallelism = Integer.parseInt(parallelismValue);
              } else {
                  parallelism = Runtime.getRuntime().availableProcessors();
              }
              if (maxPoolSizeValue != null) {
                  maxPoolSize = Integer.parseInt(maxPoolSizeValue);
                  parallelism = Integer.min(parallelism, maxPoolSize);
              } else {
                  maxPoolSize = Integer.max(parallelism, 256);
              }
              if (minRunnableValue != null) {
                  minRunnable = Integer.parseInt(minRunnableValue);
              } else {
                  minRunnable = Integer.max(parallelism / 2, 1);
              }
              Thread.UncaughtExceptionHandler handler = (t, e) -> { };
              boolean asyncMode = true; // FIFO
              return new ForkJoinPool(parallelism, factory, handler, asyncMode,
                              0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
          };
          return AccessController.doPrivileged(pa);
      }
      
      // 創建調度線程池,用于虛擬線程帶超時時間的unpark操作
      private static ScheduledExecutorService createDelayedTaskScheduler() {
          String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize");
          int poolSize;
          if (propValue != null) {
              poolSize = Integer.parseInt(propValue);
          } else {
              // 確保至少有一個工作線程
              poolSize = 1;
          }
          ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
              Executors.newScheduledThreadPool(poolSize, task -> {
                  return InnocuousThread.newThread("VirtualThread-unparker", task);
              });
          // 任務取消后馬上從工作隊列移除
          stpe.setRemoveOnCancelPolicy(true);
          return stpe;
      }
      

      對于默認調度器(DEFAULT_SCHEDULER)的創建,它是一個ForkJoinPool實例,構造參數的選取如下:

      • parallelism參數由系統變量jdk.virtualThreadScheduler.parallelism決定,默認值為Runtime.getRuntime().availableProcessors(),如果配置了系統參數jdk.virtualThreadScheduler.maxPoolSize則取min(parallelism,maxPoolSize)
      • maxPoolSize參數由系統變量jdk.virtualThreadScheduler.maxPoolSize決定,默認值為min(parallelism, maxPoolSize)
      • minRunnable參數由系統變量jdk.virtualThreadScheduler.minRunnable決定,默認值為max(parallelism / 2, 1)
      • asyncMode參數固定值true,也就是選用FIFO模式
      • keepAliveTime參數為固定值30
      • saturate參數在JDK17引入,是一個Predicate函數,在此固定返回true,用于忽略minRunnable值允許線程池飽和
      • 線程工廠用于創建CarrierThread實例,CarrierThreadForkJoinWorkerThread的子類

      Intel 4C8T開發機器環境中,該ForkJoinPool實例創建時候的幾個參數分別為:parallelism = 8, maxPoolSize = 256, minRunnable = 4。

      對于調度線程池(UNPARKER)的創建,它是一個ScheduledThreadPoolExecutor實例,構造參數的選取如下:

      • corePoolSize參數由系統變量jdk.unparker.maxPoolSize決定,并且確保最小值為1
      • 線程工廠用于創建InnocuousThread實例,線程名稱為VirtualThread-unparker

      接著看虛擬線程的啟動方法start()

      // java.lang.VirtualThread
      
      @Override
      public void start() {
          start(ThreadContainers.root());
      }
      
      // 調度虛擬線程讓之運行
      @Override
      void start(ThreadContainer container) {
          // CAS由NEW轉換為STARTED狀態
          if (!compareAndSetState(NEW, STARTED)) {
              throw new IllegalThreadStateException("Already started");
          }
       
          // 綁定當前虛擬線程到線程容器
          setThreadContainer(container);
      
          // 標記為未啟動
          boolean started = false;
          // 回調start鉤子方法
          container.onStart(this); // may throw
          try {
              // 從給定容器繼承extent-local綁定參數
              inheritExtentLocalBindings(container);
              // 提交'runContinuation'任務到調度器
              submitRunContinuation();
              // 標記為啟動完成
              started = true;
          } finally {
              // 如果啟動失敗,則標記最終狀態和回調終結鉤子方法
              if (!started) {
                  setState(TERMINATED);
                  container.onExit(this);
                  afterTerminate(/*executed*/ false);
              }
          }
      }
      
      // 提交'runContinuation'任務到調度器
      private void submitRunContinuation() {
          submitRunContinuation(false);
      }
      
      // 提交'runContinuation'任務到調度器,lazySubmit參數決定是否"懶提交"
      private void submitRunContinuation(boolean lazySubmit) {
          try {
              if (lazySubmit && scheduler instanceof ForkJoinPool pool) {
                  // ForkJoinPool類型調度器并且lazySubmit為true,對runContinuation這個Runnable實例適配為ForkJoinTask類型,進行"懶提交"到ForkJoinPool
                  pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
              } else {
                  // 非ForkJoinPool類型調度器或者lazySubmit為false,直接使用Executor.execute()提交任務
                  scheduler.execute(runContinuation);
              }
          } catch (RejectedExecutionException ree) {
              // 線程池拒絕接收任務,發布提交失敗事件到JVM
              var event = new VirtualThreadSubmitFailedEvent();
              if (event.isEnabled()) {
                  event.javaThreadId = threadId();
                  event.exceptionMessage = ree.getMessage();
                  event.commit();
              }
              throw ree;
          }
      }
      

      ForkJoinPool#lazySubmit()JDK19新增的一個API,它的方法注釋如下:

      提交給定的任務,但不保證它最終會在沒有可用活動線程的情況下執行。在某些上下文中,這種方法可以通過依賴于特定于上下文的知識來減少競爭和開銷,即現有線程(如果在此池中操作,則可能包括調用線程)最終將可用來執行任務

      使用此方法提交的目的就是希望可以用當前調用線程去執行任務,對于首次提交Continuation任務可能作用不明顯,但是對于Continuation.yield()調用后的再次提交意義比較重大,因為這樣就可以把運行的Continuation.run()方法鏈分配到同一個運載線程實例,在開發者的角度就是虛擬線程任務執行中斷后恢復執行,執行任務的運載線程沒有改變。

      源碼中還可以發現,run()方法覆蓋了Thread#run()替換為空實現,因為VirtualThread最終是觸發Continuation#run(),這一點已經在start()方法進行提交和調度。最后分析虛擬線程的阻塞(不帶超時,也就是timeout = 0)、限時阻塞(timeout > 0)、join的實現。先看相對簡單的joinNanos()

      // java.lang.VirtualThread
      // Thread.join() --> VirtualThread.joinNanos()
      
      // 虛擬線程join調用
      boolean joinNanos(long nanos) throws InterruptedException {
          // 如果狀態為TERMINATED直接返回true
          if (state() == TERMINATED)
              return true;
          // 獲取數柵欄實例
          CountDownLatch termination = getTermination();
          // 再次驗證如果狀態為TERMINATED直接返回true
          if (state() == TERMINATED)
              return true;
      
          // 如果nanos為0則調用CountDownLatch.await()阻塞
          if (nanos == 0) {
              termination.await();
          } else {
              // 如果nanos大于0則調用CountDownLatch.await(nanos,TimeUnit)限時阻塞
              boolean terminated = termination.await(nanos, NANOSECONDS);
              if (!terminated) {
                  // 阻塞到超時時限過了返回,非解除阻塞下的正常返回
                  return false;
              }
          }
          assert state() == TERMINATED;
          // 解除阻塞下的正常返回
          return true;
      }
      
      // 懶創建終結倒數柵欄實例,設置資源值為1,這里用到CAS是考慮之前已經創建和保存到成員變量,如果已創建則直接選用成員變量的那個實例
      private CountDownLatch getTermination() {
          CountDownLatch termination = this.termination;
          if (termination == null) {
              termination = new CountDownLatch(1);
              if (!U.compareAndSetReference(this, TERMINATION, null, termination)) {
                  termination = this.termination;
              }
          }
          return termination;
      }
      

      接著看虛擬線程阻塞和限時阻塞的現實:

      // java.lang.VirtualThread
      // Thread.sleep() --> VirtualThread.sleepNanos()
      
      // 給定休眠時間讓當前虛擬線程休眠
      void sleepNanos(long nanos) throws InterruptedException {
          assert Thread.currentThread() == this;
          // nanos必須大于等于0
          if (nanos >= 0) {
              // 如果支持線程休眠事件發布則在休眠處理前后處理休眠事件,最終的休眠操作調用doSleepNanos()完成
              if (ThreadSleepEvent.isTurnedOn()) {
                  ThreadSleepEvent event = new ThreadSleepEvent();
                  try {
                      event.time = nanos;
                      event.begin();
                      doSleepNanos(nanos);
                  } finally {
                      event.commit();
                  }
              } else {
                  doSleepNanos(nanos);
              }
          }
      }
      
      // 讓當前線程休眠給定的睡眠時間(單位為納秒)。如果nanos為0時,線程將嘗試yield
      private void doSleepNanos(long nanos) throws InterruptedException {
          assert nanos >= 0;
          // 響應中斷清理中斷狀態,拋出中斷異常
          if (getAndClearInterrupt())
              throw new InterruptedException();
          if (nanos == 0) {
              // nanos為0的時候直接進行yield操作,具體是Continuation.yield()
              tryYield();
          } else {
              // park for the sleep time
              try {
                  long remainingNanos = nanos;
                  // 臨時變量記錄開始休眠時間
                  long startNanos = System.nanoTime();
                  while (remainingNanos > 0) {
                      // 剩余休眠時間大于0納秒,進行park操作
                      parkNanos(remainingNanos);
                      // 響應中斷清理中斷狀態,拋出中斷異常
                      if (getAndClearInterrupt()) {
                          throw new InterruptedException();
                      }
                      // 重新計算剩余休眠事件
                      remainingNanos = nanos - (System.nanoTime() - startNanos);
                  }
              } finally {
                  // park會消耗park許可,走到這里說明unpark了,可以重新設置許可
                  setParkPermit(true);
              }
          }
      }
      
      // 當前虛擬線程park(阻塞)直至指定等候時間,進行unpark操作或者中斷也能解除park狀態
      @Override
      void parkNanos(long nanos) {
          assert Thread.currentThread() == this;
      
          // 已經消耗了park許可或者處于中斷狀態,直接返回
          if (getAndSetParkPermit(false) || interrupted)
              return;
      
          // 當前虛擬線程park(阻塞)直至指定等候時間
          if (nanos > 0) {
              // 記錄開始park的時間
              long startTime = System.nanoTime();
              // 記錄是否yield成功
              boolean yielded;
              // 通過調度線程池提交一個延時執行的unpark任務,用于進行unpark操作解除當前虛擬線程阻塞等待
              Future<?> unparker = scheduleUnpark(nanos);
              // 設置為PARKING狀態
              setState(PARKING);
              try {
                  // 執行Continuation.yield()
                  yielded = yieldContinuation();
              } finally {
                  assert (Thread.currentThread() == this)
                          && (state() == RUNNING || state() == PARKING);
                  // 執行Continuation.yield()執行完畢后,如果該unparker任務未完成則進行取消操作
                  cancel(unparker);
              }
      
              // Continuation.yield()調用失敗,則重新計算等待時間并基于運載線程進行park操作
              if (!yielded) {
                  long deadline = startTime + nanos;
                  if (deadline < 0L)
                      deadline = Long.MAX_VALUE;
                  parkOnCarrierThread(true, deadline - System.nanoTime());
              }
          }
      }
      
      // 當前虛擬線程的運載線程park(阻塞)直至指定等候時間,這就是前面提到過的pinned thread產生的過程
      private void parkOnCarrierThread(boolean timed, long nanos) {
          assert state() == PARKING;
      
          var pinnedEvent = new VirtualThreadPinnedEvent();
          pinnedEvent.begin();
          // 設置狀態為PINNED
          setState(PINNED);
          try {
              // 如果沒有park許可,則不處理,否則使用Usafe的park api進行平臺線程阻塞
              if (!parkPermit) {
                  if (!timed) {
                      U.park(false, 0);
                  } else if (nanos > 0) {
                      U.park(false, nanos);
                  }
              }
          } finally {
              // 阻塞解除后狀態為RUNNING
              setState(RUNNING);
          }
      
          // 解除阻塞后此park操作消耗了park許可
          setParkPermit(false);
      
          pinnedEvent.commit();
      }
      
      @ChangesCurrentThread
      private Future<?> scheduleUnpark(long nanos) {
          Thread carrier = this.carrierThread;
          // need to switch to current platform thread to avoid nested parking
          carrier.setCurrentThread(carrier);
          try {
              return UNPARKER.schedule(() -> unpark(), nanos, NANOSECONDS);
          } finally {
              carrier.setCurrentThread(this);
          }
      }
      
      // 如果unpark任務未完成則取消它,這個過程需要切換到當前平臺線程以避免嵌套park操作
      @ChangesCurrentThread
      private void cancel(Future<?> future) {
          if (!future.isDone()) {
              Thread carrier = this.carrierThread;
              // need to switch to current platform thread to avoid nested parking
              carrier.setCurrentThread(carrier);
              try {
                  future.cancel(false);
              } finally {
                  carrier.setCurrentThread(this);
              }
          }
      }
      
      // unpark操作,重新啟用當前虛擬線程進行調度,如果虛擬線程處于park狀態會將它解除阻塞
      @Override
      @ChangesCurrentThread
      void unpark() {
          Thread currentThread = Thread.currentThread();
          // 重置park許可false -> true,并且判斷當前線程是虛擬線程
          if (!getAndSetParkPermit(true) && currentThread != this) {
              int s = state();
              // 命中虛擬線程PARKED狀態,則CAS設置為RUNNABLE狀態,并且重新提交Continuation的Runnable包裝器到調度器中,這個提交過程需要切換到當前運載線程,然后恢復為當前虛擬線程
              if (s == PARKED && compareAndSetState(PARKED, RUNNABLE)) {
                  if (currentThread instanceof VirtualThread vthread) {
                      Thread carrier = vthread.carrierThread;
                      carrier.setCurrentThread(carrier);
                      try {
                          submitRunContinuation();
                      } finally {
                          carrier.setCurrentThread(vthread);
                      }
                  } else {
                      submitRunContinuation();
                  }
              } else if (s == PINNED) {
                  // park操作基于運載線程阻塞,則調用Usafe的unpark api進行喚醒,喚醒后在parkOnCarrierThread()中會重新被修改為RUNNING狀態
                  synchronized (carrierThreadAccessLock()) {
                      Thread carrier = carrierThread;
                      if (carrier != null && state() == PINNED) {
                          U.unpark(carrier);
                      }
                  }
              }
          }
      }
      
      // 嘗試執行Continuation.yield()
      void tryYield() {
          assert Thread.currentThread() == this;
          // 設置狀態為YIELDING
          setState(YIELDING);
          try {
              // 執行Continuation.yield(),忽略返回值處理
              yieldContinuation();
          } finally {
              assert Thread.currentThread() == this;
              // 虛擬線程重新mount并且運行,設置為RUNNING狀態
              if (state() != RUNNING) {
                  assert state() == YIELDING;
                  setState(RUNNING);
              }
          }
      }
      
      // 執行Continuation.yield()
      private boolean yieldContinuation() {
          boolean notifyJvmti = notifyJvmtiEvents;
          // 當前虛擬線程進行unmount操作
          if (notifyJvmti) notifyJvmtiUnmountBegin(false);
          unmount();
          try {
              // 執行Continuation.yield()
              return Continuation.yield(VTHREAD_SCOPE);
          } finally {
              // 當前虛擬線程重新進行mount操作
              mount();
              if (notifyJvmti) notifyJvmtiMountEnd(false);
          }
      }
      

      總的來說就是:

      • 阻塞:通過Continuation.yield()調用實現阻塞,主要是提供給Thread.sleep()調用
      • 限時阻塞:Continuation.yield()調用之前計算喚醒時間并且向調度線程池(UNPARKER)提交一個延時執行unpark任務通過"懶提交"方式重新運行Continuation.run()調用鏈解除阻塞,主要是提供給Thread.sleep(long nanos)調用
      • join(Nanos):通過CountDownLatch.await()調用實現阻塞,在虛擬線程終結鉤子方法afterTerminate()中調用CountDownLatch.countDown()解除阻塞,join(Nanos)()方法主要是提供給Thread.join()調用
      • 特殊情況:如果Continuation.yield()調用失敗,則會通過Unsafe提供的park API阻塞在運載線程上,在unpark任務中通過Unsafe提供的unpark API解除阻塞

      分析完虛擬線程實現的核心代碼,這里總結一下虛擬線程的狀態切換,由于支持的狀態比較多,這里通過一張狀態圖進行展示:

      vt-source-code-13

      還有其他像獲取虛擬線程棧、JVM狀態通知、獲取虛擬線程狀態、狀態切換的CAS操作等方法限于篇幅這里就不展開分析。

      線程建造器

      線程建造器和線程工廠建造器用于快速創建平臺線程實例、平臺線程工廠實例、虛擬線程實例或者虛擬線程工廠實例。熟悉Builder模式的開發者看這個新引入的功能源碼應該比較輕松:

      // 內部類:java.lang.Thread.Builder
      // Builder只對OfPlatform、OfVirtual、BaseThreadBuilder開放繼承權限
      @PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS)
      public sealed interface Builder
              permits Builder.OfPlatform,
                      Builder.OfVirtual,
                      ThreadBuilders.BaseThreadBuilder {
      
          // 設置線程名稱
          Builder name(String name);
          
          // 設置線程名稱規則,最終線程名稱為:$prefix$start++
          // 如prefix: worker-, start: 0,則worker-0, worker-1.... worker-n
          Builder name(String prefix, long start);
      
          Builder allowSetThreadLocals(boolean allow);
      
          // 是否開啟InheritableThreadLocal
          Builder inheritInheritableThreadLocals(boolean inherit);
      
          // 設置未捕獲異常處理器
          Builder uncaughtExceptionHandler(UncaughtExceptionHandler ueh);
      
          // 設置非啟動前的任務實例
          Thread unstarted(Runnable task);
      
          // 設置任務實例并且馬上啟動
          Thread start(Runnable task);
      
          // 構建線程工廠實例
          ThreadFactory factory();
      
          // 平臺線程Builder接口
          @PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS)
          sealed interface OfPlatform extends Builder
                  permits ThreadBuilders.PlatformThreadBuilder {
      
              @Override OfPlatform name(String name);
              @Override OfPlatform name(String prefix, long start);
              @Override OfPlatform allowSetThreadLocals(boolean allow);
              @Override OfPlatform inheritInheritableThreadLocals(boolean inherit);
              @Override OfPlatform uncaughtExceptionHandler(UncaughtExceptionHandler ueh);
      
              // 設置平臺線程組
              OfPlatform group(ThreadGroup group);
      
              // 設置新建平臺線程是否為守護線程
              OfPlatform daemon(boolean on);
      
              // 判斷新建平臺線程是否為守護線程
              default OfPlatform daemon() {
                  return daemon(true);
              }
      
              // 設置優先級
              OfPlatform priority(int priority);
      
              // 設置線程棧大小
              OfPlatform stackSize(long stackSize);
          }
      
          // 虛擬線程Builder接口
          @PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS)
          sealed interface OfVirtual extends Builder
                  permits ThreadBuilders.VirtualThreadBuilder {
      
              @Override OfVirtual name(String name);
              @Override OfVirtual name(String prefix, long start);
              @Override OfVirtual allowSetThreadLocals(boolean allow);
              @Override OfVirtual inheritInheritableThreadLocals(boolean inherit);
              @Override OfVirtual uncaughtExceptionHandler(UncaughtExceptionHandler ueh);
          }
      }
      

      上面的Builder接口都在java.lang.ThreadBuilders中進行實現,因為整體實現比較簡單,這里只看全新引入的VirtualThreadFactoryVirtualThreadBuilder

      // 內部類:java.lang.ThreadBuilders.VirtualThreadFactory
      private static class VirtualThreadFactory extends BaseThreadFactory {
      
          // 執行器或者說調度器實例
          private final Executor scheduler;
          
          // 線程工廠構造函數基本與平臺線程工廠實現一致,但是必須提供執行器實例
          VirtualThreadFactory(Executor scheduler,
                               String name,
                               long start,
                               int characteristics,
                               UncaughtExceptionHandler uhe) {
              super(name, start, characteristics, uhe);
              this.scheduler = scheduler;
          }
      
          @Override
          public Thread newThread(Runnable task) {
              Objects.requireNonNull(task);
              // 獲取下一個虛擬線程名稱,start >= 0則為$name$start++,否則固定為name
              String name = nextThreadName();
              // 創建新的虛擬線程實例
              Thread thread = newVirtualThread(scheduler, name, characteristics(), task);
              UncaughtExceptionHandler uhe = uncaughtExceptionHandler();
              if (uhe != null)
                  // 設置未捕獲異常處理器
                  thread.uncaughtExceptionHandler(uhe);
              return thread;
          }
      }
      
      // 靜態方法:java.lang.ThreadBuilders#newVirtualThread()
      static Thread newVirtualThread(Executor scheduler,
                                     String name,
                                     int characteristics,
                                     Runnable task) {
          // 當前JVM支持Continuation,則創建初始化一個新的虛擬線程實例
          if (ContinuationSupport.isSupported()) {
              return new VirtualThread(scheduler, name, characteristics, task);
          } else {
              // 當前的JVM不支持Continuation,則虛擬線程退化為一個平臺線程的包裝類,要求執行器必須為空
              if (scheduler != null)
                  throw new UnsupportedOperationException();
              return new BoundVirtualThread(name, characteristics, task);
          }
      }
      
      // 內部類:java.lang.ThreadBuilders.VirtualThreadBuilder
      static final class VirtualThreadBuilder
              extends BaseThreadBuilder<OfVirtual> implements OfVirtual {
      
          // 執行器成員變量
          private Executor scheduler;
      
          VirtualThreadBuilder() {
          }
          
          // 目前VirtualThreadBuilder的構造都是默認修飾符,Executor只能在單元測試中調用
          // 也就是用戶無法設置Executor,因為所有虛擬線程默認都是由全局的ForkJoinPool調度
          // invoked by tests
          VirtualThreadBuilder(Executor scheduler) {
              if (!ContinuationSupport.isSupported())
                  throw new UnsupportedOperationException();
              this.scheduler = Objects.requireNonNull(scheduler);
          }
          
          // 創建虛擬線程實例,設置任務,處于非啟動狀態
          @Override
          public Thread unstarted(Runnable task) {
              Objects.requireNonNull(task);
              var thread = newVirtualThread(scheduler, nextThreadName(), characteristics(), task);
              UncaughtExceptionHandler uhe = uncaughtExceptionHandler();
              if (uhe != null)
                  thread.uncaughtExceptionHandler(uhe);
              return thread;
          }
      
          // 創建虛擬線程實例,設置任務并且馬上啟動
          @Override
          public Thread start(Runnable task) {
              Thread thread = unstarted(task);
              thread.start();
              return thread;
          }
          
          // 初始化虛擬線程工廠實例
          @Override
          public ThreadFactory factory() {
              return new VirtualThreadFactory(scheduler, name(), counter(), characteristics(),
                      uncaughtExceptionHandler());
          }
      }
      

      值得注意的是:虛擬線程實現上來看都是"守護線程",也就是說虛擬線程不需要設置daemon參數。平臺線程或者虛擬線程的建造器或者工廠實現都是包訪問權限的內部類,其父類使用了permits關鍵字指定繼承范圍,目前是只能通過鏈式設置值的方式初始化,無法修改其中的成員或者方法。

      其他探討

      其他探討主要包括:

      • 自定義執行器
      • 內存占用評估
      • 局限性
      • 適用場景
      • JUC親和性

      自定義執行器

      雖然虛擬線程建造器屏蔽了執行器Executor實例的公共訪問權限,在目前預留功能版本下只能所有虛擬線程的任務最終都是由全局的ForkJoinPool執行,可以通過VarHandle對其進行強制值設置,這樣就能修改虛擬線程底層的載體線程為我們自定義線程池中的平臺線程,例如這樣:

      public class VirtualThreadCustomExecutor {
      
          /**
           * virtual thread with custom executor
           * add VM options: --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.lang.reflect=ALL-UNNAMED
           */
          public static void main(String[] args) throws Exception {
              ExecutorService carrier = Executors.newSingleThreadExecutor(runnable -> {
                  Thread thread = new Thread(runnable);
                  thread.setDaemon(true);
                  thread.setName("CustomVirtualCarrier");
                  return thread;
              });
              Thread.Builder.OfVirtual ofVirtual = Thread.ofVirtual();
              Class<?> klass = Class.forName("java.lang.ThreadBuilders$VirtualThreadBuilder");
              VarHandle varHandle = MethodHandles.privateLookupIn(klass, MethodHandles.lookup()).findVarHandle(klass, "scheduler", Executor.class);
              varHandle.set(ofVirtual, carrier);
              ThreadFactory factory = ofVirtual.name("VirtualWorker-", 0).allowSetThreadLocals(false).factory();
              ExecutorService virtualWorkerPool = Executors.newThreadPerTaskExecutor(factory);
              virtualWorkerPool.execute(() -> {
                  Thread thread = Thread.currentThread();
                  System.out.printf("first task ==> 線程名稱:%s,載體線程名稱:%s,是否虛擬線程:%s\n", thread.getName(), getCurrentCarrierThreadName(thread), thread.isVirtual());
              });
              virtualWorkerPool.execute(() -> {
                  Thread thread = Thread.currentThread();
                  System.out.printf("second task ==> 線程名稱:%s,載體線程名稱:%s,是否虛擬線程:%s\n", thread.getName(), getCurrentCarrierThreadName(thread), thread.isVirtual());
              });
              Thread.sleep(Long.MAX_VALUE);
          }
      
          private static String getCurrentCarrierThreadName(Thread currentThread) {
              if (currentThread.isVirtual()) {
                  try {
                      MethodHandle methodHandle = MethodHandles.privateLookupIn(Thread.class, MethodHandles.lookup())
                              .findStatic(Thread.class, "currentCarrierThread", MethodType.methodType(Thread.class));
                      Thread carrierThread = (Thread) methodHandle.invoke();
                      return carrierThread.getName();
                  } catch (Throwable e) {
                      e.printStackTrace();
                  }
              }
              return "UNKNOWN";
          }
      }
      
      // 運行結果
      first task ==> 線程名稱:VirtualWorker-0,載體線程名稱:CustomVirtualCarrier,是否虛擬線程:true
      second task ==> 線程名稱:VirtualWorker-1,載體線程名稱:CustomVirtualCarrier,是否虛擬線程:true
      

      可以看到最終效果,虛擬線程中的任務最終在自定義線程池中的唯一平臺線程中運行。這里只是做一個實驗性例子,使用反射或者MethodHandle對未穩定的API進行操作以后有很大概率會出現兼容性問題,不建議在生產環境這樣操作,待虛擬線程完成預覽正式發布后應該會提供對應的API讓開發者設置自定義執行器。

      資源占用評估

      平臺線程(單個實例)的資源占用:

      • 通常是預留1 mb線程??臻g,額外需要16 kb操作系統核心數據源結構
      • 對于已經啟動的平臺線程實例,會占據2000+ byte數據,包括VM中平臺線程的元數據等

      虛擬線程(單個實例)的資源占用:

      • Continuation棧會占據數百byte到數百kb內存空間
      • 虛擬線程實例會占據200 - 240 byte

      兩者對比一看,理論上得知單個平臺線程占用的內存空間至少是kb級別的,而通常單個虛擬線程實例占用的內存空間是byte級別,兩者的內存占用相差1個數量級。這里可以使用NMT參數和jcmd命令進行驗證,見下面的代碼和結果。

      public class PlatformThreadFootprint {
      
          private static final int COUNT = 100000;
      
          /**
           * platform thread footprint -Xms1g -Xmx1g -XX:NativeMemoryTracking=detail
           *
           * @param args args
           */
          public static void main(String[] args) throws Exception {
              for (int i = 0; i < COUNT; i++) {
                  new Thread(() -> {
                      try {
                          Thread.sleep(Long.MAX_VALUE);
                      } catch (Exception e) {
                          e.printStackTrace();
                      }
                  }, String.valueOf(i)).start();
              }
              Thread.sleep(Long.MAX_VALUE);
          }
      }
      

      上面的程序運行后啟動10w平臺線程,通過NMT參數和jcmd命令查看所有線程占據的內存空間如下:

      vt-source-code-3

      可見總已提交內存大部分來自創建的平臺線程,這些平臺線程占用了大概613 mb空間,它們的總線程棧空間占用約為5862 mb,兩者加起來占據總使用內存(7495 mb)的86 %以上。用類似的方式編寫運行虛擬線程的程序:

      public class VirtualThreadFootprint {
      
          private static final int COUNT = 100000;
      
          /**
           * virtual thread footprint -Xms10m -Xmx100m -XX:NativeMemoryTracking=detail
           *
           * @param args args
           */
          public static void main(String[] args) throws Exception {
              for (int i = 0; i < COUNT; i++) {
                  Thread.startVirtualThread(() -> {
                      try {
                          Thread.sleep(Long.MAX_VALUE);
                      } catch (Exception e) {
                          e.printStackTrace();
                      }
                  });
              }
              Thread.sleep(Long.MAX_VALUE);
          }
      }
      

      上面的程序運行后啟動10w虛擬線程,同樣通過NMT參數和jcmd命令查看:

      vt-source-code

      這里有意縮小虛擬線程程序的最小最大堆內存為-Xms10m -Xmx100m,程序依然正常運行,并且堆內存的實際占用量和總內存的實際占用量都不超過200 mb,由此可以證明虛擬線程確實在極大量創建的前提下不會占據大量內存空間(這里暫時沒有考慮到復雜調用情況下Continuation棧占據內存空間大小,不過已經大幅度優于平臺線程)。

      局限性

      當前的虛擬線程實現有如下局限性:

      • Continuation棧存在native方法或者外部函數(FFMAPI,見JEP-424)調用不能進行yield操作
      • 當持有監視器或者等待監視器的時候(一般是使用了synchronized關鍵字或者Object.wait())不能進行yield操作
      • Continuation棧存在native方法調用、外部函數調用或者當持有監視器或者等待監視器的時候,虛擬線程會Pin到平臺線程,導致虛擬線程無法從平臺線程卸載,雖然不會影響程序正確執行,但是會影響性能,也就是如果這些虛擬線程是可復用的,永遠無法切換到其運載線程,導致任務切換開銷永久性增大
      • 虛擬線程可以像平臺線程一樣使用ThreadLocal,但是由于一般虛擬線程實例是會大量創建的,ThreadLocal本質是哈希表的一個鏈接,創建大量哈希表會帶來額外的內存開銷(這一點不算局限性,更接近于開發建議,建議使用虛擬線程的時候禁用ThreadLocal

      對于前三點出現的情況,一些文檔中提到會導致虛擬線程無法從運載線程卸載,這個現象稱為Pinned Thread,通過系統參數jdk.tracePinnedThreads可以打印具體的Pinned Thread棧,從而定位到哪些虛擬線程被固定到哪些平臺線程中。對于這個問題,目前可以通過編程規范去規避,也就是虛擬線程執行的任務盡量規避調用native方法或者外部函數,對于synchronized關鍵字可以使用JUC中的鎖API進行替換,例如ReentrantLock等等。

      適用場景

      基于繼承的特性,通過對java.lang.Thread(虛擬線程的超類)薄封裝,也就是基于ThreadAPI可以直接透明地實現虛擬線程的掛起和恢復等操作,對使用者屏蔽了虛擬線程復雜的調度實現。由于虛擬線程實例占據的資源比較少,可以大量地創建而無須考慮池化,因此滿足類似下面的使用場景:

      • 大批量的處理時間較短的計算任務
      • 大量的IO阻塞等待處理
      • thread-per-request風格的應用程序,例如主流的Tomcat線程模型或者基于類似線程模型實現的SpringMVC框架等等

      JUC親和性

      還是基于繼承的特性,java.lang.VirtualThreadjava.lang.Thread子類型,因此使用到Thread類型的地方原則上可以透明使用VirtualThread,就是說通過下面的形式可以池化虛擬線程

      public class VirtualThreadPool {
          
          public static void main(String[] args) throws Exception {
              ThreadFactory factory = Thread.ofVirtual().allowSetThreadLocals(false)
                      .name("VirtualFactoryWorker-", 0)
                      .inheritInheritableThreadLocals(false)
                      .factory();
              // core = max = 10
              ThreadPoolExecutor fixedVirtualThreadPool
                      = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory);
              fixedVirtualThreadPool.execute(() -> {
                  Thread thread = Thread.currentThread();
                  System.out.printf("線程名稱:%s,是否虛擬線程:%s\n", thread.getName(), thread.isVirtual());
              });
              fixedVirtualThreadPool.shutdown();
              fixedVirtualThreadPool.awaitTermination(5, TimeUnit.SECONDS);
          }
      }
      

      但是前面也提到過:由于虛擬線程本身是輕量級的,在執行計算任務的時候更建議每個任務新創建一個虛擬線程實例,因為池化操作本身是會引入額外開銷。另外,JUC下很多類庫都是基于AQS數據結構實現,而AQS中無論獨占模式還是共享模式,在隊列中等待的節點以及搶占虛擬頭節點的對象本質都是Thread實例,基于這一點來看,AQS也是無縫適配VirtualThread。見下面的例子:

      public class VirtualThreadJuc {
      
          public static void main(String[] args) throws Exception {
              CountDownLatch latch = new CountDownLatch(1);
              Thread.startVirtualThread(() -> {
                  try {
                      System.out.println("before await");
                      latch.await();
                      System.out.println("after await");
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  Thread thread = Thread.currentThread();
                  System.out.printf("線程名稱:%s,是否虛擬線程:%s\n", thread.getName(), thread.isVirtual());
              });
              Thread.sleep(1000);
              System.out.println("main count down");
              latch.countDown();
              Thread.sleep(Long.MAX_VALUE);
          }
      }
      
      // 運行結果
      before await
      main count down
      after await
      線程名稱:,是否虛擬線程:true
      

      總的來說,VirtualThreadJUC既有類庫是親和的,大部分類庫可以在虛擬線程任務中使用,并且不建議池化虛擬線程而是使用per task per virtual thread的編程模式。

      小結

      本文詳細介紹了平臺線程與虛擬線程的區別、虛擬線程實現原理、虛擬線程的源碼實現以及關于虛擬線程的一些探討,希望能夠幫到讀者理解Java虛擬線程。在JDK19中,虛擬線程是預覽特性,希望這個特性能夠早點發布GA版本,這樣才能填補Java協程這一塊短板,也能讓大量基礎API和框架進行一輪革新。

      參考資料:

      • JEP-425https://openjdk.org/jeps/425
      • JVMLS2018.pdf(這份PDF文檔詳細地介紹了Loom項目的目標和實現方式):https://cr.openjdk.java.net/~rpressler/loom/loom/JVMLS2018.pdf

      (本文完 e-a-20221005 c-3-d)

      posted @ 2022-10-07 02:13  throwable  閱讀(11506)  評論(8)    收藏  舉報
      主站蜘蛛池模板: 无码a∨高潮抽搐流白浆| 亚洲 制服 丝袜 无码| 无码人妻aⅴ一区二区三区69岛| 97久久综合亚洲色hezyo| 午夜国产小视频| 国产成人无码网站| 无码av中文字幕久久专区| 一级片免费网站| 99中文字幕国产精品| 日韩狼人精品在线观看| 鲁一鲁一鲁一鲁一澡| 午夜国产精品福利一二| 久久婷婷大香萑太香蕉AV人| 亚洲精品久久久久久无码色欲四季| 国产精品国产高清国产一区| 欧美人与性动交ccoo| 亚洲不卡一区二区在线看| 国内极度色诱视频网站| 亚洲精品天堂一区二区| 在线日韩日本国产亚洲| 又爽又大又黄a级毛片在线视频| 亚洲卡1卡2卡3精品| 久久天天躁夜夜躁狠狠85| 亚洲成人av综合一区| 久久精品亚洲精品国产色婷 | 蜜桃久久精品成人无码av| 自拍亚洲综合在线精品| 亚洲一区久久蜜臀av| 国产精品亚洲综合色区丝瓜| 亚洲日本韩国欧美云霸高清| 国产高清自产拍av在线| 少妇高潮太爽了在线视频| 国产成人亚洲老熟女精品| 玩弄放荡人妻少妇系列| 洪湖市| 日韩中文字幕免费在线观看| 久久香蕉国产线熟妇人妻| 激情综合网激情综合网五月| 白丝乳交内射一二三区| 免费现黄频在线观看国产| 午夜夜福利一区二区三区|