<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      累加器和廣播變量

      前言:由于Spark的閉包檢查,Driver端的數(shù)據(jù)無法獲取到Executor端的計算數(shù)據(jù)。

      因此需要特殊類型——累加器(ACC)

      累加器實現(xiàn)原理

      累加器用來把Executor端變量信息聚合到Driver端。再Driver程序中定義的變量,在Executor 端的每個 Task 都會得到這個變量的一份新的副本,每個 task 更新這些副本的值后, 傳回 Driver 端進行 merge。

      調(diào)用系統(tǒng)累加器

      package com.pzb.acc
      
      import org.apache.spark.rdd.RDD
      import org.apache.spark.util.LongAccumulator
      import org.apache.spark.{SparkConf, SparkContext}
      
      /**
       * @Description TODO
       * @author 海綿先生
       * @date 2023/4/16-20:16
       */
      object Spark02_Acc {
      
        def main(args: Array[String]): Unit = {
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkAcc")
          val sc = new SparkContext(sparkConf)
      
          val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
      
          // 獲取系統(tǒng)累加器
          // Spark默認就提供了簡單數(shù)據(jù)聚合的累加器
          val sumAcc: LongAccumulator = sc.longAccumulator("sum")
      
          rdd.foreach(
            num => {
              sumAcc.add(num)
            }
          )
          println("sumAcc:" + sumAcc) // sumAcc:LongAccumulator(id: 0, name: Some(sum), value: 10)
          sc.stop()
        }
      }
      

      該累加器為long類型,會自動累加并返回結果

      系統(tǒng)中還有double、collection類型(內(nèi)部是List類型)

      注意事項

      package com.pzb.acc
      
      import org.apache.spark.rdd.RDD
      import org.apache.spark.util.LongAccumulator
      import org.apache.spark.{SparkConf, SparkContext}
      
      /**
       * @Description TODO
       * @author 海綿先生
       * @date 2023/4/16-20:16
       */
      object Spark03_Acc {
      
        def main(args: Array[String]): Unit = {
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkAcc")
          val sc = new SparkContext(sparkConf)
      
          val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
      
          // TODO 累加器常遇到的問題
          val sumAcc: LongAccumulator = sc.longAccumulator("sum")
      
          // 在轉換算子中調(diào)用
          val mapRDD: RDD[Int] = rdd.map(
            num => {
              // 使用累加器
              sumAcc.add(num)
              num
            }
          )
          // 少加:轉換算子中調(diào)用累加器,如果沒有行動算子的話,那么不會執(zhí)行
          println(sumAcc.value) // 0
      
          // 多加:轉換算子中調(diào)用累加器,如果作業(yè)被多次執(zhí)行,那么就會被多次計算
          mapRDD.collect()
          mapRDD.collect()
          println(sumAcc.value) // 20
      
          // TODO 因此累加器一般情況下是放在行動算子中使用的
      
          sc.stop()
        }
      }
      

      自定義累加器

      官方給的集合類型累加器內(nèi)部是List類型,如果想換成Map獲取其他類型就要自定義累加器。

      自定義累加器步驟:
      1.繼承一個抽象類——AccumulatorV2[IN, OUT]

      • IN:輸入類型
      • OUT:輸出類型

      2.注冊累加器

      package com.pzb.acc
      
      import org.apache.spark.rdd.RDD
      import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
      import org.apache.spark.{SparkConf, SparkContext}
      
      import scala.collection.mutable
      
      /**
       * @Description TODO 自定義累加器
       * @author 海綿先生
       * @date 2023/4/16-20:16
       */
      object Spark04_Acc {
      
        def main(args: Array[String]): Unit = {
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkAcc")
          val sc = new SparkContext(sparkConf)
      
          val rdd: RDD[String] = sc.makeRDD(List("Hello","Spark","Hello","Word"))
      
          // 創(chuàng)建累加器對象
          val accumulator = new MyAccumulator()
          // 注冊累加器
          sc.register(accumulator,"MyAcc")
      
          rdd.foreach{
            word => {
              accumulator.add(word)
            }
          }
          println(accumulator.value) // Map(Word -> 1, Hello -> 2, Spark -> 1)
      
          sc.stop()
        }
        class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]]{
      
          // 定義累加器value
          private var wcMap = mutable.Map[String, Long]()
      
          // 初始化累加器
          override def isZero: Boolean = {
            wcMap.isEmpty
          }
      
          // 復制累加器方法
          override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
            new MyAccumulator()
          }
      
          // 重置累加器方法
          override def reset(): Unit = {
            wcMap.clear()
          }
      
          // 獲取累加器需要計算的值
          override def add(word: String): Unit = {
            // 通過key獲取value,如果沒有key就為0
            val newCnt = wcMap.getOrElse(word, 0L) + 1
            // 更改Map的值
            wcMap.update(word, newCnt)
          }
      
          //Driver端合并多個累加器
          override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
            val map1: mutable.Map[String, Long] = this.wcMap
            // 獲取其他累加器的值
            val map2: mutable.Map[String, Long] = other.value
            map2.foreach{
              case (word, count) => {
                val newCnt: Long = map1.getOrElse(word, 0L) + count
                map1.update(word,newCnt)
              }
            }
          }
      
          // 獲取累加器結果
          override def value: mutable.Map[String, Long] = {
            wcMap
          }
        }
      }
      

      對一些簡單的累加統(tǒng)計可以用累加器來完成,減少Shuffle的操作。

      廣播變量

      閉包數(shù)據(jù)都是以Task為單位發(fā)送的,每個任務中包含閉包數(shù)據(jù),這樣可能會導致一個Executor中含有大量重復的數(shù)據(jù),并且占用大量的內(nèi)存(Executor其實就是一個JVM,在啟動時就會自動分配好內(nèi)存)。

      Spark中的廣播變量就可以將閉包的數(shù)據(jù)保存到Executor的內(nèi)存中。
      Spark中的廣播變量是不能改的:分布式共享只讀變量。

      package com.pzb.acc
      
      import org.apache.spark.broadcast.Broadcast
      import org.apache.spark.rdd.RDD
      import org.apache.spark.util.AccumulatorV2
      import org.apache.spark.{SparkConf, SparkContext}
      
      import scala.collection.mutable
      
      /**
       * @Description TODO 自定義累加器
       * @author 海綿先生
       * @date 2023/4/16-20:16
       */
      object Spark05_Bc {
      
        def main(args: Array[String]): Unit = {
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkAcc")
          val sc = new SparkContext(sparkConf)
      
          val rdd: RDD[(String,Int)] = sc.makeRDD(List(("a",1),("b",2),("c",3)))
          val map1 = mutable.Map(("a",4), ("b",5), ("c",6))
      
          // 封裝廣播信息
          val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map1)
      
          rdd.map{
            case (w, c) => {
              val l: Int = bc.value.getOrElse(w,0)
              (w,(c,l))
            }
          }.collect().foreach(println)
          /*
          (a,(1,4))
          (b,(2,5))
          (c,(3,6))
           */
          sc.stop()
        }
      }
      

      上面的案例通過廣播變量很好的避免了笛卡爾集的發(fā)生

      posted @ 2025-04-10 23:42  MrSponge  Views(59)  Comments(0)    收藏  舉報
      主站蜘蛛池模板: 一二三四中文字幕日韩乱码| 色偷偷亚洲女人天堂观看| 免费a级毛片18以上观看精品| 亚洲色成人网站www永久男男 | 日韩精品一区二区三区激情视频| 亚洲av本道一区二区| 一区二区三区四区精品视频| 国产成人无码免费视频在线| 日韩av一区二区高清不卡| 92国产精品午夜福利免费| 精品剧情V国产在线观看| 德江县| 久久国产免费观看精品3| 2019香蕉在线观看直播视频| 国产精品高清一区二区不卡| 国产精品午夜福利资源| 77se77亚洲欧美在线| 亚洲一区二区三区色视频| 国产热A欧美热A在线视频| 久热视频这里只有精品6| 国产亚洲精品成人av久| 精品亚洲没码中文字幕| 肉大捧一进一出免费视频| 久久99精品国产麻豆婷婷| 网友偷拍视频一区二区三区| 国产精品久久蜜臀av| 国产久免费热视频在线观看| 国产最大成人亚洲精品| 无码人妻丝袜在线视频| 国产成人午夜在线视频极速观看| 亚洲免费视频一区二区三区| 一本大道久久香蕉成人网| 男女猛烈无遮挡免费视频APP| 亚洲中文字幕伊人久久无码| 大屁股国产白浆一二区| 国产精品自在拍在线播放| 久久久av男人的天堂| 国产精品老熟女一区二区| 中文字幕成人精品久久不卡| 99精品国产中文字幕| 92国产精品午夜福利免费|