深入理解JAVA并發鎖
深入理解 Java 并發鎖
1. 并發鎖簡介
確保線程安全最常見的做法是利用鎖機制(Lock、sychronized)來對共享數據做互斥同步,這樣在同一個時刻,只有一個線程可以執行某個方法或者某個代碼塊,那么操作必然是原子性的,線程安全的。
在工作、面試中,經常會聽到各種五花八門的鎖,聽的人云里霧里。鎖的概念術語很多,它們是針對不同的問題所提出的,通過簡單的梳理,也不難理解。
1.1. 可重入鎖
可重入鎖,顧名思義,指的是線程可以重復獲取同一把鎖。即同一個線程在外層方法獲取了鎖,在進入內層方法會自動獲取鎖。
可重入鎖可以在一定程度上避免死鎖。
ReentrantLock、ReentrantReadWriteLock是可重入鎖。這點,從其命名也不難看出。synchronized也是一個可重入鎖。
【示例】synchronized 的可重入示例
synchronized void setA() throws Exception{
Thread.sleep(1000);
setB();
}
synchronized void setB() throws Exception{
Thread.sleep(1000);
}
上面的代碼就是一個典型場景:如果使用的鎖不是可重入鎖的話,setB 可能不會被當前線程執行,從而造成死鎖。
【示例】ReentrantLock 的可重入示例
class Task {
private int value;
private final Lock lock = new ReentrantLock();
public Task() {
this.value = 0;
}
public int get() {
// 獲取鎖
lock.lock();
try {
return value;
} finally {
// 保證鎖能釋放
lock.unlock();
}
}
public void addOne() {
// 獲取鎖
lock.lock();
try {
// 注意:此處已經成功獲取鎖,進入 get 方法后,又嘗試獲取鎖,
// 如果鎖不是可重入的,會導致死鎖
value = 1 + get();
} finally {
// 保證鎖能釋放
lock.unlock();
}
}
}
1.2. 公平鎖與非公平鎖
- 公平鎖 - 公平鎖是指 多線程按照申請鎖的順序來獲取鎖。
- 非公平鎖 - 非公平鎖是指 多線程不按照申請鎖的順序來獲取鎖 。這就可能會出現優先級反轉(后來者居上)或者饑餓現象(某線程總是搶不過別的線程,導致始終無法執行)。
公平鎖為了保證線程申請順序,勢必要付出一定的性能代價,因此其吞吐量一般低于非公平鎖。
公平鎖與非公平鎖 在 Java 中的典型實現:
synchronized只支持非公平鎖。ReentrantLock、ReentrantReadWriteLock,默認是非公平鎖,但支持公平鎖。
1.3. 獨享鎖與共享鎖
獨享鎖與共享鎖是一種廣義上的說法,從實際用途上來看,也常被稱為互斥鎖與讀寫鎖。
- 獨享鎖 - 獨享鎖是指 鎖一次只能被一個線程所持有。
- 共享鎖 - 共享鎖是指 鎖可被多個線程所持有。
獨享鎖與共享鎖在 Java 中的典型實現:
synchronized、ReentrantLock只支持獨享鎖。ReentrantReadWriteLock其寫鎖是獨享鎖,其讀鎖是共享鎖。讀鎖是共享鎖使得并發讀是非常高效的,讀寫,寫讀 ,寫寫的過程是互斥的。
1.4. 悲觀鎖與樂觀鎖
樂觀鎖與悲觀鎖不是指具體的什么類型的鎖,而是處理并發同步的策略。
- 悲觀鎖 - 悲觀鎖對于并發采取悲觀的態度,認為:不加鎖的并發操作一定會出問題。悲觀鎖適合寫操作頻繁的場景。
- 樂觀鎖 - 樂觀鎖對于并發采取樂觀的態度,認為:不加鎖的并發操作也沒什么問題。對于同一個數據的并發操作,是不會發生修改的。在更新數據的時候,會采用不斷嘗試更新的方式更新數據。樂觀鎖適合讀多寫少的場景。
悲觀鎖與樂觀鎖在 Java 中的典型實現:
-
悲觀鎖在 Java 中的應用就是通過使用
synchronized和Lock顯示加鎖來進行互斥同步,這是一種阻塞同步。 -
樂觀鎖在 Java 中的應用就是采用
CAS機制(CAS操作通過Unsafe類提供,但這個類不直接暴露為 API,所以都是間接使用,如各種原子類)。
1.5. 偏向鎖、輕量級鎖、重量級鎖
所謂輕量級鎖與重量級鎖,指的是鎖控制粒度的粗細。顯然,控制粒度越細,阻塞開銷越小,并發性也就越高。
Java 1.6 以前,重量級鎖一般指的是 synchronized ,而輕量級鎖指的是 volatile。
Java 1.6 以后,針對 synchronized 做了大量優化,引入 4 種鎖狀態: 無鎖狀態、偏向鎖、輕量級鎖和重量級鎖。鎖可以單向的從偏向鎖升級到輕量級鎖,再從輕量級鎖升級到重量級鎖 。
-
偏向鎖 - 偏向鎖是指一段同步代碼一直被一個線程所訪問,那么該線程會自動獲取鎖。降低獲取鎖的代價。
-
輕量級鎖 - 是指當鎖是偏向鎖的時候,被另一個線程所訪問,偏向鎖就會升級為輕量級鎖,其他線程會通過自旋的形式嘗試獲取鎖,不會阻塞,提高性能。
-
重量級鎖 - 是指當鎖為輕量級鎖的時候,另一個線程雖然是自旋,但自旋不會一直持續下去,當自旋一定次數的時候,還沒有獲取到鎖,就會進入阻塞,該鎖膨脹為重量級鎖。重量級鎖會讓其他申請的線程進入阻塞,性能降低。
1.6. 分段鎖
分段鎖其實是一種鎖的設計,并不是具體的一種鎖。所謂分段鎖,就是把鎖的對象分成多段,每段獨立控制,使得鎖粒度更細,減少阻塞開銷,從而提高并發性。這其實很好理解,就像高速公路上的收費站,如果只有一個收費口,那所有的車只能排成一條隊繳費;如果有多個收費口,就可以分流了。
Hashtable 使用 synchronized 修飾方法來保證線程安全性,那么面對線程的訪問,Hashtable 就會鎖住整個對象,所有的其它線程只能等待,這種阻塞方式的吞吐量顯然很低。
Java 1.7 以前的 ConcurrentHashMap 就是分段鎖的典型案例。ConcurrentHashMap 維護了一個 Segment 數組,一般稱為分段桶。
final Segment<K,V>[] segments;
當有線程訪問 ConcurrentHashMap 的數據時,ConcurrentHashMap 會先根據 hashCode 計算出數據在哪個桶(即哪個 Segment),然后鎖住這個 Segment。
1.7. 顯示鎖和內置鎖
Java 1.5 之前,協調對共享對象的訪問時可以使用的機制只有 synchronized 和 volatile。這兩個都屬于內置鎖,即鎖的申請和釋放都是由 JVM 所控制。
Java 1.5 之后,增加了新的機制:ReentrantLock、ReentrantReadWriteLock ,這類鎖的申請和釋放都可以由程序所控制,所以常被稱為顯示鎖。
注意:如果不需要
ReentrantLock、ReentrantReadWriteLock所提供的高級同步特性,應該優先考慮使用synchronized。理由如下:
- Java 1.6 以后,
synchronized做了大量的優化,其性能已經與ReentrantLock、ReentrantReadWriteLock基本上持平。- 從趨勢來看,Java 未來更可能會優化
synchronized,而不是ReentrantLock、ReentrantReadWriteLock,因為synchronized是 JVM 內置屬性,它能執行一些優化。ReentrantLock、ReentrantReadWriteLock申請和釋放鎖都是由程序控制,如果使用不當,可能造成死鎖,這是很危險的。
以下對比一下顯示鎖和內置鎖的差異:
- 主動獲取鎖和釋放鎖
synchronized不能主動獲取鎖和釋放鎖。獲取鎖和釋放鎖都是 JVM 控制的。ReentrantLock可以主動獲取鎖和釋放鎖。(如果忘記釋放鎖,就可能產生死鎖)。
- 響應中斷
synchronized不能響應中斷。ReentrantLock可以響應中斷。
- 超時機制
synchronized沒有超時機制。ReentrantLock有超時機制。ReentrantLock可以設置超時時間,超時后自動釋放鎖,避免一直等待。
- 支持公平鎖
synchronized只支持非公平鎖。ReentrantLock支持非公平鎖和公平鎖。
- 是否支持共享
- 被
synchronized修飾的方法或代碼塊,只能被一個線程訪問(獨享)。如果這個線程被阻塞,其他線程也只能等待 ReentrantLock可以基于Condition靈活的控制同步條件。
- 被
- 是否支持讀寫分離
synchronized不支持讀寫鎖分離;ReentrantReadWriteLock支持讀寫鎖,從而使阻塞讀寫的操作分開,有效提高并發性。
2. Lock 和 Condition
2.1. 為何引入 Lock 和 Condition
并發編程領域,有兩大核心問題:一個是互斥,即同一時刻只允許一個線程訪問共享資源;另一個是同步,即線程之間如何通信、協作。這兩大問題,管程都是能夠解決的。Java SDK 并發包通過 Lock 和 Condition 兩個接口來實現管程,其中 Lock 用于解決互斥問題,Condition 用于解決同步問題。
synchronized 是管程的一種實現,既然如此,何必再提供 Lock 和 Condition。
JDK 1.6 以前,synchronized 還沒有做優化,性能遠低于 Lock。但是,性能不是引入 Lock 的最重要因素。真正關鍵在于:synchronized 使用不當,可能會出現死鎖。
synchronized 無法通過破壞不可搶占條件來避免死鎖。原因是 synchronized 申請資源的時候,如果申請不到,線程直接進入阻塞狀態了,而線程進入阻塞狀態,啥都干不了,也釋放不了線程已經占有的資源。
與內置鎖 synchronized 不同的是,Lock 提供了一組無條件的、可輪詢的、定時的以及可中斷的鎖操作,所有獲取鎖、釋放鎖的操作都是顯式的操作。
- 能夠響應中斷。synchronized 的問題是,持有鎖 A 后,如果嘗試獲取鎖 B 失敗,那么線程就進入阻塞狀態,一旦發生死鎖,就沒有任何機會來喚醒阻塞的線程。但如果阻塞狀態的線程能夠響應中斷信號,也就是說當我們給阻塞的線程發送中斷信號的時候,能夠喚醒它,那它就有機會釋放曾經持有的鎖 A。這樣就破壞了不可搶占條件了。
- 支持超時。如果線程在一段時間之內沒有獲取到鎖,不是進入阻塞狀態,而是返回一個錯誤,那這個線程也有機會釋放曾經持有的鎖。這樣也能破壞不可搶占條件。
- 非阻塞地獲取鎖。如果嘗試獲取鎖失敗,并不進入阻塞狀態,而是直接返回,那這個線程也有機會釋放曾經持有的鎖。這樣也能破壞不可搶占條件。
2.2. Lock 接口
Lock 的接口定義如下:
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
lock()- 獲取鎖。unlock()- 釋放鎖。tryLock()- 嘗試獲取鎖,僅在調用時鎖未被另一個線程持有的情況下,才獲取該鎖。tryLock(long time, TimeUnit unit)- 和tryLock()類似,區別僅在于限定時間,如果限定時間內未獲取到鎖,視為失敗。lockInterruptibly()- 鎖未被另一個線程持有,且線程沒有被中斷的情況下,才能獲取鎖。newCondition()- 返回一個綁定到Lock對象上的Condition實例。
2.3. Condition
Condition 實現了管程模型里面的條件變量。
前文中提過 Lock 接口中 有一個 newCondition() 方法用于返回一個綁定到 Lock 對象上的 Condition 實例。Condition 是什么?有什么作用?本節將一一講解。
在單線程中,一段代碼的執行可能依賴于某個狀態,如果不滿足狀態條件,代碼就不會被執行(典型的場景,如:if ... else ...)。在并發環境中,當一個線程判斷某個狀態條件時,其狀態可能是由于其他線程的操作而改變,這時就需要有一定的協調機制來確保在同一時刻,數據只能被一個線程鎖修改,且修改的數據狀態被所有線程所感知。
Java 1.5 之前,主要是利用 Object 類中的 wait、notify、notifyAll 配合 synchronized 來進行線程間通信 。
wait、notify、notifyAll 需要配合 synchronized 使用,不適用于 Lock。而使用 Lock 的線程,彼此間通信應該使用 Condition 。這可以理解為,什么樣的鎖配什么樣的鑰匙。內置鎖(synchronized)配合內置條件隊列(wait、notify、notifyAll ),顯式鎖(Lock)配合顯式條件隊列(Condition )。
Condition 的特性
Condition 接口定義如下:
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
其中,await、signal、signalAll 與 wait、notify、notifyAll 相對應,功能也相似。除此以外,Condition 相比內置條件隊列( wait、notify、notifyAll ),提供了更為豐富的功能:
- 每個鎖(
Lock)上可以存在多個Condition,這意味著鎖的狀態條件可以有多個。 - 支持公平的或非公平的隊列操作。
- 支持可中斷的條件等待,相關方法:
awaitUninterruptibly()。 - 支持可定時的等待,相關方法:
awaitNanos(long)、await(long, TimeUnit)、awaitUntil(Date)。
Condition 的用法
這里以 Condition 來實現一個消費者、生產者模式。
產品類
class Message {
private final Lock lock = new ReentrantLock();
private final Condition producedMsg = lock.newCondition();
private final Condition consumedMsg = lock.newCondition();
private String message;
private boolean state;
private boolean end;
public void consume() {
//lock
lock.lock();
try {
// no new message wait for new message
while (!state) { producedMsg.await(); }
System.out.println("consume message : " + message);
state = false;
// message consumed, notify waiting thread
consumedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - viewMessage");
} finally {
lock.unlock();
}
}
public void produce(String message) {
lock.lock();
try {
// last message not consumed, wait for it be consumed
while (state) { consumedMsg.await(); }
System.out.println("produce msg: " + message);
this.message = message;
state = true;
// new message added, notify waiting thread
producedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - publishMessage");
} finally {
lock.unlock();
}
}
public boolean isEnd() {
return end;
}
public void setEnd(boolean end) {
this.end = end;
}
}
消費者
class MessageConsumer implements Runnable {
private Message message;
public MessageConsumer(Message msg) {
message = msg;
}
@Override
public void run() {
while (!message.isEnd()) { message.consume(); }
}
}
生產者
class MessageProducer implements Runnable {
private Message message;
public MessageProducer(Message msg) {
message = msg;
}
@Override
public void run() {
produce();
}
public void produce() {
List<String> msgs = new ArrayList<>();
msgs.add("Begin");
msgs.add("Msg1");
msgs.add("Msg2");
for (String msg : msgs) {
message.produce(msg);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
message.produce("End");
message.setEnd(true);
}
}
測試
public class LockConditionDemo {
public static void main(String[] args) {
Message msg = new Message();
Thread producer = new Thread(new MessageProducer(msg));
Thread consumer = new Thread(new MessageConsumer(msg));
producer.start();
consumer.start();
}
}
3. ReentrantLock
ReentrantLock 類是 Lock 接口的具體實現,與內置鎖 synchronized 相同的是,它是一個可重入鎖。
3.1. ReentrantLock 的特性
ReentrantLock 的特性如下:
ReentrantLock提供了與synchronized相同的互斥性、內存可見性和可重入性。ReentrantLock支持公平鎖和非公平鎖(默認)兩種模式。ReentrantLock實現了Lock接口,支持了synchronized所不具備的靈活性。synchronized無法中斷一個正在等待獲取鎖的線程synchronized無法在請求獲取一個鎖時無休止地等待
3.2. ReentrantLock 的用法
前文了解了 ReentrantLock 的特性,接下來,我們要講述其具體用法。
ReentrantLock 的構造方法
ReentrantLock 有兩個構造方法:
public ReentrantLock() {}
public ReentrantLock(boolean fair) {}
ReentrantLock()- 默認構造方法會初始化一個非公平鎖(NonfairSync);ReentrantLock(boolean)-new ReentrantLock(true)會初始化一個公平鎖(FairSync)。
lock 和 unlock 方法
lock()- 無條件獲取鎖。如果當前線程無法獲取鎖,則當前線程進入休眠狀態不可用,直至當前線程獲取到鎖。如果該鎖沒有被另一個線程持有,則獲取該鎖并立即返回,將鎖的持有計數設置為 1。unlock()- 用于釋放鎖。
?? 注意:請務必牢記,獲取鎖操作
lock()必須在try catch塊外進行,并且將釋放鎖操作unlock()放在finally塊中進行,以保證鎖一定被被釋放,防止死鎖的發生。
示例:ReentrantLock 的基本操作
public class ReentrantLockDemo {
public static void main(String[] args) {
Task task = new Task();
MyThread tA = new MyThread("Thread-A", task);
MyThread tB = new MyThread("Thread-B", task);
MyThread tC = new MyThread("Thread-C", task);
tA.start();
tB.start();
tC.start();
}
static class MyThread extends Thread {
private Task task;
public MyThread(String name, Task task) {
super(name);
this.task = task;
}
@Override
public void run() {
task.execute();
}
}
static class Task {
private ReentrantLock lock = new ReentrantLock();
public void execute() {
lock.lock();
try {
for (int i = 0; i < 3; i++) {
System.out.println(lock.toString());
// 查詢當前線程 hold 住此鎖的次數
System.out.println("\t holdCount: " + lock.getHoldCount());
// 查詢正等待獲取此鎖的線程數
System.out.println("\t queuedLength: " + lock.getQueueLength());
// 是否為公平鎖
System.out.println("\t isFair: " + lock.isFair());
// 是否被鎖住
System.out.println("\t isLocked: " + lock.isLocked());
// 是否被當前線程持有鎖
System.out.println("\t isHeldByCurrentThread: " + lock.isHeldByCurrentThread());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}
}
}
輸出結果:
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-A]
holdCount: 1
queuedLength: 2
isFair: false
isLocked: true
isHeldByCurrentThread: true
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-C]
holdCount: 1
queuedLength: 1
isFair: false
isLocked: true
isHeldByCurrentThread: true
// ...
tryLock 方法
與無條件獲取鎖相比,tryLock 有更完善的容錯機制。
tryLock()- 可輪詢獲取鎖。如果成功,則返回 true;如果失敗,則返回 false。也就是說,這個方法無論成敗都會立即返回,獲取不到鎖(鎖已被其他線程獲取)時不會一直等待。tryLock(long, TimeUnit)- 可定時獲取鎖。和tryLock()類似,區別僅在于這個方法在獲取不到鎖時會等待一定的時間,在時間期限之內如果還獲取不到鎖,就返回 false。如果如果一開始拿到鎖或者在等待期間內拿到了鎖,則返回 true。
示例:ReentrantLock 的 tryLock() 操作
修改上個示例中的 execute() 方法
public void execute() {
if (lock.tryLock()) {
try {
for (int i = 0; i < 3; i++) {
// 略...
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 獲取鎖失敗");
}
}
示例:ReentrantLock 的 tryLock(long, TimeUnit) 操作
修改上個示例中的 execute() 方法
public void execute() {
try {
if (lock.tryLock(2, TimeUnit.SECONDS)) {
try {
for (int i = 0; i < 3; i++) {
// 略...
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 獲取鎖失敗");
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 獲取鎖超時");
e.printStackTrace();
}
}
lockInterruptibly 方法
lockInterruptibly()- 可中斷獲取鎖。可中斷獲取鎖可以在獲得鎖的同時保持對中斷的響應。可中斷獲取鎖比其它獲取鎖的方式稍微復雜一些,需要兩個try-catch塊(如果在獲取鎖的操作中拋出了InterruptedException,那么可以使用標準的try-finally加鎖模式)。- 舉例來說:假設有兩個線程同時通過
lock.lockInterruptibly()獲取某個鎖時,若線程 A 獲取到了鎖,則線程 B 只能等待。若此時對線程 B 調用threadB.interrupt()方法能夠中斷線程 B 的等待過程。由于lockInterruptibly()的聲明中拋出了異常,所以lock.lockInterruptibly()必須放在try塊中或者在調用lockInterruptibly()的方法外聲明拋出InterruptedException。
- 舉例來說:假設有兩個線程同時通過
?? 注意:當一個線程獲取了鎖之后,是不會被
interrupt()方法中斷的。單獨調用interrupt()方法不能中斷正在運行狀態中的線程,只能中斷阻塞狀態中的線程。因此當通過lockInterruptibly()方法獲取某個鎖時,如果未獲取到鎖,只有在等待的狀態下,才可以響應中斷。
示例:ReentrantLock 的 lockInterruptibly() 操作
修改上個示例中的 execute() 方法
public void execute() {
try {
lock.lockInterruptibly();
for (int i = 0; i < 3; i++) {
// 略...
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中斷");
e.printStackTrace();
} finally {
lock.unlock();
}
}
newCondition 方法
newCondition() - 返回一個綁定到 Lock 對象上的 Condition 實例。
3.3. ReentrantLock 的原理
ReentrantLock 的可見性
class X {
private final Lock rtl =
new ReentrantLock();
int value;
public void addOne() {
// 獲取鎖
rtl.lock();
try {
value+=1;
} finally {
// 保證鎖能釋放
rtl.unlock();
}
}
}
ReentrantLock,內部持有一個 volatile 的成員變量 state,獲取鎖的時候,會讀寫 state 的值;解鎖的時候,也會讀寫 state 的值(簡化后的代碼如下面所示)。也就是說,在執行 value+=1 之前,程序先讀寫了一次 volatile 變量 state,在執行 value+=1 之后,又讀寫了一次 volatile 變量 state。根據相關的 Happens-Before 規則:
- 順序性規則:對于線程 T1,value+=1 Happens-Before 釋放鎖的操作 unlock();
- volatile 變量規則:由于 state = 1 會先讀取 state,所以線程 T1 的 unlock() 操作 Happens-Before 線程 T2 的 lock() 操作;
- 傳遞性規則:線程 T1 的 value+=1 Happens-Before 線程 T2 的 lock() 操作。
ReentrantLock 的數據結構
閱讀 ReentrantLock 的源碼,可以發現它有一個核心字段:
private final Sync sync;
sync- 內部抽象類ReentrantLock.Sync對象,Sync繼承自 AQS。它有兩個子類:ReentrantLock.FairSync- 公平鎖。ReentrantLock.NonfairSync- 非公平鎖。
查看源碼可以發現,ReentrantLock 實現 Lock 接口其實是調用 ReentrantLock.FairSync 或 ReentrantLock.NonfairSync 中各自的實現,這里不一一列舉。
ReentrantLock 的獲取鎖和釋放鎖
ReentrantLock 獲取鎖和釋放鎖的接口,從表象看,是調用 ReentrantLock.FairSync 或 ReentrantLock.NonfairSync 中各自的實現;從本質上看,是基于 AQS 的實現。
仔細閱讀源碼很容易發現:
-
void lock()調用 Sync 的 lock() 方法。 -
void lockInterruptibly()直接調用 AQS 的 獲取可中斷的獨占鎖 方法lockInterruptibly()。 -
boolean tryLock()調用 Sync 的nonfairTryAcquire()。 -
boolean tryLock(long time, TimeUnit unit)直接調用 AQS 的 獲取超時等待式的獨占鎖 方法tryAcquireNanos(int arg, long nanosTimeout)。 -
void unlock()直接調用 AQS 的 釋放獨占鎖 方法release(int arg)。
直接調用 AQS 接口的方法就不再贅述了,其原理在 [AQS 的原理](#AQS 的原理) 中已經用很大篇幅進行過講解。
nonfairTryAcquire 方法源碼如下:
// 公平鎖和非公平鎖都會用這個方法區嘗試獲取鎖
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
// 如果同步狀態為0,將其設為 acquires,并設置當前線程為排它線程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
處理流程很簡單:
- 如果同步狀態為 0,設置同步狀態設為 acquires,并設置當前線程為排它線程,然后返回 true,獲取鎖成功。
- 如果同步狀態不為 0 且當前線程為排它線程,設置同步狀態為當前狀態值+acquires 值,然后返回 true,獲取鎖成功。
- 否則,返回 false,獲取鎖失敗。
公平鎖和非公平鎖
ReentrantLock 這個類有兩個構造函數,一個是無參構造函數,一個是傳入 fair 參數的構造函數。fair 參數代表的是鎖的公平策略,如果傳入 true 就表示需要構造一個公平鎖,反之則表示要構造一個非公平鎖。
鎖都對應著一個等待隊列,如果一個線程沒有獲得鎖,就會進入等待隊列,當有線程釋放鎖的時候,就需要從等待隊列中喚醒一個等待的線程。如果是公平鎖,喚醒的策略就是誰等待的時間長,就喚醒誰,很公平;如果是非公平鎖,則不提供這個公平保證,有可能等待時間短的線程反而先被喚醒。
lock 方法在公平鎖和非公平鎖中的實現:
二者的區別僅在于申請非公平鎖時,如果同步狀態為 0,嘗試將其設為 1,如果成功,直接將當前線程置為排它線程;否則和公平鎖一樣,調用 AQS 獲取獨占鎖方法 acquire。
// 非公平鎖實現
final void lock() {
if (compareAndSetState(0, 1))
// 如果同步狀態為0,將其設為1,并設置當前線程為排它線程
setExclusiveOwnerThread(Thread.currentThread());
else
// 調用 AQS 獲取獨占鎖方法 acquire
acquire(1);
}
// 公平鎖實現
final void lock() {
// 調用 AQS 獲取獨占鎖方法 acquire
acquire(1);
}
4. ReentrantReadWriteLock
ReadWriteLock 適用于讀多寫少的場景。
ReentrantReadWriteLock 類是 ReadWriteLock 接口的具體實現,它是一個可重入的讀寫鎖。ReentrantReadWriteLock 維護了一對讀寫鎖,將讀寫鎖分開,有利于提高并發效率。
讀寫鎖,并不是 Java 語言特有的,而是一個廣為使用的通用技術,所有的讀寫鎖都遵守以下三條基本原則:
- 允許多個線程同時讀共享變量;
- 只允許一個線程寫共享變量;
- 如果一個寫線程正在執行寫操作,此時禁止讀線程讀共享變量。
讀寫鎖與互斥鎖的一個重要區別就是讀寫鎖允許多個線程同時讀共享變量,而互斥鎖是不允許的,這是讀寫鎖在讀多寫少場景下性能優于互斥鎖的關鍵。但讀寫鎖的寫操作是互斥的,當一個線程在寫共享變量的時候,是不允許其他線程執行寫操作和讀操作。
4.1. ReentrantReadWriteLock 的特性
ReentrantReadWriteLock 的特性如下:
ReentrantReadWriteLock適用于讀多寫少的場景。如果是寫多讀少的場景,由于ReentrantReadWriteLock其內部實現比ReentrantLock復雜,性能可能反而要差一些。如果存在這樣的問題,需要具體問題具體分析。由于ReentrantReadWriteLock的讀寫鎖(ReadLock、WriteLock)都實現了Lock接口,所以要替換為ReentrantLock也較為容易。ReentrantReadWriteLock實現了ReadWriteLock接口,支持了ReentrantLock所不具備的讀寫鎖分離。ReentrantReadWriteLock維護了一對讀寫鎖(ReadLock、WriteLock)。將讀寫鎖分開,有利于提高并發效率。ReentrantReadWriteLock的加鎖策略是:允許多個讀操作并發執行,但每次只允許一個寫操作。ReentrantReadWriteLock為讀寫鎖都提供了可重入的加鎖語義。ReentrantReadWriteLock支持公平鎖和非公平鎖(默認)兩種模式。
ReadWriteLock 接口定義如下:
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
readLock- 返回用于讀操作的鎖(ReadLock)。writeLock- 返回用于寫操作的鎖(WriteLock)。
在讀寫鎖和寫入鎖之間的交互可以采用多種實現方式,ReadWriteLock 的一些可選實現包括:
- 釋放優先 - 當一個寫入操作釋放寫鎖,并且隊列中同時存在讀線程和寫線程,那么應該優先選擇讀線程、寫線程,還是最先發出請求的線程?
- 讀線程插隊 - 如果鎖是由讀線程持有,但有寫線程正在等待,那么新到達的讀線程能否立即獲得訪問權,還是應該在寫線程后面等待?如果允許讀線程插隊到寫線程之前,那么將提高并發性,但可能造成線程饑餓問題。
- 重入性 - 讀鎖和寫鎖是否是可重入的?
- 降級 - 如果一個線程持有寫入鎖,那么它能否在不釋放該鎖的情況下獲得讀鎖?這可能會使得寫鎖被降級為讀鎖,同時不允許其他寫線程修改被保護的資源。
- 升級 - 讀鎖能否優先于其他正在等待的讀線程和寫線程而升級為一個寫鎖?在大多數的讀寫鎖實現中并不支持升級,因為如果沒有顯式的升級操作,那么很容易造成死鎖。
4.2. ReentrantReadWriteLock 的用法
前文了解了 ReentrantReadWriteLock 的特性,接下來,我們要講述其具體用法。
ReentrantReadWriteLock 的構造方法
ReentrantReadWriteLock 和 ReentrantLock 一樣,也有兩個構造方法,且用法相似。
public ReentrantReadWriteLock() {}
public ReentrantReadWriteLock(boolean fair) {}
ReentrantReadWriteLock()- 默認構造方法會初始化一個非公平鎖(NonfairSync)。在非公平的鎖中,線程獲得鎖的順序是不確定的。寫線程降級為讀線程是可以的,但讀線程升級為寫線程是不可以的(這樣會導致死鎖)。ReentrantReadWriteLock(boolean)-new ReentrantLock(true)會初始化一個公平鎖(FairSync)。對于公平鎖,等待時間最長的線程將優先獲得鎖。如果這個鎖是讀線程持有,則另一個線程請求寫鎖,那么其他讀線程都不能獲得讀鎖,直到寫線程釋放寫鎖。
ReentrantReadWriteLock 的使用實例
在 ReentrantReadWriteLock 的特性 中已經介紹過,ReentrantReadWriteLock 的讀寫鎖(ReadLock、WriteLock)都實現了 Lock 接口,所以其各自獨立的使用方式與 ReentrantLock 一樣,這里不再贅述。
ReentrantReadWriteLock 與 ReentrantLock 用法上的差異,主要在于讀寫鎖的配合使用。本文以一個典型使用場景來進行講解。
【示例】基于 ReadWriteLock 實現一個簡單的泛型無界緩存
/**
* 簡單的無界緩存實現
* <p>
* 使用 WeakHashMap 存儲鍵值對。WeakHashMap 中存儲的對象是弱引用,JVM GC 時會自動清除沒有被引用的弱引用對象。
*/
static class UnboundedCache<K, V> {
private final Map<K, V> cacheMap = new WeakHashMap<>();
private final ReadWriteLock cacheLock = new ReentrantReadWriteLock();
public V get(K key) {
cacheLock.readLock().lock();
V value;
try {
value = cacheMap.get(key);
String log = String.format("%s 讀數據 %s:%s", Thread.currentThread().getName(), key, value);
System.out.println(log);
} finally {
cacheLock.readLock().unlock();
}
return value;
}
public V put(K key, V value) {
cacheLock.writeLock().lock();
try {
cacheMap.put(key, value);
String log = String.format("%s 寫入數據 %s:%s", Thread.currentThread().getName(), key, value);
System.out.println(log);
} finally {
cacheLock.writeLock().unlock();
}
return value;
}
public V remove(K key) {
cacheLock.writeLock().lock();
try {
return cacheMap.remove(key);
} finally {
cacheLock.writeLock().unlock();
}
}
public void clear() {
cacheLock.writeLock().lock();
try {
this.cacheMap.clear();
} finally {
cacheLock.writeLock().unlock();
}
}
}
說明:
- 使用
WeakHashMap而不是HashMap來存儲鍵值對。WeakHashMap中存儲的對象是弱引用,JVM GC 時會自動清除沒有被引用的弱引用對象。 - 向
Map寫數據前加寫鎖,寫完后,釋放寫鎖。 - 向
Map讀數據前加讀鎖,讀完后,釋放讀鎖。
測試其線程安全性:
/**
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @since 2020-01-01
*/
public class ReentrantReadWriteLockDemo {
static UnboundedCache<Integer, Integer> cache = new UnboundedCache<>();
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
executorService.execute(new MyThread());
cache.get(0);
}
executorService.shutdown();
}
/** 線程任務每次向緩存中寫入 3 個隨機值,key 固定 */
static class MyThread implements Runnable {
@Override
public void run() {
Random random = new Random();
for (int i = 0; i < 3; i++) {
cache.put(i, random.nextInt(100));
}
}
}
}
說明:示例中,通過線程池啟動 20 個并發任務。任務每次向緩存中寫入 3 個隨機值,key 固定;然后主線程每次固定讀取緩存中第一個 key 的值。
輸出結果:
main 讀數據 0:null
pool-1-thread-1 寫入數據 0:16
pool-1-thread-1 寫入數據 1:58
pool-1-thread-1 寫入數據 2:50
main 讀數據 0:16
pool-1-thread-1 寫入數據 0:85
pool-1-thread-1 寫入數據 1:76
pool-1-thread-1 寫入數據 2:46
pool-1-thread-2 寫入數據 0:21
pool-1-thread-2 寫入數據 1:41
pool-1-thread-2 寫入數據 2:63
main 讀數據 0:21
main 讀數據 0:21
// ...
4.3. ReentrantReadWriteLock 的原理
前面了解了 ReentrantLock 的原理,理解 ReentrantReadWriteLock 就容易多了。
ReentrantReadWriteLock 的數據結構
閱讀 ReentrantReadWriteLock 的源碼,可以發現它有三個核心字段:
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
sync- 內部類ReentrantReadWriteLock.Sync對象。與ReentrantLock類似,它有兩個子類:ReentrantReadWriteLock.FairSync和ReentrantReadWriteLock.NonfairSync,分別表示公平鎖和非公平鎖的實現。readerLock- 內部類ReentrantReadWriteLock.ReadLock對象,這是一把讀鎖。writerLock- 內部類ReentrantReadWriteLock.WriteLock對象,這是一把寫鎖。
ReentrantReadWriteLock 的獲取鎖和釋放鎖
public static class ReadLock implements Lock, java.io.Serializable {
// 調用 AQS 獲取共享鎖方法
public void lock() {
sync.acquireShared(1);
}
// 調用 AQS 釋放共享鎖方法
public void unlock() {
sync.releaseShared(1);
}
}
public static class WriteLock implements Lock, java.io.Serializable {
// 調用 AQS 獲取獨占鎖方法
public void lock() {
sync.acquire(1);
}
// 調用 AQS 釋放獨占鎖方法
public void unlock() {
sync.release(1);
}
}
5. StampedLock
ReadWriteLock 支持兩種模式:一種是讀鎖,一種是寫鎖。而 StampedLock 支持三種模式,分別是:寫鎖、悲觀讀鎖和樂觀讀。其中,寫鎖、悲觀讀鎖的語義和 ReadWriteLock 的寫鎖、讀鎖的語義非常類似,允許多個線程同時獲取悲觀讀鎖,但是只允許一個線程獲取寫鎖,寫鎖和悲觀讀鎖是互斥的。不同的是:StampedLock 里的寫鎖和悲觀讀鎖加鎖成功之后,都會返回一個 stamp;然后解鎖的時候,需要傳入這個 stamp。
注意這里,用的是“樂觀讀”這個詞,而不是“樂觀讀鎖”,是要提醒你,樂觀讀這個操作是無鎖的,所以相比較 ReadWriteLock 的讀鎖,樂觀讀的性能更好一些。
StampedLock 的性能之所以比 ReadWriteLock 還要好,其關鍵是 StampedLock 支持樂觀讀的方式。
- ReadWriteLock 支持多個線程同時讀,但是當多個線程同時讀的時候,所有的寫操作會被阻塞;
- 而 StampedLock 提供的樂觀讀,是允許一個線程獲取寫鎖的,也就是說不是所有的寫操作都被阻塞。
對于讀多寫少的場景 StampedLock 性能很好,簡單的應用場景基本上可以替代 ReadWriteLock,但是StampedLock 的功能僅僅是 ReadWriteLock 的子集,在使用的時候,還是有幾個地方需要注意一下。
- StampedLock 不支持重入
- StampedLock 的悲觀讀鎖、寫鎖都不支持條件變量。
- 如果線程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上時,此時調用該阻塞線程的 interrupt() 方法,會導致 CPU 飆升。使用 StampedLock 一定不要調用中斷操作,如果需要支持中斷功能,一定使用可中斷的悲觀讀鎖 readLockInterruptibly() 和寫鎖 writeLockInterruptibly()。
【示例】StampedLock 阻塞時,調用 interrupt() 導致 CPU 飆升
final StampedLock lock
= new StampedLock();
Thread T1 = new Thread(()->{
// 獲取寫鎖
lock.writeLock();
// 永遠阻塞在此處,不釋放寫鎖
LockSupport.park();
});
T1.start();
// 保證 T1 獲取寫鎖
Thread.sleep(100);
Thread T2 = new Thread(()->
// 阻塞在悲觀讀鎖
lock.readLock()
);
T2.start();
// 保證 T2 阻塞在讀鎖
Thread.sleep(100);
// 中斷線程 T2
// 會導致線程 T2 所在 CPU 飆升
T2.interrupt();
T2.join();
【示例】StampedLock 讀模板:
final StampedLock sl =
new StampedLock();
// 樂觀讀
long stamp =
sl.tryOptimisticRead();
// 讀入方法局部變量
......
// 校驗 stamp
if (!sl.validate(stamp)){
// 升級為悲觀讀鎖
stamp = sl.readLock();
try {
// 讀入方法局部變量
.....
} finally {
// 釋放悲觀讀鎖
sl.unlockRead(stamp);
}
}
// 使用方法局部變量執行業務操作
......
【示例】StampedLock 寫模板:
long stamp = sl.writeLock();
try {
// 寫共享變量
......
} finally {
sl.unlockWrite(stamp);
}
6. AQS
AbstractQueuedSynchronizer(簡稱 AQS)是隊列同步器,顧名思義,其主要作用是處理同步。它是并發鎖和很多同步工具類的實現基石(如ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore、FutureTask等)。
6.1. AQS 的要點
AQS 提供了對獨享鎖與共享鎖的支持。
在 java.util.concurrent.locks 包中的相關鎖(常用的有 ReentrantLock、 ReadWriteLock)都是基于 AQS 來實現。這些鎖都沒有直接繼承 AQS,而是定義了一個 Sync 類去繼承 AQS。為什么要這樣呢?因為鎖面向的是使用用戶,而同步器面向的則是線程控制,那么在鎖的實現中聚合同步器而不是直接繼承 AQS 就可以很好的隔離二者所關注的事情。
6.2. AQS 的應用
AQS 提供了對獨享鎖與共享鎖的支持。
獨享鎖 API
獲取、釋放獨享鎖的主要 API 如下:
public final void acquire(int arg)
public final void acquireInterruptibly(int arg)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
public final boolean release(int arg)
acquire- 獲取獨占鎖。acquireInterruptibly- 獲取可中斷的獨占鎖。tryAcquireNanos- 嘗試在指定時間內獲取可中斷的獨占鎖。在以下三種情況下回返回:- 在超時時間內,當前線程成功獲取了鎖;
- 當前線程在超時時間內被中斷;
- 超時時間結束,仍未獲得鎖返回 false。
release- 釋放獨占鎖。
共享鎖 API
獲取、釋放共享鎖的主要 API 如下:
public final void acquireShared(int arg)
public final void acquireSharedInterruptibly(int arg)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
public final boolean releaseShared(int arg)
acquireShared- 獲取共享鎖。acquireSharedInterruptibly- 獲取可中斷的共享鎖。tryAcquireSharedNanos- 嘗試在指定時間內獲取可中斷的共享鎖。release- 釋放共享鎖。
6.3. AQS 的原理
ASQ 原理要點:
- AQS 使用一個整型的
volatile變量來 維護同步狀態。狀態的意義由子類賦予。- AQS 維護了一個 FIFO 的雙鏈表,用來存儲獲取鎖失敗的線程。
AQS 圍繞同步狀態提供兩種基本操作“獲取”和“釋放”,并提供一系列判斷和處理方法,簡單說幾點:
- state 是獨占的,還是共享的;
- state 被獲取后,其他線程需要等待;
- state 被釋放后,喚醒等待線程;
- 線程等不及時,如何退出等待。
至于線程是否可以獲得 state,如何釋放 state,就不是 AQS 關心的了,要由子類具體實現。
AQS 的數據結構
閱讀 AQS 的源碼,可以發現:AQS 繼承自 AbstractOwnableSynchronize。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/** 等待隊列的隊頭,懶加載。只能通過 setHead 方法修改。 */
private transient volatile Node head;
/** 等待隊列的隊尾,懶加載。只能通過 enq 方法添加新的等待節點。*/
private transient volatile Node tail;
/** 同步狀態 */
private volatile int state;
}
state- AQS 使用一個整型的volatile變量來 維護同步狀態。- 這個整數狀態的意義由子類來賦予,如
ReentrantLock中該狀態值表示所有者線程已經重復獲取該鎖的次數,Semaphore中該狀態值表示剩余的許可數量。
- 這個整數狀態的意義由子類來賦予,如
head和tail- AQS 維護了一個Node類型(AQS 的內部類)的雙鏈表來完成同步狀態的管理。這個雙鏈表是一個雙向的 FIFO 隊列,通過head和tail指針進行訪問。當 有線程獲取鎖失敗后,就被添加到隊列末尾。

再來看一下 Node 的源碼
static final class Node {
/** 該等待同步的節點處于共享模式 */
static final Node SHARED = new Node();
/** 該等待同步的節點處于獨占模式 */
static final Node EXCLUSIVE = null;
/** 線程等待狀態,狀態值有: 0、1、-1、-2、-3 */
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/** 前驅節點 */
volatile Node prev;
/** 后繼節點 */
volatile Node next;
/** 等待鎖的線程 */
volatile Thread thread;
/** 和節點是否共享有關 */
Node nextWaiter;
}
很顯然,Node 是一個雙鏈表結構。
waitStatus-Node使用一個整型的volatile變量來 維護 AQS 同步隊列中線程節點的狀態。waitStatus有五個狀態值:CANCELLED(1)- 此狀態表示:該節點的線程可能由于超時或被中斷而 處于被取消(作廢)狀態,一旦處于這個狀態,表示這個節點應該從等待隊列中移除。SIGNAL(-1)- 此狀態表示:后繼節點會被掛起,因此在當前節點釋放鎖或被取消之后,必須喚醒(unparking)其后繼結點。CONDITION(-2)- 此狀態表示:該節點的線程 處于等待條件狀態,不會被當作是同步隊列上的節點,直到被喚醒(signal),設置其值為 0,再重新進入阻塞狀態。PROPAGATE(-3)- 此狀態表示:下一個acquireShared應無條件傳播。- 0 - 非以上狀態。
獨占鎖的獲取和釋放
獲取獨占鎖
AQS 中使用 acquire(int arg) 方法獲取獨占鎖,其大致流程如下:
- 先嘗試獲取同步狀態,如果獲取同步狀態成功,則結束方法,直接返回。
- 如果獲取同步狀態不成功,AQS 會不斷嘗試利用 CAS 操作將當前線程插入等待同步隊列的隊尾,直到成功為止。
- 接著,不斷嘗試為等待隊列中的線程節點獲取獨占鎖。


詳細流程可以用下圖來表示,請結合源碼來理解(一圖勝千言):

釋放獨占鎖
AQS 中使用 release(int arg) 方法釋放獨占鎖,其大致流程如下:
- 先嘗試獲取解鎖線程的同步狀態,如果獲取同步狀態不成功,則結束方法,直接返回。
- 如果獲取同步狀態成功,AQS 會嘗試喚醒當前線程節點的后繼節點。
獲取可中斷的獨占鎖
AQS 中使用 acquireInterruptibly(int arg) 方法獲取可中斷的獨占鎖。
acquireInterruptibly(int arg) 實現方式相較于獲取獨占鎖方法( acquire)非常相似,區別僅在于它會通過 Thread.interrupted 檢測當前線程是否被中斷,如果是,則立即拋出中斷異常(InterruptedException)。
獲取超時等待式的獨占鎖
AQS 中使用 tryAcquireNanos(int arg) 方法獲取超時等待的獨占鎖。
doAcquireNanos 的實現方式 相較于獲取獨占鎖方法( acquire)非常相似,區別在于它會根據超時時間和當前時間計算出截止時間。在獲取鎖的流程中,會不斷判斷是否超時,如果超時,直接返回 false;如果沒超時,則用 LockSupport.parkNanos 來阻塞當前線程。
共享鎖的獲取和釋放
獲取共享鎖
AQS 中使用 acquireShared(int arg) 方法獲取共享鎖。
acquireShared 方法和 acquire 方法的邏輯很相似,區別僅在于自旋的條件以及節點出隊的操作有所不同。
成功獲得共享鎖的條件如下:
tryAcquireShared(arg)返回值大于等于 0 (這意味著共享鎖的 permit 還沒有用完)。- 當前節點的前驅節點是頭結點。
釋放共享鎖
AQS 中使用 releaseShared(int arg) 方法釋放共享鎖。
releaseShared 首先會嘗試釋放同步狀態,如果成功,則解鎖一個或多個后繼線程節點。釋放共享鎖和釋放獨享鎖流程大體相似,區別在于:
對于獨享模式,如果需要 SIGNAL,釋放僅相當于調用頭節點的 unparkSuccessor。
獲取可中斷的共享鎖
AQS 中使用 acquireSharedInterruptibly(int arg) 方法獲取可中斷的共享鎖。
acquireSharedInterruptibly 方法與 acquireInterruptibly 幾乎一致,不再贅述。
獲取超時等待式的共享鎖
AQS 中使用 tryAcquireSharedNanos(int arg) 方法獲取超時等待式的共享鎖。
tryAcquireSharedNanos 方法與 tryAcquireNanos 幾乎一致,不再贅述。
7. 死鎖
7.1. 什么是死鎖
死鎖是一種特定的程序狀態,在實體之間,由于循環依賴導致彼此一直處于等待之中,沒有任何個體可以繼續前進。死鎖不僅僅是在線程之間會發生,存在資源獨占的進程之間同樣也
可能出現死鎖。通常來說,我們大多是聚焦在多線程場景中的死鎖,指兩個或多個線程之間,由于互相持有對方需要的鎖,而永久處于阻塞的狀態。
7.2. 如何定位死鎖
定位死鎖最常見的方式就是利用 jstack 等工具獲取線程棧,然后定位互相之間的依賴關系,進而找到死鎖。如果是比較明顯的死鎖,往往 jstack 等就能直接定位,類似 JConsole 甚至可以在圖形界面進行有限的死鎖檢測。
如果我們是開發自己的管理工具,需要用更加程序化的方式掃描服務進程、定位死鎖,可以考慮使用 Java 提供的標準管理 API,ThreadMXBean,其直接就提供了 findDeadlockedThreads() 方法用于定位。
7.3. 如何避免死鎖
基本上死鎖的發生是因為:
- 互斥,類似 Java 中 Monitor 都是獨占的。
- 長期保持互斥,在使用結束之前,不會釋放,也不能被其他線程搶占。
- 循環依賴,多個個體之間出現了鎖的循環依賴,彼此依賴上一環釋放鎖。
由此,我們可以分析出避免死鎖的思路和方法。
(1)避免一個線程同時獲取多個鎖。
避免一個線程在鎖內同時占用多個資源,盡量保證每個鎖只占用一個資源。
嘗試使用定時鎖 lock.tryLock(timeout),避免鎖一直不能釋放。
對于數據庫鎖,加鎖和解鎖必須在一個數據庫連接中里,否則會出現解鎖失敗的情況。
8. 參考資料
關注公眾號:java寶典

浙公網安備 33010602011771號