累加器和廣播變量
前言:由于Spark的閉包檢查,Driver端的數(shù)據(jù)無法獲取到Executor端的計算數(shù)據(jù)。

因此需要特殊類型——累加器(ACC)

累加器實現(xiàn)原理
累加器用來把Executor端變量信息聚合到Driver端。再Driver程序中定義的變量,在Executor 端的每個 Task 都會得到這個變量的一份新的副本,每個 task 更新這些副本的值后, 傳回 Driver 端進行 merge。
調(diào)用系統(tǒng)累加器
package com.pzb.acc
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Description TODO
* @author 海綿先生
* @date 2023/4/16-20:16
*/
object Spark02_Acc {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkAcc")
val sc = new SparkContext(sparkConf)
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 獲取系統(tǒng)累加器
// Spark默認就提供了簡單數(shù)據(jù)聚合的累加器
val sumAcc: LongAccumulator = sc.longAccumulator("sum")
rdd.foreach(
num => {
sumAcc.add(num)
}
)
println("sumAcc:" + sumAcc) // sumAcc:LongAccumulator(id: 0, name: Some(sum), value: 10)
sc.stop()
}
}
該累加器為long類型,會自動累加并返回結果
系統(tǒng)中還有double、collection類型(內(nèi)部是List類型)
注意事項
package com.pzb.acc
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Description TODO
* @author 海綿先生
* @date 2023/4/16-20:16
*/
object Spark03_Acc {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkAcc")
val sc = new SparkContext(sparkConf)
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// TODO 累加器常遇到的問題
val sumAcc: LongAccumulator = sc.longAccumulator("sum")
// 在轉換算子中調(diào)用
val mapRDD: RDD[Int] = rdd.map(
num => {
// 使用累加器
sumAcc.add(num)
num
}
)
// 少加:轉換算子中調(diào)用累加器,如果沒有行動算子的話,那么不會執(zhí)行
println(sumAcc.value) // 0
// 多加:轉換算子中調(diào)用累加器,如果作業(yè)被多次執(zhí)行,那么就會被多次計算
mapRDD.collect()
mapRDD.collect()
println(sumAcc.value) // 20
// TODO 因此累加器一般情況下是放在行動算子中使用的
sc.stop()
}
}
自定義累加器
官方給的集合類型累加器內(nèi)部是List類型,如果想換成Map獲取其他類型就要自定義累加器。
自定義累加器步驟:
1.繼承一個抽象類——AccumulatorV2[IN, OUT]
- IN:輸入類型
- OUT:輸出類型
2.注冊累加器
package com.pzb.acc
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
* @Description TODO 自定義累加器
* @author 海綿先生
* @date 2023/4/16-20:16
*/
object Spark04_Acc {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkAcc")
val sc = new SparkContext(sparkConf)
val rdd: RDD[String] = sc.makeRDD(List("Hello","Spark","Hello","Word"))
// 創(chuàng)建累加器對象
val accumulator = new MyAccumulator()
// 注冊累加器
sc.register(accumulator,"MyAcc")
rdd.foreach{
word => {
accumulator.add(word)
}
}
println(accumulator.value) // Map(Word -> 1, Hello -> 2, Spark -> 1)
sc.stop()
}
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]]{
// 定義累加器value
private var wcMap = mutable.Map[String, Long]()
// 初始化累加器
override def isZero: Boolean = {
wcMap.isEmpty
}
// 復制累加器方法
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
new MyAccumulator()
}
// 重置累加器方法
override def reset(): Unit = {
wcMap.clear()
}
// 獲取累加器需要計算的值
override def add(word: String): Unit = {
// 通過key獲取value,如果沒有key就為0
val newCnt = wcMap.getOrElse(word, 0L) + 1
// 更改Map的值
wcMap.update(word, newCnt)
}
//Driver端合并多個累加器
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
val map1: mutable.Map[String, Long] = this.wcMap
// 獲取其他累加器的值
val map2: mutable.Map[String, Long] = other.value
map2.foreach{
case (word, count) => {
val newCnt: Long = map1.getOrElse(word, 0L) + count
map1.update(word,newCnt)
}
}
}
// 獲取累加器結果
override def value: mutable.Map[String, Long] = {
wcMap
}
}
}
對一些簡單的累加統(tǒng)計可以用累加器來完成,減少Shuffle的操作。
廣播變量
閉包數(shù)據(jù)都是以Task為單位發(fā)送的,每個任務中包含閉包數(shù)據(jù),這樣可能會導致一個Executor中含有大量重復的數(shù)據(jù),并且占用大量的內(nèi)存(Executor其實就是一個JVM,在啟動時就會自動分配好內(nèi)存)。

Spark中的廣播變量就可以將閉包的數(shù)據(jù)保存到Executor的內(nèi)存中。
Spark中的廣播變量是不能改的:分布式共享只讀變量。

package com.pzb.acc
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
* @Description TODO 自定義累加器
* @author 海綿先生
* @date 2023/4/16-20:16
*/
object Spark05_Bc {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkAcc")
val sc = new SparkContext(sparkConf)
val rdd: RDD[(String,Int)] = sc.makeRDD(List(("a",1),("b",2),("c",3)))
val map1 = mutable.Map(("a",4), ("b",5), ("c",6))
// 封裝廣播信息
val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map1)
rdd.map{
case (w, c) => {
val l: Int = bc.value.getOrElse(w,0)
(w,(c,l))
}
}.collect().foreach(println)
/*
(a,(1,4))
(b,(2,5))
(c,(3,6))
*/
sc.stop()
}
}
上面的案例通過廣播變量很好的避免了笛卡爾集的發(fā)生

浙公網(wǎng)安備 33010602011771號