  • SparkSQL error: Container killed by YARN for exceeding memory limits. xGB of x GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling …

    To address this, I raised the executor's off-heap overhead memory: spark.executor.memoryOverhead = 4096m.
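
    For reference, a minimal sketch of applying that setting at launch time (the app name and executor heap size below are assumptions, not values from this post). The overhead is part of the YARN container request, so it has to be in place before executors are allocated, e.g. via --conf on spark-submit or the session builder:

     import org.apache.spark.sql.SparkSession

     // Sketch only: raise the per-executor off-heap overhead. Equivalent to
     //   spark-submit --conf spark.executor.memoryOverhead=4096m ...
     // App name and executor heap size are assumed values, not taken from the post.
     val spark = SparkSession.builder()
       .appName("overhead-demo")
       .config("spark.executor.memory", "8g")
       .config("spark.executor.memoryOverhead", "4096m")
       .getOrCreate()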

    After re-running the SQL, it failed with a different error:

    19/12/25 15:49:02 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from bigdata-datanode:7339
    io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 5721030656, max: 5726797824)
        at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640)
        at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594)
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764)
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740)
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:226)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
        at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:137)
        at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:80)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:122)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)

    From the stack trace, this is an off-heap (direct) memory overflow.

    Spark's shuffle uses Netty for network transport, and Netty allocates direct (off-heap) memory for its buffers. During a shuffle, every reduce task has to fetch the output of every map task; when a single map output that a reducer fetches is large enough to push direct-memory usage past the configured limit, this error is thrown.
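
    As a quick sanity check on the numbers in the log above (a throwaway snippet for a Scala REPL or spark-shell; the values are copied straight from the error message): Netty wants a 16 MiB chunk, but the ~5.33 GiB direct-memory pool has only about 5.5 MiB of headroom left, so the allocation has to fail.

     // Values copied from the OutOfDirectMemoryError above.
     val requested = 16777216L     // 16 MiB chunk Netty tried to allocate
     val used      = 5721030656L   // direct memory already in use
     val max       = 5726797824L   // direct memory limit (~5.33 GiB)

     val headroom = max - used     // 5767168 bytes, i.e. 5.5 MiB
     println(f"headroom: ${headroom / 1024.0 / 1024.0}%.1f MiB, request fits: ${requested <= headroom}")
     // prints "headroom: 5.5 MiB, request fits: false" -> OutOfDirectMemoryError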

    Looking at the Spark 2.4 source, we find a newly added configuration:

     private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
        ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem")
          .doc("Remote block will be fetched to disk when size of the block is above this threshold " +
            "in bytes. This is to avoid a giant request takes too much memory. We can enable this " +
            "config by setting a specific value(e.g. 200m). Note this configuration will affect " +
            "both shuffle fetch and block manager remote block fetch. For users who enabled " +
            "external shuffle service, this feature can only be worked when external shuffle" +
            "service is newer than Spark 2.2.")
          .bytesConf(ByteUnit.BYTE)
          // fetch-to-mem is guaranteed to fail if the message is bigger than 2 GB, so we might
          // as well use fetch-to-disk in that case.  The message includes some metadata in addition
          // to the block data itself (in particular UploadBlock has a lot of metadata), so we leave
          // extra room.
          .createWithDefault(Int.MaxValue - 512)

    It exists to avoid loading overly large remote shuffle blocks into memory: any block above this threshold is spilled to disk instead of being fetched into memory. The value is passed into ShuffleBlockFetcherIterator when the shuffle reader is constructed:

     val wrappedStreams = new ShuffleBlockFetcherIterator(
          context,
          blockManager.shuffleClient,
          blockManager,
          mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
          serializerManager.wrapStream,
          // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
          SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
          SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
          SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
          SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
          SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))

    Following it into ShuffleBlockFetcherIterator.sendRequest: when a fetch request is larger than maxReqSizeShuffleToMem, the iterator passes itself as the download-file manager, so the blocks in that request are written to temporary files on disk instead of being buffered in memory.

        // Fetch remote shuffle blocks to disk when the request is too large. Since the shuffle data is
        // already encrypted and compressed over the wire(w.r.t. the related configs), we can just fetch
        // the data and write it to file directly.
        if (req.size > maxReqSizeShuffleToMem) {
          shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
            blockFetchingListener, this)
        } else {
          shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
            blockFetchingListener, null)
        }
      }

    Configure the following in spark-defaults.conf:

    spark.maxRemoteBlockSizeFetchToMem 512m
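
    If changing spark-defaults.conf cluster-wide is not desirable, the same threshold can be supplied per job; a minimal sketch (app name assumed). Since executors read it from their SparkConf, it belongs at submit/startup time (spark-defaults.conf, --conf, or the session builder), not in spark.conf.set at runtime:

     import org.apache.spark.sql.SparkSession

     // Sketch only: per-job equivalent of the spark-defaults.conf entry above, i.e.
     //   spark-submit --conf spark.maxRemoteBlockSizeFetchToMem=512m ...
     val spark = SparkSession.builder()
       .appName("fetch-to-disk-demo")                        // assumed name
       .config("spark.maxRemoteBlockSizeFetchToMem", "512m") // remote blocks above 512 MB are fetched to disk
       .getOrCreate()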

    Then re-run the SQL.

    [Spark UI screenshots: shuffle read/write sizes for the stage]

    The Spark UI shows that the shuffle files are very large, so it is no surprise the job failed; simply giving it more memory is no longer enough. Because the workload is so large the job still runs slowly, and it occasionally fails with the following error:

    FetchFailed(BlockManagerId(1958, bigdata-odatanode, 7339, None), shuffleId=18, mapId=1, reduceId=1, message=
    org.apache.spark.shuffle.FetchFailedException: Connection from bigdata-datanode/ip:7339 closed
      at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
      at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
      at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
      at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
      at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
      at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
      at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
      at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
      at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage40.sort_addToSorter_0$(Unknown Source)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage40.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
      at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
      at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:794)
      at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:755)
      at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:917)
      at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:953)
      at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage45.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
      at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:232)
      at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
      at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
      at org.apache.spark.scheduler.Task.run(Task.scala:123)
      at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.IOException: Connection from bigdata-datanode/ip:7339 closed

    This error is a heartbeat timeout / dropped connection. The likely causes are executor GC pauses or shuffle files that are too large; monitoring showed the executors were not doing full GCs, so the problem is that the files are too big and the tasks run too slowly.

    Next, increase the shuffle parallelism. It is currently set to:

    spark.sql.shuffle.partitions 1000

    Change it to 5000.
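
    Unlike the settings above, spark.sql.shuffle.partitions is a runtime SQL conf, so it can also be changed per session without touching spark-defaults.conf; a minimal sketch:

     // Sketch only: bump shuffle parallelism for the current session before running the SQL.
     spark.conf.set("spark.sql.shuffle.partitions", "5000")
     // or from SQL itself:
     spark.sql("SET spark.sql.shuffle.partitions=5000")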

    Re-running the SQL, the job is much faster and the errors above occur far less often.

  • Original post: https://www.cnblogs.com/songchaolin/p/12098426.html