zoukankan      html  css  js  c++  java
  • mapReduce实现

    1.3 lab1我的版本

    0 前置知识-MapReduce

    0.1 MapReduce的出现背景与应用场景

    MapReduce的思想是,应用程序设计人员和分布式运算的使用者,只需要写简单的Map函数和Reduce函数,而不需要知道任何有关分布式的事情,MapReduce框架会处理剩下的事情。

    分布式Grep:map函数在匹配到给定的pattern时输出一行。reduce函数只是将给定的中间数据复制到输出上。

    URL访问频次统计:map函数处理网页请求的日志,对每个URL输出〈URL, 1〉。reduce函数将相同URL的所有值相加并输出〈URL, 总次数〉对。

    倒转Web链接图:map函数在source页面中针对每个指向target的链接都输出一个〈target, source〉对。reduce函数将与某个给定的target相关联的所有source链接合并为一个列表,并输出〈target, list(source)〉对。

    倒排索引:map函数解析每个文档,并输出一系列〈单词, 文档ID〉对。reduce函数接受给定单词的所有中间对,将它们按文档ID排序,再输出〈单词, list(文档ID)〉对。所有输出对的集合组成了一个简单的倒排索引。用户可以很轻松的扩展这个过程来跟踪单词的位置。

    分布式排序:map函数从每条记录中提取出key,并输出〈key, 记录〉对。reduce函数不改变这些中间对,直接输出。这个过程依赖于4.1节介绍的划分机制和4.2节介绍的排序性质。

     

    0.2 编程模型

    用户自定义的Map函数 接收一个输入的key/value pair值,然后产生一个中间key/value pair值的集合。

    MapReduce库把所有具有相同key值的value值集合在一起,传递给reduce函数。

    用户自定义的Reduce函数接受一个key值I和相关的一个value值的集合。Reduce函数合并这些value值,形成一个较小的value值的集合。一般,每次Reduce函数调用只产生0或1个输出value值。通常我们通过一个迭代器把中间value值提供给Reduce函数,这样我们就可以处理无法全部放入内存中的大量的value值的集合。

    0.3 举例

    统计多个文件中每个单词的出现次数: 对每个文件调用map函数,最后对所有map函数的输出调用reduce函数。

    1、splitting

    MapReduce库将输入的文件拆分成M个数据分片,每个数据片段的大小一般从 16MB到64MB(可以通过可选的参数来控制每个数据片段的大小),

    将这些小块发送到不同的机器上(对应一个worker)

    2、mapping

    每个机器对拿到的数据,运行map函数

    入参key是文件名,value是文件内容,

    输出一个key-value对(中间结果),key是该单词,value是1。如 "black":1, "black":1,"white":1 这种。

    3、shuffling

    reduce阶段 : 将所有map产生的内容汇总。 变成 key:list(value)的形式(中间结果)

    4、reduce

    用户编写的reduce函数处理中间结果,形成输出。

    0.4 实现

    image 1、用户程序调用MapReduce库,MapReduce库将输入文件分成M个数据片,每个数据片段的大小一般从 16MB到64MB(可以通过可选的参数来控制每个数据片段的大小)。这些分片发送给不同的机器, 每个机器都启动一个worker。 2、程序副本中的有一个特殊的程序–master。副本中其它的程序都是worker程序。 由master分配任务。有M个Map任务和R个Reduce任务将被分配。 3、被分配了map任务的worker程序读取相关的输入数据片段,从输入的数据片段中解析出key/value pair,然后把key/value pair传递给用户自定义的Map函数,由Map函数生成并输出的中间key/value pair,并缓存在内存中。 4、缓存中的key/value pair通过分区函数分成R个区域,之后周期性的写入到本地磁盘上(图中切成了2块)。缓存的key/value pair在本地磁盘上的存储位置将被回传给master,由master负责把这些存储位置再传送给Reduce worker。 5、当Reduce worker程序接收到master程序发来的数据存储位置信息后,使用RPC从Map worker所在主机的磁盘上读取这些缓存数据。当Reduce worker读取了中间数据后,通过对key进行排序后使得具有相同key值的数据聚合在一起。由于许多不同的key值会映射到相同的Reduce任务上,因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。 6、Reduce worker程序遍历排序后的中间数据,对于每一个唯一的中间key值,Reduce worker程序将这个key值和它相关的中间value值的集合传递给用户自定义的Reduce函数。Reduce函数的输出被追加到所属分区的输出文件。 7、当所有的Map和Reduce任务都完成之后,master唤醒用户程序。在这个时候,在用户程序里的对MapReduce调用才返回。

    在成功完成任务之后,MapReduce的输出存放在R个输出文件中(对应每个Reduce任务产生一个输出文件,文件名由用户指定)。一般情况下,用户不需要将这R个输出文件合并成一个文件–他们经常把这些文件作为另外一个MapReduce的输入,或者在另外一个可以处理多个分割文件的分布式应用中使用。

    0.5 容错

    worker失效

    worker故障

    master周期性的ping每个worker。如果在一个约定的时间范围内没有收到worker返回的信息,master将把这个worker标记为失效。

    map失效后:

    • 已完成的map task标记为idle。当worker故障时,由于已经完成的Map任务的输出存储在这台机器上,Map任务的输出已不可访问了,因此必须重新执行。

    • 已完成的reduce task不用变。已经完成的Reduce任务的输出存储在全局文件系统上(GFS),因此不需要再次执行。

    当一个Map任务首先被worker A执行,之后由于worker A失效了又被调度到worker B执行,这个“重新执行”的动作会被通知给所有执行Reduce任务的worker。任何还没有从worker A读取数据的Reduce任务将从worker B读取数据。

     

    master失效

    周期性备份数据。

    master周期性的将上面描述的数据结构写入磁盘,即检查点(checkpoint)。如果这个master任务失效了,可以从最后一个检查点(checkpoint)开始启动另一个master进程。

     

    0.6 备用任务

    如果最后的几个任务执行时间过长怎么办?存在这种case,10个任务用5分钟完成了其中9个,但最后一个任务因为当前机器的负载过高花费了20分钟执行完毕,这么整个任务的执行周期就是20分钟。 如何能应对这一问题呢?

    当仅剩下1%的任务时,可以启动备用任务,即同时在两个节点上执行相同的任务。这样只要其中一个先返回即可结束整个任务,同时释放未完成的任务所占用的资源。

    1 实验要求

    实现一个分布式 MapReduce,它由两个程序组成,master 和 worker。只有一个主进程和一个或多个并行执行的工作进程。Master 应该注意到一个 worker 是否没有在合理的时间内完成它的任务(对于本实验,使用 10 秒),并将相同的任务分配给不同的 worker。

    2 程序设计

    主要包括三个文件

    • master 与 Worker 间的 RPC 通信,对应的rpc.go文件

    • master对应 mr/master.go

    • Worker mr/worker.go

     

    数据结构

    task信息

    拆成两部分,MasterStoreTask包含Task。 前者存在master里,后者传给worker。前者包含的信息更多,这样做降低了网络的传输量。

    type Task struct {
      TaskStage     Stage   //task的类型
      NReducer      int
      TaskNumber    int

      Input         string
      Intermediates []string //存
      Output        string
    }
    //task的其他信息,保存在master即可,不需要通过rpc在master和worker之间传输。降低消耗
    type MasterStoreTask struct {
      TaskStatus    Status  //idle 等等
      StartTime     time.Time   //记录该task的启动时间
      TaskBody *Task  //Task本体
    }

     

    master

    type Master struct {
      TaskQueue     chan *Task        ///task队列,保存所有等待执行的task。使用buffered channel作为queue存的task指针
      TaskMap     map[int]*MasterStoreTask // 保存当前所有task的信息。key:为task的idx   value:MasterStoreTask指针,
      MasterPhase   Stage              // Master当前所处的阶段
      NReduce       int
      InputFiles   []string
      Intermediates [][]string // Map任务产生的R个中间文件的信息
    }

     

    执行流程

    1.创建master,初始化各种参数m.MakeMaster()

    坑:TaskQueue的大小为max(nReduce, len(files)),而非 len(files)

    原因:该队列存的是当前等待运行的任务数量,map阶段的值是len(files),reduce阶段的值是nReduce。默认设置时应该二者取其大。

    2.创建map任务

    每个文件创建一个map任务。

    3.等待连接

    4.worker

    死循环,不停的向master索要任务。根据任务类型,决定下一步的操纵。

    4.1 worker索要任务

    Master.GetTask

    4.2 master响应

    返回一个任务

    Master.GetTask
    入参:ExampleArgs
    出参:Task{}

    4.3 worker处理

    根据task.TaskStage 判断任务类型:map reduce wait exit

    4.3.1 mapper函数

    接收切分后的input文件,执行mapf函数,并将输出得到的结果切成R份,存到本地磁盘。

    4.4.1通知master任务完成

    call("Master.TaskCompleted", task, &reply)

    4.4.2 master响应

    将map中存的该task的状态设置为completed,处理worker的运行结果:将map产生的文件内容保存到本地。如果所有任务都完成了,则创建reduce任务,进入到reduce阶段。

    4.5 其他

    reduce的操作与map基本相同。

    4.6 容错

    运行一个协程,每隔1秒判断一次,任务是否超时(当前时间减去任务的开始时间大于10秒。)

    超时则处理该任务。

     

    3 踩坑

     TaskQueue chan *Task 的缓冲大小。

    不能使用map元素的指针。

     

  • 相关阅读:
    POJ 1811 Prime Test 素性测试 分解素因子
    sysbench的安装与使用
    电脑中已有VS2005和VS2010安装.NET3.5失败的解决方案
    I.MX6 show battery states in commandLine
    RPi 2B Raspbian system install
    I.MX6 bq27441 driver porting
    I.MX6 隐藏电池图标
    I.MX6 Power off register hacking
    I.MX6 Goodix GT9xx touchscreen driver porting
    busybox filesystem httpd php-5.5.31 sqlite3 webserver
  • 原文地址:https://www.cnblogs.com/zyhe/p/15680304.html
Copyright © 2011-2022 走看看