  • Spark Core (4): Using the LogQuery example to show how an Executor runs RDD operators (reposted)

    1. How does it actually run?

    Many blog posts explain at length what an RDD, a Dependency, or a Shuffle is, but how exactly do the Executors run the code you submitted?
    Below is a log-analysis example taken from Spark's examples:
    def main(args: Array[String]) {  
      val sparkConf = new SparkConf().setAppName("Log Query")  
      val sc = new SparkContext(sparkConf)  
      val dataSet =  
        if (args.length == 1) sc.textFile(args(0)) else sc.parallelize(exampleApacheLogs)  
      // scalastyle:off  
      val apacheLogRegex =
        """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r
      // scalastyle:on  
      /** Tracks the total query count and number of aggregate bytes for a particular group. */  
      class Stats(val count: Int, val numBytes: Int) extends Serializable {  
        def merge(other: Stats): Stats = {  
          new Stats(count + other.count, numBytes + other.numBytes)  
        }  
        override def toString: String = "bytes=%s\tn=%s".format(numBytes, count)
      }  
      
      def extractKey(line: String): (String, String, String) = {  
        apacheLogRegex.findFirstIn(line) match {  
          case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>  
            if (user != "\"-\"") (ip, user, query)
            else (null, null, null)  
          case _ => (null, null, null)  
        }  
      }  
      
      def extractStats(line: String): Stats = {  
        apacheLogRegex.findFirstIn(line) match {  
          case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>  
            new Stats(1, bytes.toInt)  
          case _ => new Stats(1, 0)  
        }  
      }  
      
      dataSet.map(line => (extractKey(line), extractStats(line)))  
        .reduceByKey((c, d) => c.merge(d))  
        .collect().foreach{  
          case (user, query) => println("%s\t%s".format(user, query))}
      
      sc.stop()  
    }  
    Inside the map operator we defined our own extractKey and extractStats functions, and inside reduceByKey we defined a merge function for values that share the same key.
    How are these functions shipped to the executors and executed there?
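
    How does a function value even travel over the wire? The sketch below (plain JDK serialization in a standalone Scala program, not Spark's actual closureSerializer; ClosureRoundTrip and extractLength are made-up names for illustration) round-trips a closure the way Spark's default Java-based closure serializer conceptually does, assuming a Scala version whose function literals are serializable:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    object ClosureRoundTrip {
      def main(args: Array[String]): Unit = {
        // A closure similar in spirit to line => (extractKey(line), extractStats(line)).
        val extractLength: String => (String, Int) = line => (line.take(8), line.length)

        // Serialize the function value to bytes (roughly what happens on the driver side).
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buffer)
        out.writeObject(extractLength)
        out.close()

        // Deserialize on the "executor" side and apply it to a log line.
        val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
        val revived = in.readObject().asInstanceOf[String => (String, Int)]
        println(revived("10.0.0.1 - someuser [21/Jul/2009:02:48:12 -0700] \"GET / HTTP/1.1\" 200 1234"))
      }
    }

    The sections below trace how the real pipeline does this: the task carrying the serialized RDD and its closures is deserialized on the executor, and iterators then pull the data through those functions.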

    1.1 RDD and ShuffleDependency

    In an earlier post (on how tasks are launched on the Executor), we already discussed how the Executor obtains the Driver's RDD and Dependency. So how does the RDD actually run these functions?

    The ShuffleMapTask submitted through the DAG is deserialized on the Executor from the serializedTask field of the TaskDescription. Here is ShuffleMapTask's runTask method:
    override def runTask(context: TaskContext): MapStatus = {  
       // Deserialize the RDD using the broadcast variable.  
       val threadMXBean = ManagementFactory.getThreadMXBean  
       val deserializeStartTime = System.currentTimeMillis()  
       val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {  
         threadMXBean.getCurrentThreadCpuTime  
       } else 0L  
       val ser = SparkEnv.get.closureSerializer.newInstance()  
       val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](  
         ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)  
       _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime  
       _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {  
         threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime  
       } else 0L  
      
       var writer: ShuffleWriter[Any, Any] = null  
       try {  
         val manager = SparkEnv.get.shuffleManager  
         writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)  
         writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])  
         writer.stop(success = true).get  
       } catch {  
         case e: Exception =>  
           try {  
             if (writer != null) {  
               writer.stop(success = false)  
             }  
           } catch {  
             case e: Exception =>  
               log.debug("Could not stop writer", e)  
           }  
           throw e  
       }  
     }  
    Here we can see that the ShuffleWriter is used to write out the data produced by iterating the RDD.
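
    Conceptually, writing the shuffle output means draining an iterator of key-value records and routing each record to a bucket chosen by the partitioner. Below is a heavily simplified, self-contained sketch of that idea (in-memory buckets and my own hash partitioning, not Spark's ShuffleWriter API):

    import scala.collection.mutable

    object MiniShuffleWrite {
      // Route every record to a reduce-side bucket by hashing its key; this is a toy
      // stand-in for what happens when writer.write drains rdd.iterator(partition, context).
      def write[K, V](records: Iterator[(K, V)], numReducers: Int): Seq[mutable.Buffer[(K, V)]] = {
        val buckets = Vector.fill(numReducers)(mutable.Buffer.empty[(K, V)])
        records.foreach { case (k, v) =>
          val p = math.abs(k.hashCode % numReducers)
          buckets(p) += ((k, v))
        }
        buckets
      }

      def main(args: Array[String]): Unit = {
        val records = Iterator(("10.0.0.1", 1), ("10.0.0.2", 1), ("10.0.0.1", 1))
        write(records, numReducers = 2).zipWithIndex.foreach { case (bucket, i) =>
          println(s"bucket $i -> $bucket")
        }
      }
    }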

    1.1.1 ShuffleWriter

    The ShuffleWriter is obtained from the shuffleManager; see getWriter in SortShuffleManager.scala:
    /** Get a writer for a given partition. Called on executors by map tasks. */  
     override def getWriter[K, V](  
         handle: ShuffleHandle,  
         mapId: Int,  
         context: TaskContext): ShuffleWriter[K, V] = {  
       numMapsForShuffle.putIfAbsent(  
         handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)  
       val env = SparkEnv.get  
       handle match {  
         case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>  
           new UnsafeShuffleWriter(  
             env.blockManager,  
             shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],  
             context.taskMemoryManager(),  
             unsafeShuffleHandle,  
             mapId,  
             context,  
             env.conf)  
         case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>  
           new BypassMergeSortShuffleWriter(  
             env.blockManager,  
             shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],  
             bypassMergeSortHandle,  
             mapId,  
             context,  
             env.conf)  
         case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>  
           new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)  
       }  
     }  
    The ShuffleDependency holds the ShuffleHandle, and the ShuffleHandle in turn holds the Dependency.
    1. On the Driver, registerShuffle decides which ShuffleHandle to use based on the dependency.
    2. On the Executor, the shuffleManager uses the ShuffleHandle carried by the dependency to decide which ShuffleWriter to build.
    Side note: the Dependency by itself would be enough to determine the ShuffleWriter. The ShuffleHandle is only used by SortShuffleWriter to get the dependency back, and the Executor-side SortShuffleWriter could obtain the Dependency directly, so the ShuffleHandle feels rather redundant.

    In this log-analysis example, the writer that gets returned is a SortShuffleWriter.
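
    The decision can be summarized with a simplified sketch (my own types and predicate names, not Spark's registerShuffle/getWriter code): a dependency that needs map-side aggregation, like the one reduceByKey creates here, falls through to the base case, which is why this job ends up with a SortShuffleWriter.

    // Simplified, hypothetical model of the handle/writer choice; the real logic lives in
    // SortShuffleManager.registerShuffle and getWriter.
    sealed trait WriterKind
    case object BypassMergeSort extends WriterKind
    case object UnsafeSerialized extends WriterKind
    case object SortBased extends WriterKind

    final case class DepInfo(mapSideCombine: Boolean, numPartitions: Int, serializerRelocatable: Boolean)

    object ChooseWriter {
      def choose(dep: DepInfo, bypassThreshold: Int = 200): WriterKind =
        if (!dep.mapSideCombine && dep.numPartitions <= bypassThreshold) BypassMergeSort
        else if (!dep.mapSideCombine && dep.serializerRelocatable) UnsafeSerialized
        else SortBased

      def main(args: Array[String]): Unit = {
        // reduceByKey needs map-side combining, so the sort-based writer is chosen.
        println(choose(DepInfo(mapSideCombine = true, numPartitions = 2, serializerRelocatable = false)))
      }
    }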

    1.1.2 RDD.iterator

    In ShuffleMapTask's runTask method:
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) 

    The write call on the writer is passed rdd.iterator, that is, an iterator built from the RDD:

    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {  
       if (storageLevel != StorageLevel.NONE) {  
         getOrCompute(split, context)  
       } else {  
         computeOrReadCheckpoint(split, context)  
       }  
     }  
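
    For contrast, the getOrCompute branch above is only reached when the RDD has been persisted. A minimal usage sketch (assuming a local master; the persist call is not part of the original LogQuery example):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    object PersistSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("persist-sketch").setMaster("local[2]"))
        val lines = sc.parallelize(Seq("a 1", "b 2", "a 3"))

        // With persist(), iterator() takes the getOrCompute path and the computed partition
        // is kept by the block manager; without it, compute() runs again on every action.
        val keyed = lines.map(l => (l.split(" ")(0), 1)).persist(StorageLevel.MEMORY_ONLY)

        println(keyed.count())  // first action computes and caches the partitions
        println(keyed.count())  // second action is served from the cache
        sc.stop()
      }
    }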

    The RDD produced by map is a MapPartitionsRDD. It has no caching or storage configured, so its StorageLevel is NONE and computeOrReadCheckpoint is called:

    /**  
     * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.  
     */  
    private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =  
    {  
      if (isCheckpointedAndMaterialized) {  
        firstParent[T].iterator(split, context)  
      } else {  
        compute(split, context)  
      }  
    }  
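
    By contrast, if the job had called checkpoint, the materialized data would be read back instead of recomputed on later passes. A small usage sketch (assuming a local master and a hypothetical checkpoint directory):

    import org.apache.spark.{SparkConf, SparkContext}

    object CheckpointSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("checkpoint-sketch").setMaster("local[2]"))
        sc.setCheckpointDir("/tmp/spark-checkpoints")   // hypothetical directory

        val keyed = sc.parallelize(1 to 10).map(i => (i % 2, i))
        keyed.checkpoint()          // marks the RDD; it is materialized by the next action
        println(keyed.count())      // later computations of keyed read the checkpoint instead of recomputing
        sc.stop()
      }
    }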

    In our example the RDD has not been checkpointed either, so compute is called:

    override def compute(split: Partition, context: TaskContext): Iterator[U] =  
      f(context, split.index, firstParent[T].iterator(split, context))  

    Let's first look at firstParent:

    /** Returns the first parent RDD */  
    protected[spark] def firstParent[U: ClassTag]: RDD[U] = {  
      dependencies.head.rdd.asInstanceOf[RDD[U]]  
    }  

    Every RDD keeps an array of Dependencies, and each Dependency references an RDD. The RDD of the first dependency in that array is the first RDD to process the data, which in the code below is dataSet:

    val dataSet =  
          if (args.length == 1) sc.textFile(args(0)) else sc.parallelize(exampleApacheLogs)  
    Taking parallelize as our example, the corresponding RDD is a ParallelCollectionRDD. Back to:
    firstParent[T].iterator(split, context))  

    This iterator is the same RDD.iterator method shown above; the StorageLevel is again NONE and there is no checkpoint, so compute is called once more:

    override def compute(s: Partition, context: TaskContext): Iterator[T] = {  
      new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)  
    }  

    This produces an InterruptibleIterator, which is essentially just a delegating iterator:

    @DeveloperApi  
    class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])  
      extends Iterator[T] {  
      
      def hasNext: Boolean = {  
        // TODO(aarondav/rxin): Check Thread.interrupted instead of context.interrupted if interrupt  
        // is allowed. The assumption is that Thread.interrupted does not have a memory fence in read  
        // (just a volatile field in C), while context.interrupted is a volatile in the JVM, which  
        // introduces an expensive read fence.  
        if (context.isInterrupted) {  
          throw new TaskKilledException  
        } else {  
          delegate.hasNext  
        }  
      }  
      
      def next(): T = delegate.next()  
    }  

    When it detects that the task has been interrupted, it throws a TaskKilledException immediately. The iterator it delegates to is:

    s.asInstanceOf[ParallelCollectionPartition[T]].iterator 

    The Partition of a ParallelCollectionRDD is a ParallelCollectionPartition:

    private[spark] class ParallelCollectionPartition[T: ClassTag](  
        var rddId: Long,  
        var slice: Int,  
        var values: Seq[T]  
      ) extends Partition with Serializable {  
      
      def iterator: Iterator[T] = values.iterator  
       .......  
    }  

    values is a serializable sequence. On the Driver side, ParallelCollectionRDD splits the data into ParallelCollectionPartition slices, and each slice's values are stored inside the ParallelCollectionPartition itself rather than in the ParallelCollectionRDD. So the data being computed is not handed over through the RDD; it arrives by deserializing the ShuffleMapTask, i.e. over the direct RPC channel:

    private[spark] class ShuffleMapTask(  
        stageId: Int,  
        stageAttemptId: Int,  
        taskBinary: Broadcast[Array[Byte]],  
        partition: Partition,  
        @transient private var locs: Seq[TaskLocation],  
        metrics: TaskMetrics,  
        localProperties: Properties,  
        jobId: Option[Int] = None,  
        appId: Option[String] = None,  
        appAttemptId: Option[String] = None)  
      extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,  
        appId, appAttemptId)  
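
    As a side illustration of that point, here is a simplified sketch of how a driver-side sequence gets chopped into per-partition values (my own slice function, not ParallelCollectionRDD.slice):

    object SliceSketch {
      // Split a driver-side sequence into numSlices contiguous chunks; each chunk plays the
      // role of the values field carried by one ParallelCollectionPartition.
      def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] =
        (0 until numSlices).map { i =>
          val start = (i * data.length) / numSlices
          val end = ((i + 1) * data.length) / numSlices
          data.slice(start, end)
        }

      def main(args: Array[String]): Unit = {
        val logs = Seq("line1", "line2", "line3", "line4", "line5")
        slice(logs, 2).zipWithIndex.foreach { case (values, idx) =>
          println(s"partition $idx carries ${values.size} records: $values")
        }
      }
    }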

    Back to MapPartitionsRDD's compute function:

    override def compute(split: Partition, context: TaskContext): Iterator[U] =  
      f(context, split.index, firstParent[T].iterator(split, context))  

    What is f? Look at RDD.map:

    def map[U: ClassTag](f: T => U): RDD[U] = withScope {  
      val cleanF = sc.clean(f)  
      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))  
    }  

    Now recall how we called map:

    dataSet.map(line => (extractKey(line), extractStats(line)))
    So f(context, split.index, firstParent[T].iterator(split, context)) invokes (context, pid, iter) => iter.map(cleanF). The key piece is iter.map, a basic Scala method; see Iterator.scala in the Scala standard library:
    def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {  
       def hasNext = self.hasNext  
       def next() = f(self.next())  
     }  
    The result can simply be thought of as an AbstractIterator whose self points at the InterruptibleIterator, and f is line => (extractKey(line), extractStats(line)).
    Now look at the code in ExternalSorter.scala that pulls the partition's data through this iterator and runs the computation:
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
    • AbstractIterator.hasNext -> InterruptibleIterator.hasNext -> Elements(Seq.iterator).hasNext -> def hasNext: Boolean = index < end
    • AbstractIterator.next() -> InterruptibleIterator.next() -> Elements(Seq.iterator).next() -> f(InterruptibleIterator.next()) -> (extractKey(InterruptibleIterator.next()), extractStats(InterruptibleIterator.next()))
    Running extractKey and extractStats therefore yields a Product2[(String, String, String), Stats] key-value pair.
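
    The important point is that iter.map composes lazily: no log line is parsed until the ShuffleWriter actually pulls from the iterator in the loop above. A tiny plain-Scala sketch of that behaviour (the parse function is made up; nothing here is Spark-specific):

    object LazyIteratorSketch {
      def main(args: Array[String]): Unit = {
        var parsed = 0
        def parse(line: String): (String, Int) = { parsed += 1; (line.take(8), line.length) }

        val source = Iterator("10.0.0.1 GET /a", "10.0.0.2 GET /b")
        val mapped = source.map(parse)                  // nothing parsed yet

        println(s"before pulling: parsed = $parsed")    // 0
        println(mapped.next())                          // parses exactly one element
        println(s"after one next(): parsed = $parsed")  // 1
      }
    }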
     
    Remember that the executor loads the Driver's jar? Although functions defined in Scala are serializable by default, running their method bodies does not require deserializing any code: it is enough to load the jar and have the classloader resolve the driver class we wrote.
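
    A minimal sketch of that classloading idea (the jar path and class name are hypothetical; Spark's Executor handles this internally with its own URL-based classloader when it fetches the application jars):

    import java.net.{URL, URLClassLoader}

    object LoadDriverClassSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical jar and class, standing in for the driver jar an executor downloads.
        val appJar = new URL("file:/tmp/log-query-app.jar")
        val loader = new URLClassLoader(Array(appJar), getClass.getClassLoader)

        // Once the class is loadable, deserialized closures can resolve their owning class;
        // the method bodies come from the jar's bytecode, not from the serialized task.
        val clazz = loader.loadClass("com.example.LogQuery")
        println(s"loaded ${clazz.getName} from $appJar")
      }
    }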

    1.1.3 The reduceByKey operator

    In LogQuery:
    .reduceByKey((c, d) => c.merge(d))  

    Let's look at reduceByKey in PairRDDFunctions.scala (why PairRDDFunctions is a separate class rather than part of RDD was covered in an earlier post):

    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {  
        combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)  
      }  
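
    As a quick reminder of the operator's semantics before unpacking the implementation, a standalone usage sketch (assuming a local master):

    import org.apache.spark.{SparkConf, SparkContext}

    object ReduceByKeySketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("reduce-by-key-sketch").setMaster("local[2]"))

        // Values sharing a key are combined pairwise, just like (c, d) => c.merge(d) in LogQuery.
        val result = sc.parallelize(Seq(("10.0.0.1", 1), ("10.0.0.2", 1), ("10.0.0.1", 1)))
          .reduceByKey(_ + _)
          .collect()

        result.foreach(println)   // ("10.0.0.1", 2) and ("10.0.0.2", 1), in some order
        sc.stop()
      }
    }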

    Inside combineByKeyWithClassTag:

    def combineByKeyWithClassTag[C](  
          createCombiner: V => C,  
          mergeValue: (C, V) => C,  
          mergeCombiners: (C, C) => C,  
          partitioner: Partitioner,  
          mapSideCombine: Boolean = true,  
          serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {  
        require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0  
        if (keyClass.isArray) {  
          if (mapSideCombine) {  
            throw new SparkException("Cannot use map-side combining with array keys.")  
          }  
          if (partitioner.isInstanceOf[HashPartitioner]) {  
            throw new SparkException("HashPartitioner cannot partition array keys.")  
          }  
        }  
        val aggregator = new Aggregator[K, V, C](  
          self.context.clean(createCombiner),  
          self.context.clean(mergeValue),  
          self.context.clean(mergeCombiners))  
        if (self.partitioner == Some(partitioner)) {  
          self.mapPartitions(iter => {  
            val context = TaskContext.get()  
            new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))  
          }, preservesPartitioning = true)  
        } else {  
          new ShuffledRDD[K, V, C](self, partitioner)  
            .setSerializer(serializer)  
            .setAggregator(aggregator)  
            .setMapSideCombine(mapSideCombine)  
        }  
      }  
    We have not introduced the Aggregator before, so let's do that now. An Aggregator has three key functions (see the sketch after this list):
    1. createCombiner: for a new KV pair from the map side, when the key has not been seen yet, turn the V into a C.
    2. mergeValue: for a new KV pair from the map side, when the same key already exists, merge the new V into the existing C.
    3. mergeCombiners: when the partitions are finally combined in the distributed computation, merge one partition's aggregated C with another partition's aggregated C for the same key.
    In the LogQuery example, mergeValue and mergeCombiners are both (c, d) => c.merge(d), and createCombiner simply passes the Stats through unchanged.
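
    Stripped of Spark's machinery, the three functions behave like the following sketch (plain Scala reusing the Stats class from the example; this is not Spark's Aggregator code):

    import scala.collection.mutable

    object AggregatorSketch {
      class Stats(val count: Int, val numBytes: Int) extends Serializable {
        def merge(other: Stats): Stats = new Stats(count + other.count, numBytes + other.numBytes)
        override def toString: String = "bytes=%s\tn=%s".format(numBytes, count)
      }

      def main(args: Array[String]): Unit = {
        val createCombiner: Stats => Stats = v => v                            // first value seen for a key
        val mergeValue: (Stats, Stats) => Stats = (c, v) => c.merge(v)         // same key seen again on the map side
        val mergeCombiners: (Stats, Stats) => Stats = (c1, c2) => c1.merge(c2) // combine partial results across partitions

        val records = Seq(
          ("10.0.0.1", new Stats(1, 100)),
          ("10.0.0.1", new Stats(1, 50)),
          ("10.0.0.2", new Stats(1, 10)))

        // Map-side aggregation for one partition, the same shape as ExternalSorter's update closure.
        val combined = mutable.Map.empty[String, Stats]
        records.foreach { case (k, v) =>
          combined(k) = combined.get(k).map(c => mergeValue(c, v)).getOrElse(createCombiner(v))
        }
        combined.foreach { case (k, s) => println(s"$k -> $s") }

        // mergeCombiners would then fold together the per-partition results on the reduce side.
      }
    }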
    Back in ExternalSorter.scala's insertAll:
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }

    We can see that when map.changeValue is called, the update function is what merges values for an existing key:

    val update = (hadValue: Boolean, oldValue: C) => {  
            if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)  
          }  

    mergeValue and createCombiner come from the Aggregator. The Aggregator is stored in the ShuffledRDD and in the ShuffleDependency, and the ShuffleDependency is delivered from the Driver to the Executor over RPC, so the Executor can retrieve the Aggregator from the ShuffleDependency and apply the functions it specifies to the KV pairs. Here mergeValue is the Driver's (c, d) => c.merge(d), and c is a Stats object:

    class Stats(val count: Int, val numBytes: Int) extends Serializable {  
      def merge(other: Stats): Stats = {  
        new Stats(count + other.count, numBytes + other.numBytes)  
      }  
      override def toString: String = "bytes=%s\tn=%s".format(numBytes, count)
    }  
    So this ends up calling the Stats.merge method.

    2. Summary

    • By deserializing the RDD (not the ShuffledRDD) and walking its Dependency list, the Executor obtains iterator A of the RDD that originally supplies the data.
    • The map operator wraps iterator A in an AbstractIterator; whenever iterator A produces an element, the function inside map, line => (extractKey(line), extractStats(line)), is applied to it and a KV result is returned.
    • The function inside the reduceByKey operator is shipped via the aggregator held in the ShuffleDependency.
    • The Executor only needs to iterate the AbstractIterator to obtain KV pairs and call the aggregator's functions to combine values with the same K, thereby completing the RDD computation defined in the Driver's main function.
  • Original article: https://www.cnblogs.com/itboys/p/9213029.html