自定义分组
job.setGroupingComparatorClass(MyGroupingComparator.class); //按照第一列进行分组,然后找出每个分组中的第二列中的最小值
为什么要自定义分组?
业务要求分组是按照第一列分组,但是NewK2的比较规则决定了不能按照第一列分。只能自定义分组比较器。
1 package group; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 import java.net.URI; 7 import java.util.Comparator; 8 import java.util.function.Function; 9 import java.util.function.ToDoubleFunction; 10 import java.util.function.ToIntFunction; 11 import java.util.function.ToLongFunction; 12 13 import org.apache.hadoop.conf.Configuration; 14 import org.apache.hadoop.fs.FileSystem; 15 import org.apache.hadoop.fs.Path; 16 import org.apache.hadoop.io.LongWritable; 17 import org.apache.hadoop.io.RawComparator; 18 import org.apache.hadoop.io.Text; 19 import org.apache.hadoop.io.WritableComparable; 20 import org.apache.hadoop.io.WritableComparator; 21 import org.apache.hadoop.io.file.tfile.RawComparable; 22 import org.apache.hadoop.mapreduce.Job; 23 import org.apache.hadoop.mapreduce.Mapper; 24 import org.apache.hadoop.mapreduce.Reducer; 25 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 26 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 27 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 28 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 29 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 30 31 public class GroupApp { 32 static final String INPUT_PATH = "hdfs://chaoren:9000/input"; 33 static final String OUT_PATH = "hdfs://chaoren:9000/out"; 34 35 public static void main(String[] args) throws Exception { 36 final Configuration configuration = new Configuration(); 37 38 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), 39 configuration); 40 if (fileSystem.exists(new Path(OUT_PATH))) { 41 fileSystem.delete(new Path(OUT_PATH), true); 42 } 43 44 final Job job = new Job(configuration, GroupApp.class.getSimpleName()); 45 46 // 1.1 指定输入文件路径 47 FileInputFormat.setInputPaths(job, INPUT_PATH); 48 // 指定哪个类用来格式化输入文件 49 job.setInputFormatClass(TextInputFormat.class); 50 51 // 1.2指定自定义的Mapper类 52 job.setMapperClass(MyMapper.class); 53 // 指定输出<k2,v2>的类型 54 job.setMapOutputKeyClass(NewK2.class); 55 job.setMapOutputValueClass(LongWritable.class); 56 57 // 1.3 指定分区类 58 job.setPartitionerClass(HashPartitioner.class); 59 job.setNumReduceTasks(1); 60 61 // 1.4 TODO 排序、分区 62 /** 63 * 分组:按照第一列分区 64 */ 65 job.setGroupingComparatorClass(MyGroupingComparator.class); 66 67 // 1.5 TODO (可选)合并 68 69 // 2.2 指定自定义的reduce类 70 job.setReducerClass(MyReducer.class); 71 // 指定输出<k3,v3>的类型 72 job.setOutputKeyClass(LongWritable.class); 73 job.setOutputValueClass(LongWritable.class); 74 75 // 2.3 指定输出到哪里 76 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); 77 // 设定输出文件的格式化类 78 job.setOutputFormatClass(TextOutputFormat.class); 79 80 // 把代码提交给JobTracker执行 81 job.waitForCompletion(true); 82 } 83 84 static class MyMapper extends 85 Mapper<LongWritable, Text, NewK2, LongWritable> { 86 protected void map( 87 LongWritable key, 88 Text value, 89 org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, NewK2, LongWritable>.Context context) 90 throws java.io.IOException, InterruptedException { 91 final String[] splited = value.toString().split(" "); 92 final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), 93 Long.parseLong(splited[1])); 94 final LongWritable v2 = new LongWritable(Long.parseLong(splited[1])); 95 context.write(k2, v2); 96 }; 97 } 98 99 static class MyReducer extends 100 Reducer<NewK2, LongWritable, LongWritable, LongWritable> { 101 protected void reduce( 102 NewK2 k2, 103 java.lang.Iterable<LongWritable> v2s, 104 org.apache.hadoop.mapreduce.Reducer<NewK2, LongWritable, LongWritable, LongWritable>.Context context) 105 throws java.io.IOException, InterruptedException { 106 long min = Long.MAX_VALUE; 107 for (LongWritable v2 : v2s) { 108 if (v2.get() < min) { 109 min = v2.get(); 110 } 111 } 112 context.write(new LongWritable(k2.first), new LongWritable(min)); 113 }; 114 } 115 116 /** 117 * 问:为什么实现该类? 答:因为原来的v2不能参与排序,把原来的k2和v2封装到一个类中,作为新的k2 118 * 119 */ 120 // WritableComparable:Hadoop的序列化 121 static class NewK2 implements WritableComparable<NewK2> { 122 Long first; 123 Long second; 124 125 public NewK2() { 126 } 127 128 public NewK2(long first, long second) { 129 this.first = first; 130 this.second = second; 131 } 132 133 public void readFields(DataInput in) throws IOException { 134 this.first = in.readLong(); 135 this.second = in.readLong(); 136 } 137 138 public void write(DataOutput out) throws IOException { 139 out.writeLong(first); 140 out.writeLong(second); 141 } 142 143 /** 144 * 当k2进行排序时,会调用该方法. 当第一列不同时,升序;当第一列相同时,第二列升序 145 */ 146 public int compareTo(NewK2 o) { 147 final long minus = this.first - o.first; 148 if (minus != 0) { 149 return (int) minus; 150 } 151 return (int) (this.second - o.second); 152 } 153 154 @Override 155 public int hashCode() { 156 return this.first.hashCode() + this.second.hashCode(); 157 } 158 159 @Override 160 public boolean equals(Object obj) { 161 if (!(obj instanceof NewK2)) { 162 return false; 163 } 164 NewK2 oK2 = (NewK2) obj; 165 return (this.first == oK2.first) && (this.second == oK2.second); 166 } 167 } 168 169 static class MyGroupingComparator implements RawComparator<NewK2> { 170 171 public int compare(NewK2 o1, NewK2 o2) { 172 return (int) (o1.first - o2.first); 173 } 174 175 /** 176 * @param arg0 177 * 表示第一个参与分组的字节数组 178 * @param arg1 179 * 表示第一个参与分组的字节数组的起始位置 180 * @param arg2 181 * 表示第一个参与分组的字节数组的偏移量 182 * 183 * @param arg0 184 * 表示第二个参与分组的字节数组 185 * @param arg1 186 * 表示第二个参与分组的字节数组的起始位置 187 * @param arg2 188 * 表示第二个参与分组的字节数组的偏移量 189 */ 190 public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, 191 int arg4, int arg5) { 192 return WritableComparator 193 .compareBytes(arg0, arg1, 8, arg3, arg4, 8); 194 } 195 196 } 197 198 }