<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      spark streaming消費kafka數據寫入hdfs避免文件覆蓋方案(java版)

      1.寫在前面

      spark streaming+kafka對流式數據處理過程中,往往是spark streaming消費kafka的數據寫入hdfs中,再進行hive映射形成數倉,當然也可以利用sparkSQL直接寫入hive形成數倉。對于寫入hdfs中,如果是普通的rdd則API為saveAsTextFile(),如果是PairRDD則API為saveAsHadoopFile()。當然高版本的spark可能將這兩個合二為一。這兩種API在spark streaming中如果不自定義的話會導致新寫入的hdfs文件覆蓋歷史寫入的hdfs文件,下面來重現這個問題。

      2.saveAsTextFile()寫新寫入的hdfs文件覆蓋歷史寫入的hdfs文件測試代碼

      package com.surfilter.dp.timer.job;
      
      import kafka.message.MessageAndMetadata;
      import kafka.serializer.StringDecoder;
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.api.java.function.FlatMapFunction;
      import org.apache.spark.api.java.function.Function;
      import org.apache.spark.api.java.function.PairFlatMapFunction;
      import org.apache.spark.api.java.function.VoidFunction;
      import org.apache.spark.streaming.Seconds;
      import org.apache.spark.streaming.api.java.JavaInputDStream;
      import org.apache.spark.streaming.api.java.JavaStreamingContext;
      import org.apache.spark.streaming.kafka.KafkaUtils;
      
      import java.text.SimpleDateFormat;
      import java.util.*;
      
      public class TestStreaming extends BaseParams {
          public static void main(String args[]) {
              String totalParameterString = null;
              if (null != args && args.length > 0) {
                  totalParameterString = args[0];
              }
              if (null != totalParameterString && !"".equals(totalParameterString)) {
                  ParameterParse parameterParse = new ParameterParse(totalParameterString);
                  SparkConf conf = new SparkConf().setAppName(parameterParse.getSpark_app_name());
                  setSparkConf(parameterParse, conf);
                  JavaSparkContext sparkContext = new JavaSparkContext(conf);
                  JavaStreamingContext streamingContext = new JavaStreamingContext(sparkContext, Seconds.apply(Long.parseLong(parameterParse.getSpark_streaming_duration())));
      
                  JavaInputDStream<String> dStream = KafkaUtils.createDirectStream(streamingContext, String.class, String.class,
                          StringDecoder.class, StringDecoder.class, String.class,
                          generatorKafkaParams(parameterParse), generatorTopicOffsets(parameterParse, "test_20200509"),
                          new Function<MessageAndMetadata<String, String>, String>() {
                              private static final long serialVersionUID = 1L;
      
                              @Override
                              public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
                                  return msgAndMd.message();
                              }
                          });
      
                  dStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                      @Override
                      public void call(JavaRDD<String> rdd) throws Exception {
                          JavaRDD<String> saveHdfsRdd = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
                              @Override
                              public Iterable<String> call(Iterator<String> iterator) throws Exception {
                                  List<String> returnList = new ArrayList<>();
                                  while (iterator.hasNext()){
                                      String message = iterator.next().toString();
                                      returnList.add(message);
                                  }
                                  return returnList;
                              }
                          });
      
                          String dt = new SimpleDateFormat("yyyyMMdd").format(new Date());
                          String hour = new SimpleDateFormat("HH").format(new Date());
                          String savePath = "hdfs://gawh220:8020/user/hive/warehouse/rzx_standard.db/meijs_test/dt=" + dt + "/hour=" + hour + "/";
                          saveHdfsRdd.saveAsTextFile(savePath);
                      }
                  });
      
                  streamingContext.start();
                  streamingContext.awaitTermination();
                  streamingContext.close();
              }
          }
      }
      

      在yarn上執行spark streaming觀察,用命令行的方式往test_20200509的topic手動生產一段測試數據,發現spark streaming立馬檢測到并執行完成

      之后查看寫入的hdfs文件

      發現hdfs文件寫入正常,也是有數據的。之后不再繼續命令行生產數據,當sprak streaming新的一個批次記錄為0的任務開始執行并執行完成

      再觀察寫入的hdfs文件,發現文件依然有,但是文件的內容為空,這就證明了第一批有數據的被覆蓋掉了

      為什么被覆蓋?
      spark streaming是按照特定的配置時間去一批批的拉取kafka的數據,在寫入的時候也是按照分區的狀態寫入hdfs中的,比如下圖

      可以看出三個分區寫成三個文件,每一批寫入都是按照這種方式自動生成文件名并寫入文件中,所以會造成最新一批覆蓋之前的一批

      3.利用saveAsHadoopFile()自定義輸出文件格式避免覆蓋問題

      package com.surfilter.dp.timer.job;
      
      import com.surfilter.dp.timer.parse.BaseParams;
      import com.surfilter.dp.timer.parse.ParameterParse;
      import kafka.message.MessageAndMetadata;
      import kafka.serializer.StringDecoder;
      import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.api.java.function.Function;
      import org.apache.spark.api.java.function.PairFlatMapFunction;
      import org.apache.spark.api.java.function.VoidFunction;
      import org.apache.spark.sql.hive.HiveContext;
      import org.apache.spark.streaming.Seconds;
      import org.apache.spark.streaming.api.java.JavaInputDStream;
      import org.apache.spark.streaming.api.java.JavaStreamingContext;
      import org.apache.spark.streaming.kafka.KafkaUtils;
      import scala.Tuple2;
      
      import java.text.SimpleDateFormat;
      import java.util.ArrayList;
      import java.util.Date;
      import java.util.Iterator;
      import java.util.List;
      
      public class TestStreaming extends BaseParams {
          public static void main(String args[]) {
              String totalParameterString = null;
              if (null != args && args.length > 0) {
                  totalParameterString = args[0];
              }
              if (null != totalParameterString && !"".equals(totalParameterString)) {
                  ParameterParse parameterParse = new ParameterParse(totalParameterString);
                  SparkConf conf = new SparkConf().setAppName(parameterParse.getSpark_app_name());
                  setSparkConf(parameterParse, conf);
                  JavaSparkContext sparkContext = new JavaSparkContext(conf);
                  JavaStreamingContext streamingContext = new JavaStreamingContext(sparkContext, Seconds.apply(Long.parseLong(parameterParse.getSpark_streaming_duration())));
      
                  JavaInputDStream<String> dStream = KafkaUtils.createDirectStream(streamingContext, String.class, String.class,
                          StringDecoder.class, StringDecoder.class, String.class,
                          generatorKafkaParams(parameterParse), generatorTopicOffsets(parameterParse, "test_20200509"),
                          new Function<MessageAndMetadata<String, String>, String>() {
                              private static final long serialVersionUID = 1L;
      
                              @Override
                              public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
                                  return msgAndMd.message();
                              }
                          });
      
      
                  dStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                      @Override
                      public void call(JavaRDD<String> rdd) {
                          JavaPairRDD<String, String> pairRDD = rdd.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
                              @Override
                              public Iterable<Tuple2<String, String>> call(Iterator<String> iterator) {
                                  List<Tuple2<String, String>> returnTuple = new ArrayList<>();
                                  while (iterator.hasNext()) {
                                      String message = iterator.next().toString();
                                      returnTuple.add(new Tuple2<>(message, ""));
                                  }
                                  return returnTuple;
                              }
                          });
      
                          String dt = new SimpleDateFormat("yyyyMMdd").format(new Date());
                          String hour = new SimpleDateFormat("HH").format(new Date());
                          String savePath = "hdfs://gawh220:8020/user/hive/warehouse/rzx_standard.db/meijs_test/dt=" + dt + "/hour=" + hour + "/";
      
                          pairRDD.saveAsHadoopFile(savePath, String.class, String.class, RDDMultipleTextOutputFormat.class);
                      }
                  });
      
                  streamingContext.start();
                  streamingContext.awaitTermination();
                  streamingContext.close();
              }
          }
      }
      
      class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat {
          private static String system_time = System.currentTimeMillis() + "";
      
          @Override
          protected String generateFileNameForKeyValue(Object key, Object value, String name) {
              name = system_time + "-" + name;
              return super.generateFileNameForKeyValue(key, value, name);
          }
      }
      

      用命令行的方式往test_20200509的topic手動生產一段測試數據,發現spark streaming立馬檢測到并執行完成

      之后查看寫入的hdfs文件

      發現hdfs文件寫入正常,也是有數據的。之后不再繼續命令行生產數據,當sprak streaming新的一個批次記錄為0的任務開始執行并執行完成

      再觀察寫入的hdfs文件,發現并沒有產生新的hdfs文件

      再命令行的方式往test_20200509的topic手動生產一段測試數據,發現spark streaming立馬檢測到并執行完成

      之后查看寫入的hdfs文件,發現新寫入的hdfs文件是追加到之前的文件的方式并且有數據的,如果之前的文件大小超過hdfs設定的大小,則會追加新的文件方式

      說明:這種方式不但可以避免覆蓋問題,而且可以避免大量小文件

      posted @ 2020-05-09 17:22  技術即藝術  閱讀(3513)  評論(2)    收藏  舉報
      主站蜘蛛池模板: 国产成人精品a视频一区| 五月综合婷婷开心综合婷婷| 欧美亚洲人成网站在线观看| 国产精品∧v在线观看| 日韩有码av中文字幕| 国产av成人精品播放| 欧美z0zo人禽交另类视频| 国产伦精品一区二区亚洲| 人妻丰满熟妇AV无码区乱| 99久久er热在这里只有精品99| 久久久久青草线蕉综合超碰| 樱花草视频www日本韩国| 免费网站看sm调教视频 | 亚洲成av人在线播放无码| 国产综合视频精品一区二区| 国产精品亚洲а∨天堂2021| 欧美高清狂热视频60一70| 成人性无码专区免费视频| 亚洲成av一区二区三区| 久久毛片少妇高潮| 少妇精品无码一区二区免费视频| 97久久精品人人澡人人爽| 精品国产迷系列在线观看| 国产成人免费午夜在线观看| 国产97视频人人做人人爱| 长白| 国产精品中出一区二区三区| 中文字幕av无码免费一区| 漂亮人妻被中出中文字幕| 变态另类视频一区二区三区| 久久精品国产中文字幕| 女人下边被添全过视频的网址| 亚洲综合国产成人丁香五| 天天做天天爱夜夜爽导航| 日韩一级伦理片一区二区| 国产人妻人伦精品婷婷| 啪啪av一区二区三区| 亚洲国产片一区二区三区| 国产精品入口麻豆| 亚洲v欧美v日韩v国产v| 东方av四虎在线观看|