  • [Original] Problem Diagnosis Notes (27): rdd.cache in Spark

    spark 2.1.1

    Some tasks in a Spark application were extremely slow, running for about 10 hours. The log of one such task:

    2019-01-24 21:38:56,024 [dispatcher-event-loop-22] INFO org.apache.spark.executor.CoarseGrainedExecutorBackend - Got assigned task 4031
    2019-01-24 21:38:56,024 [Executor task launch worker for task 4031] INFO org.apache.spark.executor.Executor - Running task 11.0 in stage 98.0 (TID 4031)
    2019-01-24 21:38:56,050 [Executor task launch worker for task 4031] INFO org.apache.spark.MapOutputTrackerWorker - Don't have map outputs for shuffle 13, fetching them
    2019-01-24 21:38:56,050 [Executor task launch worker for task 4031] INFO org.apache.spark.MapOutputTrackerWorker - Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@server1:30384)
    2019-01-24 21:38:56,052 [Executor task launch worker for task 4031] INFO org.apache.spark.MapOutputTrackerWorker - Got the output locations
    2019-01-24 21:38:56,052 [Executor task launch worker for task 4031] INFO org.apache.spark.storage.ShuffleBlockFetcherIterator - Getting 200 non-empty blocks out of 200 blocks
    2019-01-24 21:38:56,054 [Executor task launch worker for task 4031] INFO org.apache.spark.storage.ShuffleBlockFetcherIterator - Started 19 remote fetches in 2 ms

    2019-01-25 07:07:54,200 [Executor task launch worker for task 4031] INFO org.apache.spark.storage.memory.MemoryStore - Block rdd_108_11 stored as values in memory (estimated size 222.6 MB, free 1893.2 MB)
    2019-01-25 07:07:54,546 [Executor task launch worker for task 4031] INFO org.apache.spark.storage.memory.MemoryStore - Block rdd_117_11 stored as values in memory (estimated size 87.5 MB, free 1805.8 MB)
    2019-01-25 07:07:54,745 [Executor task launch worker for task 4031] INFO org.apache.spark.storage.memory.MemoryStore - Block rdd_118_11 stored as values in memory (estimated size 87.5 MB, free 1718.3 MB)
    2019-01-25 07:07:54,987 [Executor task launch worker for task 4031] INFO org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer - Sorting complete. Writing out partition files one at a time.
    2019-01-25 07:07:57,425 [Executor task launch worker for task 4031] INFO org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - Saved output of task 'attempt_20190124213852_0098_m_000011_0' to hdfs://namenode/user/hive/warehouse/db_name.db/table_name/.hive-staging_hive_2019-01-24_21-38-52_251_7997709482427937209-1/-ext-10000/_temporary/0/task_20190124213852_0098_m_000011
    2019-01-25 07:07:57,425 [Executor task launch worker for task 4031] INFO org.apache.spark.mapred.SparkHadoopMapRedUtil - attempt_20190124213852_0098_m_000011_0: Committed
    2019-01-25 07:07:57,426 [Executor task launch worker for task 4031] INFO org.apache.spark.executor.Executor - Finished task 11.0 in stage 98.0 (TID 4031). 4259 bytes result sent to driver

    There is no log output at all between 2019-01-24 21:38:56 and 2019-01-25 07:07:54. The application had not finished; several very slow tasks were still running. A thread dump of the executors hosting these tasks showed each one stuck in a single thread:

    java.lang.Thread.sleep(Native Method)
    app.package.AppClass.do(AppClass.scala:228)
    org.apache.spark.sql.execution.MapElementsExec$$anonfun$8$$anonfun$apply$1.apply(objects.scala:237)
    org.apache.spark.sql.execution.MapElementsExec$$anonfun$8$$anonfun$apply$1.apply(objects.scala:237)
    scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
    org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
    org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
    org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
    org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
    org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
    org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
    org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
    org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
    org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
    org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
    org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
    org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
    org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
    org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
    org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
    org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
    org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
    org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
    org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
    org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
    org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
    org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
    org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
    org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
    org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
    org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    org.apache.spark.scheduler.Task.run(Task.scala:99)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)

    Here app.package.AppClass.do is a very expensive operation that runs once per RDD element. The puzzle: the RDD was cached right after this operation, so why does everything that depends on it downstream recompute it again?

    The problem, simplified:

    val mapped = rdd.map(item => doLongTime(item))
    mapped.cache()
    // takes a long time: computes every partition and populates the cache
    println(mapped.count())
    // takes a long time too, why?
    println(mapped.count())
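
    The expectation is that the first count materializes the cache, so the second count should be served from memory almost instantly; in the problem application, though, the second pass ran doLongTime on every element again.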

    Reading the source:

    RDD.compute is overridden by each RDD subclass and normally calls RDD.iterator on its parent RDDs:

    org.apache.spark.rdd.RDD

      /**
       * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
       * This should ''not'' be called by users directly, but is available for implementors of custom
       * subclasses of RDD.
       */
      final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
        if (storageLevel != StorageLevel.NONE) {
          getOrCompute(split, context)
        } else {
          computeOrReadCheckpoint(split, context)
        }
      }
    
      /**
       * Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached.
       */
      private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
        val blockId = RDDBlockId(id, partition.index)
        var readCachedBlock = true
        // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
        SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
          readCachedBlock = false
          computeOrReadCheckpoint(partition, context)
        }) match {
          case Left(blockResult) =>
            if (readCachedBlock) {
              val existingMetrics = context.taskMetrics().inputMetrics
              existingMetrics.incBytesRead(blockResult.bytes)
              new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
                override def next(): T = {
                  existingMetrics.incRecordsRead(1)
                  delegate.next()
                }
              }
            } else {
              new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
            }
          case Right(iter) =>
            new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
        }
      }

    RDD.iterator branches on storageLevel: when no storage level is set, it computes the partition or reads it from a checkpoint; when the RDD is cached (cache() sets the storage level), it calls RDD.getOrCompute, which in turn delegates to BlockManager.getOrElseUpdate. A small user-side illustration of that branch follows.
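
    A minimal sketch (assuming a live SparkContext named sc; this snippet is not from the original post): cache() merely assigns the storage level, and that field alone decides which path iterator takes:

    import org.apache.spark.storage.StorageLevel

    val nums = sc.parallelize(1 to 10)
    // No storage level yet: iterator goes through computeOrReadCheckpoint.
    assert(nums.getStorageLevel == StorageLevel.NONE)
    nums.cache()
    // cache() assigned MEMORY_ONLY: iterator now goes through getOrCompute.
    assert(nums.getStorageLevel == StorageLevel.MEMORY_ONLY)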

    org.apache.spark.storage.BlockManager

      /**
       * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
       * to compute the block, persist it, and return its values.
       *
       * @return either a BlockResult if the block was successfully cached, or an iterator if the block
       *         could not be cached.
       */
      def getOrElseUpdate[T](
          blockId: BlockId,
          level: StorageLevel,
          classTag: ClassTag[T],
          makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
        // Attempt to read the block from local or remote storage. If it's present, then we don't need
        // to go through the local-get-or-put path.
        get[T](blockId)(classTag) match {
          case Some(block) =>
            return Left(block)
          case _ =>
            // Need to compute the block.
        }
        // Initially we hold no locks on this block.
        doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
          case None =>
            // doPut() didn't hand work back to us, so the block already existed or was successfully
            // stored. Therefore, we now hold a read lock on the block.
            val blockResult = getLocalValues(blockId).getOrElse {
              // Since we held a read lock between the doPut() and get() calls, the block should not
              // have been evicted, so get() not returning the block indicates some internal error.
              releaseLock(blockId)
              throw new SparkException(s"get() failed for block $blockId even though we held a lock")
            }
            // We already hold a read lock on the block from the doPut() call and getLocalValues()
            // acquires the lock again, so we need to call releaseLock() here so that the net number
            // of lock acquisitions is 1 (since the caller will only call release() once).
            releaseLock(blockId)
            Left(blockResult)
          case Some(iter) =>
            // The put failed, likely because the data was too large to fit in memory and could not be
            // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
            // that they can decide what to do with the values (e.g. process them without caching).
           Right(iter)
        }
      }

    BlockManager.getOrElseUpdate first tries to fetch the block from the cache; if it is not there, it calls doPutIterator to compute the block and put it into the cache.

    org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
    org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
    org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)

    So the doPutIterator frames in the jstack above show that the cached block was gone and the partition had to be recomputed from scratch.
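
    Short of a thread dump, one way to confirm that the expensive lambda reruns is to count its invocations with an accumulator. A hedged sketch with a local session (doLongTime stood in by a trivial computation; none of this is from the original application):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[2]").appName("recompute-check").getOrCreate()
    val sc = spark.sparkContext
    val calls = sc.longAccumulator("expensive-calls")

    val cached = sc.parallelize(1 to 1000, 4).map { i =>
      calls.add(1) // stand-in for the expensive per-element work
      i * 2
    }
    cached.cache()

    cached.count()
    println(s"calls after 1st count: ${calls.value}") // 1000: computed and cached
    cached.count()
    // Still 1000 if every block stayed cached; higher if blocks were evicted
    // and recomputed. (Accumulator updates inside transformations are not
    // exactly-once under task retries, but that is fine for a local check.)
    println(s"calls after 2nd count: ${calls.value}")
    spark.stop()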

    org.apache.spark.rdd.RDD

      /**
       * Persist this RDD with the default storage level (`MEMORY_ONLY`).
       */
      def cache(): this.type = persist()
    
      /**
       * Persist this RDD with the default storage level (`MEMORY_ONLY`).
       */
      def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

    cache() uses the StorageLevel MEMORY_ONLY. When memory runs short, cached partitions can be evicted; the eviction policy lives in org.apache.spark.storage.memory.MemoryStore. The sketch below shows one way to spot evicted partitions.
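
    To see how much of an RDD actually stayed cached (the Storage tab of the web UI shows the same data), SparkContext exposes the DeveloperApi getRDDStorageInfo. A small sketch, assuming a live sc:

    // Fewer cached partitions than total partitions means eviction happened.
    sc.getRDDStorageInfo.foreach { info =>
      println(s"rdd_${info.id} '${info.name}': " +
        s"${info.numCachedPartitions}/${info.numPartitions} partitions cached, " +
        s"mem=${info.memSize} bytes, disk=${info.diskSize} bytes")
    }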

    Now look at StorageLevel:

    org.apache.spark.storage.StorageLevel

    /**
     * :: DeveloperApi ::
     * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
     * or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or
     * ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether
     * to replicate the RDD partitions on multiple nodes.
     *
     * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
     * for commonly useful storage levels. To create your own storage level object, use the
     * factory method of the singleton object (`StorageLevel(...)`).
     */
    @DeveloperApi
    class StorageLevel private(
        private var _useDisk: Boolean,
        private var _useMemory: Boolean,
        private var _useOffHeap: Boolean,
        private var _deserialized: Boolean,
        private var _replication: Int = 1)
      extends Externalizable {
    ...
    
    object StorageLevel {
      val NONE = new StorageLevel(false, false, false, false)
      val DISK_ONLY = new StorageLevel(true, false, false, false)
      val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
      val MEMORY_ONLY = new StorageLevel(false, true, false, true)
      val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
      val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
      val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
      val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
      val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
      val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
      val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
      val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
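
    As the scaladoc above mentions, custom levels are built with the singleton's factory method. A brief sketch (the overload shown is the five-argument apply of Spark 2.x; hedged as an illustration):

    import org.apache.spark.storage.StorageLevel

    // disk + memory, serialized in memory, replicated on 2 nodes
    val diskMemSer2 = StorageLevel(useDisk = true, useMemory = true,
      useOffHeap = false, deserialized = false, replication = 2)
    // predefined constants can also be resolved by name
    val byName = StorageLevel.fromString("MEMORY_AND_DISK_SER")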

    So after an expensive operation, do not assume RDD.cache alone will prevent recomputation: MEMORY_ONLY keeps data in memory on a best-effort basis, not as a guarantee. Use RDD.persist(StorageLevel.MEMORY_AND_DISK) instead, for example:
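
    A minimal sketch of the fix, reusing the hypothetical doLongTime from the simplified example above:

    import org.apache.spark.storage.StorageLevel

    val mapped = rdd.map(item => doLongTime(item))
    // Partitions that do not fit in memory are spilled to local disk
    // instead of being dropped, so they are never recomputed.
    mapped.persist(StorageLevel.MEMORY_AND_DISK)
    println(mapped.count()) // slow: computes and persists every partition
    println(mapped.count()) // served from memory or disk, not recomputed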

  • Original post: https://www.cnblogs.com/barneywill/p/10321152.html