zoukankan      html  css  js  c++  java
  • MapReduce 自定义数据类型的简单应用

    本文以手机流量统计为例:

    日志中包含下面字段(字段列表在本页中未能显示;从下面的代码可知,每行以制表符分隔,第 2 列为手机号,第 7~10 列依次为上行数据包、下行数据包、上行总流量、下行总流量):

    现在需要统计每个手机号的上行数据包数、下行数据包数、上行总流量和下行总流量。

    分析:可以以手机号为 key、以上述 4 个字段为 value 来传递数据。

    这样就需要自定义一个数据类型,用于封装要统计的 4 个字段,使其能在 map 与 reduce 之间传递并参与 shuffle。

    注:作为 key 的自定义类型需要实现 WritableComparable 接口(包括其中的 compareTo 方法);

         作为 value 的自定义类型则只需实现 Writable 接口中的方法(write 和 readFields)。

    自定义数据类型的代码如下:

    package org.apache.hadoop.mapreduce.io;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
     
    
    /**
     * Custom Hadoop {@code Writable} that bundles the four traffic counters of a
     * mobile-data record: uplink packet count, downlink packet count, uplink
     * payload and downlink payload.
     *
     * <p>Serialized form: four consecutive {@code long}s in the order
     * upPackNum, downPackNum, upPayLoad, downPayLoad. {@code write} and
     * {@code readFields} must keep exactly this order.
     *
     * @author nele
     */
    public class MobileDataWritable implements Writable {

        private long upPackNum;
        private long downPackNum;
        private long upPayLoad;
        private long downPayLoad;

        /** Creates an instance whose four counters are all zero. */
        public MobileDataWritable() {
        }

        /**
         * Creates an instance holding the given counters.
         *
         * @param upPackNum   uplink packet count
         * @param downPackNum downlink packet count
         * @param upPayLoad   uplink payload
         * @param downPayLoad downlink payload
         */
        public MobileDataWritable(long upPackNum, long downPackNum, long upPayLoad,
                long downPayLoad) {
            set(upPackNum, downPackNum, upPayLoad, downPayLoad);
        }

        /**
         * Overwrites all four counters at once, so a single instance can be reused.
         *
         * @param upPackNum   uplink packet count
         * @param downPackNum downlink packet count
         * @param upPayLoad   uplink payload
         * @param downPayLoad downlink payload
         */
        public void set(long upPackNum, long downPackNum, long upPayLoad,
                long downPayLoad) {
            this.upPackNum = upPackNum;
            this.downPackNum = downPackNum;
            this.upPayLoad = upPayLoad;
            this.downPayLoad = downPayLoad;
        }

        public long getUpPackNum() {
            return upPackNum;
        }

        public void setUpPackNum(long upPackNum) {
            this.upPackNum = upPackNum;
        }

        public long getDownPackNum() {
            return downPackNum;
        }

        public void setDownPackNum(long downPackNum) {
            this.downPackNum = downPackNum;
        }

        public long getUpPayLoad() {
            return upPayLoad;
        }

        public void setUpPayLoad(long upPayLoad) {
            this.upPayLoad = upPayLoad;
        }

        public long getDownPayLoad() {
            return downPayLoad;
        }

        public void setDownPayLoad(long downPayLoad) {
            this.downPayLoad = downPayLoad;
        }

        /** Serializes the four counters as longs, in the fixed field order. */
        public void write(DataOutput out) throws IOException {
            out.writeLong(upPackNum);
            out.writeLong(downPackNum);
            out.writeLong(upPayLoad);
            out.writeLong(downPayLoad);
        }

        /** Reads the four counters back in the same order {@code write} emits them. */
        public void readFields(DataInput in) throws IOException {
            upPackNum = in.readLong();
            downPackNum = in.readLong();
            upPayLoad = in.readLong();
            downPayLoad = in.readLong();
        }

        /** Combines the four counters with the conventional 31-multiplier scheme. */
        @Override
        public int hashCode() {
            int h = 31 + Long.hashCode(downPackNum);
            h = 31 * h + Long.hashCode(downPayLoad);
            h = 31 * h + Long.hashCode(upPackNum);
            h = 31 * h + Long.hashCode(upPayLoad);
            return h;
        }

        /** Two instances are equal when they are of the same class and all four counters match. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            MobileDataWritable that = (MobileDataWritable) obj;
            return downPackNum == that.downPackNum
                    && downPayLoad == that.downPayLoad
                    && upPackNum == that.upPackNum
                    && upPayLoad == that.upPayLoad;
        }

        /** Renders the four counters tab-separated, in serialization order. */
        @Override
        public String toString() {
            return new StringBuilder()
                    .append(upPackNum).append('\t')
                    .append(downPackNum).append('\t')
                    .append(upPayLoad).append('\t')
                    .append(downPayLoad)
                    .toString();
        }
    }

    现在就可以使用自定义类型进行手机流量统计,代码如下:

    /**
     * MapReduce job that sums, per phone number, the uplink/downlink packet
     * counts and uplink/downlink payloads of a tab-separated traffic log.
     *
     * <p>Each input line is split on tabs; column 1 (0-based) is the phone
     * number and columns 6-9 are, in order, upPackNum, downPackNum, upPayLoad
     * and downPayLoad. Lines lacking those columns, or whose counters are not
     * integers, are skipped and counted instead of failing the task.
     *
     * <p>Usage: {@code MobileDataMapReduce <input path> <output path>}. When no
     * arguments are given at all, {@code main} falls back to the author's
     * development HDFS paths.
     *
     * @author nele
     */
    public class MobileDataMapReduce extends Configured implements Tool {

        /** 0-based index of the phone-number column. */
        private static final int PHONE_COLUMN = 1;

        /** 0-based index of the first of the four consecutive counter columns. */
        private static final int FIRST_COUNTER_COLUMN = 6;

        /** Smallest column count for which every index read by the mapper exists. */
        private static final int MIN_COLUMNS = FIRST_COUNTER_COLUMN + 4;

        /** Counter group under which skipped input lines are reported. */
        private static final String COUNTER_GROUP = "MobileData";

        /**
         * Map phase: emits (phone number, counters of this line) for every
         * well-formed input line.
         *
         * @author nele
         */
        public static class MobileDataMapper extends
                Mapper<LongWritable, Text, Text, MobileDataWritable> {

            // Reused across map() calls instead of being allocated per record.
            public Text outPutKey = new Text();
            public MobileDataWritable outPutValue = new MobileDataWritable();

            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] arr = value.toString().split("\t");
                if (arr.length < MIN_COLUMNS) {
                    // Blank or truncated line: skip it rather than letting
                    // ArrayIndexOutOfBoundsException kill the whole task.
                    context.getCounter(COUNTER_GROUP, "SHORT_LINES").increment(1);
                    return;
                }
                try {
                    // All four arguments are parsed before set() runs, so a parse
                    // failure never leaves outPutValue half-updated.
                    outPutValue.set(Long.parseLong(arr[FIRST_COUNTER_COLUMN]),
                            Long.parseLong(arr[FIRST_COUNTER_COLUMN + 1]),
                            Long.parseLong(arr[FIRST_COUNTER_COLUMN + 2]),
                            Long.parseLong(arr[FIRST_COUNTER_COLUMN + 3]));
                } catch (NumberFormatException e) {
                    // e.g. a header line or a corrupted counter field.
                    context.getCounter(COUNTER_GROUP, "MALFORMED_COUNTERS").increment(1);
                    return;
                }
                outPutKey.set(arr[PHONE_COLUMN]);
                context.write(outPutKey, outPutValue);
            }

        }

        /**
         * Reduce phase: sums the four counters over all values of one phone number.
         *
         * <p>Also registered as the combiner in {@code run}. That is valid because
         * summation is associative and commutative, and the input and output
         * key/value types are identical.
         *
         * @author nele
         */
        public static class MobileDataReducer extends
                Reducer<Text, MobileDataWritable, Text, MobileDataWritable> {

            // Reused across reduce() calls instead of being allocated per key.
            private MobileDataWritable outPutValue = new MobileDataWritable();

            @Override
            public void reduce(Text key, Iterable<MobileDataWritable> values,
                    Context context) throws IOException, InterruptedException {
                long upPackNum = 0;
                long downPackNum = 0;
                long upPayLoad = 0;
                long downPayLoad = 0;
                for (MobileDataWritable val : values) {
                    upPackNum += val.getUpPackNum();
                    downPackNum += val.getDownPackNum();
                    upPayLoad += val.getUpPayLoad();
                    downPayLoad += val.getDownPayLoad();
                }
                outPutValue.set(upPackNum, downPackNum, upPayLoad, downPayLoad);
                // The incoming key already is the phone number; no copy is needed.
                context.write(key, outPutValue);
            }
        }

        /**
         * Configures and submits the job, then blocks until it finishes.
         *
         * @param args {@code args[0]} is the input path, {@code args[1]} the output path
         * @return 0 if the job succeeded, 1 if it failed, 2 if the paths are missing
         * @throws Exception propagated from job setup or submission
         */
        public int run(String[] args) throws Exception {
            if (args.length < 2) {
                System.err.println("Usage: " + getClass().getSimpleName()
                        + " <input path> <output path>");
                return 2;
            }

            Configuration conf = getConf();

            // create job
            Job job = Job.getInstance(conf, getClass().getSimpleName());
            job.setJarByClass(getClass());

            // input
            FileInputFormat.addInputPath(job, new Path(args[0]));

            // map
            job.setMapperClass(MobileDataMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(MobileDataWritable.class);

            // combiner: pre-aggregates map output before the shuffle
            job.setCombinerClass(MobileDataReducer.class);

            // reduce
            job.setReducerClass(MobileDataReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(MobileDataWritable.class);

            // output
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // submit and wait
            return job.waitForCompletion(true) ? 0 : 1;
        }

        /**
         * Command-line entry point; runs the job through {@code ToolRunner}.
         *
         * @param args input and output paths; when empty, development defaults are used
         * @throws Exception propagated from {@code ToolRunner.run}
         */
        public static void main(String[] args) throws Exception {
            if (args.length == 0) {
                // Development defaults, applied only when the caller supplied no arguments.
                args = new String[] {
                        "hdfs://bigdata5:8020/user/nele/data/input/HTTP_20130313143750.data",
                        "hdfs://bigdata5:8020/user/nele/data/output/output6" };
            }

            Configuration conf = new Configuration();

            int status = ToolRunner.run(conf, new MobileDataMapReduce(), args);

            System.exit(status);
        }

    }

    这样就可以统计出给定日志中每个手机号的各项流量数据。

  • 相关阅读:
    Browse information of one or more files is not available解决办法
    python中装饰器的使用
    python:匿名函数lambda
    python:列表生成式的学习
    python:列表切片知识的总结
    python:*args和**kwargs的用法
    NAT
    ACL
    三层交换技术和HSRP协议
    单臂路由与DHCP中继
  • 原文地址:https://www.cnblogs.com/nele/p/5178089.html
Copyright © 2011-2022 走看看