A MapReduce program mainly consists of three parts: a Mapper class, a Reducer class, and a driver class containing the main method.
Taking WordCount as an example, the Mapper class is:
static class WordMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the input line into tokens and emit (word, 1) for each one.
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}
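For example, assuming the mapper receives the input line "hello world hello", it emits the intermediate pairs (hello, 1), (world, 1) and (hello, 1).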
The Reducer class is:
static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable res = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all the counts that the shuffle grouped under this word.
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        res.set(sum);
        context.write(key, res);
    }
}
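Continuing the example above, the shuffle phase groups all values belonging to the same key, so the reducer is called with hello -> [1, 1] and world -> [1] and writes "hello 2" and "world 1" to the output.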
The driver (main method) code is:
public static void main(String[] args) throws Exception {
    // Input and output paths on HDFS; the output directory must not already exist.
    String inputfilepath = "hdfs://localhost:9000/input1";
    String outputfilepath = "hdfs://localhost:9000/output4";
    Configuration conf = new Configuration();
    Job job = new Job(conf);
    job.setJarByClass(WordCount.class);
    job.setJobName("word-count");
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(WordMapper.class);
    job.setReducerClass(WordReducer.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(inputfilepath));
    FileOutputFormat.setOutputPath(job, new Path(outputfilepath));
    job.waitForCompletion(true);
}
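The Mapper, Reducer and main method above are nested inside a single driver class. A minimal skeleton of that file, assuming the class is named WordCount (the name only appears in setJarByClass above) and placed in the default package, might look like this; the import list covers every Hadoop type the three pieces reference:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
    // WordMapper, WordReducer and main() from above go here.
}

Packaged into a jar, the job can then be submitted to the cluster with the standard hadoop jar command.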
Other simple Hadoop examples follow the same pattern. For instance, summing the numbers in a file:
package goal;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Sum {

    public static class SumMapper extends
            Mapper<Object, Text, Text, FloatWritable> {
        private Text word = new Text("sum");
        private static FloatWritable nv = new FloatWritable(1.0f);

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer str = new StringTokenizer(value.toString());
            float sum = 0;
            while (str.hasMoreTokens()) {
                String s = str.nextToken();
                float val = Float.parseFloat(s);
                sum += val; // accumulate every number on the line ("sum = val" would keep only the last token)
            }
            nv.set(sum);
            // Every input line contributes one partial sum under the single key "sum".
            context.write(word, nv);
        }
    }

    public static class SumReducer extends
            Reducer<Text, FloatWritable, Text, FloatWritable> {
        private Text k = new Text("sum");
        private FloatWritable res = new FloatWritable();

        public void reduce(Text key, Iterable<FloatWritable> values,
                Context context) throws IOException, InterruptedException {
            // Add up the per-line partial sums to get the overall total.
            float sum = 0;
            for (FloatWritable val : values) {
                sum += val.get();
            }
            res.set(sum);
            context.write(k, res);
        }
    }

    public static void main(String[] args) throws Exception {
        String[] other = {"hdfs://localhost:9000/input2/1.txt", "hdfs://localhost:9000/output3"};
        Configuration conf = new Configuration();
        System.out.println("yes");
        Job job = new Job(conf, "number sum");
        job.setJarByClass(Sum.class);
        job.setMapperClass(SumMapper.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.addInputPath(job, new Path(other[0]));
        FileOutputFormat.setOutputPath(job, new Path(other[1]));
        boolean ok = job.waitForCompletion(true);
        System.out.println("yes"); // moved before System.exit, where it was previously unreachable
        System.exit(ok ? 0 : 1);
    }

}
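The Sum class imports GenericOptionsParser but never uses it, and the input and output paths are hard-coded. Below is a sketch of an alternative main(), assuming the paths are instead passed on the command line; all classes used here are already imported in Sum.java above:

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // GenericOptionsParser consumes Hadoop's generic options (-D, -files, ...)
        // and returns the remaining arguments, treated here as <input> <output>.
        String[] other = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (other.length != 2) {
            System.err.println("Usage: Sum <input path> <output path>");
            System.exit(2);
        }
        Job job = new Job(conf, "number sum");
        job.setJarByClass(Sum.class);
        job.setMapperClass(SumMapper.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.addInputPath(job, new Path(other[0]));
        FileOutputFormat.setOutputPath(job, new Path(other[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

This way the same jar can be run against any input directory without recompiling.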