  • hadoop2.2.0 MapReduce: sum, then sort

    The JavaBean must implement the WritableComparable interface, providing the interface's serialization (write), deserialization (readFields), and comparison (compareTo) methods.

    package com.my.hadoop.mapreduce.sort;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    public class InfoBean implements WritableComparable<InfoBean> {
        
        private String account;
        private double income;
        private double expences;
        private double surplus;

        public void set(String account, double income, double expences){
            this.account = account;
            this.income = income;
            this.expences = expences;
            this.surplus = income - expences;
        }
        
        @Override
        public String toString() {
            return income+" "+expences+" "+surplus;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.account = in.readUTF();
            this.income = in.readDouble();
            this.expences = in.readDouble();
            this.surplus = in.readDouble();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.account);
            out.writeDouble(this.income);
            out.writeDouble(this.expences);
            out.writeDouble(this.surplus);
        }

        @Override
        public int compareTo(InfoBean o) {
            // Order by income descending; break ties by expences ascending.
            if (this.income == o.getIncome()) {
                return this.expences > o.getExpences() ? 1 : -1;
            } else {
                return this.income > o.getIncome() ? -1 : 1;
            }
        }

        public String getAccount() {
            return account;
        }

        public void setAccount(String account) {
            this.account = account;
        }

        public double getIncome() {
            return income;
        }

        public void setIncome(double income) {
            this.income = income;
        }

        public double getExpences() {
            return expences;
        }

        public void setExpences(double expences) {
            this.expences = expences;
        }

        public double getSurplus() {
            return surplus;
        }

        public void setSurplus(double surplus) {
            this.surplus = surplus;
        }
        

    }
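
    A quick standalone check (not part of the original post) that write() and readFields() round-trip an InfoBean through a byte stream; the class name InfoBeanRoundTrip and the sample values are made up for illustration.

    package com.my.hadoop.mapreduce.sort;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    public class InfoBeanRoundTrip {
        public static void main(String[] args) throws Exception {
            InfoBean original = new InfoBean();
            original.set("zhangsan", 6000.0, 2000.0);

            // Serialize the bean exactly as Hadoop would, via write()
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));

            // Deserialize into a fresh bean via readFields(); the field order
            // in readFields() must match the order written in write()
            InfoBean copy = new InfoBean();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

            // Both beans should print the same income/expences/surplus
            System.out.println(original + " | " + copy);
        }
    }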

    First, the sum step:

    package com.my.hadoop.mapreduce.sort;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SumStep {

        public static class SumMap extends Mapper<LongWritable, Text, Text, InfoBean>{
            private Text k = new Text();
            private InfoBean v = new InfoBean();
            
            @Override
            public void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
                // Each input line is expected to be "account income expense", space-separated
                String[] fields = value.toString().split(" ");
                String account = fields[0];
                double in = Double.parseDouble(fields[1]);
                double out = Double.parseDouble(fields[2]);
                k.set(account);
                v.set(account, in, out);
                context.write(k, v);
            }
        }
        
        public static class SumReduce extends Reducer<Text, InfoBean, Text, InfoBean>{
            private InfoBean v = new InfoBean();
            
            @Override
            public void reduce(Text key, Iterable<InfoBean> value, Context context) throws java.io.IOException, InterruptedException {
                double in_sum = 0;
                double out_sum = 0;
                for (InfoBean bean : value) {
                    in_sum += bean.getIncome();
                    out_sum += bean.getExpences();
                }
                v.set("", in_sum, out_sum);
                context.write(key, v);
            }
        }
        
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, SumStep.class.getSimpleName());
            job.setJarByClass(SumStep.class);
            
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            job.setMapperClass(SumMap.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(InfoBean.class);
            
            job.setReducerClass(SumReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(InfoBean.class);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            System.exit(job.waitForCompletion(true) ? 0 : 1);
            
        }

    }
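
    As an illustration (the sample data below is made up, not taken from the original post), given a space-separated input file where each line is "account income expense":

    zhangsan 6000 0
    lisi 2000 0
    zhangsan 3000 1000
    lisi 0 500

    the sum job emits one line per account. Note that TextOutputFormat separates the Text key from InfoBean.toString() (income, expences, surplus) with a tab:

    lisi	2000.0 500.0 1500.0
    zhangsan	9000.0 1000.0 8000.0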

    Then, the sort step:

    package com.my.hadoop.mapreduce.sort;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SortStep {

        public static class SortMap extends Mapper<LongWritable, Text, InfoBean, NullWritable>{
            private InfoBean k = new InfoBean();
            
            @Override
            public void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
                System.out.println("===="+value.toString()+"====");
                // SumStep's output line is "account<TAB>income expences surplus"
                // (TextOutputFormat separates key and value with a tab),
                // so split on any whitespace rather than on a single space.
                String[] fields = value.toString().split("\\s+");
                String account = fields[0];
                double in = Double.parseDouble(fields[1]);
                double out = Double.parseDouble(fields[2]);
                k.set(account, in, out);
                context.write(k, NullWritable.get());
            }
        }
        
        public static class SortReduce extends Reducer<InfoBean, NullWritable, Text, InfoBean>{
            private Text k = new Text();
            
            @Override
            public void reduce(InfoBean bean, Iterable<NullWritable> value, Context context) throws java.io.IOException, InterruptedException {
                k.set(bean.getAccount());
                context.write(k, bean);
            }
        }
        
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, SortStep.class.getSimpleName());
            job.setJarByClass(SortStep.class);
            
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            job.setMapperClass(SortMap.class);
            job.setMapOutputKeyClass(InfoBean.class);
            job.setMapOutputValueClass(NullWritable.class);
            
            job.setReducerClass(SortReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(InfoBean.class);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            System.exit(job.waitForCompletion(true) ? 0 : 1);
            
        }

    }
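
    A minimal sketch (not in the original post) of a single driver that chains the two jobs, writing the sum output to an intermediate directory that the sort job then reads; the class name SumThenSortDriver and the three-argument layout (raw input, intermediate dir, final output) are assumptions made here for illustration.

    package com.my.hadoop.mapreduce.sort;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SumThenSortDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            // Job 1: sum per account. args[0] = raw input, args[1] = intermediate output.
            Job sumJob = Job.getInstance(conf, "sum");
            sumJob.setJarByClass(SumStep.class);
            sumJob.setMapperClass(SumStep.SumMap.class);
            sumJob.setReducerClass(SumStep.SumReduce.class);
            sumJob.setMapOutputKeyClass(Text.class);
            sumJob.setMapOutputValueClass(InfoBean.class);
            sumJob.setOutputKeyClass(Text.class);
            sumJob.setOutputValueClass(InfoBean.class);
            FileInputFormat.setInputPaths(sumJob, new Path(args[0]));
            FileOutputFormat.setOutputPath(sumJob, new Path(args[1]));
            if (!sumJob.waitForCompletion(true)) {
                System.exit(1);    // stop if the sum step fails
            }

            // Job 2: sort by the InfoBean key. args[1] = intermediate output, args[2] = final output.
            Job sortJob = Job.getInstance(conf, "sort");
            sortJob.setJarByClass(SortStep.class);
            sortJob.setMapperClass(SortStep.SortMap.class);
            sortJob.setReducerClass(SortStep.SortReduce.class);
            sortJob.setMapOutputKeyClass(InfoBean.class);
            sortJob.setMapOutputValueClass(NullWritable.class);
            sortJob.setOutputKeyClass(Text.class);
            sortJob.setOutputValueClass(InfoBean.class);
            FileInputFormat.setInputPaths(sortJob, new Path(args[1]));
            FileOutputFormat.setOutputPath(sortJob, new Path(args[2]));
            System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
        }
    }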

  • Original post: https://www.cnblogs.com/mengyao/p/4151509.html