zoukankan      html  css  js  c++  java
  • MapReduce按照两个字段对数据进行排序

    按照k2排序,要求k2必须是可以比较的,即必须实现WritableComparable接口。

    但是如果还想让别的字段(比如v2中的一些字段)参与排序怎么办?

    需要重新定义k2:把所有需要参与排序的字段都放到k2中,并在k2的compareTo方法里按顺序依次比较这些字段。

    这块用代码实现:

    假如数据现在的结构是

    3       3

    3       2

    3       1

    2       2

    2       1

    1       1

    看代码:

     1 import java.io.DataInput;
     2 import java.io.DataOutput;
     3 import java.io.IOException;
     4 
     5 import org.apache.hadoop.conf.Configuration;
     6 import org.apache.hadoop.fs.Path;
     7 import org.apache.hadoop.io.LongWritable;
     8 import org.apache.hadoop.io.NullWritable;
     9 import org.apache.hadoop.io.Text;
    10 import org.apache.hadoop.io.WritableComparable;
    11 import org.apache.hadoop.mapreduce.Job;
    12 import org.apache.hadoop.mapreduce.Mapper;
    13 import org.apache.hadoop.mapreduce.Reducer;
    14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    16 
    17 public class TwoIntSortApp {
    18     
    19     public static void main(String[] args) throws Exception {
    20         Job job = Job.getInstance(new Configuration(), TwoIntSortApp.class.getSimpleName());
    21         job.setJarByClass(TwoIntSortApp.class);
    22         FileInputFormat.setInputPaths(job, args[0]);
    23         
    24         job.setMapperClass(TwoIntSortMapper.class);
    25         job.setMapOutputKeyClass(TwoInt.class);
    26         job.setMapOutputValueClass(NullWritable.class);
    27         
    28         job.setReducerClass(TwoIntSortReducer.class);
    29         job.setOutputKeyClass(TwoInt.class);
    30         job.setOutputValueClass(NullWritable.class);
    31 
    32         FileOutputFormat.setOutputPath(job, new Path(args[1]));
    33         job.waitForCompletion(true);        
    34     }
    35     
    36     public static class TwoIntSortMapper extends Mapper<LongWritable, Text, TwoInt, NullWritable>{
    37         TwoInt k2 = new TwoInt();
    38         @Override
    39         protected void map(LongWritable key, Text value,
    40                 Mapper<LongWritable, Text, TwoInt, NullWritable>.Context context)
    41                 throws IOException, InterruptedException {
    42             String[] splited = value.toString().split("	");
    43             k2.set(splited[0],splited[1]);
    44             context.write(k2, NullWritable.get());
    45             System.out.println("Mapper-----第一个数:"+k2.first+" 第二个数:"+k2.second);
    46         }
    47     }
    48     
    49     public static class TwoIntSortReducer extends Reducer<TwoInt, NullWritable, TwoInt, NullWritable>{
    50         int i=1;
    51         @Override
    52         protected void reduce(TwoInt k2, Iterable<NullWritable> arg1,
    53                 Reducer<TwoInt, NullWritable, TwoInt, NullWritable>.Context context)
    54                 throws IOException, InterruptedException {
    55             context.write(k2,NullWritable.get());
    56             System.out.println("调用次数"+(i++));
    57             System.out.println("Reducer-----第一个数:"+k2.first+" 第二个数:"+k2.second);
    58         }
    59     }
    60     
    61     public static class TwoInt implements WritableComparable<TwoInt>{
    62         int first;
    63         int second;
    64         public void write(DataOutput out) throws IOException {
    65             out.writeInt(first);
    66             out.writeInt(second);
    67         }
    68         
    69         public void set(String s1,String s2){
    70             this.first = Integer.parseInt(s1);
    71             this.second = Integer.parseInt(s2);
    72         }
    73 
    74         public void readFields(DataInput in) throws IOException {
    75             this.first = in.readInt();
    76             this.second = in.readInt();
    77             
    78         }
    79 
    80         public int compareTo(TwoInt o) {
    81             int r1 = this.first - o.first;
    82             if(r1 < 0){
    83                 return -1;
    84             }else if(r1 > 0){
    85                 return 1;
    86             }
    87             int r2 = this.second - o.second;
    88             return  (r2 < 0 ? -1 : (r2 > 0 ? 1 : 0)); 
    89         }
    90         
    91         @Override
    92         public String toString() {
    93             return this.first+"	"+this.second;
    94         }
    95     }
    96 }

     //==============================================================

    在job上设置Combiner类和自定义的分组比较器(GroupingComparator),使第一列相同的记录被划分到同一组:

            
            job.setCombinerClass(TwoIntSortReducer.class);// set the Combiner class
            job.setGroupingComparatorClass(MyGroupingCompartor.class);// set the custom grouping comparator class

      

     1     public static class MyGroupingCompartor extends WritableComparator{
     2         @Override
     3         public int compare(WritableComparable a, WritableComparable b) {
     4             TwoInt aa = (TwoInt)a; 
     5             TwoInt bb = (TwoInt)b; 
     6             return aa.first-bb.first<0?-1:(aa.first-bb.first>0?1:0);//只要是第一列相同的就认为是一个分组.
     7             /*
     8              * 1    1
     9              * 2    1
    10              * 2    2
    11              * 3    1
    12              * 3    2
    13              * 3    3
    14              * 这样就分成了三组
    15              */
    16         }
    17     }
  • 相关阅读:
    函数
    数值
    数据类型
    jQuery工具方法
    jQuery概述
    史上最全的SpringMVC学习笔记
    webpack-Hot Module Replacement(热更新)
    webpack-Manifest
    webpack-Targets(构建目标)
    webpack-Dependency Graph(依赖图)
  • 原文地址:https://www.cnblogs.com/DreamDrive/p/5678175.html
Copyright © 2011-2022 走看看