【轉(zhuǎn)】-CountDownLatch詳解
CountDownLatch詳解
該博客轉(zhuǎn)載自?愛寶貝丶 的?CountDownLatch詳解
1. 簡介
CountDownLatch中count down是倒數(shù)的意思,latch則是門閂的含義。整體含義可以理解為倒數(shù)的門栓,似乎有一點(diǎn)“三二一,芝麻開門”的感覺。CountDownLatch的作用也是如此,在構(gòu)造CountDownLatch的時(shí)候需要傳入一個(gè)整數(shù)n,在這個(gè)整數(shù)“倒數(shù)”到0之前,主線程需要等待在門口,而這個(gè)“倒數(shù)”過程則是由各個(gè)執(zhí)行線程驅(qū)動(dòng)的,每個(gè)線程執(zhí)行完一個(gè)任務(wù)“倒數(shù)”一次。總結(jié)來說,CountDownLatch的作用就是等待其他的線程都執(zhí)行完任務(wù),必要時(shí)可以對各個(gè)任務(wù)的執(zhí)行結(jié)果進(jìn)行匯總,然后主線程才繼續(xù)往下執(zhí)行。
2. 使用方法
CountDownLatch主要有兩個(gè)方法:countDown()和await()。countDown()方法用于使計(jì)數(shù)器減一,其一般是執(zhí)行任務(wù)的線程調(diào)用,await()方法則使調(diào)用該方法的線程處于等待狀態(tài),其一般是主線程調(diào)用。這里需要注意的是,countDown()方法并沒有規(guī)定一個(gè)線程只能調(diào)用一次,當(dāng)同一個(gè)線程調(diào)用多次countDown()方法時(shí),每次都會(huì)使計(jì)數(shù)器減一;另外,await()方法也并沒有規(guī)定只能有一個(gè)線程執(zhí)行該方法,如果多個(gè)線程同時(shí)執(zhí)行await()方法,那么這幾個(gè)線程都將處于等待狀態(tài),并且以共享模式享有同一個(gè)鎖。如下是其使用示例:
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
Service service = new Service(latch);
Runnable task = () -> service.exec();
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(task);
thread.start();
}
System.out.println("main thread await. ");
latch.await();
System.out.println("main thread finishes await. ");
}
}
public class Service {
private CountDownLatch latch;
public Service(CountDownLatch latch) {
this.latch = latch;
}
public void exec() {
try {
System.out.println(Thread.currentThread().getName() + " execute task. ");
sleep(2);
System.out.println(Thread.currentThread().getName() + " finished task. ");
} finally {
latch.countDown();
}
}
private void sleep(int seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
? 在上面的例子中,首先聲明了一個(gè)CountDownLatch對象,并且由主線程創(chuàng)建了5個(gè)線程,分別執(zhí)行任務(wù),在每個(gè)任務(wù)中,當(dāng)前線程會(huì)休眠2秒。在啟動(dòng)線程之后,主線程調(diào)用了CountDownLatch.await()方法,此時(shí),主線程將在此處等待創(chuàng)建的5個(gè)線程執(zhí)行完任務(wù)之后才繼續(xù)往下執(zhí)行。如下是執(zhí)行結(jié)果:
Thread-0 execute task.
Thread-1 execute task.
Thread-2 execute task.
Thread-3 execute task.
Thread-4 execute task.
main thread await.
Thread-0 finished task.
Thread-4 finished task.
Thread-3 finished task.
Thread-1 finished task.
Thread-2 finished task.
main thread finishes await.
? 從輸出結(jié)果可以看出,主線程先啟動(dòng)了五個(gè)線程,然后主線程進(jìn)入等待狀態(tài),當(dāng)這五個(gè)線程都執(zhí)行完任務(wù)之后主線程才結(jié)束了等待。上述代碼中需要注意的是,在執(zhí)行任務(wù)的線程中,使用了try...finally結(jié)構(gòu),該結(jié)構(gòu)可以保證創(chuàng)建的線程發(fā)生異常時(shí)CountDownLatch.countDown()方法也會(huì)執(zhí)行,也就保證了主線程不會(huì)一直處于等待狀態(tài)。
3. 使用場景
? CountDownLatch非常適合于對任務(wù)進(jìn)行拆分,使其并行執(zhí)行,比如某個(gè)任務(wù)執(zhí)行2s,其對數(shù)據(jù)的請求可以分為五個(gè)部分,那么就可以將這個(gè)任務(wù)拆分為5個(gè)子任務(wù),分別交由五個(gè)線程執(zhí)行,執(zhí)行完成之后再由主線程進(jìn)行匯總,此時(shí),總的執(zhí)行時(shí)間將決定于執(zhí)行最慢的任務(wù),平均來看,還是大大減少了總的執(zhí)行時(shí)間。
? 另外一種比較合適使用CountDownLatch的地方是使用某些外部鏈接請求數(shù)據(jù)的時(shí)候,比如圖片。在本人所從事的項(xiàng)目中就有類似的情況,因?yàn)槲覀兪褂玫膱D片服務(wù)只提供了獲取單個(gè)圖片的功能,而每次獲取圖片的時(shí)間不等,一般都需要1.5s~2s。當(dāng)我們需要批量獲取圖片的時(shí)候,比如列表頁需要展示一系列的圖片,如果使用單個(gè)線程順序獲取,那么等待時(shí)間將會(huì)極長,此時(shí)我們就可以使用CountDownLatch對獲取圖片的操作進(jìn)行拆分,并行的獲取圖片,這樣也就縮短了總的獲取時(shí)間。
4. 原理剖析
? CountDownLatch是基于AbstractQueuedSynchronizer實(shí)現(xiàn)的,在AbstractQueuedSynchronizer中維護(hù)了一個(gè)volatile類型的整數(shù)state,volatile可以保證多線程環(huán)境下該變量的修改對每個(gè)線程都可見,并且由于該屬性為整型,因而對該變量的修改也是原子的。創(chuàng)建一個(gè)CountDownLatch對象時(shí),所傳入的整數(shù)n就會(huì)賦值給state屬性,當(dāng)countDown()方法調(diào)用時(shí),該線程就會(huì)嘗試對state減一,而調(diào)用await()方法時(shí),當(dāng)前線程就會(huì)判斷state屬性是否為0,如果為0,則繼續(xù)往下執(zhí)行,如果不為0,則使當(dāng)前線程進(jìn)入等待狀態(tài),直到某個(gè)線程將state屬性置為0,其就會(huì)喚醒在await()方法中等待的線程。如下是countDown()方法的源代碼:
public void countDown() {
sync.releaseShared(1);
}
? 這里sync也即一個(gè)繼承了AbstractQueuedSynchronizer的類實(shí)例,該類是CountDownLatch的一個(gè)內(nèi)部類,其聲明如下:
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState(); // 獲取當(dāng)前state屬性的值
if (c == 0) // 如果state為0,則說明當(dāng)前計(jì)數(shù)器已經(jīng)計(jì)數(shù)完成,直接返回
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc)) // 使用CAS算法對state進(jìn)行設(shè)置
return nextc == 0; // 設(shè)置成功后返回當(dāng)前是否為最后一個(gè)設(shè)置state的線程
}
}
}
? 這里tryReleaseShared(int)方法即對state屬性進(jìn)行減一操作的代碼。可以看到,CAS也即compare and set的縮寫,jvm會(huì)保證該方法的原子性,其會(huì)比較state是否為c,如果是則將其設(shè)置為nextc(自減1),如果state不為c,則說明有另外的線程在getState()方法和compareAndSetState()方法調(diào)用之間對state進(jìn)行了設(shè)置,當(dāng)前線程也就沒有成功設(shè)置state屬性的值,其會(huì)進(jìn)入下一次循環(huán)中,如此往復(fù),直至其成功設(shè)置state屬性的值,即countDown()方法調(diào)用成功。
? 在countDown()方法中調(diào)用的sync.releaseShared(1)調(diào)用時(shí)實(shí)際還是調(diào)用的tryReleaseShared(int)方法,如下是releaseShared(int)方法的實(shí)現(xiàn):
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
? 可以看到,在執(zhí)行sync.releaseShared(1)方法時(shí),其在調(diào)用tryReleaseShared(int)方法時(shí)會(huì)在無限for循環(huán)中設(shè)置state屬性的值,設(shè)置成功之后其會(huì)根據(jù)設(shè)置的返回值(此時(shí)state已經(jīng)自減了一),即當(dāng)前線程是否為將state屬性設(shè)置為0的線程,來判斷是否執(zhí)行if塊中的代碼。doReleaseShared()方法主要作用是喚醒調(diào)用了await()方法的線程。需要注意的是,如果有多個(gè)線程調(diào)用了await()方法,這些線程都是以共享的方式等待在await()方法處的,試想,如果以獨(dú)占的方式等待,那么當(dāng)計(jì)數(shù)器減少至零時(shí),就只有一個(gè)線程會(huì)被喚醒執(zhí)行await()之后的代碼,這顯然不符合邏輯。如下是doReleaseShared()方法的實(shí)現(xiàn)代碼:
private void doReleaseShared() {
for (;;) {
Node h = head; // 記錄等待隊(duì)列中的頭結(jié)點(diǎn)的線程
if (h != null && h != tail) { // 頭結(jié)點(diǎn)不為空,且頭結(jié)點(diǎn)不等于尾節(jié)點(diǎn)
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // SIGNAL狀態(tài)表示當(dāng)前節(jié)點(diǎn)正在等待被喚醒
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 清除當(dāng)前節(jié)點(diǎn)的等待狀態(tài)
continue;
unparkSuccessor(h); // 喚醒當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head) // 如果h還是指向頭結(jié)點(diǎn),說明前面這段代碼執(zhí)行過程中沒有其他線程對頭結(jié)點(diǎn)進(jìn)行過處理
break;
}
}
? 在doReleaseShared()方法中(始終注意當(dāng)前方法是最后一個(gè)執(zhí)行countDown()方法的線程執(zhí)行的),首先判斷頭結(jié)點(diǎn)不為空,且不為尾節(jié)點(diǎn),說明等待隊(duì)列中有等待喚醒的線程,這里需要說明的是,在等待隊(duì)列中,頭節(jié)點(diǎn)中并沒有保存正在等待的線程,其只是一個(gè)空的Node對象,真正等待的線程是從頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)開始存放的,因而會(huì)有對頭結(jié)點(diǎn)是否等于尾節(jié)點(diǎn)的判斷。在判斷等待隊(duì)列中有正在等待的線程之后,其會(huì)清除頭結(jié)點(diǎn)的狀態(tài)信息,并且調(diào)用unparkSuccessor(Node)方法喚醒頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),使其繼續(xù)往下執(zhí)行。如下是unparkSuccessor(Node)方法的具體實(shí)現(xiàn):
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 清除當(dāng)前節(jié)點(diǎn)的等待狀態(tài)
Node s = node.next;
if (s == null || s.waitStatus > 0) { // s的等待狀態(tài)大于0說明該節(jié)點(diǎn)中的線程已經(jīng)被外部取消等待了
s = null;
// 從隊(duì)列尾部往前遍歷,找到最后一個(gè)處于等待狀態(tài)的節(jié)點(diǎn),用s記錄下來
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 喚醒離傳入節(jié)點(diǎn)最近的處于等待狀態(tài)的節(jié)點(diǎn)線程
}
? 可以看到,unparkSuccessor(Node)方法的作用是喚醒離傳入節(jié)點(diǎn)最近的一個(gè)處于等待狀態(tài)的線程,使其繼續(xù)往下執(zhí)行。前面我們講到過,等待隊(duì)列中的線程可能有多個(gè),而調(diào)用countDown()方法的線程只喚醒了一個(gè)處于等待狀態(tài)的線程,這里剩下的等待線程是如何被喚醒的呢?其實(shí)這些線程是被當(dāng)前喚醒的線程喚醒的。具體的我們可以看看await()方法的具體執(zhí)行過程。如下是await()方法的代碼:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
? await()方法實(shí)際還是調(diào)用了Sync對象的方法acquireSharedInterruptibly(int)方法,如下是該方法的具體實(shí)現(xiàn):
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
? 可以看到acquireSharedInterruptibly(int)方法判斷當(dāng)前線程是否需要以共享狀態(tài)獲取執(zhí)行權(quán)限,這里tryAcquireShared(int)方法是AbstractQueuedSynchronizer中的一個(gè)模板方法,其具體實(shí)現(xiàn)在前面的Sync類中,可以看到,其主要是判斷state是否為零,如果為零則返回1,表示當(dāng)前線程不需要進(jìn)行權(quán)限獲取,可直接執(zhí)行后續(xù)代碼,返回-1則表示當(dāng)前線程需要進(jìn)行共享權(quán)限。具體的獲取執(zhí)行權(quán)限的代碼在doAcquireSharedInterruptibly(int)方法中,如下是該方法的具體實(shí)現(xiàn):
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 使用當(dāng)前線程創(chuàng)建一個(gè)共享模式的節(jié)點(diǎn)
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor(); // 獲取當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)
if (p == head) { // 判斷前一個(gè)節(jié)點(diǎn)是否為頭結(jié)點(diǎn)
int r = tryAcquireShared(arg); // 查看當(dāng)前線程是否獲取到了執(zhí)行權(quán)限
if (r >= 0) { // 大于0表示獲取了執(zhí)行權(quán)限
setHeadAndPropagate(node, r); // 將當(dāng)前節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn),并且喚醒后面處于等待狀態(tài)的節(jié)點(diǎn)
p.next = null; // help GC
failed = false;
return;
}
}
// 走到這一步說明沒有獲取到執(zhí)行權(quán)限,就使當(dāng)前線程進(jìn)入“擱置”狀態(tài)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
? 在doAcquireSharedInterruptibly(int)方法中,首先使用當(dāng)前線程創(chuàng)建一個(gè)共享模式的節(jié)點(diǎn)。然后在一個(gè)for循環(huán)中判斷當(dāng)前線程是否獲取到執(zhí)行權(quán)限,如果有(r >= 0判斷)則將當(dāng)前節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn),并且喚醒后續(xù)處于共享模式的節(jié)點(diǎn);如果沒有,則對調(diào)用shouldParkAfterFailedAcquire(Node, Node)和parkAndCheckInterrupt()方法使當(dāng)前線程處于“擱置”狀態(tài),該“擱置”狀態(tài)是由操作系統(tǒng)進(jìn)行的,這樣可以避免該線程無限循環(huán)而獲取不到執(zhí)行權(quán)限,造成資源浪費(fèi),這里也就是線程處于等待狀態(tài)的位置,也就是說當(dāng)線程被阻塞的時(shí)候就是阻塞在這個(gè)位置。當(dāng)有多個(gè)線程調(diào)用await()方法而進(jìn)入等待狀態(tài)時(shí),這幾個(gè)線程都將等待在此處。這里回過頭來看前面將的countDown()方法,其會(huì)喚醒處于等待隊(duì)列中離頭節(jié)點(diǎn)最近的一個(gè)處于等待狀態(tài)的線程,也就是說該線程被喚醒之后會(huì)繼續(xù)從這個(gè)位置開始往下執(zhí)行,此時(shí)執(zhí)行到tryAcquireShared(int)方法時(shí),發(fā)現(xiàn)r大于0(因?yàn)閟tate已經(jīng)被置為0了),該線程就會(huì)調(diào)用setHeadAndPropagate(Node, int)方法,并且退出當(dāng)前循環(huán),也就開始執(zhí)行awat()方法之后的代碼。這里我們看看setHeadAndPropagate(Node, int)方法的具體實(shí)現(xiàn):
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node); // 將當(dāng)前節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)
// 檢查喚醒過程是否需要往下傳遞,并且檢查頭結(jié)點(diǎn)的等待狀態(tài)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) // 如果下一個(gè)節(jié)點(diǎn)是嘗試以共享狀態(tài)獲取獲取執(zhí)行權(quán)限的節(jié)點(diǎn),則將其喚醒
doReleaseShared();
}
}
? setHeadAndPropagate(Node, int)方法主要作用是設(shè)置當(dāng)前節(jié)點(diǎn)為頭結(jié)點(diǎn),并且將喚醒工作往下傳遞,在傳遞的過程中,其會(huì)判斷被傳遞的節(jié)點(diǎn)是否是以共享模式嘗試獲取執(zhí)行權(quán)限的,如果不是,則傳遞到該節(jié)點(diǎn)處為止(一般情況下,等待隊(duì)列中都只會(huì)都是處于共享模式或者處于獨(dú)占模式的節(jié)點(diǎn))。也就是說,頭結(jié)點(diǎn)會(huì)依次喚醒后續(xù)處于共享狀態(tài)的節(jié)點(diǎn),這也就是共享鎖與獨(dú)占鎖的實(shí)現(xiàn)方式。這里doReleaseShared()方法也就是我們前面講到的會(huì)將離頭結(jié)點(diǎn)最近的一個(gè)處于等待狀態(tài)的節(jié)點(diǎn)喚醒的方法。

浙公網(wǎng)安備 33010602011771號(hào)