1. Implementation Example
The workflow for implementing WordCount is as follows:
Note: the input data is a plain txt file containing assorted words; within each line, the words are separated by spaces.
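For concreteness, suppose the input file contains the following two lines (this sample data is made up purely for illustration):

hadoop spark hadoop
spark flink

The finished job should then output each distinct word together with its total count, one word per line:

flink	1
hadoop	2
spark	2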
1. Writing the Mapper
In IDEA you can jump to a class's source with Ctrl+Alt+left-click. Let's first look at the source of the Mapper class; I have already annotated it with comments, as shown below:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Mapper() {
    }

    // Called exactly once before the task starts
    protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    // Called once for each key/value pair of the input split; most programs override this method
    protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }

    // Called exactly once at the end of the task
    protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    // Strings the methods above together: setup, then map for every record, then cleanup
    public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKeyValue()) { // reads only one key/value pair per iteration
                this.map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            this.cleanup(context);
        }
    }

    // The context object, which wraps a large number of methods the program can use
    public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}
Based on this source, we can now write the mapper that WordCount needs. With the default TextInputFormat, the key handed to map() is the byte offset of the current line (a LongWritable) and the value is the text of that line (a Text), which is why the class below extends Mapper&lt;LongWritable, Text, Text, IntWritable&gt;:
// Now we write the WordCount mapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // The four type parameters after Mapper are:
    // 1. the key type of the input data
    // 2. the value type of the input data
    // 3. the key type of the output data
    // 4. the value type of the output data

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Get one line of input
        String line = value.toString();

        // 2. Split the line into words on spaces
        String[] words = line.split(" ");

        // 3. Write out every word (not to the console; it goes to the framework, which forwards it to the reducer)
        for (String word : words) {
            Text k = new Text();       // the output key must be Text, matching the output type parameters in "extends Mapper<...>"
            k.set(word);
            IntWritable v = new IntWritable();
            v.set(1);                  // every occurrence of a word counts as 1
            context.write(k, v);
        }
    }
}
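For example, when this mapper receives the line "hadoop spark hadoop", it writes out the pairs (hadoop, 1), (spark, 1) and (hadoop, 1). The framework then shuffles and sorts these pairs, grouping all values with the same key together before they reach the reducer.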
2. Writing the Reducer
The Reducer source is shown below and is very similar to the Mapper's: it mainly declares the lifecycle methods (setup, reduce, cleanup, run) for subclasses to override, and the default reduce() simply writes every value straight through:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.mapreduce.ReduceContext.ValueIterator;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;

@Checkpointable
@Public
@Stable
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Reducer() {
    }

    // Called once before the task starts
    protected void setup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    // Called once per key; the default implementation writes every value through unchanged
    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        Iterator i$ = values.iterator();

        while(i$.hasNext()) {
            VALUEIN value = i$.next();
            context.write(key, value);
        }
    }

    // Called once at the end of the task
    protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    // Drives the whole lifecycle: setup, then one reduce call per key, then cleanup
    public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKey()) {
                this.reduce(context.getCurrentKey(), context.getValues(), context);
                Iterator<VALUEIN> iter = context.getValues().iterator();
                if (iter instanceof ValueIterator) {
                    ((ValueIterator)iter).resetBackupStore();
                }
            }
        } finally {
            this.cleanup(context);
        }
    }

    public abstract class Context implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}
Our reducer code is as follows:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;          // must be Hadoop's Text, not javax.xml.soap.Text
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Do not call super.reduce() here: the default implementation would write every
        // (key, value) pair straight through and exhaust the values iterator.

        // By the time we get here, the shuffle phase has already grouped the mapper output by key,
        // so the data for a single call looks like:
        //   atguigu (key), [1, 1] (values)
        int sum = 0;

        // 1. Sum the counts for this word
        for (IntWritable value : values) {
            sum += value.get();   // unwrap the IntWritable into an int
        }

        IntWritable v = new IntWritable();
        v.set(sum);

        // 2. Write out the result, e.g. "atguigu 2"
        context.write(key, v);
    }
}
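If it is not obvious where the per-word grouping happens: it is done by the framework's shuffle/sort phase between the mapper and the reducer, so each reduce() call already receives one word together with all of its 1s. As a sanity check, here is a minimal single-process sketch in plain Java (no Hadoop involved; the class name and sample data are made up purely for illustration) that mimics the same map, shuffle and reduce flow:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// A toy, single-process simulation of map -> shuffle -> reduce for WordCount.
// It only illustrates the data flow; Hadoop does the grouping across machines
// during its shuffle/sort phase.
public class WordCountSimulation {
    public static void main(String[] args) {
        String[] inputLines = {"hadoop spark hadoop", "spark flink"};   // hypothetical input

        // "Map" + "shuffle": emit a 1 for every word and group the 1s by word
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : inputLines) {
            for (String word : line.split(" ")) {
                grouped.computeIfAbsent(word, w -> new ArrayList<>()).add(1);
            }
        }

        // "Reduce": one pass per key, summing the grouped values
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int one : entry.getValue()) {
                sum += one;
            }
            System.out.println(entry.getKey() + "\t" + sum);   // e.g. "hadoop	2"
        }
    }
}

Running this prints flink 1, hadoop 2 and spark 2 (the TreeMap keeps keys sorted, just as Hadoop sorts keys before the reduce phase), which matches what the real job produces for the same two input lines; the difference is only that Hadoop performs the grouping in a distributed way.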
3. Writing the Driver: making MapReduce actually run!
The code is as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// The class that wires the mapper and reducer together and launches them.
// A driver is almost entirely boilerplate with a fixed structure.
public class WordcountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // 1. Get the Job object
        Job job = Job.getInstance(conf);

        // 2. Tell Hadoop which jar contains the job (located via this class)
        job.setJarByClass(WordcountDriver.class);

        // 3. Hook up the mapper and reducer classes
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4. Set the key/value types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Set the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6. Set the input path and the output path
        //    (the second call must be FileOutputFormat.setOutputPath, not another input path)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit the job and wait for it to finish
        //    (waitForCompletion submits the job itself, so a separate job.submit() is unnecessary)
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
With that, the job can be run! You can try this WordCount job on a distributed cluster: just package the code into a jar and run it on your Linux machines. When you run it, remember that the input and output paths are paths on HDFS!
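Concretely, a run on the cluster might look something like this (the jar name, file name and HDFS paths are placeholders for illustration, so adjust them to your environment):

hadoop fs -mkdir -p /wordcount/input
hadoop fs -put words.txt /wordcount/input
hadoop jar wordcount.jar WordcountDriver /wordcount/input /wordcount/output
hadoop fs -cat /wordcount/output/part-r-00000

Note that the output directory must not exist before the job runs; FileOutputFormat refuses to overwrite an existing directory.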