zoukankan      html  css  js  c++  java
  • spark源码阅读 RDDs

    RDDs弹性分布式数据集

    Spark是一个实现了RDDs编程模型的集群计算平台。介绍RDDs的资料已经很多,这里不再赘述,本文主要阅读源码。

    abstract class RDD[T: ClassTag](
        @transient private var _sc: SparkContext,
        @transient private var deps: Seq[Dependency[_]]
      ) extends Serializable with Logging {

    SparkEnv几个重要组件

    BlockManager

    主要成员

      // Maps each BlockId to the BlockInfo entry kept for it.
      private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
      // Actual storage of where blocks are kept
      // Set to true the first time the lazy `externalBlockStore` below is initialized.
      private var externalBlockStoreInitialized = false
      private[spark] val memoryStore = new MemoryStore(this, memoryManager)
      private[spark] val diskStore = new DiskStore(this, diskBlockManager)
      // Created lazily on first access; its initializer also flips the flag above.
      private[spark] lazy val externalBlockStore: ExternalBlockStore = {
        externalBlockStoreInitialized = true
        new ExternalBlockStore(this, executorId)
      }

    主要方法

    get(blockId: BlockId) 通过BlockId找Block

      /**
       * Looks a block up by id, trying `getLocal` first and calling `getRemote` only when the
       * local lookup comes back empty.
       *
       * @param blockId id of the block to fetch
       * @return the block if either lookup found it, otherwise `None`
       */
      def get(blockId: BlockId): Option[BlockResult] = {
        getLocal(blockId) match {
          case hit @ Some(_) =>
            logInfo(s"Found block $blockId locally")
            hit
          case None =>
            // Local miss: fall back to the remote lookup and log only on success.
            val fetched = getRemote(blockId)
            if (fetched.nonEmpty) {
              logInfo(s"Found block $blockId remotely")
            }
            fetched
        }
      }
    View Code

    getLocal(blockId: BlockId): Option[BlockResult] 通过本地Block Manager找Block

      /**
       * Looks a block up through the local block manager only.
       *
       * @param blockId id of the block to fetch
       * @return whatever `doGetLocal` found, viewed as an `Option[BlockResult]`
       */
      def getLocal(blockId: BlockId): Option[BlockResult] = {
        logDebug(s"Getting local block $blockId")
        // asBlockResult = true asks doGetLocal for BlockResult values, hence the cast below.
        val found: Option[Any] = doGetLocal(blockId, asBlockResult = true)
        found.asInstanceOf[Option[BlockResult]]
      }
    View Code

    doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] 在本地获取Block代码实现

      /**
       * Looks a block up in this block manager's local stores.
       *
       * After confirming that the block is still registered in `blockInfo` and that any
       * in-progress write has finished, the stores enabled by the block's storage level are tried
       * in order: memory, external block store, then disk. A block read from disk whose level also
       * uses memory is put into the memory store before being returned.
       *
       * @param blockId id of the block to look up
       * @param asBlockResult when true a hit is wrapped in a `BlockResult`; when false the
       *                      serialized bytes are returned as obtained from the store
       * @return the block if found; `None` if it is not registered, was removed, its write was
       *         marked as failed, or no enabled store held it
       * @throws BlockException if the level uses disk but the disk store has no bytes for the block
       * @throws SparkException if caching deserialized values in memory does not yield an iterator
       */
      private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
        // orNull: a missing entry is handled by the else branch at the bottom of the method.
        val info = blockInfo.get(blockId).orNull
        if (info != null) {
          // Everything below runs while holding the monitor of this block's BlockInfo.
          info.synchronized {
            // Double check to make sure the block is still there. There is a small chance that the
            // block has been removed by removeBlock (which also synchronizes on the blockInfo object).
            // Note that this only checks metadata tracking. If user intentionally deleted the block
            // on disk or from off heap storage without using removeBlock, this conditional check will
            // still pass but eventually we will get an exception because we can't find the block.
            if (blockInfo.get(blockId).isEmpty) {
              logWarning(s"Block $blockId had been removed")
              return None
            }
    
            // If another thread is writing the block, wait for it to become ready.
            if (!info.waitForReady()) {
              // If we get here, the block write failed.
              logWarning(s"Block $blockId was marked as failure.")
              return None
            }
    
            val level = info.level
            logDebug(s"Level for block $blockId is $level")
    
            // Look for the block in memory
            if (level.useMemory) {
              logDebug(s"Getting block $blockId from memory")
              val result = if (asBlockResult) {
                memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
              } else {
                memoryStore.getBytes(blockId)
              }
              result match {
                case Some(values) =>
                  return result
                case None =>
                  // Memory miss is not fatal: fall through to the next store.
                  logDebug(s"Block $blockId not found in memory")
              }
            }
    
            // Look for the block in external block store
            if (level.useOffHeap) {
              logDebug(s"Getting block $blockId from ExternalBlockStore")
              if (externalBlockStore.contains(blockId)) {
                // NOTE(review): external-store hits are reported with DataReadMethod.Memory --
                // confirm this is the intended read method.
                val result = if (asBlockResult) {
                  externalBlockStore.getValues(blockId)
                    .map(new BlockResult(_, DataReadMethod.Memory, info.size))
                } else {
                  externalBlockStore.getBytes(blockId)
                }
                result match {
                  case Some(values) =>
                    return result
                  case None =>
                    logDebug(s"Block $blockId not found in ExternalBlockStore")
                }
              }
            }
    
            // Look for block on disk, potentially storing it back in memory if required
            if (level.useDisk) {
              logDebug(s"Getting block $blockId from disk")
              // Unlike the stores above, a miss on disk is treated as an error.
              val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
                case Some(b) => b
                case None =>
                  throw new BlockException(
                    blockId, s"Block $blockId not found on disk, though it should be")
              }
              assert(0 == bytes.position())
    
              if (!level.useMemory) {
                // If the block shouldn't be stored in memory, we can just return it
                if (asBlockResult) {
                  return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
                    info.size))
                } else {
                  return Some(bytes)
                }
              } else {
                // Otherwise, we also have to store something in the memory store
                if (!level.deserialized || !asBlockResult) {
                  /* We'll store the bytes in memory if the block's storage level includes
                   * "memory serialized", or if it should be cached as objects in memory
                   * but we only requested its serialized bytes. */
                  memoryStore.putBytes(blockId, bytes.limit, () => {
                    // https://issues.apache.org/jira/browse/SPARK-6076
                    // If the file size is bigger than the free memory, OOM will happen. So if we cannot
                    // put it into MemoryStore, copyForMemory should not be created. That's why this
                    // action is put into a `() => ByteBuffer` and created lazily.
                    val copyForMemory = ByteBuffer.allocate(bytes.limit)
                    copyForMemory.put(bytes)
                  })
                  // Reset the position of `bytes`, which the copy above advances if it ran.
                  bytes.rewind()
                }
                if (!asBlockResult) {
                  return Some(bytes)
                } else {
                  val values = dataDeserialize(blockId, bytes)
                  if (level.deserialized) {
                    // Cache the values before returning them
                    val putResult = memoryStore.putIterator(
                      blockId, values, level, returnValues = true, allowPersistToDisk = false)
                    // The put may or may not have succeeded, depending on whether there was enough
                    // space to unroll the block. Either way, the put here should return an iterator.
                    putResult.data match {
                      case Left(it) =>
                        return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
                      case _ =>
                        // This only happens if we dropped the values back to disk (which is never)
                        throw new SparkException("Memory store did not return an iterator!")
                    }
                  } else {
                    return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
                  }
                }
              }
            }
          }
        } else {
          logDebug(s"Block $blockId not registered locally")
        }
        // Reached when the block is unregistered or none of the enabled stores returned it.
        None
      }
    View Code

     

    相关类

    Dependency

    依赖分为宽依赖和窄依赖两种。Dependency类中主要保存父RDD;NarrowDependency还可以根据partition id获得所依赖的父RDD partitions列表。ShuffleDependency还保存了shuffle需要的必要组件:shuffle output partitioner、serializer、keyOrdering、aggregator等。

    /**
     * Base class for dependencies.
     *
     * @tparam T element type of the RDD exposed by `rdd`
     */
    abstract class Dependency[T] extends Serializable {
      /** The RDD this dependency refers to; concrete subclasses supply it. */
      def rdd: RDD[T]
    }
    
    
    /**
     * :: DeveloperApi ::
     * A dependency in which every partition of the child RDD reads from only a small number of
     * partitions of the parent RDD, which allows pipelined execution.
     *
     * @param _rdd the parent RDD
     */
    @DeveloperApi
    abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
      /** The parent RDD handed to the constructor. */
      override def rdd: RDD[T] = _rdd

      /**
       * Lists the parent partitions that one child partition reads from.
       * @param partitionId a partition of the child RDD
       * @return the partitions of the parent RDD that the child partition depends upon
       */
      def getParents(partitionId: Int): Seq[Int]
    }
    
    
    /**
     * :: DeveloperApi ::
     * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
     * the RDD is transient since we don't need it on the executor side.
     *
     * Constructing an instance has side effects: it obtains a new shuffle id from the parent
     * RDD's context, registers the shuffle with the shuffle manager, and registers itself for
     * cleanup with the context's cleaner when one is available.
     *
     * @param _rdd the parent RDD
     * @param partitioner partitioner used to partition the shuffle output
     * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
     *                   the default serializer, as specified by `spark.serializer` config option, will
     *                   be used.
     * @param keyOrdering key ordering for RDD's shuffles
     * @param aggregator map/reduce-side aggregator for RDD's shuffle
     * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
     * @tparam K key type of the parent RDD's records
     * @tparam V value type of the parent RDD's records
     * @tparam C combiner type used by the aggregator
     */
    @DeveloperApi
    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
        @transient private val _rdd: RDD[_ <: Product2[K, V]],
        val partitioner: Partitioner,
        val serializer: Option[Serializer] = None,
        val keyOrdering: Option[Ordering[K]] = None,
        val aggregator: Option[Aggregator[K, V, C]] = None,
        val mapSideCombine: Boolean = false)
      extends Dependency[Product2[K, V]] {
    
      // The parent RDD is held with a wildcard element type; expose it at the declared type.
      override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
    
      // Runtime class names taken from the ClassTags supplied for K and V.
      private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
      private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
      // Note: It's possible that the combiner class tag is null, if the combineByKey
      // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
      private[spark] val combinerClassName: Option[String] =
        Option(reflect.classTag[C]).map(_.runtimeClass.getName)
    
      // Must be initialized before shuffleHandle, which passes it to registerShuffle.
      val shuffleId: Int = _rdd.context.newShuffleId()
    
      // NOTE(review): `this` is handed to the shuffle manager while the constructor is still
      // running; the fields above are already initialized at this point.
      val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
        shuffleId, _rdd.partitions.size, this)
    
      // Register for cleanup last, once every field has been initialized.
      _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
    }
    
    
    /**
     * :: DeveloperApi ::
     * A narrow dependency in which each child partition depends on the parent partition with the
     * same index.
     *
     * @param rdd the parent RDD
     */
    @DeveloperApi
    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
      /** The single parent of a child partition is the partition with the same id. */
      override def getParents(partitionId: Int): List[Int] = partitionId :: Nil
    }
    
    
    /**
     * :: DeveloperApi ::
     * A narrow dependency that maps a contiguous range of child partitions one-to-one onto a
     * contiguous range of parent partitions.
     * @param rdd the parent RDD
     * @param inStart the start of the range in the parent RDD
     * @param outStart the start of the range in the child RDD
     * @param length the length of the range
     */
    @DeveloperApi
    class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
      extends NarrowDependency[T](rdd) {

      /**
       * Child partitions inside `[outStart, outStart + length)` map to the parent partition at
       * the same offset from `inStart`; any other child partition has no parent here.
       */
      override def getParents(partitionId: Int): List[Int] = {
        val insideRange = outStart <= partitionId && partitionId < outStart + length
        if (insideRange) (partitionId - outStart + inStart) :: Nil else List.empty
      }
    }
    View Code

    RDDBlockId

    一个RDD partition用一个RDDBlock存储。

    /**
     * :: DeveloperApi ::
     * Identifies a particular Block of data, usually associated with a single file.
     * A Block can be uniquely identified by its filename, but each type of Block has a different
     * set of keys which produce its unique name.
     *
     * Equality, hashing and the string form are all derived from `name`.
     *
     * If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.
     */
    @DeveloperApi
    sealed abstract class BlockId {
      /** A globally unique identifier for this Block. Can be used for ser/de. */
      def name: String

      // Type probes for the common kinds of block id.
      def isRDD: Boolean = this.isInstanceOf[RDDBlockId]
      def isShuffle: Boolean = this.isInstanceOf[ShuffleBlockId]
      def isBroadcast: Boolean = this.isInstanceOf[BroadcastBlockId]

      /** This id viewed as an [[RDDBlockId]] when `isRDD` holds, otherwise `None`. */
      def asRDDId: Option[RDDBlockId] =
        if (!isRDD) None else Some(this.asInstanceOf[RDDBlockId])

      /** Two ids are equal when they have the same runtime class and the same name. */
      override def equals(other: Any): Boolean = other match {
        case that: BlockId if getClass == that.getClass => name.equals(that.name)
        case _ => false
      }
      override def hashCode: Int = name.hashCode
      override def toString: String = name
    }
    
    /**
     * Id of the block that stores one partition of an RDD.
     *
     * @param rddId id of the RDD
     * @param splitIndex index of the partition within that RDD
     */
    @DeveloperApi
    case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
      /** Name of the form `rdd_<rddId>_<splitIndex>`. */
      override def name: String = s"rdd_${rddId}_${splitIndex}"
    }
    View Code

    BlockInfo

    /**
     * Tracks the state of a single block: still being written (pending), failed, or ready with
     * a known size. Threads that need the block can wait on this object until the writer calls
     * `markReady` or `markFailure`.
     *
     * @param level storage level of the block
     * @param tellMaster flag carried alongside the block; not read inside this class
     */
    private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
      // To save space, 'pending' and 'failed' are encoded as special sizes:
      @volatile var size: Long = BlockInfo.BLOCK_PENDING
      private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
      private def failed: Boolean = size == BlockInfo.BLOCK_FAILED
      // The thread that constructed this BlockInfo, looked up in the companion's map.
      private def initThread: Thread = BlockInfo.blockInfoInitThreads.get(this)

      setInitThread()

      /** Records the constructing thread as this block's init thread. */
      private def setInitThread(): Unit = {
        /* Set current thread as init thread - waitForReady will not block this thread
         * (in case there is non trivial initialization which ends up calling waitForReady
         * as part of initialization itself) */
        BlockInfo.blockInfoInitThreads.put(this, Thread.currentThread())
      }

      /**
       * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
       * The init thread never blocks here.
       *
       * @return true if the block is available, false if it was marked as failed
       */
      def waitForReady(): Boolean = {
        if (pending && initThread != Thread.currentThread()) {
          synchronized {
            // Re-check under the monitor; markReady/markFailure notify while holding it.
            while (pending) {
              this.wait()
            }
          }
        }
        !failed
      }

      /**
       * Mark this BlockInfo as ready (i.e. block is finished writing) and wake up all waiters.
       *
       * @param sizeInBytes size of the written block; must be non-negative because negative
       *                    sizes encode the pending and failed states
       */
      def markReady(sizeInBytes: Long): Unit = {
        require(sizeInBytes >= 0, s"sizeInBytes was negative: $sizeInBytes")
        assert(pending)
        size = sizeInBytes
        // The init thread only matters while pending, so drop the map entry now.
        BlockInfo.blockInfoInitThreads.remove(this)
        synchronized {
          this.notifyAll()
        }
      }

      /** Mark this BlockInfo as ready but failed, and wake up all waiters. */
      def markFailure(): Unit = {
        assert(pending)
        size = BlockInfo.BLOCK_FAILED
        BlockInfo.blockInfoInitThreads.remove(this)
        synchronized {
          this.notifyAll()
        }
      }
    }
    
    /** Shared state and sentinel values for the `BlockInfo` class. */
    private object BlockInfo {
      // Sentinel values stored in a BlockInfo's `size` while the block is pending or failed.
      private val BLOCK_PENDING: Long = -1L
      private val BLOCK_FAILED: Long = -2L

      /* The init thread logically belongs to each BlockInfo instance. It is kept in this
       * shared map instead because it is only needed while the block is 'pending', which
       * keeps the per-instance memory footprint of BlockInfo small. */
      private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]()
    }
    View Code

    主要成员

    /** The SparkContext, as returned by `sc`. */
    def sparkContext: SparkContext = sc
    /** Unique id identifying this RDD, obtained from `sc.newRddId()`. */
    val id: Int = sc.newRddId()
    /** Name of this RDD; null until one is assigned. */
    @transient var name: String = null
    /** List of dependencies; returns the `deps` value by default. */
    protected def getDependencies: Seq[Dependency[_]] = deps
    /** List of partitions; must be provided by subclasses. */
    protected def getPartitions: Array[Partition]
    /** Preferred locations for a split; subclasses may override. Defaults to Nil. */
    protected def getPreferredLocations(split: Partition): Seq[String] = Nil
    /** The partitioner; None by default. */
    @transient val partitioner: Option[Partitioner] = None
    /** Specifies how a partition of this RDD is computed; must be provided by subclasses. */
    def compute(split: Partition, context: TaskContext): Iterator[T]
    View Code

    主要方法

    persist实现

      /**
       * Assigns a storage level to this RDD.
       *
       * The first time a level is assigned, the RDD is also registered with the SparkContext's
       * cleaner (when one is available) and recorded through `sc.persistRDD`.
       *
       * @param newLevel the storage level to assign
       * @param allowOverride whether an already assigned, different level may be replaced
       * @return this RDD
       * @throws UnsupportedOperationException if a different level is already assigned and
       *                                       `allowOverride` is false
       */
      private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
        // TODO: Handle changes of StorageLevel
        val firstAssignment = storageLevel == StorageLevel.NONE
        if (!firstAssignment && newLevel != storageLevel && !allowOverride) {
          throw new UnsupportedOperationException(
            "Cannot change storage level of an RDD after it was already assigned a level")
        }
        if (firstAssignment) {
          // Register with the SparkContext for cleanups and accounting, exactly once.
          sc.cleaner.foreach(_.registerRDDForCleanup(this))
          sc.persistRDD(this)
        }
        storageLevel = newLevel
        this
      }
    
      /**
       * Records an RDD in `persistentRdds`, keyed by its id.
       *
       * @param rdd the RDD to record
       */
      private[spark] def persistRDD(rdd: RDD[_]): Unit = {
        persistentRdds(rdd.id) = rdd
      }
    View Code

    获取数据iterator方法

      /**
       * Returns an iterator over one partition of this RDD. When a storage level is assigned the
       * request goes through the cache manager; otherwise it goes to `computeOrReadCheckpoint`.
       * This should ''not'' be called by users directly, but is available for implementors of
       * custom subclasses of RDD.
       *
       * @param split the partition to iterate over
       * @param context the context of the running task
       */
      final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
        if (storageLevel == StorageLevel.NONE) {
          // No storage level assigned: bypass the cache manager entirely.
          computeOrReadCheckpoint(split, context)
        } else {
          SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
        }
      }
    View Code

    getOrCompute

      /**
       * Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached.
       *
       * The block manager is consulted first. On a miss, the partition's loading lock is
       * requested through `acquireLockForPartition`; if that call yields values they are
       * returned, otherwise the partition is computed here and, unless the task runs locally,
       * put into the block manager.
       *
       * @param rdd the RDD that owns the partition
       * @param partition the partition to fetch or compute
       * @param context the context of the running task
       * @param storageLevel storage level used when the computed values are stored
       * @return an iterator over the partition's values
       */
      def getOrCompute[T](
          rdd: RDD[T],
          partition: Partition,
          context: TaskContext,
          storageLevel: StorageLevel): Iterator[T] = {

        val key = RDDBlockId(rdd.id, partition.index)
        logDebug(s"Looking for partition $key")
        blockManager.get(key) match {
          case Some(cachedBlock) =>
            // Partition is already materialized: account for the read and hand back its values.
            val inputMetrics = context.taskMetrics
              .getInputMetricsForReadMethod(cachedBlock.readMethod)
            inputMetrics.incBytesRead(cachedBlock.bytes)

            val cachedIter = cachedBlock.data.asInstanceOf[Iterator[T]]
            new InterruptibleIterator[T](context, cachedIter) {
              // Count each record as it is consumed.
              override def next(): T = {
                inputMetrics.incRecordsRead(1)
                delegate.next()
              }
            }

          case None =>
            // Acquire a lock for loading this partition. If another thread already holds the
            // lock, this waits for it to finish and yields its results.
            acquireLockForPartition[T](key) match {
              case Some(alreadyLoaded) =>
                new InterruptibleIterator[T](context, alreadyLoaded)

              case None =>
                // Nobody else loaded it, so the partition has to be computed here.
                try {
                  logInfo(s"Partition $key not found, computing it")
                  val computed = rdd.computeOrReadCheckpoint(partition, context)

                  if (context.isRunningLocally) {
                    // If the task is running locally, do not persist the result
                    computed
                  } else {
                    // Cache the values and keep track of any updates in block statuses
                    val blockUpdates = new ArrayBuffer[(BlockId, BlockStatus)]
                    val cached = putInBlockManager(key, computed, storageLevel, blockUpdates)
                    val taskMetrics = context.taskMetrics
                    val previousUpdates =
                      taskMetrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
                    taskMetrics.updatedBlocks = Some(previousUpdates ++ blockUpdates.toSeq)
                    new InterruptibleIterator(context, cached)
                  }
                } finally {
                  // Always release the loading marker and wake up threads waiting on it.
                  loading.synchronized {
                    loading.remove(key)
                    loading.notifyAll()
                  }
                }
            }
        }
      }
    View Code
  • 相关阅读:
    Java异常面试题
    Quickhit快速击键
    多态and接口
    Java面向对象编程概述
    学生管理系统--分层开发
    类型转换
    文件上传
    ongl(示例3-6 多值类型的数据处理)
    ongl(原始类型和包装类型)
    Interceptor
  • 原文地址:https://www.cnblogs.com/qquan/p/5661227.html
Copyright © 2011-2022 走看看