<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      技術面:Java并發(線程池、ForkJoinPool)

      線程池是如何實現的?

      線程池

      線程池,就是提前創建好一批線程,然后存儲在線程池中,當有任務需要執行的時候,從線程池中選一個線程來執行。可以頻繁的避免線程的創建和銷毀的開銷。

      線程池是基于池化思想的一種實現,本質就是提前準備好一批資源,以備不時之需,在資源有限的情況下,可以大大的提高資源的利用率,提升性能。

      還有一些其他基于池化思想的實現:

      • 連接池
      • 內存池
      • 對象池

      Java中線程池各接口與實現類之間的關系
      在這里插入圖片描述

      線程池的實現原理

      除了ForkJoinPool以外,上圖中,無論是通過接口還是實現類來創建線程池,最終都是通過ThreadPoolExecutor的構造方法來實現的。

      在構造方法中參數,可以反應出這個對象的數據結構,就是下面這些參數
      在這里插入圖片描述

      • corePoolSize,核心線程數數量,線程池中正式員工的數量。
      • maximumPoolSize,最大線程數數量,線程池中,正式員工與臨時工(非核心線程)兩者總共最大的數量
      • workQueue,任務等待隊列,當核心線程數量的線程任務處理不過來的時候,會先將任務放到這個隊列里面進行等待,直到隊列滿了,然后再有任務就繼續創建線程,直到創建線程的數量到達maximumPoolSize數量。
      • keepAliveTime,非核心線程的最大空閑時間,就是當沒有任務需要處理的時候,臨時工可以待多久,超過這個時間就會被解雇
      • threadFactory,創建線程的工程,可以統一處理創建線程的屬性。可以理解為每個公司對員工的要求都不一樣,可以在這里指定員工手冊。
      • handler,線程池拒絕策略,當核心線程數,處理不過來任務,等待隊列里也滿了,算上臨時工線程數量也已經到了maxmumPoolSize了,還有任務提交過來,這個時候可以配置的拒絕任務的策略。默認情況下是拋出異常,告訴任務提交者,“忙不過來了,老子不干了!”

      拒絕策略JDK提供的有這么幾種:

      1. AbortPolicy(默認策略)
        拋出 RejectedExecutionException 異常,立即拒絕任務。
        適用場景:任務必須被處理,拒絕后需人工干預。
      2. DiscardPolicy
        靜默丟棄任務,不拋異常。
        適用場景:非關鍵任務(如日志記錄、統計)。
      3. DiscardOldestPolicy
        丟棄隊列中最舊的任務,再嘗試提交新任務。
        適用場景:實時性要求高的任務(如實時計算)。
      4. CallerRunsPolicy
        由調用線程(提交任務的線程)直接執行任務。
        適用場景:降低任務提交速度,緩沖系統壓力。

      在這里插入圖片描述

      Worker

      ThreadPoolExecutor里面還有一個重要的內部類Worker,這個Worker的概念也是比較重要的。它實現了Runnable接口,并且每個Worker對象包含一個任務和一個線程。

      • 任務(Runnable firstTask),這個任務就是我們提交給線程池要執行的那個任務(Runnable類型),就是說一個任務想要被線程池執行就必須變成一個Worker
      • 線程Thread thead),每個Worker會有一個線程來執行,這個線程是有ThreadPoolExecutor來進行管理的。

      在這里插入圖片描述

      Worker被創建時,它會通過構造函數接收一個 Runnable 類型的任務。但是Worker并不是執行完這個任務就結束了,而是會繼續從任務隊列中取任務并執行,直到線程池關閉或任務隊列為空。

      Worker 中的Thread 對象,表示實際執行任務的工作線程。
      每個 Worker都會擁有一個工作線程,工作線程會執行run()方法中的任務。

      在 run()方法中,Worker 反復執行 runTask(firstTask)來執行任務。執行完一個任務后, Worker 會繼續檢查線程池的狀態(runStateAtLeast(ctl.get(),SHUTDOWN))并獲取新的任務,直到線程池關閉。

      ThreadPoolExecutor中有一個字段,workers類型是HashSet<Worker>,專門用來存儲工作線程集合,負責管理所有工作線程的生命周期,無論是想停止線程池還是說結束線程池,都會檢查workers集合中是否還有正在運行的工作線程。

      線程池執行任務

      下面我們來看一下線程池是如何執行任務的,直接貼源碼,因為用線程池執行任務,無論是使用execute方法還是使用submit方法,最終都是會調用execute方法,所以直接貼出execute方法的源碼

      public void execute(Runnable command) {
           if (command == null)
               throw new NullPointerException();
           /*
            * Proceed in 3 steps:
            *
            * 1. If fewer than corePoolSize threads are running, try to
            * start a new thread with the given command as its first
            * task.  The call to addWorker atomically checks runState and
            * workerCount, and so prevents false alarms that would add
            * threads when it shouldn't, by returning false.
            *
            * 2. If a task can be successfully queued, then we still need
            * to double-check whether we should have added a thread
            * (because existing ones died since last checking) or that
            * the pool shut down since entry into this method. So we
            * recheck state and if necessary roll back the enqueuing if
            * stopped, or start a new thread if there are none.
            *
            * 3. If we cannot queue task, then we try to add a new
            * thread.  If it fails, we know we are shut down or saturated
            * and so reject the task.
            * 這段注釋就是介紹的線程池的執行流程,后面有翻譯成中文的說明。
            */
           int c = ctl.get();
           // 1. 如果當前運行的線程數少于 corePoolSize,嘗試啟動一個新線程并將其給定的任務作為第一個任務。
           if (workerCountOf(c) < corePoolSize) {
           // 調用 addWorker 方法會原子性地檢查 runState 和 workerCount,通過返回 false 來防止在不應該添加線程時的誤報。
               if (addWorker(command, true))
                   return;
               c = ctl.get();
           }
           // 2. 如果任務可以成功排隊,那么我們仍然需要再次檢查是否應該添加一個線程(因為自上次檢查以來已有線程死亡)
           if (isRunning(c) && workQueue.offer(command)) {
               int recheck = ctl.get();
               // 或者在此方法進入后線程池已關閉。因此我們需要重新檢查狀態,如果停止則回滾入隊操作,或者在沒有線程的情況下啟動新線程。
               if (! isRunning(recheck) && remove(command))
                   reject(command);
               else if (workerCountOf(recheck) == 0)
                   addWorker(null, false);
           }
           // 3. 如果我們無法將任務加入隊列,則嘗試添加一個新線程。如果失敗,我們知道線程池已經關閉或飽和,因此拒絕該任務。
           else if (!addWorker(command, false))
               reject(command);
       }
      

      執行步驟

      1. 如果當前運行的線程數少于 corePoolSize,嘗試啟動一個新線程并將其給定的任務作為第一個任務。調用 addWorker 方法會原子性地檢查 runStateworkerCount,通過返回 false 來防止在不應該添加線程時的誤報。
      2. 如果任務可以成功排隊,那么我們仍然需要再次檢查是否應該添加一個線程(因為自上次檢查以來已有線程死亡)或者在此方法進入后線程池已關閉。因此我們需要重新檢查狀態,如果停止則回滾入隊操作,或者在沒有線程的情況下啟動新線程。
      3. 如果我們無法將任務加入隊列,則嘗試添加一個新線程。如果失敗,我們知道線程池已經關閉或飽和,因此拒絕該任務。

      通過上面這段源碼,我們可以看出來,最核心的,用來執行任務的方法就是addWorker那就也看看addWorker是如何執行的。

      private boolean addWorker(Runnable firstTask, boolean core) {
          // 檢查代碼邏輯省略
          boolean workerStarted = false;
          boolean workerAdded = false;
          Worker w = null;
          try {
              //1. 創建一個worker對象,firstTask作為傳遞給worker的任務。
              w = new Worker(firstTask);
              // 2. 創建完worker對象后,會從線程池里面拿出一個線程用來執行worker
              final Thread t = w.thread;
              if (t != null) {
              // 3. 由于線程池需要保持對工作線程集合(workers)的同步訪問,線程池會用一個鎖來保護執行任務的邏輯。
                  final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  try {
                      // Recheck while holding lock.
                      // Back out on ThreadFactory failure or if
                      // shut down before lock acquired.
                      int c = ctl.get();
      				// 4. 先判斷線程池是否處于運行狀態,
      				// 若線程池沒有關閉且任務有效,則允許添加工作線程。
                      if (isRunning(c) ||
                          (runStateLessThan(c, STOP) && firstTask == null)) {
                          // 5. 確保新創建出來的線程狀態是NEW,即尚未開始執行。
                          if (t.getState() != Thread.State.NEW)
                              throw new IllegalThreadStateException();
                          // 6. 將worker對象添加到工作線程集合(workers)中。
                          workers.add(w);
                          workerAdded = true;
                          int s = workers.size();
                          if (s > largestPoolSize)
                          // 7. 更新largestPoolSize,
                          // 記錄線程池中最大線程數,方便監控線程池負載情況。
                              largestPoolSize = s;
                      }
                  } finally {
                      mainLock.unlock();
                  }
                  // 8. worker對象添加到工作線程集合成功,開始啟動工作線程執行worker。
                  if (workerAdded) {
                      container.start(t);
                      workerStarted = true;
                  }
              }
          } finally {
              if (! workerStarted)
                  addWorkerFailed(w);
          }
          return workerStarted;
      }
      
      1. 創建一個worker對象,firstTask作為傳遞給worker的任務。
      2. 創建完worker對象后,會從線程池里面拿出一個線程用來執行worker,如果能從線程池中拿到線程,接下來就用這個線程開始執行worker
      3. 由于線程池需要保持對工作線程集合(workers)的同步訪問,線程池會用一個鎖來保護執行任務的邏輯。
      4. 先判斷線程池是否處于運行狀態,若線程池沒有關閉且任務有效,則允許添加工作線程。
      5. 確保新創建出來的線程狀態是NEW,即尚未開始執行。
      6. worker對象添加到工作線程集合(workers)中。
      7. 更新largestPoolSize,記錄線程池中最大線程數,方便監控線程池負載情況。
      8. worker對象添加到工作線程集合成功,開始啟動工作線程執行worker

      那么線程池的線程數具體應該設置成多少呢?

      這個問題,面試官一般不是想聽到你給出一個具體的數值,而是想聽到的是你的一個思考過程,就算你回答出來了一個具體數值,也會問你為什么是這個值。

      影響線程池線程數量的因素

      1. CPU核數,多核處理器當然是每個CPU運行一個線程最高效,但是隨著技術的發展現在很多的CPU都有了超線程技術,也就是利用特殊的硬件指令,將兩個邏輯內核模擬成物理處理器,單核處理器可以讓線程并行執行,所以會看到有“4核8線程的CPU”。
      2. 任務類型
        • CPU密集型,這種任務的核心線程數最好設置成cpu數的1至1.5倍
        • I/O密集型,有阻塞有等待的任務,例如:數據庫連接,文件操作,網絡傳輸等,可以將核心線程數量設置成cpu數量的2倍,利用阻塞時間讓其他CPU去干更多的事情。
      3. JVM和系統資源
        • 內存限制,每個線程占用一定的內存,線程過多有內存溢出的風險。
        • 操作系統限制,通常操作系統對單個進程可創建的線程也是有數量限制的,數量過多會降低系統效率。
      4. 并發量與響應時間
        • 高并發場景:增加線程數,但需避免資源競爭。
        • 快速響應需求:減少任務等待時間,適當增加線程數或隊列容量。

      具體該怎么設置線程數量呢?

      網上流傳著一些固定的公式來告訴大家如何配置核心線程數量。

      就是基于簡單因素考慮,在主要參考CPU和任務類型時:

      • CPU密集型任務,線程池的線程數量配置為(CPU數量+1)
      • I/O密集型任務,線程池的線程數量配置為(2*CPU數量)+1

      由于無法根據具體的指標判斷任務類型到底是CPU密集型還是I/O密集型,所以又有了,下面一個公式:
      在這里插入圖片描述

      等段時間,線程執行過程中等待外部操作完成的時間。在等待時間內,線程通常不占用CPU資源。
      計算時間,通常指線程實際計算處理的時間。

      不建議直接套用公式

      雖然網上流傳了這些公式,但是并不是這個公式就是萬能呢,很多時候我們的任務在執行的時候要考慮的因素有很多。而且現在很多服務器都是虛擬機,并不能真正的發揮出物理機的全部能力,所以很多依賴因素也是不準確的。

      所以建議用以下的方式來進行配置:

      1. 可以在剛上線的時候,先根據公式大致的設置一個數值,然后再根據你自己的實際業務情況,以及不斷的壓測結果,再不斷調整,最終達到一個相對合理的值。
      2. 也可以結合監控工具(如PrometheusGrafana)實時檢測線程池的線程數量,然后再通過ThreadPoolExecutor.setCorePoolSize()setMaximumPoolSize() 動態修改參數。一些成熟的動態線程池框架,比如dynamicTp,不僅支持線程數調整,還支持隊列容量和拒絕策略的調整。

      ForkJoinPool和ThreadPoolExecutor有什么區別?

      ForkJoinPool是基于工作竊取(Work-Stealing)算法實現的線程池,ForkJoinPool 中每個線程都有自己的工作隊列,用于存儲待執行的任務。當一個線程執行完自己的任務之后,會從其他線程的工作隊列中竊取任務執行,以此來實現任務的動態均衡和線程的利用率最大化。

      ThreadPoolExecutor 是基于任務分配(Task-Assignment)算法實現的線程池,ThreadPoolExecutor 中線程池中有一個共享的工作隊列,所有任務都將提交到這個隊列中。線程池中的線程會從隊列中獲取任務執行,如果隊列為空,則線程會等待,直到隊列中有任務為止。

      ForkJoinPool的任務調度是通過fork()拆分,再通過join() 合并結果,支持遞歸分治。
      默認線程數等于 CPU 核心數(Runtime.getRuntime().availableProcessors()),支持動態調整。
      通過 ForkJoinTask 的異常傳播機制處理子任務異常。

      在這里插入圖片描述
      ForkJoinPool 中的工作線程是一種特殊的線程,與普通線程池中的工作線程有所不同。
      它們會自動地創建和銷毀,以及自動地管理線程的數量和調度。
      這種方式可以降低線程池的管理成本,提高線程的利用率和并行度。

      提交任務方式與使用場景

      提交任務

      特性 ForkJoinPool ThreadPoolExecutor
      任務類型 必須繼承 ForkJoinTask 的子類(如 RecursiveActionRecursiveTask)。 提交普通 RunnableCallable 任務。
      任務提交方法 使用 submit(ForkJoinTask)invoke(ForkJoinTask) 使用 execute(Runnable)submit(Callable/Runnable)
      任務依賴性 任務間存在依賴關系
      (需合并子任務結果)。
      任務間獨立,無依賴關系。

      使用場景

      ForkJoinPool ThreadPoolExecutor
      并行計算(如數組求和、歸并排序);
      分治算法(如矩陣乘法);
      - Java 并行流(parallelStream());
      網絡請求處理;
      -文件批量處理;
      定時任務(如 ScheduledThreadPoolExecutor);

      CompletableFuture底層就是用ForkJoinPool來實現。

      代碼示例

      public class SumTask extends RecursiveTask<Long> {
          private final long[] array;
          private final int start, end;
          private static final int THRESHOLD = 1000;
      
          public SumTask(long[] array, int start, int end) {
              this.array = array;
              this.start = start;
              this.end = end;
          }
      
          @Override
          protected Long compute() {
              if (end - start <= THRESHOLD) {
                  long sum = 0;
                  for (int i = start; i < end; i++) sum += array[i];
                  return sum;
              } else {
                  int mid = (start + end) / 2;
                  SumTask left = new SumTask(array, start, mid);
                  SumTask right = new SumTask(array, mid, end);
                  left.fork(); // 異步執行左子任務
                  return left.join() + right.compute(); // 合并結果
              }
          }
      }
      
      public static void main(String[] args) {
          // 使用 ForkJoinPool
          ForkJoinPool pool = new ForkJoinPool();
          long[] data = new long[1000000];
          // 初始化 data
          for(int i=0;i<data.length;i++){
              data[i] = i;
          }
          // 執行任務
          Long result = pool.invoke(new SumTask(data, 0, data.length));
      }
      
      posted @ 2025-09-05 21:37  紀莫  閱讀(264)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 激情综合网五月婷婷| 国产午夜福利精品片久久| 久久99国产亚洲高清观看首页| 精品国产一区二区三区性色| 久久精品国产亚洲av麻豆长发| 国产精品三级爽片免费看| 日韩亚洲精品中文字幕| 久久精品国产99国产精品澳门| 人人妻人人狠人人爽| 青青国产揄拍视频| 黑人巨大亚洲一区二区久| 在线看免费无码的av天堂| 国产精品天堂蜜av在线播放| 久久精品国产久精国产| 日本熟妇人妻一区二区三区| √天堂中文www官网在线| 精品乱码一区二区三四五区| 色吊丝免费av一区二区| 午夜成人精品福利网站在线观看| 武装少女在线观看高清完整版免费| 韩日午夜在线资源一区二区| 国产精品福利自产拍久久| 色噜噜一区二区三区| 日日摸夜夜添狠狠添欧美| 永久免费的av在线电影网| 亚洲精品岛国片在线观看| 久久大香萑太香蕉av黄软件 | 无码专区视频精品老司机| 艳妇乳肉豪妇荡乳xxx| 精品国产第一国产综合精品| 午夜天堂av天堂久久久| 国产乱码精品一区二区三上| 成人无码午夜在线观看| 国产精品午夜福利资源| 国产美女深夜福利在线一 | 边吃奶边添下面好爽| 高清无码午夜福利视频| 欧洲亚洲国内老熟女超碰| 日韩精品无码区免费专区| 亚洲爆乳WWW无码专区| 国产亚洲精品一区二区无|