計算相同key的數據平均值 - aggregateByKey

查看源碼發現 aggregateByKey 的返回值與傳入的zeroVlue類型是一樣的
package com.pzb.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Description TODO aggregateByKey小練習
* @author 海綿先生
* @date 2023/3/16-15:17
*/
object Spark18_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 計算相同key值數據的平均值
val rdd: RDD[(String, Int)] = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
), 2)
// (Int,Int):第一個表示相同key的value之和,第二個Int表示相同key出現的次數
val newRDD: RDD[(String, (Int, Int))] = rdd.aggregateByKey((0, 0))(
(t, v) => {
(t._1 + v, t._2 + 1) // t表示的是初始值元組
},
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
// 計算平均值
val resultRDD: RDD[(String, Int)] = newRDD.map(
data => (data._1, data._2._1 / data._2._2)
)
resultRDD.collect().foreach(println)
/*
(b,4)
(a,3)
*/
// 模式匹配,如果不這樣寫,就要用"()",并且把value當成一個整體,像上面map一樣
val resultRDD2: RDD[(String, Int)] = newRDD.mapValues {
case (total, count) => {
total / count
}
}
resultRDD2.collect().foreach(println)
/*
(b,4)
(a,3)
*/
sc.stop()
}
}


另外一種實現方式:
package com.pzb.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Description TODO
* @author 海綿先生
* @date 2023/3/16-15:17
*/
object Spark19_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO transform算子 - combineByKey
val rdd: RDD[(String, Int)] = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
), 2)
// combineByKey : 方法需要三個參數
// 第一個參數表示:將相同key的第一個數據進行結構的轉換,實現操作
// 第二個參數表示:分區內的計算規則
// 第三個參數表示:分區間的計算規則
val newRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(
v => (v, 1), // 指定不同key的第一個value元素格式
(t: (Int, Int), v) => { // 因為需要動態識別,所以要事先標明數據類型,否則有肯能會出錯
(t._1 + v, t._2 + 1) // t._1 + v:value的總數
},
(t1: (Int, Int), t2: (Int, Int)) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
val resultRDD: RDD[(String, Int)] = newRDD.mapValues {
case (sum, count) => sum / count
}
resultRDD.collect().foreach(println)
/*
(b,4)
(a,3)
*/
sc.stop()
}
}

浙公網安備 33010602011771號