多線程與高并發(四)—— 根據 HotSpot 源碼講透 Java 中斷機制
前言
我們首先介紹中斷的三個 APPI 及其底層代碼,在對方法的實現有了清晰的認知后,再結合場景談談什么是中斷,以及中斷該如何正確使用?
一、中斷方法
1. isInterrupted
public boolean isInterrupted() {
// 調用isInterrupted 方法,中斷標記設置為 true
return isInterrupted(false);
}
這個方法很簡單,就是返回當前線程的中斷標記值,這個方法是由 native 方法實現。
/**
* Tests if some Thread has been interrupted. The interrupted state
* is reset or not based on the value of ClearInterrupted that is
* passed.
*/
private native boolean isInterrupted(boolean ClearInterrupted);
根據 Thread 類找到對應路徑下 Thread.c,可以看到該方法的底層實現方法JVM_IsInterrupted:

在 hotspot 源碼 jvm.c 文件中,可以看到 JVM_IsInterrupted 的底層實現依賴于操作系統的 is_interrupted 方法,我們就看 os_linux 的實現:

該方法中的邏輯也很簡單,就是返回了操作系統的線程中斷狀態,如果清除標記為 true,那么就重置中斷標記
bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
// 拿到對應的操作系統線程
OSThread* osthread = thread->osthread();
// 獲取該線程的中斷狀態
bool interrupted = osthread->interrupted();
// 如果中斷狀態為true 并且需要清除中斷標記,那么將中斷標記重置為 false
if (interrupted && clear_interrupted) {
osthread->set_interrupted(false);
// consider thread->_SleepEvent->reset() ... optional optimization
}
// 返回中斷標記
return interrupted;
}
2. interrupted()
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
看到這個方法可以發現,它和 isinterrupted() 的實現都是調用 isInterrupted 方法,只是參數不一樣,interrupted 的參數為 true,這個參數的含義就是是否要清除中斷標記。因此該方法的作用是返回當前線程的中斷標記并且重置中斷標記。
3. interrupt()
public void interrupt() {
// 檢查調用方線程是否有對被調用線程的修改權
if (this != Thread.currentThread())
checkAccess();
// 中斷網絡 IO
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
// 調用 native 方法
interrupt0();
}
先忽略中斷網絡 IO 這塊的代碼,下面會講,可以看到這個方法最終依賴 native 方法 interrupt0(),關注其底層實現,可以發現該中斷方法主要做了兩件事情:
- 設置中斷標記
- 喚醒當前線程
void os::interrupt(Thread* thread) {
// 確保執行該方法的線程是當前線程
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {
// 設置中斷標記
osthread->set_interrupted(true);
// More than one thread can get here with the same value of osthread,
// resulting in multiple notifications. We do, however, want the store
// to interrupted() to be visible to other threads before we execute unpark().
// 屏障指令,保證在執行 unpark() 之前對其他線程可見
OrderAccess::fence();
// 喚醒 sleep() 阻塞的線程
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}
// 喚醒 park() 阻塞的線程
// For JSR166. Unpark even if interrupt status already was set 只有中斷狀態被設置了之后才能執行 unpark
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
// 線程被wait()方法阻塞的線程
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}
根據上述源碼可以發現 Java 中的中斷并沒有中斷任務的功能,所提供的僅僅是設置標記以及喚醒線程,對應的也就是下述兩種應用場景,如要中斷任務需要開發者根據打斷標記自行編碼打斷任務,如要阻止線程繼續阻塞,則喚醒線程。
注意該方法上的注釋(中文翻譯),在執行 interrupt() 方法時,其產生的結果因不同場景而不同,具體如下:
- 線程因調用 object.wait、Thread.join、Thread.sleep 方法阻塞,將會拋出InterruptedException,同時清除線程的中斷狀態;
- 線程阻塞在 java.nio.channels.InterruptibleChannel 的 IO 上,Channel 將會被關閉,線程被置為中斷狀態,并拋出 java.nio.channels.ClosedByInterruptException;
- 如果線程堵塞在 java.nio.channels.Selector 上,線程被置為中斷狀態,select方法會馬上返回,類似調用wakeup的效果;
- 如果線程處于 not alive(線程剛被 new 出來沒有運行,或者已經死亡) 的狀態則毫無影響
二、什么是中斷?
中斷,顧名思義是對線程的當前狀態的改變,使其恢復到原有狀態,具體如何改變狀態依據不同的情景有著不同的結果。在對 Java 的中斷能不能真正的中斷線程這個問題上,不少博客各執一詞,最后我發現大家是在雞同鴨講。能否中斷線程因線程狀態而異,具體如下:
中斷任務的執行
在某些時候,如生產者/消費者模式下,線程循環執行任務,如何讓線程停下來?
在 Java 中沒有提供停止線程的操作(stop()方法會導致錯誤,已被官方標記過時),Java 提供 interrupt() 方法設置中斷標記,但是如何停止線程卻是開發者自己決定的事情。
上面源碼講述已經提到 interrupt() 方法并沒有提供打斷線程的機制(如下圖,線程不會停止運行),要實現運行中的線程中斷,需要調用 interrupt() 修改線程的中斷標記,然后需要在被調用線程中自己實現邏輯,通常的做法是在合適的位置(例如在while 循環處)不斷檢查此線程的中斷標記是否被設置,在檢測到中斷標記為 true 的地方再停止線程(使用方式可搜索兩階段終止提交,此文不涉及應用)。

