I. SparkPi Example Code
package scala

import org.apache.spark.sql.SparkSession

import scala.math.random

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("Spark Pi")
      .master("local[2]")
      .getOrCreate()
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y <= 1) 1 else 0
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * count / (n - 1)}")
    spark.stop()
  }
}
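The estimator works because each point (x, y) is drawn uniformly from the square [-1, 1] x [-1, 1], whose area is 4, while the inscribed unit circle has area pi; the fraction of points that land inside the circle therefore converges to pi/4, and multiplying the observed fraction by 4 recovers pi. The following is a minimal plain-Scala sketch of the same estimate without Spark, added here only to illustrate the math; the PiEstimate object and the sample size are ours, not part of the example:

import scala.math.random

// Plain-Scala Monte Carlo estimate of pi, mirroring the SparkPi logic without Spark:
// sample points uniformly in [-1, 1] x [-1, 1] and count how many fall inside the unit circle.
object PiEstimate {
  def main(args: Array[String]): Unit = {
    val samples = 1000000
    val hits = (1 to samples).count { _ =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      x * x + y * y <= 1
    }
    println(s"Pi is roughly ${4.0 * hits / samples}") // typically prints a value near 3.14
  }
}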
II. Execution Results
III. Log Analysis
1. When the SparkPi example is submitted with ./run-example SparkPi 10, the warning messages show that, because the job runs in local mode, Spark first resolves the local IP address.
2. Next, Spark checks whether a local Hadoop installation and related settings such as log4j are configured; a user-supplied Hadoop configuration is loaded first, and the bundled defaults are used when none is provided (see the log-level sketch after this list).
3. Once these basic checks are done, Spark starts the job: it registers the application, sets up the access-control lists (acls) reported by the SecurityManager, and starts the sparkDriver service.
4. Spark then registers the task scheduler and the resource manager.
5. Local temporary directories are created and the memory store used for caching data is initialized.
6. The Spark UI starts successfully.
7. Spark's built-in Netty-based service is started.
8. The computation is executed.
9. After the job finishes successfully, the Spark UI, the task scheduler, and the resource manager are shut down.
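If the INFO-level messages above are too noisy to follow, the log level can be lowered at runtime. This is a minimal sketch, meant for spark-shell where a SparkSession named spark is predefined (the WARN level is just one reasonable choice; the same effect can also be achieved by editing conf/log4j.properties):

// Reduce console output so that only warnings and errors from Spark are printed.
// Assumes `spark` is the SparkSession created earlier, as in the SparkPi example.
spark.sparkContext.setLogLevel("WARN")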
IV. Source Code Analysis
1. Creating the SparkSession, the program's entry point
val spark = SparkSession.builder.appName("Spark Pi").master("local[2]").getOrCreate()
The program first calls the SparkSession builder, specifying the application name, the run mode (cluster or local), and other settings such as memory size and core count. During this step Spark detects the IP address (local mode only) and the Hadoop configuration, which corresponds to entries 1, 2, and 3 in the log analysis above.
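Additional settings of this kind can be passed through the same builder before getOrCreate() is called. The following is a minimal sketch and not part of the original example; spark.executor.memory is a standard Spark property, but the value shown is arbitrary and mainly matters when running on a real cluster rather than in local mode:

import org.apache.spark.sql.SparkSession

// Build a session with an explicit app name, a local master using 2 threads,
// and an example memory setting passed as an ordinary config option.
val spark = SparkSession
  .builder
  .appName("Spark Pi")
  .master("local[2]")
  .config("spark.executor.memory", "1g") // illustrative value; relevant on a cluster
  .getOrCreate()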
The source code is as follows:
object SparkSession extends Logging {

  /**
   * Builder for [[SparkSession]].
   */
  @InterfaceStability.Stable
  class Builder extends Logging {

    private[this] val options = new scala.collection.mutable.HashMap[String, String]

    private[this] val extensions = new SparkSessionExtensions

    private[this] var userSuppliedContext: Option[SparkContext] = None

    private[spark] def sparkContext(sparkContext: SparkContext): Builder = synchronized {
      userSuppliedContext = Option(sparkContext)
      this
    }

    /**
     * Sets a name for the application, which will be shown in the Spark web UI.
     * If no application name is set, a randomly generated name will be used.
     *
     * @since 2.0.0
     */
    def appName(name: String): Builder = config("spark.app.name", name)

    /**
     * Sets a config option. Options set using this method are automatically propagated to
     * both `SparkConf` and SparkSession's own configuration.
     *
     * @since 2.0.0
     */
    def config(key: String, value: String): Builder = synchronized {
      options += key -> value
      this
    }

    /**
     * Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" to
     * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
     *
     * @since 2.0.0
     */
    def master(master: String): Builder = config("spark.master", master)

    /**
     * Enables Hive support, including connectivity to a persistent Hive metastore, support for
     * Hive serdes, and Hive user-defined functions.
     *
     * @since 2.0.0
     */
    def enableHiveSupport(): Builder = synchronized {
      if (hiveClassesArePresent) {
        config(CATALOG_IMPLEMENTATION.key, "hive")
      } else {
        throw new IllegalArgumentException(
          "Unable to instantiate SparkSession with Hive support because " +
            "Hive classes are not found.")
      }
    }

    /**
     * Gets an existing [[SparkSession]] or, if there is no existing one, creates a new
     * one based on the options set in this builder.
     *
     * This method first checks whether there is a valid thread-local SparkSession,
     * and if yes, return that one. It then checks whether there is a valid global
     * default SparkSession, and if yes, return that one. If no valid global default
     * SparkSession exists, the method creates a new SparkSession and assigns the
     * newly created SparkSession as the global default.
     *
     * In case an existing SparkSession is returned, the config options specified in
     * this builder will be applied to the existing SparkSession.
     *
     * @since 2.0.0
     */
    def getOrCreate(): SparkSession = synchronized {
      assertOnDriver()
      // Get the session from current thread's active session.
      var session = activeThreadSession.get()
      if ((session ne null) && !session.sparkContext.isStopped) {
        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
        if (options.nonEmpty) {
          logWarning("Using an existing SparkSession; some configuration may not take effect.")
        }
        return session
      }

      // Global synchronization so we will only set the default session once.
      SparkSession.synchronized {
        // If the current thread does not have an active session, get it from the global session.
        session = defaultSession.get()
        if ((session ne null) && !session.sparkContext.isStopped) {
          options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
          if (options.nonEmpty) {
            logWarning("Using an existing SparkSession; some configuration may not take effect.")
          }
          return session
        }

        // No active nor global default session. Create a new one.
        val sparkContext = userSuppliedContext.getOrElse {
          val sparkConf = new SparkConf()
          options.foreach { case (k, v) => sparkConf.set(k, v) }

          // set a random app name if not given.
          if (!sparkConf.contains("spark.app.name")) {
            sparkConf.setAppName(java.util.UUID.randomUUID().toString)
          }

          SparkContext.getOrCreate(sparkConf)
          // Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.
        }

        // Initialize extensions if the user has defined a configurator class.
        val extensionConfOption = sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
        if (extensionConfOption.isDefined) {
          val extensionConfClassName = extensionConfOption.get
          try {
            val extensionConfClass = Utils.classForName(extensionConfClassName)
            val extensionConf = extensionConfClass.newInstance()
              .asInstanceOf[SparkSessionExtensions => Unit]
            extensionConf(extensions)
          } catch {
            // Ignore the error if we cannot find the class or when the class has the wrong type.
            case e @ (_: ClassCastException |
                _: ClassNotFoundException |
                _: NoClassDefFoundError) =>
              logWarning(s"Cannot use $extensionConfClassName to configure session extensions.", e)
          }
        }

        session = new SparkSession(sparkContext, None, None, extensions)
        options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
        setDefaultSession(session)
        setActiveSession(session)

        // Register a successfully instantiated context to the singleton. This should be at the
        // end of the class definition so that the singleton is updated only if there is no
        // exception in the construction of the instance.
        sparkContext.addSparkListener(new SparkListener {
          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
            defaultSession.set(null)
          }
        })
      }

      return session
    }
  }
}
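As the getOrCreate() implementation above shows, the builder first returns the thread-local active session, then the global default session, and only builds a new SparkSession (and SparkContext) when neither exists. A minimal sketch of the consequence, assuming both calls happen inside the same application; spark.sql.shuffle.partitions is used only as an example option:

import org.apache.spark.sql.SparkSession

// The second builder call does not create a second session; it returns the existing one,
// and its extra options are only applied to that session's SQL configuration.
val first  = SparkSession.builder.appName("Spark Pi").master("local[2]").getOrCreate()
val second = SparkSession.builder.config("spark.sql.shuffle.partitions", "4").getOrCreate()
println(first eq second) // true: both references point to the same SparkSession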
2. Executing the computation logic
val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y <= 1) 1 else 0
}.reduce(_ + _)
First, the program calls the parallelize method of the SparkContext to turn the local range into an RDD and run the computation on it; the map step is a lazy transformation, and the reduce action is what actually triggers the job. This corresponds to step 8 in the log analysis.
The source code is as follows:
/** Distribute a local Scala collection to form an RDD.
 *
 * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
 * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
 * modified collection. Pass a copy of the argument to avoid this.
 * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
 * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
 * @param seq Scala collection to distribute
 * @param numSlices number of partitions to divide the collection into
 * @return RDD representing distributed collection
 */
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
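The numSlices argument becomes the partition count of the resulting RDD; in SparkPi it is the slices value taken from the command line, so ./run-example SparkPi 10 produces an RDD with 10 partitions and therefore 10 tasks. A minimal sketch of the effect, meant for spark-shell where the spark session is predefined:

// Create an RDD from a local range with an explicit number of partitions,
// then check how the data was split and run a simple action on it.
val rdd = spark.sparkContext.parallelize(1 until 100000, 4)
println(rdd.getNumPartitions)                 // 4, one task per partition
println(rdd.map(i => i.toLong).reduce(_ + _)) // the sum, computed as a Spark job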
The key call here is withScope, which executes the body passed to it so that every RDD created inside that body shares the same operation scope; the scope is what groups the resulting RDDs under a single operation name in the Spark UI's DAG visualization.
The source code is as follows:
/**
 * Execute the given body such that all RDDs created in this body will have the same scope.
 * The name of the scope will be the first method name in the stack trace that is not the
 * same as this method's.
 *
 * Note: Return statements are NOT allowed in body.
 */
private[spark] def withScope[T](
    sc: SparkContext,
    allowNesting: Boolean = false)(body: => T): T = {
  val ourMethodName = "withScope"
  val callerMethodName = Thread.currentThread.getStackTrace()
    .dropWhile(_.getMethodName != ourMethodName)
    .find(_.getMethodName != ourMethodName)
    .map(_.getMethodName)
    .getOrElse {
      // Log a warning just in case, but this should almost certainly never happen
      logWarning("No valid method name for this RDD operation scope!")
      "N/A"
    }
  withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body)
}

/**
 * Execute the given body such that all RDDs created in this body will have the same scope.
 *
 * If nesting is allowed, any subsequent calls to this method in the given body will instantiate
 * child scopes that are nested within our scope. Otherwise, these calls will take no effect.
 *
 * Additionally, the caller of this method may optionally ignore the configurations and scopes
 * set by the higher level caller. In this case, this method will ignore the parent caller's
 * intention to disallow nesting, and the new scope instantiated will not have a parent. This
 * is useful for scoping physical operations in Spark SQL, for instance.
 *
 * Note: Return statements are NOT allowed in body.
 */
private[spark] def withScope[T](
    sc: SparkContext,
    name: String,
    allowNesting: Boolean,
    ignoreParent: Boolean)(body: => T): T = {
  // Save the old scope to restore it later
  val scopeKey = SparkContext.RDD_SCOPE_KEY
  val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
  val oldScopeJson = sc.getLocalProperty(scopeKey)
  val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
  val oldNoOverride = sc.getLocalProperty(noOverrideKey)
  try {
    if (ignoreParent) {
      // Ignore all parent settings and scopes and start afresh with our own root scope
      sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
    } else if (sc.getLocalProperty(noOverrideKey) == null) {
      // Otherwise, set the scope only if the higher level caller allows us to do so
      sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
    }
    // Optionally disallow the child body to override our scope
    if (!allowNesting) {
      sc.setLocalProperty(noOverrideKey, "true")
    }
    body
  } finally {
    // Remember to restore any state that was modified before exiting
    sc.setLocalProperty(scopeKey, oldScopeJson)
    sc.setLocalProperty(noOverrideKey, oldNoOverride)
  }
}
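RDDOperationScope.withScope is private[spark], so it cannot be called from user code, but the pattern it implements, saving a thread-local property, running the body, and restoring the previous value in a finally block, is easy to illustrate on its own. The sketch below is a simplified stand-alone imitation of that pattern; the ScopeDemo object and its property map are ours, not Spark code:

object ScopeDemo {
  // Stand-in for SparkContext's local properties.
  private val localProps = new java.util.concurrent.ConcurrentHashMap[String, String]()

  // Run `body` with `key` temporarily set to `value`, restoring the previous value
  // afterwards, analogous to how withScope handles the RDD_SCOPE_KEY local property.
  def withScope[T](key: String, value: String)(body: => T): T = {
    val old = Option(localProps.get(key))
    localProps.put(key, value)
    try {
      body
    } finally {
      old match {
        case Some(v) => localProps.put(key, v)
        case None    => localProps.remove(key)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    withScope("rdd.scope", "parallelize") {
      println(localProps.get("rdd.scope")) // parallelize
    }
    println(localProps.get("rdd.scope"))   // null: the previous (absent) value was restored
  }
}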