  • Hadoop MapReduce Programming: Getting Started with the API, Part 21 — Web Traffic, Version 1

      Without further ado, here is the code.

      This example aggregates traffic from the raw flow logs and writes the per-user statistics for each province to a separate output file.

     

    Code

    package zhouls.bigdata.myMapReduce.areapartition;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;

    public class FlowBean implements WritableComparable<FlowBean>{


    private String phoneNB; // phone number
    private long up_flow;   // upstream flow
    private long d_flow;    // downstream flow
    private long s_flow;    // total flow (upstream + downstream)

    //Deserialization instantiates the bean via reflection, which needs a no-arg constructor, so one is defined explicitly
    public FlowBean(){}

    //Convenience constructor for initializing the bean's fields
    public FlowBean(String phoneNB, long up_flow, long d_flow) {
    this.phoneNB = phoneNB;
    this.up_flow = up_flow;
    this.d_flow = d_flow;
    this.s_flow = up_flow + d_flow;
    }

    public String getPhoneNB() {
    return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
    this.phoneNB = phoneNB;
    }

    public long getUp_flow() {
    return up_flow;
    }

    public void setUp_flow(long up_flow) {
    this.up_flow = up_flow;
    }

    public long getD_flow() {
    return d_flow;
    }

    public void setD_flow(long d_flow) {
    this.d_flow = d_flow;
    }

    public long getS_flow() {
    return s_flow;
    }

    public void setS_flow(long s_flow) {
    this.s_flow = s_flow;
    }



    //Serialize this object's fields to the output stream
    public void write(DataOutput out) throws IOException {

    out.writeUTF(phoneNB);
    out.writeLong(up_flow);
    out.writeLong(d_flow);
    out.writeLong(s_flow);

    }


    //Deserialize this object's fields from the input stream
    //Fields must be read back in exactly the same order they were written
    public void readFields(DataInput in) throws IOException {

    phoneNB = in.readUTF();
    up_flow = in.readLong();
    d_flow = in.readLong();
    s_flow = in.readLong();

    }


    @Override
    public String toString() {

    return "" + up_flow + " " +d_flow + " " + s_flow;
    }

    //Sort by total flow (s_flow) in descending order
    public int compareTo(FlowBean o) {
    return s_flow > o.getS_flow() ? -1 : 1;
    }

    }
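    Because write() and readFields() must stay symmetric, a quick local round-trip check can catch field-ordering mistakes early. The sketch below is not part of the original post; the class name FlowBeanRoundTripCheck and the sample phone number and flow values are made up for illustration. It serializes a FlowBean to an in-memory buffer and reads it back the same way the framework would between map and reduce.

    package zhouls.bigdata.myMapReduce.areapartition;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    public class FlowBeanRoundTripCheck {

    public static void main(String[] args) throws Exception {
    //Build a bean with made-up sample values
    FlowBean original = new FlowBean("13512345678", 100, 200);

    //Serialize the bean to an in-memory buffer, as the framework would do between map and reduce
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    original.write(new DataOutputStream(buffer));

    //Deserialize into a fresh bean created through the no-arg constructor
    FlowBean copy = new FlowBean();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

    //Both lines should print "100 200 300"
    System.out.println(original);
    System.out.println(copy);
    }

    }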

    package zhouls.bigdata.myMapReduce.areapartition;

    import java.util.HashMap;

    import org.apache.hadoop.mapreduce.Partitioner;

    public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE>{

    //Dictionary mapping phone-number prefixes to partition numbers
    private static HashMap<String,Integer> areaMap = new HashMap<>();

    static{
    areaMap.put("135", 0);
    areaMap.put("136", 1);
    areaMap.put("137", 2);
    areaMap.put("138", 3);
    areaMap.put("139", 4);
    }





    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
    //Get the phone-number prefix from the key and look it up in the area map; each province gets its own partition number, and unknown prefixes fall into partition 5
    Integer areaCode = areaMap.get(key.toString().substring(0, 3));

    return areaCode == null ? 5 : areaCode;
    }

    }
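    To see how the partitioner routes keys, here is a small standalone sketch, again not part of the original post; the class name AreaPartitionerCheck and the sample phone numbers are made up. It calls getPartition() directly for a few keys and prints the partition each one lands in.

    package zhouls.bigdata.myMapReduce.areapartition;

    import org.apache.hadoop.io.Text;

    public class AreaPartitionerCheck {

    public static void main(String[] args) {
    AreaPartitioner<Text, FlowBean> partitioner = new AreaPartitioner<Text, FlowBean>();

    //Expected: 135... -> 0, 137... -> 2, 150... -> 5 (unknown prefixes go to the catch-all partition)
    String[] samples = {"13512345678", "13798765432", "15011112222"};
    for (String phone : samples) {
    int partition = partitioner.getPartition(new Text(phone), null, 6);
    System.out.println(phone + " -> partition " + partition);
    }
    }

    }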

    package zhouls.bigdata.myMapReduce.areapartition;

    import java.io.IOException;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;


    import zhouls.bigdata.myMapReduce.areapartition.FlowBean;


    /**
    * Aggregate traffic from the raw flow logs and write each province's user statistics to a separate output file.
    * Two mechanisms need to be customized:
    * 1. the partitioning logic, via a custom Partitioner
    * 2. the number of concurrent reducer tasks (one per partition)
    */
    public class FlowSumArea implements Tool {

    public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

    @Override
    protected void map(LongWritable key, Text value,Context context)
    throws IOException, InterruptedException {

    //Take one line of input
    String line = value.toString();
    //Split the line into its fields
    String[] fields = StringUtils.split(line, " ");

    //Pick out the fields we need
    String phoneNB = fields[1];
    long u_flow = Long.parseLong(fields[7]);
    long d_flow = Long.parseLong(fields[8]);

    //Wrap the data as a key-value pair and emit it
    context.write(new Text(phoneNB), new FlowBean(phoneNB, u_flow, d_flow));

    }


    }
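    //Note (added for illustration, not from the original post): the field indexes above assume
    //a record layout where fields[1] is the phone number and fields[7]/fields[8] are the
    //upstream/downstream byte counts, separated by single spaces. For example, an assumed,
    //made-up line like "f0 13512345678 f2 f3 f4 f5 f6 1024 4096 f9" would yield
    //phoneNB=13512345678, u_flow=1024, d_flow=4096. The actual layout of
    //HTTP_20130313143750.dat may differ.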


    public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean>{

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values,Context context)
    throws IOException, InterruptedException {

    long up_flow_counter = 0;
    long d_flow_counter = 0;

    //Sum the upstream and downstream flow of all records for this phone number
    for (FlowBean bean : values) {
    up_flow_counter += bean.getUp_flow();
    d_flow_counter += bean.getD_flow();
    }

    context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));



    }

    }


    public int run(String[] arg0) throws Exception {


    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    job.setJarByClass(FlowSumArea.class);

    job.setMapperClass(FlowSumAreaMapper.class);
    job.setReducerClass(FlowSumAreaReducer.class);

    //Plug in our custom partitioning logic
    job.setPartitionerClass(AreaPartitioner.class);


    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);

    //Set the number of reduce tasks; it should match the number of partitions (five known prefixes plus one catch-all)
    job.setNumReduceTasks(6);


    FileInputFormat.addInputPath(job, new Path(arg0[0]));// input path
    FileOutputFormat.setOutputPath(job, new Path(arg0[1]));// output path

    return job.waitForCompletion(true) ? 0 : 1;

    }

    public static void main(String[] args) throws Exception {

    //Cluster (HDFS) paths
    // String[] args0 = { "hdfs://HadoopMaster:9000/flowSumArea/HTTP_20130313143750.dat",
    // "hdfs://HadoopMaster:9000/out/flowSumArea"};

    //Local paths
    String[] args0 = { "./data/flowSumArea/HTTP_20130313143750.dat",
    "./out/flowSumArea/"};

    int ec = ToolRunner.run(new Configuration(), new FlowSumArea(), args0);
    System.exit(ec);

    }

    //Hold the Configuration injected by ToolRunner
    private Configuration conf;

    @Override
    public Configuration getConf() {
    return conf;
    }

    @Override
    public void setConf(Configuration conf) {
    this.conf = conf;
    }


    }
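    With the partitioner and the six reduce tasks configured above, the output directory (./out/flowSumArea/ in the local run) should contain six files, part-r-00000 through part-r-00005: one for each of the prefixes 135 to 139 and a last one for every other prefix. Each output line is a phone number followed by its upstream, downstream, and total flow, as formatted by FlowBean.toString().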
