How MapReduce Works

 

一、从Map到Reduce

MapReduce其实是分治算法的一种实现,其处理过程亦和用管道命令来处理十分相似,一些简单的文本字符的处理甚至也可以使用Unix的管道命令来替代,从处理流程的角度来看大概如下:

cat input | grep |   sort  |  uniq -c | cat > output
# Input -> Map -> Shuffle & Sort -> Reduce -> Output

简单的流程图如下:


对于Shuffle,简单地说就是将Map的输出通过一定的算法划分到合适的Reducer中进行处理。Sort当然就是对中间的结果进行按key排序,因为Reducer的输入是严格要求按key排序的。

Input->Map->Shuffle&Sort->Reduce->Output只是从宏观的角度对MapReduce的简单描述,实际在MapReduce的框架中,即从编程的角度来看,其处理流程是Input->Map->Sort->Combine->Partition->Reduce->Output。用之前的对温度进行统计的例子来讲述这些过程。

Input Phase

输入的数据需要以一定的格式传递给Mapper的,格式有多种,如TextInputFormat、DBInputFormat、SequenceFileInput等等,可以使用JobConf.setInputFormat来设置,这个过程还应该包括对输入的数据进行任务粒度划分(split)然后再传递给Mapper。在温度的例子中,由于处理的都是文本数据,输入的格式使用默认的TextInputFormat即可。

Map Phase

对输入的key、value对进行处理,输出的是key、value的集合,即map (k1, v1) -> list(k2, v2),使用JobConf.setMapperClass设置自己的Mapper。在例子中,将(行号、温度的文本数据)作为key/value输入,经过处理后,从温度的文件数据中提取出日期中的年份和该日的温度数据,形成新的key/value对,最后以list(年,  温度)的结果输出,如[(1950, 10), (1960, 40), (1960, 5)]。

Sort Phase

对Mapper输出的数据进行排序,可以通过JobConf.setOutputKeyComparatorClass来设置自己的排序规则。在例子中,经过排序之后,输出的list集合是按年份进行排序的list(年, 温度),如[(1950, 10), (1950, 5), (1960, 40)]。

Combine Phase

这个阶段是将中间结果中有相同的key的<key, value>对合并成一对,Combine的过程与Reduce很相似,使用的甚至是Reduce的接口。通过Combine能够减少<key, value>的集合数量,从而减少网络流量。Combine只是一个可选的优化过程,并且无论Combine执行多少次(>=0),都会使Reducer产生相同的输出,使用JobConf.setCombinerClass来设置自定义的Combine Class。在例子中,假如map1产生出的结果为[(1950, 0), (1950, 20), (1950, 10)],在map2产生出的结果为[(1950, 15), (1950, 25)],这两组数据作为Reducer的输入并经过Reducer处理后的年最高温度结果为(1950, 25),然而当在Mapper之后加了Combine(Combine先过滤出最高温度),则map1的输出是[(1950, 20)]和map2的输出是[(1950, 25)],虽然其他的三组数据被抛弃了,但是对于Reducer的输出而言,处理后的年最高温度依然是(1950, 25)。

Partition Phase

把Mapper任务输出的中间结果按key的范围划分成R份(R是预先定义的Reduce任务的个数),默认的划分算法是”(key.hashCode() & Integer.MAX_VALUE) % numPartitions”,这样保证了某一范围的key一定是由某个Reducer来处理,简化了Reducer的处理流程,使用JobConf.setPartitionClass来设置自定义的Partition Class。在例子中,默认就自然是对年份进行取模了。

Reduce Phase

Reducer获取Mapper输出的中间结果,作为输入对某一key范围区间进行处理,使用JobConf.setReducerClass来设置。在例子中,与Combine Phase中的处理是一样的,把各个Mapper传递过来的数据计算年最高温度。

Output Phase

Reducer的输出格式和Mapper的输入格式是相对应的,当然Reducer的输出还可以作为另一个Mapper的输入继续进行处理。

二、Details of Job Run

上面只是从task运行中描述了Map和Reduce的过程,实际上当从运行”hadoop jar”开始还涉及到很多其他的细节。从整个Job运行的流程来看,如下图所示:

