<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      并發(fā)編程--上篇

      Java并發(fā)探索--上篇

      1.基本概念

      • 線程與進(jìn)程:線程是程序執(zhí)行的最小單位,而進(jìn)程是系統(tǒng)進(jìn)行資源分配和調(diào)度的基本單位。例如,一個(gè) Java 程序可以包含多個(gè)線程,它們共享進(jìn)程的資源。
      • 并發(fā)與并行:并發(fā)是指多個(gè)任務(wù)在同一時(shí)間段內(nèi)執(zhí)行,而并行是指多個(gè)任務(wù)在同一時(shí)刻執(zhí)行。在多核 CPU 系統(tǒng)中,可以實(shí)現(xiàn)真正的并行。
      • 同步與異步:同步是指程序按照順序依次執(zhí)行,而異步是指程序在執(zhí)行某個(gè)任務(wù)時(shí),不需要等待該任務(wù)完成,可以繼續(xù)執(zhí)行其他任務(wù)。

      “Java并發(fā)探索--下篇” --- 在下面找

      博客園

      http://www.rzrgm.cn/jackjavacpp

      CSDN

      https://blog.csdn.net/okok__TXF

      2.探索線程的創(chuàng)建

      ①線程的狀態(tài)

      Thread源碼里面看出

      public enum State {
       	// 尚未啟動(dòng)的線程的線程狀態(tài)。
          NEW,
      	// 就緒
          RUNNABLE,
       	// 等待監(jiān)視器鎖的線程的線程狀態(tài)
          BLOCKED,
      	/*
      	等待線程的線程狀態(tài),線程由于調(diào)用以下方法之一而處于等待狀態(tài):
      	Object.wait() 沒(méi)有超時(shí)
      	Thread.join() 沒(méi)有超時(shí)
      	LockSupport.park()
      	*/
          WAITING,
      	/*
      	指定等待時(shí)間的等待線程的線程狀態(tài)
      	線程處于定時(shí)等待狀態(tài),因?yàn)檎{(diào)用了以下方法之一,并指定等待時(shí)間:
      	Thread.sleep
          Object.wait with timeout
          Thread.join with timeout
          LockSupport.parkNanos
          LockSupport.parkUntil
      	*/
          TIMED_WAITING,
      	//終止線程的線程狀態(tài)。線程已完成執(zhí)行。
          TERMINATED;
      }
      

      下面看一張圖,很清楚的解釋了各狀態(tài)之間的關(guān)系:【節(jié)選自https://blog.csdn.net/agonie201218/article/details/128712507】

      在Java中,一個(gè)Thread有大致六個(gè)狀態(tài)。

      線程創(chuàng)建之后(new Thread)它將處于 NEW(新建) 狀態(tài),調(diào)用 start() 方法后開(kāi)始運(yùn)行,線程這時(shí)候處于 RUNNABLE(就緒) 狀態(tài)。可運(yùn)行狀態(tài)的線程獲得了 CPU 時(shí)間片后就處于 RUNNING(運(yùn)行) 狀態(tài)。

      明白了線程的運(yùn)行狀態(tài),接下來(lái)讓我們來(lái)看一下在爪哇里面如何創(chuàng)建并且啟動(dòng)線程。

      ②線程創(chuàng)建

      1)兩種基本方式

      • 繼承Thread類(lèi),重寫(xiě)run方法
      public class MyThread1 extends Thread {
          @Override
          public void run() {
              System.out.println(Thread.currentThread().getName() + ": hello world");
          }
      }
      public class JUCMain {
          public static void main(String[] args) {
              new MyThread1().start();
          }
      }
      
      • 實(shí)現(xiàn)Runnable接口,傳入Thread
      public class Runnable1 implements Runnable{
          @Override
          public void run() {
              System.out.println("hello world, Runnable");
          }
      }
      public class JUCMain {
          public static void main(String[] args) {
              new Thread(new Runnable1()).start();
          }
      }
      

      網(wǎng)上還傳有其他創(chuàng)建線程的方式,比如: Callable接口,重寫(xiě)call,結(jié)合FutureTask;線程池;lambda表達(dá)式等等。。。誠(chéng)然,這也確實(shí)是創(chuàng)建線程啟動(dòng)的方式不錯(cuò)。但是本文畢竟是探索性質(zhì)的文章,我們要探索其本質(zhì)。

      首先從start()方法看起(這個(gè)方式屬于Thread類(lèi)的)。調(diào)用start()后,JVM會(huì)創(chuàng)建一個(gè)新線程并執(zhí)行該線程的run()方法。注意:直接調(diào)用run()不會(huì)啟動(dòng)新線程,而是在當(dāng)前線程中執(zhí)行。

      // 啟動(dòng)線程并觸發(fā) JVM 創(chuàng)建原生線程
      // synchronized后面解釋【見(jiàn) 探索“鎖”】
      public synchronized void start() {
          // 零狀態(tài)值對(duì)應(yīng)于狀態(tài) “NEW”
          // 線程想要start,必須是為0的狀態(tài)
          if (threadStatus != 0)
              throw new IllegalThreadStateException();
          /*
          group 是線程所屬的線程組。這行代碼將當(dāng)前線程實(shí)例添加到線程組中,
          同時(shí)線程組的未啟動(dòng)線程計(jì)數(shù)會(huì)減1。
          */
          group.add(this);
          boolean started = false;
          try {
              start0(); //關(guān)鍵!調(diào)用本地方法(native)
              started = true;
          } finally {
              try {
                  if (!started) { //啟動(dòng)失敗時(shí)回滾
                      //如果 started 為 false,說(shuō)明線程啟動(dòng)失敗,
                      //調(diào)用 group.threadStartFailed(this) 方法通知線程組該線程啟動(dòng)失敗。
                      group.threadStartFailed(this);
                  }
              } catch (Throwable ignore) {
                  /* do nothing. If start0 threw a Throwable then
                        it will be passed up the call stack */
              }
          }
      }
      //========== native
      private native void start0();
      

      那么執(zhí)行的是run()方法,run方法里面是啥呢

      private Runnable target; // target是Runnable類(lèi)型
      
      @Override
      public void run() {
          if (target != null) {
              target.run();
          }
      }
      

      如果繼承Thread類(lèi)后,重寫(xiě)run()方法,那么run方法就會(huì)覆蓋上面的方法。

      如果是實(shí)現(xiàn)的Runnable接口,new Thread(new Runnable1())的時(shí)候,就會(huì)把target賦值,然后調(diào)用run()方法的時(shí)候,就執(zhí)行的是target的run方法了。

      2) 其他創(chuàng)建方式

      .lambda
      • lambda表達(dá)式創(chuàng)建:這個(gè)僅僅是寫(xiě)法不同而已。因?yàn)镽unnable是個(gè)函數(shù)式接口
      @FunctionalInterface
      public interface Runnable {
          public abstract void run();
      }
      
      .callable
      • Callable創(chuàng)建的方式
      public class MyCall implements Callable<String> {
          @Override
          public String call() throws Exception {
              Thread.sleep(2000);
              return "Hello Callable";
          }
      }
      
      public static void main(String[] args) throws ExecutionException, InterruptedException {
          FutureTask<String> task = new FutureTask<>(new MyCall());
          new Thread(task).start();
          System.out.println(task.get());
      }
      

      new Thread(Runnable runnable)要求傳的類(lèi)型是Runnable,但是現(xiàn)在傳的是FutureTask。所以先來(lái)看一看FutureTask和Runnable之間有什么聯(lián)系.

      從上面可以看到,F(xiàn)utureTask實(shí)現(xiàn)了RunnableFuture接口,然后RunnableFuture接口繼承了Future和Runnable兩個(gè)接口。

      Future<V>

      Future 接口是 Java 并發(fā)編程中的一個(gè)重要接口,位于 java.util.concurrent 包下,它代表了一個(gè)異步計(jì)算的結(jié)果。異步計(jì)算意味著在調(diào)用方法后,程序不會(huì)立即等待結(jié)果返回,而是可以繼續(xù)執(zhí)行其他任務(wù),當(dāng)結(jié)果準(zhǔn)備好時(shí),再通過(guò) Future 對(duì)象獲取。

      // 這里使用了泛型 <V>,表示該 Future 對(duì)象所代表的異步計(jì)算結(jié)果的類(lèi)型。
      public interface Future<V> {
      
          //嘗試取消異步任務(wù)的執(zhí)行。
          /*
          如果任務(wù)已經(jīng)完成、已經(jīng)被取消或者由于其他原因無(wú)法取消,則返回 false;
          如果任務(wù)成功取消,則返回 true。
          */
          boolean cancel(boolean mayInterruptIfRunning);
      
          //如果任務(wù)在完成之前被取消,則返回 true;否則返回 false。
          boolean isCancelled();
      
          //如果任務(wù)已經(jīng)完成,則返回 true;否則返回 false。
          boolean isDone();
      
          //獲取異步任務(wù)的計(jì)算結(jié)果。如果任務(wù)還未完成,調(diào)用該方法的線程會(huì)被阻塞,直到任務(wù)完成。
          V get() throws InterruptedException, ExecutionException;
      
          //獲取異步任務(wù)的計(jì)算結(jié)果,并且可以指定一個(gè)超時(shí)時(shí)間。
          //如果在指定的時(shí)間內(nèi)任務(wù)還未完成,調(diào)用該方法的線程會(huì)被阻塞,直到任務(wù)完成或者超時(shí)。
          V get(long timeout, TimeUnit unit)
              throws InterruptedException, ExecutionException, TimeoutException;
      }
      

      RunnableFuture

      public interface RunnableFuture<V> extends Runnable, Future<V> {
          // 很簡(jiǎn)單嘛,這個(gè)是來(lái)自Runnable的
          void run();
      }
      

      這個(gè)接口就相當(dāng)于組合了Runnable和Future,能夠獲取到返回值了。

      FutureTask<V> 既然要把它當(dāng)做參數(shù)傳進(jìn)Thread的構(gòu)造函數(shù),那么想必它肯定是實(shí)現(xiàn)了run方法的。

      public class FutureTask<V> implements RunnableFuture<V> {
          // 基本屬性
          private volatile int state;
          private static final int NEW          = 0;
          private static final int COMPLETING   = 1;
          private static final int NORMAL       = 2;
          private static final int EXCEPTIONAL  = 3;
          private static final int CANCELLED    = 4;
          private static final int INTERRUPTING = 5;
          private static final int INTERRUPTED  = 6;
          /** The underlying callable; nulled out after running */
          private Callable<V> callable;
          /** 結(jié)果 */
          private Object outcome; 
          /** The thread running the callable; CAS ed during run() */
          private volatile Thread runner;
          /** Treiber stack of waiting threads */
          private volatile WaitNode waiters;
          
          // 看它的構(gòu)造函數(shù)1
          public FutureTask(Callable<V> callable) {
              if (callable == null)
                  throw new NullPointerException();
              this.callable = callable; // 賦值callable========
              this.state = NEW; // ensure visibility of callable
          }
          // 構(gòu)造函數(shù)2 ==== 本質(zhì)還是把Runnable加了一層,給封裝成Callable了
          public FutureTask(Runnable runnable, V result) {
              this.callable = Executors.callable(runnable, result);
              this.state = NEW;       // ensure visibility of callable
          }
          /*
          Executors::callable(xx, xx)方法==========
          public static <T> Callable<T> callable(Runnable task, T result) {
              if (task == null)
                  throw new NullPointerException();
              return new RunnableAdapter<T>(task, result);
          }
          static final class RunnableAdapter<T> implements Callable<T> {
              final Runnable task;
              final T result;
              RunnableAdapter(Runnable task, T result) {
                  this.task = task;
                  this.result = result;
              }
              public T call() {
                  task.run(); // 調(diào)用Runnable的run()
                  return result;
              }
          }
          */
          
          // run()方法 --------------- 
          // new Thread(new FutureTask<>(new MyCall()))
          public void run() {
              if (state != NEW ||
                  !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                               null, Thread.currentThread()))
                  return;
              try {
                  Callable<V> c = callable;
                  if (c != null && state == NEW) {
                      V result;
                      boolean ran;
                      try {
                          //====調(diào)用callable.call()
                          result = c.call(); 
                          ran = true;
                      } catch (Throwable ex) {
                         .........
                      }
                      // 如果運(yùn)行OK了,設(shè)置結(jié)果!
                      if (ran) set(result);
                  }
              } finally {
                  .............
              }
          }
          
          // 設(shè)置結(jié)果outcome
          protected void set(V v) {
              // http://www.rzrgm.cn/jackjavacpp/p/18787832
              // 使用CAS --- 【見(jiàn)上一篇文章 java map & CAS & AQS】
              if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                  outcome = v; // 這里
                  UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                  finishCompletion();
              }
          }
          
          // 比較核心的get方法================start
          public V get() throws InterruptedException, ExecutionException {
              int s = state;
              if (s <= COMPLETING) // 如果狀態(tài)不是完成
                  s = awaitDone(false, 0L); // 等待完成
              return report(s); // 返回結(jié)果
          }
          private int awaitDone(boolean timed, long nanos)
              throws InterruptedException {
        		// 1.計(jì)算超時(shí)截止時(shí)間
              final long deadline = timed ? System.nanoTime() + nanos : 0L;
              WaitNode q = null;
              boolean queued = false;
              for (;;) { // 2.自旋循環(huán)等待任務(wù)完成
                  // 2.1如果該線程中斷了
                  if (Thread.interrupted()) {
                      removeWaiter(q);// 從等待隊(duì)列中移除當(dāng)前節(jié)點(diǎn)
                      throw new InterruptedException();
                  }
                  // 2.2檢查狀態(tài)
                  int s = state;
                  // 任務(wù)已終態(tài)(NORMAL, EXCEPTIONAL, CANCELLED)
                  if (s > COMPLETING) {
                      if (q != null)
                          q.thread = null;
                      return s;// 返回最終狀態(tài)
                  }
                  // 2.3若任務(wù)狀態(tài)等于 COMPLETING,表明任務(wù)正在完成,
                  // 此時(shí)調(diào)用 Thread.yield() 方法讓當(dāng)前線程讓出 CPU 時(shí)間片,等待任務(wù)完成。
                  else if (s == COMPLETING) // cannot time out yet
                      Thread.yield();
                  else if (q == null)
                      q = new WaitNode();
                  else if (!queued) //將節(jié)點(diǎn)加入等待隊(duì)列
                      queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                           q.next = waiters, q);
                  else if (timed) { // 2.4如果是有時(shí)限的get()
                      nanos = deadline - System.nanoTime();
                      if (nanos <= 0L) { 
                          removeWaiter(q);
                          return state; // 返回狀態(tài)
                      }
                      LockSupport.parkNanos(this, nanos);
                  }
                  else //若沒(méi)有設(shè)置超時(shí)時(shí)間,就調(diào)用 LockSupport.park 方法讓當(dāng)前線程無(wú)限期阻塞,直到被喚醒。
                      LockSupport.park(this);
              }
          }
          private V report(int s) throws ExecutionException {
              Object x = outcome;
              if (s == NORMAL)
                  return (V)x; // 返回outcome
             ......
          }
          //==================================end
      }
      

      awaitDone 方法的核心功能是讓當(dāng)前線程等待異步任務(wù)完成,它會(huì)持續(xù)檢查任務(wù)的狀態(tài),根據(jù)不同的狀態(tài)采取相應(yīng)的處理措施,同時(shí)支持設(shè)置超時(shí)時(shí)間。在等待過(guò)程中,若線程被中斷,會(huì)拋出 InterruptedException 異常。

      通過(guò)上面的分析,Callable這種方式實(shí)際上本質(zhì)還是Runnable嚯。使用FutureTask將Future和Runnable結(jié)合起來(lái),功能更加豐富。

      .線程池ThreadPoolExecutor
      • 線程池創(chuàng)建線程

      如下使用方式。

      public class PoolMain {
          public static void main(String[] args) {
              // 創(chuàng)建一個(gè)線程池
              ExecutorService pool = Executors.newFixedThreadPool(1);
              long start = System.currentTimeMillis();
              // execute=============
              pool.execute(() -> {
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      throw new RuntimeException(e);
                  }
                  System.out.println("execute pool創(chuàng)建啟動(dòng)線程!");
              });
              // submit==============
              Future<Integer> future = pool.submit(() -> {
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      throw new RuntimeException(e);
                  }
                  System.out.println("submit pool創(chuàng)建啟動(dòng)線程!");
                  return 100;
              });
              try {
                  System.out.println(future.get());
              } catch (InterruptedException | ExecutionException e) {
                  throw new RuntimeException(e);
              }
              System.out.println("main線程執(zhí)行時(shí)間:" + (System.currentTimeMillis() - start));
              pool.shutdown();
          }
      }
      

      從上面的例子可以看出,大致有ExecutorServiceExecutors, newFixedThreadPool()方法本質(zhì)是 new ThreadPoolExecutor(),故還有一個(gè)ThreadPoolExecutor類(lèi)。

      接下來(lái)梳理一下這些類(lèi)背后的關(guān)系。【通過(guò)idea得到下面的關(guān)系圖】此外,Executors只是一個(gè)工具類(lèi)。

      Executor是頂級(jí)接口

      public interface Executor {
      	// 只定義了一個(gè)方法
          void execute(Runnable command);
      }
      

      ExecutorService:是一個(gè)比Executor使用更廣泛的子類(lèi)接口,其提供了生命周期管理的方法,以及可跟蹤一個(gè)或多個(gè)異步任務(wù)執(zhí)行狀況返回Future的方法

      public interface ExecutorService extends Executor {
          void shutdown();
          List<Runnable> shutdownNow();
          <T> Future<T> submit(Callable<T> task);
          <T> Future<T> submit(Runnable task, T result);
          Future<?> submit(Runnable task);
          //....
          <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
              throws InterruptedException;
          <T> T invokeAny(Collection<? extends Callable<T>> tasks)
              throws InterruptedException, ExecutionException;
      }
      

      AbstractExecutorServiceExecutorService執(zhí)行方法的默認(rèn)實(shí)現(xiàn),發(fā)現(xiàn)下面的submit()底層實(shí)際執(zhí)行的是execute(ftask)方法【Executor接口的execute()方法,在這個(gè)抽象類(lèi)里面沒(méi)有具體實(shí)現(xiàn),到具體子類(lèi)ThreadPoolExecutor在可以看到】

      public abstract class AbstractExecutorService implements ExecutorService {
          // 這里重點(diǎn)只看一下submit方法的默認(rèn)實(shí)現(xiàn)
          // 優(yōu)點(diǎn)1: 可以有Future返回值
      	public Future<?> submit(Runnable task) {
              if (task == null) throw new NullPointerException();
              RunnableFuture<Void> ftask = newTaskFor(task, null);
              execute(ftask);
              return ftask;
          }
          public <T> Future<T> submit(Runnable task, T result) {
              if (task == null) throw new NullPointerException();
              RunnableFuture<T> ftask = newTaskFor(task, result);
              execute(ftask);
              return ftask;
          }
          // 優(yōu)點(diǎn)2: 支持Callable參數(shù)
          public <T> Future<T> submit(Callable<T> task) { 
              if (task == null) throw new NullPointerException();
              RunnableFuture<T> ftask = newTaskFor(task);
              execute(ftask);
              return ftask;
          }
      }
      

      ThreadPoolExecutor:線程池,可以通過(guò)調(diào)用Executors靜態(tài)工廠方法來(lái)創(chuàng)建線程池并返回一個(gè)ExecutorService對(duì)象

      public class ThreadPoolExecutor extends AbstractExecutorService {
           /**
           * 七大參數(shù)!!!!======
           */
          public ThreadPoolExecutor(int corePoolSize,//線程池的核心線程數(shù)量
                                    int maximumPoolSize,//線程池的最大線程數(shù)
                                    long keepAliveTime,//當(dāng)線程數(shù)大于核心線程數(shù)時(shí),多余的空閑線程存活的最長(zhǎng)時(shí)間
                                    TimeUnit unit,//時(shí)間單位
                                    BlockingQueue<Runnable> workQueue,//任務(wù)隊(duì)列,用來(lái)儲(chǔ)存等待執(zhí)行任務(wù)的隊(duì)列
                                    ThreadFactory threadFactory,//線程工廠,用來(lái)創(chuàng)建線程,一般默認(rèn)即可
                                    RejectedExecutionHandler handler//拒絕策略,當(dāng)提交的任務(wù)過(guò)多而不能及時(shí)處理時(shí),定制策略來(lái)處理任務(wù)
                                     ) {
              if (corePoolSize < 0 || maximumPoolSize <= 0 ||
                  maximumPoolSize < corePoolSize || keepAliveTime < 0)
                  throw new IllegalArgumentException();
              if (workQueue == null || threadFactory == null || handler == null)
                  throw new NullPointerException();
              this.corePoolSize = corePoolSize;
              this.maximumPoolSize = maximumPoolSize;
              this.workQueue = workQueue;
              this.keepAliveTime = unit.toNanos(keepAliveTime);
              this.threadFactory = threadFactory;
              this.handler = handler;
          }
      }
      

      回到這一小節(jié)最開(kāi)始的時(shí)候,例子中的線程池有兩種提交并運(yùn)行線程的方式executesubmit兩個(gè)方法。現(xiàn)在來(lái)看一下ThreadPoolExecutor中的execute()方法是什么樣子的。submit()我們已經(jīng)知道了是在AbstractExecutorService中有默認(rèn)實(shí)現(xiàn)的。

      // ThreadPoolExecutor::execute(Runnable command)
      public void execute(Runnable command) {
          if (command == null)
              throw new NullPointerException();
          int c = ctl.get();
          // 1.若當(dāng)前工作線程數(shù)小于核心線程數(shù)(corePoolSize),嘗試創(chuàng)建新的核心線程
          // 這里是用的位運(yùn)算的,我沒(méi)有深究
          if (workerCountOf(c) < corePoolSize) {
              if (addWorker(command, true)) // 
                  return;
              // 創(chuàng)建新線程失敗,重新獲取ctl
              c = ctl.get();
          }
          // 2.任務(wù)入隊(duì)
          // 線程池處于運(yùn)行狀態(tài)(isRunning(c))
          // 且任務(wù)成功加入阻塞隊(duì)列(workQueue.offer(command))
          if (isRunning(c) && workQueue.offer(command)) {
              int recheck = ctl.get();
              // 2.2 雙重檢查
              /*
              2.2.1再次檢查線程池狀態(tài)(可能在此期間線程池被關(guān)閉)。
              2.2.2若線程池已關(guān)閉,嘗試從隊(duì)列中移除任務(wù),移除成功則拒絕任務(wù)(reject(command))。
              2.2.3若線程池仍運(yùn)行但無(wú)活躍線程(workerCountOf(recheck) == 0),
              添加一個(gè)非核心線程(addWorker(null, false)),該線程會(huì)從隊(duì)列中拉取任務(wù)執(zhí)行。
              */
              if (! isRunning(recheck) && remove(command))
                  reject(command);
              else if (workerCountOf(recheck) == 0)
                  addWorker(null, false);
          }
          // 3.若任務(wù)無(wú)法入隊(duì)(隊(duì)列已滿),嘗試創(chuàng)建非核心線程(addWorker(command, false))
          else if (!addWorker(command, false))
              //若創(chuàng)建失敗(線程數(shù)已達(dá)maximumPoolSize或線程池已關(guān)閉),
              //執(zhí)行拒絕策略(reject(command))
              reject(command);
      
      }
      /*
      execute(command)
      │
      ├─ 檢查command非空
      │
      ├─ 當(dāng)前線程數(shù) < corePoolSize?
      │   ├─ 是 → 創(chuàng)建核心線程 → 成功則返回
      │   └─ 否 → 進(jìn)入下一步
      │
      ├─ 線程池是否RUNNING且任務(wù)入隊(duì)成功?
      │   ├─ 是 → 雙重檢查狀態(tài)
      │   │   ├─ 線程池已關(guān)閉 → 移除任務(wù)并拒絕
      │   │   └─ 線程池仍運(yùn)行且無(wú)活躍線程 → 創(chuàng)建非核心線程
      │   │
      │   └─ 否 → 嘗試創(chuàng)建非核心線程
      │       ├─ 成功 → 處理任務(wù)
      │       └─ 失敗 → 拒絕任務(wù)
      */
      
      為什么需要二次檢查線程池狀態(tài)?
      
      - 任務(wù)入隊(duì)后,其他線程可能關(guān)閉了線程池(如調(diào)用shutdown())。
      - 處理:
        - 若線程池已關(guān)閉,需移除已入隊(duì)任務(wù)并拒絕。
        - 若線程池仍運(yùn)行但無(wú)活躍線程(如核心線程數(shù)為0且任務(wù)在隊(duì)列中),需創(chuàng)建新線程處理隊(duì)列任務(wù)。
      

      場(chǎng)景1:核心線程數(shù)未滿

      • 線程池處于 RUNNING,當(dāng)前線程數(shù) 2(corePoolSize=5)。
      • addWorker(task, true) 成功創(chuàng)建核心線程并執(zhí)行任務(wù)。

      場(chǎng)景2:隊(duì)列已滿,創(chuàng)建臨時(shí)線程

      • 核心線程滿載,隊(duì)列已滿,線程數(shù)未達(dá) maximumPoolSize
      • addWorker(task, false) 創(chuàng)建臨時(shí)線程處理任務(wù)。

      場(chǎng)景3:SHUTDOWN 狀態(tài)處理剩余任務(wù)

      • 線程池調(diào)用 shutdown(),狀態(tài)變?yōu)?SHUTDOWN
      • 已有任務(wù)在隊(duì)列中,addWorker(null, true) 創(chuàng)建線程處理隊(duì)列任務(wù)。

      ThreadPoolExecutor設(shè)計(jì)思想:

      • 核心線程優(yōu)先:優(yōu)先使用核心線程處理任務(wù)。
      • 隊(duì)列緩沖:核心線程滿載后,任務(wù)入隊(duì)等待。
      • 非核心線程應(yīng)急:隊(duì)列滿后,創(chuàng)建臨時(shí)線程處理任務(wù)(不超過(guò)maximumPoolSize

      addWorker 創(chuàng)建工作線程

      addWorker 方法通過(guò)精細(xì)的狀態(tài)檢查和并發(fā)控制,確保線程池在動(dòng)態(tài)擴(kuò)縮容時(shí)保持線程安全。【方便理解,可以把這個(gè)方法看作是創(chuàng)建線程】其核心在于:

      • 雙重循環(huán):外層處理狀態(tài)變化,內(nèi)層處理線程數(shù)修改。
      • 鎖與原子操作結(jié)合:保證 workers 集合和 workerCount 的一致性。
      • 異常回滾機(jī)制:確保資源不會(huì)泄漏(如線程數(shù)虛增或 Worker 未清理)。

      firstTask為我們最開(kāi)始寫(xiě)的Runnable。【記一個(gè)代號(hào)叫做 “我的任務(wù)” 】

      // ======== firstTask
      pool.execute(() -> {
         try {
             Thread.sleep(1000);
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
         }
         System.out.println("execute pool創(chuàng)建啟動(dòng)線程!");
      });
      
      // addWorker(runnable, core)
      //標(biāo)記是否以核心線程數(shù)(corePoolSize)為上限創(chuàng)建線程。
      //若為 false,則使用最大線程數(shù)(maximumPoolSize)
      private boolean addWorker(Runnable firstTask, boolean core) {
          retry:
          for (;;) {
              ........
              int rs = runStateOf(c);// 獲取線程池狀態(tài)
              // 檢查是否允許添加Worker
              if (rs >= SHUTDOWN &&
                  !(rs == SHUTDOWN &&
                     firstTask == null &&
                     !workQueue.isEmpty()))
                  return false;
      
              for (;;) {
                  .........
                  //CAS 增加線程數(shù)
                  if (compareAndIncrementWorkerCount(c))
                      break retry;// 成功增加,跳出循環(huán)
                  c = ctl.get();  // Re-read ctl
                  if (runStateOf(c) != rs)
                      continue retry;
              }
          }
          ...........
          try {
              // 把“我的任務(wù)”傳進(jìn)去了
              w = new Worker(firstTask); // 創(chuàng)建的一個(gè)Worker
              final Thread t = w.thread;
              if (t != null) {
                  final ReentrantLock mainLock = this.mainLock;
                  // 加鎖保護(hù) workers 集合
                  mainLock.lock();
                  try {
                      // 再次檢查狀態(tài)(防止在加鎖前狀態(tài)變化)
                      int rs = runStateOf(ctl.get());
                      if (rs < SHUTDOWN ||
                          (rs == SHUTDOWN && firstTask == null)) {
                          if (t.isAlive())
                             //======拋出異常....
                          workers.add(w); // 將 Worker 加入集合
                          int s = workers.size();
                          if (s > largestPoolSize)
                              largestPoolSize = s;
                          workerAdded = true;
                      }
                  } finally {
                      mainLock.unlock();
                  }
                  if (workerAdded) {
                      // 【W(wǎng)orker類(lèi)里面的thread】
                      // 啟動(dòng)線程=========重點(diǎn)【見(jiàn)下面的分析】
                      t.start();
                      workerStarted = true;
                  }
              }
          } finally {
              if (! workerStarted)
                  addWorkerFailed(w);
          }
          return workerStarted;
      }
      

      Worker是ThreadPoolExecutor的內(nèi)部類(lèi),可以看出是一個(gè)Runnable,那么肯定重寫(xiě)了run()方法

      private final class Worker
              extends AbstractQueuedSynchronizer implements Runnable
      {
          final Thread thread;
          Runnable firstTask; //"我的任務(wù)"到這里來(lái)了
          // 構(gòu)造函數(shù)
          Worker(Runnable firstTask) {
              setState(-1); // inhibit interrupts until runWorker
              this.firstTask = firstTask;
              // 利用線程工廠創(chuàng)建了一個(gè)線程
              /*
              如果final Thread t = w.thread;
              t.start();啟動(dòng)的話,執(zhí)行的是這個(gè)內(nèi)部類(lèi)的run()
              */
              this.thread = getThreadFactory().newThread(this);
          }
          // run就這一行
          public void run() {runWorker(this);}
          //到這里了
          final void runWorker(Worker w) {
              ..
              //"我的任務(wù)"
              Runnable task = w.firstTask;
              ...
              try {
                //getTask()從等待隊(duì)列里面取出Runnable
                while (task != null || (task = getTask()) != null) {
                    ....
                    task.run();//==========
                }
              }......
              ..
              //// 無(wú)任務(wù)時(shí)回收線程
               processWorkerExit(w, completedAbruptly);
              
          }
      }
      

      ThreadPoolExecutor的靜態(tài)內(nèi)部類(lèi) :: jdk自帶的四種拒絕策略

      public interface RejectedExecutionHandler {
          void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
      }
      // 1.直接拋出異常
      public static class AbortPolicy implements RejectedExecutionHandler {
          .........
          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
              throw new RejectedExecutionException("Task " + r.toString() +
                                                   " rejected from " +
                                                   e.toString());
          }
      }
      // 2.直接丟棄
      public static class DiscardPolicy implements RejectedExecutionHandler {
          .....
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
          }
      }
      // 3.丟棄等待隊(duì)列的第一個(gè)
      public static class DiscardOldestPolicy implements RejectedExecutionHandler {
          ........
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      		//如果線程池未關(guān)閉,就彈出隊(duì)列頭部的元素,然后嘗試執(zhí)行
              if (!e.isShutdown()) {
                  e.getQueue().poll();
                  e.execute(r);
              }
          }
      }
      // 4.調(diào)用者運(yùn)行,直接執(zhí)行run()方法里面的邏輯。
      // 只要線程池沒(méi)有關(guān)閉,就由提交任務(wù)的當(dāng)前線程處理
      public static class CallerRunsPolicy implements RejectedExecutionHandler {
          ......
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
              if (!e.isShutdown()) {
                  r.run();
              }
          }
      }
      

      總結(jié)一下,在了解到了ThreadPoolExecutor的一些類(lèi)間關(guān)系、以及一些基本流程、屬性之后。接下來(lái)我們來(lái)梳理一遍,線程池的運(yùn)行方式。

      1. 創(chuàng)建線程池(七大參數(shù)、四大拒絕策略)

      2. 任務(wù)提交

      //2.1
      executor.execute(() -> {
          // 任務(wù)邏輯
      });
      

      2.2任務(wù)處理決策鏈:::

      2.2.1嘗試創(chuàng)建核心線程:當(dāng)前工作線程數(shù) < corePoolSize,直接創(chuàng)建新核心線程執(zhí)行任務(wù)

      if (workerCount < corePoolSize) {
          addWorker(command, true); // true表示檢查corePoolSize
          return;
      }
      

      2.2.2任務(wù)入隊(duì): 若核心線程已滿,任務(wù)嘗試加入工作隊(duì)列。

      if (workQueue.offer(command)) {
          // 雙重檢查線程池狀態(tài)
          if (線程池已關(guān)閉) 移除任務(wù)并拒絕;
          else if (當(dāng)前無(wú)活躍線程) 創(chuàng)建非核心線程處理隊(duì)列任務(wù);
      }
      

      2.2.3嘗試創(chuàng)建非核心線程: 若隊(duì)列已滿且線程數(shù) < maximumPoolSize,創(chuàng)建非核心線程。

      else if (!addWorker(command, false)) { // false表示檢查maximumPoolSize
          reject(command); // 觸發(fā)拒絕策略
      }
      

      2.2.4拒絕任務(wù)

      1. 工作線程執(zhí)行任務(wù)

      3.1Worker初始化:每個(gè)Worker綁定一個(gè)線程和初始任務(wù)(firstTask)。

      Worker w = new Worker(firstTask);
      final Thread t = w.thread;
      t.start(); // 啟動(dòng)線程執(zhí)行runWorker()
      

      3.2任務(wù)執(zhí)行循環(huán)(runWorker方法)

      while (task != null || (task = getTask()) != null) {
          try {
              task.run(); // 執(zhí)行任務(wù)
          } finally {
              task = null; // 清理任務(wù)引用
          }
      }
      

      3.3從隊(duì)列獲取任務(wù)(getTask方法)

      • 阻塞模式:若為核心線程或允許核心線程超時(shí),調(diào)用workQueue.take()永久阻塞。
      • 超時(shí)模式:若非核心線程,調(diào)用workQueue.poll(keepAliveTime)超時(shí)等待。
      Runnable r = timed ? 
          workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
          workQueue.take();
      
      1. 線程回收與資源釋放
      private void processWorkerExit(Worker w, boolean completedAbruptly) {
          if (completedAbruptly) // 異常終止時(shí)補(bǔ)償workerCount
              decrementWorkerCount();
          mainLock.lock();
          try {
              workers.remove(w); // 從集合中移除Worker
              if (線程池仍在運(yùn)行 && 隊(duì)列非空) 
                  addWorker(null, false); // 補(bǔ)充線程處理隊(duì)列任務(wù)
          } finally {
              mainLock.unlock();
          }
      }
      

      線程池本質(zhì)也是Runnable!

      一張好圖:【來(lái)自:【線程池工作原理https://blog.csdn.net/fighting_yu/article/details/89473175】

      3.探索”鎖“

      上面探索了線程以及線程池的創(chuàng)建,發(fā)現(xiàn)其源碼中存在這種代碼;

      //1.Thread的start方法
      public synchronized void start()
      
      //2.線程池部分源碼addWorker()方法
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      
      //3.
      LockSupport.park(this);
      

      這些是什么呢?這就是鎖。。

      確保線程安全最常見(jiàn)的做法是利用鎖機(jī)制(Locksychronized)來(lái)對(duì)共享數(shù)據(jù)做互斥同步,這樣在同一個(gè)時(shí)刻,只有一個(gè)線程可以執(zhí)行某個(gè)方法或者某個(gè)代碼塊,那么操作必然是原子性的,線程安全的。

      ① synchronized

      synchronized 是 Java 中最基本的同步機(jī)制,它可以修飾方法或代碼塊,確保同一時(shí)刻只有一個(gè)線程可以執(zhí)行被修飾的代碼。

      public class SynchronizedTest {
          public static void main(String[] args) {
              SynchronizedTest lock1 = new SynchronizedTest();
              new Thread(lock1::test).start();
              new Thread(lock1::test2).start();
              new Thread(lock1::testStatic).start();
              new Thread(lock1::testFs).start();
          }
          public void testStatic() {
              // 鎖的是Class對(duì)象
              synchronized (SynchronizedTest.class){
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      throw new RuntimeException(e);
                  }
                  System.out.println("testStatic()");
              }
          }
      
          // 鎖的是一個(gè)實(shí)例對(duì)象
          public void test(){
              synchronized (this){
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      throw new RuntimeException(e);
                  }
                  System.out.println("test()");
              }
          }
      
          public synchronized void test2(){
              try {
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
              System.out.println("test2()");
          }
      
          public void testFs(){
              try {
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
              System.out.println("testFs()");
          }
      }
      

      上面只有test() 和 test2() 是互斥的。也就是1秒過(guò)后,testStatic()、testFs()、和 【test()、test1() 其中之一】一起輸出打印。

      修飾代碼塊、修飾方法:鎖的是該對(duì)象;

      修飾靜態(tài)成員:鎖的是該類(lèi)的Class對(duì)象;這種方式可以確保對(duì)靜態(tài)變量的訪問(wèn)是線程安全的

      還可以鎖任意對(duì)象。

      其實(shí)主要弄清楚各自鎖的是什么對(duì)象就行了,看是否需要的是一個(gè)鎖,就可以判斷是否互斥了;

      //鎖的是Class對(duì)象
      public static synchronized void testStatic1() {
          try {
              Thread.sleep(1000);
          } catch (InterruptedException e) {
              throw new RuntimeException(e);
          }
          System.out.println("testStaticMethod()");
      }
      public void testStatic() {
          synchronized (SynchronizedTest.class){
              try {
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
              System.out.println("testStatic()");
          }
      }
      public synchronized void test2(){
          try {
              Thread.sleep(1000);
          } catch (InterruptedException e) {
              throw new RuntimeException(e);
          }
          System.out.println("test2()");
      }
      

      如上述代碼示例,testStatic1和testStatic需要持有的對(duì)象是同一個(gè),故這二者會(huì)產(chǎn)生互斥,test2需要持有的是該類(lèi)的一個(gè)實(shí)例對(duì)象,故不會(huì)與這二者產(chǎn)生互斥。

      需要注意的是: synchronized 并不屬于方法定義的一部分,故synchronized 關(guān)鍵字不能被繼承。如果在父類(lèi)中的某個(gè)方 法使用了 synchronized 關(guān)鍵字,而在子類(lèi)中覆蓋了這個(gè)方法,在子類(lèi)中的這 個(gè)方法默認(rèn)情況下并不是同步的,而必須顯式地在子類(lèi)的這個(gè)方法中加上 synchronized 關(guān)鍵字才可以。當(dāng)然,還可以在子類(lèi)方法中調(diào)用父類(lèi)中相應(yīng)的方 法,這樣雖然子類(lèi)中的方法不是同步的,但子類(lèi)調(diào)用了父類(lèi)的同步方法,因此, 子類(lèi)的方法也就相當(dāng)于同步了。
      來(lái)看看如下示例:

      public class Father {
          public synchronized void method1(){
              try {
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
              System.out.println("Father method1");
          }
      }
      //
      public class Son extends Father{
          public void syncSon() {
              super.method1(); // 調(diào)用的父類(lèi)的同步方法
          }
          public void syncSon1() {
              super.method1();
          }
          public void method1() { // 重寫(xiě)了,但是synchronized并不會(huì)繼承過(guò)來(lái)
              try {
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
              System.out.println("Son method1");
          }
      
          public void sonHaha() { method1(); }
          public void sonHehe() { method1(); }
      
          public static void main(String[] args) {
              Son son = new Son();
              // new Thread(son::syncSon).start();
              // new Thread(son::syncSon1).start(); // 會(huì)互斥
              new Thread(son::sonHaha).start();
              new Thread(son::sonHehe).start(); // 不會(huì)
          }
      }
      

      synchronized【隱式鎖】的底層原理涉及到 Java 對(duì)象頭(Object Header)和 Monitor(監(jiān)視器)兩個(gè)關(guān)鍵概念。

      每個(gè) Java 對(duì)象在內(nèi)存中分為三部分:

      1. 對(duì)象頭(Header)
        • Mark Word(標(biāo)記字段):存儲(chǔ)哈希碼、GC 分代年齡、鎖狀態(tài)等。
        • Klass Pointer(類(lèi)型指針):指向類(lèi)的元數(shù)據(jù)。
      2. 實(shí)例數(shù)據(jù)(Instance Data)
      3. 對(duì)齊填充(Padding)

      Java 對(duì)象頭:在 Java 虛擬機(jī)中,每個(gè)對(duì)象都有一個(gè)對(duì)象頭,用于存儲(chǔ)對(duì)象的元數(shù)據(jù)信息,包括對(duì)象的哈希碼、GC 相關(guān)信息、鎖狀態(tài)等。對(duì)象頭通常包含一個(gè)標(biāo)記字段(Mark Word),用于標(biāo)識(shí)對(duì)象的鎖狀態(tài)

      Monitor(監(jiān)視器):Monitor 是一種同步機(jī)制,負(fù)責(zé)管理對(duì)象的鎖。每個(gè)對(duì)象都與一個(gè) Monitor 相關(guān)聯(lián)。當(dāng)一個(gè)線程嘗試進(jìn)入一個(gè)被synchronized修飾的代碼塊或方法時(shí),它會(huì)嘗試獲取對(duì)象的 Monitor。如果 Monitor 處于無(wú)鎖狀態(tài),則當(dāng)前線程會(huì)嘗試將其鎖定;如果 Monitor 已經(jīng)被其他線程鎖定,則當(dāng)前線程會(huì)進(jìn)入阻塞狀態(tài),直到持有鎖的線程釋放鎖。

      // C++ 實(shí)現(xiàn)(HotSpot 源碼)
      class ObjectMonitor {
          void*   _header;       // Mark Word
          void*   _owner;        // 持有鎖的線程
          volatile intptr_t  _count;     // 重入次數(shù)
          ObjectWaiter* _WaitSet;        // 等待隊(duì)列(調(diào)用 wait() 的線程)
          ObjectWaiter* _EntryList;      // 阻塞隊(duì)列(競(jìng)爭(zhēng)鎖失敗的線程)
          // ...
      };
      

      查看本小節(jié)開(kāi)頭示例的test()方法的字節(jié)碼:

      synchronized 同步語(yǔ)句塊的實(shí)現(xiàn)使用的是 monitorentermonitorexit 指令,當(dāng)執(zhí)行 monitorenter 指令時(shí),線程試圖獲取鎖也就是獲取 對(duì)象監(jiān)視器 monitor 的持有權(quán)。第一個(gè)monitorexit正常退出同步塊, 第二個(gè)是異常退出同步塊。

      synchronized優(yōu)化:

      鎖升級(jí)(JDK 6+)

      3.0 無(wú)鎖

      • 無(wú)鎖:當(dāng)?shù)谝粋€(gè)線程第一次訪問(wèn)一個(gè)對(duì)象的同步塊時(shí),JVM會(huì)在對(duì)象頭中設(shè)置該線程的ID,并將對(duì)象頭的狀態(tài)位設(shè)置為“偏向鎖”。這個(gè)過(guò)程稱(chēng)為“偏向”,表示對(duì)象當(dāng)前偏向于第一個(gè)訪問(wèn)它的線程。

      3.1 偏向鎖(Biased Locking)

      • 原理:第一個(gè)獲取鎖的線程將線程 ID 寫(xiě)入 Mark Word,后續(xù)無(wú)需 CAS。這樣如果該線程再來(lái)的時(shí)候,由于是已經(jīng)設(shè)置了鎖偏向該線程,故只需比對(duì)一下對(duì)象頭里面的Mark Word就行了。
      • 觸發(fā)條件:JVM 啟用了偏向鎖(默認(rèn)開(kāi)啟),且對(duì)象未處于鎖定狀態(tài)。
      • 撤銷(xiāo):當(dāng)其他線程嘗試競(jìng)爭(zhēng)時(shí),撤銷(xiāo)偏向鎖并升級(jí)為輕量級(jí)鎖。

      3.2 輕量級(jí)鎖(Lightweight Locking)

      • 加鎖流程
        1. 在當(dāng)前線程棧幀中創(chuàng)建 Lock Record
        2. 通過(guò) CAS 將 Mark Word 復(fù)制到 Lock Record,并嘗試將 Mark Word 指向 Lock Record。

      輕量級(jí)鎖在以下場(chǎng)景會(huì)升級(jí)為重量級(jí)鎖:

      1. 自旋失敗:競(jìng)爭(zhēng)線程自旋一定次數(shù)后仍未獲取鎖。
      2. 競(jìng)爭(zhēng)加劇:等待鎖的線程數(shù)超過(guò) JVM 自適應(yīng)的閾值

      3.3 重量級(jí)鎖(Heavyweight Locking)

      • 實(shí)現(xiàn):通過(guò)操作系統(tǒng)提供的互斥量(Mutex)和條件變量實(shí)現(xiàn),線程競(jìng)爭(zhēng)失敗后進(jìn)入阻塞狀態(tài)。
      • 性能問(wèn)題:涉及用戶態(tài)到內(nèi)核態(tài)的切換,開(kāi)銷(xiāo)較大。

      synchronized優(yōu)化:

      鎖會(huì)升級(jí),從低到高升級(jí),反著降級(jí)不可以:無(wú)鎖狀態(tài) -> 偏向鎖狀態(tài) -> 輕量級(jí)鎖狀態(tài) -> 重量級(jí)鎖狀態(tài)

      鎖類(lèi)型 實(shí)現(xiàn)機(jī)制 適用場(chǎng)景 性能開(kāi)銷(xiāo)
      偏向鎖 通過(guò) Mark Word 記錄線程 ID 單線程重復(fù)訪問(wèn)同步塊 最低
      輕量級(jí)鎖 CAS + 自旋(適應(yīng)性自旋) 低競(jìng)爭(zhēng)、短時(shí)間同步 中等
      重量級(jí)鎖 操作系統(tǒng)互斥量(Mutex) + Monitor 高競(jìng)爭(zhēng)、長(zhǎng)時(shí)間同步 最高

      【節(jié)選自 鎖升級(jí)】 :https://blog.csdn.net/weixin_45433817/article/details/132216383

      問(wèn):synchronized是公平鎖嗎?

      首先要知道公平鎖和非公平鎖的概念:

      • 公平鎖:公平鎖指的是多個(gè)線程按照申請(qǐng)鎖的順序來(lái)獲取鎖,先到先得。當(dāng)一個(gè)線程請(qǐng)求鎖時(shí),如果該鎖當(dāng)前處于可用狀態(tài),并且在該線程之前已經(jīng)有其他線程在等待該鎖,那么這個(gè)線程會(huì)被放入等待隊(duì)列的尾部,等待前面的線程依次獲取并釋放鎖后,它才能獲取鎖。
      • 非公平鎖:非公平鎖則不保證線程獲取鎖的順序與申請(qǐng)鎖的順序一致。當(dāng)一個(gè)線程請(qǐng)求鎖時(shí),即使有其他線程在等待該鎖,它也會(huì)先嘗試直接獲取鎖,如果獲取成功就可以直接執(zhí)行,而不用排隊(duì)等待。

      那么,synchronized 基于對(duì)象頭的 Mark Word 和監(jiān)視器(Monitor)實(shí)現(xiàn)。當(dāng)一個(gè)線程進(jìn)入同步塊時(shí),它會(huì)嘗試獲取對(duì)象的監(jiān)視器。如果監(jiān)視器處于空閑狀態(tài),該線程會(huì)直接獲取監(jiān)視器,而不會(huì)考慮是否有其他線程已經(jīng)在等待這個(gè)監(jiān)視器。例如,當(dāng)一個(gè)線程釋放了 synchronized 修飾的同步塊的鎖后,新到來(lái)的線程有很大機(jī)會(huì)直接獲取到鎖,而不是等待那些在等待隊(duì)列中的線程,這就體現(xiàn)了其非公平性。

      class SynchronizedNonFairExample {
          private static final Object lock = new Object();
          private static int counter = 0;
          public static void main(String[] args) {
              for (int i = 0; i < 5; i++) {
                  new Thread(() -> {
                      while (true) {
                          synchronized (lock) {
                              counter++;
                              System.out.println(Thread.currentThread().getName() + " 獲取到鎖,計(jì)數(shù): " + counter);
                              try {
                                  // 模擬執(zhí)行任務(wù)
                                  Thread.sleep(100);
                              } catch (InterruptedException e) {
                                  e.printStackTrace();
                              }
                          }
                      }
                  }, "Thread-" + i).start();
              }
          }
      }
      

      在上述代碼中,多個(gè)線程競(jìng)爭(zhēng) lock 對(duì)象的鎖。運(yùn)行代碼時(shí),你會(huì)發(fā)現(xiàn)線程獲取鎖的順序并不是按照線程啟動(dòng)的順序,這就說(shuō)明了 synchronized 是非公平鎖。

      ② Lock

      上一節(jié)的synchronized是jdk內(nèi)置的關(guān)鍵字,屬于隱式鎖、也叫內(nèi)置鎖。現(xiàn)在這一節(jié)來(lái)探索一下顯式鎖,其提供更細(xì)粒度的控制(如可中斷、超時(shí)、公平性),核心實(shí)現(xiàn)為 ReentrantLock

      public interface Lock {
          //獲取鎖。如果鎖不可用,則當(dāng)前線程將出于線程調(diào)度目的而被禁用,并在獲取鎖之前處于休眠狀態(tài)。
          void lock();
          void lockInterruptibly() throws InterruptedException;
          //如果鎖可用,則獲取鎖,并立即返回值為 true。如果鎖不可用,則此方法將立即返回值 false。
      	boolean tryLock();
          /*
          如果在給定的等待時(shí)間內(nèi)有空閑,并且當(dāng)前線程尚未中斷,則獲取該鎖。
          如果鎖可用,則此方法立即返回值 true。如果鎖不可用,則當(dāng)前線程將出于線程調(diào)度目的而被禁用,并處于休眠狀態(tài),
          直到發(fā)生以下三種情況之一:鎖由當(dāng)前線程獲取;或者其他線程中斷當(dāng)前線程,支持中斷鎖獲取;或指定的等待時(shí)間已用
          */
          boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
          //釋放鎖
      	void unlock();
          //返回綁定到此 Lock 實(shí)例的新 Condition 實(shí)例。在等待條件之前,鎖必須由當(dāng)前線程持有
      	Condition newCondition();
      }
      

      從上圖中,我們可以得知juc包下的幾個(gè)主要的實(shí)現(xiàn)類(lèi),綠色圈圈連接的是里面的內(nèi)部類(lèi)。

      1) ReentrantLock

      接下來(lái)從我們最熟知的ReentrantLock開(kāi)始看起吧,他的簡(jiǎn)單使用:

      public class LockTest {
          Lock lock = new ReentrantLock();
          int count = 0;
          public static void main(String[] args) throws InterruptedException {
              LockTest test = new LockTest();
              for (int i = 1; i <= 2; i++) {
                  new Thread(test::add).start();
              }
              Thread.sleep(2000);
              /*
              如果不加鎖的話,結(jié)果就不一定是兩萬(wàn)了
              */
              System.out.println(test.count);
          }
      
          public void add() {
              // 標(biāo)準(zhǔn)寫(xiě)法 try加鎖 finally釋放鎖
              try {
                  lock.lock();
                  for (int i = 0; i < 10000; i++) {
                      count++;
                  }
              } finally {
                  lock.unlock();
              }
          }
      }
      

      我們分析一下,首先是調(diào)用了其無(wú)參構(gòu)造函數(shù)創(chuàng)建了一個(gè)對(duì)象,里面是new了一個(gè)看名字是非公平同步標(biāo)記的對(duì)象,那他肯定會(huì)有公平的同步標(biāo)記。

      // 下面都是在ReentrantLock.java里面
      private final Sync sync;
      // 無(wú)參構(gòu)造
      public ReentrantLock() {
          sync = new NonfairSync();
      }
      // 有參構(gòu)造
      public ReentrantLock(boolean fair) {
          sync = fair ? new FairSync() : new NonfairSync();
      }
      // 然后調(diào)用lock.lock()實(shí)際是調(diào)用的sync.lock();
      public void lock() {
          sync.lock();
      }
      // 然后調(diào)用lock.unlock()實(shí)際是調(diào)用的sync.release(1);;
      public void unlock() {
          sync.release(1);
      }
      // 是ReentrantLock的靜態(tài)內(nèi)部類(lèi)
      static final class NonfairSync extends Sync {
          private static final long serialVersionUID = 7316153563782823691L;
      
          final void lock() {
              if (compareAndSetState(0, 1))
                  setExclusiveOwnerThread(Thread.currentThread());
              else
                  acquire(1);
          }
      
          protected final boolean tryAcquire(int acquires) {
              return nonfairTryAcquire(acquires);
          }
      }
      // 是ReentrantLock的靜態(tài)內(nèi)部類(lèi)
      static final class FairSync extends Sync {
          private static final long serialVersionUID = -3000897897090466540L;
      
          final void lock() {
              acquire(1);
          }
          protected final boolean tryAcquire(int acquires) {
              final Thread current = Thread.currentThread();
              int c = getState();
              if (c == 0) {
                  if (!hasQueuedPredecessors() &&
                      compareAndSetState(0, acquires)) {
                      setExclusiveOwnerThread(current);
                      return true;
                  }
              }
              else if (current == getExclusiveOwnerThread()) {
                  int nextc = c + acquires;
                  if (nextc < 0)
                      throw new Error("Maximum lock count exceeded");
                  setState(nextc);
                  return true;
              }
              return false;
          }
      }
      // Sync繼承了AbstractQueuedSynchronizer:【AQS】
      abstract static class Sync extends AbstractQueuedSynchronizer {
          .....
      }
      

      從上面額關(guān)系我們可以看出一切源頭就是AbstractQueuedSynchronizer,也就是我們熟悉的AQS。在這篇文章的“補(bǔ)充知識(shí)點(diǎn)”環(huán)節(jié)中,對(duì)AQS做了一個(gè)簡(jiǎn)單的介紹及分析。【AQS】:https://blog.csdn.net/okok__TXF/article/details/146455487

      博客園】:https://blog.csdn.net/okok__TXF/article/details/146455487

      它是是 Java 并發(fā)包 java.util.concurrent.locks 下的一個(gè)核心類(lèi),是構(gòu)建鎖和其他同步工具(如 ReentrantLockSemaphoreCountDownLatch 等)的基礎(chǔ)框架。

      里面定義了兩種資源共享模式:

      • 獨(dú)占模式(Exclusive):同一時(shí)刻只有一個(gè)線程能獲取資源,如 ReentrantLock

      在獨(dú)占模式的時(shí)候,tryAcquire(int):嘗試獲取資源,成功返回 true,失敗返回 falsetryRelease(int):嘗試釋放資源,成功返回 true,失敗返回 false

      • 共享模式(Share):多個(gè)線程可同時(shí)獲取資源,如 Semaphore(信號(hào)量)、CountDownLatch(倒計(jì)時(shí) latch)。

      在共享模式的時(shí)候,tryAcquireShared(int):嘗試獲取共享資源,負(fù)數(shù)表示失敗;0 表示成功但無(wú)剩余資源;正數(shù)表示成功且有剩余資源。tryReleaseShared(int):嘗試釋放共享資源,釋放后若允許喚醒后續(xù)等待節(jié)點(diǎn),返回 true,否則 false

      - 非公平鎖

      回到ReentrantLock中,我們以lock()方法舉例子:【lock是無(wú)參構(gòu)造的】非公平鎖

      //ReentrantLock.java
      public void lock() {
          sync.lock(); // ===========1
      }
      //內(nèi)部的抽象類(lèi)Sync
      abstract void lock();
      final boolean nonfairTryAcquire(int acquires) { // ===========7
          final Thread current = Thread.currentThread();
          int c = getState();
          if (c == 0) {
              if (compareAndSetState(0, acquires)) {
                  setExclusiveOwnerThread(current);
                  return true;
              }
          }
          else if (current == getExclusiveOwnerThread()) {
              int nextc = c + acquires;
              if (nextc < 0) // overflow
                  throw new Error("Maximum lock count exceeded");
              setState(nextc);
              return true;
          }
          return false;
      }
      
      //具體實(shí)現(xiàn)類(lèi)NonfairSync
      final void lock() { // ===========2
          if (compareAndSetState(0, 1))
              setExclusiveOwnerThread(Thread.currentThread());
          else
              acquire(1); // ===========3
      }
      protected final boolean tryAcquire(int acquires) { // ===========6
          return nonfairTryAcquire(acquires); // 這個(gè)是抽象類(lèi)Sync的
      }
      
      //AbstractQueuedSynchronizer.java --- acquire(1)
      public final void acquire(int arg) { // ===========4
          // 這里的tryAcquire是NonfairSync.java里面的
          if (!tryAcquire(arg) && // ===========5
              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
              selfInterrupt();
      }
      final boolean acquireQueued(final Node node, int arg) {
          boolean failed = true;
          try {
              boolean interrupted = false;
              for (;;) {
                  final Node p = node.predecessor();
                  if (p == head && tryAcquire(arg)) {
                      setHead(node);
                      p.next = null; // help GC
                      failed = false;
                      return interrupted;
                  }
                  if (shouldParkAfterFailedAcquire(p, node) &&
                      parkAndCheckInterrupt())
                      interrupted = true;
              }
          } finally {
              if (failed)
                  cancelAcquire(node);
          }
      }
      

      已經(jīng)按順序?qū)?234567標(biāo)注在了上面,模板方法的設(shè)計(jì)模式有時(shí)候真的會(huì)讓人暈頭轉(zhuǎn)向。。

      一句簡(jiǎn)簡(jiǎn)單單的lock.lock()方法做了什么呢?

      首先,會(huì)直接嘗試CAS獲取鎖【compareAndSetState(0, 1) 會(huì)通過(guò) CAS 操作嘗試將 AQS 中的 state 狀態(tài)從 0 改為 1】,如果成功的話成功則設(shè)置當(dāng)前線程為鎖持有者,否則進(jìn)入AQS的獲取流程;【在這里,當(dāng)線程調(diào)用 lock() 時(shí),會(huì)先通過(guò) CAS 操作嘗試將 AQSstate0 改為 1此時(shí)不會(huì)檢查等待隊(duì)列中是否有其他線程在排隊(duì),只要 CAS 成功,就直接獲取鎖,體現(xiàn)了 “插隊(duì)” 的特性。】

      其次,進(jìn)入aqs的acquire流程

      1.tryAcquire(arg)   2.addWaiter(Node.EXCLUSIVE)   3.acquireQueued(xxx)
      

      第一個(gè)方法tryAcquire再次嘗試獲取鎖(非公平鎖的 tryAcquirenonfairTryAcquire(acquires)),在Sync :: nonfairTryAcquire(int acquires)方法里面,得到aqs里面的state,如果是0,再次嘗試 CAS 搶占(體現(xiàn)非公平性,不檢查隊(duì)列);如果不是0,說(shuō)明被搶占了,判斷持有鎖的線程是不是當(dāng)前線程,如果是(體現(xiàn)可重入性),更新state,如果不是返回false。

      然后,tryAcquire失敗【返回false】,調(diào)用addWaiter(Node.EXCLUSIVE) 會(huì)將當(dāng)前線程包裝成一個(gè)獨(dú)占模式的 Node 節(jié)點(diǎn)加入 AQS 隊(duì)列。接著 acquireQueued 會(huì)使線程在隊(duì)列中自旋等待,不斷嘗試獲取鎖或被喚醒后嘗試獲取,直到成功。

      接下來(lái)分析一下lock.unlock()方法,這個(gè)就在代碼里面注釋解釋了

      //ReentrantLock.java
      public void unlock() {
          //調(diào)用其內(nèi)部同步器 sync 的 release 方法:
          sync.release(1); // ===========1
      }
      // 內(nèi)部抽象類(lèi)Sync
      protected final boolean tryRelease(int releases) { // ===========3
          int c = getState() - releases; // 減少同步狀態(tài)值(釋放一次鎖,`state` 減 1)
          // 檢查當(dāng)前線程是否為鎖的持有者,不是則拋異常
          if (Thread.currentThread() != getExclusiveOwnerThread())
              throw new IllegalMonitorStateException();
          boolean free = false;
          if (c == 0) {// 當(dāng) `state` 減為 0 時(shí),完全釋放鎖
              free = true;
              setExclusiveOwnerThread(null);// 清除獨(dú)占鎖的線程引用
          }
          setState(c);// 更新 `state` 值
          return free;// 返回是否完全釋放鎖
      }
      
      //AbstractQueuedSynchronizer.java --- acquire(1)
      public final boolean release(int arg) {
          //tryRelease 嘗試釋放鎖,由子類(lèi)實(shí)現(xiàn)具體邏輯
          if (tryRelease(arg)) { // ===========2
              Node h = head;// 獲取等待隊(duì)列頭節(jié)點(diǎn)
              if (h != null && h.waitStatus != 0)
                  unparkSuccessor(h);// 喚醒后繼節(jié)點(diǎn) ===========4
              return true;
          }
          return false;
      }
      //喚醒后繼節(jié)點(diǎn)
      private void unparkSuccessor(Node node) {  ===========5
          int ws = node.waitStatus;
          if (ws < 0) // 將頭節(jié)點(diǎn)的 `waitStatus` 設(shè)為 0(取消之前的狀態(tài))
              compareAndSetWaitStatus(node, ws, 0);
          Node s = node.next; // 找到頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)
          if (s == null || s.waitStatus > 0) { // 若后繼節(jié)點(diǎn)為空或已取消,從隊(duì)尾向前找第一個(gè)非取消節(jié)點(diǎn)
              s = null;
              for (Node t = tail; t != null && t != node; t = t.prev)
                  if (t.waitStatus <= 0)
                      s = t;
          }
          if (s != null) // 喚醒找到的節(jié)點(diǎn)對(duì)應(yīng)的線程
              LockSupport.unpark(s.thread);
      }
      

      在內(nèi)部抽象類(lèi)Sync中的tryRelease中:

      1. 減少 stateReentrantLock 支持重入,state 記錄鎖的重入次數(shù)。每次調(diào)用 unlock()state 減 1。
      2. 檢查線程所有權(quán):確保只有鎖的持有者才能釋放鎖,否則拋出 IllegalMonitorStateException
      3. 完全釋放鎖:當(dāng) state 減為 0 時(shí),將 setExclusiveOwnerThread(null),表示鎖已完全釋放,返回 true

      ReentrantLock支持重入:【其實(shí)從名字就可以看出來(lái)了 -- Reentrant(再進(jìn)去的、就是可重入嘛)】

      ReentrantLock lock = new ReentrantLock();
      lock.lock();  // state=1,線程持有鎖
      lock.lock();  // state=2(重入)
      lock.unlock(); // state=1(未完全釋放)
      lock.unlock(); // state=0,釋放鎖并喚醒等待線程
      
      - 公平鎖

      那么ReentrantLock的公平鎖是什么樣子的呢?其實(shí)大致步驟都差不多,主要是在FairSync.java

      protected final boolean tryAcquire(int acquires) {
          final Thread current = Thread.currentThread();
          int c = getState();
          if (c == 0) {
              //公平鎖會(huì)先檢查等待隊(duì)列是否有前驅(qū)節(jié)點(diǎn),若有則不能搶鎖
              if (!hasQueuedPredecessors() &&
                  compareAndSetState(0, acquires)) {
                  setExclusiveOwnerThread(current);
                  return true;
              }
          }
          else if (current == getExclusiveOwnerThread()) {
              int nextc = c + acquires;
              if (nextc < 0)
                  throw new Error("Maximum lock count exceeded");
              setState(nextc);
              return true;
          }
          return false; 
      }
      public final boolean hasQueuedPredecessors() {
          // thread is first in queue.
          Node t = tail; // Read fields in reverse initialization order
          Node h = head;
          Node s;
          return h != t &&
              ((s = h.next) == null || s.thread != Thread.currentThread());
      }
      

      公平鎖在 state == 0 時(shí),會(huì)先通過(guò) hasQueuedPredecessors 檢查等待隊(duì)列。若有其他線程在排隊(duì),則當(dāng)前線程不能搶占,必須入隊(duì)等待,保證 “先來(lái)先得”,體現(xiàn)了公平性。而非公平鎖跳過(guò)這一步,直接搶鎖,這就是非公平性的核心體現(xiàn)。

      2) 讀寫(xiě)鎖

      //ReadWriteLock 維護(hù)一對(duì)關(guān)聯(lián)的鎖,一個(gè)用于只讀作,一個(gè)用于寫(xiě)入。只要沒(méi)有寫(xiě)入器,多個(gè)讀取器線程就可以同時(shí)持有讀取鎖。
      //寫(xiě)鎖是獨(dú)占的。讀寫(xiě)鎖允許在訪問(wèn)共享數(shù)據(jù)時(shí)實(shí)現(xiàn)比互斥鎖允許的更高級(jí)別的并發(fā)。它利用了這樣一個(gè)事實(shí),
      //即雖然一次只有一個(gè)線程(寫(xiě)入線程)可以修改共享數(shù)據(jù),但在許多情況下,任意數(shù)量的線程都可以同時(shí)讀取數(shù)據(jù)(因此是讀取線程)。
      //從理論上講,與使用互斥鎖相比,使用讀寫(xiě)鎖允許的并發(fā)性增加將導(dǎo)致性能改進(jìn)。
      public interface ReadWriteLock {
          //返回用于讀取的鎖。
          Lock readLock();
      	//返回用于寫(xiě)入的鎖。
          Lock writeLock();
      }
      

      讀寫(xiě)鎖是否會(huì)比使用互斥鎖提高性能 取決于讀取數(shù)據(jù)的頻率與修改數(shù)據(jù)的頻率、讀取和寫(xiě)入作的持續(xù)時(shí)間以及數(shù)據(jù)的爭(zhēng)用 - 即嘗試同時(shí)讀取或?qū)懭霐?shù)據(jù)的線程數(shù)。例如,最初填充數(shù)據(jù),此后不經(jīng)常修改的集合;經(jīng)常搜索(例如某種目錄)是使用讀寫(xiě)鎖的理想候選者。但是,如果更新變得頻繁,則數(shù)據(jù)將花費(fèi)大部分時(shí)間進(jìn)行獨(dú)占鎖定,并發(fā)性幾乎沒(méi)有增加。只有分析和測(cè)量才能確定使用讀寫(xiě)鎖是否適合您的應(yīng)用程序。

      讀寫(xiě)鎖允許多個(gè)線程同時(shí)讀(沒(méi)有寫(xiě)入時(shí),多個(gè)線程允許同時(shí)讀(提高性能)),但只要有一個(gè)線程在寫(xiě),其他線程就必須等待(只允許一個(gè)線程寫(xiě)入(其他線程既不能寫(xiě)入也不能讀取))。也就是讀讀不沖突、讀寫(xiě)就要沖突了。

      - ReadWriteLock

      下面給出一個(gè)簡(jiǎn)單示例:ReadWriteLock

      public class ReadWriteLockTest2 {
          private static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
          private static final Lock readLock = readWriteLock.readLock();
          private static final Lock writeLock = readWriteLock.writeLock();
          private static int[] a = new int[10];
          public static void main(String[] args) throws InterruptedException {
              // 1個(gè)線程寫(xiě)
              new Thread(ReadWriteLockTest2::write).start();
              for (int i = 0; i < 9; i++)  // 10個(gè)線程讀
                  new Thread(()-> System.out.println(get())).start();
              Thread.sleep(2000);
          }
          public static Object get() {
             readLock.lock();
              try {
                  Thread.sleep(100);
                  return a[1];
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              } finally {
                  readLock.unlock();
              }
          }
          public static void write() {
              writeLock.lock();
              try {
                  a[1]++;
                  System.out.println("寫(xiě)進(jìn)行~~~");
                  Thread.sleep(1000);
                  System.out.println("寫(xiě)ok~~~");
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              } finally {
                  writeLock.unlock();
              }
          }
      }
      

      寫(xiě)操作在執(zhí)行的時(shí)候,讀線程是會(huì)阻塞的。但是10個(gè)讀線程之間并沒(méi)有阻塞

      - StampedLock

      StampedLock對(duì)比 ReentrantReadWriteLock 有所增強(qiáng),在原先讀寫(xiě)鎖的基礎(chǔ)之上新增了樂(lè)觀讀的模式。該模式并不會(huì)加鎖,所以不會(huì)阻塞線程,會(huì)有更高的吞吐量和更高的性能。(樂(lè)觀鎖和悲觀鎖)

      StampedLock具有三種控制讀/寫(xiě)訪問(wèn)的模式:

      1、寫(xiě)入(Writing):方法writeLock可能阻塞等待獨(dú)占訪問(wèn),并返回一個(gè)戳,該戳可在方法unlockWrite中使用以釋放鎖。還提供了tryWriteLock的非定時(shí)和定時(shí)版本。當(dāng)鎖保持在寫(xiě)模式時(shí),不能獲得讀鎖,并且所有樂(lè)觀讀驗(yàn)證都將失敗。

      2、讀取(Reading):方法readLock可能會(huì)阻塞等待非獨(dú)占訪問(wèn),并返回一個(gè)戳,該戳可在方法unlockRead中使用以釋放鎖。還提供了tryReadLock的非定時(shí)和定時(shí)版本。

      3、樂(lè)觀讀取(Optimistic Reading):tryOptimisticRead方法返回一個(gè)非0的stamp,只有當(dāng)前同步狀態(tài)沒(méi)有被寫(xiě)模式所占有是才能獲取到。他是在獲取stamp值后對(duì)數(shù)據(jù)進(jìn)行讀取操作,最后驗(yàn)證該stamp值是否發(fā)生變化,如果發(fā)生變化則讀取無(wú)效,代表有數(shù)據(jù)寫(xiě)入。這種方式能夠降低競(jìng)爭(zhēng)和提高吞吐量。

      簡(jiǎn)單示例:

      public class StampedLockTest {
          private static final StampedLock stampedLock = new StampedLock();
          private static double x = 1.0;
          private static double y = 1.0;
          public static void main(String[] args) {
              // 1. 一個(gè)線程寫(xiě)
              new Thread(() -> addXY(1, 1)).start();
              // 2. 10個(gè)線程讀
              for (int i = 0; i < 10; i++) {
                  new Thread(() -> System.out.println(getSArea())).start();
              }
          }
          private static double getSArea() {
              // 樂(lè)觀讀
              long stamp = stampedLock.tryOptimisticRead();
              double s1 = x * y;
              // 驗(yàn)證一下
              if (!stampedLock.validate(stamp)) { // 驗(yàn)證失敗
                  stamp = stampedLock.readLock(); // 升級(jí)為讀鎖
                  try {
                      s1 = x * y;
                  } finally {
                      stampedLock.unlockRead(stamp);
                  }
              }
              return s1;
          }
          private static void addXY(double a, double b) {
              long stamp = stampedLock.writeLock();
              try {
                  System.out.println("寫(xiě)進(jìn)行~~");
                  x += a;
                  y += b;
                  Thread.sleep(1000);
                  System.out.println("寫(xiě)ok~~");
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              } finally {
                  stampedLock.unlockWrite(stamp);
              }
          }
      }
      
      

      寫(xiě)操作:writeLock() 返回一個(gè) stamp(時(shí)間戳),表示獲取寫(xiě)鎖成功。寫(xiě)鎖是獨(dú)占的,獲取時(shí)會(huì)阻塞所有讀鎖和其他寫(xiě)鎖(除了樂(lè)觀讀)。通過(guò) unlockWrite(stamp) 釋放寫(xiě)鎖,stamp 必須與獲取時(shí)的一致,否則拋出異常。

      樂(lè)觀讀: tryOptimisticRead():獲取一個(gè) 樂(lè)觀讀時(shí)間戳,不實(shí)際加鎖,直接讀取數(shù)據(jù)(成本極低),然后validate(stamp):檢查該時(shí)間戳對(duì)應(yīng)的讀操作期間是否有寫(xiě)操作發(fā)生。若 stamp 有效(無(wú)寫(xiě)操作),則數(shù)據(jù)一致;否則需要升級(jí)為讀鎖。鎖升級(jí):若驗(yàn)證失敗,說(shuō)明數(shù)據(jù)可能被修改,通過(guò) readLock() 獲取讀鎖(阻塞直到寫(xiě)鎖釋放),確保后續(xù)讀取的數(shù)據(jù)是最新的。

      - 可重入性探討

      JVM允許同一個(gè)線程重復(fù)獲取同一個(gè)鎖,這種能被同一個(gè)線程反復(fù)獲取的鎖,就叫做可重入鎖

      本小節(jié)看一下上面兩種讀寫(xiě)鎖的可重入性,首先是ReadWriteLock,從他的實(shí)現(xiàn)類(lèi)來(lái)看就是可重入的了【ReentrantReadWriteLock】

      ReentrantReadWriteLock中也有Sync的抽象內(nèi)部類(lèi),當(dāng)調(diào)用寫(xiě)鎖的lock時(shí),實(shí)際是會(huì)經(jīng)過(guò)里面重寫(xiě)的tryAcquire,從下面的代碼可以知道同一線程可多次獲取寫(xiě)鎖:當(dāng)線程獲取寫(xiě)鎖后,再次調(diào)用 writeLock() 會(huì)直接成功(無(wú)需等待),因?yàn)閮?nèi)部維護(hù)了一個(gè) 重入計(jì)數(shù)器(類(lèi)似 ReentrantLock)。每次獲取寫(xiě)鎖時(shí)計(jì)數(shù)器加 1,釋放時(shí)減 1,計(jì)數(shù)器為 0 時(shí)才真正釋放鎖。

      protected final boolean tryAcquire(int acquires) {
          Thread current = Thread.currentThread();
          int c = getState();
          int w = exclusiveCount(c);
          if (c != 0) {
              if (w == 0 || current != getExclusiveOwnerThread())
                  return false;
              if (w + exclusiveCount(acquires) > MAX_COUNT)
                 ....
              // Reentrant acquire
              setState(c + acquires);
              return true;
          }
          ....
          return true;
      }
      

      ReentrantReadWriteLock 基于 AQS(AbstractQueuedSynchronizer) 實(shí)現(xiàn),通過(guò) state 變量的高 16 位和低 16 位分別記錄 讀鎖的共享次數(shù)寫(xiě)鎖的重入次數(shù)

      • 寫(xiě)鎖(獨(dú)占模式):使用低 16 位記錄當(dāng)前線程的重入次數(shù)(和 ReentrantLock 類(lèi)似)。
      • 讀鎖(共享模式):使用高 16 位記錄所有線程的讀鎖獲取次數(shù),但會(huì)通過(guò)線程本地變量(ThreadLocal)記錄當(dāng)前線程的讀鎖重入次數(shù),避免不同線程的計(jì)數(shù)干擾。

      這種設(shè)計(jì)使得同一線程多次獲取寫(xiě)鎖或在讀鎖 / 寫(xiě)鎖之間按規(guī)則重入時(shí),不會(huì)出現(xiàn)死鎖,符合可重入鎖的定義。

      public static void write() {
          writeLock.lock();
          try {
              writeLock.lock();
              a[1]++;
              System.out.println("寫(xiě)進(jìn)行~~~");
              Thread.sleep(1000);
              System.out.println("寫(xiě)ok~~~");
          } catch (InterruptedException e) {
              throw new RuntimeException(e);
          } finally {
              writeLock.unlock(); // 可重入
              writeLock.unlock();
          }
      }
      

      StampedLock是不可重入的,為什么呢?

      1. 沒(méi)有鎖計(jì)數(shù)機(jī)制:StampedLock 并沒(méi)有像 ReentrantLock 那樣維護(hù)一個(gè)鎖的重入計(jì)數(shù)。在 ReentrantLock 中,state 變量用于記錄鎖的重入次數(shù),每次獲取鎖時(shí) state 加 1,釋放鎖時(shí) state 減 1。而 StampedLock 中的 state 變量主要用于表示鎖的狀態(tài)和版本信息,并非用于記錄重入次數(shù)。

      2. 如果一個(gè)線程已經(jīng)持有了 StampedLock 的寫(xiě)鎖或讀鎖,再次嘗試獲取相同類(lèi)型的鎖時(shí),會(huì)出現(xiàn)以下情況:

        • 寫(xiě)鎖情況:如果線程已經(jīng)持有寫(xiě)鎖,再次調(diào)用 writeLock() 方法,由于寫(xiě)鎖是獨(dú)占的,該線程會(huì)被阻塞,因?yàn)樗鼤?huì)等待自己釋放寫(xiě)鎖后才能再次獲取,這顯然會(huì)導(dǎo)致死鎖。
        • 讀鎖情況:如果線程已經(jīng)持有讀鎖,再次調(diào)用 readLock() 方法,雖然讀鎖是共享的,但 StampedLock 并不會(huì)像可重入鎖那樣允許線程多次獲取而不產(chǎn)生問(wèn)題。而且如果在持有讀鎖的情況下嘗試獲取寫(xiě)鎖,會(huì)導(dǎo)致死鎖,因?yàn)閷?xiě)鎖需要獨(dú)占資源,而當(dāng)前線程已經(jīng)持有了讀鎖。
      private static void addXY(double a, double b) {
          long stamp = stampedLock.writeLock();
          try {
              long lock = stampedLock.writeLock(); // 不可重入
              System.out.println("寫(xiě)進(jìn)行~~");
              x += a;
              y += b;
              Thread.sleep(1000);
              System.out.println("寫(xiě)ok~~");
          } catch (InterruptedException e) {
              throw new RuntimeException(e);
          } finally {
              stampedLock.unlockWrite(stamp);
          }
      }
      

      ③ 鎖案例

      上面只掌握了一丟丟的理論,沒(méi)有實(shí)踐怎么行呢?

      1) 交替打印

      第一個(gè),我們來(lái)實(shí)現(xiàn)一下三個(gè)線程交替打印A B C試試,第一個(gè)線程打印A,第二個(gè)B,第三個(gè)C

      // synchronized實(shí)現(xiàn)
      public class PrintABCSynchronized {
          private int now = 1;
          public static void main(String[] args) {
              PrintABCSynchronized obj = new PrintABCSynchronized();
              new Thread(obj::printA).start();
              new Thread(obj::printB).start();
              new Thread(obj::printC).start();
          }
          public void printA() {
              for (int i = 0; i < 10; i++) {
                  synchronized (this) {
                      while ( now != 1 ) { // 為什么用while,不用if?留給讀者思考
                          try {this.wait();}
                          catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                      System.out.println("A"); now = 2;
                      this.notifyAll();
                  }
              }
          }
          public void printB() {
              for (int i = 0; i < 10; i++) {
                  synchronized (this) {
                      while ( now != 2 ) { 
                          try {this.wait();}
                          catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                      System.out.println("B"); now = 3;
                      this.notifyAll();
                  }
              }
          }
          public void printC() {
              for (int i = 0; i < 10; i++) {
                  synchronized (this) {
                      while ( now != 3 ) {
                          try {this.wait();}
                          catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                      System.out.println("C"); now = 1;
                      this.notifyAll();
                  }
              }
          }
      }
      

      wait - notify 【這個(gè)只能用在synchronized同步代碼塊中,是屬于Object的方法】上面的缺陷很?chē)?yán)重,那就是一下子就喚醒了所有掛起的線程,其實(shí)有的線程根本就不用喚醒,有沒(méi)有一種辦法,就是我想喚醒誰(shuí)就喚醒誰(shuí)呢?

      public class PrintABCLock {
          private Lock lock = new ReentrantLock();
          private Condition a = lock.newCondition();
          private Condition b = lock.newCondition();
          private Condition c = lock.newCondition();
          private int flag = 1;
          public static void main(String[] args) {
              PrintABCLock obj = new PrintABCLock();
              new Thread(obj::printA).start();
              new Thread(obj::printB).start();
              new Thread(obj::printC).start();
          }
          public void printA()  {
              lock.lock();
              try {
                  for (int i = 0; i < 10; i++) {
                      while ( flag != 1 ) a.await();
                      System.out.println('A'); flag = 2;
                      b.signal();
                  }
              } catch ( InterruptedException e) {
                  e.printStackTrace();
              }
              finally {
                  lock.unlock();
              }
          }
          public void printB() {
              lock.lock();
              try {
                  for (int i = 0; i < 10; i++) {
                      while ( flag != 2 ) b.await();
                      System.out.println('B'); flag = 3;
                      c.signal();
                  }
              } catch ( InterruptedException e) {
                  e.printStackTrace();
              }
              finally {
                  lock.unlock();
              }
          }
          public void printC() {
              lock.lock();
              try {
                  for (int i = 0; i < 10; i++) {
                      while ( flag != 3 ) c.await();
                      System.out.println('C'); flag = 1;
                      a.signal();
                  }
              } catch ( InterruptedException e) {
                  e.printStackTrace();
              }
              finally {
                  lock.unlock();
              }
          }
      }
      

      2) 阻塞隊(duì)列

      下面模仿jdk中ArrayBlockingQueue的源碼,給了一個(gè)簡(jiǎn)潔的阻塞隊(duì)列

      class TBlockedQueue<T> {
          private final Lock lock;
          private final Condition notEmpty;
          private final Condition notFull;
          private final int capacity;
          private final LinkedList<T> list;
          public TBlockedQueue(int capacity) {
              if (capacity <= 0)
                  throw new IllegalArgumentException("Capacity 不能小于1");
              this.capacity = capacity;
              list = new LinkedList<>();
              lock = new ReentrantLock();
              notEmpty = lock.newCondition();
              notFull = lock.newCondition();
          }
          public void add( T t ) {
              list.addLast(t);
          }
      
          // 1. 入隊(duì) -- 當(dāng)隊(duì)列已滿時(shí),向隊(duì)列中添加元素的操作會(huì)被阻塞,直到隊(duì)列有空間可用。
          public void put( T t ) {
              if (t == null) throw new NullPointerException();
              lock.lock();
              try {
                  while (list.size() == capacity) notFull.await();
                  list.addLast(t);
                  notEmpty.signal();
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              } finally {
                  lock.unlock();
              }
          }
          // 2. 出隊(duì) -- 當(dāng)隊(duì)列為空時(shí),從隊(duì)列中獲取元素的操作會(huì)被阻塞,直到隊(duì)列中有新元素加入
          public T take() {
              lock.lock();
              try {
                  while (list.isEmpty()) notEmpty.await();
                  T t = list.removeFirst();
                  notFull.signal();
                  return t;
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              } finally {
                  lock.unlock();
              }
          }
      }
      

      這里自定義的 TBlockedQueue 是一個(gè)典型的 有界阻塞隊(duì)列,其核心思路是通過(guò) 鎖(Lock)和條件變量(Condition) 實(shí)現(xiàn)線程間的同步與協(xié)調(diào),確保在多線程環(huán)境下對(duì)隊(duì)列的操作是安全的。通過(guò) lock.lock()lock.unlock() 包裹對(duì)共享資源 list 的操作,確保同一時(shí)刻只有一個(gè)線程修改隊(duì)列。同時(shí),使用 while 循環(huán)檢查條件(如 list.size() == capacity),防止 虛假喚醒導(dǎo)致條件不滿足時(shí)錯(cuò)誤地繼續(xù)執(zhí)行。

      • notEmpty:當(dāng)隊(duì)列為空時(shí),take 操作會(huì)等待此條件;當(dāng)有元素入隊(duì)時(shí),通過(guò) signal() 喚醒等待的消費(fèi)者線程。
      • notFull:當(dāng)隊(duì)列已滿時(shí),put 操作會(huì)等待此條件;當(dāng)有元素出隊(duì)時(shí),通過(guò) signal() 喚醒等待的生產(chǎn)者線程。

      3) AQS自定義鎖

      前面分析可以知道ReentrantLock是以AQS為基礎(chǔ)框架來(lái)實(shí)現(xiàn)的,那么,此節(jié)我們自定義來(lái)實(shí)現(xiàn)一個(gè)鎖。

      見(jiàn) “Java并發(fā)探索--下篇”

      4.探索并發(fā)工具

      5.虛擬線程

      見(jiàn) “Java并發(fā)探索--下篇” --- 在下面找

      博客園

      http://www.rzrgm.cn/jackjavacpp

      CSDN

      https://blog.csdn.net/okok__TXF

      end.參考

      1. https://blog.csdn.net/agonie201218/article/details/128712507
      2. https://blog.csdn.net/xu_yong_lin/article/details/117521773
      3. http://www.rzrgm.cn/java-bible/p/13930006.html
      4. https://blog.csdn.net/fighting_yu/article/details/89473175
      5. https://tech.meituan.com/2018/11/15/java-lock.html
      6. https://blog.csdn.net/weixin_44772566/article/details/137398521
      7. https://blog.csdn.net/m0_73978383/article/details/146442443 【synchronized詳解】
      8. https://liaoxuefeng.com/books/java/threading 【廖雪峰的官方網(wǎng)站--- 神中神】
      posted @ 2025-04-28 20:24  別來(lái)無(wú)恙?  閱讀(172)  評(píng)論(0)    收藏  舉報(bào)
      主站蜘蛛池模板: 狠狠色噜噜狠狠狠狠蜜桃| 国产精品普通话国语对白露脸| 成在线人永久免费视频播放| 免费无遮挡无码永久在线观看视频| 久久国内精品自在自线91| 91福利一区福利二区| 亚洲日韩av无码一区二区三区人 | 青青草成人免费自拍视频| 久久精品丝袜高跟鞋| 在线观看AV永久免费| 深夜释放自己在线观看| 天堂av资源在线免费| 国产精品黑色丝袜在线观看| 亚洲AV无码东方伊甸园| 天天爽夜夜爱| 99久久国产成人免费网站| 日韩欧美亚洲综合久久| 日韩一区二区三区精彩视频| 国产又黄又爽又不遮挡视频| 欧美人与动牲猛交A欧美精品| 天峻县| 麻豆一区二区三区蜜桃免费 | 国产av成人精品播放| 拍真实国产伦偷精品| 中文字幕久久六月色综合| 国产精品午夜福利资源| 97精品亚成在人线免视频| 亚洲AV旡码高清在线观看| 99福利一区二区视频| 亚洲春色在线视频| 亚洲熟妇自偷自拍另类| 久久88香港三级台湾三级播放| 狠狠综合久久综合88亚洲| 安康市| 四虎成人在线观看免费| 亚洲AV蜜桃永久无码精品| 成人无码一区二区三区网站| 日韩精品人妻黄色一级片| 亚洲中文字幕在线无码一区二区| 中文字幕成人精品久久不卡| 精品少妇av蜜臀av|