https://www.bilibili.com/video/BV1E4411i771?p=23
23/62
- P101_Spark课程介绍
- P202_Spark是什么
- P303_Spark角色
- P404_Spark集群安装
- P505_Spark测试
- P606_Spark测试总结
- P707_历史服务配置
- P808_SparkHA安装
- P909_SparkYarn模式安装&提交任务解析
- P1010_WordCount过程解析
- P1111_IDEA中编写WordCount程序并打包测试
- P1212_IDEA中本地调试
- P1313_IDEA中远程调试
- P1414_RDD是什么
- P1515_本地文件系统Bug&RDD懒执行说明
- P1616_RDD的创建
- P1717_RDD转换(1-3)
- P1801_回顾
- P1902_转换操作(4-9)
- P2003_转换操作(10-13)
- P2103_转换操作(14-18)
- P2204_转换操作(19-22)
- P2406_转换操作(24)
- P2507_转换操作(25-27)
- P2608_行动操作
- P2709_任务划分
- P2810_练习分析
- P2911_缓存操作
- P3002_分区
- P3103_RDD的输入输出
- P3204_累加器
- P3305_广播变量
- P3406_SparkSQL概述
- P3507_SparkSQL牛刀小试
- P3608_RDD_DF_DS之间的相互转化
- P3709_用户自定义函数
- P3810_与hive集成
- P3902_SparkStreaming概述
- P4003_SparkStreaming测试
- P4104_自定义接收器
- P4205_有状态转换(一)
- P4306_有状态转换(二)
- P4407_kafka_Spark集成介绍
- P4508_KafkaStreaming(一)
- P4609_生产者池封装
- P4710_KafkaStreaming(二)
- P4801_Spark内核概述与部署模式解析_rec
- P4902_Spark通讯架构_rec
- P5003_Spark任务调度解析01_rec
- P5104_Spark任务调度解析02_rec
- P5205_Spark Shuffle_rec
- P5306_Spark内存架构解析01_rec
- P5407_Spark内存架构解析02_rec
- P5508_Spark核心组件解析_rec
- P5601_Spark性能调优之常规性能调优_rec
- P5702_Spark性能调优之算子调优_rec
- P5803_Spark性能调优之Shuffle性能调优_rec
- P5904_Spark性能调优之JVM调优_rec
- P6005_Spark性能调优之数据倾斜解决方案01_rec
- P6106_Spark性能调优之数据倾斜解决方案02_rec
- P6207_Spark性能调优之TroubleShooting_rec
join
cogroup
...
上面的 (a,b) 里面的 a 是(x,1)。。。
也就是下面的 V=>C 的 C
byKey
Idea 去掉提示信息:
改成 error
sc
enableHiveSupport()
df 编译期不做类型检查,运行期会报错——res16.collect() 时
DS2DF方便
DF2DS需要样例类
UDF
UDAF
无状态转化:每个批次处理都不依赖于先前批次的数据,如map() filter() reduceByKey()等均属于无状态的
有状态转化:依赖之前的批次数据或者中间结果来计算当前批次的数据,包括updateStatebyKey()和window()

