  MapReduce C++ Library

    MapReduce C++ Library for single-machine, multicore applications

    Distributed and scalable computing disciplines have recognized that immutable data, lock-free access, and isolated data processing are not only inevitable when work is spread across a number of machines, but also bring significant benefits for reliability and scalability. These benefits can be fostered in application design to improve reliability and to take advantage of the increasingly common multi-core processors found in end-user devices as well as server machines.

    MapReduce is an architectural framework designed for creating and processing large volumes of data using clusters of computers. For background information on MapReduce, see Software Scalability with MapReduce.

    An important development since Google's original paper has been the application of MapReduce to parallel processing in multi-core environments, such as multi-core and multi-processor machines and graphics processors (GPUs).

    The scalability achieved by using MapReduce to implement data processing across a large number of machines, at low implementation cost, motivates the design of this library. By taking the principles that have been proven in distributed MapReduce systems and applying them to a single-machine, multicore implementation, reliability and execution efficiency can be attained in a reusable framework. In scaling the architecture down from multiple machines to multiple CPUs or cores, threads of execution take the role that machines play in a distributed environment: they become the unit of process execution.

    The MapReduce C++ Library implements a single-machine platform for programming using the Google MapReduce idiom. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. Many real-world tasks are expressible in this model, as shown in the Google paper.

      map (k1,v1) --> list(k2,v2)
      reduce (k2,list(v2)) --> list(v2)
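    As a concrete illustration of the two signatures, the sketch below implements word counting in plain, single-threaded C++17 without using the library: map turns one (filename, contents) pair into (word, 1) pairs, the intermediate pairs are grouped by key, and reduce collapses each group into a total. The names map_fn, reduce_fn and run_word_count are hypothetical, and holding every intermediate pair in memory is a simplification; the library's default IntermediateStore uses temporary files instead.

```cpp
#include <map>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// map (k1,v1) --> list(k2,v2): emit one (word, 1) pair per word of the input.
std::vector<std::pair<std::string, unsigned>>
map_fn(const std::string& /*filename*/, const std::string& contents) {
    std::vector<std::pair<std::string, unsigned>> out;
    std::istringstream in(contents);
    std::string word;
    while (in >> word)
        out.emplace_back(word, 1u);
    return out;
}

// reduce (k2,list(v2)) --> list(v2): here the list collapses to one total.
std::vector<unsigned> reduce_fn(const std::string& /*word*/,
                                const std::vector<unsigned>& counts) {
    unsigned total = 0;
    for (unsigned c : counts)
        total += c;
    return {total};
}

// Run map over every input, group intermediate values by key, then reduce.
std::map<std::string, unsigned>
run_word_count(const std::map<std::string, std::string>& inputs) {
    std::map<std::string, std::vector<unsigned>> grouped;  // ordered by key
    for (const auto& [name, text] : inputs)
        for (const auto& [word, one] : map_fn(name, text))
            grouped[word].push_back(one);

    std::map<std::string, unsigned> result;
    for (const auto& [word, counts] : grouped)
        result[word] = reduce_fn(word, counts).front();
    return result;
}
```

    For example, run_word_count({{"a.txt", "the cat the"}, {"b.txt", "cat dog"}}) yields the = 2, cat = 2 and dog = 1.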

    Synopsis

    namespace mapreduce {
    
    template<typename MapTask,
             typename ReduceTask,
             typename Datasource=datasource::directory_iterator<MapTask>,
             typename Combiner=null_combiner,
             typename IntermediateStore=intermediates::local_disk<MapTask> >
    class job;
    
    } // namespace mapreduce
    
        

    The developer is required to write two classes: MapTask, which implements a map function that processes key/value pairs to generate a set of intermediate key/value pairs, and ReduceTask, which implements a reduce function that merges all intermediate values associated with the same intermediate key.

    In addition, there are three optional template parameters that can be used to modify the default behavior: Datasource, which feeds data to the Map Tasks on request of the MapReduce library; Combiner, which can be used to partially consolidate the results of the Map Tasks before they are passed to the Reduce Tasks; and IntermediateStore, which handles storage, merging and sorting of intermediate results between the Map and Reduce phases.

    The MapTask class must define four data types: the key and value types of the Map Task inputs, and the key and value types of the intermediate results.

    class map_task
    {
      public:
        typedef std::string   key_type;
        typedef std::ifstream value_type;
        typedef std::string   intermediate_key_type;
        typedef unsigned      intermediate_value_type;
    
        map_task(job::map_task_runner &runner);
        void operator()(key_type const &key, value_type &value);   // non-const so the stream can be read
    };
          

    The ReduceTask must define the key/value types for the results of the Reduce phase.

    class reduce_task
    {
      public:
        typedef std::string  key_type;
        typedef size_t       value_type;
    
        reduce_task(job::reduce_task_runner &runner);
    
        template<typename It>
        void operator()(typename map_task::intermediate_key_type const &key, It it, It ite);
    };
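    The type requirements above for MapTask and ReduceTask can be checked at compile time. The sketch below uses C++17 std::void_t detection; the traits has_map_task_types and has_reduce_task_types and the two example structs are hypothetical helpers, not part of the library, which may simply report a missing typedef as an ordinary compile error.

```cpp
#include <cstddef>
#include <fstream>
#include <string>
#include <type_traits>

// True when T declares the four member types required of a MapTask.
template <typename T, typename = void>
struct has_map_task_types : std::false_type {};

template <typename T>
struct has_map_task_types<T, std::void_t<typename T::key_type,
                                         typename T::value_type,
                                         typename T::intermediate_key_type,
                                         typename T::intermediate_value_type>>
    : std::true_type {};

// True when T declares the result key/value types required of a ReduceTask.
template <typename T, typename = void>
struct has_reduce_task_types : std::false_type {};

template <typename T>
struct has_reduce_task_types<T, std::void_t<typename T::key_type,
                                            typename T::value_type>>
    : std::true_type {};

// Stand-ins mirroring the typedefs in the declarations above.
struct example_map_task {
    typedef std::string   key_type;
    typedef std::ifstream value_type;
    typedef std::string   intermediate_key_type;
    typedef unsigned      intermediate_value_type;
};

struct example_reduce_task {
    typedef std::string key_type;
    typedef std::size_t value_type;
};

static_assert(has_map_task_types<example_map_task>::value,
              "a MapTask must declare four member types");
static_assert(has_reduce_task_types<example_reduce_task>::value,
              "a ReduceTask must declare key_type and value_type");
static_assert(!has_map_task_types<example_reduce_task>::value,
              "the reduce task lacks the intermediate types");
```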
          

    Extensibility

    The library is designed to be extensible and configurable through a policy-based mechanism. Default implementations are provided so that the library user can run MapReduce simply by implementing the core Map and Reduce tasks; each default can be replaced to provide specific features.

    Policy              Application                                Supplied implementation(s)
    Datasource          mapreduce::job template parameter          datasource::directory_iterator<MapTask>
    Combiner            mapreduce::job template parameter          null_combiner
    IntermediateStore   mapreduce::job template parameter          local_disk<MapTask, SortFn, MergeFn>
    SortFn              local_disk template parameter              external_file_sort
    MergeFn             local_disk template parameter              external_file_merge
    SchedulePolicy      mapreduce::job::run() template parameter   cpu_parallel, sequential

    Datasource

    This policy implements a data provider for Map Tasks. The default implementation iterates over a given directory and feeds each Map Task a key/value pair consisting of a filename and a std::ifstream for the open file.
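    A minimal sketch of what a directory-iterating datasource does, assuming C++17 std::filesystem: it lists the regular files in a directory once, then hands out one (filename, open stream) pair per request, with a mutex so that concurrent map tasks can ask for work safely. The class directory_feeder and its next() method are hypothetical and do not reflect the library's actual Datasource interface, which this article does not show.

```cpp
#include <cstddef>
#include <filesystem>
#include <fstream>
#include <mutex>
#include <string>
#include <vector>

namespace fs = std::filesystem;

// Hypothetical stand-in for datasource::directory_iterator.
class directory_feeder {
  public:
    explicit directory_feeder(const fs::path& dir) {
        for (const auto& entry : fs::directory_iterator(dir))
            if (entry.is_regular_file())
                files_.push_back(entry.path().string());
    }

    // Returns false once every file has been handed out. On success, key
    // holds the filename and value is an ifstream opened on that file.
    bool next(std::string& key, std::ifstream& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (pos_ == files_.size())
            return false;
        key = files_[pos_++];
        value.close();   // sets failbit if nothing was open...
        value.clear();   // ...so reset the state before reopening
        value.open(key);
        return true;
    }

    // files_ is never modified after construction, so no lock is needed.
    std::size_t size() const { return files_.size(); }

  private:
    std::vector<std::string> files_;
    std::size_t pos_ = 0;
    std::mutex mutex_;
};
```

    A map-task thread would call next() in a loop until it returns false, processing one file per call.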

    Combiner

    A Combiner is an optimization technique, originally designed to reduce network traffic by applying a local reduction to the intermediate key/value pairs in the Map phase before they are passed to the Reduce phase. The combiner is optional, and can actually degrade performance in a single-machine implementation because of the additional file sorting it requires. The default is therefore null_combiner, which does nothing.
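    The difference between doing nothing and combining locally can be sketched with two interchangeable policies chosen through a defaulted template parameter, the same policy-based mechanism mapreduce::job uses for its Combiner. The names pass_through, local_sum and finish_map_task are hypothetical, and the sketch works on in-memory vectors rather than the library's intermediate files. The std::map inside local_sum is where the extra sorting cost mentioned above appears.

```cpp
#include <map>
#include <string>
#include <utility>
#include <vector>

using pairs = std::vector<std::pair<std::string, unsigned>>;

// Forwards a map task's output untouched (the role null_combiner plays).
struct pass_through {
    static pairs combine(const pairs& in) { return in; }
};

// Sums the values of equal keys within one map task's output, so
// ("the",1),("the",1) leaves the Map phase as a single ("the",2).
struct local_sum {
    static pairs combine(const pairs& in) {
        std::map<std::string, unsigned> totals;  // sorting happens here
        for (const auto& kv : in)
            totals[kv.first] += kv.second;
        return pairs(totals.begin(), totals.end());
    }
};

// The combiner is selected at compile time; the default does nothing.
template <typename Combiner = pass_through>
pairs finish_map_task(const pairs& map_output) {
    return Combiner::combine(map_output);
}
```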

    IntermediateStore

    The policy class implements the behavior for storing, sorting and merging intermediate results between the Map and Reduce phases. The default implementation uses temporary files on the local filesystem.
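    A minimal in-memory sketch of an intermediate store, assuming hash partitioning: hash(key) mod R, the default partitioning function described in Google's MapReduce paper, assigns every key to one of R partitions, and each partition is sorted before it is reduced. Whether local_disk partitions exactly this way is an assumption, and it keeps partitions in temporary files rather than vectors. The class partitioned_store is hypothetical.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Each of the R buckets stands in for one temporary file that feeds one
// Reduce Task.
class partitioned_store {
  public:
    using pair_type = std::pair<std::string, unsigned>;

    explicit partitioned_store(std::size_t partitions) : buckets_(partitions) {}

    // hash(key) mod R sends every occurrence of a key to the same partition,
    // so a single reduce task sees all of that key's values.
    void insert(const std::string& key, unsigned value) {
        std::size_t p = std::hash<std::string>{}(key) % buckets_.size();
        buckets_[p].emplace_back(key, value);
    }

    // Sort one partition so that equal keys are adjacent, ready to reduce.
    const std::vector<pair_type>& sorted_partition(std::size_t p) {
        std::sort(buckets_[p].begin(), buckets_[p].end());
        return buckets_[p];
    }

    std::size_t partitions() const { return buckets_.size(); }

  private:
    std::vector<std::vector<pair_type>> buckets_;
};
```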

    SortFn

    Used to sort external intermediate files. The current default implementation uses a system() call to shell out to the operating system's SORT command. A merge sort implementation is currently in development.
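    A platform-independent, in-process alternative to shelling out to SORT can be sketched for intermediate files that fit in memory: read the lines, std::sort them, and write them back. This is not the merge sort the text says is in development; an external merge sort would sort memory-sized chunks this way and then merge the sorted chunks. The functions sort_lines and sort_file are hypothetical.

```cpp
#include <algorithm>
#include <fstream>
#include <istream>
#include <ostream>
#include <string>
#include <vector>

// Reads every line from in, sorts them lexicographically, writes them to out.
// Assumes the whole input fits in memory.
void sort_lines(std::istream& in, std::ostream& out) {
    std::vector<std::string> lines;
    for (std::string line; std::getline(in, line);)
        lines.push_back(line);
    std::sort(lines.begin(), lines.end());
    for (const auto& line : lines)
        out << line << '\n';
}

// File-to-file wrapper in place of a system("sort ...") call. The output
// path must differ from the input, because opening it truncates the file.
bool sort_file(const std::string& input, const std::string& output) {
    std::ifstream in(input);
    std::ofstream out(output);
    if (!in || !out)
        return false;
    sort_lines(in, out);
    out.flush();
    return static_cast<bool>(out);
}
```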

    MergeFn

    Used to merge external intermediate files. The current default implementation uses a system() call to shell out to the operating system's COPY command (Win32 only). A platform-independent, in-process implementation is still needed.
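    If the intermediate files are already sorted, for example by SortFn, a platform-independent merge can keep the output sorted with a k-way merge instead of concatenating the files as COPY does. The sketch below is written under that assumption; merge_sorted and merge_files are hypothetical names, not the library's external_file_merge.

```cpp
#include <cstddef>
#include <fstream>
#include <functional>
#include <istream>
#include <ostream>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// k-way merge: a min-heap holds the current head line of every input, so
// the output is sorted as long as each input is sorted line by line.
void merge_sorted(std::vector<std::istream*> inputs, std::ostream& out) {
    using entry = std::pair<std::string, std::size_t>;  // (line, input index)
    std::priority_queue<entry, std::vector<entry>, std::greater<entry>> heap;

    std::string line;
    for (std::size_t i = 0; i < inputs.size(); ++i)
        if (std::getline(*inputs[i], line))
            heap.emplace(line, i);

    while (!heap.empty()) {
        entry top = heap.top();
        heap.pop();
        out << top.first << '\n';
        if (std::getline(*inputs[top.second], line))  // refill from same input
            heap.emplace(line, top.second);
    }
}

// Merges the sorted files in paths into output; false if any file fails.
bool merge_files(const std::vector<std::string>& paths, const std::string& output) {
    std::vector<std::ifstream> files;
    files.reserve(paths.size());
    for (const auto& p : paths) {
        files.emplace_back(p);
        if (!files.back())
            return false;
    }
    std::vector<std::istream*> inputs;
    for (auto& f : files)
        inputs.push_back(&f);

    std::ofstream out(output);
    if (!out)
        return false;
    merge_sorted(inputs, out);
    out.flush();
    return static_cast<bool>(out);
}
```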

    SchedulePolicy

    This policy is the core of the scheduling algorithm and runs the Map and Reduce Tasks. Two schedule policies are supplied: cpu_parallel uses as many of the available CPU cores as possible to run map tasks simultaneously (up to a limit given in the mapreduce::specification object), while sequential runs one map task followed by one reduce task, which is useful for debugging.
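    The cpu_parallel idea can be sketched as a pool of worker threads sharing an atomic counter as the work queue: each worker claims the next unprocessed key until none are left, and the pool size is the number of hardware threads capped by a task limit. The functions run_parallel and run_sequential and the max_tasks parameter are hypothetical; the library's scheduler also runs the Reduce phase and records timings, which this sketch omits.

```cpp
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Runs task(i) for every i in [0, num_keys) on up to max_tasks threads.
void run_parallel(std::size_t num_keys, std::size_t max_tasks,
                  const std::function<void(std::size_t)>& task) {
    std::size_t cores = std::thread::hardware_concurrency();
    if (cores == 0)
        cores = 1;  // hardware_concurrency() returns 0 when unknown
    std::size_t workers = std::max<std::size_t>(1, std::min(cores, max_tasks));

    std::atomic<std::size_t> next{0};  // index of the next unclaimed key
    std::vector<std::thread> pool;
    for (std::size_t w = 0; w < workers; ++w)
        pool.emplace_back([&] {
            while (true) {
                std::size_t i = next.fetch_add(1);  // claim a key atomically
                if (i >= num_keys)
                    break;
                task(i);
            }
        });
    for (auto& t : pool)
        t.join();
}

// The sequential counterpart runs one task at a time on the calling thread.
void run_sequential(std::size_t num_keys,
                    const std::function<void(std::size_t)>& task) {
    for (std::size_t i = 0; i < num_keys; ++i)
        task(i);
}
```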

    Example - WordCount

    Below is a simplified but complete WordCount example using the library. Error checking is omitted and a simplified definition of a 'word' is used for brevity.

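    #include <algorithm>    // std::transform, std::min_element, std::max_element
    #include <cctype>       // std::tolower
    #include <fstream>
    #include <iostream>
    #include <numeric>      // std::accumulate
    #include <string>
    #include <boost/cstdint.hpp>      // boost::int64_t
    #include <boost/noncopyable.hpp>
    // The MapReduce library header must also be included; its path is not given here.
    
    namespace wordcount {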
    
    class map_task;
    class reduce_task;
    
    typedef
    mapreduce::job<
      wordcount::map_task,
      wordcount::reduce_task>
    job;
    
    class map_task : boost::noncopyable
    {
      public:
        typedef std::string   key_type;
        typedef std::ifstream value_type;
        typedef std::string   intermediate_key_type;
        typedef unsigned      intermediate_value_type;
    
        map_task(job::map_task_runner &runner)
          : runner_(runner)
        {
        }
    
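        // 'value_type' is not a reference to const so that the stream can be read
        //    key: input filename
        //    value: ifstream of the open file
        void operator()(key_type const &/*key*/, value_type &value)
        {
            std::string word;
            // Testing the extraction itself (rather than eof()) ends the loop
            // cleanly, so trailing whitespace never yields an empty "word".
            while (value >> word)
            {
                // Lower-case the word. std::bind1st and std::mem_fun were
                // removed in C++17, so a lambda is used; the unsigned char
                // parameter keeps std::tolower well-defined for non-ASCII bytes.
                std::transform(word.begin(), word.end(), word.begin(),
                               [](unsigned char c) { return static_cast<char>(std::tolower(c)); });

                runner_.emit_intermediate(word, 1);
            }
        }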
    
      private:
        job::map_task_runner &runner_;
    };
    
    class reduce_task : boost::noncopyable
    {
      public:
        typedef std::string  key_type;
        typedef size_t       value_type;
    
        reduce_task(job::reduce_task_runner &runner)
          : runner_(runner)
        {
        }
    
        template<typename It>
        void operator()(typename map_task::intermediate_key_type const &key, It it, It ite)
        {
            reduce_task::value_type result = 0;
            for (; it!=ite; ++it)
               result += *it;
            runner_.emit(key, result);
        }
    
      private:
        job::reduce_task_runner &runner_;
    };
    
    }   // namespace wordcount
    
    int main(int argc, char **argv)
    {
        wordcount::job::datasource_type datasource;
        datasource.set_directory(argv[1]);
    
        mapreduce::specification  spec;
        mapreduce::results        result;
        wordcount::job            mr(datasource);
        mr.run<mapreduce::schedule_policy::cpu_parallel>(spec, result);
    
        // output the results
        std::cout << std::endl << "\n" << "MapReduce statistics:";
        std::cout << "\n  " << "MapReduce job runtime                     : " << result.job_runtime << " seconds, of which...";
        std::cout << "\n  " << "  Map phase runtime                       : " << result.map_runtime << " seconds";
        std::cout << "\n  " << "  Reduce phase runtime                    : " << result.reduce_runtime << " seconds";
        std::cout << "\n\n  " << "Map:";
        std::cout << "\n    " << "Total Map keys                          : " << result.counters.map_tasks;
        std::cout << "\n    " << "Map keys processed                      : " << result.counters.map_tasks_completed;
        std::cout << "\n    " << "Map key processing errors               : " << result.counters.map_tasks_error;
        std::cout << "\n    " << "Number of Map Tasks run (in parallel)   : " << result.counters.actual_map_tasks;
        std::cout << "\n    " << "Fastest Map key processed in            : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
        std::cout << "\n    " << "Slowest Map key processed in            : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
        std::cout << "\n    " << "Average time to process Map keys        : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
        std::cout << "\n\n  " << "Reduce:";
        std::cout << "\n    " << "Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
        std::cout << "\n    " << "Number of Result Files                  : " << result.counters.num_result_files;
        std::cout << "\n    " << "Fastest Reduce key processed in         : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
        std::cout << "\n    " << "Slowest Reduce key processed in         : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
        std::cout << "\n    " << "Average time to process Reduce keys     : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.reduce_times.size() << " seconds";
    
        return 0;
    }
          

    Performance

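    Here are some results from running the WordCount example on the Westbury Lab USENET corpus (2005), which contains 9.92 GB (10,659,287,688 bytes) of data in 23 files. The "Average time to process Reduce keys" figures in these runs were computed by dividing the summed Reduce times by the number of Map keys (23) rather than by the number of Reduce timings, which is why each one falls below the corresponding fastest Reduce time.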

    Sequential MapReduce

    The sequential scheduling algorithm gives a baseline timing for the WordCount implementation, running a single Map task followed by a single Reduce task.

    MapReduce Wordcount Application
    16 CPU cores
    class mapreduce::job<class wordcount::map_task,class wordcount::reduce_task,struct mapreduce::null_combiner,class mapreduce::datasource::directory_iterator<class wordcount::map_task>,class mapreduce::intermediates::local_disk<class wordcount::map_task,struct win32::external_file_sort,struct win32::external_file_merge> >
    
    Running Sequential MapReduce...
    Finished.
    
    MapReduce statistics:
      MapReduce job runtime                     : 3105 seconds, of which...
        Map phase runtime                       : 699 seconds
        Reduce phase runtime                    : 2406 seconds
    
      Map:
        Total Map keys                          : 23
        Map keys processed                      : 23
        Map key processing errors               : 0
        Number of Map Tasks run (in parallel)   : 1
        Fastest Map key processed in            : 0 seconds
        Slowest Map key processed in            : 96 seconds
        Average time to process Map keys        : 30 seconds
    
      Reduce:
        Number of Reduce Tasks run (in parallel): 1
        Number of Result Files                  : 10
        Fastest Reduce key processed in         : 161 seconds
        Slowest Reduce key processed in         : 390 seconds
        Average time to process Reduce keys     : 104 seconds
    

    CPU Parallel

    Running on a 16 CPU-core Windows server, using all cores for Map tasks, produces the following results:

    MapReduce Wordcount Application
    16 CPU cores
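    class mapreduce::job<class wordcount::map_task,class wordcount::reduce_task,struct mapreduce::null_combiner,class mapreduce::datasource::directory_iterator<class wordcount::map_task>,class mapreduce::intermediates::local_disk<class wordcount::map_task,struct win32::external_file_sort,struct win32::external_file_merge> >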
    
    Running CPU Parallel MapReduce...
    CPU Parallel MapReduce Finished.
    
    MapReduce statistics:
      MapReduce job runtime                     : 1608 seconds, of which...
        Map phase runtime                       : 842 seconds
        Reduce phase runtime                    : 766 seconds
    
      Map:
        Total Map keys                          : 23
        Map keys processed                      : 23
        Map key processing errors               : 0
        Number of Map Tasks run (in parallel)   : 16
        Fastest Map key processed in            : 0 seconds
        Slowest Map key processed in            : 842 seconds
        Average time to process Map keys        : 433 seconds
    
      Reduce:
        Number of Reduce Tasks run (in parallel): 10
        Number of Result Files                  : 10
        Fastest Reduce key processed in         : 384 seconds
        Slowest Reduce key processed in         : 766 seconds
        Average time to process Reduce keys     : 261 seconds
    

    Running on the same server, restricting the number of Map tasks to 8 yields:

    MapReduce Wordcount Application
    16 CPU cores
    class mapreduce::job<class wordcount::map_task,class wordcount::reduce_task,struct mapreduce::null_combiner,class mapreduce::datasource::directory_iterator<class wordcount::map_task>,class mapreduce::intermediates::local_disk<class wordcount::map_task,struct win32::external_file_sort,struct win32::external_file_merge> >
    
    Running CPU Parallel MapReduce...
    CPU Parallel MapReduce Finished.
    
    MapReduce statistics:
      MapReduce job runtime                     : 1743 seconds, of which...
        Map phase runtime                       : 950 seconds
        Reduce phase runtime                    : 793 seconds
    
      Map:
        Total Map keys                          : 23
        Map keys processed                      : 23
        Map key processing errors               : 0
        Number of Map Tasks run (in parallel)   : 8
        Fastest Map key processed in            : 0 seconds
        Slowest Map key processed in            : 934 seconds
        Average time to process Map keys        : 303 seconds
    
      Reduce:
        Number of Reduce Tasks run (in parallel): 10
        Number of Result Files                  : 10
        Fastest Reduce key processed in         : 396 seconds
        Slowest Reduce key processed in         : 793 seconds
        Average time to process Reduce keys     : 271 seconds
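    The reported runtimes can be turned into speedups relative to the sequential baseline. The figures in the sketch are copied from the three runs above; the struct run_times and the function speedup are illustrative names.

```cpp
// Reported runtimes in seconds, copied from the three runs above.
struct run_times {
    const char* name;
    double job_s;
    double map_s;
    double reduce_s;
};

constexpr run_times sequential {"sequential",        3105.0, 699.0, 2406.0};
constexpr run_times parallel16 {"cpu_parallel, 16", 1608.0, 842.0,  766.0};
constexpr run_times parallel8  {"cpu_parallel, 8",  1743.0, 950.0,  793.0};

// Speedup = baseline runtime / parallel runtime; below 1 means a slowdown.
constexpr double speedup(double baseline_s, double parallel_s) {
    return baseline_s / parallel_s;
}
```

    speedup(sequential.job_s, parallel16.job_s) is about 1.93 and speedup(sequential.job_s, parallel8.job_s) about 1.78. The same arithmetic shows that the gain comes from the Reduce phase (2406 s down to 766 s, about 3.14 times faster), while the Map phase took longer with 16 parallel tasks than sequentially (842 s versus 699 s).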
  • Original article: https://www.cnblogs.com/lenmom/p/7799629.html