zoukankan      html  css  js  c++  java
  • spark-yarn模式和shuffle原理

    sparkjob的部署
    -----------------
        1.client
            driver run on client
        2.cluster
            driver on a worker
    
    
    
    4.启动job时,指定资源使用。
        $>spark-submit 
            --driver-memory MEM            //设置driver内存,默认1g,配置2g
            --executor-memory MEM        //控制每个执行器内存,默认1g
    
            [只在standalone模式下]
            --driver-cores                //控制driver使用的内核数,默认1.
    
            [standalone & mesos]
            --total-executor-cores NUM    //控制执行器使用的总内核数
    
            [standalone & yarn]
            --executor-cores NUM        //控制每个执行的内核数。
            
            [yarn]
            --driver-cores NUM            //控制driver内核数,默认1
            --num-executors NUM            //启动的执行器个数,动态分配内核启用时,数字就是Num的值。
    
        
    5.启动spark-shell,手动分配资源
        //启动3个executor,worker节点不能启动2个executor
        spark-shell --master spark://s101:7077 --driver-memory 2g --executor-memory 6g --total-executor-cores 4 --executor-cores 1
        //启动了4个executor,
        spark-shell --master spark://s101:7077 --driver-memory 2g --executor-memory 3g --total-executor-cores 4 --executor-cores 1
        //启动了7个executor,
        spark-shell --master spark://s101:7077 --driver-memory 2g --executor-memory 3g --total-executor-cores 22 --executor-cores 3
    
    
    spark + yarn模式
    --------------------
        yarn模式,不需要spark集群,只是在client安装spark,提交作业时,走的是hadoop的流程。
        使用spark的jar,在nodemanager上启动的spark的executor进程。
        --master的值指定yarn即可,rm的地址从配置文件中提取的。
    
        --master yarn --deployMode client            //--master yarn-client
        --master yarn --deployMode cluster            //--master yarn-cluster
        [yarn-client]
            Appmaster只运行appmaster自身程序,负责资源请求。
            Driver仍然位于client执行。
    
        [yarn-cluster]
            appmaster不但负责资源请求,还负责运行driver。
    
        //实操
        1.停止spark集群
            stop-all.sh
    
        2.启动zk和hdfs-yarn
            start-yarn.sh
    
        3.配置spark的spark-env.sh的HADOOP_CONF_DIR并分发.
            ...
            export HADOOP_CONF_DIR=/soft/hadoop/etc/hadoop
        
        4.启动spark-shell
            spark-shell --master yarn --deploy-mode client --num-executors 4
            
        5.故障诊断
            出现 is running beyond virtual memory limits. 
            Current usage: 178.7 MB of 1 GB physical memory used; 2.3 GB of 2.1 GB virtual memory used. Killing container.
    
            关闭yarn-site.xml虚拟内存检查并分发文件。
            [yarn-site.xml]
            <property>
                <name>yarn.nodemanager.vmem-check-enabled</name>
                <value>false</value>
            </property>
    
        6.spark yarn运行时将spark的所有jar上传到hdfs,协同hadoop的作业运行流程。
            配置spark.yarn.jars或者spark.yarn.archive,避免每次上传jar包。
            1.spark.yarn.jars
                spark.yarn.jars=hdfs:///some/path
            2.spark.yarn.archive
                spark.yarn.archive=hdfs://mycluster/user/centos/spark/spark-jars.zip
    
            3.配置spark.yarn.archive属性,避免每次上传大的jar包。
                a)上传zip文件到hdfs://mycluster/user/centos/spark/spark-jars.zip
                b)配置spark配置文件。
                    [spark/conf/spark-default.conf]
                    spark.yarn.archive hdfs://mycluster/user/centos/spark/spark-jars.zip
                c)启动shell
                    $>spark-shell --master yarn-client
    
    ShuffleMapTask
    ------------------
        private[spark] class ShuffleMapTask(
            stageId: Int,
            stageAttemptId: Int,
            taskBinary: Broadcast[Array[Byte]],        //(rdd,dep)
            partition: Partition,
            @transient private var locs: Seq[TaskLocation],
            metrics: TaskMetrics,
            localProperties: Properties,
            jobId: Option[Int] = None,
            appId: Option[String] = None,
            appAttemptId: Option[String] = None)
        }
    
    shuffle管理
    -------------------
        [ShuffleManager]
            ShuffleManager,是shuffle系统可插拔接口。
            ShuffleManager在driver和每个executor通过SparkEnv进行创建。
            基于spark.shuffle.manager属性配置创建相应shuffleManager实现。
            在spark 2.1.0中只有SortShuffleManager.
            在spark 1.6.0中有SortShuffleManager和HashShuffleManager.
    
        [HashShuffleManager]
            spark.shuffle.consolidateFiles=true,默认false,合并输出。
            slot = 并发能力 = 并发执行的线程数 = (执行器个数 * 每个执行器的cpu内核数) / 每个任务占用的内核数。
    
        spark 2.1.0的实现类是SortShuffleManager(不论sort还是tungsten-sort(钨丝排序))
        [SortShuffleManager]
            基于排序的shuffle,输入kv按照目标分区的id进行排序,然后写入一个map输出文件。
            reducer读取连续文件区域来提取数据。map内存不足,溢出到磁盘,磁盘上的文件最终输出到一个文件中。
    
            该方式的shuffle有两种途径生成map输出文件:
            1.串行化排序(以下三个条件均满足使用)
                a)shuffle依赖没有指定聚合或者输出排序
                b)shuffle序列化器支持序列化值得重新定位。(当前只有KryoSerializer和SQL的Serializer可以,java不可以)
                c)shuffle生成的分区少于16777216个.
    
            2.反串行排序
                所有其他情况。
        
        [串行化排序模式]
            该模式下,传递给ShuffleWriter的record即可被串行化,排序时也是串行化进行缓冲。该方式有几点优化
            处理:
            1.对串行化的二进制数据进行排序,而不是针对java对象,因此可以减少内存消耗和过度GC。
              该优化机制要求串行化器具有特殊的属性能够对串行的record进行重排序,不需要反串过程。
    
            2.使用串行化的具有高效缓存特征的sorter,可以对压缩的record指针和分区id的数组进行排序。
              数组中,每条record使用8字节空间存储。
    
            3.溢出合并过程对串行化的数据块(属于同一分区)进行操作,并且合并期间不需要反串(流)。
    
            4.支持压缩文件块的合成,合并过程简单的将压缩和串行化的分区最终合并成一个分区文件,
              支持高效数据复制方式,例如NIO中的零拷贝。
            
            
    ShuffleManager.registerShuffle()
    -----------------------------------
        //1.通过ShuffleDep判断是否需要bypass
        if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) {
          new BypassMergeSortShuffleHandle[K, V](shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        } 
        //判断依赖是否可以串行shuffle
        else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
          new SerializedShuffleHandle[K, V]( shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        } 
        //基本shuffle
        else {
          new BaseShuffleHandle(shuffleId, numMaps, dependency)
        }
    
    
        
    是否迂回的条件
    -------------------------
        def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
        //如果map端需要聚合,不能回调。
        if (dep.mapSideCombine) {
          require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
          false
        }
        //判断依赖的分区数量是否小于指定的配置(默认时200)
        else {
          val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
          dep.partitioner.numPartitions <= bypassMergeThreshold
        }
        }
    
        //结论
        if(map需要聚合){
            //不能迂回
        }
        else{
            if(分区数 <= 200(可配:spark.shuffle.sort.bypassMergeThreshold)){
                //可以迂回
            }
            else{
                //不能迂回
            }
        }
    
    
    串行shuffle的判断条件
    ------------------------
        def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
            val shufId = dependency.shuffleId
            val numPartitions = dependency.partitioner.numPartitions
            //判断是否dep中使用的串行化器是否时kryo(kryo支持)。
            if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
              false
            }
            //判断dep是否定义聚合器
            else if (dependency.aggregator.isDefined) {
              false
            } 
            //分区数大于特定值
            else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
              false
            } 
            ///
            else {
              true
            }
        }
        
        //结论
        if(不是kryo){
            //不能用串行shuffle
        }
        //
        else if(dep定义了聚合器){
            //不能用串行shuffle
        }
        else if(分区数 > (1 << 24) ){
            //不能用串行shuffle
        }
        else{
            //使用串行shuffle
        }
    
    
    整个shuffle处理手段的优先级
    ---------------------------
        //1.迂回策略
        if(能否迂回){
            //new BypassMergeSortShuffleHandle()
        }
        //2.串行策略
        else if(是否串行){
            //new SerializedShuffleHandle();
        }
        //3.常规策略
        else{
            //new BaseShuffleHandle
        }
    
    
    SortShuffleManager.getWrtier()
    --------------------------------
        handle match{
            case SerializedShuffleHandle        => new UnsafeShuffleWriter();
            case BypassMergeSortShuffleHandle    => new BypassMergeSortShuffleWriter();
            case BaseShuffleHandle                => new SortShuffleWriter();
        
        }
    
    
    ShuffleWriter的特性
    --------------------
        abstract class ShuffleWriter
             |
            / 
            ---
             |
             |------BypassMergeSortShuffleWriter
             |------UnsafeShuffleWriter
             |------SortShuffleWriter
    
        [BypassMergeSortShuffleWriter]
            该类实现了hash方式的shuffle处理手段,将record写入单独文件,每个分区一个文件。
            然后对每个分区文件合并再产生一个文件,文件的不同区域用于不同reduce,该模式下,
            record不在内存中缓存,这是和HashShuffleWriter本质不同点。
    
            该方式对于有大量分区的shuffle处理效率不高,原因是需要对所有分区同时打开串行化器
            和文件流。
    
        [UnsafeShuffleWriter]
            将kv分开单独以kryo串行写入缓冲区,然后将缓冲放入ShuffleExternalSorter中。
            1.ShuffleExternalSorter
                专门用于基于sort的shuffle。record追加到date page,如果所有record插入
                后或者内存到达limit值,这些记录按照分区id进行排序,排序后的记录写入单独
                的输出文件(或多个文件),输出文件的格式和SortShuffleWriter输出文件格式相同,
                每条分区的记录都是单独串行和压缩写入的,同样使用反串和解压缩方式读取。
                和ExternalSorter不同,该对象不对溢出文件进行合并,而是将合并过程交给
                UnsafeShuffleWriter,避免多余串行和反串过程。
    
                KV以串行和压缩方式写缓冲区,再将缓冲区字节数组写入页面内存(long[]),标记好
                长度、偏移量、分区数等等,每个KV在页面内存的地址和分区进行编码后写入内存
                排序器(InMemorySorter,该排序器使用分区id降序排列).如果内存页默认超过1G(
                可以通过spark.shuffle.spill.numElementsForceSpillThreshold进行修改)个kv,
                发生溢出,进行排序输出到文件。
    
    
        [SortShuffleWriter]
            
            
    
    
    
    Spark中的串行化
    -------------------
        spark默认使用java串行化器,但性能一般,优化手段之一
        使用kryo串行化,但是kryo串行化器对于要串行化的类使用前
        需要注册,spark的kryo串行化器只是对java内置类、scala的内置
        类核spark的内置类进行了注册,自定义的类必须手动注册。
        也是没有把kryo串行化器做为默认设置的原因.
        
        
        keyo串行化为什么快
    ——————————————————————————————————————————
    为什么kryo比其它的序列化方案要快?
    
    为每一个类分配一个id
    
    实现了自己的IntMap
    
    代码中一些取巧的地方:
    
    利用变量memoizedRegistration和memoizedType记录上一次的调用writeObject函数的Class,则如果两次写入同一类型时,可以直接拿到,不再查找HashMap。
  • 相关阅读:
    yum仓库客户端搭建和NTP时间同步客户端配置
    linux中删除文件内空白行的几种方法。
    ubuntu下安装memcached和PHP的memcache扩展
    Java JXL 实现Excel文件读写操作
    Spring事务管理
    代理模式
    Java POI 实现Excel文件读写操作
    iOS 查找文件、遍历文件系统
    iOS NSDate获取当前时间并格式化
    iOS 为类添加Xib里面配置的view
  • 原文地址:https://www.cnblogs.com/zyde/p/9046244.html
Copyright © 2011-2022 走看看