zoukankan      html  css  js  c++  java
  • hadoop Shuffle Error OOM错误分析和解决

     

    在执行Reduce Shuffle的过程中,偶尔会遇到Shuffle Error,但是重启任务之后,Shuffle Error会消失,当然这只是在某些特定情况下才会报出来的错误。虽然在每次执行很短的时间报出这个错误,但是如果单个Reducer的错误数量超出maxAttempt,就会导致整个任务失败。

     



     

     

    Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#50 
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:121) 
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380) 
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:415) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491) 
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157) 
    Caused by: java.lang.OutOfMemoryError: Java heap space 
    at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:56) 
    at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:46) 
    at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.<init>(InMemoryMapOutput.java:63) 
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:297) 
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:287) 
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411) 
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341) 
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)

     

    本分析过程同时借鉴了这篇blog:

    http://www.sqlparty.com/yarn%E5%9C%A8shuffle%E9%98%B6%E6%AE%B5%E5%86%85%E5%AD%98%E4%B8%8D%E8%B6%B3%E9%97%AE%E9%A2%98error-in-shuffle-in-fetcher/

     

    结合hadoop 2.2.0的源代码来对整个失败过程进行简要分析。

     

    从代码分析来看,最底层Fetcher.run方法执行时出现的错误,在Shuffle.run方法中,会启动一定数量的Fetcher线程(数量由参数mapreduce.reduce.shuffle.parallelcopies决定,我们配置的事50个,是不是有点多,默认是5),Fetcher线程用来从map端copy数据到Reducer端本地。

     

    Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
        for (int i=0; i < numFetchers; ++i) {
          fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
                                         reporter, metrics, this,
                                         reduceTask.getShuffleSecret());
          fetchers[i].start();
        }
       
        // Wait for shuffle to complete successfully
        while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
          reporter.progress();
         
          synchronized (this) {
            if (throwable != null) {
              throw new ShuffleError("error in shuffle in " + throwingThreadName,
                                     throwable);
            }
          }
        }

     

     

    当任意一个Fetcher发生异常时,就会在scheduler的等待后能够在主线程发现,停掉整个Reducer。

     

    public synchronized void reportException(Throwable t) {
        if (throwable == null) {
          throwable = t;
          throwingThreadName = Thread.currentThread().getName();
          // Notify the scheduler so that the reporting thread finds the
          // exception immediately.
          synchronized (scheduler) {
            scheduler.notifyAll();
          }
        }
      }

     

     

    在异常堆栈发生的地方,Fetcher中调用copyFromHost方法,调用到Fetcher的114行,merger.reserve方法会调用MergerManagerImpl.reserve

     

    @Override
      public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
                                                 long requestedSize,
                                                 int fetcher
                                                 ) throws IOException {
        if (!canShuffleToMemory(requestedSize)) {
          LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
                   " is greater than maxSingleShuffleLimit (" +
                   maxSingleShuffleLimit + ")");
          return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
                                          jobConf, mapOutputFile, fetcher, true);
        }
    ...

     

     

    重点是这个canShuffleToMemory方法,它会决定是启动OnDiskMapOutput还是InMemoryMapOutput类,标准就是需要的内存数量小于设置的限制。

     

    private boolean canShuffleToMemory(long requestedSize) {
        return (requestedSize < maxSingleShuffleLimit);
      }

     

     

    在初始化MergerManageImpl的时候设置了这个限制,MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES(mapreduce.reduce.memory.totalbytes)这个参数我们并没有设置,因此使用的是Runtime.getRuntime.maxMemory()*maxInMemCopyUse, MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT(mapreduce.reduce.shuffle.input.buffer.percent) 参数使用的是0.70,也就是最大内存的70%用于做Shuffle/Merge,比如当前Reducer端内存设置成2G,那么就会有1.4G内存。

     

    final float maxInMemCopyUse =
          jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.90f);
    this.memoryLimit =
          (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
              Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
            * maxInMemCopyUse);
    final float singleShuffleMemoryLimitPercent =
            jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,
                DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
    this.maxSingleShuffleLimit =
          (long)(memoryLimit * singleShuffleMemoryLimitPercent);

     

     

    而单个Shuffle最大能够使用多少内存,还需要再乘一个参数:MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT(mapreduce.reduce.shuffle.memory.limit.percent),我们当前并没有设置这个参数,那么默认值为0.25f,此时单个Shuffle最大能够使用1.4G*0.25f=350M内存。

     

    InMemory会在初始化时接收一个size参数,这个size的计算方式暂时未知,用于初始化其BoundedByteArrayOutputStream, 

    public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId,
                               MergeManagerImpl<K, V> merger,
                               int size, CompressionCodec codec,
                               boolean primaryMapOutput) {

     

     

    这个size也就是BoundedByteArrayOutputStream作为byte[]的大小:

    public BoundedByteArrayOutputStream(int capacity, int limit) {
        this(new byte[capacity], 0, limit);
      }

     

     

    OOM也就是出现在这一行。

     

    而我们出的错可能就是出现在判定为使用InMemoryMapOutput但是分配内存时出现的错误,试想使用50个Fetcher线程,单个线程设置为最大接收350M,而堆的最大内存为2G,这样只要有7个Fetcher线程判断为使用InMemoryMapOutput,且同时开始接收数据,就可能造成Java Heap的OOM错误,从而导致Shuffle Error。

     

    我觉得我们可以对使用的参数进行一定的调整,比如说减少Fetcher线程的数量,减少单个Shuffle使用InMemory操作的比例让其OnDisk操作等等,来避免这个问题。

     

     

  • 相关阅读:
    股票数据可视化
    试下代码高亮
    【Spark亚太研究院系列丛书】Spark实战高手之路-第3章Spark架构设计与编程模型第3节:Spark架构设计(2)
    【Spark亚太研究院系列丛书】Spark实战高手之路-第3章Spark架构设计与编程模型第3节:Spark架构设计(1)
    【Spark亚太研究院系列丛书】Spark实战高手之路-第3章Spark架构设计与编程模型第2节:Spark架构设计(2)
    【Spark亚太研究院系列丛书】Spark实战高手之路-第3章Spark架构设计与编程模型第2节:Spark架构设计(1)
    【Spark亚太研究院系列丛书】Spark实战高手之路-第3章Spark架构设计与编程模型第1节:为什么Spark是大数据必然的现在和未来?(2)
    【Spark亚太研究院系列丛书】Spark实战高手之路-第3章Spark架构设计与编程模型第1节:为什么Spark是大数据必然的现在和未来?(1)
    【Spark亚太研究院系列丛书】Spark实战高手之路-第2章动手实战Scala第3小节:动手实战Scala函数式编程(2)
    【Spark亚太研究院系列丛书】Spark实战高手之路-第2章动手实战Scala第3小节:动手实战Scala函数式编程(1)
  • 原文地址:https://www.cnblogs.com/mmaa/p/5789905.html
Copyright © 2011-2022 走看看