
Consuming RocketMQ with Structured Streaming

I. Background

The requirement: there is a class Order with a String field orderNo and a Long field cost. The producer writes messages to RocketMQ whose value is the serialized byte array of an Order object and whose key is the orderNo field. Spark must consume RocketMQ with automatic offset commits, write each record into a Hive table, and additionally maintain a 5-minute sliding window with a 1-minute slide that sums cost over each window and outputs the result.

       

II. Hands-on Walkthrough

      1. Order

    package com.example;

    import java.io.Serializable;

    public class Order implements Serializable {
        private static final long serialVersionUID = 1L;

        private String orderNo;
        private Long cost;

        public Order() {}

        public Order(String orderNo, Long cost) {
            this.orderNo = orderNo;
            this.cost = cost;
        }

        public String getOrderNo() {
            return orderNo;
        }

        public void setOrderNo(String orderNo) {
            this.orderNo = orderNo;
        }

        public Long getCost() {
            return cost;
        }

        public void setCost(Long cost) {
            this.cost = cost;
        }

        @Override
        public String toString() {
            return "Order{" +
                    "orderNo='" + orderNo + '\'' +
                    ", cost=" + cost +
                    '}';
        }
    }
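
Since the producer and the Spark UDFs below must agree on the wire format, it is worth sanity-checking the Java serialization round trip before wiring up the pipeline. This is a minimal sketch of ours (the class and helper names are not part of the article's code):

    package com.example;

    import java.io.*;

    public class OrderSerdeCheck {
        // Serialize an Order to bytes exactly the way the producer does.
        static byte[] serialize(Order order) throws IOException {
            try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(order);
                return bos.toByteArray();
            }
        }

        public static void main(String[] args) throws Exception {
            byte[] bytes = serialize(new Order("ORD-test", 42L));
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                Order back = (Order) ois.readObject();
                System.out.println(back); // expect: Order{orderNo='ORD-test', cost=42}
            }
        }
    }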

2. RocketMQ Producer Example

    package com.example;

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;
    import java.util.UUID;

    public class RocketMQOrderProducer {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
            producer.setNamesrvAddr("localhost:9876"); // RocketMQ NameServer address
            producer.start();

            for (int i = 0; i < 20; i++) {
                String orderNo = "ORD-" + UUID.randomUUID().toString().substring(0, 8);
                long cost = (long) (Math.random() * 1000);
                Order order = new Order(orderNo, cost);

                // The message value is the Java-serialized Order object.
                byte[] body;
                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                     ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                    oos.writeObject(order);
                    body = bos.toByteArray();
                }

                // Per the requirement, the message key is orderNo. Note that the
                // three-argument Message constructor is (topic, tags, body), so the
                // key must be set explicitly via setKeys rather than passed there.
                Message msg = new Message("OrderTopic", body);
                msg.setKeys(orderNo);
                SendResult result = producer.send(msg);
                System.out.printf("Sent: %s, result=%s%n", order, result);
            }

            producer.shutdown();
        }
    }
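
Before involving Spark at all, it can help to confirm that messages actually land on the topic with a plain RocketMQ push consumer. A minimal smoke-test sketch (the class and consumer group names are our own choices):

    package com.example;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

    public class OrderTopicSmokeTest {
        public static void main(String[] args) throws Exception {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_smoke_test_group");
            consumer.setNamesrvAddr("localhost:9876");
            consumer.subscribe("OrderTopic", "*"); // all tags
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) -> {
                // Print the key and body size of each message received.
                msgs.forEach(m -> System.out.printf("key=%s, bodyLen=%d%n", m.getKeys(), m.getBody().length));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
            System.out.println("Smoke-test consumer started; Ctrl-C to stop.");
        }
    }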

3. Hive Table DDL

-- Raw orders table
DROP TABLE IF EXISTS demo_db.orders;
CREATE TABLE demo_db.orders (
  orderNo STRING,
  cost BIGINT
)
STORED AS PARQUET;

-- Windowed aggregation results table
DROP TABLE IF EXISTS demo_db.order_cost_window_agg;
CREATE TABLE demo_db.order_cost_window_agg (
  window_start TIMESTAMP,
  window_end TIMESTAMP,
  total_cost BIGINT
)
STORED AS PARQUET;
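
These statements can be run from the Hive CLI or beeline, or directly through a Hive-enabled SparkSession. A small sketch of the latter (the CREATE DATABASE IF NOT EXISTS statement is our addition; the article assumes demo_db already exists):

    package com.example;

    import org.apache.spark.sql.SparkSession;

    public class CreateDemoTables {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("CreateDemoTables")
                    .enableHiveSupport()
                    .getOrCreate();

            spark.sql("CREATE DATABASE IF NOT EXISTS demo_db");
            spark.sql("CREATE TABLE IF NOT EXISTS demo_db.orders (orderNo STRING, cost BIGINT) STORED AS PARQUET");
            spark.sql("CREATE TABLE IF NOT EXISTS demo_db.order_cost_window_agg ("
                    + "window_start TIMESTAMP, window_end TIMESTAMP, total_cost BIGINT) STORED AS PARQUET");
            spark.stop();
        }
    }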

4. Spark Structured Streaming: Consuming RocketMQ and Writing to Hive

Here we use the rocketmq-spark connector. In Structured Streaming mode, offsets are checkpointed automatically, so no manual commit logic is needed.

    package com.example;

    import org.apache.spark.sql.*;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.types.DataTypes;

    import java.io.ByteArrayInputStream;
    import java.io.ObjectInputStream;

    public class StructuredStreamingRocketMQToHive {
        public static void main(String[] args) throws Exception {
            SparkSession spark = SparkSession.builder()
                    .appName("StructuredStreamingRocketMQToHive")
                    .enableHiveSupport()
                    .getOrCreate();

            // Register deserialization UDFs. The explicit UDF1 casts are needed
            // so the Java lambdas target the right register(...) overload.
            spark.udf().register("deserializeOrderNo", (UDF1<byte[], String>) bytes -> {
                try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                    Order o = (Order) ois.readObject();
                    return o.getOrderNo();
                } catch (Exception e) {
                    return null; // malformed messages are filtered out below
                }
            }, DataTypes.StringType);

            spark.udf().register("deserializeCost", (UDF1<byte[], Long>) bytes -> {
                try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                    Order o = (Order) ois.readObject();
                    return o.getCost();
                } catch (Exception e) {
                    return null;
                }
            }, DataTypes.LongType);

            // Read the RocketMQ stream.
            Dataset<Row> df = spark.readStream()
                    .format("org.apache.rocketmq.spark")
                    .option("namesrvAddr", "localhost:9876")
                    .option("consumerGroup", "order_consumer_group")
                    .option("topics", "OrderTopic")
                    .load();

            // The DataFrame produced by the RocketMQ connector contains key, body, topic, offset, etc.
            Dataset<Row> orderDF = df.withColumn("orderNo", functions.callUDF("deserializeOrderNo", df.col("body")))
                    .withColumn("cost", functions.callUDF("deserializeCost", df.col("body")))
                    .select("orderNo", "cost")
                    .filter("orderNo IS NOT NULL AND cost IS NOT NULL");

            // Write raw orders to the Hive table.
            StreamingQuery orderQuery = orderDF.writeStream()
                    .format("hive")
                    .outputMode("append")
                    .option("checkpointLocation", "/user/hive/warehouse/checkpoints/orders")
                    .toTable("demo_db.orders");

            // Windowed aggregation (processing-time window). The watermark is
            // required here: append mode rejects streaming aggregations without one.
            Dataset<Row> aggDF = orderDF
                    .withColumn("event_time", functions.current_timestamp())
                    .withWatermark("event_time", "1 minute")
                    .groupBy(functions.window(functions.col("event_time"), "5 minutes", "1 minute"))
                    .agg(functions.sum("cost").alias("total_cost"))
                    .selectExpr("window.start as window_start", "window.end as window_end", "total_cost");

            // Write the aggregation results to the Hive table.
            StreamingQuery aggQuery = aggDF.writeStream()
                    .format("hive")
                    .outputMode("append")
                    .option("checkpointLocation", "/user/hive/warehouse/checkpoints/orders_agg")
                    .toTable("demo_db.order_cost_window_agg");

            spark.streams().awaitAnyTermination();
        }
    }
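
One small inefficiency above: the message body is deserialized twice per row, once in each UDF. A possible alternative is a single UDF that returns a struct; the fragment below is a sketch of just the changed part (the UDF name, struct field names, and "order" column are our own):

    // Additional imports needed: org.apache.spark.sql.RowFactory,
    // org.apache.spark.sql.types.StructType.
    StructType orderType = new StructType()
            .add("orderNo", DataTypes.StringType)
            .add("cost", DataTypes.LongType);

    spark.udf().register("deserializeOrder", (UDF1<byte[], Row>) bytes -> {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            Order o = (Order) ois.readObject();
            return RowFactory.create(o.getOrderNo(), o.getCost()); // one pass over the bytes
        } catch (Exception e) {
            return null;
        }
    }, orderType);

    Dataset<Row> orderDF = df
            .withColumn("order", functions.callUDF("deserializeOrder", df.col("body")))
            .select("order.orderNo", "order.cost")
            .filter("orderNo IS NOT NULL AND cost IS NOT NULL");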

5. Key Points

1. Structured Streaming manages offsets automatically: they are stored with the checkpoint on HDFS/S3, and on restart the query resumes from the checkpoint.

2. outputMode("append") with the Hive sink requires building the SparkSession with enableHiveSupport().

3. For simplicity this post uses a processing-time window (via current_timestamp()). To switch to an event-time window, add a ts field to the Order class, extract it during deserialization, and combine it with withWatermark(); see the sketch after this list.
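
A minimal sketch of that event-time variant, assuming Order gains a long ts field holding epoch milliseconds and orderDF carries a ts column extracted by an additional UDF (both are our assumptions, not code from this post):

    // "ts" is the hypothetical event-time field in epoch milliseconds.
    Dataset<Row> eventTimeAgg = orderDF
            .withColumn("event_time", functions.expr("timestamp_millis(ts)")) // millis -> TIMESTAMP
            .withWatermark("event_time", "10 minutes") // tolerate up to 10 minutes of lateness
            .groupBy(functions.window(functions.col("event_time"), "5 minutes", "1 minute"))
            .agg(functions.sum("cost").alias("total_cost"))
            .selectExpr("window.start as window_start", "window.end as window_end", "total_cost");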

Please credit the source when reposting: http://www.rzrgm.cn/fnlingnzb-learner/p/19073626
