zoukankan      html  css  js  c++  java
  • spark数据倾斜分析与解决方案

    Spark数据倾斜(数据分布不均匀)

    数据倾斜发生时的现象:

    1. 绝大多数task(任务)执行得都非常快,但个别task执行极慢。
    2. OOM(内存溢出),这种情况比较少见。

    数据倾斜发生的原理

    数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大,就会发生数据倾斜。比如,大部分key对应的数据是10条,有一个key对应的数据是100万条,那么大部分的task只分配了10条数据,可能1秒就完成了,但是那个100万条数据的task,可能还要经过一两个小时,整个Spark作业的运行进度是由运行时间最长的那个task决定的。木桶原理。

    因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。

    数据倾斜产生在那些地方

    首先要看的,就是数据倾斜发生在第几个stage中。

    Stage的划分是触发了shuffle操作,才会划分stage。

    触发shuffle操作的算子:

    distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition

    数据倾斜解决方法

    1.     使用spark通用的优化方案

    2.     分两种情况,

    一种聚合数据倾斜:

    1. 先对key前加n以内的随机前缀,然后计算,计算完成去掉随机前缀,再次合并结果。N一般来说取值在10左右

     

    一种是join类型的数据倾斜:

    a.     先对左表加随机前缀

    b.     对右表扩容n倍

    c.      执行join操作

    d.     去掉结果中的前缀

     

    实现代码如下

    一种聚合数据倾斜:

    //key前加随机数,聚合,再去掉随机前缀

      def testAcc(sc: SparkContext) = {

     

     

        sc.parallelize(List("hello", "hello", "hello", "hello", "world"))

          //sc.textFile("d:\test\ssc\bias.txt",20)

          .map(word => (word, 1))

     

          //传统做法,可能会出现数据倾斜

          .reduceByKey(_+_)

          //解决数据倾斜--加后缀+聚合+去后缀+聚合

          .map { case (key, value) => {

          val random = new Random();

          //将key加随机前缀

          (random.nextInt(3) + "_" + key, value)

        }

        }

          //聚合

          .reduceByKey(_ + _)

          //去随机前缀

          .map { case (k, v) => (k.substring(k.indexOf("_") + 1), v) }

          //聚合

          .reduceByKey(_ + _)

          .foreach(println)

        Thread.sleep(1000000)

      }

     

    一种是join类型的数据倾斜:

    二个rdd join操作 rddl.join(rdd2) 左表加前缀--右表扩展n倍

    def testJoin(sc: SparkContext): Unit = {

        val rddl=sc.parallelize(List((1,"hello"),(1,"hello"),(1,"hello"),(1,"hello"),(2,"world")))

        val rddr=sc.parallelize(List((1,"man"),(2,"woman")))

        //传统方式,可能会出现数据倾斜

        //rddl.join(rdd2).foreach(println)

        //左侧rdd加随机前缀(n以内),右侧rdd根据随机前缀扩容n倍

        val prefixRdd=rddl.map{case (k,v)=>{

          val random = new Random()

          (random.nextInt(3) + "_" + k, v)

        }}

     

        //右侧扩容

        val expandRdd=rddr.flatMap{

          case (k,v)=>{

            val num=List(0,1,2)

            num.map(i=>(i+"_"+k,v))

          }

        }

        //去掉前缀

        prefixRdd.join(expandRdd)

          .map{case (k,v)=>(k.split("_")(1),v)}

          .foreach(println)

     

      }

  • 相关阅读:
    numpy 基础 —— np.linalg
    图像旋转后显示不完全
    opencv ---getRotationMatrix2D函数
    PS1--cannot be loaded because the execution of scripts is disabled on this system
    打开jnlp Faild to validate certificate, the application will not be executed.
    BATCH(BAT批处理命令语法)
    oracle vm virtualbox 如何让虚拟机可以上网
    merge 实现
    Windows batch,echo到文件不成功,只打印出ECHO is on.
    python2.7.6 , setuptools pip install, 报错:UnicodeDecodeError:'ascii' codec can't decode byte
  • 原文地址:https://www.cnblogs.com/pigdata/p/10305564.html
Copyright © 2011-2022 走看看