作者:Jeffrey Dean,Sanjay Ghemawat
翻译:Judeshawn
摘要
MapReduce是处理和生成大数据集的程序模型和相关的实现方式。用户执行一个Map函数来处理键值对来生成中间键值对,Reduce函数将所有相同中间键的所有中间值合并在一起。许多现实世界的任务可以用这个模型来表式,本文中会进行演示。
以这种函数式方式写的程序是自动并行化的并且是在大型商业机器上执行的。运行时系统维护输入数据的分区详情,计划程序跨整个机器集群执行,处理机器故障,管理必要的机器间通信。这允许程序员在没有任何并行和分布式系统相关的经验的情况下就能轻松使用大型分布式系统的资源。
我们的MapReduce的实施是在大型的商业机器集群上,具有很高的扩展性:一个典型的MapReduce(后面简称MR)算法可以在上千台机器上处理TB级别的数据量。程序员们觉得这个系统用起来很方便:Google的集群上已经部署了上百个MR程序,并且每天有至少有一千个MR任务被执行。
1 引言
在过去的5年,作者以及许多其他人已经在Google上部署了上百个处理大量原始数据的特定目的的计算,例如爬文档、网络请求日志等等,计算出各种各样的衍生数据,例如倒排索引、网络文档中的图像结构的各种画像、计算每台机器上爬来的页的数量总和、一天内频率最高的查询等等。大多数这样的计算都是概念上很简单直接。但是输入数据量通常很大并且计算必须分布在成百上千太机器上才能在合理范围的时间内完成计算。如何分区化计算,分布数据以及故障处理等问题使得我们不得不将本来很简单的计算复杂化成大量的复杂代码来实现以解决这些问题。
鉴于它的复杂性,我们设计了一个新的抽象概念,就是能够表示我们准备执行的简单计算,但把复杂的并行化、故障容错、数据分布以及负载均衡封装成了一个库。我们的抽象概念基于Map和Reduce两个基本元素并使用Lisp以及许多其他函数式语言来表示。我们知道,大多数计算都涉及将Map操作应用于每个输入的逻辑"记录"来生成中间键值对集合,然后将Reduce操作应用于所有分享相同键的值来将这些衍生数据合理地结合起来。我们使用用户指定的Map和Reduce函数这样一种函数式模型使得我们可以轻松地将大型计算并行化,并且使用了"重执行"机制来实现故障容忍。
这项工作的主要贡献来自于一个简单和强大的接口,这个接口实现了自动化并行和大型分布式计算,再把这个接口部署到大型商业PC集群上实现高性能。
第二部分描述了基本的程序模型然后给出了几个例子。第三部分描述了一个根据我们的基于集群计算环境量身部署的MapReduce接口。第四部分描述了我们已经发现的几种有用的程序模型改良。第五部分针对我们部署的各种任务进行了性能测试。第六部分探索了MapReduce在Google内部的使用,包括我们把它作为重写产品索引系统的基础的一些经验。第七部分讨论了相关的以及未来的工作。
2 程序模型
计算接收输入的键值对集合,然后产生键值对输出集合。MR库的用户把算法用两个函数来表示:Map和Reduce。
Map,是由用户自己写的,输入一个键值对然后产生一个中间键值对集合。MR库把所有的值按照相同的中间键分组然后把它们传给Reduce函数。
Reduce函数,也是由用户自己写的,接收一个中间键I和那个键对应的值得集合。它把这些值合并形成一个可能更小的值的集合。通常一个Reduce调用只会生成零个或一个输出值。中间值通过一个迭代器提供给用户的Reduce函数。这让我们可以处理值列表太大以至于内存不够的情况。
2.1 示例
例如计算每个单词在大量文档集中出现的次数。用户应该写类似于下面这样的伪代码:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value: EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
Map函数发布每个单词及该单词的出现次数(这个简单例子中为‘1’)。Reduce函数对特定单词的数量进行求和。
另外,用户通过写代码来将输入的名字和输出文件以及优化调整参数填充到一个MR特殊对象中。用户然后调用MapReduce函数,把它传进这个特殊对象中。用户代码和MR库是链接在一起的(C++实现)。附录A包含了这个例子的完整程序文本。
2.2 类型
虽然前面的伪代码看上去是基于字符串输入和输出,但实际上,用户提供的Map和Reduce函数本身具有对应的类型:
map (k1,v1) → list(k2,v2)
reduce (k2,list(v2)) → list(v2)
也就是说,输入的键值对与输出的键值对域不同。并且中间键值对和输出键值对域相同。
我们的C++实现方法是将字符串传给用户定义函数和从用户定义函数中接收,然后交给用户代码完成字符串到合适类型间的转换
2.3 更多例子
这里有一些简单的例子,一些容易用MapReduce算法来表示的有趣的程序
分布式Grep:如果匹配到提供的格式,Map函数就输出一行。Reduce函数是一个标记函数,只是把收到的中间数据复制到输出。
URL访问频率计数:Map函数处理WEB页面请求然后输出<URL,1>.Reduce函数把所有相同URL的值加到一起,然后得到一个<URL,total count>键值对。
反向Web-link图:Map函数为每个源页面上的每个链接目标URL输出一个<target,source>键值对。Reduce函数把一个目标URL所有源URLs组合在一起生成键值对:<target,list(source)>
主机关键向量指标:关键向量就是把大多数出现在文档或文档集中最重要的单词概括成一个键值对列表<word,frequency>。Map函数为每个输入的文档生成一个<hostname,term vector>键值对(主机名从文档的URL中提取出来的)。Reduce函数接收一个给定的主机的所有单个文档的关键向量。它把这些向量加在一起,剔除掉频次少的参数对,就能得到最终的<hostname,term vector>参数对。
反向目录:Map函数解析每个文档,生成一系列<word,document ID>参数对。Reduce函数接收给定单词的所有参数对,给对应的文档ID排序,就能生一个<word,list(document ID)>参数对。所有输出的参数对形成一个简单的反向目录。很容易就可以加强这个算法来跟踪单词位置。
分布式排序:Map函数从每个记录提取键,生成<Key,record>参数对。Reduce函数生成所有参数对不变。算法基于4.1部分中描述的分区工具和4.2部分中描述的排序工具。
3 实现方法
MapReduce接口可以有许多不同的实现方法。如何选择取决于环境。例如,一种实现方法可能适合共享内存很小的机器,另一种方法可能适合大型NUMA多处理器架构,另一种甚至适合大型的网络机器集合。
这部分讲述的实现方法适用的是一种在Google中广泛使用的计算环境:通过交换以太网连接的大型商业PC集群。在我们的环境中:
-
机器通常是linux下双处理X86处理器,每台机器2-4GB内存
-
使用商业网络硬件-通常是服务器级别100M/s或1G/s,但平均下来整体的对半宽带相当小。
-
集群由成百上千个机器组成,因此机器出现故障的现象是很普遍的
-
存储是便宜的IDE磁盘直接和单台机器连接。内部开发的分布式文件系统用于管理这些磁盘上的数据。基于不可靠的硬件,文件系统使用冗余复制来提供可用性和可靠性
-
用户把job提交给一个计划任务系统。每个job由一堆任务集合组成,然后通过计划器映射到集群内可用的机器。
3.1 执行概览
通过自动地把数据分区输入到分段集合M中,从而把map调用分布在多个机器上。输入分段可以被不同的机器并行处理。Reduce调用是通过一种分区函数把中间键空间分区成分段集合R(例如,hash(key) mod R)来实现分布式的。分区(R)的数量和分区函数是通过用户指定的。
图1 执行流程
图1展示了整个我们部署的MapReduce的整个流程。当用户程序调用MR函数,接下来的一些列行为就发生了(图1中的数字标记对应下面的步骤数):
-
用户程序中的MapReduce库首先将输入文件切割成通常每片16M到64M左右的(用户可控的一个优化参数)。然后在集群中的机器中启动许多相同的程序副本
-
程序副本中有一个很特别-(master)主程序。其他的是工人(worker),主程序负责给他们分配工作。其中有M map任务和R reduce任务。主程序挑选一个空闲的worker然后分配一个map或reduce任务。
-
分配了Map任务的工人会从对应的输入分片中读取内容。它从输入数据中解析出键值对然后把每个键值对传给用户自定义的Map函数。Map函数生成的中间键值对缓存在内存中。
-
定期地,缓存的键值对被写入本地磁盘,被分区函数分区成R区。这些缓存的参数对在磁盘上的位置会记录在主程序中,它负责把这些位置信息转发给reduce工人。
-
当reduce工人被告知这些位置信息,就会使用远程过程调用来从Map工人的本地磁盘中读取缓存数据。当reduce工人读取完成所有的中间数据后,会按照中间键排序以便所有具有相同键的键值对被分成一组。排序是有必要的,因为通常情况下会有很多不同的键映射到相同的reduce任务(排序可以提升搜索效率)。如果中间数据的量太大超出了物理内存限制,就会使用外部排序。
-
Reduce工人在已经排序的中间数据上对每个第一次出现的中间键进行迭代处理,然后把键和对应的中间值集传给用户的reduce函数。
-
当所有的map任务和reduce任务完成,主程序会唤醒用户程序。这时,用户程序里的MR调用就交给了用户代码。
成功完成后,执行MR的输出可通过R输出文件使用(每个reduce任务对应一个文件,文件名是用户指定的)。通常,用户不需要将这些R输出文件合并成一个文件-他们通常把这些文件作为另一个MR调用的输入,或者通过另一个能够处理被分成多个文件的输出的分布式应用来使用它们
3.2 主程序数据结构
主程序保留了几种数据结构。对于每个Map任务和Reduce任务,它存放他们的状态(空闲,处理中或已完成), 以及工人机器的标识(非空闲任务)。
主程序是map任务把中间文件区域的位置传播给reduce任务的通道。因此,对于每个已完成的map任务,主程序存放map任务产生的R中间文件区的位置和大小。当map任务完成后,主程序会接收到这个位置和大小信息的更新。信息会增量推送给状态为处理中的reduce任务工人(没懂)
3.3 故障容忍
因为MR库的设计是为了帮助处理超大型数据量使用上百或上千台机器,因此库必须允许机器出现故障。
工人故障
主程序会定期地ping每个工人。如果在一段时间内没有收到任何来自工人的响应,主程序会把工人标记为故障。任何该工人完成的map任务都会重置最初的空闲状态,因此会变得可以被其他工人来选择完成。同理,任何map任务或者reduce任务在处理的过程中只要发生了工人故障都会被重置为空闲状态,然后可以被其他工人来重新完成。
已完成map任务需要在故障的时候重新执行是因为他们的输出存放在故障机器的本地磁盘,所以是不可访问的。完成的reduce任务不需要重新执行,因为他们的输出存放在一个全局文件系统。
当map任务首先被A工人执行然后被B工人执行(因为A出现故障了),所有执行reduce任务的工人都会被告知这个信息(重新执行信息)。任何还未从A工人读取数据的reduce任务会从B工人读取数据。
MR是可以从大范围的工人故障中恢复。例如,在一个MR操作过程中,运行中的集群的网络维护导致一组80台机器同时变得不可访问大约几分钟。MR主程序仅仅是重新执行不可访问的故障工人机器,然后继续后面的工作,最后完成MR任务。
主程序故障
让主程序写前面讲过的这种主程序数据结构的周期性检查点很容易实现。如果主程序任务死掉了,新的副本可以从最新的检查点状态启动。但是,如果只有一个节点,恢复故障就没戏了;因此,如果主程序失败,在这样的部署情况下会中支MR计算。客户端会发现这个状况然后重新尝试MR操作。
故障呈现语义
当用户提供的map和reduce操作符是关于他们输入值的确定函数,我们的分布式部署生成的输出信息和整个程序正常顺序执行产生的输出一样。
我们依赖于map和reduce任务输出的原子提交来实现这一性质。每个处理中的任务会把输出写入私有临时文件。一个Reduce任务会生成一个这样的文件,一个map任务会生成R这样的文件(每个reduce任务一个)。当一个map任务完成,工人发送一条信息给主程序,信息包括R临时文件的名字。如果主程序接收到的是map任务的完成提醒,会忽略掉这个信息。否者,会把R文件的名字记录到主程序数据结构里。
当reduce任务完成,reduce工人自动把临时输出文件重命名为最终输出文件。如果多台机器执行了相同的reduce任务,同一最终输出文件会进行多次重命名操作。我们依赖于底层文件系统重命名操作的原子性来保证最终的文件系统状态只包含reduce任务的一个操作产生的数据。
绝大多数的Map和reduce操作都是确定性的,并且我们的语义和顺序执行是一样的,这种就有利于程序员理解程序的行为。当map和reduce操作不确定,我们提供了较弱但仍算合理的语义。当出现不明确的操作符时,某个reduce任务R1的输出和R1产生的顺序执行不明确的的程序的输出是一样的。但是,另一个reduce任务R2的输出会对应于R2产生的另一个顺序执行的不明确程序。
比如说map任务M和reduce任务R1和R2。令e(R I )表示已提交的Ri 的执行(只存在一个这样的执行)。较弱的语义会出现,因为e(R1)可能已经读取了M的一个操作产生的输出并且e(R2)可能已经读取了M的另一个操作产生的输出。
3.4 位置优化
网络带宽在我们的计算环境中是一种相对稀缺的资源。输入数据存放在组成集群的机器的本地磁盘(GFS管理),利用这一点,我们节省了网络带宽。GFS将每个文件分成了64MB的块,然后将每个块的多个副本(通常3副本)放在不同的机器上。MR主程序根据输入文件的位置信息尝试在一台包含对应输入数据副本的机器上进行map任务。如果没有的话,就尝试在任务输入数据的副本附近进行map任务(例如,一个工人机器和另一个包含数据的工人机器在相同的网络交换机上)。当运行一个需要集群中大多数工人的大型MR操作是,大多数数据都是读本地数据,不会占用网络带宽。
3.5 任务粒度
我们把map任务分成了M片,把reduce任务分成了R片。理论上,M和R应该比工人机器的数量更大。让每个工人执行许多不同的任务有利于提升动态负载均衡性,并且可以加速故障工人的恢复:许多已完成的map任务可以扩散到所有其他工人机器上。
实际上在部署中M和R的数量是有限制的,因为主程序的计划决策数量为M+R数量级,内存中需要存放MR数量级种状态。(内存使用的固定部分很少,但是:状态的O(MR)个片中的每个map/reduce任务对是由一个字节组成)
并且,R经常是受用户限制,因为每个reduce任务最终会形成一个独立的输入文件。实际上,我们倾向于选择M值使得每个独立的任务大概16MB到64MB的输入数据(以使上面提到的位置优化效率达到最高),R选择为工人机器的几倍就行了。我们通常使用M=200000,R=5000,2000个工人机器来进行MR计算。
3.6 备用任务
导致MR操作总时间延长最常见的原意之一是‘落伍者’:一台需要非常多的时间来完成最后些许map或reduce任务的机器。出现落伍者原因有很多。例如,一台机器的磁盘有问题,也许会出现频繁的可校验错误,导致读性能从30M/s降至1M/s。集群计划系统可能已经在机器上安排了其他任务,由于CPU、内存、本地磁盘或者网络带宽等争用导致执行MR代码更慢。最近我们经常遇到的问题是机器初始化代码中的一个bug,导致处理器缓存被禁用了:导致受影响的机器的速度降低了很多。
我们有一个通用的方案来减缓落伍者的影响。当MR操作快要完成时,主程序为剩下的任务安排备用任务。只要主要任务或备用任务中的其中一个完成,任务就会被标记为完成。我们已经优化了这个方案以使它增加的计算资源消耗通常只有几个百分点。我们已经发现这可以明显减少完成大型MR操作的时间。。例如,5.3中提到的排序程序仅需原来时间的44%就能完成。
4 改进
尽管只需要编写Map和Reduce函数提供的基本功能就满足大多数需求,我们还是开发了一些有用的扩展。这一部分会进行说明。
4.1 分区函数
MapReduce的用户指定需要的reduce任务/输出文件数目(R)。通过对中间键使用分区函数来实现跨任务的数据分区。默认的分区函数使用的哈希函数(例如,hash(key) mod R).这样使得分区结果相当均匀。但在一些情况下,使用其他分区函数会更合适一些。例如,输出的键是URLs,我们想要所有的相同主机的条目最终输出在同一输出文件。为了支持这种情况,MR库的用户可以使用一种特殊的分区函数。例如使用“hash(Hostname(rulkey)) mode R”作为分区函数可以让所有同一主机的URLs输出在相同的文件中。
4.2 排序保证
我们可以保证给定分区内中间键值对是按照键的递增顺序处理。这种排序保证使得每个输出分区文件内的数据的序列存放可以轻松完成,这对于输出文件格式需要支持按照键有效随机访问或者是基于用户希望输出数据是有序的需求,都是很有用的。
4.3 组合器函数
在某些情况下,每个map任务产生的中间键存在大量重复的情况,并且用户指定的reduce函数是可交换的、相关的(没懂)。较好的一个例子是2.1中讲到的单词计数。因为单词频率倾向于服从zipf分布,每个map任务会处理成千上万种形如<the,1>这样的记录。所有这些计数会通过网络被发送到某个单独的reduce任务中,然后被reduce函数汇总生成一个数字。我们允许用户使用一个优化的连接器函数,在通过网络发送之前局部合并这些数据。
组合器函数在执行map任务的每台机器上被执行。一般来说,实现组合器和reduce函数的代码是相同的。唯一的区别在于,MR库如何处理函数的输出。Reduce函数写入最终的输出文件。组合器函数输出到会被发送给reduce任务的中间文件。
局部组合大幅提升了某些类型的MR操作。附录A包含一个使用组合器的示例。
4.4 输入和输出类型
MR库提供支持读几种不同格式的输入数据。例如,“text”模式输入把每行看做一个键值对:键是文件文件的偏移量值是行的内容。另一种常用的支持格式是按照键的顺序生成一个键值对序列。每种输入类型在使用的时候都知道如何把自己分裂成有意义的区间块用于单独的Map任务处理(例如,text模式的区间块分裂确保只会发生在行与行之间)。用户可以自定义添加新的类型,只需要通过部署一个简单reader接口,尽管大多数用户只使用其中少量预定义的输入类型。
Reader没有必要非得从一个文件里读数据。例如,可以轻松定义一个reader从数据库里读记录,或者从内存中读数据结构。
以类似的方式,我们也支持一些不同格式的数据处理的输出类型,并且同样可以轻松地使用用户代码来添加新的输出类型支持。
4.5 副产物
在某些情况下,MR用户觉得生成附加文件作为map或reduce操作的额外输出很方便。我们使用应用写入器让这样的副产物具有原子性和幂等性。通常应用一旦完全写入临时文件就会自动重命名这个文件。
我们没有提供单个任务产生多个输出文件的两段提交的原子性支持。因此,应该确定任务产生的多个输出文件具有跨文件一致性的需求。这个限制在实际使用中还没出现过问题。
4.6 跳过坏记录
有时候用户代码中存在bug使得Map或Reduce函数在处理某些记录时会崩溃。这个bug让MR操作没法完成。常用的作法是修复bug。但有时这并不可行;也许这个bug是第三方库的源代码不可用引起的。有时忽略少量记录是可接受的,例如,在做大数据集的统计分析时。我们提供了一种优化的执行模式,MR库会准确探测崩溃的记录然后跳过这些记录继续执行后面的任务。
每个工人进程上都安装了一个信号处理器来捕捉内存段异常和总线错误。在调用用户Map或Reduce操作之前,MR库把变量的序列号存放在一个全局变量中。如果用户代码生成一个信号,信号处理器就发送一个包含序列号的“最后的喘息”UDP包给MR主程序。当主程序在一个特定记录上发现了超过一次故障,就表示记录在下一次被map或reduce任务重执行时会被跳过。
4.7 本地执行
Map或Reduce函数中的问题调试可能会很棘手,因为实际的计算是发生在分布式系统,通常是在几千台机器上,工作分配由主程序动态决策的。为了帮组促进调试,性能分析和小范围的测试,我们已经开发了一个可选部署方式,让MR库在本地顺序执行所有MR操作。用户可以控制计算限制在特定的map任务。用户使用特殊的标志来调用他们的程序并且可以轻松的使用任何有用的调试或测试工具(例如,GDB,一种UNIX程序调试工具)
4.8 状态信息
主程序运行一个内部的HTTP服务器然后导出一个状态页面集用于人们查看。状态页面展示了计算的进度,例如已经完成了多少个任务,多少个正在处理,输入了多少字节,中间数据多少字节,输出了多少字节,处理速率,等等。页面也包含了每个任务的标准错误和标准输出文件的链接。用户可以使用这个数据来预测还有多久计算能完成,以及是否需要添加更多的资源给计算。这些页也可以用于找出计算速度异常缓慢的时间点。
另外,顶级状态页显示了哪些工人已经失败了,以及失败出现在哪些map和reduce任务上。这个信息对于诊断用户代码bug很有用。
4.9 计数器
MR库提供了一个计数装置来统计各种事件的出现次数。例如,用户代码可能想要统计总共处理的单词数目,或者是德语文档的索引数目,等等
想要使用这个装置,用户代码需要创建一个命名的计数器对象然后在Map或Reduce函数中合理地实现自增。例如:
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
单个工人机器上的计数器值会定期地传播给主程序。主程序会在MR操作完成时把所有的成功执行的Map或Reduce任务的计数器的值汇总然后返还给用户代码。当前的计数器值也会在主程序状态页显示,这样我们就能看到实时计算的进度。当汇总计数器的值的时候,主程序会消除某些重复执行的map或reduce任务的影响,避免多次计数。(重复执行可能来自于备用任务和一些因为故障而重新执行的任务)
一些计数器的值自动由MR库维护,例如已经处理的输入键值对的数量和产生的输出键值对的数量。
用户已经发现计数器工具对于清楚地查看MR操作的行为很有用。例如,在某些MR操作中,用户代码可能想要确认输出键值对的数量和输入键值对的数量是否准确一致,或者是确认德语文档的占总文档数量的比例在某个可容忍的比例范围内。
5 性能
在这个部分,我们在两个运行在大型机器集群上运行了两个计算来测算MR的性能。一个计算是是在大约1T的数据中匹配某种符合模式的数据。另一个计算是对大约一个T的数据进行排序。
这两个程序可以代表用户编写的大型MR实际程序的其中一部分 – 一类程序是对数据进行洗牌,另一类是从大数据集中提取出感兴趣的少量数据。
5.1 集群配置
所有程序在一个由大约1800台机器组成的集群上执行。每个机器有2颗启用了超线程的2GHz Intel Xeon处理器,4G内存,两个160GB IDE磁盘,以及1G带宽的以太网。机器是按照二级树状交换网络结构部署的,大约100-200Gbps的根节点聚合带宽。所有机器都是在同一个代管设备中,因此任何两台机器之间的响应时间都是低于1毫秒的。
4GB内存中,大约会暴露1-1.5GB内存用于集群中的其他任务。程序是在周末下午执行的,那时的CPU,磁盘和网络大多数都是空闲的。
5.2 Grep(筛选)
Grep程序在100亿条100字节长度的记录中搜索相对来说很少的3个字符的模式(这种模式出现在92337条记录中)。输入被分成了大约64M每片(M=15000),全部输出都放在一个文件里面(R=1)。
图2 数据传输率随时间变化
图2展示了随着时间变化的计算进度。Y轴表示输入数据被扫描的速率。随着越来越多的机器被分配了MR计算任务,这个速率逐渐上升,峰值超过了30GB/s,这时已有1764个工人被分配了任务。当map任务完成,速率开始下降,直到80秒左右下降至0。整个计算过程从开始到结束总共花费了将近150秒。这包含了大概一分钟用于启动消耗。这块消耗是为了将程序传播给所有的工人机器,以及和GFS交互打开1000个输入文件和获取位置优化所需信息的延迟。
5.3 排序
排序程序排序100亿每条100字节的记录(大约1T数据量)。这个程序模仿的是TeraSort 算法benchmark。
排序程序由不超过50行用户代码组成。3行map函数用于从一个文本行提取10字节的排序键然后发送键,原来的文本行作为中间键值对。我们用一个内置的标识函数作为Reduce操作。这个函数保持中间键值不变作为输出键值对。最终排序输出写入一个2个副本的GFS文件集合(也就是说,会有2T的文件所谓程序的输出)。
在这之前,输入数据被分成了64MB的片(M=15000)。我们将排好序的输出放入了4000个文件(R=4000)。分区函数按照键的前面几个字节来将每个键值对划分到其中一个R片。
我们这次基准测试的分区函数已经内置了键的分布式识别功能。在常规的排序程序中,我们会添加一个MR预计算操作来收集键的样本然后使用样本键分布来计算出最终的排序计算的使用的分裂点。
图3 排序程序在不同状况下执行的数据传输率随时间变化
图3(a)展示了排序程序正常执行的进度。左上角的图展示了读取输入的速率。峰值在13GB/s左右,200s之前就已经完成了所有的map任务。注意到输入速率比grep要慢很多。这是因为排序map任务花费了大约一半的时间和IO带宽将中间数据输出到本地磁盘。对应的grep的中间数据输出可以忽略不计。
中间最左边的图展示了数据通过网络从map任务发送到reduce任务的速率。这次转换是从第一个map任务完成开始的。第一个波峰是第一次大约1700个reduce任务同时运行(整个MR分配给了大约1700tail机器,每个机器每次最多执行一个reduce任务)。计算到大概300秒左右,第一批reduce任务中的一部分已经完成,然后开始转换剩下的reduce任务数据。计算到600s左右,所有的数据转换都已经完成了。
左边最下面的图展示了排序完成的数据被reduce入任务写入输出文件的速率。第一次转换数据完成和开始输出数据的时间点之间存在延迟,是因为机器正忙于对中间数据进行排序。写操作以大概2-4G/s速率持续了一会儿。所有的写操作在850s左右完成了。包括启动消耗,整个计算耗费了891秒。这和TereSort benchmark的最快的结果1057秒很靠近。
需要注意的事情:输入速率比转换速率和输出速率都要高很多,是因为位置优化-大多数数据都是从本地磁盘,绕过了带宽相对局限的网络。转换率比输出率高,因为输出阶段会写把排好序的数据写入两个副本(是基于可靠性和可用性的考虑)。我们写入两个副本是因为底层文件系统提供的可靠性和可用性机制。如果底层文件系统使用的是纠删码而不是复制,写数据的网络带宽需求会降低。
5.4 备用任务的影响
在图3(b)中,我们展示了一个在没有启用备用任务的情况下排序程序的执行情况。执行过程和图3(a)中的类似,只是执行过程存在很长的结尾,几乎没有任何写入活动发生。960秒后,只剩下5个reduce任务还没完成。但是这些落伍者挣扎了300秒才完成。整个计算耗费了1283秒,时间消耗增加了44%。
5.5 机器故障
在图3(4)中,我们展示了排序程序在进行到几分钟时故意杀掉1746个工人中的200个工人,这种情况下的执行情况。底层集群计划器立刻重启了新的工人(因为只有进程被杀掉了,机器仍然能够正常运行)
工人的死导致了输入速率出现了负数,因为前面完成的map工作会消失(因为map工人被杀掉了)并且需要重新完成。这些map工作的重新执行相对来说会快点。整个计算在933秒的时候完成了,包括启动消耗(相对于正常执行,只增加了5%的时间)
6 经验
我们在2003年2月写了MapReduce的第一版,然后在2003年的8月做了大量加强,包括位置优化,任务跨工人机器动态负载均衡,等等。从那以后,我们惊喜地看到MapReduce库对于解决我们遇到的各种问题上的应用竟如此广泛。它已经在谷歌内部大范围的被使用,包括:
- 大型的机器学习问题
- Google新闻和Froogle产品(google的购物搜索)的聚类问题
- 用于热搜产品报告的数据提取(例如,Google Zeitgeist)。
- 新试验和产品Web网页的属性提取(例如,从位置搜索网页大型语料库提取地理位置)
- 大型图片计算
图4 Mapreduce实例数量变化趋势
图4展示了随着时间注册进我们的主要源代码管理系统的独立MR数量的上升趋势,从2003年的0到2004年的900.。MR已经应用得如此成功主要是因为它让我们只需要花半个小时就能完成写简单的程序并且在上千台机器上运行,大大地减少了部署和成型的周期。并且,它能让没有任何分布式或并行系统相关经验的程序员轻松开采出大量资源。
表1 2004年8月运行的某个job的统计
在每个job的结尾,MR库日志统计了job关于计算资源的统计。在表1中,我们展示了2004年8月google运行的某个job的统计。
6.1 大型索引
产品索引系统用于生产Google web搜索服务所需的数据结构,是目前为止MR最重要的应用之一。索引系统以爬网系统获取到的文档集合作为输入源,存入GFS文件集。这些文档的净内容量就超过了20T。索引进程运行了连续的5-10个MR操作。使用MapReduce(而非索引系统先前使用的ad-hoc distributed passes)的几点好处是:
-
索引代码简单,小巧,容易理解,因为代码在MR库中就已经解决了故障容忍,分布式和并行化等问题。例如,计算段落长度的算法代码从C++的3800行变成了MapReduce代码的700行。
-
MR库的性能足够好,因为我们可以让概念上无关的计算分离,而非混杂在一起计算,避免了额外的数据交换。这让我们可以轻松地变更索引进程。例如,在旧系统一个变更可能需要耗费几个月,而现在只需要几天。
-
索引进程变得更容易操作,因为机器故障、慢机器和网络波动等引起的大多数问题都会被MR进程自动解决,不需要人为干涉。并且,如果想提升索引处理的性能,给索引系统添加新机器也很方便。
7 相关工作
许多系统已经提供了受限制的编程模型,使用自动并行化的限制。例如,一个结合函数(associative function)可以使用并行前缀算法[6,9,13]通过在N个处理器上记录N次来在一个N元素矩阵的所有前缀上进行计算.MR可以被看作是我们基于实际大型现实世界计算的一些模型的简化版和精华版。更重要的是,我们提供了故障容忍部署,因此可以扩展到上个CPU。相比之下,大多数并行处理系统只能较小规模地部署,并且把处理机器故障的细节都交给了程序员。
批量同步进程[17]和一些MPI基元[11]提供了高度抽象化使得程序员写程序变得更容易。这些系统和MapReduce的关键区别在于MapReduce利用一种受限制的编程模型来自动并行化用户代码并且提供了效果显著的故障容忍。
我们的位置优化的灵感来自于活动磁盘[12.15]这样的技术,计算倾向于处理接近本地磁盘的元素,减少了跨IO子系统或网络的数据传输量。我们少量的磁盘是直接和商业处理器连接运行的,而不是直接和磁盘控制器的处理器连接运行,但常规的方法都差不多。
我们的备用任务的机制类似于Charlotte System[3]中用到的eager调度机制。简单eager调度机制的一个缺点是,如果某个不断地出现故障,整个计算就不能完成。我们修复了这个问题的其中一个方面,就是通过跳过坏记录的机制。
MapReduce部署依赖于一个负责在大型共享机器集群上分配和运行用户任务的内部集群管理系统。但不是本文讨论的重点,集群管理系统类似于其他像Condor[16]这样的系统。
排序组件是MapReduce库的一部分,类似于NOW-sort[1].源机器(Map工人)把要分区的数据进行排序然后发送到R个reduce工人中一个。每个reduce工人在本地排序数据(尽可能在内存中)。当然NOW-sort没有用户定义的Map和Reduce那样的可以让我们的库广泛应用的函数。
River[2]提供了一个通过分布式队列发送数据来处理相互通信的编程模型。和MapReduce一样,River系统试图提供较为均衡的性能,即使出现由于异构硬件和系统波动导致的不均匀。River通过仔细规划磁盘和网络传输来实现完成时间的平衡。通过受限制的编程模型,MapReduce框架就能够把问题分区成大量细粒度的任务。这些任务会在可用的工人机器上动态规划,让更快的工人处理更多的任务。受限制的程序模型也让我们在job快结束时规划任务的冗余执行,这大量减少了由于不均匀现象带来的额外完成时间(例如慢或卡的工人)。
BAD-FS[5]有一个和MapReduce非常不一样的程序模型,不像MapReduce,目的是在执行跨整个广域网的job。但是,有两个基本的相似点。(1)两个系统都使用冗余执行来恢复故障丢失的数据。(2)两者都使用智能位置计划来减少网络连接稀缺资源的数据传输量。
TACC[7]是一个用于简化高可用性网络服务结构的系统。类似于MapReduce,使用重执行机制来实现故障容忍。
8 结论
MapReduce程序模型已经成功地应用于Google的许多各种不同的用途。我们把它的成功归功于一下几个因素。首先,模型使用起来很简单,即使没有任何并行和分布式经验的程序员也一样,因为它把并行化、故障容忍、位置优化、以及负载均衡都封装起来了。其次,大量各种各样的问题都可以轻松地用MapReduce计算来表式。例如,MapReduce用于Google的产品web搜索服务的数据生成,排序、数据挖掘、机器学习和很多其他系统。第三,我们已经开发了一个能够扩展到上千台机器组成的大型集群的MapReduce部署。这样的部署充分利用了这些机器资源并且因此适用于大量Google面临的计算问题。
我们已经从这项工作中学会了很多。首先,限制编程模型,让并行和分布计算变得简单,并且让计算容忍故障。第二,网络带宽是稀缺的资源。大量的系统优化目的在于减少跨网络的数据传输:位置优化让我们能从本地磁盘读数据,并且在本地磁盘上写一份中间数据,节约了带宽。第三冗余执行可用于减少慢机器的影响,还有处理机器故障和数据丢失。
致谢
Josh Levenberg基于对MapReduce和使用经验和其他人的改善建议,对于很多用户级别MapReduce API的新功能的转换和扩展起到了关键性作用。MapReduce对Google文件系统[8]进行读写。我们要感谢Mohit Aron,Howard Gobioff,Markus Gutschke,David Kramer,Shun-Tak Leung,以及Josh Redstone在开发GFS上做出的努力。我们也应该感谢Percy Liang和Olcan Sercinoglu在开发MapReduce使用的集群管理系统上做出的努力。Mike Burrows,Wilson Hsieh,Jlsh Levenberg,Sharon Perl,Rob Pike,以及Debby Wallach所提供的对本文的较早草稿版本的有用批注。不知名的OSDI评审员,和我们的指导员Eric Brewer,对于论文的改善,提供了许多各方面有用的建议。最后,我们要感谢所有Google工程师团队内的所有MapReduce用户提供的有用反馈,建议和Bug报告。
参考文献
[1] Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, DavidE.Culler,JosephM.Hellerstein,andDavidA.Patterson. High-performance sorting on networks of workstations. In Proceedings of the 1997 ACM SIGMOD International Conference on Management of Data, Tucson, Arizona, May 1997.
[2] Remzi H. Arpaci-Dusseau, Eric Anderson, Noah Treuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River: Makingthefastcasecommon. InProceedingsoftheSixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS ’99), pages 10–22, Atlanta, Georgia, May 1999.
[3] Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996.
[4] Luiz A. Barroso, Jeffrey Dean, and Urs H¨olzle. Web searchforaplanet: TheGoogleclusterarchitecture. IEEE Micro, 23(2):22–28, April 2003.
[5] John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI,March 2004.
[6] Guy E. Blelloch. Scans as primitive parallel operations. IEEE Transactions on Computers, C-38(11), November 1989.
[7] *Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78– 91, Saint-Malo, France,1997.
[8] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29–43, Lake George, New York, 2003.
[9] S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y . Robert, editors, Euro-Par’96. Parallel Processing, Lecture Notes in Computer Science 1124, pages 401–408. Springer-Verlag, 1996.
[10] Jim Gray. Sort benchmark home page. http://research.microsoft.com/barc/SortBenchmark/.
[11] William Gropp, Ewing Lusk, and Anthony Skjellum. Using MPI: Portable Parallel Programming with the Message-Passing Interface. MITPress,Cambridge, MA, 1999.
[12] L.Huston,R.Sukthankar, R.Wickremesinghe,M.Satyanarayanan,G.R.Ganger,E.Riedel,andA.Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004 USENIX File and Storage Technologies FAST Conference, April2004.
[13] Richard E. Ladner and Michael J.Fischer. Parallelprefix computation. Journal of the ACM,27(4):831–838, 1980.
[14] Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal of the ACM,36(2):335–348, 1989.
[15] Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data processing. IEEE Computer, pages 68–74, June 2001.
[16] Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experience. Concurrency and Computation: Practice and Experience, 2004.
[17] L.G.Valiant. Abridgingmodelforparallelcomputation. Communications of the ACM,33(8):103–111, 1997.
[18] Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf.
附录A 文字频率
这一部分包含了一个程序代码用于计算每个输入的唯一单词在输入文件集中出现的次数。
#include "mapreduce/mapreduce.h"
// User’s map function
class WordCounter : public Mapper {
public: virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
// Skip past leading whitespace
while ((i < n) && isspace(text[i]))
i++;
// Find word end
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),"1");
}
}
};
REGISTER_MAPPER(WordCounter);
// User’s reduce function
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
// Iterate over all entries with the
// same key and add the values
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
// Emit sum for input->key()
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// Store list of input files into "spec"
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
// Specify the output files:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text"); out->set_reducer_class("Adder");
// Optional: do partial sums within map
// tasks to save network bandwidth out->set_combiner_class("Adder");
// Tuning parameters: use at most 2000
// machines and 100 MB of memory per task
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// Now run it
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// Done: ’result’ structure contains info
// about counters, time taken, number of
// machines used, etc.
return 0;
}











网友评论