Hadoop Multiple-Format Input
Versions:
CDH 5.0.0 (HDFS 2.3, MapReduce 2.3, YARN 2.3)
For multiple input formats in Hadoop, you can generally use the MultipleInputs class to specify a different input path, input format, and mapper per data source.
Suppose we have the following requirement, with two existing datasets:
phone:
123,good number
124,common number
125,bad number
user:
zhangsan,123
lisi,124
wangwu,125
The user and phone records now need to be joined on the phone number, producing this result:
zhangsan,123,good number
lisi,124,common number
wangwu,125,bad number
This is exactly what MultipleInputs can do. Here user and phone are uploaded into HDFS directories as /multiple/user/user and /multiple/phone/phone respectively.
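For reference, a minimal way to put the two local files into those HDFS locations with the standard hadoop fs shell (assuming the local files are named user and phone to match the datasets above):
hadoop fs -mkdir -p /multiple/user /multiple/phone
hadoop fs -put user /multiple/user/user
hadoop fs -put phone /multiple/phone/phone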
The driver, MultipleDriver, is designed as follows:
package multiple.input;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;

/**
 * input1 (/multiple/user/user):
 *   username,user_phone
 *
 * input2 (/multiple/phone/phone):
 *   user_phone,description
 *
 * output: username,user_phone,description
 *
 * @author fansy
 */
public class MultipleDriver extends Configured implements Tool {
//  private Logger log = LoggerFactory.getLogger(MultipleDriver.class);

    private String input1 = null;
    private String input2 = null;
    private String output = null;
    private String delimiter = null;

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
//      conf.set("fs.defaultFS", "hdfs://node33:8020");
//      conf.set("mapreduce.framework.name", "yarn");
//      conf.set("yarn.resourcemanager.address", "node33:8032");
        ToolRunner.run(conf, new MultipleDriver(), args);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        configureArgs(arg0);
        checkArgs();

        Configuration conf = getConf();
        conf.set("delimiter", delimiter);
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "merge user and phone information");
        job.setJarByClass(MultipleDriver.class);
        job.setReducerClass(MultipleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlagStringDataType.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(1);
        // Bind each input path to its own InputFormat and Mapper.
        MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, Multiple1Mapper.class);
        MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, Multiple2Mapper.class);
        FileOutputFormat.setOutputPath(job, new Path(output));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * check the args
     */
    private void checkArgs() {
        if (input1 == null || "".equals(input1)) {
            System.out.println("no user input...");
            printUsage();
            System.exit(-1);
        }
        if (input2 == null || "".equals(input2)) {
            System.out.println("no phone input...");
            printUsage();
            System.exit(-1);
        }
        if (output == null || "".equals(output)) {
            System.out.println("no output...");
            printUsage();
            System.exit(-1);
        }
        if (delimiter == null || "".equals(delimiter)) {
            System.out.println("no delimiter...");
            printUsage();
            System.exit(-1);
        }
    }

    /**
     * configure the args
     * @param args
     */
    private void configureArgs(String[] args) {
        for (int i = 0; i < args.length; i++) {
            if ("-i1".equals(args[i])) {
                input1 = args[++i];
            }
            if ("-i2".equals(args[i])) {
                input2 = args[++i];
            }
            if ("-o".equals(args[i])) {
                output = args[++i];
            }
            if ("-delimiter".equals(args[i])) {
                delimiter = args[++i];
            }
        }
    }

    public static void printUsage() {
        System.err.println("Usage:");
        System.err.println("-i1 input \t user data path.");
        System.err.println("-i2 input \t phone data path.");
        System.err.println("-o output \t output data path.");
        System.err.println("-delimiter \t data delimiter, default is comma.");
    }
}
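With the classes packed into a jar (the name mr-multiple.jar below is just a placeholder), the job can then be submitted as sketched here; note that checkArgs() treats -delimiter as required even though the usage text mentions a default:
hadoop jar mr-multiple.jar multiple.input.MultipleDriver \
  -i1 /multiple/user/user -i2 /multiple/phone/phone \
  -o /multiple/output -delimiter ,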
The job wires up two mappers and one reducer; the two mappers process the user data and the phone data respectively, as follows:
Mapper 1 (processes the user data):
package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * input:
 *   username,phone
 *
 * output:
 *   <key,value> --> <[phone],[0,username]>
 *
 * @author fansy
 */
public class Multiple1Mapper extends Mapper<LongWritable, Text, Text, FlagStringDataType> {
    private Logger log = LoggerFactory.getLogger(Multiple1Mapper.class);
    private String delimiter = null; // default is comma

    @Override
    public void setup(Context cxt) {
        delimiter = cxt.getConfiguration().get("delimiter", ",");
        log.info("This is the begin of Multiple1Mapper");
    }

    @Override
    public void map(LongWritable key, Text value, Context cxt) throws IOException, InterruptedException {
        // Text.getBytes() returns the full backing array, which can be longer
        // than the record itself, so decode only the valid bytes.
        String info = new String(value.getBytes(), 0, value.getLength(), "UTF-8");
        String[] values = info.split(delimiter);
        if (values.length != 2) {
            return;
        }
        log.info("key-->" + values[1] + "=========value-->" + "[0," + values[0] + "]");
        cxt.write(new Text(values[1]), new FlagStringDataType(0, values[0]));
    }
}
Mapper 2 (processes the phone data):
package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * input:
 *   phone,description
 *
 * output:
 *   <key,value> --> <[phone],[1,description]>
 *
 * @author fansy
 */
public class Multiple2Mapper extends Mapper<LongWritable, Text, Text, FlagStringDataType> {
    private Logger log = LoggerFactory.getLogger(Multiple2Mapper.class);
    private String delimiter = null; // default is comma

    @Override
    public void setup(Context cxt) {
        delimiter = cxt.getConfiguration().get("delimiter", ",");
        log.info("This is the begin of Multiple2Mapper");
    }

    @Override
    public void map(LongWritable key, Text value, Context cxt) throws IOException, InterruptedException {
        String[] values = value.toString().split(delimiter);
        if (values.length != 2) {
            return;
        }
        log.info("key-->" + values[0] + "=========value-->" + "[1," + values[1] + "]");
        cxt.write(new Text(values[0]), new FlagStringDataType(1, values[1]));
    }
}
Here FlagStringDataType is a custom Writable:
package multiple.input;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.primitives.Ints;

public class FlagStringDataType implements WritableComparable<FlagStringDataType> {
    private Logger log = LoggerFactory.getLogger(FlagStringDataType.class);

    private String value;
    private int flag;

    public FlagStringDataType() {
    }

    public FlagStringDataType(int flag, String value) {
        this.value = value;
        this.flag = flag;
    }

    public String get() {
        return value;
    }

    public void set(String value) {
        this.value = value;
    }

    @Override
    public boolean equals(Object other) {
        if (other == null || !getClass().equals(other.getClass())) {
            return false;
        }
        FlagStringDataType o = (FlagStringDataType) other;
        // compare String contents with equals(), not ==
        return flag == o.getFlag() && value.equals(o.get());
    }

    @Override
    public int hashCode() {
        return Ints.hashCode(flag) + value.hashCode();
    }

    @Override
    public int compareTo(FlagStringDataType other) {
        // order by flag first, then by value
        if (flag != other.flag) {
            return flag > other.flag ? 1 : -1;
        }
        return value.compareTo(other.value);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        log.info("in write()::" + "flag:" + flag + ",value:" + value);
        out.writeInt(flag);
        out.writeUTF(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        flag = in.readInt();
        value = in.readUTF();
        log.info("in readFields()::" + "flag:" + flag + ",value:" + value);
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    @Override
    public String toString() {
        return flag + ":" + value;
    }
}
This custom type uses a flag to record which dataset a record came from, while value holds the payload. The benefit is that the reducer can decide where each value belongs in the output purely from the flag: for key 123, for example, it receives [0, zhangsan] and [1, good number] and can slot each one into place by flag. This design is a great help when integrating multiple kinds of input, and a similar pattern can be seen in Mahout.
The reducer (assembles and emits the joined records):
package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultipleReducer extends Reducer<Text, FlagStringDataType, Text, NullWritable> {
    private Logger log = LoggerFactory.getLogger(MultipleReducer.class);
    private String delimiter = null; // default is comma

    @Override
    public void setup(Context cxt) {
        delimiter = cxt.getConfiguration().get("delimiter", ",");
    }

    @Override
    public void reduce(Text key, Iterable<FlagStringDataType> values, Context cxt)
            throws IOException, InterruptedException {
        log.info("================");
        log.info("    =======");
        log.info("      ==");
        // value[0]: username (flag 0), value[1]: description (flag 1), value[2]: phone (the key)
        String[] value = new String[3];
        value[2] = key.toString();
        for (FlagStringDataType v : values) {
            int index = v.getFlag();
            log.info("index:" + index + "-->value:" + v.get());
            value[index] = v.get();
        }
        log.info("      ==");
        log.info("    =======");
        log.info("================");
        // Emit username,phone,description to match the required output order
        // (assumes each phone number appears in both datasets; a missing side
        // would show up as "null" here).
        cxt.write(new Text(value[0] + delimiter + value[2] + delimiter + value[1]), NullWritable.get());
    }
}
The advantage of this design is that each input gets its own mapper, so different processing logic can be applied per dataset; moreover the inputs need not share a format, and one of them could, for example, be a sequence file.
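As a minimal sketch of the sequence-file case: only the InputFormat (and a mapper with matching input types) changes in the driver. The path /multiple/phone-seq and the mapper PhoneSeqMapper below are hypothetical names used for illustration; SequenceFileInputFormat is the stock class from org.apache.hadoop.mapreduce.lib.input.
// text input, exactly as before
MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, Multiple1Mapper.class);
// hypothetical sequence-file input holding <Text, Text> records
MultipleInputs.addInputPath(job, new Path("/multiple/phone-seq"),
        SequenceFileInputFormat.class, PhoneSeqMapper.class);
// PhoneSeqMapper would extend Mapper<Text, Text, Text, FlagStringDataType>
// and emit the same <phone, [1, description]> pairs as Multiple2Mapper does.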
Below is an alternative approach. Compared with the one above it is slightly weaker, but it is still worth knowing.
First the driver:
package multiple.input;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;

/**
 * input1 (/multiple/user/user):
 *   username,user_phone
 *
 * input2 (/multiple/phone/phone):
 *   user_phone,description
 *
 * output: username,user_phone,description
 *
 * @author fansy
 */
public class MultipleDriver2 extends Configured implements Tool {
//  private Logger log = LoggerFactory.getLogger(MultipleDriver.class);

    private String input1 = null;
    private String input2 = null;
    private String output = null;
    private String delimiter = null;

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
//      conf.set("fs.defaultFS", "hdfs://node33:8020");
//      conf.set("mapreduce.framework.name", "yarn");
//      conf.set("yarn.resourcemanager.address", "node33:8032");
        ToolRunner.run(conf, new MultipleDriver2(), args);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        configureArgs(arg0);
        checkArgs();

        Configuration conf = getConf();
        conf.set("delimiter", delimiter);
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "merge user and phone information");
        job.setJarByClass(MultipleDriver2.class);
        job.setMapperClass(MultipleMapper.class);
        job.setReducerClass(MultipleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlagStringDataType.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(1);
        // A single mapper handles both inputs; the paths are added through plain FileInputFormat.
        FileInputFormat.addInputPath(job, new Path(input1));
        FileInputFormat.addInputPath(job, new Path(input2));
        FileOutputFormat.setOutputPath(job, new Path(output));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * check the args
     */
    private void checkArgs() {
        if (input1 == null || "".equals(input1)) {
            System.out.println("no user input...");
            printUsage();
            System.exit(-1);
        }
        if (input2 == null || "".equals(input2)) {
            System.out.println("no phone input...");
            printUsage();
            System.exit(-1);
        }
        if (output == null || "".equals(output)) {
            System.out.println("no output...");
            printUsage();
            System.exit(-1);
        }
        if (delimiter == null || "".equals(delimiter)) {
            System.out.println("no delimiter...");
            printUsage();
            System.exit(-1);
        }
    }

    /**
     * configure the args
     * @param args
     */
    private void configureArgs(String[] args) {
        for (int i = 0; i < args.length; i++) {
            if ("-i1".equals(args[i])) {
                input1 = args[++i];
            }
            if ("-i2".equals(args[i])) {
                input2 = args[++i];
            }
            if ("-o".equals(args[i])) {
                output = args[++i];
            }
            if ("-delimiter".equals(args[i])) {
                delimiter = args[++i];
            }
        }
    }

    public static void printUsage() {
        System.err.println("Usage:");
        System.err.println("-i1 input \t user data path.");
        System.err.println("-i2 input \t phone data path.");
        System.err.println("-o output \t output data path.");
        System.err.println("-delimiter \t data delimiter, default is comma.");
    }
}
Here both input paths are added directly through FileInputFormat. With this setup, the per-dataset business logic moves into a single mapper, which first determines which dataset the current split belongs to, based on its path, and then branches accordingly:
package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * input1:
 *   username,phone
 *
 * input2:
 *   phone,description
 *
 * output:
 *   <key,value> --> <[phone],[0,username]>
 *   <key,value> --> <[phone],[1,description]>
 *
 * @author fansy
 */
public class MultipleMapper extends Mapper<LongWritable, Text, Text, FlagStringDataType> {
    private String delimiter = null; // default is comma
    private boolean flag = false;    // true when this split comes from the user data

    @Override
    public void setup(Context cxt) {
        delimiter = cxt.getConfiguration().get("delimiter", ",");
        // Decide which dataset this split belongs to from its parent directory name.
        InputSplit input = cxt.getInputSplit();
        String filename = ((FileSplit) input).getPath().getParent().getName();
        if ("user".equals(filename)) {
            flag = true;
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context cxt) throws IOException, InterruptedException {
        String[] values = value.toString().split(delimiter);
        if (values.length != 2) {
            return;
        }
        if (flag) {
            cxt.write(new Text(values[1]), new FlagStringDataType(0, values[0]));
        } else {
            cxt.write(new Text(values[0]), new FlagStringDataType(1, values[1]));
        }
    }
}
Overall, this second approach is actually inferior to the first: every map task has to perform the dataset check, which adds work compared with the first approach, and it cannot handle sequence-file inputs whose key/value types differ across sources, since a single mapper class fixes one input signature.
So for multiple-format input, prefer the first approach.
Share, grow, enjoy.
Please credit the original blog when reposting: http://blog.csdn.net/fansy1990