Spark Core: writing output to different directories by key
When doing statistical computations, grouping data by some condition and storing each group separately makes later retrieval and analysis much easier.
Flink has stream-splitting operators that can divide a batch of processed data into separate streams. Spark has no such operator, but a similar effect can be achieved.
- Write data to different directories based on its key
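Before the full job, the key-range bucketing used in the Spark code below can be sketched as a pure function (the bucket names and ranges are copied from the code; the class and method names here are illustrative only):

```java
public class KeyBuckets {
    // Maps a numeric key to an output bucket (directory name), matching
    // the if/else chain used inside the Spark job below.
    static String bucketFor(int key) {
        if (key == -1) {
            return "key1";
        } else if (0 <= key && key <= 3) {
            return "key2";
        } else if (4 <= key && key <= 5) {
            return "key3";
        } else if (key == 6) {
            return "key4";
        } else if (7 <= key && key <= 11) {
            return "key5";
        } else {
            return "other";
        }
    }

    public static void main(String[] args) {
        System.out.println(bucketFor(-1));  // prints "key1"
        System.out.println(bucketFor(2));   // prints "key2"
        System.out.println(bucketFor(100)); // prints "other"
    }
}
```

Because every branch returns a bucket name, no record can fall through with a null key.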
// Note: Result below comes from the elided input-reading logic (e.g. HBase's
// org.apache.hadoop.hbase.client.Result); import it to match your own source.
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.io.IOException;
import java.util.Iterator;

public class XXXX {
    public static void main(String[] args) throws IOException {
        /*
        .....
        */
        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        SparkSession sparkSession = new SparkSession.Builder().config(conf).getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());
        jsc.setLogLevel("WARN");
        jsc.textFile(input)
            /*
            ...
            */
            // The point is to get the data into (key, value) pair form; other operators work too
            .mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<String, Result>>, String, String>() {
                @Override
                public Iterator<Tuple2<String, String>> call(Iterator<Tuple2<String, Result>> tuple2Iterator) throws Exception {
                    return new Iterator<Tuple2<String, String>>() {
                        @Override
                        public boolean hasNext() {
                            return tuple2Iterator.hasNext();
                        }

                        @Override
                        public Tuple2<String, String> next() {
                            Tuple2<String, Result> next = tuple2Iterator.next();
                            int key = -1;
                            String value = "";
                            /*
                            ...
                            ...
                            */
                            if (key == -1) {
                                return new Tuple2<>("key1", value);
                            } else if (0 <= key && key <= 3) {
                                return new Tuple2<>("key2", value);
                            } else if (4 <= key && key <= 5) {
                                return new Tuple2<>("key3", value);
                            } else if (key == 6) {
                                sixNum.add(1L); // accumulator defined in the elided setup code
                                return new Tuple2<>("key4", value);
                            } else if (7 <= key && key <= 11) {
                                return new Tuple2<>("key5", value);
                            } else {
                                return new Tuple2<>("other", value);
                            }
                        }
                    };
                }
            })
            .filter(data -> data != null)
            .coalesce(10)
            // The first String.class is the key type, the second is the value type
            .saveAsHadoopFile(output, String.class, String.class, XXXX.RDDMultipleTextOutputFormat.class);
        sparkSession.stop();
        jsc.stop();
    }

    public static class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat<String, String> {
        // Route each record to <key>/<name> under the output directory,
        // where name is the part file name Hadoop assigns (e.g. "part-00000")
        @Override
        public String generateFileNameForKeyValue(String key, String value, String name) {
            return key + "/" + name;
        }

        // Returning null drops the key from the written file content, leaving only the value
        @Override
        public String generateActualKey(String key, String value) {
            return null;
        }
    }
}
The files end up being written under /output/key (one subdirectory per key).
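To make the routing behavior concrete, here is a small standalone sketch that needs no Spark or Hadoop; the two static methods mirror the `RDDMultipleTextOutputFormat` overrides above, but the class name, method names, and sample values are illustrative only:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class OutputPathDemo {
    // Mirrors generateFileNameForKeyValue: the record's key becomes a
    // subdirectory, and "name" is the part file name Hadoop assigns.
    static String fileNameForKeyValue(String key, String name) {
        return key + "/" + name;
    }

    // Mirrors generateActualKey: returning null means only the value is written.
    static String actualKey(String key, String value) {
        return null;
    }

    public static void main(String[] args) {
        String output = "/output";
        Map<String, String> records = new LinkedHashMap<>();
        records.put("key1", "some value");
        records.put("other", "fallback value");

        for (Map.Entry<String, String> e : records.entrySet()) {
            String path = output + "/" + fileNameForKeyValue(e.getKey(), "part-00000");
            // Since actualKey returns null, only the value lands in the file
            String written = actualKey(e.getKey(), e.getValue()) == null
                    ? e.getValue()
                    : e.getKey() + "\t" + e.getValue();
            System.out.println(path + " <- " + written);
        }
    }
}
```

Running it shows each record routed to its own key subdirectory, e.g. `/output/key1/part-00000` and `/output/other/part-00000`, with the key stripped from the file content.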
