初步了解Fork/Join框架
框架介紹
Fork/Join框架是Java 7提供的一個用于并行執行任務的框架,是一個把大任務分割成若干個子任務,最終匯總每個子任務的執行結果以得到大任務結果的框架。Fork/Join框架要完成兩件事情:
任務分割:Fork/Join框架需要把任務分割成足夠小的子任務,如果子任務比較大,就對子任務繼續分割;
執行任務并合并結果:分割的子任務分別放到雙端隊列里,然后幾個啟動線程分別從雙端隊列里獲取任務執行。子任務執行完的結果都放在另外一個隊列里,啟動一個線程從隊列里取數據,然后合并這些數據。
簡單的說,ForkJoin其核心思想就是分治。Fork分解任務,Join收集數據。

圖1 Fork/Join工作模型
任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務(工作竊取算法)。工作竊取算法充分利用線程進行并行計算,減少了線程間的競爭。
子任務中應該避免使用synchronized關鍵詞或其它方式的同步。也不應該是一阻塞IO或過多的訪問共享變量。在理想情況下,每個子任務的實現過程中都應該只進行CPU相關的計算,并且只適用每個問題的內部對象。唯一的同步應該只發生在子任務和創建它的父問題之間。

在Java的Fork/Join框架中,使用兩個類ForkJoinPool和ForkJoinTask完成上述操作。ForkJoinPool是ExecutorService的實現類,因此是一種特殊的線程池,使用方法與Executor框架類似。而ForkJoinTask類提供了在任務中執行fork和join的機制,需要通過ForkJoinPool來執行。通常情況下,不需要直接繼承ForkJoinTask類,只需要繼承它的子類,Fork/Join框架提供了兩個主要的接口:
- RecursiveAction:無返回值的接口;
- RecursiveTask:有返回值的接口。
Fork/Join框架實現原理
ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責將存放程序提交給ForkJoinPool,而ForkJoinWorkerThread負責執行這些任務。
ForkJoinTask的Fork方法的實現原理:當我們調用ForkJoinTask的fork方法時,程序會把任務放在ForkJoinWorkerThread的pushTask的workQueue中,異步地執行這個任務,然后立即返回結果,代碼如下:
|
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; } |
pushTask方法把當前任務存放在ForkJoinTask數組隊列里。然后再調用ForkJoinPool的signalWork()方法喚醒或創建一個工作線程來執行任務。代碼如下:
final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m) growArray(); } }
Join方法的主要作用是阻塞當前線程并等待獲取結果。源碼如下:
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
它首先調用doJoin方法,通過doJoin()方法得到當前任務的狀態來判斷返回什么結果,任務狀態有4種:已完成(NORMAL)、被取消(CANCELLED)、信號(SIGNAL)和出現異常(EXCEPTIONAL)。
- NORMAL 直接返回任務結果;
- CANCELLED 直接拋出CancellationException;
- EXCEPTIONAL 直接拋出對應的異常。
讓我們分析一下doJoin方法的實現:
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); }

在doJoin()方法里,首先通過查看任務的狀態,看任務是否已經執行完成,如果執行完成,則直接返回任務狀態;如果沒有執行完,則從任務數組里取出任務并執行。如果任務順利執行完成,則設置任務狀態為NORMAL,如果出現異常,則記錄異常,并將任務狀態設置為EXCEPTIONAL。
Fork/Join框架的異常處理
ForkJoinTask在執行的時候可能會拋出異常,但是我們沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已經被取消了,并且可以通過ForkJoinTask的getException方法獲取異常。使用如下代碼:
if(task.isCompletedAbnormally()){ System.out.println(task.getException()); }
getException方法返回Throwable對象,如果任務被取消了則返回CancellationException。如果任務沒有完成或者沒有拋出異常則返回null。
public final Throwable getException() { int s = status & DONE_MASK; return ((s >= NORMAL) ? null : (s == CANCELLED) ? new CancellationException() : getThrowableException()); }
案例分析
我們通過一個簡單的例子來介紹一下Fork/Join框架的使用。需求:計算1+2+3+…+100的結果。
因為是有結果的任務,所以必須繼承RecursiveTask。
import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; public class CountTask extends RecursiveTask<Integer> { private static final long serialVersionUID = 4880649443549126247L; private static final int THRESHOLD = 10; private int start; private int end; public CountTask(int start, int end) { super(); this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; boolean canCompute = (end - start) <= THRESHOLD; if (canCompute) { for (int i = start; i <= end; i++) { sum += i; } } else { int middle = (start + end) / 2;
// 遞歸 CountTask leftTask = new CountTask(start, middle); CountTask rightTask = new CountTask(middle + 1, end); leftTask.fork(); rightTask.fork(); int leftResult = leftTask.join(); int rightResult = rightTask.join(); sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(1, 100); Future<Integer> result = forkJoinPool.submit(task); try { System.out.println(result.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } if (task.isCompletedAbnormally()) { System.out.println(task.getException()); } } }
通過這個例子,我們進一步了解ForkJoinTask,ForkJoinTask與一般任務的主要區別在于它需要實現compute方法,在這個方法里,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務;否則,遞歸分割任務,直到任務足夠小。最后,使用join方法會等待子任務執行完并得到其結果。
參考文獻
- http://www.rzrgm.cn/senlinyang/p/7885964.html
- http://www.rzrgm.cn/shijiaqi1066/p/4631466.html
Buy me a coffee. ?Get red packets.
浙公網安備 33010602011771號