  • A simple custom input and output format for MapReduce

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.ReflectionUtils;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class TestCombine extends Configured implements Tool {
    	private static class ProvinceMapper extends
    			Mapper<Object, Text, Text, Text> {
    		@Override
    		protected void map(Object key, Text value, Context context)
    				throws IOException, InterruptedException {
    			System.out.println("value : " + value + " Context " + context);
    			context.write(value, value);
    		}
    	}
    
    	private static class ProvinceReducer extends
    			Reducer<Text, Text, Text, Text> {
    		@Override
    		protected void reduce(Text key, Iterable<Text> values, Context context)
    				throws IOException, InterruptedException {
    			for (Text va : values) {
    			    System.out.println("reduce " + key);
    				context.write(key, key);
    			}
    		}
    	}
    	
    	// Input format: packs many small files into a single CombineFileSplit while each file is still read line by line.
         static class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {  
    	    @SuppressWarnings({ "unchecked", "rawtypes" })  
    	    @Override  
    	    public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {  
    	        return new CombineFileRecordReader((CombineFileSplit)split, context, CombineLineRecordReader.class);  
    	    }  
    	}  
    	
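    	// CombineFileRecordReader creates one CombineLineRecordReader per file in the
    	// combined split (the Integer index passed to the constructor selects that file)
    	// and delegates all key/value iteration to a plain LineRecordReader over that file.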
    	 static class CombineLineRecordReader<K, V> extends RecordReader<K, V> {  
    	    private CombineFileSplit split;  
    	    private TaskAttemptContext context;  
    	    private int index;  
    	    private RecordReader<K, V> rr;  
    	  
    	    @SuppressWarnings("unchecked")  
    	    public CombineLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException {  
    	        this.index = index;
    	        this.split = (CombineFileSplit) split;  
    	        this.context = context;  
    	  
    	        this.rr = (RecordReader<K, V>) ReflectionUtils.newInstance(LineRecordReader.class, context.getConfiguration());  
    	    }  
    	  
    	    @SuppressWarnings("unchecked")  
    	    @Override  
    	    public void initialize(InputSplit curSplit, TaskAttemptContext curContext) throws IOException, InterruptedException {  
    	        this.split = (CombineFileSplit) curSplit;  
    	        this.context = curContext;  
    	  
    	        // The constructor already builds a LineRecordReader; keep the fallback consistent with it.
    	        if (null == rr) {
    	            rr = (RecordReader<K, V>) ReflectionUtils.newInstance(LineRecordReader.class, context.getConfiguration());
    	        }
    	  
    	        FileSplit fileSplit = new FileSplit(this.split.getPath(index),  
    	                this.split.getOffset(index), this.split.getLength(index),  
    	                this.split.getLocations());  
    	          
    	        this.rr.initialize(fileSplit, this.context);  
    	    }  
    	  
    	    @Override  
    	    public float getProgress() throws IOException, InterruptedException {  
    	        return rr.getProgress();  
    	    }  
    	  
    	    @Override  
    	    public void close() throws IOException {  
    	        if (null != rr) {  
    	            rr.close();  
    	            rr = null;  
    	        }  
    	    }  
    	  
    	    @Override  
    	    public K getCurrentKey()  
    	    throws IOException, InterruptedException {  
    	        return rr.getCurrentKey();  
    	    }  
    	  
    	    @Override  
    	    public V getCurrentValue()  
    	    throws IOException, InterruptedException {  
    	        return rr.getCurrentValue();  
    	    }  
    	  
    	    @Override  
    	    public boolean nextKeyValue() throws IOException, InterruptedException {  
    	        return rr.nextKeyValue();  
    	    }  
    	}  
    	
    	// Output format: routes records to one output file per distinct key.
    	 static class MyOutputFormat extends FileOutputFormat<Text, Text>{
    		@Override
    		public RecordWriter<Text, Text> getRecordWriter(
    				TaskAttemptContext job) throws IOException, InterruptedException {
    			return new MyRecordWriter(job);
    		}
    	}
    
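    	// RecordWriter that keeps one open HDFS output stream per distinct key and appends
    	// each value (plus a newline) to a file named after the key in the job output directory.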
    	  public static class  MyRecordWriter extends RecordWriter<Text, Text> {		
    		private Map<String, FSDataOutputStream> outputMap = null;
    		private static final String LINESEPARATOR = "\n";
    		private FileSystem fs;
    		private JobContext job;
    		
    		public MyRecordWriter(JobContext job) throws IOException {
    			this.outputMap = new HashMap<String, FSDataOutputStream>();
    			this.job = job;
    			this.fs = FileSystem.get(job.getConfiguration());
    		}
    		
    		// See MultipleOutputs for the built-in way to do this
    		public void write(Text key, Text value) throws IOException {
    			String k = key.toString();
    			if (k.isEmpty())
    				return;
    			FSDataOutputStream out = outputMap.get(k);
    			if (out == null) {
    				// One file per key, named after the key, directly under the job output path.
    				Path outputPath = new Path(FileOutputFormat.getOutputPath(job), k);
    				if (fs.exists(outputPath))
    					return; // a file for this key already exists; skip the record
    				out = fs.create(outputPath);
    				outputMap.put(k, out);
    			}
    			// Text.getBytes() returns the backing array, which can be longer than the
    			// actual content, so write only the first getLength() bytes.
    			out.write(value.getBytes(), 0, value.getLength());
    			out.write(LINESEPARATOR.getBytes());
    		}
    
    		@Override
    		public void close(TaskAttemptContext context) throws IOException,
    				InterruptedException {
    			for(FSDataOutputStream out : outputMap.values()) {
    				out.close();
    			}
    		}
    	}
    
    	
    	public int run(String[] args) throws Exception {
    		// Use the configuration injected by ToolRunner instead of creating a fresh one.
    		Configuration conf = getConf();
    		
    		Job job = Job.getInstance(conf);
    		job.setJobName("TestCombine");
    		job.setJarByClass(TestCombine.class);
    
    		job.setMapperClass(ProvinceMapper.class);
    		job.setReducerClass(ProvinceReducer.class);
    		
    		//job.setInputFormatClass(CombineSequenceFileInputFormat.class);
    		job.setOutputFormatClass(MyOutputFormat.class);
    		
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    		
    		String inpath = "/home/hadoop/tmp/combine";
    		String outpath = "/home/hadoop/tmp/combineout";
    		Path p = new Path(outpath);
    		
    		FileSystem fs = FileSystem.get(conf);
    		if (fs.exists(p)){
    			fs.delete(p, true);
    		}
    		FileInputFormat.addInputPaths(job, inpath);
    		FileOutputFormat.setOutputPath(job, p);
    
    		return job.waitForCompletion(true) ? 0 : 1;
    	} 
    
    	public static void main(String[] args) throws Exception {
    		int ret = ToolRunner.run(new TestCombine(), args);
    		System.exit(ret);
    	} 
    } 
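
  • The driver in run() leaves job.setInputFormatClass(CombineSequenceFileInputFormat.class) commented out, so the job runs with the default TextInputFormat. Below is a minimal sketch of how the combine format could be enabled; it assumes a Hadoop 2.x CombineFileInputFormat that takes its maximum combined split size from the standard mapreduce.input.fileinputformat.split.maxsize property, and the 64 MB cap is only an illustrative value.

    // Hedged sketch (not part of the original post): enable the combine input format
    // inside run(), before submitting the job.
    job.setInputFormatClass(CombineSequenceFileInputFormat.class);
    // Cap each combined split so the small files are not all packed into a single split
    // (and therefore a single map task). setMaxInputSplitSize sets
    // mapreduce.input.fileinputformat.split.maxsize, which CombineFileInputFormat uses
    // as its upper bound when no explicit maxSplitSize is configured (assumed Hadoop 2.x behavior).
    FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024); // ~64 MB, illustrative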
    
  • Original post: https://www.cnblogs.com/chengxin1982/p/3961679.html