MapReduce Program Development: Traffic Summation (Part 8)


    1. Analyze the log that records mobile phone traffic.

    2. Take one line from the log, split it into fields, and extract the ones we need: the phone number, the upstream traffic, and the downstream traffic. Then wrap them as a key/value pair and emit it.

    3. Implement the map method in Java:

    package hadoop.mr.flownum;

    import java.io.IOException;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class FlowNumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Take one line of input
            String line = value.toString();

            // Split it into fields
            String[] fields = StringUtils.split(line, " ");

            String phoneNB = fields[1];
            long u_flow = Long.parseLong(fields[7]);
            long d_flow = Long.parseLong(fields[8]);

            // Wrap the data as a key/value pair and emit it
            context.write(new Text(phoneNB), new FlowBean(phoneNB, u_flow, d_flow));
        }
    }
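    To make the field indices concrete, here is a self-contained sketch of the same extraction logic. The sample log line below is invented for illustration and only mirrors the layout the mapper assumes (phone number in field 1, upstream bytes in field 7, downstream bytes in field 8):

    // Hypothetical local check of the field extraction; the sample line is made up.
    public class FieldExtractionDemo {
        public static void main(String[] args) {
            String line = "1363157985066 13726230503 00-FD-07-A4-72-B8 120.196.100.82"
                    + " i02.c.aliimg.com 24 27 2481 24681 200";
            String[] fields = line.split(" ");
            System.out.println(fields[1]); // phone number: 13726230503
            System.out.println(fields[7]); // upstream traffic: 2481
            System.out.println(fields[8]); // downstream traffic: 24681
        }
    }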

    4. The FlowBean passed as the map output value is a serializable entity; it implements Hadoop's Writable interface:

    package hadoop.mr.flownum;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    public class FlowBean implements Writable {

        private String phoneNB;
        private long up_flow;
        private long d_flow;
        private long s_flow;

        // Deserialization instantiates the bean via reflection, which requires a
        // no-arg constructor, so we define one explicitly
        public FlowBean() {
        }

        // A parameterized constructor for convenient initialization
        public FlowBean(String phoneNB, long up_flow, long d_flow) {
            this.phoneNB = phoneNB;
            this.up_flow = up_flow;
            this.d_flow = d_flow;
            this.s_flow = up_flow + d_flow;
        }

        public String getPhoneNB() {
            return phoneNB;
        }

        public void setPhoneNB(String phoneNB) {
            this.phoneNB = phoneNB;
        }

        public long getUp_flow() {
            return up_flow;
        }

        public void setUp_flow(long up_flow) {
            this.up_flow = up_flow;
        }

        public long getD_flow() {
            return d_flow;
        }

        public void setD_flow(long d_flow) {
            this.d_flow = d_flow;
        }

        public long getS_flow() {
            return s_flow;
        }

        public void setS_flow(long s_flow) {
            this.s_flow = s_flow;
        }

        // Serialize the object's data into the stream
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(phoneNB);
            out.writeLong(up_flow);
            out.writeLong(d_flow);
            out.writeLong(s_flow);
        }

        // Deserialize the object's data from the stream.
        // Fields must be read in exactly the same order they were written.
        @Override
        public void readFields(DataInput in) throws IOException {
            phoneNB = in.readUTF();
            up_flow = in.readLong();
            d_flow = in.readLong();
            s_flow = in.readLong();
        }

        @Override
        public String toString() {
            return "" + up_flow + " " + d_flow + " " + s_flow;
        }
    }
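    A quick local sanity check of the Writable contract is a round trip through Java's data streams. This is a minimal sketch assuming nothing beyond FlowBean itself; the class name and values are made up for illustration:

    package hadoop.mr.flownum;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    public class FlowBeanRoundTrip {
        public static void main(String[] args) throws Exception {
            FlowBean original = new FlowBean("13726230503", 100, 200); // made-up values

            // write(): serialize into an in-memory byte stream
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));

            // readFields(): rebuild a bean from the same bytes, in the same field order
            FlowBean restored = new FlowBean();
            restored.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));

            System.out.println(restored); // expected: 100 200 300
        }
    }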

    5. The reduce method is called once per group of values sharing a key; its business logic is simply to iterate over the values, accumulate the sums, and emit the result.

    package hadoop.mr.flownum;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class FlowNumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long up_flow_counter = 0;
            long d_flow_counter = 0;

            // Accumulate upstream and downstream traffic for this phone number
            for (FlowBean bean : values) {
                up_flow_counter += bean.getUp_flow();
                d_flow_counter += bean.getD_flow();
            }

            context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
        }
    }
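    Because the sums are associative and the reducer's input and output types are identical (Text, FlowBean), the same class could in principle also be registered as a combiner to pre-aggregate map output before the shuffle. This is an optional optimization, not part of the original job; the one-liner would go in the job setup shown in step 6:

    // Optional (assumption): pre-aggregate per phone number on the map side
    job.setCombinerClass(FlowNumReducer.class);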

    6. Job submission:

    package hadoop.mr.flownum;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class FlowNumRunner extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            // Use the configuration injected by ToolRunner so generic options take effect
            Configuration conf = getConf();
            Job job = Job.getInstance(conf);

            job.setJarByClass(FlowNumRunner.class);

            job.setMapperClass(FlowNumMapper.class);
            job.setReducerClass(FlowNumReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);

            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new FlowNumRunner(), args);
            System.exit(res);
        }
    }
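    One benefit of the Configured/Tool pattern (with run() reading getConf()) is that ToolRunner parses Hadoop's generic options before the job's own arguments, so configuration can be overridden from the command line. A hypothetical invocation, assuming the jar and paths used in the steps below:

    hadoop jar flow.jar hadoop.mr.flownum.FlowNumRunner -D mapreduce.job.reduces=1 /flow/data /flow/output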

    7. Package the MapReduce program into a jar (here called flow.jar).

    8. Upload the jar to the virtual machine, and upload the log file to be analyzed into HDFS:

    hadoop fs -put HTTP_20130313143750.dat /flow/data

    Then run flow.jar on Hadoop, writing the results to /flow/output:

    hadoop jar flow.jar hadoop.mr.flownum.FlowNumRunner /flow/data /flow/output

    9. Run hadoop fs -cat /flow/output/part-r-00000 to inspect the output: the per-phone traffic totals computed from the log.
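    Each line of part-r-00000 is the Text key, a tab (TextOutputFormat's default separator), then FlowBean.toString(), so records have the shape below; the placeholders stand in for values that depend on the actual log:

    <phone number>	<up_flow> <d_flow> <s_flow>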



