zoukankan      html  css  js  c++  java
  • MapReduce的分区与 分组二次排序

    原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。否则将追究法律责任。http://computerdragon.blog.51cto.com/6235984/1287721

    问题描述:

    输入文件格式如下:

    name1    2

    name3    4

    name1    6

    name1    1

    name3    3

    name1    0

    要求输出的文件格式如下:

    name1    0,1,2,6

    name3    3,4

    要求是按照第一列分组,name1与name3也是按照顺序排列的,组内升序排序。

    思路:

    常规的输出,无法排序key所对应的多个值的顺序。为了排序组内中的值,需要将key与value放在同一个组。Job中有两个方法setGroupingComparatorClass和setSortComparatorClass,可以利用这两个方法来实现组内排序。但是这些排序都是基于key的,则就要将key和value定义成组合键。

    但是必须要保证第一列相同的全部都放在同一个分区中,则就需要自定义分区,分区的时候只考虑第一列的值。由于partitioner仅仅能保证每一个reducer接受同一个name的所有记录,但是reducer仍然是通过键进行分组的分区,也就说该分区中还是按照键来分成不同的组,还需要分组只参考name值

    先按照name分组,再在name中内部进行排序。

    解决方法:

    运用自定义组合键的策略,将name和1定义为一个组合键。在分区的时候只参考name的值,即继承partitioner。

     由于要按照name分组,则就需要定义分组策略,然后设置setGroupingComparatorClass。

    setGroupingComparatorClass主要定义哪些key可以放置在一组,分组的时候会对组合键进行比较,由于这里只需要考虑组合键中的一个值,则定义实现一个WritableComparator,设置比较策略。

    对于组内的排序,可以利用setSortComparatorClass来实现,

    这个方法主要用于定义key如何进行排序在它们传递给reducer之前,

    这里就可以来进行组内排序。

    具体代码:

         Hadoop版本号:hadoop1.1.2

    自定义组合键

     1 package whut;
     2 import java.io.DataInput;
     3 import java.io.DataOutput;
     4 import java.io.IOException;
     5 import org.apache.hadoop.io.IntWritable;
     6 import org.apache.hadoop.io.Text;
     7 import org.apache.hadoop.io.WritableComparable;
     8 //自定义组合键策略
     9 //java基本类型数据
    10 public class TextInt implements WritableComparable{
    11     //直接利用java的基本数据类型
    12     private String firstKey;
    13     private int secondKey;
    14     //必须要有一个默认的构造函数
    15     public String getFirstKey() {
    16         return firstKey;
    17     }
    18     public void setFirstKey(String firstKey) {
    19         this.firstKey = firstKey;
    20     }
    21     public int getSecondKey() {
    22         return secondKey;
    23     }
    24     public void setSecondKey(int secondKey) {
    25         this.secondKey = secondKey;
    26     }
    27                                                                                                                                                                           
    28     @Override
    29     public void write(DataOutput out) throws IOException {
    30         // TODO Auto-generated method stub
    31         out.writeUTF(firstKey);
    32         out.writeInt(secondKey);
    33     }
    34     @Override
    35     public void readFields(DataInput in) throws IOException {
    36         // TODO Auto-generated method stub
    37         firstKey=in.readUTF();
    38         secondKey=in.readInt();
    39     }
    40     //map的键的比较就是根据这个方法来进行的
    41     @Override
    42     public int compareTo(Object o) {
    43         // TODO Auto-generated method stub
    44         TextInt ti=(TextInt)o;
    45         //利用这个来控制升序或降序
    46         //this本对象写在前面代表是升序
    47         //this本对象写在后面代表是降序
    48         return this.getFirstKey().compareTo(ti.getFirstKey());
    49     }
    50 }
    View Code

    分组策略

     1 package whut;
     2 import org.apache.hadoop.io.WritableComparable;
     3 import org.apache.hadoop.io.WritableComparator;
     4 //主要就是对于分组进行排序,分组只按照组建键中的一个值进行分组
     5 public class TextComparator extends WritableComparator {
     6     //必须要调用父类的构造器
     7     protected TextComparator() {
     8         super(TextInt.class,true);//注册comparator
     9     }
    10     @Override
    11     public int compare(WritableComparable a, WritableComparable b) {
    12         // TODO Auto-generated method stub
    13         TextInt ti1=(TextInt)a;
    14         TextInt ti2=(TextInt)b;
    15         return ti1.getFirstKey().compareTo(ti2.getFirstKey());
    16     }
    17 }
    View Code

    组内排序策略

     1 package whut;
     2 import org.apache.hadoop.io.WritableComparable;
     3 import org.apache.hadoop.io.WritableComparator;
     4 //分组内部进行排序,按照第二个字段进行排序
     5 public class TextIntComparator extends WritableComparator {
     6     public TextIntComparator()
     7     {
     8         super(TextInt.class,true);
     9     }
    10     //这里可以进行排序的方式管理
    11     //必须保证是同一个分组的
    12     //a与b进行比较
    13     //如果a在前b在后,则会产生升序
    14     //如果a在后b在前,则会产生降序
    15     @Override
    16     public int compare(WritableComparable a, WritableComparable b) {
    17         // TODO Auto-generated method stub
    18         TextInt ti1=(TextInt)a;
    19         TextInt ti2=(TextInt)b;
    20         //首先要保证是同一个组内,同一个组的标识就是第一个字段相同
    21         if(!ti1.getFirstKey().equals(ti2.getFirstKey()))
    22            return ti1.getFirstKey().compareTo(ti2.getFirstKey());
    23         else
    24            return ti2.getSecondKey()-ti1.getSecondKey();//0,-1,1
    25     }
    26                                                                                                                                                           
    27 }
    View Code

    分区策略

     1 package whut;
     2 import org.apache.hadoop.io.IntWritable;
     3 import org.apache.hadoop.mapreduce.Partitioner;
     4 //参数为map的输出类型
     5 public class KeyPartitioner extends Partitioner<TextInt, IntWritable> {
     6     @Override
     7     public int getPartition(TextInt key, IntWritable value, int numPartitions) {
     8         // TODO Auto-generated method stub
     9         return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
    10     }
    11 }
    View Code

    MapReduce策略

      1 package whut;
      2 import java.io.IOException;
      3 import org.apache.hadoop.conf.Configuration;
      4 import org.apache.hadoop.conf.Configured;
      5 import org.apache.hadoop.fs.Path;
      6 import org.apache.hadoop.io.IntWritable;
      7 import org.apache.hadoop.io.Text;
      8 import org.apache.hadoop.mapreduce.Job;
      9 import org.apache.hadoop.mapreduce.Mapper;
     10 import org.apache.hadoop.mapreduce.Reducer;
     11 import org.apache.hadoop.mapreduce.Mapper.Context;
     12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     13 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
     14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     16 import org.apache.hadoop.util.Tool;
     17 import org.apache.hadoop.util.ToolRunner;
     18 //需要对数据进行分组以及组内排序的时候
     19 public class SortMain extends Configured implements Tool{
     20     //这里设置输入文格式为KeyValueTextInputFormat
     21     //name1 5
     22     //默认输入格式都是Text,Text
     23     public static class GroupMapper extends
     24        Mapper<Text, Text, TextInt, IntWritable>  {
     25         public IntWritable second=new IntWritable();
     26         public TextInt tx=new TextInt();
     27         @Override
     28         protected void map(Text key, Text value, Context context)
     29                 throws IOException, InterruptedException {
     30             String lineKey=key.toString();
     31             String lineValue=value.toString();
     32             int lineInt=Integer.parseInt(lineValue);
     33             tx.setFirstKey(lineKey);
     34             tx.setSecondKey(lineInt);
     35             second.set(lineInt);
     36             context.write(tx, second);
     37         }
     38     }
     39     //设置reduce
     40     public static class GroupReduce extends Reducer<TextInt, IntWritable, Text, Text>
     41     {
     42         @Override
     43         protected void reduce(TextInt key, Iterable<IntWritable> values,
     44                Context context)
     45                 throws IOException, InterruptedException {
     46             StringBuffer sb=new StringBuffer();
     47             for(IntWritable val:values)
     48             {
     49                 sb.append(val+",");
     50             }
     51             if(sb.length()>0)
     52             {
     53                 sb.deleteCharAt(sb.length()-1);
     54             }
     55             context.write(new Text(key.getFirstKey()), new Text(sb.toString()));
     56         }
     57     }
     58                                                                                                                                         
     59     @Override
     60     public int run(String[] args) throws Exception {
     61         // TODO Auto-generated method stub
     62         Configuration conf=getConf();
     63         Job job=new Job(conf,"SecondarySort");
     64         job.setJarByClass(SortMain.class);
     65         // 设置输入文件的路径,已经上传在HDFS
     66         FileInputFormat.addInputPath(job, new Path(args[0]));
     67         // 设置输出文件的路径,输出文件也存在HDFS中,但是输出目录不能已经存在
     68         FileOutputFormat.setOutputPath(job, new Path(args[1]));
     69                                                                                                                                             
     70         job.setMapperClass(GroupMapper.class);
     71         job.setReducerClass(GroupReduce.class);
     72         //设置分区方法
     73         job.setPartitionerClass(KeyPartitioner.class);
     74                                                                                                                                             
     75         //下面这两个都是针对map端的
     76         //设置分组的策略,哪些key可以放置到一组中
     77         job.setGroupingComparatorClass(TextComparator.class);
     78         //设置key如何进行排序在传递给reducer之前.
     79         //这里就可以设置对组内如何排序的方法
     80         /*************关键点**********/
     81         job.setSortComparatorClass(TextIntComparator.class);
     82         //设置输入文件格式
     83         job.setInputFormatClass(KeyValueTextInputFormat.class);
     84         //使用默认的输出格式即TextInputFormat
     85         //设置map的输出key和value类型
     86         job.setMapOutputKeyClass(TextInt.class);
     87         job.setMapOutputValueClass(IntWritable.class);
     88         //设置reduce的输出key和value类型
     89         //job.setOutputFormatClass(TextOutputFormat.class);
     90         job.setOutputKeyClass(Text.class);
     91         job.setOutputValueClass(Text.class);
     92         job.waitForCompletion(true);
     93         int exitCode=job.isSuccessful()?0:1;
     94         return exitCode;
     95     }
     96                                                                                                                                         
     97     public static void main(String[] args)  throws Exception
     98     {
     99        int exitCode=ToolRunner.run(new SortMain(), args);
    100        System.exit(exitCode);
    101     }
    102 }
    View Code

    注意事项

       1,设置分组排序按照升序还是降序是在自定义WritableComparable中的compareTo()方法实现的,具体升序或者降序的设置在代码中已经注释说明

       2,设置组内值进行升序还是降序的排序是在组内排序策略中的compare()方法注释说明的。

       3,这里同时最重要的一点是,将第二列即放在组合键中,又作为value,这样对于组合键排序也就相当于对于value进行排序了。

       4,在自定义组合键的时候,对于组合键中的数据的基本类型可以采用Java的基本类型也可以采用Hadoop的基本数据类型,对于Hadoop的基本数据类型一定要记得初始化new一个基本数据类型对象。对于组合键类,必须要有默认的构造方法。

    本文出自 “在云端的追梦” 博客,请务必保留此出处http://computerdragon.blog.51cto.com/6235984/1287721

  • 相关阅读:
    从属性文件中读取配置
    Page Object Manager
    在Selenium中使用JavaScriptExecutor处理Ajax调用?
    wait
    常用操作
    Selenium收藏官方网址
    PageObject样例
    解决办法-错误:Access denied for user 'root'@'localhost'
    Struts2中的OGNL详解
    用C++,调用浏览器打开一个网页
  • 原文地址:https://www.cnblogs.com/hbmlml/p/6952819.html
Copyright © 2011-2022 走看看