  • 【Hadoop】Hadoop MR Custom Serialization Class

    1. Basic Concepts

    Hadoop MapReduce does not move intermediate key/value pairs with Java's built-in serialization. Any type used as a map or reduce key/value must implement org.apache.hadoop.io.Writable (keys additionally implement WritableComparable), i.e. provide a write(DataOutput) method and a matching readFields(DataInput) method that serialize and deserialize the fields in the same order. The FlowBean class in section 4 is such a custom serialization class: it bundles a phone number with its upstream, downstream, and total traffic so the three values can travel from mapper to reducer as a single value.

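    For reference, the Writable contract itself consists of just two methods, shown here in simplified form (this mirrors the Hadoop API):

    // org.apache.hadoop.io.Writable (simplified): custom key/value types
    // serialize their fields in write() and read them back, in the same
    // order, in readFields().
    public interface Writable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }
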
    2. Mapper Code

    package com.ares.hadoop.mr.flowsum;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.util.StringUtils;
    import org.apache.log4j.Logger;
    
    //Long, String, String, Long --> LongWritable, Text, Text, LongWritable
    public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        private static final Logger LOGGER = Logger.getLogger(FlowSumMapper.class);
        
        private String line;
        private int length;
        private final static char separator = '\t';
        
        private String phoneNum;
        private long upFlow;
        private long downFlow;
        //private long sumFlow;
        
        private Text text = new Text();
        private FlowBean flowBean = new FlowBean();
        
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            line = value.toString();
            String[] fields = StringUtils.split(line, separator);
            length = fields.length;
            if (length != 11) {
                LOGGER.error(key.get() + ", " + line + " LENGTH INVALID, IGNORE...");
                // Malformed line: skip it rather than emit stale values.
                return;
            }
            
            phoneNum = fields[1];
            try {
                upFlow = Long.parseLong(fields[length-3]);
                downFlow = Long.parseLong(fields[length-2]);
                //sumFlow = upFlow + downFlow;
            } catch (NumberFormatException e) {
                LOGGER.error(key.get() + ", " + line + " FLOW DATA INVALID, IGNORE...");
                // Flow fields are not valid numbers: skip this record.
                return;
            }
            
            flowBean.setPhoneNum(phoneNum);
            flowBean.setUpFlow(upFlow);
            flowBean.setDownFlow(downFlow);
            //flowBean.setSumFlow(sumFlow);
            
            text.set(phoneNum);
            context.write(text, flowBean);
        }
    }
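
    Note: the mapper assumes each input line is a tab-separated record with 11 fields, where field index 1 is the phone number and the third-from-last and second-from-last fields hold the upstream and downstream traffic; lines that do not match are logged and skipped. The Text and FlowBean instances are created once and reused across map() calls to avoid allocating new objects for every record.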

    3. Reducer Code

    package com.ares.hadoop.mr.flowsum;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
        //private static final Logger LOGGER = Logger.getLogger(MRTest.class);
        
        private FlowBean flowBean = new FlowBean();
        
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values,
                Reducer<Text, FlowBean, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            long upFlowCounter = 0;
            long downFlowCounter = 0;
            
            for (FlowBean bean : values) {
                upFlowCounter += bean.getUpFlow();
                downFlowCounter += bean.getDownFlow();
            }
            flowBean.setPhoneNum(key.toString());
            flowBean.setUpFlow(upFlowCounter);
            flowBean.setDownFlow(downFlowCounter);
            flowBean.setSumFlow(upFlowCounter + downFlowCounter);
            
            context.write(key, flowBean);
        }
    }
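
    Because the reducer's input and output types are identical (Text, FlowBean), the same class could optionally also be registered as a combiner (job.setCombinerClass(FlowSumReducer.class)) to pre-aggregate traffic on the map side; this is an optional tweak and is not configured in the runner below.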

    4. Serialization Bean Code

    package com.ares.hadoop.mr.flowsum;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    public class FlowBean implements Writable {
        private String phoneNum;
        private long upFlow;
        private long downFlow;
        private long sumFlow;
        
        public FlowBean() {
            // A public no-argument constructor is required: Hadoop creates
            // Writable instances via reflection during deserialization.
        }
    //    public FlowBean(String phoneNum, long upFlow, long downFlow, long sumFlow) {
    //        super();
    //        this.phoneNum = phoneNum;
    //        this.upFlow = upFlow;
    //        this.downFlow = downFlow;
    //        this.sumFlow = sumFlow;
    //    }
    
    
        public String getPhoneNum() {
            return phoneNum;
        }
    
        public void setPhoneNum(String phoneNum) {
            this.phoneNum = phoneNum;
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }
    
        public long getSumFlow() {
            return sumFlow;
        }
    
        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            // Deserialize fields in exactly the same order as write().
            phoneNum = in.readUTF();
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            // Serialize fields; order must match readFields().
            out.writeUTF(phoneNum);
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }
    
        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }
        
    }
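
    A minimal local sanity check of the write()/readFields() round trip, assuming the FlowBean above; the class name and sample values here are purely illustrative:

    package com.ares.hadoop.mr.flowsum;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    
    public class FlowBeanRoundTrip {
        public static void main(String[] args) throws IOException {
            // Build a bean with illustrative values.
            FlowBean original = new FlowBean();
            original.setPhoneNum("13800000000");
            original.setUpFlow(100L);
            original.setDownFlow(200L);
            original.setSumFlow(300L);
    
            // Serialize with write() into an in-memory buffer.
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
    
            // Deserialize with readFields(); field order must match write().
            FlowBean restored = new FlowBean();
            restored.readFields(new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray())));
    
            // Prints the tab-separated flows from toString().
            System.out.println(restored);
        }
    }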

    5. TestRunner Code

    package com.ares.hadoop.mr.flowsum;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.log4j.Logger;
    
    public class FlowSumRunner extends Configured implements Tool {
        private static final Logger LOGGER = Logger.getLogger(FlowSumRunner.class);
        
        @Override
        public int run(String[] args) throws Exception {
            LOGGER.debug("FlowSumRunner: JOB STARTED...");
            
            if (args.length != 2) {
                LOGGER.error("MRTest: ARGUMENTS ERROR");
                System.exit(-1);
            }
            
            // Use the Configuration injected by ToolRunner so -D options take effect.
            Configuration conf = getConf();
            //FOR Eclipse JVM Debug  
            //conf.set("mapreduce.job.jar", "flowsum.jar");
            Job job = Job.getInstance(conf);
            
            // JOB NAME
            job.setJobName("flowsum");
            
            // JOB MAPPER & REDUCER
            job.setJarByClass(FlowSumRunner.class);
            job.setMapperClass(FlowSumMapper.class);
            job.setReducerClass(FlowSumReducer.class);
            
            // MAP & REDUCE
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            // MAP
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            
            // JOB INPUT & OUTPUT PATH
            //FileInputFormat.addInputPath(job, new Path(args[0]));
            FileInputFormat.setInputPaths(job, args[0]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            // VERBOSE OUTPUT
            if (job.waitForCompletion(true)) {
                LOGGER.debug("MRTest: MRTest SUCCESSFULLY...");
                return 0;
            } else {
                LOGGER.debug("MRTest: MRTest FAILED...");
                return 1;
            }            
            
        }
        
        public static void main(String[] args) throws Exception {
            int result = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
            System.exit(result);
        }
    
    }
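
    After packaging the classes into a jar (flowsum.jar is just an example name, as in the commented-out debug line above), the job can be submitted with something like: hadoop jar flowsum.jar com.ares.hadoop.mr.flowsum.FlowSumRunner <input path> <output path>. ToolRunner passes any -D options through to the job's Configuration, and the output directory must not already exist.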

