现在你可能有一个疑问 :为什么MapReduce会被取代?今天我将重点为你解答。
高昂的维护成本
使用MapReduce,你需要严格地遵循分步的Map和Reduce步骤。当你构造更为复杂的处理架构时,往往需要协调多个Map和多个Reduce任务。
而现实的MapReduce系统的复杂度是超过了“伪专家”的认知范围的。下面我来举个例子,告诉你MapReduce有多复杂。
想象一下这个情景,你的公司要预测美团的股价,其中一个重要特征是活跃在街头的美团外卖电动车数量,而你负责处理所有美团外卖电动车的图片。
在真实的商用环境下,为了解决这个问题,你可能至少需要10个MapReduce任务:

首先,我们需要搜集每日的外卖电动车图片。
数据的搜集往往不全部是公司独自完成,许多公司会选择部分外包或者众包。所以在数据搜集(Data collection)部分,你至少需要4个MapReduce任务:
数据导入(data ingestion):用来把散落的照片(比如众包公司上传到网盘的照片)下载到你的存储系统。
数据统一化(data normalization):用来把不同外包公司提供过来的各式各样的照片进行格式统一。
数据压缩(compression):你需要在质量可接受的范围内保持最小的存储资源消耗 。
数据备份(backup):大规模的数据处理系统我们都需要一定的数据冗余来降低风险。
仅仅是做完数据搜集这一步,离真正的业务应用还差得远。
真实的世界是如此不完美,我们需要一部分数据质量控制(quality control)流程,比如:
数据时间有效性验证 (date validation):检测上传的图片是否是你想要的日期的。
照片对焦检测(focus detection):你需要筛选掉那些因对焦不准而无法使用的照片。
最后才到你负责的重头戏——找到这些图片里的外卖电动车。而这一步因为人工的介入是最难控制时间的。你需要做4步:
数据标注问题上传(question uploading):上传你的标注工具,让你的标注者开始工作。
标注结果下载(answer downloading):抓取标注完的数据。
标注异议整合(adjudication):标注异议经常发生,比如一个标注者认为是美团外卖电动车,另一个标注者认为是京东快递电动车。
标注结果结构化(structuralization): 要让标注结果可用,你需要把可能非结构化的标注结果转化成你的存储系统接受的结构。
通过这个案例,我想要阐述的观点是,因为真实的商业MapReduce场景极端复杂,像上面这样10个子任务的MapReduce系统在硅谷一线公司司空见惯。
在应用过程中,每一个MapReduce任务都有可能出错,都需要重试和异常处理的机制。所以,协调这些子MapReduce的任务往往需要和业务逻辑紧密耦合的状态机。
这样过于复杂的维护让系统开发者苦不堪言。
时间性能“达不到”用户的期待
除了高昂的维护成本,MapReduce的时间性能也是个棘手的问题。
MapReduce是一套如此精巧复杂的系统,如果使用得当,它是青龙偃月刀,如果使用不当,它就是一堆废铁。不幸的是并不是每个人都是关羽。
在实际的工作中,不是每个人都对MapReduce细微的配置细节了如指掌。
在现实中,业务往往需求一个刚毕业的新手在3个月内上线一套数据处理系统,而他很可能从来没有用过MapReduce。这种情况下开发的系统是很难发挥好MapReduce的性能的。
同时,我们也提到了2008年诞生在Google西雅图研发中心的FlumeJava,它成为了Google内部的数据处理新宠。
那么,为什么是它扛起了继任MapReduce的大旗呢?
要知道,在包括Google在内的硅谷一线大厂,对于内部技术选择是非常严格的,一个能成为默认方案的技术至少满足以下条件:
经受了众多产品线,超大规模数据量例如亿级用户的考验;
自发地被众多内部开发者采用,简单易用而受开发者欢迎;
能通过内部领域内专家的评审;
比上一代技术仅仅提高10%是不够的,必须要有显著的比如70%的提高,才能够说服整个公司付出技术迁移的高昂代价。就看看从Python 2.7到Python 3的升级花了多少年了,就知道在大厂迁移技术是异常艰难的。
我想先和你一起设想一下,假如我和你站在2008年的春夏之交,在已经清楚了MapReduce的现有问题的情况下,我们会怎么设计下一代大规模数据处理技术,带领下一个十年的技术革新呢?
我们需要一种技术抽象让多步骤数据处理变得易于维护
上一讲中我提到过,维护协调多个步骤的数据处理在业务中非常常见。
像图片中这样复杂的数据处理在MapReduce中维护起来令人苦不堪言。
为了解决这个问题,作为架构师的我们或许可以用有向无环图(DAG)来抽象表达。因为有向图能为多个步骤的数据处理依赖关系,建立很好的模型。如果你对图论比较陌生的话,可能现在不知道我在说什么,你可以看下面一个例子,或者复习一下极客时间的《数据结构与算法之美》。

