zoukankan      html  css  js  c++  java
  • Spark-Join优化之Broadcast

    适用场景

    • 进行join中至少有一个RDD的数据量比较少(比如几百M,或者1-2G)
    • 因为,每个Executor的内存中,都会驻留一份广播变量的全量数据

    Broadcast与map进行join代码示例

    创建RDD

    val list1 = List((jame,23), (wade,3), (kobe,24))
    val list2 = List((jame,cave), (wade,bulls), (kobe,lakers))
    val rdd1 = sc.makeRDD(list1)
    val rdd2 = sc.makeRDD(list2)

    传统的join

    // 传统的join操作会导致shuffle操作。
    // 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
    val rdd3 = rdd1.join(rdd2)
    // 结果如下
    scala> rdd1.join(rdd2).collect
    res27: Array[(String, (Int, String))] = Array((kobe,(24,lakers)), (wade,(3,bulls)), (jame,(23,cave)))

    使用Broadcast+map的join操作

    // Broadcast+map的join操作,不会导致shuffle操作。
    // 使用Broadcast将一个数据量较小的RDD作为广播变量
    val rdd2Data = rdd2.collect()
    val rdd2Bc = sc.broadcast(rdd2Data)
    
    // 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
    // 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
    def function(tuple: (String,Int)): (String,(Int,String)) ={
        for(value <- rdd2Bc.value){
         if(value._1.equals(tuple._1))
            return (tuple._1,(tuple._2,value._2.toString))
             }
             (tuple._1,(tuple._2,null))
             }
    
    // 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
    // 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
    // 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。
    val rdd3 = rdd1.map(function(_))
    
    //结果如下,达到了与传统join相同的效果
    scala> rdd1.map(function(_)).collect
    res31: Array[(String, (Int, String))] = Array((jame,(23,cave)), (wade,(3,bulls)), (kobe,(24,lakers)))
  • 相关阅读:
    墙奸有感
    关于ubuntu里的fcitx
    Ubuntu 9.10 ati HD 3470 显卡驱动 搞定
    XP与Ubuntu双系统的问题
    invalid conversion from ‘__pthread_t*’ to ‘pid_t’
    Julian Day
    m的n次幂的求法
    Sublime Text 2
    在虚拟机Virtualbox安装Win8消费者版
    记一个循环的错误
  • 原文地址:https://www.cnblogs.com/0xcafedaddy/p/7613200.html
Copyright © 2011-2022 走看看