Consuming RocketMQ with Structured Streaming
I. Background
The requirement came up recently and looks like this: there is a class Order with a String field orderNo and a Long field cost. The producer serializes an Order object into a byte array as the RocketMQ message value (body) and uses the orderNo field as the message key. Spark must consume RocketMQ with automatic offset commits, append the records to a Hive table as they arrive, and also maintain a 5-minute sliding window with a 1-minute slide that sums cost over each window and writes the result out.
II. Hands-on Walkthrough
1. The Order class
package com.example;

import java.io.Serializable;

public class Order implements Serializable {
    private static final long serialVersionUID = 1L;

    private String orderNo;
    private Long cost;

    public Order() {}

    public Order(String orderNo, Long cost) {
        this.orderNo = orderNo;
        this.cost = cost;
    }

    public String getOrderNo() {
        return orderNo;
    }

    public void setOrderNo(String orderNo) {
        this.orderNo = orderNo;
    }

    public Long getCost() {
        return cost;
    }

    public void setCost(Long cost) {
        this.cost = cost;
    }

    @Override
    public String toString() {
        return "Order{" +
                "orderNo='" + orderNo + '\'' +
                ", cost=" + cost +
                '}';
    }
}
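Both the producer and the Spark job below perform the same Java-serialization round trip, so it can help to keep that logic in one place. The following OrderSerde helper is a minimal sketch of my own (the class name and methods are not part of the original code); it simply wraps ObjectOutputStream/ObjectInputStream.

package com.example;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical helper, not part of the original post: centralizes the
// Java-serialization round trip used by the producer and the Spark UDFs.
public final class OrderSerde {

    private OrderSerde() {}

    // Serialize an Order into the byte[] that becomes the RocketMQ message body.
    public static byte[] serialize(Order order) throws Exception {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(order);
            oos.flush(); // drain the object stream's buffer before reading the bytes
            return bos.toByteArray();
        }
    }

    // Deserialize a message body back into an Order; returns null on bad input.
    public static Order deserialize(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Order) ois.readObject();
        } catch (Exception e) {
            return null;
        }
    }
}

With this in place, the producer body could become OrderSerde.serialize(order) and the Spark UDFs below could call OrderSerde.deserialize(bytes) once instead of repeating the stream handling.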
2. RocketMQ producer example
package com.example;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.UUID;

public class RocketMQOrderProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
        producer.setNamesrvAddr("localhost:9876"); // RocketMQ NameServer address
        producer.start();

        for (int i = 0; i < 20; i++) {
            String orderNo = "ORD-" + UUID.randomUUID().toString().substring(0, 8);
            long cost = (long) (Math.random() * 1000);
            Order order = new Order(orderNo, cost);

            // Java-serialize the Order into the message body
            byte[] body;
            try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(order);
                oos.flush(); // drain the object stream's buffer before reading the bytes
                body = bos.toByteArray();
            }

            // (topic, tags, keys, body): orderNo goes into the message key, as required
            Message msg = new Message("OrderTopic", "", orderNo, body);
            SendResult result = producer.send(msg);
            System.out.printf("Sent: %s, result=%s%n", order, result);
        }

        producer.shutdown();
    }
}
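Before wiring up Spark, it is worth confirming that the messages actually land in OrderTopic and that the body deserializes back into an Order. Below is a minimal, hypothetical sanity check of my own using the lite pull consumer; it assumes a RocketMQ client version that provides DefaultLitePullConsumer (4.6 or later) and is not part of the original post.

package com.example;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.List;

// Hypothetical sanity check: pull a few messages from OrderTopic
// and print the deserialized Order objects together with their keys.
public class OrderTopicSanityCheck {
    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("order_check_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("OrderTopic", "*");
        consumer.start();

        try {
            List<MessageExt> messages = consumer.poll(); // default poll timeout
            for (MessageExt msg : messages) {
                try (ObjectInputStream ois =
                         new ObjectInputStream(new ByteArrayInputStream(msg.getBody()))) {
                    Order order = (Order) ois.readObject();
                    System.out.printf("key=%s, order=%s%n", msg.getKeys(), order);
                }
            }
        } finally {
            consumer.shutdown();
        }
    }
}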
3. Hive table DDL
-- Raw orders table
DROP TABLE IF EXISTS demo_db.orders;
CREATE TABLE demo_db.orders (
    orderNo STRING,
    cost    BIGINT
) STORED AS PARQUET;

-- Aggregated window results table
DROP TABLE IF EXISTS demo_db.order_cost_window_agg;
CREATE TABLE demo_db.order_cost_window_agg (
    window_start TIMESTAMP,
    window_end   TIMESTAMP,
    total_cost   BIGINT
) STORED AS PARQUET;
4. Spark Structured Streaming: consume RocketMQ and write to Hive
This uses the rocketmq-spark connector. In Structured Streaming mode, offsets are checkpointed automatically as part of the query checkpoint.
package com.example;

import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;

public class StructuredStreamingRocketMQToHive {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("StructuredStreamingRocketMQToHive")
                .enableHiveSupport()
                .getOrCreate();

        // Register deserialization UDFs that turn the message body back into Order fields
        spark.udf().register("deserializeOrderNo", (UDF1<byte[], String>) bytes -> {
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                Order o = (Order) ois.readObject();
                return o.getOrderNo();
            } catch (Exception e) {
                return null;
            }
        }, DataTypes.StringType);

        spark.udf().register("deserializeCost", (UDF1<byte[], Long>) bytes -> {
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                Order o = (Order) ois.readObject();
                return o.getCost();
            } catch (Exception e) {
                return null;
            }
        }, DataTypes.LongType);

        // Read the RocketMQ stream
        Dataset<Row> df = spark.readStream()
                .format("org.apache.rocketmq.spark")
                .option("namesrvAddr", "localhost:9876")
                .option("consumerGroup", "order_consumer_group")
                .option("topics", "OrderTopic")
                .load();

        // The DataFrame produced by the RocketMQ connector contains key, body, topic, offset, etc.
        Dataset<Row> orderDF = df.withColumn("orderNo", functions.callUDF("deserializeOrderNo", df.col("body")))
                .withColumn("cost", functions.callUDF("deserializeCost", df.col("body")))
                .select("orderNo", "cost")
                .filter("orderNo IS NOT NULL AND cost IS NOT NULL");

        // Write raw orders into the Hive table
        StreamingQuery orderQuery = orderDF.writeStream()
                .format("hive")
                .outputMode("append")
                .option("checkpointLocation", "/user/hive/warehouse/checkpoints/orders")
                .toTable("demo_db.orders");

        // Windowed aggregation (processing-time window).
        // A watermark on the grouping timestamp is required so that append mode can emit finalized windows.
        Dataset<Row> aggDF = orderDF
                .withColumn("event_time", functions.current_timestamp())
                .withWatermark("event_time", "1 minute")
                .groupBy(functions.window(functions.col("event_time"), "5 minutes", "1 minute"))
                .agg(functions.sum("cost").alias("total_cost"))
                .selectExpr("window.start as window_start", "window.end as window_end", "total_cost");

        // Write the windowed aggregates into Hive
        StreamingQuery aggQuery = aggDF.writeStream()
                .format("hive")
                .outputMode("append")
                .option("checkpointLocation", "/user/hive/warehouse/checkpoints/orders_agg")
                .toTable("demo_db.order_cost_window_agg");

        spark.streams().awaitAnyTermination();
    }
}
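Depending on the Spark version, the "hive" streaming sink format used above may not be available. A common fallback is foreachBatch, which writes each micro-batch with the ordinary batch writer. The sketch below is my own illustration under that assumption (same table and checkpoint paths as above), not code from the original post.

package com.example;

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

// Hypothetical alternative sink: write each micro-batch into the Hive table
// with the batch DataFrameWriter instead of the "hive" streaming format.
public final class HiveForeachBatchSink {

    private HiveForeachBatchSink() {}

    // orderDF is the streaming Dataset built in StructuredStreamingRocketMQToHive.
    public static StreamingQuery writeOrders(Dataset<Row> orderDF) throws Exception {
        return orderDF.writeStream()
                .outputMode("append")
                .option("checkpointLocation", "/user/hive/warehouse/checkpoints/orders")
                .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDF, batchId) -> {
                    // Plain batch append into the existing Hive table; insertInto matches columns by position.
                    batchDF.write().mode("append").insertInto("demo_db.orders");
                })
                .start();
    }
}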
5. Key points
- Structured Streaming manages offsets automatically; the checkpoint lives on HDFS/S3, and on restart the query resumes from it.
- outputMode("append") plus the Hive sink requires enableHiveSupport() on the SparkSession.
- For simplicity, this example uses a processing-time window built from current_timestamp() (note that append mode still needs a watermark on the grouping timestamp, as in the code above). To switch to an event-time window, add a ts field to Order, parse it out of the message body, and combine it with withWatermark(); a sketch follows this list.
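For reference, an event-time variant might look roughly like the sketch below. It is my own illustration, not part of the original code: it assumes Order gains a Long ts field (epoch milliseconds set by the producer) and that a matching UDF (e.g. a hypothetical deserializeTs) has produced a "ts" column alongside orderNo and cost.

package com.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

// Hypothetical event-time variant of the windowed aggregation.
public final class EventTimeWindowAgg {

    private EventTimeWindowAgg() {}

    // orderWithTsDF must contain orderNo, cost and a "ts" column (epoch millis),
    // e.g. extracted from the Order body by an additional deserializeTs UDF.
    public static Dataset<Row> aggregate(Dataset<Row> orderWithTsDF) {
        return orderWithTsDF
                // epoch millis -> TIMESTAMP
                .withColumn("event_time", functions.col("ts").divide(1000).cast("timestamp"))
                // tolerate events arriving up to 10 minutes late
                .withWatermark("event_time", "10 minutes")
                // 5-minute window sliding every minute, keyed by event time
                .groupBy(functions.window(functions.col("event_time"), "5 minutes", "1 minute"))
                .agg(functions.sum("cost").alias("total_cost"))
                .selectExpr("window.start as window_start", "window.end as window_end", "total_cost");
    }
}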
