zoukankan      html  css  js  c++  java
  • MapReduce架构

    主从结构

      主节点:JobTracker(一个)

      从节点:TaskTrackers(多个)

    JobTracker:

      接收客户提交的计算任务

      把计算任务分配给TaskTrackers执行

      监控TaskTracker执行情况

    TaskTrackers:

      执行JobTracker分配的计算任务


    MapReduce计算模型

      在Hadoop中,每个MapReduce任务都被初始化为一个Job,每个Job分为两个阶段:Map、Reduce。这两个阶段分别用两个函数表示 :Map、Reduce

      Map函数接收一个<key,value>形式的输入,产生同样形式的中间输出。Hadoop将所有相同key的value集合到一起传递给Reduce函数

      Reduce函数接收一个<key,(list of value)>形式的的呼入,然后对value集合进行处理输出结果。Reduce的输出也是<key,value>的形式

    练习:

    输入文本

    姓名 分数

    多个文本,内容行如上述,统计每个人的平均分

    Map

     1 package org.zln.scorecount;
     2 
     3 import org.apache.hadoop.io.IntWritable;
     4 import org.apache.hadoop.io.LongWritable;
     5 import org.apache.hadoop.io.Text;
     6 import org.apache.hadoop.mapreduce.Mapper;
     7 
     8 import java.io.IOException;
     9 import java.util.StringTokenizer;
    10 
    11 /**
    12  * Created by sherry on 15-7-12.
    13  */
    14 public class ScoreMap extends Mapper<LongWritable,Text,Text,IntWritable> {
    15 
    16     @Override
    17     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    18         String line = value.toString();//将纯文本的数据转化为string
    19         StringTokenizer tokenizer = new StringTokenizer(line,"
    ");//切割
    20         while (tokenizer.hasMoreTokens()){
    21             StringTokenizer tokenizerLine = new StringTokenizer(tokenizer.nextToken());
    22             String strName = tokenizerLine.nextToken();//姓名
    23             String strScore = tokenizerLine.nextToken();//成绩
    24 
    25             Text name = new Text(strName);
    26             int scoreInt = Integer.parseInt(strScore);
    27             context.write(name,new IntWritable(scoreInt));//输出姓名:成绩
    28 
    29         }
    30     }
    31 }

    Reduce

     1 package org.zln.scorecount;
     2 
     3 import org.apache.hadoop.io.IntWritable;
     4 import org.apache.hadoop.io.Text;
     5 import org.apache.hadoop.mapreduce.Reducer;
     6 
     7 import java.io.IOException;
     8 import java.util.Iterator;
     9 
    10 /**
    11  * Created by sherry on 15-7-12.
    12  */
    13 public class ScoreReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
    14     @Override
    15     protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    16         int sum = 0;
    17         int count = 0;
    18         Iterator<IntWritable> intWritableIterator = values.iterator();
    19         while (intWritableIterator.hasNext()){
    20             sum += intWritableIterator.next().get();//总分
    21             count++;//平均分
    22         }
    23         int avg = sum/count;
    24         context.write(key,new IntWritable(avg));
    25     }
    26 }

    Main

     1 package org.zln.scorecount;
     2 
     3 import org.apache.hadoop.conf.Configured;
     4 import org.apache.hadoop.fs.Path;
     5 import org.apache.hadoop.io.IntWritable;
     6 import org.apache.hadoop.io.Text;
     7 import org.apache.hadoop.mapreduce.Job;
     8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     9 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    11 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    12 import org.apache.hadoop.util.Tool;
    13 import org.apache.hadoop.util.ToolRunner;
    14 
    15 /**
    16  * Created by sherry on 15-7-12.
    17  */
    18 public class ScoreMain extends Configured implements Tool{
    19     public int run(String[] args) throws Exception {
    20         Job job = new Job(getConf());
    21         job.setJarByClass(ScoreMain.class);
    22         job.setJobName("ScoreCount");
    23 
    24 
    25         job.setOutputKeyClass(Text.class);
    26         job.setOutputValueClass(IntWritable.class);
    27 
    28         job.setMapperClass(ScoreMap.class);
    29         job.setReducerClass(ScoreReduce.class);
    30 
    31         job.setInputFormatClass(TextInputFormat.class);
    32         job.setOutputFormatClass(TextOutputFormat.class);
    33 
    34         FileInputFormat.setInputPaths(job, new Path(args[0]));
    35         FileOutputFormat.setOutputPath(job, new Path(args[1]));
    36 
    37         boolean success = job.waitForCompletion(true);
    38         return success?0:1;
    39     }
    40 
    41     //统计平均分
    42     public static void main(String[] args) throws Exception {
    43         int ret = ToolRunner.run(new ScoreMain(), args);
    44         System.exit(ret);
    45     }
    46 }



    我们的Map与Reduce都继承了父类,并复写了map或reduce方法

    父类中 还有 三个方法未作处理

    setup:启动map/reduce后首先调用

    cleanup:最后调用

    run:每次调用的时候都会执行

  • 相关阅读:
    Spring Cloud 模块简介2
    Eureka简介
    Spring Cloud 模块简介
    成神之路-基础篇 转
    Java面试题无答案
    java程序猿常用Linux命令
    Java工程师成神之路 转
    大型网站技术架构 大纲
    Mockito 相关资料
    webApp路由控制-vue-router2.0
  • 原文地址:https://www.cnblogs.com/sherrykid/p/4621634.html
Copyright © 2011-2022 走看看