zoukankan      html  css  js  c++  java
  • spark数据倾斜分析与解决方案

    Spark数据倾斜(数据分布不均匀)

    数据倾斜发生时的现象:

    1. 绝大多数task(任务)执行得都非常快,但个别task执行极慢。
    2. OOM(内存溢出),这种情况比较少见。

    数据倾斜发生的原理

    数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大,就会发生数据倾斜。比如,大部分key对应的数据是10条,有一个key对应的数据是100万条,那么大部分的task只分配了10条数据,可能1秒就完成了,但是那个100万条数据的task,可能还要经过一两个小时,整个Spark作业的运行进度是由运行时间最长的那个task决定的。木桶原理。

    因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。

    数据倾斜产生在那些地方

    首先要看的,就是数据倾斜发生在第几个stage中。

    Stage的划分是触发了shuffle操作,才会划分stage。

    触发shuffle操作的算子:

    distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition

    数据倾斜解决方法

    1.     使用spark通用的优化方案

    2.     分两种情况,

    一种聚合数据倾斜:

    1. 先对key前加n以内的随机前缀,然后计算,计算完成去掉随机前缀,再次合并结果。N一般来说取值在10左右

     

    一种是join类型的数据倾斜:

    a.     先对左表加随机前缀

    b.     对右表扩容n倍

    c.      执行join操作

    d.     去掉结果中的前缀

     

    实现代码如下

    一种聚合数据倾斜:

    //key前加随机数,聚合,再去掉随机前缀

      def testAcc(sc: SparkContext) = {

     

     

        sc.parallelize(List("hello", "hello", "hello", "hello", "world"))

          //sc.textFile("d:\test\ssc\bias.txt",20)

          .map(word => (word, 1))

     

          //传统做法,可能会出现数据倾斜

          .reduceByKey(_+_)

          //解决数据倾斜--加后缀+聚合+去后缀+聚合

          .map { case (key, value) => {

          val random = new Random();

          //将key加随机前缀

          (random.nextInt(3) + "_" + key, value)

        }

        }

          //聚合

          .reduceByKey(_ + _)

          //去随机前缀

          .map { case (k, v) => (k.substring(k.indexOf("_") + 1), v) }

          //聚合

          .reduceByKey(_ + _)

          .foreach(println)

        Thread.sleep(1000000)

      }

     

    一种是join类型的数据倾斜:

    二个rdd join操作 rddl.join(rdd2) 左表加前缀--右表扩展n倍

    def testJoin(sc: SparkContext): Unit = {

        val rddl=sc.parallelize(List((1,"hello"),(1,"hello"),(1,"hello"),(1,"hello"),(2,"world")))

        val rddr=sc.parallelize(List((1,"man"),(2,"woman")))

        //传统方式,可能会出现数据倾斜

        //rddl.join(rdd2).foreach(println)

        //左侧rdd加随机前缀(n以内),右侧rdd根据随机前缀扩容n倍

        val prefixRdd=rddl.map{case (k,v)=>{

          val random = new Random()

          (random.nextInt(3) + "_" + k, v)

        }}

     

        //右侧扩容

        val expandRdd=rddr.flatMap{

          case (k,v)=>{

            val num=List(0,1,2)

            num.map(i=>(i+"_"+k,v))

          }

        }

        //去掉前缀

        prefixRdd.join(expandRdd)

          .map{case (k,v)=>(k.split("_")(1),v)}

          .foreach(println)

     

      }

  • 相关阅读:
    C++ std::forward_list
    MyBatis基础入门《十三》批量新增数据
    MyBatis基础入门《十二》删除数据
    MyBatis基础入门《十 一》修改数据
    MyBatis基础入门《十》添加数据
    MyBatis基础入门《九》ResultMap自动匹配
    MyBatis基础入门《八》查询参数传入Map
    MyBatis基础入门《七》查询参数传入对象
    MyBatis基础入门《六》Like模糊查询
    MyBatis基础入门《五》核心配置文件
  • 原文地址:https://www.cnblogs.com/pigdata/p/10305564.html
Copyright © 2011-2022 走看看