  • Sorting in MapReduce (with Code)

    Before diving straight into sorting in Hadoop, a few basics are worth covering.

    Hadoop's serialization and comparison interfaces

    Hadoop's serialization format: Writable

    Writable is Hadoop's own serialization format. It has a sub-interface, WritableComparable<T>:

    public interface WritableComparable<T> extends Writable, Comparable<T>

    So WritableComparable provides not only serialization but also comparison.

    Sorting is a central part of MapReduce: the framework sorts records by key, so any type used as a key must have the Comparable<T> property.

    Besides the WritableComparable interface, there is another interface called RawComparator.

    The difference between WritableComparable and RawComparator:

    WritableComparable requires deserializing the data into objects and then comparing the objects; RawComparator compares the serialized byte streams directly, without deserializing them into objects, which saves the cost of creating new objects.
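
    As an illustration, here is a minimal sketch of a raw comparator for IntWritable keys, modeled on the comparator Hadoop itself registers for IntWritable (the class name IntRawComparator is mine, not something that ships with Hadoop). It decodes the 4-byte integers straight out of the serialized buffers instead of instantiating IntWritable objects:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.WritableComparator;

    // Raw comparator for IntWritable keys: compares the serialized bytes directly,
    // without deserializing them into IntWritable objects.
    public class IntRawComparator extends WritableComparator {

        public IntRawComparator() {
            super(IntWritable.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int thisValue = readInt(b1, s1);   // read the int straight out of the first buffer
            int thatValue = readInt(b2, s2);   // and out of the second buffer
            return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
        }
    }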

    Key sorting logic in Hadoop

    The sort order of a key type in Hadoop is simply whatever the compareTo() method of its WritableComparable<T> implementation defines, for the built-in types and custom types alike.

    Key sorting rules:

    1. If setOutputKeyComparatorClass() has been called on the jobconf, the configured comparator is used (the new-API equivalent is shown in the snippet below);
    2. Otherwise, the compareTo() of the key's WritableComparable implementation is used to compare and order the keys.
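
    With the newer mapreduce API the corresponding hook on Job is setSortComparatorClass(). Registering a raw comparator such as the IntRawComparator sketched above (again, a hypothetical class of mine) would look roughly like this:

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Sort");
    // old mapred API:      jobConf.setOutputKeyComparatorClass(IntRawComparator.class);
    // new mapreduce API:
    job.setSortComparatorClass(IntRawComparator.class);   // comparator used to sort map output keys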

    For example, here is IntWritable's comparison (as in Hadoop 2.x):

    public int compareTo(IntWritable o) {
        int thisValue = this.value;
        int thatValue = o.value;
        return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
    }

    We can override compareTo() to implement whatever ordering we need.
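
    For instance, here is a minimal sketch of a custom key type (a hypothetical class, not something used in this post's jobs) whose compareTo() reverses the natural order, so keys come out of the shuffle largest first:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableComparable;

    // An int-valued key that sorts in descending order simply by flipping compareTo().
    public class DescendingIntWritable implements WritableComparable<DescendingIntWritable> {

        private int value;

        public void set(int value) { this.value = value; }
        public int get()           { return value; }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(value);              // serialize the raw int
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            value = in.readInt();             // deserialize the raw int
        }

        @Override
        public int compareTo(DescendingIntWritable o) {
            return Integer.compare(o.value, this.value);   // reversed: larger values first
        }

        @Override
        public int hashCode() { return value; }             // keeps HashPartitioner grouping consistent

        @Override
        public boolean equals(Object o) {
            return (o instanceof DescendingIntWritable) && ((DescendingIntWritable) o).value == value;
        }
    }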

    Hadoop calls compareTo() automatically during the sort; we never invoke it by hand. Note, though, that this sorting only happens inside a single map or reduce task: compareTo() has no say over the ordering across maps or across reduces. That case is not common, but it does exist, for example in a total sort.

    Total sort

    For a total sort, the Partition phase is what matters. Partitioning creates one partition per reduce task and maps each map output record into a particular partition; a partition may hold data for many different keys, but any given key appears in exactly one partition. If there is only one reduce, and therefore only one partition, all map output flows through that single partition into a single reduce, which can order everything with compareTo(), and we have a total sort. The downside is that the entire sort happens in one reduce, which hardly deserves to be called distributed computing.

    So, to actually take advantage of distributed computing, we need multiple reduces and therefore multiple partitions, arranged so that for any two partitions, say partition1 and partition2, every key in partition1 is, under the sort order, either greater than every key in partition2 or less than every key in partition2. Each reduce then sorts its own partition, and reading the reduce outputs in partition order yields a globally sorted result.

    This exploits the parallelism of distributed computing, but partition skew is still a concern: if one partition holds far more data than the others, the benefit of distributing the work is largely lost. The fix is sampling, whose goal is to make the partitions more even.
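
    To make the idea concrete, here is a tiny conceptual sketch (not Hadoop's actual TotalOrderPartitioner code) of boundary-based partitioning: sorted split points carve the key space into ordered ranges, one range per reduce.

    // Conceptual sketch of boundary-based partitioning: given sorted split points
    // b[0] < b[1] < ..., key k goes to the first partition whose upper boundary is above it.
    public class BoundaryPartitionSketch {

        static int partitionFor(long key, long[] boundaries) {
            int p = 0;
            while (p < boundaries.length && key >= boundaries[p]) {
                p++;                       // move right until key is below the next boundary
            }
            return p;                      // numPartitions = boundaries.length + 1
        }

        public static void main(String[] args) {
            long[] boundaries = {100, 1000};                      // 3 partitions: <100, [100,1000), >=1000
            System.out.println(partitionFor(26, boundaries));     // 0
            System.out.println(partitionFor(650, boundaries));    // 1
            System.out.println(partitionFor(65223, boundaries));  // 2
        }
    }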

    Secondary sort

    (Secondary sort, i.e. also ordering the values within each key, is normally done with a composite key plus a grouping comparator; it is a separate technique, and the examples below implement a total sort rather than a secondary sort.)

    Code examples

    Requirement: we have several files of the following form.

    file1.txt

    2
    32
    654
    32
    15
    756
    65223

    file2.txt

    5956
    22
    650
    92

    file3.txt

    26
    54
    6

    The goal is to process them with Hadoop and produce the following result:

    1     2
    2     6
    3     15
    4     22
    5     26
    6     32
    7     32
    8     54
    9     92
    10    650
    11    654
    12    756
    13    5956
    14    65223

    Note: the code below was run successfully on a real distributed Hadoop 2.7.4 cluster.

    The simplest approach, of course, is multiple maps and a single reduce: the framework sorts the keys by default before they reach the reduce, so that alone gives us the most basic version.

    Version 1: the simplest version, multiple maps, a single reduce

    package com.tuhooo;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class Sort {
    
    
        // The mapper parses each input value into an IntWritable and emits it as the output key
        public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
    
            private static IntWritable data = new IntWritable();
            // map: parse the line into a number and emit (number, 1)
            @Override
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                if(line != null && line.length() != 0 && !"".equals(line)) {
                    data.set(Integer.parseInt(line));
                    context.write(data, new IntWritable(1));
                }
            }
        }
    
        // Keys arrive at the reducer already sorted by the framework; the reducer just numbers them in order
        public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    
            private static IntWritable linenum = new IntWritable(1);
    
            @Override
            public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                for(IntWritable val : values) {
                    context.write(linenum, key);
                    linenum = new IntWritable(linenum.get() + 1);
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            // validate the command-line arguments
            if (args.length != 2) {
                System.err.println("Usage: MaxTemperature <input path> <output path>");
                System.exit(-1);
            }
    
            Job job = new Job();
            job.setJarByClass(Sort.class); // main Class
            job.setJobName("Sort");
    
            FileInputFormat.addInputPath(job, new Path(args[0]));   // input directory
            FileOutputFormat.setOutputPath(job, new Path(args[1])); // output directory
    
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    From the discussion above we know this needs partitioning, which brings us to the next version.

    Version 2: M maps, N reduces, N partitions, evenly split key ranges, no sampling

    package com.tuhooo;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class Sort {
    
    
        // The mapper parses each input value into an IntWritable and emits it as the output key
        public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
    
            private static IntWritable data = new IntWritable();
            // map: parse the line into a number and emit (number, 1)
            @Override
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                if(line != null && line.length() != 0 && !"".equals(line)) {
                    data.set(Integer.parseInt(line));
                    context.write(data, new IntWritable(1));
                }
            }
        }
    
        // Keys arrive at the reducer already sorted by the framework; the reducer just numbers them in order
        public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    
            private static IntWritable linenum = new IntWritable(1);
    
            @Override
            public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                for(IntWritable val : values) {
                    context.write(linenum, key);
                    linenum = new IntWritable(linenum.get() + 1);
                }
            }
        }
        
        // A hand-rolled partitioner: split the key range [0, maxNumber] into numReduceTasks
        // equal-width buckets, so partition i only holds smaller keys than partition i+1
        public static class MyPartition extends Partitioner<IntWritable, IntWritable> {

            @Override
            public int getPartition(IntWritable key, IntWritable value, int numReduceTasks) {

                int maxNumber = 65223;    // maximum value in the sample data
                int part = maxNumber/numReduceTasks + 1;   // width of each bucket
                int keyNum = key.get();
                for(int i=0; i<numReduceTasks; i++) {

                    if(keyNum >= part*i && keyNum <= part*(i+1)) {
                        return i;
                    }
                }
                return -1;     // a key that falls outside every bucket returns -1, which will make the job fail
            }
        }
        public static void main(String[] args) throws Exception {
    
            // validate the command-line arguments
            if (args.length != 2) {
                System.err.println("Usage: MaxTemperature <input path> <output path>");
                System.exit(-1);
            }
            Job job = new Job();
            job.setJarByClass(Sort.class); // main Class
            job.setJobName("Sort");
    
            FileInputFormat.addInputPath(job, new Path(args[0]));   // input directory
            FileOutputFormat.setOutputPath(job, new Path(args[1])); // output directory
            
            job.setNumReduceTasks(3);                     // use 3 reduce tasks
            job.setPartitionerClass(MyPartition.class);   // register the custom partitioner
    
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    Version 2 basically achieves partitioned total sorting, but there is no sampling, so the partitions can end up uneven and some reduce tasks get much more work than others. So we add sampling, which gives us Version 3.

    Version 3: M maps, N reduces, N partitions (this time using TotalOrderPartitioner with random sampling)

    package com.tuhooo;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class Sort {
    
        // The mapper parses each input value into a LongWritable and emits it as the output key
        public static class Map extends Mapper<Object, Text, LongWritable, LongWritable> {
    
            private static LongWritable data = new LongWritable();
            // map: parse the line into a number and emit (number, 1)
            @Override
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                if(line != null && line.length() != 0 && !"".equals(line)) {
                    data.set(Integer.parseInt(line));
                    context.write(data, new LongWritable(1));
                }
            }
        }
    
        public static class Reduce extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {
    
            private static LongWritable linenum = new LongWritable(1);
    
            @Override
            public void reduce(LongWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
                for(LongWritable val : values) {
                    context.write(linenum, key);
                    linenum = new LongWritable(linenum.get() + 1);
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            if (args.length != 3) {
                System.err.println("Usage: MaxTemperature <input path> <output path> <partition path>");
                System.exit(-1);
            }
    
            Job job = new Job();
            job.setJarByClass(Sort.class); // main Class
            job.setJobName("Sort");
    
            job.setNumReduceTasks(3);
    
            FileInputFormat.addInputPath(job, new Path(args[0])); // input directory
            FileOutputFormat.setOutputPath(job, new Path(args[1])); // output directory
    
            // tell TotalOrderPartitioner where to write/read the partition boundary file
            TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));
            // random sampler: keep each record with probability 0.5, at most 5 samples, from at most 2 splits
            InputSampler.Sampler<LongWritable, LongWritable> sampler = new InputSampler.RandomSampler<LongWritable, LongWritable>(0.5, 5, 2);
            // sample the input and write the partition boundaries
            InputSampler.writePartitionFile(job, sampler);
    
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
    
            // use TotalOrderPartitioner here instead of the hand-rolled partitioner
            job.setPartitionerClass(TotalOrderPartitioner.class);
    
            // job.setPartitionerClass(MyPartition.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(LongWritable.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    I hit quite a few pitfalls while writing and running Version 3:

    1. Random-sampler parameter settings. My input files hold very little data, so my guess was that too few samples were drawn, which caused an array-index-out-of-bounds exception.

    The parameters I was using at that point were:

    InputSampler.Sampler<LongWritable, LongWritable> sampler = new InputSampler.RandomSampler<LongWritable, LongWritable>(0.01, 500, 10);

    My feeling was that no samples had been drawn at all, and the "Using 0 samples" line in the log below confirms it.

    17/11/16 17:31:51 INFO input.FileInputFormat: Total input paths to process : 3
    17/11/16 17:31:51 INFO partition.InputSampler: Using 0 samples
    17/11/16 17:31:51 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
    17/11/16 17:31:51 INFO compress.CodecPool: Got brand-new compressor [.deflate]
    Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0
             at org.apache.hadoop.mapreduce.lib.partition.InputSampler.writePartitionFile(InputSampler.java:340)
             at com.tuhooo.Sort.main(Sort.java:67)
             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
             at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
             at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
             at java.lang.reflect.Method.invoke(Method.java:498)
             at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
             at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
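
    For reference, the three RandomSampler arguments are (freq, numSamples, maxSplitsSampled): the probability of keeping each record, an upper bound on the number of samples, and how many input splits to read. With a toy input of a dozen-odd lines the frequency has to be high or nothing gets sampled at all; something like the following (a setting I would pick for this tiny data set, not tuned for real workloads) avoids the empty-sample crash:

    // keep every record (freq = 1.0), cap the sample set at 100, read up to 3 splits
    InputSampler.Sampler<LongWritable, LongWritable> sampler =
            new InputSampler.RandomSampler<LongWritable, LongWritable>(1.0, 100, 3);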

    2. You may have noticed that Version 1 and Version 2 use IntWritable throughout, while Version 3 uses LongWritable. Why is that? When I kept using IntWritable in Version 3, the partition file looked like this:

    [root@hadoop Sort]# hdfs dfs -cat /partitionFile_heheda
    SEQ!org.apache.hadoop.io.LongWritable!org.apache.hadoop.io.NullWritable*org.apache.hadoop.io.compress.DefaultCodec▒]▒S▒3n▒l▒▒▒▒Axx▒[root@hadoop Sort]#

    And the job then kept failing with "IntWritable is not LongWritable", like this:

    Error: java.lang.IllegalArgumentException: Can't read partitions file
             at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.setConf(TotalOrderPartitioner.java:116)
             at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:76)
             at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
             at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:702)
             at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
             at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
             at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
             at java.security.AccessController.doPrivileged(Native Method)
             at javax.security.auth.Subject.doAs(Subject.java:422)
             at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
             at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
    Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.LongWritable
             at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2329)
             at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2381)
             at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.readPartitions(TotalOrderPartitioner.java:306)
             at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.setConf(TotalOrderPartitioner.java:88)
             ... 10 more

    As far as I can tell, the explanation is this: the sampler draws its samples from the keys emitted by the job's InputFormat, which for the default TextInputFormat are LongWritable byte offsets, and InputSampler.writePartitionFile() writes the partition file using the job's map output key class, which still defaults to LongWritable because setOutputKeyClass() is only called afterwards. So the partition file is always keyed by LongWritable, and with IntWritable map output keys TotalOrderPartitioner refuses to read it. Switching the whole job to LongWritable makes the classes line up, although it also means the partition boundaries are sampled byte offsets rather than data values, so the load across reducers does not really follow the data distribution (the output is still globally ordered, since the boundaries themselves are ordered).

    The big-data road is long and winding~~~

  • Original post: https://www.cnblogs.com/tuhooo/p/7833691.html