1.wordcount的代码如下
public class WordCount
{
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context ) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2)
{
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
2.一个可以运行的mapreduce程序可以包含哪些元素呢?
JobConf 常用可定制参数
参数 |
作用 |
缺省值 |
其它实现 |
InputFormat |
将输入的数据集切割成小数据集 InputSplits, 每一个InputSplit 将由一个 Mapper 负责处理。此外InputFormat 中还提供一个 RecordReader 的实现,将一个 InputSplit 解析成 <key,value> 对提供给 map函数。 |
TextInputFormat |
SequenceFileInputFormat |
OutputFormat |
提供一个 RecordWriter 的实现,负责输出最终结果 |
TextOutputFormat |
SequenceFileOutputFormat |
OutputKeyClass |
输出的最终结果中 key 的类型 |
LongWritable |
|
OutputValueClass |
输出的最终结果中 value 的类型 |
Text |
|
MapperClass |
Mapper 类,实现 map 函数,完成输入的<key,value> 到中间结果的映射 |
IdentityMapper |
LongSumReducer, |
CombinerClass |
实现 combine 函数,将中间结果中的重复 key 做合并 |
null |
|
ReducerClass |
Reducer 类,实现 reduce 函数,对中间结果做合并,形成最终结果 |
IdentityReducer |
AccumulatingReducer, LongSumReducer |
InputPath |
设定 job 的输入目录, job 运行时会处理输入目录下的所有文件 |
null |
|
OutputPath |
设定 job 的输出目录,job 的最终结果会写入输出目录下 |
null |
|
MapOutputKeyClass |
设定 map 函数输出的中间结果中 key 的类型 |
如果用户没有设定的话,使用OutputKeyClass |
|
MapOutputValueClass |
设定 map 函数输出的中间结果中 value 的类型 |
如果用户没有设定的话,使用OutputValuesClass |
|
OutputKeyComparator |
对结果中的 key 进行排序时的使用的比较器 |
WritableComparable |
|
PartitionerClass |
对中间结果的 key 排序后,用此 Partition 函数将其划分为R份,每份由一个 Reducer 负责处理。 |
HashPartitioner |
KeyFieldBasedPartitioner PipesPartitioner |
比较容易疑惑的是:
InputFormat:读取输入文件,以自定义map的输入数据格式,传给map.(如下红色字体)
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>MapOutputKeyClass,MapOutputValueClass:定义了map的输出数据的格式,reduce的输入数据格式
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
OutputKeyClass,OutputValueClass:定义了reduce的输出数据格式,OutputFormat的输入格式
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
OutputFormat:将mapreduce的结果数据写入到文件中去
OutputKeyComparator/OutputValueGroupingComparator:二次排序用的
参考文献:
1.http://hadoop.apache.org/docs/r0.19.1/cn/mapred_tutorial.html
2.http://caibinbupt.iteye.com/blog/338785
3.http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop2/index.html
4.http://www.riccomini.name/Topics/DistributedComputing/Hadoop/SortByValue/