<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      spark streaming(非structured steaming)消費(fèi)rocketMQ,并手動提交

      一、背景

      最近有一個需求是:要求有一個類對象為Order,它有string類型的字段orderNo和Long類型的字段cost,生產(chǎn)者寫到rocketMQ的value是Order對象序列化后的字節(jié)數(shù)組、key值是orderNo字段,要求spark以手動提交的方式消費(fèi)rocketMQ,并將數(shù)據(jù)依次寫入到hive表中,并且spark中有一個5分鐘滑動窗口,滑動步長為1分鐘,統(tǒng)計(jì)5分鐘內(nèi)的cost總值并輸出。

       

      二、實(shí)戰(zhàn)演練

      RocketMQ 官方并沒有像 Kafka 那樣的「Spark DirectStream 一鍵式手動提交 offset」通用方案;因此我用的是一種常見的、工程可控的做法來實(shí)現(xiàn)“手動提交 offset”語義:

      • 使用 Spark Streaming (DStream)(不是 Structured Streaming,因?yàn)槟忝鞔_要手動提交 offset;Structured Streaming 的 offset 管理是自動的,不能手動提交)。

      • 我實(shí)現(xiàn)了一個 自定義 Receiver(RocketMQReceiver),該 Receiver 在每個 batch 周期從 RocketMQ 拉取消息(Pull 模式),并把 Order 對象推入 DStream。

      • 手動提交 offset 的實(shí)現(xiàn):Receiver 不把 offset 自動永久化到 Broker,而是把“當(dāng)前讀取到的 offset”存放在一個外部可共享位置(示例使用 HDFS/local 文件作為簡單的 offset store)。Driver 在確認(rèn)該 batch 的 RDD 已成功寫入 Hive 后,會把“該 batch 已處理的 offsets”寫回到同一 offset store(即手動提交)。下一次 Receiver 啟動 / 拉取時,會從 offset store 讀取這些已提交的 offsets 繼續(xù)拉取。

        這種方案與 Kafka 的 commitAsync 思路等價(jià):只有當(dāng)你確認(rèn)完成寫入(輸出)后,才把 offset 持久化(提交)。該方法是通用的、工程上常用的“外部 offset 管理”策略,能滿足“手動提交”需求。

      下面包含:pom.xmlOrder.java、RocketMQ 生產(chǎn)者 OrderProducer.java、Hive 建表 SQL、Spark 程序 SparkRocketMQToHive.java(包含自定義 Receiver、window 邏輯與手動 offset 提交),以及運(yùn)行說明與注意事項(xiàng)

      1) pom.xml(主要依賴)

      <project xmlns="http://maven.apache.org/POM/4.0.0" >
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.example</groupId>
        <artifactId>spark-rocketmq-hive-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
        <properties>
          <java.version>1.8</java.version>
          <scala.version>2.12</scala.version>
          <spark.version>3.3.2</spark.version>
          <rocketmq.version>4.9.4</rocketmq.version>
        </properties>
        <dependencies>
          <!-- Spark -->
          <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
          </dependency>
      
          <!-- RocketMQ client (producer & pull consumer) -->
          <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq.version}</version>
          </dependency>
      
          <!-- Logging -->
          <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
          </dependency>
          <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.36</version>
          </dependency>
        </dependencies>
        <build>
          <plugins>
            <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.8.1</version>
              <configuration>
                <source>1.8</source>
                <target>1.8</target>
              </configuration>
            </plugin>
          </plugins>
        </build>
      </project>

      2) Hive 建表 SQL

      在 Hive CLI 或 Hue 執(zhí)行:

      CREATE DATABASE IF NOT EXISTS demo_db;
      
      -- 原始訂單表,包含事件時間 ts(毫秒)
      CREATE TABLE IF NOT EXISTS demo_db.orders (
        orderNo STRING,
        cost BIGINT,
        ts BIGINT
      )
      STORED AS PARQUET;
      
      -- 窗口聚合表
      CREATE TABLE IF NOT EXISTS demo_db.order_cost_window_agg (
        window_start TIMESTAMP,
        window_end TIMESTAMP,
        total_cost BIGINT
      )
      STORED AS PARQUET;

      3) Order 類(Order.java

       1 package com.example;
       2 
       3 import java.io.Serializable;
       4 
       5 public class Order implements Serializable {
       6     private static final long serialVersionUID = 1L;
       7 
       8     private String orderNo;
       9     private Long cost;
      10     private Long ts; // 事件時間,毫秒
      11 
      12     public Order() {}
      13 
      14     public Order(String orderNo, Long cost, Long ts) {
      15         this.orderNo = orderNo;
      16         this.cost = cost;
      17         this.ts = ts;
      18     }
      19 
      20     public String getOrderNo() { return orderNo; }
      21     public void setOrderNo(String orderNo) { this.orderNo = orderNo; }
      22 
      23     public Long getCost() { return cost; }
      24     public void setCost(Long cost) { this.cost = cost; }
      25 
      26     public Long getTs() { return ts; }
      27     public void setTs(Long ts) { this.ts = ts; }
      28 
      29     @Override
      30     public String toString() {
      31         return "Order{" + "orderNo='" + orderNo + '\'' + ", cost=" + cost + ", ts=" + ts + '}';
      32     }
      33 }

      序列化方案:示例使用 Java 原生序列化(ObjectOutputStream → byte[])。生產(chǎn)環(huán)境建議改為 JSON/Avro/ProtoBuf。

      4) RocketMQ 生產(chǎn)者示例(OrderProducer.java

       1 package com.example;
       2 
       3 import org.apache.rocketmq.client.producer.DefaultMQProducer;
       4 import org.apache.rocketmq.common.message.Message;
       5 
       6 import java.io.ByteArrayOutputStream;
       7 import java.io.ObjectOutputStream;
       8 import java.util.UUID;
       9 
      10 public class OrderProducer {
      11     public static byte[] serialize(Order order) throws Exception {
      12         ByteArrayOutputStream bos = new ByteArrayOutputStream();
      13         ObjectOutputStream oos = new ObjectOutputStream(bos);
      14         oos.writeObject(order);
      15         oos.flush();
      16         return bos.toByteArray();
      17     }
      18 
      19     public static void main(String[] args) throws Exception {
      20         String namesrvAddr = "localhost:9876"; // 修改為你的nameserver地址
      21         String topic = "orders-topic";
      22 
      23         DefaultMQProducer producer = new DefaultMQProducer("order-producer-group");
      24         producer.setNamesrvAddr(namesrvAddr);
      25         producer.start();
      26 
      27         for (int i = 0; i < 50; i++) {
      28             String orderNo = "ORD-" + UUID.randomUUID().toString().substring(0, 8);
      29             long cost = (long)(Math.random() * 1000);
      30             long ts = System.currentTimeMillis();
      31 
      32             Order order = new Order(orderNo, cost, ts);
      33             byte[] body = serialize(order);
      34 
      35             Message msg = new Message(topic, "tagA", orderNo, body);
      36             producer.send(msg); // 同步發(fā)送
      37             System.out.println("Sent: " + order);
      38             Thread.sleep(200);
      39         }
      40 
      41         producer.shutdown();
      42     }
      43 }

      5) Spark 程序(完整):SparkRocketMQToHive.java

      主要思路:

      • 自定義 RocketMQReceiver extends Receiver<Order>:在 onStart 啟動一個拉取線程,周期性從 RocketMQ 的每個 MessageQueue 拉取消息(從我們持久化的 offset store 開始),并 store(order) 推送給 Spark Streaming。

      • offset store:示例用一個簡單的文件(可以是 HDFS 路徑)保存 JSON 格式的已提交 offsets(per-queue)。

      • Driver 端的 foreachRDD 會把 RDD 寫入 Hive(append),并且在成功寫入后把該 batch 的最大 offsets 寫入 offset store(即手動提交)。

      • 窗口聚合:用 DStream 的 window(Durations.minutes(5), Durations.minutes(1)),在每個窗口上做 map(Order->("all", cost)) reduce,最后寫入聚合表。

      注意:代碼做了較多注釋,請按注釋中的 TODO 修改 RocketMQ namesrv、topic、以及 offsetStorePath(HDFS 路徑建議用 hdfs://...)。

        1 package com.example;
        2 
        3 import org.apache.hadoop.fs.*;
        4 import org.apache.hadoop.conf.Configuration;
        5 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
        6 import org.apache.rocketmq.client.consumer.PullResult;
        7 import org.apache.rocketmq.client.consumer.PullStatus;
        8 import org.apache.rocketmq.common.message.MessageExt;
        9 import org.apache.rocketmq.common.message.MessageQueue;
       10 import org.apache.spark.SparkConf;
       11 import org.apache.spark.api.java.*;
       12 import org.apache.spark.streaming.*;
       13 import org.apache.spark.streaming.api.java.*;
       14 import org.apache.spark.streaming.receiver.Receiver;
       15 import org.apache.spark.storage.StorageLevel;
       16 import org.apache.spark.sql.*;
       17 
       18 import java.io.*;
       19 import java.util.*;
       20 import java.util.concurrent.ConcurrentHashMap;
       21 import java.util.stream.Collectors;
       22 
       23 import scala.Tuple2;
       24 
       25 public class SparkRocketMQToHive {
       26 
       27     // -------------------------
       28     // OffsetStore: 通過文件(HDFS/local)保存已提交 offsets(簡單實(shí)現(xiàn))
       29     // 存儲格式(簡單 JSON-like):
       30     // queueId|brokerName|queueOffset\n
       31     // -------------------------
       32     public static class OffsetStore {
       33         private final String path; // e.g., hdfs://namenode:8020/user/offsets/orders_offsets.txt or file:///tmp/...
       34         private final Configuration hadoopConf = new Configuration();
       35 
       36         public OffsetStore(String path) {
       37             this.path = path;
       38         }
       39 
       40         // 讀取已提交 offsets -> Map<MessageQueue, Long>
       41         public Map<MessageQueue, Long> readOffsets() throws IOException {
       42             Map<MessageQueue, Long> map = new HashMap<>();
       43             Path p = new Path(path);
       44             FileSystem fs = p.getFileSystem(hadoopConf);
       45             if (!fs.exists(p)) return map;
       46 
       47             try (FSDataInputStream in = fs.open(p);
       48                  BufferedReader br = new BufferedReader(new InputStreamReader(in))) {
       49                 String line;
       50                 while ((line = br.readLine()) != null) {
       51                     // 格式: brokerName|queueId|offset
       52                     String[] parts = line.trim().split("\\|");
       53                     if (parts.length == 3) {
       54                         String broker = parts[0];
       55                         int queueId = Integer.parseInt(parts[1]);
       56                         long offset = Long.parseLong(parts[2]);
       57                         // MessageQueue requires topic too — we will reconstruct with topic externally
       58                         // To simplify, we return map keyed by "broker:queueId" in caller
       59                         MessageQueue mq = new MessageQueue(); // placeholder, set fields later
       60                         mq.setBrokerName(broker);
       61                         mq.setQueueId(queueId);
       62                         // cannot set topic here; caller will match based on broker+queueId.
       63                         map.put(mq, offset);
       64                     }
       65                 }
       66             } catch (Exception e) {
       67                 throw new IOException("readOffsets failed", e);
       68             }
       69             return map;
       70         }
       71 
       72         // 簡化寫入:由 caller 提供 list of entries (broker|queueId|offset)
       73         public void writeOffsets(List<String> lines) throws IOException {
       74             Path p = new Path(path);
       75             FileSystem fs = p.getFileSystem(hadoopConf);
       76             // 覆蓋寫入
       77             try (FSDataOutputStream out = fs.create(p, true);
       78                  BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out))) {
       79                 for (String line : lines) {
       80                     bw.write(line);
       81                     bw.newLine();
       82                 }
       83             }
       84         }
       85     }
       86 
       87     // -------------------------
       88     // RocketMQReceiver:自定義 Receiver,從 RocketMQ pull 消息并 store()
       89     // -------------------------
       90     public static class RocketMQReceiver extends Receiver<Order> {
       91         private volatile boolean stopped = false;
       92         private final String namesrvAddr;
       93         private final String topic;
       94         private final String consumerGroup;
       95         private final String offsetStorePath; // offset store path
       96         private transient DefaultMQPullConsumer pullConsumer;
       97 
       98         // local map to hold the latest pulled offsets per MessageQueue
       99         private final Map<MessageQueue, Long> latestPulledOffsets = new ConcurrentHashMap<>();
      100 
      101         public RocketMQReceiver(String namesrvAddr, String topic, String consumerGroup, String offsetStorePath) {
      102             super(StorageLevel.MEMORY_AND_DISK_2());
      103             this.namesrvAddr = namesrvAddr;
      104             this.topic = topic;
      105             this.consumerGroup = consumerGroup;
      106             this.offsetStorePath = offsetStorePath;
      107         }
      108 
      109         @Override
      110         public void onStart() {
      111             stopped = false;
      112             // start pulling thread
      113             new Thread(this::pullLoop, "rocketmq-pull-thread").start();
      114         }
      115 
      116         @Override
      117         public void onStop() {
      118             stopped = true;
      119             if (pullConsumer != null) {
      120                 try {
      121                     pullConsumer.shutdown();
      122                 } catch (Exception ignored) {
      123                 }
      124             }
      125         }
      126 
      127         private void pullLoop() {
      128             try {
      129                 pullConsumer = new DefaultMQPullConsumer(consumerGroup + "_pull");
      130                 pullConsumer.setNamesrvAddr(namesrvAddr);
      131                 pullConsumer.start();
      132 
      133                 // 獲取所有 message queues for topic
      134                 Set<MessageQueue> mqs = pullConsumer.fetchSubscribeMessageQueues(topic);
      135 
      136                 // 讀取已提交 offsets(從 offset store)
      137                 Map<MessageQueue, Long> committed = readOffsetStore();
      138 
      139                 while (!stopped) {
      140                     for (MessageQueue mq : mqs) {
      141                         long offset = 0L;
      142                         // find committed offset matching same broker+queueId
      143                         Optional<Map.Entry<MessageQueue, Long>> found = committed.entrySet().stream()
      144                                 .filter(e -> e.getKey().getBrokerName().equals(mq.getBrokerName())
      145                                         && e.getKey().getQueueId() == mq.getQueueId())
      146                                 .findFirst();
      147                         if (found.isPresent()) offset = found.get().getValue();
      148                         // Pull
      149                         try {
      150                             PullResult pullResult = pullConsumer.pullBlockIfNotFound(mq, null, offset, 32);
      151                             if (pullResult != null) {
      152                                 if (pullResult.getPullStatus() == PullStatus.FOUND) {
      153                                     List<MessageExt> msgs = pullResult.getMsgFoundList();
      154                                     if (msgs != null) {
      155                                         for (MessageExt me : msgs) {
      156                                             try {
      157                                                 byte[] body = me.getBody();
      158                                                 Order o = deserialize(body);
      159                                                 if (o != null) {
      160                                                     store(o); // 推入 DStream
      161                                                 }
      162                                             } catch (Exception e) {
      163                                                 // log and continue
      164                                                 e.printStackTrace();
      165                                             }
      166                                         }
      167                                     }
      168                                 }
      169                                 // 更新本地 latestPulledOffsets:下次從 pullResult.getNextBeginOffset() 拉
      170                                 long nextOffset = pullResult.getNextBeginOffset();
      171                                 latestPulledOffsets.put(mq, nextOffset);
      172                             }
      173                         } catch (Exception ex) {
      174                             ex.printStackTrace();
      175                         }
      176                         if (stopped) break;
      177                     }
      178                     // 拉完一輪 messageQueues 之后,短睡眠一下,等待下一 batch 拉取
      179                     Thread.sleep(500);
      180                 }
      181             } catch (Exception e) {
      182                 e.printStackTrace();
      183                 restart("rocketmq pull failed", e);
      184             }
      185         }
      186 
      187         // 從 offset store 讀取已提交 offsets(簡化)
      188         private Map<MessageQueue, Long> readOffsetStore() {
      189             OffsetStore store = new OffsetStore(offsetStorePath);
      190             try {
      191                 return store.readOffsets();
      192             } catch (Exception e) {
      193                 // log and return empty
      194                 e.printStackTrace();
      195                 return new HashMap<>();
      196             }
      197         }
      198 
      199         // 反序列化 Order(Java 原生序列化)
      200         private Order deserialize(byte[] bytes) {
      201             if (bytes == null) return null;
      202             try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
      203                  ObjectInputStream ois = new ObjectInputStream(bis)) {
      204                 return (Order) ois.readObject();
      205             } catch (Exception e) {
      206                 e.printStackTrace();
      207                 return null;
      208             }
      209         }
      210 
      211         // 提供外部訪問最新 pulled offsets 的方法(driver 可以讀取并在處理完寫入后提交)
      212         public Map<MessageQueue, Long> getLatestPulledOffsets() {
      213             return new HashMap<>(latestPulledOffsets);
      214         }
      215     }
      216 
      217     // -------------------------
      218     // 主程序:Spark Streaming 作業(yè)
      219     // -------------------------
      220     public static void main(String[] args) throws Exception {
      221         // config
      222         String namesrv = "localhost:9876"; // TODO 修改
      223         String topic = "orders-topic";
      224         String consumerGroup = "spark-rocketmq-group";
      225         String offsetStorePath = "file:///tmp/rocketmq_offsets.txt"; // TODO 改為 HDFS 路徑:hdfs://namenode:8020/user/offsets/orders_offsets.txt
      226 
      227         SparkConf conf = new SparkConf()
      228                 .setAppName("SparkRocketMQToHive")
      229                 .setIfMissing("spark.master", "local[2]")
      230                 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
      231 
      232         // batch interval = 1 minute (符合滑動步長)
      233         JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.minutes(1));
      234         // checkpoint dir for Spark Streaming state (if needed)
      235         jssc.checkpoint("/tmp/spark-streaming-checkpoint");
      236 
      237         // 創(chuàng)建并注冊 Receiver
      238         RocketMQReceiver receiver = new RocketMQReceiver(namesrv, topic, consumerGroup, offsetStorePath);
      239         JavaReceiverInputDStream<Order> stream = jssc.receiverStream(receiver);
      240 
      241         // 將原始 order 寫入 Hive(每個 micro-batch 的 RDD)
      242         stream.foreachRDD((JavaRDD<Order> rdd, Time time) -> {
      243             if (rdd.isEmpty()) {
      244                 System.out.println("No data in this batch: " + time);
      245                 return;
      246             }
      247 
      248             // 創(chuàng)建 SparkSession(在 executor 上創(chuàng)建會出問題,因此在 driver 上操作)
      249             SparkSession spark = SparkSession.builder()
      250                     .config(conf)
      251                     .enableHiveSupport()
      252                     .getOrCreate();
      253 
      254             Dataset<Row> df = spark.createDataFrame(rdd, Order.class);
      255             // 寫入 orders 表(append)
      256             df.write().mode(SaveMode.Append).insertInto("demo_db.orders");
      257 
      258             System.out.println("Wrote batch to Hive: " + time + ", count=" + rdd.count());
      259 
      260             // 手動提交 offset:從 receiver 獲取 latestPulledOffsets,然后把它持久化到 offsetStorePath(覆蓋寫)
      261             Map<MessageQueue, Long> latestOffsets = receiver.getLatestPulledOffsets();
      262             List<String> lines = latestOffsets.entrySet().stream()
      263                     .map(e -> e.getKey().getBrokerName() + "|" + e.getKey().getQueueId() + "|" + e.getValue())
      264                     .collect(Collectors.toList());
      265             OffsetStore store = new OffsetStore(offsetStorePath);
      266             store.writeOffsets(lines);
      267             System.out.println("Committed offsets for this batch: " + lines);
      268         });
      269 
      270         // 窗口聚合:5 分鐘窗口,1 分鐘滑動,統(tǒng)計(jì) cost 總和
      271         JavaPairDStream<String, Long> kv = stream.mapToPair(order -> new Tuple2<>("all", order.getCost() == null ? 0L : order.getCost()));
      272         JavaPairDStream<String, Long> windowed = kv.reduceByKeyAndWindow((a, b) -> a + b,
      273                 Durations.minutes(5), Durations.minutes(1));
      274 
      275         // 每個窗口寫入聚合表
      276         windowed.foreachRDD((rdd, time) -> {
      277             if (rdd.isEmpty()) return;
      278             SparkSession spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
      279 
      280             final long windowEndMs = time.milliseconds();
      281             final long windowStartMs = windowEndMs - 5 * 60 * 1000 + 1;
      282 
      283             List<Row> rows = rdd.map(tuple -> RowFactory.create(new java.sql.Timestamp(windowStartMs),
      284                     new java.sql.Timestamp(windowEndMs), tuple._2())).collect();
      285 
      286             StructType schema = new StructType(new StructField[]{
      287                     new StructField("window_start", DataTypes.TimestampType, false, Metadata.empty()),
      288                     new StructField("window_end", DataTypes.TimestampType, false, Metadata.empty()),
      289                     new StructField("total_cost", DataTypes.LongType, false, Metadata.empty())
      290             });
      291 
      292             Dataset<Row> df = spark.createDataFrame(rows, schema);
      293             df.write().mode(SaveMode.Append).insertInto("demo_db.order_cost_window_agg");
      294             System.out.println("Wrote window agg: start=" + new java.sql.Timestamp(windowStartMs) + " end=" + new java.sql.Timestamp(windowEndMs));
      295         });
      296 
      297         // 啟動
      298         jssc.start();
      299         jssc.awaitTermination();
      300     }
      301 }

      6) 運(yùn)行步驟(本地測試)

      1. 啟動 RocketMQ (nameserver + broker)。確保 namesrvAddr 在 producer/consumer/receiver 中一致。

      2. 在 Hive 中執(zhí)行建表 SQL。

      3. 用 Maven 打包:

        mvn clean package -DskipTests
      4. 先運(yùn)行 OrderProducer 發(fā)送一些消息:

        java -cp target/spark-rocketmq-hive-demo-1.0-SNAPSHOT.jar com.example.OrderProducer
      5. 使用 spark-submit 提交 Spark Streaming 作業(yè)(注意把 rocketmq-client 的 jar 加到 classpath,或者使用 --packages/--jars):

        $SPARK_HOME/bin/spark-submit \ --class com.example.SparkRocketMQToHive \ --master local[2] \ --jars path/to/rocketmq-client-4.9.4.jar \ target/spark-rocketmq-hive-demo-1.0-SNAPSHOT.jar

        或把 rocketmq-client 打包進(jìn) fat-jar。

      6. 觀察 Hive 表 demo_db.ordersdemo_db.order_cost_window_agg 是否有數(shù)據(jù)。

      7) 設(shè)計(jì)說明與注意事項(xiàng)(必須閱讀)

      1. 為什么用 Receiver + 外部 offset store?

        • RocketMQ 與 Kafka 在生態(tài)、客戶端 API 上不同,且沒有像 Kafka DirectStream 那樣的“offsetRanges 可在 driver 中獲取并 commitAsync”現(xiàn)成模式;用 Receiver + 自己維護(hù) offset store(HDFS/file/ZK)可以實(shí)現(xiàn)“在確認(rèn)寫入外部 sink 后再提交 offset”這一語義。

      2. offset 的一致性

        • 該實(shí)現(xiàn)保證:只有在 Spark 確認(rèn)該 batch 數(shù)據(jù)已寫入 Hive 后,才把最新 pulled offsets 持久化(提交)。若寫 Hive 失敗,則不會提交 offsets,下次作業(yè)重啟或 receiver 重新讀取時會從上次提交點(diǎn)重新讀取(冪等問題需用戶在寫入時處理,比如去重或使用唯一 key 覆蓋寫入策略)。

      3. OffsetStore 的實(shí)現(xiàn)示例簡單

        • 示例使用了一個非常簡單的文件格式。生產(chǎn)環(huán)境建議使用 Zookeeper、HBase、MySQL 或 RocketMQ 的 broker commit 接口(如果可用)來存放 offsets,需要考慮并發(fā)/權(quán)限/原子寫入等問題。使用 HDFS 寫入需確保 driver 與 receiver 都能訪問同一路徑。

      4. 性能 & 并發(fā)

        • 示例中 receiver 一次拉取固定數(shù)量(32 條)并逐條 store()。你可以根據(jù)需要調(diào)整并行度(多個 receiver 實(shí)例、每 queue 的并發(fā)拉取、批量 store 等)。另外,window 聚合會把數(shù)據(jù)在 driver 上合并寫入 Hive(請按集群資源調(diào)優(yōu)并行度、分區(qū)數(shù)等)。

      5. 序列化方案

        • 示例使用 Java native serialization(ObjectOutputStream)。生產(chǎn)環(huán)境應(yīng)優(yōu)先使用 JSON / Avro / Protobuf 以便于兼容與演進(jìn)。

      6. 冪等性

        • 如果你的 Hive 寫入不是冪等的(例如重復(fù)寫會產(chǎn)生重復(fù)記錄),建議在訂單表中使用 orderNo 做主鍵去重或在寫入前做去重。因?yàn)槿魧懯『笾卦嚕瑫a(chǎn)生重復(fù)消費(fèi)。

      7. 時間與窗口

        • 示例窗口使用 processing-time(因?yàn)?DStream 沒有像 Structured Streaming 那樣方便的事件時間 watermark)。如果你需要基于 ts(Order 內(nèi)的事件時間)做更準(zhǔn)確的事件時間窗口,并處理亂序數(shù)據(jù),建議使用 Structured Streaming(但 Structured Streaming 不允許手動提交 offset)。這兩者之間是權(quán)衡:事件時間 + watermark(Structured Streaming)VS 可控手動提交 offset(本示例用 DStream + 自管理 offset)。

      8) 下步可選改進(jìn)

      • 把 offsetStore 從文件改成 ZooKeeper 實(shí)現(xiàn)(更可靠,適合分布式部署)。

      • 把序列化改成 JSON(Jackson):更易于調(diào)試和兼容。

      • 把 Receiver 的拉取邏輯改成批量化、高吞吐版本(同時按 partition 拉取并做并行 store)。

      • 如果你想改用 Structured Streaming(事件時間 + watermark 優(yōu)勢),我可以給出無法手動提交 offset 的替代方案:例如把 Hive 寫入做冪等化(基于 orderNo)從而允許使用 Structured Streaming 的自動 offset 管理與 exactly-once 語義。

       

      轉(zhuǎn)載請注明出處:http://www.rzrgm.cn/fnlingnzb-learner/p/19073689

      posted @ 2025-09-04 15:28  Boblim  閱讀(19)  評論(0)    收藏  舉報(bào)
      主站蜘蛛池模板: 天天做天天爱夜夜爽导航| 国产精一品亚洲二区在线播放 | 国产午夜福利小视频在线| 亚洲熟妇自偷自拍另亚洲| 亚洲欧美不卡高清在线| 日韩中文字幕高清有码| 亚洲男人的天堂久久香蕉| 亚洲国产日韩一区三区| 熟妇人妻系列aⅴ无码专区友真希| 中年国产丰满熟女乱子正在播放| 国产精品高清视亚洲精品| 久久五十路丰满熟女中出| 性猛交ⅹxxx富婆视频| 久久精品国产大片免费观看| 亚洲一区二区三区在线观看精品中文 | 国产一区二区三区禁18| 久久精品国产亚洲av亚| 办公室强奷漂亮少妇视频| 不卡国产一区二区三区| 国产一二三五区不在卡| 亚洲精品无码久久千人斩| 美女扒开奶罩露出奶头视频网站| 亚洲中文字幕人妻系列| 日韩精品亚洲专区在线观看| 日本欧美大码a在线观看| 日韩毛片在线视频x| 你懂的视频在线一区二区| 国产亚洲精品久久久久久大师 | 色欲狠狠躁天天躁无码中文字幕 | 午夜福利国产精品视频| 国产精品中文字幕在线| 日本韩国一区二区精品| 亚洲AV无码精品色午夜果冻| 汾阳市| 乱码中文字幕| AV在线亚洲欧洲日产一区二区| 鄂伦春自治旗| 四虎成人在线观看免费| 欧美亚洲国产日韩电影在线| 亚洲精品一区二区妖精| 欧美裸体xxxx极品|