zoukankan      html  css  js  c++  java
  • Hadoop--mapreduce知识点总结

    并行计算框架(MapReduce)

    适用于大数据量处理的分布式框架,是为离线数据分析而设计,利用数据的并行性进行分布运算,而后汇总结果的计算框架。

    将任务拆分、分布、汇总,开发人员只需要实现业务逻辑;分布任务自动失败重试,单个任务失败不会造成整个任务退出;和HDFS整合,使计算移到数据所在的节点运行

    角色的划分

    1)hadoop1.x

    Hadoop角度:Master(主结点)和Slave(从结点)

    HDFS角度:NameNode和DataNode

    MapReduce角度:JobTracker和TaskTracker

    2)hadoop 2.x

    HDFS角度:NameNode和DataNode

    YARN角度:nodemanager和ResourceManager

    MapReduce程序执行流程

    MapReduce由JobTracker和TaskTracker组成。JobTracker负责资源管理和作业控制,TaskTracker负责任务的运行。

    (1) 开发人员编写好MapReduce program,将程序打包运行。

    (2) JobClient向JobTracker申请可用Job,JobTracker返回JobClient一个可用Job ID。

    (3) JobClient得到Job ID后,将运行Job所需要的资源拷贝到共享文件系统HDFS中。

    (4) 资源准备完备后,JobClient向JobTracker提交Job。

    (5) JobTracker收到提交的Job后,初始化Job。

    (6) 初始化完成后,JobTracker从HDFS中获取输入splits(作业可以该启动多少Mapper任务)。

    (7) 与此同时,TaskTracker不断地向JobTracker汇报心跳信息,并且返回要执行的任务。

    (8) TaskTracker得到JobTracker分配(尽量满足数据本地化)的任务后,向HDFS获取Job资源(若数据是本地的,不需拷贝数据)。

    (9) 获取资源后,TaskTracker会开启JVM子进程运行任务。

    MapReduce工作原理

    map task

    程序会根据InputFormat将输入文件分割成splits,每个split会作为一个map task的输入,每个map task会有一个内存缓冲区,

    输入数据经过map阶段处理后的中间结果会写入内存缓冲区,并且决定数据写入到哪个partitioner(分割器),当写入的数据到达内存缓冲区的的阀值(默认是0.8),会启动一个线程将内存中的数据溢写入磁盘,同时不影响map中间结果继续写入缓冲区。在溢写过程中,MapReduce框架会对key进行排序,如果中间结果比较大,会形成多个溢写文件,最后的缓冲区数据也会全部溢写入磁盘形成一个溢写文件(最少有一个溢写文件),如果是多个溢写文件,则最后合并所有的溢写文件为一个文件。

    reduce task

    当所有的map task完成后,每个map task会形成一个最终文件,并且该文件按区划分。reduce任务启动之前,一个map task完成后,

    就会启动线程来拉取map结果数据到相应的reduce task,不断地合并数据,为reduce的数据输入做准备,当所有的map tesk完成后,

    数据也拉取合并完毕后,reduce task 启动,最终将输出输出结果存入HDFS上。

    MapReduce编程主要组件

    InputFormat类:分割成多个splits和每行怎么解析。  

    Mapper类:对输入的每对<key,value>生成中间结果。

    Combiner类:在map端,对相同的key进行合并。

    Partitioner类:在shuffle过程中,将按照key值将中间结果分为R份,每一份都由一个reduce去完成。

    Reducer类:对所有的map中间结果,进行合并。

    OutputFormat类:负责输出结果格式

    针对MapReduce的缺点,YARN解决了什么?

    MapReduce由以下缺点:

    ★ JobTracker挂掉,整个作业挂掉,存在单点故障

    ★ JobTracker既负责资源管理又负责作业控制,当作业增多时,JobTracker内存是扩展的瓶颈

    ★ map task全部完成后才能执行reduce task,造成资源空闲浪费

    YARN设计考虑以上缺点,对MapReduce重新设计:

    ★ 将JobTracker职责分离,ResouceManager全局资源管理,ApplicationMaster管理作业的调度

    ★ 对ResouceManager做了HA设计

    ★ 设计了更细粒度的抽象资源容器Container

    mapper类实现:/*

     * KEYIN:输入kv数据对中key的数据类型

     * VALUEIN:输入kv数据对中value的数据类型

     * KEYOUT:输出kv数据对中key的数据类型

     * VALUEOUT:输出kv数据对中value的数据类型

     */

    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

      

       /*

        * map方法是提供给map task进程来调用的,map task进程是每读取一行文本来调用一次我们自定义的map方法

        * map task在调用map方法时,传递的参数:

        *       一行的起始偏移量LongWritable作为key

        *       一行的文本内容Text作为value

        */

       @Override

       protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {

          //拿到一行文本内容,转换成String 类型

          String line = value.toString();

          //将这行文本切分成单词

          String[] words=line.split(" ");

         

          //输出<单词,1>

          for(String word:words){

            context.write(new Text(word), new IntWritable(1));

          }

       }

    }

    reducer类实现

    /*

     * KEYIN:对应mapper阶段输出的key类型

     * VALUEIN:对应mapper阶段输出的value类型

     * KEYOUT:reduce处理完之后输出的结果kv对中key的类型

     * VALUEOUT:reduce处理完之后输出的结果kv对中value的类型

     */

    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

       @Override

       /*

        * reduce方法提供给reduce task进程来调用

        *

        * reduce task会将shuffle阶段分发过来的大量kv数据对进行聚合,聚合的机制是相同key的kv对聚合为一组

        * 然后reduce task对每一组聚合kv调用一次我们自定义的reduce方法

        * 比如:<hello,1><hello,1><hello,1><tom,1><tom,1><tom,1>

        *  hello组会调用一次reduce方法进行处理,tom组也会调用一次reduce方法进行处理

        *  调用时传递的参数:

        *       key:一组kv中的key

        *       values:一组kv中所有value的迭代器

        */

       protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

          //定义一个计数器

          int count = 0;

          //通过value这个迭代器,遍历这一组kv中所有的value,进行累加

          for(IntWritable value:values){

            count+=value.get();

          }

         

          //输出这个单词的统计结果

          context.write(key, new IntWritable(count));

       }

    }

    job提交客户端实现

    public class WordCountJobSubmitter {

      

       public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

          Configuration conf = new Configuration();

          Job wordCountJob = Job.getInstance(conf);

         

          //重要:指定本job所在的jar包

          wordCountJob.setJarByClass(WordCountJobSubmitter.class);

         

          //设置wordCountJob所用的mapper逻辑类为哪个类

          wordCountJob.setMapperClass(WordCountMapper.class);

          //设置wordCountJob所用的reducer逻辑类为哪个类

          wordCountJob.setReducerClass(WordCountReducer.class);

         

          //设置map阶段输出的kv数据类型

          wordCountJob.setMapOutputKeyClass(Text.class);

          wordCountJob.setMapOutputValueClass(IntWritable.class);

         

          //设置最终输出的kv数据类型

          wordCountJob.setOutputKeyClass(Text.class);

          wordCountJob.setOutputValueClass(IntWritable.class);

         

          //设置要处理的文本数据所存放的路径

          FileInputFormat.setInputPaths(wordCountJob, "hdfs://192.168.77.70:9000/wordcount/srcdata/");

          FileOutputFormat.setOutputPath(wordCountJob, new Path("hdfs://192.168.77.70:9000/wordcount/output/"));

         

          //提交job给hadoop集群

          wordCountJob.waitForCompletion(true);

       }

    }

  • 相关阅读:
    英雄无敌5东方部落秘籍
    Tomcat基础学习
    Flume入门
    SparkStreaming 编程指南
    Kafka单机配置部署
    Kafka介绍 (官方文档翻译)
    spark调优
    《Spark Python API 官方文档中文版》 之 pyspark.sql (四)
    《Spark Python API 官方文档中文版》 之 pyspark.sql (三)
    git基本常用命令总结
  • 原文地址:https://www.cnblogs.com/ToDoNow/p/9647954.html
Copyright © 2011-2022 走看看