mapreduce版本:0.2.0之前
说明:
该注释为之前学习时找到的一篇,现在只是在入门以后对该注释做了一些修正以及添加。
由于版本问题,该代码并没有在集群环境中运行,只将其做为理解mapreduce的参考吧。
切记,该版本是0.2.0之前的版本,请分辨清楚!
正文:
package org.apache.hadoop.examples; import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; public class WordCount { //Map类继承自MapReduceBase,并且实现了Mapper接口,此接口是一个规范类型. //它有4种形式的参数,分别用来指定map的输入key、value值类型,输出key、value值类型 public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); //实现map方法,对输入值进行处理。(此处用来去掉空格) public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); } } } /* //Reduce类也是继承自MapReduceBase的,需要实现Reducer接口。 //Reduce类以map的输出作为输入,因此Reduce的输入类型是<Text,Intwritable>。 //而Reduce的输出是单词和它的数目,因此,它的输出类型是<Text,IntWritable>。 //Reduce类也要实现reduce方法,在此方法中,reduce函数将输入的key值作为输出的key值,然后将获得多个value值加起来,作为输出的值。 */ public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { //1.用JobConf类对 MapReduce job进行初始化 JobConf conf = new JobConf(WordCount.class); // 调用setJobName()方法命名这个Job conf.setJobName("wordcount"); //setup2:设置Job输出结果<key,value>的中key和value数据类型,因为结果是<单词,个数> //所以key设置为"Text"类型,相当于Java中String类型。 conf.setOutputKeyClass(Text.class); //Value设置为"IntWritable",相当于Java中的int类型。 conf.setOutputValueClass(IntWritable.class); //setup3:指定job的MapReduce,以及combiner //设置Job处理的Map(拆分) conf.setMapperClass(Map.class); //设置Job处理的Combiner(中间结果合并,这里用Reduce类来进行Map产生的中间结果合并,避免给网络数据传输产生压力。) 也可以不用设置(已默认) conf.setCombinerClass(Reduce.class); //设置Job处理的Reduce(合并) conf.setReducerClass(Reduce.class); //指定输入输出路径,可在项目上右键->Run As->Run Configuration->arguments->program arguments中配置 即为main(String[] args)中String[] args赋值 //指定InputPaths eg:hdfs://master:9000/input1/ FileInputFormat.setInputPaths(conf, new Path(args[0])); //指定outputPaths eg:hdfs://master:9000/input1/ FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }