package com.rabbit.hadoop.mapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.omg.CORBA.ARG_IN;
/**
* MapReduce
* Word counter
*/
public class WordCountMapReduce extends Configured implements Tool{
// 1. Map Class
public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text mapOutPutKey = new Text();
private final static IntWritable mapOutPutValue = new IntWritable(1);
@Override
public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
// read one line <0,hadoop yarn> 偏移量,值
String lineValue = value.toString(); // 获取值
// words split
StringTokenizer stringTokenizer = new StringTokenizer(lineValue);
//iterator
while (stringTokenizer.hasMoreTokens()) {
String word = stringTokenizer.nextToken();
mapOutPutKey.set(word);
context.write(mapOutPutKey, mapOutPutValue);
}
}
}
// 2. Reduce Class
public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outputValue = new IntWritable();
@Override
protected void reduce(Text keys, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// <hadoop,1> <yarn,1> <hadoop,1> <hive,1> <hadoop,1>...... -> 分组 <hadoop,list(1,1,1)>
// list中的元素相加,求和,就是"hadoop"这个单词的出现次数。
System.out.println("input key: "+keys.toString());
System.out.println("::::::::::::::::::::::::::::::::::::::");
int sum = 0;
for (IntWritable value : values) {
System.out.println("recursively print key: "+keys.toString()+" value is: "+value.get());
sum += value.get();
}
outputValue.set(sum);
context.write(keys, outputValue);
}
}
// 3. Driver
public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. get configuration
Configuration configuration = getConf();
// 2. create job : parameters: configuration, jobName
Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
// 3.run jars
job.setJarByClass(this.getClass());
// 4. set job:
// input -> map -> reduce -> output
// input
Path inPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inPath);
// set mapper class,output key, output value
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// set reducer class,output key, output value
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// output
Path outPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outPath);
// 5. submit job
boolean isSuccess = job.waitForCompletion(true);
return isSuccess? 0 : 1;
}
// run program
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// int status = new WordCountMapReduce().run(args);
// configuration.set("mapreduce.framework.name", "yarn"); 表示提交到yarn上面执行
args = new String[] {"D:\input\1.txt","D:\output2"};
int status = ToolRunner.run(configuration, new WordCountMapReduce(), args);
System.exit(status);
}
}
======================================================================================================
MapReduce的写法比较固定,主要就是重写map()和 reduce()两个方法,在方法里面实现自己的逻辑。
整个mr程序执行的过程可以归结为5步: input -> map -> shuffle -> reduce -> output
shuflle阶段可以说是带reduce的mr程序的核心了。之所以说是带reduce的mr的核心,是因为mr可以没有reduce方法,但一定有map方法。
shuffle的过程中包含了排序、溢写、分区、合并、分组这些操作。
mr在读数据的时候是按键值对的形式读取的,key即为每一行开头在整个文件中的偏移量,value即为这一行的文本内容。
比如 :
Hadoop Hadoop Hive
Spark Hadoop Hive
读到mr中的格式为 <0,"Hadoop Hadoop Hive"> , <19,"Spark Hadoop Hive"> 。回车符也是一个字符,所以第二行的起始位置是19
我们要统计每个单词出现的次数,首先需要将单个的单词按照空格切割出来。将单个单词以键值对的形式写出,得到这种形式:<Hadoop,1> <Hadoop,1> <Hadoop,1> <Hive,1> <Hive,1><Spark,1> 。这个逻辑在map()中实现,可以理解为对于读到的每一行都做相同的处理。
每个map会溢写出一个或者多个小文件,在溢写之前会在内存中将key按字典序排序。如果指定了多个reduce,那么还会做分区。分区编号可以简单理解为 key的hashcode 对reduce的个数取模。
假如有两个分区,那么每个key的分区编号就为 key.hashcode() % 2 ,每个数字对2取模只有0和1两个值,就代表了两个分区。
接下来,每个map溢写出的小文件会合并成大文件。合并的过程中还是会按照key的字典序排序,并且保留分区。
经过这些过程,最终,每个map会有一个合并后的大文件。
接下来,reduce会主动去到这些大文件中拉取属于自己分区的数据,并且将拿到的分散的数据再做合并,形成一个文件。
接着,分组操作来了。我们目前的数据形式还是 <Hadoop,1> <Hadoop,1> <Hadoop,1> <Hive,1> <Hive,1><Spark,1> 。 经过分组操作之后,就会变成这种形式:<Hadoop,[1,1,1]> <Hive,[1,1]> <Spark,[1]>
所以reduce方法的第二个参数是 Iterable<>,它是一个迭代器。实现累加就可以计算出key出现的次数了。