简介
这个实验要求我们使用 golang 自行实现一个支持并发和简单容错的 分布式 MR ,支持一个 master 多个 wokrer 的工作模式。
我做完实验之后去看了一下之前网上发布的一些 lab1 的作业。感觉今年是不是改版了?后来我去看了一下 2018 年的 lab1 作业,果然改版了。。。。。。不过本身也没打算参考网上的作业。就在这篇里面记录一些关键点吧。
因为我本人并不会 golang,是临时学习了一下。所以整个实验搞了 3个晚上才通过所有测试。
过程
实验提供了一些现成的文件,比如 mr 的 application 都是现成的我们可以通过 go build -buildmode=plugin 把他们编译成动态链接库直接调用他们用以测试我们的 mapreduce,我们只需要重点修改三个文件 master | worker | rpc 里面的内容即可。代码里面也提供了现成的 master 启动入口和 woker 启动入口。
下面我就挑我觉得有必要说一下的地方谈一下,不遍历整个 lab1 的过程了。
首先要实现一个在 master 中的 MakeMaster 的函数,这个 MakeMaster 相当于初始化我们的 master 对象上的值参数,做一些准备。
master 结构体定义
const ( INIT = 0 PROCESS = 1 FINISHED = 2 ) type Master struct { // Your definitions here. Contents map[string]string // MAP task runner control FinishedStateMap map[string]int // REDUCE task runner control FinishedStateReduce map[int]int // FilenameList filenames []string // FilenameOriginMap filenamesOrigin map[string]string IsMapFinished bool IsReduceFinished bool ReduceTaskCounter int CoarsnessMapFailControl int CoarsnessReduceFailControl int MM sync.Mutex }
初始化 master 并开启 master 服务器监听
import (
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/rpc"
"os"
"path"
"strconv"
"sync"
)
func MakeMaster(files []string, nReduce int) *Master { m := Master{ Contents: make(map[string]string), FinishedStateMap: make(map[string]int), FinishedStateReduce: make(map[int]int), IsMapFinished: false, IsReduceFinished: false, filenames: []string{}, filenamesOrigin: make(map[string]string), CoarsnessMapFailControl: 0, CoarsnessReduceFailControl: 0, ReduceTaskCounter: nReduce, MM: sync.Mutex{}, } // init map state for _, filename := range files { baseFilename := path.Base(filename) file, err := os.Open(filename) if err != nil { log.Fatalf("cannot open %v", filename) } content, err := ioutil.ReadAll(file) if err != nil { log.Fatalf("cannot read %v", filename) } m.Contents[baseFilename] = string(content) m.FinishedStateMap[baseFilename] = INIT m.filenames = append(m.filenames, baseFilename) m.filenamesOrigin[baseFilename] = filename m.CoarsnessMapFailControl = 0 m.CoarsnessReduceFailControl = 0 file.Close() } // init reduce state maxR := m.ReduceTaskCounter i := 0 for ; i < maxR; i++ { m.FinishedStateReduce[i] = INIT } // print now have something fmt.Printf("Master start listen ") m.server() return &m }
我分别说下这里 master 结构体里面定义参数以及我的想法。
Contens: key 是文件名,value 是文本。这里主要是建立一个 filename 和 文本文件的映射关系,方便需要的时候对其进行内容的遍历。因为 master 涉及到下发工作,所以当 woker 来向 master 申请任务的时候我们需要将这些内容下发给 woker 进行处理。
FinishedStateMap: key 是 map 任务的任务名(这里我使用的文件名做 map 任务的任务名),value 是三个状态 INIT PROCESS FINISHED。这个状态控制器用于控制 map 任务的状态情况监控。
FinishedStateReduce: key 是 reduce 任务的任务名(这里因为后面输出文件测试要求的关系,直接使用了 reduce 任务的编号 0 1 2 3 .....),value 是三个状态 INIT PROCESS FINISHED。这个状态控制器用于控制 reduce 任务的状态情况监控。
Filenames: 用于存储所有被访问的 input 文件名称。因为使用了文件名称作状态的 key 所以得维护这么一个东西,其实想想应该给 Map 任务搞个编号的。
FilenamesOrigin: 用于存储所有被访问的 input 文件名称的 basefile 名称。因为测试文件里面都是使用的相对路径访问的文件,文件名称里面都有 ../../ 这种东西,而且不能修改测试文件和原文件所以就搞了一个这个方便映射。
IsMapFinished: 标记是否所有 Map 任务都已经完成。MR 中必须要等所有 Map 任务都完成了才可以开始 Reduce 任务。
IsReduceFinished: 标记是否所有 Reduce 任务都已经完成。 MR 中必须要等所有的 Reduce 任务都完成了才会结束。
ReduceTaskCounter: 标记一共需要几个 Reduce 启动,一共输出几个文件。因为调用 MakeMaster 函数的地方有传入最后使用几个 Reducer 输出文件,所以这里存一下。
CoarsnessMapFailControl: 粗粒度 Map 控制器,因为测试有容错需求。所以这里用于粗略控制一下超时。具体实现下面我会说到。
CoarsnessReduceFailControl: 粗粒度 Reduce 控制器,因为测试有容错需求。所以这里用于粗略控制一下超时。具体实现下面我会说到。
MM: 一把排它锁。
所以整个设计的整体思想都围绕着这一张图
刚才做的工作就是启动 Master 让后把刚服务需要用到的数据都准备好,然后开始监听 woker 服务。
import "../mr" import "plugin" import "os" import "fmt" import "log" func main() { if len(os.Args) != 2 { fmt.Fprintf(os.Stderr, "Usage: mrworker xxx.so ") os.Exit(1) } mapf, reducef := loadPlugin(os.Args[1]) mr.Worker(mapf, reducef) }
启动 woker 服务,并解析动态链接库传进来的 mapf reducef 传递给 woker
worker.go file
import ( "encoding/json" "fmt" "hash/fnv" "io/ioutil" "log" "net/rpc" "os" "strconv" "time" ) const PREFIX = "mr-" const MRPREFIX = "mr-out-" // // Map functions return a slice of KeyValue. // type KeyValue struct { Key string Value string } // // use ihash(key) % NReduce to choose the reduce // task number for each KeyValue emitted by Map. // func ihash(key string) int { h := fnv.New32a() h.Write([]byte(key)) return int(h.Sum32() & 0x7fffffff) } // // main/mrworker.go calls this function. // func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { // Your worker implementation here. for { flag := RunTaskCall(mapf, reducef) time.Sleep(1 * time.Second) if flag == 0 { break } } // uncomment to send the Example RPC to the master. // CallExample() } func RunTaskCall(mapf func(string, string) []KeyValue, reducef func(string, []string)string) int { args := RequestJobArgs{} reply := ReplyJobArgs{} rAckargs := RequestAckArgs{} replayAckargs := ReplyAckArgs{} // ask for a task call("Master.AskForJob", &args, &reply) // if task finished then end this program if reply.JobType == "finished" { fmt.Println("No tasks worker exit") return 0 } // get a map task if reply.JobType == "map" { fmt.Println("Worker start a map task") return RunMapTask(args, reply, rAckargs, replayAckargs, mapf) // get a reduce task } else if reply.JobType == "reduce" { fmt.Println("Woker start reduce task") return RunReduceTask(args, reply, rAckargs, replayAckargs, reducef) } return 1 } func RunMapTask(args RequestJobArgs, reply ReplyJobArgs, rAckargs RequestAckArgs, replayAckargs ReplyAckArgs, mapf func(string, string) []KeyValue) int { fileMap := []KeyValue{} filename := reply.Contents[0] filenameOriginMap := reply.ExtraInfo["filenamesOriginMap"] values := reply.Contents[1] target := mapf(filenameOriginMap[filename], values) f, _ := os.OpenFile(PREFIX+filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) defer f.Close() // write json intermediate results into file for _, v := range target { fileMap = append(fileMap, KeyValue{v.Key, v.Value}) } // open file and write file after use buffer _fileMap, err := json.Marshal(fileMap) if err != nil { log.Fatal("%v", err)} _, err = f.Write(_fileMap) if err != nil { log.Fatalf("%v", err)} f.Close() fmt.Println("完成一次 Map") rAckargs.JobType = reply.JobType rAckargs.Filename = filename call("Master.ConfirmState", &rAckargs, &replayAckargs) fmt.Printf("Confirm finished map task %v ", filename) return 1 } func RunReduceTask(args RequestJobArgs, reply ReplyJobArgs, rAckargs RequestAckArgs, replayAckargs ReplyAckArgs, reducef func(string, []string)string) int { _results := make(map[string][]string) filenames := reply.Contents reduceKey := reply.ReduceKey ReduceTaskCounter := reply.ReduceTaskCounter outputFile, _ := os.OpenFile(MRPREFIX + strconv.Itoa(reduceKey), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) defer outputFile.Close() // iter all files match reduce no for _, v := range filenames { f, err := os.Open(PREFIX+v) if err != nil { log.Fatalf("cant open file: %v ", err) } target := []KeyValue{} contents, err := ioutil.ReadAll(f) err = json.Unmarshal(contents, &target) if err != nil { log.Fatalf("json decode fail %v ", err) } // combine same key values for _, v := range target { if ihash(v.Key) % ReduceTaskCounter == reduceKey { _results[v.Key] = append(_results[v.Key], v.Value) } } } for k, v := range _results { fmt.Fprintf(outputFile, "%v %v ", k, reducef(k, v)) } fmt.Println("Finished onece Reduce") rAckargs.JobType = reply.JobType rAckargs.Filename = strconv.Itoa(reduceKey) call("Master.ConfirmState", &rAckargs, &replayAckargs) fmt.Printf("Confirm finished reduce task %v ", reduceKey) return 1 }
master file
func (m *Master) AskForJob(args *RequestJobArgs, reply *ReplyJobArgs) error { fmt.Println("Start AskForJob") m.MM.Lock() defer m.MM.Unlock() reply.ReduceTaskCounter = m.ReduceTaskCounter reply.ExtraInfo = make(map[string]map[string]string) reply.ExtraInfo["filenamesOriginMap"] = m.filenamesOrigin if m.IsReduceFinished == true { reply.Success = 1 reply.JobType = "finished" return nil } if m.IsMapFinished == false { m.CoarsnessMapFailControl++ // crash safe map reset state if m.CoarsnessMapFailControl >= 30 { for k, v := range m.FinishedStateMap { if v == PROCESS { m.FinishedStateMap[k] = INIT continue } } m.CoarsnessMapFailControl = 0 } fmt.Println("give a map task to worker") for k, v := range m.FinishedStateMap { if v == INIT { reply.Success = 1 reply.JobType = "map" reply.Contents = append(reply.Contents, k, m.Contents[k]) m.FinishedStateMap[k] = PROCESS fmt.Printf("assignment %v task key: %v ", reply.JobType, k) break } } return nil } else if m.IsReduceFinished == false { // crash safe reduce reset state m.CoarsnessReduceFailControl++ if m.CoarsnessReduceFailControl >= 30 { for k, v := range m.FinishedStateReduce { if v == PROCESS { m.FinishedStateReduce[k] = INIT continue } } m.CoarsnessReduceFailControl = 0 } fmt.Println("give a reduce task to worker") for k, v := range m.FinishedStateReduce { if v == INIT { reply.Success = 1 reply.JobType = "reduce" reply.Contents = m.filenames reply.ReduceKey = k m.FinishedStateReduce[k] = PROCESS fmt.Printf("assignment %v reduce key: %v ", reply.JobType, k) break } } } return nil } func (m *Master) ConfirmState(args *RequestAckArgs, reply *ReplyAckArgs) error { m.MM.Lock() defer m.MM.Unlock() filename := args.Filename if args.JobType == "map"{ m.FinishedStateMap[filename] = FINISHED for _, v := range m.FinishedStateMap { if v == INIT || v == PROCESS { return nil } continue } m.IsMapFinished = true } else if args.JobType == "reduce" { filename, _ := strconv.Atoi(filename) m.FinishedStateReduce[filename] = FINISHED for _, v := range m.FinishedStateReduce { if v == INIT || v == PROCESS { return nil } continue } m.IsReduceFinished = true } return nil }
代码贴了不少,来说下整个 mr 的设计思路。
1. master 起来之后,worker 需要先去执行 map 任务。那么我们让 woker 通过 rpc 去找 master 要任务来执行。
call("Master.AskForJob", &args, &reply)
这是申请任务的 rpc 不管是 Map 任务还是 Reduce 任务都是从这里申请。是发放 Map 任务还是 Reduce 任务由 master 来决策和控制。
2. 当 worker 拿到 master 下发的 Map 任务之后将输入文件 key 对应的 FinishedStateMap 值置为 PROCESS 以保证后面过来申请任务的 woker 不会执行相同的任务。(这里设置时要使用排它锁进行锁定)
获取任务中间还有一些使用粗粒度计数器来进行超时控制的判断(这些都可以使用更细粒度的超时计算方法,这里只是单纯实现一下类似的功能大家知道意思就行)。
3. 我们 worker 拿到被分配的任务之后调用对应 jobType 的 task runner 执行对应任务。
4. woker 将对应的 input 读入,然后运行动态链接库传入的 map 函数进行执行,获得 map 之后的结果,将结果 json 序列化之后写入到本地存储。
5. 反复执行步骤4 直到所有 map 任务被 ack 到 FINISHED 状态。
6. 继续申请任务 master 将开始分发 reduce 任务。
7. reduce 任务将通过 hash 分为 m.ReduceTaskCounter 数量的桶中,并且执行相同次数的 reduce 输出。每个被丢进桶里的数据都是单个 reduce 任务拉取所有中间结果进行输出和合并的(这个中间有个 shuffle 的步骤)。这里有个问题,为什么我们不在 map 任务的时候将结果进行分组到对应数量的 reduce 中。这样 reduce 在读取的时候就可以读取单个文件进行统计排序了。其实也不是不行但是有个很坑的问题是,这样的话每个被写入的文件是由多个 map 任务进行执行。如果中间有任何一个 map 任务失败进行重试了。就可能写入重复的数据到那些文件里面去了。这样 failover cover 起来就复杂了。但是如果我们让 map 任务只是单纯 map 完毕并将结果存到对应的文件中。我们就可以无限次使用幂等操作重复这个 map 操作而不用担心数据混乱。
8. reduce 读取所有中间结果汇总起来输出跟自己桶对应的输出文件。
9. 读取这些结果汇总输出最后的结果。
总结
感觉整个过程中还是要牢记论文中的几个关键点:
1. 怎么去 shuffle 数据再交给 reduce 保证交给同一个 Reduce 的数据的幂等性。lab1 中提供给了我们一个 ihash 函数帮助我们对 key 进行 hash 然后根据 reducer 的数量来 shuffle。
func ihash(key string) int { h := fnv.New32a() h.Write([]byte(key))
// 和 0x7fffffff 取 & 是为了处理负数情况。 return int(h.Sum32() & 0x7fffffff) }
if ihash(v.Key) % ReduceTaskCounter == reduceKey { _results[v.Key] = append(_results[v.Key], v.Value) }
2. mapreduce 的本质是
map (k1,v1) → list(k2,v2) => shuffle => reduce (k2,list(v2)) → list(v2)
拿 wordcount 应用来说
K1, V1 是 filename,文章 => list[(word, 1), xxxxx,xx]
下面代码 if 部分就是在 shuffle
if ihash(v.Key) % ReduceTaskCounter == reduceKey { _results[v.Key] = append(_results[v.Key], v.Value) }
计算出 (k2 => list(v2)) 的部分就是 if 内部的部分
这里的 k2 是 word, v2 是这个文件中的 key 对应出现次数的 list
k2, list(v2) 会是这种数据造型
wrongs:[1 1 1] wry:[1] yearly:[1] yell:[1 1 1 1 1 1 1 1 1]
最终通过 reduce 函数输出 list(v2) reduce 函数的传入值就是 k2, list(v2) 然后通过自己定义的 reduce 函数算出自己想要的结果。
wc 实现了一个这样的函数 只是单纯的将各 list(v2) 汇总了
func Reduce(key string, values []string) string { // return the number of occurrences of this word. return strconv.Itoa(len(values)) }
最后结果 list(v2) 应该长这样 (3, 1, 1, 9 ) 但是我们可以将他打印结合 key 就变成了
wrongs 3
wry 1
这样的我们想要的输出了。
我们写的这个 mr 程序要能完全匹配这一规则,才能做到接口提供好了所有 mrapp 都可以使用。因为 test-case 包含了好几个 mr 的经典场景包括 wc wordcount | indexer 索引倒排索引统计。
以上
(下一个实验就 raft 了有点小激动呢)
Refenrece:
http://nil.csail.mit.edu/6.824/2020/labs/lab-mr.html lab1 描述
http://nil.csail.mit.edu/6.824/2020/papers/mapreduce.pdf mapreduce 论文
https://blog.csdn.net/JasonDing1354/article/details/46882597 mapreduce shuffle 和 spark shuffle 机制介绍