zoukankan      html  css  js  c++  java
  • Mapreduce 原理及程序分析

    1.MapReduce(Map+Reduce)

       提出一个问题:

    目标:你想数出一摞牌中有多少张黑桃。

    直观方式:一张一张检查并且数出有多少张是黑桃数目

    MapReduce方法则是:

    • 给在座的所有玩家中分配这摞牌
    • 让每个玩家数自己手中的牌有几张是黑桃,(map)
    • 然后把这个数目汇报给你你把所有玩家告诉你的数字加起来,(reduce)
    • 得到最后的结论

    不变的:

         源数据不变:纸牌没有改变

         做的事情不变:要筛选所有黑桃的数目

         结果不变性:无论哪种方式,结果必须是一致和正确的。

         keys:1.完成上述步骤需要多个人协同完成。

                  2.每个人需要根据任务完成自己被分配的任务,每个人需要具备一定的计算能力

                  3.需要将所有人得到的结果需要汇总起来。

          在Web项目中,客户端通过Java代码读取数据库的数据,可以取一条,可以取十条,也可以全表加载出来,随着表中

    数据的增加,执行查询的速度就会越来越慢,因为需要将数据加载到内存中并通过网络传输到java程序中,这个过程的拼劲

    是网络IO,故障率也比较高,当数据量较大的时候,通过Mysql加载到应用服务器实践会越来越长,目前市场上硬件的价格

    比较低廉,因此可以将数据分散存储在多台机器上,每台机器分别运行得到结果,在汇总得到总的结果。如此,每个节点计

    算的数据量小了,虽然最后多了汇总的步骤,但是就整体而言速度提高了。如此,效率会有很大的提升。

         有上述两个例子可得,MapReduce在大数据量情况下是非常有益的。数据量越小,效果越不明显,有可能运算速度比传

    统的单机更慢。因为节点之间数据传输是通过网络传输的,本来数据量很小,在通过网络传输只会耗费更长时间,从而造成

    数据量运算的降低。

          如果计算程序存在于每个节点,数据也分散存储于每个节点,这样在数据需要通过计算程序计算时就无需通过网络传输,

    网络在通常数据传输时候延迟是比较高的,如此对任务作业的执行节省一些计算时间。这也是在工作环境中,通常存储数据

    的节点和计算程序会放在一起,即执行计算的数据都在本地,这也是“数据本地化”的体现。这样就解决了在Web项目中将数据

    传到Tomcat中,将数据从Mysql移动到计算节点的问题,更好的方案是数据仍在原地方存储,只是将计算的程序移动到存储

    数据的节点。把原来的移动数据的方式更改为了现在的移动计算的方式。<分布式计算一些术语>

        *****************************************************************************************

    2.MapReduce的提出

    • MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题。
    • MapReduce合并了两种经典函数: 映射(Mapping)对集合里的每个目标应用同一个操作。即,如果你想把表单里每个单元格乘以二,那么把这个函数单独地应用在每个单元格上的操作就属于mapping化简(Reducing )遍历集合中的元素来返回一个综合的结果。即,输出表单里一列数字的和这个任务属于reducing
    • MapReduce由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。
    • 这两个函数的形参是key、value对,表示函数的输入信息。
    • MapReduce在多于10PB数据时趋向于变慢。

         注:在Python和Scala中均添加了map和reduce内置函数。map和reduce顺序是不可颠倒的,可以想象上述发牌程序。

         

          思考:   1.如果让你自己实现MapReduce,如何设计?<架构师>

                      2.执行map任务多还是reduce多?

                        答案:map任务多于reduce,比如分别数牌的人多于汇总结果的人。执行map任务的节点明显多于reduce节点,执行reduce节点通常只有1个或多个。

                         map需要拿到原始数据源,而原始数据源是分散的,需要在map阶段处理,而reduce做的是汇总计算,节点较少,而map和reduce之间是通过网络传输

                         的,这也是mapreduce的短板。

                      3.一个split和一个mapper task任务是一一对应的,split和mapper任务之间传输的是[k1,v1]数据,mapper任务对应的是map操作

                       左下方是HDFS中的文件,存放在block块中,而block块本身是存放在namenode中文件之上是split,split之和和block是相同。

                       一个split和一个task任务是一一对应的。每一个task任务可以处理一个split,同一时间段所有的task可以并发执行

                      4.右下角也是HDFS目录和文件,每个reducer任务对应一个输出的part-0000x(x=1,2,3,4,5,6...)文件,reducer任务对应的是reduce操作(汇总操作)

                          数据最终来源于HDFS,如果从计算步骤上将计数据来源于mapper任务,即mapreduce的数据来源hdfs,回归到hdfs,也就是说mapreduce是运行在

                          Hdfs之上的。而map+reduce本身就是计算。

                       5.从图中可看出map在前,reduce在后,并且map节点数目大于reduce数目。

                       6.在mapreduce任务的业务逻辑是map和reduce<即:分布式计算模型>

                       6.shuffle本以为洗牌,仅仅是数据传输的方式(比如一对一,一对多,多对多等),一个mapper任务第n个输出必定指向相同的shuffle,shuffle通常是

                          mapreduce的拼劲。

                       7.Group分组,指的是按照Key将value分到各个组,分组之后的数据进入reduce。

                       8.map和reduce之间为啥会有分组的过程?

    ◆执行步骤:

    1. map任务处理

    1.1 读取输入文件(HDFS)内容,解析成key1、value1对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。

       注:key是当前行的起始位置,单位是字节。第一行的起始位置是0,value是当前行的内容。有多少行就产生多少键值对。每个键值对调用一个map函数。

             注意区别map任务与map函数,map函数仅仅是map任务中的一个步骤。

    1.2 覆盖map函数,写自己的逻辑,对输入的key1、value1处理,转换成新的key2、value2输出。

    1.3 对输出的key2、value2进行分区(默认只有一个分区)

    1.4 对不同分区的数据,按照key进行排序分组。相同key的value放到一个集合中。

    1.5 (可选)分组后的数据进行归约思考:何时可以规约?

    2.reduce任务处理

    2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。

            注:那哪些map任务进入到那些reduce节点,原则是按照分区

    2.2 对多个map任务的输出进行合并、排序。写reduce函数自己的业务逻辑,对输入的key2、values2处理,转换成新的key3、value3输出。

     

           注:为什么需要合并操作?因为需要将多个map任务输出的结果进行合并,合并之后既可以排序。

                 分组前后,键值对的数目有变化吗?答案:没有变化。

    2.3 把reduce的输出保存到文件(HDFS)中。

    例子:实现WordCountApp

                 -------------------

    hello you
    hello me
    -------------------
    MapReducer的执行过程
    1.map处理阶段
    1.1读取HDFS的输入文件中的内容,把每一行解析成一个<k1,v1>对。
    k1表示每一行的起始位置(单位byte),v1表示每一行的文本内容。每个键值对调用一次map函数。
    问:一共可以解析成多少个<k1,v1>?
    答:2个,分别是<0, hello you>,<10, hello me>,一共2次map函数
    1.2覆盖map函数,实现自己的业务逻辑。对输入的<k1,v1>做处理,转化为新的<k2,v2>输出。

    void map(k1,v1, context){
        String[] splited = v1.split(' ');
        for(String word : splited){
        context.write(<k2,v2>);  #可以理解为return的含义,或者没有返回值,但是通过上下文传递,如Java中通过按值传递和按地址传递

    }
    }
    分别是<hello,1><you,1><hello,1><me,1>    

     注:<key1,value1>和<key2,value2>是1对多情况,但是在什么情况下是一对0的情况?当无需调用context时候,因为有的记录行不符合记录要求,就无需写出去。

    1.3对map输出的<k2,v2>做分区。默认有1个分区。
    1.4每个分区中的<k2,v2>按照k2进行排序、分组。分组指的是把相同k的v放到一个集合中。
    排序后<hello,1><hello,1><me,1><you,1>   #按照k2进行排序
    分组后<hello,{1,1}><me,{1}><you,{1}>
    1.5归约
    2.reduce处理阶段
    2.1对多个map的输出,按照不同的分区,通过网络copy到不同的reduce节点。
    2.2对多个map的输出,进行合并、排序。覆盖reduce函数,实现自己的业务逻辑,对输入的<k2,v2s>进行处理,转化为新的<k3,v3>输出。
    void reduce(k2,v2s, context){
        long sum=0L;
        for(long times : v2s){
          sum += times;
        }
        context.write(k2,sum);
    }
    2.3把<k3,v3>写入到Hdfs中

    -------------------
    如何从源代码的角度分析map函数处理的<k1,v1>是如何从HDFS文件中获取的?
    答:
    1.从TextInputFormat入手分析,找到父类FileInputFormat,找到父类InputFormat。
    在InputFormat中找到2个方法,分别是getSplits(...)和createRecordReader(...)。
    通过注释知道getSplits(...)作用是把输入文件集合中的所有内容解析成一个个的InputSplits,每一个InputSplit对应一个mapper task。
    createRecordReader(...)作用是创建一个RecordReader的实现类。RecordReader作用是解析InputSplit产生一个个的<k,v>。
    2.在FileInputFormat中找到getSplits(...)的实现。
    通过实现,获知
    (1)每个SplitSize的大小和默认的block大小一致,好处是满足数据本地性。
    (2)每个输入文件都会产生一个InputSplit,即使是空白文件,也会产生InputSPlit;
    如果一个文件非常大,那么会按照InputSplit大小,切分产生多个InputSplit。
    3.在TextInputFormat中找到createRecordReader(...)的实现,在方法中找到了LineRecordReader。
    接下来分析LineRecordReader类。
    在RecordReader类中,通过查看多个方法,知晓key、value作为类的属性存在的,且知道了nextKeyValue()方法的用法。
    在LineRecordReader类中,重点分析了nextKeyValue(...)方法。在这个方法中,重点分析了newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
    在in.readLine(...)中,第一个形参存储被读取的行文本内容,返回值表示被读取内容的字节数。
    通过以上代码,分析了InputSplit中的内容是如何转化为一个个的<k,v>。
    4.从Mapper类中进行分析,发现了setup()、cleanup()、map()、run()。
    在run()方法中,通过while,调用context.nextKeyValue(...)。
    进一步分析Context的接口类是org.apache.hadoop.mapreduce.lib.map.WrappedMapper.MapContext,MapContext调用了nextKeyValue(...)。最终找到了MapContext的实现了MapContextImpl类org.apache.hadoop.mapreduce.task.MapContextImpl。
    在这个类的构造方法中,发现传入了RecordReader的实现类。

  • 相关阅读:
    git
    oracle object_id和data_object_id的区别
    statspack系列8
    statspack系列7
    statspack系列6
    statspack系列5
    statspack系列4
    statspack系列3
    statspack系列2
    MySQL源码之两阶段提交
  • 原文地址:https://www.cnblogs.com/jackchen-Net/p/6368247.html
Copyright © 2011-2022 走看看