zoukankan      html  css  js  c++  java
  • MapReduce中shuffle过程

    shuffle是MapReduce的核心,map和reduce的中间过程。

    Map负责过滤分发,reduce归并整理,从map输出到reduce输入就是shuffle过程。

    实现的功能

    分区

    决定当前key交给哪个reduce处理

    默认:按照key的hash值对reduce的个数取余进行分区

     

    分组

    将相同key的value合并

    排序

    按照key对每一个keyvalue进行排序,字典排序

    过程

     

    map端shuffle

    spill阶段:溢写

    每一个map task处理的结果会进入环形缓冲区(内存100M)

    分区

    对每一条key进行分区(标上交给哪个reduce)

    hadoop      1       reduce0
    hive        1       reduce0
    spark       1       reduce1
    hadoop      1       reduce0
    hbase       1       reduce1
    排序

    按照key排序,将相同分区的数据进行分区内排序

    hadoop      1       reduce0
    hadoop      1       reduce0
    hive        1       reduce0
    hbase       1       reduce1
    spark       1       reduce1
    溢写

    当整个缓冲区达到阈值80%,开始进行溢写


    将当前分区排序后的数据写入磁盘变成一个文件file1
    最终生成多个spill小文件

    可以在mapred-site.xml中设置内存的大小和溢写的阈值

    在mapred-site.xml中设置内存的大小
    ​
        <property>
    ​
          <name>mapreduce.task.io.sort.mb</name>
    ​
          <value>100</value>
    ​
        </property>
    ​
    在mapred-site.xml中设置内存溢写的阈值  
    ​
        <property>
    ​
          <name>mapreduce.task.io.sort.spill.percent</name>
    ​
          <value>0.8</value>
    ​
        </property>

    merge:合并

    将spill生成的多个小文件进行合并

    排序:将相同分区的数据进行分区内排序,实现comparator比较器进行比较。最终形成一个文件。

    
    
    file1
    hadoop      1       reduce0
    hadoop      1       reduce0
    hive        1       reduce0
    hbase       1       reduce1
    spark       1       reduce1
    ​
    file2
    hadoop      1       reduce0
    hadoop      1       reduce0
    hive        1       reduce0
    hbase       1       reduce1
    spark       1       reduce1
    ​
    end_file:
    hadoop      1       reduce0
    hadoop      1       reduce0
    hadoop      1       reduce0
    hadoop      1       reduce0
    hive        1       reduce0
    hive        1       reduce0
    hbase       1       reduce1
    hbase       1       reduce1
    spark       1       reduce1
    spark       1       reduce1

    map task 结束,通知app master,app master通知reduce拉取数据

    reduce端shuffle

    map task1
            hadoop      1       reduce0
            hadoop      1       reduce0
            hadoop      1       reduce0
            hadoop      1       reduce0
            hive        1       reduce0
            hive        1       reduce0
            hbase       1       reduce1
            hbase       1       reduce1
            spark       1       reduce1
            spark       1       reduce1
    map task2
            hadoop      1       reduce0
            hadoop      1       reduce0
            hadoop      1       reduce0
            hadoop      1       reduce0
            hive        1       reduce0
            hive        1       reduce0
            hbase       1       reduce1
            hbase       1       reduce1
            spark       1       reduce1
            spark       1       reduce1

    reduce启动多个线程通过http到每台机器上拉取属于自己分区的数据

    reduce0:
        hadoop      1       reduce0
        hadoop      1       reduce0
        hadoop      1       reduce0
        hadoop      1       reduce0
        hadoop      1       reduce0
        hadoop      1       reduce0
        hadoop      1       reduce0
        hadoop      1       reduce0
        hive        1       reduce0
        hive        1       reduce0
        hive        1       reduce0
        hive        1       reduce0

    merge:合并,将每个map task的结果中属于自己的分区数据进行合并

    排序:对整体属于我分区的数据进行排序

    分组:对相同key的value进行合并,使用comparable完成比较。

    hadoop,list<1,1,1,1,1,1,1,1>
    hive,list<1,1,1,1>
    

      

    优化

    combine

    在map阶段提前进行一次合并。一般等同于提前执行reduce

    
    
    job.setCombinerClass(WCReduce.class);

    compress

    压缩中间结果集,减少磁盘IO以及网络IO

    压缩配置方式

    1.default:所有hadoop中默认的配置项
    2.site:用于自定义配置文件,如果修改以后必须重启生效
    3.conf对象配置每个程序的自定义配置
    4.运行时通过参数实现用户自定义配置
    bin/yarn jar xx.jar -Dmapreduce.map.output.compress=true -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.Lz4Codec main_class input_path ouput_path

    查看本地库支持哪些压缩

    bin/hadoop checknative

    通过conf配置对象配置压缩

    public static void main(String[] args) {
            Configuration configuration = new Configuration();
            //配置map中间结果集压缩
            configuration.set("mapreduce.map.output.compress","true");
            configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.Lz4Codec");
            //配置reduce结果集压缩
            configuration.set("mapreduce.output.fileoutputformat.compress","true");
            configuration.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.Lz4Codec");
            try {
                int status = ToolRunner.run(configuration, new MRDriver(), args);
                System.exit(status);
            } catch (Exception e) {
                e.printStackTrace();
            }
    }
    

     

    通过自定义配置文件site-xml   

    针对Map Output数据进行压缩设置

    对于MR程序来说:
    提交任务的时候使用参数配置
    mapreduce.map.output.compress=true
    mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
     
     

    运行官方MapReduce案例 wordcount    可在Yarn上看到

     

     
     
     
     
     
     
     
     
  • 相关阅读:
    MapReduce WordCount Combiner程序
    Spring Boot 单元测试
    Spring Boot @SpringApplicationConfiguration 不能导入的问题
    西西弗斯 滚石上山
    《Effective Modern C++》翻译--简单介绍
    算法排序问题
    MySQL Study之--MySQL schema_information数据库
    HDOJ 4251 The Famous ICPC Team Again
    一、OpenStack入门 之 初步认识
    python模块
  • 原文地址:https://www.cnblogs.com/whcwkw1314/p/8970985.html
Copyright © 2011-2022 走看看