Spark streaming的窗口操作(window、reduceByWindow等)和foreachRDD結(jié)合
一、背景
最近有一個(gè)需求是:要求有一個(gè)類對(duì)象為Order,它有string類型的字段orderNo和Long類型的字段cost,生產(chǎn)者寫(xiě)到kafka的value是Order對(duì)象序列化后的字節(jié)數(shù)組、key值是orderNo字段,要求spark以手動(dòng)提交的方式消費(fèi)kafka,并將數(shù)據(jù)依次寫(xiě)入到hive表中,并且spark中有一個(gè)5分鐘滑動(dòng)窗口,滑動(dòng)步長(zhǎng)為1分鐘,統(tǒng)計(jì)5分鐘內(nèi)的cost總值并輸出。
然后實(shí)現(xiàn)代碼里用到了reduceByKeyAndWindow的窗口操作,畢竟配合了foreachRDD進(jìn)行了操作,所以對(duì)這一塊做了一個(gè)研究。代碼如下:
1 package com.example; 2 3 import org.apache.kafka.clients.consumer.ConsumerConfig; 4 import org.apache.kafka.common.serialization.ByteArrayDeserializer; 5 import org.apache.kafka.common.serialization.StringDeserializer; 6 import org.apache.spark.SparkConf; 7 import org.apache.spark.api.java.JavaRDD; 8 import org.apache.spark.api.java.JavaSparkContext; 9 import org.apache.spark.api.java.function.Function; 10 import org.apache.spark.sql.*; 11 import org.apache.spark.streaming.*; 12 import org.apache.spark.streaming.api.java.*; 13 import org.apache.spark.streaming.kafka010.*; 14 15 import java.io.ByteArrayInputStream; 16 import java.io.ObjectInputStream; 17 import java.sql.Timestamp; 18 import java.util.*; 19 import java.util.stream.Collectors; 20 21 import scala.Tuple2; 22 23 public class SparkKafkaToHive { 24 public static Order deserialize(byte[] bytes) throws Exception { 25 if (bytes == null) return null; 26 ByteArrayInputStream bis = new ByteArrayInputStream(bytes); 27 ObjectInputStream ois = new ObjectInputStream(bis); 28 Object o = ois.readObject(); 29 return (Order) o; 30 } 31 32 // SparkSession 單例幫助類(在 foreachRDD 中復(fù)用 SparkSession) 33 public static class JavaSparkSessionSingleton { 34 private static transient SparkSession instance = null; 35 36 public static SparkSession getInstance(SparkConf conf) { 37 if (instance == null) { 38 synchronized (JavaSparkSessionSingleton.class) { 39 if (instance == null) { 40 instance = SparkSession.builder() 41 .config(conf) 42 .enableHiveSupport() 43 .getOrCreate(); 44 } 45 } 46 } 47 return instance; 48 } 49 } 50 51 public static void main(String[] args) throws Exception { 52 String brokers = "localhost:9092"; 53 String topic = "orders-topic"; 54 String groupId = "spark-orders-consumer-group"; 55 56 // SparkConf & StreamingContext(微批間隔:1 分鐘,符合滑動(dòng)步長(zhǎng)) 57 SparkConf sparkConf = new SparkConf() 58 .setAppName("SparkKafkaToHiveDemo") 59 .setIfMissing("spark.master", "local[2]") // 本地測(cè)試時(shí)使用 60 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 61 62 // 這里設(shè)置 1 分鐘 batch interval(因?yàn)榛瑒?dòng)步長(zhǎng)是 1 分鐘) 63 Duration batchInterval = Durations.minutes(1); 64 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchInterval); 65 66 // Kafka params 67 Map<String, Object> kafkaParams = new HashMap<>(); 68 kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); 69 kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 70 kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); 71 kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 72 kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); 73 // 禁用自動(dòng)提交,由我們手動(dòng)提交 74 kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 75 76 Collection<String> topics = Collections.singletonList(topic); 77 78 // Create Direct Stream 79 JavaInputDStream<org.apache.kafka.clients.consumer.ConsumerRecord<String, byte[]>> stream = 80 KafkaUtils.createDirectStream( 81 jssc, 82 LocationStrategies.PreferConsistent(), 83 ConsumerStrategies.<String, byte[]>Subscribe(topics, kafkaParams) 84 ); 85 86 // --- 1) 將每個(gè) micro-batch 的所有訂單寫(xiě)入 Hive --- 87 stream.foreachRDD((rdd, time) -> { 88 if (rdd.isEmpty()) { 89 // 沒(méi)有數(shù)據(jù)也應(yīng)該嘗試提交 offsets?這里跳過(guò)提交以示例簡(jiǎn)單(可視需求決定) 90 return; 91 } 92 93 // 獲取 offset ranges 94 OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); 95 96 // 反序列化為 Order 對(duì)象 JavaRDD<Order> 97 JavaRDD<Order> ordersRDD = rdd.map(record -> { 98 try { 99 return deserialize(record.value()); 100 } catch (Exception e) { 101 // 處理反序列化異常(記錄日志并丟棄) 102 e.printStackTrace(); 103 return null; 104 } 105 }).filter(Objects::nonNull); 106 107 // 寫(xiě)入 Hive 108 if (!ordersRDD.isEmpty()) { 109 // 獲?。ɑ騽?chuàng)建)SparkSession 110 SparkSession spark = JavaSparkSessionSingleton.getInstance(sparkConf); 111 Dataset<Row> df = spark.createDataFrame(ordersRDD, Order.class); 112 // 寫(xiě)入 Hive 表(append) 113 df.write().mode(SaveMode.Append).insertInto("demo_db.orders"); 114 } 115 116 // 處理完業(yè)務(wù)后,提交 offsets 到 Kafka(手動(dòng)提交) 117 // 需要對(duì) stream 進(jìn)行類型轉(zhuǎn)換以調(diào)用 commitAsync 118 try { 119 ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> { 120 if (exception != null) { 121 System.err.println("CommitAsync failed: " + exception.getMessage()); 122 exception.printStackTrace(); 123 } else { 124 System.out.println("Committed offsets: " + Arrays.toString(offsetRanges)); 125 } 126 }); 127 } catch (Exception commitEx) { 128 commitEx.printStackTrace(); 129 } 130 }); 131 132 // --- 2) 窗口聚合:5 分鐘窗口,1 分鐘滑動(dòng),統(tǒng)計(jì) cost 總和,并寫(xiě)入 Hive --- 133 // 先將 stream 轉(zhuǎn)成 PairDStream<"all", cost> 134 JavaPairDStream<String, Long> costPairs = stream.mapToPair(record -> { 135 try { 136 Order o = deserialize(record.value()); 137 if (o != null && o.getCost() != null) { 138 return new Tuple2<>("all", o.getCost()); 139 } else { 140 return new Tuple2<>("all", 0L); 141 } 142 } catch (Exception e) { 143 e.printStackTrace(); 144 return new Tuple2<>("all", 0L); 145 } 146 }); 147 148 // reduceByKeyAndWindow (windowDuration=5min, slideDuration=1min) 149 JavaPairDStream<String, Long> windowed = costPairs.reduceByKeyAndWindow( 150 (a, b) -> a + b, 151 Durations.minutes(5), 152 Durations.minutes(1) 153 ); 154 155 // 在每個(gè)窗口的 RDD 中寫(xiě)入 Hive(可以插入到 order_cost_window_agg) 156 windowed.foreachRDD((rdd, time) -> { 157 if (rdd.isEmpty()) return; 158 159 // time 是窗口的結(jié)束時(shí)間(即當(dāng)前批次時(shí)間) 160 long windowEndMs = time.milliseconds(); 161 long windowStartMs = windowEndMs - 5 * 60 * 1000 + 1; // 包含窗口起點(diǎn) 162 163 List<Row> rows = rdd.map(tuple -> { 164 long total = tuple._2(); 165 return RowFactory.create(new Timestamp(windowStartMs), new Timestamp(windowEndMs), total); 166 }).collect(); 167 168 if (!rows.isEmpty()) { 169 SparkSession spark = JavaSparkSessionSingleton.getInstance(sparkConf); 170 StructType schema = new StructType(new StructField[]{ 171 new StructField("window_start", DataTypes.TimestampType, false, Metadata.empty()), 172 new StructField("window_end", DataTypes.TimestampType, false, Metadata.empty()), 173 new StructField("total_cost", DataTypes.LongType, false, Metadata.empty()) 174 }); 175 Dataset<Row> df = spark.createDataFrame(rows, schema); 176 // 寫(xiě)入 Hive 聚合表 177 df.write().mode(SaveMode.Append).insertInto("demo_db.order_cost_window_agg"); 178 179 // 同時(shí)打印到控制臺(tái) 180 df.show(false); 181 } 182 }); 183 184 // 啟動(dòng)流 185 jssc.start(); 186 jssc.awaitTermination(); 187 } 188 }
其中通過(guò)reduceByKeyAndWindow方法,使用了窗口大小為5分鐘,滑動(dòng)步長(zhǎng)為1分鐘的滑動(dòng)窗口,并將窗口大小內(nèi)的多個(gè)批次的RDD(每分鐘從kafka拉一批數(shù)據(jù),就是一個(gè)批次)匯總成一個(gè)窗口DStream(一個(gè)滑動(dòng)窗口對(duì)應(yīng)一個(gè)RDD,RDD包含該滑動(dòng)窗口的所有數(shù)據(jù)),并進(jìn)行了reduce操作,而最后通過(guò)foreachRDD對(duì)每個(gè)滑動(dòng)窗口(一個(gè)滑動(dòng)窗口對(duì)應(yīng)一個(gè)RDD)聚合后的數(shù)據(jù)寫(xiě)入到hive表中,觸發(fā)寫(xiě)入的時(shí)機(jī)為每個(gè)滑動(dòng)窗口的結(jié)束時(shí)間邊界。
二、原理解析
在 window() 操作中,foreachRDD() 會(huì)處理每個(gè)滑動(dòng)窗口內(nèi)的數(shù)據(jù),并且每個(gè)滑動(dòng)窗口只對(duì)應(yīng)一個(gè)RDD,這個(gè)RDD包含該滑動(dòng)窗口內(nèi)的所有數(shù)據(jù),包含了所有小批次的RDD數(shù)據(jù)(比如上面1分鐘讀一次kafka,那么一分鐘kafka就是一個(gè)批次)的聚合
window() 和 foreachRDD() 解釋
-
window()操作:-
window()用于將一個(gè)時(shí)間范圍內(nèi)的數(shù)據(jù)分成窗口,并且每個(gè)窗口的 時(shí)間區(qū)間是獨(dú)立的。例如,假設(shè)你使用了 5 分鐘的窗口和 1 分鐘的滑動(dòng)步長(zhǎng),每個(gè)窗口都是獨(dú)立的時(shí)間段,不會(huì)跨越多個(gè)窗口。 -
這個(gè)操作會(huì)創(chuàng)建多個(gè) 時(shí)間窗口(滑動(dòng)窗口),而每個(gè)窗口的聚合結(jié)果通常會(huì)被傳遞到 foreachRDD() 中。
-
-
foreachRDD()執(zhí)行次數(shù):-
對(duì)于時(shí)間窗口(滑動(dòng)窗口),
foreachRDD()會(huì)被調(diào)用一次。滑動(dòng)窗口中的數(shù)據(jù)是當(dāng)前窗口的所有數(shù)據(jù)。每個(gè)窗口的聚合結(jié)果會(huì)作為一個(gè)獨(dú)立的 RDD 傳遞給foreachRDD。 -
所以每個(gè)滑動(dòng)窗口的數(shù)據(jù)聚合 只會(huì)在對(duì)應(yīng)的窗口內(nèi) 計(jì)算一次。
-
例子:
假設(shè)你有一個(gè) 5 分鐘的窗口,1 分鐘滑動(dòng)步長(zhǎng),數(shù)據(jù)從 10:00 到 10:10(包括10:10)。你使用 window() 對(duì)數(shù)據(jù)進(jìn)行分組操作,類似這樣:
JavaDStream<Tuple2<String, Integer>> windowedStream = dstream .window(new Duration(5 * 60 * 1000), new Duration(1 * 60 * 1000)) .reduceByKey((x, y) -> x + y);
對(duì)于上面的代碼:
-
每個(gè)窗口會(huì)處理 5 分鐘的時(shí)間段。
-
如果你以 1 分鐘的滑動(dòng)步長(zhǎng)處理數(shù)據(jù),那么你會(huì)有如下的窗口區(qū)間:
-
10:00 到 10:05
-
10:01 到 10:06
-
10:02 到 10:07
-
10:03 到 10:08
-
10:04 到 10:09
-
10:05 到 10:10
-
每個(gè)窗口內(nèi)的數(shù)據(jù)會(huì)被獨(dú)立地處理,并且在 foreachRDD() 中輸出。
foreachRDD() 執(zhí)行的次數(shù)
-
對(duì)于每個(gè)時(shí)間窗口(滑動(dòng)窗口),
foreachRDD()會(huì)被調(diào)用一次,每次傳遞一個(gè)包含該時(shí)間段數(shù)據(jù)的 RDD。 -
窗口會(huì)按時(shí)間滑動(dòng),并且每個(gè)窗口的數(shù)據(jù)會(huì)生成一個(gè)獨(dú)立的 RDD,并在
foreachRDD()中執(zhí)行。 -
foreachRDD()的執(zhí)行次數(shù)等于窗口的數(shù)量,具體由窗口的大小和滑動(dòng)步長(zhǎng)決定。
示例:
假設(shè)你有以下數(shù)據(jù)流:
10:00, ORD1, 100 10:01, ORD2, 150 10:02, ORD3, 200 10:03, ORD4, 250 10:04, ORD5, 300 10:05, ORD6, 350 10:06, ORD7, 400 10:07, ORD8, 450 10:08, ORD9, 500 10:09, ORD10, 550 10:10, ORD11, 600
如果你使用了一個(gè) 5 分鐘窗口,1 分鐘滑動(dòng)步長(zhǎng):
window() 和 foreachRDD() 結(jié)果:
-
第一個(gè)窗口(10:00-10:05):
-
數(shù)據(jù):ORD1, ORD2, ORD3, ORD4, ORD5
-
聚合結(jié)果:
ORD1 + ORD2 + ORD3 + ORD4 + ORD5= 100 + 150 + 200 + 250 + 300 = 1000 -
結(jié)果會(huì)在 10:05 輸出
-
-
第二個(gè)窗口(10:01-10:06):
-
數(shù)據(jù):ORD2, ORD3, ORD4, ORD5, ORD6
-
聚合結(jié)果:
ORD2 + ORD3 + ORD4 + ORD5 + ORD6= 150 + 200 + 250 + 300 + 350 = 1250 -
結(jié)果會(huì)在 10:06 輸出
-
-
第三個(gè)窗口(10:02-10:07):
-
數(shù)據(jù):ORD3, ORD4, ORD5, ORD6, ORD7
-
聚合結(jié)果:
ORD3 + ORD4 + ORD5 + ORD6 + ORD7= 200 + 250 + 300 + 350 + 400 = 1500 -
結(jié)果會(huì)在 10:07 輸出
-
-
第四個(gè)窗口(10:03-10:08):
-
數(shù)據(jù):ORD4, ORD5, ORD6, ORD7, ORD8
-
聚合結(jié)果:
ORD4 + ORD5 + ORD6 + ORD7 + ORD8= 250 + 300 + 350 + 400 + 450 = 1750 -
結(jié)果會(huì)在 10:08 輸出
-
輸出示例:
關(guān)鍵點(diǎn)總結(jié):
-
window()會(huì)根據(jù)時(shí)間區(qū)間分割數(shù)據(jù),每個(gè)窗口處理一次foreachRDD()。 -
每個(gè)批次的數(shù)據(jù)是基于時(shí)間段的,因此每個(gè)窗口的輸出結(jié)果是獨(dú)立的。
-
foreachRDD()會(huì)在每個(gè)窗口對(duì)應(yīng)的批次內(nèi)執(zhí)行一次,而每個(gè)批次只會(huì)處理一個(gè)窗口的數(shù)據(jù),并不會(huì)匯總多個(gè)窗口的數(shù)據(jù)。
轉(zhuǎn)發(fā)請(qǐng)注明出處:http://www.rzrgm.cn/fnlingnzb-learner/p/19076296

浙公網(wǎng)安備 33010602011771號(hào)