zoukankan      html  css  js  c++  java
  • 每日学习

    今天学习MapReduce排序:

    自定义排序 WritableComparable 原理分析
    bean 对象作为 key 传输时,需要实现 WritableComparable 接口并重写 compareTo 方法,就可
    以实现排序。
    @Override
    public int compareTo(FlowBean bean) {
        // Sort by total traffic in descending order: comparing the other
        // bean's total first yields -1 when this bean is larger, 1 when it
        // is smaller, and 0 on a tie — same values as the original if/else.
        return Long.compare(bean.getSumFlow(), this.sumFlow);
    }
    全排序实现:
    FlowBean 对象在需求 1 的基础上增加了比较功能
    package com.atguigu.mapreduce.writablecompable;
    import org.apache.hadoop.io.WritableComparable;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    /**
     * Flow statistics bean used as the MapReduce key so the shuffle phase
     * sorts records by total traffic, descending. Must be writable (for
     * serialization between map and reduce) and comparable (for the sort).
     */
    public class FlowBean implements WritableComparable<FlowBean> {
        private long upFlow;   // upstream traffic
        private long downFlow; // downstream traffic
        private long sumFlow;  // total traffic (upFlow + downFlow)

        /** No-arg constructor — required so Hadoop can instantiate by reflection. */
        public FlowBean() {
        }

        public long getUpFlow() {
            return upFlow;
        }

        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }

        public long getDownFlow() {
            return downFlow;
        }

        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }

        public long getSumFlow() {
            return sumFlow;
        }

        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }

        /** Derives the total from the up/down values already set on this bean. */
        public void setSumFlow() {
            this.sumFlow = this.upFlow + this.downFlow;
        }

        /** Serialization — the field order here must mirror readFields exactly. */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.upFlow);
            out.writeLong(this.downFlow);
            out.writeLong(this.sumFlow);
        }

        /** Deserialization — reads fields in the same order write emits them. */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.upFlow = in.readLong();
            this.downFlow = in.readLong();
            this.sumFlow = in.readLong();
        }

        /** Tab-separated output form: upFlow, downFlow, sumFlow. */
        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }

        /**
         * Orders beans by total traffic, descending. Long.compare on the
         * other bean's total first returns the same -1/0/1 values as the
         * original three-way if/else.
         */
        @Override
        public int compareTo(FlowBean o) {
            return Long.compare(o.sumFlow, this.sumFlow);
        }
    }
    (2)编写 Mapper 类
    package com.atguigu.mapreduce.writablecompable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import java.io.IOException;
    /**
     * Maps each input line to (FlowBean, Text): the bean becomes the key so
     * the framework sorts by total traffic during the shuffle.
     */
    public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text>
    {
        // Reused across map() calls — Hadoop serializes them on write, so
        // mutating the same instances each call avoids per-record allocation.
        private FlowBean outK = new FlowBean();
        private Text outV = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the tab-separated input line into its columns.
            // Assumes fields[0] is the record identifier (presumably a phone
            // number — confirm against the input format) and fields[1]/[2]
            // are the up/down flow counts.
            String[] fields = value.toString().split("\t");

            // Populate the key bean and derive its total, then emit the
            // identifier as the value.
            outK.setUpFlow(Long.parseLong(fields[1]));
            outK.setDownFlow(Long.parseLong(fields[2]));
            outK.setSumFlow();
            outV.set(fields[0]);

            context.write(outK, outV);
        }
    }
    (3)编写 Reducer 类
    package com.atguigu.mapreduce.writablecompable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import java.io.IOException;
    public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean>
    {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context
    context) throws IOException, InterruptedException {
    //遍历 values 集合,循环写出,避免总流量相同的情况
    for (Text value : values) {
    //调换 KV 位置,反向写出
    context.write(value,key);
    }
    }
    }
    (4)编写 Driver 类
    package com.atguigu.mapreduce.writablecompable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException;
    public class FlowDriver {
    public static void main(String[] args) throws IOException,
    ClassNotFoundException, InterruptedException {
    //1 获取 job 对象
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    //2 关联本 Driver 类
    job.setJarByClass(FlowDriver.class);
    //3 关联 Mapper 和 Reducer
    job.setMapperClass(FlowMapper.class);
    job.setReducerClass(FlowReducer.class);
    //4 设置 Map 端输出数据的 KV 类型
    job.setMapOutputKeyClass(FlowBean.class);
    job.setMapOutputValueClass(Text.class);
    //5 设置程序最终输出的 KV 类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);
    //6 设置输入输出路径
    FileInputFormat.setInputPaths(job, new Path("D:\\inputflow2"));
    FileOutputFormat.setOutputPath(job, new Path("D:\\comparout"));
    //7 提交 Job
    boolean b = job.waitForCompletion(true);
    System.exit(b ? 0 : 1);
    }
    }
    作者:哦心有
    本文版权归作者和博客园共有,欢迎转载,但必须给出原文链接,并保留此段声明,否则保留追究法律责任的权利。
  • 相关阅读:
    SqlServer注意事项总结,高级程序员必背!
    C#语法——委托,架构的血液
    C#语法——泛型的多种应用
    C#——Nhibernate探索
    C#语法——await与async的正确打开方式
    谈谈我理解的SA——Systems Architecture
    C#线程安全使用(五)
    C#语法——元组类型
    架构师的御人之道
    另一个角度的架构师
  • 原文地址:https://www.cnblogs.com/haobox/p/15630818.html
Copyright © 2011-2022 走看看