  • 【Hadoop】Hadoop MR Custom Serialization Class

    1、Basic Concepts

    Hadoop MapReduce does not move Java-serialized objects between tasks; every key and value type must implement Hadoop's own compact Writable interface (keys additionally implement WritableComparable so they can be sorted). To pass a custom value such as a per-phone traffic record from the mapper to the reducer, the class needs a public no-argument constructor, write(DataOutput) and readFields(DataInput) implementations that handle the fields in the same order, and a toString() if the result is written as text.
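
    For reference, the Writable contract from org.apache.hadoop.io that FlowBean implements in section 4 below boils down to two methods; a minimal sketch of the interface:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public interface Writable {
        // Serialize this object's fields to the binary stream.
        void write(DataOutput out) throws IOException;
        // Restore the fields from the binary stream, in the same order write() produced them.
        void readFields(DataInput in) throws IOException;
    }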

    2、Mapper Code

    package com.ares.hadoop.mr.flowsum;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.util.StringUtils;
    import org.apache.log4j.Logger;
    
    
    // LongWritable (file offset), Text (one line of the log) --> Text (phone number), FlowBean (traffic record)
    public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        private static final Logger LOGGER = Logger.getLogger(FlowSumMapper.class);
        
        private String line;
        private int length;
        private final static char separator = '\t';
        
        private String phoneNum;
        private long upFlow;
        private long downFlow;
        //private long sumFlow;
        
        private Text text = new Text();
        private FlowBean flowBean = new FlowBean();
        
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            line = value.toString();
            String[] fields = StringUtils.split(line, separator);
            length = fields.length;
            if (length != 11) {
                LOGGER.error(key.get() + ", " + line + " LENGTH INVALID, IGNORE...");
                // Actually skip the malformed record; without this return, stale field values would be written.
                return;
            }
            
            phoneNum = fields[1];
            try {
                upFlow = Long.parseLong(fields[length-3]);
                downFlow = Long.parseLong(fields[length-2]);
                //sumFlow = upFlow + downFlow;
            } catch (Exception e) {
                LOGGER.error(key.get() + ", " + line + " FLOW DATA INVALID, IGNORE...");
                // Skip records whose traffic fields cannot be parsed.
                return;
            }
            
            flowBean.setPhoneNum(phoneNum);
            flowBean.setUpFlow(upFlow);
            flowBean.setDownFlow(downFlow);
            //flowBean.setSumFlow(sumFlow);
            
            text.set(phoneNum);
            context.write(text, flowBean);
        }
    }
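
    To make the field indexing concrete, here is a hypothetical input record (tab-separated, 11 fields; all values invented for illustration) and the pair the mapper emits for it:

    // field index:   0              1            2                     3               4                 5      6   7   8     9      10
    // sample line:   1363157985066  13726230503  00-FD-07-A4-72-B8:XX  120.196.100.82  i02.c.aliimg.com  video  24  27  2481  24681  200
    //
    // phoneNum = fields[1]          = "13726230503"
    // upFlow   = fields[length - 3] = fields[8] = 2481
    // downFlow = fields[length - 2] = fields[9] = 24681
    //
    // The mapper emits: key = Text("13726230503"), value = FlowBean(upFlow=2481, downFlow=24681)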

    3、Reducer Code

    package com.ares.hadoop.mr.flowsum;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
        //private static final Logger LOGGER = Logger.getLogger(MRTest.class);
        
        private FlowBean flowBean = new FlowBean();
        
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values,
                Reducer<Text, FlowBean, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            // Sum the upstream and downstream traffic of every record grouped under this phone number.
            long upFlowCounter = 0;
            long downFlowCounter = 0;
            
            // Note: Hadoop reuses the FlowBean object handed out by this iterator, so it is only read here;
            // the loop variable is renamed to avoid shadowing the output field flowBean.
            for (FlowBean bean : values) {
                upFlowCounter += bean.getUpFlow();
                downFlowCounter += bean.getDownFlow();
            }
            flowBean.setPhoneNum(key.toString());
            flowBean.setUpFlow(upFlowCounter);
            flowBean.setDownFlow(downFlowCounter);
            flowBean.setSumFlow(upFlowCounter + downFlowCounter);
            
            context.write(key, flowBean);
        }
    }
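
    As an invented example of one reduce call: if the key "13726230503" arrives with the values FlowBean(up=2481, down=24681) and FlowBean(up=120, down=4032), the loop accumulates upFlowCounter = 2601 and downFlowCounter = 28713, and the reducer writes FlowBean(up=2601, down=28713, sum=31314) under that key. With the default TextOutputFormat this becomes the output line "13726230503\t2601\t28713\t31314" (the key, then FlowBean.toString()).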

    4、Serialization Bean Code

    package com.ares.hadoop.mr.flowsum;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    public class FlowBean implements Writable {
        private String phoneNum;
        private long upFlow;
        private long downFlow;
        private long sumFlow;
        
        public FlowBean() {
            // A public no-argument constructor is required: Hadoop instantiates FlowBean via reflection during deserialization.
        }
    //    public FlowBean(String phoneNum, long upFlow, long downFlow, long sumFlow) {
    //        super();
    //        this.phoneNum = phoneNum;
    //        this.upFlow = upFlow;
    //        this.downFlow = downFlow;
    //        this.sumFlow = sumFlow;
    //    }
    
    
        public String getPhoneNum() {
            return phoneNum;
        }
    
        public void setPhoneNum(String phoneNum) {
            this.phoneNum = phoneNum;
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }
    
        public long getSumFlow() {
            return sumFlow;
        }
    
        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            // Restore the fields in exactly the same order they are written in write().
            phoneNum = in.readUTF();
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            // Serialize the fields; the order must match readFields().
            out.writeUTF(phoneNum);
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }
    
        @Override
        public String toString() {
            // TextOutputFormat renders the output value with toString().
            return "" + upFlow + "\t" + downFlow + "\t" + sumFlow;
        }
        
    }
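
    The one subtle requirement is that write() and readFields() handle the fields in the same order. A minimal sketch that round-trips a FlowBean through plain DataOutput/DataInput streams to check this (the class name and sample values are invented, not part of the original post):

    package com.ares.hadoop.mr.flowsum;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    
    public class FlowBeanRoundTrip {
        public static void main(String[] args) throws IOException {
            FlowBean original = new FlowBean();
            original.setPhoneNum("13726230503"); // sample values, invented for illustration
            original.setUpFlow(2481L);
            original.setDownFlow(24681L);
            original.setSumFlow(27162L);
    
            // Serialize with write(), as the MapReduce framework would.
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
    
            // Deserialize into a fresh bean with readFields().
            FlowBean copy = new FlowBean();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
    
            // Prints "2481<TAB>24681<TAB>27162" via FlowBean.toString() if the field order matches.
            System.out.println(copy);
        }
    }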

    5、TestRunner Code

    package com.ares.hadoop.mr.flowsum;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.log4j.Logger;
    
    public class FlowSumRunner extends Configured implements Tool {
        private static final Logger LOGGER = Logger.getLogger(FlowSumRunner.class);
        
        @Override
        public int run(String[] args) throws Exception {
            LOGGER.debug("FlowSumRunner: JOB STARTED...");
            
            if (args.length != 2) {
                LOGGER.error("FlowSumRunner: ARGUMENTS ERROR, USAGE: <input path> <output path>");
                System.exit(-1);
            }
            
            // Reuse the Configuration injected by ToolRunner so generic -D options are honored.
            Configuration conf = getConf();
            //FOR Eclipse JVM Debug  
            //conf.set("mapreduce.job.jar", "flowsum.jar");
            Job job = Job.getInstance(conf);
            
            // JOB NAME
            job.setJobName("flowsum");
            
            // JOB MAPPER & REDUCER
            job.setJarByClass(FlowSumRunner.class);
            job.setMapperClass(FlowSumMapper.class);
            job.setReducerClass(FlowSumReducer.class);
            
            // MAP & REDUCE
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            // MAP
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            
            // JOB INPUT & OUTPUT PATH
            //FileInputFormat.addInputPath(job, new Path(args[0]));
            FileInputFormat.setInputPaths(job, args[0]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            // VERBOSE OUTPUT
            if (job.waitForCompletion(true)) {
                LOGGER.debug("FlowSumRunner: JOB SUCCEEDED...");
                return 0;
            } else {
                LOGGER.error("FlowSumRunner: JOB FAILED...");
                return 1;
            }
            
        }
        
        public static void main(String[] args) throws Exception {
            int result = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
            System.exit(result);
        }
    
    }
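
    Assuming the classes are packaged into a jar (the jar name and HDFS paths below are placeholders), the job is submitted with the standard hadoop jar command; the output directory must not exist beforehand:

    hadoop jar flowsum.jar com.ares.hadoop.mr.flowsum.FlowSumRunner /flowsum/input /flowsum/output

    Each line of the result is a phone number followed by its up flow, down flow, and total flow, as rendered by FlowBean.toString().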