从上图可以看到,MapReduce运行过程中涉及有4个独立的实体:

  • Client,用于提交MapReduce job。
  • JobTracker,负责协调job的运行。
  • TaskTrackers,运行 job分解后的多个tasks,task主要是负责运行Mapper和Reducer。
  • Distributed filesystem,用于存储上述实体运行时共享的job文件(如中间结果文件)。

Job Submission

当调用了JobClient.runJob()之后,Job便开始被提交了,在Job提交这个步骤中,经历了以下的过程:

  1. Client向JobTacker申请一个新的job ID(step 2),job ID形如job_200904110811_0002的格式,是由JobTracker运行当前的job的时间和一个由JobTracker维护的自增计数(从1开始)组成的。
  2. 检查job的output specification,比如输出目录是否已经存在(存在则抛异常)、是否有权限写等等。
  3. Computes the input splits for the job,这些input splits就是作为Mapper的输入。
  4. Copies the resources needed to run the job, including the job JAR file, the configuration file and the computed input splits, to the jobtracker’s filesystem in a direcotry named after the job ID(step 3)。
  5. Tells the jobtracker that the job is ready for execution(step 4)。

Job Initialization

当JobTracker收到Job提交的请求后,将job保存在一个内部队列,并让Job Scheduler处理并初始化。初始化涉及到创建一个封装了其tasks的job对象,并保持对task的状态和进度的根据(step 5)。当创建要运行的一系列task对象后,Job Scheduler首先开始从文件系统中获取由JobClient计算的input splits(step 6),然后再为每个split创建map task。

Task Assignment

TaskTrackers会使用一个简单的loop为定期向JobTracker发送heartbeat调用,发送的间隔时间大约5秒,一般取决于集群服务器的规模和繁忙程度以及网络拥挤程度。这个heartbeat一方面是告知JobTracker当前TaskTracker处于live状态,同时是用于JobTracker和TaskTracker进行通信,TaskTracker会根据heartbeat的返回值来执行一定的操作(step 7)。

To choose a reduce task the JobTracker simply takes the next in its list of yet-to-be-run reduce tasks, since there are no data locality considerations. For a map task, however, it takes account of the TaskTracker’s network location and picks a task whose input splits is as close as possible to the tasktracker. In the optimal case, the task is data-local, that is , running on the same node that the split resides on. Alternatively, the task may be rack-local: on the same rack, but not the same node, as the split.

Task Execution

当TaskTrack被分配到一个task之后,接下来就是运行这个task。首先,它会需要的job JAR文件从shared filesystem拷贝到local filesystem,然后创建一个working direcotry并un-jars拷贝的JAR文件到该directory,最后就创建一个TaskRunner对象运行task。

TaskRunner在运行的时候是启动了一个新的JVM来run each task(step 10),这样是为了防止在用户自定义的Mapper出现异常令JVM挂了,从而连累到TaskTracker。TaskRunner子进程会使用umbilical接口和TaskTracker通信并每隔几秒向TaskTracker汇报进度。

对于使用Streaming和Pipes方式来创建的Mapper,也是作为TaskTracker的子进程来运行的。Streaming是使用标准输入输出来通信,而Pipes是使用socket来进行通信,如下图:

Progress and Status Updates

进度和状态是通过heartbeat来更新和维护的。来对于Map Task,进度就是已处理数据和所有输入数据的比例。对于Reduce Task,情况就有点复杂,包括3部分,拷贝中间结果文件、排序、Reduce调用,每部分占1/3。

Job Completion

当Job完成后,JobTracker会收一个Job Complete的通知,并将当前的Job状态更新为Successful,同时JobClient也会轮循获知提交的Job已经完成,将信息显示给用户。最后,JobTracker会清理和回收该Job的相关资源,并通知TaskTracker进行相同的操作(比如删除中间结果文件)。

 

转载于:https://www.cnblogs.com/ggjucheng/archive/2012/04/23/2465820.html

来源:http://www.cnblogs.com/ggjucheng/archive/2012/04/23/2465820.html


智能推荐

How a CPU Works ?

The CPU short for central processing unit is like the brain of the computer and once you understand how it works you’ll understand the computer as well. Let’s remove the cover of the CPU A...

How Tomcat works 6: Lifecycle

概述     Catalina由许多组件组成,当容器启动时,这些组件都需要启动。当容器停止时,所有组件都需要做清除动作。如:调用所有servlet的detroy动作,session manager把session对象存入第二存储位置。启动和停止组件的持续性机制就是实现Lifecycle接口的实现类。     Lifecycle接口的实现组...

