4.2.2 总排序(Total order sorting)
有的时候需要将作业的的所有输出进行总排序,使各个输出之间的结果是有序的。有以下实例:
- 如果要得到某个网站中最受欢迎的网址(URL),就需要根据某种受欢迎的指标来对网址进行排序。
- 如果要让最活跃的用户能够看到某张表,就需要根据某种标准(发表文章数)对用户进行排序。
技术22 在多个reduce间对键进行排序
在MapReduce框架中,map的输出会被排序,然后被发送给reduce。不过,相同reduce的输入数据是有序的,不同reduce的输入数据就没有顺序关系了。如果要让不同的reduce的数据也存在顺序关系,就需要使用分区器(partitioner)。MapReduce的默认分区器是HashPartitioner。它使用map的输出键的哈希值进行分区。这保证了相同的map输出键的所有记录会到达同一个reduce。不过HashPartitioner并不会对所有map的全部输出键进行总排序。接下来说明如何在MapReduce中对所有map的全部输出键进行排序:
问题
需要对作业输出的所有键进行总排序,但是不能增加任何一个reduce的负担。
方案
这里要用到TotalOrderPartitioner类来保证所有reduce的全部输出是有序的。这个类由Hadoop自带。这个分类器保证了所有map的全部输出是完全有序的。那么只要reduce的输出键和输入键是一样的,作业的最终输出就是有序的。
讨论
TotalOrderPartitioner是Hadoop的内置分区器。它根据分区文件进行分区。分区文件是一个包括N-1个键的预先计算好的序列文件。(N是指reduce的个数。)分区文件中的键的顺序是由map输出键比较器决定的。每一个键对应着一个逻辑区间。TotalOrderPartitioner检查每一个输出键,确定它在那个区间,然后将这个键发送给相对应的reduce。
图4.15中说明了这个技术的两个部分。第一部分,创建分区文件。第二部分,将TotalOrderPartitioner加入MapReduce作业。
先用InputSampler从输入文件中抽样,以生成分区文件。抽样器可以选用RandomSampler类进行随机抽样,也可以选用IntervalSampler类进行间距为R的等距抽样。生成的分区文件中将包含有序的N-1个键。N是reduce的个数。InputSampler不是MapReduce作业。它从InputFormat读取数据。它在被调用的过程中生成分区。
下列代码说明了在调用InputSampler函数之前需要完成的步骤:
1 int numReducers = 2; 2 3 Path input = new Path(args[0]); 4 5 Path partitionFile = new Path(args[1]); 6 7 InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<Text,Text>(0.1, 10000, 10); 8 9 JobConf job = new JobConf(); 10 11 job.setNumReduceTasks(numReducers); 12 13 job.setInputFormat(KeyValueTextInputFormat.class); 14 15 job.setMapOutputKeyClass(Text.class); 16 17 job.setMapOutputValueClass(Text.class); 18 19 TotalOrderPartitioner.setPartitionFile(job, partitionFile); 20 21 FileInputFormat.setInputPaths(job, input); 22 23 InputSampler.writePartitionFile(job, sampler);
下一步在作业中指定TotalOrderPartitioner为分区器:
1 job.setPartitionerClass(TotalOrderPartitioner.class);
这个技术并不需要修改MapReduce作业本身,也就是说,不需要修改map或reduce过程。现在就可以开始运行代码了:
$ hadoop fs -put test-data/names.txt names.txt $ bin/run.sh com.manning.hip.ch4.sort.total.TotalSortMapReduce names.txt large-names-sampled.txt output $ hadoop fs -ls output /user/aholmes/output/part-00000 /user/aholmes/output/part-00001 $ hadoop fs -cat output/part-00000 | head AABERG AABY AADLAND $ hadoop fs -cat output/part-00000 | tail LANCZ LAND LANDA $ hadoop fs -cat output/part-00001 | head LANDACRE LANDAKER LANDAN $ hadoop fs -cat output/part-00001 | tail ZYSK ZYSKOWSKI ZYWIEC
从MapReduce作业的结果中可以看到,在各个输出文件之间,map的输出键是有序的。
小结
这个技术中使用InputSampler来创建分区文件。TotalOrderPartitioner使用这个分区文件来分区map的输出键。
MapReduce也可以生成分区文件,但效率不高。另一个有效的的方法就是用自定义的InputFormat类来执行抽样,并将抽样后的键发送给一个reduce,由其创建分区文件。这也就是这一章下一个部分讲到的抽样。