zoukankan      html  css  js  c++  java
  • 理解Spark里的闭包

    闭包的概念如下图:

    640?wx_fmt=png

    在spark应用里,变量及函数的作用范围和声明周期在spark的集群运行模式下是比较难理解的,尤其是对初学者来说。RDD的操作,要修改其作用范围的变量,经常会出点叉子。下面,可以举个用foreach,修改一个计数器的例子。

    例子

    求和RDD元素的例子,该例子会根据该段代码是否执行在同一个jvm里面有不同的输出结果,比如local模式,运行于同一个jvm,输出是15;cluster模式运行于不同jvm输出是0。

    val data=Array(1, 2, 3, 4, 5)

    var counter = 0

    var rdd = sc.parallelize(data)

    // Wrong: Don't do this!!

    rdd.foreach(x => counter += x)

    println("Counter value: " + counter)

    本地或集群模式

    上述代码的行为是未定义的,并且不同模式下运行情况不同。为了执行作业,Spark将RDD操作的处理分解为tasks,每个task由Executor执行。在执行之前,Spark会计算task的闭包。闭包是Executor在RDD上进行计算的时候必须可见的那些变量和方法(在这种情况下是foreach())。闭包会被序列化并发送给每个Executor。

    发送给每个Executor的闭包中的变量是副本,因此,当foreach函数内引用计数器时,它不再是driver节点上的计数器。driver节点的内存中仍有一个计数器,但该变量是Executor不可见的!执行者只能看到序列化闭包的副本。因此,计数器的最终值仍然为零,因为计数器上的所有操作都引用了序列化闭包内的值。

    在本地模式下,在某些情况下,该foreach函数实际上将在与driver相同的JVM内执行,并且会引用相同的原始计数器,并可能实际更新它。

    为了确保在这些场景中明确定义的行为,应该使用一个Accumulator。Spark中的累加器专门用于提供一种机制,用于在集群中的工作节点之间执行拆分时安全地更新变量。

    一般来说,closures - constructs像循环或本地定义的方法,不应该被用来改变一些全局状态。Spark并没有定义或保证从闭包外引用的对象的改变行为。这样做的一些代码可以在本地模式下工作,但这只是偶然,并且这种代码在分布式模式下的行为不会像你想的那样。如果需要某些全局聚合,请改用累加器。

    打印RDD的元素

    另一个常见的习惯用法是尝试使用rdd.foreach(println)或rdd.map(println)打印出RDD的元素。在单台机器上,这将产生预期的输出并打印所有RDD的元素。但是,在cluster模式下,由Executor执行输出写入的是Executor的stdout,而不是driver上的那个stdout,所以driver的stdout不会显示这些!要在driver中打印所有元素,可以使用该collect()方法首先将RDD数据带到driver节点:rdd.collect().foreach(println)。但这可能会导致driver程序内存不足,因为collect()会将整个RDD数据提取到driver端; 如果您只需要打印RDD的一些元素,则更安全的方法是使用take():张晓丽rdd.take(100).foreach(println)。

    推荐阅读:

    1,必读:Spark与kafka010整合

    2,论Spark Streaming的数据可靠性和一致性

    3,Spark技术学院-进去能学到啥?

    与bat大牛一起学习进步,请关注知识星球:spark技术学院

    640?wx_fmt=jpeg


    文章来源:https://blog.csdn.net/rlnLo2pNEfx9c/article/details/80650309

  • 相关阅读:
    Oracle数据库的一些常用命令
    计算机网络:计算路由表下一跳
    怎样快速对二进制和十进制进行互转化——IP地址规划与设计总结
    计算机网络-以太网,局域网,城域网,广域网,互联网,因特网,万维网的区分
    Apache与Tomcat有什么关系和区别
    Oracle 添加用户并赋权,修改密码,解锁,删除用户的方法
    使用Oracle的PROFILE对用户资源限制和密码限制
    通过修改profile 来修改账号的过期时间
    解决oracle用户过期问题
    【项目】项目36
  • 原文地址:https://www.cnblogs.com/mazhujun/p/9633640.html
Copyright © 2011-2022 走看看