zoukankan      html  css  js  c++  java
  • SparkContext的初始化(伯篇)——运行环境与元数据清理器

    《深入理解Spark:核心思想与源代码分析》一书前言的内容请看链接《深入理解SPARK:核心思想与源代码分析》一书正式出版上市

    《深入理解Spark:核心思想与源代码分析》一书第一章的内容请看链接《第1章 环境准备》

    《深入理解Spark:核心思想与源代码分析》一书第二章的内容请看链接《第2章 Spark设计理念与基本架构》

    因为本书的第3章内容较多,所以打算分别开辟四篇随笔分别展现。

    本文展现第3章第一部分的内容:


    第3章 SparkContext的初始化


    “道生一。一生二。二生三。三生万物。

    ”——《道德经》

    本章导读:

           SparkContext的初始化是Driver应用程序提交运行的前提。本章内容以local模式为主,并依照代码运行顺序解说,这将有助于首次接触Spark的读者理解源代码。

    读者朋友假设能边跟踪代码,边学习本章内容,或许是高速理解SparkContext初始化过程的便捷途径。已经熟练使用Spark的开发者能够选择跳过本章内容。

           本章将在介绍SparkContext初始化过程的同一时候,向读者介绍各个组件的作用,为阅读后面的章节打好基础。Spark中的组件非常多,就其功能而言涉及到网络通信、分布式、消息、存储、计算、缓存、測量、清理、文件服务、Web UI的方方面面。

     

    3.1 SparkContext概述

            Spark Driver用于提交用户应用程序,实际能够看作Spark的client。了解Spark Driver的初始化。有助于读者理解用户应用程序在client的处理过程。

            Spark Driver的初始化始终环绕着SparkContext的初始化。

    SparkContext能够算得上是全部Spark应用程序的发动机引擎,轿车要想跑起来。发动机首先要启动。SparkContext初始化完成。才干向Spark集群提交任务。在平坦的公路上,发动机仅仅需以较低的转速。较低的功率就能够游刃有余。在山区,你可能须要一台能够提供大功率的发动机,这样才干满足你转山的体验。这些參数都是通过驾驶员操作油门、档位等传送给发动机的,而SparkContext的配置參数则由SparkConf负责,SparkConf就是你的操作面板。

    SparkConf的构造非常easy。主要是通过ConcurrentHashMap来维护各种Spark的配置属性。

    SparkConf代码结构见代码清单3-1。

    Spark的配置属性都是以“spark.”开头的字符串。

    代码清单3-1  SparkConf代码结构

    class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
      importSparkConf._
      def this()= this(true)
      private val settings = newConcurrentHashMap[String, String]()
      if(loadDefaults) {
        // 载入不论什么以spark.开头的系统属性
        for ((key, value) <-Utils.getSystemProperties if key.startsWith("spark.")) {
          set(key, value)
        }
      }
    //其余代码省略

    如今開始介绍SparkContext。SparkContext的初始化过程例如以下:

    1)        创建Spark运行环境SparkEnv。

    2)        创建RDD清理器metadataCleaner。

    3)        创建并初始化SparkUI;

    4)        Hadoop相关配置及Executor环境变量的设置

    5)        创建任务调度TaskScheduler;

    6)        创建和启动DAGScheduler。

    7)        TaskScheduler的启动。

    8)        初始化块管理器BlockManager(BlockManager是存储体系的主要组件之中的一个,将在第4章介绍)。

    9)        启动測量系统MetricsSystem;

    10)     创建和启动Executor分配管理器ExecutorAllocationManager;

    11)     ContextCleaner的创建与启动;

    12)     Spark环境更新;

    13)     创建DAGSchedulerSource和BlockManagerSource。

    14)     将SparkContext标记为激活。

    SparkContext的主构造器參数为SparkConf。事实上现例如以下。

    class SparkContext(config: SparkConf) extends Logging withExecutorAllocationClient {
    private val creationSite: CallSite = Utils.getCallSite()
      private val allowMultipleContexts:Boolean =
       config.getBoolean("spark.driver.allowMultipleContexts", false)
     SparkContext.markPartiallyConstructed(this, allowMultipleContexts)

    上面代码中的CallSite存储了线程栈中最靠近栈顶的用户类及最靠近栈底的Scala或者Spark核心类信息。Utils.getCallSite的具体信息见附录A。

    SparkContext默认仅仅有一个实例(由属性spark.driver.allowMultipleContexts来控制。用户须要多个SparkContext实例时。能够将其设置为true),方法markPartiallyConstructed用来确保实例的唯一性。并将当前SparkContext标记为正在构建中。

             接下来会对SparkConf进行拷贝,然后对各种配置信息进行校验。代码例如以下。

      private[spark] val conf =config.clone()
      conf.validateSettings()
     
      if (!conf.contains("spark.master")) {
        throw newSparkException("A master URL must be set in your configuration")
      }
      if (!conf.contains("spark.app.name")) {
        throw newSparkException("An application name must be set in yourconfiguration")
      }

    从上面校验的代码看到必须指定属性spark.master 和spark.app.name,否则会抛出异常,结束初始化过程。

    spark.master用于设置部署模式。spark.app.name指定应用程序名称。

    3.2 创建运行环境SparkEnv

           SparkEnv是Spark的运行环境对象。当中包含众多与Executor运行相关的对象。因为在local模式下Driver会创建Executor,local-cluster部署模式或者Standalone部署模式下Worker另起的CoarseGrainedExecutorBackend进程中也会创建Executor。所以SparkEnv存在于Driver或者CoarseGrainedExecutorBackend进程中。创建SparkEnv 主要使用SparkEnv的createDriverEnvcreateDriverEnv方法有三个參数。conf、isLocal和 listenerBus。

      val isLocal = (master == "local" ||master.startsWith("local["))
      private[spark] vallistenerBus = newLiveListenerBus
      conf.set("spark.executor.id","driver")
     
      private[spark] valenv =SparkEnv.createDriverEnv(conf,isLocal, listenerBus)
     SparkEnv.set(env)

    上面代码中的conf是对SparkConf的拷贝。isLocal标识是否是单机模式,listenerBus採用监听器模式维护各类事件的处理,在3.14节会具体介绍。

    SparkEnv的方法createDriverEnv终于调用create创建SparkEnv。SparkEnv的构造过程例如以下:

    1)        创建安全管理器SecurityManager;

    2)        创建基于Akka的分布式消息系统ActorSystem;

    3)        创建Map任务输出跟踪器mapOutputTracker。

    4)        实例化ShuffleManager;

    5)        创建ShuffleMemoryManager。

    6)        创建块传输服务BlockTransferService;

    7)        创建BlockManagerMaster;

    8)        创建块管理器BlockManager;

    9)        创建广播管理器BroadcastManager;

    10)    创建缓存管理器CacheManager。

    11)    创建HTTP文件serverHttpFileServer;

    12)    创建測量系统MetricsSystem;

    13)    创建SparkEnv。

     

    3.2.1 安全管理器SecurityManager

             SecurityManager主要对权限、账号进行设置,假设使用Hadoop YARN作为集群管理器。则须要使用证书生成 secret key登录,最后给当前系统设置默认的口令认证实例。此实例採用匿名内部类实现,參见代码清单3-2。

    代码清单3-2  SecurityManager的实现

     private val secretKey =generateSecretKey()
     
      // 使用HTTP连接设置口令认证
      if (authOn) {
       Authenticator.setDefault(
          newAuthenticator() {
            override defgetPasswordAuthentication(): PasswordAuthentication = {
              var passAuth:PasswordAuthentication = null
              val userInfo =getRequestingURL().getUserInfo()
              if (userInfo !=null) {
                val  parts = userInfo.split(":",2)
                passAuth = newPasswordAuthentication(parts(0),parts(1).toCharArray())
              }
              return passAuth
            }
          }
        )
      }<span style="font-family: Arial, Helvetica, sans-serif; background-color: rgb(255, 255, 255);"> </span>

    3.2.2 基于Akka的分布式消息系统ActorSystem

             ActorSystem是Spark中最基础的设施,Spark既使用它发送分布式消息。又用它实现并发编程。怎么,消息系统能够实现并发?要解释清楚这个问题,首先应该简单的介绍下Scala语言的Actor并发编程模型:Scala觉得Java线程通过共享数据以及通过锁来维护共享数据的一致性是糟糕的做法,easy引起锁的争用,并且线程的上下文切换会带来不少开销,减少并发程序的性能。甚至会引入死锁的问题。在Scala中仅仅须要自己定义类型继承Actor,并且提供act方法,就如同Java里实现Runnable接口。须要实现run方法一样。可是不能直接调用act方法。而是通过发送消息的方式(Scala发送消息是异步的),传递数据。如:

             Actor ! message

             Akka是Actor编程模型的高级类库,类似于JDK 1.5之后越来越丰富的并发工具包,简化了程序猿并发编程的难度。

    ActorSystem便是Akka提供的用于创建分布式消息通信系统的基础类。Akka的详细信息见附录B。

             正式由于Actor轻量级的并发编程、消息发送以及ActorSystem支持分布式消息发送等特点,Spark选择了ActorSystem。

             SparkEnv中创建ActorSystem时用到了AkkaUtils工具类。见代码清单3-3。AkkaUtils.createActorSystem方法用于启动ActorSystem。见代码清单3-4。AkkaUtils使用了Utils的静态方法startServiceOnPort, startServiceOnPort终于会回调方法startService: Int => (T, Int),此处的startService实际是方法doCreateActorSystem。真正启动ActorSystem是由doCreateActorSystem方法完毕的。doCreateActorSystem的详细实现细节请见附录B。Spark的Driver中Akka的默认訪问地址是akka://sparkDriver,Spark的Executor中Akka的默认訪问地址是akka://sparkExecutor。假设不指定ActorSystem的port,那么全部节点的ActorSystemport在每次启动时随机产生。关于startServiceOnPort的实现,请见附录A。

    代码清单3-3  使用AkkaUtils工具类创建和启动[计算机3] [初霖4] ActorSystem

        val(actorSystem, boundPort) =
         Option(defaultActorSystem) match {
            case Some(as)=> (as, port)
            case None =>
              valactorSystemName =if (isDriver) driverActorSystemNameelse executorActorSystemName
             AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf,securityManager)
          }

    代码清单3-4  ActorSystem的创建和启动

      def createActorSystem(
          name:String,
          host:String,
          port:Int,
          conf:SparkConf,
          securityManager: SecurityManager):(ActorSystem, Int) = {
        val startService: Int=> (ActorSystem, Int) = { actualPort =>
         doCreateActorSystem(name, host, actualPort, conf, securityManager)
        }
       Utils.startServiceOnPort(port, startService, conf, name)
      }

    3.2.3 map任务输出跟踪器mapOutputTracker

             mapOutputTracker用于跟踪map阶段任务的输出状态,此状态便于reduce阶段任务获取地址及中间输出结果。每一个map任务或者reduce任务都会有其唯一标识。分别为mapId和reduceId。每一个reduce任务的输入可能是多个map任务的输出,reduce会到各个map任务的所在节点上拉取Block,这一过程叫做shuffle。每批shuffle过程都有唯一的标识shuffleId。

             这里先介绍下MapOutputTrackerMaster。MapOutputTrackerMaster内部使用mapStatuses:TimeStampedHashMap[Int,Array[MapStatus]]来维护跟踪各个map任务的输出状态。当中key相应shuffleId,Array存储各个map任务相应的状态信息MapStatus。因为MapStatus维护了map输出Block的地址BlockManagerId,所以reduce任务知道从何处获取map任务的中间输出。MapOutputTrackerMaster还使用cachedSerializedStatuses:TimeStampedHashMap[Int, Array[Byte]]维护序列化后的各个map任务的输出状态。当中key相应shuffleId。Array存储各个序列化MapStatus生成的字节数组。

             Driver和Executor处理MapOutputTrackerMaster的方式有所不同:

    • 假设当前应用程序是Driver,则创建MapOutputTrackerMaster,然后创建MapOutputTrackerMasterActor,而且注冊到ActorSystem中。
    • 假设当前应用程序是Executor。则创建MapOutputTrackerWorker。并从ActorSystem中找到MapOutputTrackerMasterActor。

    不管是Driver还是Executor,最后都由mapOutputTracker的属性trackerActor持有MapOutputTrackerMasterActor的引用,參见代码清单3-5。

    代码清单3-5  registerOrLookup方法用于查找或者注冊Actor的实现

    def registerOrLookup(name: String, newActor: => Actor): ActorRef ={
          if (isDriver) {
           logInfo("Registering" + name)
            actorSystem.actorOf(Props(newActor),name = name)
          } else {
           AkkaUtils.makeDriverRef(name, conf, actorSystem)
          }
        }
     
        val mapOutputTracker=  if (isDriver) {
          newMapOutputTrackerMaster(conf)
        } else {
          newMapOutputTrackerWorker(conf)
    }
     
        mapOutputTracker.trackerActor= registerOrLookup(
         "MapOutputTracker",
          newMapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

    在后面章节大家会知道map任务的状态正是由Executor向持有的MapOutputTrackerMasterActor发送消息,将map任务状态同步到mapOutputTracker的mapStatuses和cachedSerializedStatuses的。Executor到底是怎样找到MapOutputTrackerMasterActor的?registerOrLookup方法通过调用AkkaUtils.makeDriverRef找到MapOutputTrackerMasterActor,实际正是利用ActorSystem提供的分布式消息机制实现的。详细细节參见附录B。这里第一次使用到了Akka提供的功能,以后大家会渐渐感觉到使用Akka的便捷。

    3.2.4 实例化ShuffleManager

             ShuffleManager负责管理本地及远程的block数据的shuffle操作。ShuffleManager默觉得通过反射方式生成的SortShuffleManager的实例,能够改动属性spark.shuffle.manager为hash来显式[计算机5] [初霖6] 使用HashShuffleManager。SortShuffleManager通过持有的IndexShuffleBlockManager间接操作BlockManager中的DiskBlockManager将map结果写入本地。并依据shuffleId、mapId写入索引文件。也能通过MapOutputTrackerMaster中维护的mapStatuses从本地或者其它远程节点读取文件。有读者可能会问,为什么须要shuffle?Spark作为并行计算框架,同一个作业会被划分为多个任务在多个节点上并行运行。reduce的输入可能存在于多个节点上,因此须要通过“洗牌”将全部reduce的输入汇总起来,这个过程就是shuffle。这个问题以及对ShuffleManager的详细使用会在第5章和第6章详述。ShuffleManager的实例化见代码清单3-6。

    代码清单3-6最后创建的ShuffleMemoryManager。将在3.2.5节介绍。

    代码清单3-6  ShuffleManager的实例化及ShuffleMemoryManager的创建

        val shortShuffleMgrNames =Map(
          "hash"-> "org.apache.spark.shuffle.hash.HashShuffleManager",
          "sort"-> "org.apache.spark.shuffle.sort.SortShuffleManager")
        val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
        val shuffleMgrClass = shortShuffleMgrNames.get
    OrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
        val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
     
        val shuffleMemoryManager =new ShuffleMemoryManager(conf)
     

    3.2.5 shuffle线程内存管理器ShuffleMemoryManager

             ShuffleMemoryManager负责管理shuffle线程占有内存的分配与释放。并通过threadMemory:mutable.HashMap[Long, Long]缓存每一个线程的内存字节数,见代码清单3-7。

    代码清单3-7  ShuffleMemoryManager的数据结构

    private[spark] class ShuffleMemoryManager(maxMemory: Long)extends Logging {
      private val threadMemory = newmutable.HashMap[Long, Long]() // threadId -> memory bytes
      def this(conf: SparkConf) = this(ShuffleMemoryManager.getMaxMemory(conf))
     

    getMaxMemory方法用于获取shuffle全部线程占用的最大内存,实现例如以下。

    def getMaxMemory(conf: SparkConf): Long = {
        val memoryFraction =conf.getDouble("spark.shuffle.memoryFraction", 0.2)
        val safetyFraction =conf.getDouble("spark.shuffle.safetyFraction", 0.8)
       (Runtime.getRuntime.maxMemory * memoryFraction *safetyFraction).toLong
      }

    从上面代码能够看出,shuffle全部线程占用的最大内存的计算公式为:

    Java执行时最大内存 * Spark的shuffle最大内存占比 * Spark的安全内存占比

    能够配置属性spark.shuffle.memoryFraction改动Spark的shuffle最大内存占比,配置属性spark.shuffle.safetyFraction改动Spark的安全内存占比。

    注意:ShuffleMemoryManager通常执行在Executor中, Driver中的ShuffleMemoryManager 仅仅有在local模式下才起作用。

     

    3.2.6 块传输服务BlockTransferService

             BlockTransferService默觉得NettyBlockTransferService(能够配置属性spark.shuffle.blockTransferService使用NioBlockTransferService),它使用Netty提供的异步事件驱动的网络应用框架,提供web服务及client。获取远程节点上Block的集合。

    val blockTransferService =
         conf.get("spark.shuffle.blockTransferService","netty").toLowerCase match {
            case "netty"=>
              newNettyBlockTransferService(conf, securityManager, numUsableCores)
            case "nio"=>
              newNioBlockTransferService(conf, securityManager)
          }

    NettyBlockTransferService的具体实现将在第4章具体介绍。

    这里大家可能认为奇怪,这种网络应用为何也要放在存储体系?大家最好还是先带着疑问,直到你真正了解存储体系。

    3.2.7 BlockManagerMaster介绍

    BlockManagerMaster负责对Block的管理和协调,详细操作依赖于BlockManagerMasterActor。Driver和Executor处理BlockManagerMaster的方式不同:

    • 假设当前应用程序是Driver,则创建BlockManagerMasterActor,而且注冊到ActorSystem中。
    • 假设当前应用程序是Executor,则从ActorSystem中找到BlockManagerMasterActor。

    不管是Driver还是Executor,最后BlockManagerMaster的属性driverActor将持有对BlockManagerMasterActor的引用。

    BlockManagerMaster的创建代码例如以下。

    val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
         "BlockManagerMaster",
          newBlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)

    registerOrLookup已在3.2.3节介绍过了,不再赘述。

    BlockManagerMaster及BlockManagerMasterActor的具体实现将在第4章具体介绍。

    3.2.8 创建块管理器BlockManager

             BlockManager负责对Block的管理,仅仅有在BlockManager的初始化方法initialize被调用后,它才是有效的。BlockManager作为存储系统的一部分,详细实现见第4章。BlockManager的创建代码例如以下。

    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
         serializer, conf, mapOutputTracker, shuffleManager,blockTransferService, securityManager,
          numUsableCores)

    3.2.9 创建广播管理器BroadcastManager

             BroadcastManager用于将配置信息和序列化后的RDD、Job以及ShuffleDependency等信息在本地存储。

    假设为了容灾,也会拷贝到其它节点上。创建BroadcastManager的代码实现例如以下。

    val broadcastManager = new BroadcastManager(isDriver, conf,securityManager)

    BroadcastManager必须在其初始化方法initialize被调用后,才干生效。Initialize方法实际利用反射生成广播工厂实例broadcastFactory(能够配置属性spark.broadcast.factory指定,默觉得org.apache.spark.broadcast.TorrentBroadcastFactory)。BroadcastManager的广播方法newBroadcast实际代理了工厂broadcastFactory的newBroadcast方法来生成广播或者非广播对象。BroadcastManager的Initialize及newBroadcast方法见代码清单3-8。

    代码清单3-8  BroadcastManager的实现

      private def initialize(){
       synchronized {
          if(!initialized) {
            val broadcastFactoryClass = conf.get("spark.broadcast.factory","org.apache.spark.broadcast.TorrentBroadcastFactory")
           broadcastFactory =
             Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
           broadcastFactory.initialize(isDriver, conf, securityManager)
           initialized = true
          }
        }
      }
     
      private val nextBroadcastId = new AtomicLong(0)
     
      defnewBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
       broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
      }
     
      defunbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
       broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
      }
    }<span style="font-family: Arial, Helvetica, sans-serif; background-color: rgb(255, 255, 255);"> </span>

    3.2.10 创建缓存管理器CacheManager

             CacheManager用于缓存RDD某个分区计算后中间结果,缓存计算结果发生在迭代计算的时候。将在6.1节讲到。而CacheManager将在4.14节具体描写叙述。

    创建CacheManager的代码例如以下。

    val cacheManager = new CacheManager(blockManager)

    3.2.11 HTTP文件serverHttpFileServer

             參见代码清单3-9。HttpFileServer主要提供对jar及其它文件的http訪问。这些jar包包含用户上传的jar包。port由属性spark.fileserver.port配置。默觉得0,表示随机生成port号。

    代码清单3-9  HttpFileServer的创建

      val httpFileServer =
          if (isDriver) {
            val fileServerPort = conf.getInt("spark.fileserver.port",0)
            val server = newHttpFileServer(conf, securityManager, fileServerPort)
           server.initialize()
           conf.set("spark.fileserver.uri",  server.serverUri)
           server
          } else {
            null
          }

    HttpFileServer的初始化过程,见代码清单3-10,主要包含下面步骤:

    1)        使用Utils工具类创建文件server的根文件夹及暂时文件夹(暂时文件夹在执行时环境关闭时会删除)。

    Utils工具的具体介绍,见附录A。

    2)        创建存放jar包及其它文件的文件文件夹。

    3)        创建并启动HTTP服务。

    代码清单3-10         HttpFileServer的初始化

      def initialize(){
        baseDir= Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
        fileDir= new File(baseDir,"files")
        jarDir =new File(baseDir,"jars")
       fileDir.mkdir()
       jarDir.mkdir()
       logInfo("HTTP File server directory is " + baseDir)
       httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort,"HTTP file server")
       httpServer.start()
       serverUri = httpServer.uri
       logDebug("HTTP file server started at: " + serverUri)
      }

    HttpServer的构造和start方法的实现中,再次使用了Utils的静态方法startServiceOnPort,因此会回调doStart方法,见代码清单3-11。有关jetty的API使用參见附录C。

    代码清单3-11         HttpServer的启动

      def start() {
        if (server != null) {
          throw newServerStateException("Serveris already started")
        } else {
         logInfo("Starting HTTP Server")
          val(actualServer, actualPort) =
           Utils.startServiceOnPort[Server](requestedPort, doStart, conf,serverName)
          server= actualServer
          port =actualPort
        }
      }
     

    doStart方法中启动内嵌的jetty所提供的HTTP服务。见代码清单3-12。

    代码清单3-12         HttpServer的启动功能实现

     private def doStart(startPort: Int): (Server, Int) = {
        val server = new Server()
        val connector = newSocketConnector
        connector.setMaxIdleTime(60 *1000)
        connector.setSoLingerTime(-1)
        connector.setPort(startPort)
        server.addConnector(connector)
     
        val threadPool = newQueuedThreadPool
        threadPool.setDaemon(true)
        server.setThreadPool(threadPool)
        val resHandler = newResourceHandler
        resHandler.setResourceBase(resourceBase.getAbsolutePath)
     
        val handlerList = new HandlerList
        handlerList.setHandlers(Array(resHandler,newDefaultHandler))
     
        if(securityManager.isAuthenticationEnabled()) {
         logDebug("HttpServer is using security")
          val sh =setupSecurityHandler(securityManager)
          //make sure we go through security handler to get resources
          sh.setHandler(handlerList)
          server.setHandler(sh)
        } else {
         logDebug("HttpServer is not using security")
          server.setHandler(handlerList)
        }
     
        server.start()
        val actualPort = server.getConnectors()(0).getLocalPort
     
        (server,actualPort)
      }


    3.2.12 创建測量系统MetricsSystem

             MetricsSystem是Spark的測量系统。创建MetricsSystem的代码例如以下。

      val metricsSystem = if (isDriver) {
         MetricsSystem.createMetricsSystem("driver", conf,securityManager)
        } else {
         conf.set("spark.executor.id", executorId)
          val ms = MetricsSystem.createMetricsSystem("executor",conf,securityManager)
          ms.start()
          ms
        }

    上面调用的createMetricsSystem方法实际创建了MetricsSystem,代码例如以下。

      def createMetricsSystem(
         instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem= {
        new MetricsSystem(instance, conf, securityMgr)
      }

    构造MetricsSystem的过程最重要的是调用了MetricsConfig的initialize方法。见代码清单3-13。

    代码清单3-13         MetricsConfig的初始化

    def initialize() {
       setDefaultProperties(properties)
     
        var is: InputStream = null
        try {
          is = configFilematch {
            case Some(f) =>newFileInputStream(f)
            case None =>Utils.getSparkClassLoader.getResourceAsStream(METRICS_CONF)
          }
     
          if (is !=null) {
            properties.load(is)
          }
        } catch {
          case e: Exception =>logError("Error loadingconfigure file",e)
        } finally {
          if (is !=null) is.close()
        }
     
       propertyCategories = subProperties(properties,INSTANCE_REGEX)
        if(propertyCategories.contains(DEFAULT_PREFIX)) {
          import scala.collection.JavaConversions._
     
          val defaultProperty =propertyCategories(DEFAULT_PREFIX)
          for { (inst,prop) <-propertyCategories
                if (inst !=DEFAULT_PREFIX)
               (k, v) <- defaultProperty
                if (prop.getProperty(k) ==null) } {
            prop.setProperty(k,v)
          }
        }
      }
     

    从以上实现能够看出,MetricsConfig的initialize方法主要负责载入metrics.properties文件里的属性配置,并对属性进行初始化转换。

    比如:将属性

    {*.sink.servlet.path=/metrics/json,applications.sink.servlet.path=/metrics/applications/json,*.sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, master.sink.servlet.path=/metrics/master/json}

    转换为

    Map(applications ->{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet,sink.servlet.path=/metrics/applications/json}, master ->{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/master/json},* -> {sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet,sink.servlet.path=/metrics/json})

    3.2.13 创建SparkEnv

             当全部的基础组件准备好后。终于使用以下的代码创建运行环境SparkEnv。

    new SparkEnv(executorId, actorSystem, serializer, closureSerializer, cacheManager,
         mapOutputTracker, shuffleManager, broadcastManager, blockTransferService,
     blockManager,securityManager, httpFileServer, sparkFilesDir,
    metricsSystem, shuffleMemoryManager, conf)

    注意:serializer和closureSerializer都是使用Class.forName反射生成的org.apache.spark.serializer.JavaSerializer类的实例,当中closureSerializer实例特别用来对Scala中的闭包进行序列化。

    3.3 创建metadataCleaner

    SparkContext为了保持对全部持久化的RDD的跟踪。使用类型是TimeStampedWeakValueHashMap的persistentRdds缓存。metadataCleaner的功能是清除过期的持久化RDD。创建metadataCleaner的代码例如以下。

      private[spark] val persistentRdds = newTimeStampedWeakValueHashMap[Int, RDD[_]]
      private[spark] val metadataCleaner =
        new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT,this.cleanup, conf)


    我们细致看看MetadataCleaner的实现。见代码清单3-14。

    代码清单3-14         MetadataCleaner的实现

    private[spark] class MetadataCleaner(
       cleanerType: MetadataCleanerType.MetadataCleanerType,
       cleanupFunc: (Long) => Unit,
        conf:SparkConf)
      extends Logging
    {
      val name =cleanerType.toString
     
      private val delaySeconds =MetadataCleaner.getDelaySeconds(conf, cleanerType)
      private valperiodSeconds = math.max(10, delaySeconds /10)
      private val timer = new Timer(name +" cleanup timer", true)
     
      private val task = new TimerTask {
        override def run() {
          try {
            cleanupFunc(System.currentTimeMillis()- (delaySeconds *1000))
           logInfo("Ran metadata cleaner for " + name)
          } catch {
            case e: Exception =>logError("Error runningcleanup task for " +name, e)
          }
        }
      }
     
      if (delaySeconds> 0){
       timer.schedule(task, delaySeconds * 1000, periodSeconds *1000)
      }
     
      def cancel() {
        timer.cancel()
      }
    }


    从MetadataCleaner的实现能够看出事实上质是一个用TimerTask实现的定时器。不断调用cleanupFunc: (Long) => Unit这种函数參数。

    构造metadataCleaner时的函数參数是cleanup,用于清理persistentRdds中的过期内容,代码例如以下。

      private[spark] defcleanup(cleanupTime: Long) {
        persistentRdds.clearOldValues(cleanupTime)
      }

     未完待续。。

    后记:自己牺牲了7个月的周末和下班空暇时间,通过研究Spark源代码和原理,总结整理的《深入理解Spark:核心思想与源代码分析》一书现在已经正式出版上市,眼下亚马逊、京东、当当、天猫等站点均有销售,欢迎感兴趣的同学购买。我開始研究源代码时的Spark版本号是1.2.0。经过7个多月的研究和出版社近4个月的流程。Spark自身的版本号迭代也非常快,现在最新已经是1.6.0。

    眼下市面上另外2本源代码研究的Spark书籍的版本号各自是0.9.0版本号和1.2.0版本号。看来这些书的作者都与我一样。遇到了这样的问题。

    因为研究和出版都须要时间,所以不能及时跟上Spark的脚步,还请大家见谅。

    可是Spark核心部分的变化相对还是非常少的,假设对版本号不是过于追求,依旧能够选择本书。

     

    京东(现有满150减50活动)http://item.jd.com/11846120.html 

    当当:http://product.dangdang.com/23838168.html 


  • 相关阅读:
    链表实现
    @Aspect
    mybatis plus
    using
    50道题
    梦想,青春,时间
    存储过程!!!
    事务,视图,索引
    高级查询--嵌套和相关,两套分页!!!
    学习笔记
  • 原文地址:https://www.cnblogs.com/wgwyanfs/p/7271902.html
Copyright © 2011-2022 走看看