AbstractQuenedSynchronizer抽象的隊列式同步器。是除了java自帶的synchronized關鍵字之外的鎖機制。AQS的全稱為(AbstractQueuedSynchronizer),這個抽象類在java.util.concurrent.locks包。
核心思想(AQS=state+CLH雙向隊列)
如果被請求的共享資源空閑,則將當前請求資源的線程設置為有效的工作線程,并將共享資源設置為鎖定狀態,如果被請求的共享資源被占用,那么就需要一套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH隊列變種鎖實現的,即將暫時獲取不到鎖的線程加入到隊列中,并且該隊列是一個雙向隊列。也就是說AQS就是基于CLH隊列(三個大牛名字的組成,本來是單向的,這里采用的是一個變種),用volatile修飾共享變量state,線程通過CAS去改變狀態符,成功則獲取鎖成功,失敗則進入等待隊列,等待被喚醒。并且AQS是自旋鎖,在等待被喚醒時,會不停的自旋操作,直到獲取鎖成功。(AQS使用一個volatile的int類型的成員變量來表示同步狀態,通過內置的FIFO隊列來完成資源獲取的排隊工作將每條要去搶占資源的線程封裝成一個Node節點(Node<Thread>)來實現鎖的分配,通過CAS完成對State值的修改。)狀態標志位state為0時,說明當前資源空閑,可以被當前線程獲取。