西红柿炒鸡蛋这样一个菜,就是一个有向无环图概念的典型案例。
比如看这里面番茄的处理,最后一步“炒”的步骤依赖于切好的番茄、打好的蛋、热好的油。而切好的番茄又依赖于洗好的番茄等等。如果用MapReduce来实现的话,在这个图里面,每一个箭头都会是一个独立的Map或Reduce。
为了协调那么多Map和Reduce,你又难以避免会去做很多检查,比如:番茄是不是洗好了,鸡蛋是不是打好了。
最后这个系统就不堪重负了。
但是,如果我们用有向图建模,图中的每一个节点都可以被抽象地表达成一种通用的数据集,每一条边都被表达成一种通用的数据变换。如此,你就可以用数据集和数据变换描述极为宏大复杂的数据处理流程,而不会迷失在依赖关系中无法自拔。
我们不想要复杂的配置,需要能自动进行性能优化
上一讲中提到,MapReduce的另一个问题是,配置太复杂了。以至于错误的配置最终导致数据处理任务效率低下。
这种问题怎么解决呢?很自然的思路就是,如果人容易犯错,就让人少做一点,让机器多做一点呗。
我们已经知道了,得益于上一步中我们已经用有向图对数据处理进行了高度抽象。这可能就能成为我们进行自动性能优化的一个突破口。
回到刚才的番茄炒鸡蛋例子,哪些情况我们需要自动优化呢?
设想一下,如果我们的数据处理食谱上又增加了番茄牛腩的需求,用户的数据处理有向图就变成了这个样子了。

理想的情况下,我们的计算引擎要能够自动发现红框中的两条数据处理流程是重复的。它要能把两条数据处理过程进行合并。这样的话,番茄就不会被重复准备了。
同样的,如果需求突然不再需要番茄炒蛋了,只需要番茄牛腩,在数据流水线的预处理部分也应该把一些无关的数据操作优化掉,比如整个鸡蛋的处理过程就不应该在运行时出现。
另一种自动的优化是计算资源的自动弹性分配。
比如,还是在番茄炒蛋这样一个数据处理流水线中,如果你的规模上来了,今天需要生产1吨的番茄炒蛋,明天需要生产10吨的番茄炒蛋。你发现有时候是处理1000个番茄,有时候又是10000个番茄。如果手动地去做资源配置的话,你再也配置不过来了。
我们的优化系统也要有可以处理这种问题的弹性的劳动力分配机制。它要能自动分配,比如100台机器处理1000个番茄,如果是10000个番茄那就分配1000台机器,但是只给热油1台机器可能就够了。
这里的比喻其实是很粗糙也不精准的。我想用这样两个例子表达的观点是,在数据处理开始前,我们需要有一个自动优化的步骤和能力,而不是按部就班地就把每一个步骤就直接扔给机器去执行了。
我们要能把数据处理的描述语言,与背后的运行引擎解耦合开来
前面两个设计思路提到了很重要的一个设计就是有向图。
用有向图进行数据处理描述的话,实际上数据处理描述语言部分完全可以和后面的运算引擎分离了。有向图可以作为数据处理描述语言和运算引擎的前后端分离协议。
举两个你熟悉的例子可能能更好理解我这里所说的前后端分离(client-server design)是什么意思:
比如一个网站的架构中,服务器和网页通过HTTP协议通信。
比如在TensorFlow的设计中,客户端可以用任何语言(比如Python或者C++)描述计算图,运行时引擎(runtime) 理论上却可以在任何地方具体运行,比如在本地,在CPU,或者在TPU。
那么我们设计的数据处理技术也是一样的,除了有向图表达需要数据处理描述语言和运算引擎协商一致,其他的实现都是灵活可拓展的。
比如,我的数据描述可以用Python描述,由业务团队使用;计算引擎用C++实现,可以由数据底层架构团队维护并且高度优化;或者我的数据描述在本地写,计算引擎在云端执行。
我们要统一批处理和流处理的编程模型
关于什么是批处理和流处理概念会在后面的章节展开。这里先简单解释下,批处理处理的是有界离散的数据,比如处理一个文本文件;流处理处理的是无界连续的数据,比如每时每刻的支付宝交易数据。
MapReduce的一个局限是它为了批处理而设计的,应对流处理的时候不再那么得心应手。即使后面的Apache Storm、Apache Flink也都有类似的问题,比如Flink里的批处理数据结构用DataSet,但是流处理用DataStream。
但是真正的业务系统,批处理和流处理是常常混合共生,或者频繁变换的。
比如,你有A、B两个数据提供商。其中数据提供商A与你签订的是一次性的数据协议,一次性给你一大波数据,你可以用批处理。而数据提供商B是实时地给你数据,你又得用流处理。更可怕的事情发生了,本来是批处理的数据提供商A,突然把协议修改了,现在他们实时更新数据。这时候你要是用Flink就得爆炸了。业务需求天天改,还让不让人活了?!
因此,我们设计的数据处理框架里,就得有更高层级的数据抽象。
不论是批处理还是流处理的,都用统一的数据结构表示。编程的API也需要统一。这样不论业务需求什么样,开发者只需要学习一套API。即使业务需求改变,开发者也不需要频繁修改代码。
我们要在架构层面提供异常处理和数据监控的能力
真正写过大规模数据处理系统的人都深有感触:在一个复杂的数据处理系统中,难的不是开发系统,而是异常处理。
事实正是如此。一个Google内部调研表明,在大规模的数据处理系统中,90%的时间都花在了异常处理中。常常发生的问题的是,比如在之前的番茄炒鸡蛋处理问题中,你看着系统log,明明买了1000个鸡蛋,炒出来的菜却看起来只有999个鸡蛋,你仰天长叹,少了一个蛋到底去哪里了!
这一点和普通的软件开发不同。比如,服务器开发中,偶尔一个RPC请求丢了就丢了,重试一下,重启一下能过就行了。可如果在数据处理系统中,数据就是钱啊,不能随便丢。比如我们的鸡蛋,都是真金白银买回来的。是超市买回来数错了?是打蛋时候打碎了?还是被谁偷吃了?你总得给老板一个合理的交代。
我们要设计一套基本的数据监控能力,对于数据处理的每一步提供自动的监控平台,比如一个监控网站。
在番茄炒蛋系统中,要能够自动的记录下来,超市买回来是多少个蛋,打蛋前是多少个蛋,打完蛋是多少个蛋,放进锅里前是多少个蛋等等。也需要把每一步的相关信息进行存储,比如是谁去买的蛋,哪些人打蛋。这样出错后可以帮助用户快速找到可能出错的环节。
小结
通过上面的分析,我们可以总结一下。如果是我们站在2008年春夏之交来设计下一代大规模数据处理框架,一个基本的模型会是图中这样子的:

