java8提供一個fork/join framework,fork/join框架是ExecutorService接口的一個實現,它可以幫助你充分利用你電腦中的多核處理器,它的設計理念是將一個任務分割成多個可以遞歸執行的小任務,這些小任務可由不同的處理器進行并行執行,顯然這樣設計的一個目的就是提高應用的性能。
fork/join框架的和興是ForkJoinPool類,是AbstractExecutorService類的擴展,它繼承了核心的并行執行算法,可以執行ForkJoinTask進程。基本用法可以如下偽代碼:
if (my portion of the work is small enough)
do the work directly
else
split my work into two pieces
invoke the two pieces and wait for the results
將上面的代碼通過ForkJoinTask子類進行封裝,通常繼承ForkJoinTask更具體的子類來實現,如RecursiveTask或者RecursiveAction,當子類創建完畢,再創建一個表示要處理的任務的對象,將它作為一個參數傳遞給ForkJoinPool實例的invoke()方法。官方文檔給了一個對照片進行模糊處理的示例,以這個示例為切入點了解fork/join操作,在對照片進行模糊處理時,思路是將一個圖片轉成一個數組,這個數組中的每個值代表一個像素,然后對這個像素進行處理,顯然將圖片轉成的數組將會是一個很大的數組,為了加快處理,可以對數組進行切割,切割成一個個小的數組,然后對這個小的數組進行并行處理,將處理后的結果再合并成一個大的結果返給用戶,代碼如下:
class ForkBlur extends RecursiveAction { private int[] mSource; private int mStart; private int mLength; private int[] mDestination; private int mBlurWidth = 15; // Processing window size, should be odd. public ForkBlur(int[] src, int start, int length, int[] dst) { mSource = src; mStart = start; mLength = length; mDestination = dst; } // 計算原圖片的平均像素, 將結果寫道目的數組中. protected void computeDirectly() { int sidePixels = (mBlurWidth - 1) / 2; for (int index = mStart; index < mStart + mLength; index++) { // 計算平均值. float rt = 0, gt = 0, bt = 0; for (int mi = -sidePixels; mi <= sidePixels; mi++) { int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1); int pixel = mSource[mindex]; rt += (float) ((pixel & 0x00ff0000) >> 16) / mBlurWidth; gt += (float) ((pixel & 0x0000ff00) >> 8) / mBlurWidth; bt += (float) ((pixel & 0x000000ff) >> 0) / mBlurWidth; } // 重新組合目標像素. int dpixel = (0xff000000) | (((int) rt) << 16) | (((int) gt) << 8) | (((int) bt) << 0); mDestination[index] = dpixel; } } //定義計算的閾值 protected static int sThreshold = 10000; @Override protected void compute() { //長度小于指定的閾值,則直接進行計算 if (mLength < sThreshold) { computeDirectly(); return; } //進行切分 int split = mLength / 2; //并行執行所有的任務 invokeAll(new ForkBlur(mSource, mStart, split, mDestination), new ForkBlur(mSource, mStart + split, mLength - split, mDestination)); } // Plumbing follows. public static void main(String[] args) throws Exception { String srcName = "C:\\Users\\Administrator\\Desktop\\信息文件\\研究院工作\\個人信息\\aaa.jpg"; File srcFile = new File(srcName); BufferedImage image = ImageIO.read(srcFile); System.out.println("Source image: " + srcName); BufferedImage blurredImage = blur(image); String dstName = "C:\\Users\\Administrator\\Desktop\\信息文件\\研究院工作\\個人信息\\bbb.jpg"; File dstFile = new File(dstName); ImageIO.write(blurredImage, "jpg", dstFile); System.out.println("Output image: " + dstName); } public static BufferedImage blur(BufferedImage srcImage) { int w = srcImage.getWidth(); int h = srcImage.getHeight(); int[] src = srcImage.getRGB(0, 0, w, h, null, 0, w); int[] dst = new int[src.length]; System.out.println("Array size is " + src.length); System.out.println("Threshold is " + sThreshold); int processors = Runtime.getRuntime().availableProcessors(); System.out.println(Integer.toString(processors) + " processor" + (processors != 1 ? "s are " : " is ") + "available"); //創建目標任務對象 ForkBlur fb = new ForkBlur(src, 0, src.length, dst); //創建任務執行對象 ForkJoinPool pool = new ForkJoinPool(); long startTime = System.currentTimeMillis(); //執行模糊操作 pool.invoke(fb); long endTime = System.currentTimeMillis(); System.out.println("Image blur took " + (endTime - startTime) + " milliseconds."); BufferedImage dstImage = new BufferedImage(w, h, BufferedImage.TYPE_INT_ARGB); dstImage.setRGB(0, 0, w, h, dst, 0, w); return dstImage; } }
上面的代碼中,ForkBlur任務繼承RecursiveAction抽象類,它的執行過程整體分為三步:
1、創建一個表示需要執行動作的任務:ForkBlur fb=new ForkBlur(...)
2、創建ForkJoinPool對象,以觸發任務的執行:ForkJoinPool pool=new ForkJoinPool();
3、執行任務 pool.invoke(fb)
并行計算在流中也有很多引用,它的思想是:java會將流分割成多分子流,聚合操作會以并行的形式遍歷處理這些子流,最后將結果匯合成一個結果,在創建平行流的時候需要調用paralleStream()方法指明創建并行流:
double average = roster
.parallelStream()
.filter(p -> p.getGender() == Person.Sex.MALE)
.mapToInt(Person::getAge)
.average()
.getAsDouble();
java中的并發歸約操作,前面講到,如果將人按照性別的方式進行區分,則歸約操作如下:
Map<Person.Sex, List<Person>> byGender =
roster
.stream()
.collect(
Collectors.groupingBy(Person::getGender));
上面的代碼的結果和下面的操作方式是等價的:
ConcurrentMap<Person.Sex, List<Person>> byGender =
roster
.parallelStream()
.collect(
Collectors.groupingByConcurrent(Person::getGender));
上面的操作方式稱為并發歸約操作(concurrent reduction),如果以下條件均滿足,則一個包含collect操作的管道就是并發歸約操作:
1、流是并行的,stream is parallel
2、collect操作的參數,也就是待操作的集合需要有Collector.Characteristics.CONCURRENT特征,查看一個集合的特征,可以調用Collector.characteristics方法
3、無論流是無序的,還是集合具有Collector.Characteristics.UNORDERED的特征,為了保證流是無序的,可以調用BaseStream.unordered操作。
一個管道在處理流的順序取決于這個流值順序執行的還是并行執行的(in serial or in parallel)、流的源和中間操作;例如打印一個list集合中的元素,這里使用forEach操作,代碼如下:
Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8 };
List<Integer> listOfIntegers =
new ArrayList<>(Arrays.asList(intArray));
System.out.println("listOfIntegers:");
listOfIntegers
.stream()
.forEach(e -> System.out.print(e + " "));
System.out.println("");
System.out.println("listOfIntegers sorted in reverse order:");
Comparator<Integer> normal = Integer::compare;
Comparator<Integer> reversed = normal.reversed();
Collections.sort(listOfIntegers, reversed);
listOfIntegers
.stream()
.forEach(e -> System.out.print(e + " "));
System.out.println("");
System.out.println("Parallel stream");
listOfIntegers
.parallelStream()
.forEach(e -> System.out.print(e + " "));
System.out.println("");
System.out.println("Another parallel stream:");
listOfIntegers
.parallelStream()
.forEach(e -> System.out.print(e + " "));
System.out.println("");
System.out.println("With forEachOrdered:");
listOfIntegers
.parallelStream()
.forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");
===========執行結果為=============
listOfIntegers:
1 2 3 4 5 6 7 8
listOfIntegers sorted in reverse order:
8 7 6 5 4 3 2 1
Parallel stream:
3 4 1 6 2 5 7 8
Another parallel stream:
6 3 1 5 7 8 4 2
With forEachOrdered:
8 7 6 5 4 3 2 1
上面的代碼中,第三和第四個管道的輸出結果顯然是無序的,而第五個管道使用forEachOrdered方法,這時候無論使用的stream還是parallelStream,都使得執行過程按照執行順序進行,當然,這也就是失去了并行流的優勢了。
以上是java并行處理數據的小示例,判斷什么時候使用并行處理也是一個問題,知識是無涯的,不要小看任何東西...
浙公網安備 33010602011771號