I came across a nice Hadoop example online and am sharing it here for anyone who wants to learn.
package com.run.ayena.distributed.test;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

// Count how many times one specified word appears in the input text
public class SingleWordCount {

    public static class SingleWordCountMapper extends
            Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text val = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            // The word to look for is passed in through the job configuration
            String keyword = context.getConfiguration().get("word");
            while (itr.hasMoreTokens()) {
                String nextkey = itr.nextToken();
                // Only emit tokens that match the specified word
                if (nextkey.trim().equals(keyword)) {
                    val.set(nextkey);
                    context.write(val, one);
                }
            }
        }
    }

    public static class SingleWordCountReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // Sum up all counts emitted for the word
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Usage: SingleWordCount <in> <out> <word>");
            System.exit(2);
        }

        // The word to count, taken from the third command-line argument
        conf.set("word", otherArgs[2]);

        // Specify the MapReduce system directory
        conf.set("mapred.system.dir", "/cygdrive/e/workspace_hadoop/SingleWordCount/");

        // Create the job and set its name
        Job job = new Job(conf, "word count");

        // Set the jar containing the job classes
        job.setJarByClass(SingleWordCount.class);

        // Set the Mapper class
        job.setMapperClass(SingleWordCountMapper.class);

        // Set the combiner (local aggregation); here it is the same as the Reducer class
        job.setCombinerClass(SingleWordCountReducer.class);

        // Set the Reducer class
        job.setReducerClass(SingleWordCountReducer.class);

        // Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Set the reducer output key type
        job.setOutputKeyClass(Text.class);
        // Set the reducer output value type
        job.setOutputValueClass(IntWritable.class);

        // Set the input and output directories
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // Run the job and exit when it finishes
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
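To try it out, the class would typically be packaged into a jar and submitted with the hadoop command. A minimal sketch, assuming the jar is named singlewordcount.jar and that /user/hadoop/input and /user/hadoop/output are placeholder HDFS paths (not from the original post); note the output directory must not already exist, or the job will fail to start:

hadoop jar singlewordcount.jar com.run.ayena.distributed.test.SingleWordCount /user/hadoop/input /user/hadoop/output hadoop

This counts how many times the word "hadoop" appears in the files under /user/hadoop/input and writes a single line of the form "hadoop<TAB><count>" to the part-r-00000 file in the output directory.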