Master/slave architecture
Master node: JobTracker (one)
Slave nodes: TaskTrackers (many)
JobTracker:
Receives computation jobs submitted by clients
Assigns the computation tasks to TaskTrackers for execution
Monitors the execution status of each TaskTracker
TaskTrackers:
Execute the computation tasks assigned by the JobTracker
The MapReduce computation model
In Hadoop, every MapReduce computation is initialized as a Job, and each Job runs in two phases: Map and Reduce. The two phases are implemented by two functions, map and reduce.
The map function takes input in <key, value> form and produces intermediate output in the same form. Hadoop gathers all values that share the same key and passes them together to the reduce function.
The reduce function takes input of the form <key, (list of values)>, processes the list of values, and emits its result, which is again in <key, value> form.
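Equivalently, in the standard MapReduce signature notation (added here for reference, not part of the original notes):

    map:    (k1, v1)        -> list(k2, v2)
    reduce: (k2, list(v2))  -> list(k3, v3)

In the exercise below, k2 is a student's name and v2 a single score.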
Exercise:
Input text format:
name score
Given several files whose lines all follow this format, compute each person's average score.
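For example (illustrative data only), an input file might contain:

    Tom 80
    Tom 90
    Jerry 90

and the finished job should then output one line per person:

    Jerry 90
    Tom 85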
Map
package org.zln.scorecount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Created by sherry on 15-7-12.
 */
public class ScoreMap extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();                         // the raw text of one input line: "name score"
        StringTokenizer tokenizer = new StringTokenizer(line);  // split on whitespace
        String strName = tokenizer.nextToken();                 // name
        String strScore = tokenizer.nextToken();                // score

        Text name = new Text(strName);
        int scoreInt = Integer.parseInt(strScore);
        context.write(name, new IntWritable(scoreInt));         // emit <name, score>
    }
}
Reduce
package org.zln.scorecount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * Created by sherry on 15-7-12.
 */
public class ScoreReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        int count = 0;
        Iterator<IntWritable> intWritableIterator = values.iterator();
        while (intWritableIterator.hasNext()) {
            sum += intWritableIterator.next().get(); // running total of this person's scores
            count++;                                 // number of scores seen
        }
        int avg = sum / count;                       // integer average
        context.write(key, new IntWritable(avg));    // emit <name, average score>
    }
}
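Note that sum / count is integer division, so the average is truncated. If a fractional average were wanted, the job's output value type could be switched from IntWritable to DoubleWritable (a design variation, not what the code above does).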
Main
package org.zln.scorecount;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by sherry on 15-7-12.
 */
public class ScoreMain extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setJarByClass(ScoreMain.class);
        job.setJobName("ScoreCount");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(ScoreMap.class);
        job.setReducerClass(ScoreReduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));  // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    // compute the average score per person
    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new ScoreMain(), args);
        System.exit(ret);
    }
}
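Assuming the three classes are packaged into a jar (the name score-count.jar below is hypothetical), the job can be submitted with the standard hadoop jar command, passing the input and output paths as args[0] and args[1]:

    hadoop jar score-count.jar org.zln.scorecount.ScoreMain /input/scores /output/scores

The output directory must not already exist, or the job will fail at startup.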
Our Map and Reduce classes both extend a parent class (Mapper/Reducer) and override the map or reduce method.
The parent class has three more methods that we left untouched:
setup: called once when the map/reduce task starts, before the first map/reduce call
cleanup: called once at the end, after the last map/reduce call
run: drives the whole task; its default implementation calls setup once, then map (or reduce) for every input, then cleanup
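As a minimal sketch (not part of the original notes) of where these hooks fit, here is a hypothetical Mapper that overrides setup and cleanup to count how many lines one map task processed:

package org.zln.scorecount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ScoreMapWithHooks extends Mapper<LongWritable, Text, Text, IntWritable> {
    private long lines;

    @Override
    protected void setup(Context context) {
        lines = 0; // runs once per task, before the first map() call
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        lines++; // parsing and context.write(...) would go here, as in ScoreMap
    }

    @Override
    protected void cleanup(Context context) {
        System.err.println("processed " + lines + " lines"); // runs once, after the last map() call
    }
}

The default run implementation is what strings these together: setup, then one map call per input record, then cleanup.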