Understanding closures(闭包)
Spark
->Programming Guides
->RDD Programming Guide
->Resilient Distribute Dataset(RDDs)
->RDD Operations
-> Understanding closures
1. 什么是Spark Closures?用途?
2. RDD怎样进行全局聚合?
3. 如何打印所有RDD中的元素,在cluster运行模式下?
One of the harder things(一个比较困难的事情) about Spark is understanding the scope and life cycle variables and methods (范围,生活循环变量,方法) when executing code across a cluster(集群).RDD operations(RDD操作) that modify variables(修改变量) outside of their scope can be a frequent source of confusion(一个常见资源的混乱).
In the example below we'll look at code that uses foreach() to increment a counter(增加一个计数器), but similar issues can occur for other operations as well.
Example
Consider the native RDD element sum below, which may behave defferently depending on whether execution is happening within the same JVM. A common example of this is when running Spark Local mode (--master = Local[n]) versus(对抗) deploying(分发) a Spark application to a cluster(e.g. via spark-sumbit to YARN)
Scala
var count = 0
var rdd = sc.parallelize(data)
//Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value:" + counter)
Local vs. cluster modes(Local模式和cluste运行模式)
The behavior of the above code is undefined(不明确的), and may not work as intended(如预期). To execute jobs, Spark breaks up(打破了) the processing of RDD operations into tasks, each of which is executed by an executor to perform(执行) its computations(计算指令) on the RDD (in this case foreach()). This closure(闭包) is serialized and sent to each executor. Prior to execution(在执行之前), Spark computes the task's closure. The closure is those variables and methods which must be visible(可见物) for the executor to perform its computations on the RDD (in this case foreach()).This closure is serialized and sent to each executor.
Note:the closure包括变量和方法,对每个executor是可见的,用于执行its computations on the RDD
The variables within the closure sent to each executor are now copies and thus, when counter is referenced within the foreach function(在foreach函数中引用), it’s no longer the counter on the driver node(它不再是驱动程序节点上的计数器). There is still a counter in the memory of the driver node but this is no longer visible to the executors(driver节点的内存中仍然有一个counter,但是对Executors来说,这不再是visible了!)! The executors only see the copy from the serialized closure(executors只看到the serialized closure中的copy。). Thus, the final value of counter will still be zero since all operations on counter were referencing the value within the serialized closure.(因此,the final value of counter仍然是零,因为all operations on counter都引用the value within the serialized closure。)
In local mode, in some circumstances the foreach function will actually execute within the same JVM as the driver and will reference the same original counter, and may actually update it.(在local模式下,在某些情况下,foreach函数实际上将作为driver在同一个JVM中执行,并将引用相同的原始counter,并可能实际更新它。)
To ensure well-defined behavior in these sorts of scenarios one should use an Accumulator. (为了确保在这类场景中定义良好的行为,应该使用Accumulator。) Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster.(Spark中的Accumulators专门用于提供一种机制,用于在集群中的worker节点上拆分execution时安全地更新变量。) The Accumulators section of this guide discusses these in more detail.(本指南的 Accumulators部分将更详细地讨论这些内容。)
In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. (一般来说,closures--constructs比如loops或本地定义的方法--不应该被用来改变某种全局状态。)Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures.(Spark不定义或保证从closures外部引用的对象的突变行为。) Some code that does this may work in local mode, but that’s just by accident and such code will not behave as expected in distributed mode.(有些代码可以在本地模式下工作,但这只是偶然的,这种代码在分布式模式下的行为并不像预期的那样。) Use an Accumulator instead if some global aggregation is needed.(如果需要某种全局聚合,则使用Accumulator)
Printing elements of an RDD
Another common idiom is attempting to print out the elements of an RDD using rdd.foreach(println) or rdd.map(println). (另一个常见的成语[common idiom]是试图打印 RDD的元素使用rdd.foreach(println)或rdd.map(println)。)On a single machine, this will generate the expected output and print all the RDD’s elements.(在一台机器上,这将生成预期的输出并打印所有RDD的元素。) However, in cluster mode, the output to stdout being called by the executors is now writing to the executor’s stdout instead, not the one on the driver, so stdout on the driver won’t show these! (但是,在cluster模式下,由执行程序调用的stdout输出现在正在写入executors程序的stdout,而不是在driver程序上的stdout,因此驱动程序上的stdout不会显示这些输出!) To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node thus: rdd.collect().foreach(println). (要打印driver程序上的所有元素,您可以使用collect()方法首先将RDD带到driver节点,例如:rdd.collect().foreach(println)。) This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine;(但是,这会导致驱动程序耗尽内存,因为collect()将整个RDD提取到一台机器上;) if you only need to print a few elements of the RDD, a safer approach is to use the take(): rdd.take(100).foreach(println).(如果您只需要打印RDD的几个元素,更安全的方法是使用take():rdd.take(100).foreach(println)。)