zoukankan      html  css  js  c++  java
  • hadoop2.2.0 MapReduce的序列化

    package com.my.hadoop.mapreduce.dataformat;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    import com.my.hadoop.common.Configs;



    /**
     * hadoop的序列化
     * @author yao
     *
     */
    public class DataCount {

        static class DTMap extends Mapper<LongWritable, Text, Text, DataBean>{
            DataBean dataBean = null;
            @Override
            public void map(LongWritable key, Text value, Context context) throws IOException ,InterruptedException {
                String[] fields = value.toString().split(" ");
                String telNo = fields[1];
                long upPayLoad = Long.parseLong(fields[8]);
                long downPayLoad = Long.parseLong(fields[9]);
                dataBean = new DataBean(telNo, upPayLoad, downPayLoad);
                context.write(new Text(telNo), dataBean);
            }
        }
        
        static class DTReduce extends Reducer<Text, DataBean, Text, DataBean>{
            @Override
            public void reduce(Text key, Iterable<DataBean> dataBeans, Context context) throws IOException ,InterruptedException {
                long upPayLoad = 0;
                long downPayLoad = 0;
                for (DataBean dataBean : dataBeans) {
                    upPayLoad += dataBean.getUpPayLoad();
                    downPayLoad += dataBean.getDownPayLoad();
                }
                DataBean dataBean = new DataBean("", upPayLoad, downPayLoad);
                context.write(key, dataBean);
            }
        }
        
        public static void main(String[] args) throws Exception {
            Configuration conf = Configs.getConfigInstance();
            
            String[] paths = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (paths.length != 2) {
                System.err.println("Usage: " + DataCount.class.getName() + " <in> <out>");
                System.exit(2);
            }
            
            Job job = Job.getInstance(conf, DataCount.class.getSimpleName());
            job.setJarByClass(DataCount.class);                                //设置main函数所在的类
            
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            job.setMapperClass(DTMap.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DataBean.class);
            
            job.setReducerClass(DTReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DataBean.class);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            System.exit(job.waitForCompletion(true) ? 0 : 1);                //等待MapReduce执行完成并打印作业进度详情
            
        }

    }

    /**
     * 实现Writable接口,重写Write方法和readFields方法,严格按字段顺序进行写入写出
     * @author yao
     *
     */
    class DataBean implements Writable {

        private String telNo;
        private long upPayLoad;
        private long downPayLoad;
        private long totalPayLoad;
        
        public DataBean(){
            
        }
        
        public DataBean(String telNo, long upPayLoad, long downPayLoad) {
            super();
            this.telNo = telNo;
            this.upPayLoad = upPayLoad;
            this.downPayLoad = downPayLoad;
            this.totalPayLoad = upPayLoad + downPayLoad;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.telNo = in.readUTF();
            this.upPayLoad = in.readLong();
            this.downPayLoad = in.readLong();
            this.totalPayLoad = in.readLong();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(telNo);
            out.writeLong(upPayLoad);
            out.writeLong(downPayLoad);
            out.writeLong(totalPayLoad);
        }

        @Override
        public String toString() {
            return this.telNo+" "+this.upPayLoad+" "+this.downPayLoad+" "+this.totalPayLoad;
        }

        public String getTelNo() {
            return telNo;
        }

        public void setTelNo(String telNo) {
            this.telNo = telNo;
        }

        public long getUpPayLoad() {
            return upPayLoad;
        }

        public void setUpPayLoad(long upPayLoad) {
            this.upPayLoad = upPayLoad;
        }

        public long getDownPayLoad() {
            return downPayLoad;
        }

        public void setDownPayLoad(long downPayLoad) {
            this.downPayLoad = downPayLoad;
        }

        public long getTotalPayLoad() {
            return totalPayLoad;
        }

        public void setTotalPayLoad(long totalPayLoad) {
            this.totalPayLoad = totalPayLoad;
        }

    }

  • 相关阅读:
    Docker 镜像
    为什么要用 Docker
    什么是 Docker
    python编码
    Python File(文件) 方法
    Python 日期和时间
    Python 字符串字典内置函数&方法
    Python 元组内置函数
    Epos消费管理系统使用发布订阅实现数据库SQL SERVER 2005同步复制
    Epos消费管理系统复制迁移SQL SERVER 2005数据库
  • 原文地址:https://www.cnblogs.com/mengyao/p/4135931.html
Copyright © 2011-2022 走看看