MapReduce:大型集群上的简单数据处理
摘要
MapReduce是一个设计模型,也是一个处理和产生海量数据的一个相关实现。用户指定一个用于处理一个键值(key-value)对生成一组key/value对形式的中间结果的map函数,以及一个将中间结果键相同的键值对合并到一起的reduce函数。许多现实世界的任务都能满足这个模型,如这篇文章所示。
使用这个功能形式实现的程序能够在大量的普通机器上并行执行。这个运行程序的系统关心下面的这些细节:输入数据的分区、一组机器上调度程序执行、处理机器失败问题,以及管理所需的机器内部的通信。这使没有任何并行处理和分布式系统经验的程序员能够利用这个大型分布式系统的资源。
我们的MapReduce实现运行在一个由普通机器组成的大规模集群上,具有很高的可扩展性:一个典型的MapReduce计算会在几千台机器上处理许多TB的数据。程序员们发现这个系统很容易使用:目前已经实现了几百个MapReduce程序,在Google的集群上,每天有超过一千个的MapReduce工作在运行。
一、 介绍
在过去的5年中,本文作者和许多Google的程序员已经实现了数百个特定用途的计算程序,处理了海量的原始数据,包括抓取到的文档、网页请求日志等,计算各种衍生出来的数据,如反向索引、网页文档的图形结构的各种表示、每个host下抓取到的页面数量的总计、一个给定日期内的最频繁查询的集合等。大多数这种计算概念明确。然而,输入数据通常都很大,并且计算必须分布到数百或数千台机器上以确保在一个合理的时间内完成。如何并行计算、分布数据、处理错误等问题使这个起初很简单的计算,由于增加了处理这些问题的很多代码而变得十分复杂。
为了解决这个复杂问题,我们设计了一个新的抽象模型,它允许我们将想要执行的计算简单的表示出来,而隐藏其中并行计算、容错、数据分布和负载均衡等很麻烦的细节。我们的抽象概念是受最早出现在lisp和其它结构性语言中的map和reduce启发的。我们认识到,大多数的计算包含对每个在输入数据中的逻辑记录执行一个map操作以获取一组中间key/value对,然后对含有相同key的所有中间值执行一个reduce操作,以此适当的合并之前的衍生数据。由用户指定map和reduce操作的功能模型允许我们能够简单的进行并行海量计算,并使用re-execution作为主要的容错机制。
这项工作的最大贡献是提供了一个简单的、强大的接口,使我们能够自动的进行并行和分布式的大规模计算,通过在由普通PC组成的大规模集群上实现高性能的接口来进行合并。
第二章描述了基本的编程模型,并给出了几个例子。第三章描述了一个为我们的聚类计算环境定制的MapReduce接口实现。第四章描述了我们发现对程序模型很有用的几个优化。第六章探索了MapReduce在Google内部的使用,包括我们在将它作为生产索引系统重写的基础的一些经验。第七章讨论了相关的和未来的工作。
二、 编程模型
这个计算输入一个key/value对集合,产生一组输出key/value对。MapReduce库的用户通过两个函数来标识这个计算:Map和Reduce。
Map,由用户编写,接收一个输入对,产生一组中间key/value对。MapReduce库将具有相同中间key I的聚合到一起,然后将它们发送给Reduce函数。
Reduce,也是由用户编写的,接收中间key I和这个key的值的集合,将这些值合并起来,形成一个尽可能小的集合。通常,每个Reduce调用只产生0或1个输出值。这些中间值经过一个迭代器(iterator)提供给用户的reduce函数。这允许我们可以处理由于数据量过大而无法载入内存的值的链表。
2.1 例子
考虑一个海量文件集中的每个单词出现次数的问题,用户会写出类似于下面的伪码:
Map函数对每个单词增加一个相应的出现次数(在这个例子中仅仅为“1”)。Reduce函数将一个指定单词所有的计数加到一起。
此外,用户使用输入和输出文件的名字、可选的调节参数编写代码,来填充一个mapreduce规格对象,然后调用MapReduce函数,并把这个对象传给它。用户的代码与MapReduce库(C++实现)连接到一起。。附录A包含了这个例子的整个程序。
2.2 类型
尽管之前的伪代码中使用了字符串格式的输入和输出,但是在概念上,用户定义的map和reduce函数需要相关联的类型:
map (k1, v1) --> list(k2, v2)
reduce (k2, list(v2)) --> list(v2)
也就是说,输入的键和值和输出的键和值来自不同的域。此外,中间结果的键和值与输出的键和值有相同的域。
MapReduce的C++实现与用户定义的函数使用字符串类型进行参数传递,将类型转换的工作留给用户的代码来处理。
2.3 更多的例子
这里有几个简单有趣的程序,能够使用MapReduce计算简单的表示出来。
分布式字符串查找(Distributed Grep):map函数将匹配一个模式的行找出来。Reduce函数是一个恒等函数,只是将中间值拷贝到输出上。
URL访问频率计数(Count of URL Access Frequency):map函数处理web页面请求的日志,并输出<URL, 1>。Reduce函数将相同URL的值累加到一起,生成一个<URL, total count>对。
翻转网页连接图(Reverse Web-Link Graph):map函数为在一个名为source的页面中指向目标(target)URL的每个链接输出<target, source>对。Reduce函数将一个给定目标URL相关的所有源(source)URLs连接成一个链表,并生成对:<target, list(source)>。
主机关键向量指标(Term-Vector per Host):一个检索词向量将出现在一个文档或是一组文档中最重要的单词概述为一个<word, frequency>对链表。Map函数为每个输入文档产生一个<hostname, term vector>(hostname来自文档中的URL)。Reduce函数接收一个给定hostname的所有文档检索词向量,它将这些向量累加到一起,将罕见的向量丢掉,然后生成一个最终的<hostname, term vector>对。
倒排索引(Inverted Index):map函数解析每个文档,并生成一个<word, document ID>序列。Reduce函数接收一个给定单词的所有键值对,所有的输出对形成一个简单的倒排索引。可以通过对计算的修改来保持对单词位置的追踪。
分布式排序(Distributed Sort):map函数将每个记录的key抽取出来,并生成一个<key, record>对。Reduce函数不会改变任何的键值对。这个计算依赖了在4.1节提到的分区功能和4.2节提到的排序属性。
三、 实现
MapReduce接口有很多不同的实现,需要根据环境来做出合适的选择。比如,一个实现可能适用于一个小的共享内存机器,而另一个实现则适合一个大的NUMA多处理器机器,再另一个可能适合一个更大的网络机器集合。
这一章主要描述了针对在Google内部广泛使用的计算环境的一个实现:通过交换以太网将大量的普通PC连接到一起的集群。在我们的环境中:
(1) 机器通常是双核x86处理器、运行Linux操作系统、有2-4G的内存。
(2) 使用普通的网络硬件—通常是100Mb/s或者是1Gb/s的机器带宽,但是平均值远小于带宽的一半。
(3) 由数百台或者数千台机器组成的集群,因此机器故障是很平常的事
(4) 存储是由直接装在不同机器上的便宜的IDE磁盘提供。一个内部的分布式文件系统用来管理存储这些磁盘上的数据。文件系统在不可靠的硬件上使用副本机制提供了可用性和可靠性。
(5) 用户将工作提交给一个调度系统,每个工作由一个任务集组成,通过调度者映射到集群中可用机器的集合上。
3.1 执行概述
通过自动的将输入数据分区成M个分片,Map调用被分配到多台机器上运行。数据的分片能够在不同的机器上并行处理。使用分区函数(如,hash(key) mod R)将中间结果的key进行分区成R个分片,Reduce调用也被分配到多台机器上运行。分区的数量(R)和分区函数是由用户指定的。
图1:执行概述
图1中显示了我们实现的一个MapReduce操作的整个流程。当用户程序调用MapReduce函数时,下面一系列的行为将会发生(图1中所使用的数字标识将与下面列表中的相对应):
1. 用户程序中的MapReduce库会先将输入文件分割成M个通常为16MB-64MB大小的片(用户可以通过可选参数进行控制)。然后它将在一个集群的机器上启动许多程序的拷贝。
2. 这些程序拷贝中的一个是比较特殊的——master。其它的拷贝都是工作进程,是由master来分配工作的。有M个map任务和R个reduce任务被分配。Master挑选出空闲的工作进程,并把一个map任务或reduce任务分配到这个进程上。
3. 一个分配了map任务的工作进程读取相关输入分片的内容,它将从输入数据中解析出key/value对,并将其传递给用户定义的Map函数。Map函数生成的中间key/value对缓存在内存中。
4. 缓存中的键值对周期性的写入到本地磁盘,并通过分区函数分割为R个区域。将这些缓存在磁盘上的键值对的位置信息传回给master,master负责将这些位置信息传输给reduce工作进程。
5. 当一个reduce工作进程接收到master关于位置信息的通知时,它将使用远程调用函数(RPC)从map工作进程的磁盘上读取缓存的数据。当reduce工作进程读取完所有的中间数据后,它将所有的中间数据按中间key进行排序,以保证相同key的数据聚合在一起。这个排序是需要的,因为通常许多不同的key映射到相同的reduce任务上。如果中间数据的总量太大而无法载入到内存中,则需要进行外部排序。
6. reduce工作进程迭代的访问已排序的中间数据,并且对遇到的每个不同的中间key,它会将key和相关的中间values传递给用户的Reduce函数。Reduce函数的输出追加到当前reduce分区一个最终的输出文件上。
7. 当所有的map任务和reduce任务完成后,master会唤醒用户程序。这时候,用户程序中的MapReduce调用会返回到用户代码上。
在成功完成后,MapReduce操作输出到R个输出文件(每个reduce任务生成一个,文件名是由用户指定的)中的结果是有效的。通常,用户不需要合并这R个输出文件,它们经常会将这些文件作为输入传递给另一个MapReduce调用,或者在另一个处理这些输入分区成多个文件的分布式应用中使用。
3.2 Master数据结构
Master保留了几个数据结构。对于每个Map和Reduce任务,它存储了它们的状态(idle、in-progress或者completed),以及工作进程机器的特性(对于非空闲任务)。
Master是中间文件区域的位置信息从map任务传送到reduce任务的一个通道。因此,对于每个完成的map任务来说,master存储了map任务产生的R个中间文件区域的位置信息和大小。在map任务完成时,master会接收到更新这个含有位置信息和大小信息的消息。信息被增量的传输到运行in-progress的reduce任务的工作进程上。
3.3 容错
因为MapReduce库是被设计成运行在数百或数千台机器上帮助处理海量数据的,所以这个库必须能够优雅的处理机器故障。
工作进程故障
Master周期性的ping每个工作进程,如果在一个特定的时间内没有收到响应,则master会将这个工作进程标记为失效。任何由失效的工作进程完成的map任务都被标记为初始idle状态,因此这个map任务会被重新分配给其它的工作进程。同样的,任何正在处理的map任务或reduce任务也会被置为idle状态,进而可以被重新调度。
在一个失效的节点上完成的map任务会被重新执行,因为它们的输出被存放在失效机器的本地磁盘上,而磁盘不可访问。完成的reduce任务不需要重新执行,因为它们的输出被存储在全局文件系统上。
当一个map任务先被工作进程A执行,然后再被工作进程B执行(因为A失效了),所有执行reduce任务的工作进程都会接收到重新执行的通知,任何没有从工作进程A上读取数据的reduce任务将会从工作进程B上读取数据。
MapReduce对于大规模工作进程失效有足够的弹性。比如,在一个MapReduce操作处理过程中,网络维护造成了80台机器组成的集群几分钟内不可达。MapReduce的master会重新执行那些在不可达机器上完成的工作,并持续推进,最终完成MapReduce操作。
Master故障
将上面提到的master数据结构周期性的进行写检查点操作(checkpoint)是比较容易的。如果master任务死掉,一个新的拷贝会从最近的检查点状态上启动。然而,假定只有一个单独的master,它的故障是不大可能的。因此,如果master故障,我们当前的实现是中止MapReduce计算。
当前故障的语义
当用户提供的map和reduce操作是输入确定性函数,我们的分布式实现与无故障序列执行整个程序所生成的结果相同。
我们依靠map和reduce任务输出的原子性提交来实现这个属性。每个in-progress任务将它们的输出写入到一个私有的临时文件中。一个reduce任务产生一个这样的文件,一个map任务产生R个这样的文件(每个reduce任务一个)。当一个map任务完成时,它将发送给master一个消息,其中包括R个临时文件的名字。如果master收到一个已经完成的map任务的完成消息,则忽略这个消息。否则,它将这R个文件名记录在master的数据结构中。
当一个reduce任务完成后,reduce的工作进程自动的将临时文件更名为最终的输出文件,如果相同的reduce任务运行在多台机器上,会调用多个重命名操作将这些文件更名为最终的输出文件。
绝大部分的map和reduce操作是确定性的,事实上,在这种情况下我们的语义与一个序列化的执行是相同的,这使程序开发者能够简单的推出他们程序的行为。当map和/或reduce操作是不确定性的时,我们提供较弱但依然合理的语义。在不确定性的操作面前,一个特定的reduce任务R1的输出与一个序列执行的不确定性程序生成的输出相同。然而,一个不同的reduce任务R2的输出可能与一个不同的序列执行的不确定性程序生成的输出可能一致。
考虑map任务M和reduce任务R1和R2。假定e(Ri)是提交的Ri的执行过程(有且仅有这样一个过程)。e(R1)可能从M的一个执行生成的输出中读取数据,e(R2)可能从M的一个不同执行生成的输出中读取数据,则会产生较弱的语义。
3.4 位置
在我们的计算环境中,网络带宽是一个相对不足的资源。我们通过将输入数据存放在组成集群的机器的本地磁盘来节省网络带宽。GFS将每个文件分割成64MB大小的块,每个块会在不同的机器上存储几个拷贝(通常为3个)。MapReduce master会考虑文件的位置信息,并试图将一个map任务分配到包含相关输入数据副本的机器上。如果这样做失败,它会试图将map任务调度到一个包含任务输入数据的临近的机器上(例如,与包含输入数据机器在同一个网络下进行交互的一个工作进程)。当在集群的一个有效部分上运行大规模的MapReduce操作时,大多数输入数据都从本地读取,不消耗任何网络带宽。
3.5 任务粒度
根据上面所提到的,我们将map阶段细分为M个片,将reduce阶段细分为R个片。理想情况下,M和R应该比工作机器的数量大得多,每个工作进程执行很多不同的任务来促使负载均衡,在一个工作进程失效时也能够快速的恢复:许多完成的map任务可以传播到其它所有的工作机器上。
在我们的实现中,对于取多大的M和R有一个实际的界限,因为如上面提到的那样,master必须进行O(M+R)次调度,在内存中保持O(M*R)个状态。(对内存使用的恒定因素影响较小,然而:对由每个map任务/reduce任务对占用大约一个字节所组成的O(M*R)片的状态影响较大。)
此外,R经常是由用户约束的,因为每个reduce任务的输出最终放在一个分开的输出文件中。实际中,我们倾向选择M值,以使每一个独立的任务能够处理大约16MB到64MB的输入数据(可以使上面提到的位置优化有更好的效果),把R值设置为我们想使用的工作机器的一个小的倍数。我们经常使用2000个工作机器,设置M=200000和R=5000,来执行MapReduce计算。
3.6 备用任务
影响一个MapReduce操作整体执行时间的一个通常因素是“落后者”:一个使用了异常的时间完成了计算中最后几个map任务或reduce任务中的一个的机器。可能有很多因素导致落后者的出现,例如,一个含有损坏磁盘的机器频繁的处理可校正的错误,使它的读取速度从30MB/s下降到了1MB/s。集群调度者可能将其它的任务分配到这个机器上,由于CPU、内存、磁盘或网络带宽的竞争会导致MapReduce代码执行的更慢。我们遇到的最近一个问题是机器初始化代码中的一个bug,它会使处理器的缓存不可用:受到这个问题影响的机器会慢上百倍。
我们使用一个普通的机制来缓解落后者问题。当一个MapReduce操作接近完成时,master调度备用(backup)任务执行剩下的、处于in-process状态的任务。一旦主任务或是备用任务完成,则将这个任务标识为已经完成。我们优化了这个机制,使它通常能够仅仅增加少量的操作所使用的计算资源。我们发现这能有效的减少完成大规模MapReduce操作所需要的时间。作为一个例子,5.3节所描述的那种程序在禁用备用任务机制的情况下,会需要多消耗44%的时间。
四、 细化
尽管简单的编写Map和Reduce函数提供的基本功能足够满足大多数需要,但是,我们发现一些扩展是很有用的。这会在本章进行描述。
4.1 分区函数
MapReduce的用户指定所希望的reduce任务/输出文件的数量(R)。使用分区函数在中间键上将数据分区到这些任务上。一个默认的分区函数使用hash方法(如“hash(key) mod R”),它能产生相当平衡的分区。然而,在一些情况下,需要使用其它的在key上的分区函数对数据进行分区。为了支持这种情况,MapReduce库的用户能够提供指定的分区函数。例如,使用“hash(Hostname(urlkey)) mod R”作为分区函数,使所有来自同一个host的URL最终放到同一个输出文件中。
4.2 顺序保证
我们保证在一个给定的分区内,中间key/value对是根据key值顺序增量处理的。顺序保证可以使它易于生成一个有序的输出文件,这对于输出文件需要支持有效的随机访问,或者输出的用户方便的查找排序的数据很有帮助。
4.3 组合(Combiner)函数
在一些情况下,每个map任务产生的中间key会有很多重复,并且用户指定的reduce函数满足结合律和交换律。2.1节中提到的单词技术的例子就是一个很好的例子。因为单词频率倾向于zifp分布,每个map任务都会产生数百或数千个<the, 1>形式的记录。所有这些计数都会通过网络发送给一个单独的reduce任务,然后通过Reduce函数进行累加并产生一个数字。我们允许用户指定一个可选的Combiner函数,它能在数据通过网络发送前先对这些数据进行局部合并。
Combiner函数在每台执行map任务的机器上执行。通常情况下,combiner函数和reduce函数的代码是相同的,两者唯一不同的是MapReduce库如何处理函数的输出。Reduce函数的输出被写入到一个最终的输出文件中,而combiner函数会写入到一个将被发送给reduce函数的中间文件中。
局部合并可以有效的对某类MapReduce操作进行加速。附录A包含了一个使用combiner函数的例子。
4.4 输入和输出类型
MapReduce库支持几种不同格式的输入数据。比如,“text”模式的输入可以讲每一行看出一个key/value对:key是该行在文件中的偏移量,value是该行的内容。另一中常见的支持格式是根据key进行排序存储一个key/value对的序列。每种输入类型的实现知道如何将自己分割成对map任务处理有意义的区间(例如,text模式区间分割确保区间分割只在行的边界进行)。用户能够通过实现一个简单的读取(reader)接口来增加支持一种新的输入类型,尽管大多数用户仅仅使用了预定义输入类型中的一小部分。
Reader并不是必须从文件中读取数据,比如,我们可以容易的定义一个从数据库中读取记录,或者从内存的数据结构中读取数据的Reader。
类似的,我们提供一组输出类型来产生不同格式的数据,用户也可以简单的通过代码增加对新输出类型的支持。
4.5 副作用
在一些情况下,MapReduce的用户发现为它们的map和/或reduce操作的输出生成辅助的文件很方便。我们依靠应用的writer将这个副作用变成原子的和幂等的。通常,应用会将结果写入到一个临时文件,然后在数据完全生成后,原子的重命名这个文件。
如果一个单独任务产生的多个输出文件,我们没有提供两阶段提交的原子操作。因此,产生多个输出文件且对交叉文件有一致性需求的任务应该是确定性的操作。但是在实际工作中,这个限制并不是一个问题。
4.6 跳过损坏的记录
有时,在我们的代码中会存在一些bug,它们会导致Map或Reduce函数在处理特定的记录上一定会Crash。这样的bug会阻止MapReduce操作顺利完成。通常的做法是解决这个bug,但有时,这是不可行的;可能是由于第三方的库中的bug,而我们没有这个库的源码。有时,忽略一些记录也是可以接受的,例如,当在海量的数据集上做数据统计时。我们提供了一个可选的运行模式,MapReduce库探测出哪些记录会导致确定性的Crash,并跳过这些记录以继续执行这个程序。
每个工作进程都安装了一个信号处理器,它能捕获段错误和总线错误。在调用用户的Map或Reduce操作之前,MapReduce库将记录的序号存储到全局变量中。如果用户代码产生一个信号,这个信号处理器会向MapReudce master发送一个“临死前”的UDP包,其中包含了这个序号。当master看到对于一个特定的记录有多个失败信号时,在相应的Map或Reduce任务下一次重新执行时,master会通知它跳过这个记录。
4.7 本地执行
在Map或Reduce函数中调试问题是很棘手的,因为实际的计算是发生在一个分布式系统上的,通常有几千台机器,并且是由master动态分配的。为了有助于调试、性能分析和小规模测试,我们开发了一个MapReduce库可供选择的实现,它将在本地机器上序列化的执行一个MapReduce的所有工作。这为用户提供了对MapReduce操作的控制,使计算能被限制在一个特定的map任务上。用户使用标记调用他们的程序,并能够简单的使用它们找到的任何调试或测试工具(如,gdb)。
4.8 状态信息
Master运行了一个内部的HTTP服务,并显示出状态集页面供人们查看,如,有多少任务已经完成、有多少正在处理、输入的字节数、中间数据的字节数、输出的字节数、处理速率等。这些页面也包含了指向每个任务生成的标准错误和标准输出文件的链接。用户能使用这些数据预测这个计算将要持续多长时间,以及是否应该向这个计算添加更多的资源。这些页面也有助于找出计算比预期执行慢的多的原因。
此外,顶层的状态页显示了哪些工作进程失效,哪些map和reduce任务在处理时失败。这个信息对试图诊断出用户代码中的bug很有用。
4.9 计数器
MapReduce库提供了一个计数器,用于统计不同事件的发生次数。比如,用户代码想要统计已经处理了多少单词,或者已经对多少德国的文档建立了索引等。
用户代码可以使用这个计数器创建一个命名的计数器对象,然后在Map和/或Reduce函数中适当的增加这个计数器的计数。例如:
独立的工作机器的计数器值周期性的传送到master(附在ping的响应上)master将从成功的map和reduce任务上获取的计数器值进行汇总,当MapReduce操作完成时,将它们返回给用户的代码。当前的计数器值也被显示在了master的状态页面上,使人们能够看到当前计算的进度。当汇总计数器值时,master通过去掉同一个map或reduce任务的多次执行所造成的影响来防止重复计数。(重复执行可能会在我们使用备用任务和重新执行失败的任务时出现。)
一些计数器的值是由MapReduce库自动维护的,如已处理的输入key/value对的数量和已生成的输出key/value对的数量。
用户发现计数器对检查MapReduce操作的行为很有用处。例如,在一些MapReduce操作中,用户代码可能想要确保生成的输出对的数量是否精确的等于已处理的输入对的数量,或者已处理的德国的文档数量在已处理的所有文档数量中是否被容忍。
五、 性能
在这章中,我们测试两个运行在一个大规模集群上的MapReduce计算的性能。一个计算在大约1TB的数据中进行特定的模式匹配,另一个计算对大约1TB的数据进行排序。
这两个程序能够代表实际中大量的由用户编写的MapReduce程序,一类程序将数据从一种表示方式转换成另一种形式;另一类程序是从海里的数据集中抽取一小部分感兴趣的数据。
5.1 集群配置
所有的程序运行在一个由将近1800台机器组成的集群上。每个机器有两个2GHz、支持超线程的Intel Xeon处理器、4GB的内存、两个160GB的IDE磁盘和一个1Gbps的以太网链路,这些机器部署在一个两层的树状交换网络中,在根节点处有大约100-200Gbps的带宽。所有的机器都采用相同的部署,因此任意两个机器间的RTT都小于1ms。
在4GB内存里,有接近1-1.5GB用于运行在集群上的其它任务。程序在一个周末的下午开始执行,这时主机的CPU、磁盘和网络基本都是空闲的。
5.2 字符串查找(Grep)
这个grep程序扫描了大概1010个100字节大小的记录,查找出现概率相对较小的3个字符的模式(这个模式出现在92337个记录中)。输入被分割成接近64MB的片(M=15000),整个输出被放到一个文件中(R=1)。
图2:数据传输速率
图2显示了计算随时间的进展情况。Y轴显示了输入数据的扫描速率,这个速率会随着MapReduce计算的机器数量的增长而增长,当1764个工作进程参与计算时,总的速率超过30GB/s。随着map任务的完成,速率开始下降,并在计算的大约第80秒变为0,整个计算从开始到结束大约持续了150秒,这包含了大约1分钟的启动时间开销,这个开销是由将程序传播到所有工作机器的时间、等待GFS文件系统打开1000个输入文件集的时间和获取位置优化所需信息的时间造成的。
5.3 排序
排序程序对1010个100字节大小的记录(接近1TB的数据)进行排序,这个程序模仿了TeraSort benchmark。
排序程序由不到50行的用户代码组成,一个三行的Map函数从一个文本行中抽取出一个10字节的key,并将这个key和原始的文本行作为中间的key/value对进行输出。我们使用内置的Identity函数作为Reduce操作。这个函数将中间key/value对不做任何修改的输出,最终排序结果输出到两路复制的GFS文件中(如,该程序输出了2TB的数据)。
如前所述,输入数据被分割为64MB大小的片(M=15000),将输出结果分成4000个文件(R=4000)。分区函数使用了key的开头字符将数据分隔到R片中的一个。
这个基准测试的分区函数内置了key的分区信息。在一个普通的排序程序中,我们将增加一个预处理MapReduce操作,它能够对key进行抽样,通过key的抽样分布来计算最终排序处理的分割点。
图3:对于排序程序的不同执行过程随时间的数据传输速率
图3(a)显示了排序程序的正常执行过程。左上方的图显示了输入读取的速率,这个速率峰值大约为13GB/s,因为所有的map任务执行完成,速率也在200秒前下降到了0。注意,这里的输入速率比字符串查找的要小,这是因为排序程序的map任务花费了大约一半的处理时间和I/O带宽将终结结果输出到它们的本地磁盘上,字符串查找相应的中间结果输出几乎可以忽略。
左边中间的图显示了数据通过网络从map任务发往reduce任务的速率。这个缓慢的数据移动在第一个map任务完成时会尽快开始。图中的第一个峰值是启动了第一批大概1700个reduce任务(整个MapReduce被分配到大约1700台机器上,每个机器每次最多只执行一个reduce任务)。这个计算执行大概300秒后,第一批reduce任务中的一些执行完成,我们开始执行剩下的reduce任务进行数据处理。所有的处理在计算开始后的大约600秒后完成。
左边下方的图显示了reduce任务就爱那个排序后的数据写到最终的输出文件的速率。在第一个处理周期完成到写入周期开始间有一个延迟,因为机器正在忙于对中间数据进行排序。写入的速率会在2-4GB/s上持续一段时间。所有的写操作会在计算开始后的大约850秒后完成。包括启动的开销,整个计算耗时891秒,这与TeraSort benchmark中的最好记录1057秒相似。
一些事情需要注意:因为我们的位置优化策略,大多数数据从本地磁盘中读取,绕开了网络带宽的显示,所以输入速率比处理速率和输出速率要高。处理速率要高于输出速率,因为输出过程要将排序后的数据写入到两个拷贝中(为了可靠性和可用性,我们将数据写入到两个副本中)。我们将数据写入两个副本,因为我们的底层文件系统为了可靠性和可用性提供了相应的机制。如果底层文件系统使用容错编码(erasure coding)而不是复制,写数据的网络带宽需求会降低。
5.4 备用任务的作用
在图3(b)中,我们显示了一个禁用备用任务的排序程序的执行过程。执行的流程与如3(a)中所显示的相似,除了有一个很长的尾巴,在这期间几乎没有写入行为发生。在960秒后,除了5个reduce任务的所有任务都执行完成。然而,这些落后者只到300秒后才执行完成。整个计算任务耗时1283秒,增加了大约44%的时间。
5.5 机器故障
在图3(c)中,我们显示了一个排序程序的执行过程,在计算过程开始都的几分钟后,我们故意kill掉了1746个工作进程中的200个。底层的调度者会迅速在这些机器上重启新的工作进程(因为只有进程被杀掉,机器本身运行正常)。
工作进程死掉会出现负的输入速率,因为一些之前已经完成的map工作消失了(因为香港的map工作进程被kill掉了),并且需要重新执行。这个map任务会相当快的重新执行。整个计算过程在933秒后完成,包括了启动开销(仅仅比普通情况多花费了5%的时间)。
六、 经验
我们在2003年2月完成了MapReduce库的第一个版本,并在2003年8月做了重大的改进,包括位置优化、任务在工作机器上的动态负载均衡执行等。从那时起,我们惊喜的发现,MapReduce库能够广泛的用于我们工作中的各种问题。它已经被用于Google内部广泛的领域,包括:
- 大规模机器学习问题
- Google新闻和Froogle产品的集群问题
- 抽取数据用于公众查询的产品报告
- 从大量新应用和新产品的网页中抽取特性(如,从大量的位置查询页面中抽取地理位置信息)
- 大规模图形计算
图4:随时间变化的MapReduce实例
图4中显示了在我们的源码管理系统中,随着时间的推移,MapReduce程序的数量有明显的增加,从2003年早期的0增加到2004年9月时的900个独立的实例。MapReduce如此的成功,因为它使利用半个小时编写的一个简单程序能够高效的运行在一千台机器上成为可能,这极大的加快了开发和原型设计的周期。此外,它允许没有分布式和/或并行系统经验的开发者能够利用这些资源开发出分布式应用。
表1: 2004年8月运行的MapReduce任务
在每个工作的最后,MapReduce库统计了工作使用的计算资源。在表1中,我们看到一些2004年8月在Google内部运行的MapReduce工作的一些统计数据。
6.1 大规模索引
目前为止,MapReduce最重要的应用之一就是完成了对生产索引系统的重写,它生成了用于Google网页搜索服务的数据结构。索引系统的输入数据是通过我们的爬取系统检索到的海量文档,存储为就一个GFS文件集合。这些文件的原始内容还有超过20TB的数据。索引程序是一个包含了5-10个MapReduce操作的序列。使用MapReduce(代替了之前版本的索引系统中的adhoc分布式处理)有几个优点:
- 索引程序代码是一个简单、短小、易于理解的代码,因为容错、分布式和并行处理都隐藏在了MapReduce库中。比如,一个计算程序的大小由接近3800行的C++代码减少到使用MapReduce的大约700行的代码。
- MapReduce库性能非常好,以至于能够将概念上不相关的计算分开,来代替将这些计算混合在一起进行,避免额外的数据处理。这会使索引程序易于改变。比如,对之前的索引系统做一个改动大概需要几个月时间,而对新的系统则只需要几天时间。
- 索引程序变得更易于操作,因为大多数由于机器故障、机器处理速度慢和网络的瞬间阻塞等引起的问题都被MapReduce库自动的处理掉,而无需人为的介入。
七、 相关工作
许多系统都提供了有限的程序模型,并且对自动的并行计算使用了限制。比如,一个结合函数可以在logN时间内在N个处理器上对一个包含N个元素的数组使用并行前缀计算,来获取所有的前缀[6,9,13]。MapReduce被认为是这些模型中基于我们对大规模工作计算的经验的简化和精华。更为重要的是,我们提供了一个在数千个处理器上的容错实现。相反的,大多数并行处理系统只在较小规模下实现,并将机器故障的处理细节交给了程序开发者。
Bulk Synchronous Programming和一些MPI源于提供了更高层次的抽象使它更易于让开发者编写并行程序。这些系统和MapReduce的一个关键不同点是MapReduce开发了一个有限的程序模型来自动的并行执行用户的程序,并提供了透明的容错机制。
我们的位置优化机制的灵感来自于移动磁盘技术,计算用于处理靠近本地磁盘的数据,减少数据在I/O子系统或网络上传输的次数。我们的系统运行在挂载几个磁盘的普通机器上,而不是在磁盘处理器上运行,但是一般方法是类似的。
我们的备用任务机制与Charlotte系统中采用的eager调度机制类似。简单的Eager调度机制有一个缺点,如果一个给定的任务造成反复的失败,整个计算将以失败告终。我们通过跳过损坏计算路的机制,解决了这个问题的一些情况。
MapReduce实现依赖了内部集群管理系统,它负责在一个大规模的共享机器集合中分发和运行用户的任务。尽管不是本篇文章的焦点,但是集群管理系统在本质上与像Condor的其它系统类似。
排序功能是MapReduce库的一部分,与NOW-Sort中的操作类似。源机器(map工作进程)将将要排序的数据分区,并将其发送给R个Reduce工作进程中的一个。每个reduce工作进程在本地对这些数据进行排序(如果可能的话就在内存中进行)。当然NOW-Sort没有使MapReduce库能够广泛使用的用户定义的Map和Reduce函数。
River提供了一个编程模型,处理进程通过在分布式队列上发送数据来进行通信。像MapReduce一样,即使在不均匀的硬件或系统颠簸的情况下,River系统依然试图提供较好的平均性能。River系统通过小心的磁盘和网络传输调度来平衡完成时间。通过限制编程模型,MapReduce框架能够将问题分解成很多细颗粒的任务,这些任务在可用的工作进程上动态的调度,以至于越快的工作进程处理越多的任务。这个受限制的编程模型也允许我们在工作将要结束时调度冗余的任务进行处理,这样可以减少不均匀情况下的完成时间。
BAD-FS与MapReduce有完全不同的编程模型,不像MapReduce,它是用于在广域网下执行工作的。然而,它们有两个基本相似点。(1)两个系统都使用了重新执行的方式来处理因故障而丢失的数据。(2)两个系统都本地有限调度原则来减少网络链路上发送数据的次数。
TASCC是一个用于简化结构的高可用性的网络服务。像MapReduce一样,它依靠重新执行作为一个容错机制。
八、 总结
MapReduce编程模型已经成功的应用在Google内部的许多不同的产品上。我们将这个成功归功于几个原因。第一,模型很易用,即使对那些没有并行计算和分布式系统经验的开发者,因为它隐藏了并行处理、容错、本地优化和负载均衡这些处理过程。第二,各种各样的问题都能用MapReduce计算简单的表示出来,例如,MapReduce被Google网页搜索服务用于生成数据、排序、数据挖掘、机器学习和许多其它系统。第三,我们已经实现了扩展到由数千台机器组成的大规模集群上使用的MapReduce。这个实现能够有效的利用这些机器自由,因此适合在Google内部遇到的很多海量计算问题。
我们从这项工作中学到了几样东西。第一,限制程序模型使得并行计算和分布式计算变得容易,也容易实现这样的计算容错。第二,网络带宽是一个稀有的资源,因此我们系统中的很多优化的目标都是为了减少数据在网络上的传输次数:位置优化允许我们从本地磁盘读取数据,并将中间数据的一个拷贝写入到本地磁盘,以此来节省网络带宽的使用。第三,冗余执行能够用于减少允许速度慢的机器所造成的影响,并且能够处理机器故障和数据丢失。