zoukankan      html  css  js  c++  java
  • MR job提交流程和切片算法

    Mapreduce:
        编程模型,适用于分布式计算
    
        
        Map:    映射    预处理
        Reduce:    化简    聚合
        
    
        shuffle:    网络间分发
        combiner:    Map端的reduce
        partitioner:    分区,默认hash分区
    
    combiner:
        适合最大值,最小值
        平均值不适用
    
        
    文件格式和切片的关系:
        
    
    MR的job提交流程:
        1、计算Map数和文件切片:
            Map运行的基本单位,切片数=map数
            FileInputFormat中getSplits方法:
            minSize:1
            maxSize:Long.Max_VALUE
    
            确定文件夹下面的所有文件并进行迭代(listStatus方法)
            
            判断文件是否可切割:
                1、默认文本文件格式TextInputFormat
                2、通过文件后缀的判断,得到文件的压缩编解码器(bzip2可切割)
                3、如果文件可切割:
                    选择(minSize,blockSize,maxSize)中的中间值作为切片大小                              ///重点切片算法
        2、使用LocalJobRunner的submitJob方法,真正开始提交作业
        3、开始Map作业提交:
            ExecutorService mapService = createMapExecutor();
            runTasks(mapRunnables, mapService, "map");        //执行MapTaskRunnables对象的run函数
    
            //map中除了partition、combiner还有sort阶段
    
            使用maptask的runNewMapper方法开始正式的map阶段
                1、根据自定义map类名,获得自定义map对象
                2、调用Mapper的run函数来运行用户自定义的map方法
                    //设置相关变量或者参数,一个map只调用一次
                    setup(context);
                    try {
                      while (context.nextKeyValue()) {
                    //使用while循环调用自定义map的方法
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                      }
                    } finally {
                      //清理过程,包括清理一些没用的k-v
                      cleanup(context);
                    }
    
            Spill:溢出    //当map中的数据超出内存空间的80%,超出的数据就会被本地化
    
            map端的输出称为ifile:key-len, value-len, key, value
            
            shuffle在调用的时候对ifile进行处理
    
        4、通过shuffle进行网络间分发,reduce的调用过程类似于map过程
    
        5、细节:FileInputFormat ====> RecordReader ==> 
             map ===> partition ===> sort ===> combiner ===> shuffle ====> reduce  
             RecordWriter ====> FileOutputFormat
    
    
    FileInputFormat:
        TextInputFormat =====> createRecordReader ====> LineRecordReader  //负责处理文件,定义以行为单位读取和文件类型为UTF-8
               
        通过LineReader中的readDefaultLine来读行
    
        CR    //carriage return 回车
        LF    //line feed      换行
        CRLF    //          回车换行
    
     /* We're reading data from in, but the head of the stream may be
       流的头部可能已经被buffer所缓存
       1. 缓冲区中没有换行符,我们需要拷贝所有然后读取另一个缓冲区
       2. 在buffer中含有明确的换行符,只需要拷贝即可
       3. 在buffer中没有明确的换行符, 比如buffer以回车作为末尾. 
          我们拷贝内容到回车符为止到字符串中,我们需要观察什么在回车符之后
          1)回车符后有换行符,将CR和LF作为一个整体,作为一个整行结尾
          2)回车符后没有换行符,用flag来标记,如果恰好在buffer最后,需要观察下一个buffer的字符
     
    
  • 相关阅读:
    让pv3d(papervision3D)支持单帧前进、后退(nextFrame)。
    4399 威武三国 网页游戏破解。
    策划进化史一 (2013-12-21)
    Java的一个高性能快速深拷贝方法。Cloneable?
    as3commons-bytecode 获取所有类的一个BUG
    MYSQL 大文件无法导入的问题。
    诡异的 未处理的IOErrorEvent 2035
    一个用微软官方的OpenXml读写Excel 目前网上不太普及的方法。
    如何在高并发环境下设计出无锁的数据库操作(Java版本)
    达洛克战记3 即将开服! What's New!
  • 原文地址:https://www.cnblogs.com/zyde/p/8944191.html
Copyright © 2011-2022 走看看