zoukankan      html  css  js  c++  java
  • multipleOutputs Hadoop

    package org.lukey.hadoop.muloutput;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class TestMultipleOutput {
    
        static String baseOutputPath = "/user/hadoop/test_out";
    
        private static MultipleOutputs<Text, IntWritable> mos;
    
        // Mapper
        static class WordsOfClassCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
            private final static IntWritable one = new IntWritable(1);
            private Text className = new Text();
    
            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                // TODO Auto-generated method stub
                FileSplit fileSplit = (FileSplit) context.getInputSplit();
    
                // 文件名
                String fileName = fileSplit.getPath().getName();
    
                // 文件夹名
                String dirName = fileSplit.getPath().getParent().getName();
    
                className.set(dirName + "/" + fileName);
    
                // Country:ABDBI 1
                mos.write(value, one, className.toString());
                // context.write(className, one);
    
            }
    
            @Override
            protected void cleanup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                // TODO Auto-generated method stub
                mos.close();
            }
    
            @Override
            protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                // TODO Auto-generated method stub
                mos = new MultipleOutputs<Text, IntWritable>(context);
            }
    
        }
    
        // Reducer
        static class WordsOfClassCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
            // result 表示每个文件里面单词个数
            IntWritable result = new IntWritable();
    
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,
                    Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                            throws IOException, InterruptedException {
                // TODO Auto-generated method stub
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                result.set(sum);
    
                context.write(key, result);
            }
    
        }
    
        // Client
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
    
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    
            if (otherArgs.length != 2) {
                System.out.println("Usage <in> <out>");
                System.exit(-1);
            }
    
            Job job = new Job(conf, "file count");
    
            job.setJarByClass(TestMultipleOutput.class);
    
            job.setMapperClass(WordsOfClassCountMapper.class);
            job.setReducerClass(WordsOfClassCountReducer.class);
    
            FileSystem fileSystem = FileSystem.get(conf);
    
            Path path = new Path(otherArgs[0]);
    
            FileStatus[] fileStatus = fileSystem.listStatus(path);
    
            for (FileStatus fs : fileStatus) {
                if (fs.isDir()) {
                    Path p = new Path(fs.getPath().toString());
                    FileInputFormat.addInputPath(job, p);
                }else{
                    FileInputFormat.addInputPath(job, fs.getPath());
                }
            }
    
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
    
        }
    }
  • 相关阅读:
    Python CI中iOS项目自动打包运行
    Jquery 插件开发公开属性顺序的影响.
    MVC4使用SignalR出现$.connection is undefined错误备忘
    C语言运算符的优先级与结合性
    CF478C Table Decorations (贪心)
    LightOJ1370 Bishoe and Phishoe (欧拉函数+二分)
    经典排序:冒泡排序法与选择排序法
    STL初学
    博客园使用Markdown和公式
    为知笔记(Wiz)发布博客到博客园(cnblog)
  • 原文地址:https://www.cnblogs.com/luolizhi/p/4928976.html
Copyright © 2011-2022 走看看