spark streaming(非structured steaming)消費(fèi)rocketMQ,并手動提交
一、背景
最近有一個需求是:要求有一個類對象為Order,它有string類型的字段orderNo和Long類型的字段cost,生產(chǎn)者寫到rocketMQ的value是Order對象序列化后的字節(jié)數(shù)組、key值是orderNo字段,要求spark以手動提交的方式消費(fèi)rocketMQ,并將數(shù)據(jù)依次寫入到hive表中,并且spark中有一個5分鐘滑動窗口,滑動步長為1分鐘,統(tǒng)計(jì)5分鐘內(nèi)的cost總值并輸出。
二、實(shí)戰(zhàn)演練
RocketMQ 官方并沒有像 Kafka 那樣的「Spark DirectStream 一鍵式手動提交 offset」通用方案;因此我用的是一種常見的、工程可控的做法來實(shí)現(xiàn)“手動提交 offset”語義:
-
使用 Spark Streaming (DStream)(不是 Structured Streaming,因?yàn)槟忝鞔_要手動提交 offset;Structured Streaming 的 offset 管理是自動的,不能手動提交)。
-
我實(shí)現(xiàn)了一個 自定義 Receiver(RocketMQReceiver),該 Receiver 在每個 batch 周期從 RocketMQ 拉取消息(Pull 模式),并把
Order對象推入 DStream。 -
手動提交 offset 的實(shí)現(xiàn):Receiver 不把 offset 自動永久化到 Broker,而是把“當(dāng)前讀取到的 offset”存放在一個外部可共享位置(示例使用 HDFS/local 文件作為簡單的 offset store)。Driver 在確認(rèn)該 batch 的 RDD 已成功寫入 Hive 后,會把“該 batch 已處理的 offsets”寫回到同一 offset store(即手動提交)。下一次 Receiver 啟動 / 拉取時,會從 offset store 讀取這些已提交的 offsets 繼續(xù)拉取。
這種方案與 Kafka 的
commitAsync思路等價(jià):只有當(dāng)你確認(rèn)完成寫入(輸出)后,才把 offset 持久化(提交)。該方法是通用的、工程上常用的“外部 offset 管理”策略,能滿足“手動提交”需求。
下面包含:pom.xml、Order.java、RocketMQ 生產(chǎn)者 OrderProducer.java、Hive 建表 SQL、Spark 程序 SparkRocketMQToHive.java(包含自定義 Receiver、window 邏輯與手動 offset 提交),以及運(yùn)行說明與注意事項(xiàng)。
1) pom.xml(主要依賴)
<project xmlns="http://maven.apache.org/POM/4.0.0" > <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>spark-rocketmq-hive-demo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <java.version>1.8</java.version> <scala.version>2.12</scala.version> <spark.version>3.3.2</spark.version> <rocketmq.version>4.9.4</rocketmq.version> </properties> <dependencies> <!-- Spark --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.12</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.12</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <!-- RocketMQ client (producer & pull consumer) --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>${rocketmq.version}</version> </dependency> <!-- Logging --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.36</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.36</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
2) Hive 建表 SQL
在 Hive CLI 或 Hue 執(zhí)行:
CREATE DATABASE IF NOT EXISTS demo_db; -- 原始訂單表,包含事件時間 ts(毫秒) CREATE TABLE IF NOT EXISTS demo_db.orders ( orderNo STRING, cost BIGINT, ts BIGINT ) STORED AS PARQUET; -- 窗口聚合表 CREATE TABLE IF NOT EXISTS demo_db.order_cost_window_agg ( window_start TIMESTAMP, window_end TIMESTAMP, total_cost BIGINT ) STORED AS PARQUET;
3) Order 類(Order.java)
1 package com.example; 2 3 import java.io.Serializable; 4 5 public class Order implements Serializable { 6 private static final long serialVersionUID = 1L; 7 8 private String orderNo; 9 private Long cost; 10 private Long ts; // 事件時間,毫秒 11 12 public Order() {} 13 14 public Order(String orderNo, Long cost, Long ts) { 15 this.orderNo = orderNo; 16 this.cost = cost; 17 this.ts = ts; 18 } 19 20 public String getOrderNo() { return orderNo; } 21 public void setOrderNo(String orderNo) { this.orderNo = orderNo; } 22 23 public Long getCost() { return cost; } 24 public void setCost(Long cost) { this.cost = cost; } 25 26 public Long getTs() { return ts; } 27 public void setTs(Long ts) { this.ts = ts; } 28 29 @Override 30 public String toString() { 31 return "Order{" + "orderNo='" + orderNo + '\'' + ", cost=" + cost + ", ts=" + ts + '}'; 32 } 33 }
序列化方案:示例使用 Java 原生序列化(ObjectOutputStream → byte[])。生產(chǎn)環(huán)境建議改為 JSON/Avro/ProtoBuf。
4) RocketMQ 生產(chǎn)者示例(OrderProducer.java)
1 package com.example; 2 3 import org.apache.rocketmq.client.producer.DefaultMQProducer; 4 import org.apache.rocketmq.common.message.Message; 5 6 import java.io.ByteArrayOutputStream; 7 import java.io.ObjectOutputStream; 8 import java.util.UUID; 9 10 public class OrderProducer { 11 public static byte[] serialize(Order order) throws Exception { 12 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 13 ObjectOutputStream oos = new ObjectOutputStream(bos); 14 oos.writeObject(order); 15 oos.flush(); 16 return bos.toByteArray(); 17 } 18 19 public static void main(String[] args) throws Exception { 20 String namesrvAddr = "localhost:9876"; // 修改為你的nameserver地址 21 String topic = "orders-topic"; 22 23 DefaultMQProducer producer = new DefaultMQProducer("order-producer-group"); 24 producer.setNamesrvAddr(namesrvAddr); 25 producer.start(); 26 27 for (int i = 0; i < 50; i++) { 28 String orderNo = "ORD-" + UUID.randomUUID().toString().substring(0, 8); 29 long cost = (long)(Math.random() * 1000); 30 long ts = System.currentTimeMillis(); 31 32 Order order = new Order(orderNo, cost, ts); 33 byte[] body = serialize(order); 34 35 Message msg = new Message(topic, "tagA", orderNo, body); 36 producer.send(msg); // 同步發(fā)送 37 System.out.println("Sent: " + order); 38 Thread.sleep(200); 39 } 40 41 producer.shutdown(); 42 } 43 }
5) Spark 程序(完整):SparkRocketMQToHive.java
主要思路:
-
自定義
RocketMQReceiver extends Receiver<Order>:在onStart啟動一個拉取線程,周期性從 RocketMQ 的每個 MessageQueue 拉取消息(從我們持久化的 offset store 開始),并store(order)推送給 Spark Streaming。 -
offset store:示例用一個簡單的文件(可以是 HDFS 路徑)保存 JSON 格式的已提交 offsets(per-queue)。 -
Driver 端的
foreachRDD會把 RDD 寫入 Hive(append),并且在成功寫入后把該 batch 的最大 offsets 寫入 offset store(即手動提交)。 -
窗口聚合:用 DStream 的
window(Durations.minutes(5), Durations.minutes(1)),在每個窗口上做map(Order->("all", cost))reduce,最后寫入聚合表。
注意:代碼做了較多注釋,請按注釋中的
TODO修改 RocketMQ namesrv、topic、以及 offsetStorePath(HDFS 路徑建議用 hdfs://...)。
1 package com.example; 2 3 import org.apache.hadoop.fs.*; 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; 6 import org.apache.rocketmq.client.consumer.PullResult; 7 import org.apache.rocketmq.client.consumer.PullStatus; 8 import org.apache.rocketmq.common.message.MessageExt; 9 import org.apache.rocketmq.common.message.MessageQueue; 10 import org.apache.spark.SparkConf; 11 import org.apache.spark.api.java.*; 12 import org.apache.spark.streaming.*; 13 import org.apache.spark.streaming.api.java.*; 14 import org.apache.spark.streaming.receiver.Receiver; 15 import org.apache.spark.storage.StorageLevel; 16 import org.apache.spark.sql.*; 17 18 import java.io.*; 19 import java.util.*; 20 import java.util.concurrent.ConcurrentHashMap; 21 import java.util.stream.Collectors; 22 23 import scala.Tuple2; 24 25 public class SparkRocketMQToHive { 26 27 // ------------------------- 28 // OffsetStore: 通過文件(HDFS/local)保存已提交 offsets(簡單實(shí)現(xiàn)) 29 // 存儲格式(簡單 JSON-like): 30 // queueId|brokerName|queueOffset\n 31 // ------------------------- 32 public static class OffsetStore { 33 private final String path; // e.g., hdfs://namenode:8020/user/offsets/orders_offsets.txt or file:///tmp/... 34 private final Configuration hadoopConf = new Configuration(); 35 36 public OffsetStore(String path) { 37 this.path = path; 38 } 39 40 // 讀取已提交 offsets -> Map<MessageQueue, Long> 41 public Map<MessageQueue, Long> readOffsets() throws IOException { 42 Map<MessageQueue, Long> map = new HashMap<>(); 43 Path p = new Path(path); 44 FileSystem fs = p.getFileSystem(hadoopConf); 45 if (!fs.exists(p)) return map; 46 47 try (FSDataInputStream in = fs.open(p); 48 BufferedReader br = new BufferedReader(new InputStreamReader(in))) { 49 String line; 50 while ((line = br.readLine()) != null) { 51 // 格式: brokerName|queueId|offset 52 String[] parts = line.trim().split("\\|"); 53 if (parts.length == 3) { 54 String broker = parts[0]; 55 int queueId = Integer.parseInt(parts[1]); 56 long offset = Long.parseLong(parts[2]); 57 // MessageQueue requires topic too — we will reconstruct with topic externally 58 // To simplify, we return map keyed by "broker:queueId" in caller 59 MessageQueue mq = new MessageQueue(); // placeholder, set fields later 60 mq.setBrokerName(broker); 61 mq.setQueueId(queueId); 62 // cannot set topic here; caller will match based on broker+queueId. 63 map.put(mq, offset); 64 } 65 } 66 } catch (Exception e) { 67 throw new IOException("readOffsets failed", e); 68 } 69 return map; 70 } 71 72 // 簡化寫入:由 caller 提供 list of entries (broker|queueId|offset) 73 public void writeOffsets(List<String> lines) throws IOException { 74 Path p = new Path(path); 75 FileSystem fs = p.getFileSystem(hadoopConf); 76 // 覆蓋寫入 77 try (FSDataOutputStream out = fs.create(p, true); 78 BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out))) { 79 for (String line : lines) { 80 bw.write(line); 81 bw.newLine(); 82 } 83 } 84 } 85 } 86 87 // ------------------------- 88 // RocketMQReceiver:自定義 Receiver,從 RocketMQ pull 消息并 store() 89 // ------------------------- 90 public static class RocketMQReceiver extends Receiver<Order> { 91 private volatile boolean stopped = false; 92 private final String namesrvAddr; 93 private final String topic; 94 private final String consumerGroup; 95 private final String offsetStorePath; // offset store path 96 private transient DefaultMQPullConsumer pullConsumer; 97 98 // local map to hold the latest pulled offsets per MessageQueue 99 private final Map<MessageQueue, Long> latestPulledOffsets = new ConcurrentHashMap<>(); 100 101 public RocketMQReceiver(String namesrvAddr, String topic, String consumerGroup, String offsetStorePath) { 102 super(StorageLevel.MEMORY_AND_DISK_2()); 103 this.namesrvAddr = namesrvAddr; 104 this.topic = topic; 105 this.consumerGroup = consumerGroup; 106 this.offsetStorePath = offsetStorePath; 107 } 108 109 @Override 110 public void onStart() { 111 stopped = false; 112 // start pulling thread 113 new Thread(this::pullLoop, "rocketmq-pull-thread").start(); 114 } 115 116 @Override 117 public void onStop() { 118 stopped = true; 119 if (pullConsumer != null) { 120 try { 121 pullConsumer.shutdown(); 122 } catch (Exception ignored) { 123 } 124 } 125 } 126 127 private void pullLoop() { 128 try { 129 pullConsumer = new DefaultMQPullConsumer(consumerGroup + "_pull"); 130 pullConsumer.setNamesrvAddr(namesrvAddr); 131 pullConsumer.start(); 132 133 // 獲取所有 message queues for topic 134 Set<MessageQueue> mqs = pullConsumer.fetchSubscribeMessageQueues(topic); 135 136 // 讀取已提交 offsets(從 offset store) 137 Map<MessageQueue, Long> committed = readOffsetStore(); 138 139 while (!stopped) { 140 for (MessageQueue mq : mqs) { 141 long offset = 0L; 142 // find committed offset matching same broker+queueId 143 Optional<Map.Entry<MessageQueue, Long>> found = committed.entrySet().stream() 144 .filter(e -> e.getKey().getBrokerName().equals(mq.getBrokerName()) 145 && e.getKey().getQueueId() == mq.getQueueId()) 146 .findFirst(); 147 if (found.isPresent()) offset = found.get().getValue(); 148 // Pull 149 try { 150 PullResult pullResult = pullConsumer.pullBlockIfNotFound(mq, null, offset, 32); 151 if (pullResult != null) { 152 if (pullResult.getPullStatus() == PullStatus.FOUND) { 153 List<MessageExt> msgs = pullResult.getMsgFoundList(); 154 if (msgs != null) { 155 for (MessageExt me : msgs) { 156 try { 157 byte[] body = me.getBody(); 158 Order o = deserialize(body); 159 if (o != null) { 160 store(o); // 推入 DStream 161 } 162 } catch (Exception e) { 163 // log and continue 164 e.printStackTrace(); 165 } 166 } 167 } 168 } 169 // 更新本地 latestPulledOffsets:下次從 pullResult.getNextBeginOffset() 拉 170 long nextOffset = pullResult.getNextBeginOffset(); 171 latestPulledOffsets.put(mq, nextOffset); 172 } 173 } catch (Exception ex) { 174 ex.printStackTrace(); 175 } 176 if (stopped) break; 177 } 178 // 拉完一輪 messageQueues 之后,短睡眠一下,等待下一 batch 拉取 179 Thread.sleep(500); 180 } 181 } catch (Exception e) { 182 e.printStackTrace(); 183 restart("rocketmq pull failed", e); 184 } 185 } 186 187 // 從 offset store 讀取已提交 offsets(簡化) 188 private Map<MessageQueue, Long> readOffsetStore() { 189 OffsetStore store = new OffsetStore(offsetStorePath); 190 try { 191 return store.readOffsets(); 192 } catch (Exception e) { 193 // log and return empty 194 e.printStackTrace(); 195 return new HashMap<>(); 196 } 197 } 198 199 // 反序列化 Order(Java 原生序列化) 200 private Order deserialize(byte[] bytes) { 201 if (bytes == null) return null; 202 try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); 203 ObjectInputStream ois = new ObjectInputStream(bis)) { 204 return (Order) ois.readObject(); 205 } catch (Exception e) { 206 e.printStackTrace(); 207 return null; 208 } 209 } 210 211 // 提供外部訪問最新 pulled offsets 的方法(driver 可以讀取并在處理完寫入后提交) 212 public Map<MessageQueue, Long> getLatestPulledOffsets() { 213 return new HashMap<>(latestPulledOffsets); 214 } 215 } 216 217 // ------------------------- 218 // 主程序:Spark Streaming 作業(yè) 219 // ------------------------- 220 public static void main(String[] args) throws Exception { 221 // config 222 String namesrv = "localhost:9876"; // TODO 修改 223 String topic = "orders-topic"; 224 String consumerGroup = "spark-rocketmq-group"; 225 String offsetStorePath = "file:///tmp/rocketmq_offsets.txt"; // TODO 改為 HDFS 路徑:hdfs://namenode:8020/user/offsets/orders_offsets.txt 226 227 SparkConf conf = new SparkConf() 228 .setAppName("SparkRocketMQToHive") 229 .setIfMissing("spark.master", "local[2]") 230 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 231 232 // batch interval = 1 minute (符合滑動步長) 233 JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.minutes(1)); 234 // checkpoint dir for Spark Streaming state (if needed) 235 jssc.checkpoint("/tmp/spark-streaming-checkpoint"); 236 237 // 創(chuàng)建并注冊 Receiver 238 RocketMQReceiver receiver = new RocketMQReceiver(namesrv, topic, consumerGroup, offsetStorePath); 239 JavaReceiverInputDStream<Order> stream = jssc.receiverStream(receiver); 240 241 // 將原始 order 寫入 Hive(每個 micro-batch 的 RDD) 242 stream.foreachRDD((JavaRDD<Order> rdd, Time time) -> { 243 if (rdd.isEmpty()) { 244 System.out.println("No data in this batch: " + time); 245 return; 246 } 247 248 // 創(chuàng)建 SparkSession(在 executor 上創(chuàng)建會出問題,因此在 driver 上操作) 249 SparkSession spark = SparkSession.builder() 250 .config(conf) 251 .enableHiveSupport() 252 .getOrCreate(); 253 254 Dataset<Row> df = spark.createDataFrame(rdd, Order.class); 255 // 寫入 orders 表(append) 256 df.write().mode(SaveMode.Append).insertInto("demo_db.orders"); 257 258 System.out.println("Wrote batch to Hive: " + time + ", count=" + rdd.count()); 259 260 // 手動提交 offset:從 receiver 獲取 latestPulledOffsets,然后把它持久化到 offsetStorePath(覆蓋寫) 261 Map<MessageQueue, Long> latestOffsets = receiver.getLatestPulledOffsets(); 262 List<String> lines = latestOffsets.entrySet().stream() 263 .map(e -> e.getKey().getBrokerName() + "|" + e.getKey().getQueueId() + "|" + e.getValue()) 264 .collect(Collectors.toList()); 265 OffsetStore store = new OffsetStore(offsetStorePath); 266 store.writeOffsets(lines); 267 System.out.println("Committed offsets for this batch: " + lines); 268 }); 269 270 // 窗口聚合:5 分鐘窗口,1 分鐘滑動,統(tǒng)計(jì) cost 總和 271 JavaPairDStream<String, Long> kv = stream.mapToPair(order -> new Tuple2<>("all", order.getCost() == null ? 0L : order.getCost())); 272 JavaPairDStream<String, Long> windowed = kv.reduceByKeyAndWindow((a, b) -> a + b, 273 Durations.minutes(5), Durations.minutes(1)); 274 275 // 每個窗口寫入聚合表 276 windowed.foreachRDD((rdd, time) -> { 277 if (rdd.isEmpty()) return; 278 SparkSession spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate(); 279 280 final long windowEndMs = time.milliseconds(); 281 final long windowStartMs = windowEndMs - 5 * 60 * 1000 + 1; 282 283 List<Row> rows = rdd.map(tuple -> RowFactory.create(new java.sql.Timestamp(windowStartMs), 284 new java.sql.Timestamp(windowEndMs), tuple._2())).collect(); 285 286 StructType schema = new StructType(new StructField[]{ 287 new StructField("window_start", DataTypes.TimestampType, false, Metadata.empty()), 288 new StructField("window_end", DataTypes.TimestampType, false, Metadata.empty()), 289 new StructField("total_cost", DataTypes.LongType, false, Metadata.empty()) 290 }); 291 292 Dataset<Row> df = spark.createDataFrame(rows, schema); 293 df.write().mode(SaveMode.Append).insertInto("demo_db.order_cost_window_agg"); 294 System.out.println("Wrote window agg: start=" + new java.sql.Timestamp(windowStartMs) + " end=" + new java.sql.Timestamp(windowEndMs)); 295 }); 296 297 // 啟動 298 jssc.start(); 299 jssc.awaitTermination(); 300 } 301 }
6) 運(yùn)行步驟(本地測試)
-
啟動 RocketMQ (nameserver + broker)。確保
namesrvAddr在 producer/consumer/receiver 中一致。 -
在 Hive 中執(zhí)行建表 SQL。
-
用 Maven 打包:
-
先運(yùn)行
OrderProducer發(fā)送一些消息: -
使用
spark-submit提交 Spark Streaming 作業(yè)(注意把 rocketmq-client 的 jar 加到 classpath,或者使用 --packages/--jars):或把 rocketmq-client 打包進(jìn) fat-jar。
-
觀察 Hive 表
demo_db.orders和demo_db.order_cost_window_agg是否有數(shù)據(jù)。
7) 設(shè)計(jì)說明與注意事項(xiàng)(必須閱讀)
-
為什么用 Receiver + 外部 offset store?
-
RocketMQ 與 Kafka 在生態(tài)、客戶端 API 上不同,且沒有像 Kafka DirectStream 那樣的“offsetRanges 可在 driver 中獲取并 commitAsync”現(xiàn)成模式;用 Receiver + 自己維護(hù) offset store(HDFS/file/ZK)可以實(shí)現(xiàn)“在確認(rèn)寫入外部 sink 后再提交 offset”這一語義。
-
-
offset 的一致性
-
該實(shí)現(xiàn)保證:只有在 Spark 確認(rèn)該 batch 數(shù)據(jù)已寫入 Hive 后,才把最新 pulled offsets 持久化(提交)。若寫 Hive 失敗,則不會提交 offsets,下次作業(yè)重啟或 receiver 重新讀取時會從上次提交點(diǎn)重新讀取(冪等問題需用戶在寫入時處理,比如去重或使用唯一 key 覆蓋寫入策略)。
-
-
OffsetStore 的實(shí)現(xiàn)示例簡單
-
示例使用了一個非常簡單的文件格式。生產(chǎn)環(huán)境建議使用 Zookeeper、HBase、MySQL 或 RocketMQ 的 broker commit 接口(如果可用)來存放 offsets,需要考慮并發(fā)/權(quán)限/原子寫入等問題。使用 HDFS 寫入需確保 driver 與 receiver 都能訪問同一路徑。
-
-
性能 & 并發(fā)
-
示例中 receiver 一次拉取固定數(shù)量(32 條)并逐條
store()。你可以根據(jù)需要調(diào)整并行度(多個 receiver 實(shí)例、每 queue 的并發(fā)拉取、批量 store 等)。另外,window 聚合會把數(shù)據(jù)在 driver 上合并寫入 Hive(請按集群資源調(diào)優(yōu)并行度、分區(qū)數(shù)等)。
-
-
序列化方案
-
示例使用 Java native serialization(ObjectOutputStream)。生產(chǎn)環(huán)境應(yīng)優(yōu)先使用 JSON / Avro / Protobuf 以便于兼容與演進(jìn)。
-
-
冪等性
-
如果你的 Hive 寫入不是冪等的(例如重復(fù)寫會產(chǎn)生重復(fù)記錄),建議在訂單表中使用
orderNo做主鍵去重或在寫入前做去重。因?yàn)槿魧懯『笾卦嚕瑫a(chǎn)生重復(fù)消費(fèi)。
-
-
時間與窗口
-
示例窗口使用 processing-time(因?yàn)?DStream 沒有像 Structured Streaming 那樣方便的事件時間 watermark)。如果你需要基于
ts(Order 內(nèi)的事件時間)做更準(zhǔn)確的事件時間窗口,并處理亂序數(shù)據(jù),建議使用 Structured Streaming(但 Structured Streaming 不允許手動提交 offset)。這兩者之間是權(quán)衡:事件時間 + watermark(Structured Streaming)VS 可控手動提交 offset(本示例用 DStream + 自管理 offset)。
-
8) 下步可選改進(jìn)
-
把 offsetStore 從文件改成 ZooKeeper 實(shí)現(xiàn)(更可靠,適合分布式部署)。
-
把序列化改成 JSON(Jackson):更易于調(diào)試和兼容。
-
把 Receiver 的拉取邏輯改成批量化、高吞吐版本(同時按 partition 拉取并做并行 store)。
-
如果你想改用 Structured Streaming(事件時間 + watermark 優(yōu)勢),我可以給出無法手動提交 offset 的替代方案:例如把 Hive 寫入做冪等化(基于 orderNo)從而允許使用 Structured Streaming 的自動 offset 管理與 exactly-once 語義。
轉(zhuǎn)載請注明出處:http://www.rzrgm.cn/fnlingnzb-learner/p/19073689

浙公網(wǎng)安備 33010602011771號