zoukankan      html  css  js  c++  java
  • Spark(八)【利用广播小表实现join避免Shuffle】

    使用场景

    大表join小表 只能广播小表

    普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。

    注意:RDD是并不能进行广播的,只能将RDD内部的数据通过collect拉取到Driver内存然后再进行广播

    核心思路

    将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。

    代码演示

    正常join

        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MapJoin")
        val sc: SparkContext = new SparkContext(conf)
        val rdd1: RDD[(String, Int)] = sc.makeRDD(List("key1" -> 2, "key1" -> 10, "key2" -> 20, "key3" -> 30))
        val rdd2: RDD[(String, Int)] = sc.makeRDD(List("key1" -> 5, "key1" -> 20, "key2" -> 40, "key4" -> 30))
    	 //join
        rdd1.join(rdd2).collect().foreach(println)
    

    控制台

    (key1,(2,5))
    (key1,(2,20))
    (key1,(10,5))
    (key1,(10,20))
    (key2,(20,40))
    

    正常left join

    //left join
    rdd1.leftOuterJoin(rdd2).collect().foreach(println)
    
    (k1,(10,Some(-10)))
    (k1,(10,Some(-100)))
    (k2,(20,Some(-20)))
    (k1,(100,Some(-10)))
    (k1,(100,Some(-100)))
    (k3,(30,None))
    

    广播:join

        //广播rdd2
        val bd: Broadcast[Array[(String, Int)]] = sc.broadcast(rdd2.collect())
        val result = rdd1.flatMap {
          case (key1, value1) => {
            bd.value
              .filter(key1 == _._1)
              .map {
                case (key2, value2) =>
                  (key1, (value1, value2))
              }
          }
        }
        result.collect().foreach(println)
    

    广播:left join

        //广播rdd2
        val bd: Broadcast[Array[(String, Int)]] = sc.broadcast(rdd2.collect())
        val result: RDD[(String, (Int, Option[Int]))] = rdd1.flatMap {
          case (key1, value1) =>
            val arr = bd.value
            val keys = arr.map(_._1)
            if (keys.contains(key1)) {
              bd.value.filter(key1 == _._1).map {
                case (key2, value2) =>
                  (key1, (value1, Some(value2)))
              }
            } else {
              Array(key1 -> (value1, None))
            }
        }
        result.collect.foreach(println)
    

    不适用场景

    由于Spark的广播变量是在每个Executor中保存一个副本,如果两个RDD数据量都比较大,那么如果将一个数据量比较大的 RDD做成广播变量,那么很有可能会造成内存溢出

  • 相关阅读:
    sql语句游标的写法
    oracle的安装与plsql的环境配置
    oracle中创建表时添加注释
    jsp中Java代码中怎么获取jsp页面元素
    sql模糊查询
    jQuery循环给某个ID赋值
    Codeforces Round #671 (Div. 2)
    TYVJ1935 导弹防御塔
    Educational Codeforces Round 95 (Rated for Div. 2)
    Codeforces Round #670 (Div. 2)
  • 原文地址:https://www.cnblogs.com/wh984763176/p/13668240.html
Copyright © 2011-2022 走看看