zoukankan      html  css  js  c++  java
  • hadoop MultipleOutputs

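    MultipleOutputs writes data to multiple files with customized names, and it can be used in both the map and the reduce phase.

    Each named output must first be registered on the job with MultipleOutputs.addNamedOutput(...). Create the MultipleOutputs instance in setup() and close it in cleanup() so that all opened outputs are closed when the task finishes.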

    http://www.lichun.cc/blog/2013/11/how-to-use-hadoop-multipleoutputs/

    /**
     * Minimal mapper showing the MultipleOutputs lifecycle: create the instance in
     * {@code setup()}, write to a named output in {@code map()}, and close it in
     * {@code cleanup()}.
     *
     * <p>NOTE(review): {@code map_out_file} is assumed to be a {@code String}
     * constant declared in the enclosing class and registered on the job with
     * {@code MultipleOutputs.addNamedOutput(job, map_out_file, TextOutputFormat.class,
     * NullWritable.class, Text.class)}.
     */
    public static class MyMap extends
            Mapper<LongWritable, Text, Text, DoubleWritable> {
        MultipleOutputs<Text, DoubleWritable> mos;

        /**
         * Writes the first whitespace-separated field of each non-blank input line
         * to the named output {@code map_out_file}.
         */
        @Override
        public void map(LongWritable inKey, Text inValue, Context context)
                throws IOException, InterruptedException {
            String line = inValue.toString().trim();
            if (line.isEmpty()) {
                return; // a blank line has no field to write
            }
            // The record's leading field is used as the value written out.
            String name = line.split("\\s+", 2)[0];
            mos.write(map_out_file, NullWritable.get(), new Text(name));
        }

        /** Creates the MultipleOutputs bound to this task's context. */
        @Override
        public void setup(Context context) {
            mos = new MultipleOutputs<Text, DoubleWritable>(context);
        }

        /** Closes every output opened through {@code mos}. */
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            mos.close();
        }

    }

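    Example: per-student averages written from the map phase and the class average written from the reduce phase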

    package a5p2;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    /**
     * Computes each student's average score and the class average of those
     * per-student averages, writing both through {@link MultipleOutputs}.
     *
     * <p>Each input line is expected to be whitespace-separated:
     * {@code <name> <score> <score> ...}. The map phase writes one
     * {@code "<name> <average>"} line per student to the named output
     * {@link #map_out_file}; the reduce phase writes a single
     * {@code "Class average: <value>"} line to {@link #reduce_out_file}.
     *
     * <p>Usage: {@code ClassAvg2 <input path> <output path>}
     */
    public class ClassAvg2 {
        /** Named output (map side) holding one "name average" line per student. */
        public static final String map_out_file = "mapOutFileIndividualStudentAverage";
        /** Named output (reduce side) holding the "Class average: ..." line. */
        public static final String reduce_out_file = "reduceOutFileClassAverage";

        /**
         * Averages one student's scores, emits (name, average) to the reducer, and
         * also records "name average" in the named output {@link #map_out_file}.
         */
        public static class AvgMap extends
                Mapper<LongWritable, Text, Text, DoubleWritable> {
            MultipleOutputs<Text, DoubleWritable> mos;

            @Override
            public void map(LongWritable inKey, Text inValue, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer tokens = new StringTokenizer(inValue.toString());
                // A blank line holds no record; skip it rather than letting
                // nextToken() throw NoSuchElementException and fail the task.
                if (!tokens.hasMoreTokens()) {
                    return;
                }
                String name = tokens.nextToken();
                int cnt = 0;
                double sum = 0;
                while (tokens.hasMoreTokens()) {
                    // Parse directly as double; parsing as float first loses precision.
                    sum += Double.parseDouble(tokens.nextToken());
                    cnt++;
                }
                // A name without scores has no defined average (0/0 is NaN), and a
                // NaN would make the class average computed in the reducer NaN too.
                if (cnt == 0) {
                    return;
                }
                double avg = sum / cnt;
                context.write(new Text(name), new DoubleWritable(avg));
                mos.write(map_out_file, NullWritable.get(), new Text(name + " "
                        + avg));
            }

            @Override
            public void setup(Context context) {
                mos = new MultipleOutputs<Text, DoubleWritable>(context);
            }

            @Override
            protected void cleanup(Context context) throws IOException,
                    InterruptedException {
                mos.close();
            }

        }

        /**
         * Averages all per-student averages it receives and writes
         * "Class average: value" to the named output {@link #reduce_out_file}.
         *
         * <p>Because {@link AvgGroupComparator} treats every key as equal, all
         * students routed to this reducer arrive in a single {@code reduce()} call.
         */
        public static class AvgReduce extends
                Reducer<Text, DoubleWritable, Text, DoubleWritable> {
            MultipleOutputs<Text, DoubleWritable> mos;

            @Override
            public void reduce(Text key, Iterable<DoubleWritable> inValues,
                    Context context) throws IOException, InterruptedException {

                double classSum = 0;
                int cnt = 0;
                for (DoubleWritable dw : inValues) {
                    classSum += dw.get();
                    cnt++;
                }
                double classAvg = classSum / cnt;
                // Only the named output is written; the job's default output
                // (part-r-*) is intentionally left empty.
                mos.write(reduce_out_file, NullWritable.get(), new Text(
                        "Class average: " + classAvg));
            }

            @Override
            public void setup(Context context) {
                mos = new MultipleOutputs<Text, DoubleWritable>(context);
            }

            @Override
            protected void cleanup(Context context) throws IOException,
                    InterruptedException {
                mos.close();
            }

        }

        /**
         * Grouping comparator that reports every pair of keys as equal, so the
         * reducer groups all keys into one {@code reduce()} call.
         */
        public static class AvgGroupComparator implements RawComparator<Text> {

            @Override
            public int compare(Text t1, Text t2) {
                return 0;
            }

            @Override
            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
                return 0;
            }
        }

        /**
         * Configures and runs the job.
         *
         * @param args {@code args[0]} input path, {@code args[1]} output path
         */
        public static void main(String[] args) throws IOException,
                ClassNotFoundException, InterruptedException {
            if (args.length < 2) {
                System.err.println("Usage: ClassAvg2 <input path> <output path>");
                System.exit(2);
            }
            Configuration conf = new Configuration();
            // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
            Job job = Job.getInstance(conf, "class avg");
            job.setJarByClass(ClassAvg2.class);

            // mapper
            job.setMapperClass(AvgMap.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);

            // Grouping only merges keys within one reducer. Force a single reducer
            // so the "class average" covers every student, not just one partition.
            job.setGroupingComparatorClass(AvgGroupComparator.class);
            job.setNumReduceTasks(1);

            // reducer
            job.setReducerClass(AvgReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);

            // input and output
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Every name passed to mos.write(...) must be registered here first.
            MultipleOutputs.addNamedOutput(job, map_out_file,
                    TextOutputFormat.class, NullWritable.class, Text.class);
            MultipleOutputs.addNamedOutput(job, reduce_out_file,
                    TextOutputFormat.class, NullWritable.class, Text.class);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

    }
  • 相关阅读:
    Unity3D写雷电游戏修改飞机尾部火焰
    随机生成路径(二)
    Unity3D写雷电游戏(四)
    maven package,clean,install,compile命令
    asp.net 有什么框架,有什么技术
    牛腩购物29:用户中心订单页面制作,com+事务的运用(Transactions/TransactionScope)
    牛腩购物网30:用户中心其他功能制作(获取购物的总金额,判断用户是 普通会员还是VIP会员,用户申请VIP)
    asp.net 事务的处理,dts 的设置,asp.net三种事务处理方法,三层架构,微软企业库,动软生成器生成的代码下如何使用事务
    牛腩购物网28:购物车中商品转换为订单,asp.net 页面间传值,asp.net 事务,ToString("D5")填充到5位数,同时插入订单表和订单详情表
    hdu1016 prime ring problem
  • 原文地址:https://www.cnblogs.com/phoenix13suns/p/4470529.html
Copyright © 2011-2022 走看看