<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      ThreadPoolExecutor-初識(shí)

      初識(shí)

      ?? NOTE

      《初識(shí)》

      正所謂“先有孫子后有爺”,想研究threadPoolExecutor 我們先拔一拔它的上層。

      image-20240905151547969


      可以看出最上級(jí)的接口是Executor, AutoCloseable,分別提供execute和close方法,ExecutorService繼承于這兩個(gè)接口,我們常用的submit就是這個(gè)類所提供的,抽象方法AbstractExecutorService去實(shí)現(xiàn)ExecutorService, 而我們的主角ThreadPoolExecutor是繼承于AbstractExecutorService。

      了解祖孫三代后,需要思考幾個(gè)問題

      • 既然叫線程池,他是如何管理線程的?如何創(chuàng)建?
      • 八股文說

      線程池是一種基于池化思想管理線程的工具,使用線程池可以減少創(chuàng)建銷毀線程的開銷,避免線程過多導(dǎo)致系統(tǒng)資源耗盡。充分利用池內(nèi)計(jì)算資源,等待分配并發(fā)執(zhí)行任務(wù),提高系統(tǒng)性能和響應(yīng)能力。

      在業(yè)務(wù)系統(tǒng)開發(fā)過程中,線程池有兩個(gè)常見的應(yīng)用場(chǎng)景,分別是:快速響應(yīng)用戶請(qǐng)求和快速處理批量任務(wù)

      那么線程池如何做到減少創(chuàng)建銷毀線程?是如何保存線程的?

      • 剛才看到的執(zhí)行方法execute和submit有什么區(qū)別?
      • 線程池線程安全是如何實(shí)現(xiàn)的?

      Tips: 任何未知的東西都需要保持疑問,帶著問題去探索成功率會(huì)更高。


      交織

      帶著上面的問題,默默的打開了源碼。

      核心成員

      找到最全的一個(gè)構(gòu)造器 分析一下

      
      /**
           * corePoolSize:在線程池中的線程數(shù)量,即使他們處于空閑的狀態(tài)
           * maximumPoolSize:池中允許的最大線程數(shù)
           * keepAliveTime:當(dāng)線程數(shù)大于核心時(shí),這是多余的空閑線程在終止之前等待新任務(wù)的最長時(shí)間。
           * unit:keepAliveTime的時(shí)間單位。
           * workQueue:用于在執(zhí)行任務(wù)之前保存任務(wù)的隊(duì)列。此隊(duì)列將僅保存可運(yùn)行提交的任務(wù)執(zhí)行方法。
           * threadFactory:執(zhí)行器創(chuàng)建新線程時(shí)要使用的工廠
           * handler:由于達(dá)到線程邊界和隊(duì)列容量而阻塞執(zhí)行時(shí)要使用的處理程序
      */
          public ThreadPoolExecutor(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
                                    TimeUnit unit,
                                    BlockingQueue<Runnable> workQueue,
                                    ThreadFactory threadFactory,
                                    RejectedExecutionHandler handler) {
              if (corePoolSize < 0 ||
                  maximumPoolSize <= 0 ||
                  maximumPoolSize < corePoolSize ||
                  keepAliveTime < 0)
                  throw new IllegalArgumentException();
              if (workQueue == null || threadFactory == null || handler == null)
                  throw new NullPointerException();
              this.corePoolSize = corePoolSize;
              this.maximumPoolSize = maximumPoolSize;
              this.workQueue = workQueue;
              this.keepAliveTime = unit.toNanos(keepAliveTime);
              this.threadFactory = threadFactory;
              this.handler = handler;
      
              String name = Objects.toIdentityString(this);
              this.container = SharedThreadContainer.create(name);
          }
      

      2.1 corePoolSize

      注解描述的是,當(dāng)線程池接收新的任務(wù)進(jìn)來,即使存在空閑的線程也需要?jiǎng)?chuàng)建新的線程,除非線程數(shù)量達(dá)到了corePoolSize。線程池默認(rèn)的線程數(shù)量是空的,而是等任務(wù)進(jìn)來才會(huì)去創(chuàng)建線程。但內(nèi)置了prestartAllCoreThreads()和prestartCoreThread()兩個(gè)預(yù)處理方法,即使沒有任何也會(huì)預(yù)先創(chuàng)建線程數(shù)量為corePoolSize,超過后方到阻塞隊(duì)列里。

      2.2 maximumPoolSize

      注釋描述為:池中允許的最大線程數(shù)。會(huì)有人問剛才不是說超過corePoolSize就放到了隊(duì)列中了嗎,怎么又出現(xiàn)最大的概念? 這里關(guān)系可以理解為:一個(gè)工廠5個(gè)工人,一個(gè)工人只能同時(shí)做一個(gè)雞哥牌玩偶,玩偶供不應(yīng)求 一天需要10個(gè)玩偶,5個(gè)工人在做 另外5個(gè)就排隊(duì)唄,但突然增加到13個(gè)了。這是老板決定臨時(shí)雇3個(gè)工人但最多只能3個(gè),這里3+5就是最大線程數(shù)。過段時(shí)間不忙了 5個(gè)工人足以支撐的時(shí)候3個(gè)工人就辭退了 這里的時(shí)間就是下面說到的線程存活時(shí)間

      2.3 keepAliveTime

      當(dāng)線程數(shù)大于核心時(shí),這是多余的空閑線程在終止之前等待新任務(wù)的最長時(shí)間。即:外包人員辭退時(shí)間

      2.4 workQueue

      runnableTaskQueue(任務(wù)隊(duì)列):用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列。可以選擇以下幾個(gè)阻塞隊(duì)列

      • ArrayBlockingQueue:是一個(gè)基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序
      • LinkedBlockingQueue:一個(gè)基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按FIFO (先進(jìn)先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個(gè)隊(duì)列
      • SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個(gè)隊(duì)列
      • PriorityBlockingQueue:一個(gè)具有優(yōu)先級(jí)得無限阻塞隊(duì)列

      2.5 handler

      • CallerRunsPolicy:拒絕任務(wù)的處理程序,它直接在execute方法的調(diào)用線程中運(yùn)行拒絕的任務(wù),除非執(zhí)行器已關(guān)閉,在這種情況下,任務(wù)將被丟棄。
      • AbortPolicy:引發(fā)RejectedExecutionException的被拒絕任務(wù)的處理程序。這是ThreadPoolExecutor和ScheduledThreadPoolExecutor的默認(rèn)處理程序
      • DiscardPolicy:直接丟棄,其他啥都沒有
      • DiscardOldestPolicy:當(dāng)觸發(fā)拒絕策略,只要線程池沒有關(guān)閉的話,丟棄阻塞隊(duì)列 workQueue 中最老的一個(gè)任務(wù),并將新任務(wù)加入。

      2.6 基本屬性

      // 初始化線程池控制變量,初始狀態(tài)為RUNNING且沒有工作線程
      private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
      
      // 計(jì)算工作線程計(jì)數(shù)位數(shù),共31位
      private static final int COUNT_BITS = Integer.SIZE - 3;
      
      // 工作線程計(jì)數(shù)掩碼,用于獲取ctl中的工作線程數(shù)
      private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
      
      // 狀態(tài)存儲(chǔ)在高3位中
      // 線程池運(yùn)行狀態(tài),表示線程池正在運(yùn)行
      private static final int RUNNING    = -1 << COUNT_BITS;
      
      // 線程池關(guān)閉狀態(tài),表示不再接受新任務(wù)但未執(zhí)行完的任務(wù)繼續(xù)執(zhí)行
      private static final int SHUTDOWN   =  0 << COUNT_BITS;
      
      // 線程池停止?fàn)顟B(tài),表示不再接受新任務(wù)并且立即中斷所有正在執(zhí)行的任務(wù)
      private static final int STOP       =  1 << COUNT_BITS;
      
      // 線程池整理狀態(tài),表示所有任務(wù)已終止,工作線程正在執(zhí)行terminated()
      private static final int TIDYING    =  2 << COUNT_BITS;
      
      // 線程池終止?fàn)顟B(tài),表示terminated()已完成
      private static final int TERMINATED =  3 << COUNT_BITS;
      
      // 解包和打包c(diǎn)tl值
      // 獲取ctl中的運(yùn)行狀態(tài)
      private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
      
      // 獲取ctl中的工作線程數(shù)
      private static int workerCountOf(int c)  { return c & COUNT_MASK; }
      
      // 根據(jù)運(yùn)行狀態(tài)和工作線程數(shù)創(chuàng)建ctl值
      private static int ctlOf(int rs, int wc) { return rs | wc; }
      
      // 不需要解包c(diǎn)tl的位字段訪問器
      // 判斷運(yùn)行狀態(tài)是否小于給定狀態(tài)
      private static boolean runStateLessThan(int c, int s) {
          return c < s;
      }
      
      // 判斷運(yùn)行狀態(tài)是否大于等于給定狀態(tài)
      private static boolean runStateAtLeast(int c, int s) {
          return c >= s;
      }
      
      // 判斷線程池是否處于運(yùn)行狀態(tài)
      private static boolean isRunning(int c) {
          return c < SHUTDOWN;
      }
      
      

      這里ctl 變量是一個(gè) AtomicInteger 類型的變量,它用于同時(shí)保存線程池的狀態(tài)和當(dāng)前活動(dòng)的工作線程數(shù)。具體來說:
      高3位:表示線程池的狀態(tài)。
      低31位:表示當(dāng)前活躍的工作線程數(shù)。

      可以理解為他同時(shí)維護(hù)了線程池的狀態(tài)和工作線程數(shù)量,至于具體怎么算的不重要,后面還會(huì)再次聊這個(gè)。

      這幾個(gè)屬性和方法在后面會(huì)經(jīng)常遇見,更是保證線程安全的手段。

      工作流程

      說完基本屬性聊一下線程池工作流程:

      img


      1. 如果運(yùn)行的線程數(shù)少于corePoolSize的時(shí)候,直接創(chuàng)建新線程來處理任務(wù)。即使線程池中的其他任務(wù)是空閑的(會(huì)默認(rèn)先把corePoolSize填充滿,保證核心線程數(shù))
      2. 當(dāng)corePoolSize滿了的時(shí)候,新任務(wù)提交時(shí),會(huì)被推入任務(wù)隊(duì)列進(jìn)行等待。完成任務(wù)的核心線程會(huì)從任務(wù)隊(duì)列中進(jìn)行領(lǐng)取。
      3. 當(dāng)任務(wù)隊(duì)列被任務(wù)推滿,并且繼續(xù)被推新任務(wù)時(shí),會(huì)創(chuàng)建非核心線程。非核心線程的數(shù)量不能超過maxinumPoolSize,被創(chuàng)建后立即領(lǐng)取推入的新任務(wù)進(jìn)行執(zhí)行
      4. 非核心線程完成新任務(wù),處于空閑時(shí),會(huì)從任務(wù)隊(duì)列中領(lǐng)取任務(wù)進(jìn)行執(zhí)行。如果沒領(lǐng)取到,則會(huì)一直處于空閑狀態(tài),直到keepAliveTime耗盡,該非核心線程會(huì)被銷毀。
      5. 如果corePoolSize與maxinumPollSize相同的話,那么創(chuàng)建的線程池的大小是固定的,此時(shí)如果有新任務(wù)提交,而且workQueue還沒滿,就把請(qǐng)求放到workQueue中,等待有空閑的線程去workQueue中取出任務(wù)后在執(zhí)行(如果corePoolSize與maxinumPollSize相同,說明不允許在線程池中創(chuàng)建新的線程了,讓任務(wù)在隊(duì)列中等待)
      6. 如果運(yùn)行的線程數(shù)量大于maxinumPollSize,如果workQueue也已經(jīng)滿了,會(huì)根據(jù)指定的拒絕策略來處理該任務(wù)

      判斷順序:corePoolSize -> workQueue -> maxinumPoolSize

      Untitled diagram-2024-09-05-095907

      這個(gè)做個(gè)小測(cè)試

      public class ThreadPoolTest {
      
          public static void main(String[] args) {
              // 創(chuàng)建線程池
              ThreadPoolExecutor executor = new ThreadPoolExecutor(
                      5,                                        // 核心線程數(shù)
                      10,                                       // 最大線程數(shù)
                      200,                                      // 非核心線程數(shù)存活時(shí)間數(shù)
                      TimeUnit.MILLISECONDS,                    // 非核心線程數(shù)存活時(shí)間單位
                      new LinkedBlockingQueue<>(5),             // 阻塞隊(duì)列
                      new ThreadFactoryBuilder().setNamePrefix("測(cè)試").build(), // 線程工廠
                      new ThreadPoolExecutor.AbortPolicy()      // 拒絕策略
              );
      
              // 提交20個(gè)任務(wù)
              for (int i = 0; i < 20; i++) {
                  try {
                      MyTask myTask = new MyTask(i);
                      executor.execute(myTask);
                      printThreadPoolStatus(executor);
                      Thread.sleep(10); // 等待10ms
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
      
              // 關(guān)閉線程池
              executor.shutdown();
              System.out.println("-----------------over---------------------");
              printThreadPoolStatus(executor);
          }
      
          private static void printThreadPoolStatus(ThreadPoolExecutor executor) {
              System.out.println("線程池中線程數(shù)目:" + executor.getPoolSize() +
                      ",隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:" + executor.getQueue().size() +
                      ",已執(zhí)行完成的任務(wù)數(shù)目:" + executor.getCompletedTaskCount());
              System.out.println("--------------------------------------------------");
          }
      }
      
      class MyTask implements Runnable {
          private int taskNum;
      
          public MyTask(int num) {
              this.taskNum = num;
          }
      
          @Override
          public void run() {
              System.out.println("正在執(zhí)行task-" + taskNum);
              try {
                  Thread.sleep(10000); // 模擬任務(wù)執(zhí)行時(shí)間
              } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  System.err.println("任務(wù)中斷 Task:" + taskNum + ", 線程:" + Thread.currentThread().getName());
              }
              System.out.println("task-" + taskNum + "執(zhí)行完畢");
          }
      }
      

      看看結(jié)果:

      image-20240910101904976

      CleanShot 2024-09-10 at 10.22.07@2x

      自己分析吧您。

      execute

      /**
       * 執(zhí)行給定的任務(wù),該任務(wù)可能在一個(gè)新線程或現(xiàn)有的線程池線程中執(zhí)行。
       *
       * 如果任務(wù)無法被提交執(zhí)行,可能是由于線程池已被關(guān)閉或其容量已達(dá)到上限,
       * 則任務(wù)將由當(dāng)前的{@link RejectedExecutionHandler}處理。
       *
       * @param command 要執(zhí)行的任務(wù)
       * @throws RejectedExecutionException 如果任務(wù)無法被接受執(zhí)行,則根據(jù)
       *         {@code RejectedExecutionHandler}的策略拋出此異常
       * @throws NullPointerException 如果 {@code command} 為 null
       */
      public void execute(Runnable command) {
          // 檢查任務(wù)是否為 null
          if (command == null)
              throw new NullPointerException();
      
          /*
           * 執(zhí)行過程分為三個(gè)步驟:
           *
           * 1. 如果運(yùn)行中的線程數(shù)少于核心線程數(shù),嘗試啟動(dòng)一個(gè)新的線程來執(zhí)行任務(wù)。
           *    調(diào)用 addWorker 方法原子地檢查 runState 和 workerCount,以防止不必要的線程創(chuàng)建。
           *
           * 2. 如果任務(wù)可以成功入隊(duì),仍然需要再次檢查是否應(yīng)該添加新的線程(因?yàn)橹暗木€程可能已經(jīng)死亡)
           *    或者線程池是否已經(jīng)關(guān)閉。如果線程池已關(guān)閉,則回滾入隊(duì)操作;如果沒有線程,則啟動(dòng)新線程。
           *
           * 3. 如果無法將任務(wù)入隊(duì),則嘗試添加新線程。如果失敗,則表示線程池已關(guān)閉或已滿,拒絕任務(wù)。
           */
      
          // 獲取當(dāng)前控制變量的值
          int c = ctl.get();
      
          // 檢查當(dāng)前運(yùn)行的線程數(shù)是否少于核心線程數(shù)
          if (workerCountOf(c) < corePoolSize) {
              // 嘗試添加新線程并執(zhí)行任務(wù)
              if (addWorker(command, true))
                  return;
              // 再次獲取當(dāng)前控制變量的值
              c = ctl.get();
          }
      
          // 檢查線程池是否處于運(yùn)行狀態(tài),并嘗試將任務(wù)入隊(duì)
          if (isRunning(c) && workQueue.offer(command)) {
              // 再次檢查當(dāng)前控制變量的值
              int recheck = ctl.get();
              // 如果線程池已停止并且任務(wù)已入隊(duì),則回滾入隊(duì)操作并拒絕任務(wù)
              if (!isRunning(recheck) && remove(command))
                  reject(command);
              // 如果沒有線程,則啟動(dòng)新線程
              else if (workerCountOf(recheck) == 0)
                  addWorker(null, false);
          }
          // 如果無法將任務(wù)入隊(duì),則嘗試添加新線程
          else if (!addWorker(command, false))
              reject(command);
      }
      
      

      execute方法的邏輯并不復(fù)雜:

      • 當(dāng)前運(yùn)行的線程數(shù)小于核心線程數(shù),則調(diào)用addworker添加新線程執(zhí)行任務(wù)
      • 檢查線程池是否處于running狀態(tài),如果任務(wù)可以成功入隊(duì),仍然需要再次檢查是否應(yīng)該添加新的線程(因?yàn)橹暗木€程可能已經(jīng)死亡)或者線程池是否已經(jīng)關(guān)閉。如果線程池已關(guān)閉,則回滾入隊(duì)操作;如果沒有線程,則啟動(dòng)新線程。
      • 如果無法將任務(wù)入隊(duì),則嘗試添加新線程。如果失敗,則表示線程池已關(guān)閉或已滿,拒絕任務(wù)。

      這里出現(xiàn)了幾個(gè)重要的方法,isRunning()判斷線程池是否運(yùn)行、workQueue.offer()加入阻塞隊(duì)列、remove()會(huì)滾移除任務(wù)、reject()拒絕任務(wù)以及更核心的addWorker()添加工人方法。后面會(huì)在聊

      通過上面這一小段代碼,我們就已經(jīng)完整地看到了通過一個(gè) ctl 變量進(jìn)行全局狀態(tài)控制,從而保證了線程安全性。整個(gè)框架并沒有使用鎖,但是卻是線程安全的

      2.1 submit

          /**
           * @throws RejectedExecutionException {@inheritDoc}
           * @throws NullPointerException       {@inheritDoc}
           */
          public Future<?> submit(Runnable task) {
              if (task == null) throw new NullPointerException();
              RunnableFuture<Void> ftask = newTaskFor(task, null);
              execute(ftask);
              return ftask;
          }
      
          /**
           * @throws RejectedExecutionException {@inheritDoc}
           * @throws NullPointerException       {@inheritDoc}
           */
          public <T> Future<T> submit(Runnable task, T result) {
              if (task == null) throw new NullPointerException();
              RunnableFuture<T> ftask = newTaskFor(task, result);
              execute(ftask);
              return ftask;
          }
      
          /**
           * @throws RejectedExecutionException {@inheritDoc}
           * @throws NullPointerException       {@inheritDoc}
           */
          public <T> Future<T> submit(Callable<T> task) {
              if (task == null) throw new NullPointerException();
              RunnableFuture<T> ftask = newTaskFor(task);
              execute(ftask);
              return ftask;
          }
      
      

      submit本身是父類繼承過來的,可以看到有三個(gè)重載方法,Runnable為無參數(shù)無返回、Callable為無參數(shù)有返回。

      三個(gè)方法本質(zhì)就是封裝任務(wù)和返回結(jié)果為 RunnableFuture, 統(tǒng)一交由具體的子類執(zhí)行,然后調(diào)用execute方法。

      future類后面單獨(dú)聊

      2.2 invoke


      worker

      無論是execute還是submit,真正干活的是worker。你不干我不干老板何時(shí)才能提bwm

      2.1 worker

      /**
       * Worker 類主要用于維護(hù)正在運(yùn)行任務(wù)的線程的中斷控制狀態(tài)以及其他一些輔助信息。
       * 這個(gè)類擴(kuò)展了 AbstractQueuedSynchronizer,簡(jiǎn)化了每次任務(wù)執(zhí)行時(shí)鎖的獲取和釋放。
       * 這樣可以防止中斷原本用于喚醒等待任務(wù)的線程卻中斷了正在運(yùn)行的任務(wù)。
       * 實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的非重入互斥鎖而不是使用 ReentrantLock,因?yàn)槲覀儾幌M蝿?wù)在調(diào)用
       * 線程池控制方法(如 setCorePoolSize)時(shí)能夠重新獲取鎖。
       * 此外,為了抑制線程實(shí)際開始運(yùn)行任務(wù)前的中斷,我們將鎖狀態(tài)初始化為負(fù)值,并在啟動(dòng)時(shí)清除(在 runWorker 中)。
       */
      private final class Worker
          extends AbstractQueuedSynchronizer
          implements Runnable
      {
          /**
           * 該類永遠(yuǎn)不會(huì)被序列化,但我們提供一個(gè) serialVersionUID 來抑制 javac 警告。
           */
          private static final long serialVersionUID = 6138294804551838833L;
      
          /** 當(dāng)前線程正在運(yùn)行的線程。如果工廠創(chuàng)建失敗,則為 null。 */
          @SuppressWarnings("serial") // 不太可能被序列化
          final Thread thread;
          /** 初始任務(wù)??赡転?null。 */
          @SuppressWarnings("serial") // 未靜態(tài)類型化為 Serializable
          Runnable firstTask;
          /** 每個(gè)線程的任務(wù)計(jì)數(shù)器 */
          volatile long completedTasks;
      
          // TODO: 切換到 AbstractQueuedLongSynchronizer 并將 completedTasks 移動(dòng)到鎖字中。
      
          /**
           * 使用給定的第一個(gè)任務(wù)和 ThreadFactory 創(chuàng)建線程。
           * @param firstTask 第一個(gè)任務(wù)(如果沒有則為 null)
           */
          Worker(Runnable firstTask) {
              setState(-1); // 抑制直到 runWorker 開始前的中斷
              this.firstTask = firstTask;
              this.thread = getThreadFactory().newThread(this);
          }
      
          /** 委托主要的運(yùn)行循環(huán)到外部的 runWorker 方法。 */
          public void run() {
              runWorker(this);
          }
      
          // 鎖方法
          //
          // 值 0 表示未鎖定狀態(tài)。
          // 值 1 表示鎖定狀態(tài)。
      
          protected boolean isHeldExclusively() {
              return getState() != 0; // 如果狀態(tài)不為 0,則表示已鎖定
          }
      
          protected boolean tryAcquire(int unused) {
              if (compareAndSetState(0, 1)) { // 嘗試將狀態(tài)從 0 設(shè)置為 1
                  setExclusiveOwnerThread(Thread.currentThread()); // 設(shè)置當(dāng)前線程為獨(dú)占所有者
                  return true;
              }
              return false;
          }
      
          protected boolean tryRelease(int unused) {
              setExclusiveOwnerThread(null); // 清除獨(dú)占所有者
              setState(0); // 將狀態(tài)設(shè)置為 0
              return true;
          }
      
          public void lock() { // 鎖定
              acquire(1);
          }
      
          public boolean tryLock() { // 嘗試鎖定
              return tryAcquire(1);
          }
      
          public void unlock() { // 解鎖
              release(1);
          }
      
          public boolean isLocked() { // 檢查是否鎖定
              return isHeldExclusively();
          }
      
          /**
           * 如果線程已啟動(dòng)且未被中斷,則中斷線程。
           */
          void interruptIfStarted() {
              Thread t;
              if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                  try {
                      t.interrupt(); // 中斷線程
                  } catch (SecurityException ignore) {
                      // 忽略安全異常
                  }
              }
          }
      }
      
      

      這個(gè)類有點(diǎn)多我們一點(diǎn)點(diǎn)分析,首先他繼承于aqs,經(jīng)常背八股文應(yīng)該知道使用aqs簡(jiǎn)化了每次任務(wù)執(zhí)行時(shí)鎖的獲取和釋放。(關(guān)于aqs可以單獨(dú)看這篇文章[]{}。

      • 基本屬性:thread運(yùn)行的線程、firstTask 初始任務(wù)、completedTasks程序計(jì)數(shù)器。構(gòu)造器在初始化的時(shí)候設(shè)置aqs狀態(tài)為-1setState(-1),同時(shí)使用ThreadFactory 去創(chuàng)建線程。

      • 基本方法:run()委托主要的運(yùn)行循環(huán)到外部的 runWorker 方法;interruptIfStarted()如果線程已啟動(dòng)且未被中斷,則中斷線程。

      • 鎖方法:isHeldExclusively boolean值:表示未鎖定狀態(tài)。值 1 表示鎖定狀態(tài)。

        ? tryAcquire() 原子操作設(shè)置當(dāng)前線程

        ? tryRelease 原子操作 清除線程占有

        ? lock 枷鎖

        ? tryLock 嘗試枷鎖

        ? unlock 釋放鎖

        ? isLocked 是否鎖定

        到這里worker類屬性方法基本理清了,但是這個(gè)類到底是干嘛用的 還是有點(diǎn)模糊吧。worker類只提供了一個(gè)功能性方法就是run 也就是執(zhí)行方法 參數(shù)為Runnable, 也就是可以理解為worker是一個(gè)維護(hù)處理一個(gè)線程和一個(gè)任務(wù)的類。

        ?

      2.2 addWorker

      /**
      檢查是否可以根據(jù)當(dāng)前池狀態(tài)和給定的綁定 (核心或最大值) 添加新的工作線程。
      如果是這樣,則相應(yīng)地調(diào)整工作線程計(jì)數(shù),如果可能,將創(chuàng)建并啟動(dòng)新的工作線程,并將firstTask作為其第一個(gè)任務(wù)運(yùn)行。
      如果池已停止或有資格關(guān)閉,則此方法返回false。如果線程工廠在詢問時(shí)無法創(chuàng)建線程,它也會(huì)返回false。
      如果線程創(chuàng)建失敗,要么是由于線程工廠返回null,要么是由于異常 (通常是thread. start中的OutOfMemoryError ()), 我們會(huì)干凈地回滾。
      firstTask -新線程應(yīng)首先運(yùn)行的任務(wù) (如果沒有,則為null)。
      創(chuàng)建worker的初始第一個(gè)任務(wù) (在方法execute() 中),以便在少于corePoolSize線程 (在這種情況下,我們總是啟動(dòng)一個(gè)) 或隊(duì)列已滿 (在這種情況下,我們必須繞過隊(duì)列) 時(shí)繞過隊(duì)列。
      最初空閑的線程通常通過prestartCoreThread創(chuàng)建或替換其他垂死的工人。
      core -如果為true,則使用corePoolSize作為綁定,否則使用maximumPoolSize。(這里使用布爾指示器而不是一個(gè)值,以確保在檢查其他池狀態(tài)后讀取新鮮值)。
      退貨:
      如果成功,則為true
      **/
      private boolean addWorker(Runnable firstTask, boolean core) {
          retry:
          for (int c = ctl.get(); ; ) {
              // Check if queue empty only if necessary.
              if (runStateAtLeast(c, SHUTDOWN)
                  && (runStateAtLeast(c, STOP)
                      || firstTask != null
                      || workQueue.isEmpty())) {
                  // 如果線程池處于關(guān)閉狀態(tài)或者停止?fàn)顟B(tài),或者隊(duì)列為空,則返回 false
                  return false;
              }
      
              for (;;) {
                  if (workerCountOf(c)
                      >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) {
                      // 如果當(dāng)前活動(dòng)線程數(shù)已經(jīng)達(dá)到核心線程數(shù)或最大線程數(shù),則返回 false
                      return false;
                  }
                  if (compareAndIncrementWorkerCount(c)) {
                      // 嘗試增加一個(gè)線程計(jì)數(shù),如果成功則跳出循環(huán)
                      break retry;
                  }
                  c = ctl.get();  // 重新讀取 ctl
                  if (runStateAtLeast(c, SHUTDOWN)) {
                      // 如果線程池處于關(guān)閉狀態(tài),則繼續(xù)重試
                      continue retry;
                  }
                  // 否則 CAS 失敗是由于 workerCount 改變,重試內(nèi)循環(huán)
              }
          }
      
          boolean workerStarted = false;
          boolean workerAdded = false;
          Worker w = null;
          try {
              w = new Worker(firstTask);
              final Thread t = w.thread;
              if (t != null) {
                  final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  try {
                      // 再次檢查狀態(tài),確保線程工廠沒有失敗并且線程池沒有關(guān)閉
                      int c = ctl.get();
      
                      if (isRunning(c) ||
                          (runStateLessThan(c, STOP) && firstTask == null)) {
                          // 如果線程池正在運(yùn)行,或者狀態(tài)小于 STOP 且沒有初始任務(wù)
                          if (t.getState() != Thread.State.NEW) {
                              // 如果線程狀態(tài)不是 NEW,則拋出異常
                              throw new IllegalThreadStateException();
                          }
                          workers.add(w);  // 添加 Worker 到集合中
                          workerAdded = true;
                          int s = workers.size();
                          if (s > largestPoolSize) {
                              // 更新最大線程數(shù)
                              largestPoolSize = s;
                          }
                      }
                  } finally {
                      mainLock.unlock();  // 釋放鎖
                  }
                  if (workerAdded) {
                      container.start(t);  // 啟動(dòng)線程
                      workerStarted = true;
                  }
              }
          } finally {
              if (!workerStarted) {
                  // 如果線程啟動(dòng)失敗,則回滾
                  addWorkerFailed(w);
              }
          }
          return workerStarted;
      }
      
      

      看似也很多,實(shí)際并不復(fù)雜。前面兩個(gè)循環(huán)就可以理解為業(yè)務(wù)代碼里的數(shù)據(jù)校驗(yàn):

      • 檢查線程池狀態(tài)是否合法,是否可以新建線程
      • 用于嘗試增加線程計(jì)數(shù),并處理 CAS 操作可能失敗的情況。

      就是為了在多線程的情況下能夠處理線程池狀態(tài)變化和cas計(jì)數(shù)器的變化,從而保證線程池行為正常;下面異常捕獲里才是主邏輯:

      • 創(chuàng)建worker
        • w = new Worker(firstTask);:創(chuàng)建一個(gè)新的 Worker 對(duì)象。
        • final Thread t = w.thread;:獲取 Worker 對(duì)象中的線程。
        • mainLock.lock();:鎖定 mainLock。主要是對(duì)worker集合的操作線程安全
      • 再次檢查狀態(tài)
        • int c = ctl.get();:重新讀取 ctl 的值。
        • if (isRunning(c) || ...):檢查線程池是否正在運(yùn)行,或者狀態(tài)小于 STOP 且沒有初始任務(wù)。
        • if (t.getState() != Thread.State.NEW):檢查線程狀態(tài)是否為 NEW。
        • workers.add(w);:將 Worker 添加到集合中。
        • workerAdded = true;:標(biāo)記已添加 Worker。
        • int s = workers.size();:獲取當(dāng)前 Worker 集合的大小。
        • largestPoolSize = s;:更新最大線程數(shù)
      • 解鎖并啟動(dòng)線程
        • mainLock.unlock();:釋放鎖。
        • container.start(t);:?jiǎn)?dòng)線程。
        • workerStarted = true;:標(biāo)記線程已啟動(dòng)。
      • 會(huì)滾和返回
        • if (!workerStarted):如果線程啟動(dòng)失敗,則回滾。
        • addWorkerFailed(w);:處理失敗情況。
        • return workerStarted;:返回線程是否成功啟動(dòng)的結(jié)果。

      2.3 runWorker

      final void runWorker(Worker w) {
          Thread wt = Thread.currentThread();  // 獲取當(dāng)前線程
          Runnable task = w.firstTask;        // 獲取 Worker 對(duì)象的第一個(gè)任務(wù)
          w.firstTask = null;                 // 清空 Worker 對(duì)象的第一個(gè)任務(wù)
          w.unlock();                         // 解鎖,允許中斷
      
          boolean completedAbruptly = true;   // 標(biāo)記線程是否突然退出,默認(rèn)為 true
      
          try {
              while (task != null || (task = getTask()) != null) {  // 循環(huán)處理任務(wù)
                  w.lock();                                           // 加鎖
      
                  // 如果線程池正在停止,確保線程被中斷;否則,確保線程不被中斷。
                  // 這里需要在第二種情況下進(jìn)行重新檢查,以處理在清除中斷標(biāo)志時(shí)的 `shutdownNow` 競(jìng)態(tài)條件
                  if ((runStateAtLeast(ctl.get(), STOP) ||
                       (Thread.interrupted() &&
                        runStateAtLeast(ctl.get(), STOP))) &&
                      !wt.isInterrupted()) {
                      wt.interrupt();  // 中斷當(dāng)前線程
                  }
      
                  try {
                      beforeExecute(wt, task);  // 執(zhí)行任務(wù)前的鉤子方法
      
                      try {
                          task.run();           // 執(zhí)行任務(wù)
                          afterExecute(task, null);  // 執(zhí)行任務(wù)后的鉤子方法
                      } catch (Throwable ex) {
                          afterExecute(task, ex);    // 處理任務(wù)執(zhí)行中的異常
                          throw ex;                  // 重新拋出異常
                      }
                  } finally {
                      task = null;                  // 清空任務(wù)
                      w.completedTasks++;           // 增加已完成的任務(wù)數(shù)
                      w.unlock();                   // 解鎖
                  }
              }
              completedAbruptly = false;  // 標(biāo)記線程正常退出
          } finally {
              processWorkerExit(w, completedAbruptly);  // 處理 Worker 線程退出
          }
      }
      
      

      這里執(zhí)行的方法就很容易讀了

      • 初始化變量
        • Thread wt = Thread.currentThread(); // 獲取當(dāng)前線程
        • Runnable task = w.firstTask; // 獲取 Worker 對(duì)象的第一個(gè)任務(wù)
        • w.firstTask = null; // 清空 Worker 對(duì)象的第一個(gè)任務(wù)
        • w.unlock(); // 解鎖,允許中斷
      • 狀態(tài)校驗(yàn):雖然寫的一大堆 但總結(jié)一句話就是狀態(tài)是stop或者之上就再次給你中斷一下。
      • 執(zhí)行邏輯:這里有兩個(gè)抽象方法afterExecute和beforeExecute供給你做一些自定義的邏輯。finally需要清空任務(wù)增加任務(wù)數(shù)同時(shí)解鎖。
      • processWorkerExit()方法 可以理解為工人生死簿
        • 如果completedAbruptly=true就是線程意外退出則減少計(jì)數(shù)
        • 更新完成的任務(wù),完成了就把worker在集合中移除,判處死刑
        • 所有任務(wù)都完成了 會(huì)嘗試中止線程池
        • 也會(huì)根據(jù)線程池的狀態(tài)考慮是否需要增加worker或移除

      2.4總結(jié)

      到這里線程池核心干活的worker這部分就完事了,總結(jié)一下:

      在線程池一個(gè)worker對(duì)應(yīng)一個(gè)任務(wù)一個(gè)線程,worker的創(chuàng)建依賴于線程狀態(tài),任務(wù)的執(zhí)行也是在worker里去處理。這里有個(gè)小技巧 worker類有一個(gè)firstTask在每個(gè)任務(wù)執(zhí)行后會(huì)置空這個(gè)字段,同時(shí)下個(gè)任務(wù)來又繼續(xù)如此,小細(xì)節(jié)避免了數(shù)據(jù)競(jìng)爭(zhēng)也是線程安全的。

      失敗處理

      線程池拒絕策略是實(shí)現(xiàn)的RejectedExecutionHandler接口內(nèi)置有4個(gè)

      • CallerRunsPolicy:調(diào)用者執(zhí)行策略,當(dāng)線程池線程數(shù)滿時(shí),它不再丟給線程池執(zhí)行,也不丟棄掉,而是自己線程來執(zhí)行,把異步任務(wù)變成同步任務(wù)。
      • AbortPolicy:默認(rèn)拒絕策略,直接拋出異常
      • DiscardPolicy:丟棄策略會(huì)在提交任務(wù)失敗時(shí)默默地把任務(wù)丟棄掉,失敗就失敗,完全不管它。其底層實(shí)現(xiàn)源碼就是啥也不干
      • DiscardOldestPolicy:丟棄最老任務(wù)策略,它就是目前等待最長時(shí)間也是最老的任務(wù)。
      /**
           * Invokes the rejected execution handler for the given command.
           * Package-protected for use by ScheduledThreadPoolExecutor.
           */
          final void reject(Runnable command) {
              // 拒絕策略是在構(gòu)造方法時(shí)傳入的,默認(rèn)為 RejectedExecutionHandler
              // 即用戶只需實(shí)現(xiàn) rejectedExecution 方法,即可以自定義拒絕策略了
              handler.rejectedExecution(command, this);
          }
      

      AbortPolicy 中止策略,線程池默認(rèn)的拒絕策略,也是我們最常用的拒絕策略。當(dāng)系統(tǒng)線程池滿載的時(shí)候,可以通過異常的形式告知使用方,交由使用方自行處理。一般出現(xiàn)此異常時(shí),我們可以提示用戶稍后再試,或者我們把未執(zhí)行的任務(wù)記錄下來,等到適當(dāng)時(shí)機(jī)再次執(zhí)行。

      shutdown

      在前文,我們提到線程池通過 ctl 一共可以表示五種狀態(tài)

      • RUNNING:運(yùn)行狀態(tài)。線程池接受并處理新任務(wù)。
      • SHUTDOWN :關(guān)閉狀態(tài)。線程池不能接受新任務(wù),處理完剩余任務(wù)后關(guān)閉。調(diào)用 shutdown 方法會(huì)進(jìn)入該狀態(tài)。
      • STOP:停止?fàn)顟B(tài)。線程池不能接受新任務(wù),并且嘗試中斷舊任務(wù)。調(diào)用 shutdownNow 方法會(huì)進(jìn)入該狀態(tài)。
      • TIDYING:整理狀態(tài)。由關(guān)閉狀態(tài)轉(zhuǎn)變,線程池任務(wù)隊(duì)列為空且沒有任何工作線程時(shí)時(shí)進(jìn)入該狀態(tài),會(huì)調(diào)用 terminated 方法。
      • TERMINATED:終止?fàn)顟B(tài)。terminated 方法執(zhí)行完畢后進(jìn)入該狀態(tài),線程池徹底停止。

      它們具體的流轉(zhuǎn)關(guān)系可以參考下圖:

      CleanShot 2024-09-11 at 17.13.37@2x


      2.1 shutdown()

      shutdown 是正常關(guān)閉,這個(gè)方法主要做了這幾件事:

      1. 改變當(dāng)前線程池狀態(tài)為 SHUTDOWN;
      2. 將線程池中的空閑工作線程標(biāo)記為中斷;
      3. 完成上述過程后將線程池狀態(tài)改為 TIDYING;
      4. 此后等到最后一個(gè)線程也退出后則將狀態(tài)改為 TERMINATED。
        public void shutdown() {
          final ReentrantLock mainLock = this.mainLock;
          // 加鎖
          mainLock.lock();
          try {
              checkShutdownAccess();
              // 將線程池狀態(tài)改為 SHUTDOWN
              advanceRunState(SHUTDOWN);
              // 中斷線程池中的所有空閑線程
              interruptIdleWorkers();
              // 鉤子函數(shù),默認(rèn)空實(shí)現(xiàn)
              onShutdown(); // hook for ScheduledThreadPoolExecutor
          } finally {
              mainLock.unlock();
          }
          tryTerminate();
      }
      

      2.2 shutdownNow()

      shutdownNowshutdown 流程類似,不過它是立即停機(jī),因此在細(xì)節(jié)上又有點(diǎn)區(qū)別:

      1. 改變當(dāng)前線程池狀態(tài)為 STOP;
      2. 將線程池中的所有工作線程標(biāo)記為中斷;
      3. 將任務(wù)隊(duì)列中的任務(wù)全部移除;
      4. 完成上述過程后將線程池狀態(tài)改為 TIDYING;
      5. 此后等到最后一個(gè)線程也退出后則將狀態(tài)改為 TERMINATED。
      public List<Runnable> shutdownNow() {
          List<Runnable> tasks;
          final ReentrantLock mainLock = this.mainLock;
          mainLock.lock();
          try {
              checkShutdownAccess();
              // 將線程池狀態(tài)改為 STOP
              advanceRunState(STOP);
              // 中斷線程池中的所有線程
              interruptWorkers();
              // 刪除任務(wù)隊(duì)列中的任務(wù)
              tasks = drainQueue();
          } finally {
              mainLock.unlock();
          }
          tryTerminate();
          return tasks;
      }
      

      看到這里是不是回想起上面addWorker方法中對(duì)stop狀態(tài)做進(jìn)一步的區(qū)別。

      2.3 真正的停機(jī)

      我們已經(jīng)知道通過 shutdownNowshutdown可以觸發(fā)停機(jī)流程,當(dāng)兩個(gè)方法執(zhí)行完畢后,線程池將會(huì)進(jìn)入 STOP 或者 SHUTDOWN 狀態(tài)。

      但是此時(shí)線程池并未真正的停機(jī),還記得runWorker方法中捕獲后面有個(gè)finally塊調(diào)用的processWorkerExit方法嗎,里面調(diào)用了一個(gè) tryTerminate 方法:

      final void tryTerminate() {
          for (;;) {
              int c = ctl.get();
              // 如果線程池不處于預(yù)停機(jī)狀態(tài),則不進(jìn)行停機(jī)
              if (isRunning(c) ||
                  runStateAtLeast(c, TIDYING) ||
                  (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                  return;
              // 如果當(dāng)前還有工作線程,則不進(jìn)行停機(jī)
              if (workerCountOf(c) != 0) {
                  interruptIdleWorkers(ONLY_ONE);
                  return;
              }
      
              // 線程現(xiàn)在處于預(yù)停機(jī)狀態(tài),嘗試進(jìn)行停機(jī)
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  // 嘗試通過 CAS 將線程池狀態(tài)修改為 TIDYING
                  if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                      try {
                          terminated();
                      } finally {
                          // 嘗試通過 CAS 將線程池狀態(tài)修改為 TERMINATED
                          ctl.set(ctlOf(TERMINATED, 0));
                          termination.signalAll();
                      }
                      return;
                  }
              } finally {
                  mainLock.unlock();
              }
              // 進(jìn)入下一次循環(huán)
          }
      }
      

      簡(jiǎn)單的來說,由于在當(dāng)我們調(diào)用了停機(jī)方法時(shí),實(shí)際上工作線程仍然還在執(zhí)行任務(wù),我們可能并沒有辦法立刻終止線程的執(zhí)行。

      因此,每個(gè)線程執(zhí)行完任務(wù)并且開始退出時(shí),它都有可能是線程池中最后一個(gè)線程,此時(shí)它就需要承擔(dān)起后續(xù)的收尾工作:

      1. 將線程池狀態(tài)修改為 TIDYING
      2. 調(diào)用 terminated 回調(diào)方法,觸發(fā)自定義停機(jī)邏輯;
      3. 將線程池狀態(tài)修改為 TERMINATED
      4. 喚醒通過 awaitTerminated 阻塞的外部線程。

      至此,線程池就真正的停機(jī)了。

      再聊狀態(tài)值

      通過上面的過程,可以看到,整個(gè)ThreadPoolExecutor 非狀態(tài)的依賴是非常強(qiáng)的。所以一個(gè)好的狀態(tài)值的設(shè)計(jì)就顯得很重要了。

      。。。。。

      線程池與線程狀態(tài)

      RUNNING:
      線程池接受新任務(wù),并處理阻塞隊(duì)列中的任務(wù)。
      工作線程正常運(yùn)行,執(zhí)行任務(wù)。

      SHUTDOWN:
      線程池不再接受新任務(wù),但是仍然處理阻塞隊(duì)列中的任務(wù)。
      工作線程繼續(xù)執(zhí)行隊(duì)列中的任務(wù),直到隊(duì)列為空。

      STOP:
      線程池不再接受新任務(wù),并且嘗試取消正在執(zhí)行的任務(wù),同時(shí)不處理阻塞隊(duì)列中的任務(wù)。
      工作線程會(huì)被中斷,試圖盡快退出。

      TIDYING_UP:
      所有的任務(wù)都已經(jīng)完成,所有工作線程都在調(diào)用 workerCompleted() 方法。
      線程池正在清理階段,準(zhǔn)備進(jìn)入 TERMINATED 狀態(tài)。

      TERMINATED:
      線程池已完成所有任務(wù),所有工作線程都已經(jīng)終止。
      線程池處于最終狀態(tài),不能再接收新任務(wù)。
      線程池中的線程本身也會(huì)處于操作系統(tǒng)的線程狀態(tài)中的一種,比如:
      運(yùn)行狀態(tài)(Running):線程正在執(zhí)行任務(wù)。
      阻塞狀態(tài)(Blocked):線程正在等待某個(gè)鎖或其他資源。
      等待狀態(tài)(Waiting):線程在等待某種條件成立,如等待任務(wù)隊(duì)列中的任務(wù)。
      就緒狀態(tài)(Ready):線程已經(jīng)準(zhǔn)備好執(zhí)行,等待CPU調(diào)度。
      休眠狀態(tài)(Timed Waiting):線程在等待一段時(shí)間后被喚醒,如 Thread.sleep()。
      通過合理地管理線程的狀態(tài),線程池能夠有效地利用系統(tǒng)資源,提高并發(fā)任務(wù)的執(zhí)行效率

      未完待續(xù)。。。。


      回首

      回答一下上面的思考:

      1. 線程池對(duì)線程的管理,不予回答

      2. 線程池減少開銷如何實(shí)現(xiàn):

        減少線程創(chuàng)建開銷:

        創(chuàng)建一個(gè)新的線程涉及到分配內(nèi)存、初始化線程上下文以及操作系統(tǒng)內(nèi)核中的線程調(diào)度器對(duì)新線程的注冊(cè)等一系列操作,這些操作都需要一定的計(jì)算資源。
        使用線程池可以在程序啟動(dòng)時(shí)預(yù)先創(chuàng)建一定數(shù)量的線程,并讓這些線程處于等待任務(wù)的狀態(tài)。當(dāng)有新的任務(wù)到來時(shí),可以直接使用這些預(yù)先創(chuàng)建好的線程來執(zhí)行任務(wù),而無需再進(jìn)行線程創(chuàng)建的過程。

        減少線程銷毀開銷:
        當(dāng)一個(gè)線程執(zhí)行完任務(wù)后,如果沒有線程池,這個(gè)線程就會(huì)被銷毀。銷毀線程同樣涉及到釋放內(nèi)存、清理線程上下文以及操作系統(tǒng)內(nèi)核中的注銷操作。
        線程池中的線程在執(zhí)行完一個(gè)任務(wù)后并不會(huì)立即銷毀,而是等待新的任務(wù)到來。這避免了頻繁的線程銷毀操作,節(jié)省了資源。

        線程復(fù)用:

        線程池中的線程可以被多次復(fù)用來執(zhí)行不同的任務(wù),減少了線程創(chuàng)建和銷毀的頻率,從而提高了程序的性能。

      未完待續(xù)。。。。。

      posted @ 2024-09-12 17:33  早點(diǎn)下班回家吃飯叻  閱讀(145)  評(píng)論(0)    收藏  舉報(bào)
      主站蜘蛛池模板: 野外做受三级视频| 国产精品黄色精品黄色大片| 国产另类ts人妖一区二区| 2021国产精品一卡2卡三卡4卡| 国产精品国产片在线观看| 欧美s码亚洲码精品m码| 不卡一区二区国产在线| 国产午夜福利视频第三区| 青青草一区二区免费精品| 国产日韩久久免费影院| 亚洲精品一区二区天堂| 亚洲中文字幕久在线| 国产99视频精品免费观看9| 一区二区三区精品偷拍| 亚洲精品亚洲人成人网| 粉嫩jk制服美女啪啪| 日本熟妇XXXX潮喷视频| 亚洲综合网中文字幕在线| 一区二区三区国产综合在线| 国产亚洲精品在av| 日本中文字幕在线播放| 国产在线自拍一区二区三区| 免费a级黄毛片| 强行交换配乱婬bd| 性色av一区二区三区v视界影院| av无码小缝喷白浆在线观看| 极品粉嫩小泬无遮挡20p| 99国产欧美久久久精品蜜芽| 国产高清无遮挡内容丰富 | 91精品国产自产91精品| 国产精品久久久久久久久人妻| 曰韩精品无码一区二区三区视频| 亚洲精品香蕉一区二区| 襄樊市| 秋霞电影网| 精品人妻丰满久久久a| 东方四虎在线观看av| 中文字幕日韩精品国产| 日韩一区二区三区精彩视频| 欧美高清精品一区二区| 亚洲熟妇自偷自拍另欧美|