这里分析MapReduce原理并没用WordCount,目前没用过hadoop也没接触过大数据,感觉,只是感觉,在项目中,如果真的用到了MapReduce那待排序的肯定会更加实用。
先贴上源码
package examples; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; /** * This is an example Hadoop Map/Reduce application. It reads the text input files that must contain * two integers per a line. The output is sorted by the first and second number and grouped on the * first number. * * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort <i>in-dir</i> <i>out-dir</i> */ public class SecondarySort { /** * Define a pair of integers that are writable. They are serialized in a byte comparable format. */ public static class IntPair implements WritableComparable<IntPair> { private int first = 0; private int second = 0; /** * Set the left and right values. */ public void set(int left, int right) { first = left; second = right; } public int getFirst() { return first; } public int getSecond() { return second; } /** * Read the two integers. Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1 */ @Override public void readFields(DataInput in) throws IOException { System.out.println("in read Fields"); first = in.readInt() + Integer.MIN_VALUE; second = in.readInt() + Integer.MIN_VALUE; } @Override public void write(DataOutput out) throws IOException { System.out.println("in write"); out.writeInt(first - Integer.MIN_VALUE); out.writeInt(second - Integer.MIN_VALUE); } @Override public int hashCode() { System.out.println("in hashCode"); return first * 157 + second; } @Override public boolean equals(Object right) { System.out.println("in equals"); if (right instanceof IntPair) { IntPair r = (IntPair) right; return r.first == first && r.second == second; } else { return false; } } /** A Comparator that compares serialized IntPair. */ public static class Comparator extends WritableComparator { public Comparator() { super(IntPair.class); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { System.out.println("in IntPair's Comparator's compare"); return compareBytes(b1, s1, l1, b2, s2, l2); } } static { // register this comparator WritableComparator.define(IntPair.class, new Comparator()); } @Override public int compareTo(IntPair o) { System.out.println("in IntPair's compareTo"); if (first != o.first) { return first < o.first ? -1 : 1; } else if (second != o.second) { return second < o.second ? -1 : 1; } else { return 0; } } } /** * Partition based on the first part of the pair. */ public static class FirstPartitioner extends Partitioner<IntPair, IntWritable> { @Override public int getPartition(IntPair key, IntWritable value, int numPartitions) { System.out.println("in FistPartitioner"); return Math.abs(key.getFirst() * 127) % numPartitions; } } /** * Compare only the first part of the pair, so that reduce is called once for each value of the * first part. */ // public static class FirstGroupingComparator implements RawComparator<IntPair> { // @Override // public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { //System.out.println("in FirstGroupingComparator's compare 6 params"); // return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8, b2, s2, Integer.SIZE / 8); // } // // @Override // public int compare(IntPair o1, IntPair o2) { //System.out.println("in FirstGroupingComparator's compare 2 params"); // int l = o1.getFirst(); // int r = o2.getFirst(); // return l == r ? 0 : (l < r ? -1 : 1); // } // } public static class FirstGroupingComparator extends WritableComparator { @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { System.out.println("in FirstGroupingComparator's compare 6 params"); System.out.println("==========="); for (byte b : b1) { System.out.print(b + " - "); } System.out.println(); for (byte b : b2) { System.out.print(b + " - "); } System.out.println(); System.out.println(s1 + " " + l1 + " " +s2 + " " + l2); return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8, b2, s2, Integer.SIZE / 8); } public int compare(IntPair o1, IntPair o2) { System.out.println("in FirstGroupingComparator's compare 2 params"); int l = o1.getFirst(); int r = o2.getFirst(); return l == r ? 0 : (l < r ? -1 : 1); } } /** * Read two integers from each line and generate a key, value pair as ((left, right), right). */ public static class MapClass extends Mapper<LongWritable, Text, IntPair, IntWritable> { private final IntPair key = new IntPair(); private final IntWritable value = new IntWritable(); @Override public void map(LongWritable inKey, Text inValue, Context context) throws IOException, InterruptedException { System.out.println("in map"); StringTokenizer itr = new StringTokenizer(inValue.toString()); int left = 0; int right = 0; if (itr.hasMoreTokens()) { left = Integer.parseInt(itr.nextToken()); if (itr.hasMoreTokens()) { right = Integer.parseInt(itr.nextToken()); } key.set(left, right); value.set(right); context.write(key, value); } } } /** * A reducer class that just emits the sum of the input values. */ public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> { private static final Text SEPARATOR = new Text("------------------------------------------------"); private final Text first = new Text(); @Override public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { System.out.println("in reduce"); context.write(SEPARATOR, null); first.set(Integer.toString(key.getFirst())); for (IntWritable value : values) { context.write(first, value); } } } public static void main(String[] args) throws Exception { SimpleDateFormat formatter = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss:SSS"); args = new String[] {"sort", "output/" + formatter.format(new Date())}; Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: secondarysort <in> <out>"); System.exit(2); } Job job = Job.getInstance(conf, "secondary sort"); job.setJarByClass(SecondarySort.class); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); // group and partition by the first int in the pair job.setPartitionerClass(FirstPartitioner.class); job.setGroupingComparatorClass(FirstGroupingComparator.class); // the map output is IntPair, IntWritable job.setMapOutputKeyClass(IntPair.class); job.setMapOutputValueClass(IntWritable.class); // the reduce output is Text, IntWritable job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(false) ? 0 : 1); } }
为了看程序是怎么跑的,所以在上面加上了各种输出。
从头开始分析,单纯的从main方法开始,集群上不太好跟踪代码,所以分析本地的,原理都类似。
这里是在单机跑的,纯单机,eclipse的hadoop插件都没有,这里在main方法的参数中加上了输入、输出路径。
启动main方法后,new Configuration()先加载默认的配置文件。加载core-default.xml,core-site.xml。
new GenericOptionsParser(conf, args).getRemainingArgs(),先获取命令行上的参数,剩下的就是输入输出路径(这里是在代码里直接启动,没有命令行参数,也可以模拟方法和输入输出路径类似)。
设置job的一些配置。先根据configuration生成JobConf,加载配置。在根据JobConf获取Job实例,设置job名。
自定义的设置job属性,setJarByClass程序的入口。设置map,reduce类。
setPartitionerClass设置分区器,如果是在集群中,会有多个map同时运行,分区器用来确定map的输出值应该分配到哪个reduce。如果没设置这个,默认的分区器是HashPartitioner调用输出键的hashCode方法,然后用hashCode方法的结果对reduce的数量进行一个模数(modulo)运算,最后得到那个目标reduce。默认的分区器使用整个键。这就不适于组合键了。因为它可能把有同样自然键的组合键发送给不同的reduce。因此,就需要自定义分区器,基于自然键进行分区。我们在这里虽然设置了这个参数,但是如果数量小,并不会调用自定义的getPartition方法。
setGroupingComparatorClass。分组,当reduce阶段将在本地磁盘上的map输出的记录进行流化处理(streaming)的时候,需要要进行分组。在分组中,记录将被按一定方式排成一个有逻辑顺序的流,并被传输给reduce。在分组阶段,所有的记录已经经过了次排序。分组比较器需要将有相同fitst值的记录分在同一个组。
设置map和reduce端的输入输出key和value。
FileInputFormat的超类是InputFormat,getSplits方法将输入的数据分片,createRecordReader将分片的数据生成一个个key/value的键值对传给map处理,computeSplitSize方法就是需要多少个map处理数据。hdfs中块的大小是64M,Split大小最好也设置成64M,利于资源本地化。Split只是一个逻辑的概念,它只是记录了数据的起始位置,路径等一些其他的信息。
FileOutputFormat是设置输出的格式。
进入job.waitForCompletion(false)开始程序。false不显示最后的统计信息,可以设置成true显示信息。
waitForCompletion方法中,开始进入submit()提交作业。这里hadoop的版本是3.0.0-alpha,会有设置使用新的api。进入connect方法,这里的方法主要的作用是在Job类中实例化一个cluster(return new Cluster(getConfiguration()))建立连接,进入initialize方法初始化,根据jobTrackAddr为空生成ClientProtocol clientProtocol,最终生成的是LocalJobRunner(在集群中,可以配置mapreduce使用yarn,这时这里生成的就是YARNRunner,YARNRunner的作用管理Job和Task)。这里有MapTaskRunnable和ReduceTaskRunnable,这两个里面有run方法,分别是map和reduce启动时调用的方法。上面ClientProtocol,还有其他的protocol是RPC通讯时的协议。将信息封装进Cluster cluster中,根据cluster得到JobSubmitter submitter,去提交作业submitter.submitJobInternal。
checkSpecs()检查输出路径是否存在,如果存在就报错。
JobSubmissionFiles.getStagingDir()初始化用于存放job相关资源的存放路径,会创建目录file:/tmp/hadoop/mapred/staging/{电脑登陆的用户名}{随机数}/.staging
获取并设置hostname和ip
ClientProtocol类型的submitClient(这里是LocalJobRunner)通过RPC申请一个JobID,如果集群用了Yarn,就是向ResourceManager申请JobID。设置在job上。
通过上面生成的job相关资源的存放目录和JobID一起生成一个新的提交作业的目录。
从HDFS的NameNode获取验证用的Token,并将其放入缓存。
copyAndConfigureFiles()。上传资源到上面生成的提交作业的目录。设置工作目录。
生成一个提交作业的xml配置文件(file:/tmp/hadoop/mapred/staging/fzk1167408138/.staging/job_local1167408138_0001/job.xml)就是上面提交作业的目录下的job.xml文件。
向上面生成的job.xml文件写入数据。进入到writeNewSplits()方法,方法中获取InputFormat(这里是TextInputFormat)的实例input,调用input.getSplits()获取List<InputSplit> splits。分片的大小由如下几个参数决定:mapreduce.input.fileinputformat.split.maxsize、mapreduce.input.fileinputformat.split.minsize、文件的块大小。具体计算方式为:Math.max(minSize, Math.min(maxSize, blockSize))分片的大小有可能比默认块大小64M要大,当然也有可能小于它,默认情况下分片大小为当前HDFS的块大小,64M。调用SplitComparator的compare把splits排序,SplitComparator是JobSubmitter的内部类目的就是排序。排序之后大的在前边。创建file:/tmp/hadoop/mapred/staging/ito1621858443/.staging/job_local1621858443_0001/job.split。设置split数据备份数(mapreduce.client.submit.file.replication),默认是10。splits的个数就是map任务的个数。
将配置文件( core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml)写入到submitJobFile(file:/tmp/hadoop/mapred/staging/fzk1621858443/.staging/job_local1621858443_0001/job.xml)文件中。
submitClient.submitJob()。提交任务。集群中就是提交到Yarn中。方法中,创建localJobFile(file:/tmp/hadoop-fzk/mapred/local/localRunner/ito/job_local1621858443_0001/job_local1621858443_0001.xml),将配置文件写入。开始map和reduce线程(this.start)。在LocalJobRunner中的run方法中调用runtasks()方法,map任务结束后执行reduce任务。先执行MapTaskRunnable的run方法。所有map任务结束后执行ReduceTaskRunnable的run方法。上面一系列的配置文件的复制就是在为map和reduce准备。
-->MapTaskRunnable.run()
获取MapTask map,进行设置。
-->map.run(MapTask.run())
-->initialize(job, getJobID(), reporter, useNewApi) 这里会初始化临时工作目录和输出目录
--> runNewMapper() 获取Mapper,InputFormat,Split,RecordReader<INKEY,INVALUE> input,RecordWriter output等
-->mapper.run(mapperContext) setup()为空方法。
-->map()方法,用户自定义的map()方法中
-->context.write(key, value) -->WrappedMapper.Context.write() -->TaskInputOutputContextImpl.write() --> MapTask.NewOutputCollector.write() 方法中只有collector.collect(key, value,partitioner.getPartition(key, value, partitions)); 一行代码partitioner.getPartition(key, value, partitions)中的partitioner是MapTask.NewOutputCollector.(org.apache.hadoop.mapreduce.Partitioner<K,V>),还记得一开始设置的setPartitionerClass()。这里的代码
if (partitions > 1) { partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); } else { partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() { @Override public int getPartition(K key, V value, int numPartitions) { return partitions - 1; } }; }
partitions也是MapTask.NewOutputCollector中的成员,分离的数量也就是reduceTask的数量,这里的数据量比较少只有一个,所以走else。在else中Partitioner中实现了compare方法,所以走的是这个compare方法。如果这里的数据量大的花,有多余一个reduceTask的话就会走自己的FirstPartitioner中的getPartition方法。可以试试。
collector.collect-->MapTask.MapOutputBuffer.collect() 中间有行keySerializer.serialize(key)方法,进入到自己写的IntPair的write()方法。
反复执行map(),将所有数据解析完成后执行cleanup()方法,默认空方法,同setup()一样,程序可重写。
mapper.run()执行完成后接着向下执行,进入output.close(mapperContext);
-->MapTask.close()
-->collector.flush()
-->MapTask.sortAndSpill() 创建临时文件/tmp/hadoop-fzk/mapred/local/localRunner/fzk/jobcache/job_local1621858443_0001/attempt_local1621858443_0001_m_000000_0/output/spill0.out
进入sorter.sort(MapOutputBuffer.this, mstart, mend, reporter) --> Quick.sort() --> Quick.sortInternal()
其中不断调用s.compare()方法,这个s从最外层传的是MapOutputBuffer.this。s.compare() --> Task.MapOutputBuffer.compare() 进入了我们自定义的Comparator的compare()方法。 -->WritebleComparator.compareBytes() -->FastByteComparisons.compareTo() -->LexicographicalComparerHolder.BEST_COMPARER.compareTo()最终的这个compareTo方法是二进制的比较方法,这种比较方法不需要进行序列化,效率更高。这里为什么会进入我们的自定义的Comparator的compare方法?MapReduce程序中,想使用自定义的类最为Key那么就必须继承Writable和Comparable接口,继承Writable是因为文件会进行序列化和反序列化,也正因为这个我们也需要提供一个无参构造器。继承Comparable是因为在进行作业时,项目会多次进行排序。还需要重写hashCode方法和equals方法。由于需要二次排序所以还需要一个比较器(这里是自定义的Comparator,IntPair的内部类),一般这个类写成Key的内部类,并提供一个无参构造器,将要比较的类(IntPari)传入。并重写compare方法。然后注册。
public static class Comparator extends WritableComparator { public Comparator() { super(IntPair.class); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return compareBytes(b1, s1, l1, b2, s2, l2); } } static { // register this comparator WritableComparator.define(IntPair.class, new Comparator()); }
为什么需要重写compare方法。从super(IntPair.class)深入看看父类中的代码
private final Class<? extends WritableComparable> keyClass; private final WritableComparable key1; private final WritableComparable key2; private final DataInputBuffer buffer; protected WritableComparator() { this(null); } /** Construct for a {@link WritableComparable} implementation. */ protected WritableComparator(Class<? extends WritableComparable> keyClass) { this(keyClass, null, false); } protected WritableComparator(Class<? extends WritableComparable> keyClass, boolean createInstances) { this(keyClass, null, createInstances); } protected WritableComparator(Class<? extends WritableComparable> keyClass, Configuration conf, boolean createInstances) { this.keyClass = keyClass; this.conf = (conf != null) ? conf : new Configuration(); if (createInstances) { key1 = newKey(); key2 = newKey(); buffer = new DataInputBuffer(); } else { key1 = key2 = null; buffer = null; } }
super(IntPair.class)实例化的WritableComparator中的key1,key2,buffer均为空。如果调用默认的compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2):
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { buffer.reset(b1, s1, l1); // parse key1 key1.readFields(buffer); buffer.reset(b2, s2, l2); // parse key2 key2.readFields(buffer); buffer.reset(null, 0, 0); // clean up reference } catch (IOException e) { throw new RuntimeException(e); } return compare(key1, key2); // compare them }
buffer为空,这里会空指针。所以重写compare方法,让其直接调用compareBytes方法。而其中不在涉及到序列化后的IntPair,compare(Object a, Object b)将不会调用,我们也不需要重写。
public static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2); }
排序之后开始记录数据,如果totalIndexCacheMemory >= indexCacheMemoryLimit,就会向文件中溢写。
sortAndSpill()执行完成后mergeParts()合并。这又是一个大过程。在想想这里。最终的结果是将相同的key合并成一组,并复制文件到目标区。
collector.flush()执行完成进入collector.close()方法,这是一个空方法。
之后是一系列的关闭资源,关闭通讯线程等一些操作,map端执行完成。
上面就是map端的执行过程,在重新看一下,首先根据InputFormat将文件分块,在将文件转成key/value的RecordReader传给map处理,经过map处理过转成key/values的键值对,然后会通过分区器(Partitioner)确定应该输出在哪个reduce,然后进行文件的sort,通过IntPair.Comparator.compare()方法进行排序。这里起名为secondarysort,其实还是只进行了依次排序,只不过这依次排序会根据fisrt和second进行排序(类似于我们在java中实现了Comparator接口的类),这个compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法是进行一个字节一个字节的比较,知道找到不一样的字节,在判断哪个字节大。在map端已经是拍好顺序的文件了。
reduce端。
ReduceTask.run() 初始化,获取各种需要的变量后shuffle。这里属于shuffle的后半段,前半段是在map端,就是在sortAndSpill()方法中,不断的进行merge,reduce端不断的获取数据。在reduce端也是一样的道理,先不断的copy map的输出文件,启动的Fetcher线程。不断的Merge,数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。merge有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也会启用,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。最终的文件可能在磁盘,可能在内存中。shuffle结束后才向下进行。
进入runNewReducer()
--> reducer.run() 在调用context.nextKey()时 调用的是Reducer.Context.next() --> ReduceContextImpl.nextKey --> ReduceContextImpl.nextKeyValue() 中的key = keyDeserializer.deserialize(key);就是调用自己写的IntPait的readFields方法。继续向下在方法最后执行了comparator.compare()方法就是自定义的FirstGroupingComparator.compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法。comparator是ReduceContextImpl的成员变量,赋值的时候是在开始设置变量的时候。这里原理和上面map端分析的过程一样,也解释了为什么compare(IntPair o1, IntPair o2)不执行的原因。
setGroupingComparatorClass()这个部分设置的分组类的作用并没有做排序功能,reduce端的作用就是获取map端的输出文件,一条一条读取。设置这个的作用是在reduce读取一条记录后,还会判断下面的一条数据和本条数据是不是在一个组里,而在不在一个组里调用的方法就是FirstGroupingComparator.compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法。迭代器的next方法--> ReduceContextImpl.next() 这个方法里会执行一下nextKeyValue()方法,在nextKeyValue()方法中如果hasMore重新计算nextKeyIsSame。
//ReduceContextImpl.next() --> ReduceContextImpl.nextKeyValue() 中代码片段
hasMore = input.next(); if (hasMore) { nextKey = input.getKey(); nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, currentRawKey.getLength(), nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition() ) == 0; } else { nextKeyIsSame = false; }
comparator.compare会调用自定义的FirstGroupingComparator.compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) --> WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8, b2, s2, Integer.SIZE / 8),这里并没有将长度l1和l2传进去,传的是Integer.SIZE / 8,Integer.SIZE是32。l1 l2代表的是length。就只取前四个。
看一下怎么实现的只读取first值进行比较的。上面代码中currentRawKey和nextKey的赋值方式。
currentRawKey.set(nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition()); nextKey = input.getKey();
自定义读取二进制和转换二进制的方式:
public void readFields(DataInput in) throws IOException { first = in.readInt() + Integer.MIN_VALUE; second = in.readInt() + Integer.MIN_VALUE; } public void write(DataOutput out) throws IOException {out.writeInt(first - Integer.MIN_VALUE); out.writeInt(second - Integer.MIN_VALUE); }
自定义的FirstGroupingComparator.compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)的参数全部输出出来(只看一组)
@Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {for (byte b : b1) { System.out.print(b + " - "); } System.out.println(); for (byte b : b2) { System.out.print(b + " - "); } System.out.println(); System.out.println(s1 + " " + l1 + " " +s2 + " " + l2); return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8, b2, s2, Integer.SIZE / 8); } 输出:
-128 - 0 - 0 - 123 - -128 - 0 - 0 - 23 - 0 - 0 - 0 - 0 -
-128 - 0 - 0 - 123 - -128 - 0 - 0 - 120 - 0 - 0 - 0 - 0 - 0 - 0 - 0 - 0 -
0 8 0 8
以下的观点只是我的推测,这个地方没弄太明白:
Integer.size / 8 就是Integer所占的字节数 4。在输出中,第一个数是-128 注意是一个 “-”号,而第四个字符就是我们sort文件中的左边的值123,在向后四个是右边的数字23。下边一样,但是后边又多出了四个字符。多的原因应该是currentRawKey.getBytes()和nextKey.getData()的数据结构不一样。但是前面有效占位都是一样的,是通过map端调用write()方法写进的。Integer占四个字节,头四个字节组成了一个Integer的值,为什么是负数,out.writeInt(first - Integer.MIN_VALUE)是这么写进的。紧接着又写入右边的值out.writeInt(second - Integer.MIN_VALUE);为什么又多出来四个0,不知道和OutPutStraming.writeInt()方法有没有关系。总之,前面已经读取到想要的数据了。参数中的s1、s2是offset,l1、l2是length向对比的长度。b1、b2就是想要比较的数据。取前四个也就是取第一个读进out中的数据也就是IntPair中的first。
在判断是不是同一组的时候,Reduce端读取的是IntPair类型的Key,读取的结果是 {(123 23) 23} {(123 128) 128}这种数据,小括号表示IntPair类型,大括号表示一组数据,自行忽略。读过来后,把Key进行比较,上面说的compare方法,在compare方法中比较(123 23)和(123 128)中的第一个提取出来比较,最后发现一样,比较结果返回0。那么这两个数据就在一个reduce中处理。不断重复。
迭代器的hasNext()的结果根据nextKeyIsSame获取
//ReduceContextImpl.class
public boolean hasNext() { try { if (inReset && backupStore.hasNext()) { return true; } } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("hasNext failed", e); } return firstValue || nextKeyIsSame; }
在外层调用reduce()方法的循环中
while (context.nextKey()) { reduce(context.getCurrentKey(), context.getValues(), context); // If a back up store is used, reset it Iterator<VALUEIN> iter = context.getValues().iterator(); if(iter instanceof ReduceContext.ValueIterator) { ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore(); } }
上面加黑部分应该能理解,nextKey() --> WrapperReduce.Context.nextKey() --> ReduceContextImpl.nextKey()
//ReduceContextImpl.class public boolean nextKey() throws IOException,InterruptedException { while (hasMore && nextKeyIsSame) { nextKeyValue(); } if (hasMore) { if (inputKeyCounter != null) { inputKeyCounter.increment(1); } return nextKeyValue(); } else { return false; } }
又是根据hasMore和nextKeyIsSame判断,又会走进nextKeyValue()方法中。
最终的意思就是如果下一个的first的值相同,那么就是同一组,就会在同一个reduce里处理。这里并没有排序,只是顺序读取数据和判断一下。
reduce结束,关闭该关闭的资源、线程,删除临时文件。最终程序结束。
刚接触hadoop不久,好多地方还不明白,如果有错误的地方希望指出,这只是作为自己理解MapReduce的鉴证。
参考:http://www.cnblogs.com/datacloud/p/3584640.html
http://www.aboutyun.com/forum.php?mod=viewthread&tid=9366&highlight=hadoop2%CC%E1%BD%BB%B5%BDYarn