  • A more advanced program ==> secondary sort

    MapReduce automatically sorts keys in lexicographic order, but it does not sort values. If you want the values sorted as well, you have to write the logic yourself and implement a secondary sort on the values.

    For example, take a file like this:

    a 1
    b 3
    a 5
    d 10
    a 4
    b 8
    d 9
    c 2
    b 5
    c 1

    After sorting, it becomes:

    a 1
    a 4
    a 5
    b 3
    b 5
    b 8
    c 1
    c 2
    d 9
    d 10

    Since MapReduce sorts keys automatically, we want to take advantage of that behaviour by constructing a new key.

    We pack the original key and the original value together as the new key, so the records look roughly like this: <(a,1),1> <(b,3),3> ...

    // We create a class for the composite key. For MapReduce to be able to use it, the class must implement the WritableComparable<> interface, which means overriding its serialization and deserialization methods. Note that readFields() and write() must handle the fields in the same order!

    // We also need to override the class's compareTo() method: compare the original keys first, and only if they are equal compare the original values.

    package com.rabbit.hadoop.secondarysort;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    public class CustomWritable implements WritableComparable<CustomWritable> {

        private String key;
        private int value;

        public CustomWritable() {
        }

        public CustomWritable(String key, int value) {
            set(key, value);
        }

        public void set(String key, int value) {
            this.key = key;
            this.value = value;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }

        public Integer getValue() {
            return value;
        }

        public void setValue(Integer value) {
            this.value = value;
        }

        /*
         * Serialization
         * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
         */
        public void write(DataOutput out) throws IOException {
            out.writeUTF(key);
            out.writeInt(value);
        }

        /*
         * Deserialization
         * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
         */
        public void readFields(DataInput in) throws IOException {
            this.key = in.readUTF();
            this.value = in.readInt();
        }

        public int compareTo(CustomWritable o) {
            int result = this.getKey().compareTo(o.getKey());

            // If the original keys are equal, compare the original values to get the secondary sort.
            if (result == 0) {
                return Integer.valueOf(this.value).compareTo(Integer.valueOf(o.getValue()));
            }

            return result;
        }

        @Override
        public String toString() {
            return "CustomWritable [key=" + key + ", value=" + value + "]";
        }

    }
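
    As a quick sanity check (this is not part of the original post, just an illustrative sketch with a made-up class name), the composite key can be exercised locally: compare a few instances and round-trip one through write()/readFields() to confirm that the field order matches on both sides.

    package com.rabbit.hadoop.secondarysort;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    // Illustrative sketch only: a quick local check of CustomWritable, independent of any job.
    public class CustomWritableCheck {

        public static void main(String[] args) throws Exception {
            CustomWritable a1 = new CustomWritable("a", 1);
            CustomWritable a5 = new CustomWritable("a", 5);
            CustomWritable b3 = new CustomWritable("b", 3);

            // Same original key: the order falls back to the value, so this prints a negative number.
            System.out.println(a1.compareTo(a5));
            // Different original key: the order is decided by the key alone, again negative.
            System.out.println(a5.compareTo(b3));

            // Round-trip through write()/readFields(); the field order must match on both sides.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            a5.write(new DataOutputStream(bytes));

            CustomWritable copy = new CustomWritable();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            System.out.println(copy); // CustomWritable [key=a, value=5]
        }
    }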

    Redefining the key guarantees that the sort order is correct, but it breaks grouping. In the end we still want output of the form (a,1),(a,4),(a,5),(b,3),(b,5)... With the default grouping, the records for a will not all end up in one group, because the keys are now things like (a,1) and (b,3), so records whose original key is a can no longer be grouped together correctly. We therefore have to override the grouping logic and group by the original key again.

    // Implement RawComparator, parameterized with our custom class, and override its compare() methods.

    package com.rabbit.hadoop.secondarysort;

    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.WritableComparator;

    public class CustomComparator implements RawComparator<CustomWritable> {

        public int compare(CustomWritable o1, CustomWritable o2) {
            return o1.getKey().compareTo(o2.getKey());
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // A serialized CustomWritable ends with a 4-byte int (the value), so dropping the
            // last 4 bytes compares only the original-key part. The comparison has to start at
            // the record offsets s1 and s2, not at 0.
            return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
        }

    }
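
    If that byte-offset arithmetic feels fragile, an alternative, shown here only as a sketch with a hypothetical class name and not used in the original post, is to extend WritableComparator and compare deserialized keys. It is a little slower, because both keys get deserialized, but it is easier to get right, and either class can be passed to job.setGroupingComparatorClass().

    package com.rabbit.hadoop.secondarysort;

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    // Sketch of an alternative grouping comparator: WritableComparator deserializes both
    // records and hands them to compare(), so we can group on the original key without
    // touching raw bytes.
    public class GroupingComparatorSketch extends WritableComparator {

        protected GroupingComparatorSketch() {
            // true -> create CustomWritable instances to deserialize into
            super(CustomWritable.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            CustomWritable left = (CustomWritable) a;
            CustomWritable right = (CustomWritable) b;
            return left.getKey().compareTo(right.getKey());
        }

    }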

    // Grouping is not the only thing that breaks: partitioning goes wrong too, for much the same reason. Hashing a and hashing (a,1) modulo the number of reducers will almost certainly give different results. To partition correctly by the original key, we have to redefine the partitioning as well.

    // Create a partitioner class that extends Partitioner<> and overrides getPartition().

    package com.rabbit.hadoop.secondarysort;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class CustomPartitioner extends Partitioner<CustomWritable, IntWritable> {

        @Override
        public int getPartition(CustomWritable key, IntWritable value, int numPartitions) {
            // Partition on the original key only. The parentheses matter: % binds tighter than &,
            // so without them this would compute hashCode & (Integer.MAX_VALUE % numPartitions)
            // instead of a non-negative hash modulo the number of partitions.
            return (key.getKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }

    }
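
    To convince yourself that records sharing an original key always land in the same partition, a throwaway check like the one below (purely illustrative, with a made-up class name) prints the partition index for a few of the sample records; all three a records report the same index.

    package com.rabbit.hadoop.secondarysort;

    import org.apache.hadoop.io.IntWritable;

    // Throwaway sketch: print partition assignments for a few sample lines with two reducers.
    public class PartitionerCheck {

        public static void main(String[] args) {
            CustomPartitioner partitioner = new CustomPartitioner();
            int numPartitions = 2;

            String[] lines = {"a 1", "a 5", "a 4", "b 3", "b 8", "c 2", "d 10"};

            for (String line : lines) {
                String[] fields = line.split(" ");
                int parsedValue = Integer.parseInt(fields[1]);
                CustomWritable compositeKey = new CustomWritable(fields[0], parsedValue);

                int partition = partitioner.getPartition(compositeKey, new IntWritable(parsedValue), numPartitions);
                System.out.println(compositeKey + " -> partition " + partition);
            }
        }
    }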

    At this point the groundwork is done. What remains is to write the map() and reduce() methods following the usual template.

    // Override map()

    package com.rabbit.hadoop.secondarysort;

    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // The output key is our custom class; the output value is still the original value.

    public class SecondarySortMapper extends Mapper<LongWritable, Text, CustomWritable, IntWritable> {

        private CustomWritable outputKey = new CustomWritable();
        private IntWritable outputValue = new IntWritable();

        /**
         * Turns an input line such as "a 1" into <(a,1),1>, "a 100" into <(a,100),100>, and so on.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();

            StringTokenizer stringTokenizer = new StringTokenizer(line, " ");

            while (stringTokenizer.hasMoreTokens()) {

                String string = stringTokenizer.nextToken();
                // A small shortcut to avoid an explicit if: if the token parses as an Integer,
                // it is the value field; if parsing fails, it is the key field.
                try {
                    Integer parseValue = Integer.parseInt(string);
                    outputKey.setValue(parseValue);
                    outputValue.set(parseValue);
                } catch (Exception e) {
                    outputKey.setKey(string);
                }
            }

            context.write(outputKey, outputValue);
        }

    }
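
    The try/catch shortcut works for this two-column input, but it would misclassify a key that happens to be numeric. A more explicit variant of the mapper, sketched below under the assumption that every line is exactly "key value" (the class name is made up for illustration), splits the line and assigns the fields directly:

    package com.rabbit.hadoop.secondarysort;

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Sketch of a more explicit mapper: assumes every line is "key value", splits on
    // whitespace, and skips malformed lines instead of guessing by parse failure.
    public class ExplicitSecondarySortMapper extends Mapper<LongWritable, Text, CustomWritable, IntWritable> {

        private CustomWritable outputKey = new CustomWritable();
        private IntWritable outputValue = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().trim().split("\\s+");
            if (fields.length != 2) {
                return; // malformed line, skip it
            }

            int parsedValue = Integer.parseInt(fields[1]);
            outputKey.set(fields[0], parsedValue); // composite key: (original key, original value)
            outputValue.set(parsedValue);          // original value

            context.write(outputKey, outputValue);
        }

    }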

    // Override reduce()

    package com.rabbit.hadoop.secondarysort;

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // The mapper's output is the reducer's input, so the reducer's input key is our custom class and the input value is the original value.

    public class CustomReducer extends Reducer<CustomWritable, IntWritable, Text, IntWritable> {

        private Text outputKey = new Text();

        /**
         * With the grouping comparator in place, each call receives the values for one original
         * key, already in ascending order, e.g. <a, [1, 3, 100]>.
         */
        @Override
        protected void reduce(CustomWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            String oriKey = key.getKey(); // the original key

            for (IntWritable value : values) {
                outputKey.set(oriKey);
                context.write(outputKey, value); // emit the original key and the original value
            }
        }

    }

    // Finally, write a Driver class to wire everything up and run the job.

    package com.rabbit.hadoop.secondarysort;


    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;


    public class SecondarySortDriver extends Configured implements Tool {

        public int run(String[] args) throws Exception {

            Configuration configuration = getConf();

            Job job = Job.getInstance(configuration, this.getClass().getSimpleName());

            job.setJarByClass(SecondarySortDriver.class);

            job.setPartitionerClass(CustomPartitioner.class);
            job.setGroupingComparatorClass(CustomComparator.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.setMapperClass(SecondarySortMapper.class);
            job.setMapOutputKeyClass(CustomWritable.class);
            job.setMapOutputValueClass(IntWritable.class);

            job.setReducerClass(CustomReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // job.setNumReduceTasks(4); // uncomment to check that partitioning works with several reducers

            boolean isSuccess = job.waitForCompletion(true);
            return isSuccess ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {

            // This program runs locally on Windows, with the input and output on the local file system.
            // To run it against an HDFS cluster, set the default file system, e.g.
            // configuration.set("fs.defaultFS", "hdfs://bd27-server.ibeifeng.com:8020")

            Configuration configuration = new Configuration();
            args = new String[] {"D:\\input\\sec-sort.txt", "D:\\output5"};

            int status = ToolRunner.run(configuration, new SecondarySortDriver(), args);

            System.exit(status);
        }

    }
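
    One practical wrinkle when re-running the job: MapReduce refuses to start if the output directory already exists. A small helper like the one below, which is not part of the original code and is only a sketch, could be called from run() as OutputCleaner.deleteIfExists(configuration, args[1]) before the output path is set:

    package com.rabbit.hadoop.secondarysort;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch of a helper that removes a stale output directory so repeated test runs
    // do not fail with "output directory already exists".
    public class OutputCleaner {

        public static void deleteIfExists(Configuration configuration, String output) throws IOException {
            Path outputPath = new Path(output);
            FileSystem fileSystem = outputPath.getFileSystem(configuration);

            if (fileSystem.exists(outputPath)) {
                fileSystem.delete(outputPath, true); // true = delete recursively
            }
        }

    }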

  • Original post: https://www.cnblogs.com/rabbit624/p/10553189.html