  • A simple implementation of CombineFileInputFormat

    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.ReflectionUtils;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class TestCombine extends Configured implements Tool {
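    	// Mapper: passes each input line through, writing the line as both output key
    	// and value (the println is only for tracing which records reach the mapper).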
    	private static class ProvinceMapper extends
    			Mapper<Object, Text, Text, Text> {
    		@Override
    		protected void map(Object key, Text value, Context context)
    				throws IOException, InterruptedException {
    			System.out.println("value : " + value + " Context " + context);
    			context.write(value, value);
    		}
    	}
    
    	// Reducer: for every value grouped under a key, prints a trace line and
    	// re-emits the key as both output key and value (demo output only).
    	private static class ProvinceReducer extends
    			Reducer<Text, Text, Text, Text> {
    		@Override
    		protected void reduce(Text key, Iterable<Text> values, Context context)
    				throws IOException, InterruptedException {
    			for (Text va : values) {
    				System.out.println("reduce " + key);
    				context.write(key, key);
    			}
    		}
    	}
    	
    	// Input format that packs many small input files into combined splits.
    	// Despite its name, it reads plain text lines: CombineFileRecordReader is
    	// handed CombineLineRecordReader as the per-file reader class.
    	public static class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
    	    @SuppressWarnings({ "unchecked", "rawtypes" })
    	    @Override
    	    public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
    	        return new CombineFileRecordReader((CombineFileSplit) split, context, CombineLineRecordReader.class);
    	    }
    	}
    	
    	// Per-file reader used by CombineFileRecordReader: one instance is created for
    	// each file in a combined split (identified by index) and delegates all work to
    	// a standard LineRecordReader over that single file.
    	public static class CombineLineRecordReader<K, V> extends RecordReader<K, V> {
    	    private CombineFileSplit split;  
    	    private TaskAttemptContext context;  
    	    private int index;  
    	    private RecordReader<K, V> rr;  
    	  
    	    @SuppressWarnings("unchecked")  
    	    public CombineLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException {  
    	        this.index = index;
    	        this.split = (CombineFileSplit) split;  
    	        this.context = context;  
    	  
    	        this.rr = (RecordReader<K, V>) ReflectionUtils.newInstance(LineRecordReader.class, context.getConfiguration());  
    	    }  
    	  
    	    @SuppressWarnings("unchecked")  
    	    @Override  
    	    public void initialize(InputSplit curSplit, TaskAttemptContext curContext) throws IOException, InterruptedException {  
    	        this.split = (CombineFileSplit) curSplit;  
    	        this.context = curContext;  
    	  
    	        if (null == rr) {  
    	            rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());  
    	        }  
    	  
    	        FileSplit fileSplit = new FileSplit(this.split.getPath(index),  
    	                this.split.getOffset(index), this.split.getLength(index),  
    	                this.split.getLocations());  
    	          
    	        this.rr.initialize(fileSplit, this.context);  
    	    }  
    	  
    	    @Override  
    	    public float getProgress() throws IOException, InterruptedException {  
    	        return rr.getProgress();  
    	    }  
    	  
    	    @Override  
    	    public void close() throws IOException {  
    	        if (null != rr) {  
    	            rr.close();  
    	            rr = null;  
    	        }  
    	    }  
    	  
    	    @Override  
    	    public K getCurrentKey()  
    	    throws IOException, InterruptedException {  
    	        return rr.getCurrentKey();  
    	    }  
    	  
    	    @Override  
    	    public V getCurrentValue()  
    	    throws IOException, InterruptedException {  
    	        return rr.getCurrentValue();  
    	    }  
    	  
    	    @Override  
    	    public boolean nextKeyValue() throws IOException, InterruptedException {  
    	        return rr.nextKeyValue();  
    	    }  
    	}  
    
    	
    	public int run(String[] args) throws Exception {
    		// Use the configuration supplied by ToolRunner rather than creating a new one.
    		Configuration conf = getConf();

    		Job job = Job.getInstance(conf);
    		job.setJobName("TestCombine");
    		job.setJarByClass(TestCombine.class);

    		job.setMapperClass(ProvinceMapper.class);
    		job.setReducerClass(ProvinceReducer.class);

    		// Combine the small input files into larger splits.
    		job.setInputFormatClass(CombineSequenceFileInputFormat.class);

    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);

    		String inpath = "/home/hadoop/tmp/combine";
    		String outpath = "/home/hadoop/tmp/combineout";
    		Path p = new Path(outpath);

    		// Remove any previous output directory so the job can be rerun.
    		FileSystem fs = FileSystem.get(conf);
    		if (fs.exists(p)) {
    			fs.delete(p, true);
    		}
    		FileInputFormat.addInputPaths(job, inpath);
    		FileOutputFormat.setOutputPath(job, p);

    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    
    	public static void main(String[] args) throws Exception {
    		int ret = ToolRunner.run(new TestCombine(), args);
    		System.exit(ret);
    	} 
    } 
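
  • A note on split sizing: out of the box, CombineFileInputFormat may pack all of the small input files into very few combined splits (often one per node or rack), so the job above can end up running a single map task. A minimal sketch of how run() could cap the combined split size, assuming the new-API helper FileInputFormat.setMaxInputSplitSize is available in your Hadoop version; the 64 MB figure is purely illustrative:

        // Inside run(), alongside the existing job setup:
        job.setInputFormatClass(CombineSequenceFileInputFormat.class);

        // Cap each combined split at roughly 64 MB of input so that larger inputs
        // still produce several map tasks (this backs the split max-size setting
        // that CombineFileInputFormat consults when building its combined splits).
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);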
    
  • Original post: https://www.cnblogs.com/chengxin1982/p/3961259.html