zoukankan      html  css  js  c++  java
  • MapReduce的二次排序

    二次排序的需求说明

       在mapreduce操作时,shuffle阶段会多次根据key值排序。但是在shuffle分组后,相同key值的values序列的顺序是不确定的(如下图)。如果想要此时value值也是排序好的,这种需求就是二次排序。

                                                            1.png

    测试的文件数据

    a  5
    a  1
    a  7
    a  9
    b  3
    b  8
    b  10
    c  2
    c  4
    c  3
    

      

    第一种实现思路

    直接在reduce端对分组后的values进行排序。

    • map关键代码
      public class myMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              System.out.println("before Mapper  <k1,v1>:"+key+"-"+value);
              String line = value.toString();
              String[] str = line.split(" ");
      
              context.write(new Text(str[0]),new IntWritable(Integer.parseInt(str[1])));
          }
      }
    • reduce关键代码
    • public class myReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
          @Override
          protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
              List<Integer> valuesList = new ArrayList<Integer>();
      
              // 取出value
              for(IntWritable value : values) {
                  valuesList.add(value.get());
              }
              // 进行排序
              Collections.sort(valuesList);
      
              for(Integer value : valuesList) {
                  context.write(key, new IntWritable(value));
              }
          }
      }

      App关键代码

      public class myApp {
          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();
              conf.set("fs.defaultFS", "file:///");
              Job job = Job.getInstance(conf);
      
              //设置job的各种属性
              job.setJobName("myAppApp");                        //作业名称
              job.setJarByClass(myApp.class);                 //搜索类
      
              //添加输入路径
              FileInputFormat.addInputPath(job,new Path("F:\mr\secondsort\input1"));
              //设置输出路径
              FileOutputFormat.setOutputPath(job,new Path("F:\mr\secondsort\output1"));
      
              job.setMapperClass(myMapper.class);             //mapper类
              job.setReducerClass(myReducer.class);           //reducer类
      
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);     //
      
              job.waitForCompletion(true);
          }
      }

      其输出:

      a    1
      a    5
      a    7
      a    9
      b    3
      b    8
      b    10
      c    2
      c    3
      c    4

        需要注意的是,这样把排序工作都放到reduce端完成,当values序列长度非常大的时候,会对CPU和内存造成极大的负载。还有就是,在reduce端对values进行迭代的时候,不要直接直接存储value值或者key值,因为reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。需要用相应的数据类型.get()取出后再存储。

    第二种实现思路

       用二次排序来实现

        二次排序,从字面上可以理解为在对key排序的基础上对key所对应的值value排序,也叫辅助排序。一般情况下,MapReduce框架只对key排序,而不对key所对应的值排序,因此value的排序经常是不固定的。但是我们经常会遇到同时对key和value排序的需求,例如Hadoop权威指南中的求一年的高高气温,key为年份,value为最高气温,年份按照降序排列,气温按照降序排列。还有水果电商网站经常会有按天统计水果销售排行榜的需求等等,这些都是需要对key和value同时进行排序。

        根据相关博客加上书本的理解:

        得到如下的流程:

        input file ---> split  ---> recordreader(生成键值对)---> 形成复合键ComboKey(key和value的复合键)  ---> 分区 (setPartitionerClass设置分区函数--可以是自定义的分区函数,这里主要根据的是组合键的第一个字段(key)进行分区) ----> 在每个分区内进行排序(分区内部排序,setSortComparatorClass设定特定的排序对比器进行排序,实际上这已经算是进行了一次二次排序) ---> shuffle阶段--->内部排序(shuffle阶段从不同节点读取了相应的map输出文件,所以在此处进行第二次排序,同样的setSortComparatorClass设定特定的排序对比器进行排序,进行第二次排序)--->reduce阶段进行分组(会对键值相同的项进行分组操作,其默认操作的键。对于我们生产的键值对<key1,value1>,key1是一个复合键值对,我们对他的操作是针对复合键值对key1的第一个值为准的,setGroupingComparatorClass设定的分组函数)---> 执行reduce函数--->输出结果。

        具体事例:

     

                  事例文件如上:

        • 组合键值对comboKey
          public class pairWritable implements WritableComparable<pairWritable> {
              //组合key
              private String first;
              private int second;
          
              public pairWritable() {
          
              }
          
              public pairWritable(String first, int second) {
                  this.set(first,second);
              }
          
              public String getFirst() {
                  return first;
              }
          
              public void setFirst(String first) {
                  this.first = first;
              }
          
              public int getSecond() {
                  return second;
              }
          
              public void setSecond(int second) {
                  this.second = second;
              }
              /**
               * 方便设置字段
               */
              public void set(String first, int second) {
                  this.first = first;
                  this.second = second;
              }
              public int compareTo(pairWritable o) {
                  int comp=this.first.compareTo(o.first);
          
                  if (comp!=0){
                      return comp;
                  }else {
                      return Integer.valueOf(this.second).compareTo(Integer.valueOf(o.getSecond()));
                  }
              }
          
              /**
               * 序列化
               * @param dataOutput
               * @throws IOException
               */
              public void write(DataOutput dataOutput) throws IOException {
          
                  dataOutput.writeUTF(first);
                  dataOutput.writeInt(second);
          
              }
          
              /**
               * 反序列化
               * @param dataInput
               * @throws IOException
               */
              public void readFields(DataInput dataInput) throws IOException {
                  this.first = dataInput.readUTF();
                  this.second=dataInput.readInt();
              }
          }
        • 我们的mapper如下

          public class myMapper extends Mapper<LongWritable,Text,pairWritable,NullWritable> {
              @Override
              protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                  String line = value.toString();
                  String[] str = line.split(" ");
                  pairWritable keyOut = new pairWritable();
                  keyOut.set(str[0],Integer.valueOf(str[1]));
                  context.write(keyOut,NullWritable.get());
              }
          }
        • 然后是分区类

          ublic class myPartition  extends Partitioner<pairWritable, IntWritable> {
              public int getPartition(pairWritable key, IntWritable value, int numPartitions) {
                  /**
                   * 默认的实现 (key.hashCode() & Integer.MAX_VALUE) % numPartitions
                   * 让key中first字段作为分区依据
                   */
                  return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
              }
          }
        • 组合键排序比较方法需要自己写一下,默认情况下key不是排序的,所以我们需要先进行排序。然后才是分组

          public class pairWritableComparator extends WritableComparator {
              protected pairWritableComparator() {
                  super(pairWritable.class, true);
              }
          
              @Override
              public int compare(WritableComparable a, WritableComparable b) {
                  System.out.println("ComboKeyComparator");
                  pairWritable k1=(pairWritable)a;
                  pairWritable k2=(pairWritable)b;
          
                  return k1.compareTo(k2);
              }
          }
        • 重新分组方法,同一key的需要放在同一组中

          public class myGroupComparator implements RawComparator<pairWritable> {
              /**
               * 字节比较
               * arg0,arg3为要比较的两个字节数组
               * arg1,arg2表示第一个字节数组要进行比较的收尾位置,arg4,arg5表示第二个
               * 从第一个字节比到组合key中second的前一个字节,因为second为int型,所以长度为4
               */
              public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
                  return WritableComparator.compareBytes(arg0, 0, arg2-4, arg3, 0, arg5-4);
              }
          
              /**
               * 对象比较
               */
              public int compare(pairWritable o1, pairWritable o2) {
                  return o1.getFirst().compareTo(o2.getFirst());
              }
          }
        •  reducer

          public class myReducer extends Reducer<pairWritable,NullWritable,Text,IntWritable> {
              @Override
              protected void reduce(pairWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
                  Text keyOut = new Text();
                  IntWritable valueOut=new IntWritable();
                  //迭代输出
                  for(NullWritable value : values){
                      keyOut.set(key.getFirst());
                      valueOut.set(key.getSecond());
                      context.write(keyOut,valueOut);
                  }
              }
          }


        • App

          public class myApp {
              public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                  Configuration conf = new Configuration();
                  conf.set("fs.defaultFS", "file:///");
                  //判断文件是否存在
                  Path path = new Path("F:\mr\secondsort\output3");
                  FileSystem fileSystem = path.getFileSystem(conf);
                  if(fileSystem.isDirectory(path)){
                      fileSystem.delete(path,true);
                  }
          
                  Job job = Job.getInstance(conf);
          
                  //设置job的各种属性
                  job.setJobName("myApp");                        //作业名称
                  job.setJarByClass(myApp.class);                 //搜索类
                  job.setInputFormatClass(TextInputFormat.class); //设置输入格式
          
                  //添加输入路径
                  FileInputFormat.addInputPath(job,new Path("F:\mr\secondsort\input1"));
                  //设置输出路径
                  FileOutputFormat.setOutputPath(job,new Path("F:\mr\secondsort\output3"));
                  //map输出类型
                  job.setMapOutputKeyClass(pairWritable.class);           //
                  job.setMapOutputValueClass(NullWritable.class);        //
          
                  job.setMapperClass(myMapper.class);
                  job.setReducerClass(myReducer.class);
                  //设置分区函数
                  job.setPartitionerClass(myPartition.class);
                  //设置排序对比起
                  job.setSortComparatorClass(pairWritableComparator.class);
                  //设置分组对比器
                  job.setGroupingComparatorClass(myGroupComparator.class);
          
                  job.waitForCompletion(true);
              }
          }
                 

             

  • 相关阅读:
    176. Second Highest Salary
    175. Combine Two Tables
    172. Factorial Trailing Zeroes
    171. Excel Sheet Column Number
    169. Majority Element
    168. Excel Sheet Column Title
    167. Two Sum II
    160. Intersection of Two Linked Lists
    个人博客记录
    <meta>标签
  • 原文地址:https://www.cnblogs.com/tongxupeng/p/10415656.html
Copyright © 2011-2022 走看看