zoukankan      html  css  js  c++  java
  • Hadoop MultipleOutputs

    MultipleOutputs: 

      Writes data to multiple files with customized names; it can be used in both the map and the reduce phases.

    http://www.lichun.cc/blog/2013/11/how-to-use-hadoop-multipleoutputs/

    /**
     * Minimal mapper showing how to write records to a named output through
     * {@link MultipleOutputs} instead of (or in addition to) the regular job output.
     *
     * <p>NOTE(review): {@code map_out_file} is not declared in this snippet; it is assumed to be a
     * {@code String} constant of the enclosing class whose value was registered with
     * {@code MultipleOutputs.addNamedOutput(job, map_out_file, TextOutputFormat.class,
     * NullWritable.class, Text.class)} in the driver.
     */
    public static class MyMap extends
            Mapper<LongWritable, Text, Text, DoubleWritable> {
        /** Writer for named outputs; created in {@link #setup} and closed in {@link #cleanup}. */
        MultipleOutputs<Text, DoubleWritable> mos;

        /** Creates the {@link MultipleOutputs} helper bound to this task's context. */
        @Override
        public void setup(Context context) {
            mos = new MultipleOutputs<Text, DoubleWritable>(context);
        }

        /**
         * Writes the input line to the {@code map_out_file} named output.
         *
         * @param inKey byte offset of the line (unused)
         * @param inValue one line of input text
         * @param context task context (not written to here)
         */
        @Override
        public void map(LongWritable inKey, Text inValue, Context context)
                throws IOException, InterruptedException {
            // The value written to the named output; here simply the raw input line.
            String name = inValue.toString();
            mos.write(map_out_file, NullWritable.get(), new Text(name));
        }

        /** Closes every writer opened by {@code mos}; MultipleOutputs requires this in cleanup. */
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            mos.close();
        }
    }

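    Full example: the mapper writes each student's average to one named output, and the reducer writes the class average to another.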

    package a5p2;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    /**
     * Computes per-student and class-wide averages in a single MapReduce job.
     *
     * <p>Each input line is expected to look like {@code <name> <score> <score> ...} (whitespace
     * separated). The mapper writes {@code "<name> <average>"} to the named output
     * {@link #map_out_file} and emits {@code (name, average)} to the shuffle. A grouping comparator
     * that treats all keys as equal, combined with a single reduce task, routes every student
     * average into one {@code reduce} call. That call writes the class average (the mean of the
     * per-student averages) to the named output {@link #reduce_out_file}.
     *
     * <p>Arguments: {@code <input path> <output path>}.
     */
    public class ClassAvg2 {
        /** Named output (alphanumeric, as MultipleOutputs requires) for per-student averages. */
        public static final String map_out_file = "mapOutFileIndividualStudentAverage";
        /** Named output (alphanumeric, as MultipleOutputs requires) for the class average. */
        public static final String reduce_out_file = "reduceOutFileClassAverage";

        /** Parses one student per line and emits that student's average score. */
        public static class AvgMap extends
                Mapper<LongWritable, Text, Text, DoubleWritable> {
            /** Writer for named outputs; created in setup, closed in cleanup. */
            MultipleOutputs<Text, DoubleWritable> mos;

            /**
             * Averages the scores on one line and emits {@code (name, average)}.
             *
             * <p>Blank lines and lines containing only a name are skipped: they have no average,
             * and a 0/0 NaN would otherwise propagate into the class average.
             *
             * @throws NumberFormatException if a score token is not a number
             */
            @Override
            public void map(LongWritable inKey, Text inValue, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer tokens = new StringTokenizer(inValue.toString());
                if (!tokens.hasMoreTokens()) {
                    return; // blank line: nothing to parse (nextToken() would throw)
                }
                String name = tokens.nextToken();
                int cnt = 0;
                double sum = 0;
                while (tokens.hasMoreTokens()) {
                    // Parse as double: Float.parseFloat would round each score to single precision.
                    sum += Double.parseDouble(tokens.nextToken());
                    cnt++;
                }
                if (cnt == 0) {
                    return; // name without scores: no average to report
                }
                double avg = sum / cnt;
                context.write(new Text(name), new DoubleWritable(avg));
                mos.write(map_out_file, NullWritable.get(), new Text(name + " "
                        + avg));
            }

            @Override
            public void setup(Context context) {
                mos = new MultipleOutputs<Text, DoubleWritable>(context);
            }

            /** Closes every writer opened by {@code mos}; MultipleOutputs requires this in cleanup. */
            @Override
            protected void cleanup(Context context) throws IOException,
                    InterruptedException {
                mos.close();
            }
        }

        /**
         * Averages all per-student averages it receives and writes the result to
         * {@link #reduce_out_file}. Nothing is written to the regular job output.
         */
        public static class AvgReduce extends
                Reducer<Text, DoubleWritable, Text, DoubleWritable> {
            /** Writer for named outputs; created in setup, closed in cleanup. */
            MultipleOutputs<Text, DoubleWritable> mos;

            /**
             * Because of {@link AvgGroupComparator}, {@code inValues} holds every student
             * average routed to this reducer; {@code key} is just the first key of that group.
             */
            @Override
            public void reduce(Text key, Iterable<DoubleWritable> inValues,
                    Context context) throws IOException, InterruptedException {
                double classSum = 0;
                int cnt = 0;
                for (DoubleWritable dw : inValues) {
                    classSum += dw.get();
                    cnt++;
                }
                // reduce() is only invoked for non-empty groups, so cnt >= 1 here.
                double classAvg = classSum / cnt;
                mos.write(reduce_out_file, NullWritable.get(), new Text(
                        "Class average: " + classAvg));
            }

            @Override
            public void setup(Context context) {
                mos = new MultipleOutputs<Text, DoubleWritable>(context);
            }

            /** Closes every writer opened by {@code mos}; MultipleOutputs requires this in cleanup. */
            @Override
            protected void cleanup(Context context) throws IOException,
                    InterruptedException {
                mos.close();
            }
        }

        /**
         * Grouping comparator that reports every pair of keys as equal, so all keys reaching a
         * reducer form a single group (one {@code reduce} call). This only yields one class-wide
         * group when the job runs with exactly one reduce task.
         */
        public static class AvgGroupComparator implements RawComparator<Text> {

            @Override
            public int compare(Text t1, Text t2) {
                return 0;
            }

            @Override
            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
                return 0;
            }
        }

        /**
         * Configures and runs the job.
         *
         * @param args {@code args[0]} input path, {@code args[1]} output path (must not exist)
         */
        public static void main(String[] args) throws IOException,
                ClassNotFoundException, InterruptedException {
            if (args.length < 2) {
                System.err.println("Usage: ClassAvg2 <input path> <output path>");
                System.exit(2);
            }
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "class avg");
            job.setJarByClass(ClassAvg2.class);

            // mapper
            job.setMapperClass(AvgMap.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);

            // Grouping merges all keys only within one reducer; with several reducers each would
            // emit its own partial "class average", so force a single reduce task.
            job.setGroupingComparatorClass(AvgGroupComparator.class);
            job.setNumReduceTasks(1);

            // reducer
            job.setReducerClass(AvgReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);

            // input and output
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            MultipleOutputs.addNamedOutput(job, map_out_file,
                    TextOutputFormat.class, NullWritable.class, Text.class);
            MultipleOutputs.addNamedOutput(job, reduce_out_file,
                    TextOutputFormat.class, NullWritable.class, Text.class);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
  • 相关阅读:
    【Java】IO流--文件夹的复制
    【Java】IO流--序列化与反序列化
    【Java】Java中的数组是对象吗?(转载)
    【Java】Java漫谈-数组(转载)
    【Java】IO流--对象流--ObjectInputStream、ObjectOutputStream
    【Java】IO流--数据流--DataInputStream、DataOutputStream
    【Java】IO流--打印流--PrintStream
    【Java】IO流--转换流--InputStreamReader、OutputStreamWriter
    CentOS快速安装Nginx的方法,nginx如何启动重启停止
    Mac查看端口号是否被占用及释放
  • 原文地址:https://www.cnblogs.com/phoenix13suns/p/4470529.html
Copyright © 2011-2022 走看看