zoukankan      html  css  js  c++  java
  • Spark Core

    Spark Core
        DAG概念
            有向无环图
            Spark会根据用户提交的计算逻辑中的RDD的转换(变换方法)和动作(action方法)来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG。
            RDD之间的关系可以从两个维度来理解:一个是RDD是从哪些RDD转换而来,也就是RDD的parent RDD(s)是什么;还有就是依赖于parent RDD(s)的哪些Partition(s)。这个关系,就是RDD之间的依赖,org.apache.spark.Dependency。根据依赖于parent RDD(s)的Partitions的不同情况,Spark将这种依赖分为两种,一种是宽依赖,一种是窄依赖。
        DAG的生成与Stage的划分
            DAG的生成
                原始的RDD(s)通过一系列转换就形成了DAG。RDD之间的依赖关系,包含了RDD由哪些Parent RDD(s)转换而来和它依赖parent RDD(s)的哪些Partitions,是DAG的重要属性。
                借助这些依赖关系,DAG可以认为这些RDD之间形成了Lineage(血统,血缘关系)。借助Lineage,能保证一个RDD被计算前,它所依赖的parent RDD都已经完成了计算;同时也实现了RDD的容错性,即如果一个RDD的部分或者全部的计算结果丢失了,那么就需要重新计算这部分丢失的数据。
            Spark的Stage(阶段)
                Spark在执行任务(job)时,首先会根据依赖关系,将DAG划分为不同的阶段(Stage)
                处理流程是:
                    1)Spark在执行Transformation类型操作时都不会立即执行,而是懒执行(计算)
                    2)执行若干步的Transformation类型的操作后,一旦遇到Action类型操作时,才会真正触发执行(计算)
                    3)执行时,从当前Action方法向前回溯,如果遇到的是窄依赖则应用流水线优化,继续向前找,直到碰到某一个宽依赖
                    4)因为宽依赖必须要进行shuffle,无法实现优化,所以将这一次段执行过程组装为一个stage
                    5)再从当前宽依赖开始继续向前找。重复刚才的步骤,从而将这个DAG还分为若干的stage
                在stage内部可以执行流水线优化,而在stage之间没办法执行流水线优化,因为有shuffle。但是这种机制已经尽力的去避免了shuffle
            Spark的Job和Task
                原始的RDD经过一系列转换后(一个DAG),会在最后一个RDD上触发一个动作,这个动作会生成一个Job。
                所以可以这样理解:一个DAG对应一个Spark的Job。
                在Job被划分为一批计算任务(Task)后,这批Task会被提交到集群上的计算节点去计算Spark的Task分为两种:
                    1)org.apache.spark.scheduler.ShuffleMapTask
                    2)org.apache.spark.scheduler.ResultTask
                简单来说,DAG的最后一个阶段会为每个结果的Partition生成一个ResultTask,其余所有的阶段都会生成ShufffleMapTask。
        RDD
            RDD就是带有分区的集合类型
                RDD是分布式的,弹性的,容错的数据结构
                弹性分布式数据集(RDD),特点是可以并行操作,并且是容错的。有两种方法可以创建RDD:
                    1)执行Transform操作(变换操作),
                    2)读取外部存储系统的数据集,如HDFS,HBase,或任何与Hadoop有关的数据源。
                    注:创建RDD的方式有多种,比如案例一中是基于一个基本的集合类型(Array)转换而来,像parallelize这样的方法还有很多此外,我们也可以在读取数据集时就创建RDD。
                分区概念
                    可以在不同的机器上并行处理
                它是spark提供的一个特殊集合类。诸如普通的集合类型,如传统的Array:(1,2,3,4,5)是一个整体,但转换成RDD后,我们可以对数据进行Partition(分区)处理,这样做的目的就是为了分布式。
                    你可以让这个RDD有两个分区,那么有可能是这个形式:RDD(1,2) (3,4)。
                    这样设计的目的在于:可以进行分布式运算。
            RDD操作
                针对RDD的操作,分两种,一种是Transformation(变换),一种是Actions(执行)。
                Transformation(变换)操作属于懒操作(算子),不会真正触发RDD的处理计算。
                变换方法的共同点:1.不会马上触发计算 2.每当调用一次变换方法,都会产生一个新的RDD,Actions(执行)操作才会真正触发。
            RDD的依赖关系
                RDD和它依赖的parent RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
                1)窄依赖指的是每一个parent RDD的Partition最多被子RDD的一个Partition使用
                    对于窄依赖操作,它们只是将Partition的数据根据转换的规则进行转化,并不涉及其他的处理,可以简单地认为只是将数据从一个形式转换到另一个形式。
                    所以对于窄依赖,并不会引入昂贵的Shuffle。所以执行效率非常高。如果整个DAG中存在多个连续的窄依赖,则可以将这些连续的窄依赖整合到一起连续执行,中间不执行shuffle 从而提高效率,这样的优化方式称之为流水线优化。
                    此外,针对窄依赖,如果子RDD某个分区数据丢失,只需要找到父RDD对应依赖的分区,恢复即可。但如果是宽依赖,当分区丢失时,最糟糕的情况是要重算所有父RDD的所有分区。
                2)宽依赖指的是多个子RDD的Partition会依赖同一个parent RDD的Partition。
                    对于groupByKey这样的操作,子RDD的所有Partition(s)会依赖于parent RDD的所有Partition(s),子RDD的Partition是parent RDD的所有Partition Shuffle的结果。
                Shuffle概述
                    spark中一旦遇到宽依赖就需要进行shuffle的操作,所谓的shuffle的操作的本质就是将数据汇总后重新分发的过程
                    这个过程数据要汇总到一起,数据量可能很大所以不可避免的需要进行数据落磁盘的操作,会降低程序的性能,所以spark并不是完全内存不读写磁盘,只能说它尽力避免这样的过程来提高效率 。
                    spark中的shuffle,在早期的版本中,会产生多个临时文件,但是这种多临时文件的策略造成大量文件的同时的读写,磁盘的性能被分摊给多个文件,每个文件读写效率都不高,影响spark的执行效率。所以在后续的spark中(1.2.0之后的版本)的shuffle中,只会产生一个文件,并且数据会经过排序再附加索引信息,减少了文件的数量并通过排序索引的方式提升了性能。
            RDD容错机制
                分布式系统通常在一个机器集群上运行,同时运行的几百台机器中某些出问题的概率大大增加,所以容错设计是分布式系统的一个重要能力。
                Spark以前的集群容错处理模型,像MapReduce,将计算转换为一个有向无环图(DAG)的任务集合,这样可以通过重复执行DAG里的一部分任务来完成容错恢复。但是由于主要的数据存储在分布式文件系统中,没有提供其他存储的概念,容错过程需要在网络上进行数据复制,从而增加了大量的消耗。所以,分布式编程中经常需要做检查点,即将某个时机的中间数据写到存储(通常是分布式文件系统)中。
                RDD也是一个DAG,每一个RDD都会记住创建该数据集需要哪些操作,跟踪记录RDD的继承关系,这个关系在Spark里面叫lineage(血缘关系)。当一个RDD的某个分区丢失时,RDD是有足够的信息记录其如何通过其他RDD进行计算,且只需重新计算该分区,这是Spark的一个创新。
            RDD的缓存
                相比Hadoop MapReduce来说,Spark计算具有巨大的性能优势,其中很大一部分原因是Spark对于内存的充分利用,以及提供的缓存机制
                RDD持久化(缓存)
                    持久化在早期被称作缓存(cache),但缓存一般指将内容放在内存中。虽然持久化操作在绝大部分情况下都是将RDD缓存在内存中,但一般都会在内存不够时用磁盘顶上去(比操作系统默认的磁盘交换性能高很多)。当然,也可以选择不使用内存,而是仅仅保存到磁盘中。所以,现在Spark使用持久化(persistence)这一更广泛的名称。
                默认情况下,RDD只使用一次,用完即扔,再次使用时需要重新计算得到,而持久化(缓存)操作避免了这里的重复计算,实际测试也显示持久化对性能提升明显,这也是Spark刚出现时被人称为内存计算框架的原因。
                持久化的方法是调用persist()函数,除了持久化至内存中,还可以在persist()中指定storage level参数使用其他的类型,具体如下:
                    1)MEMORY_ONLY : 将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中. 如果内存空间不够,部分数据分区将不会被缓存,在每次需要用到这些数据时重新进行计算. 这是默认的级别。
                    cache()方法对应的级别就是MEMORY_ONLY级别
                    2)MEMORY_AND_DISK:将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
                    3)MEMORY_ONLY_SER :将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast serialize时会节省更多的空间,但是在读取时会使得 CPU 的 read 变得更加密集。如果内存空间不够,部分数据分区将不会被缓存,在每次需要用到这些数据时重新进行计算。
                    4)MEMORY_AND_DISK_SER :类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
                    5)DISK_ONLY:只在磁盘上缓存 RDD。
                    6)MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. :与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
                    7)OFF_HEAP 将数据存储在 off-heap memory 中。使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。使用堆外内存的好处:可能会利用到更大的内存存储空间。但是对于数据的垃圾回收会有影响,需要程序员来处理
                    注意,可能带来一些GC回收问题。
                缓存数据的清除
                    Spark 会自动监控每个节点上的缓存数据,然后使用 least-recently-used (LRU) 机制来处理旧的缓存数据。如果你想手动清理这些缓存的 RDD 数据而不是去等待它们被自动清理掉,
                    可以使用 RDD.unpersist( ) 方法。
                Spark 也会自动持久化一些在 shuffle 操作过程中产生的临时数据(比如 reduceByKey),即便是用户并没有调用持久化的方法。这样做可以避免当 shuffle 阶段时如果一个节点挂掉了就得重新计算整个数据的问题。如果用户打算多次重复使用这些数据,我们仍然建议用户自己调用持久化方法对数据进行持久化。
        Spark框架核心概念
            1.RDD。弹性分布式数据集,是Spark最核心的数据结构。有分区机制,所以可以分布式进行处理。有容错机制,通过RDD之间的依赖关系来恢复数据。
            2.依赖关系。RDD的依赖关系是通过各种Transformation(变换)来得到的。父RDD和子RDD之间的依赖关系分两种:①窄依赖  ②宽依赖
                ①针对窄依赖:父RDD的分区和子RDD的分区关系是:一对一
                窄依赖不会发生Shuffle,执行效率高,spark框架底层会针对多个连续的窄依赖执行流水线优化,从而提高性能。例如 map  flatMap等方法都是窄依赖方法
                ②针对宽依赖:父RDD的分区和子RDD的分区关系是:一对多
                宽依赖会产生shuffle,会产生磁盘读写,无法优化。
            3.DAG。有向无环图,当一整条RDD的依赖关系形成之后,就形成了一个DAG。一般来说,一个DAG,最后都至少会触发一个Action操作,触发执行。一个Action对应一个Job任务。
            4.Stage。一个DAG会根据RDD之间的依赖关系进行Stage划分,流程是:以Action为基准,向前回溯,遇到宽依赖,就形成一个Stage。遇到窄依赖,则执行流水线优化(将多个连续的窄依赖放到一起执行)
            5.task。任务。一个分区对应一个task。可以这样理解:一个Stage是一组Task的集合
            6.RDD的Transformation(变换)操作:懒执行,并不会立即执行
            7.RDD的Action(执行)操作:触发真正的执行
        Spark Shuffle详解
            Shuffle,翻译成中文就是洗牌。之所以需要Shuffle,还是因为具有某种共同特征的一类数据需要最终汇聚(aggregate)到一个计算节点上进行计算。这些数据分布在各个存储节点上并且由不同节点的计算单元处理。
            数据重新打乱然后汇聚到不同节点的过程就是Shuffle。但是实际上,Shuffle过程可能会非常复杂:
                1)数据量会很大,比如单位为TB或PB的数据分散到几百甚至数千、数万台机器上。
                2)为了将这个数据汇聚到正确的节点,需要将这些数据放入正确的Partition,因为数据大小已经大于节点的内存,因此这个过程中可能会发生多次硬盘续写。
                3)为了节省带宽,这个数据可能需要压缩,如何在压缩率和压缩解压时间中间做一个比较好的选择?
                4)数据需要通过网络传输,因此数据的序列化和反序列化也变得相对复杂。
                一般来说,每个Task处理的数据可以完全载入内存(如果不能,可以减小每个Partition的大小),因此Task可以做到在内存中计算。但是对于Shuffle来说,如果不持久化这个中间结果,一旦数据丢失,就需要重新计算依赖的全部RDD,因此有必要持久化这个中间结果。所以这就是为什么Shuffle过程会产生文件的原因。
                如果Shuffle过程不落地,①可能会造成内存溢出 ②当某分区丢失时,会重新计算所有父分区数据
            Shuffle Write
                Shuffle Write,即数据是如何持久化到文件中,以使得下游的Task可以获取到其需要处理的数据的(即Shuffle Read)。在Spark 0.8之前,Shuffle Write是持久化到缓存的,但后来发现实际应用中,shuffle过程带来的数据通常是巨量的,所以经常会发生内存溢出的情况,所以在Spark 0.8以后,Shuffle Write会将数据持久化到硬盘,再之后Shuffle Write不断进行演进优化,但是数据落地到本地文件系统的实现并没有改变。
                1)Hash Based Shuffle Write
                    在Spark 1.0以前,Spark只支持Hash Based Shuffle。因为在很多运算场景中并不需要排序,因此多余的排序只能使性能变差,比如Hadoop的Map Reduce就是这么实现的,也就是Reducer拿到的数据都是已经排好序的。实际上Spark的实现很简单:每个Shuffle Map Task根据key的哈希值,计算出每个key需要写入的Partition然后将数据单独写入一个文件,这个Partition实际上就对应了下游的一个Shuffle Map Task或者Result Task。因此下游的Task在计算时会通过网络(如果该Task与上游的Shuffle Map Task运行在同一个节点上,那么此时就是一个本地的硬盘读写)读取这个文件并进行计算。
                    Hash Based Shuffle Write存在的问题
                        1)每个节点可能会同时打开多个文件,每次打开文件都会占用一定内存。假设每个Write Handler的默认需要100KB的内存,那么同时打开这些文件需要50GB的内存,对于一个集群来说,还是有一定的压力的。尤其是如果Shuffle Map Task和下游的Task同时增大10倍,那么整体的内存就增长到5TB。
                        2)从整体的角度来看,打开多个文件对于系统来说意味着随机读,尤其是每个文件比较小但是数量非常多的情况。而现在机械硬盘在随机读方面的性能特别差,非常容易成为性能的瓶颈。如果集群依赖的是固态硬盘,也许情况会改善很多,但是随机写的性能肯定不如顺序写的。
                    Hash Based Shuffle的每个Mapper都需要为每个Reducer写一个文件,供Reducer读取,即需要产生M*R个数量的文件,如果Mapper和Reducer的数量比较大,产生的文件数会非常多。
                2)Sort Based Shuffle Write
                    Spark Core的一个重要的升级就是将默认的Hash Based Shuffle换成了Sort Based Shuffle,即spark.shuffle.manager从Hash换成了Sort
                    对应的实现类分别是
                        org.apache.spark.shuffle.hash.HashShuffleManager
                        org.apache.spark.shuffle.sort.SortShuffleManager。
                    Sort Based Shuffle的模式是:每个Shuffle Map Task不会为每个Reducer生成一个单独的文件;相反,它会将所有的结果写到一个文件里,同时会生成一个Index文件,
                    Reducer可以通过这个Index文件取得它需要处理的数据。避免产生大量文件的直接收益就是节省了内存的使用和顺序Disk IO带来的低延时。节省内存的使用可以减少GC的风险和频率。而减少文件的数量可以避免同时写多个文件给系统带来的压力。
                    Sort Based Write实现详解
                        Shuffle Map Task会按照key相对应的Partition ID进行Sort,其中属于同一个Partition的key不会Sort。因为对于不需要Sort的操作来说,这个Sort是负收益的;要知道之前Spark刚开始使用Hash Based的Shuffle而不是Sort Based就是为了避免Hadoop Map Reduce对于所有计算都会Sort的性能损耗。对于那些需要Sort的运算,
                        比如sortByKey,这个Sort在Spark 1.2.0里还是由Reducer完成的。
                        ①答出shuffle的定义
                        ②spark shuffle的特点
                        ③spark shuffle的目的
                        ④spark shuffel的实现类,即对应优缺点
            Shuffle 相关参数配置
                Shuffle是Spark Core比较复杂的模块,它也是非常影响性能的操作之一。
                1)spark.shuffle.manager
                    两种方式的Shuffle 即Hash Based Shuffle和Sort Based Shuffle
                2)spark.shuffle.spill
                    这个参数的默认值是true,用于指定Shuffle过程中如果内存中的数据超过阈值(参考spark.shuffle.memoryFraction的设置)时是否需要将部分数据临时写入外部存储。
                    如果设置为false,那么这个过程就会一直使用内存,会有内存溢出的风险。因此只有在确定内存足够使用时,才可以将这个选项设置为false。
                3)spark.shuffle.memoryFraction
                    在启用spark.shuffle.spill的情况下,spark.shuffle.memoryFraction决定了当Shuffle过程中使用的内存达到总内存多少比例的时候开始spill。在Spark 1.2.0里,这个值是0.2
                    此参数可以适当调大,可以控制在0.4~0.6。
                    通过这个参数可以设置Shuffle过程占用内存的大小,它直接影响了写入到外部存储的频率和垃圾回收的频率。
                    可以适当调大此值,可以减少磁盘I/O次数。
                4)spark.shuffle.blockTransferService
                    在Spark 1.2.0中这个配置的默认值是netty,而在之前的版本中是nio。它主要是用于在各个Executor之间传输Shuffle数据。netty的实现更加简洁,但实际上用户不用太关心这个选项。除非有特殊需求,否则采用默认配置即可。
                5)spark.shuffle.consolidateFiles
                    这个配置的默认值是false。主要是为了解决在Hash Based Shuffle过程中产生过多文件的问题。如果配置选项为true,那么对于同一个Core上运行的Shuffle Map Task不会产生一个新的Shuffle文件而是重用原来的
                6)spark.shuffle.compress和spark.shuffle.spill.compress
                    这两个参数的默认配置都是true。都是用来设置Shuffle过程中是否对Shuffle数据进行压缩
                    前者针对最终写入本地文件系统的输出文件
                    后者针对在处理过程需要写入到外部存储的中间数据,即针对最终的shuffle输出文件。
                7)spark.reducer.maxMbInFlight
                    这个参数用于限制一个Result Task向其他的Executor请求Shuffle数据时所占用的最大内存数,默认是64MB。尤其是如果网卡是千兆和千兆以下的网卡时。默认值是 设置这个值需要综合考虑网卡带宽和内存。
        Spark调优
            更好的序列化实现
                Spark用到序列化的地方
                    1)Shuffle时需要将对象写入到外部的临时文件。
                    2)每个Partition中的数据要发送到worker上,spark先把RDD包装成task对象,将task通过网络发给worker。
                    3)RDD如果支持内存+硬盘,只要往硬盘中写数据也会涉及序列化。
                默认使用的是java的序列化。但java的序列化有两个问题,一个是性能相对比较低,另外它序列化完二进制的内容长度也比较大,造成网络传输时间拉长。业界现在有很多更好的实现,如kryo,比java的序列化快10倍以上。而且生成内容长度也短。时间快,空间小,自然选择它了。
            通过代码使用Kryo
            配置多临时文件目录
                spark.local.dir参数。当shuffle、归并排序(sort、merge)时都会产生临时文件。这些临时文件都在这个指定的目录下。那这个文件夹有很多临时文件,如果都发生读写操作,有的线程在读这个文件,有的线程在往这个文件里写,磁盘I/O性能就非常低。
                可以创建多个文件夹,每个文件夹都对应一个真实的硬盘。假如原来是3个程序同时读写一个硬盘,效率肯定低,现在让三个程序分别读取3个磁盘,这样冲突减少,效率就提高了。这样就有效提高外部文件读和写的效率。怎么配置呢?只需要在这个配置时配置多个路径就可以。中间用逗号分隔。
                spark.local.dir=/home/tmp,/home/tmp2
            启用推测执行机制
                可以设置spark.speculation  true
                开启后,spark会检测执行较慢的Task,并复制这个Task在其他节点运行,最后哪个节点先运行完,就用其结果,然后将慢Task 杀死
            collect速度慢
                collect只适合在测试时,因为把结果都收集到Driver服务器上,数据要跨网络传输,同时要求Driver服务器内存大,所以收集过程慢。解决办法就是直接输出到分布式文件系统中。
            有些情况下,RDD操作使用MapPartitions替代map
                map方法对RDD的每一条记录逐一操作。mapPartitions是对RDD里的每个分区操作
                rdd.map{ x=>conn=getDBConn.conn;write(x.toString);conn close;}
                这样频繁的链接、断开数据库,效率差。
                rdd.mapPartitions{(record:=>conn.getDBConn;for(item<-recorders;write(item.toString);conn close;}
                这样就一次链接一次断开,中间批量操作,效率提升。
            Spark的GC调优
                由于Spark立足于内存计算,常常需要在内存中存放大量数据,因此也更依赖JVM的垃圾回收机制(GC)。并且同时,它也支持兼容批处理和流式处理,对于程序吞吐量和延迟都有较高要求,因此GC参数的调优在Spark应用实践中显得尤为重要。
                主要有两种策略——Parallel GC(吞吐量优先)和CMS GC(低延迟响应)。
                GC算法原理
                    对于内存较大的环境非常友好。因为G1 GC对于内存的使用率特别高,内存越大,此优势越明显。
                选择垃圾收集器
                    park默认使用的是Parallel GC。经调研我们发现,Parallel GC常常受困于Full GC,而每次Full GC都给性能带来了较大的下降。而Parallel GC可以进行参数调优的空间也非常有限,我们只能通过调节一些基本参数来提高性能,如各年代分区大小比例、进入老年代前的拷贝次数等。而且这些调优策略只能推迟Full GC的到来,如果是长期运行的应用,Parallel GC调优的意义就非常有限了。
                将InitiatingHeapOccupancyPercent参数调低(默认值是45),可以使G1 GC收集器更早开始Mixed GC(Minor GC);但另一方面,会增加GC发生频率。(启动并发GC周期时的堆内存占用百分比. G1之类的垃圾收集器用它来触发并发GC周期,基于整个堆的使用率,而不只是某一代内存的使用比. 值为 0 则表示"一直执行GC循环". 默认值为 45.)降低此值,会提高Minor GC的频率,但是会推迟Full GC的到来。
                提高ConcGCThreads的值,在Mixed GC阶段投入更多的并发线程,争取提高每次暂停的效率。但是此参数会占用一定的有效工作线程资源。
                调试这两个参数可以有效降低Full GC出现的概率。Full GC被消除之后,最终的性能获得了大幅提升。
            Spark的内存管理
                Spark的核心概念是RDD,实际运行中内存消耗都与RDD密切相关。Spark允许用户将应用中重复使用的RDD数据持久化缓存起来,从而避免反复计算的开销,而RDD的持久化形态之一就是将全部或者部分数据缓存在JVM的Heap中。当我们观察到GC延迟影响效率时,应当先检查Spark应用本身是否有效利用有限的内存空间。RDD占用的内存空间比较少的话,程序运行的heap空间也会比较宽松,GC效率也会相应提高;而RDD如果占用大量空间的话,则会带来巨大的性能损失
            总结
                对于大量依赖于内存计算的Spark应用,GC调优显得尤为重要。在发现GC问题的时候,不要着急调试GC。而是先考虑是否存在Spark进程内存管理的效率问题,例如RDD缓存的持久化和释放。至于GC参数的调试,首先我们比较推荐使用G1 GC来运行Spark应用。相较于传统的垃圾收集器,随着G1的不断成熟,需要配置的选项会更少,能同时满足高吞吐量和低延迟的寻求。当然,GC的调优不是绝对的,不同的应用会有不同应用的特性,掌握根据GC日志进行调优的方法,才能以不变应万变。最后,也不能忘了先对程序本身的逻辑和代码编写进行考量,例如减少中间变量的创建或者复制,控制大对象的创建,将长期存活对象放在Off-heap中等等。
        Checkpoint机制
            checkpoint的意思就是建立检查点,类似于快照,例如在spark计算里面 计算流程DAG特别长,服务器需要将整个DAG计算完成得出结果,但是如果在这很长的计算流程中突然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头到尾计算一遍,这样子就很费性能,当然我们可以将中间的计算结果通过cache或者persist放到内存或者磁盘中,但是这样也不能保证数据完全不会丢失,存储的这个内存出问题了或者磁盘坏了,也会导致spark从头再根据RDD计算一遍,所以就有了checkpoint,其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方
            总结:Spark的CheckPoint机制很重要,也很常用,尤其在机器学习中的一些迭代算法中很常见。比如一个算法迭代10000次,如果不适用缓冲机制,如果某分区数据丢失,会导致整个计算链重新计算,所以引入缓存机制。但是光引入缓存,也不完全可靠,比如缓存丢失或缓存存储不下,也会导致重新计算,所以使用CheckPoint机制再做一层保证。
            补充:检查目录的路径,一般都是设置到HDFS上
            Spark懒执行的意义
                Spark中,Transformation方法都是懒操作方法,比如map,flatMap,reduceByKey等。当触发某个Action操作时才真正执行。
                懒操作的意义:
                    ①不运行job就触发计算,避免了大量的无意义的计算,即避免了大量的无意义的中间结果的产生,即避免产生无意义的磁盘I/O及网络传输
                    ②更深层次的意义在于,执行运算时,看到之前的计算操作越多,执行优化的可能性就越高
        Spark共享变量
            Spark程序的大部分操作都是RDD操作,通过传入函数给RDD操作函数来计算。这些函数在不同的节点上并发执行,但每个内部的变量有不同的作用域,不能相互访问,所以有时会不太方便,Spark提供了两类共享变量供编程使用——广播变量和计数器
            1. 广播变量
                这是一个只读对象,在所有节点上都有一份缓存,创建方法是SparkContext.broadcast()
                注意,广播变量是只读的,所以创建之后再更新它的值是没有意义的,一般用val修饰符来定义广播变量。
            2. 计数器
                计数器只能增加,是共享变量,用于计数或求和。
                计数器变量的创建方法是SparkContext.accumulator(v, name),其中v是初始值,name是名称。
        spark解决数据倾斜问题
            将少量的数据转化为Map进行广播,广播会将此 Map 发送到每个节点中,如果不进行广播,每个task执行时都会去获取该Map数据,造成了性能浪费。

  • 相关阅读:
    powerpc调试工具的使用
    微软的开发工具是我见过比较难用的
    Spring之jdbcTemplate:查询的三种方式(单个值、单个对象、对象集合)
    静态工厂方法和实例工厂方法
    理解.NET Framework
    .NET相关的概念简介
    二、文档工具 swagger
    Java Web学习(二)数据加密
    Java Web学习(一)Web基础
    Oracle学习(十二)标量函数
  • 原文地址:https://www.cnblogs.com/Striverchen/p/10557905.html
Copyright © 2011-2022 走看看