zoukankan      html  css  js  c++  java
  • 【Hadoop】Hadoop MR 自定义排序

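    1、概念：MapReduce 在 shuffle 阶段会按 map 输出的 key 进行排序。让 key 类型实现 WritableComparable 并重写 compareTo，即可自定义排序规则（本例按总流量 sumFlow 从大到小排列）。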

    2、代码示例

    FlowSort

    package com.ares.hadoop.mr.flowsort;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.StringUtils;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.log4j.Logger;
    
    import com.ares.hadoop.mr.exception.LineException;
    
    /**
     * Hadoop job that re-sorts per-phone traffic records by total flow.
     *
     * <p>Each input line is expected to hold four tab-separated fields: phone number, up flow,
     * down flow and total flow. Lines that cannot be parsed are skipped and counted under
     * {@link Counter#LINESKIP}. The output order is whatever {@link FlowBean#compareTo(FlowBean)}
     * defines, because the beans are emitted as map output keys and sorted by the shuffle.
     */
    public class FlowSort extends Configured implements Tool {
        private static final Logger LOGGER = Logger.getLogger(FlowSort.class);

        /** Job counters reported by the mapper. */
        enum Counter {
            /** Input lines skipped because they could not be parsed. */
            LINESKIP
        }

        /**
         * Parses one tab-separated input line into a {@link FlowBean} and emits it as the map
         * output key, with {@link NullWritable} as the value.
         */
        public static class FlowSortMapper extends Mapper<LongWritable, Text,
            FlowBean, NullWritable> {
            private static final char SEPARATOR = '\t';
            private static final int FIELD_COUNT = 4;

            // Reused for every map() call instead of allocating a bean per record.
            private final FlowBean flowBean = new FlowBean();
            private final NullWritable nullWritable = NullWritable.get();

            /**
             * Parses {@code value} and writes the resulting bean, or skips the line.
             *
             * @param key offset of the line in the input, used only in log messages
             * @param value one line of input text
             * @param context task context used for output and counters
             * @throws IOException if writing the output record fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            @Override
            protected void map(
                    LongWritable key,
                    Text value,
                    Mapper<LongWritable, Text, FlowBean, NullWritable>.Context context)
                    throws IOException, InterruptedException {
                final String line = value.toString();
                // Only parsing is guarded. Failures from context.write() must propagate so that
                // a broken output fails the task instead of being counted as a bad input line.
                try {
                    String[] fields = StringUtils.split(line, SEPARATOR);
                    if (fields.length != FIELD_COUNT) {
                        throw new LineException(key.get() + ", " + line + " LENGTH INVALID, IGNORE...");
                    }
                    long upFlow = Long.parseLong(fields[1]);
                    long downFlow = Long.parseLong(fields[2]);
                    long sumFlow = Long.parseLong(fields[3]);

                    flowBean.setPhoneNum(fields[0]);
                    flowBean.setUpFlow(upFlow);
                    flowBean.setDownFlow(downFlow);
                    flowBean.setSumFlow(sumFlow);
                } catch (LineException e) {
                    LOGGER.error(e);
                    System.out.println(e);
                    context.getCounter(Counter.LINESKIP).increment(1);
                    return;
                } catch (NumberFormatException e) {
                    String errMsg = key.get() + ", " + line + " FLOW DATA INVALID, IGNORE...";
                    LOGGER.error(errMsg);
                    System.out.println(errMsg);
                    context.getCounter(Counter.LINESKIP).increment(1);
                    return;
                } catch (RuntimeException e) {
                    // Any other unexpected parse failure: keep the stack trace, skip the line.
                    LOGGER.error(key.get() + ", " + line + " UNEXPECTED ERROR, IGNORE...", e);
                    System.out.println(e);
                    context.getCounter(Counter.LINESKIP).increment(1);
                    return;
                }

                context.write(flowBean, nullWritable);
            }
        }

        /**
         * Emits the sorted keys unchanged.
         *
         * <p>Writes once per grouped value rather than once per group, so records that the key
         * comparator treats as equal are not collapsed into one output line. Hadoop's reduce
         * context refreshes {@code key} as {@code values} is iterated, so each write emits the
         * record belonging to that value.
         */
        public static class FlowSortReducer extends Reducer<FlowBean, NullWritable,
            FlowBean, NullWritable> {
            @Override
            protected void reduce(
                    FlowBean key,
                    Iterable<NullWritable> values,
                    Reducer<FlowBean, NullWritable, FlowBean, NullWritable>.Context context)
                    throws IOException, InterruptedException {
                for (NullWritable ignored : values) {
                    context.write(key, NullWritable.get());
                }
            }
        }

        /**
         * Configures and runs the sort job.
         *
         * @param args positional arguments remaining after generic Hadoop options are removed:
         *     {@code args[1]} is the input path and {@code args[2]} the output path;
         *     {@code args[0]} is not used by this job
         * @return 0 if the job succeeded, 1 if it failed, -1 if the arguments are invalid
         * @throws Exception if job setup or submission fails
         */
        @Override
        public int run(String[] args) throws Exception {
            // Validated here, after ToolRunner has stripped generic options (-D, -fs, ...), so
            // those options do not count toward the three positional arguments.
            if (args == null || args.length != 3) {
                String usageMsg = "FlowSort: ARGUMENTS ERROR";
                LOGGER.error(usageMsg);
                System.out.println(usageMsg);
                return -1;
            }

            String errMsg = "FlowSort: TEST STARTED...";
            LOGGER.debug(errMsg);
            System.out.println(errMsg);

            // Use the configuration injected by ToolRunner so generic options take effect.
            Configuration conf = getConf() != null ? getConf() : new Configuration();
            Job job = Job.getInstance(conf);

            job.setJobName("FlowSort");

            job.setJarByClass(FlowSort.class);
            job.setMapperClass(FlowSortMapper.class);
            job.setReducerClass(FlowSortReducer.class);
            // One reducer yields one globally ordered output; the default hash partitioner
            // would split keys across reducers without preserving their order.
            job.setNumReduceTasks(1);

            job.setOutputKeyClass(FlowBean.class);
            job.setOutputValueClass(NullWritable.class);
            job.setMapOutputKeyClass(FlowBean.class);
            job.setMapOutputValueClass(NullWritable.class);

            FileInputFormat.setInputPaths(job, args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[2]));

            if (job.waitForCompletion(true)) {
                errMsg = "FlowSort: TEST SUCCESSFULLY...";
                LOGGER.debug(errMsg);
                System.out.println(errMsg);
                return 0;
            }
            errMsg = "FlowSort: TEST FAILED...";
            LOGGER.debug(errMsg);
            System.out.println(errMsg);
            return 1;
        }

        /**
         * Command-line entry point; exits with the status returned by {@link #run(String[])}.
         *
         * @param args command-line arguments, optionally including generic Hadoop options
         * @throws Exception if the job cannot be run
         */
        public static void main(String[] args) throws Exception {
            int result = ToolRunner.run(new Configuration(), new FlowSort(), args);
            System.exit(result);
        }
    }

    FlowBean

    package com.ares.hadoop.mr.flowsort;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    /**
     * Writable key holding one phone number's traffic figures.
     *
     * <p>Beans sort by total flow, largest first. Ties are broken by phone number, then up
     * flow, then down flow. This makes the ordering a valid total order that is consistent
     * with {@link #equals(Object)}.
     */
    public class FlowBean implements WritableComparable<FlowBean>{
        private String phoneNum;
        private long upFlow;
        private long downFlow;
        private long sumFlow;

        /** Creates an empty bean; Hadoop instantiates keys reflectively via this constructor. */
        public FlowBean() {
        }

        public String getPhoneNum() {
            return phoneNum;
        }

        public void setPhoneNum(String phoneNum) {
            this.phoneNum = phoneNum;
        }

        public long getUpFlow() {
            return upFlow;
        }

        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }

        public long getDownFlow() {
            return downFlow;
        }

        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }

        public long getSumFlow() {
            return sumFlow;
        }

        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }

        /**
         * Reads the fields in the order written by {@link #write(DataOutput)}.
         *
         * @param in source stream
         * @throws IOException if reading fails
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            phoneNum = in.readUTF();
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }

        /**
         * Writes phone number, up flow, down flow and total flow, in that order.
         *
         * @param out destination stream
         * @throws IOException if writing fails (writeUTF also throws if phoneNum is null)
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(phoneNum);
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }

        /** Returns the four fields separated by tabs; this is the job's output line format. */
        @Override
        public String toString() {
            return "" + phoneNum + "\t" + upFlow + "\t" + downFlow + "\t" + sumFlow;
        }

        /**
         * Orders by total flow descending, then phone number, up flow and down flow ascending.
         *
         * <p>Returns 0 only when all four fields are equal. Two such beans therefore compare
         * equal and are grouped into a single reduce call.
         *
         * @param o bean to compare with
         * @return negative, zero or positive as this bean sorts before, with, or after {@code o}
         */
        @Override
        public int compareTo(FlowBean o) {
            int cmp = Long.compare(o.sumFlow, sumFlow); // operands swapped for descending order
            if (cmp != 0) {
                return cmp;
            }
            cmp = comparePhoneNum(phoneNum, o.phoneNum);
            if (cmp != 0) {
                return cmp;
            }
            cmp = Long.compare(upFlow, o.upFlow);
            if (cmp != 0) {
                return cmp;
            }
            return Long.compare(downFlow, o.downFlow);
        }

        /** Null-safe phone number comparison; a null phone number sorts first. */
        private static int comparePhoneNum(String a, String b) {
            if (a == null) {
                return b == null ? 0 : -1;
            }
            return b == null ? 1 : a.compareTo(b);
        }

        /** Two beans are equal when all four fields are equal, matching {@link #compareTo}. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof FlowBean)) {
                return false;
            }
            FlowBean other = (FlowBean) obj;
            return upFlow == other.upFlow
                    && downFlow == other.downFlow
                    && sumFlow == other.sumFlow
                    && java.util.Objects.equals(phoneNum, other.phoneNum);
        }

        /** Value-based hash consistent with {@link #equals(Object)}; used for partitioning. */
        @Override
        public int hashCode() {
            return java.util.Objects.hash(phoneNum, upFlow, downFlow, sumFlow);
        }
    }

    LineException

    package com.ares.hadoop.mr.exception;
    
    /**
     * Unchecked exception used to reject an input line that cannot be processed.
     */
    public class LineException extends RuntimeException {
        private static final long serialVersionUID = 2536144005398058435L;

        /**
         * Creates an exception carrying a detail message.
         * The cause is left unset, so it can still be supplied later via initCause.
         *
         * @param msg description of what is wrong with the line
         */
        public LineException(String msg) {
            super(msg);
        }

        /**
         * Creates an exception carrying both a detail message and the underlying cause.
         *
         * @param msg description of what is wrong with the line
         * @param rootCause the throwable that triggered this exception
         */
        public LineException(String msg, Throwable rootCause) {
            super(msg, rootCause);
        }

        /**
         * Wraps an underlying cause. The detail message is derived from the cause
         * by the RuntimeException constructor.
         *
         * @param rootCause the throwable that triggered this exception
         */
        public LineException(Throwable rootCause) {
            super(rootCause);
        }

        /** Creates an exception with neither a detail message nor a cause. */
        public LineException() {
            super();
        }
    }
  • 相关阅读:
    Codeforces 877 C. Slava and tanks
    Codeforces 877 D. Olya and Energy Drinks
    2017 10.25 NOIP模拟赛
    2017 国庆湖南 Day1
    UVA 12113 Overlapping Squares
    学大伟业 国庆Day2
    51nod 1629 B君的圆锥
    51nod 1381 硬币游戏
    [JSOI2010]满汉全席
    学大伟业 2017 国庆 Day1
  • 原文地址:https://www.cnblogs.com/junneyang/p/5848770.html
Copyright © 2011-2022 走看看