程序写完了,开启服务端 nc
5秒一次,无状态
自定义接收器:
https://spark.apache.org/docs/latest/streaming-custom-receivers.html
|
|
isStopped()
有状态,必须保存 checkpoint
低级API,手动维护 offset —— 处理完数据后,再更新
zk
45/62
- P101_Spark课程介绍
- P202_Spark是什么
- P303_Spark角色
- P404_Spark集群安装
- P505_Spark测试
- P606_Spark测试总结
- P707_历史服务配置
- P808_SparkHA安装
- P909_SparkYarn模式安装&提交任务解析
- P1010_WordCount过程解析
- P1111_IDEA中编写WordCount程序并打包测试
- P1212_IDEA中本地调试
- P1313_IDEA中远程调试
- P1414_RDD是什么
- P1515_本地文件系统Bug&RDD懒执行说明
- P1616_RDD的创建
- P1717_RDD转换(1-3)
- P1801_回顾
- P1902_转换操作(4-9)
- P2003_转换操作(10-13)
- P2103_转换操作(14-18)
- P2204_转换操作(19-22)
- P2305_转换操作(23)
- P2406_转换操作(24)
- P2507_转换操作(25-27)
- P2608_行动操作
- P2709_任务划分
- P2810_练习分析
- P2911_缓存操作
- P3002_分区
- P3103_RDD的输入输出
- P3204_累加器
- P3305_广播变量
- P3406_SparkSQL概述
- P3507_SparkSQL牛刀小试
- P3608_RDD_DF_DS之间的相互转化
- P3709_用户自定义函数
- P3810_与hive集成
- P3902_SparkStreaming概述
- P4003_SparkStreaming测试
- P4104_自定义接收器
- P4205_有状态转换(一)
- P4306_有状态转换(二)
- P4407_kafka_Spark集成介绍
- P4609_生产者池封装
- P4710_KafkaStreaming(二)
- P4801_Spark内核概述与部署模式解析_rec
- P4902_Spark通讯架构_rec
- P5003_Spark任务调度解析01_rec
- P5104_Spark任务调度解析02_rec
- P5205_Spark Shuffle_rec
- P5306_Spark内存架构解析01_rec
- P5407_Spark内存架构解析02_rec
- P5508_Spark核心组件解析_rec
- P5601_Spark性能调优之常规性能调优_rec
- P5702_Spark性能调优之算子调优_rec
- P5803_Spark性能调优之Shuffle性能调优_rec
- P5904_Spark性能调优之JVM调优_rec
- P6005_Spark性能调优之数据倾斜解决方案01_rec
- P6106_Spark性能调优之数据倾斜解决方案02_rec
- P6207_Spark性能调优之TroubleShooting_rec
有需要得空详细看
Spark 性能调优由于大部分的 Spark 计算都是在内存中完成的,集群中的任何资源(CPU,网络带宽,或者内存)都可能成 为 Spark 应用程序的瓶颈。最常见的情况是,数据能装进内存,而瓶颈是网络带宽;当然,有时候我们也需 要做一些优化调整来减少内存占用,例如将RDD以序列化格式保存(storing RDDs in serialized form)。 本文将主要涵盖两个主题:1.数据序列化(这对于优化网络性能极为重要);2.减少内存占用以及内存调优。 同时,我们也会提及其他几个比较小的主题。 数据序列化序列化在任何一种分布式应用性能优化时都扮演几位重要的角色。如果序列化格式序列化过程缓慢,或者需要占用字节很多,都会大大拖慢整体的计算效率。通常,序列化都是Spark应用优化时首先需要关注的地方。Spark着眼于要达到便利性(允许你在计算过程中使用任何Java类型)和性能的一个平衡。Spark主要提供了两个序列化库: * Java serialization: 默认情况,Spark使用Java自带的ObjectOutputStream 框架来序列化对象,这样任何实现了 java.io.Serializable 接口的对象,都能被序列化。同时,你还可以通过扩展 java.io.Externalizable 来控制序列化性能。Java序列化很灵活但性能较差,同时序列化后占用的字节数也较多。 * Kryo serialization: Spark还可以使用Kryo 库(版本2)提供更高效的序列化格式。Kryo的序列化速度和字节占用都比Java序列化好很多(通常是10倍左右),但Kryo不支持所有实现了Serializable 接口的类型,它需要你在程序中 register 需要序列化的类型,以得到最佳性能。 要切换到使用 Kryo,你可以在 SparkConf 初始化的时候调用 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)。这个设置不仅控制各个worker节点之间的混洗数据序列化格式,同时还控制RDD存到磁盘上的序列化格式。目前,Kryo不是默认的序列化格式,因为它需要你在使用前注册需要序列化的类型,不过我们还是建议在对网络敏感的应用场景下使用Kryo。 Spark对一些常用的Scala核心类型(包括在Twitter chill 库的AllScalaRegistrar中)自动使用Kryo序列化格式。 如果你的自定义类型需要使用Kryo序列化,可以用 registerKryoClasses 方法先注册: val conf = new SparkConf().setMaster(…).setAppName(…) conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf) Kryo的文档(Kryo documentation )中有详细描述了更多的高级选项,如:自定义序列化代码等。 如果你的对象很大,你可能需要增大 spark.kryoserializer.buffer 配置项(config)。其值至少需要大于最大对象的序列化长度。 最后,如果你不注册需要序列化的自定义类型,Kryo也能工作,不过每一个对象实例的序列化结果都会包含一份完整的类名,这有点浪费空间。 内存调优调整内存使用有三个注意事项:对象使用的内存量(您可能希望整个数据集适合内存),访问这些对象的成本,以及垃圾收集的开销(如果您有更高的营业额对象的条款)。 默认情况下,Java对象的访问速度很快,但是与其“字段”中的“原始”数据相比,可以轻松地占用多达2-5倍的空间。这是由于几个原因: 每个不同的Java对象都有一个“对象头”,大约有16个字节,并包含诸如指向其类的指针等信息。对于数据非常少的对象(比如一个Int字段),这可能比数据大。 Java字符串在原始字符串数据上有大约40字节的开销(因为它们将字符串数据存储在一个字符数组中,并保留额外的数据,如长度),并将每个字符存储为两个字节,这是由于字符串内部使用了UTF-16编码。因此一个10个字符的字符串可以很容易地消耗60个字节 常见的集合类(如HashMap和LinkedList)使用链接的数据结构,每个条目都有一个“包装器”对象(例如Map.Entry)。这个对象不仅有一个头,而且还有指向列表中下一个对象的指针(通常是8个字节)。 基本类型的集合通常将它们存储为“装箱”对象,如java.lang.Integer。 本节将首先概述Spark的内存管理,然后讨论用户可以在他/她的应用程序中更有效地使用内存的具体策略。具体来说,我们将介绍如何确定对象的内存使用情况,以及如何改进它,或者通过更改数据结构,或者以序列化格式存储数据。接下来我们将介绍Spark的缓存大小和Java垃圾收集器。 内存管理概述Spark中的内存使用大部分属于两类:执行和存储。执行内存是指在混洗,连接,排序和聚合中用于计算的内存,而存储内存指的是用于跨群集缓存和传播内部数据的内存。在Spark中,执行和存储共享一个统一的区域(M)。当不使用执行内存时,存储器可以获取所有可用内存,反之亦然。如有必要,执行可以驱逐存储器,但是只有在总存储器内存使用量低于特定阈值(R)时才执行。换句话说,R描述了M内的一个分区,缓存块不会被驱逐。由于执行的复杂性,存储可能不会执行。 这种设计确保了几个理想的性能首先,不使用缓存的应用程序可以使用整个空间执行,避免不必要的磁盘溢出。其次,使用高速缓存的应用程序可以保留最小的存储空间(R),使数据块不受驱逐。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而不需要用户如何在内部划分内存的专业知识。 虽然有两种相关配置,但典型用户不需要调整它们,因为默认值适用于大多数工作负载: spark.memory.fraction将M的大小表示为(JVM堆空间 - 300MB)的一部分(默认值为0.6)。其余空间(40%)保留给用户数据结构,Spark中的内部元数据,并在稀疏和异常大的记录的情况下防止OOM错误。 spark.memory.storageFraction将R的大小表示为M的一部分(默认为0.5)。 R是M中的存储空间,缓存的块不会被执行驱逐。 应该设置spark.memory.fraction的值,以便在JVM的旧时代或“终身”时代中舒适地适应这种堆空间。有关详细信息,请参阅下面对高级GC调整的讨论。 确定内存消耗调整数据集所需的内存消耗量的最佳方法是创建RDD,将其放入缓存,然后查看Web UI中的“存储”页面。 页面会告诉你RDD占用了多少内存。 要估计特定对象的内存消耗,请使用SizeEstimator的估计方法。这对于尝试使用不同数据布局来调整内存使用情况以及确定每个执行程序堆中广播变量占用的空间量非常有用。 调整数据结构减少内存消耗的第一种方法是避免增加开销的Java功能,例如基于指针的数据结构和包装对象。 做这件事有很多种方法: 1、设计你的数据结构来优先选择对象数组和基本类型,而不是标准的Java或Scala集合类(例如HashMap)。 fastutil库为与Java标准库兼容的基本类型提供了方便的集合类。 2、尽可能避免使用大量小对象和指针的嵌套结构。 3、考虑使用数字ID或枚举对象而不是键的字符串。 4、如果RAM少于32 GB,请设置JVM标志-XX:+ UseCompressedOops使指针为4个字节而不是8个。 您可以在spark-env.sh中添加这些选项。 序列化的 RDD 存储如果对象仍然太大,无法进行高效存储,但是使用RDD持久性API中的序列化存储级别(例如MEMORY_ONLY_SER)来减少内存使用的一种更简单的方法是以序列化的形式存储它们。 Spark然后将每个RDD分区存储为一个大字节数组。 以序列化形式存储数据的唯一缺点是访问速度较慢,这是由于必须快速反序列化每个对象。 如果你想以序列化的形式缓存数据,我们强烈推荐使用Kryo,因为它比Java序列化(当然还有原始的Java对象)要小得多。 垃圾回收调优当您的程序存储RDD时,JVM垃圾回收会成为问题。 (在只读RDD的程序中,通常不会出现问题,然后在其上运行很多操作。)当Java需要驱逐旧对象以腾出空间给新对象时,它需要跟踪所有的Java对象并找到未使用的。这里要记住的要点是垃圾收集的成本与Java对象的数量成正比,所以使用较少对象的数据结构(例如Ints数组而不是LinkedList)大大降低了成本。更好的方法是以序列化的形式保存对象,如上所述:现在每个RDD分区只有一个对象(一个字节数组)。在尝试其他技术之前,如果GC是一个问题,首先要尝试使用序列化缓存。 由于任务的工作内存(运行任务所需的空间量)和缓存在节点上的RDD之间的干扰,GC也可能成为问题。我们将讨论如何控制分配给RDD缓存的空间来缓解这个问题。 测量GC的影响 GC调优的第一步是收集垃圾收集发生的频率和GC花费的时间。这可以通过在Java选项中添加-verbose:gc -XX:+ PrintGCDetails -XX:+ PrintGCTimeStam来完成。 (有关将Java选项传递给Spark作业的信息,请参阅配置指南。)下次运行Spark作业时,每次发生垃圾回收时都会在工作人员的日志中看到消息。请注意,这些日志将位于群集的工作节点上(在其工作目录中的stdout文件中),而不是在驱动程序上。 高级GC调整 为了进一步调整垃圾收集,我们首先需要了解JVM中有关内存管理的一些基本信息: 爪哇堆空间分为两个地区的年轻人和老人。年轻一代是为了保存短寿命的物体,而老一代则是为了寿命更长的物体。 年轻一代进一步分为三个地区[伊甸园,幸存者1,幸存者2]。 垃圾收集过程的简单描述:当Eden已满时,在Eden上运行一个小型GC,并将从Eden和Survivor1中存活的对象复制到Survivor2。幸存者地区交换。如果一个对象足够旧或者Survivor2已满,则将其移至Old。最后,当Old接近满时,调用完整的GC。 在Spark中进行GC调优的目标是确保只有长寿命的RDD才被存储在旧一代中,并且Young生成的大小足以存储短期对象。这将有助于避免完整的GC收集任务执行期间创建的临时对象。一些可能有用的步骤是: 通过收集GC统计信息来检查是否有太多的垃圾回收。如果在任务完成之前多次调用完整的GC,则意味着没有足够的内存可用于执行任务。 如果有太多次要收集,但没有太多主要地理信息,那么为伊甸园分配更多的内存将会有所帮助。您可以将Eden的大小设置为高估每个任务需要多少内存。如果Eden的大小确定为E,则可以使用选项-Xmn = 4/3 * E来设置Young代的大小。 (增加4/3也是为了解释幸存者地区所使用的空间)。 在打印的GC统计信息中,如果OldGen接近满,则通过降低spark.memory.fraction来减少用于缓存的内存量;缓存更少的对象比减慢任务执行更好。或者,考虑减少年轻一代的规模。这意味着如果你按照上面的方式设置,则降低-Xmn。如果不是,请尝试更改JVM的NewRatio参数的值。许多JVM默认这个为2,这意味着老一代占2/3的堆。它应该足够大,使得这个分数超过spark.memory.fraction。 使用-XX:+ UseG1GC试用G1GC垃圾回收器。在某些垃圾收集是瓶颈的情况下,它可以提高性能。请注意,对于较大的执行程序堆大小,使用-XX:G1HeapRegionSize增加G1区大小可能很重要 例如,如果您的任务正在从HDFS中读取数据,则可以使用从HDFS读取的数据块的大小来估计该任务使用的内存量。请注意,解压缩块的大小通常是块大小的2到3倍。所以如果我们希望有3或4个任务的工作空间,HDFS块大小为128 MB,我们可以估计Eden的大小为4 * 3 * 128MB。 监视垃圾收集所花费的时间和频率如何随新设置发生变化。 我们的经验表明,GC调整的效果取决于您的应用程序和可用的内存量。在线描述的调谐选项还有很多,但在较高的层次上,管理全面GC发生的频率有助于减少开销。 GC调整标志 其它考虑事项并行度除非您将每个操作的并行度设置得足够高,否则群集不会被充分利用。 Spark会根据自己的大小(尽管可以通过可选参数控制SparkContext.textFile等)自动设置每个文件上运行的“map”任务的数量,而对于分布式的“reduce”操作,比如groupByKey和reduceByKey, 它使用最大的父RDD的分区数量。 您可以将并行级别作为第二个参数(请参阅spark.PairRDDFunctions文档),或者将config属性设置为spark.default.parallelism以更改默认值。 一般来说,我们建议您的群集中每个CPU核心有2-3个任务。 Reduce 任务的内存使用有时,你会得到一个OutOfMemoryError,不是因为你的RDD不适合内存,而是因为你的一个任务的工作集,比如groupByKey中的一个reduce任务,太大了。 Spark的shuffle操作(sortByKey,groupByKey,reduceByKey,join等)在每个任务中构建一个哈希表来执行分组,这通常会很大。 这里最简单的解决方法是增加并行度,使每个任务的输入集合更小。 Spark能够有效地支持短至200毫秒的任务,因为它可以在一个任务中重复使用一个执行器JVM,并且任务启动成本较低,因此可以安全地将并行级别提高到超过集群内核的数量。 广播超大变量使用SparkContext中可用的广播功能可以大大减少每个序列化任务的大小,以及通过集群启动作业的成本。 如果您的任务使用其中的驱动程序的任何大对象(例如静态查找表),请考虑将其转换为广播变量。 Spark打印每个任务的序列化大小,所以你可以看看,以确定你的任务是否太大; 一般来说大于20KB的任务可能是值得优化的。 数据本地化数据局部性可能会对Spark作业的性能产生重大影响。 如果数据和在其上运行的代码在一起,那么计算就会很快。 但是,如果代码和数据是分开的,就必须转移到另一个。 通常情况下,由于代码大小比数据小得多,所以将数据从一个地方传输到另一个地方比传输数据更快。 Spark围绕这个数据局部性的一般原则构建调度。 数据局部性是数据与代码的接近程度。 根据数据的当前位置,有几个级别的地点。 从最近到最远的顺序: PROCESS_LOCAL 数据与运行代码位于同一个JVM中。 这是最好的地方可能 NODE_LOCAL 数据在同一个节点上。 例子可能在同一个节点上的HDFS中,或者在同一个节点上的另一个执行器上。 这比PROCESS_LOCAL稍慢,因为数据必须在进程之间传输 NO_PREF 数据可以从任何地方以相同的速度访问,并且没有本地偏好 RACK_LOCAL 数据位于同一台服务器上。 数据位于同一机架上的不同服务器上,因此需要通过网络进行发送,通常通过一台交换机进行发送 ANY 数据都在网络上的其他地方,而不在同一个机架上 Spark更喜欢在最好的地点级别安排所有任务,但这并不总是可能的。 在任何空闲的执行器上没有未处理的数据的情况下,Spark会切换到较低的地点级别。 有两种选择:a)等待一个繁忙的CPU释放,以便在同一台服务器上的数据上启动一个任务;或者b)立即在较远的地方开始一个需要移动数据的新任务。 Spark通常所做的就是等待繁忙的CPU释放的希望。 一旦超时,它就开始将数据从远处移动到空闲的CPU。 每个级别之间回退的等待超时可以单独配置,也可以全部配置在一个参数中; 有关详细信息,请参阅配置页面上的spark.locality参数。 如果你的任务很长,看到地方不好,你应该增加这些设置,但是默认情况下通常效果不错。 小结这是一个简短的指南,指出调整Spark应用程序时应该了解的主要问题 - 最重要的是数据序列化和内存调整。 对于大多数程序来说,切换到Kryo序列化和以序列化形式保存数据将解决最常见的性能问题。 请随时在Spark邮件列表上询问其他调整最佳实践。 |
|
Spark性能调优九之常用算子调优2018.11.21 04:28:53字数 1,093阅读 929
前面介绍了很多关于Spark性能的调优手段,今天来介绍一下Spark性能调优的最后一个点,就是关于Spark中常用算子的调优。废话不多说,直接进入正文; 1.使用mapPartitions算子提高性能 mapPartition的优点:使用普通的map操作,假设一个partition中有1万条数据,那么function就要被执行1万次,但是使用mapPartitions操作之后,function仅仅会被执行一次,显然性能得到了很大的提升,这个就没必要在多废话了。 mapPartition的缺点:使用普通的map操作,调用一次function执行一条数据,不会出现内存不够使用的情况;但是使用mapPartitions操作,很显然,如果数据量太过于大的时候,由于内存有限导致发生OOM,内存溢出。 总结:通过以上以上优缺点的对比,我们可以得出一个结论;就是在数据量不是很大的情况下使用mapPartition操作,性能可以得到一定的提升,在使用mapPartition前,我们需要预先估计一下每个partition的量和每个executor可以被分配到的内存资源。然后尝试去运行程序,如果程序没有问题就大可放心的使用即可,下图是一个实际的应用例子,仅供参考。 ![]() mapPartitions优化
2.filter操作之后使用coalesce算子提高性能 先看看默认情况下,执行完filter操作以后的各个partition的情况,如下图所示; ![]() 默认的执行流程图
问题:从上面的图中可以很明显的看出,经过一次filter操作以后,每个partition的数据量不同程度的变少了,这里就出现了一个问题;由于每个partition的数据量不一样,出现了数据倾斜的问题。比如上图中执行filter之后的第一个partition的数据量还有9000条。 解决方案:针对上述出现的问题,我们可以将filter操作之后的数据进行压缩处理;一方面减少partition的数量,从而减少task的数量;另一方面通过压缩处理之后,尽量让每个partition的数据量差不多,减少数据倾斜情况的出现,从而避免某个task运行速度特别慢。coalesce算子就是针对上述出现的问题的一个解决方案,下图是一个解决案例。 ![]() 应用实例图
3.使用foreachPartition算子进行 默认的foreach对于每一条数据,都要单独调用一次function并创建一个数据库连接,如果数据量很大,对于spark作业是非常消耗性能的。 而对于foreachPartition来说,对于function函数,只调用一次,只获取一个数据库连接,一次将数据全部写入数据库。但是数据量很大的话,可能会引发OOM的问题。不过在生产环境中一般都是使用foreachPartition(好像说了半天废话)。 4.使用repartition解决SparkSQL低并行度的问题 在spark项目中,如果在某些地方使用了SparkSQL,那么使用了SparkSQL的那个stage的并行度就没有办法通过手动设置了,而是由程序自己决定。那么,我们通过什么样的手段来提高这些stage的并行度呢?其实解决这个问题的办法就是使partition的数量增多,从而间接的提高了task的并发度,要提高partition的数量,该怎么做呢?就是使用repartition算子,对SparkSQL查询出来的数据重新进行分区操作,此时可以增加分区的个数。具体使用如下图所示: ![]() 总结:关于RDD算子的优化,就先讲到这里。关于整个Spark调优,基本先告一段落,后面会介绍一些Spark源码分析的知识,欢迎关注。 如需转载,请注明: |
|
|
一、MapPartitions提升Map类型操作性能 ①MapPartitions的优点 ②MapPartitions的缺点 ③适用场景 二、filter之后使用coalesce减少分区数量 RDD.filter(XXXXXXXXXXXXXXX).coalesce(100); 三、repartition解决Spark SQL低并行度的性能问题 ②解决方案 ③代码示例 四、foreachPartition优化写数据库性能 对于每条数据,task都要单独执行一次function ②使用foreachPartition的好处 ③foreachPartition的缺陷 所以,foreachPartition的适用环境还需要在现实的生产环境中慢慢调试。 五、reduceByKey本地聚合 好处在于: 在本地进行聚合以后,在map端的数据量就会变少,减少磁盘IO,减少磁盘空间的占用。 |
|
(1 条消息) 数据倾斜解决方案之原理以及现象分析_Johnson8702 的博客 - CSDN 博客
一、什么是数据倾斜数据倾斜是大数据类型项目中最棘手的性能问题。数据倾斜一般会有两种表现:
二、原因分析在执行 shuffle 操作的时候,比如说有 3 个 task,100 万数据。某一个 key 对应 98 万个 values,分配到同一个 task;另外两个 task,每个 task 分配到 1 万个 values,并且是对应不同的 key。 这个时候,分别分配到 1 万个 values 的 task 很快执行完,但是分配到 98 万 values 的 task 执行起来特别慢,导致整个 spark 作业的运行时间特别长。 三、定位原因与出现问题的位置出现数据倾斜,基本只可能是因为发生了 shuffle 操作,在 shuffle 的过程中,某个 key 对应的数据远远多于其他 key 对应的数据,导致数据倾斜。这种情况下,可以通过两种方式去定位:
下面开始介绍几个数据倾斜的解决方案: 四、聚合源数据和过滤导致数据倾斜的 key①聚合源数据通常,在做一些聚合操作时,比如 groupByKey、reduceByKey、join,拿到的源数据都是每个 key 对应的 values,然后在 spark 作业中对数据进行一定的操作。 如果这些数据来自 hive 表,可以在 hive 表中对数据进行 ETL,对数据进行一定的处理。由于 hive 比较适合做离线分析,可以做定时任务,定时对数据进行聚合(也可能是初步计算)。如果按 key 来分组,将 key 对应的所有 values,用特定的方式拼接到一个字符串中去,比如 “key=sessionid,value:action_seq=1|user_id=1|search_keyword = 电子 | category_id=01;action_seq=2|user_id=1|search_keyword = 书籍 | category_id=01”。 在 hive 表中对 key 进行聚合后,当需要对 key 进行操作时,spark 中通过 key 拿到的数据就是 hive 表聚合过的数据。可能 hive 表进行聚合前是 10 万条数据,聚合后就变成了 1 千条数据,解决了数据倾斜的问题。 ②过滤导致数据倾斜的 key对于待处理数据,可能大多数 key 对应的数据都是几十、几百条,只有极少数 key 对应的数据是数十万条。 这个时候,根据业务需求,如果允许的情况下,可以将那极少数的 key 对应的数据舍弃掉,避免数据倾斜。 如果上面两个方法可以采纳,那么基本上可以从源头上解决绝大部分数据倾斜问题。实在不行,再尝试下面几个方法。 五、提高 shuffle 操作 reduce 并行度提高 reduce 并行度,就是提高 task 的数量。 举个例子,原本某个 task 分配数据特别多,直接 OOM(内存溢出),导致程序无法运行。根据 log,找到发生数据倾斜的 shuffle 操作,提高并行度(在算子后面传入一个参数,即可设置并行度,比如 reduceByKey(xxx,1000)),让数据尽量均匀地分配到所有的 task 中。每个 task 分配到的数据量变少,避免 OOM,缓解数据倾斜的问题。 注意: 这种方法并没有从根本上解决数据倾斜的问题,只是缓解了数据倾斜带来的影响。 在理想状态下,如果这步操作之后,数据倾斜的问题得到解决,最好;如果对数据倾斜问题的影响很小,则不考虑这种方法,参考其他方法。 六、使用随机 key 实现双重聚合![]() 在原始 key 前面加一个随机数(不同场景灵活应对),生成双重 key,然后进行第一轮聚合;第一轮聚合之后,再将双重 key 反向映射成原始 key,进行第二轮聚合。 在 groupByKey、reduceByKey 等算子中,可以采取这个方法。 如果前面的几种方法(聚合源数据和过滤导致数据倾斜的 key、提高 shuffle 操作 reduce 并行度)无法解决数据倾斜问题,则只能依靠这种方法。 七、将 reduce join 转换为 map join![]() 普通 join,需要经过 shuffle 操作,是 reduce join,先将所有相同 key 对应的 values 汇聚到一个 task 中,然后再进行 join。 ![]() map join,把小的 RDD 做成广播变量,再进行 map 操作,得到聚合后的 RDD,最后再进行后续操作。 ①适用场景如果两个 RDD 要进行 join,其中一个 RDD 比较小(比如一个 RDD 是 100 万数据,另一个 RDD 是 1 万数据),可以采取这种方法。广播出去的 RDD,在每个 executor 的 block manager 上都会有一个数据备份,所以采取这种方法的时候,要确保内存足够存放小 RDD 的数据。 这个方式下,不会发生 shuffle 操作,从根本上杜绝了数据倾斜的问题。 ②不使用场景两个 RDD 都比较大,如果将一个 RDD 广播出去,每个 executor 上都会有一个数据备份,可能会导致 OOM(内存溢出)。 ③总结对于 join 这种操作,即使没有数据倾斜,在条件允许的情况下,也优先使用 map join 方法,避免通过 shuffle 进行 join,牺牲少量的内存来换取更好的性能。 八、sample 采样倾斜 key 进行两次 join![]() 将发生数据倾斜的 key 单独去出来,放到一个 RDD 中,将原始 RDD 拆分成两个 RDD:导致倾斜的 key 的 RDD 和其他 RDD。用这两个 RDD 单独 join,此时 key 对应的数据会分散到多个 task 中进行 join 操作,避免数据倾斜造成的问题,然后将两次 join 的结果执行 union 操作合并起来。 --> 可以通过自动化的方式进行~ ①适用场景对于 join 操作,优先使用 map join 替代普通 join。如果不能采用 map join,并且某一个或几个 key 对应的数据量比较大,考虑使用 sample 采样。 ②不适用场景如果一个 RDD 中,导致数据倾斜的 key 特别多,此时不建议使用 sample 采样的方法,建议考虑下面第九种方法(sample 采样倾斜 key 进行两次 join)。 ③优化对于采样的 key,从另一个要 join 的表中也过滤出一份数据,比如就只有一条数据。对这一个数据,进行 flatMap 操作,打上 100 个随机数作为前缀,返回 100 条数据。 单独拉出来的可能产生数据倾斜的 RDD,给每一个数据都带上一个 100 以内的随机数最为前缀。 然后在进行 join,整个过程会被打散到不同的 task 中运行,性能会提高很多。最后,再将结果 union 合并。 九、使用随机数以及扩容表进行 join当前面几个方法都没法使用时,可以尝试使用随机数以及扩容表进行 join。这个方法没有办法彻底解决数据倾斜,只是一种对数据倾斜的缓解。 ![]() 步骤:
局限性:
|
|
(1 条消息)Spark SQL 数据倾斜解决方案_大数据_Johnson8702 的博客 - CSDN 博客
由于 Spark 都是基于 RDD 的特性,所以可以用纯 RDD 的方法,实现和 Spark SQL 一模一样的功能。 之前在 Spark Core 中的数据倾斜的七种解决方案,全部都可以直接套用在 Spark SQL 上。 Spark SQL 的数据倾斜解决方案:
|