zoukankan      html  css  js  c++  java
  • mr

    大数据技术 —— MapReduce 简介

    本文为senlie原创,转载请保留此地址:http://www.cnblogs.com/senlie/

    1.概要
    很多计算在概念上很直观,但由于输入数据很大,为了能在合理的时间内完成,这些计算
    必须分布在数以百计数以千计的机器上。例如处理爬取得到的文档、网页请求日志来计算
    各种衍生数据,如倒排索引,网页文档的各种图结构表示,从每个主机上爬取的文档数,
    在某一天最频繁的查询的集合。

    MapReduce 是为处理和生成大数据集的编程模式和相应的实现。
    用户指定一个 map 函数来处理一个键值对来生成一个键值对的集合,
    和一个 reduce 函数来合并具有相同中间键的实值。

    例如,有大一堆文档,要统计里面每一个文档的出现的次数。可以这样写map 函数和 reduce 函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    map(String key, String value):
        //key: document name
        //value: document contents
        for each word w in value:
            EmitIntermediate(w, '1');
    reduce(String key, Iterator values):
        //key: a word
        //values: a list of counts
        int result = 0;
        for each v in values:
            result += ParseInt(v);
        Emit(AsString(result));

      

    ??疑问:map 返回的是一个 key/value ,为什么到了 resuce 这的输入却变成了 key/list of values ,这中间
    发生了什么?
    解答:
    map 函数接受一个键值对(如上面例子中的文档名/文档内容)并产生一组键值对(单词/1)。在将这组
    键值对传给 reduce 函数之前, MapReduce 库会组合所有具有相同键值的实值产生新的一组键/值(单词/次数)。
    reduce 函数接受来自多个 map 函数产生的键值对,它们在被 reduce 函数处理前,会先被 MapReduce 库组合成
    键/值列表(单词/次数列表)。下图解释了这一过程。
    (声明:图来自实验室 adonis 同学的 seminar 展示ppt)

    2.MapReduce 的执行的大概流程
    通过将输入数据划分为 M 个分片, map 函数的调用分布在多台机器上,这些分片可同
    不同的机器并行地处理。
    通过将中间结果的键空间划分为 R 个分片, reduce 函数的调用分布在多台机器上。
    下图展示了 MapReduce 操作的整个流程。

    1). 客户程序中的 MapReduce 库首先将输入文件分成 M 个大小通常为 16MB 或者64MB 的分片。
    然后开始在集群上的机器复制客户程序
    2).其中有一个程序的备份是特殊的,它就是主节点。其它的是由主节点分配任务的从节点。
    主节点有 M 个 map 任务和 R 个 reduce 任务要分配给那些空闲的从节点。
    3).一个被分配了 map 任务的从节点从输入分片中读取内容,然后从输入中解析出键值对被传递给
    用户定义的 map 函数,由它来产生中间结果的键值对并缓存在内存中
    4).在内存中的键值对被周期性地写入到本地磁盘,通过分片函数被分成 R 个分片。
    这些分片的位置被回传给主节点,由主节点告诉 reduce 从节点它们的位置
    5).当 reduce 从节点被主节点告知分片的位置时,它从使用 RPC(remote procedure call) 去读取
    那些缓存数据,当读完后,它会按键值进行排序,然后将有相同键值的键值对组合在一起,形成键/值列表
    6).reduce 从节点遍历已经排序合并好了的中间数据,将每一个键/值列表对传递给客户定义的 reduce 函数。
    reduce 函数返回的结果被添加到这个 reduce 从节点的结果文件中。
    7).当所有 map 从节点和 reduce 从节点完成后,主节点唤醒客户程序。
    如果 MapReduce 程序成功完成,结果文件被存储在 R 个输出文件中。

    3.示例
    这个示例统计了一组输入文件里每个单词的出现次数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    #include "mapreduce/mapreduce.h"
    //user's map function
    class WordCounter : public Mapper{
    public:
        virtual void Map(const MapInput &input){
            const string &text = input.value();
            const int n = text.size();
            for(int i = 0; i < n; ){
                //忽略单词前空格
                while(i < n && isspace(text[i])) i++;
                //找到单词的结尾
                int start = i;
                while(i < n && !isspace(text[i])) i++;
                if(start < i) Emit(text.substr(start, i - start), "1");
                 
            }
        }
    };
    REGISTER_MAPPER(WordCounter); // 这个是干嘛用的??
     
    //User's  reduce function
    class Adder : public Reducer {
        // 这里不用加个 public 的关键字?
        virtual void Reduce(ReduceInput *input){
            //把有相同键值的数值加起来
            int64 value = 0;
            while(!input->done()){
                value != StringToInt(input->value());
                input->NextValue();
            }
            Emit(IntToString(value));  
        }
    }
    REGISTER_REDUCER(Adder);
     
    int main(int argc, char **argv){
        ParseCommandLineFlags(argc, argv);
        MapReduceSpecification spec;
         
        //把输入文件列表存入 "spec"
        for(int i = 1; i < argc; i++){
            MapReduceInput *input = spec.add_input();
            input->set_format("text");
            input->set_filepattern(argv[i]);
            input->set_mapper_class("WordCounter");
        }
        //指定输出文件
        MapReduceOutput *out = spec.output();
        out->set_filebase("gfs/test/freq");
        out->set_num_tasks(100);
        out->set_format("text");
        out->set_reducer_class("Adder");
         
        //可选:在 map 节点中做部分和运算以节省带宽
        out->set_combiner_class("Adder");
         
        //调节参数:使用最多2000台机器,每个任务最多100MB内存
        spec.set_machines(2000);
        spec.set_map_megabytes(100);
        spec.set_reduce_megabytes(100);
         
        //开跑
        MapReduceResult result;
        if(!MapReduce(spec, &result)) abort();
         
        //失败的时候 abort, 能运行在这里就是成功了。
        return 0;
    }
  • 相关阅读:
    url编码
    客户端安全-xss-1类型介绍
    阿里云扩容教程
    jquery获取和设置表单数据
    uMlet建模工具
    phpstorm的调试工具xdebug
    服务器如何处理http请求
    http基础实战
    协程
    Goroutine(协程)为何能处理大并发?
  • 原文地址:https://www.cnblogs.com/lcword/p/9486355.html
Copyright © 2011-2022 走看看