【数据倾斜出现的原因】
并行计算中,我们总希望分配的每一个任务(task)都能以相似的粒度来切分,且完成时间相差不大。但是由于集群中的硬件和应用的类型不同、切分的数据大小不一,总会导致部分任务极大地拖慢了整个任务的完成时间,数据倾斜原因如下:
- 业务数据本身的特性
- Key分布不均匀
- 建表时考虑不周
- 某些SQL语句本身就有数据倾斜
数据倾斜的表现:任务进度长时间维持,查看任务监控页面,由于其处理的数据量与其他任务差异过大,会发现只有少量(1个或几个)任务未完成。
【数据倾斜的解决方案】
数据倾斜有很多解决方案,本例简要介绍一种实现方式。假设表A和表B连接,表A数据倾斜,只有一个Key倾斜。首先对A进行采样,统计出最倾斜的Key。将A表分隔为A1只有倾斜Key,A2不包含倾斜Key,然后分别与B连接。
【数据实例】
------ ------
我们要实现上面两个表的连接,很容易发现在table1中(1,tom)出现的次数明显比其他的键值对要多,是倾斜数据。通过处理我们要把它拆分成两部分,如上图所示。然后这两部分分别与table2做连接操作,最后把结果汇总到一起。
【SPARK 代码】
1 package spark 2 3 import org.apache.spark.{SparkContext, SparkConf} 4 5 /** 6 * Created by Liu Jinhong on 2016/5/27. 7 */ 8 object TiltJoin { 9 def main(args: Array[String]) { 10 val conf = new SparkConf().setAppName("TiltJoin").setMaster("local") 11 val sc = new SparkContext(conf) 12 val line1 = sc.textFile("E:\testdoc\a.txt") 13 val line2 = sc.textFile("E:\testdoc\b.txt") 14 15 val table1 = line1.map(_.split(' ')).map(x => (x(0), x(1))) 16 val table2 = line2.map(_.split(' ')).map(x => (x(0), x(1))) 17 //对table1进行采样 18 val sample = table1.sample(false, 0.3, 9).map(x => (x._1, 1)).reduceByKey(_+_) 19 //找到table1中的倾斜数据 20 val maxrowKey = sample.map(x => (x._2, x._1)).sortByKey(false).take(1).toSeq(0)._2 21 //把table1拆分成两个表 22 val maxrowTable = table1.filter(_._1 == maxrowKey) 23 val maintable = table1.filter(_._1 != maxrowKey) 24 25 val result = sc.union(maxrowTable.join(table2), maintable.join(table2)).foreach(println(_)) 26 } 27 }
【涉及到的函数】
val maxrowKey = sample.map(x => (x._2, x._1)).sortByKey(false).take(1).toSeq(0)._2
上诉代码相当于实现了按照value降序排序。