  • Hadoop multi-format input

    Versions:

    CDH 5.0.0 (HDFS 2.3, MapReduce 2.3, YARN 2.3)

    For multi-format input in Hadoop, the MultipleInputs class generally lets you specify a separate input path, input format, and mapper for each data source.
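
    The heart of the API is one registration call per input source; a minimal sketch (the full driver below shows it in context):

    // Each input path gets its own InputFormat and its own Mapper.
    MultipleInputs.addInputPath(job, new Path("/multiple/user/user"),
            TextInputFormat.class, Multiple1Mapper.class);
    MultipleInputs.addInputPath(job, new Path("/multiple/phone/phone"),
            TextInputFormat.class, Multiple2Mapper.class);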

    Suppose, for example, we have the following requirement.

    There are two datasets:

    phone:

    123,good number
    124,common number
    125,bad number
    user:

    zhangsan,123
    lisi,124
    wangwu,125

    We now need to join user and phone on the phone number to produce the following result:

    zhangsan,123,good number
    lisi,124,common number
    wangwu,125,bad number
    MultipleInputs handles this nicely. First upload user and phone to HDFS, as /multiple/user/user and /multiple/phone/phone respectively.
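
    For example, with the HDFS shell (assuming the two files are saved locally as user and phone):

    hadoop fs -mkdir -p /multiple/user /multiple/phone
    hadoop fs -put user /multiple/user/user
    hadoop fs -put phone /multiple/phone/phone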

    The driver, MultipleDriver, is designed as follows:

    package multiple.input;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    //import org.slf4j.Logger;
    //import org.slf4j.LoggerFactory;
    /**
     * input1(/multiple/user/user):
     * username,user_phone
     *  
     * input2(/multiple/phone/phone):
     *  user_phone,description 
     *  
     * output: username,user_phone,description
     * 
     * @author fansy
     *
     */
    public class MultipleDriver extends Configured implements Tool{
    //	private  Logger log = LoggerFactory.getLogger(MultipleDriver.class);
    	
    	private String input1=null;
    	private String input2=null;
    	private String output=null;
    	private String delimiter=null;
    	
    	public static void main(String[] args) throws Exception {
    		Configuration conf=new Configuration();
    //		conf.set("fs.defaultFS", "hdfs://node33:8020");  
    //        conf.set("mapreduce.framework.name", "yarn");  
    //        conf.set("yarn.resourcemanager.address", "node33:8032"); 
            
    		ToolRunner.run(conf, new MultipleDriver(), args);
    	}
    
    	@Override
    	public int run(String[] arg0) throws Exception {
    		configureArgs(arg0);
    		checkArgs();
    		
    		Configuration conf= getConf();
    		conf.set("delimiter", delimiter);
    		Job job = Job.getInstance(conf, "merge user and phone information");
            job.setJarByClass(MultipleDriver.class);
    
            job.setReducerClass(MultipleReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlagStringDataType.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            
            job.setNumReduceTasks(1);
            MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, Multiple1Mapper.class);
            MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, Multiple2Mapper.class);
            FileOutputFormat.setOutputPath(job, new Path(output));
            
            int res = job.waitForCompletion(true) ? 0 : 1;
            return res;
    	}
    	
    
    	/**
    	 * check the args 
    	 */
    	private void checkArgs() {
    		if(input1==null||"".equals(input1)){
    			System.out.println("no user input...");
    			printUsage();
    			System.exit(-1);
    		}
    		if(input2==null||"".equals(input2)){
    			System.out.println("no phone input...");
    			printUsage();
    			System.exit(-1);
    		}
    		if(output==null||"".equals(output)){
    			System.out.println("no output...");
    			printUsage();
    			System.exit(-1);
    		}
    		if(delimiter==null||"".equals(delimiter)){
    			System.out.println("no delimiter...");
    			printUsage();
    			System.exit(-1);
    		}
    	
    	}
    
    	/**
    	 * configure the args
    	 * @param args
    	 */
    	private void configureArgs(String[] args) {
        	for(int i=0;i<args.length;i++){
        		if("-i1".equals(args[i])){
        			input1=args[++i];
        		}
        		if("-i2".equals(args[i])){
        			input2=args[++i];
        		}
        		
        		if("-o".equals(args[i])){
        			output=args[++i];
        		}
        		
        		if("-delimiter".equals(args[i])){
        			delimiter=args[++i];
        		}
        		
        	}
    	}
    	public static void printUsage(){
        	System.err.println("Usage:");
        	System.err.println("-i1 <path>        user data path.");
        	System.err.println("-i2 <path>        phone data path.");
        	System.err.println("-o <path>         output data path.");
        	System.err.println("-delimiter <str>  data delimiter, default is comma.");
        }
    }
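
    Once the classes are packaged, the job can be launched as below (the jar name and output path are placeholders):

    hadoop jar multiple-input.jar multiple.input.MultipleDriver \
        -i1 /multiple/user/user -i2 /multiple/phone/phone \
        -o /multiple/output -delimiter ,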
    
    

    Here two mappers and one reducer are registered; the two mappers handle the user data and the phone data respectively, as follows:

    mapper1 (handles the user data):

    package multiple.input;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    /**
     * input :
     * username,phone
     * 
     * output:
     * <key,value>  --> <[phone],[0,username]>
     * @author fansy
     *
     */
    public class Multiple1Mapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{
    	private  Logger log = LoggerFactory.getLogger(Multiple1Mapper.class);
    	private String delimiter=null; // default is comma
    	@Override
    	public void setup(Context cxt){
    		delimiter= cxt.getConfiguration().get("delimiter", ",");
    		log.info("This is the begin of Multiple1Mapper");
    	} 
    	
    	@Override
    	public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
    		// use getLength(): Text's backing byte array may be longer than the valid data
    		String info = new String(value.getBytes(), 0, value.getLength(), "UTF-8");
    		String[] values = info.split(delimiter);
    		if(values.length!=2){
    			return;
    		}
    		log.info("key-->"+values[1]+"=========value-->"+"[0,"+values[0]+"]");
    		cxt.write(new Text(values[1]), new FlagStringDataType(0,values[0]));
    	}
    }
    

    mapper2 (handles the phone data):

    package multiple.input;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    /**
     * input :
     * phone,description
     * 
     * output:
     * <key,value>  --> <[phone],[1,description]>
     * @author fansy
     *
     */
    public class Multiple2Mapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{
    	private  Logger log = LoggerFactory.getLogger(Multiple2Mapper.class);
    	private String delimiter=null; // default is comma
    	@Override
    	public void setup(Context cxt){
    		delimiter= cxt.getConfiguration().get("delimiter", ",");
    		log.info("This is the begin of Multiple2Mapper");
    	} 
    	
    	@Override
    	public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
    		String[] values= value.toString().split(delimiter);
    		if(values.length!=2){
    			return;
    		}
    		log.info("key-->"+values[0]+"=========value-->"+"[1,"+values[1]+"]");
    		cxt.write(new Text(values[0]), new FlagStringDataType(1,values[1]));
    	}
    }
    
    The FlagStringDataType used here is a custom WritableComparable:

    package multiple.input;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.google.common.primitives.Ints;
    
    public class FlagStringDataType implements WritableComparable<FlagStringDataType> {
    	private  Logger log = LoggerFactory.getLogger(FlagStringDataType.class);
      private String value;
      private int flag;
      public FlagStringDataType() {
      }
    
      public FlagStringDataType(int flag,String value) {
        this.value = value;
        this.flag=flag;
      }
    
      public String get() {
        return value;
      }
    
      public void set(String value) {
        this.value = value;
      }
    
      @Override
      public boolean equals(Object other) {
        if (other == null || !getClass().equals(other.getClass())) {
          return false;
        }
        FlagStringDataType o = (FlagStringDataType) other;
        // compare the strings with equals(), not ==
        return o.getFlag() == flag && value.equals(o.get());
      }
    
      @Override
      public int hashCode() {
        return Ints.hashCode(flag)+value.hashCode();
      }
    
      @Override
      public int compareTo(FlagStringDataType other) {
        // order by flag first, then by value
        if (flag != other.flag) {
          return flag > other.flag ? 1 : -1;
        }
        return value.compareTo(other.value);
      }
    
      @Override
      public void write(DataOutput out) throws IOException {
    	log.info("in write()::"+"flag:"+flag+",value:"+value);
        out.writeInt(flag);
        out.writeUTF(value);
      }
    
      @Override
      public void readFields(DataInput in) throws IOException {
    	  log.info("in readFields() before::"+"flag:"+flag+",value:"+value);
    	  flag=in.readInt();
    	  value = in.readUTF();
    	  log.info("in readFields() after::"+"flag:"+flag+",value:"+value);
      }
    
    public int getFlag() {
    	return flag;
    }
    
    public void setFlag(int flag) {
    	this.flag = flag;
    }
    
    @Override
    public String toString(){
    	return flag+":"+value;
    }
    
    }
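
    As a quick sanity check, here is a self-contained sketch (the demo class is mine, not part of the original code) that round-trips an instance through write()/readFields():

    package multiple.input;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class FlagStringDataTypeDemo {
    	public static void main(String[] args) throws IOException {
    		FlagStringDataType in = new FlagStringDataType(0, "zhangsan");
    		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    		in.write(new DataOutputStream(bytes));

    		FlagStringDataType out = new FlagStringDataType();
    		out.readFields(new DataInputStream(
    				new ByteArrayInputStream(bytes.toByteArray())));
    		System.out.println(out);  // prints "0:zhangsan"
    	}
    }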
    

    This custom type uses a flag to mark which dataset a record came from, while value holds the corresponding field.

    The benefit is that on the reduce side the flag tells you which output column each value belongs to; for key 123, for instance, the reducer receives (0, zhangsan) and (1, good number). This tagging scheme (essentially a reduce-side join) is very helpful whenever several inputs have to be integrated, and the same design shows up in Mahout.

    reducer (merges and writes the final records):

    package multiple.input;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MultipleReducer extends Reducer<Text,FlagStringDataType,Text,NullWritable>{
    	private  Logger log = LoggerFactory.getLogger(MultipleReducer.class);
    	private String delimiter=null; // default is comma
    	@Override
    	public void setup(Context cxt){
    		delimiter= cxt.getConfiguration().get("delimiter", ",");
    	} 
    	@Override
    	public void reduce(Text key, Iterable<FlagStringDataType> values,Context cxt) throws IOException,InterruptedException{
    		log.info("================");
    		log.info("         =======");
    		log.info("              ==");
    		String[] value= new String[3];
    		value[2]=key.toString();
    		for(FlagStringDataType v:values){
    			int index= v.getFlag();
    			log.info("index:"+index+"-->value:"+v.get());
    			value[index]= v.get();
    		}
    		log.info("              ==");
    		log.info("         =======");
    		log.info("================");
    		// output order: username,phone,description (value[2] holds the key, i.e. the phone)
    		cxt.write(new Text(value[0]+delimiter+value[2]+delimiter+value[1]),NullWritable.get());
    	}
    }
    

    The strength of this design is that each input can get its own processing logic, and the inputs need not even share a file format; one of them could, for example, be a SequenceFile.
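
    For instance, a hedged sketch (PhoneSeqMapper is hypothetical, with input key/value types matching the SequenceFile's): only that input's registration in the driver changes.

    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

    // Hypothetical: phone data stored as a SequenceFile; everything else in
    // MultipleDriver above stays the same.
    MultipleInputs.addInputPath(job, new Path(input2),
            SequenceFileInputFormat.class, PhoneSeqMapper.class);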

    Below is an alternative approach. Compared with the one above it is slightly weaker, but it is still worth borrowing from.

    First, the driver:

    package multiple.input;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    //import org.slf4j.Logger;
    //import org.slf4j.LoggerFactory;
    /**
     * input1(/multiple/user/user):
     * username,user_phone
     *  
     * input2(/multiple/phone/phone):
     *  user_phone,description 
     *  
     * output: username,user_phone,description
     * 
     * @author fansy
     *
     */
    public class MultipleDriver2 extends Configured implements Tool{
    //	private  Logger log = LoggerFactory.getLogger(MultipleDriver.class);
    	
    	private String input1=null;
    	private String input2=null;
    	private String output=null;
    	private String delimiter=null;
    	
    	public static void main(String[] args) throws Exception {
    		Configuration conf=new Configuration();
    //		conf.set("fs.defaultFS", "hdfs://node33:8020");  
    //        conf.set("mapreduce.framework.name", "yarn");  
    //        conf.set("yarn.resourcemanager.address", "node33:8032"); 
            
    		ToolRunner.run(conf, new MultipleDriver2(), args);
    	}
    
    	@Override
    	public int run(String[] arg0) throws Exception {
    		configureArgs(arg0);
    		checkArgs();
    		
    		Configuration conf= getConf();
    		conf.set("delimiter", delimiter);
    		Job job = Job.getInstance(conf, "merge user and phone information");
            job.setJarByClass(MultipleDriver2.class);
            job.setMapperClass(MultipleMapper.class);
            job.setReducerClass(MultipleReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlagStringDataType.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            
            job.setNumReduceTasks(1);
            FileInputFormat.addInputPath(job, new Path(input1));
            FileInputFormat.addInputPath(job, new Path(input2));
            FileOutputFormat.setOutputPath(job, new Path(output));
            
            int res = job.waitForCompletion(true) ? 0 : 1;
            return res;
    	}
    	
    
    	/**
    	 * check the args 
    	 */
    	private void checkArgs() {
    		if(input1==null||"".equals(input1)){
    			System.out.println("no user input...");
    			printUsage();
    			System.exit(-1);
    		}
    		if(input2==null||"".equals(input2)){
    			System.out.println("no phone input...");
    			printUsage();
    			System.exit(-1);
    		}
    		if(output==null||"".equals(output)){
    			System.out.println("no output...");
    			printUsage();
    			System.exit(-1);
    		}
    		if(delimiter==null||"".equals(delimiter)){
    			System.out.println("no delimiter...");
    			printUsage();
    			System.exit(-1);
    		}
    	
    	}
    
    	/**
    	 * configure the args
    	 * @param args
    	 */
    	private void configureArgs(String[] args) {
        	for(int i=0;i<args.length;i++){
        		if("-i1".equals(args[i])){
        			input1=args[++i];
        		}
        		if("-i2".equals(args[i])){
        			input2=args[++i];
        		}
        		
        		if("-o".equals(args[i])){
        			output=args[++i];
        		}
        		
        		if("-delimiter".equals(args[i])){
        			delimiter=args[++i];
        		}
        		
        	}
    	}
    	public static void printUsage(){
        	System.err.println("Usage:");
        	System.err.println("-i1 <path>        user data path.");
        	System.err.println("-i2 <path>        phone data path.");
        	System.err.println("-o <path>         output data path.");
        	System.err.println("-delimiter <str>  data delimiter, default is comma.");
        }
    }
    
    

    Here the input paths are added directly through FileInputFormat. With this approach, the mapper itself must first determine which dataset the current split belongs to, and then apply the corresponding logic based on the split's path:

    package multiple.input;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    /**
     * input1 :
     * username,phone
     * 
     * input2
     * phone,description
     * 
     * output:
     * <key,value>  --> <[phone],[0,username]>
     * <key,value>  --> <[phone],[1,description]>
     * @author fansy
     *
     */
    public class MultipleMapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{
    	
    	private String delimiter=null; // default is comma
    	private boolean flag=false;
    	@Override
    	public void setup(Context cxt){
    		delimiter= cxt.getConfiguration().get("delimiter", ",");
    		// identify the dataset from the split's parent directory name
    		// (/multiple/user/user vs /multiple/phone/phone)
    		InputSplit input=cxt.getInputSplit();
    		String filename=((FileSplit) input).getPath().getParent().getName();
    	    if("user".equals(filename)){
    	    	flag=true;
    	    }
    	} 
    	
    	@Override
    	public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
    		String[] values= value.toString().split(delimiter);
    		if(values.length!=2){
    			return;
    		}
    		if(flag){
    			cxt.write(new Text(values[1]), new FlagStringDataType(0,values[0]));
    		}else{
    			cxt.write(new Text(values[0]), new FlagStringDataType(1,values[1]));
    		}
    	}
    }
    

    Overall, this second approach is inferior to the first: every call to map() has to pay for the dataset check, and it cannot cope with SequenceFile inputs whose key/value types differ between datasets.

    So for multi-format input, the first approach is the one to prefer.



    Share, grow, be happy.

    When reposting, please credit this blog: http://blog.csdn.net/fansy1990



