zoukankan      html  css  js  c++  java
  • 6.824 Spring 2020 feb14 Lab1MapReduce 笔记

    简介

    这个实验要求我们使用 golang 自行实现一个支持并发和简单容错的 分布式 MR ,支持一个 master 多个 wokrer 的工作模式。

    我做完实验之后去看了一下之前网上发布的一些 lab1 的作业。感觉今年是不是改版了?后来我去看了一下 2018 年的 lab1 作业,果然改版了。。。。。。不过本身也没打算参考网上的作业。就在这篇里面记录一些关键点吧。

    因为我本人并不会 golang,是临时学习了一下。所以整个实验搞了 3个晚上才通过所有测试。

    过程

    实验提供了一些现成的文件,比如 mr 的 application 都是现成的我们可以通过 go build -buildmode=plugin 把他们编译成动态链接库直接调用他们用以测试我们的 mapreduce,我们只需要重点修改三个文件 master | worker | rpc 里面的内容即可。代码里面也提供了现成的 master 启动入口和 woker 启动入口。

    下面我就挑我觉得有必要说一下的地方谈一下,不遍历整个 lab1 的过程了。

    首先要实现一个在 master 中的 MakeMaster 的函数,这个 MakeMaster 相当于初始化我们的 master 对象上的值参数,做一些准备。

    master 结构体定义

    const (
        INIT = 0
        PROCESS = 1
        FINISHED = 2
    )
    
    
    type Master struct {
        // Your definitions here.
        Contents                      map[string]string
    
        // MAP task runner control
        FinishedStateMap            map[string]int
        // REDUCE task runner control
        FinishedStateReduce            map[int]int
    
        // FilenameList
        filenames                    []string
        // FilenameOriginMap
        filenamesOrigin                map[string]string
    
        IsMapFinished                bool
        IsReduceFinished            bool
    
        ReduceTaskCounter           int
    
        CoarsnessMapFailControl        int
        CoarsnessReduceFailControl    int
    
        MM                         sync.Mutex
    }

    初始化 master 并开启 master 服务器监听

    import (
    "fmt"
    "io/ioutil"
    "log"
    "net"
    "net/http"
    "net/rpc"
    "os"
    "path"
    "strconv"
    "sync"
    )
    func MakeMaster(files []string, nReduce int) *Master {
        m := Master{
            Contents: make(map[string]string),
            FinishedStateMap: make(map[string]int),
            FinishedStateReduce: make(map[int]int),
            IsMapFinished: false,
            IsReduceFinished: false,
            filenames: []string{},
            filenamesOrigin: make(map[string]string),
            CoarsnessMapFailControl: 0,
            CoarsnessReduceFailControl: 0,
    
            ReduceTaskCounter: nReduce,
    
            MM: sync.Mutex{},
        }
    
        // init map state
        for _, filename := range files {
            baseFilename := path.Base(filename)
            file, err := os.Open(filename)
            if err != nil {
                log.Fatalf("cannot open %v", filename)
            }
    
            content, err := ioutil.ReadAll(file)
            if err != nil {
                log.Fatalf("cannot read %v", filename)
            }
    
            m.Contents[baseFilename] = string(content)
            m.FinishedStateMap[baseFilename] = INIT
            m.filenames = append(m.filenames, baseFilename)
            m.filenamesOrigin[baseFilename] = filename
            m.CoarsnessMapFailControl = 0
            m.CoarsnessReduceFailControl = 0
            file.Close()
        }
    
        // init reduce state
        maxR := m.ReduceTaskCounter
        i := 0
        for ; i < maxR; i++ {
            m.FinishedStateReduce[i] = INIT
        }
    
        // print now have something
        fmt.Printf("Master start listen
    ")
    
        m.server()
        return &m
    }

    我分别说下这里 master 结构体里面定义参数以及我的想法。

    Contens: key 是文件名,value 是文本。这里主要是建立一个 filename 和 文本文件的映射关系,方便需要的时候对其进行内容的遍历。因为 master 涉及到下发工作,所以当 woker 来向 master 申请任务的时候我们需要将这些内容下发给 woker 进行处理。

    FinishedStateMap: key 是 map 任务的任务名(这里我使用的文件名做 map 任务的任务名),value 是三个状态 INIT PROCESS FINISHED。这个状态控制器用于控制 map 任务的状态情况监控。

    FinishedStateReduce: key 是 reduce 任务的任务名(这里因为后面输出文件测试要求的关系,直接使用了 reduce 任务的编号 0 1 2 3 .....),value 是三个状态 INIT PROCESS FINISHED。这个状态控制器用于控制 reduce 任务的状态情况监控。

    Filenames: 用于存储所有被访问的 input 文件名称。因为使用了文件名称作状态的 key 所以得维护这么一个东西,其实想想应该给 Map 任务搞个编号的。

    FilenamesOrigin: 用于存储所有被访问的 input 文件名称的 basefile 名称。因为测试文件里面都是使用的相对路径访问的文件,文件名称里面都有 ../../ 这种东西,而且不能修改测试文件和原文件所以就搞了一个这个方便映射。

    IsMapFinished: 标记是否所有 Map 任务都已经完成。MR 中必须要等所有 Map 任务都完成了才可以开始 Reduce 任务。

    IsReduceFinished: 标记是否所有 Reduce 任务都已经完成。 MR 中必须要等所有的 Reduce 任务都完成了才会结束。

    ReduceTaskCounter: 标记一共需要几个 Reduce 启动,一共输出几个文件。因为调用 MakeMaster 函数的地方有传入最后使用几个 Reducer 输出文件,所以这里存一下。

    CoarsnessMapFailControl: 粗粒度 Map 控制器,因为测试有容错需求。所以这里用于粗略控制一下超时。具体实现下面我会说到。

    CoarsnessReduceFailControl: 粗粒度 Reduce 控制器,因为测试有容错需求。所以这里用于粗略控制一下超时。具体实现下面我会说到。

    MM: 一把排它锁。

    所以整个设计的整体思想都围绕着这一张图

     刚才做的工作就是启动 Master 让后把刚服务需要用到的数据都准备好,然后开始监听 woker 服务。

    import "../mr"
    import "plugin"
    import "os"
    import "fmt"
    import "log"
    
    func main() {
        if len(os.Args) != 2 {
            fmt.Fprintf(os.Stderr, "Usage: mrworker xxx.so
    ")
            os.Exit(1)
        }
    
        mapf, reducef := loadPlugin(os.Args[1])
    
        mr.Worker(mapf, reducef)
    }

    启动 woker 服务,并解析动态链接库传进来的 mapf reducef 传递给 woker 

    worker.go file
    import
    ( "encoding/json" "fmt" "hash/fnv" "io/ioutil" "log" "net/rpc" "os" "strconv" "time" ) const PREFIX = "mr-" const MRPREFIX = "mr-out-" // // Map functions return a slice of KeyValue. // type KeyValue struct { Key string Value string } // // use ihash(key) % NReduce to choose the reduce // task number for each KeyValue emitted by Map. // func ihash(key string) int { h := fnv.New32a() h.Write([]byte(key)) return int(h.Sum32() & 0x7fffffff) } // // main/mrworker.go calls this function. // func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { // Your worker implementation here. for { flag := RunTaskCall(mapf, reducef) time.Sleep(1 * time.Second) if flag == 0 { break } } // uncomment to send the Example RPC to the master. // CallExample() } func RunTaskCall(mapf func(string, string) []KeyValue, reducef func(string, []string)string) int { args := RequestJobArgs{} reply := ReplyJobArgs{} rAckargs := RequestAckArgs{} replayAckargs := ReplyAckArgs{} // ask for a task call("Master.AskForJob", &args, &reply) // if task finished then end this program if reply.JobType == "finished" { fmt.Println("No tasks worker exit") return 0 } // get a map task if reply.JobType == "map" { fmt.Println("Worker start a map task") return RunMapTask(args, reply, rAckargs, replayAckargs, mapf) // get a reduce task } else if reply.JobType == "reduce" { fmt.Println("Woker start reduce task") return RunReduceTask(args, reply, rAckargs, replayAckargs, reducef) } return 1 } func RunMapTask(args RequestJobArgs, reply ReplyJobArgs, rAckargs RequestAckArgs, replayAckargs ReplyAckArgs, mapf func(string, string) []KeyValue) int { fileMap := []KeyValue{} filename := reply.Contents[0] filenameOriginMap := reply.ExtraInfo["filenamesOriginMap"] values := reply.Contents[1] target := mapf(filenameOriginMap[filename], values) f, _ := os.OpenFile(PREFIX+filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) defer f.Close() // write json intermediate results into file for _, v := range target { fileMap = append(fileMap, KeyValue{v.Key, v.Value}) } // open file and write file after use buffer _fileMap, err := json.Marshal(fileMap) if err != nil { log.Fatal("%v", err)} _, err = f.Write(_fileMap) if err != nil { log.Fatalf("%v", err)} f.Close() fmt.Println("完成一次 Map") rAckargs.JobType = reply.JobType rAckargs.Filename = filename call("Master.ConfirmState", &rAckargs, &replayAckargs) fmt.Printf("Confirm finished map task %v ", filename) return 1 } func RunReduceTask(args RequestJobArgs, reply ReplyJobArgs, rAckargs RequestAckArgs, replayAckargs ReplyAckArgs, reducef func(string, []string)string) int { _results := make(map[string][]string) filenames := reply.Contents reduceKey := reply.ReduceKey ReduceTaskCounter := reply.ReduceTaskCounter outputFile, _ := os.OpenFile(MRPREFIX + strconv.Itoa(reduceKey), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) defer outputFile.Close() // iter all files match reduce no for _, v := range filenames { f, err := os.Open(PREFIX+v) if err != nil { log.Fatalf("cant open file: %v ", err) } target := []KeyValue{} contents, err := ioutil.ReadAll(f) err = json.Unmarshal(contents, &target) if err != nil { log.Fatalf("json decode fail %v ", err) } // combine same key values for _, v := range target { if ihash(v.Key) % ReduceTaskCounter == reduceKey { _results[v.Key] = append(_results[v.Key], v.Value) } } } for k, v := range _results { fmt.Fprintf(outputFile, "%v %v ", k, reducef(k, v)) } fmt.Println("Finished onece Reduce") rAckargs.JobType = reply.JobType rAckargs.Filename = strconv.Itoa(reduceKey) call("Master.ConfirmState", &rAckargs, &replayAckargs) fmt.Printf("Confirm finished reduce task %v ", reduceKey) return 1 }
    master file
    func (m *Master) AskForJob(args *RequestJobArgs, reply *ReplyJobArgs) error { fmt.Println("Start AskForJob") m.MM.Lock() defer m.MM.Unlock() reply.ReduceTaskCounter = m.ReduceTaskCounter reply.ExtraInfo = make(map[string]map[string]string) reply.ExtraInfo["filenamesOriginMap"] = m.filenamesOrigin if m.IsReduceFinished == true { reply.Success = 1 reply.JobType = "finished" return nil } if m.IsMapFinished == false { m.CoarsnessMapFailControl++ // crash safe map reset state if m.CoarsnessMapFailControl >= 30 { for k, v := range m.FinishedStateMap { if v == PROCESS { m.FinishedStateMap[k] = INIT continue } } m.CoarsnessMapFailControl = 0 } fmt.Println("give a map task to worker") for k, v := range m.FinishedStateMap { if v == INIT { reply.Success = 1 reply.JobType = "map" reply.Contents = append(reply.Contents, k, m.Contents[k]) m.FinishedStateMap[k] = PROCESS fmt.Printf("assignment %v task key: %v ", reply.JobType, k) break } } return nil } else if m.IsReduceFinished == false { // crash safe reduce reset state m.CoarsnessReduceFailControl++ if m.CoarsnessReduceFailControl >= 30 { for k, v := range m.FinishedStateReduce { if v == PROCESS { m.FinishedStateReduce[k] = INIT continue } } m.CoarsnessReduceFailControl = 0 } fmt.Println("give a reduce task to worker") for k, v := range m.FinishedStateReduce { if v == INIT { reply.Success = 1 reply.JobType = "reduce" reply.Contents = m.filenames reply.ReduceKey = k m.FinishedStateReduce[k] = PROCESS fmt.Printf("assignment %v reduce key: %v ", reply.JobType, k) break } } } return nil } func (m *Master) ConfirmState(args *RequestAckArgs, reply *ReplyAckArgs) error { m.MM.Lock() defer m.MM.Unlock() filename := args.Filename if args.JobType == "map"{ m.FinishedStateMap[filename] = FINISHED for _, v := range m.FinishedStateMap { if v == INIT || v == PROCESS { return nil } continue } m.IsMapFinished = true } else if args.JobType == "reduce" { filename, _ := strconv.Atoi(filename) m.FinishedStateReduce[filename] = FINISHED for _, v := range m.FinishedStateReduce { if v == INIT || v == PROCESS { return nil } continue } m.IsReduceFinished = true } return nil }

    代码贴了不少,来说下整个 mr 的设计思路。

    1. master 起来之后,worker 需要先去执行 map 任务。那么我们让 woker 通过 rpc 去找 master 要任务来执行。

    call("Master.AskForJob", &args, &reply)

    这是申请任务的 rpc 不管是 Map 任务还是 Reduce 任务都是从这里申请。是发放 Map 任务还是 Reduce 任务由 master 来决策和控制。

    2. 当 worker 拿到 master 下发的 Map 任务之后将输入文件 key 对应的 FinishedStateMap 值置为 PROCESS 以保证后面过来申请任务的 woker 不会执行相同的任务。(这里设置时要使用排它锁进行锁定)

    获取任务中间还有一些使用粗粒度计数器来进行超时控制的判断(这些都可以使用更细粒度的超时计算方法,这里只是单纯实现一下类似的功能大家知道意思就行)。

    3. 我们 worker 拿到被分配的任务之后调用对应 jobType 的 task runner 执行对应任务。

    4. woker 将对应的 input 读入,然后运行动态链接库传入的 map 函数进行执行,获得 map 之后的结果,将结果 json 序列化之后写入到本地存储。

    5. 反复执行步骤4 直到所有 map 任务被 ack 到 FINISHED 状态。

    6. 继续申请任务 master 将开始分发 reduce 任务。

    7. reduce 任务将通过 hash 分为 m.ReduceTaskCounter 数量的桶中,并且执行相同次数的 reduce 输出。每个被丢进桶里的数据都是单个 reduce 任务拉取所有中间结果进行输出和合并的(这个中间有个 shuffle 的步骤)。这里有个问题,为什么我们不在 map 任务的时候将结果进行分组到对应数量的 reduce 中。这样 reduce 在读取的时候就可以读取单个文件进行统计排序了。其实也不是不行但是有个很坑的问题是,这样的话每个被写入的文件是由多个 map 任务进行执行。如果中间有任何一个 map 任务失败进行重试了。就可能写入重复的数据到那些文件里面去了。这样 failover cover 起来就复杂了。但是如果我们让 map 任务只是单纯 map 完毕并将结果存到对应的文件中。我们就可以无限次使用幂等操作重复这个 map 操作而不用担心数据混乱。

    8. reduce 读取所有中间结果汇总起来输出跟自己桶对应的输出文件。

    9. 读取这些结果汇总输出最后的结果。

    总结

    感觉整个过程中还是要牢记论文中的几个关键点:

    1. 怎么去 shuffle 数据再交给 reduce 保证交给同一个 Reduce 的数据的幂等性。lab1 中提供给了我们一个 ihash 函数帮助我们对 key 进行 hash 然后根据 reducer 的数量来 shuffle。

    func ihash(key string) int {
        h := fnv.New32a()
        h.Write([]byte(key))
    // 和 0x7fffffff 取 & 是为了处理负数情况。
    return int(h.Sum32() & 0x7fffffff) }
    if ihash(v.Key) % ReduceTaskCounter == reduceKey { _results[v.Key] = append(_results[v.Key], v.Value) }

    2. mapreduce 的本质是 

    map (k1,v1) → list(k2,v2) => shuffle => reduce (k2,list(v2)) → list(v2)

    拿 wordcount 应用来说

    K1, V1 是 filename,文章 => list[(word, 1), xxxxx,xx] 

    下面代码 if 部分就是在 shuffle

    if ihash(v.Key) % ReduceTaskCounter == reduceKey {
        _results[v.Key] = append(_results[v.Key], v.Value)
    }

    计算出 (k2 => list(v2)) 的部分就是 if 内部的部分

    这里的 k2 是 word, v2 是这个文件中的 key 对应出现次数的 list 

    k2, list(v2) 会是这种数据造型

     wrongs:[1 1 1] wry:[1] yearly:[1] yell:[1 1 1 1 1 1 1 1 1]

    最终通过 reduce 函数输出 list(v2) reduce 函数的传入值就是 k2, list(v2) 然后通过自己定义的 reduce 函数算出自己想要的结果。

    wc 实现了一个这样的函数 只是单纯的将各 list(v2) 汇总了

    func Reduce(key string, values []string) string {
        // return the number of occurrences of this word.
        return strconv.Itoa(len(values))
    }

    最后结果 list(v2) 应该长这样 (3, 1, 1, 9 ) 但是我们可以将他打印结合 key 就变成了

    wrongs 3

    wry 1 

    这样的我们想要的输出了。

    我们写的这个 mr 程序要能完全匹配这一规则,才能做到接口提供好了所有 mrapp 都可以使用。因为 test-case 包含了好几个 mr 的经典场景包括 wc wordcount | indexer 索引倒排索引统计。

    以上

    (下一个实验就 raft 了有点小激动呢) 

    Refenrece:

    http://nil.csail.mit.edu/6.824/2020/labs/lab-mr.html    lab1 描述

    http://nil.csail.mit.edu/6.824/2020/papers/mapreduce.pdf    mapreduce 论文

    https://blog.csdn.net/JasonDing1354/article/details/46882597    mapreduce shuffle 和 spark shuffle 机制介绍

  • 相关阅读:
    Vista修改网卡MAC地址防御ARP
    MySQL
    FirstWebSite
    Java
    python
    编码风格 标识符命名
    计算机体系结构基础
    编码风格 indent工具
    Linux发布web app 到Tomcat
    编码风格 函数
  • 原文地址:https://www.cnblogs.com/piperck/p/12342173.html
Copyright © 2011-2022 走看看