AQS
static final class Node { //內部類Node . . . } private transient volatile Node head; //同步器頭指針 private transient volatile Node tail; //同步器尾指針 private volatile int state; //標志位state
AQS的內部類Node,Node=waitStatus+前后指針 waitStatus表示當前節點在隊列中的狀態(volatile的int類型)。
static final class Node { //共享的 static final Node SHARED = new Node(); //排他的 static final Node EXCLUSIVE = null; //waitStatus的狀態 static final int CANCELLED = 1; //表示當前線程獲取鎖的請求已經被取消 static final int SIGNAL = -1; //為-1,表示線程已經準備好了,就等資源釋放了 static final int CONDITION = -2; //為-2,表示節點在等待隊列中,節點線程等待喚醒 static final int PROPAGATE = -3; //為-3,當前線程處于SHARED情況下,該字段才能被使用 volatile int waitStatus; //前指針 volatile Node prev; //后指針 volatile Node next; //當前Node中的線程,排隊的線程 volatile Thread thread; Node nextWaiter;
AQS 定義了兩種資源共享方式:
1.Exclusive:獨占,只有一個線程能執行,如ReentrantLock
2.Share:共享,多個線程可以同時執行,如Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier不同的自定義的同步器爭用共享資源的方式也不同。
以ReentrantLock對AQS分析
ReentrantLock是重入鎖的一種實現,一次只能由一個線程持有鎖,實現了Lock接口。其中包含三個內部類:Sync,NonfairSync,FairSync。其鎖的實現主要依賴于這三個內部類。
abstract static class Sync extends AbstractQueuedSynchronizer static final class FairSync extends Sync //公平性鎖 static final class NonfairSync extends Sync //非公平性鎖

先看看Sync方法,公平鎖和非公平鎖的父類。
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; //序列化 abstract void lock(); //抽象的加鎖方法,需要子類來實現 final boolean nonfairTryAcquire(int acquires) { //非公平和公平性加鎖操作的都需要調用的方法 final Thread current = Thread.currentThread(); int c = getState(); //獲取AQS中state屬性值,state = 0:鎖空閑,大于0: 鎖占用 小于0:鎖溢出 if (c == 0) { //鎖空閑 if (compareAndSetState(0, acquires)) { //通過CAS來確保多線程的安全性,嘗試獲取鎖 setExclusiveOwnerThread(current); //設置當前持有鎖的線程 return true; //加鎖成功 } } //鎖占用(當前線程持有鎖或者其他線程持有鎖)或者鎖溢出 else if (current == getExclusiveOwnerThread()) { //判斷是否當前持有鎖的線程與現在想再次獲取鎖的線程是否是同一個 (可重入鎖的體現,當前獲取到鎖的線程可以再次獲取到鎖,并且對state修改) int nextc = c + acquires; //對持有鎖的次數進行變更 if (nextc < 0) //被鎖次數上溢(超過int的最大值), throw new Error("Maximum lock count exceeded"); setState(nextc); //當前持有鎖的線程變更state值 return true; //獲取鎖成功 } return false; //當前鎖被占用,并且不是當前持有鎖的線程想再次獲取鎖 } /** * 釋放鎖調用的方法 */ protected final boolean tryRelease(int releases) { int c = getState() - releases; //只能由當前持有鎖的線程釋放鎖 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; //鎖被釋放 setExclusiveOwnerThread(null); //將持有鎖的線程信息置為null } setState(c); //變更鎖的狀態 return free; //釋放鎖操作,當 c==0時,才能真正釋放鎖,不等于0時,只能變更鎖狀態,不能真正釋放 } //釋放當前線程持有的鎖 protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } //獲取ConditionObject對象 final ConditionObject newCondition() { return new ConditionObject(); } //獲取持有鎖的線程 final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } //獲取加鎖次數 final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } //判斷是否加鎖 final boolean isLocked() { return getState() != 0; } }
Lock有公平性鎖和非公平性鎖,下面分析調用lock()方法的非公平性鎖
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; //lock加鎖操作 final void lock() { if (compareAndSetState(0, 1)) //直接通過CAS搶鎖,true:搶鎖成功(設置AQS里的state量) (體現一個非公平性,當前線程一來就可以進行CAS嘗試搶鎖的操作) setExclusiveOwnerThread(Thread.currentThread());//設置鎖的持有者 else acquire(1); //獲取鎖失敗,進入到常規流程,acquire會首先調用tryAcquire (稍后分析該方法) } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
AQS類的acquire()方法
public final void acquire(int arg) { if (!tryAcquire(arg) && //獲取鎖失敗返回false,(注意:取反之后就是true),調用的是上面類中的tyrAcquire()方法,繼續進行后續的判斷;
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //繼續調用addWaiter()方法,Node.EXCLUSIVE是一個排他屬性 //調用acquireQueued()
selfInterrupt(); }
在AQS中的tyrAcquire()方法如下,并么有實際的邏輯代碼:(采用了模板模式,子類必須實現該方法,如果么有實現則會拋出異常)。
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
真正調用的是非公平鎖的類里面的tyrAcquire方法,由該方法調用父類Sync里面的nonfairTryAcquire()方法。(前面有分析)
AQS類的addWaiter()方法
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);//當前Node中的線程是搶鎖失敗的線程 Node pred = tail; //當前隊列中還么有元素,tail為null,則會進入enq方法 if (pred != null) { //當不為空時 node.prev = pred; //新節點的前一個節點就是尾結點(尾插法) if (compareAndSetTail(pred, node)) { //設置尾結點 pred.next = node; return node; } } enq(node); //當隊列為空時調用該方法 return node; }
AQS類的enq()方法
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { //當前的隊列中無元素 if (compareAndSetHead(new Node())) //初始化一個頭結點(哨兵結點) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { //設置尾結點 t.next = node; return t; } } } }
上面方法的如圖分析:
當隊列為空時,死循環第一次進入(t == null),可以看出隊列的第一個節點并不是一個有效的節點,而是一個哨兵結點。

第二次進入,才是第一個有效結點

AQS的acquireQueued()
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; // try { boolean interrupted = false; //判斷是否是可中斷的 for (;;) { final Node p = node.predecessor(); //哨兵結點p if (p == head && tryAcquire(arg)) { //當前線程再次嘗試獲取鎖 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && //設置哨兵結點的waitstatues = -1。 parkAndCheckInterrupt()) //阻塞線程 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //阻塞當前線程,等待被喚醒 return Thread.interrupted(); }
調用unLock()
public void unlock() { sync.release(1); }
AQS的release()方法
public final boolean release(int arg) { if (tryRelease(arg)) { //調用tryRelease方法,詳細見開頭Sync的分析 Node h = head; //哨兵結點 if (h != null && h.waitStatus != 0) unparkSuccessor(h); // return true; } return false; }
浙公網安備 33010602011771號