zoukankan      html  css  js  c++  java
  • 《尚硅谷大数据课程Spark+Spark高级(内存解析+性能调优)》

    https://www.bilibili.com/video/BV1E4411i771?p=23


    23/62

    join 

    cogroup 

    ...

    上面的 (a,b) 里面的 a 是(x,1)。。。

     也就是下面的 V=>C 的 C 

     

     

     

    byKey 

    RDD 任务切分中间分为:Application、Job、Stage 和 Task

    (1)Application:初始化一个 SparkContext 即生成一个 Application;

    (2)Job:一个 Action 算子就会生成一个 Job;

    (3)Stage:Stage 等于宽依赖的个数加 1;

    (4)Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。

    Action 算子、宽依赖个数、分区个数

    注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系

    主要步骤

    // 代码样例
    def main(args: Array[String]): Unit = {
        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)
        val rdd:RDD[String] = sc.textFile("input/1.txt")
        val mapRdd = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
        mapRdd.saveAsTextFile("outpath")
        //3.关闭连接
        sc.stop()
      }

    先看下执行流程图(Yarn-Cluster)

    现在一步一步分析

    第一步

    • 执行 main 方法

    • 初始化 sc

    • 执行到 Action 算子

    这个阶段会产生血缘依赖关系,具体的代码并没有执行

    第二步:DAGScheduler 对上面的 job 切分 stage,stage 产生 task

    DAGScheduler: 先划分阶段 (stage) 再划分任务(task)

    这个时候会产生 Job 的 stage 个数 = 宽依赖的个数 + 1 = 2 (这个地方产生一个宽依赖),也就是产生 shuffle 这个地方

    Job 的 Task 个数 = 一个 stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数(2+2 =4)shuffle 前的 ShuffleStage 产生两个

    shuffle 后 reduceStage 产生两个

    第三步:TaskSchedule 通过 TaskSet 获取 job 的所有 Task,然后序列化分给 Exector

    job 的个数也就是 = Action 算子的个数(这里只一个 collect)

     
       

     

     Idea 去掉提示信息:

    改成 error

    sc

    enableHiveSupport()

    df 编译期不做类型检查,运行期会报错——res16.collect() 时

    DS2DF方便

    DF2DS需要样例类

       
       

    UDF

    UDAF

     

    Dstream的转化操作分为无状态的(stateless)和有状态的(stateful)
    无状态转化:每个批次处理都不依赖于先前批次的数据,如map() filter() reduceByKey()等均属于无状态的
    有状态转化:依赖之前的批次数据或者中间结果来计算当前批次的数据,包括updateStatebyKey()window()
     
    DStream:Discretized Stream 离散化流
     

    程序写完了,开启服务端 nc

    5秒一次,无状态 

    自定义接收器:

    https://spark.apache.org/docs/latest/streaming-custom-receivers.html

    class CustomReceiver(host: String, port: Int)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
    
      def onStart() {
        // Start the thread that receives data over a connection
        new Thread("Socket Receiver") {
          override def run() { receive() }
        }.start()
      }
    
      def onStop() {
        // There is nothing much to do as the thread calling receive()
        // is designed to stop by itself if isStopped() returns false
      }
    
      /** Create a socket connection and receive data until receiver is stopped */
      private def receive() {
        var socket: Socket = null
        var userInput: String = null
        try {
          // Connect to host:port
          socket = new Socket(host, port)
    
          // Until stopped or connection broken continue reading
          val reader = new BufferedReader(
            new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
          userInput = reader.readLine()
          while(!isStopped && userInput != null) {
            store(userInput)
            userInput = reader.readLine()
          }
          reader.close()
          socket.close()
    
          // Restart in an attempt to connect again when server is active again
          restart("Trying to connect again")
        } catch {
          case e: java.net.ConnectException =>
            // restart if could not connect to server
            restart("Error connecting to " + host + ":" + port, e)
          case t: Throwable =>
            // restart if there is any other error
            restart("Error receiving data", t)
        }
      }
    }
     
       

    isStopped()

     

    有状态,必须保存 checkpoint

     低级API,手动维护 offset —— 处理完数据后,再更新 

    zk 


    45/62

    有需要得空详细看

    Spark 性能调优

    由于大部分的 Spark 计算都是在内存中完成的,集群中的任何资源(CPU,网络带宽,或者内存)都可能成 为 Spark 应用程序的瓶颈。最常见的情况是,数据能装进内存,而瓶颈是网络带宽;当然,有时候我们也需 要做一些优化调整来减少内存占用,例如将RDD以序列化格式保存(storing RDDs in serialized form)。 本文将主要涵盖两个主题:1.数据序列化(这对于优化网络性能极为重要);2.减少内存占用以及内存调优。 同时,我们也会提及其他几个比较小的主题。

    数据序列化

    序列化在任何一种分布式应用性能优化时都扮演几位重要的角色。如果序列化格式序列化过程缓慢,或者需要占用字节很多,都会大大拖慢整体的计算效率。通常,序列化都是Spark应用优化时首先需要关注的地方。Spark着眼于要达到便利性(允许你在计算过程中使用任何Java类型)和性能的一个平衡。Spark主要提供了两个序列化库: * Java serialization: 默认情况,Spark使用Java自带的ObjectOutputStream 框架来序列化对象,这样任何实现了 java.io.Serializable 接口的对象,都能被序列化。同时,你还可以通过扩展 java.io.Externalizable 来控制序列化性能。Java序列化很灵活但性能较差,同时序列化后占用的字节数也较多。 * Kryo serialization: Spark还可以使用Kryo 库(版本2)提供更高效的序列化格式。Kryo的序列化速度和字节占用都比Java序列化好很多(通常是10倍左右),但Kryo不支持所有实现了Serializable 接口的类型,它需要你在程序中 register 需要序列化的类型,以得到最佳性能。 要切换到使用 Kryo,你可以在 SparkConf 初始化的时候调用 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)。这个设置不仅控制各个worker节点之间的混洗数据序列化格式,同时还控制RDD存到磁盘上的序列化格式。目前,Kryo不是默认的序列化格式,因为它需要你在使用前注册需要序列化的类型,不过我们还是建议在对网络敏感的应用场景下使用Kryo。

    Spark对一些常用的Scala核心类型(包括在Twitter chill 库的AllScalaRegistrar中)自动使用Kryo序列化格式。

    如果你的自定义类型需要使用Kryo序列化,可以用 registerKryoClasses 方法先注册:

    val conf = new SparkConf().setMaster(…).setAppName(…) conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf)

    Kryo的文档(Kryo documentation )中有详细描述了更多的高级选项,如:自定义序列化代码等。

    如果你的对象很大,你可能需要增大 spark.kryoserializer.buffer 配置项(config)。其值至少需要大于最大对象的序列化长度。

    最后,如果你不注册需要序列化的自定义类型,Kryo也能工作,不过每一个对象实例的序列化结果都会包含一份完整的类名,这有点浪费空间。

    内存调优

    调整内存使用有三个注意事项:对象使用的内存量(您可能希望整个数据集适合内存),访问这些对象的成本,以及垃圾收集的开销(如果您有更高的营业额对象的条款)。

    默认情况下,Java对象的访问速度很快,但是与其“字段”中的“原始”数据相比,可以轻松地占用多达2-5倍的空间。这是由于几个原因:

    每个不同的Java对象都有一个“对象头”,大约有16个字节,并包含诸如指向其类的指针等信息。对于数据非常少的对象(比如一个Int字段),这可能比数据大。 Java字符串在原始字符串数据上有大约40字节的开销(因为它们将字符串数据存储在一个字符数组中,并保留额外的数据,如长度),并将每个字符存储为两个字节,这是由于字符串内部使用了UTF-16编码。因此一个10个字符的字符串可以很容易地消耗60个字节 常见的集合类(如HashMap和LinkedList)使用链接的数据结构,每个条目都有一个“包装器”对象(例如Map.Entry)。这个对象不仅有一个头,而且还有指向列表中下一个对象的指针(通常是8个字节)。 基本类型的集合通常将它们存储为“装箱”对象,如java.lang.Integer。 本节将首先概述Spark的内存管理,然后讨论用户可以在他/她的应用程序中更有效地使用内存的具体策略。具体来说,我们将介绍如何确定对象的内存使用情况,以及如何改进它,或者通过更改数据结构,或者以序列化格式存储数据。接下来我们将介绍Spark的缓存大小和Java垃圾收集器。

    内存管理概述

    Spark中的内存使用大部分属于两类:执行和存储。执行内存是指在混洗,连接,排序和聚合中用于计算的内存,而存储内存指的是用于跨群集缓存和传播内部数据的内存。在Spark中,执行和存储共享一个统一的区域(M)。当不使用执行内存时,存储器可以获取所有可用内存,反之亦然。如有必要,执行可以驱逐存储器,但是只有在总存储器内存使用量低于特定阈值(R)时才执行。换句话说,R描述了M内的一个分区,缓存块不会被驱逐。由于执行的复杂性,存储可能不会执行。

    这种设计确保了几个理想的性能首先,不使用缓存的应用程序可以使用整个空间执行,避免不必要的磁盘溢出。其次,使用高速缓存的应用程序可以保留最小的存储空间(R),使数据块不受驱逐。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而不需要用户如何在内部划分内存的专业知识。

    虽然有两种相关配置,但典型用户不需要调整它们,因为默认值适用于大多数工作负载:

    spark.memory.fraction将M的大小表示为(JVM堆空间 - 300MB)的一部分(默认值为0.6)。其余空间(40%)保留给用户数据结构,Spark中的内部元数据,并在稀疏和异常大的记录的情况下防止OOM错误。 spark.memory.storageFraction将R的大小表示为M的一部分(默认为0.5)。 R是M中的存储空间,缓存的块不会被执行驱逐。 应该设置spark.memory.fraction的值,以便在JVM的旧时代或“终身”时代中舒适地适应这种堆空间。有关详细信息,请参阅下面对高级GC调整的讨论。

    确定内存消耗

    调整数据集所需的内存消耗量的最佳方法是创建RDD,将其放入缓存,然后查看Web UI中的“存储”页面。 页面会告诉你RDD占用了多少内存。

    要估计特定对象的内存消耗,请使用SizeEstimator的估计方法。这对于尝试使用不同数据布局来调整内存使用情况以及确定每个执行程序堆中广播变量占用的空间量非常有用。

    调整数据结构

    减少内存消耗的第一种方法是避免增加开销的Java功能,例如基于指针的数据结构和包装对象。 做这件事有很多种方法:

    1、设计你的数据结构来优先选择对象数组和基本类型,而不是标准的Java或Scala集合类(例如HashMap)。 fastutil库为与Java标准库兼容的基本类型提供了方便的集合类。 2、尽可能避免使用大量小对象和指针的嵌套结构。 3、考虑使用数字ID或枚举对象而不是键的字符串。 4、如果RAM少于32 GB,请设置JVM标志-XX:+ UseCompressedOops使指针为4个字节而不是8个。 您可以在spark-env.sh中添加这些选项。

    序列化的 RDD 存储

    如果对象仍然太大,无法进行高效存储,但是使用RDD持久性API中的序列化存储级别(例如MEMORY_ONLY_SER)来减少内存使用的一种更简单的方法是以序列化的形式存储它们。 Spark然后将每个RDD分区存储为一个大字节数组。 以序列化形式存储数据的唯一缺点是访问速度较慢,这是由于必须快速反序列化每个对象。 如果你想以序列化的形式缓存数据,我们强烈推荐使用Kryo,因为它比Java序列化(当然还有原始的Java对象)要小得多。

    垃圾回收调优

    当您的程序存储RDD时,JVM垃圾回收会成为问题。 (在只读RDD的程序中,通常不会出现问题,然后在其上运行很多操作。)当Java需要驱逐旧对象以腾出空间给新对象时,它需要跟踪所有的Java对象并找到未使用的。这里要记住的要点是垃圾收集的成本与Java对象的数量成正比,所以使用较少对象的数据结构(例如Ints数组而不是LinkedList)大大降低了成本。更好的方法是以序列化的形式保存对象,如上所述:现在每个RDD分区只有一个对象(一个字节数组)。在尝试其他技术之前,如果GC是一个问题,首先要尝试使用序列化缓存。

    由于任务的工作内存(运行任务所需的空间量)和缓存在节点上的RDD之间的干扰,GC也可能成为问题。我们将讨论如何控制分配给RDD缓存的空间来缓解这个问题。

    测量GC的影响

    GC调优的第一步是收集垃圾收集发生的频率和GC花费的时间。这可以通过在Java选项中添加-verbose:gc -XX:+ PrintGCDetails -XX:+ PrintGCTimeStam来完成。 (有关将Java选项传递给Spark作业的信息,请参阅配置指南。)下次运行Spark作业时,每次发生垃圾回收时都会在工作人员的日志中看到消息。请注意,这些日志将位于群集的工作节点上(在其工作目录中的stdout文件中),而不是在驱动程序上。

    高级GC调整

    为了进一步调整垃圾收集,我们首先需要了解JVM中有关内存管理的一些基本信息:

    爪哇堆空间分为两个地区的年轻人和老人。年轻一代是为了保存短寿命的物体,而老一代则是为了寿命更长的物体。

    年轻一代进一步分为三个地区[伊甸园,幸存者1,幸存者2]。

    垃圾收集过程的简单描述:当Eden已满时,在Eden上运行一个小型GC,并将从Eden和Survivor1中存活的对象复制到Survivor2。幸存者地区交换。如果一个对象足够旧或者Survivor2已满,则将其移至Old。最后,当Old接近满时,调用完整的GC。

    在Spark中进行GC调优的目标是确保只有长寿命的RDD才被存储在旧一代中,并且Young生成的大小足以存储短期对象。这将有助于避免完整的GC收集任务执行期间创建的临时对象。一些可能有用的步骤是:

    通过收集GC统计信息来检查是否有太多的垃圾回收。如果在任务完成之前多次调用完整的GC,则意味着没有足够的内存可用于执行任务。

    如果有太多次要收集,但没有太多主要地理信息,那么为伊甸园分配更多的内存将会有所帮助。您可以将Eden的大小设置为高估每个任务需要多少内存。如果Eden的大小确定为E,则可以使用选项-Xmn = 4/3 * E来设置Young代的大小。 (增加4/3也是为了解释幸存者地区所使用的空间)。

    在打印的GC统计信息中,如果OldGen接近满,则通过降低spark.memory.fraction来减少用于缓存的内存量;缓存更少的对象比减慢任务执行更好。或者,考虑减少年轻一代的规模。这意味着如果你按照上面的方式设置,则降低-Xmn。如果不是,请尝试更改JVM的NewRatio参数的值。许多JVM默认这个为2,这意味着老一代占2/3的堆。它应该足够大,使得这个分数超过spark.memory.fraction。

    使用-XX:+ UseG1GC试用G1GC垃圾回收器。在某些垃圾收集是瓶颈的情况下,它可以提高性能。请注意,对于较大的执行程序堆大小,使用-XX:G1HeapRegionSize增加G1区大小可能很重要

    例如,如果您的任务正在从HDFS中读取数据,则可以使用从HDFS读取的数据块的大小来估计该任务使用的内存量。请注意,解压缩块的大小通常是块大小的2到3倍。所以如果我们希望有3或4个任务的工作空间,HDFS块大小为128 MB,我们可以估计Eden的大小为4 * 3 * 128MB。

    监视垃圾收集所花费的时间和频率如何随新设置发生变化。

    我们的经验表明,GC调整的效果取决于您的应用程序和可用的内存量。在线描述的调谐选项还有很多,但在较高的层次上,管理全面GC发生的频率有助于减少开销。

    GC调整标志

    其它考虑事项

    并行度

    除非您将每个操作的并行度设置得足够高,否则群集不会被充分利用。 Spark会根据自己的大小(尽管可以通过可选参数控制SparkContext.textFile等)自动设置每个文件上运行的“map”任务的数量,而对于分布式的“reduce”操作,比如groupByKey和reduceByKey, 它使用最大的父RDD的分区数量。 您可以将并行级别作为第二个参数(请参阅spark.PairRDDFunctions文档),或者将config属性设置为spark.default.parallelism以更改默认值。 一般来说,我们建议您的群集中每个CPU核心有2-3个任务。

    Reduce 任务的内存使用

    有时,你会得到一个OutOfMemoryError,不是因为你的RDD不适合内存,而是因为你的一个任务的工作集,比如groupByKey中的一个reduce任务,太大了。 Spark的shuffle操作(sortByKey,groupByKey,reduceByKey,join等)在每个任务中构建一个哈希表来执行分组,这通常会很大。 这里最简单的解决方法是增加并行度,使每个任务的输入集合更小。 Spark能够有效地支持短至200毫秒的任务,因为它可以在一个任务中重复使用一个执行器JVM,并且任务启动成本较低,因此可以安全地将并行级别提高到超过集群内核的数量。

    广播超大变量

    使用SparkContext中可用的广播功能可以大大减少每个序列化任务的大小,以及通过集群启动作业的成本。 如果您的任务使用其中的驱动程序的任何大对象(例如静态查找表),请考虑将其转换为广播变量。 Spark打印每个任务的序列化大小,所以你可以看看,以确定你的任务是否太大; 一般来说大于20KB的任务可能是值得优化的。

    数据本地化

    数据局部性可能会对Spark作业的性能产生重大影响。 如果数据和在其上运行的代码在一起,那么计算就会很快。 但是,如果代码和数据是分开的,就必须转移到另一个。 通常情况下,由于代码大小比数据小得多,所以将数据从一个地方传输到另一个地方比传输数据更快。 Spark围绕这个数据局部性的一般原则构建调度。

    数据局部性是数据与代码的接近程度。 根据数据的当前位置,有几个级别的地点。 从最近到最远的顺序:

    PROCESS_LOCAL 数据与运行代码位于同一个JVM中。 这是最好的地方可能 NODE_LOCAL 数据在同一个节点上。 例子可能在同一个节点上的HDFS中,或者在同一个节点上的另一个执行器上。 这比PROCESS_LOCAL稍慢,因为数据必须在进程之间传输 NO_PREF 数据可以从任何地方以相同的速度访问,并且没有本地偏好 RACK_LOCAL 数据位于同一台服务器上。 数据位于同一机架上的不同服务器上,因此需要通过网络进行发送,通常通过一台交换机进行发送 ANY 数据都在网络上的其他地方,而不在同一个机架上

    Spark更喜欢在最好的地点级别安排所有任务,但这并不总是可能的。 在任何空闲的执行器上没有未处理的数据的情况下,Spark会切换到较低的地点级别。 有两种选择:a)等待一个繁忙的CPU释放,以便在同一台服务器上的数据上启动一个任务;或者b)立即在较远的地方开始一个需要移动数据的新任务。

    Spark通常所做的就是等待繁忙的CPU释放的希望。 一旦超时,它就开始将数据从远处移动到空闲的CPU。 每个级别之间回退的等待超时可以单独配置,也可以全部配置在一个参数中; 有关详细信息,请参阅配置页面上的spark.locality参数。 如果你的任务很长,看到地方不好,你应该增加这些设置,但是默认情况下通常效果不错。

    小结

    这是一个简短的指南,指出调整Spark应用程序时应该了解的主要问题 - 最重要的是数据序列化和内存调整。 对于大多数程序来说,切换到Kryo序列化和以序列化形式保存数据将解决最常见的性能问题。 请随时在Spark邮件列表上询问其他调整最佳实践。

     

    Spark性能调优九之常用算子调优


    z小赵
    2018.11.21 04:28:53字数 1,093阅读 929

            前面介绍了很多关于Spark性能的调优手段,今天来介绍一下Spark性能调优的最后一个点,就是关于Spark中常用算子的调优。废话不多说,直接进入正文;

    1.使用mapPartitions算子提高性能

    mapPartition的优点:使用普通的map操作,假设一个partition中有1万条数据,那么function就要被执行1万次,但是使用mapPartitions操作之后,function仅仅会被执行一次,显然性能得到了很大的提升,这个就没必要在多废话了。

    mapPartition的缺点:使用普通的map操作,调用一次function执行一条数据,不会出现内存不够使用的情况;但是使用mapPartitions操作,很显然,如果数据量太过于大的时候,由于内存有限导致发生OOM,内存溢出。

    总结:通过以上以上优缺点的对比,我们可以得出一个结论;就是在数据量不是很大的情况下使用mapPartition操作,性能可以得到一定的提升,在使用mapPartition前,我们需要预先估计一下每个partition的量和每个executor可以被分配到的内存资源。然后尝试去运行程序,如果程序没有问题就大可放心的使用即可,下图是一个实际的应用例子,仅供参考。

     
    mapPartitions优化

    2.filter操作之后使用coalesce算子提高性能

    先看看默认情况下,执行完filter操作以后的各个partition的情况,如下图所示;

     
    默认的执行流程图

    问题:从上面的图中可以很明显的看出,经过一次filter操作以后,每个partition的数据量不同程度的变少了,这里就出现了一个问题;由于每个partition的数据量不一样,出现了数据倾斜的问题。比如上图中执行filter之后的第一个partition的数据量还有9000条。

    解决方案:针对上述出现的问题,我们可以将filter操作之后的数据进行压缩处理;一方面减少partition的数量,从而减少task的数量;另一方面通过压缩处理之后,尽量让每个partition的数据量差不多,减少数据倾斜情况的出现,从而避免某个task运行速度特别慢。coalesce算子就是针对上述出现的问题的一个解决方案,下图是一个解决案例。

     
    应用实例图

    3.使用foreachPartition算子进行

    默认的foreach对于每一条数据,都要单独调用一次function并创建一个数据库连接,如果数据量很大,对于spark作业是非常消耗性能的。

    而对于foreachPartition来说,对于function函数,只调用一次,只获取一个数据库连接,一次将数据全部写入数据库。但是数据量很大的话,可能会引发OOM的问题。不过在生产环境中一般都是使用foreachPartition(好像说了半天废话)。

    4.使用repartition解决SparkSQL低并行度的问题

    在spark项目中,如果在某些地方使用了SparkSQL,那么使用了SparkSQL的那个stage的并行度就没有办法通过手动设置了,而是由程序自己决定。那么,我们通过什么样的手段来提高这些stage的并行度呢?其实解决这个问题的办法就是使partition的数量增多,从而间接的提高了task的并发度,要提高partition的数量,该怎么做呢?就是使用repartition算子,对SparkSQL查询出来的数据重新进行分区操作,此时可以增加分区的个数。具体使用如下图所示:

     

    总结:关于RDD算子的优化,就先讲到这里。关于整个Spark调优,基本先告一段落,后面会介绍一些Spark源码分析的知识,欢迎关注。

    如需转载,请注明:

    z小赵 Spark性能调优九之常用算子调优

     
     
       

    一、MapPartitions提升Map类型操作性能
    Spark中,每个task处理一个RDD的partition。

    ①MapPartitions的优点
    如果是普通的map,比如一个partition中有一万条数据,那么function需要执行和计算一万次。如果使用了MapPartitions,一个task只执行一次function,function一次接受所有的partition数据。只要执行一次就可以了,性能比较高。

    ②MapPartitions的缺点
    当数据量很大的情况下,比如有一千万条数据。如果使用普通的map,一次只处理一条数据,在处理的过程中,如果内存不足,则会将处理完的数据从内存中垃圾回收掉,或者使用其他方法,清理出内存,程序可以正常运行。如果是MapPartitions操作,一次需要读入所有的数据,内存不足的话也无法腾出空间,会导致内存溢出。

    ③适用场景
    数据量不是很大的时候,内存足够存放一次function操作所需的所有数据。

    二、filter之后使用coalesce减少分区数量
    ①filter只有可能会产生的后果
    每个partition数据量变少了,但是在后面进行处理的时候,还是要跟partition数量一样的task来进行处理,造成task资源的浪费。
    每个partition的数据量不一样,会导致后面的每个task处理每个partition的时候,每个task要处理的数据量就不一样,造成数据的倾斜。
    ②解决方案
    针对第一个问题,因为数据量变少了,那么partition其实也可以相应地变少,可以考虑进行partition的压缩。
    针对第二个问题,解决方案和第一个问题一样,也可以考虑partition的压缩,尽量让每个partition的数据量差不多。这样,后面的task需要处理的数据量就差不多,处理速度差不多,避免数据倾斜带来的问题。
    ③实现
    在代码中,filter算子后面添加.coalesce(numPartitions),比如:

    RDD.filter(XXXXXXXXXXXXXXX).coalesce(100);

    三、repartition解决Spark SQL低并行度的性能问题
    ①问题描述
    通过spark.default.parallelism设置的并行度,对Spark SQL操作不生效,Spark SQL依然使用自身默认的并行度。这种情况下,可能我们的环境中可以使用的cpu core数量为200,手动设置的并行度为600。但是,Spark SQL操作执行后生成的task数量为10,那么后续的所有操作,并行度都是10,而不是我们设置的600,。这样,就会对后续的操作带来性能上的问题。

    ②解决方案
    对Spark SQL操作产生的partition,使用reparation操作,设置并行度。

    ③代码示例
    xxxRDD.repartition(numPartitions);

    四、foreachPartition优化写数据库性能
    ①问题描述
    默认的foreach性能有很大的缺陷:

    对于每条数据,task都要单独执行一次function
    每次执行function的时候,都需要发送SQL语句往数据库写数据,需要创建和销毁数据库连接,而数据库的创建和连接对资源的消耗很大。即使使用数据库连接池,也只是创建有限个数据库连接,很难满足实际需求。
    以上两点,多次数据库连接和多次发送SQL语句,非常消耗资源,影响性能。

    ②使用foreachPartition的好处
    在function函数中,对一个partition的数据进行批量处理,只调用一次function函数,一次传入partition中的所有数据。这样,数据库连接只需要创建一次,想数据库发送一次SQL语句,只是需要传入多组参数即可。

    ③foreachPartition的缺陷
    如果一个partition的数据量特别大,比如达到上百万条,一次性传入,可能会造成OOM内存溢出。

    所以,foreachPartition的适用环境还需要在现实的生产环境中慢慢调试。

    五、reduceByKey本地聚合
    相比较于普通的shuffle,reduceByKey会对map端的数据进行本地聚合。也就是说,map端给下一个stage的每个task创建的输出文件,在写数据之前,会对数据进行一次本地聚合combiner操作。

    好处在于:

    在本地进行聚合以后,在map端的数据量就会变少,减少磁盘IO,减少磁盘空间的占用。
    下一个stage的task,在拉取数据的时候,数据量减少,减少网络传输的性能消耗。
    reduce端拉取的数据减少,需要进行聚合的数据减少,需要的缓存也减少。
    ————————————————

     
              
    (1 条消息)shuffle 原理及调优_大数据_Johnson8702 的博客 - CSDN 博客

    一、原理概述

    ①什么是 shuffle?

    以 reduceByKey 为例,要把分布在集群各个节点上的数据中的同一个 key 对应的 values 集中到一块,集中到集群中同一个节点上。更严格地说,集中到同一个节点的同一个 executor 的 task 中。

    集中同一个 key 对应的 values 之后,数据变成 <key,Iterable<value>>,算子函数对 values 进行 reduce 操作,最后变成 < key,value > 形式的数据。

    每一个 shuffle 的前半部分 stage 的 task,都会创建与下一个 stage 的 task 数量相同的文件。比如下一个 stage 有 100 个 task,那么当前 stage 的每个 task 都会创建 100 个文件。并且,会将同一个 key 对应的 values 写入同一个文件;不同节点上的 task,会将同一个 key 对应的 values,写入下一个 stage 同一个 task 对应的文件中。在这个部分,task 在将数据写入磁盘文件之前,会先写入内存缓存区,内存缓冲区溢满后,再 spill 溢写到磁盘文件中。

    shuffle 的后半部分 stage 的 task,每个 task 都会从各个节点中,上一个 stage 中的 task 为自己创建的文件中,拉取属于自己的 key,value 数据,然后 task 会有一个内存缓冲区,对数据进行汇聚。

    ②有哪些常用的操作会触发 shuffle?

    spark 中,常见的会触发 shuffle 的算子有:groupByKey、reduceByKey、countByKey、join,等等。

    二、合并 map 端输出文件

    在该功能不开启的情况下,如原理所述,当前 stage 的每个 task 都会为下一个 stage 创建和下一个 stage 中 task 数量相同的文件。比如当前 stage 和下一个阶段的 stage 都有 10 个 task,那么当前 stage 的每个 task 会为下一个 stage 创建 10 个文件,一共是 10*10=100 个文件。

    开启该功能后,如果每个 executor 只有两个 cpu core,那么同时运行的 task 数量只有 2 个。这个时候,当前的 stage 中,先运行 2 个 task,每个 task 创建 10 个文件,一共是 2*10=20 个。这 2 个 task 运行完之后,再运行后面的 2 个 task。后面的 2 个 task 运行的时候,不再创建新的文件,而是复用前面 2 个 task 创建的文件。依次类推,直到当前 stage 的 10 个 task 运行结束,一共创建 20 个文件,和之前的 100 个文件相比,数量大大减少,需要写到磁盘的文件数减少,同时下一个 stage 阶段需要去磁盘上读取的文件数也减少,性能得到很大提升。

    该功能默认不开启,可以在代码中使用 new SparkConf().set("spark.shuffle.consolidateFiles","true") 来开启。

    三、调节 map 端内存缓冲和 reduce 端内存占比

    默认情况下,map 端每个 task 内存缓冲的大小是 32kb,reduce 端内存占比是 0.2,即 20%。

    如果需要处理的数据量比较大,比如每个 task 需要处理 320000kb,那么需要向磁盘溢写 320000/32=10000 次,向磁盘溢写文件次数过多,造成大量的磁盘 IO,性能下降。

    reduce 端拉取数据的时候,如果数据量很大,而内存占比很小,在 reduce 端聚合的时候,内存不够用,需要 spill 溢写到磁盘中。向磁盘溢写的数据量越大,后续处理需要从磁盘读取的数据量也越大。这样频繁地发生磁盘 IO,会严重地影响性能。

    根据上述情况,为了减少磁盘 IO,可以调节 map 端缓存区大小和 reduce 端内存占比。

    调节的原则是,先固定一个参数,调整另一个参数,多次销量调节。一个参数调整完之后,再调整另一个参数。

     

    reduce:一边拉取一边聚合

    Spark shuffle是什么

    Shuffle在Spark中即是把父RDD中的KV对按照Key重新分区,从而得到一个新的RDD。也就是说原本同属于父RDD同一个分区的数据需要进入到子RDD的不同的分区。

    现在的spark版本默认使用的是sortshuffle;

    shuffle在哪里产生

    shuffle在spark的算子中产生,也就是运行task的时候才会产生shuffle.

    sortShuffleManager
    spark shuffle的默认计算引擎叫sortshuffleManager,它负责shuffle过程的执行、计算和组件的处理,sortshuffleManager会将task进行shuffle操作时产生的临时磁盘文件合并成一个磁盘文件,在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。


    sortshuffle的内部机制

    1. 数据会根据不同的shuffle算子存储到map数据结构(如reduceByKey)或者array数据结构(join);不过Map是一边聚合,一边写入内存,array是直接写入内存. 当内存达到一个阈值,就会溢出写到磁盘,因此在溢出这个环节会在磁盘上产生多个临时文件,磁盘上的这些文件需要合并,于是spark就有了merge机制.

    2. 在溢写到磁盘之前,在内存中会按照key来排序,排序过后会进入到一个buffer缓冲区,默认为32K,缓冲区的batch默认为1万条key,也就是缓冲区以每次一万条的量写入到磁盘文件中,该缓冲区减少IO,提高性能. 缓冲区和写入磁盘使用的技术是java中的BufferedOutputStream.
    3. merge会将之前产生的所有的临时文件进行合并,包括缓冲区读写到磁盘上的文件,合并成一个大的文件到磁盘,默认为48M,与这个文件相对于的还有一个索引文件,索引文件里面记录的是这个文件的元信息,且这个磁盘文件也是下游stage的Task的输入信息! 注: 一个下游的task对应一个磁盘文件和这个磁盘文件的元信息. 于是就有了血统,继承之类的!

    shuffle当中可能会遇到的问题

    1. 数据量非常大,从其他各台机器收集数据占用大量网络。
    2. 数据如何分类,即如何Partition,Hash、Sort等;
    3. 负载均衡(数据倾斜),因为采用不同的Shuffle方式对数据不同的分类,而分类之后又要跑到具体的节点上计算,如果不恰当的话,很容易产生数据倾斜;
    4. 网络传输效率,需要在压缩和解压缩之间做出权衡,序列化和反序列也是要考虑的问题;

    说明:具体的Task进行计算的时候尽一切最大可能使得数据具备Process Locality的特性;退而求次是增加数据分片,减少每个Task处理的数据量。

    shuffle调优

    shuffle调优分为两种,一种是shuffle参数根据实际情况调优,一种是代码开发调优,代码开发调优我在spark性能调优里面去写!

    1. spark.shuffle.file.buffer(默认值为32K,每次出货1万条)

    该参数是缓冲区的缓冲内存,如果可用的内存资源较为充足的话,可以将缓冲区的值设置大点,这样会较少磁盘IO次数.,如果合理调节该参数,性能会提升1%~5%... 可以设置为64K.
    2. spark.reducer.maxSizeInFlight(默认为48M)

    该参数是stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作,如果合理调节该参数(增大),性能会提升1%~5%...

    3. spark.shuffle.io.maxRetries(默认3次)
    该参数是stage的task向上一个stage的task计算结果拉取数据,也就是上面那个操作,有时候会因为网络异常原因,导致拉取失败,失败时候默认重新拉取三次,三次过还是失败的话作业就执行失败了,根据具体的业务可以考虑将默认值增大,这样可以避免由于JVM的一些原因或者网络不稳定等因素导致的数据拉取失败.也有助于提高spark作业的稳定性. 可以适当的提升重新拉取的次数,最大为60次.

    4. spark.shuffle.io.retryWait(默认为5s)

    该参数和上面一样,是每次拉取数据的间隔时间... 调优建议:建议加大间隔时长(比如20s),以增加shuffle操作的稳定性


    5. spark.shuffle.memoryFraction(默认0.2,也就是20%)
    该参数是数据根据不同的shuffle算子将数据写入内存结构中,内存结构达到阈值会溢出临时文件,这个参数就是则是内存结构的阈值百分比的,不是内存结构的内存大小. 如果内存充足,而且很少使用持久化操作,建议调高这个比例,可以减少频繁对磁盘进行IO操作,合理调节该参数可以将性能提升10%左右.

    6. spark.shuffle.manager(默认sort)
    该参数是设置shuffle的类型,默认是sort,也就是sortshuffleManager, hash参数对应HashShuffleManager, tungsten-sort参数对应tungsten(这个很少用),HashShuffleManager是以前的版本,这个默认就行,
    7. spark.shuffle.sort.bypassMergeThreshold(默认200个)
    该参数是如果shuffle read task的数量小于等于200个的时候,在sortshufflemanager模式下,就会启动ByPass sortshufflemanager...这个调优就这样把 ,默认200挺好的.
    8. spark.shuffle.consolidateFiles(默认为false)
    该参数只对HashshuffleManager有效,而HashshuffleManager是spark1.2之前默认使用的版本...

     
    (1 条消息)Spark 性能调优 ---JVM 调优之原理概述及降低 cache 操作的内存占比_大数据_Johnson8702 的博客 - CSDN 博客

    一、原理概述

    ①理论

    Spark 是用 Scala 开发的。Spark 的 Scala 代码调用了很多 java api。Scala 也是运行在 JVM 中的,所以 Spark 也是运行在 JVM 中的。

    ②JVM 可能会产生什么样的问题?

    内存不足——RDD 的缓存、task 运行定义的算子函数,可能会创建很多对象,占用大量的内存。处理不当,可能导致 JVM 出问题。

    ③堆内存

    1. 作用:存放项目中创建的对象。
    2. 划分:新生代(young generation,Eden 区域 + survivor 区域 1+survivor 区域 2,比例 8:1:1),老年代(old generation)

    ④GC(垃圾回收)

    每次创建出来的对象,都会放到 Eden 区域和 survivor 区域 1 中,另外一个 survivor 区域空闲。

    由于 spark 作业产生的对象过多,当 Eden 区域和 survivor 区域放满之后,就会触发 minor gc(初代回收)。把不再使用的对象从内存中清理出去,给后面对象的创建腾出空间。

    清理掉了不再使用的对象之后,那些存活下来(还需要继续使用)的对象,放入之前空闲的 survivor 区域 2 中。当 survivor 区域 2 满了放不下,JVM 会通过担保机制机制将多余的对象直接放到老年代中。

    如果 JVM 内存不够大,可能导致频繁的新生代内存溢出,频繁的 minor gc频繁的 minor gc 会导致短时间内,有些存活下来的对象,经过多次垃圾回收都没有回收掉,导致这种生命周期短(不一定会长期使用)的对象,年龄过大,进入老年代。

    老年代中存在过多的短生命周期的、本该在新生代中可能马上要被回收的对象,导致内存不足,频繁内存满溢,频繁进行 full gc(老年代回收)。full gc 会回收老年代中的对象。由于老年代中的对象数量少,满溢进行的 full gc 频率本应该很少,所以回收算法很简单,但是耗费性能和时间。——full gc 很耗时间。

    full gc/minor gc,无论快慢,都会导致 JVM 工作线程停止工作,spark 作业会暂停,等待垃圾回收完成之后继续工作。

    ⑤总结 --- 内存不足导致的问题

    1. 频繁 minor gc,导致 spark 频繁停止工作
    2. 老年代囤积大量活跃对象(短生命周期的对象),导致频繁 full gc,full gc 时间很长,短则数十秒,长则数十分钟,甚至数小时,导致 spark 长时间停止作业
    3. 严重影响 spark 作业的性能和运行的速度

    二、降低 cache 操作的内存占比

    spark 中,堆内存被划分为两块,一块专门用来给 RDD 的 cache、persist 操作进行数据缓存使用;一块用来给 spark 算子函数的运行使用,存放函数自己创建的对象。

    默认情况下,给 RDD cache 操作的内存占比是 0.6,但是可能 cache 真正需要使用的内存不需要这么多,而存储 spark 算子函数创建对象需要大量的内存,这个时候可以调节这个参数,比如 0.5、0.3、0.2 等等。

    用 yarn 运行 spark 的时候,通过 yarn 页面可以查看 spark 作业的运行统计,一层一层点进去,可以看到每个 stage 的运行情况,包括每个 task 的运行时间、gc 时间等。根据情况,如果 gc 频繁,时间太长,可以适当地调整这个比例,反复测试,直到调整到一个合适的比例。

    在 spark 的代码中设置示例:SparkConf.set("spark.storage.memoryFraction","0.6")

     
    (1 条消息)Spark 性能调优 ---JVM 调优之调节 executor 堆外内存与连接等待时长_大数据_Johnson8702 的博客 - CSDN 博客

    一、executor 堆外内存

    ①理论

    在实际项目中,有时候需要处理大量的数据,比如上亿、数十亿条数据,发现项目时不时地报错:shuffle file not found,executor lost,task lost,out of memory 等等。

    之所以出现上述问题,可能是因为 executor 的堆外内存不足,导致 executor 在运行的过程中,内存溢出。后续 stage 的 task 在运行的时候,可能要从之前的 executor 中拉取 shuffle map output file 文件,此时 executor 已经挂掉,关联的 BlockManager 也挂掉了,找不到文件。此时可能会报 shuffle output file not found,resubmitting task,executor lost,导致 spark 作业彻底崩溃。

    出现这种情况,可以调节一下 executor 的堆外内存,这样可以避免报错,也可能会带来性能的提升。

    ②参数调节

    --conf spark.yarn.executor.memoryOverhead=2048

    这个参数是在是 spark-submit 脚本里面设置,不是在 spark 代码里面设置。

    默认情况下,堆外内存的大小是 300M,但通常会影响 spark 作业的运行,需要调节这个参数到 1024M、2048M 或者更大。

    调节完这个参数,可以避免某些 JVM OOM(内存溢出)的问题,同时使整个 spark 作业的性能得到很大的提升。

    二、连接等待时长

    ①理论

    executor 在读取数据时,优先从本地关联的 BlockManager 中读取。读取不到时,会通过 TransferService 去远程连接其他节点的 executor 的 BlockManager 读取数据。

    这个时候,如果碰到 executor 的 JVM 在进行垃圾回收或其他操作,建立连接失败读取不到数据时,程序会卡住,并进入等待状态。spark 默认的网络连接的超时时长为 60s,60s 后如果还是无法连接,则宣告失败。此时会报错。几次尝试之后,还是获取不到数据,会导致 spark 作业奔溃,也可能导致 DAGSchedule 反复提交几次 stage,TaskSchedule 反复提交几次 task,这样会大大延长 spark 作业的时间。

    此时,可以考虑调节连接的超时时长,比如设置成 600 秒:

    --conf spark.core.connection.ack.wait.timeout=600

     
    (1 条消息)SparkStreaming 性能调优_大数据_Johnson8702 的博客 - CSDN 博客

    一、数据接收并行度调优——创建更多的输入 DStream 和 Receiver

    通过网络接收数据时(比如 Kafka,Flume),会将数据反序列化,并存储在 Spark 的内存中。如果数据接收成为系统的瓶颈,可以考虑并行化数据接收。每个输入 DStream 都会在某个 Worker 的 Executor 上启动一个 Receiver,该 Receiver 接收一个数据流。因此可以通过创建多个输入 DStream,并配置它们接收数据源不同的分区数据,达到接收多个数据流的效果。

    比如,一个接收两个 Kafka Topic 的输入 DStream,可以拆分成两个输入 DStream,每个分别接收一个 topic 的数据。这样就会创建两个 Receiver,从而并行地接收数据,提高吞吐量。多个 DStream 可以使用 union 算子进行合并,从而形成一个 DStream。后续的算子操作只需要针对合并之后的 DSream 即可。

    代码示例:

    int numStreams = 5;

    List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);

    for (int i = 0; i < numStreams; i++) {

      kafkaStreams.add(KafkaUtils.createStream(...));

    }

    JavaPairDStream<String, String> unifiedDStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));

    unifiedDStream.print();

    二、数据接收并行度调优——调节 block interval

    数据接收并行度调优,除了创建更多输入 DStream 和 Receiver 以外,还可以调节 block interval。通过参数 spark.streaming.blockInterval,可以设置 block interval,默认是 200ms。

    对于大多数 Receiver 而言,在将接收到的数据保存到 Spark 的 BlockManager 之前,都会将数据切分成一个一个的 block。每个 batch 的 block 数量,决定了该 batch 对应的 RDD 的 partition 的数量,以及针对该 RDD 执行 transformation 操作时创建的 task 数量。每个 batch 对应的 task 的数量可以大约估算出来,即 batch interval / block interval。

    比如,batch interval 为 1s,block interval 为 100ms,则会创建 10 个 task。如果每个 batch 的 task 数量太少,即低于每台机器的 CPU Core,说明 batch 的 task 数量偏少,导致所有的 CPU 资源没有被完全利用起来。此时应该为 batch 增加 block 的数量,需要减小 block interval。

    但是,需要注意的是,推荐的 block interval 的最小值为 50ms,如果低于这个值,那么大量的 task 的启动时间可能会变成性能的一个开销。

    三、数据接收并行度调优——输入流数据重分区

    使用 inputStream.repartition(<number of partitions>),将接收到的 batch,分不到指定数量的机器上,然后进行后续操作。

    四、任务启动调度

    如果每秒钟启动的 task 过多,比如每秒启动 50 个,100 个,那么发送这些 task 去 Worker 节点上的 Executor 的性能开销将会大大增加,可以使用下述操作减少这方面的性能开销:

    1. Task 序列化:使用 Kryo 序列化机制来序列化 task,减小 task 的大小,从而减少发送这些 task 到各个 Worker 节点上的 Executor 的时间
    2. 执行模式:在 Standalone 模式下运行 Spark,可以达到更少的 task 启动时间

    五、数据处理并行度调优

    如果在计算的任何 stage 中使用的并行 task 的数量没有足够多,那么集群资源是无法被充分利用的。

    举例来说,对于分布式的 reduce 操作,比如 reduceByKey 和 reduceByKeyAndWindow,默认的并行 task 的数量是由 spark.default.parallelism 参数决定的。也可以在 reduceByKey 等操作中,传入第二个参数,手动指定该操作的并行度,也可以调节全局的 spark.default.parallelism 参数。

    六、数据序列化调优

    数据序列化造成的系统开销可以由序列化格式的优化来减小。在流式计算的场景下,由两种类型的数据需要优化:

    1. 输入数据:默认情况下,接收到的输入数据,是存储在 Executor 的内存中的,使用的持久化级别是 StorageLevel.MEMORY_AND_SER_2。这意味着,数据被序列化为字节流从而减小 GC 开销,并且会复制以进行 executor 失败的容错。因此,数据首先会存储在内存中,然后在内存不足时会溢写到磁盘上,从而为流式计算来保存所有需要的数据。这里的序列化有明显的性能开销——Receiver 必须反序列化从网络接收到的数据,然后再使用 Spark 的序列化格式序列化数据。
    2. 流式计算操作生成的持久化 RDD:流式计算操作生成的持久化 RDD,可能会持久化到内存中。例如,窗口操作默认就会将数据持久化在内存中,因为这些数据后面可能会在多个窗口中使用,并被处理多次。然而,不像 Spark Core 的默认持久化级别,StorageLevel.MEMORY_ONLY,流式计算操作生成的 RDD 的默认持久化级别是 StorageLevel.MEMORY_ONLY_SER,默认就会减小 GC 开销。

    在上述的两个场景中,使用 Kyro 序列化类库可以减小 CPU 和内存的性能开销。使用 Kyro 时,一定要考虑注册自定义的类,并且禁用对应引用的 tracking(spark.kyro.referenceTracking)。

    在一些特殊的场景中,比如需要为流式应用保持的数据总量并不是很多,也许可以将数据以非序列化的方式进行持久化,从而减少序列化和反序列化的 CPOU 开销,而且又不会有太昂贵的 GC 开销。举例来说,如果设置的 batch interval,并且没有使用 window 操作,那么可以通过显式地设置持久化级别,来禁止持久化对数据进行序列化。这样就可以减少用于序列化和反序列化的 CPU 性能开销,并且不用承担太多的 GC 开销。

    七、batch interval 调优(最重要)

    如果想让一个运行在集群上的 Spark Streaming 应用程序可以稳定,就必须尽可能快地处理接收到的数据。换句话说,batch 应该在生成之后,尽可能快地处理掉。对于一个应用来说,可以通过观察 Spark UI 上的 batch 处理时间来判断 batch interval 的设置是否合适。batch 处理的时间必须小于等于 batch interval 的值。

    给予流式计算的本质,在固定集群资源条件下,应用能保持的数据接收速率,batch interval 的设置会有巨大的影响。例如,在 WordCount 例子中,对于一个特定的数据接收速率,应用业务可以保证每 2 秒打印一次单词计数,而不是每 500ms。因此 batch interval 需要设置,让预期的数据接收速率可以在生产环境中保持住。

    为应用计算合适的 batch 大小,比较好的方法是先设置一个很保守的 batch interval,比如 5s~10s,以很慢的数据接收速率进行测试。要检查应用是否跟得上这个数据速率,可以检查每个 batch 的处理时间的延迟,如果处理时间与 batch interval 基本吻合,那么应用就是稳定的。否则,如果 batch 调度的延迟持续增长,那么久意味着应用无法跟得上这个速率,就是不稳定的。此时可以提升数据处理的速度,或者增加 batch interval,以保证应用的稳定。

    注意,由于临时性的数据增长导致的暂时的延迟增长是合理的,只要延迟情况可以在短时间内回复即可。

    八、内存调优——内存资源

    Spark Streaming 应用需要的集群内存资源,是由使用的 transformation 操作类型决定的。举例来说,如果想要使用一个窗口长度为 10 分钟的 window 操作,那么集群就必须有足够的内存来保存 10 分钟内的数据。如果想要使用 uodateStateByKey 来维护许多 key 的 state,那么内存资源就必须足够大。反过来说,如果想要做一个简单的 map-filter-store 操作,那么需要使用的内存就很少。

    通常来说,通过 Receiver 接收到的数据,会使用 StorageLevel.MEMPRY_AND_DISK_SER_2 持久化级别来进行存储,因此无法保存在内存中的数据就会溢写到磁盘上。而溢写到磁盘上,会降低应用的性能。因此,通常的建议是为应用提供它需要的足够的内存资源。

    (建议在一个小规模的场景下测试内存的使用量,并进行评估)

    九、内存调优——垃圾回收

    内存调优的另一个方面是垃圾回收。对于流式应用来说,如果要获得低延迟,肯定不能有因为 JVM 垃圾回收导致的长时间延迟。有很多参数可以帮助降低内存使用和 GC 开销:

    1. DStream 的持久化:正如在 “数据序列化调优” 一节中提到的,输入数据和某些操作产生的中间 RDD,默认持久化时都会序列化为字节。与非序列化的方式相比,这会降低内存和 GC 开销。使用 Kyro 序列化机制可以进一步减少内存使用和 GC 开销。进一步降低内存使用率,可以对数据进行压缩,由 spark.rdd.compress 参数控制(默认 false)
    2. 清理旧数据:默认情况下,所有输入数据和通过 DStream transformation 操作生成的持久化 RDD,会自动被清理。Spark Streaming 会决定何时清理这些数据,取决于 transformation 操作类型。例如,在使用窗口长度为 10 分钟的 window 操作,Spark 会保持 10 分钟以内的数据,时间过了以后就会清理旧数据。但是在某些特定的场景下,比如 Spark SQL 和 Spark Streaming 整合使用时,在异步开启的线程中,使用 Spark SQL 针对 batch RDD 进行执行查询。那么就需要让 Spark 保存更长时间的数据,直到 Spark SQL 查询结束。可以使用 streamingContext.remember() 方法来实现。
    3. CMS 垃圾回收器:使用并行化的 mark-sweep 垃圾回收机制,被推荐使用,用来保持 GC 低开销。虽然并行的 GC 会降低吞吐量,但是还是建议使用它,来减少 batch 的处理时间(降低处理过程中的 gc 开销)。如果要使用,那么要在 driver 端和 executor 端都开启。在 spark-submit 中使用 --driver-java-options 设置;使用 spark.executor.extraJavaOptions 参数设置。-XX:+UseConcMarkSweppGC。
     
     
       
    (1 条消息) 数据倾斜解决方案之原理以及现象分析_Johnson8702 的博客 - CSDN 博客

    一、什么是数据倾斜

    数据倾斜是大数据类型项目中最棘手的性能问题。数据倾斜一般会有两种表现:

    1. 大部分 task 执行很快,只有极少数的 task 执行得特别慢,可能耗费数个小时。

    2. 大部分 task 很快执行完,但有的 task 会突然报 OOM 内存溢出,导致 task fail、task lost、resubmitting task,反复执行几次,特定的 task 还是跑不通,最后挂掉。可能是因为这个 task 需要处理大量的数据,创建大量的对象,内存放不下,直接爆掉。

    二、原因分析

    在执行 shuffle 操作的时候,比如说有 3 个 task,100 万数据。某一个 key 对应 98 万个 values,分配到同一个 task;另外两个 task,每个 task 分配到 1 万个 values,并且是对应不同的 key。

    这个时候,分别分配到 1 万个 values 的 task 很快执行完,但是分配到 98 万 values 的 task 执行起来特别慢,导致整个 spark 作业的运行时间特别长。

    三、定位原因与出现问题的位置

    出现数据倾斜,基本只可能是因为发生了 shuffle 操作,在 shuffle 的过程中,某个 key 对应的数据远远多于其他 key 对应的数据,导致数据倾斜。这种情况下,可以通过两种方式去定位:

    1. 到程序中去找,哪些地方用了会产生 shuffle 的算子,比如 groupByKey、countByKey、reduceByKey、join
    2. 看 log 文件,一般会报程序的哪一行代码导致了 OOM 异常;或者执行到了第几个 stage

    下面开始介绍几个数据倾斜的解决方案:

    四、聚合源数据和过滤导致数据倾斜的 key

    ①聚合源数据

    通常,在做一些聚合操作时,比如 groupByKey、reduceByKey、join,拿到的源数据都是每个 key 对应的 values,然后在 spark 作业中对数据进行一定的操作。

    如果这些数据来自 hive 表,可以在 hive 表中对数据进行 ETL,对数据进行一定的处理。由于 hive 比较适合做离线分析,可以做定时任务,定时对数据进行聚合(也可能是初步计算)如果按 key 来分组,将 key 对应的所有 values,用特定的方式拼接到一个字符串中去,比如 “key=sessionid,value:action_seq=1|user_id=1|search_keyword = 电子 | category_id=01;action_seq=2|user_id=1|search_keyword = 书籍 | category_id=01”。

    在 hive 表中对 key 进行聚合后,当需要对 key 进行操作时,spark 中通过 key 拿到的数据就是 hive 表聚合过的数据。可能 hive 表进行聚合前是 10 万条数据,聚合后就变成了 1 千条数据,解决了数据倾斜的问题。

    ②过滤导致数据倾斜的 key

    对于待处理数据,可能大多数 key 对应的数据都是几十、几百条,只有极少数 key 对应的数据是数十万条。

    这个时候,根据业务需求,如果允许的情况下,可以将那极少数的 key 对应的数据舍弃掉,避免数据倾斜。

    如果上面两个方法可以采纳,那么基本上可以从源头上解决绝大部分数据倾斜问题。实在不行,再尝试下面几个方法。

    五、提高 shuffle 操作 reduce 并行度

    提高 reduce 并行度,就是提高 task 的数量。

    举个例子,原本某个 task 分配数据特别多,直接 OOM(内存溢出),导致程序无法运行。根据 log,找到发生数据倾斜的 shuffle 操作,提高并行度(在算子后面传入一个参数,即可设置并行度,比如 reduceByKey(xxx,1000)),让数据尽量均匀地分配到所有的 task 中。每个 task 分配到的数据量变少,避免 OOM,缓解数据倾斜的问题。

    注意:

    这种方法并没有从根本上解决数据倾斜的问题,只是缓解了数据倾斜带来的影响。

    在理想状态下,如果这步操作之后,数据倾斜的问题得到解决,最好;如果对数据倾斜问题的影响很小,则不考虑这种方法,参考其他方法。

    六、使用随机 key 实现双重聚合

    在原始 key 前面加一个随机数(不同场景灵活应对),生成双重 key,然后进行第一轮聚合;第一轮聚合之后,再将双重 key 反向映射成原始 key,进行第二轮聚合。

    在 groupByKey、reduceByKey 等算子中,可以采取这个方法。

    如果前面的几种方法(聚合源数据和过滤导致数据倾斜的 key、提高 shuffle 操作 reduce 并行度)无法解决数据倾斜问题,则只能依靠这种方法

    七、将 reduce join 转换为 map join

    普通 join,需要经过 shuffle 操作,是 reduce join,先将所有相同 key 对应的 values 汇聚到一个 task 中,然后再进行 join。

    map join,把小的 RDD 做成广播变量,再进行 map 操作,得到聚合后的 RDD,最后再进行后续操作

    ①适用场景

    如果两个 RDD 要进行 join,其中一个 RDD 比较小(比如一个 RDD 是 100 万数据,另一个 RDD 是 1 万数据),可以采取这种方法。广播出去的 RDD,在每个 executor 的 block manager 上都会有一个数据备份,所以采取这种方法的时候,要确保内存足够存放小 RDD 的数据。

    这个方式下,不会发生 shuffle 操作,从根本上杜绝了数据倾斜的问题。

    ②不使用场景

    两个 RDD 都比较大,如果将一个 RDD 广播出去,每个 executor 上都会有一个数据备份,可能会导致 OOM(内存溢出)。

    ③总结

    对于 join 这种操作,即使没有数据倾斜,在条件允许的情况下,也优先使用 map join 方法,避免通过 shuffle 进行 join,牺牲少量的内存来换取更好的性能

    八、sample 采样倾斜 key 进行两次 join

    将发生数据倾斜的 key 单独去出来,放到一个 RDD 中,将原始 RDD 拆分成两个 RDD:导致倾斜的 key 的 RDD 和其他 RDD。用这两个 RDD 单独 join,此时 key 对应的数据会分散到多个 task 中进行 join 操作,避免数据倾斜造成的问题,然后将两次 join 的结果执行 union 操作合并起来。

    --> 可以通过自动化的方式进行~

    ①适用场景

    对于 join 操作,优先使用 map join 替代普通 join。如果不能采用 map join,并且某一个或几个 key 对应的数据量比较大,考虑使用 sample 采样。

    ②不适用场景

    如果一个 RDD 中,导致数据倾斜的 key 特别多,此时不建议使用 sample 采样的方法,建议考虑下面第九种方法(sample 采样倾斜 key 进行两次 join)。

    ③优化

    对于采样的 key,从另一个要 join 的表中也过滤出一份数据,比如就只有一条数据。对这一个数据,进行 flatMap 操作,打上 100 个随机数作为前缀,返回 100 条数据。

    单独拉出来的可能产生数据倾斜的 RDD,给每一个数据都带上一个 100 以内的随机数最为前缀。

    然后在进行 join,整个过程会被打散到不同的 task 中运行,性能会提高很多。最后,再将结果 union 合并。

    九、使用随机数以及扩容表进行 join

    当前面几个方法都没法使用时,可以尝试使用随机数以及扩容表进行 join。这个方法没有办法彻底解决数据倾斜,只是一种对数据倾斜的缓解。

    步骤:

    1. 选择一个 RDD,用 flatMap 进行扩容,将一条数据映射成多条数据,每一条映射出来的数据都到一个 n 以内的随机数,比如说 n 取 10。
    2. 将另一个 RDD,做普通的 map 映射,每条数据,前面带上一个 n 以内的随机数。
    3. 将两个处理后的 RDD 进行 join 操作,并将结果数据反映射,去掉前面的随机数。

    局限性:

    1. 两个 RDD 都很大,所以没有办法将某一个 RDD 扩得特别大,一般取 10。
    2. 扩容的倍数比较小,没法彻底解决数据倾斜的问题,只能缓解数据倾斜带来的问题。
     
    (1 条消息)Spark SQL 数据倾斜解决方案_大数据_Johnson8702 的博客 - CSDN 博客

    由于 Spark 都是基于 RDD 的特性,所以可以用纯 RDD 的方法,实现和 Spark SQL 一模一样的功能。

    之前在 Spark Core 中的数据倾斜的七种解决方案,全部都可以直接套用在 Spark SQL 上。

    Spark SQL 的数据倾斜解决方案:

    1. 聚合源数据:Spark Core 和 Spark SQL 没有任何区别
    2. 过滤导致倾斜的 key:在 sql 中用 where 条件
    3. 提高 shuffle 并行度:groupByKey(1000),spark.sql.shuffle.partitions(默认是 200)
    4. 双重 groupBy:改写 SQL,两次 groupBy
    5. reduce join 转换为 map join:spark.sql.autoBroadcastJoinThreshold(默认是 10485760);可以自己将表做成 RDD,自己手动去实现 map join;SparkSQL 内置的 map join,默认如果有一个 10M 以内的小表,会将该表进行 broadcast,然后执行 map join;调节这个阈值,比如调节到 20M、50M、甚至 1G。
    6. 采样倾斜 key 并单独进行 join:纯 Spark Core 的一种方式,sample、filter 等算子
    7. 随机 key 与扩容表:Spark SQL+Spark Core
     
  • 相关阅读:
    yii中设置提示成功信息,错误提示信息,警告信息
    关于京东评价数目,淘宝评价数目延迟加载
    关于yii的relations
    关于yii验证和yii错误处理
    jquery资源网站
    Facybox弹出层效果
    YII适合做后台的一个扩展
    Yii: 参数检查和错误的集中处理技巧
    Asp.net 备份、还原Ms SQLServer及压缩Access数据库
    天诺网
  • 原文地址:https://www.cnblogs.com/cx2016/p/13026470.html
Copyright © 2011-2022 走看看