广播变量的好处:
如果你的算子函数中,使用到了特别大的数据,那么,这个时候,推荐将该数据进行广播。这样的话,就不至于将一个大数据拷贝到每一个task上去。而是给每个节点进程executor拷贝一份,然后节点上的task共享该数据。这样的话,就可以减少大数据在节点上的内存消耗。并且可以减少数据到节点的网络传输消耗。使用 广播变量之前,复制map数据和task线程数量相等。
@Test def bc1(): Unit ={ //数据,假装量很大,有一百兆 val map = Map("spark" -> "http://spark.apache.cn", "scala" -> "http://www.scala-lang.org") val conf = new SparkConf().setMaster("local[6]").setAppName("BroadCast") val sc = new SparkContext(conf) val rdd1 = sc.parallelize(Seq("spark", "scala")) val rdd2= rdd1.map(item => map(item)).collect() rdd2.foreach(println(_)) sc.stop() }
使用 广播变量之后,复制map数据和executor进程数量相等。executor数量远小于task数量,减少数据在节点网络传输的消耗。
@Test def bc2(): Unit ={ val map = Map("spark" -> "http://spark.apache.cn", "scala" -> "http://www.scala-lang.org") val conf = new SparkConf().setMaster("local[6]").setAppName("BroadCast") val sc = new SparkContext(conf) //创建广播 val bc = sc.broadcast(map) val rdd1 = sc.parallelize(Seq("spark", "scala")) //在算子中使用广播变量代替直接引用集合,只会复制和executor一样的数量 //在使用广播变量之前,复制map 数是task数量 //在使用广播以后,复制map数和executor一致 val rdd2= rdd1.map(item => bc.value(item)).collect() rdd2.foreach(println(_)) sc.stop() }