  • Using MapReduce for Per-Phone Traffic Statistics

    The previous post on the word-count example already covered the detailed steps for importing the required packages and running a job, so refer back to it if needed. This post focuses on the code for computing per-user traffic statistics and additionally partitions the output by phone-number segment.

    Code:

    The underlying FolwWritable class:

    package flowstat;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    public class FolwWritable implements Writable {
        private int upFolw;   // upstream traffic in bytes
        private int downFlow; // downstream traffic in bytes
        private int sumFlow;  // total traffic in bytes
        
        public FolwWritable() {
            
        }
        /**
         * Creates a flow object from upstream and downstream traffic;
         * the total is computed automatically.
         * @param up   upstream bytes
         * @param down downstream bytes
         */
        public FolwWritable(int up, int down) {
            this.upFolw=up;
            this.downFlow=down;
            this.sumFlow=up+down;
        }
        
        public int getUpFolw() {
            return upFolw;
        }
        public void setUpFolw(int upFolw) {
            this.upFolw = upFolw;
        }
        public int getDownFlow() {
            return downFlow;
        }
        public void setDownFlow(int downFlow) {
            this.downFlow = downFlow;
        }
        public int getSumFlow() {
            return sumFlow;
        }
        public void setSumFlow(int sumFlow) {
            this.sumFlow = sumFlow;
        }
        @Override
        public void readFields(DataInput in) throws IOException {
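            // Deserialize the fields in exactly the order they were written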
            this.upFolw=in.readInt();
            this.downFlow=in.readInt();
            this.sumFlow=in.readInt();
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(upFolw);
            out.writeInt(downFlow);
            out.writeInt(sumFlow);
            
        }
        @Override
        public String toString() {
            return "FolwWritable [upFolw=" + upFolw + ", downFlow=" + downFlow + ", sumFlow=" + sumFlow + "]";
        }
        
    
    }
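
    A Writable is easy to get wrong if write and readFields fall out of sync, so a quick local round-trip check is worth having. The following is a minimal sketch (not part of the original post) that serializes a FolwWritable to an in-memory buffer and reads it back:

    package flowstat;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    
    public class FolwWritableRoundTrip {
        public static void main(String[] args) throws Exception {
            FolwWritable original = new FolwWritable(100, 200);
            
            // Serialize into an in-memory buffer
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            
            // Deserialize into a fresh instance
            FolwWritable copy = new FolwWritable();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            
            // Both lines should report sumFlow=300
            System.out.println(original);
            System.out.println(copy);
        }
    }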

    The partitioner class:

    package flowstat;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    
    // The value type parameter must match the mapper's output value type
    // (FolwWritable), otherwise the framework fails at runtime with a
    // ClassCastException when it hands values to getPartition.
    public class NumsegPartion extends Partitioner<Text, FolwWritable> {
    
        @Override
        public int getPartition(Text key, FolwWritable value, int numPartitions) {
            String phone = key.toString();
            // Digits 3..6 of the phone number choose the partition
            String mseg = phone.substring(3, 7);
            
            if (mseg.equals("0013")) {
                return 1;
            }
            if (mseg.equals("0023")) {
                return 2;
            }
            
            return 0;
        }
    
        
    
    }
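
    Because the partition logic is plain string handling, it can be sanity-checked without a cluster by calling getPartition directly. A minimal sketch with made-up phone numbers (digits 3..6 drive the choice, per substring(3, 7)):

    package flowstat;
    
    import org.apache.hadoop.io.Text;
    
    public class NumsegPartionCheck {
        public static void main(String[] args) {
            NumsegPartion p = new NumsegPartion();
            // Hypothetical numbers: digits 3..6 are "0013", "0023", and neither
            String[] phones = {"13800131234", "13800231234", "13899991234"};
            for (String phone : phones) {
                int part = p.getPartition(new Text(phone), new FolwWritable(1, 1), 3);
                // Expected: partition 1, partition 2, partition 0
                System.out.println(phone + " -> partition " + part);
            }
        }
    }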

    The mapper class:

    package flowstat;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class FlowMapper extends Mapper<LongWritable, Text, Text, FolwWritable>{
    
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FolwWritable>.Context context)
                throws IOException, InterruptedException {
            // Read one line of the usage log
            String line = value.toString();
            // Split the tab-separated fields
            String[] fields = line.split("\t");
            // The phone number sits in the third field
            Text phone = new Text(fields[2]);
            
            int up = Integer.parseInt(fields[3]);
            int down = Integer.parseInt(fields[4]);
            // Build the serializable flow value
            FolwWritable fw = new FolwWritable(up, down);
            // Emit the map output: phone number -> traffic record
            context.write(phone, fw);
        }
        
        
    }
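
    For reference, the mapper assumes tab-separated records with the phone number in the third field and the upstream/downstream byte counts in the fourth and fifth. A minimal sketch with a made-up record (the field layout and values are illustrative only):

    package flowstat;
    
    public class SampleLine {
        public static void main(String[] args) {
            // Hypothetical record: timestamp \t ip \t phone \t upBytes \t downBytes
            String line = "1363157985066\t192.168.1.2\t13800131234\t2481\t24681";
            String[] fields = line.split("\t");
            // The mapper reads fields[2] as the key and fields[3]/fields[4] as traffic
            System.out.println(fields[2] + " up=" + fields[3] + " down=" + fields[4]);
        }
    }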

    The reducer class:

    package flowstat;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class FlowReduce extends Reducer<Text, FolwWritable, Text, FolwWritable>{
    
        @Override
        protected void reduce(Text key, Iterable<FolwWritable> values,
                Reducer<Text, FolwWritable, Text, FolwWritable>.Context context) throws IOException, InterruptedException {
            // Iterate over all FolwWritable values for this phone and total the traffic
            int sumUp = 0;
            int sumDn = 0;
            for (FolwWritable value : values) {
                sumUp += value.getUpFolw();
                sumDn += value.getDownFlow();
            }
            // Build the aggregated flow record and emit it under the phone-number key
            FolwWritable v = new FolwWritable(sumUp, sumDn);
            context.write(key, v);
            
        }
        
        
    
    }

    The job submission (driver) class:

    package flowstat;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    
    
    public class FlowMain {
        public static void main(String[] args) throws Exception {
            Configuration conf=new Configuration();
            
            // Create the job
            Job job = Job.getInstance(conf, "FlowStatMR");
            
            // Set the jar by the driver class
            job.setJarByClass(FlowMain.class);
            
            // Set the mapper and reducer classes
            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReduce.class);
            
            // Set the partitioner, with one reduce task per partition
            job.setPartitionerClass(NumsegPartion.class);
            job.setNumReduceTasks(3);
            
            // Set the input and output formats
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            
            // Set the output key/value classes (also used for the map output here)
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FolwWritable.class);
            
            // Set the input and output paths
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            // Submit the job once and wait for the result
            boolean succ = job.waitForCompletion(true);
            System.out.println(succ ? "Job succeeded" : "Job failed");
        }
    }
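
    One optional refinement, not in the original post: because FlowReduce only sums values and its input and output types match, it could also be registered as a combiner to pre-aggregate on the map side and shrink the shuffle. One extra line in the driver, before the job is submitted, would do it:

    // Optional: safe here because summation is associative and commutative
    job.setCombinerClass(FlowReduce.class);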

    The next step is to build the jar, copy it to the virtual machine, and run the command that computes and partitions the statistics, as sketched below. For the detailed steps, see the previous word-count MapReduce post.
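
    A sketch of the run command, assuming the job was packaged as flowstat.jar and the input file is already in HDFS (the jar name and paths are placeholders):

    hadoop jar flowstat.jar flowstat.FlowMain /flow/input /flow/output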
