zoukankan      html  css  js  c++  java
  • hadoop 多文件输出

    这两天在网上看了个MapReduce的多文件输出的帖子: http://blog.csdn.net/inkfish。写的不错。

    我试着完成了一下。也是分为三个文件:我这三个文件,跟原作者的稍有不同。其中有些类是我原来写的,我直接拷贝过来的,所以有点不同。

    My_LineRead.java

    1. public class My_LineRead<K, V> extends RecordWriter<K, V>{  
    2.         private static final String utf8 = "UTF-8";  
    •         private static final  String colon = "----";  //划分符号   
    •         private static final byte[] newline;  
    •         static {  
    •           try {  
    •             newline = "/n".getBytes(utf8);  
    •           } catch (UnsupportedEncodingException uee) {  
    •             throw new IllegalArgumentException("can't find " + utf8 + " encoding");  
    •           }  
    •         }  
    •         protected DataOutputStream out;  
    •         private final byte[] keyValueSeparator;  
    •           
    •         public My_LineRead(DataOutputStream out) {  
    •             this(out, colon); //调用下面的构造函数   
    •         }  
    •         public My_LineRead(DataOutputStream out, String keyValueSeparator) {  
    •             // TODO Auto-generated constructor stub   
    •             this.out = out;  
    •             try {  
    •                 this.keyValueSeparator = keyValueSeparator.getBytes(utf8);  
    •             } catch (UnsupportedEncodingException e) {  
    •                 // TODO Auto-generated catch block   
    •                 throw new IllegalArgumentException("can't find " + utf8 + " encoding");  
    •             }  
    •         }  
    •         @Override  
    •         public void close(TaskAttemptContext arg0) throws IOException,  
    •                 InterruptedException {  
    •             // TODO Auto-generated method stub   
    •             out.close();  
    •         }  
    •   
    •         @Override  
    •         public void write(K key, V value) throws IOException,  
    •                 InterruptedException {  
    •             if (!(key == null && key instanceof NullWritable)){  
    •                 //如果key不为空者输出key   
    •                 if ((Object)key instanceof Text){  
    •                     Text to = (Text) key;  
    •                     out.write(to.getBytes(), 0, to.getLength());  
    •                 }  
    •                 else  
    •                 {  
    •                     out.write(key.toString().getBytes(utf8));  
    •                 }  
    •                 out.write(keyValueSeparator);  
    •             }  
    •             if (!(value == null && value instanceof NullWritable)){  
    •                 //如果value不为空则输出value   
    •                 if ((Object)value instanceof Text){  
    •                     Text to = (Text) value;  
    •                     out.write(to.getBytes(), 0, to.getLength());  
    •                 }  
    •                 else  
    •                 {  
    •                     out.write(value.toString().getBytes(utf8));  
    •                 }  
    •                 out.write(newline);  
    •             }  
    •               
    •         }  
    •     }  

    MyMultipleOutputFormat.java //这个类,我添加了些注释便于理解

    1. public abstract class MyMultipleOutputFormat  <K extends WritableComparable<?>, V extends Writable>    
    2.         extends FileOutputFormat<K, V> {  
    3.     //接口类,需要在主程序中实现generateFileNameForKeyValue来获取文件名   
    •     private MultiRecordWriter writer = null;    
    •     @Override  
    •     public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)  
    •             throws IOException, InterruptedException {  
    •         // TODO Auto-generated method stub   
    •         //如果第一次调用那么writer=null   
    •         if (writer == null) {    
    •             //getTaskOutputPath获取output路径   
    •             writer = new MultiRecordWriter(job, getTaskOutputPath(job));    
    •         }    
    •         return writer;  
    •     }  
    •     private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {  
    •         Path workPath = null;  
    •         OutputCommitter committer = super.getOutputCommitter(conf);  
    •         if (committer instanceof FileOutputCommitter) {  
    •             workPath = ((FileOutputCommitter) committer).getWorkPath();  
    •         } else {  
    •             Path outputPath = super.getOutputPath(conf);  
    •             if (outputPath == null) {  
    •                 throw new IOException("Undefined job output-path");  
    •             }  
    •             workPath = outputPath;  
    •         }  
    •         return workPath;  
    •     }  
    •     /**通过key, value, conf来确定输出文件名(含扩展名)*/  
    •     //返回值就是文件名。可以根据key,value来判断   
    •     protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);  
    •       
    •     //MultiRecordWriter类   
    •     public class MultiRecordWriter extends RecordWriter<K, V> {  
    •         /**RecordWriter的缓存*/  
    •         private HashMap<String, RecordWriter<K, V>> recordWriters = null;  
    •         private TaskAttemptContext job = null;  
    •         /**输出目录*/  
    •         private Path workPath = null;  
    •         //构造函数   
    •         public MultiRecordWriter(TaskAttemptContext job, Path workPath) {  
    •             super();  
    •             this.job = job;  
    •             this.workPath = workPath;  
    •             recordWriters = new HashMap<String, RecordWriter<K, V>>();  
    •         }  
    •         //关闭,应该可能是多个文件进行关闭,所有采用循环   
    •         //recordWriters.values() 就是指的getBaseRecordWriter返回的值。   
    •         @Override  
    •         public void close(TaskAttemptContext context) throws IOException, InterruptedException {  
    •             Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();  
    •             while (values.hasNext()) {  
    •                 values.next().close(context);  
    •             }  
    •             this.recordWriters.clear();  
    •         }  
    •         @Override  
    •         public void write(K key, V value) throws IOException, InterruptedException {  
    •             //得到输出文件名   
    •             String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());  
    •             //如果recordWriters里没有文件名,那么就建立。否则就直接写值。   
    •             RecordWriter<K, V> rw = this.recordWriters.get(baseName);  
    •             if (rw == null) {  
    •                 rw = getBaseRecordWriter(job, baseName);  
    •                 //放入HashMap   
    •                 this.recordWriters.put(baseName, rw);  
    •             }  
    •             rw.write(key, value);  
    •         }  
    •         // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}   
    •         private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)  
    •                 throws IOException, InterruptedException {  
    •             //获取配置文件   
    •             Configuration conf = job.getConfiguration();  
    •             //查看是否使用解码器   
    •             boolean isCompressed = getCompressOutput(job);  
    •             String keyValueSeparator = ",";  
    •             RecordWriter<K, V> recordWriter = null;  
    •             if (isCompressed) {  
    •                 Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,  
    •                         GzipCodec.class);  
    •                 CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);  
    •                 Path file = new Path(workPath, baseName + codec.getDefaultExtension());  
    •                 FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);  
    •                 recordWriter = new My_LineRead<K, V>(new DataOutputStream(codec  
    •                         .createOutputStream(fileOut)), keyValueSeparator);  
    •             }  
    •             //如果不使用解码器   
    •             else {  
    •                 Path file = new Path(workPath, baseName);  
    •                 FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);  
    •                 //recordWriter = new My_LineRead<K, V>(fileOut, keyValueSeparator);   
    •                 //这里我使用的我自己的OutputFormat   
    •                 recordWriter = new My_LineRead<K, V>(fileOut);  
    •             }  
    •             return recordWriter;  
    •         }  
    •     }  
    • }  

    最后就是测试类,WordCount_MulFileOut.java

    1. public class WordCount_MulFileOut {  
    2.     public static  class wordcountMapper extends  
    •         Mapper<LongWritable, Text, Text, IntWritable>{  
    •         private final static IntWritable one = new IntWritable(1);  
    •         private Text word = new Text();  
    •         public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{  
    •             String line = value.toString();  
    •             StringTokenizer itr = new StringTokenizer(line);  
    •             while(itr.hasMoreElements()){  
    •                 word.set(itr.nextToken());  
    •                 context.write(word, one);  
    •             }  
    •         }  
    •     }  
    •     public static  class wordcountReduce extends  
    •         Reducer<Text, IntWritable, Text, IntWritable>{  
    •         public void reduce(Text key, Iterable<IntWritable>values, Context context)throws IOException, InterruptedException{  
    •             int sum = 0;  
    •             for (IntWritable str : values){  
    •                 sum += str.get();  
    •             }  
    •             context.write(key, new IntWritable(sum));  
    •         }  
    •     }  
    •     public static class MyMultiple extends MyMultipleOutputFormat{  
    •   
    •         @Override  
    •         protected String generateFileNameForKeyValue(WritableComparable key,  
    •                 Writable value, Configuration conf) {  
    •             // TODO Auto-generated method stub   
    •             return "other.txt";  
    •         }  
    •           
    •     }  
    •     public static  void main(String args[])throws Exception{  
    •           
    •         Configuration conf = new Configuration();  
    •   
    •         Job job = new Job(conf, "wordcount");  
    •           
    •         job.setJarByClass(WordCount_MulFileOut.class);  
    •           
    •         job.setInputFormatClass(TextInputFormat.class);  
    •           
    •         job.setOutputFormatClass(MyMultiple.class);  
    •         job.setOutputKeyClass(Text.class);  
    •         job.setOutputValueClass(IntWritable.class);  
    •           
    •         job.setMapperClass(wordcountMapper.class);  
    •         job.setReducerClass(wordcountReduce.class);  
    •         job.setCombinerClass(wordcountReduce.class);  
    •           
    •         FileInputFormat.setInputPaths(job, new Path(args[1]));  
    •         FileOutputFormat.setOutputPath(job, new Path(args[2]));  
    •           
    •         job.waitForCompletion(true);  
    •     }  
  • 相关阅读:
    echarts 实时获取数据
    js 对象与数组相互转化的快捷方法 Object.keys()、Object.values()、Object.entries()
    koa2 中使用 svg-captcha 生成验证码
    分享面试资料包
    8位单片机中一个容易被忽视的溢出问题
    献给半夜加班到深夜的女程序员
    java调用WebService接口方法
    算法小记:快速排序
    STL源码剖析 迭代器(iterator)概念与编程技法(三)
    [置顶] 蓝牙基础知识进阶——Physical channel
  • 原文地址:https://www.cnblogs.com/itgg168/p/2790974.html
Copyright © 2011-2022 走看看