  • MapReduce Phone Traffic Statistics
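    This example sums each phone number's upstream and downstream traffic with a single MapReduce job: the mapper emits (phone number, FlowBean) pairs, and the reducer totals the upstream and downstream values per phone number.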

    package com.yuejiesong.PhoneFlowCount;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class FlowCount {

        static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // Convert the line to a String
                String line = value.toString();
                // Split the fields on spaces
                String[] fields = line.split(" ");
                // Extract the phone number
                String phoneNbr = fields[0];
                // Extract the upstream and downstream traffic
                long upFlow = Long.parseLong(fields[1]);
                long dFlow = Long.parseLong(fields[2]);
                context.write(new Text(phoneNbr), new FlowBean(upFlow, dFlow));
            }
        }

        static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
            @Override
            protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
                long sum_upFlow = 0;
                long sum_dFlow = 0;
                for (FlowBean bean : values) {
                    sum_upFlow += bean.getUpFlow();
                    sum_dFlow += bean.getdFlow();
                }
                FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);
                context.write(key, resultBean);
            }
        }

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // Specify the local path of the jar containing this program
            job.setJarByClass(FlowCount.class);
            // Specify the mapper/reducer classes this job uses
            job.setMapperClass(FlowCountMapper.class);
            job.setReducerClass(FlowCountReducer.class);
            // Specify the key/value types of the mapper output
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            // Specify the key/value types of the final output
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            // Specify the directory containing the job's input files
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            // Specify the directory for the job's output
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Submit the job configuration and the jar with the job's classes to YARN
            /* job.submit(); */
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
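
    The FlowBean class the job depends on is not shown in the original post. Below is a minimal sketch, assuming a standard Hadoop Writable; the two-argument constructor, the getUpFlow()/getdFlow() getters, and a toString() that emits the three tab-separated columns (upstream, downstream, total) are all implied by the driver code and the output shown further down.

    package com.yuejiesong.PhoneFlowCount;

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class FlowBean implements Writable {
        private long upFlow;
        private long dFlow;
        private long sumFlow;

        // Hadoop needs a no-arg constructor to deserialize the bean
        public FlowBean() {
        }

        public FlowBean(long upFlow, long dFlow) {
            this.upFlow = upFlow;
            this.dFlow = dFlow;
            this.sumFlow = upFlow + dFlow;
        }

        public long getUpFlow() {
            return upFlow;
        }

        public long getdFlow() {
            return dFlow;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(dFlow);
            out.writeLong(sumFlow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // Fields must be read in the same order they were written
            upFlow = in.readLong();
            dFlow = in.readLong();
            sumFlow = in.readLong();
        }

        // Produces the three tab-separated columns seen in part-r-00000
        @Override
        public String toString() {
            return upFlow + "\t" + dFlow + "\t" + sumFlow;
        }
    }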

    (base) [root@pyspark flowcount]# hdfs dfs -text /flowcount20200410input/*
    13726230501 200 1100
    13396230502 300 1200
    13897230503 400 1300
    13897230503 100 300
    13597230534 500 1400
    13597230534 300 1200
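
    Each line above is one space-separated record: phone number, upstream traffic, downstream traffic.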

    hadoop jar HiveCountToolsUDF-1.0-SNAPSHOT.jar com.yuejiesong.PhoneFlowCount.FlowCount /flowcount20200410input/ /flowcount20200410output/
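
    Note that FileOutputFormat refuses to run a job whose output directory already exists, so /flowcount20200410output/ must not be present before submission.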

    (base) [root@pyspark flowcount]# hadoop fs -ls /flowcount20200410output/
    Found 2 items
    -rw-r--r-- 1 root supergroup 0 2020-04-11 14:29 /flowcount20200410output/_SUCCESS
    -rw-r--r-- 1 root supergroup 104 2020-04-11 14:29 /flowcount20200410output/part-r-00000
    (base) [root@pyspark flowcount]# hadoop fs -text /flowcount20200410output/part-r-00000
    13396230502 300 1200 1500
    13597230534 800 2600 3400
    13726230501 200 1100 1300
    13897230503 500 1600 2100
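
    The reducer has merged the duplicate phone numbers: 13897230503 appears twice in the input (400 + 100 upstream, 1300 + 300 downstream), giving the 500 1600 2100 row, and 13597230534 likewise sums to 800 2600 3400.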
