JUC并發編程學習(十三)ForkJoin
ForkJoin
什么是ForkJoin
ForkJoin在JDK1.7,并發執行任務!大數據量時提高效率。
大數據:Map Reduce(把大任務拆分成小任務)

ForkJoin特點:工作竊取

為什么可以取竊取其他線程的任務呢?因為這里面維護的都是雙端隊列(即隊列的兩端都可以取元素)
ForkJoin操作
在java.util.concurrent下的接口摘要中,有以下兩個接口


點進其中一個找到具體的類,可以看到ForkJoinPool

這是具體的執行類,就像ThreadPoolExecutor

具體的執行通過ForkJoinTask類來執行

使用ForkJoin
1、需要一個ForkJoinPool,通過execute方法來執行,參數為ForkJoinTask
2、計算任務 forkJoinPool.execute(ForkJoinTask<?> task)
3、定義一個ForkJoinTask

如何去定義一個ForkJoinTask,打開jdk文檔,找到ForkJoinTask類,查看具體的子類。

其中遞歸事件沒有返回值,而任務肯定要有結果,所以遞歸任務是有返回值的
點進任務,查看示例

代碼示例:
package org.example.forkjoin;
/*
* 求和計算的任務
*
*
* 程序員的三六九等
* 三(普通求和)、六(ForkJoin)、九(Stream并行流)
*
*
* 使用ForkJoin:
* 1、ForkJoinPoll 通過他來執行
* 2、計算任務 forkJoinPool.execute(ForkJoinTask<?> task)
* 3、定義一個ForkJoinTask
*
*
* */
import java.util.concurrent.RecursiveTask;
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
//臨界值
private Long temp = 10000L;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
//計算方法
@Override
protected Long compute() {
if ((end-start)<temp){
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum+=i;
}
return sum;
}else {
//使用ForkJoin分支合并計算
Long middle = (end + start) / 2;//中間值
ForkJoinDemo task1 = new ForkJoinDemo(start,middle);
//拆分任務、把任務壓入線程隊列
task1.fork();
ForkJoinDemo task2 = new ForkJoinDemo(middle+1,end);
task2.fork();
return task1.join()+task2.join();
}
}
}
三六九等程序員的測試
package org.example.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// test01();//11720
// test02();//7369
test03();//239
}
/*
* 普通程序員
* */
public static void test01() {
long startTime = System.currentTimeMillis();
Long sum = 0L;
for (Long i = 1L; i <= 10_0000_0000L; i++) {
sum+=i;
}
long endTime = System.currentTimeMillis();
System.out.println("結果:" + sum + "-->耗時:" + (endTime - startTime));
}
/*
* 會使用ForkJoin
* */
public static void test02() throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
ForkJoinDemo forkJoinDemo = new ForkJoinDemo(0L,10_0000_0000L);
ForkJoinPool forkJoinPool = new ForkJoinPool();
// forkJoinPool.execute(forkJoinDemo);//執行
ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinDemo);
Long aLong = submit.get();
long endTime = System.currentTimeMillis();
System.out.println("結果:" + aLong + "-->耗時:" + (endTime - startTime));
}
/*
* 九等程序員:Stream流并行運算
* 效率高十幾倍
*
* */
public static void test03() {
long start = System.currentTimeMillis();
Long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("結果:" + sum + "-->耗時:" + (end - start));
}
}

浙公網安備 33010602011771號