package algorithm;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// The first two type parameters (input key/value) are fixed by the input format;
// the last two (output key/value) are chosen per job. The fourth is IntWritable
// here, Hadoop's compact serialized form of an int.
public class TestMapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {

    // key is the byte offset of the line in the file; value is that line's content.
    // map() is invoked once for every line of the input file.
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer st = new StringTokenizer(line);
        while (st.hasMoreTokens()) {
            String word = st.nextToken();
            context.write(new Text(word), new IntWritable(1)); // map output: (word, 1)
        }
    }
}
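A common refinement, not part of the original code, is to reuse the Writable instances instead of allocating new ones for every record; this is safe because context.write() serializes the key and value immediately. A minimal sketch of the same map body with reused objects:

// Sketch only: reuse Writable objects to cut per-record allocations.
private final Text outKey = new Text();
private final IntWritable one = new IntWritable(1);

@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    StringTokenizer st = new StringTokenizer(value.toString());
    while (st.hasMoreTokens()) {
        outKey.set(st.nextToken());
        context.write(outKey, one); // serialized on write, so reuse is safe
    }
}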
package algorithm;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TestReduce1 extends Reducer<Text, IntWritable, Text, IntWritable> {

    // reduce() receives one key (a word) together with all of its mapped values.
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get(); // get() unwraps the IntWritable to a plain int
        }
        context.write(key, new IntWritable(sum));
    }
}
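Because this reduce logic is a pure associative sum, the same class can also act as a combiner, pre-aggregating each mapper's output locally before the shuffle. This is an optional line for the driver below, not something the original job configures:

// Optional: run the reducer on each mapper's local output to shrink shuffle traffic.
job.setCombinerClass(TestReduce1.class);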
package algorithm;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Mapreduce1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // picks up mapred-site.xml, core-site.xml, ...
        Job job = Job.getInstance(conf, "WordCount"); // new Job(conf, name) is deprecated
        job.setJarByClass(Mapreduce1.class);
        job.setMapperClass(TestMapper1.class);
        job.setReducerClass(TestReduce1.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(1);
        // A bare "/in" fails with "file does not exist" because it is resolved
        // against the local file system (file:/); the full hdfs:// URI forces HDFS.
        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.58.180:8020/in"));
        // The output directory must not already exist, or the job fails at submit time.
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.58.180:8020/wordcount"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
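Since the job aborts when the output directory already exists, a common convenience for repeated test runs is to delete it first with the FileSystem API. A minimal sketch, assuming the same HDFS URI as above (it would replace the setOutputPath line and needs an extra import of org.apache.hadoop.fs.FileSystem):

// Sketch: remove a stale output directory before submitting the job.
Path out = new Path("hdfs://192.168.58.180:8020/wordcount");
FileSystem fs = FileSystem.get(out.toUri(), conf);
if (fs.exists(out)) {
    fs.delete(out, true); // true = delete recursively
}
FileOutputFormat.setOutputPath(job, out);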