zoukankan      html  css  js  c++  java
  • 流量汇总(自定义jar包,在hadoop集群上 统计,排序,分组)之统计

    小知识点:

    half:关机
    yarn端口:8088
    删除hdfs目录:hadoop fs -rm -r /wc/output

    namenode两个状态都是standby原因:zookeeper没有比hdfs先启动

    现在来做一个流量统计的例子:
    首先数据是这样一张表:见附件


    统计:(代码)

    1,flowbean:
    package cn.itcast.hadoop.mr.flowsum;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;

    public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow;
    //在反序列化时候反射机制需要调用空参数构造方法,所以显示定义了一个空参构造函数
    public FlowBean() {}
    //为了对象数据的初始化方便,加入一个带参数的构造函数
    public FlowBean(String phoneNB, long up_flow, long d_flow) {
    super();
    this.phoneNB = phoneNB;
    this.up_flow = up_flow;
    this.d_flow = d_flow;
    this.s_flow = up_flow+d_flow;
    }

    @Override
    public String toString() {
    return ""+up_flow +" " +d_flow + " "+ s_flow;
    }

    public String getPhoneNB() {
    return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
    this.phoneNB = phoneNB;
    }

    public long getUp_flow() {
    return up_flow;
    }

    public void setUp_flow(long up_flow) {
    this.up_flow = up_flow;
    }

    public long getD_flow() {
    return d_flow;
    }

    public void setD_flow(long d_flow) {
    this.d_flow = d_flow;
    }

    public long getS_flow() {
    return s_flow;
    }

    public void setS_flow(long s_flow) {
    this.s_flow = s_flow;
    }

    //从数据流中反序列化出对象的数据
    // 从数据流中独处对象字段时候,必须跟序列化的顺序保持一致
    @Override
    public void readFields(DataInput in) throws IOException {
    phoneNB = in.readUTF();
    up_flow=in.readLong();
    d_flow=in.readLong();
    s_flow=in.readLong();
    }

    //将对象数据序列化到流中
    @Override
    public void write(DataOutput out) throws IOException {
     
    out.writeUTF(phoneNB);
    out.writeLong(up_flow);
    out.writeLong(d_flow);
    out.writeLong(s_flow);
    }
    //比较,在这里实现了排序
    @Override
    public int compareTo(FlowBean o) {
    return s_flow>o.getS_flow()?-1:1;
    }

    }

    2,flowsumMapper:
    package cn.itcast.hadoop.mr.flowsum;

    import java.io.IOException;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    /**
     * @author yw.wang
     * FlowBean 是我们自定义的一种数据类型,要在hadoop的各个节点之间传输,所以应该遵循hadoop的序列化机制
     * 就必须实现hadoop的序列化接口
     *
     */
    public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    // 拿到日志中的一行数据,切分各个字段,抽取我们需要的字段:手机号,上行流量,下行流量,然后封装成kv类型发送出去,到reduce
    @Override
    protected void map(LongWritable key, Text value,Context context)
    throws IOException, InterruptedException {
    //拿一行数据
    String line = value.toString();
    //切分成各个字段
    String[] fields = StringUtils.split(line," ");
    //拿到我们需要的字段
    String phoneNB = fields[0];
    long u_flow =Long.parseLong(fields[7]);
    long d_flow =Long.parseLong(fields[8]);
    //封装数据为kv类型并输出
    context.write(new Text(phoneNB), new FlowBean(phoneNB,u_flow,d_flow));
    }
    }


    3,flowsumreducer
    package cn.itcast.hadoop.mr.flowsum;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
    //框架每传递一组数据<1237435262,{flowbean,flowbean,flowbean....}>
    //reduce中的业务逻辑就是遍历values,然后进行累加求和再输出
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values,Context context)
    throws IOException, InterruptedException {
    long up_flow_counter= 0;
    long d_flow_counter=0;
    for (FlowBean bean : values) {
    up_flow_counter +=bean.getUp_flow();
    d_flow_counter+=bean.getD_flow();
    }
    context.write(key, new FlowBean(key.toString(),up_flow_counter,d_flow_counter));
    }

    }

    4,flowsumrunner:
    package cn.itcast.hadoop.mr.flowsum;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    //这是job描述和提交类的规范写法
    public class FlowSumRunner extends Configured implements Tool{

    @Override
    public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(FlowSumRunner.class);
    job.setMapperClass(FlowSumMapper.class);
    job.setReducerClass(FlowSumReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowBean.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true)?0:1;
    }
    public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
    System.exit(res);
    }

    }

    打成jar包:


    在集群中使用命令:
    hadoop  jar  /root/Documents/sum.jar   cn.itcast.hadoop.mr.flowsum.FlowSumRunner  /wc/data/  /wc/sumoutput

    解释:



    排序:

    代码:
    1. package cn.itcast.hadoop.mr.flowsort;
    2. import java.io.IOException;
    3. import org.apache.commons.lang.StringUtils;
    4. import org.apache.hadoop.conf.Configuration;
    5. import org.apache.hadoop.fs.Path;
    6. import org.apache.hadoop.io.LongWritable;
    7. import org.apache.hadoop.io.NullWritable;
    8. import org.apache.hadoop.io.Text;
    9. import org.apache.hadoop.mapreduce.Job;
    10. import org.apache.hadoop.mapreduce.Mapper;
    11. import org.apache.hadoop.mapreduce.Reducer;
    12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    14. import cn.itcast.hadoop.mr.flowsum.FlowBean;
    15. public class SortMR {
    16. public static class SortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{
    17. //拿到一行数据,切分出各字段,封装为一个flowbean,作为key输出
    18. @Override
    19. protected void map(LongWritable key, Text value,Context context)
    20. throws IOException, InterruptedException {
    21. String line = value.toString();
    22. String[] fields = StringUtils.split(line, " ");
    23. String phoneNB = fields[0];
    24. long u_flow = Long.parseLong(fields[1]);
    25. long d_flow = Long.parseLong(fields[2]);
    26. context.write(new FlowBean(phoneNB, u_flow, d_flow), NullWritable.get());
    27. }
    28. }
    29. public static class SortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean>{
    30. @Override
    31. protected void reduce(FlowBean key, Iterable<NullWritable> values,Context context)
    32. throws IOException, InterruptedException {
    33. String phoneNB = key.getPhoneNB();
    34. context.write(new Text(phoneNB), key);
    35. }
    36. }
    37. public static void main(String[] args) throws Exception {
    38. Configuration conf = new Configuration();
    39. Job job = Job.getInstance(conf);
    40. // main方法所在的类,此处表示自身的类
    41. job.setJarByClass(SortMR.class);
    42. //会代表map,reduce的output,如果不一样可以申明mapoutput类型,像下面的一样
    43. job.setMapperClass(SortMapper.class);
    44. job.setReducerClass(SortReducer.class);
    45. // mapoutput类型
    46. job.setMapOutputKeyClass(FlowBean.class);
    47. job.setMapOutputValueClass(NullWritable.class);
    48. job.setOutputKeyClass(Text.class);
    49. job.setOutputValueClass(FlowBean.class);

    50. //这两个参数正好是 hadoop jar 。。 最后两个参数
    51. FileInputFormat.setInputPaths(job, new Path(args[0]));
    52. FileOutputFormat.setOutputPath(job, new Path(args[1]));
    53. //标准输出
    54. System.exit(job.waitForCompletion(true)?0:1);
    55. }
    56. }
    排序是针对统计的结果进行排序,故数据元是统计完成之后的00000success那个文件


    分组:


    FlowSumArea :
    1. package cn.itcast.hadoop.mr.areapartition;
    2. import java.io.IOException;
    3. import org.apache.commons.lang.StringUtils;
    4. import org.apache.hadoop.conf.Configuration;
    5. import org.apache.hadoop.fs.Path;
    6. import org.apache.hadoop.io.LongWritable;
    7. import org.apache.hadoop.io.Text;
    8. import org.apache.hadoop.mapreduce.Job;
    9. import org.apache.hadoop.mapreduce.Mapper;
    10. import org.apache.hadoop.mapreduce.Reducer;
    11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    13. import org.apache.hadoop.metrics2.impl.ConfigBuilder;
    14. import cn.itcast.hadoop.mr.flowsum.FlowBean;
    15. /**
    16. * 对流量原始日志进行流量统计,将不同省份的用户统计结果输出到不同文件
    17. * 需要自定义改造两个机制
    18. * 1,改造分区的逻辑,自定义一个partitioneer
    19. * 2,自定义reduer task的并发任务数
    20. */
    21. public class FlowSumArea {
    22. public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
    23. @Override
    24. protected void map(LongWritable key, Text value,Context context)
    25. throws IOException, InterruptedException {
    26. //拿一行数据
    27. String line = value.toString();
    28. //切分成各个字段
    29. String[] fields = StringUtils.split(line," ");
    30. //拿到我们的字段
    31. String phoneNB = fields[1];
    32. long u_flow = Long.parseLong(fields[7]);
    33. long d_flow = Long.parseLong(fields[8]);
    34. //封装数据为kv并输出
    35. context.write(new Text(phoneNB), new FlowBean(phoneNB,u_flow,d_flow));
    36. }
    37. }
    38. public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
    39. @Override
    40. protected void reduce(Text key, Iterable<FlowBean> values,Context context)
    41. throws IOException, InterruptedException {
    42. long up_flow_counter = 0;
    43. long d_flow_counter = 0;
    44. for (FlowBean bean : values) {
    45. up_flow_counter +=bean.getUp_flow();
    46. d_flow_counter += bean.getD_flow();
    47. }
    48. context.write(key, new FlowBean(key.toString(),up_flow_counter,d_flow_counter));
    49. }
    50. }
    51. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    52. Configuration conf = new Configuration();
    53. Job job = Job.getInstance(conf);
    54. job.setJarByClass(FlowSumArea.class);
    55. //job.setMapperClass(FlowSumAreaMapper.class);
    56. job.setMapperClass(FlowSumAreaMapper.class);
    57. job.setReducerClass(FlowSumAreaReducer.class);
    58. //设置我们自定义的分组逻辑定义
    59. job.setPartitionerClass(AreaPartitioner.class);
    60. job.setOutputKeyClass(Text.class);
    61. job.setOutputValueClass(FlowBean.class);
    62. //设置reduce的任务并发数,应该跟分组的数量保持一致
    63. job.setNumReduceTasks(6);
    64. //进程数如果大了,后面的文件为空,小了会出现错误,为1则没有分组
    65. FileInputFormat.setInputPaths(job, new Path(args[0]));
    66. FileOutputFormat.setOutputPath(job, new Path(args[1]));
    67. System.exit(job.waitForCompletion(true)?0:1);
    68. }
    69. }

    AreaPartitioner :
    1. package cn.itcast.hadoop.mr.areapartition;
    2. import java.util.HashMap;
    3. import org.apache.hadoop.mapreduce.Partitioner;
    4. public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {
    5. private static HashMap<String,Integer> areaMap = new HashMap<>();
    6. static{
    7. areaMap.put("135", 0);
    8. areaMap.put("136", 1);
    9. areaMap.put("137", 2);
    10. areaMap.put("138", 3);
    11. areaMap.put("139", 4);
    12. }
    13. @Override
    14. public int getPartition(KEY key, VALUE value, int numPartitions) {
    15. //从key中拿到手机号,查询手机归属地字典,不同省份返回不同的组号
    16. int areaCoder = areaMap.get(key.toString().substring(0,3))==null?5:areaMap.get(key.toString().substring(0,3));
    17. return areaCoder;
    18. }
    19. }

    运行:
    hadoop jar /root/Documents/area.jar cn.itcast.hadoop.mr.areapartition.FlowSumArea /wc/data /wc/areasoutput


    至此,mapreduce的流量统计,分组,排序工作完成了
     
     





























































    附件列表

    • 相关阅读:
      494 Target Sum 目标和
      493 Reverse Pairs 翻转对
      492 Construct the Rectangle 构建矩形
      491 Increasing Subsequences 递增子序列
      488 Zuma Game 祖玛游戏
      486 Predict the Winner 预测赢家
      485 Max Consecutive Ones 最大连续1的个数
      483 Smallest Good Base
      Django Form组件
      Django Auth组件
    • 原文地址:https://www.cnblogs.com/xiaoxiao5ya/p/c23cd7c85104ae4bc5875c798d81fb2e.html
    Copyright © 2011-2022 走看看