zoukankan      html  css  js  c++  java
  • MapReduce的分区与 分组二次排序

    原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。否则将追究法律责任。http://computerdragon.blog.51cto.com/6235984/1287721

    问题描述:

    输入文件格式如下:

    name1    2

    name3    4

    name1    6

    name1    1

    name3    3

    name1    0

    要求输出的文件格式如下:

    name1    0,1,2,6

    name3    3,4

    要求是按照第一列分组,name1与name3也是按照顺序排列的,组内升序排序。

    思路:

    常规的输出,无法排序key所对应的多个值的顺序。为了排序组内中的值,需要将key与value放在同一个组。Job中有两个方法setGroupingComparatorClass和setSortComparatorClass,可以利用这两个方法来实现组内排序。但是这些排序都是基于key的,则就要将key和value定义成组合键。

    但是必须要保证第一列相同的全部都放在同一个分区中,则就需要自定义分区,分区的时候只考虑第一列的值。由于partitioner仅仅能保证每一个reducer接受同一个name的所有记录,但是reducer仍然是通过键进行分组的分区,也就说该分区中还是按照键来分成不同的组,还需要分组只参考name值

    先按照name分组,再在name中内部进行排序。

    解决方法:

    运用自定义组合键的策略,将name和1定义为一个组合键。在分区的时候只参考name的值,即继承partitioner。

     由于要按照name分组,则就需要定义分组策略,然后设置setGroupingComparatorClass。

    setGroupingComparatorClass主要定义哪些key可以放置在一组,分组的时候会对组合键进行比较,由于这里只需要考虑组合键中的一个值,则定义实现一个WritableComparator,设置比较策略。

    对于组内的排序,可以利用setSortComparatorClass来实现,

    这个方法主要用于定义key如何进行排序在它们传递给reducer之前,

    这里就可以来进行组内排序。

    具体代码:

         Hadoop版本号:hadoop1.1.2

    自定义组合键

     1 package whut;
     2 import java.io.DataInput;
     3 import java.io.DataOutput;
     4 import java.io.IOException;
     5 import org.apache.hadoop.io.IntWritable;
     6 import org.apache.hadoop.io.Text;
     7 import org.apache.hadoop.io.WritableComparable;
     8 //自定义组合键策略
     9 //java基本类型数据
    10 public class TextInt implements WritableComparable{
    11     //直接利用java的基本数据类型
    12     private String firstKey;
    13     private int secondKey;
    14     //必须要有一个默认的构造函数
    15     public String getFirstKey() {
    16         return firstKey;
    17     }
    18     public void setFirstKey(String firstKey) {
    19         this.firstKey = firstKey;
    20     }
    21     public int getSecondKey() {
    22         return secondKey;
    23     }
    24     public void setSecondKey(int secondKey) {
    25         this.secondKey = secondKey;
    26     }
    27                                                                                                                                                                           
    28     @Override
    29     public void write(DataOutput out) throws IOException {
    30         // TODO Auto-generated method stub
    31         out.writeUTF(firstKey);
    32         out.writeInt(secondKey);
    33     }
    34     @Override
    35     public void readFields(DataInput in) throws IOException {
    36         // TODO Auto-generated method stub
    37         firstKey=in.readUTF();
    38         secondKey=in.readInt();
    39     }
    40     //map的键的比较就是根据这个方法来进行的
    41     @Override
    42     public int compareTo(Object o) {
    43         // TODO Auto-generated method stub
    44         TextInt ti=(TextInt)o;
    45         //利用这个来控制升序或降序
    46         //this本对象写在前面代表是升序
    47         //this本对象写在后面代表是降序
    48         return this.getFirstKey().compareTo(ti.getFirstKey());
    49     }
    50 }
    View Code

    分组策略

     1 package whut;
     2 import org.apache.hadoop.io.WritableComparable;
     3 import org.apache.hadoop.io.WritableComparator;
     4 //主要就是对于分组进行排序,分组只按照组建键中的一个值进行分组
     5 public class TextComparator extends WritableComparator {
     6     //必须要调用父类的构造器
     7     protected TextComparator() {
     8         super(TextInt.class,true);//注册comparator
     9     }
    10     @Override
    11     public int compare(WritableComparable a, WritableComparable b) {
    12         // TODO Auto-generated method stub
    13         TextInt ti1=(TextInt)a;
    14         TextInt ti2=(TextInt)b;
    15         return ti1.getFirstKey().compareTo(ti2.getFirstKey());
    16     }
    17 }
    View Code

    组内排序策略

     1 package whut;
     2 import org.apache.hadoop.io.WritableComparable;
     3 import org.apache.hadoop.io.WritableComparator;
     4 //分组内部进行排序,按照第二个字段进行排序
     5 public class TextIntComparator extends WritableComparator {
     6     public TextIntComparator()
     7     {
     8         super(TextInt.class,true);
     9     }
    10     //这里可以进行排序的方式管理
    11     //必须保证是同一个分组的
    12     //a与b进行比较
    13     //如果a在前b在后,则会产生升序
    14     //如果a在后b在前,则会产生降序
    15     @Override
    16     public int compare(WritableComparable a, WritableComparable b) {
    17         // TODO Auto-generated method stub
    18         TextInt ti1=(TextInt)a;
    19         TextInt ti2=(TextInt)b;
    20         //首先要保证是同一个组内,同一个组的标识就是第一个字段相同
    21         if(!ti1.getFirstKey().equals(ti2.getFirstKey()))
    22            return ti1.getFirstKey().compareTo(ti2.getFirstKey());
    23         else
    24            return ti2.getSecondKey()-ti1.getSecondKey();//0,-1,1
    25     }
    26                                                                                                                                                           
    27 }
    View Code

    分区策略

     1 package whut;
     2 import org.apache.hadoop.io.IntWritable;
     3 import org.apache.hadoop.mapreduce.Partitioner;
     4 //参数为map的输出类型
     5 public class KeyPartitioner extends Partitioner<TextInt, IntWritable> {
     6     @Override
     7     public int getPartition(TextInt key, IntWritable value, int numPartitions) {
     8         // TODO Auto-generated method stub
     9         return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
    10     }
    11 }
    View Code

    MapReduce策略

      1 package whut;
      2 import java.io.IOException;
      3 import org.apache.hadoop.conf.Configuration;
      4 import org.apache.hadoop.conf.Configured;
      5 import org.apache.hadoop.fs.Path;
      6 import org.apache.hadoop.io.IntWritable;
      7 import org.apache.hadoop.io.Text;
      8 import org.apache.hadoop.mapreduce.Job;
      9 import org.apache.hadoop.mapreduce.Mapper;
     10 import org.apache.hadoop.mapreduce.Reducer;
     11 import org.apache.hadoop.mapreduce.Mapper.Context;
     12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     13 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
     14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     16 import org.apache.hadoop.util.Tool;
     17 import org.apache.hadoop.util.ToolRunner;
     18 //需要对数据进行分组以及组内排序的时候
     19 public class SortMain extends Configured implements Tool{
     20     //这里设置输入文格式为KeyValueTextInputFormat
     21     //name1 5
     22     //默认输入格式都是Text,Text
     23     public static class GroupMapper extends
     24        Mapper<Text, Text, TextInt, IntWritable>  {
     25         public IntWritable second=new IntWritable();
     26         public TextInt tx=new TextInt();
     27         @Override
     28         protected void map(Text key, Text value, Context context)
     29                 throws IOException, InterruptedException {
     30             String lineKey=key.toString();
     31             String lineValue=value.toString();
     32             int lineInt=Integer.parseInt(lineValue);
     33             tx.setFirstKey(lineKey);
     34             tx.setSecondKey(lineInt);
     35             second.set(lineInt);
     36             context.write(tx, second);
     37         }
     38     }
     39     //设置reduce
     40     public static class GroupReduce extends Reducer<TextInt, IntWritable, Text, Text>
     41     {
     42         @Override
     43         protected void reduce(TextInt key, Iterable<IntWritable> values,
     44                Context context)
     45                 throws IOException, InterruptedException {
     46             StringBuffer sb=new StringBuffer();
     47             for(IntWritable val:values)
     48             {
     49                 sb.append(val+",");
     50             }
     51             if(sb.length()>0)
     52             {
     53                 sb.deleteCharAt(sb.length()-1);
     54             }
     55             context.write(new Text(key.getFirstKey()), new Text(sb.toString()));
     56         }
     57     }
     58                                                                                                                                         
     59     @Override
     60     public int run(String[] args) throws Exception {
     61         // TODO Auto-generated method stub
     62         Configuration conf=getConf();
     63         Job job=new Job(conf,"SecondarySort");
     64         job.setJarByClass(SortMain.class);
     65         // 设置输入文件的路径,已经上传在HDFS
     66         FileInputFormat.addInputPath(job, new Path(args[0]));
     67         // 设置输出文件的路径,输出文件也存在HDFS中,但是输出目录不能已经存在
     68         FileOutputFormat.setOutputPath(job, new Path(args[1]));
     69                                                                                                                                             
     70         job.setMapperClass(GroupMapper.class);
     71         job.setReducerClass(GroupReduce.class);
     72         //设置分区方法
     73         job.setPartitionerClass(KeyPartitioner.class);
     74                                                                                                                                             
     75         //下面这两个都是针对map端的
     76         //设置分组的策略,哪些key可以放置到一组中
     77         job.setGroupingComparatorClass(TextComparator.class);
     78         //设置key如何进行排序在传递给reducer之前.
     79         //这里就可以设置对组内如何排序的方法
     80         /*************关键点**********/
     81         job.setSortComparatorClass(TextIntComparator.class);
     82         //设置输入文件格式
     83         job.setInputFormatClass(KeyValueTextInputFormat.class);
     84         //使用默认的输出格式即TextInputFormat
     85         //设置map的输出key和value类型
     86         job.setMapOutputKeyClass(TextInt.class);
     87         job.setMapOutputValueClass(IntWritable.class);
     88         //设置reduce的输出key和value类型
     89         //job.setOutputFormatClass(TextOutputFormat.class);
     90         job.setOutputKeyClass(Text.class);
     91         job.setOutputValueClass(Text.class);
     92         job.waitForCompletion(true);
     93         int exitCode=job.isSuccessful()?0:1;
     94         return exitCode;
     95     }
     96                                                                                                                                         
     97     public static void main(String[] args)  throws Exception
     98     {
     99        int exitCode=ToolRunner.run(new SortMain(), args);
    100        System.exit(exitCode);
    101     }
    102 }
    View Code

    注意事项

       1,设置分组排序按照升序还是降序是在自定义WritableComparable中的compareTo()方法实现的,具体升序或者降序的设置在代码中已经注释说明

       2,设置组内值进行升序还是降序的排序是在组内排序策略中的compare()方法注释说明的。

       3,这里同时最重要的一点是,将第二列即放在组合键中,又作为value,这样对于组合键排序也就相当于对于value进行排序了。

       4,在自定义组合键的时候,对于组合键中的数据的基本类型可以采用Java的基本类型也可以采用Hadoop的基本数据类型,对于Hadoop的基本数据类型一定要记得初始化new一个基本数据类型对象。对于组合键类,必须要有默认的构造方法。

    本文出自 “在云端的追梦” 博客,请务必保留此出处http://computerdragon.blog.51cto.com/6235984/1287721

  • 相关阅读:
    This counter can increment, decrement or skip ahead by an arbitrary amount
    LUT4/MUXF5/MUXF6 logic : Multiplexer 8:1
    synthesisable VHDL for a fixed ratio frequency divider
    Bucket Brigade FIFO SRL16E ( VHDL )
    srl16e fifo verilog
    DualPort Block RAM with Two Write Ports and Bytewide Write Enable in ReadFirst Mode
    Parametrilayze based on SRL16 shift register FIFO
    stm32 spi sdcard fatfs
    SPI bus master for System09 (2)
    SQLSERVER中的自旋锁
  • 原文地址:https://www.cnblogs.com/hbmlml/p/6952819.html
Copyright © 2011-2022 走看看