zoukankan      html  css  js  c++  java
  • 9.2.3 hadoop辅助排序(二次排序)数据实例详解,idea测试hadoop二次排序通过

    1.1.1         辅助排序(二次排序)

    1)二次排序定义

    通常情况下我们只对键进行排序,例如(年份,温度)组成的键值对,我们通常只对key年份进行排序,如果先按照年份排好序,还要求年份相同的再按照温度进行进行逆序排列,像这样先按照第一字段进行排序,然后再对第一字段相同的行按照第二字段排序,我们称为二次排序

    2)组合键定义

    因为排序都是针对键的排序,现在要求按照两个字段进行排序,那么可以定义一个对象,包含两个字段,并且把这个对象作为map的输出键,就可以实现组合键的排序。如果map输出值的话不重要,就设置为NullWritable对象。因为这个对象要作为map的键,而且还要能够进行比较,所以对象要实现WritableComparable接口。定义一个两个字段的Class如下,first保存年份,second保存温度。实现序列化、反序列化接口、对比接口。对比接口是比较两个字段,按照第一字段升序排列,第一字段相同的按第二字段逆序排列。

    package Temperature;


    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class IntPair implements WritableComparable<IntPair> {
        long first;
        double second;

        public IntPair()
        {

        }
        public IntPair(long first, double second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public int compareTo(IntPair o) {
            if (first!=o.getFirst()) {
                return (int) (first-o.getFirst());
            }
            return (second-getSecond())==0? 0:(second-getSecond())>0? 1:-1;
        }

        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeLong(first);
            dataOutput.writeDouble(second);
        }

        @Override
        public void readFields(DataInput dataInput) throws IOException {
            first=dataInput.readLong();
            second=dataInput.readDouble();
        }

        public long getFirst() {
            return first;
        }

        public void setFirst(long first) {
            this.first = first;
        }

        public double getSecond() {
            return second;
        }

        public void setSecond(double second) {
            this.second = second;
        }
    }

    3)二次排序实例

    需求:求出下列数据中气象站每年的最高气温,下面只是为了说明,实际肯定不只6行数据。6行数据保存在三个不同文件中。

    文件1

    1900~34

    1901~17

    1900~27

    文件2

    1901~11

    1900~40

    文件3

    1902~13

    1902~26

    实现步骤:

    1)定义map类,读取三个文件中的数据,定义组合键类FirstSecondPair,将年份写入第一个字段,温度写入第二个字段,实现CompareTo函数,按照年份升序,按照温度降序排列。

    public static class SecondSortMaper extends Mapper<LongWritable, Text, IntPair, NullWritable>
    {
        @Override
        //map的作用是解析记录,得到年和 温度,组成组合键输出
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntPair, NullWritable>.Context context) throws IOException, InterruptedException {
            String strValue=value.toString();
            String values[]=strValue.split("~");
            int year=Integer.parseInt(values[0]);
            double temperature=Double.parseDouble(values[1]);
            //map输出默认按照键排序,也就是按照FirstSecondPair的CompareTo定义的排序规则排序
            context.write(new IntPair(year,temperature),NullWritable.get());
        }
    }

    map处理后输出数据应该是如下这样。

    1900 40        NULL

    1900 34       NULL

    1900 27       NULL

    1901 17       NULL

    1901 11       NULL

    1902 26       NULL

    1902 13       NULL

    2)如果是大量数据的话,需要对多个reduce任务来处理,为了让年份相同的数据被同一个reduce处理,这样才能找出每一年的最高气温,需要定义分区类Partitioner,让年份相同数据进入同一个分区。按组合键第一字段分区的类定义如下:

    //按照组合键的第一个字段年份进行分区,让相同年份的数据被同一个分区处理,才能比较出每一年的最高气温
    public static class FirstPartitioner extends Partitioner<IntPair,NullWritable>{
        @Override
        public int getPartition(IntPair intPair, NullWritable nullWritable, int numPartition) {
            //numPartition是分区数,根据年份取余得到分区编号,可以保证年份相同记录进入通一个分区,但一个分区内有多个年份的记录。
            //例如1900~2000年的数据,numPartition为10,则1900,1910,1930……2000这些取余后都为0,这些年份的记录都会放入分区0,
            //需要对分区0中的记录按照年份分组,setGroupingComparatorClass就起到了这个分组的作用,分组后再按温度逆序排序
            int partition=(int)(intPair.getFirst()%numPartition);
            return partition;
        }
    }

    例如采用两个reduce任务,就会有两个分区,分区函数采用年份取余2。那么1900和1902年的数据都会被分到第一个reduce,1901年的数据被分到第二个reduce。shuftle分区后的数据如下所示:

    分区1

    1900 40       NULL

    1900 34       NULL

    1900 27       NULL

    1902 26       NULL

    1902 13       NULL

    分区2

    1901 17       NULL

    1901 11       NULL

    3)在分区1中有1900和1902两个年份的数据,为了得到每一年的最高气温,需要对同一个分区的数据,按照年份进行分组,每一组的第一条数据就是我们想要的每年最高的气温。这时候就需要用GroupingComparator来实现分组,就是把年份相同的分为一组。分组类定义如下:

    //一个reduce中有多个年份的数据,按照年份进行分组,分组后在按键进行聚合,年份相同的键都被认为是同一个键,聚合为第一个键,值都是NULL
    public static class ReducerGroupingComparator  extends WritableComparator
    {
        public ReducerGroupingComparator()
        {
            super(IntPair.class,true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair pairA=(IntPair)a;
            IntPair pairB=(IntPair)b;
            //年份相同,返回值为0的分为一组
            return (int) (pairA.getFirst()-pairB.getFirst());
        }
    }

    分区1中的数据分组之后变为

    组1

    1900 40        NULL

    1900 34        NULL

    1900 27        NULL

    组2

    1902 26        NULL

    1902 13        NULL

    分组并不是分为一组那么简单,还有按键聚合的功能。组1中三条记录虽然三个键都不相同<1900 40>、<1900 34>、<1900 27>,但是ReducerGroupingComparator进行键的比较只是按照第一字段年份进行比较,所以是相同的键,所以年份相同的键会被合并,即为第一个键<1900 40>,三条记录的值也会合并,变为<NULL,NULL,NULL>。这里的值并不重要。所以按键聚合之后数据为

    组1

    1900 40        <NULL,NULL,NULL>

    组2

    1902 26        <NULL,NULL >

    4)分组之后再按键聚合的数据就是我们要获取的的每年的最高气温值,还要通过reduce函数排个序。在将键传给reduce函数的key,值组成的values的迭代器iterator传给reduce方法的入参values,这里的值其实不重要,直接将key写入输出文件,默认就会按照键IntPair排序(年份升序,没有年份相同的气温)。

    public static class SecondSortReducer extends Reducer<IntPair,NullWritable, LongWritable, DoubleWritable>
    {
        //Reducer<LongWritable, Text, FirstSecondPair, NullWritable>.Context 一定要这么写,否则会报异常
        protected void reduce(IntPair key, Iterable<NullWritable> values, Reducer<IntPair,NullWritable, LongWritable, DoubleWritable>.Context  context) throws IOException, InterruptedException {
            context.write(new LongWritable(key.getFirst()),new DoubleWritable(key.getSecond()));
        }
    }

    reduce处理之后的的数据结果是

    1900 40

    1901 17

    1902 26

    (4)详细的二次排序代码实例

    package Temperature;

            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.conf.Configured;
            import org.apache.hadoop.fs.FileUtil;
            import org.apache.hadoop.fs.Path;
            import org.apache.hadoop.io.*;
            import org.apache.hadoop.mapreduce.Job;
            import org.apache.hadoop.mapreduce.Mapper;
            import org.apache.hadoop.mapreduce.Partitioner;
            import org.apache.hadoop.mapreduce.Reducer;
            import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
            import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
            import org.apache.hadoop.util.Tool;
            import org.apache.hadoop.util.ToolRunner;

            import java.io.File;
            import java.io.IOException;

    public class MaxTemperatureUsingSecondSort extends Configured implements Tool {

        public static class SecondSortMaper extends Mapper<LongWritable, Text, IntPair, NullWritable>
        {
            @Override
            //map的作用是解析记录,得到年和 温度,组成组合键输出
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntPair, NullWritable>.Context context) throws IOException, InterruptedException {
                String strValue=value.toString();
                String values[]=strValue.split("~");
                int year=Integer.parseInt(values[0]);
                double temperature=Double.parseDouble(values[1]);
                //map输出默认按照键排序,也就是按照FirstSecondPair的CompareTo定义的排序规则排序
                context.write(new IntPair(year,temperature),NullWritable.get());
            }
        }
        //按照组合键的第一个字段年份进行分区,让相同年份的数据被同一个分区处理,才能比较出每一年的最高气温
        public static class FirstPartitioner extends Partitioner<IntPair,NullWritable>{
            @Override
            public int getPartition(IntPair intPair, NullWritable nullWritable, int numPartition) {
                //numPartition是分区数,根据年份取余得到分区编号,可以保证年份相同记录进入通一个分区,但一个分区内有多个年份的记录。
                //例如1900~2000年的数据,numPartition为10,则1900,1910,1930……2000这些取余后都为0,这些年份的记录都会放入分区0,
                //需要对分区0中的记录按照年份分组,setGroupingComparatorClass就起到了这个分组的作用,分组后再按温度逆序排序
                int partition=(int)(intPair.getFirst()%numPartition);
                return partition;
            }
        }
        //一个reduce中有多个年份的数据,按照年份进行分组,分组后在按键进行聚合,年份相同的键都被认为是同一个键,聚合为第一个键,值都是NULL
        public static class ReducerGroupingComparator  extends WritableComparator
        {
            public ReducerGroupingComparator()
            {
                super(IntPair.class,true);
            }
            @Override
            public int compare(WritableComparable a, WritableComparable b) {
                IntPair pairA=(IntPair)a;
                IntPair pairB=(IntPair)b;
                //年份相同,返回值为0的分为一组
                return (int) (pairA.getFirst()-pairB.getFirst());
            }
        }
        //与FirstSecondComparator中定义的默认对比函数功能相同,这个用于显示设置对比类
        public  static class FirstSecondComparator extends WritableComparator
        {
            //这里一定要加
            public FirstSecondComparator()
            {
                super(IntPair.class,true);
            }
            @Override
            public int compare(WritableComparable a, WritableComparable b) {
                IntPair pairA=(IntPair)a;
                IntPair pairB=(IntPair)b;
                int cmp= (int) (pairA.getFirst()-pairA.getFirst());
                if (cmp!=0)
                {
                    return cmp;
                }
                double A=pairA.getSecond();
                double B=pairB.getSecond();
                return -((A==B)? 0:(A>B? 1:-1));
            }
        }
        //将已经排序、分组、聚合后的数据写入文件,默认按照FirstSecondPair进行排序。
        public static class SecondSortReducer extends Reducer<IntPair,NullWritable, LongWritable, DoubleWritable>
        {
            //Reducer<LongWritable, Text, FirstSecondPair, NullWritable>.Context 一定要这么写,否则会报异常
            protected void reduce(IntPair key, Iterable<NullWritable> values, Reducer<IntPair,NullWritable, LongWritable, DoubleWritable>.Context  context) throws IOException, InterruptedException {
                context.write(new LongWritable(key.getFirst()),new DoubleWritable(key.getSecond()));
            }
        }
        public static class JobBuilder {
            public static Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws IOException {
                if (args.length != 2) {
                    return null;
                }
                Job job = null;
                try {
                    job = new Job(conf, tool.getClass().getName());
                } catch (IOException e) {
                    e.printStackTrace();
                }
                FileInputFormat.addInputPath(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));
                return job;
            }
        }
        public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job =JobBuilder.parseInputAndOutput(this,getConf(),args);
            if (job==null)
            {
                return -1;
            }
            //设置map和reduce
            job.setMapperClass(SecondSortMaper.class);
            job.setReducerClass(SecondSortReducer.class);
            //显示设置排序类,先按照第一字段年份升序排列,年份相同的按照温度逆序排列。
            job.setSortComparatorClass(FirstSecondComparator.class);
            //设置分区类,根据年份分区,1990~2000分到10个分区,每个分区10年的数据
            job.setMapOutputKeyClass(IntPair.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setPartitionerClass(FirstPartitioner.class);
            //需要将每个分区中的10年的数据按照年份进行分组,每组的第一个值就是这一年的最高气温
            job.setGroupingComparatorClass(ReducerGroupingComparator.class);
            job.setOutputKeyClass(IntPair.class);
            job.setOutputValueClass(NullWritable.class);
            job.setNumReduceTasks(2);
            //删除结果目录,重新生成
            FileUtil.fullyDelete(new File(args[1]));
            // solve the Result, Put, KeyValue Serialization
           // job.getConfiguration().setStrings("io.serializations", job.getConfiguration().get("io.serializations"), FirstSecondPair.class.getName());


            return job.waitForCompletion(true)? 0:1;
        }
        public static void main(String[] args) throws Exception
        {
           int exitCode= ToolRunner.run(new MaxTemperatureUsingSecondSort(),args);
           System.exit(exitCode);
        }
    }

    执行作业的hadoop命令, -secondsort表示二次排序

    %hadoop jar  hadoop-example.jar MaxTemperatureUsingSecondSort input/ncdc/all output –secondsort

    查看结果的命令

    %hadoop fs –cat output –secondarysort/part-* | sort | head

    或者在idea中直接调试,在程序的src同级目录下创建文件夹input

     

    将上面实例中的三组数据写入input/sencondSort文件夹中

     

    在run->edit Cnfiguration选中这个类,设置输入路径,输出路径

     

    点击运行程序就可以在output路径看到输出结果

     

    参考文献:

    https://blog.csdn.net/sinat_32329183/article/details/73741880

  • 相关阅读:
    POJ-1182 食物链
    hdu 1879 继续畅通工程
    HDU 2604 Queuing
    hdu 1232 畅通工程
    POJ-1611 The Suspects
    Free DIY Tour
    Tr A
    不容易系列之(3)―― LELE的RPG难题
    W3C标准冒泡、捕获机制
    JavaScript 浏览器事件解读
  • 原文地址:https://www.cnblogs.com/bclshuai/p/12314002.html
Copyright © 2011-2022 走看看