zoukankan      html  css  js  c++  java
  • Spark内存和shuffle

    Spark的全排序
    -------------------
        RDD.sortByKey()内部使用采样方式按照RangePartitioner进行分区。
    
    Spark Shuffle
    -------------------
        1.Bypass
            迂回shuffle
            可以使用零拷贝技术实现分区文件的合并。
            配置属性为:
            
                spark.file.transferTo=false
    
        2.SerializedShuffle
            串行shuffle,不安全shuffle
    
        3.SortShuffle
            基本shuffle.
    
    
    ShuffleWriter
    --------------------
        1.BypassMerge..Writer
        2.Unsafe..Writer
        3.SortShuffleWriter
            sorter = 
                if(mapCombine?){
                    // agg + 排序
                    new ExternalSorter[K, V, C](context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
                }
                else{
                    //没有 agg , 没有排序
                    new ExternalSorter[K, V, V](context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
                }
    
    
            sorter.insertAll(records) ;
            
                --> if(agg.defined?){
                        
                    }
                    else{
                        //在缓冲区中插入数据
                        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
    
                        //写入kv到内存缓存区中,每个kv在缓存区中的存放形式为[(parid , key) , value]
                        //如果到达一定条件,溢出buffer数据到磁盘,溢出到磁盘后,对buffer进行重置。
                        //条件: (1)SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MaxValue)    //long最大值
                                (2)SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5 * 1024 * 1024)            //5M
      
                        //溢出过程
                            累计溢出的次数。
                    }
    
    SortShuffleWriter
    --------------------
        使用的最多的ShuffleWriter,迂回和UnsafeShuffleWriter不满足时,使用该类完成shuffle输出。
        分区id,k,v重新组合,(partid + k)形成新key,写入缓冲区或者map集合,究竟是
        缓冲区还是map集合看是否需要map合成,需要合成是map,否则是buf.
        缓存会一定条件下(内存超过阈值或者达到特定数据)溢出数据到磁盘,溢出磁盘前
        需要对kv进行排序,因此输出到文件中的数据是排了序的数据,注意可能产生多个溢出文件。
        最后还需要将内存数据和溢出的文件合并,产生一个shuffle文件。还有一个存放分区id的索引文件。
    
        本次磁盘文件写入完成后,生成mapStatus状态数据,内部主要包含数据块信息(在哪个节点运
        行的任务,端口,执行器id等)并将其发送给driver,在driver保存,以便供下一个阶段的task
        提取数据块使用。
    
    
    ShuffleReader
    ------------------
        BlockStoreShuffleReader.
        从其他节点的块存储设备上请求数据,抓取并读出特定分区范围的数据。
        该对象由SortShuffleManger负责创建,reader内部含有四个属性,ShuffleHandle,
        int startPartition, int edn , ctx);
    
        该reader通过ShuffleBlockFetcherIterator抓取本地或者远程的块数据。返回(blockId->InputStream).
        本地块通过Local Block manager读取,远程块通过BlockTransferService来提取。
    
        //task一次性从远程节点抓取数据量的最大值,每次抓取请求是 1/5
        SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m")
        
        //一次请求抓取的量限制
        SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue))
    
        [概括]
        Task读取上一阶段的输出,包括两种方式读取,本地块和远程块,远程块需要向driver发送请求消息,获得
        mapstatus,拿到数据后再通过传输服务向对应的节点发送FetchRequest.
    
    
    
    Spark内存管理
    ---------------------
        不是通过物理或者硬件底层API实现对内存资源探测,只是通过对内存操作过程期间
        的字节量的变化不断更新维护的数字,通过该方式跟踪内存使用情况。spark对每个task都关联了
        内存的使用量,存放在了map<Long,Long>中。严格意义上讲,spark内存的管理是估算量,不是
        精确量。
    
        [JVM]
            [堆角度]
                1.堆内
                    年轻代            //伊甸区 + 幸存区()
                    年老代            //对象
                    
    
                2.非堆
                    jvm - heap
                    方法区
                
                3.离堆
                    os - jvm
    
            [全局角度]
                1.方法区
                2.堆区
                3.java方法栈
                4.本地方法栈
                5.程序计数器
    
        指定spark内存。        //针对堆的管理
        JVM                    //全面
    
        [MemoryManager]
        强制在storage和execute之间共享内存的数量。
        执行内存 : shuffle中执行计算的内存,join,aggregate,sort
        存储内存 : 用于跨集群,缓存或传播数据的内存。
    
        实现有两种:
        1.StaticMemoryManager
            将堆空间划分成不相交的两个区域,此两个区域完全独立,不存在相互借用的问题。
            spark.memory.useLegacyMode=true,默认false,
    
            [spark.shuffle.memoryFraction]
            shuffle期间,用户cogroup和聚合的内存比例,默认0.2
    
            [spark.storage.memoryFraction]
            是spark缓存的内存比例,针对jvm的堆空间的比例,默认0.62.UnifiedMemoryManager
            在execution和storage之间存在交叉区间,可以相互借用。
    
             1.spark.memory.fraction
                用于spark的执行和存储的内存比例,针对(heap - 300m)而言的,默认值是 0.6
                (heap - 300M) * 0.6
    
             2.spark.memory.storageFraction
                在spark内存中,控制存储内存的比例。默认0.5
                存储内存 = spark内存 * 0.5 = (heap - 300M) * 0.6 * 0.5
    
            存储内存可以借用执行内存的空闲空间,执行内存不足时可以收回借出的内存,但最多
            也只能收回借出的那部分内存,执行内存不能抢存储内存的内存。
            
            执行内存也可以借用存储内存空间,不还内存。
    
        3.内存结构
            3.0)driver内存配置和executor内存配置。
                --driver-memory                //driver内存
                spark.executor.memory        //执行器内存,等价于--executor-memory
    
            3.1)保留内存
                默认是300M,程序硬编码实现。
                //RESERVED_SYSTEM_MEMORY_BYTES = 300M
                conf.getLong("spark.testing.reservedMemory",if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
                可以通过spark.testing.reservedMemory设置特定的保留内置的大小。
                spark.testing属性可以控制是否关闭保留内存。
                //设置保留内存的大小
                spark-shell --master spark://s101:7077 --executor-memory 20m --conf spark.testing.reservedMemory=80000000
    
                //关闭保留内存
                spark-shell --master spark://s101:7077 --executor-memory 20m --conf spark.testing=1
    
            3.2)系统内存
                最小 = 保留内存 * 1.5
                spark.testing.memory=配置系统内存
                --executor-memory=配置系统内存
                
            3.3)内存管理实现原理
                内存池实现。
                class MemoryPool{
                    //容量
                    var _poolSize: Long = 0
                    //已使用
                    def memoryUsed: Long
                }
                
                //执行内存池
                class ExecutionMemoryPool extends MemoryPool{
                    //维护内存
                    memoryForTask = new mutable.HashMap[Long, Long]()
                }
                
                不是通过物理或者硬件底层API实现对内存资源探测,只是通过对内存操作过程期间
                的字节量的变化不断更新维护的数字,通过该方式跟踪内存使用情况。spark对每个task都关联了
                内存的使用量,存放在了map<Long,Long>中。严格意义上讲,spark内存的管理是估算量,不是
                精确量。
                
                
            
            3.4)统一内存管理
                系统内存            //堆内存,通过--executor-memory | spark.executor.memory 
                堆内存                //系统内存,至少是保留的1.5倍。
                保留内存            //300M定量,关闭通过--conf spark.testing=1 , 可用spark.testing.reservedMemory设置保留内存大小.
                可用内存            //(系统内存 - 保留内存)
                spark内存            //可用内存 *  spark.memory.fraction(0.6)
                用户内存            //可用内存 - spark内存
                存储内存            //spark内存 * spark.memory.storageFraction(0.5)
                执行内存            //spark内存 *  (1 - spark.memory.storageFraction)
            
            3.5)静态内存
                spark.memory.useLegacyMode=true            //启用传统模式
                spark.shuffle.memoryFraction            //0.2,
                spark.storage.memoryFraction            //0.6
                spark.storage.unrollFraction            //
    
            
            3.6)练习
                调整保留内存,用户内存,执行内存,存储内存均为10m
                spark-shell --master spark://s101:7077 --executor-memory 40m --conf spark.testing.reservedMemory=10000000 --conf spark.memory.fraction=0.67 --conf spark.memory.storageFraction=0.5
    
    
    RDD持久化
    ---------------
        不要和task结果溢出到磁盘搞混淆,RDD缓存是对某个rdd的结算结果(中间结果)
        进行缓存,避免该过程重复执行。
        存储级别,默认是None.
    
        一旦指定级别后,不能修改,除非先取消持久化。
        
        rdd.cache()是persist()的默认形式。
    
        class StorageLevel private(
            private var _useDisk: Boolean,
            private var _useMemory: Boolean,
            private var _useOffHeap: Boolean,
            private var _deserialized: Boolean,
            private var _replication: Int = 1)
    
        spark在内存中跨操作,在内存(磁盘)中,为了便于重用。
        1.MEMORY_ONLY
            存储原生对象在内中,如果内存不足,有些分区不存储。需要的重新计算,默认级别。
        2.MEMORY_AND_DISK
            原生态存储rdd在内存中,不足的分区存到disk.
    
        3.MEMORY_ONLY_SER 
            在内存中存储串行化rdd.在具有快速串行化器的时,更加高效。
            cpu增加压力。
        4.MEMORY_AND_DISK_SER
            串行化态存储rdd在内存中,不足的分区存到disk.
    
        5.DISK_ONLY
            只在磁盘存储。
    
        6.MEMORY_ONLY_2
            等价于memory_only,2是2个节点存放分区。
        
        7.OFF_HEAP
            离堆存储
        
        8.方法
            1.rdd.persist(newLevel: StorageLevel)
                持久化rdd
    
            2.rdd.unpersist()
                撤销持久化。
    
        9.选择策略
            默认是合理的。
            如果内存不足,使用MEMORY_ONLY_SER.
            轻易不要选择spill到磁盘。
            如果想要更快容错,可以采用副本策略。
    
    
    广播变量******************
    -------------------
         允许开发人员在每个节点上缓存一份数据,不是每个任务都发送一次拷贝,对于每个节点都采用的大型数据集
         适合使用该方式发送。
    
         采用类似于BT算法实现的,实现机制如下:
         driver切割串行对象成chunk,并存在driver端的blockManager。
         在executor中也有blockmanager,需要的话首先从自己的blockmanger中fetch数据,
         如果没有,尝试从driver或其他的executor中fetch,fetch到的data存放到自己blockmanger中,
         以便供其他executor fetch,有效防止driver因发送多个副本给execcute造成的瓶颈。
    
         广播变量实现的原理通过scala的lazy手段实现。********************************
    
         BT:bit torrent.
         我为人人,
         RDD以广播变量方式传输的。
    
        //代码
        def host() = java.net.InetAddress.getLocalHost().getHostName
        def pid() = java.lang.management.ManagementFactory.getRuntimeMXBean().getName().split("@")(0)
        def tid() = Thread.currentThread().getName()
        def oid(obj: AnyRef, str: String) = obj.toString() + " : " + str
        def doSend(str: String) = {
        val sock = new java.net.Socket("s101", 8888)
        val out = sock.getOutputStream()
        out.write((str + "
    ").getBytes())
        out.flush();
        out.close();
        }
        def sendInfo(obj: AnyRef, str: String) = {
           val info = host() + "/" + pid() + "/" + tid() + "/" + oid(obj, str); doSend(info)
        }
    
        //300M
        //val list = db.find() ;
        /**********************************/
        /************    广播变量***********/
        /**********************************/
        class Dog(val name: String, age: Int) extends Serializable
        val d = new Dog("dahuang", 3)
        //作成广播对象
        val dd = sc.broadcast(d)
    
        val rdd1 = sc.makeRDD(1 to 10 , 4)
        val rdd2 = rdd1.map(e=>{
            /********************/
            /***** 访问广播变量   */
            /********************/
            val d0 = dd.value
            val hash = d0.hashCode()
            val name = d0.name ;
            sendInfo(this,hash + " : " + e + " : " + name)
        })
        rdd2.collect()
    
    
    def sendInfo(msg: String,objStr:String) = {
    //获取ip
    val ip = java.net.InetAddress.getLocalHost.getHostAddress
    //得到pid
    val rr = java.lang.management.ManagementFactory.getRuntimeMXBean();
    val pid = rr.getName().split("@")(0);//pid
    //线程
    val tname = Thread.currentThread().getName
    //对象id
    val sock = new java.net.Socket("s101", 8888)
    val out = sock.getOutputStream
    val m = ip + "	:" + pid + "	:" + tname + "	:" + msg + "	:" +  objStr+ "
    "
    out.write(m.getBytes)
    out.flush()
    out.close()
    }
    
    
    
    //准备
    4个执行器 , 6个线程
    spark-shell --master spark://s101:7077 --total-executor-cores 4 --executor-cores 1 
    
    192.168.231.102 :5907   :Executor task launch worker-2  :map    :2122466653
    192.168.231.102 :5907   :Executor task launch worker-2  :map    :211241535
    
    192.168.231.103 :3329   :Executor task launch worker-2  :map    :1508566448
    192.168.231.103 :3329   :Executor task launch worker-2  :map    :923167588
    
    192.168.231.104 :3206   :Executor task launch worker-2  :map    :2004855724
    192.168.231.104 :3207   :Executor task launch worker-2  :map    :1386748001
    
    
    192.168.231.102 :5907   :Executor task launch worker-2  :map    :1769298538
    192.168.231.102 :5907   :Executor task launch worker-2  :map    :1769298538
    192.168.231.102 :5907   :Executor task launch worker-2  :map    :1769298538
    
    192.168.231.103 :3329   :Executor task launch worker-2  :map    :1496579174
    
    192.168.231.104 :3207   :Executor task launch worker-2  :map    :2016831251
    192.168.231.104 :3206   :Executor task launch worker-2  :map    :975295493
  • 相关阅读:
    C#多线程编程实战1.5检测线程状态
    C#多线程编程实战1.4终止线程
    C#多线程编程实战1.3等待线程
    C#多线程编程实战1.2暂停线程(休眠)
    C#多线程编程实战1.1创建线程
    C#中base的作用
    C#继承
    C#中return的两个作用
    Windows下完全卸载node.js并安装node.js的多版本管理工具nvm-windows
    执行gulp build报错
  • 原文地址:https://www.cnblogs.com/zyde/p/9225372.html
Copyright © 2011-2022 走看看