zoukankan      html  css  js  c++  java
  • spark基础知识汇总

    基础

    概述

    • Spark计算平台有两个重要角色,Driver和executor
    • Driver
      • Driver充当Application的master角色,负责任务执行计划生成和任务分发及调度;
      • Driver负责生成逻辑查询计划、物理查询计划和把任务派发给executor
    • Executor
      • Executor充当worker角色,负责实际执行任务的task,计算的结果返回Driver。
      • Executor接受任务后进行处理,离线计算也是按这个流程进行。

    分工

    • RDD Objects生成逻辑查询计划
    • 生成物理查询计划DAGScheduler
    • 任务调度TaskScheduler
    • 任务执行Executor

    作业提交流程

    1. client submit作业,通过反射invoke执行用户代码main函数,启动CoarseGrainedExecutorBackend、初始化SparkContext。
    2. SparkContext初始化包括初始化监控页面SparkUI、执行环境SparkEnv、安全管理器SecurityManager、stage划分及调度器DAGScheduler、task作业调度器TaskSchedulerImpl和与Executor通信的调度端CoarseGrainedSchedulerBackend。
    3. DAGScheduler将作业划分后,依次提交stage对应的taskSet给TaskSchedulerImpl。
    4. TaskSchedulerImpl会submit taskset给driver端的CoarseGrainedSchedulerBackend后端。
    5. CoarseGrainedSchedulerBackend会逐个LaunchTask
    6. 在远端的CoarseGrainedExecutorBackend接收到task提交event后,会调用Executor执行task
    7. 最终task是由TaskRunner的run方法内运行。

    image

    Executor

    • 首先executor端的rpc服务端点收到LaunchTask的消息,并对传过来的任务数据进行反序列化成TaskDescription将任务交给Executor对象运行
    • Executor根据传过来的TaskDescription对象创建一个TaskRunner对象,并放到线程池中运行。这里的线程池用的是Executors.newCachedThreadPool,空闲是不会有线程在跑
    • TaskRunner对任务进一步反序列化,调用Task.run方法执行任务运行逻辑
    • ShuffleMapTask类型的任务会将rdd计算结果数据经过排序合并之后写到一个文件中,并写一个索引文件
    • 任务运行完成后会更新一些任务统计量和度量系统中的一些统计量
    • 最后会根据结果序列化后的大小选择不同的方式将结果传回driver。

    共享变量

    Broadcast Variable(广播变量)

    • Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,而不是给节点上的每个task拷贝一份。这样可以优化性能,减少网络传输及内存消耗。
    • Broadcast Variable主要用于共享读,是只读的,没法去写
    • 可以通过调用SparkContext的broadcast()方法,来针对某个变量创建广播变量,返回类型是Broadcast。然后在算子的函数内,使用广播变量,此时每个节点都只会拷贝一份,每个节点可以使用广播变量的value()方法获取值。广播变量是只读的,不可写。

    Accumulator(累加变量)

    Accumulator可以让多个task共同操作一份变量,主要可以进行累加操作。

    内存管理

    • spark.executor.memory设置executor可用内存,包含
      • reservedMemory :默认300M
      • usableMemory
        • Execution 内存: 存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据
        • Storage 内存: 存储 spark 的 cache 数据,例如RDD的缓存、unroll数据;
        • 用户内存(User Memory):存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息

    相关配置

    • spark.memory.storageFraction:配置usableMemory中Storage内存占比
    • spark.memory.offHeap.enabled:堆外内存是否开启,默认不开启
    • spark.memory.offHeap.size:堆外内存大小

    堆内内存

    默认情况下,Spark 仅仅使用了堆内内存。Executor 端的堆内内存区域大致可以分为以下四大块

    • Execution 内存:主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据
    • Storage 内存:主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据;
    • 用户内存(User Memory):主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息。
    • 预留内存(Reserved Memory):系统预留内存,会用来存储Spark内部对象。

    堆外内存

    通过 spark.memory.offHeap.enabled 参数启用,并且通过 spark.memory.offHeap.size 设置堆外内存大小,单位为字节。如果堆外内存被启用,那么 Executor 内将同时存在堆内和堆外内存,两者的使用互补影响,
    堆外内存只区分 Execution 内存和 Storage 内存

    Execution 内存和 Storage 内存动态调整

    • 若Execution内存与Storage内存都不足时,按照LRU规则存储到磁盘;
    • 若Execution内存不足,Storage内存有结余,Storage 内存的空间被占用后,目前的实现是无法让对方"归还"
    • 若Storage内存不足,Execution内存有结余,Execution内存的空间被占用后,可让对方将占用的部分转存到硬盘,然后"归还"借用的空间,因为Cache在内存的数据不一定后面会用

    Task之间内存分布

    • Task共享着 Execution 内存
    • Spark 内部维护了一个 HashMap 用于记录每个 Task 占用的内存
    • 每个Task可用内存为Executor可用内存的 1/2N ~ 1/N,N是Task的个数

    Spark Core

    spark的shuffle

    • 前一个stage 的 ShuffleMapTask 进行 shuffle write, 把数据存储在 blockManager 上面, 并且把数据位置元信息上报到 driver 的 mapOutTrack 组件中, 下一个 stage 根据数据位置元信息, 进行 shuffle read, 拉取上个stage 的输出数据。

    shuffle write

    分为三种writer, 分为 BypassMergeSortShuffleWriter, SortShuffleWriter 和 UnsafeShuffleWriter

    BypassMergeSortShuffleWriter
    • 开启map side combine并且分区数较少
    • BypassMergeSortShuffleWriter和Hash Shuffle中的HashShuffleWriter实现基本一致, 唯一的区别在于,map端的多个输出文件会被汇总为一个文件。 所有分区的数据会合并为同一个文件,会生成一个索引文件,是为了索引到每个分区的起始地址,可以随机 access 某个partition的所有数据。
    • 这种方式不宜有太多分区,因为过程中会并发打开所有分区对应的临时文件,会对文件系统造成很大的压力。
    • 给每个分区分配一个临时文件,对每个 record的key 使用分区器(模式是hash,如果用户自定义就使用自定义的分区器)找到对应分区的输出文件句柄,直接写入文件,没有在内存中使用 buffer。 最后copyStream方法把所有的临时分区文件拷贝到最终的输出文件中,并且记录每个分区的文件起始写入位置,把这些位置数据写入索引文件中。
    SortShuffleWriter
    • 使用 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 在内存中进行排序, 排序的 K 是(partitionId, hash(key)) 这样一个元组。
    • 如果超过内存 limit, 我 spill 到一个文件中,这个文件中元素也是有序的,首先是按照 partitionId的排序,如果 partitionId 相同, 再根据 hash(key)进行比较排序
    • 如果需要输出全局有序的文件的时候,就需要对之前所有的输出文件 和 当前内存中的数据结构中的数据进行 merge sort, 进行全局排序
    UnsafeShuffleWriter
    • UnsafeShuffleWriter需要Serializer支持relocation
    • UnsafeShuffleWriter 里面维护着一个 ShuffleExternalSorter, 用来做外部排序
    区别 UnsafeShuffleWriter SortShuffleWriter
    排序方式 最终只是 partition 级别的排序 先 partition 排序,相同分区 key有序
    aggregation 没有饭序列化,没有aggregation 支持 aggregation

    shuffle read

    内存管理——Tungsten

    • TaskMemoryManager用来统一这两种内存:堆内内存和堆外内存
    • MemoryBlock 继承 MemoryLocation 代表着对内存的定位,这个对象可以把off-heap 和on-heap 进行统一, MemoryLocation 对于off-heap的memory,obj为null,offset则为绝对的内存地址,对于on-heap的memory,obj则是JVM对象的基地址,offset则是相对于改对象基地址的偏移。

    Spark SQL

    Parser模块

    • 将sparkSql字符串切分成一个一个token,再根据一定语义规则解析为一个抽象语法树/AST。Parser模块目前基本都使用第三方类库ANTLR来实现,比如Hive,presto,sparkSql等。
    • SqlBaseLexer和SqlBaseParser都是使用ANTLR4自动生成的Java类。使用这两个解析器将SQL字符串语句解析成了ANTLR4的ParseTree语法树结构。然后在parsePlan过程中,使用AstBuilder.scala将ParseTree转换成catalyst表达式逻辑计划LogicalPlan。

    Analyzer模块

    • 基本的元数据信息schema catalog来表达这些token。最重要的元数据信息就是,
      • 表的schema信息,主要包括表的基本定义(表名、列名、数据类型)、表的数据格式(json、text、parquet、压缩格式等)、表的物理位置
      • 基本函数信息,主要是指类信息
    • Analyzer会再次遍历整个AST,对树上的每个节点进行数据类型绑定以及函数绑定,比如people词素会根据元数据表信息解析为包含age、id以及name三列的表,people.age会被解析为数据类型为int的变量,sum会被解析为特定的聚合函数,

    Optimizer模块

    • Optimizer是catalyst的核心,分为RBO和CBO两种。
    • RBO的优化策略就是对语法树进行一次遍历,模式匹配能够满足特定规则的节点,再进行相应的等价转换,即将一棵树等价地转换为另一棵树。SQL中经典的常见优化规则有,
      +谓词下推(predicate pushdown)
      +常量累加(constant folding)
      +列值裁剪(column pruning)
      • Limits合并(combine limits)

    SparkPlanner模块

    • 至此,OLP已经得到了比较完善的优化,然而此时OLP依然没有办法真正执行,它们只是逻辑上可行,实际上spark并不知道如何去执行这个OLP。
    • 此时就需要将左边的OLP转换为physical plan物理执行计划,将逻辑上可行的执行计划变为spark可以真正执行的计划。
    • 比如join算子,spark根据不同场景为该算子制定了不同的算法策略,有broadcastHashJoin、shuffleHashJoin以及sortMergeJoin,物理执行计划实际上就是在这些具体实现中挑选一个耗时最小的算法实现,这个过程涉及到cost model/CBO

    WholeStageCodegen

    • WholeStageCodegen,将多个operators合并成一个java函数,从而提高执行速度

    Spark MLLib

    Spark Streaming

    • 数据流经过Spark Streaming的receiver,数据切分为DStream(类似RDD,DStream是Spark Streaming中流数据的逻辑抽象),然后DStream被Spark Core的离线计算引擎执行并行处理。

    流程

    • 实时计算与离线计算一样,主要组件是Driver和Executor的。不同的是多了数据采集和数据按时间分片过程,数据采集依赖外部数据源,这里用MessageQueue表示
    • 数据分片则依靠内部一个时钟Clock,按batch interval来定时对数据分片,然后把每一个batch interval内的数据提交处理。
    • Executor从MessageQueue获取数据并交给BlockManager管理,然后把元数据信息BlockID返给driver的Receiver Tracker
    • driver端的Job Jenerator对一个batch的数据生成JobSet,最后把作业执行计划传递给executor处理。

    image

    Structure Streaming

    将数据抽象为DataFrame,即无边界的表,通过将数据源映射为一张无界长度的表,通过表的计算,输出结果映射为另一张表。这样以结构化的方式去操作流式数据,简化了实时计算过程,同时还复用了其Catalyst引擎来优化SQL操作。此外还能支持增量计算和基于event time的计算。

    Spark thrift server优化

    • spark常驻Driver
    • 增加用户概念,application有用户归属

    实现流程

    • spark thrift server基于hive jdbc服务实现

    特性

    • 支持手动缓存中间结果集,Statement级别复用
    • 支持提交离线作业、监控离线作业、清理离线作业
      • 增加SparkListener监控job状态,然后取数据
      • 调用spark restful接口 kill job

    ES-Spark优化

    Spark DirectQuery ES

    • 原生API会解析json,拼接到query中,不灵活

      • 例如query+highlight
      • query + sort
      • 等等
    • Spark-SQL 的 where 语句全部(或者部分)下沉到 ES里进行执行,依赖于倒排索引,DocValues,以及分片,并行化执行,ES能够获得比Spark-SQL更优秀的响应时间

    • 分片数据Merge(Reduce操作,Spark 可以获得更好的性能和分布式能力),更复杂的业务逻辑都交给Spark-SQL (此时数据规模已经小非常多了),并且可以做各种自定义扩展,通过udf等函数

    • ES 无需实现Merge操作,可以减轻内存负担,提升并行Merge的效率(并且现阶段似乎ES的Reduce是只能在单个实例里完成)

    实现流程

    ## package.scala
    + 增加接口
    

    def esDirectRDD(resource: String, cfg: scala.collection.Map[String, String]) = EsSpark.esDirectRDD(sc, resource, cfg)

    
    ## EsSpark
    + 增加接口
    

    def esDirectRDD(sc: SparkContext, resource: String, cfg: Map[String, String]): RDD[(String, Map[String, AnyRef])] =
    new ScalaEsRDD[Map[String, AnyRef]](sc, collection.mutable.Map(cfg.toSeq: _*) += (ES_RESOURCE_READ -> resource))

    
    ## ScalaESDirectRDD
    + 继承AbstractDirectEsRDD
    
    
    ## AbstractDirectEsRDD
    + 抽象ESRDD类
    
    ## AbstractEsDirectRDDIterator
    + 对应AbstractEsRDDIterator
    + 处理直连查询
    
    ## PartitionDirectReader
    + 处理director查询
    
    ## SearchRequestBuilder
    + 增加buildDirect方法
    
    ## DirectQuery
    + 与ScrollQuery一致
    + 包含DirectReader的引用,处理结果集
    + 调用RestRepository,处理结果集
    
    
    ## RestRepository
    ### direrctQuery
    + 发起Restful请求,并使用传入的Reader处理返回的结果;
    + 原始scroll接口,处理scroll查询
    + 增加directQuery接口,处理direrctQuery查询
    
    ### scanDirect
    + 直接创建DirectQuery
    
    
    ## DirectSearchRequestBuilder
    + 构建查询;
    
    ## DirectReader
    + 对应ScrollReader
    + 处理结果集请求
    + 从Hits开始解析
    
    ## SimpleQueryParser
    + 增加parseESQuery
    

    agg支持

    • 实现流程类似,但执行节点为节点级别,而不是shard级别
    • 数据结果解析,对agg别名设定规则
    • 结果解析为同一个partition/各个shard对应一个partition,需要二次处理
    • 后续:设定参数,进行shard级别聚合,并对聚合结果改写、合并

    spark MLLib

    源码

    thrift server

    ES-Hadoop

    调优

    动态资源分配

    • spark.dynamicAllocation.enabled:该配置项用于配置是否使用动态资源分配,根据工作负载调整应用程序注册的executor的数量。默认为false(至少在spark2.2-spark2.4中如此),在CDH发行版中默认为true,
    • 如果启用动态分配,在executor空闲spark.dynamicAllocation.executorIdleTimeout(默认60s)之后将被释放。

    动态资源分配策略

    开启动态分配策略后,application会在task因没有足够资源被挂起的时候去动态申请资源,这种情况意味着该application现有的executor无法满足所有task并行运行。spark一轮一轮的申请资源,当有task挂起或等待spark.dynamicAllocation.schedulerBacklogTimeout(默认1s)时间的时候,会开始动态资源分配;之后会每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout(默认1s)时间申请一次,直到申请到足够的资源。每次申请的资源量是指数增长的,即1,2,4,8等。
    之所以采用指数增长,出于两方面考虑:其一,开始申请的少是考虑到可能application会马上得到满足;其次要成倍增加,是为了防止application需要很多资源,而该方式可以在很少次数的申请之后得到满足。

    资源回收策略

    当application的executor空闲时间超过spark.dynamicAllocation.executorIdleTimeout(默认60s)后,就会被回收。

    内存占用

    调优内存的使用主要有三个方面的考虑:对象的内存占用量(你可能希望整个数据集都适合内存),访问这些数据的开销,垃圾回收的负载。

    默认情况下,java的对象是可以快速访问的,但是相比于内部的原始数据消耗估计2-5倍的空间。主要归于下面三个原因:
    1),每个不同的Java对象都有一个“对象头”,它大约是16个字节,包含一个指向它的类的指针。对于一个数据很少的对象(比如一个Int字段),这可以比数据大。
    2),Java字符串在原始字符串数据上具有大约40字节的开销(因为它们将它们存储在一个Chars数组中,并保留额外的数据,例如长度),并且由于String的内部使用UTF-16编码而将每个字符存储为两个字节。因此,一个10个字符的字符串可以容易地消耗60个字节。
    3),常用集合类(如HashMap和LinkedList)使用链接的数据结构,其中每个条目都有一个“包装器”对象(例如Map.Entry)。该对象不仅具有头部,还包括指针(通常为8个字节)到列表中的下一个对象。
    4),原始类型的集合通常将它们存储为“boxed”对象,如java.lang.Integer。

    确定内存的消耗

    最好的方式去计算一个数据的的内存消耗,就是创建一个RDD,然后加入cache,这样就可以在web ui中Storage页面看到了。页面会告诉你,这个RDD消耗了多少内存。
    要估计特定对象的内存消耗,请使用SizeEstimator的估计方法。这对于尝试使用不同的数据布局来修剪内存使用情况以及确定广播变量在每个执行程序堆中占用的空间量非常有用。

    调优数据结构

    减少内存消耗的第一种方法是避免使用增加负担的java特性,例如基于指针的数据结构和包装对象。下面几种方法可以来避免这个。

    • 1,将数据结构设计为偏好对象数组和原始类型,而不是标准的Java或Scala集合类(例如HashMap)。fastutil库(http://fastutil.di.unimi.it/)为与Java标准库兼容的原始类型提供方便的集合类。
    • 2,尽可能避免使用有很多小对象和指针的嵌套结构。
    • 3,针对关键词,考虑使用数字ID或者枚举对象而不是字符串。
    • 4,如果您的RAM少于32 GB,请设置JVM标志-XX:+ UseCompressedOops使指针为四个字节而不是八个字节。您可以在spark-env.sh中添加这些选项。

    序列化RDD

    尽管进行了调优,当您的对象仍然太大而无法有效存储时,一个简单的方法来减少内存使用是使用RDD持久性API中的序列化StorageLevel(如MEMORY_ONLY_SER)以序列化形式存储它们。Spark将会将每个RDD分区存储为一个大字节数组。以序列化形式存储数据的唯一缺点是数据访问变慢,因为必须对每个对象进行反序列化。如果您想以序列化形式缓存数据,我们强烈建议使用Kryo,因为它会使数据比java序列化后的大小更小(而且肯定比原Java对象更小)。

    垃圾回收调优

    • 垃圾收集的成本与Java对象的数量成正比,因此使用较少对象的数据结构(例如,Ints数组,代替LinkedList)将大大降低了成本。一个更好的方法是以序列化形式持久化对象,如上所述:每个RDD分区将只有一个对象(一个字节数组)。在尝试其他技术之前,如果GC是一个问题,首先要尝试的是使用序列化缓存。

    由于任务的运行内存和RDD的缓存内存的干扰,GC也会是一个问题。

    测量GC的影响

    GC调优的第一步是收集关于垃圾收集发生频率和GC花费的时间的统计信息。通过将-verbose:gc -XX:+ PrintGCDetails -XX:+ PrintGCTimeStamps添加到Java选项来完成。下次运行Spark作业时,每当垃圾收集发生时,都会看到在工作日志中打印的消息。请注意,这些日志将在您的群集的Executor节点上(在其工作目录中的stdout文件中),而不是您的driver功能中。

    高级GC调优

    Spark应用程序GC调优的目标是,确保生命周期比较长的RDD保存在老年代,新生代有足够的空间保存生命周期比较短的对象。这有助于避免触发Full GC去收集task运行期间产生的临时变量。下面列举几个有用的步骤:

    • 1),通过收集垃圾回收信息,判断是否有太多的垃圾回收过程。假如full gc在一个task完成之前触发了好几次,那说明运行task的内存空间不足,需要加内存。
    • 2),在gc的统计信息中,如果老年代接近满了,减少用于缓存的内存(通过减小spark.memory.Fraction)。缓存较少的对象比降低运行速度对我们来说更有好处。另外,可以考虑减少年轻代。可以通过减小-Xmn参数设置的值,假如使用的话。假如没有设置可以修改JVM的NewRation参数。大多数JVMs默认值是2,意思是老年代占用了三分之二的总内存。这个值要足够大,相当于扩展了spark.memory.fraction.
    • 3),如果有太多的minor gc,较少的major gc,增加Eden区内存会有帮助。将Eden区内存设置的比task运行估计内存稍微大一些。如果Eden区大小确定为E,那就将新生代的内存设置为-Xmn=4/3E,按比例增加内存是考虑到survivor区所占用的内存。
    • 4),尝试通过设置-XX:+UseG1GC垃圾回收器为G1。在垃圾回收器是瓶颈的一些情况下,它可以提高性能。请注意,对于大的Executor堆,通过使用-XX:G!HeapRegionSize去增大G1的堆大小,显得尤为重要。
    • 5),例如,如果您的任务是从HDFS读取数据,则可以使用从HDFS读取的数据块的大小来估计任务使用的内存量。请注意,解压缩块的大小通常是块大小的2或3倍。所以如果我们希望有3或4个任务的工作空间,HDFS块的大小是64 MB,我们可以估计Eden的大小是4 * 3 * 64MB。
    • 6),监控垃圾收集的频率和时间如何随着新设置的变化而变化。

    经验表明,GC调整的效果取决于您的应用程序和可用的内存量。

    并行度

    并发不足会导致集群浪费。

    • Spark自动会根据文件的大小,是否可分割等因素来设置map的数目;
    • 对于分布式reduce操作,例如groupbykey和reducebykey,reduce默认数量是分区数最大的父RDD的分区数;
    • 你也可以通过设置spark.default.parallelism来改变默认值,建议值是每个CPU执行2-3个tasks。

    Reduce任务的内存使用

    • 内存溢出并一定是RDD不适合放在内存里面,可能因为task的数据集太大了。
    • Spark的shuffle操作(sortByKey, groupByKey, reduceByKey, join, etc)会构建一个hash表,每个task执行一个分组的数据,单个往往会很大。
    • 最简单的改善方法是增加并行度,让每个task的输入变得更小。
    • Spark可以高效的支持短达200ms的任务,因为复用了Executor的JVM,这可以降低启动成本,所以你可以很安全的增加并行度,使其超过你的集群core数目。

    广播变量

    • spark的广播功能可以大幅度减少每个序列化后的task的大小,也可以减少在集群中执行一个job的代价。
    • 如果你的任务中使用了大的对象,比如静态表,可以考虑将它声明成广播变量。
    • 在driver节点,spark会打印出每个task序列化后的大小,所以你可以通过查看task的大小判断你的task是否过大,通常task的大小超过20KB就值得调优了。

    数据本地化

    • 数据的本地性可能会对Spark jobs产生重大影响。如果数据和在其上操作的代码在一起,则计算往往是快速的。但如果代码和数据分开,则必须要有一方进行移动。典型的情况是将序列化后的代码移动到数据所在的地方,因为数据往往比代码大很多。Spark构建调度计划的原则就是数据本地性。
    • 根据数据和代码当前的位置,数据本地性等级。
    • Spark倾向于调度任务依据最高的数据本地性,但这往往是不可能的。在任何空闲的Executor上没有未处理数据的情况下,Spark会切换到较低的数据本地性。这种情况下会有两个选择:
      • 1),等待CPU空闲,然后在相同的server上启动task。
      • 2),立即在一个需要迁移数据的较远位置启动一个新的task。
    • Spark的典型处理策略是等待繁忙CPU释放,时间很短。一旦超时,将移动数据到空闲CPU的地方执行任务。每个级别之间的回退等待超时可以在一个参数中单独配置或全部配置。如果任务较长,且数据本地性较差,可以适当调整Spark.locatity超时时间相关的配置。

    从最近到最远的顺序列出如下:

    PROCESS_LOCAL

    数据和代码在同一个JVM中,这是最佳的数据本地性。

    NODE_LOCAL

    数据和代码在相同的节点。比如数据在同一节点的HDFS上,或者在统一节点的Executor上。由于数据要在多个进程间移动,所以比PROCESS_LOCAL稍慢。

    NO_PREF

    数据可以从任何地方快速访问,没有数据本地性。

    RACK_LOCAL

    数据和代码在相同的机架。数据位于同一机架上的不同服务器上,因此需要通过网络发送,通常通过单个交换机发送

    ANY

    数据在网络上的其他地方,而不在同一个机架中。

    数据倾斜

    数据源

    • 尽量使用可切分的格式代替不可切分的格式,或者保证各文件实际包含数据量大致相同。

    调整并行度分散同一个Task的不同Key

    Spark在做Shuffle时,默认使用HashPartitioner(非Hash Shuffle)对数据进行分区。如果并行度设置的不合适,可能造成大量不相同的Key对应的数据被分配到了同一个Task上,造成该Task所处理的数据远大于其它Task,从而造成数据倾斜。

    如果调整Shuffle时的并行度,使得原本被分配到同一Task的不同Key发配到不同Task上处理,则可降低原Task所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。

    适用场景

    大量不同的Key被分配到了相同的Task造成该Task数据量过大。

    自定义Partitioner

    使用自定义的Partitioner(默认为HashPartitioner),将原本被分配到同一个Task的不同Key分配到不同Task。

    适用场景

    大量不同的Key被分配到了相同的Task造成该Task数据量过大。

    将Reduce side Join转变为Map side Join

    正确的使用Broadcast实现Map侧Join的方式是,通过SET spark.sql.autoBroadcastJoinThreshold=104857600;将Broadcast的阈值设置得足够大。

    适用场景

    参与Join的一边数据集足够小,可被加载进Driver并通过Broadcast方法广播到各个Executor中。

    为skew的key增加随机前/后缀

    为数据量特别大的Key增加随机前/后缀,使得原来Key相同的数据变为Key不相同的数据,从而使倾斜的数据集分散到不同的Task中,彻底解决数据倾斜问题。Join另一则的数据中,与倾斜Key对应的部分数据,与随机前缀集作笛卡尔乘积,从而保证无论数据倾斜侧倾斜Key如何加前缀,都能与之正常Join。

    步骤

    现通过如下操作,实现倾斜Key的分散处理

    • 将leftRDD中倾斜的key(即9500048与9500096)对应的数据单独过滤出来,且加上1到24的随机前缀,并将前缀与原数据用逗号分隔(以方便之后去掉前缀)形成单独的leftSkewRDD
    • 将rightRDD中倾斜key对应的数据抽取出来,并通过flatMap操作将该数据集中每条数据均转换为24条数据(每条分别加上1到24的随机前缀),形成单独的rightSkewRDD
    • 将leftSkewRDD与rightSkewRDD进行Join,并将并行度设置为48,且在Join过程中将随机前缀去掉,得到倾斜数据集的Join结果skewedJoinRDD
    • 将leftRDD中不包含倾斜Key的数据抽取出来作为单独的leftUnSkewRDD
    • 对leftUnSkewRDD与原始的rightRDD进行Join,并行度也设置为48,得到Join结果unskewedJoinRDD
    • 通过union算子将skewedJoinRDD与unskewedJoinRDD进行合并,从而得到完整的Join结果集

    适用场景

    两张表都比较大,无法使用Map则Join。其中一个RDD有少数几个Key的数据量过大,另外一个RDD的Key分布较为均匀。

    大表随机添加N种随机前缀,小表扩大N倍

    如果出现数据倾斜的Key比较多,上一种方法将这些大量的倾斜Key分拆出来,意义不大。此时更适合直接对存在数据倾斜的数据集全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集作笛卡尔乘积(即将数据量扩大N倍)。

    适用场景

    一个数据集存在的倾斜Key比较多,另外一个数据集数据分布比较均匀。

    Adaptive Execution

    http://www.jasongj.com/spark/adaptive_execution/

    参考

  • 相关阅读:
    投票系统完善
    投票系统设计与实现
    一天天进步
    洛谷P4168 [Violet]蒲公英 题解 数列分块
    LOJ6285. 数列分块入门 9 题解
    洛谷P5340 大中锋的游乐场 题解 分层图最短路
    P1073 [NOIP2009 提高组] 最优贸易 题解 分层图最短路
    洛谷P7297 [USACO21JAN] Telephone G 题解 分层图最短路
    洛谷P1119 灾后重建 题解 Floyd算法
    安装redis 后本地系统空间越来越小
  • 原文地址:https://www.cnblogs.com/bigbigtree/p/11555098.html
Copyright © 2011-2022 走看看