zoukankan      html  css  js  c++  java
  • Hadoop小结

    Google大数据技术:MapReduce、BigTable、GFS

    Hadoop:一个模仿Google大数据技术的开源实现

    HDFS的概念

    数据块

    磁盘中的关系:

    HDFS同样也有块(block)的概念,但是大很多,默认为128MB。与单一磁盘上的文件系统相似,HDFS上的文件也被划分为块大小的多个分块(chunk),作为独立的存储单元。但与其他文件系统不同的是,HDFS中小于 一 个块大小的文件不会占据整个块的空间。

    名称节点和数据节点(NameNode and DataNode)

    • HDFS集群有两种节点,以管理者-工作者的模式运行,即一个名称节点(NameNode,管理者)和多个数据节点(DataNode,工作者)。名称节点管理文件系统的命名空间。它维护着这个文件系统树及这个树内所有的文件和索引目录。这些信息以两种形式将文件永久保存在本地磁盘上:命名空间镜像和编辑日志。名称节点也记录着每个文件的每个块所在的数据节点,但它并不永久保存块的位置,因为这些信息会在系统启动时由数据节点重建。
    • 客户端代表用户通过与名称节点和数据节点交互来访问整个文件系统。客户端提供一个类似POSIX(可移植操作系统界面)的文件系统接口,因此用户在编程时并不需要知道名称节点和数据节点及其功能。
    • 数据节点是文件系统的工作者。它们存储并提供定位块的服务(被用户或名称节点调用时),并且定时的向名称节点发送它们存储的块的列表。
    • 没有名称节点,文件系统将无法使用。事实上,如果运行名称节点的机器被毁坏,文件系统上所有的文件都会丢失,因为我们不知道如何通过数据节点上的块来重建文件。因此,名称节点能够经受故障是非常重要的,hadoop提供了两种机制来确保这一点。
    • 第一种机制就是复制那些组成文件系统元数据持久状态的文件。hadoop可以通过配置使名称节点在多个文件系统上写入持久化状态。这些写操作是具同步性和原子性的。一般的配置选择是,在本地磁盘上写入的同时,写入一个远程NFS(网络文件系统)挂载(mount)。
    • 另一种可行的方法是运行一个二级名称节点,虽然它不能作为名称节点使用。这个二级名称节点的重要作用就是定期的通过编辑日志合并命名空间镜像,以防止编辑日志过大。这个二级名称节点一般在其他单独的物理计算机上运行,因为它也需要占用大量CPU和内存来执行合并操作。它会保存合并后的命名空间镜像的副本,在名称节点失效后就可以使用。但是,二级名称节点的状态是比主节点滞后的,所以主节点的数据若全部丢失,损失仍然在所难免。这种情况下,一般把存在NFS(Network File System,网络文件系统)上的主名称节点元数据复制到二级名称节点上并将其作为新的主名称节点。

     有关二级名称节点更多的介绍:https://blog.csdn.net/x_i_y_u_e/article/details/52430932

    数据流

    •  文件读取剖析,为了了解客户端及与之交互的HDFS、名称节点和数据节点之间的数据流是怎样的,如图

     

    • 客户端通过调用FileSystem对象的open()来读取希望打开的文件,对于HDFS来说,这个对象是分布式文件系统的一个实例。DistributedFilesystem通过使用RPC来调用名称节点,以确定文件开头部分的块的位置(步骤2)。对于每一个块,名称节点返回具有该块副本的数据节点地址。此外,这些数据节点根据他们与客户端的距离来排序(根据网络集群的拓扑)。如果该客户端本身就是一个数据节点,便从本地数据节点上读取,Distributed Filesystem返回一个FSData InputStream对象(一个支持文件定位的输入流)给客户端读取数据。FSData InputStream转而包装了一个DFSInputStream对象。
    • 接着,客户端对这个输入流调用read()(步骤3)。存储着文件开头部分的块的数据节点地址的DFSInptStream随即与这些块最近的数据节点相连接。通过在数据流中重复调用read(),数据会从数据节点返回客户端(步骤4)。到达块的末端时,DFSInputStream会关闭与数据节点的联系,然后为下一个块找到最佳的数据节点(步骤5)。客户端只需要读取一个连续的流,这些对于客户端来说是透明的。
    • 客户端从流中读取数据时,块是按照DFSInputStream打开与数据节点的新连接的顺序读取的。它也会调用名称节点来检索下一组需要的块的数据节点的位置。一旦客户端完成读取,就对文件系统数据输入流调用close()(步骤6)

           在读取的时候,如果客户端在与数据节点通信时遇到一个错误,那么它就会去尝试对这个块来说下一个最近的块。它也会记住那个故障的数据节点,以保证不会再对之后的块进行徒劳无益的尝试。客户端也会确认从数据节点发来的数据的校验和。如果发现一个损坏的块,它就会在客户端试图从别的数据节点中读取一个块的副本之前报告给名称节点。

           这个设计的一个重点是,客户端直接联系数据节点去检索数据,并被名称节点指引到每个块中最好的数据节点。因为数据流动在此集群中是在所有数据节点分散进行的,所以这种设计能使HDFS可扩展到最大的并发客户端数量。同时,名称节点只不过是提供块位置请求(存储在内存中,因而非常高效),不是提供数据。否则如果客户端数量增长,名称节点会快速成为一个“瓶颈”。

    • 文件写入剖析,此处考虑的是创建一个新的文件,向其写入数据后关闭该文件。

     

    • 客户端通过Distributed Filesystem中调用create()来创建文件(图中步骤1)。DistributedFilesystem一个RPC去调用名称节点,在文件系统的命名空间中创建一个新的文件,没有块与之相联系(步骤2)。名称节点执行各种不同的检查后确保这个文件不会已经存在,并且客户端有可以创建文件的适当的许可。如果这些检查通过,名称节点就会生成一个新文件的记录;否则,文件创建失败并向客户端抛出一个IOException异常。分布式文件系统返回一个文件系统数据输出流,让客户端开始写入数据。就像读取事件一样,文件系统数据输出流控制一个DFSOutputStream,负责处理数据节点和名称节点之间的通信。
    • 在客户端写入数据时(步骤3),DFSOutputStream将它分成一个个的包,写入内部的队列,称为数据队列。数据队列随数据流流动,数据流的责任是根据适合的数据节点的列表来要求这些节点为副本分配新的块。这个数据节点的列表形成一个管线——我们假设这个副本数是3,所以有3个节点在管线中。数据流将包分流给管线中第一个的数据节点,这个节点会存储包并发送给管线中的第二个数据节点。同样地,第二个数据节点存储包并且传给管线中第三个(也是最后一个)数据节点(步骤4)。
    • DFSOutputStream也有一个内部的包队列来等待数据节点收到确认,称为确认队列。一个包只有在被管线中所有节点确认后才会被移出确认队列(步骤5)。
    • 如果在有数据写入期间,数据节点发生故障,则会执行下面的操作,当然这对写入数据的客户端而言,是透明的。首先管线被关闭,确认队列中的任何包都会被添加回数据队列的前面,以确保数据节点从失败的节点处是顺流的,不会漏掉任意一个包。当前的块在正常工作的数据节点中被给予一个新的身份并联系名称节点,以便能在故障数据节点后期恢复时其中的部分数据块会被删除。故障数据节点会从管线中删除并且余下块的数据会被写入管线中的两个好的数据节点。名称节点注意到块副本不足时,会在另一个节点上安排创建一个副本,随后,后续的块会继续正常处理。
    • 在一个块被写入期间多个数据节点发生故障的可能性虽然有但很少见。只要dfs.replication.min的副本(默认为1)被写入,写操作就是成功的,并且这个块会在集群中被异步复制,直到满足其目标副本数(dfs.replication的默认设置为3)。
    • 客户端完成数据的写入后,就会在流中调用close()(步骤6)。向名称节点发送完信息之前,此方法会将余下的所有包放入数据节点管线并等待确认(步骤7)。名称节点已经知道文件由哪些块组成(通过Data stream询问块分配),所以它只需在返回成功前等待块进行最小量的复制。

     

    副本的放置

           名称节点如何选择哪个数据节点来保存副本?我们需要在可靠性与写入带宽和读取带宽之间进行权衡。例如,因为副本管线都在单独一个节点上运行,所以把所有副本都放在一个节点基本上不会损失写入带宽,但这并没有实现真的冗余(如果节点发生故障,那么该块中的数据会丢失)。同样,离架读取的带宽是很高的,另一个极端,把副本放在不同的数据中心会最大限度地增大冗余,但会以带宽为代价。即使在相同的数据中心(所有的Hadoop集群到目前为止都运行在同一个数据中心),也有许多不同的放置策略。其实,Hadoop在发布的0.17.0版本中改变了放置策略来帮助保护块在集群间有相对平均的分布。

      Hadoop的策略是在与客户端相同的节点上放置第一个副本(若客户端运行在集群之外,就可以随机选择节点,不过系统会避免挑选那些太满或太忙的节点)。第二个副本被放置在第一个不同的随机选择的机架上(离架)。第三个副本被放置在与第二个相同的机架上,但放在不同的节点。更多的副本被放置在集群中的随机节点上,不过系统会尽量避免在相同的机架上放置太多的副本。

      一旦选定副本放置的位置,就会生成一个管线,会考虑到网络拓扑。副本数为3的管道看起来如图

      总的来说,这样的方法在稳定性(块存储在两个机架中)、写入宽带(写入操作只需要做一个单一网络转换)、读取性能(选择从两个机架中进行读取)和集群中块的分布(客户端只在本地机架写入一个块)之间,进行较好的平衡。

     

    MapReduce的工作原理

     

    • 整个过程如图,在最上层,有5个独立的实体:
    1. 客户端,提交MapReduce作业。
    2. YARN资源管理器,负责协调集群上计算机资源的分配。、
    3. YARN节点管理器,负责启动和监视集群中机器上的计算容器(container)。
    4. MapReduce的application master,负责协调运行MapReduce作业的任务。它和MapReduce任务在容器中运行,这些容器由资源管理器分配并由节点管理器进行管理。
    5. 分布式文件系统(一般是HDFS),用来与其他实体间共享作业文件。

      提交作业

      Job的submit()方法创建一个内部的JobSummiter实例,并且调用其submitJobInternal()方法(步骤1)。提交作业后,waitForCompletion()每秒轮询作业的进度,如果发现自上次报告后有改变,便把进度报告到控制台。作业完成后,如果成功,就显示作业计数器;如果失败,则导致作业失败的错误就被记录到控制台。

      JobSummiter所实现的作业提交过程如下:

    • 向资源管理器请求一个新应用ID,用于MapReduce作业ID(步骤2)
    • 检查作业的输出说明。例如,如果没有指定输出目录或输出目录已经存在,作业就不提交,错误抛回给MapReduce程序。
    • 计算作业的输入分片。如果分片无法计算,比如因为输入路径不存在,作业就不提交,错误返回给MapReduce程序。
    • 将运行作业所需要的资源(包括作业JAR文件、配置文件和计算所得的输入分片)复制到一个以作业ID命名的目录下的共享文件系统中(步骤3)。作业JAR的复本较多(由mapreduce.client.submit.file.replication属性控制,默认值为10),因此在运行作业的任务时,集群中有很多个复本可供节点管理器访问。
    • 通过调用资源管理器的submitApplication()方法提交作业(步骤4)

      作业的初始化

      资源管理器收到调用它的submitApplication()消息后,便将请求传递给YARN调度器(scheduler)。调度器分配一个容器,然后资源管理器在节点管理器的管理下在容器中启动application master的进程(步骤5a和5b)

      MapReduce作业的application master是一个Java应用程序,它的主类是MRAppMaster。由于将接受来自任务的进度和完成报告(步骤6),因此application master对作业的初始化是通过创建多个薄记对象以保持对作业进度的跟踪来完成的。接下来,它接受来自共享文件系统的、在客户端计算的输入分片(步骤7)。然后对每一个分片创建一个map任务对象以及由mapreduce.job.reduces属性(通过作业的setNumReduceTasks()方法设置)确定的多个reduce任务对象。任务ID在此时分配。

      application master必须决定如何运行构成MapReduce作业的各个任务。如果作业很小,就选择和自己在同一个JVM上运行任务。与在一个节点上顺序运行这些任务相比,当application master判断在新的容器中分配和运行任务的开销大于并行运行它们的开销时,就会发生这一情况。这样的作业称为uberized,或者作为uber任务运行。

      哪些作业是小作业?默认情况下,小作业就是少于10个mapper且只有一个reducer且输入大小小于一个HDFS块的作业(通过设置mapreduce.job.ubertask.maxmaps、mapreduce.job.ubertask.maxreducers和mapreduce.job.ubertask.maxbytes可以改变这几个值)。必须明确启用Uber任务(对于单个作业,或者是对整个集群),具体方法是将mapreduce.job.ubertask.enable设置为true。

      最后,在任何任务运行之前,application master调用setupJob()方法设置OutputCommiter。FileOutputCommiter为默认值,表示将建立作业的最终输出目录及任务输出的临时工作空间。

      任务的分配

      如果作业不适合作为uber任务运行,那么application master就会为该作业中的所有map任务和reduce任务向资源管理器请求容器(步骤8).首先为Map任务发出请求,该请求优先级高于reduce任务的请求,这是因为所有的map任务必须在reduce的排序阶段能够启动前完成。直到5%的map任务已经完成时,为reduce任务的请求才会发出。

      reduce任务能够在集群中任意位置运行,但是map任务的请求有着数据本地化局限,这也是调度器所关注的。在理想的情况下,任务是数据本地化(data local)的,意味着任务在分片驻留的同一节点上运行。可选的情况是,任务可能是机架本地阿花(rack local)的,即和分片在同一机架而非同一节点上运行。有一些任务既不是数据本地化,也不是机架本地化,它们会从别的机架,而不是运行所在的机架上获取自己的数据。对于一个特定的作业运行,可通过查看作业的计数器来确定在每个本地化层次上运行的任务的数量。

      请求也为任务指定了内存需求和CPU数。在默认情况下,每个map任务和reduce任务都分配到1024MB的内存和一个虚拟的内核,这些值可以在每个作业的基础上进行配置,分别通过4个属性来设置mapreduce.map.memory.mb、mapreduce.reduce.memory.mb、mapreduce.map.cpu.vcores和mapreduce.reduce.vcoresp.memory.mb。

      任务的执行

      一旦资源管理器的调度器为任务分配了一个特定节点上的容器,application master就通过与节点管理器通信来启动容器(步骤9a,9b)。该任务由主类为YarnChild的一个Java应用程序执行。在它运行任务之前,首先将任务需要的资源本地化,包括作业的配置、JAR文件和所有来自分布式缓存的文件(步骤10)。最后,运行map任务或reduce任务(步骤11)。

      YarnChild在指定的JVM中运行,因此用户定义的map或reduce函数(甚至是YarnChild)中的任何缺陷不会影响到节点管理器,例如导致其崩溃或挂起。

      每个任务都能够执行搭建(setup)和提交(commit)动作,它们和任务本身在同一个JVM中运行,并由作业的OutputCommitter确定。对于基于文件的作业,提交动作将任务输出由临时位置搬移到最终位置。提交协议确保当推测执行(speculative execution)被弃用时,只有一个任务副本被提交,其他的都被取消。

      流和管道

      Streaming运行特殊的map任务和reduce任务,目的是运行用户提供的可执行程序,并与之通信。

      

         (图为了偷懒,直接截的https://blog.csdn.net/weixin_41850738/article/details/81505369)

      Streaming任务使用标准输入和输出流与进程(可以用任何语言写)进行通信。在任务执行过程中,Java进程都会把输入键值对传给外部的进程,后者通过用户定义的map函数和reduce函数来执行它并把输出键值对传回java进程。从节点管理器的角度看,就像其子进程自己在运行map或reduce代码一样。 

      进度和状态的更新 

       MapReduce作业是长时间运行的批量作业,运行时间范围从几秒到几小时。这可能是一个很长的时间段,所以对于用户而言,能够得知关于作业进展的一些反馈是很重要的。一个作业和它的每个任务都有一个状态(status),包括:作业或任务的状态(比如,运行中,成功完成,失败)、map和reduce的进度、作业计数器的值、状态消息或描述(可以由用户代码来设置)。这些状态信息在作业期间不断改变,它们是如何与客户端通信的呢?

      任务在运行时,对其进度(progress,即任务完成百分比)保持追踪。对map任务,任务进度是已处理输入所占的比例。对reduce任务,情况稍微有点复杂,但系统仍然会估计已处理reduce输入的比例。整个过程分成三部分,与shuffle的三个阶段相对应。比如,如果任务已经执行reducer一半的输入,那么任务的进度便是5/6,这是因为已经完成复制和排序阶段(每个占1/3),并且已经完成reduce阶段的一半(1/6)。任务也有一组计数器,负责对任务运行过程中各个事件进行计数,这些计数器要么内置于框架中,例如已写入的map输出记录数,要么由用户自己定义。

      当map任务或reduce任务运行时,子进程和自己的父application master通过umbilical接口通信。每隔3秒中,任务通过这个umbilical接口向自己的application master报告进度和状态(包括计数器),application master会形成一个作业的汇聚试图(aggregate view)。

      资源管理器的界面显示了所有运行中的应用程序,并且分别有链接指向这些应用各自的application master的界面,这些界面展示了MapReduce作业的更多细节,包括其进度。

      在作业期间,客户端每秒轮询一次application master以接收最新状态(轮询间隔通过mapreduce.client.progressmonitor.pollinterval设置)。客户端也可以使用Job的getStatus()方法得到一个JobStatus的实例,后者包含作业的所有状态信息。

      作业的完成

      当application master收到作业最后一个任务已完成的通知后,便把作业的状态设置为“成功”。然后,在Job轮询状态时,便知道任务已成功完成,于是Job打印一条消息告知用户,然后从waitForCompletion()方法返回。Job的统计信息和技术值也在这个时候输出到控制台。

      如果application master有相应的设置,也会发送一个HTTP作业通知。希望收到回调指令的客户端可以通过mapreduce.job.end-notification.url属性来进行这项设置。

      最后,作业完成时,application master和任务容器清理其工作状态(这样中间输出将被删除),OutputCommitter的commitJob()方法会被调用。作业信息由作业服务器存档,以便日后用户需要时可以查询。

    Shuffle和排序

      MapReduce确保每个reduce的输入都是按键排序的。系统执行排序、将map输出作为输入传给reducer的过程称为shuffle。 

      map端

      map函数开始产生输出时,并不是简单地将它写到磁盘。这个过程更复杂,它利用缓冲的方式写到内存并出于效率的考虑进行预排序。

      每个map任务都有一个环形内存缓冲区用于存储任务输出。在默认情况下,缓冲区的大小为100MB,这个值可以通过改变mapreduce.task.io.sort.mb属性来调整。一旦缓冲内容达到阈值(mapreduce.map.sort.spill.percent,默认为0.80,或80%),一个后台线程便开始把内容溢出(spill)到磁盘。在溢出写到磁盘过程中,map输出继续写到缓冲区,但如果在此期间缓冲区被填满,map会被阻塞到写磁盘过程完成。溢出写过程按轮询方式将缓冲区中的内容写到mapreduce.cluster.local.dir属性在作业特定子目录下指定的目录中。

      在写磁盘之前,线程首先根据数据最终要传的reducer把数据划分成相应的分区(partition)。在每个分区中,后台线程按键进行内存中排序,如果有一个combiner函数,它就在排序后的输出上运行。运行combiner函数使得map输出结果更紧凑,因此减少写到磁盘的数据和传递给reducer的数据。

      每次内存缓冲区达到溢出阈值,就会新建一个溢出文件(spill file),因此在map任务写完其最后一个输出记录之后,会有几个溢出文件。在任务完成之前,溢出文件被合并成一个已分区且已排序的输出文件。配置属性mapreduce.task.io.sort.factor控制着一次最多能合并多少流,默认值是10。

      如果至少存在3个溢出文件(通过mapreduce.map.combine.minspills属性设置)时,则combiner就会在输出文件写到磁盘之前再次运行。combiner可以在输入上反复运行,但并不影响最终结果。如果只有1或2个溢出文件,那么由于map输出规模减少,因而不值得调用combiner带来的开销,因此不会为该map输出再次运行combiner。

      在将压缩map输出写到磁盘的过程中对它进行压缩往往是个很好的主意,因为这样会写磁盘的速度更快,节约磁盘空间,并且减少传给reducer的数据量。在默认情况下,输出是不压缩的,但只要将mapre.map.output.compress设置为true,就可以轻松启用此功能。使用的压缩裤由mapreduce.map.output.compress.codec指定。

      reducer通过HTTP得到输出文件的分区。用于文件分区的工作线程的数量由任务的mapreduce.shuffle.max.threads属性控制,此设置针对的是每一个节点管理器,而不是针对每个map任务。默认值0将最大线程数设置为机器中处理器数量的两倍。

      reduce端 

       map输出文件位于运行map任务的tasktracker的本地磁盘(注意,尽管map输出经常写到map tasktracker的本地磁盘,但reducer输出并不这样),现在,tasktracker需要为分区文件运行reducer任务。并且reduce任务需要集群上若干个map任务的map输出作为其特殊的分区文件。每个map任务的完成时间可能不同,因此在每个任务完成时,reduce任务就开始复制其输出。这就是reduce任务的复制阶段。reduce任务有少量复制线程,因此能够并行取得map输出。默认是5个线程,但这个默认值可以修改设置mapreduce.reduce.shuffle.parallelcopies属性即可。

      如果map输出相当小,会被复制到reducer任务JVM的内存(缓冲区大小由mapreduce.reduce.shuffle.input.buffer.percent属性控制,指定用于此用途的堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapreduce.reduce.shuffle.merge.percent决定)或达到map输出阈值(mapreduce.reduce.merge.inmem.threshold控制),则合并后溢出写到磁盘中。如果指定combiner,则在合并期间运行它以降低写入硬盘的数据量。

      随着磁盘上副本的增多,后台线程会将它们合并为更大的、排好序的文件。这会为后面的合并节省一些时间。注意,为了合并,压缩的map输出(通过map任务)都必须在内存中被解压缩。

      复制完所有map输出后,reduce任务进入排序阶段(更恰当的说法是合并阶段,因为排序是在map端进行的),这个阶段将合并map输出,维持其顺序排序。这是循环进行的。比如,如果有50个map输出,而合并因子是10(10为默认设置,由mapreduce.task.io.sort.factor属性设置,与map的合并类似),合并将进行5趟。每趟将10个文件合并成一个文件,因此最后有5个中间文件。

      在最后阶段,即reduce阶段,直接把数据输入reduce函数,从而省略了一次磁盘往返行程,并没有将这5个文件合并成一个已排序的文件作为最后一趟。最后的合并可用来自内存和磁盘片段。在reduce阶段,对已排序输出中的每个键调用reduce函数。此阶段的输出直接写到输出文件系统,一般为HDFS。如果采用HDFS,由于节点管理器也运行数据节点,所有第一个复本将被写到本地磁盘。

  • 相关阅读:
    28完全背包+扩展欧几里得(包子凑数)
    HDU 3527 SPY
    POJ 3615 Cow Hurdles
    POJ 3620 Avoid The Lakes
    POJ 3036 Honeycomb Walk
    HDU 2352 Verdis Quo
    HDU 2368 Alfredo's Pizza Restaurant
    HDU 2700 Parity
    HDU 3763 CDs
    POJ 3279 Fliptile
  • 原文地址:https://www.cnblogs.com/Mayny/p/9368103.html
Copyright © 2011-2022 走看看