MapReduce
参考:大数据经典论文解读 | 极客时间 (opens new window) 06-07 讲
MapReduce 的设计哲学与 Unix 是一样的:“Do one thing, and do it well.”
# 1. 分布式系统的首要目标:开发人员不懂分布式
一个分布式数据处理系统,哪些特性比较重要呢?有人选择 performance,有人选择 scalability,但大部分人会漏掉一点:“易用性”。
更具体一点,就是我们希望来使用这个分布式数据处理系统的人,最好意识不到“分布式”的存在。来使用这个系统的人,只需要撰写处理数据的业务逻辑代码,而不需要关心代码会在哪一台机器上执行、怎么去进行网络通信。撰写的代码只需要解决自身的工作,那就是数据处理的业务逻辑。也就是我们自己的代码,只需要有单一职责,而不是需要全盘考虑分布式系统的设计问题。
因为分布式系统中会遇到的这些故障和失败,是一个很常见的问题,但是这些问题并不容易处理。我们之所以开发 MapReduce 这样的系统,就是为了让没有分布式系统知识和经验的人,一样可以快速简便地去利用 MapReduce 处理海量的数据。
那么,整个MapReduce的论文,其实就是围绕着这一核心点来讲述和设计的。这篇论文,基本上可以看作是三个部分:
- MapReduce 的计算模型和应用场景;
- MapReduce 实际是如何实现的,使得开发者无需关心分布式的存在;
- 如何逐步迭代优化 MapReduce 的性能。
# 2. MapReduce:一个分布式的 Bash
尽管我们现在回过头看 MapReduce,可以说是简陋,这也是为什么后续有各种各样的新系统出现,比如 Spark、Storm 等。但尽管简单,MapReduce 的设计却是干净利落,它从一开始就是奔着让开发者对“分布式”无感而去的。
今天先看一下 MapReduce 的计算模型和应用场景是什么,然后我们再和 Unix 下的管道做一个对比,看一看为什么说 MapReduce 继承了 Unix 的设计思想。
对于存放于上千个节点的 GFS 数据,要如何进行数据处理呢?最简单的方式就是在许多机器上并行计算,而需要的计算方式,抽象来说,也无非是三种情况:
- 第一种,是对所有的数据,我们都只需要单条数据就能完成处理。比如,我们有很多网页的内容,我们要从里面提取出来每一个网页的标题。这样的计算可以完全并行化。
- 第二种,是需要汇总多条数据才能完成计算。比如,要统计日志里面某个URL被访问了多少次,只需要简单累加就可以了。或者我们需要更复杂一些的操作,比如统计某个URL下面的唯一用户数。而对于这里的第二种情况,我们就需要将所有相同URL的数据,搬运到同一个计算节点上进行处理。不过,在搬运之后,不同的URL还是可以放到不同的节点进行处理的。
- 第三种,自然是一、二两种情况的组合了。比如,我们先从网页数据里面,提取出网页的URL和标题,然后根据标题里面的关键字,统计特定关键字出现在多少个不同的URL里面,这就需要同时采用一二这两种情况的操作。
当然,我们可以有更复杂的数据操作,但是这些动作也都可以抽象成前面的两个动作的组合。因为无非,我们要处理的数据要么是完全独立的,要么需要多条数据之间的依赖。实际上,前面的第一种动作,就是 MapReduce 里面的 Map;第二种动作,就是 MapReduce 里面的 Reduce;而搬运的过程,就是 Shuffle 操作。
那么接下来,我们就一起具体看看 MapReduce 是怎么回事儿。
# 2.1 MapReduce 编程模型
MapReduce 的编程模型非常简单,对于想要利用 MapReduce 框架进行数据处理的开发者来说,只需要实现一个 Map函数,一个 Reduce 函数。
- Map 函数:它会接受一个key-value对,然后把这个key-value对转换成0到多个新的key-value对并输出出去:
map (k1, v1) -> list (k2, v2)
- Reduce 函数:它接受一个 Key,以及这个 Key 下的一组 Value,然后化简成一组新的值 Value 输出出去:
reduce (k2, list(v2)) -> list(v3)
而在Map函数和Reduce函数之外,开发者还需要指定一下输入输出文件的路径。输入路径上的文件内容,会变成一个个键值对给到Map函数。而Map函数会运行开发者写好的映射逻辑,把数据作为新的一组键值对输出出去。
Map函数的输出结果,会被整个 MapReduce 程序接手,进行一个叫做混洗的操作。混洗会把Map函数输出的所有相同的Key的Value整合到一个列表中,给到Reduce函数。并且给到Reduce函数的Key,在每个Reduce里,都是按照Key排好序的。
这里排好序并不是MapReduce框架本身的核心需求,而是为了技术上实现方便。因为我们要把相同Key的数据放到一起处理,而通过一个HashMap把所有的数据放在内存里又不一定放得下。那么利用硬盘进行外部排序是一个最简单的,没有内存大小依赖的对数据根据Key进行分组的解决办法。最后,在拿到混洗完成,分好组的数据之后,Reduce函数就会运行你写好的化简逻辑,最终输出结果。
如果你熟悉设计模式的话,你会发现在实现一个 MapReduce 程序上你需要做的事情,其实就是一个典型的模版方法模式 (opens new window)(Template Method Pattern)。而 MapReduce 与其说是一个分布式数据处理系统,不如说是分布式数据处理框架。因为 MapReduce 框架已经设定好了整个数据处理的流程,你只需要实现 Map 和 Reduce 这两个接口函数,就能完成海量的数据处理程序。
# 2.2 MapReduce 的应用场景
别看在 MapReduce 框架下,你只能定义简简单单的一个 Map 和一个 Reduce 函数,实际上它能够实现的应用场景,论文里可列了不少,包括以下六个:
- 分布式grep;
- 统计URL的访问频次;
- 反转网页-链接图;
- 分域名的词向量;
- 生成倒排索引;
- 分布式排序。
下面,我们就主要来关注一下前两个场景的用途,看看最简单的两个场景是如何通过MapReduce来实现的。
# 2.2.1 分布式 grep
在日常使用Linux的过程中,相信你没少用过grep这个命令。早年间,在出现各种线上故障的时候,我常常会通过grep来检索各种应用和Web服务器的错误日志,去排查线上问题,如下所示:
grep "error" access.log > /tmp/error.log.1
在单台Linux服务器上,我们当然可以用一个grep命令。那么如果有很多台服务器,我们怎么才能知道在哪台机器上会有我们需要的错误日志呢?
最简单的办法,当然就是在每台服务器上,都执行一遍相同的grep命令就好了。这个动作就是所谓的“分布式grep”,在整个MapReduce框架下,它其实就是一个只有Map,没有Reduce的任务。
在真实的应用场景下,“分布式grep”当然不只是用来检索日志。对于谷歌这个全球最大搜索引擎来说,这是完美地用来做网页预处理的方案。通过网络爬虫抓取到的网页内容,你都可以直接存到GFS上,这样你就可以撰写一个Map函数,从HTML的网页中,提取网页里的标题、正文,以及链接。然后你可以再去撰写一个Map函数,对标题和正文进行关键词提取。
这些一步步的处理结果,还会作为后续的反转网页-链接图、生成倒排索引等MapReduce任务的输入数据。
实际上,“分布式grep”就是一个分布式抽取数据的抽象,无论是像grep一样通过一个正则表达式来提取内容,还是用复杂的算法和策略从输入中提取内容,都可以看成是一种“分布式grep”。而在MapReduce这个框架下,你只需要撰写一个Map函数,并不需要关心数据存储在具体哪台机器上,也不需要关心哪台机器的硬件或者网络出了问题。
# 2.2.2 统计 URL 的访问频次
了解了分布式 grep,我们再来看看统计访问频次。这里,我们先以极客时间的专栏用户访问日志作为例子,来看看 MapReduce 可以怎么来统计访问频次。
下面放了一个表格,我们把它叫做url_visit_logs 。在这个表格里面有三个字段,分别是:
- URL,记录用户具体是看专栏里的哪一篇文章;
- USER_ID,记录具体是哪一个用户访问;
- VISIT_TIME,记录用户访问的具体时间。
那么,作为像谷歌这样的搜索引擎,它通常都会有统计网页访问频次的需求。访问频次高的网页,通常可以被认为是内容质量高,会在搜索结果的排名里面,排在更前面的位置。
如果只是极客时间的网页,我们可以把这张表里面的数据放在数据库里面,通过一句SQL就可以完成了。但是如果考虑全网的所有数据网页访问日志,数据库就肯定放不下了。我们可以把这些日志以文件的形式放在GFS上,然后通过MapReduce来做数据统计。
Map 函数很简单,它拿到的输入数据是这样的:
- Key 就是单条日志记录在文件中的行号;
- Value 就是对应单条记录的字符串,不同字段之间以 Tab 分割。
Map函数只需要通过一个split或者类似的函数,对Value进行分割,拿到URL,然后输出一个List的key-value对。在当前的场景下,这个List只有一个key-value对:
- 输出的Key就是URL;
- 输出的Value为空字符串。
这个URL肯定不只被访问了一次,因为MapReduce框架会把所有相同URL的Map的输出记录,都混洗给到同一个Reduce函数里。所以在这里,Reduce函数拿到的输入数据是这样的:
- Key就是URL;
- 一个List的Value,里面的每一项都是空字符串。
Reduce函数的逻辑也非常简单,就是把list里面的所有Value计个数,然后和前面的Key拼装到一起,并输出出去。Reduce函数输出的list里,也只有这一个元素。
这样一个 MapReduce 过程,与使用 SQL 来统计的作用是相同的。
其实,SQL 是一种声明式的语言,MapReduce 是我们的实现过程,后面我们在讲解Hive 论文时候,你就会发现,Hive 的 HQL 就是通过一个个 MapReduce 程序来实现的。而前面的整个 MapReduce 的过程,其实用一段 Bash 代码也可以实现:
cat $input |
awk '{print $1}' |
sort |
uniq -c > $output
2
3
4
在这段 Bash 代码中:
- cat 相当于我们 MapReduce 框架从 HDFS 读取数据;
- awk 的脚本,是我们实现的 Map 函数;
- sort 相当于 MapReduce 的混洗,只是这个混洗是在本机上执行的;
- 而最后的 uniq -c 则是实现了 Reduce 函数,在排好序的数据下,完成了同一 URL 的去重计数的工作。
如果和 MapReduce 框架对照起来,你会发现:
- 读写 HDFS 文件的内容,对应着 cat 命令和标准输出;
- 对于数据进行混洗,对应着 sort 命令;
- 整个框架,不同阶段之间的数据传输,用的就是标准的输入输出管道。
那么,对于开发者来说,只要自己实现Map和Reduce函数就好了,其他都不需要关心。而对于实现MapReduce的底层框架代码,也可以映射到读取、外部排序、输出,以及通过网络进行跨机器的数据传输就好了。在这个设计框架下,每一个组件都只需要完成自己的工作,整个框架就能很容易地串联起来了。
MapReduce在发明之后,也被用作大规模的机器学习,一个最常用的算法就是通过L-BFGS来进行逻辑回归的模型训练。如果你对通过MapReduce进行机器学习感兴趣,可以去读一下论文“ Large-scale L-BFGS using MapReduce (opens new window)”。
# 3. MapReduce 的三个框架
要想让写 Map 和 Reduce 函数的人不需要关心“分布式”的存在,那么 MapReduce 框架本身就需要解决好三个很重要的问题:
- 如何做好各个服务器节点之间的协同,以及解决出现各种软硬件问题后的容错 这两部分的设计。
- 性能问题。与 GFS 一样,MapReduce 也同样容易遇到网络性能瓶颈,因此需要尽量充分利用 MapReduce 集群的计算能力,并让整个集群的性能可以随硬件的增加接近于线性增长,可以说是非常大的一个挑战。
- 易用性。map 和 reduce 函数可能会遇到千奇百怪的数据,当我们的程序在遭遇到奇怪的数据,我们需要有办法来 debug。
而谷歌在论文里面,也通过第三部分的“MapReduce的实现”,以及第四部分的“MapReduce的完善”,很好地回答了怎么解决这三个问题。下面,我们就来具体看看,论文里是怎么讲的。
# 4. MapReduce 的协同
一个MapReduce的集群,通常就是之前的分布式存储系统 GFS 的集群。在这个集群里,本身会有一个调度系统(Scheduler)。当我们要运行一个MapReduce任务的时候,其实就是把整个 MapReduce 的任务提交给这个调度系统,让这个调度系统来分配和安排 Map 函数和 Reduce 函数,以及后面会提到的 master 在不同的硬件上运行。
在 MapReduce 任务提交了之后,整个 MapReduce 任务就会按照这样的顺序来执行:
- 第一步:你写好的MapReduce程序,已经指定了输入路径。所以MapReduce会先找到GFS上的对应路径,然后把对应路径下的所有数据进行分片(Split)。每个分片的大小通常是64MB,这个尺寸也是GFS里面一个块(Block)的大小。接着,MapReduce会在整个集群上,启动很多个MapReduce程序的复刻(fork)进程。
- 第二步:在这些进程中,有一个和其他不同的特殊进程,就是一个master进程,剩下的都是worker进程。然后,我们会有M个map的任务(Task)以及R个reduce的任务,分配给这些worker进程去进行处理。这里的master进程,是负责找到空闲的(idle)worker进程,然后再把map任务或者reduce任务,分配给worker进程去处理。
需要注意一点,并不是每一个map和reduce任务,都会单独建立一个新的worker进程来执行。而是master进程会把map和reduce任务分配给有限的worker,因为一个worker通常可以顺序地执行多个map和reduce的任务。
- 第三步:被分配到map任务的worker会读取某一个分片,分片里的数据就像上一讲所说的,变成一个个key-value对喂给了map任务,然后等Map函数计算完后,会生成的新的key-value对缓冲在内存里。
- 第四步:这些缓冲了的key-value对,会定期地写到map任务所在机器的本地硬盘上。并且按照一个分区函数(partitioning function),把输出的数据分成R个不同的区域。而这些本地文件的位置,会被worker传回给到master节点,再由master节点将这些地址转发给reduce任务所在的worker那里。
- 第五步:运行reduce任务的worker,在收到master的通知之后,会通过RPC(远程过程调用)来从map任务所在机器的本地磁盘上,抓取数据。当reduce任务的worker获取到所有的中间文件之后,它就会将中间文件根据Key进行排序。这样,所有相同Key的Value的数据会被放到一起,也就是完成了我们上一讲所说的混洗(Shuffle)的过程。
- 第六步:reduce会对排序后的数据执行实际的Reduce函数,并把reduce的结果输出到当前这个reduce分片的最终输出文件里。
- 第七步:当所有的map任务和reduce任务执行完成之后,master会唤醒启动MapReduce任务的用户程序,然后回到用户程序里,往下执行MapReduce任务提交之后的代码逻辑。
其实,以上整个MapReduce的执行过程,还是一个典型的Master-Slave的分布式系统。map和reduce所在的worker之间并不会直接通信,它们都只和master通信。另外,像是map的输出数据在哪里这样的信息,也是告诉master,让master转达给reduce所在的worker。reduce从map里获取数据,也是直接拿到数据所在的地址去抓取,而不是让reduce通过RPC,调用map所在的worker去获取数据。
如果你熟悉MapReduce的开源实现Hadoop的话,你会发现Hadoop 1.0的实现,其实和MapReduce的论文不太一样。在Hadoop里,每一个MapReduce的任务并没有一个独立的master进程,而是直接让调度系统承担了所有的worker的master的角色,这就是Hadoop 1.0里的 JobTracker。
在Hadoop 1.0里,MapReduce论文里面的worker就是 TaskTracker,用来执行map和reduce的任务。而分配任务,以及和TaskTracker沟通任务的执行情况,都由单一的JobTracker来负责。
这个设计,也导致了只要服务器数量一多,JobTracker的负载就会很重。所以早年间,单个Hadoop集群能够承载的服务器上限,被卡在了4000台。而且JobTracker也成为了整个Hadoop系统很脆弱的单点。
所以之后在Hadoop 2.0,Hadoop社区把JobTracker的角色,拆分成了进行任务调度的 Resource Mananger,以及监控单个MapReduce任务执行的 Application Master,回到了和MapReduce论文相同的架构。
而在2015年,谷歌发布了Borg这个集群管理系统的论文的时候,大家发现谷歌早在2003~2004年,就已经有了独立的集群管理系统Borg,也就是MapReduce里面所提到的调度系统。在后面的资源调度模块中,我们也会专门解读Borg这个调度系统的论文,以及被认为是Borg后继者和开源实现Kubernetes的论文。
# 5. MapReduce 的容错(Fault Tolerance)
MapReduce的容错机制非常简单,可以简单地用两个关键词来描述,就是重新运行和写Checkpoints。
# 5.1 Worker Failure
对于worker节点的失效,MapReduce框架解决问题的方式非常简单。就是换一台服务器重新运行这个worker节点被分配到的所有任务。master节点会定时地去ping每一个worker节点,一旦worker节点没有响应,我们就会认为这个节点失效了。
于是,我们会重新在另一台服务器上,启动一个worker进程,并且在新的worker进程所在的节点上,重新运行所有失效节点上被分配到的任务。而无论失效节点上,之前的map和reduce任务是否执行成功,这些任务都会重新运行。因为在节点ping不通的情况下,我们很难保障它的本地硬盘还能正常访问。
# 5.2 Master Failure
谷歌已经告诉了我们,他们就任由master节点失败了,也就是整个MapReduce任务失败了。那么,对于开发者来说,解决这个问题的办法也很简单,就是再次提交一下任务去重试。
因为master进程在整个任务中只有一个,它会失效的可能性很小。而MapReduce的任务也是一个用户离线数据处理的任务,并不是一个实时在线的服务,失败重来通常也没有什么影响,只是晚一点拿到数据结果罢了。
虽然在论文发表的时候,谷歌并没有实现对于master的失效自动恢复机制,但他们也给出了一个很简单的解决方案,那就是让master定时把它里面存放的信息,作为一个个的Checkpoint写入到硬盘中去。
那么我们动一下脑筋,我们可以把这个Checkpoint直接写到GFS里,然后让调度系统监控master。这样一旦master失效,我们就可以启动一个新的master,来读取Checkpoints数据,然后就可以恢复任务的继续执行了,而不需要重新运行整个任务。
# 5.3 对错误数据视而不见
worker和master的节点失效,以及对应的恢复机制,通常都是来自于硬件问题。但是在海量数据处理的情况下,比如在TB乃至PB级别的数据下,我们还会经常遇到“脏数据”的问题。
这些数据,可能是日志采集的时候就出错了,也可能是一个非常罕见的边界情况(edge-case),我们的Map和Reduce函数正好处理不了。甚至有可能,只是简单的硬盘硬件的问题带来的错误数据。
那么,对于这些异常数据,我们固然可以不断debug,一一修正。但是这么做,大多数时候都是划不来的,你很可能为了一条数据记录,由于Map函数处理不了,你就要重新扫描几TB的数据。
所以,MapReduce不仅为节点故障提供了容错机制,对于这些极少数的数据异常带来的问题,也提供了一个容错机制。MapReduce会记录Map或者Reduce函数,运行出错的具体数据的行号,如果同样行号的数据执行重试还是出错,它就会跳过这一行的数据。如果这样的数据行数在总体数据中的比例很小,那么整个MapReduce程序会忽视这些错误,仍然执行完成。毕竟,一个URL被访问了1万次还是9999次,对于搜素引擎的排序结果不会有什么影响。
# 6. MapReduce 的性能优化
聊完了MapReduce的容错处理,我们接着一起来看看MapReduce的性能问题。我们在前面说过,其实MapReduce的集群就是GFS的集群。所以MapReduce集群里的硬件配置,和GFS的硬件配置差不多,最容易遇到的性能瓶颈,也是100MB或者1GB的网络带宽。
# 6.1 把程序搬到数据哪去
既然网络带宽是瓶颈,那么优化的办法自然就是尽可能减少需要通过网络传输的数据。
在MapReduce这个框架下,就是在分配map任务的时候,根据需要读取的数据在哪里进行分配。通过前面GFS论文的学习,我们可以知道,GFS是知道每一个Block的数据是在哪台服务器上的。而MapReduce,会找到同样服务器上的worker,来分配对应的map任务。如果那台服务器上没有,那么它就会找离这台服务器最近的、有worker的服务器,来分配对应的任务。你可以参考下面给出的示意图:
除此之外,由于MapReduce程序的代码往往很小,可能只有几百KB或者几MB,但是每个map需要读取的一个分片的数据是64MB大小。这样,我们通过把要执行的MapReduce程序,复制到数据所在的服务器上,就不用多花那10倍乃至100倍的网络传输量了。
这就好像你想要研究金字塔,最好的办法不是把金字塔搬到你家来,而是你买张机票飞过去。这里的金字塔就是要处理的数据,而你,就是那个分配过去的MapReduce程序。
# 6.2 通过 Combiner 减少网络数据传输
除了Map函数需要读取输入的分片数据之外,Reduce所在的worker去抓取中间数据,一样也需要通过网络。那么要在这里减少网络传输,最简单的办法,就是尽可能让中间数据的数据量小一些。
自然,在MapReduce的框架里,也不会放过这一点。MapReduce允许开发者自己定义一个 Combiner 函数。这个Combiner函数,会对在同一个服务器上所有map输出的结果运行一次,然后进行数据合并。
比如,在上一讲我留下的思考题里,如果你想要统计每个域名的访问次数,那么Map函数的输出结果,就会是一个域名+一次访问计数的1。
而对于用户会高频访问的网站,在map输出的中间结果里就会有很多条记录,比如用户访问了baidu.com、douyin.com这样的域名就会有大量的记录。这些记录的Key就是对应的baidu.com、douyin.com的域名,而value都是1。
既然只是对访问次数计数,我们自然就可以通过一个 Combiner,把1万条相同域名的访问记录做个化简。把它们变成Key还是域名,Value就是有多少次访问的数值这样的记录就好了。而这样一化简,reduce所在的worker需要抓取的数据,就从1万条变成了1条。
实际上,不仅是同一个Map函数的输出可以合并,同一台服务器上多个Map的输出,我们都可以合并。反正它们都在一台机器上,合并只需要本地的硬盘读写和CPU,并不需要我们最紧缺的网络资源。
我就以域名的访问次数为例,它的数据分布一定有很强的头部效应,少量20%的域名可能占了80%的访问记录。这样一合并,我们要传输的数据至少可以减少60%。如果考虑一台16核的服务器,有16个map的worker运行,应该还能再减少80%以上。这样,通过一个中间的Combiner,我们要传输的数据一下子就下降了两个数量级,大大缓解了网络传输的压力。
你可以参考下面给出的示意图:
# 7. MapReduce 的 debug 信息
好,性能优化完了,最后我们来看一下MapReduce对于开发者的易用性。
虽然我们一直说,我们希望MapReduce让开发者意识不到分布式的存在。但是归根到底,map和reduce的任务都是在分布式集群上运行的,这个就给我们对程序debug带来了很大的挑战。 无论是通过debugger做单步调试,还是打印出日志来看程序执行的情况,都不太可行。所以,MapReduce也为开发者贴心地提供了三个办法来解决这一点。
- 第一个,是提供一个单机运行的MapReduce的库,这个库在接收到MapReduce任务之后,会在本地执行完成map和reduce的任务。这样,你就可以通过拿一点小数据,在本地调试你的MapReduce任务了,无论是debugger还是打日志,都行得通。
- 第二个,是在master里面内嵌了一个HTTP服务器,然后把master的各种状态展示出来给开发者看到。这样一来,你就可以看到有多少个任务执行完了,有多少任务还在执行过程中,它处理了多少输入数据,有多少中间数据,有多少输出的结果数据,以及任务完成的百分比等等。同样的,里面还有每一个任务的日志信息。通过这个HTTP服务器,你还可以看到具体是哪一个worker里的任务失败了,对应的错误日志是什么。这样,你就可以快速在线上定位你的程序出了什么错,是在哪台服务器上。
- 最后一个,是MapReduce框架里提供了一个计数器(counter)的机制。作为开发者,你可以自己定义几个计数器,然后在Map和Reduce的函数里去调用这个计数器进行自增。所有map和reduce的计数器都会汇总到master节点上,通过上面的HTTP服务器里展现出来。
比如,你就可以利用这个计数器,去统计有多少输入日志的格式和预期的不一样。如果比例太高,那么多半你的程序就有Bug,没有兼容所有合法的日志。下图展示的就是在Hadoop里,通过JobTracker查看Task的执行情况,以及对应每个Task的日志:
这些机制看起来好像并不起眼,似乎和分布式计算框架的名头关系不大。但也正是这些易用的小功能,就让开发者在开发分布式数据处理任务的时候效率大增,不需要吭哧吭哧一台台服务器去翻日志来排查问题,可谓是功莫大焉。
# 8. 小结
MapReduce的论文到这里就讲解完了。和GFS一样,MapReduce的实现是比较简单的,就是一个典型的单master多worker组成的主从架构。在分布式系统容错上,MapReduce也采取了简单的 重新运行、再来一次 的方案。对于master这个单点可能出现的故障,谷歌在最早的实现里,根本就没有考虑失效恢复,而是选择了任由master失败,让开发人员重新提交任务重试的办法。
还有一点也和GFS一样,MapReduce论文发表时的硬件,用的往往是100MB或者1GB的网络带宽。所以MapReduce框架对于这一点,就做了不少性能优化动作。通过尽量让各个worker从本地硬盘读取数据,以及通过Combiner合并本地Map输出的数据,来尽可能减少数据在网络上的传输。
而为了方便开发人员去debug程序,以及监控程序的执行,MapReduce框架通过master内嵌的Web服务器,展示了所有worker的运行情况和日志。你还可以通过自定义的计数器,统计更多你觉得有价值的信息。
当然,MapReduce里还有备用任务(Backup Tasks)、自定义的Partitioner等更多的细节值得你去探索。这些就留给你去仔细研读论文,好好琢磨了。
# 8.1 遗憾与缺陷
尽管MapReduce框架已经作出了很多努力,但是今天来看,整个计算框架的缺陷还是不少的。在我看来,主要的缺陷有两个:
- 第一个是还没有100%做到让用户意识不到“分布式”的存在,无论是Combiner还是Partitioner,都是让开发者意识到,它面对的还是分布式的数据和分布式的程序。
- 第二个是性能仍然不太理想,这体现在两个方面,一个是每个任务都有比较大的overhead,都需要预先把程序复制到各个worker节点,然后启动进程;另一个是所有的中间数据都要读写多次硬盘。map的输出结果要写到硬盘上,reduce抓取数据排序合并之后,也要先写到本地硬盘上再进行读取,所以快不起来。
不过,随着时间的变迁,会有更多新一代的系统,像是Dremel和Spark逐步取代MapReduce,让我们能更容易地写出分布式数据处理程序,处理起数据也比原始的MapReduce快上不少。
# 8.2 推荐阅读
对于分布式系统,我们总是希望增加机器就能够带来同比例的性能提升,但是这一点其实很难做到。
Storm的作者南森·马茨(Nathan Marz)在2010年就发表过一个很有意思的博文,告诉大家为什么优化MapReduce任务30%的运行时间,就会减少80%任务实际消耗的时间。这篇文章应该更加有助于你理解为什么我们说MapReduce的遗憾与缺陷中提到的额外开销(overhead)问题。我把这篇文章的 链接 (opens new window) 放在这里,推荐你去阅读学习一下。