0 前置知识-MapReduce
0.1 MapReduce的出现背景与应用场景
分布式Grep:map函数在匹配到给定的pattern时输出一行。reduce函数只是将给定的中间数据复制到输出上。
URL访问频次统计:map函数处理网页请求的日志,对每个URL输出〈URL, 1〉。reduce函数将相同URL的所有值相加并输出〈URL, 总次数〉对。
倒转Web链接图:map函数在source页面中针对每个指向target的链接都输出一个〈target, source〉对。reduce函数将与某个给定的target相关联的所有source链接合并为一个列表,并输出〈target, list(source)〉对。
倒排索引:map函数解析每个文档,并输出一系列〈单词, 文档ID〉对。reduce函数接受给定单词的所有中间对,将它们按文档ID排序,再输出〈单词, list(文档ID)〉对。所有输出对的集合组成了一个简单的倒排索引。用户可以很轻松的扩展这个过程来跟踪单词的位置。
分布式排序:map函数从每条记录中提取出key,并输出〈key, 记录〉对。reduce函数不改变这些中间对,直接输出。这个过程依赖于4.1节介绍的划分机制和4.2节介绍的排序性质。
0.2 编程模型
用户自定义的Map函数 接收一个输入的key/value pair值,然后产生一个中间key/value pair值的集合。
MapReduce库把所有具有相同key值的value值集合在一起,传递给reduce函数。
用户自定义的Reduce函数接受一个key值I和相关的一个value值的集合。Reduce函数合并这些value值,形成一个较小的value值的集合。一般,每次Reduce函数调用只产生0或1个输出value值。通常我们通过一个迭代器把中间value值提供给Reduce函数,这样我们就可以处理无法全部放入内存中的大量的value值的集合。
0.3 举例
统计多个文件中每个单词的出现次数: 对每个文件调用map函数,最后对所有map函数的输出调用reduce函数。
1、splitting
MapReduce库将输入的文件拆分成M个数据分片,每个数据片段的大小一般从 16MB到64MB(可以通过可选的参数来控制每个数据片段的大小),
将这些小块发送到不同的机器上(对应一个worker)
2、mapping
每个机器对拿到的数据,运行map函数:
入参key是文件名,value是文件内容,
输出一个key-value对(中间结果),key是该单词,value是1。如 "black":1, "black":1,"white":1 这种。
3、shuffling
reduce阶段 : 将所有map产生的内容汇总。 变成 key:list(value)的形式(中间结果)
4、reduce
用户编写的reduce函数处理中间结果,形成输出。
0.4 实现
1、用户程序调用MapReduce库,MapReduce库将输入文件分成M个数据片,每个数据片段的大小一般从 16MB到64MB(可以通过可选的参数来控制每个数据片段的大小)。这些分片发送给不同的机器, 每个机器都启动一个worker。 2、程序副本中的有一个特殊的程序–master。副本中其它的程序都是worker程序。 由master分配任务。有M个Map任务和R个Reduce任务将被分配。 3、被分配了map任务的worker程序读取相关的输入数据片段,从输入的数据片段中解析出key/value pair,然后把key/value pair传递给用户自定义的Map函数,由Map函数生成并输出的中间key/value pair,并缓存在内存中。 4、缓存中的key/value pair通过分区函数分成R个区域,之后周期性的写入到本地磁盘上(图中切成了2块)。缓存的key/value pair在本地磁盘上的存储位置将被回传给master,由master负责把这些存储位置再传送给Reduce worker。 5、当Reduce worker程序接收到master程序发来的数据存储位置信息后,使用RPC从Map worker所在主机的磁盘上读取这些缓存数据。当Reduce worker读取了中间数据后,通过对key进行排序后使得具有相同key值的数据聚合在一起。由于许多不同的key值会映射到相同的Reduce任务上,因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。 6、Reduce worker程序遍历排序后的中间数据,对于每一个唯一的中间key值,Reduce worker程序将这个key值和它相关的中间value值的集合传递给用户自定义的Reduce函数。Reduce函数的输出被追加到所属分区的输出文件。 7、当所有的Map和Reduce任务都完成之后,master唤醒用户程序。在这个时候,在用户程序里的对MapReduce调用才返回。
在成功完成任务之后,MapReduce的输出存放在R个输出文件中(对应每个Reduce任务产生一个输出文件,文件名由用户指定)。一般情况下,用户不需要将这R个输出文件合并成一个文件–他们经常把这些文件作为另外一个MapReduce的输入,或者在另外一个可以处理多个分割文件的分布式应用中使用。
0.5 容错
worker失效
worker故障
master周期性的ping每个worker。如果在一个约定的时间范围内没有收到worker返回的信息,master将把这个worker标记为失效。
map失效后:
-
已完成的map task标记为idle。当worker故障时,由于已经完成的Map任务的输出存储在这台机器上,Map任务的输出已不可访问了,因此必须重新执行。
-
已完成的reduce task不用变。已经完成的Reduce任务的输出存储在全局文件系统上(GFS),因此不需要再次执行。
当一个Map任务首先被worker A执行,之后由于worker A失效了又被调度到worker B执行,这个“重新执行”的动作会被通知给所有执行Reduce任务的worker。任何还没有从worker A读取数据的Reduce任务将从worker B读取数据。
master失效
周期性备份数据。
master周期性的将上面描述的数据结构写入磁盘,即检查点(checkpoint)。如果这个master任务失效了,可以从最后一个检查点(checkpoint)开始启动另一个master进程。
0.6 备用任务
如果最后的几个任务执行时间过长怎么办?存在这种case,10个任务用5分钟完成了其中9个,但最后一个任务因为当前机器的负载过高花费了20分钟执行完毕,这么整个任务的执行周期就是20分钟。 如何能应对这一问题呢?
当仅剩下1%的任务时,可以启动备用任务,即同时在两个节点上执行相同的任务。这样只要其中一个先返回即可结束整个任务,同时释放未完成的任务所占用的资源。
1 实验要求
实现一个分布式 MapReduce,它由两个程序组成,master 和 worker。只有一个主进程和一个或多个并行执行的工作进程。Master 应该注意到一个 worker 是否没有在合理的时间内完成它的任务(对于本实验,使用 10 秒),并将相同的任务分配给不同的 worker。
2 程序设计
主要包括三个文件
-
master 与 Worker 间的 RPC 通信,对应的rpc.go文件
-
master对应 mr/master.go
-
Worker mr/worker.go
数据结构
task信息
拆成两部分,MasterStoreTask包含Task。 前者存在master里,后者传给worker。前者包含的信息更多,这样做降低了网络的传输量。
type Task struct {
TaskStage Stage //task的类型
NReducer int
TaskNumber int
Input string
Intermediates []string //存
Output string
}
//task的其他信息,保存在master即可,不需要通过rpc在master和worker之间传输。降低消耗
type MasterStoreTask struct {
TaskStatus Status //idle 等等
StartTime time.Time //记录该task的启动时间
TaskBody *Task //Task本体
}
master
type Master struct {
TaskQueue chan *Task ///task队列,保存所有等待执行的task。使用buffered channel作为queue存的task指针
TaskMap map[int]*MasterStoreTask // 保存当前所有task的信息。key:为task的idx value:MasterStoreTask指针,
MasterPhase Stage // Master当前所处的阶段
NReduce int
InputFiles []string
Intermediates [][]string // Map任务产生的R个中间文件的信息
}
执行流程
1.创建master,初始化各种参数m.MakeMaster()
坑:TaskQueue的大小为max(nReduce, len(files)),而非 len(files)
原因:该队列存的是当前等待运行的任务数量,map阶段的值是len(files),reduce阶段的值是nReduce。默认设置时应该二者取其大。
2.创建map任务
每个文件创建一个map任务。
3.等待连接
4.worker
死循环,不停的向master索要任务。根据任务类型,决定下一步的操纵。
4.1 worker索要任务
Master.GetTask
4.2 master响应
返回一个任务
Master.GetTask
入参:ExampleArgs
出参:Task{}
4.3 worker处理
根据task.TaskStage 判断任务类型:map reduce wait exit
4.3.1 mapper函数
接收切分后的input文件,执行mapf函数,并将输出得到的结果切成R份,存到本地磁盘。
4.4.1通知master任务完成
call("Master.TaskCompleted", task, &reply)
4.4.2 master响应
将map中存的该task的状态设置为completed,处理worker的运行结果:将map产生的文件内容保存到本地。如果所有任务都完成了,则创建reduce任务,进入到reduce阶段。
4.5 其他
reduce的操作与map基本相同。
4.6 容错
运行一个协程,每隔1秒判断一次,任务是否超时(当前时间减去任务的开始时间大于10秒。)
超时则处理该任务。
3 踩坑
TaskQueue chan *Task 的缓冲大小。
不能使用map元素的指针。