本文发表于本人博客。
今天接着上次【Hadoop mapreduce自定义排序WritableComparable】文章写,按照顺序那么这次应该是讲解自定义分组如何实现,关于操作顺序在这里不多说了,需要了解的可以看看我在博客园的评论,现在开始。
首先我们查看下Job这个类,发现有setGroupingComparatorClass()这个方法,具体源码如下:
/** * Define the comparator that controls which keys are grouped together * for a single call to * {@link Reducer#reduce(Object, Iterable, * org.apache.hadoop.mapreduce.Reducer.Context)} * @param cls the raw comparator to use * @throws IllegalStateException if the job is submitted */ public void setGroupingComparatorClass(Class<? extends RawComparator> cls ) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setOutputValueGroupingComparator(cls); }
从方法的源码可以看出这个方法是定义自定义键分组功能。设置这个自定义分组类必须满足extends RawComparator,那我们可以看下这个类的源码:
/** * <p> * A {@link Comparator} that operates directly on byte representations of * objects. * </p> * @param <T> * @see DeserializerComparator */ public interface RawComparator<T> extends Comparator<T> { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); }
然而这个RawComparator是泛型继承Comparator接口的,简单看了下那我们来自定义一个类继承RawComparator,代码如下:
public class MyGrouper implements RawComparator<SortAPI> { @Override public int compare(SortAPI o1, SortAPI o2) { return (int)(o1.first - o2.first); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int compareBytes = WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8); return compareBytes; } }
源码中SortAPI是上节自定义排序中的定义对象,第一个方法从注释可以看出是比较2个参数的大小,返回的是自然整数;第二个方法是在反序列化时比较,所以需要是用字节比较。接下来我们继续看看自定义MyMapper类:
public class MyMapper extends Mapper<LongWritable, Text, SortAPI, LongWritable> { @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] splied = value.toString().split(" "); try { long first = Long.parseLong(splied[0]); long second = Long.parseLong(splied[1]); context.write(new SortAPI(first,second), new LongWritable(1)); } catch (Exception e) { System.out.println(e.getMessage()); } } }
自定义MyReduce类:
public class MyReduce extends Reducer<SortAPI, LongWritable, LongWritable, LongWritable> { @Override protected void reduce(SortAPI key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { context.write(new LongWritable(key.first), new LongWritable(key.second)); } }
自定义SortAPI类:
public class SortAPI implements WritableComparable<SortAPI> { public Long first; public Long second; public SortAPI(){ } public SortAPI(long first,long second){ this.first = first; this.second = second; } @Override public int compareTo(SortAPI o) { return (int) (this.first - o.first); } @Override public void write(DataOutput out) throws IOException { out.writeLong(first); out.writeLong(second); } @Override public void readFields(DataInput in) throws IOException { this.first = in.readLong(); this.second = in.readLong(); } @Override public int hashCode() { return this.first.hashCode() + this.second.hashCode(); } @Override public boolean equals(Object obj) { if(obj instanceof SortAPI){ SortAPI o = (SortAPI)obj; return this.first == o.first && this.second == o.second; } return false; } @Override public String toString() { return "输出:" + this.first + ";" + this.second; } }
接下来准备数据,数据如下:
1 2 1 1 3 0 3 2 2 2 1 2
上传至hdfs://hadoop-master:9000/grouper/input/test.txt,main代码如下:
public class Test { static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/grouper/output/"; static final String INPUT_DIR = "hdfs://hadoop-master:9000/grouper/input/test.txt"; public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, Test.class.getSimpleName()); job.setJarByClass(Test.class); deleteOutputFile(OUTPUT_DIR); //1设置输入目录 FileInputFormat.setInputPaths(job, INPUT_DIR); //2设置输入格式化类 job.setInputFormatClass(TextInputFormat.class); //3设置自定义Mapper以及键值类型 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(SortAPI.class); job.setMapOutputValueClass(LongWritable.class); //4分区 job.setPartitionerClass(HashPartitioner.class); job.setNumReduceTasks(1); //5排序分组 job.setGroupingComparatorClass(MyGrouper.class); //6设置在一定Reduce以及键值类型 job.setReducerClass(MyReduce.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(LongWritable.class); //7设置输出目录 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR)); //8提交job job.waitForCompletion(true); } static void deleteOutputFile(String path) throws Exception{ Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI(INPUT_DIR),conf); if(fs.exists(new Path(path))){ fs.delete(new Path(path)); } } }
执行代码,然后在节点上用终端输入:hadoop fs -text /grouper/output/part-r-00000查看结果:
1 2 2 2 3 0
接下来我们修改下SortAPI类的compareTo()方法:
@Override public int compareTo(SortAPI o) { long mis = (this.first - o.first) * -1; if(mis != 0 ){ return (int)mis; } else{ return (int)(this.second - o.second); } }
再次执行并查看/grouper/output/part-r-00000文件:
3 0 2 2 1 1
这样我们就得出了同样的数据分组结果会受到排序算法的影响,比如排序是倒序那么分组也是先按照倒序数据源进行分组输出。我们还可以在map函数以及reduce函数中打印记录(过程省略)这样经过对比也得出分组阶段:键值对中key相同(即compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法返回0)的则为一组,当前组再按照顺序选择第一个往缓冲区输出(也许会存储到硬盘)。其它的相同key的键值对就不会再往缓冲区输出了。在百度上检索到这边文章,其中它的分组是把map函数输出的value全部迭代到同一个key中,就相当于上面{key,value}:{1,{2,1,2}},这个结果跟最开始没有自定义分组时是一样的,我们可以在reduce函数输出Iterable<LongWritable> values进行查看,其实我觉得这样的才算是分组吧就像数据查询一样。
在这里我们应该要弄懂分组与分区的区别。分区是对输出结果文件进行分类拆分文件以便更好查看,比如一个输出文件包含所有状态的http请求,那么为了方便查看通过分区把请求状态分成几个结果文件。分组就是把一些相同键的键值对进行计算减少输出;分区之后数据全部还是照样输出到reduce端,而分组的话就有所减少了;当然这2个步骤也是不同的阶段执行。
这次先到这里。坚持记录点点滴滴!