<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Spark streaming的窗口操作(window、reduceByWindow等)和foreachRDD結(jié)合

      一、背景

      最近有一個(gè)需求是:要求有一個(gè)類對(duì)象為Order,它有string類型的字段orderNo和Long類型的字段cost,生產(chǎn)者寫(xiě)到kafka的value是Order對(duì)象序列化后的字節(jié)數(shù)組、key值是orderNo字段,要求spark以手動(dòng)提交的方式消費(fèi)kafka,并將數(shù)據(jù)依次寫(xiě)入到hive表中,并且spark中有一個(gè)5分鐘滑動(dòng)窗口,滑動(dòng)步長(zhǎng)為1分鐘,統(tǒng)計(jì)5分鐘內(nèi)的cost總值并輸出。

      然后實(shí)現(xiàn)代碼里用到了reduceByKeyAndWindow的窗口操作,畢竟配合了foreachRDD進(jìn)行了操作,所以對(duì)這一塊做了一個(gè)研究。代碼如下:

        1 package com.example;
        2 
        3 import org.apache.kafka.clients.consumer.ConsumerConfig;
        4 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
        5 import org.apache.kafka.common.serialization.StringDeserializer;
        6 import org.apache.spark.SparkConf;
        7 import org.apache.spark.api.java.JavaRDD;
        8 import org.apache.spark.api.java.JavaSparkContext;
        9 import org.apache.spark.api.java.function.Function;
       10 import org.apache.spark.sql.*;
       11 import org.apache.spark.streaming.*;
       12 import org.apache.spark.streaming.api.java.*;
       13 import org.apache.spark.streaming.kafka010.*;
       14 
       15 import java.io.ByteArrayInputStream;
       16 import java.io.ObjectInputStream;
       17 import java.sql.Timestamp;
       18 import java.util.*;
       19 import java.util.stream.Collectors;
       20 
       21 import scala.Tuple2;
       22 
       23 public class SparkKafkaToHive {
       24     public static Order deserialize(byte[] bytes) throws Exception {
       25         if (bytes == null) return null;
       26         ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
       27         ObjectInputStream ois = new ObjectInputStream(bis);
       28         Object o = ois.readObject();
       29         return (Order) o;
       30     }
       31 
       32     // SparkSession 單例幫助類(在 foreachRDD 中復(fù)用 SparkSession)
       33     public static class JavaSparkSessionSingleton {
       34         private static transient SparkSession instance = null;
       35 
       36         public static SparkSession getInstance(SparkConf conf) {
       37             if (instance == null) {
       38                 synchronized (JavaSparkSessionSingleton.class) {
       39                     if (instance == null) {
       40                         instance = SparkSession.builder()
       41                                 .config(conf)
       42                                 .enableHiveSupport()
       43                                 .getOrCreate();
       44                     }
       45                 }
       46             }
       47             return instance;
       48         }
       49     }
       50 
       51     public static void main(String[] args) throws Exception {
       52         String brokers = "localhost:9092";
       53         String topic = "orders-topic";
       54         String groupId = "spark-orders-consumer-group";
       55 
       56         // SparkConf & StreamingContext(微批間隔:1 分鐘,符合滑動(dòng)步長(zhǎng))
       57         SparkConf sparkConf = new SparkConf()
       58                 .setAppName("SparkKafkaToHiveDemo")
       59                 .setIfMissing("spark.master", "local[2]") // 本地測(cè)試時(shí)使用
       60                 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
       61 
       62         // 這里設(shè)置 1 分鐘 batch interval(因?yàn)榛瑒?dòng)步長(zhǎng)是 1 分鐘)
       63         Duration batchInterval = Durations.minutes(1);
       64         JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchInterval);
       65 
       66         // Kafka params
       67         Map<String, Object> kafkaParams = new HashMap<>();
       68         kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
       69         kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       70         kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
       71         kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
       72         kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
       73         // 禁用自動(dòng)提交,由我們手動(dòng)提交
       74         kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
       75 
       76         Collection<String> topics = Collections.singletonList(topic);
       77 
       78         // Create Direct Stream
       79         JavaInputDStream<org.apache.kafka.clients.consumer.ConsumerRecord<String, byte[]>> stream =
       80                 KafkaUtils.createDirectStream(
       81                         jssc,
       82                         LocationStrategies.PreferConsistent(),
       83                         ConsumerStrategies.<String, byte[]>Subscribe(topics, kafkaParams)
       84                 );
       85 
       86         // --- 1) 將每個(gè) micro-batch 的所有訂單寫(xiě)入 Hive ---
       87         stream.foreachRDD((rdd, time) -> {
       88             if (rdd.isEmpty()) {
       89                 // 沒(méi)有數(shù)據(jù)也應(yīng)該嘗試提交 offsets?這里跳過(guò)提交以示例簡(jiǎn)單(可視需求決定)
       90                 return;
       91             }
       92 
       93             // 獲取 offset ranges
       94             OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
       95 
       96             // 反序列化為 Order 對(duì)象 JavaRDD<Order>
       97             JavaRDD<Order> ordersRDD = rdd.map(record -> {
       98                 try {
       99                     return deserialize(record.value());
      100                 } catch (Exception e) {
      101                     // 處理反序列化異常(記錄日志并丟棄)
      102                     e.printStackTrace();
      103                     return null;
      104                 }
      105             }).filter(Objects::nonNull);
      106 
      107             // 寫(xiě)入 Hive
      108             if (!ordersRDD.isEmpty()) {
      109                 // 獲?。ɑ騽?chuàng)建)SparkSession
      110                 SparkSession spark = JavaSparkSessionSingleton.getInstance(sparkConf);
      111                 Dataset<Row> df = spark.createDataFrame(ordersRDD, Order.class);
      112                 // 寫(xiě)入 Hive 表(append)
      113                 df.write().mode(SaveMode.Append).insertInto("demo_db.orders");
      114             }
      115 
      116             // 處理完業(yè)務(wù)后,提交 offsets 到 Kafka(手動(dòng)提交)
      117             // 需要對(duì) stream 進(jìn)行類型轉(zhuǎn)換以調(diào)用 commitAsync
      118             try {
      119                 ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
      120                     if (exception != null) {
      121                         System.err.println("CommitAsync failed: " + exception.getMessage());
      122                         exception.printStackTrace();
      123                     } else {
      124                         System.out.println("Committed offsets: " + Arrays.toString(offsetRanges));
      125                     }
      126                 });
      127             } catch (Exception commitEx) {
      128                 commitEx.printStackTrace();
      129             }
      130         });
      131 
      132         // --- 2) 窗口聚合:5 分鐘窗口,1 分鐘滑動(dòng),統(tǒng)計(jì) cost 總和,并寫(xiě)入 Hive ---
      133         // 先將 stream 轉(zhuǎn)成 PairDStream<"all", cost>
      134         JavaPairDStream<String, Long> costPairs = stream.mapToPair(record -> {
      135             try {
      136                 Order o = deserialize(record.value());
      137                 if (o != null && o.getCost() != null) {
      138                     return new Tuple2<>("all", o.getCost());
      139                 } else {
      140                     return new Tuple2<>("all", 0L);
      141                 }
      142             } catch (Exception e) {
      143                 e.printStackTrace();
      144                 return new Tuple2<>("all", 0L);
      145             }
      146         });
      147 
      148         // reduceByKeyAndWindow (windowDuration=5min, slideDuration=1min)
      149         JavaPairDStream<String, Long> windowed = costPairs.reduceByKeyAndWindow(
      150                 (a, b) -> a + b,
      151                 Durations.minutes(5),
      152                 Durations.minutes(1)
      153         );
      154 
      155         // 在每個(gè)窗口的 RDD 中寫(xiě)入 Hive(可以插入到 order_cost_window_agg)
      156         windowed.foreachRDD((rdd, time) -> {
      157             if (rdd.isEmpty()) return;
      158 
      159             // time 是窗口的結(jié)束時(shí)間(即當(dāng)前批次時(shí)間)
      160             long windowEndMs = time.milliseconds();
      161             long windowStartMs = windowEndMs - 5 * 60 * 1000 + 1; // 包含窗口起點(diǎn)
      162 
      163             List<Row> rows = rdd.map(tuple -> {
      164                 long total = tuple._2();
      165                 return RowFactory.create(new Timestamp(windowStartMs), new Timestamp(windowEndMs), total);
      166             }).collect();
      167 
      168             if (!rows.isEmpty()) {
      169                 SparkSession spark = JavaSparkSessionSingleton.getInstance(sparkConf);
      170                 StructType schema = new StructType(new StructField[]{
      171                         new StructField("window_start", DataTypes.TimestampType, false, Metadata.empty()),
      172                         new StructField("window_end", DataTypes.TimestampType, false, Metadata.empty()),
      173                         new StructField("total_cost", DataTypes.LongType, false, Metadata.empty())
      174                 });
      175                 Dataset<Row> df = spark.createDataFrame(rows, schema);
      176                 // 寫(xiě)入 Hive 聚合表
      177                 df.write().mode(SaveMode.Append).insertInto("demo_db.order_cost_window_agg");
      178 
      179                 // 同時(shí)打印到控制臺(tái)
      180                 df.show(false);
      181             }
      182         });
      183 
      184         // 啟動(dòng)流
      185         jssc.start();
      186         jssc.awaitTermination();
      187     }
      188 }

      其中通過(guò)reduceByKeyAndWindow方法,使用了窗口大小為5分鐘,滑動(dòng)步長(zhǎng)為1分鐘的滑動(dòng)窗口,并將窗口大小內(nèi)的多個(gè)批次的RDD(每分鐘從kafka拉一批數(shù)據(jù),就是一個(gè)批次)匯總成一個(gè)窗口DStream(一個(gè)滑動(dòng)窗口對(duì)應(yīng)一個(gè)RDD,RDD包含該滑動(dòng)窗口的所有數(shù)據(jù)),并進(jìn)行了reduce操作,而最后通過(guò)foreachRDD對(duì)每個(gè)滑動(dòng)窗口(一個(gè)滑動(dòng)窗口對(duì)應(yīng)一個(gè)RDD)聚合后的數(shù)據(jù)寫(xiě)入到hive表中,觸發(fā)寫(xiě)入的時(shí)機(jī)為每個(gè)滑動(dòng)窗口的結(jié)束時(shí)間邊界。

      二、原理解析

       window() 操作中,foreachRDD() 會(huì)處理每個(gè)滑動(dòng)窗口內(nèi)的數(shù)據(jù),并且每個(gè)滑動(dòng)窗口只對(duì)應(yīng)一個(gè)RDD,這個(gè)RDD包含該滑動(dòng)窗口內(nèi)的所有數(shù)據(jù),包含了所有小批次的RDD數(shù)據(jù)(比如上面1分鐘讀一次kafka,那么一分鐘kafka就是一個(gè)批次)的聚合

      window() 和 foreachRDD() 解釋

      1. window() 操作:

        • window() 用于將一個(gè)時(shí)間范圍內(nèi)的數(shù)據(jù)分成窗口,并且每個(gè)窗口的 時(shí)間區(qū)間是獨(dú)立的。例如,假設(shè)你使用了 5 分鐘的窗口和 1 分鐘的滑動(dòng)步長(zhǎng),每個(gè)窗口都是獨(dú)立的時(shí)間段,不會(huì)跨越多個(gè)窗口。

        • 這個(gè)操作會(huì)創(chuàng)建多個(gè) 時(shí)間窗口(滑動(dòng)窗口),而每個(gè)窗口的聚合結(jié)果通常會(huì)被傳遞到 foreachRDD() 中。

      2. foreachRDD() 執(zhí)行次數(shù):

        • 對(duì)于時(shí)間窗口(滑動(dòng)窗口),foreachRDD() 會(huì)被調(diào)用一次。滑動(dòng)窗口中的數(shù)據(jù)是當(dāng)前窗口的所有數(shù)據(jù)。每個(gè)窗口的聚合結(jié)果會(huì)作為一個(gè)獨(dú)立的 RDD 傳遞給 foreachRDD

        • 所以每個(gè)滑動(dòng)窗口的數(shù)據(jù)聚合 只會(huì)在對(duì)應(yīng)的窗口內(nèi) 計(jì)算一次。

      例子:

      假設(shè)你有一個(gè) 5 分鐘的窗口,1 分鐘滑動(dòng)步長(zhǎng),數(shù)據(jù)從 10:00 到 10:10(包括10:10)。你使用 window() 對(duì)數(shù)據(jù)進(jìn)行分組操作,類似這樣:

      JavaDStream<Tuple2<String, Integer>> windowedStream = dstream
          .window(new Duration(5 * 60 * 1000), new Duration(1 * 60 * 1000))
          .reduceByKey((x, y) -> x + y);

      對(duì)于上面的代碼:

      • 每個(gè)窗口會(huì)處理 5 分鐘的時(shí)間段。

      • 如果你以 1 分鐘的滑動(dòng)步長(zhǎng)處理數(shù)據(jù),那么你會(huì)有如下的窗口區(qū)間:

        • 10:00 到 10:05

        • 10:01 到 10:06

        • 10:02 到 10:07

        • 10:03 到 10:08

        • 10:04 到 10:09

        • 10:05 到 10:10

      每個(gè)窗口內(nèi)的數(shù)據(jù)會(huì)被獨(dú)立地處理,并且在 foreachRDD() 中輸出。

      foreachRDD() 執(zhí)行的次數(shù)

      • 對(duì)于每個(gè)時(shí)間窗口(滑動(dòng)窗口),foreachRDD() 會(huì)被調(diào)用一次,每次傳遞一個(gè)包含該時(shí)間段數(shù)據(jù)的 RDD。

      • 窗口會(huì)按時(shí)間滑動(dòng),并且每個(gè)窗口的數(shù)據(jù)會(huì)生成一個(gè)獨(dú)立的 RDD,并在 foreachRDD() 中執(zhí)行。

      • foreachRDD() 的執(zhí)行次數(shù)等于窗口的數(shù)量,具體由窗口的大小和滑動(dòng)步長(zhǎng)決定。

      示例:

      假設(shè)你有以下數(shù)據(jù)流:

      10:00, ORD1, 100 10:01, ORD2, 150 10:02, ORD3, 200 10:03, ORD4, 250 10:04, ORD5, 300 10:05, ORD6, 350 10:06, ORD7, 400 10:07, ORD8, 450 10:08, ORD9, 500 10:09, ORD10, 550 10:10, ORD11, 600

      如果你使用了一個(gè) 5 分鐘窗口,1 分鐘滑動(dòng)步長(zhǎng):

      window() 和 foreachRDD() 結(jié)果:

      • 第一個(gè)窗口(10:00-10:05):

        • 數(shù)據(jù):ORD1, ORD2, ORD3, ORD4, ORD5

        • 聚合結(jié)果:ORD1 + ORD2 + ORD3 + ORD4 + ORD5 = 100 + 150 + 200 + 250 + 300 = 1000

        • 結(jié)果會(huì)在 10:05 輸出

      • 第二個(gè)窗口(10:01-10:06):

        • 數(shù)據(jù):ORD2, ORD3, ORD4, ORD5, ORD6

        • 聚合結(jié)果:ORD2 + ORD3 + ORD4 + ORD5 + ORD6 = 150 + 200 + 250 + 300 + 350 = 1250

        • 結(jié)果會(huì)在 10:06 輸出

      • 第三個(gè)窗口(10:02-10:07):

        • 數(shù)據(jù):ORD3, ORD4, ORD5, ORD6, ORD7

        • 聚合結(jié)果:ORD3 + ORD4 + ORD5 + ORD6 + ORD7 = 200 + 250 + 300 + 350 + 400 = 1500

        • 結(jié)果會(huì)在 10:07 輸出

      • 第四個(gè)窗口(10:03-10:08):

        • 數(shù)據(jù):ORD4, ORD5, ORD6, ORD7, ORD8

        • 聚合結(jié)果:ORD4 + ORD5 + ORD6 + ORD7 + ORD8 = 250 + 300 + 350 + 400 + 450 = 1750

        • 結(jié)果會(huì)在 10:08 輸出

      輸出示例:

      Time: 2025-09-05 10:05:00, Window: [2025-09-05 10:00:00, 2025-09-05 10:05:00], Total Cost: 1000 Time: 2025-09-05 10:06:00, Window: [2025-09-05 10:01:00, 2025-09-05 10:06:00], Total Cost: 1250 Time: 2025-09-05 10:07:00, Window: [2025-09-05 10:02:00, 2025-09-05 10:07:00], Total Cost: 1500 Time: 2025-09-05 10:08:00, Window: [2025-09-05 10:03:00, 2025-09-05 10:08:00], Total Cost: 1750 ...

      關(guān)鍵點(diǎn)總結(jié):

      • window() 會(huì)根據(jù)時(shí)間區(qū)間分割數(shù)據(jù),每個(gè)窗口處理一次 foreachRDD()。

      • 每個(gè)批次的數(shù)據(jù)是基于時(shí)間段的,因此每個(gè)窗口的輸出結(jié)果是獨(dú)立的。

      • foreachRDD() 會(huì)在每個(gè)窗口對(duì)應(yīng)的批次內(nèi)執(zhí)行一次,而每個(gè)批次只會(huì)處理一個(gè)窗口的數(shù)據(jù),并不會(huì)匯總多個(gè)窗口的數(shù)據(jù)。

       

      參考:DStream 中有幾個(gè)RDD ?

      轉(zhuǎn)發(fā)請(qǐng)注明出處:http://www.rzrgm.cn/fnlingnzb-learner/p/19076296

      posted @ 2025-09-06 01:25  Boblim  閱讀(15)  評(píng)論(0)    收藏  舉報(bào)
      主站蜘蛛池模板: 深夜精品免费在线观看| 日韩精品一区二区三免费| 亚洲国产成人午夜在线一区| 国产成人午夜精品福利| 亚洲一区二区三区自拍偷拍| 日本高清免费不卡视频| 中文字幕制服国产精品| 国产麻豆放荡av激情演绎| 精品国产欧美一区二区三区在线 | 精选国产av精选一区二区三区 | 精品乱码一区二区三四五区| 久久人人97超碰人人澡爱香蕉| 国产成人8X人网站视频| 国产在线精品欧美日韩电影| 免费人成在线观看成人片| 国产无套粉嫩白浆在线| 亚洲av日韩在线资源| 国偷自产一区二区三区在线视频 | 怡红院一区二区三区在线| 极品美女扒开粉嫩小泬图片 | 国产伦子沙发午休系列资源曝光 | 乐都县| 亚洲精品美女久久久久9999 | 国产激情艳情在线看视频| 免费网站看av片| 少妇宾馆粉嫩10p| 天堂а√在线最新版中文在线| 亚洲ΑV久久久噜噜噜噜噜| 舞阳县| 国产精品高清中文字幕| 国产成人精品国产成人亚洲| 成人中文在线| 欧洲精品色在线观看| 欧美中文字幕在线看| 中文字幕精品人妻丝袜| 日韩熟女熟妇久久精品综合| 国产成人综合色就色综合| 好吊视频一区二区三区人妖| av色蜜桃一区二区三区| 激情动态图亚洲区域激情| 狠狠色丁香婷婷综合尤物|