Spark使用Akka作为各种功能和组件之间的通信工具。同样,在资源调度过程中也使用其作为消息传递系统。之前,在分析了Apache Spark-1.0.0资源调度过程中,明确了主要消息的传递过程和引起的相关动作,本文主要分析Spark资源调度过程中所用到的Akka通信的初始化过程。
@volatile private[spark] var dagScheduler: DAGScheduler = _ try { dagScheduler = new DAGScheduler(this) } catch { case e: Exception => throw new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage)) }
private val dagSchedulerActorSupervisor = env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))
private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) extends Actor with Logging { override val supervisorStrategy = OneForOneStrategy() { case x: Exception => logError("eventProcesserActor failed due to the error %s; shutting down SparkContext" .format(x.getMessage)) dagScheduler.doCancelAllJobs() Stop } def receive = { case p: Props => sender ! context.actorOf(p) case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor") } }
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
securityManager = securityManager)
/** * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the * ActorSystem itself and its port (which is hard to get from Akka). * * Note: the `name` parameter is important, as even if a client sends a message to right * host + port, if the system name is incorrect, Akka will drop the message. * * If indestructible is set to true, the Actor System will continue running in the event * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]]. */ def createActorSystem(name: String, host: String, port: Int, conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = { val akkaThreads = conf.getInt("spark.akka.threads", 4) val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) val akkaTimeout = conf.getInt("spark.akka.timeout", 100) val akkaFrameSize = maxFrameSizeBytes(conf) val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false) val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off" if (!akkaLogLifecycleEvents) { // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent. // See: Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL)) } val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off" val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 600) val akkaFailureDetector = conf.getDouble("spark.akka.failure-detector.threshold", 300.0) val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000) val secretKey = securityManager.getSecretKey() val isAuthOn = securityManager.isAuthenticationEnabled() if (isAuthOn && secretKey == null) { throw new Exception("Secret key is null with authentication on") } val requireCookie = if (isAuthOn) "on" else "off" val secureCookie = if (isAuthOn) secretKey else "" logDebug("In createActorSystem, requireCookie is: " + requireCookie) val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback( ConfigFactory.parseString( s""" |akka.daemonic = on |akka.loggers = [""akka.event.slf4j.Slf4jLogger""] |akka.stdout-loglevel = "ERROR" |akka.jvm-exit-on-fatal-error = off |akka.remote.require-cookie = "$requireCookie" | = "$secureCookie" |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector | = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = $port |akka.remote.netty.tcp.tcp-nodelay = on |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B |akka.remote.netty.tcp.execution-pool-size = $akkaThreads | = $akkaBatchSize |akka.log-config-on-start = $logAkkaConfig |akka.remote.log-remote-lifecycle-events = $lifecycleEvents |akka.log-dead-letters = $lifecycleEvents |akka.log-dead-letters-during-shutdown = $lifecycleEvents """.stripMargin)) val actorSystem = ActorSystem(name, akkaConf) val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider val boundPort = provider.getDefaultAddress.port.get (actorSystem, boundPort) }
private[scheduler] var eventProcessActor: ActorRef = _ private def initializeEventProcessActor() { // blocking the thread until supervisor is started, which ensures eventProcessActor is // not null before any job is submitted implicit val timeout = Timeout(30 seconds) val initEventActorReply = dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this)) eventProcessActor = Await.result(initEventActorReply, timeout.duration). asInstanceOf[ActorRef] } initializeEventProcessActor()
def receive = { case p: Props => sender ! context.actorOf(p) case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor") }
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler) extends Actor with Logging { override def preStart() { // set DAGScheduler for taskScheduler to ensure eventProcessActor is always // valid when the messages arrive dagScheduler.taskScheduler.setDAGScheduler(dagScheduler) } /** * The main event loop of the DAG scheduler. */ def receive = { case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) case StageCancelled(stageId) => dagScheduler.handleStageCancellation(stageId) case JobCancelled(jobId) => dagScheduler.handleJobCancellation(jobId) case JobGroupCancelled(groupId) => dagScheduler.handleJobGroupCancelled(groupId) case AllJobsCancelled => dagScheduler.doCancelAllJobs() case ExecutorAdded(execId, host) => dagScheduler.handleExecutorAdded(execId, host) case ExecutorLost(execId) => dagScheduler.handleExecutorLost(execId) case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) case GettingResultEvent(taskInfo) => dagScheduler.handleGetTaskResult(taskInfo) case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => dagScheduler.handleTaskCompletion(completion) case TaskSetFailed(taskSet, reason) => dagScheduler.handleTaskSetFailed(taskSet, reason) case ResubmitFailedStages => dagScheduler.resubmitFailedStages() } override def postStop() { // Cancel any active jobs in postStop hook dagScheduler.cleanUpAfterSchedulerStop() } }
(II)Task相关(CoarseGrainedSchedulerBackend.scala + CoarseGrainedExecutorBackend.scala)
CoarseGrainedSchedulerBackend中,应用actorSystem.actorOfchuagnjian 实例化DriverActor,命名为“CoarseGrainedScheduler”
override def start() { val properties = new ArrayBuffer[(String, String)] for ((key, value) <- { if (key.startsWith("spark.")) { properties += ((key, value)) } } // TODO (prashant) send conf instead of properties driverActor = actorSystem.actorOf( Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME) }
private val executorActor = new HashMap[String, ActorRef]
def run(driverUrl: String, executorId: String, hostname: String, cores: Int, workerUrl: Option[String]) { SparkHadoopUtil.get.runAsSparkUser { () => // Debug code Utils.checkHost(hostname) val conf = new SparkConf // Create a new ActorSystem to run the backend, because we can't create a // SparkEnv / Executor before getting started with all our system properties, etc val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0, conf, new SecurityManager(conf)) // set it val sparkHostPort = hostname + ":" + boundPort actorSystem.actorOf( Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores), name = "Executor") workerUrl.foreach { url => actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher") } actorSystem.awaitTermination() } }
override def preStart() { logInfo("Connecting to driver: " + driverUrl) driver = context.actorSelection(driverUrl) driver ! RegisterExecutor(executorId, hostPort, cores) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) }
def receive = { case RegisterExecutor(executorId, hostPort, cores) => Utils.checkHostPort(hostPort, "Host port expected " + hostPort) if (executorActor.contains(executorId)) { sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId) } else { logInfo("Registered executor: " + sender + " with ID " + executorId) sender ! RegisteredExecutor(sparkProperties) executorActor(executorId) = sender executorHost(executorId) = Utils.parseHostPort(hostPort)._1 totalCores(executorId) = cores freeCores(executorId) = cores executorAddress(executorId) = sender.path.address addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) makeOffers() } case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { if (executorActor.contains(executorId)) { freeCores(executorId) += scheduler.CPUS_PER_TASK makeOffers(executorId) } else { // Ignoring the update since we don't know about the executor. val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s" logWarning(msg.format(taskId, state, sender, executorId)) } } case ReviveOffers => makeOffers() case KillTask(taskId, executorId, interruptThread) => executorActor(executorId) ! KillTask(taskId, executorId, interruptThread) case StopDriver => sender ! true context.stop(self) case StopExecutors => logInfo("Asking each executor to shut down") for (executor <- executorActor.values) { executor ! StopExecutor } sender ! true case RemoveExecutor(executorId, reason) => removeExecutor(executorId, reason) sender ! true case DisassociatedEvent(_, address, _) => addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated")) }
case RegisterExecutor(executorId, hostPort, cores) => Utils.checkHostPort(hostPort, "Host port expected " + hostPort) if (executorActor.contains(executorId)) { sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId) } else { logInfo("Registered executor: " + sender + " with ID " + executorId) sender ! RegisteredExecutor(sparkProperties) executorActor(executorId) = sender executorHost(executorId) = Utils.parseHostPort(hostPort)._1 totalCores(executorId) = cores freeCores(executorId) = cores executorAddress(executorId) = sender.path.address addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) makeOffers() }
override def receive = { case RegisteredExecutor(sparkProperties) => logInfo("Successfully registered with driver") // Make this host instead of hostPort ? executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties, false) case RegisterExecutorFailed(message) => logError("Slave registration failed: " + message) System.exit(1) case LaunchTask(taskDesc) => logInfo("Got assigned task " + taskDesc.taskId) if (executor == null) { logError("Received LaunchTask command but executor was null") System.exit(1) } else { executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) } case KillTask(taskId, _, interruptThread) => if (executor == null) { logError("Received KillTask command but executor was null") System.exit(1) } else { executor.killTask(taskId, interruptThread) } case x: DisassociatedEvent => logError(s"Driver $x disassociated! Shutting down.") System.exit(1) case StopExecutor => logInfo("Driver commanded a shutdown") context.stop(self) context.system.shutdown() }
Messages are sent to an Actor through one of the following methods. ! means “fire-and-forget”, e.g. send a message asynchronously and return immediately. Also known as tell. ? sends a message asynchronously and returns a Future representing a possible reply. Also known as ask.