zoukankan      html  css  js  c++  java
  • 2016/09/22 mapreduce

    1.概念

        Mapreduce是一个计算框架,表现形式是有个输入(input),mapreduce操作这个输入,通过本身定义好的计算模型,得到一个输出(output),也就是我们需要的结果。

    在运行一个mapreduce任务的时候,任务过程被分为2个阶段:map和reduce阶段,每个阶段都是用键值对(key/value)作为输入和输出

    2.mapreduce实例

    package org.apache.hadoop.examples;
     
    import java.io.IOException;
    import java.util.StringTokenizer;
     
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
     
    public class WordCount {
     
      public static class TokenizerMapper 
           extends Mapper<Object, Text, Text, IntWritable>{
     
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
     
        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
          }
        }
      }
     
      public static class IntSumReducer 
           extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();
     
        public void reduce(Text key, Iterable<IntWritable> values, 
                           Context context
                           ) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
            sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }
     
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: wordcount <in> <out>");
          System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

     

    要写一个mapreduce程序,我们要实现一个map函数和reduce函数。

    map:

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {…}

    这里有3个参数,Object key 为输入key,Text value为输入value,第三个参数Context context为记录输入的key和value,例如:

    context.write(word, one)

    reduce:

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {…}

    reduce函数的输入也是一个key/value的形式,不过它的value是一个迭代器的形式Iterable<IntWritable> values,也就是说reduce的输入是一个key对应一组的值的value,reduce也有context和map的context作用一致。

    3.mapreduce运行机制

    按照时间顺序包括:输入分片(input split)、 map阶段、 combiner阶段 、 shuffle阶段和reduce阶段

    • 输入分片(input split):

    在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片针对一个map任务,输入分片存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组,输入分片往往和hdfs和block关系密切,假如我们设定hdfs的块大小是64mb,如果我们输入有三个文件,大小分别是3mb、65mb和127mb,那么mapreduce会把3mb文件分为一个输入分片,65mb分为2个输入分片,而127mb也是两个输入分片,换句话说我们如果在map计算前做输入分片调整,例如合并小文件,那么就会有5个map任务将执行,并且每个map执行的数据大小不均。

    • map阶段

    就是程序员编写好的map函数,因此map函数效率相对好控制,而且一般map操作都是本地化操作,也就是在数据存储节点上进行。

    • combiner阶段

    combiner阶段是程序员可以选择的,combiner其实也是一种reduce操作,因此我们看见WordCount类里是用reduce进行加载的。Combiner是一个本地化的reduce操作,它是map运算的后续操作,主要是在map计算出中间文件前做一个简单的合并重复key值的操作,例如我们对文件里的单词频率做统计,map计算时候如果碰到一个hadoop的单词就会记录为1,但是这篇文章里hadoop可能会出现n多次,那么map输出冗余就会很多,因此在reduce计算前对相同的可以值做一个合并操作,那么文件会变小,这样就提高了宽带的传输效率。

    • shuffle阶段

    将map的输出作为reduce的输入的过程就是shuffle阶段了,这个是mapreduce优化的重点地方。

    • reduce阶段

    和map函数一样也是程序员编写 ,最终结果是存储在hdfs。

    出处:http://blog.jobbole.com/84089/

  • 相关阅读:
    既然选择了远方,就只顾风雨兼程!
    slots
    面向对象
    模块和作用域
    偏函数
    python中decorator
    返回函数
    filter, sort
    map/reduce
    开发步骤
  • 原文地址:https://www.cnblogs.com/zcr3108346262/p/5895920.html
Copyright © 2011-2022 走看看