zoukankan      html  css  js  c++  java
  • MapReduce概述

    MapReduce 源自于Google的MapReduce论文,Hadoop MapReduce是Google MapReduce克隆版

    MapReduce适合PB级以上海量数据的离线处理

    MapReduce不擅长的地方

      实时计算,不能像MySQL一样,在毫秒级或者秒级内返回结果

      流式计算,MapReduce的输入数据集是静态的,不能动态变化,MapReduce自身的设计特点决定了数据源必须是静态的

      DAG计算,spark可以比较好的计算DAG这种模型,MapReduce不太适合

    MapReduce 的扩展能力是sprak是比不了的,spark是取代不了MapReduce 的,至少在扩展上,spark可以运行在几百几千个节点上还行,MapReduce 可以运行在三万以上的节点上

    MapReduce编程模型(逻辑)

    MapReduce将作业的整个运行过程分为两个阶段Map阶段和Reduce阶段

    Map阶段由一定数量的Map Task组成

      输入数据格式解析:InputFormat

      输入数据处理:Mapper

      数据分组:Partitioner

    Reduce阶段由一定数量的Reduce Task组成

      数据远程拷贝

      数据按照key排序

      数据处理:Reducer

      数据输出格式:OutputFormat

    MapReduce编程模型(代码)

    Map阶段

      InputFormat(默认TextInputFormat)

      Mapper

      Combiner(local reducer)

      Partitioner

    Reduce阶段

      Reducer

      OutputFormat(默认TextOutputFormat)

    MapReduce编程模型—内部逻辑

    一般来说,我的输入是hdfs的一个或者多个文件,mr首先会将这些文件切分,切分成split,然后去读取这些split的内容,split的大小默认和hdfs的block(默认128M)相等,Mapper每读一行split的input数据,就会执行一次map函数,将数据拆分,解析成key-value对,然后默认是通过hash(key)%num,决定某一个单词应该发送到哪一个到Partitioner,Partitioner的作用是什么呢,就是告诉你我这一个单词应该发送给哪一个Reduce,然后经过Shuffle,Shuffle的同时会给你排序,之后发送给Reduce,Reduce处理完之后,就完成了整个MapReduce计算

    MapReduce是分布式的,那会起多少个子任务,多少个map或者reduce是由什么决定的呢?

    Mapper的切分是跟文件大小相关的,文件越大Mapper的数量越多,Mapper的数量一般是跟block的数量是对应起来的,Reducer呢,他的计算会复杂一点,一般来说,MapReduce内部会估算,就是估算你Mapper有多少,我来决定我的Reducer有多少,通常,Reducer不会大于Mapper的数量,会远远小于Mapper的数量,比如十分之一这种,但是他们的数量都可以去定义,比如我有10个block,但是我起1个Mapper,没问题,比如原来有三个Mapper,但是我就起1一个Reducer,完全没问题,但是不同的Mapper和不同的Reducer数量会影响速度,Mapper切得太细太粗都不好 ,Mapper的数量过多可是都会占据yang的资源的

    partitioner默认是hash(key)%num来确定个数的,partitioner的个数跟reduce相关,我有多少个reduce,那我就把这些map的所有的key-value分成多少个partitioner,然后一个partitioner交给一个reduce处理

    map的结果是怎么发给reduce呢 ?
    map的结果是先存放到磁盘上,存到磁盘上的过程呢,都已经通过partitioner给他分好,可以理解为,比如我有三个reduce,可以理解为输出有三个文件,每个文件对应一个reduce,实际上的存储,比这个复杂

    一个map会生成reduce个shuffle文件,那么整个集群中最终会有map*reduce个shuffle文件

    一个MapReduce最终会生成Reduce个最终结果文件

    注:比如计算结果是放在HDFS里面的,可能会输出多个文件,只要在同一个目录下,就认为是计算完成,不会将多个文件合并

    MapReduce中的个数问题需要理清楚,

    MapReduce编程模型—InputFormat

    文件分片(InputSplit)方法,处理跨行问题

    将分片数据解析成key/value对,默认实现是TextInputFormat

    TextInputFormat,Key是行在文件中的偏移量,value是行内容,若行被截断,则读取下一个block的前几个字符

    下面拿wordcount举例

    map函数的参数,一个是文件的偏移量(key),即第一行的偏移量offset是多少,然后第二行读的时候,就从offset之后开始读这个键,另一个就是行的内容(value)

    reduce函数的参数,一个是单词(key),比如a,另一个是一个集合,样子大概是这样的[1,1,1,1],意思就是,a出现了4次

    MapReduce编程模型—Split与Block

    Block,HDFS中最小的数据存储单位,默认是128MB

    Split,MapReduce中最小的计算单元,默认与Block一一对应

    Block与Split,Split与Block的对应关系是任意的,由用户控制

    MapReduce编程模型—Combiner(可选的局部优化)

    Combiner可做看local reducer,合并相同的key对应的value(wordcount例子),通常与Reducer逻辑一样

    Combiner作用在map的结果

    好处是,减少Map Task输出数据量(磁盘IO),减少Reduce-Map网络传输数据量(网络IO)

    如何正确使用呢,结果可叠加,Sum可以而且简单,Average比太容易

    MapReduce编程模型—Partitioner

    Partitioner决定了Map Task输出的每条数据交给哪个Reduce Task处理

    默认实现:hash(key) mod R,R是Reduce Task数目,允许用户自定义

    很多情况需自定义Partitioner,比如“hash(hostname(URL)) mod R”确保相同域名的网页交给同一个Reduce Task处理

    MapReduce 2.0架构——YARN

    Client,用户通过Client与YARN交互,提交MapReduce作业,查询作业运行状态,管理作业等

    MRAppMaster,功能包括:任务划分、资源申请并将之二次分配给MapTask和Reduce Task、任务状态监控和容错

    ResourcesManager,管理资源的

    NodeManager,每一个机器上都会有一个NodeManager,来管理这个节点,进程, 及其资源等

    ApplicationMaster负责调度作业,调度任务,任务的划分,资源的申请,任务状态的监控等等,帮你申请资源,然后分发到机器上,(这是一个嵌套的主从结构,整个hadoop是一个主从的,Yarn也是,ApplicationMaster是一个主)

    JobHistory,我执行了很多任务提交到MapReduce,我想看MapReduce的历史情况,比如说,执行的一些log,可以通过hobhistory的web页面来查看所有的历史,一些Job的统计信息等

    我Client在提交任务的时候,比如MapReduce任务,首先会在一个机器上启动ApplicationMaster,ApplicationMaster他负责什么?负责这个作业的monitor(监控),然后他会向ResourcesManager申请资源,帮你跑maptask,reducetask

    MapReduce 2.0运行流程

    Client提交的作业,任务,他们一次流程是什么样子呢?

    Client先联系 ResourcesManager,ResourcesManager会找一个有空闲资源的节点NodeManager,起一个Container,运行 你的ApplicationMaster,他会与ResourcesManager交互,申请资源,比方我要启动20个Mapper,则向 ResourcesManager申请20个Mapper的Container,如果是5个Reducer,则申请5个Reducer的 Container,然后申请完之后,就可以在对应节点上,启动这些maptask和reducetask ,这些task呢,会与ApplicationMaster进行交互,来监控task的状态,这是Yarn的一些内部原理

    具体步骤:

    步骤1: Client向Yarn的ResourceManager(RM)提交Job请求

    步骤2: RM找到具有空闲资源的NodeManager(NM)并让这个NM运行ApplicationMaster(AM)

    步骤3:该NM启动一个Container来运行AM

    步骤4:AM和RM通信申请资源,RM返回可用的资源NMs给AM

    步骤5:AM要求这些NMs来运行MapTask或者ReduceTask

    步骤6:NMs启动Container来运行MapTask/ReduceTask

    步骤7:MapTask和ReduceTask运行完毕后将结果发送给AM

    MapReduce 2.0容错性

    MRAppMaster容错性,一旦运行失败,由YARN的ResourceManager负责重新启动,最多重启次数可由用户设置,默认是2次。一旦超过最高重启次数,则作业运行失败

    Map Task/Reduce Task,Task周期性向MRAppMaster汇报心跳,一旦Task挂掉,则MRAppMaster将为之重新申请资源并运行。最多重新运行次数可由用户设置,默认4次

    MapReduce计算框架—推测执行机制

    作业完成时间取决于最慢的任务完成时间
       一个作业由若干个Map任务和Reduce任务构成
      因硬件老化、软件Bug等,某些任务可能运行非常慢
    推测执行机制
      发现拖后腿的任务,比如某个任务运行速度远慢于任务平均速度
      为拖后腿任务启动一个备份任务,同时运行
      谁先运行完,则采用谁的结果
    不能启用推测执行机制
      任务间存在严重的负载倾斜
      特殊任务,比如任务向数据库中写数据

    100个map,99个都跑完了,但是剩下一个可能机器老化,跑的特别慢,这时候就有了推测执行机制,会为拖后腿任务,在另一台机器上启动一个备份任务,同时运行,谁先运行完,就使用它的结果,并把另一个给杀掉,这是优点,缺点是,占用了更多的资源,比如10个慢的话,就使用了110个资源。可以通过参数来设置,开启推测执行不

    但是那种向数据库写数据这种,是不适合推测执行的,因为数据已经写进数据库了,你杀掉慢的那个也没用

    输入数据格式解析

    使用默认的TextInputFormat,每个Map Task处理一个split,一个split大小等于一个block,如果最后一行数据被截断,则读取后一个block前半部分,转换成key/value对,key是偏移量,value是行内容

    问题:
    我hdfs的文件是按照size,大小来分的,我hdfs是不知道里面存的什么的,那Mapper有可能就会将一个单词或者一行截断,这个时候,我读的数据有可能是重复的或者漏读的,这时候怎么办,MR是怎么处理的呢?

    会有10个Mapper来读,Mapper2把block2从头读到尾读完,这个时候block2可能会遇到被切分时,断行或者断单词的情况,此时,我不管怎么样,我都会往后读,跨block,把后面的一个block的第一行读了,读到那一行结束,即Mapper2会把block3的第一行读了,读到换行符,而Mapper3则会从block3的第二行开始读,这样就解决了,断行断词的问题。

    这时候你会疑问了,是不是MapReduce读block的时候,是不是知道block的顺序?

    差不多,MapReduce不知道整个block的顺序,实际上也没必要知道整个顺序,只需要知道上一个下一个block的顺序就行,再详细的解释下,就是每个Mapper知道他对应的block的两个属性,offset和length,offset是所有block的offset,length是他的block的长度,比如第一个block的offset是从0开始,然后block大小是10000,第二个block的offset是10001,block是10000,这样每个Mapper都知道他读的block的上一个和下一个block

    其他

    如果想kill掉正在运行的MapReduce,直接ctrl+c是不行的,因为已经在集群中运行的,在本地或者某个客户端这样是不可以的,可以在bin下面,执行
    yarn application -list 打印出正在运行的application
    可以yarn application -kill applicationid
    可以去搜索yarn的命令

  • 相关阅读:
    The Quad
    将OrCAD Capture CIS的设计文件(.dsn)导入到PADS Logic VX.2.3
    OrCAD Capture CIS 16.6 将版本16.6的设计文件另存为版本16.2的设计文件
    Eclipse IDE 添加jar包到Java工程中
    PADS Logic VX.2.3 修改软件界面语言
    切换Allegro PCB Editor
    Allegro PCB Design GXL (legacy) 将brd文件另存为低版本文件
    Allegro PCB Design GXL (legacy) 设置自动保存brd文件
    Could not create an acl object: Role '16'
    windows 下apache开启FastCGI
  • 原文地址:https://www.cnblogs.com/sorco/p/6956743.html
Copyright © 2011-2022 走看看