中斷線程的阻塞狀態
如下圖是中斷線程的阻塞狀態,在線程檢查到中斷標記為true 的時候會在這些阻塞方法調用處拋出 InterruptedException , 并且清除中斷標記。

并不是所有阻塞態的線程都能被中斷,Synchronized 不支持鎖中斷,見下文。
三、其他
關于 Synchronized 不可中斷
先看以下場景,T2 和 T1 競爭鎖,在 T1 因獲取鎖阻塞的時候去打斷,看結果如何。
/**
* description
*
* @author greysonchance
* @since 2022/8/1
*/
@Slf4j
public class TestSynchronized {
public static void main(String[] args) {
Object lock = new Object();
Thread t1 = new Thread(() -> {
synchronized (lock) {
log.info("線程 T1 拿到鎖, 開始執行");
while (!Thread.currentThread().isInterrupted()) {
}
}
log.info("線程 T1 退出");
}, "T1");
Thread t2 = new Thread(() -> {
synchronized (lock) {
log.info("線程 T2 拿到鎖, 開始執行");
t1.start();
try {
// 保證 T1 線程已經開始獲取鎖進入阻塞態
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("T2 輸出線程 T1 的狀態是:{}", t1.getState());
// 死循環不釋放鎖,讓 T1 一直處于 Blocked 態
while (true) {
}
}
}, "T2");
t2.start();
log.info("開始中斷線程 T1");
t1.interrupt();
try {
// 等待一會看狀態是否被修改
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("Main 輸出線程 T1 的狀態是:{}", t1.getState());
}
}
如圖,線程 T2 啟動并且先獲取到鎖, 然后 t1.start() 啟動 T1 線程,但是 T1 因獲取不到鎖阻塞,此時執行 interrupt 方法并不能中斷 T1,線程仍然處于 Blocked 態。

這是因為線程 T1 因獲取不到鎖,阻塞在 Monitor 的隊列中,interrupt() 方法并不能將該線程從隊列中移出。

再看下面場景,線程獲取鎖之后調用了 wait() 方法,然后再調用 interrupt() 方法,線程被中斷成功
/**
* description
*
* @author greysonchance
* @since 2022/8/1
*/
@Slf4j
public class TestSynchronized2 {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
Thread t1 = new Thread(() -> {
synchronized (lock) {
log.info("線程 T1 拿到鎖, 開始執行");
for (int i = 0; i < 3; i++) {
System.out.println("執行任務");
}
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.info("線程 T1 退出");
}, "T1");
t1.start();
Thread.sleep(1000);
t1.interrupt();
Thread.sleep(1000);
log.info("線程 T1 狀態:{}", t1.getState());
}
}

綜上,Synchronized 的不可中斷指的是當線程因獲取鎖失敗阻塞在隊列中時(狀態為 BLOCKED)不可被打斷,如果已經獲取了鎖調用 sleep()\wait() 等方法而阻塞(狀態為 WAITED或TIME_WAITING)是可以被打斷的。
interrupt() 與 park() 對 unpark() 的影響
interrupt() 在喚醒被 unpark() 阻塞的線程時會修改中斷標記為true,而 park() 喚醒不會。
@Slf4j
public class TestUnpark {
public static void main(String[] args) {
Thread t0 = new Thread(new Runnable() {
@Override
public void run() {
Thread current = Thread.currentThread();
log.info("current Thread name:{}", current.getName());
log.info("準備park當前線程:{}", current.getName());
log.info("當前線程的中斷標記:{}",current.isInterrupted());
LockSupport.park();
log.info("線程{}阻斷后又運行了", current.getName());
log.info("當前線程被喚醒后的中斷標記:{}",current.isInterrupted());
}
}, "t0");
t0.start();
try {
log.info("休眠…………");
Thread.sleep(2000);
// log.info("調用LockSupport.unpark方法,喚醒線程{}", t0.getName());
// LockSupport.unpark(t0);
log.info("調用interrupt方法,喚醒線程{}", t0.getName());
t0.interrupt();
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
interrupt() 的實驗結果:

unpark() 的實驗結果:


浙公網安備 33010602011771號