<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      [Java/并發編程] 核心源碼精講:Java 并行流(parallelStream) [JDK8-]

      • 項目中利用了Java 8 的并行流(parallelStream)來優化程序處理性能:
          public static LinkedList<CycleCanSequenceDto> batchParseCloudMessageToCycleSequences(
              List<byte []> cloudMessageBytesList
              , CanHeaderConfigDto cloudMessageHeaderConfig
          ) {
              List<LinkedList<CycleCanSequenceDto>> cycleCanSequenceDtoListList = cloudMessageBytesList.parallelStream().map(cloudMessageBytes -> {//并行處理
                  LinkedList<CycleCanSequenceDto> canSequenceDtos = null;
                  try {
                      canSequenceDtos = parseCloudMessageToCycleSequences(cloudMessageBytes, cloudMessageHeaderConfig);
                  } catch (IOException e) {
                      String errorMessage = "Parse cloud message to cycle sequences fail!cloudMessageBytesHex:" + BytesUtils.bytesToHexString(cloudMessageBytes);
                      log.error( errorMessage );
                      throw new RuntimeException(errorMessage);
                  }
                  return canSequenceDtos;
              } ).collect(Collectors.toCollection(LinkedList::new));
      
              LinkedList<CycleCanSequenceDto> cycleCanSequenceDtoList = cycleCanSequenceDtoListList.parallelStream().flatMap(cycleCanSequenceDtoListElement -> {//并行處理
                  return cycleCanSequenceDtoListElement.stream();
              }).collect(Collectors.toCollection(LinkedList::new));
              return cycleCanSequenceDtoList;
          }
      
      @Setter
      @Getter
      public class CycleCanSequenceDto extends CycleMessageSequenceDto {
          /**
           * 獲取 MessagePayloadDto 的總個數
           * @param cycleCanSequences
           * @return
           */
          public static Long getMessagePayloadSize(List<CycleCanSequenceDto> cycleCanSequences){
              AtomicLong messagePayloadSize = new AtomicLong(0);
              if(cycleCanSequences==null) {
                  return -1L;
              }
              cycleCanSequences.parallelStream().forEach(cycleCanSequenceDto -> {
                  Integer currentCycleCanSequenceDtoMessagePayloadSize = cycleCanSequenceDto.getContent().size();
                  messagePayloadSize.addAndGet( currentCycleCanSequenceDtoMessagePayloadSize );
              });
              return messagePayloadSize.get();
          }
      }
      

      概述:Java 并行流(parallelStream)[JDK8 - ]

      • 并行流(parallelStream)是Java 8引入的強大特性,它能夠自動將流操作【并行化】,以利用多核處理器的優勢。

      java.util.Collection#parallelStream()
      Java 8引入了流的概念去對數據進行復雜的操作,而且使用并行流Parallel Steams)支持并發,大大加快了運行效率。

      • 與【并行流】對應的是【順序流】
      //順序流
      list.stream()
      	.filter(i -> i > 10)
      	.collect( Collectors.toList() );
      
      
      //并行流
      list.parallelStream()
      	.filter(i -> i > 10)
      	.collect( Collectors.toList() );
      

      下面我們將全面探討parallelStream的使用方法、原理和最佳實踐。

      并行流基礎

      創建并行流

      // 從集合創建并行流
      List<String> list = Arrays.asList("a", "b", "c");
      Stream<String> parallelStream = list.parallelStream();
      
      // 將順序流轉為并行流
      Stream<String> parallelStream2 = Stream.of("a", "b", "c").parallel();
      

      基本使用示例

      List<Integer> numbers = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
      
      // 并行計算平方和
      long sum = numbers.parallelStream()
      	.mapToLong(i -> i * i)
      	.sum();
      

      并行流工作原理

      底層機制

      • 并行流使用Fork/Join框架實現:
      • 將任務分割為多個子任務(fork)
      • 并行執行這些子任務
      • 合并結果(join)

      算法思想: 分治

      • 案例講解: 以代碼list.parallelStream().filter(...).collect(...)為例
      • Stage鏈構建:通過Head節點(Stage0)和中間操作(如filter、sorted)形成雙向鏈表,每個階段(Stage)封裝操作邏輯。
      • 任務拆分:Spliterator將數據分割為多個子任務,分發到ForkJoinPool的線程隊列。
      • 并行執行:各線程獨立處理子任務,通過opWrapSink方法將操作鏈應用到數據流。
      • 結果合并:終端操作(如collect)調用combiner合并子任務結果。
      // 示例:ArrayList的Spliterator實現
      public Spliterator<E> spliterator() {
          return new ArrayListSpliterator<>(this, 0, -1, 0); // 初始范圍[0, size)
      }
      

      底層框架:Fork/Join 框架

      • 并行流基于Java 7引入的Fork/Join框架實現,其核心是ForkJoinPool線程池,采用工作竊取算法Work-Stealing優化任務分配

      每個線程維護一個雙端隊列,優先處理自己的任務,空閑時竊取其他線程隊列尾部的任務最大化CPU利用率

      • 關鍵類分析:
      • ForkJoinTask:任務基類,子類包括RecursiveTask(有返回值)和RecursiveAction(無返回值)。
      • Spliterator:數據拆分器,負責將數據源分割為可并行處理的子塊。

      例如: ArrayListSpliterator支持高效隨機訪問分割。

      源碼級關鍵機制解析

      1) 數據拆分與合并

      • Spliterator特性:通過characteristics()方法返回特性值(如ORDERED、SIZED),影響拆分策略。

      例如: ArrayList支持高效平均分割,而LinkedList拆分成本高。

      • 任務鏈構造:中間操作(如filter、map)通過StatelessOpStatefulOp節點構建操作鏈,StatefulOp(如sorted)需緩存中間數據。

      2) 并行流線程模型

      • 默認線程池:使用ForkJoinPool.commonPool() (JVM內共享的公共線程池, 被【整個應用程序】所使用)
      • 默認的線程數為: Runtime.getRuntime().availableProcessors() - 1 即: CPU核心數-1。

      -1是因為還有 JVM 的主線程需要占用1個線程

      • 可自定義系統屬性: java.util.concurrent.ForkJoinPool.common.parallelism

      最佳實踐: 由于主線程也會參與任務搶占CPU,所以 ForkJoinPool.commonPool 的線程數盡量設置為 (CPU核心數*N - 1)

      // 設置全局并行度
      System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
      
      • 自定義線程池:可通過自定義ForkJoinPool提交任務,但需注意避免資源競爭。

      支持通過 ForkJoinPool 定義私有線程池:

      ForkJoinPool forkJoinPool = new ForkJoinPool(8);
      List<Long> longs = forkJoinPool.submit(() -> aList.parallelStream().map( e -> {
          return e + 1;
      }).collect(Collectors.toList())).get();
      

      適用場景

      適合使用并行流的場景

      • 數據量大:通常超過10,000個元素
      • 計算密集型操作(CPU):如復雜的數學運算
      • 無狀態操作:如map、filter、flatMap等
      • 獨立操作:元素處理不依賴其他元素

      不適合的場景

      • 順序依賴操作:如limit、findFirst等
      • 有狀態操作:如sorted、distinct
      • I/O密集型操作:可能導致線程阻塞 (補充意見:但也不絕對不適合,有些情況下順序執行,反而更慢)
      • 小數據集:并行開銷可能超過收益

      性能優化技巧

      正確測量性能

      long start = System.nanoTime();
      result = list.parallelStream().[...].collect(Collectors.toList());
      long duration = (System.nanoTime() - start) / 1_000_000;
      System.out.println("耗時: " + duration + " ms");
      

      選擇合適的并行度

      // 自定義線程池
      ForkJoinPool customPool = new ForkJoinPool(4);
      customPool.submit(() -> {
          list.parallelStream().[...].collect(Collectors.toList());
      }).get();
      

      避免共享可變狀態

      // 錯誤示例 - 存在競態條件
      List<String> result = new ArrayList<>();
      list.parallelStream().forEach(s -> result.add(s.toUpperCase()));  // 可能拋出異常
      
      // 正確做法
      List<String> safeResult = list.parallelStream()
      	.map(String::toUpperCase)
      	.collect(Collectors.toList());
      

      高級應用

      自定義Spliterator

      class CustomSpliterator<T> implements Spliterator<T> {
          // 實現方法...
      }
      
      Spliterator<String> spliterator = new CustomSpliterator<>(data);
      Stream<String> parallelStream = StreamSupport.stream(spliterator, true);
      

      并行收集器

      // 使用線程安全的收集器
      Map<String, List<Student>> studentsByClass = students.parallelStream()
          .collect(Collectors.groupingByConcurrent(Student::getClassName));
      

      FAQ: 并行流的常見陷阱與解決方案

      Q:并行流與順序流的性能對比?

      • 測試示例
      List<Integer> numbers = IntStream.rangeClosed(1, 10_000_000).boxed().collect(Collectors.toList());
      
      // 順序流
      long seqTime = measureTime(() -> numbers.stream().reduce(0, Integer::sum));
      
      // 并行流
      long parTime = measureTime(() -> numbers.parallelStream().reduce(0, Integer::sum));
      
      System.out.println("順序流: " + seqTime + "ms");
      System.out.println("并行流: " + parTime + "ms");
      
      • 對比結果
      操作 數據量 順序流耗時 并行流耗時
      求和 100萬 15ms 8ms
      過濾 1000萬 120ms 45ms
      排序 100萬 650ms 750ms

      Q:線程安全問題

      • 問題
      int[] counter = new int[1];
      list.parallelStream().forEach(e -> counter[0]++);  // 競態條件
      
      • 解決
      // 使用原子類
      AtomicInteger counter = new AtomicInteger();
      list.parallelStream().forEach(e -> counter.incrementAndGet());
      
      // 或使用歸約操作
      int sum = list.parallelStream().mapToInt(e -> 1).sum();
      

      Q:順序敏感操作

      • 問題
      // 并行流中findFirst可能不如預期
      Optional<Integer> first = list.parallelStream()
      	.filter(i -> i > 10)
      	.findFirst();
      
      • 解決
      // 如需順序保證,使用【順序流】,而非并行流
      Optional<Integer> first = list.stream()
      	.filter(i -> i > 10)
      	.findFirst();
      

      Q:性能層面的考量:是否需要單獨構建線程池?

      • ? 建議單獨構建線程池的場景
      場景 原因
      I/O 密集型任務 默認線程數較少(CPU-1),不適合阻塞操作(如 DB、HTTP),容易拖慢整個 commonPool(),影響其他并行任務 。
      任務隔離需求 避免與其他模塊共享線程池,防止任務間資源競爭、死鎖或阻塞 。
      需要精確控制并發度 自定義線程池可設置合適的線程數,避免過度切換或資源浪費 。
      • ? 可不單獨構建線程池的場景
      場景 原因
      CPU 密集型任務 默認 commonPool() 的線程數已接近 CPU 核心數,適合計算密集型任務 。
      簡單一次性任務 代碼簡潔、無需復雜控制,使用默認線程池即可 。

      Q:最佳實踐經驗

      • 先測試后優化:不要假設并行一定更快,實際測量性能

      • 避免副作用:確保lambda表達式沒有副作用

      • 考慮順序性:需要順序保證時使用順序流

      • 合理設置并行度:根據CPU核心數和任務特性調整

      • 注意數據結構:ArrayList比LinkedList更適合并行處理

      • 避免自動裝箱:使用原始類型流(IntStream等)提升性能

      • 是否需要單獨創建線程池來執行并行流?

      • CPU 密集型任務:可直接使用 parallelStream(),無需額外線程池。
      • I/O 密集型或關鍵業務:建議如下方式使用自定義 ForkJoinPool:

      若任務為 I/O 密集型或對隔離性、并發度有要求,有必要單獨構建線程池以提升性能與穩定性 。

      ForkJoinPool customPool = new ForkJoinPool(20); // 自定義線程數
      customPool.submit(() -> 
          list.parallelStream().forEach(item -> doSomething(item))
      ).get();
      customPool.shutdown();
      

      并行流是強大的工具,但需要謹慎使用。正確使用時可以顯著提升性能,錯誤使用則可能導致潛在問題。理解其工作原理和適用場景是有效使用并行流的關鍵。

      X 參考文獻

      posted @ 2025-08-12 13:13  千千寰宇  閱讀(179)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 潮喷失禁大喷水无码| 日韩乱码人妻无码中文字幕视频| 巨爆乳中文字幕爆乳区| 伊人精品无码av一区二区三区| 久久伊99综合婷婷久久伊| 亚洲精品国产一区二区三| 中文字幕无码色综合网| 人妻性奴波多野结衣无码| 国产成人一区二区三区免费| 亚洲精品一区二区三区中文字幕| 免费无码国模国产在线观看| 亚洲日本精品一区二区| 洮南市| 综合色天天久久| 亚洲av免费看一区二区| 给我播放片在线观看| 正在播放肥臀熟妇在线视频| 国产成人综合网亚洲第一| 午夜综合网| 久久精品国产99国产精品亚洲| 2020年最新国产精品正在播放| 国产成人无码www免费视频播放| 午夜福利国产盗摄久久性| 护士张开腿被奷日出白浆| 亚洲伊人成无码综合网| 国产一区二区三区精品综合 | 中国女人内谢69xxxx| 热久久这里只有精品国产| 亚洲中文精品一区二区| 免费无码一区二区三区蜜桃大| 久久婷婷五月综合色99啪ak| 蜜桃无码一区二区三区| 亚洲精品视频一二三四区| 成人性生交片无码免费看| 亚洲高清日韩heyzo| 亚洲最大av资源站无码av网址| 亚洲国产精品成人无码区| 中字幕人妻一区二区三区| 日韩精品人妻av一区二区三区| 亚洲乱妇熟女爽到高潮的片| 国产精品第一页一区二区|