  • MapReduce Grouping

    • Grouping: values that share the same key are collected into one group.

     Example: given the input and output below, the first column of the output contains no duplicate values, and the second column holds the maximum of the second column over all rows whose first column is the same.
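
    (The original post showed the sample data as an image. The rows below are an assumed reconstruction based on the pairs {3,3},{3,2},{3,1},{2,2},{2,1},{1,1} mentioned in the code comments further down; columns are tab-separated.)

    Input        Output
    3   3        1   1
    3   2        2   2
    3   1        3   3
    2   2
    2   1
    1   1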

              

    Analysis: first decide what <k3, v3> should be. There are two ways to choose k3:

    Method 1: the two columns together form k3.

    Method 2: the two columns become k3 and v3 respectively. In that case, what are k2 and v2? The first column is k2 and the second column is v2; <k2, v2s> is then turned into <k3, v3> by taking the maximum value from v2s.
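
    To make Method 2 concrete, a rough trace of the <k1,v1> -> <k2,v2> -> <k3,v3> flow on the assumed sample above looks like this (the byte offsets in k1 are illustrative only):

<k1,v1> : (0,"3\t3") (4,"3\t2") (8,"3\t1") (12,"2\t2") (16,"2\t1") (20,"1\t1")
map     : (3,3) (3,2) (3,1) (2,2) (2,1) (1,1)     // <k2,v2>
shuffle : (1,[1]) (2,[2,1]) (3,[3,2,1])           // values grouped by identical k2
reduce  : (1,1) (2,2) (3,3)                       // <k3,v3> = (k2, max(v2s))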

    Part 1: accomplishing the task with Method 2

    (1) Custom Mapper

private static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    IntWritable k2 = new IntWritable();
    IntWritable v2 = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Each input line holds two tab-separated integers
        String[] splited = value.toString().split("\t");
        k2.set(Integer.parseInt(splited[0])); // first column -> k2
        v2.set(Integer.parseInt(splited[1])); // second column -> v2
        context.write(k2, v2);
    }
}

    (2) Custom Reducer

// Records are sorted and grouped by k2 (3 groups here, so reduce() is called 3 times -- once per group)
// Groups: 3 -> {3,2,1}, 2 -> {2,1}, 1 -> {1}

private static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    IntWritable v3 = new IntWritable();

    @Override
    protected void reduce(IntWritable k2, Iterable<IntWritable> v2s,
            Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable v2 : v2s) {
            if (v2.get() > max) {
                max = v2.get();
            }
        }
        // One maximum per group yields the result sequence
        v3.set(max);
        context.write(k2, v3);
    }
}

    (3) Assembling the MapReduce job

public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), GroupTest.class.getSimpleName());
    job.setJarByClass(GroupTest.class);
    // 1. Input path
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    // 2. Mapper
    //job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(MyMapper.class);
    //job.setMapOutputKeyClass(Text.class);
    //job.setMapOutputValueClass(TrafficWritable.class);

    // 3. Reducer
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    // 4. Output path
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    //job.setOutputFormatClass(TextOutputFormat.class); // format the output and write it to disk

    job.waitForCompletion(true);
}
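
    One small improvement worth considering (my addition, not in the original code): propagate the job's success flag as the process exit code so that shell scripts can detect a failed run.

// Sketch: replace the bare waitForCompletion(true) call above with
System.exit(job.waitForCompletion(true) ? 0 : 1);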

    The complete code is as follows:

package Mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GroupTest {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), GroupTest.class.getSimpleName());
        job.setJarByClass(GroupTest.class);
        // 1. Input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 2. Mapper
        //job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MyMapper.class);
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(TrafficWritable.class);

        // 3. Reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // 4. Output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //job.setOutputFormatClass(TextOutputFormat.class); // format the output and write it to disk

        job.waitForCompletion(true);
    }

    private static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        IntWritable k2 = new IntWritable();
        IntWritable v2 = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Each input line holds two tab-separated integers
            String[] splited = value.toString().split("\t");
            k2.set(Integer.parseInt(splited[0])); // first column -> k2
            v2.set(Integer.parseInt(splited[1])); // second column -> v2
            context.write(k2, v2);
        }
    }

    // Records are sorted and grouped by k2 (3 groups here, so reduce() is called 3 times -- once per group)
    // Groups: 3 -> {3,2,1}, 2 -> {2,1}, 1 -> {1}
    private static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        IntWritable v3 = new IntWritable();

        @Override
        protected void reduce(IntWritable k2, Iterable<IntWritable> v2s,
                Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int max = Integer.MIN_VALUE;
            for (IntWritable v2 : v2s) {
                if (v2.get() > max) {
                    max = v2.get();
                }
            }
            // One maximum per group yields the result sequence
            v3.set(max);
            context.write(k2, v3);
        }
    }
}
    MapReduce code for finding the maximum (Method 2)

    (4) Run the code and check the result

    •   [root@neusoft-master filecontent]# hadoop jar GroupTest.jar /neusoft/twoint  /out9
    •   [root@neusoft-master filecontent]# hadoop dfs -text  /out9/part-r-00000
    (Note: the original post also showed "hadoop jar -text ...", which is not a valid command; use "hadoop dfs -text" or "hadoop fs -text" to view the output.)
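
    The original post showed the output as a screenshot. Assuming the six-row sample reconstructed earlier, the output file would be expected to contain:

    1   1
    2   2
    3   3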

           

    Part 2: accomplishing the task with Method 1

          Both columns together serve as k3, with no v3; by the same reasoning, k2 is also the two columns combined.

          However, with the default grouping the dataset above would be split into 6 groups, so the goal of taking the maximum among rows with the same first value cannot be reached.

          Therefore we use MapReduce's custom grouping mechanism so that rows whose first column is equal fall into one group, which yields the correct grouping.

          MapReduce provides job.setGroupingComparatorClass(cls), where cls is the custom grouping class.
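
          For reference, the relevant method on org.apache.hadoop.mapreduce.Job looks roughly like this (paraphrased from the Hadoop 2.x source; check your own Hadoop version for the exact code):

public void setGroupingComparatorClass(Class<? extends RawComparator> cls)
        throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
}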

          

          (1) From the source code we can see that this class must implement the RawComparator interface. The custom grouping code is as follows:

// Grouping comparator -- custom grouping
private static class MyGroupingComparator implements RawComparator {
    public int compare(Object o1, Object o2) {
        return 0; // object-level compare; not used for the grouping here
    }

    // b1/b2 are the serialized bytes of the two keys being compared
    // s1 is the starting offset of the key within b1, l1 is the key's length in bytes
    // Each TwoInt key is 8 bytes (t1 and t2 are 4 bytes each); the 6-row sample's keys total 48 bytes
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // compareBytes compares byte by byte. The key holds both columns, but only the
        // first column should decide the group, so compare just the first 4 bytes (t1).
        // Sample keys: {3,3},{3,2},{3,1},{2,2},{2,1},{1,1}
        // Comparing only t1 yields 3 groups: {3,(3,2,1)}, {2,(2,1)}, {1,(1)};
        // the reducer then picks the maximum from each group's values.
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4); // l1 - 4 == 4
    }
}
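
    As a side note (my addition, not from the original post), an arguably more readable alternative is to extend WritableComparator and compare at the object level; it deserializes the keys, so it is somewhat slower than the raw-byte version above. Also note that if the job ran with more than one reducer, this would normally be paired with a partitioner on t1 so that equal first columns reach the same reducer; with the default single reducer that is not an issue.

// Sketch only: assumes the TwoInt key type shown later in this post.
private static class MyGroupingComparatorObj extends WritableComparator {
    protected MyGroupingComparatorObj() {
        super(TwoInt.class, true); // true: create key instances so compare() can deserialize
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Group only by the first column (t1); ignore t2.
        return Integer.compare(((TwoInt) a).t1, ((TwoInt) b).t1);
    }
}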

          (2) Register it in main()

// Without the custom grouping class below, k2's compareTo() decides the grouping;
// with it, the custom grouping class is used instead
job.setGroupingComparatorClass(MyGroupingComparator.class);

         (3) From the meaning of each comparator argument we can conclude that v2 should be IntWritable rather than NullWritable: v2s is the collection of second-column values.

          The Mapper is as follows:

private static class MyMapper extends
        Mapper<LongWritable, Text, TwoInt, IntWritable> {
    // v2 must be IntWritable here, not NullWritable
    TwoInt K2 = new TwoInt();
    IntWritable v2 = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, TwoInt, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String[] splited = value.toString().split("\t");
        K2.set(Integer.parseInt(splited[0]), Integer.parseInt(splited[1]));
        v2.set(Integer.parseInt(splited[1])); // the second column must reach reduce, so emit it as v2
        context.write(K2, v2);
    }
}

         (4) k3 and v3 are the reducer's output types, both IntWritable. How do we compute, from the incoming v2s, the maximum among the values that share the same key?

private static class MyReducer extends
        Reducer<TwoInt, IntWritable, IntWritable, IntWritable> { // k2, v2s, k3, v3
    IntWritable k3 = new IntWritable();
    IntWritable v3 = new IntWritable();

    @Override
    protected void reduce(
            TwoInt k2,
            Iterable<IntWritable> v2s,
            Reducer<TwoInt, IntWritable, IntWritable, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable v2 : v2s) {
            if (v2.get() > max) {
                max = v2.get();
            }
        }
        // One maximum per group yields the result sequence
        v3.set(max);
        k3.set(k2.t1); // k2 is a TwoInt; its first field becomes k3
        context.write(k3, v3);
    }
}

    The final code is as follows:

package Mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Group2Test {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(),
                Group2Test.class.getSimpleName());
        job.setJarByClass(Group2Test.class);
        // 1. Input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 2. Mapper
        job.setMapperClass(MyMapper.class);
        // k2/v2 differ from k3/v3 here, so the map output types must be set explicitly
        job.setMapOutputKeyClass(TwoInt.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Without the custom grouping class below, k2's compareTo() decides the grouping;
        // with it, the custom grouping class is used instead
        job.setGroupingComparatorClass(MyGroupingComparator.class);

        // 3. Reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // 4. Output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

    // Grouping comparator -- custom grouping
    private static class MyGroupingComparator implements RawComparator {
        public int compare(Object o1, Object o2) {
            return 0; // object-level compare; not used for the grouping here
        }

        // b1/b2 are the serialized bytes of the two keys being compared
        // s1 is the starting offset of the key within b1, l1 is the key's length in bytes
        // Each TwoInt key is 8 bytes (t1 and t2 are 4 bytes each)
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // Compare only the first 4 bytes (t1), so rows sharing the first column
            // fall into the same group: {3,(3,2,1)}, {2,(2,1)}, {1,(1)}
            return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
        }
    }

    private static class MyMapper extends
            Mapper<LongWritable, Text, TwoInt, IntWritable> {
        // v2 must be IntWritable here, not NullWritable
        TwoInt K2 = new TwoInt();
        IntWritable v2 = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, TwoInt, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String[] splited = value.toString().split("\t");
            K2.set(Integer.parseInt(splited[0]), Integer.parseInt(splited[1]));
            v2.set(Integer.parseInt(splited[1]));
            context.write(K2, v2);
        }
    }

    private static class MyReducer extends
            Reducer<TwoInt, IntWritable, IntWritable, IntWritable> { // k2, v2s, k3, v3
        IntWritable k3 = new IntWritable();
        IntWritable v3 = new IntWritable();

        @Override
        protected void reduce(
                TwoInt k2,
                Iterable<IntWritable> v2s,
                Reducer<TwoInt, IntWritable, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int max = Integer.MIN_VALUE;
            for (IntWritable v2 : v2s) {
                if (v2.get() > max) {
                    max = v2.get();
                }
            }
            // One maximum per group yields the result sequence
            v3.set(max);
            k3.set(k2.t1); // k2 is a TwoInt; its first field becomes k3
            context.write(k3, v3);
        }
    }

    private static class TwoInt implements WritableComparable<TwoInt> {
        public int t1;
        public int t2;

        public void write(DataOutput out) throws IOException {
            out.writeInt(t1);
            out.writeInt(t2);
        }

        public void set(int t1, int t2) {
            this.t1 = t1;
            this.t2 = t2;
        }

        public void readFields(DataInput in) throws IOException {
            this.t1 = in.readInt();
            this.t2 = in.readInt();
        }

        public int compareTo(TwoInt o) {
            if (this.t1 == o.t1) { // when the first column is equal, sort ascending by the second
                return this.t2 - o.t2;
            }
            return this.t1 - o.t1; // otherwise sort ascending by the first column
        }

        @Override
        public String toString() {
            return t1 + "\t" + t2;
        }
    }
}
    MapReduce code for finding the maximum (Method 1)
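
    One caveat (my addition): compareTo in TwoInt uses int subtraction, which can overflow when the operands are of extreme magnitude; Integer.compare is the safer idiom.

// Overflow-safe variant of TwoInt.compareTo (sketch):
public int compareTo(TwoInt o) {
    if (this.t1 == o.t1) {
        return Integer.compare(this.t2, o.t2); // ascending by the second column
    }
    return Integer.compare(this.t1, o.t1);     // ascending by the first column
}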

    Test and check the results:

    [root@neusoft-master filecontent]# hadoop dfs -text  /out9/part-r-00000

     [root@neusoft-master filecontent]# hadoop dfs -text  /out10/part-r-00000

     

    The results are correct.

     END~

           