但是这样粗糙的设计和思想实验离实现还是太远。你可能还是会感到无从下手。
后面的章节会给你补充一些设计和使用大规模数据处理架构的基础知识。同时,也会深入剖析两个与我们这里的设计理念最接近的大数据处理框架,Apache Spark和Apache Beam。
今天我要与你分享的主题是“怎样实现大型电商热销榜”。
我在Google面试过很多优秀的候选人,应对普通的编程问题coding能力很强,算法数据结构也应用得不错。
可是当我追问数据规模变大时该怎么设计系统,他们却说不出所以然来。这说明他们缺乏必备的规模增长的技术思维(mindset of scaling)。这会限制这些候选人的职业成长。
因为产品从1万用户到1亿用户,技术团队从10个人到1000个人,你的技术规模和数据规模都会完全不一样。
今天我们就以大型电商热销榜为例,来谈一谈从1万用户到1亿用户,从GB数据到PB数据系统,技术思维需要怎样的转型升级?
同样的问题举一反三,可以应用在淘宝热卖,App排行榜,抖音热门,甚至是胡润百富榜,因为实际上他们背后都应用了相似的大规模数据处理技术。
真正的排序系统非常复杂,仅仅是用来排序的特征(features)就需要多年的迭代设计。
为了便于这一讲的讨论,我们来构想一个简化的玩具问题,来帮助你理解。
假设你的电商网站销售10亿件商品,已经跟踪了网站的销售记录:商品id和购买时间 {product_id, timestamp},整个交易记录是1000亿行数据,TB级。作为技术负责人,你会怎样设计一个系统,根据销售记录统计去年销量前10的商品呢?
举个例子,假设我们的数据是:

我们可以把热销榜按 product_id 排名为:1, 2, 3。
小规模的经典算法
如果上过极客时间的《数据结构与算法之美》,你可能一眼就看出来,这个问题的解法分为两步:

第一步,统计每个商品的销量。你可以用哈希表(hashtable)数据结构来解决,是一个O(n)的算法,这里n是1000亿。
第二步,找出销量前十,可以用经典的Top K算法,也是O(n)的算法。
如果你考虑到了这些,先恭喜你答对了。
在小规模系统中,我们确实完全可以用经典的算法简洁漂亮地解决。以Python编程的话可能是类似这样的:
但在一切系统中,随着尺度的变大,很多方法就不再适用。
比如,在小尺度经典物理学中适用的牛顿力学公式是这样的:

这在高速强力的物理系统中就不再适用,在狭义相对论中有另外的表达。

在社会系统中也是一样,管理10人团队,和治理14亿人口的国家,复杂度也不可同日而语。
具体在我们这个问题中,同样的Top K算法当数据规模变大会遇到哪些问题呢?
第一,内存占用。
对于TB级的交易记录数据,很难找到单台计算机容纳那么大的哈希表了。你可能想到,那我不要用哈希表去统计商品销售量了,我把销量计数放在磁盘里完成好了。
比如,就用一个1000亿行的文件或者表,然后再把销量统计结果一行一行读进后面的堆树/优先级队列。理论上听起来不错,实际上是否真的可行呢,那我们看下一点。
第二,磁盘I/O等延时问题。
当数据规模变大,我们难以避免地需要把一些中间结果存进磁盘,以应对单步任务出错等问题。一次磁盘读取大概需要10ms的时间。
如果按照上一点提到的文件替代方法,因为我们是一个O(n * log k)的算法,就需要10ms * 10^9 = 10 ^ 7 s = 115 天的时间。你可能需要贾跃亭附体,才能忽悠老板接受这样的设计方案了。
这些问题怎么解决呢?你可能已经想到,当单台机器已经无法适应我们数据或者问题的规模,我们需要横向扩展。
大规模分布式解决方案
之前的思路依然没错。但是,我们需要把每一步从简单的函数算法,升级为计算集群的分布式算法。
统计每个商品的销量
我们需要的第一个计算集群,就是统计商品销量的集群。
例如,1000台机器,每台机器一次可以处理1万条销售记录。对于每台机器而言,它的单次处理又回归到了我们熟悉的传统算法,数据规模大大缩小。
下图就是一个例子,图中每台机器输入是2条销售记录,输出是对于他们的本地输入而言的产品销量计数。
找出销量前K
我们需要的第二个计算集群,则是找出销量前十的集群。
这里我们不妨把问题抽象一下,抽象出是销量前K的产品。因为你的老板随时可能把产品需求改成前20销量,而不是前10了。
在上一个统计销量集群得到的数据输出,将会是我们这个处理流程的输入。所以这里需要把分布在各个机器分散的产品销量汇总出来。例如,把所有product_id = 1的销量全部叠加。
下图示例是K = 1的情况,每台机器先把所有product_id = 1的销量叠加在了一起,再找出自己机器上销量前K = 1的商品。可以看到对于每台机器而言,他们的输出就是最终排名前K = 1的商品候选者。
汇总最终结果
到了最后一步,你需要把在“销量前K集群”中的结果汇总出来。也就是说,从所有排名前K=1的商品候选者中找出真正的销量前K=1的商品。
这时候完全可以用单一机器解决了。因为实际上你汇总的就是这1000台机器的结果,规模足够小。
看到这里,你已经体会到处理超大规模数据的系统是很复杂的。
当你辛辛苦苦设计了应对1亿用户的数据处理系统时,可能你就要面临另一个维度的规模化(scaling)。那就是应用场景数量从1个变成1000个。每一次都为不同的应用场景单独设计分布式集群,招募新的工程师维护变得不再“可持续发展”。
这时,你需要一个数据处理的框架。
大规模数据处理框架的功能要求
在第二讲“MapReduce后谁主沉浮:怎样设计现代大规模数据处理技术”中,我们对于数据处理框架已经有了基本的方案。
今天这个实际的例子其实为我们的设计增加了新的挑战。
很多人面对问题,第一个想法是找有没有开源技术可以用一下。
但我经常说服别人不要先去看什么开源技术可以用,而是从自己面对的问题出发独立思考,忘掉MapReduce,忘掉Apache Spark,忘掉Apache Beam。
如果这个世界一无所有,你会设计怎样的大规模数据处理框架?你要经常做一些思维实验,试试带领一下技术的发展,而不是永远跟随别人的技术方向。
在我看来,两个最基本的需求是:
高度抽象的数据处理流程描述语言。作为小白用户,我肯定再也不想一一配置分布式系统的每台机器了。作为框架使用者,我希望框架是非常简单的,能够用几行代码把业务逻辑描述清楚。
根据描述的数据处理流程,自动化的任务分配优化。这个框架背后的引擎需要足够智能,简单地说,要把那些本来手动配置的系统,进行自动任务分配。
那么理想状况是什么?对于上面的应用场景,我作为用户只想写两行代码。
第一行代码:
sales_count = sale_records.Count()
这样简单的描述,在我们框架设计层面,就要能自动构建成上文描述的“销量统计计算集群”。
第二行代码
top_k_sales = sales_count.TopK(k)
网友评论