
      Hadoop: multiple input formats

      Version: CDH 5.0.0 (HDFS 2.3, MapReduce 2.3, YARN 2.3)

      In Hadoop, input spanning multiple file formats is generally handled with the MultipleInputs class, which lets you bind a separate input path, input format, and mapper to each data source.
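
      As a quick preview, the key call is MultipleInputs.addInputPath; the class names in this snippet are placeholders for the real ones developed below:

      // one InputFormat + Mapper pair per input directory
      MultipleInputs.addInputPath(job, userPath,  TextInputFormat.class, UserMapper.class);
      MultipleInputs.addInputPath(job, phonePath, TextInputFormat.class, PhoneMapper.class);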

      For example, suppose we have the following requirement.

      There are two datasets:

      phone:

      123,good number
      124,common number
      125,bad number
      user:

      zhangsan,123
      lisi,124
      wangwu,125

      We now need to join user and phone on the phone number, producing the following result:

      zhangsan,123,good number
      lisi,124,common number
      wangwu,125,bad number
      MultipleInputs handles this well. First, upload user and phone to HDFS, as /multiple/user/user and /multiple/phone/phone respectively.
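
      A minimal sketch of staging the two files with the HDFS Java API (the local source paths are assumptions, and the usual org.apache.hadoop.conf/fs imports are assumed; hadoop fs -put works just as well):

      Configuration conf = new Configuration();   // picks up fs.defaultFS from the cluster config
      FileSystem fs = FileSystem.get(conf);
      // copy the local files into the two HDFS input directories
      fs.copyFromLocalFile(new Path("user"), new Path("/multiple/user/user"));
      fs.copyFromLocalFile(new Path("phone"), new Path("/multiple/phone/phone"));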

      The driver, MultipleDriver, is designed as follows:

      package multiple.input;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.conf.Configured;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
      import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;
      //import org.slf4j.Logger;
      //import org.slf4j.LoggerFactory;
      /**
       * input1(/multiple/user/user):
       * username,user_phone
       *  
       * input2(/multiple/phone/phone):
       *  user_phone,description 
       *  
       * output: username,user_phone,description
       * 
       * @author fansy
       *
       */
      public class MultipleDriver extends Configured implements Tool{
      //	private  Logger log = LoggerFactory.getLogger(MultipleDriver.class);
      	
      	private String input1=null;
      	private String input2=null;
      	private String output=null;
      	private String delimiter=null;
      	
      	public static void main(String[] args) throws Exception {
      		Configuration conf=new Configuration();
      //		conf.set("fs.defaultFS", "hdfs://node33:8020");  
      //        conf.set("mapreduce.framework.name", "yarn");  
      //        conf.set("yarn.resourcemanager.address", "node33:8032"); 
              
      		ToolRunner.run(conf, new MultipleDriver(), args);
      	}
      
      	@Override
      	public int run(String[] arg0) throws Exception {
      		configureArgs(arg0);
      		checkArgs();
      		
      		Configuration conf= getConf();
      		conf.set("delimiter", delimiter);
      		 @SuppressWarnings("deprecation")
      		Job job = new Job(conf, "merge user and phone information ");
              job.setJarByClass(MultipleDriver.class);
      
              job.setReducerClass(MultipleReducer.class);
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(FlagStringDataType.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(NullWritable.class);
              
              job.setNumReduceTasks(1);
              MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, Multiple1Mapper.class);
              MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, Multiple2Mapper.class);
              FileOutputFormat.setOutputPath(job, new Path(output));
              
              int res = job.waitForCompletion(true) ? 0 : 1;
              return res;
      	}
      	
      
      	/**
      	 * check the args 
      	 */
      	private void checkArgs() {
      		if(input1==null||"".equals(input1)){
      			System.out.println("no user input...");
      			printUsage();
      			System.exit(-1);
      		}
      		if(input2==null||"".equals(input2)){
      			System.out.println("no phone input...");
      			printUsage();
      			System.exit(-1);
      		}
      		if(output==null||"".equals(output)){
      			System.out.println("no output...");
      			printUsage();
      			System.exit(-1);
      		}
      		if(delimiter==null||"".equals(delimiter)){
      			System.out.println("no delimiter...");
      			printUsage();
      			System.exit(-1);
      		}
      	
      	}
      
      	/**
      	 * configuration the args
      	 * @param args
      	 */
      	private void configureArgs(String[] args) {
          	for(int i=0;i<args.length;i++){
          		if("-i1".equals(args[i])){
          			input1=args[++i];
          		}
          		if("-i2".equals(args[i])){
          			input2=args[++i];
          		}
          		
          		if("-o".equals(args[i])){
          			output=args[++i];
          		}
          		
          		if("-delimiter".equals(args[i])){
          			delimiter=args[++i];
          		}
          		
          	}
      	}
      	public static void printUsage(){
          	System.err.println("Usage:");
          	System.err.println("-i1 input \t user data path.");
          	System.err.println("-i2 input \t phone data path.");
          	System.err.println("-o output \t output data path.");
          	System.err.println("-delimiter  data delimiter , default is comma  .");
          }
      }
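
      Once packaged into a jar, the driver is submitted in the usual way, for example: hadoop jar multiple.jar multiple.input.MultipleDriver -i1 /multiple/user -i2 /multiple/phone -o /multiple/output -delimiter , (the jar name and output path here are placeholders).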
      
      

      The job wires up two mappers and one reducer; the two mappers process the user data and the phone data respectively:

      Mapper 1 (processes the user data):

      package multiple.input;
      
      import java.io.IOException;
      
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      /**
       * input :
       * username,phone
       * 
       * output:
       * <key,value>  --> <[phone],[0,username]>
       * @author fansy
       *
       */
      public class Multiple1Mapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{
      	private  Logger log = LoggerFactory.getLogger(Multiple1Mapper.class);
      	private String delimiter=null; // default is comma
      	@Override
      	public void setup(Context cxt){
      		delimiter= cxt.getConfiguration().get("delimiter", ",");
      		log.info("This is the begin of Multiple1Mapper");
      	} 
      	
      	@Override
      	public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
      		// decode only the valid bytes: Text reuses its backing byte array
      		String info = new String(value.getBytes(), 0, value.getLength(), "UTF-8");
      		String[] values = info.split(delimiter);
      		if(values.length!=2){
      			return;
      		}
      		log.info("key-->"+values[1]+"=========value-->"+"[0,"+values[0]+"]");
      		cxt.write(new Text(values[1]), new FlagStringDataType(0,values[0]));
      	}
      }
      

      Mapper 2 (processes the phone data):

      package multiple.input;
      
      import java.io.IOException;
      
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      /**
       * input :
       * phone,description
       * 
       * output:
       * <key,value>  --> <[phone],[1,description]>
       * @author fansy
       *
       */
      public class Multiple2Mapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{
      	private  Logger log = LoggerFactory.getLogger(Multiple2Mapper.class);
      	private String delimiter=null; // default is comma
      	@Override
      	public void setup(Context cxt){
      		delimiter= cxt.getConfiguration().get("delimiter", ",");
      		log.info("This is the begin of Multiple2Mapper");
      	} 
      	
      	@Override
      	public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
      		String[] values= value.toString().split(delimiter);
      		if(values.length!=2){
      			return;
      		}
      		log.info("key-->"+values[0]+"=========value-->"+"[1,"+values[1]+"]");
      		cxt.write(new Text(values[0]), new FlagStringDataType(1,values[1]));
      	}
      }
      
      FlagStringDataType here is a custom Writable:

      package multiple.input;
      
      import java.io.DataInput;
      import java.io.DataOutput;
      import java.io.IOException;
      
      import org.apache.hadoop.io.WritableComparable;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import com.google.common.primitives.Ints;
      
      public class FlagStringDataType implements WritableComparable<FlagStringDataType> {
      	private  Logger log = LoggerFactory.getLogger(FlagStringDataType.class);
        private String value;
        private int flag;
        public FlagStringDataType() {
        }
      
        public FlagStringDataType(int flag,String value) {
          this.value = value;
          this.flag=flag;
        }
      
        public String get() {
          return value;
        }
      
        public void set(String value) {
          this.value = value;
        }
      
        @Override
        public boolean equals(Object other) {
          return other != null && getClass().equals(other.getClass())
          		&& ((FlagStringDataType) other).get().equals(value) // compare contents, not references
          		&& ((FlagStringDataType) other).getFlag() == flag;
        }
      
        @Override
        public int hashCode() {
          return Ints.hashCode(flag)+value.hashCode();
        }
      
        @Override
        public int compareTo(FlagStringDataType other) {
          if (flag != other.flag) {             // order by flag first
            return flag > other.flag ? 1 : -1;
          }
          return value.compareTo(other.value);  // then by value
        }
      
        @Override
        public void write(DataOutput out) throws IOException {
      	log.info("in write()::"+"flag:"+flag+",vlaue:"+value);
          out.writeInt(flag);
          out.writeUTF(value);
        }
      
        @Override
        public void readFields(DataInput in) throws IOException {
      	  log.info("in read()::"+"flag:"+flag+",vlaue:"+value);
      	  flag=in.readInt();
      	  value = in.readUTF();
      	  log.info("in read()::"+"flag:"+flag+",vlaue:"+value);
        }
      
      public int getFlag() {
      	return flag;
      }
      
      public void setFlag(int flag) {
      	this.flag = flag;
      }
      
      public String toString(){
      	return flag+":"+value;
      }
      
      }
      

      This custom type uses a flag to mark which dataset a record came from, while value carries the record's payload.

      The benefit is that on the reduce side the flag tells you where each value belongs in the output. This kind of design helps a great deal when consolidating multiple inputs, and the same pattern can be seen in Mahout.
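
      As a quick sanity check of the Writable contract, the type round-trips through a byte stream (this scaffolding is illustrative only and assumes the usual java.io imports; it is not part of the job):

      FlagStringDataType before = new FlagStringDataType(0, "zhangsan");
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      before.write(new DataOutputStream(buffer));   // writeInt(flag), then writeUTF(value)

      FlagStringDataType after = new FlagStringDataType();
      after.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
      // after.getFlag() == 0 and "zhangsan".equals(after.get())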

      The reducer (merges and emits the final records):

      package multiple.input;
      
      import java.io.IOException;
      
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      public class MultipleReducer extends Reducer<Text,FlagStringDataType,Text,NullWritable>{
      	private  Logger log = LoggerFactory.getLogger(MultipleReducer.class);
      	private String delimiter=null; // default is comma
      	@Override
      	public void setup(Context cxt){
      		delimiter= cxt.getConfiguration().get("delimiter", ",");
      	} 
      	@Override
      	public void reduce(Text key, Iterable<FlagStringDataType> values,Context cxt) throws IOException,InterruptedException{
      		log.info("================");
      		log.info("         =======");
      		log.info("              ==");
      		String[] value= new String[3];
      		value[2]=key.toString();
      		for(FlagStringDataType v:values){
      			int index= v.getFlag();
      			log.info("index:"+index+"-->value:"+v.get());
      			value[index]= v.get();
      		}
      		log.info("              ==");
      		log.info("         =======");
      		log.info("================");
      		// emit username,phone,description to match the expected output
      		cxt.write(new Text(value[0]+delimiter+value[2]+delimiter+value[1]),NullWritable.get());
      	}
      }
      

      The advantage of this design is that each input can be processed with its own logic, and the inputs may even arrive in different formats, such as sequence files.
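
      For example, if the phone data were stored as a sequence file of <Text,Text> records, only that input's registration and its mapper's input types would change (PhoneSeqMapper is a hypothetical name, not part of this project):

      MultipleInputs.addInputPath(job, new Path(input1),
              TextInputFormat.class, Multiple1Mapper.class);
      MultipleInputs.addInputPath(job, new Path(input2),
              SequenceFileInputFormat.class, PhoneSeqMapper.class);
      // PhoneSeqMapper would extend Mapper<Text,Text,Text,FlagStringDataType>,
      // matching the sequence file's key/value types.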

      Next, an alternative approach. It falls slightly short of the one above, but it is worth knowing for reference.

      First, the driver:

      package multiple.input;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.conf.Configured;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;
      //import org.slf4j.Logger;
      //import org.slf4j.LoggerFactory;
      /**
       * input1(/multiple/user/user):
       * username,user_phone
       *  
       * input2(/multiple/phone/phone):
       *  user_phone,description 
       *  
       * output: username,user_phone,description
       * 
       * @author fansy
       *
       */
      public class MultipleDriver2 extends Configured implements Tool{
      //	private  Logger log = LoggerFactory.getLogger(MultipleDriver2.class);
      	
      	private String input1=null;
      	private String input2=null;
      	private String output=null;
      	private String delimiter=null;
      	
      	public static void main(String[] args) throws Exception {
      		Configuration conf=new Configuration();
      //		conf.set("fs.defaultFS", "hdfs://node33:8020");  
      //        conf.set("mapreduce.framework.name", "yarn");  
      //        conf.set("yarn.resourcemanager.address", "node33:8032"); 
              
      		ToolRunner.run(conf, new MultipleDriver2(), args);
      	}
      
      	@Override
      	public int run(String[] arg0) throws Exception {
      		configureArgs(arg0);
      		checkArgs();
      		
      		Configuration conf= getConf();
      		conf.set("delimiter", delimiter);
      		 @SuppressWarnings("deprecation")
      		Job job = new Job(conf, "merge user and phone information ");
              job.setJarByClass(MultipleDriver2.class);
              job.setMapperClass(MultipleMapper.class);
              job.setReducerClass(MultipleReducer.class);
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(FlagStringDataType.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(NullWritable.class);
              
              job.setNumReduceTasks(1);
              FileInputFormat.addInputPath(job, new Path(input1));
              FileInputFormat.addInputPath(job, new Path(input2));
              FileOutputFormat.setOutputPath(job, new Path(output));
              
              int res = job.waitForCompletion(true) ? 0 : 1;
              return res;
      	}
      	
      
      	/**
      	 * check the args 
      	 */
      	private void checkArgs() {
      		if(input1==null||"".equals(input1)){
      			System.out.println("no user input...");
      			printUsage();
      			System.exit(-1);
      		}
      		if(input2==null||"".equals(input2)){
      			System.out.println("no phone input...");
      			printUsage();
      			System.exit(-1);
      		}
      		if(output==null||"".equals(output)){
      			System.out.println("no output...");
      			printUsage();
      			System.exit(-1);
      		}
      		if(delimiter==null||"".equals(delimiter)){
      			System.out.println("no delimiter...");
      			printUsage();
      			System.exit(-1);
      		}
      	
      	}
      
      	/**
      	 * configuration the args
      	 * @param args
      	 */
      	private void configureArgs(String[] args) {
          	for(int i=0;i<args.length;i++){
          		if("-i1".equals(args[i])){
          			input1=args[++i];
          		}
          		if("-i2".equals(args[i])){
          			input2=args[++i];
          		}
          		
          		if("-o".equals(args[i])){
          			output=args[++i];
          		}
          		
          		if("-delimiter".equals(args[i])){
          			delimiter=args[++i];
          		}
          		
          	}
      	}
      	public static void printUsage(){
          	System.err.println("Usage:");
          	System.err.println("-i1 input \t user data path.");
          	System.err.println("-i2 input \t phone data path.");
          	System.err.println("-o output \t output data path.");
          	System.err.println("-delimiter  data delimiter , default is comma  .");
          }
      }
      
      

      Here the input paths are added directly with FileInputFormat. With this approach, the mapper must first determine which dataset the current split belongs to, based on its file path, and then apply the matching logic:

      package multiple.input;
      
      import java.io.IOException;
      
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.InputSplit;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.lib.input.FileSplit;
      /**
       * input1 :
       * username,phone
       * 
       * input2
       * phone,description
       * 
       * output:
       * <key,value>  --> <[phone],[0,username]>
       * <key,value>  --> <[phone],[1,description]>
       * @author fansy
       *
       */
      public class MultipleMapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{
      	
      	private String delimiter=null; // default is comma
      	private boolean flag=false;
      	@Override
      	public void setup(Context cxt){
      		delimiter= cxt.getConfiguration().get("delimiter", ",");
      		InputSplit input=cxt.getInputSplit();  
      	    String filename=((FileSplit) input).getPath().getParent().getName();
      	    if("user".equals(filename)){
      	    	flag=true;
      	    }
      	} 
      	
      	@Override
      	public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
      		String[] values= value.toString().split(delimiter);
      		if(values.length!=2){
      			return;
      		}
      		if(flag){
      			cxt.write(new Text(values[1]), new FlagStringDataType(0,values[0]));
      		}else{
      			cxt.write(new Text(values[0]), new FlagStringDataType(1,values[1]));
      		}
      	}
      }
      

      Overall this approach is actually inferior to the first: every map() call has to perform the check, which adds many extra operations, and it cannot handle sequence-file inputs whose key/value types differ.

      So for input in multiple file formats, the first approach is the one to prefer.



      Share, grow, be happy

      When reposting, please cite the original blog: http://blog.csdn.net/fansy1990