How Tomcat Works 15: Digester

一、概述     前面章节中,使用hard-code来管理各component间的从属关系,如果需要改变则需要重新编译Bootstrap类。幸运的是tomcat设计者采用了更优雅的方法来管理配置,即XML文件server.xml. 这样我们只需要修改server.xml文件就可以设置tomcat。如:<context docBase='myApp' path="...

How Tomcat works 10: Security

    1. 概述:     WebApp的资源的访问限制可以通过web.xml文件来支撑。Servlet容器可以通过叫做Authenticator valve类来支持。Authenticator valve会调用context's realms的authenticate方法校验user     2. Rea...

How cc Works 中文译文

Chromium 的工程师们写了两篇技术文章 [How Blink Works]1 和 How cc Works,分别介绍了 Chrome 浏览器内核内部的两个重要模块 Blink 和 cc 内部设计和实现的一些细节。对于想要了解 Chromium 内核内部实现的同学,这两篇文章提供了不错的入门指引。在征得作者同意后,我将其翻译成中文,以馈读者。 文中部分术语觉得难以翻译成合适的中文的,我会保留原...

猜你喜欢

How Blink Works 中文译文

Chromium 的工程师们写了两篇技术文章 How Blink Works 和 How cc Works,分别介绍了 Chrome 浏览器内核内部的两个重要模块 Blink 和 cc 内部设计和实现的一些细节。对于想要了解 Chrome 内核内部实现的同学,这两篇文章提供了不错的入门指引。在征得作者同意后,我将其翻译成中文,以馈读者。 文中部分术语觉得难以翻译成合适的中文的,我会保留原文。对于一...

How E-Commerce works

做了半个晚上的PPT和程序,打算英语演讲的时候录个视频滴,可惜出了些小插曲讲得时间远超了老师规定的7分钟内嘿嘿。本来此视频适合无聊的、伤心的、想学习的……最终只完成了2/5,但老师还是爽快地给了个历史记录A+,这是我犹豫之后继续发此文章的勇气了看来。 视频地址: http://t.cn/8k7EAEY 因为内容没讲完,不过有部分内容是以前已经写过的,可以看看:http:...

How the backpropagation algorithm works

http://neuralnetworksanddeeplearning.com/chap2.html 接上一篇的最后,我们要训练多层网络的时候,最后关键的部分就是求梯度啦。纯数学方法几乎是不可能的,那么反向传播算法就是用来求梯度的,用了一个很巧妙的方法。  反向传播算法应该是神经网络最基本最需要弄懂的方法了,要是反向传播方法不懂,后面基本上进行不下去。  非常推荐的是How ...

How transaction copying control works

Maintain copying control settings in below IMG activity Usually, copying control can dertermine the list of transaction types which can be created as follow-up, but for activity (task and appointment)...

原型对象,原型链

函数都有prototype属性,它指向原型对象。 实例对象有__proto__属性,它指向对象原型 每一个原型对象都有constructor输赢,指向构造函数,每一个原型对象又具有__proto__属性,这个指向Object.prototype.在这里插入图片描述...

问答精选

Correctly formatting GCM notifications?

I'm currently trying out the google cloud messaging service with its sample application "Guestbook." https://developers.google.com/cloud/samples/mbs/ I'm attempting to send notifications tha...

Are there any performance benefits of using Asynchronous functions over Synchronous in Node Js?

Now I came across an article that distinguishes between an Asynchronous function and Synchronous functions. From my understanding of the different examples and explanations, synchronous functions are ...

Python: Costing calculator output

Good day all I'm busy creating a small costing calculator for the signage department. I'm not getting the calculator to output the amount. Brief Description: You enter the height and width and then wh...

Flask-SQLAlchemy - model has no attribute 'foreign_keys'

I have 3 models created with Flask-SQLalchemy: User, Role, UserRole role.py: user.py: user_role.py: If I try (in the console) to get all users via User.query.all() I get AttributeError: 'NoneType' obj...

Seeding many PRNGs, then having to seed them again, what is a good quality approach?

I have many particles that follow an stochastic process in parallel. For each particle, there is a PRNG associated to it. The simulation must go through many repetitions to get average results. For ea...

相关问题

相关文章

热门文章

推荐文章

相关标签

推荐问答