[Java/并發編程] 核心源碼精講:Java 并行流(parallelStream) [JDK8-]
序
- 項目中利用了Java 8 的并行流(
parallelStream)來優化程序處理性能:
public static LinkedList<CycleCanSequenceDto> batchParseCloudMessageToCycleSequences(
List<byte []> cloudMessageBytesList
, CanHeaderConfigDto cloudMessageHeaderConfig
) {
List<LinkedList<CycleCanSequenceDto>> cycleCanSequenceDtoListList = cloudMessageBytesList.parallelStream().map(cloudMessageBytes -> {//并行處理
LinkedList<CycleCanSequenceDto> canSequenceDtos = null;
try {
canSequenceDtos = parseCloudMessageToCycleSequences(cloudMessageBytes, cloudMessageHeaderConfig);
} catch (IOException e) {
String errorMessage = "Parse cloud message to cycle sequences fail!cloudMessageBytesHex:" + BytesUtils.bytesToHexString(cloudMessageBytes);
log.error( errorMessage );
throw new RuntimeException(errorMessage);
}
return canSequenceDtos;
} ).collect(Collectors.toCollection(LinkedList::new));
LinkedList<CycleCanSequenceDto> cycleCanSequenceDtoList = cycleCanSequenceDtoListList.parallelStream().flatMap(cycleCanSequenceDtoListElement -> {//并行處理
return cycleCanSequenceDtoListElement.stream();
}).collect(Collectors.toCollection(LinkedList::new));
return cycleCanSequenceDtoList;
}
@Setter
@Getter
public class CycleCanSequenceDto extends CycleMessageSequenceDto {
/**
* 獲取 MessagePayloadDto 的總個數
* @param cycleCanSequences
* @return
*/
public static Long getMessagePayloadSize(List<CycleCanSequenceDto> cycleCanSequences){
AtomicLong messagePayloadSize = new AtomicLong(0);
if(cycleCanSequences==null) {
return -1L;
}
cycleCanSequences.parallelStream().forEach(cycleCanSequenceDto -> {
Integer currentCycleCanSequenceDtoMessagePayloadSize = cycleCanSequenceDto.getContent().size();
messagePayloadSize.addAndGet( currentCycleCanSequenceDtoMessagePayloadSize );
});
return messagePayloadSize.get();
}
}
概述:Java 并行流(parallelStream)[JDK8 - ]
- 并行流(
parallelStream)是Java 8引入的強大特性,它能夠自動將流操作【并行化】,以利用多核處理器的優勢。
java.util.Collection#parallelStream()
Java 8引入了流的概念去對數據進行復雜的操作,而且使用并行流(Parallel Steams)支持并發,大大加快了運行效率。
- 與【并行流】對應的是【順序流】
//順序流
list.stream()
.filter(i -> i > 10)
.collect( Collectors.toList() );
//并行流
list.parallelStream()
.filter(i -> i > 10)
.collect( Collectors.toList() );
下面我們將全面探討parallelStream的使用方法、原理和最佳實踐。
并行流基礎
創建并行流
// 從集合創建并行流
List<String> list = Arrays.asList("a", "b", "c");
Stream<String> parallelStream = list.parallelStream();
// 將順序流轉為并行流
Stream<String> parallelStream2 = Stream.of("a", "b", "c").parallel();
基本使用示例
List<Integer> numbers = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
// 并行計算平方和
long sum = numbers.parallelStream()
.mapToLong(i -> i * i)
.sum();
并行流工作原理
底層機制
- 并行流使用
Fork/Join框架實現:
- 將任務分割為多個子任務(
fork)- 并行執行這些子任務
- 合并結果(
join)
算法思想: 分治
- 案例講解: 以代碼
list.parallelStream().filter(...).collect(...)為例
- Stage鏈構建:通過Head節點(Stage0)和中間操作(如filter、sorted)形成雙向鏈表,每個階段(Stage)封裝操作邏輯。
- 任務拆分:Spliterator將數據分割為多個子任務,分發到ForkJoinPool的線程隊列。
- 并行執行:各線程獨立處理子任務,通過opWrapSink方法將操作鏈應用到數據流。
- 結果合并:終端操作(如collect)調用combiner合并子任務結果。
// 示例:ArrayList的Spliterator實現
public Spliterator<E> spliterator() {
return new ArrayListSpliterator<>(this, 0, -1, 0); // 初始范圍[0, size)
}
底層框架:Fork/Join 框架
- 并行流基于
Java 7引入的Fork/Join框架實現,其核心是ForkJoinPool線程池,采用工作竊取算法(Work-Stealing) 優化任務分配。
每個線程維護一個雙端隊列,優先處理自己的任務,空閑時竊取其他線程隊列尾部的任務,最大化CPU利用率。
- 關鍵類分析:
ForkJoinTask:任務基類,子類包括RecursiveTask(有返回值)和RecursiveAction(無返回值)。Spliterator:數據拆分器,負責將數據源分割為可并行處理的子塊。例如:
ArrayListSpliterator支持高效隨機訪問分割。
源碼級關鍵機制解析
1) 數據拆分與合并
Spliterator特性:通過characteristics()方法返回特性值(如ORDERED、SIZED),影響拆分策略。
例如: ArrayList支持高效平均分割,而LinkedList拆分成本高。
- 任務鏈構造:中間操作(如filter、map)通過
StatelessOp或StatefulOp節點構建操作鏈,StatefulOp(如sorted)需緩存中間數據。
2) 并行流線程模型
- 默認線程池:使用
ForkJoinPool.commonPool()(JVM內共享的公共線程池, 被【整個應用程序】所使用)
- 默認的線程數為:
Runtime.getRuntime().availableProcessors() - 1即: CPU核心數-1。
-1是因為還有 JVM 的主線程需要占用1個線程
- 可自定義系統屬性:
java.util.concurrent.ForkJoinPool.common.parallelism最佳實踐: 由于主線程也會參與任務搶占CPU,所以 ForkJoinPool.commonPool 的線程數盡量設置為 (CPU核心數*N - 1)
// 設置全局并行度
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
- 自定義線程池:可通過自定義
ForkJoinPool提交任務,但需注意避免資源競爭。
支持通過
ForkJoinPool定義私有線程池:
ForkJoinPool forkJoinPool = new ForkJoinPool(8);
List<Long> longs = forkJoinPool.submit(() -> aList.parallelStream().map( e -> {
return e + 1;
}).collect(Collectors.toList())).get();
適用場景
適合使用并行流的場景
- 數據量大:通常超過10,000個元素
- 計算密集型操作(CPU):如復雜的數學運算
- 無狀態操作:如map、filter、flatMap等
- 獨立操作:元素處理不依賴其他元素
不適合的場景
- 順序依賴操作:如limit、findFirst等
- 有狀態操作:如sorted、distinct
- I/O密集型操作:可能導致線程阻塞 (補充意見:但也不絕對不適合,有些情況下順序執行,反而更慢)
- 小數據集:并行開銷可能超過收益
性能優化技巧
正確測量性能
long start = System.nanoTime();
result = list.parallelStream().[...].collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("耗時: " + duration + " ms");
選擇合適的并行度
// 自定義線程池
ForkJoinPool customPool = new ForkJoinPool(4);
customPool.submit(() -> {
list.parallelStream().[...].collect(Collectors.toList());
}).get();
避免共享可變狀態
// 錯誤示例 - 存在競態條件
List<String> result = new ArrayList<>();
list.parallelStream().forEach(s -> result.add(s.toUpperCase())); // 可能拋出異常
// 正確做法
List<String> safeResult = list.parallelStream()
.map(String::toUpperCase)
.collect(Collectors.toList());
高級應用
自定義Spliterator
class CustomSpliterator<T> implements Spliterator<T> {
// 實現方法...
}
Spliterator<String> spliterator = new CustomSpliterator<>(data);
Stream<String> parallelStream = StreamSupport.stream(spliterator, true);
并行收集器
// 使用線程安全的收集器
Map<String, List<Student>> studentsByClass = students.parallelStream()
.collect(Collectors.groupingByConcurrent(Student::getClassName));
FAQ: 并行流的常見陷阱與解決方案
Q:并行流與順序流的性能對比?
- 測試示例
List<Integer> numbers = IntStream.rangeClosed(1, 10_000_000).boxed().collect(Collectors.toList());
// 順序流
long seqTime = measureTime(() -> numbers.stream().reduce(0, Integer::sum));
// 并行流
long parTime = measureTime(() -> numbers.parallelStream().reduce(0, Integer::sum));
System.out.println("順序流: " + seqTime + "ms");
System.out.println("并行流: " + parTime + "ms");
- 對比結果
| 操作 | 數據量 | 順序流耗時 | 并行流耗時 |
|---|---|---|---|
| 求和 | 100萬 | 15ms | 8ms |
| 過濾 | 1000萬 | 120ms | 45ms |
| 排序 | 100萬 | 650ms | 750ms |
Q:線程安全問題
- 問題
int[] counter = new int[1];
list.parallelStream().forEach(e -> counter[0]++); // 競態條件
- 解決
// 使用原子類
AtomicInteger counter = new AtomicInteger();
list.parallelStream().forEach(e -> counter.incrementAndGet());
// 或使用歸約操作
int sum = list.parallelStream().mapToInt(e -> 1).sum();
Q:順序敏感操作
- 問題
// 并行流中findFirst可能不如預期
Optional<Integer> first = list.parallelStream()
.filter(i -> i > 10)
.findFirst();
- 解決
// 如需順序保證,使用【順序流】,而非并行流
Optional<Integer> first = list.stream()
.filter(i -> i > 10)
.findFirst();
Q:性能層面的考量:是否需要單獨構建線程池?
- ? 建議單獨構建線程池的場景
| 場景 | 原因 |
|---|---|
| I/O 密集型任務 | 默認線程數較少(CPU-1),不適合阻塞操作(如 DB、HTTP),容易拖慢整個 commonPool(),影響其他并行任務 。 |
| 任務隔離需求 | 避免與其他模塊共享線程池,防止任務間資源競爭、死鎖或阻塞 。 |
| 需要精確控制并發度 | 自定義線程池可設置合適的線程數,避免過度切換或資源浪費 。 |
- ? 可不單獨構建線程池的場景
| 場景 | 原因 |
|---|---|
| CPU 密集型任務 | 默認 commonPool() 的線程數已接近 CPU 核心數,適合計算密集型任務 。 |
| 簡單一次性任務 | 代碼簡潔、無需復雜控制,使用默認線程池即可 。 |
Q:最佳實踐經驗
-
先測試后優化:不要假設并行一定更快,實際測量性能
-
避免副作用:確保lambda表達式沒有副作用
-
考慮順序性:需要順序保證時使用順序流
-
合理設置并行度:根據CPU核心數和任務特性調整
-
注意數據結構:ArrayList比LinkedList更適合并行處理
-
避免自動裝箱:使用原始類型流(IntStream等)提升性能
-
是否需要單獨創建線程池來執行并行流?
- CPU 密集型任務:可直接使用 parallelStream(),無需額外線程池。
- I/O 密集型或關鍵業務:建議如下方式使用自定義 ForkJoinPool:
若任務為 I/O 密集型或對隔離性、并發度有要求,有必要單獨構建線程池以提升性能與穩定性 。
ForkJoinPool customPool = new ForkJoinPool(20); // 自定義線程數
customPool.submit(() ->
list.parallelStream().forEach(item -> doSomething(item))
).get();
customPool.shutdown();
并行流是強大的工具,但需要謹慎使用。正確使用時可以顯著提升性能,錯誤使用則可能導致潛在問題。理解其工作原理和適用場景是有效使用并行流的關鍵。
X 參考文獻
本文作者:
千千寰宇
本文鏈接: http://www.rzrgm.cn/johnnyzen
關于博文:評論和私信會在第一時間回復,或直接私信我。
版權聲明:本博客所有文章除特別聲明外,均采用 BY-NC-SA 許可協議。轉載請注明出處!
日常交流:大數據與軟件開發-QQ交流群: 774386015 【入群二維碼】參見左下角。您的支持、鼓勵是博主技術寫作的重要動力!
本文鏈接: http://www.rzrgm.cn/johnnyzen
關于博文:評論和私信會在第一時間回復,或直接私信我。
版權聲明:本博客所有文章除特別聲明外,均采用 BY-NC-SA 許可協議。轉載請注明出處!
日常交流:大數據與軟件開發-QQ交流群: 774386015 【入群二維碼】參見左下角。您的支持、鼓勵是博主技術寫作的重要動力!

浙公網安備 33010602011771號