<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      spark streaming消費rocketmq的幾種方式

      在 Spark 里接入 RocketMQ,主要有兩大類方式:


      ?? 1. 基于 老的 Spark Streaming (DStream API)

      RocketMQ 社區提供過 rocketmq-spark connector(在 apache/rocketmq-externals 里),可以像 Kafka 一樣創建 DStream:

      方式 A:Receiver 模式

      • 使用自定義的 Receiver 從 RocketMQ 拉取消息。

      • 每條消息進入 Spark Streaming 的 ReceiverInputDStream

      • 優點:實現簡單。

      • 缺點:消息會先緩存在 Spark executor 的內存里,容錯依賴 Spark 的 WAL(Write Ahead Log),性能和可靠性一般。

      a. 不使用WAL(Write Ahead Log)

      核心思路是:

      • 在 Spark 里實現一個自定義 Receiver<T>,內部運行 RocketMQ PushConsumer

      • PushConsumer 收到消息后,調用 store(msg) 把數據寫入 Spark Streaming 的內存隊列。

      • Spark Streaming 后續把這些數據打包成 RDD 處理。

      核心流程:

      1. RocketMQ → PushConsumer

        • 消息推送到 Spark 進程。

      2. Spark Receiver → store()

        • Receiver 緩存消息,存到 Spark executor 的 BlockManager。

      3. Spark Streaming Job

        • 定時將數據生成 RDD 進行處理。

      示例代碼

      1?? Order 類

       1 import java.io.Serializable;
       2 
       3 public class Order implements Serializable {
       4     private String orderNo;
       5     private Long cost;
       6 
       7     public Order(String orderNo, Long cost) {
       8         this.orderNo = orderNo;
       9         this.cost = cost;
      10     }
      11 
      12     public String getOrderNo() { return orderNo; }
      13     public Long getCost() { return cost; }
      14 
      15     @Override
      16     public String toString() {
      17         return "Order{" +
      18                 "orderNo='" + orderNo + '\'' +
      19                 ", cost=" + cost +
      20                 '}';
      21     }
      22 }

      2?? 自定義 RocketMQReceiver

       1 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
       2 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
       3 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
       4 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
       5 import org.apache.rocketmq.common.message.MessageExt;
       6 import org.apache.spark.storage.StorageLevel;
       7 import org.apache.spark.streaming.receiver.Receiver;
       8 
       9 import java.io.ByteArrayInputStream;
      10 import java.io.ObjectInputStream;
      11 import java.util.List;
      12 
      13 public class RocketMQReceiver extends Receiver<Order> {
      14     private final String namesrvAddr;
      15     private final String topic;
      16     private final String group;
      17 
      18     private transient DefaultMQPushConsumer consumer;
      19 
      20     public RocketMQReceiver(String namesrvAddr, String topic, String group) {
      21         super(StorageLevel.MEMORY_AND_DISK_2());
      22         this.namesrvAddr = namesrvAddr;
      23         this.topic = topic;
      24         this.group = group;
      25     }
      26 
      27     @Override
      28     public void onStart() {
      29         new Thread(this::initConsumer).start();
      30     }
      31 
      32     private void initConsumer() {
      33         try {
      34             consumer = new DefaultMQPushConsumer(group);
      35             consumer.setNamesrvAddr(namesrvAddr);
      36             consumer.subscribe(topic, "*");
      37 
      38             consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
      39                 for (MessageExt msg : msgs) {
      40                     Order order = deserialize(msg.getBody());
      41                     if (order != null) {
      42                         store(order); // 推送到 Spark
      43                     }
      44                 }
      45                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      46             });
      47 
      48             consumer.start();
      49             System.out.println("RocketMQReceiver started.");
      50         } catch (Exception e) {
      51             restart("Error starting RocketMQReceiver", e);
      52         }
      53     }
      54 
      55     @Override
      56     public void onStop() {
      57         if (consumer != null) {
      58             consumer.shutdown();
      59         }
      60     }
      61 
      62     private Order deserialize(byte[] body) {
      63         try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body))) {
      64             return (Order) ois.readObject();
      65         } catch (Exception e) {
      66             return null;
      67         }
      68     }
      69 }

      3?? Spark Streaming 主程序

       1 import org.apache.spark.SparkConf;
       2 import org.apache.spark.streaming.Durations;
       3 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
       4 import org.apache.spark.streaming.api.java.JavaStreamingContext;
       5 
       6 public class RocketMQStreamingApp {
       7     public static void main(String[] args) throws Exception {
       8         SparkConf conf = new SparkConf().setAppName("RocketMQReceiverExample").setMaster("local[2]");
       9         JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
      10 
      11         // 創建 Receiver
      12         JavaReceiverInputDStream<Order> stream =
      13                 ssc.receiverStream(new RocketMQReceiver("localhost:9876", "OrderTopic", "spark_group"));
      14 
      15         // 簡單處理:打印訂單
      16         stream.foreachRDD(rdd -> {
      17             rdd.foreach(order -> System.out.println("Got order: " + order));
      18         });
      19 
      20         ssc.start();
      21         ssc.awaitTermination();
      22     }
      23 }

      b. 使用WAL(Write Ahead Log)

      在 Spark Streaming 里,開啟 WAL 很簡單:

      1. 設置 checkpoint 目錄(必須是 HDFS 或可靠存儲);

      2. Receiver 要用 StorageLevel.MEMORY_AND_DISK_SER_2()(支持 WAL 持久化);

      3. Spark 自動把每條接收到的數據先寫到 WAL,再交給 BlockManager。

      ?? 完整示例代碼(帶 WAL)

      1?? Order 類(和之前相同)

       1 import java.io.Serializable;
       2 
       3 public class Order implements Serializable {
       4     private String orderNo;
       5     private Long cost;
       6 
       7     public Order(String orderNo, Long cost) {
       8         this.orderNo = orderNo;
       9         this.cost = cost;
      10     }
      11 
      12     public String getOrderNo() { return orderNo; }
      13     public Long getCost() { return cost; }
      14 
      15     @Override
      16     public String toString() {
      17         return "Order{" +
      18                 "orderNo='" + orderNo + '\'' +
      19                 ", cost=" + cost +
      20                 '}';
      21     }
      22 }

      2?? RocketMQReceiver(改用支持 WAL 的 StorageLevel)

       1 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
       2 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
       3 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
       4 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
       5 import org.apache.rocketmq.common.message.MessageExt;
       6 import org.apache.spark.storage.StorageLevel;
       7 import org.apache.spark.streaming.receiver.Receiver;
       8 
       9 import java.io.ByteArrayInputStream;
      10 import java.io.ObjectInputStream;
      11 
      12 public class RocketMQReceiver extends Receiver<Order> {
      13     private final String namesrvAddr;
      14     private final String topic;
      15     private final String group;
      16 
      17     private transient DefaultMQPushConsumer consumer;
      18 
      19     public RocketMQReceiver(String namesrvAddr, String topic, String group) {
      20         // 使用支持 WAL 的存儲級別
      21         super(StorageLevel.MEMORY_AND_DISK_SER_2());
      22         this.namesrvAddr = namesrvAddr;
      23         this.topic = topic;
      24         this.group = group;
      25     }
      26 
      27     @Override
      28     public void onStart() {
      29         new Thread(this::initConsumer).start();
      30     }
      31 
      32     private void initConsumer() {
      33         try {
      34             consumer = new DefaultMQPushConsumer(group);
      35             consumer.setNamesrvAddr(namesrvAddr);
      36             consumer.subscribe(topic, "*");
      37 
      38             consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
      39                 for (MessageExt msg : msgs) {
      40                     Order order = deserialize(msg.getBody());
      41                     if (order != null) {
      42                         // Spark 會先寫 WAL,再寫 BlockManager
      43                         store(order);
      44                     }
      45                 }
      46                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      47             });
      48 
      49             consumer.start();
      50             System.out.println("RocketMQReceiver started with WAL.");
      51         } catch (Exception e) {
      52             restart("Error starting RocketMQReceiver", e);
      53         }
      54     }
      55 
      56     @Override
      57     public void onStop() {
      58         if (consumer != null) {
      59             consumer.shutdown();
      60         }
      61     }
      62 
      63     private Order deserialize(byte[] body) {
      64         try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body))) {
      65             return (Order) ois.readObject();
      66         } catch (Exception e) {
      67             return null;
      68         }
      69     }
      70 }

      3?? 主程序(開啟 WAL 需要 checkpoint)

       1 import org.apache.spark.SparkConf;
       2 import org.apache.spark.streaming.Durations;
       3 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
       4 import org.apache.spark.streaming.api.java.JavaStreamingContext;
       5 
       6 public class RocketMQStreamingApp {
       7     public static void main(String[] args) throws Exception {
       8         SparkConf conf = new SparkConf()
       9                 .setAppName("RocketMQReceiverWithWAL")
      10                 .setMaster("local[2]");
      11 
      12         // 每 5 秒一個 batch
      13         JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
      14 
      15         // 設置 checkpoint 目錄(必須是可靠存儲,如 HDFS)
      16         ssc.checkpoint("hdfs://namenode:8020/spark-checkpoints/rocketmq");
      17 
      18         // 創建帶 WAL 的 Receiver
      19         JavaReceiverInputDStream<Order> stream =
      20                 ssc.receiverStream(new RocketMQReceiver("localhost:9876", "OrderTopic", "spark_group"));
      21 
      22         // 簡單處理:打印訂單
      23         stream.foreachRDD(rdd -> {
      24             rdd.foreach(order -> System.out.println("Got order (with WAL): " + order));
      25         });
      26 
      27         ssc.start();
      28         ssc.awaitTermination();
      29     }
      30 }

      注意事項

      1. checkpoint 必須是 HDFS/S3/OSS 等分布式存儲,本地路徑只適合測試。

      2. WAL 會寫日志文件,保證 至少一次(at-least-once) 語義,但仍可能有重復消息,需要業務端去重。

      3. Receiver 模式 + WAL 性能比 Direct 模式差(多一次磁盤 IO)。

      4. 若想要 exactly-once,通常推薦 Structured Streaming(自動 checkpoint + sink 支持事務)。

      ?? Receiver 模式特點

      ? 優點

      • 實現簡單:直接用 RocketMQ PushConsumer 推消息到 Spark。

      • 不需要手動管理 offset。

      ? 缺點

      • Spark Receiver 先把數據存到內存(BlockManager),如果 Spark 崩潰,數據可能丟失。

      • 容錯要依賴 WAL(Write Ahead Log),但 WAL 會寫 HDFS,性能比 Direct 模式差。

      • 難以保證嚴格 exactly-once

       

      方式 B:Direct 模式

      • 類似 Kafka Direct Stream,Spark Streaming 直接從 RocketMQ 拉取數據,不依賴 Spark Receiver。

      • 消息 offset 由用戶管理,可以手動提交(通常寫到 Zookeeper 或外部存儲)。

      • 優點:性能更好,保證數據至少一次處理。

      • 缺點:需要自己管理 offset 提交,開發復雜一些。

      而Direct 模式的特點是:

      • 不依賴 Spark Receiver(沒有 WAL 開銷)。

      • Spark Driver 直接從 RocketMQ 拉取消息。

      • 消費的 offset 由 Spark Driver 維護,通常要手動存儲到外部(比如 HDFS、MySQL、Zookeeper)以便恢復。

      實現思路

      1. 準備 RocketMQ Consumer API

        • Spark 沒有內置 RocketMQ Direct API(像 Kafka 那樣),需要借助 rocketmq-spark connector 或者自定義 Consumer。

        • 原理和 Kafka DirectStream 一樣:

          • 在每個 micro-batch 觸發時,去 RocketMQ 拉取一段消息(指定起始 offset、結束 offset)。

          • 轉換成 RDD,交給 Spark 執行。

      2. 關鍵點

        • 手動管理 offset:RocketMQ 不會自動幫 Spark 提交,需要你把 offset 存到外部存儲(比如 HDFS/ZK/MySQL)。

        • 并行度:可以按 Topic 的 Queue(partition 類似)拆分 RDD,分發到不同 task。

        • 容錯:作業失敗后,重新從存儲的 offset 位置恢復。

      ?? 代碼示例(Direct 模式偽實現)

      假設有 OrderTopic,包含多個 MessageQueue:

       1 package com.example;
       2 
       3 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
       4 import org.apache.rocketmq.client.consumer.PullResult;
       5 import org.apache.rocketmq.client.consumer.PullStatus;
       6 import org.apache.rocketmq.common.message.MessageExt;
       7 import org.apache.rocketmq.common.message.MessageQueue;
       8 import org.apache.spark.SparkConf;
       9 import org.apache.spark.api.java.JavaRDD;
      10 import org.apache.spark.api.java.JavaSparkContext;
      11 import org.apache.spark.streaming.Durations;
      12 import org.apache.spark.streaming.api.java.JavaInputDStream;
      13 import org.apache.spark.streaming.api.java.JavaStreamingContext;
      14 
      15 import java.util.*;
      16 
      17 public class RocketMQDirectStreamExample {
      18     public static void main(String[] args) throws Exception {
      19         SparkConf conf = new SparkConf()
      20                 .setAppName("RocketMQDirectStreamExample")
      21                 .setMaster("local[2]");
      22 
      23         JavaSparkContext sc = new JavaSparkContext(conf);
      24         JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(5));
      25 
      26         // RocketMQ Consumer
      27         DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("spark_consumer_group");
      28         consumer.setNamesrvAddr("localhost:9876");
      29         consumer.start();
      30 
      31         // 獲取 Topic 下所有 Queue
      32         Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("OrderTopic");
      33 
      34         // 手動維護每個 queue 的 offset
      35         Map<MessageQueue, Long> offsetTable = new HashMap<>();
      36         for (MessageQueue mq : mqs) {
      37             offsetTable.put(mq, 0L); // 可以從外部存儲恢復
      38         }
      39 
      40         // 每個 micro-batch 拉取數據
      41         ssc.foreachRDD(time -> {
      42             List<Order> orders = new ArrayList<>();
      43 
      44             for (MessageQueue mq : mqs) {
      45                 long offset = offsetTable.get(mq);
      46 
      47                 // 拉取消息
      48                 PullResult pullResult = consumer.pullBlockIfNotFound(mq, "*", offset, 32);
      49 
      50                 if (pullResult.getPullStatus() == PullStatus.FOUND) {
      51                     for (MessageExt msg : pullResult.getMsgFoundList()) {
      52                         // 反序列化消息體
      53                         Order order = deserialize(msg.getBody());
      54                         if (order != null) {
      55                             orders.add(order);
      56                         }
      57                     }
      58                     // 更新 offset
      59                     offsetTable.put(mq, pullResult.getNextBeginOffset());
      60                 }
      61             }
      62 
      63             // 轉換為 RDD
      64             JavaRDD<Order> rdd = sc.parallelize(orders);
      65             rdd.foreach(o -> System.out.println("Got Order: " + o));
      66 
      67             // TODO: 把 offsetTable 持久化到外部存儲(保證容錯)
      68         });
      69 
      70         ssc.start();
      71         ssc.awaitTermination();
      72     }
      73 
      74     private static Order deserialize(byte[] body) {
      75         try (java.io.ObjectInputStream ois =
      76                      new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(body))) {
      77             return (Order) ois.readObject();
      78         } catch (Exception e) {
      79             return null;
      80         }
      81     }
      82 }

      關鍵點說明

      1. offset 管理

        • offsetTable 記錄每個 MessageQueue 的消費位置。

        • 每個 batch 消費后更新,并寫入外部存儲(比如 MySQL/HDFS)。

        • 程序重啟時先從外部恢復 offset。

      2. 并行度

        • 可以用 sc.parallelize(orders),但更高效的是:每個 MessageQueue 映射成一個 RDD 分片,利用 Spark 分布式并行消費。

      3. 語義保證

        • 默認 at-least-once,可能會有重復消費,需要結合業務去重。

        • 如果 offset 和結果同時事務寫入,可以做到 effectively-once。

      ?? 總結:

      • Receiver 模式 → 簡單但性能差。

      • Direct 模式 → 手動拉取 offset,性能高,但需要自己管理 offset。

      • Structured Streaming → 推薦的現代方案,自動 offset 管理,SQL API,更容易保證 exactly-once。


      ?? 2. 基于 Structured Streaming (DataFrame/Dataset API)

      Structured Streaming 是 Spark 2.x 之后推薦的流處理 API。RocketMQ Connector 也支持 Structured Streaming:

      方式 A:作為 Source

      直接通過:

      1 spark.readStream() .format("org.apache.rocketmq.spark") .option("namesrvAddr", "localhost:9876") .option("consumerGroup", "test_group") .option("topics", "OrderTopic") .load();

      得到一個 DataFrame,包含:

      • key

      • body

      • topic

      • tags

      • offset

      • timestamp 等字段。

      offset 自動 checkpoint,不需要手動提交。


      方式 B:作為 Sink

      Structured Streaming 也能把結果寫回 RocketMQ:

      1 df.writeStream() .format("org.apache.rocketmq.spark") .option("namesrvAddr", "localhost:9876") .option("producerGroup", "result_group") .option("topic", "ResultTopic") .start();

      這樣 Spark 的計算結果會被寫到另一個 RocketMQ topic。


      ?? 3. 自己實現 Consumer → Spark

      如果不想用官方 connector,也可以自己寫:

      1. 在 Spark 里啟動一個 RocketMQ Java Consumer

      2. 消費消息后,把數據寫到 Spark Streaming 的隊列(例如 queueStream)。

      3. Spark Streaming 從這個隊列里生成 DStream 進行計算。

      ?? 這種方式靈活,但 offset 管理和 exactly-once 語義都要自己處理,一般不推薦,除非你有特殊需求(比如自定義序列化/解碼)。


      ?? 總結

      • 老的 Spark Streaming (DStream)

        • Receiver 模式(簡單,但容錯差)。

        • Direct 模式(性能好,可手動提交 offset)。

      • Structured Streaming

        • 推薦方式,作為 RocketMQ Source(offset 自動管理,SQL API 簡潔)。

        • 可以寫回 RocketMQ。

      • 自研 Consumer + queueStream

        • 靈活,但 offset、容錯全靠自己。

       

      轉發請注明出處:http://www.rzrgm.cn/fnlingnzb-learner/p/19073518

      posted @ 2025-09-04 14:27  Boblim  閱讀(103)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 香蕉EEWW99国产精选免费| 视频一区二区三区刚刚碰| 亚洲区一区二区三区亚洲| 久久久久国产精品人妻| 国产综合视频一区二区三区| 日韩黄色av一区二区三区| 国产高清不卡视频| 亚洲精品动漫免费二区| 免费看欧美日韩一区二区三区| 国产小视频一区二区三区| 国语精品自产拍在线观看网站| 又大又紧又粉嫩18p少妇| 97欧美精品系列一区二区| 国产伦精品一区二区三区| 久久国内精品一区二区三区| 99久久精品国产一区二区蜜芽| 国产成人欧美一区二区三区| 亚洲中少妇久久中文字幕| 日本人一区二区在线观看| 婷婷六月色| 亚洲最大成人av在线天堂网| 粉嫩av蜜臀一区二区三区| 国产亚洲欧洲AⅤ综合一区| 99久久免费精品色老| 一区二区三区自拍偷拍视频 | 久久国内精品自在自线91| 国产成人高清亚洲综合| 色综合天天综合天天更新| 中文字幕不卡在线播放| 亚洲色大成网站WWW久久| 国产黄色带三级在线观看| V一区无码内射国产| 二区三区亚洲精品国产| 国产精品 无码专区| 野花在线观看免费观看高清| 久久99久国产麻精品66| 久久羞羞色院精品全部免费 | 国产精品一二区在线观看| 国产成人人综合亚洲欧美丁香花| 又大又粗又爽18禁免费看| 国产一区二区不卡91|