spark streaming消費rocketmq的幾種方式
在 Spark 里接入 RocketMQ,主要有兩大類方式:
?? 1. 基于 老的 Spark Streaming (DStream API)
RocketMQ 社區提供過 rocketmq-spark connector(在 apache/rocketmq-externals 里),可以像 Kafka 一樣創建 DStream:
方式 A:Receiver 模式
-
使用自定義的
Receiver從 RocketMQ 拉取消息。 -
每條消息進入 Spark Streaming 的
ReceiverInputDStream。 -
優點:實現簡單。
-
缺點:消息會先緩存在 Spark executor 的內存里,容錯依賴 Spark 的 WAL(Write Ahead Log),性能和可靠性一般。
a. 不使用WAL(Write Ahead Log)
核心思路是:
-
在 Spark 里實現一個自定義
Receiver<T>,內部運行 RocketMQ PushConsumer。 -
PushConsumer 收到消息后,調用
store(msg)把數據寫入 Spark Streaming 的內存隊列。 -
Spark Streaming 后續把這些數據打包成 RDD 處理。
核心流程:
-
RocketMQ → PushConsumer
-
消息推送到 Spark 進程。
-
-
Spark Receiver → store()
-
Receiver 緩存消息,存到 Spark executor 的 BlockManager。
-
-
Spark Streaming Job
-
定時將數據生成 RDD 進行處理。
-
示例代碼
1?? Order 類
1 import java.io.Serializable; 2 3 public class Order implements Serializable { 4 private String orderNo; 5 private Long cost; 6 7 public Order(String orderNo, Long cost) { 8 this.orderNo = orderNo; 9 this.cost = cost; 10 } 11 12 public String getOrderNo() { return orderNo; } 13 public Long getCost() { return cost; } 14 15 @Override 16 public String toString() { 17 return "Order{" + 18 "orderNo='" + orderNo + '\'' + 19 ", cost=" + cost + 20 '}'; 21 } 22 }
2?? 自定義 RocketMQReceiver
1 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; 2 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; 3 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; 4 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; 5 import org.apache.rocketmq.common.message.MessageExt; 6 import org.apache.spark.storage.StorageLevel; 7 import org.apache.spark.streaming.receiver.Receiver; 8 9 import java.io.ByteArrayInputStream; 10 import java.io.ObjectInputStream; 11 import java.util.List; 12 13 public class RocketMQReceiver extends Receiver<Order> { 14 private final String namesrvAddr; 15 private final String topic; 16 private final String group; 17 18 private transient DefaultMQPushConsumer consumer; 19 20 public RocketMQReceiver(String namesrvAddr, String topic, String group) { 21 super(StorageLevel.MEMORY_AND_DISK_2()); 22 this.namesrvAddr = namesrvAddr; 23 this.topic = topic; 24 this.group = group; 25 } 26 27 @Override 28 public void onStart() { 29 new Thread(this::initConsumer).start(); 30 } 31 32 private void initConsumer() { 33 try { 34 consumer = new DefaultMQPushConsumer(group); 35 consumer.setNamesrvAddr(namesrvAddr); 36 consumer.subscribe(topic, "*"); 37 38 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { 39 for (MessageExt msg : msgs) { 40 Order order = deserialize(msg.getBody()); 41 if (order != null) { 42 store(order); // 推送到 Spark 43 } 44 } 45 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 46 }); 47 48 consumer.start(); 49 System.out.println("RocketMQReceiver started."); 50 } catch (Exception e) { 51 restart("Error starting RocketMQReceiver", e); 52 } 53 } 54 55 @Override 56 public void onStop() { 57 if (consumer != null) { 58 consumer.shutdown(); 59 } 60 } 61 62 private Order deserialize(byte[] body) { 63 try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body))) { 64 return (Order) ois.readObject(); 65 } catch (Exception e) { 66 return null; 67 } 68 } 69 }
3?? Spark Streaming 主程序
1 import org.apache.spark.SparkConf; 2 import org.apache.spark.streaming.Durations; 3 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; 4 import org.apache.spark.streaming.api.java.JavaStreamingContext; 5 6 public class RocketMQStreamingApp { 7 public static void main(String[] args) throws Exception { 8 SparkConf conf = new SparkConf().setAppName("RocketMQReceiverExample").setMaster("local[2]"); 9 JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5)); 10 11 // 創建 Receiver 12 JavaReceiverInputDStream<Order> stream = 13 ssc.receiverStream(new RocketMQReceiver("localhost:9876", "OrderTopic", "spark_group")); 14 15 // 簡單處理:打印訂單 16 stream.foreachRDD(rdd -> { 17 rdd.foreach(order -> System.out.println("Got order: " + order)); 18 }); 19 20 ssc.start(); 21 ssc.awaitTermination(); 22 } 23 }
b. 使用WAL(Write Ahead Log)
在 Spark Streaming 里,開啟 WAL 很簡單:
-
設置 checkpoint 目錄(必須是 HDFS 或可靠存儲);
-
Receiver 要用
StorageLevel.MEMORY_AND_DISK_SER_2()(支持 WAL 持久化); -
Spark 自動把每條接收到的數據先寫到 WAL,再交給 BlockManager。
?? 完整示例代碼(帶 WAL)
1?? Order 類(和之前相同)
1 import java.io.Serializable; 2 3 public class Order implements Serializable { 4 private String orderNo; 5 private Long cost; 6 7 public Order(String orderNo, Long cost) { 8 this.orderNo = orderNo; 9 this.cost = cost; 10 } 11 12 public String getOrderNo() { return orderNo; } 13 public Long getCost() { return cost; } 14 15 @Override 16 public String toString() { 17 return "Order{" + 18 "orderNo='" + orderNo + '\'' + 19 ", cost=" + cost + 20 '}'; 21 } 22 }
2?? RocketMQReceiver(改用支持 WAL 的 StorageLevel)
1 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; 2 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; 3 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; 4 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; 5 import org.apache.rocketmq.common.message.MessageExt; 6 import org.apache.spark.storage.StorageLevel; 7 import org.apache.spark.streaming.receiver.Receiver; 8 9 import java.io.ByteArrayInputStream; 10 import java.io.ObjectInputStream; 11 12 public class RocketMQReceiver extends Receiver<Order> { 13 private final String namesrvAddr; 14 private final String topic; 15 private final String group; 16 17 private transient DefaultMQPushConsumer consumer; 18 19 public RocketMQReceiver(String namesrvAddr, String topic, String group) { 20 // 使用支持 WAL 的存儲級別 21 super(StorageLevel.MEMORY_AND_DISK_SER_2()); 22 this.namesrvAddr = namesrvAddr; 23 this.topic = topic; 24 this.group = group; 25 } 26 27 @Override 28 public void onStart() { 29 new Thread(this::initConsumer).start(); 30 } 31 32 private void initConsumer() { 33 try { 34 consumer = new DefaultMQPushConsumer(group); 35 consumer.setNamesrvAddr(namesrvAddr); 36 consumer.subscribe(topic, "*"); 37 38 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { 39 for (MessageExt msg : msgs) { 40 Order order = deserialize(msg.getBody()); 41 if (order != null) { 42 // Spark 會先寫 WAL,再寫 BlockManager 43 store(order); 44 } 45 } 46 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 47 }); 48 49 consumer.start(); 50 System.out.println("RocketMQReceiver started with WAL."); 51 } catch (Exception e) { 52 restart("Error starting RocketMQReceiver", e); 53 } 54 } 55 56 @Override 57 public void onStop() { 58 if (consumer != null) { 59 consumer.shutdown(); 60 } 61 } 62 63 private Order deserialize(byte[] body) { 64 try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body))) { 65 return (Order) ois.readObject(); 66 } catch (Exception e) { 67 return null; 68 } 69 } 70 }
3?? 主程序(開啟 WAL 需要 checkpoint)
1 import org.apache.spark.SparkConf; 2 import org.apache.spark.streaming.Durations; 3 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; 4 import org.apache.spark.streaming.api.java.JavaStreamingContext; 5 6 public class RocketMQStreamingApp { 7 public static void main(String[] args) throws Exception { 8 SparkConf conf = new SparkConf() 9 .setAppName("RocketMQReceiverWithWAL") 10 .setMaster("local[2]"); 11 12 // 每 5 秒一個 batch 13 JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5)); 14 15 // 設置 checkpoint 目錄(必須是可靠存儲,如 HDFS) 16 ssc.checkpoint("hdfs://namenode:8020/spark-checkpoints/rocketmq"); 17 18 // 創建帶 WAL 的 Receiver 19 JavaReceiverInputDStream<Order> stream = 20 ssc.receiverStream(new RocketMQReceiver("localhost:9876", "OrderTopic", "spark_group")); 21 22 // 簡單處理:打印訂單 23 stream.foreachRDD(rdd -> { 24 rdd.foreach(order -> System.out.println("Got order (with WAL): " + order)); 25 }); 26 27 ssc.start(); 28 ssc.awaitTermination(); 29 } 30 }
注意事項
-
checkpoint 必須是 HDFS/S3/OSS 等分布式存儲,本地路徑只適合測試。
-
WAL 會寫日志文件,保證 至少一次(at-least-once) 語義,但仍可能有重復消息,需要業務端去重。
-
Receiver 模式 + WAL 性能比 Direct 模式差(多一次磁盤 IO)。
-
若想要 exactly-once,通常推薦 Structured Streaming(自動 checkpoint + sink 支持事務)。
?? Receiver 模式特點
? 優點
-
實現簡單:直接用 RocketMQ PushConsumer 推消息到 Spark。
-
不需要手動管理 offset。
? 缺點
-
Spark Receiver 先把數據存到內存(BlockManager),如果 Spark 崩潰,數據可能丟失。
-
容錯要依賴 WAL(Write Ahead Log),但 WAL 會寫 HDFS,性能比 Direct 模式差。
-
難以保證嚴格 exactly-once。
方式 B:Direct 模式
-
類似 Kafka Direct Stream,Spark Streaming 直接從 RocketMQ 拉取數據,不依賴 Spark Receiver。
-
消息 offset 由用戶管理,可以手動提交(通常寫到 Zookeeper 或外部存儲)。
-
優點:性能更好,保證數據至少一次處理。
-
缺點:需要自己管理 offset 提交,開發復雜一些。
而Direct 模式的特點是:
-
不依賴 Spark Receiver(沒有 WAL 開銷)。
-
Spark Driver 直接從 RocketMQ 拉取消息。
-
消費的 offset 由 Spark Driver 維護,通常要手動存儲到外部(比如 HDFS、MySQL、Zookeeper)以便恢復。
實現思路
-
準備 RocketMQ Consumer API
-
Spark 沒有內置 RocketMQ Direct API(像 Kafka 那樣),需要借助 rocketmq-spark connector 或者自定義 Consumer。
-
原理和 Kafka DirectStream 一樣:
-
在每個 micro-batch 觸發時,去 RocketMQ 拉取一段消息(指定起始 offset、結束 offset)。
-
轉換成 RDD,交給 Spark 執行。
-
-
-
關鍵點
-
手動管理 offset:RocketMQ 不會自動幫 Spark 提交,需要你把 offset 存到外部存儲(比如 HDFS/ZK/MySQL)。
-
并行度:可以按 Topic 的 Queue(partition 類似)拆分 RDD,分發到不同 task。
-
容錯:作業失敗后,重新從存儲的 offset 位置恢復。
-
?? 代碼示例(Direct 模式偽實現)
假設有 OrderTopic,包含多個 MessageQueue:
1 package com.example; 2 3 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; 4 import org.apache.rocketmq.client.consumer.PullResult; 5 import org.apache.rocketmq.client.consumer.PullStatus; 6 import org.apache.rocketmq.common.message.MessageExt; 7 import org.apache.rocketmq.common.message.MessageQueue; 8 import org.apache.spark.SparkConf; 9 import org.apache.spark.api.java.JavaRDD; 10 import org.apache.spark.api.java.JavaSparkContext; 11 import org.apache.spark.streaming.Durations; 12 import org.apache.spark.streaming.api.java.JavaInputDStream; 13 import org.apache.spark.streaming.api.java.JavaStreamingContext; 14 15 import java.util.*; 16 17 public class RocketMQDirectStreamExample { 18 public static void main(String[] args) throws Exception { 19 SparkConf conf = new SparkConf() 20 .setAppName("RocketMQDirectStreamExample") 21 .setMaster("local[2]"); 22 23 JavaSparkContext sc = new JavaSparkContext(conf); 24 JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(5)); 25 26 // RocketMQ Consumer 27 DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("spark_consumer_group"); 28 consumer.setNamesrvAddr("localhost:9876"); 29 consumer.start(); 30 31 // 獲取 Topic 下所有 Queue 32 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("OrderTopic"); 33 34 // 手動維護每個 queue 的 offset 35 Map<MessageQueue, Long> offsetTable = new HashMap<>(); 36 for (MessageQueue mq : mqs) { 37 offsetTable.put(mq, 0L); // 可以從外部存儲恢復 38 } 39 40 // 每個 micro-batch 拉取數據 41 ssc.foreachRDD(time -> { 42 List<Order> orders = new ArrayList<>(); 43 44 for (MessageQueue mq : mqs) { 45 long offset = offsetTable.get(mq); 46 47 // 拉取消息 48 PullResult pullResult = consumer.pullBlockIfNotFound(mq, "*", offset, 32); 49 50 if (pullResult.getPullStatus() == PullStatus.FOUND) { 51 for (MessageExt msg : pullResult.getMsgFoundList()) { 52 // 反序列化消息體 53 Order order = deserialize(msg.getBody()); 54 if (order != null) { 55 orders.add(order); 56 } 57 } 58 // 更新 offset 59 offsetTable.put(mq, pullResult.getNextBeginOffset()); 60 } 61 } 62 63 // 轉換為 RDD 64 JavaRDD<Order> rdd = sc.parallelize(orders); 65 rdd.foreach(o -> System.out.println("Got Order: " + o)); 66 67 // TODO: 把 offsetTable 持久化到外部存儲(保證容錯) 68 }); 69 70 ssc.start(); 71 ssc.awaitTermination(); 72 } 73 74 private static Order deserialize(byte[] body) { 75 try (java.io.ObjectInputStream ois = 76 new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(body))) { 77 return (Order) ois.readObject(); 78 } catch (Exception e) { 79 return null; 80 } 81 } 82 }
關鍵點說明
-
offset 管理
-
offsetTable記錄每個 MessageQueue 的消費位置。 -
每個 batch 消費后更新,并寫入外部存儲(比如 MySQL/HDFS)。
-
程序重啟時先從外部恢復 offset。
-
-
并行度
-
可以用
sc.parallelize(orders),但更高效的是:每個MessageQueue映射成一個 RDD 分片,利用 Spark 分布式并行消費。
-
-
語義保證
-
默認 at-least-once,可能會有重復消費,需要結合業務去重。
-
如果 offset 和結果同時事務寫入,可以做到 effectively-once。
-
?? 總結:
-
Receiver 模式 → 簡單但性能差。
-
Direct 模式 → 手動拉取 offset,性能高,但需要自己管理 offset。
-
Structured Streaming → 推薦的現代方案,自動 offset 管理,SQL API,更容易保證 exactly-once。
?? 2. 基于 Structured Streaming (DataFrame/Dataset API)
Structured Streaming 是 Spark 2.x 之后推薦的流處理 API。RocketMQ Connector 也支持 Structured Streaming:
方式 A:作為 Source
直接通過:
得到一個 DataFrame,包含:
-
key -
body -
topic -
tags -
offset -
timestamp等字段。
offset 自動 checkpoint,不需要手動提交。
方式 B:作為 Sink
Structured Streaming 也能把結果寫回 RocketMQ:
這樣 Spark 的計算結果會被寫到另一個 RocketMQ topic。
?? 3. 自己實現 Consumer → Spark
如果不想用官方 connector,也可以自己寫:
-
在 Spark 里啟動一個 RocketMQ Java Consumer。
-
消費消息后,把數據寫到 Spark Streaming 的隊列(例如
queueStream)。 -
Spark Streaming 從這個隊列里生成 DStream 進行計算。
?? 這種方式靈活,但 offset 管理和 exactly-once 語義都要自己處理,一般不推薦,除非你有特殊需求(比如自定義序列化/解碼)。
?? 總結
-
老的 Spark Streaming (DStream):
-
Receiver 模式(簡單,但容錯差)。
-
Direct 模式(性能好,可手動提交 offset)。
-
-
Structured Streaming:
-
推薦方式,作為 RocketMQ Source(offset 自動管理,SQL API 簡潔)。
-
可以寫回 RocketMQ。
-
-
自研 Consumer + queueStream:
-
靈活,但 offset、容錯全靠自己。
-

浙公網安備 33010602011771號