zoukankan      html  css  js  c++  java
  • MapReduce详解

     一、MapReduce概念

    Hadoop MapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大集群上,并以一种可靠的,具有容错能力的方式并行地处理上TB级别的海量数据集。这个定义里面有着这些关键词,

    一是软件框架,二是并行处理,三是可靠且容错,四是大规模集群,五是海量数据集。

    二、MapReduce工作机制

    MapReduce的整个工作过程如上图所示,它包含如下4个独立的实体:

      实体一:客户端,用来提交MapReduce作业。

      实体二:JobTracker,用来协调作业的运行。

      实体三:TaskTracker,用来处理作业划分后的任务。

      实体四:HDFS,用来在其它实体间共享作业文件。

    三、MapReduce框架

      一个MapReduce作业通常会把输入的数据集切分为若干独立的数据块,由Map任务以完全并行的方式去处理它们。

      框架会对Map的输出先进行排序,然后把结果输入给Reduce任务。通常作业的输入和输出都会被存储在文件系统中,整个框架负责任务的调度和监控,以及重新执行已经关闭的任务。

      通常,MapReduce框架和分布式文件系统是运行在一组相同的节点上,也就是说,计算节点和存储节点通常都是在一起的。这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使得整个集群的网络带宽被非常高效地利用。

    3.1MapReduce框架的组成

    (1)JobTracker

      JobTracker负责调度构成一个作业的所有任务,这些任务分布在不同的TaskTracker上(由上图的JobTracker可以看到2 assign map 和 3 assign reduce)。你可以将其理解为公司的项目经理,项目经理接受项目需求,并划分具体的任务给下面的开发工程师。

    (2)TaskTracker

      TaskTracker负责执行由JobTracker指派的任务,这里我们就可以将其理解为开发工程师,完成项目经理安排的开发任务即可。

     2.2MapReduce的输入输出

      MapReduce框架运转在<key,value>键值对上,也就是说,框架把作业的输入看成是一组<key,value>键值对,同样也产生一组<key,value>键值对作为作业的输出,这两组键值对有可能是不同的。

      一个MapReduce作业的输入和输出类型如下图所示:可以看出在整个流程中,会有三组<key,value>键值对类型的存在。

     

    2.3MapReduce的处理流程

      这里以WordCount单词计数为例,介绍map和reduce两个阶段需要进行哪些处理。单词计数主要完成的功能是:统计一系列文本文件中每个单词出现的次数,如图所示:

    (1)map任务处理

    (2)reduce任务处理

    四、第一个MapReduce程序:WordCount

     WordCount单词计数是最简单也是最能体现MapReduce思想的程序之一,该程序完整的代码可以在Hadoop安装包的src/examples目录下找到。

     WordCount单词计数主要完成的功能是:统计一系列文本文件中每个单词出现的次数;

    4.1 初始化一个words.txt文件并上传HDFS

      首先在Linux中通过Vim编辑一个简单的words.txt,其内容很简单如下所示:

    Hello Edison Chou
    Hello Hadoop RPC
    Hello Wncud Chou
    Hello Hadoop MapReduce
    Hello Dick Gu

      通过Shell命令将其上传到一个指定目录中,这里指定为:/testdir/input

    4.2 自定义Map函数

      在Hadoop 中, map 函数位于内置类org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN, KEYOUT, VALUEOUT>中,reduce 函数位于内置类org.apache.hadoop. mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>中。

      我们要做的就是覆盖map 函数和reduce 函数,首先我们来覆盖map函数:继承Mapper类并重写map方法

       /**
         * @param KEYIN
         *            →k1 表示每一行的起始位置(偏移量offset)
         * @param VALUEIN
         *            →v1 表示每一行的文本内容
         * @param KEYOUT
         *            →k2 表示每一行中的每个单词
         * @param VALUEOUT
         *            →v2 表示每一行中的每个单词的出现次数,固定值为1
         */
        public static class MyMapper extends
                Mapper<LongWritable, Text, Text, LongWritable> {
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                    throws java.io.IOException, InterruptedException {
                String[] spilted = value.toString().split(" ");
                for (String word : spilted) {
                    context.write(new Text(word), new LongWritable(1L));
                }
            };
        }

    Mapper 类,有四个泛型,分别是KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面两个KEYIN、VALUEIN 指的是map 函数输入的参数key、value 的类型;后面两个KEYOUT、VALUEOUT 指的是map 函数输出的key、value 的类型;

    从代码中可以看出,在Mapper类和Reducer类中都使用了Hadoop自带的基本数据类型,例如String对应Text,long对应LongWritable,int对应IntWritable。这是因为HDFS涉及到序列化的问题,Hadoop的基本数据类型都实现了一个Writable接口,而实现了这个接口的类型都支持序列化。

      这里的map函数中通过空格符号来分割文本内容,并对其进行记录;

    4.3 自定义Reduce函数

      现在我们来覆盖reduce函数:继承Reducer类并重写reduce方法

     /**
         * @param KEYIN
         *            →k2 表示每一行中的每个单词
         * @param VALUEIN
         *            →v2 表示每一行中的每个单词的出现次数,固定值为1
         * @param KEYOUT
         *            →k3 表示每一行中的每个单词
         * @param VALUEOUT
         *            →v3 表示每一行中的每个单词的出现次数之和
         */
        public static class MyReducer extends
                Reducer<Text, LongWritable, Text, LongWritable> {
            protected void reduce(Text key,
                    java.lang.Iterable<LongWritable> values,
                    Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                    throws java.io.IOException, InterruptedException {
                long count = 0L;
                for (LongWritable value : values) {
                    count += value.get();
                }
                context.write(key, new LongWritable(count));
            };
        }

      Reducer 类,也有四个泛型,同理,分别指的是reduce 函数输入的key、value类型(这里输入的key、value类型通常和map的输出key、value类型保持一致)和输出的key、value 类型。

      这里的reduce函数主要是将传入的<k2,v2>进行最后的合并统计,形成最后的统计结果。4

    4.4 设置Main函数

      (1)设定输入目录,当然也可以作为参数传入

    public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt";

      (2)设定输出目录(输出目录需要是空目录),当然也可以作为参数传入

    public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount";

      (3)Main函数的主要代码

     public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
    
            // 0.0:首先删除输出路径的已有生成文件
            FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf);
            Path outPath = new Path(OUTPUT_PATH);
            if (fs.exists(outPath)) {
                fs.delete(outPath, true);
            }
    
            Job job = new Job(conf, "WordCount");
            job.setJarByClass(MyWordCountJob.class);
    
            // 1.0:指定输入目录
            FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
            // 1.1:指定对输入数据进行格式化处理的类(可以省略)
            job.setInputFormatClass(TextInputFormat.class);
            // 1.2:指定自定义的Mapper类
            job.setMapperClass(MyMapper.class);
            // 1.3:指定map输出的<K,V>类型(如果<k3,v3>的类型与<k2,v2>的类型一致则可以省略)
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            // 1.4:分区(可以省略)
            job.setPartitionerClass(HashPartitioner.class);
            // 1.5:设置要运行的Reducer的数量(可以省略)
            job.setNumReduceTasks(1);
            // 1.6:指定自定义的Reducer类
            job.setReducerClass(MyReducer.class);
            // 1.7:指定reduce输出的<K,V>类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            // 1.8:指定输出目录
            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
            // 1.9:指定对输出数据进行格式化处理的类(可以省略)
            job.setOutputFormatClass(TextOutputFormat.class);
            // 2.0:提交作业
            boolean success = job.waitForCompletion(true);
            if (success) {
                System.out.println("Success");
                System.exit(0);
            } else {
                System.out.println("Failed");
                System.exit(1);
            }
        }

    在Main函数中,主要做了三件事:一是指定输入、输出目录;二是指定自定义的Mapper类和Reducer类;三是提交作业;匆匆看下来,代码有点多,但有些其实是可以省略的。

    4.5 运行

      (1)调试查看控制台状态信息

    (2)通过Shell命令查看统计结果

    五、一个完整的代码实例:

    package com.laotou.map;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import java.io.IOException;
    
    /**
     * Mapper中的四个泛型参数,前面两个表示:hadoop在调用自定义map类的时候,给的入参类型
     * 后面两个表示:map类在处理完之后要给reduce的出去的类型
     */
    public class WordMap extends Mapper<LongWritable,Text,Text,LongWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //value参数表示的是hadoop传递给我们的一行参数
            String[] split = value.toString().split(" ");
            //发送的时候每次只能发送一个key,一个value
            for(String str:split){
                //如何发送?
                context.write(new Text(str),new LongWritable(1));
            }
        }
    }
    package com.laotou.reduce;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class WordReduce extends Reducer<Text,LongWritable,Text,LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long result=0l;
            for(LongWritable lw:values){
                result+=lw.get();
            }
        context.write(key,new LongWritable(result));
        }
    }
    package com.laotou.job;
    
    import com.laotou.map.WordMap;
    import com.laotou.reduce.WordReduce;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    public class WordJob {
        public static void main(String[] args)  {
            //创建job
            try {
                Configuration conf=new Configuration();
                Job job = Job.getInstance();
                //Job得再给一个名字
    //            job.setJar("mr_word_job");
                job.setJobName("mr_word_job");
                //指定job的主类名称
                job.setJarByClass(WordJob.class);
                //关联map类
                job.setMapperClass(WordMap.class);
                //设定map的输出类型
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(LongWritable.class);
                //关联reduce类
                job.setReducerClass(WordReduce.class);
                //设置reduce的输入类型
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(LongWritable.class);
                //指定来源
                FileInputFormat.addInputPath(job,new Path("/word.txt"));
                //指定输出地
                FileOutputFormat.setOutputPath(job,new Path("/out"));
                //启动,设置为true可以看到进度
                boolean b = job.waitForCompletion(true);
                if(b){
                    System.out.println("任务执行完成");
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
    }
  • 相关阅读:
    jq02--基础函数
    jq01--概述
    js06--函数库jq与prototype
    eclipse启动时 failed to create the java virtual machine 解决办法
    将博客搬至CSDN
    eclipse.ini 修改默认编码为 UTF-8
    Elicpse使用技巧-打开选中文件文件夹或者包的当前目录
    eclipse换了高版本的maven插件后报错:org.apache.maven.archiver.MavenArchiver.getManifest(org.apache.maven.project
    python进行数据清理之pandas中的drop用法
    如何用Python实现常见机器学习算法-4
  • 原文地址:https://www.cnblogs.com/yfb918/p/10475587.html
Copyright © 2011-2022 走看看