zoukankan      html  css  js  c++  java
  • Hadoop基础---流量求和MapReduce程序及自定义数据类型

    一:测试数据

    1363157985066    13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
    1363157995052    13826544101    5C-0E-8B-C7-F1-E0:CMCC    120.197.40.4            4    0    264    0    200
    1363157991076    13926435656    20-10-7A-28-CC-0A:CMCC    120.196.100.99            2    4    132    1512    200
    1363154400022    13926251106    5C-0E-8B-8B-B1-50:CMCC    120.197.40.4            4    0    240    0    200
    1363157993044    18211575961    94-71-AC-CD-E6-18:CMCC-EASY    120.196.100.99    iface.qiyi.com    视频网站    15    12    1527    2106    200
    1363157995074    84138413    5C-0E-8B-8C-E8-20:7DaysInn    120.197.40.4    122.72.52.12        20    16    4116    1432    200
    1363157993055    13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200
    1363157995033    15920133257    5C-0E-8B-C7-BA-20:CMCC    120.197.40.4    sug.so.360.cn    信息安全    20    20    3156    2936    200
    1363157983019    13719199419    68-A1-B7-03-07-B1:CMCC-EASY    120.196.100.82            4    0    240    0    200
    1363157984041    13660577991    5C-0E-8B-92-5C-20:CMCC-EASY    120.197.40.4    s19.cnzz.com    站点统计    24    9    6960    690    200
    1363157973098    15013685858    5C-0E-8B-C7-F7-90:CMCC    120.197.40.4    rank.ie.sogou.com    搜索引擎    28    27    3659    3538    200
    1363157986029    15989002119    E8-99-C4-4E-93-E0:CMCC-EASY    120.196.100.99    www.umeng.com    站点统计    3    3    1938    180    200
    1363157992093    13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
    1363157986041    13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
    1363157984040    13602846565    5C-0E-8B-8B-B6-00:CMCC    120.197.40.4    2052.flash2-http.qq.com    综合门户    15    12    1938    2910    200
    1363157995093    13922314466    00-FD-07-A2-EC-BA:CMCC    120.196.100.82    img.qfc.cn        12    12    3008    3720    200
    1363157982040    13502468823    5C-0A-5B-6A-0B-D4:CMCC-EASY    120.196.100.99    y0.ifengimg.com    综合门户    57    102    7335    110349    200
    1363157986072    18320173382    84-25-DB-4F-10-1A:CMCC-EASY    120.196.100.99    input.shouji.sogou.com    搜索引擎    21    18    9531    2412    200
    1363157990043    13925057413    00-1F-64-E1-E6-9A:CMCC    120.196.100.55    t3.baidu.com    搜索引擎    69    63    11058    48243    200
    1363157988072    13760778710    00-FD-07-A4-7B-08:CMCC    120.196.100.82            2    2    120    120    200
    1363157985079    13823070001    20-7C-8F-70-68-1F:CMCC    120.196.100.99            6    3    360    180    200
    1363157985069    13600217502    00-1F-64-E2-E8-B1:CMCC    120.196.100.55            18    138    1080    186852    200

    二:按照需求自定义数据类型

    参考LongWritable进行改造:

    package cn.hadoop.mr.wc;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    /**
     * Hadoop value type holding the traffic figures of one phone number:
     * upstream bytes, downstream bytes and their total.
     *
     * <p>The total is always derived from the upstream and downstream values, so
     * the getters, {@link #toString()} and the serialized form can never disagree.
     *
     * <p>Natural ordering is ascending by total flow, then by upstream flow, then
     * by phone number; {@link #equals(Object)} and {@link #hashCode()} are
     * consistent with that ordering.
     */
    public class FlowBean implements WritableComparable<FlowBean> {
        private String phoneNB;
        private long up_flow;
        private long down_flow;
        private long sum_flow;

        /** No-arg constructor; the framework needs it to create instances before deserialization. */
        public FlowBean() {}

        /**
         * Creates a bean for one phone number; the total is computed from the two flows.
         *
         * @param phoneNB   phone number the figures belong to
         * @param up_flow   upstream traffic
         * @param down_flow downstream traffic
         */
        public FlowBean(String phoneNB, long up_flow, long down_flow) {
            this.phoneNB = phoneNB;
            this.up_flow = up_flow;
            this.down_flow = down_flow;
            this.sum_flow = up_flow + down_flow;
        }

        /** @return the phone number, or {@code null} if none has been set */
        public String getPhoneNB() {
            return phoneNB;
        }

        /** @param phoneNB the phone number these figures belong to */
        public void setPhoneNB(String phoneNB) {
            this.phoneNB = phoneNB;
        }

        /** @return upstream traffic */
        public long getUp_flow() {
            return up_flow;
        }

        /**
         * Sets the upstream traffic and refreshes the cached total.
         *
         * @param up_flow upstream traffic
         */
        public void setUp_flow(long up_flow) {
            this.up_flow = up_flow;
            // Keep the cached total in step; it used to go stale after a setter call.
            this.sum_flow = this.up_flow + this.down_flow;
        }

        /** @return downstream traffic */
        public long getDown_flow() {
            return down_flow;
        }

        /**
         * Sets the downstream traffic and refreshes the cached total.
         *
         * @param down_flow downstream traffic
         */
        public void setDown_flow(long down_flow) {
            this.down_flow = down_flow;
            // Keep the cached total in step; it used to go stale after a setter call.
            this.sum_flow = this.up_flow + this.down_flow;
        }

        /** @return upstream plus downstream traffic */
        public long getSum_flow() {
            return up_flow + down_flow;
        }

        /**
         * Serializes the bean as: phone number (UTF), up flow, down flow, total flow.
         * A {@code null} phone number is written as the empty string because
         * {@link DataOutput#writeUTF(String)} does not accept {@code null}.
         *
         * @param out sink to write to
         * @throws IOException if writing fails
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(nullToEmpty(phoneNB));
            out.writeLong(up_flow);
            out.writeLong(down_flow);
            out.writeLong(up_flow + down_flow);
        }

        /**
         * Restores the bean from the layout produced by {@link #write(DataOutput)}.
         * Fields must be read in exactly the order they were written.
         *
         * @param in source to read from
         * @throws IOException if reading fails
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            phoneNB = in.readUTF();
            up_flow = in.readLong();
            down_flow = in.readLong();
            sum_flow = in.readLong();
        }

        /**
         * Orders beans ascending by total flow, then by upstream flow, then by
         * phone number (a {@code null} phone number sorts as the empty string).
         *
         * <p>The previous implementation returned 0 for every pair, which made all
         * beans look equal to any sort or grouping step.
         *
         * @param o the bean to compare with
         * @return negative, zero or positive per the {@link Comparable} contract
         */
        @Override
        public int compareTo(FlowBean o) {
            int bySum = Long.compare(getSum_flow(), o.getSum_flow());
            if (bySum != 0) {
                return bySum;
            }
            int byUp = Long.compare(up_flow, o.up_flow);
            if (byUp != 0) {
                return byUp;
            }
            return nullToEmpty(phoneNB).compareTo(nullToEmpty(o.phoneNB));
        }

        /** Two beans are equal when phone number, up flow and down flow all match. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof FlowBean)) {
                return false;
            }
            FlowBean other = (FlowBean) obj;
            return up_flow == other.up_flow
                    && down_flow == other.down_flow
                    && nullToEmpty(phoneNB).equals(nullToEmpty(other.phoneNB));
        }

        /** Hash over the same three fields that {@link #equals(Object)} compares. */
        @Override
        public int hashCode() {
            int result = nullToEmpty(phoneNB).hashCode();
            result = 31 * result + (int) (up_flow ^ (up_flow >>> 32));
            result = 31 * result + (int) (down_flow ^ (down_flow >>> 32));
            return result;
        }

        /**
         * Renders up flow, down flow and total flow separated by the delimiter
         * character; the phone number is not included.
         */
        @Override
        public String toString() {
            // Use the derived total so the text can never lag behind the other fields.
            return "" + up_flow + "	" + down_flow + "	"+ getSum_flow();
        }

        /** Maps {@code null} to the empty string so comparisons and writeUTF never see null. */
        private static String nullToEmpty(String s) {
            return s == null ? "" : s;
        }

    }

    三:实现Map程序

    package cn.hadoop.fs;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import cn.hadoop.mr.wc.FlowBean;
    
    /**
     * Mapper that turns one log line into a (phone number, {@link FlowBean}) pair
     * carrying that line's upstream and downstream traffic.
     */
    public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
        /**
         * Fewest tokens a usable line can have. In the sample data the shortest rows
         * (both optional columns empty) split into nine tokens.
         */
        private static final int MIN_FIELDS = 9;

        /** Counter group used to report lines that were skipped. */
        private static final String COUNTER_GROUP = "FlowSum";

        /**
         * Parses one input line and emits the phone number with its traffic figures.
         *
         * <p>{@code StringUtils.split} treats adjacent separators as one, so rows with
         * empty optional columns yield fewer tokens than rows where they are filled.
         * The traffic columns are therefore addressed from the END of the row (third
         * and second from last); fixed indices 7 and 8 only hit the right columns for
         * rows with exactly ten tokens.
         *
         * <p>Lines that are too short or whose traffic columns are not numeric are
         * counted and skipped instead of failing the whole task.
         *
         * @param key     input key supplied by the framework; not used here
         * @param value   one line of the input
         * @param context sink for the emitted pair and for counters
         * @throws IOException          if emitting the pair fails
         * @throws InterruptedException if the task is interrupted while emitting
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            // Fetch the line and split it on the delimiter.
            String line = value.toString();
            String[] fields = StringUtils.split(line, "	");

            if (fields.length < MIN_FIELDS) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }

            String phoneNB = fields[1];
            long up_flow;
            long down_flow;
            try {
                // Row tail is: ..., up flow, down flow, status.
                up_flow = Long.parseLong(fields[fields.length - 3]);
                down_flow = Long.parseLong(fields[fields.length - 2]);
            } catch (NumberFormatException e) {
                context.getCounter(COUNTER_GROUP, "NON_NUMERIC_FLOW").increment(1);
                return;
            }

            // Wrap the figures as a key/value pair and emit it.
            context.write(new Text(phoneNB), new FlowBean(phoneNB,up_flow,down_flow));
        }
    }

    四:实现Reduce程序

    package cn.hadoop.fs;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import cn.hadoop.mr.wc.FlowBean;
    
    /**
     * Reducer that adds up the upstream and downstream traffic of every
     * {@link FlowBean} received for a key and emits one bean with the totals.
     */
    public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
        /**
         * Sums the up and down flow of all values and writes a single result
         * under the unchanged key.
         *
         * @param key     the key shared by all values; its text is passed to the result bean
         * @param values  the beans to aggregate
         * @param context sink for the aggregated pair
         * @throws IOException          if emitting the result fails
         * @throws InterruptedException if the task is interrupted while emitting
         */
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            long totalUp = 0L;
            long totalDown = 0L;

            // Only primitives are read from each value, so no copy of the bean is needed.
            for (FlowBean record : values) {
                totalUp = totalUp + record.getUp_flow();
                totalDown = totalDown + record.getDown_flow();
            }

            String phone = key.toString();
            FlowBean summary = new FlowBean(phone, totalUp, totalDown);
            context.write(key, summary);
        }
    }

    五:实现主函数调用

    package cn.hadoop.fs;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import cn.hadoop.mr.wc.FlowBean;
    
    
    
    /**
     * Command-line driver that configures and submits the flow-sum job.
     *
     * <p>Expected arguments: {@code <input path> <output path>}.
     */
    public class FlowSumRunner extends Configured implements Tool{

        /**
         * Builds the job from the configuration injected by {@link ToolRunner},
         * submits it and waits for completion.
         *
         * @param args {@code args[0]} is the input path, {@code args[1]} the output path
         * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments are missing
         * @throws Exception if job setup or submission fails
         */
        @Override
        public int run(String[] args) throws Exception {
            if (args == null || args.length < 2) {
                System.err.println("Usage: FlowSumRunner <input path> <output path>");
                return 2;
            }

            // Use the configuration ToolRunner populated from the generic options
            // (-D, -conf, -fs, ...); a fresh Configuration would silently drop them.
            Configuration conf = getConf();
            if (conf == null) {
                // run() was invoked without ToolRunner and nothing was injected.
                conf = new Configuration();
            }
            Job job = Job.getInstance(conf);

            job.setJarByClass(FlowSumRunner.class);

            job.setMapperClass(FlowSumMapper.class);
            job.setReducerClass(FlowSumReducer.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);

            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            return job.waitForCompletion(true)?0:1;
        }

        /**
         * Entry point: hands the arguments to {@link ToolRunner} and exits with the
         * status returned by {@link #run(String[])}.
         *
         * @param args command-line arguments, generic options first
         * @throws Exception if the tool fails
         */
        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
            System.exit(res);
        }
    }

    六:测试结果

     hadoop jar fs.jar cn.hadoop.fs.FlowSumRunner /fs/input/ /fs/output

  • 相关阅读:
    51nod1459 迷宫游戏
    51nod2006 飞行员配对(二分图最大匹配)
    51nod2006 飞行员配对(二分图最大匹配)
    GIT学习之路第四天 远程仓库
    GIT学习之路第四天 远程仓库
    搞懂树状数组
    搞懂树状数组
    线段树基础详解
    线段树基础详解
    折半枚举(双向搜索)poj27854 Values whose Sum is 0
  • 原文地址:https://www.cnblogs.com/ssyfj/p/12344773.html
Copyright © 2011-2022 走看看