zoukankan      html  css  js  c++  java
  • Spark Core源代码分析: Spark任务模型

    概述

    一个Spark的Job分为多个stage,最后一个stage会包含一个或多个ResultTask,前面的stages会包含一个或多个ShuffleMapTasks。

    ResultTask运行并将结果返回给driver application。

    ShuffleMapTask将task的output依据task的partition分离到多个buckets里。一个ShuffleMapTask相应一个ShuffleDependency的partition,而总partition数同并行度、reduce数目是一致的。


    Task

    Task的代码在scheduler package下。

    抽象类Task构造參数例如以下:

    private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable

    Task相应一个stageId和partitionId。

    提供runTask()接口、kill()接口等。

    提供killed变量、TaskMetrics变量、TaskContext变量等。

    除了上述基本接口和变量,Task的伴生对象提供了序列化和反序列化应用依赖的jar包的方法。原因是Task须要保证工作节点具备本次Task须要的其它依赖,注冊到SparkContext下,所以提供了把依赖转成流写入写出的方法。


    Task的两种实现


    ShuffleMapTask

    ShuffleMapTask构造參数例如以下,

    private[spark] class ShuffleMapTask(
        stageId: Int,
        var rdd: RDD[_],
        var dep: ShuffleDependency[_,_],
        _partitionId: Int,
        @transient private var locs: Seq[TaskLocation])
      extends Task[MapStatus](stageId, _partitionId)
    

    RDD partitioner相应的是ShuffleDependency。

    ShuffleMapTask复写了MapStatus向外读写的方法,由于向外读写的内容包含:stageId,rdd,dep,partitionId,epoch和split(某个partition)。对于当中的stageId,rdd,dep有统一的序列化和反序列化操作并会cache在内存里,再放到ObjectOutput里写出去。序列化操作使用的是Gzip,序列化信息会维护在serializedInfoCache = newHashMap[Int, Array[Byte]]。这部分须要序列化并保存的原因是:stageId,rdd,dep真正代表了本次Shuffle Task的信息,为了减轻master节点负担,把这部分序列化结果cache了起来。


    Stage运行逻辑

    主要过程例如以下:

    val ser = Serializer.getSerializer(dep.serializer)
    shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
    

    这一步是初始化一个ShuffleWriterGroup,Group里面是一个BlockObjectWriter数组。


    for (elem <- rdd.iterator(split, context)) {
    val pair = elem.asInstanceOf[Product2[Any, Any]]
      val bucketId = dep.partitioner.getPartition(pair._1)
      shuffle.writers(bucketId).write(pair)
    }
    

    这一步是为每一个Writer相应一个bucket,调用每一个BlockObjectWriter的write()方法写数据


    var totalBytes = 0L
    var totalTime = 0L
    val compressedSizes: Array[Byte] = 
    shuffle.writers.map { writer: BlockObjectWriter =>
        writer.commit()
        writer.close()
    val size = writer.fileSegment().length
        totalBytes += size
    totalTime += writer.timeWriting()
    MapOutputTracker.compressSize(size)
    }
    

    这一步是运行writer.commit(),并得到结果file segment大小,对总大小压缩


    val shuffleMetrics = new ShuffleWriteMetrics
    shuffleMetrics.shuffleBytesWritten = totalBytes
    shuffleMetrics.shuffleWriteTime = totalTime
    metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
    
    success = true
    new MapStatus(blockManager.blockManagerId, compressedSizes)
    

    这一步是记录metrcis信息,最后返回一个MapStatus类,里面是本地ShuffleMapTask结果的相关信息。

    最后会release writers,让相应的shuffle文件得到记录和重用(ShuffleBlockManager管理这些file,这些file是Shuffle Task中一组Writer写的对象)。

    主要把下图看懂。


    重要类

    介绍涉及到的重要外部类,帮助理解。


    ShuffleBlockManager

    总体梳理:

    ShuffleState维护了两个ShuffleFileGroup的ConcurrentLinkedQueue,以记录眼下shuffle的state。

    ShuffleState记录了一次shuffle操作的文件组状态,在ShuffleBlockManager内用Map为每一个shuffleId维护了一个ShuffleState。

    每一个shuffleId通过forMapTask()方法得到一组writer,即ShuflleWriterGroup。这组里的writers共享一个shuffleId和mapId,可是每一个相应不同的bucketId和file。在为writer分配FileGroup的时候,会从shuffleId相应的shuffle state里先取unusedFileGroup,假设不存在,则在HDFS上新建File。

    对于HDFS上的目标file,writer是能够append写的。在新建file的时候,是依据shuffleId和bucket number和一个递增的fileId来创建新的文件的。


    ShuffleFileGroup的重用files和记录mapId,index,offset这块似懂非懂。

    重要方法:

    def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = { new ShuffleWriterGroup {} }

    该方法被一个ShuffleMapTask调用,传入了这次shuffle操作的id,mapId是partitionId。Buckects数目等于分区数目。该方法返回的ShuffleWriterGroup里面是一组DiskBlockObjectWriter,每个writer都属于这一次shuffle操作,所以他们有共同的shuffleId,mapId,可是他们相应了不同的bucket,而且各自相应一个file。

    在shuffle run里的调用和參数传入:

    val ser = Serializer.getSerializer(dep.serializer)
    shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
    

    shuffleId是由ShuffleDependency获得的全局唯一id,代表本次shuffle任务id

    mapId等于partitionId

    Bucket数目等于分区数目

    产生writers:

    Writer类型是DiskBlockObjectWriter,数目等于buckets数目。bufferSize的设置:

    conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024

    blockId产生自:

    blockId = ShuffleBlockId(shuffleId, mapId, bucketId)

    在生成writer的时候调用的是BlockManager的getDiskWriter方法,ShuffleBlockManager初始化的时候绑定BlockManager。

    private[spark] class DiskBlockObjectWriter(
        blockId: BlockId,
        file: File,
        serializer: Serializer,
        bufferSize: Int,
        compressStream: OutputStream => OutputStream,
        syncWrites: Boolean)
      extends BlockObjectWriter(blockId)
    
    

    ShuffleFileGroup:私有内部类,相应了一组shuffle files,每一个file相应一个reducer。一个Mapper会分到一个ShuffleFileGroup,把mapper的结果写到这组File里去。

    MapStatus

    注意到ShuffleMapTask的类型是MapStatus类。MapStatus类是ShuffleMapTask要返回给scheduler的运行结果,包含两个东西:

    class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])

    前者是run这次task的block manager地址(BlockManagerId是一个类,保存了executorId,host, port, nettyPort),后者是output大小,该值会传给接下来的reduce任务。该size是被MapOutputTracker压缩过的。


    MapStatus类提供了两个方法例如以下,ShuffleMapTask进行了复写。

      def writeExternal(out: ObjectOutput) {
        location.writeExternal(out)
        out.writeInt(compressedSizes.length)
        out.write(compressedSizes)
      }
    
      def readExternal(in: ObjectInput) {
        location = BlockManagerId(in)
        compressedSizes = new Array[Byte](in.readInt())
        in.readFully(compressedSizes)
      }
    

    BlockManagerId

    BlockManagerId类构造依赖executorId, host, port, nettyPort这些信息。伴生对象维护了一个blockManagerIdCache ,实现为ConcurrentHashMap[BlockManagerId,BlockManagerId]() 。

    比方MapStatus的readExternal方法把ObjectInput传入BlockManagerId构造函数的时候,BlockManagerId的apply()方法就会依据ObjectInput取出executorId, host, port,nettyPort信息,把这个BlockManagerIdobj维护到blockManagerIdCache


    ResultTask

    构造參数

    private[spark] class ResultTask[T, U](
        stageId: Int,
        var rdd: RDD[T],
        var func: (TaskContext, Iterator[T]) => U,
        _partitionId: Int,
        @transient locs: Seq[TaskLocation],
        var outputId: Int)
      extends Task[U](stageId, _partitionId) with Externalizable {
    

    ResultTask比較简单,runTask方法调用的是rdd的迭代器:

      override def runTask(context: TaskContext): U = {
        metrics = Some(context.taskMetrics)
        try {
          func(context, rdd.iterator(split, context))
        } finally {
          context.executeOnCompleteCallbacks()
        }
      }
    

    进程模型 vs. 线程模型

    Spark同节点上的任务以多线程的方式执行在一个JVM进程中。

    长处:

    启动任务快

    共享内存,适合内存密集型任务

    Executor所占资源可反复利用

    缺点:

    同节点上的全部任务执行在一个进程中,会出现严重的资源争用,难以细粒度控制每一个任务的占用资源。MapReduce为Map Task和Reduce Task设置不同资源,细粒度控制任务占用资源量。

    MapReduce的每一个Task都是一个JVM进程,都要经历:资源申请->执行任务->释放资源的过程

    每一个节点能够有一个或多个Executor,Executor配有一定数量slots,Executor内能够跑多个Result Task和ShuffleMap Task。

    在共享内存方面,broadcast的变量会在每一个executor里存一份,这个executor内的任务能够共享。




    全文完 :)



  • 相关阅读:
    非静态成员的sizeof
    Android中java.lang.ClassNotFoundException: ***.**** in loader dalvik.system.PathClassLoader[/data/app
    手机 SIM卡的EF
    android 获取手机ip的三种方式
    获取图片倒影效果
    python基础
    Python学习_数据处理split方法
    Python学习_从文件读取数据和保存数据
    Python学习_列表推导和Lambda表达式
    python学习_数据处理编程实例(一)
  • 原文地址:https://www.cnblogs.com/mengfanrong/p/3862458.html
Copyright © 2011-2022 走看看