  • Parquet && Spark and Hive troubleshooting

    Troubleshooting a Parquet exception
    The exception is as follows:

    Caused by: parquet.hadoop.MemoryManager$1: New Memory allocation 1044273 bytes is smaller than the minimum allocation size of 1048576 bytes.
            at parquet.hadoop.MemoryManager.updateAllocation(MemoryManager.java:125)
            at parquet.hadoop.MemoryManager.addWriter(MemoryManager.java:82)
            at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:104)
            at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:303)
            at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:267)
            at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:65)
            at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:125)
            at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:114)
            at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:261)
            at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:246)
            ... 19 more
    

    The exception comes from the following code in parquet.hadoop.MemoryManager.updateAllocation:

    private void updateAllocation() {
        // ... other code omitted
        for (Map.Entry<InternalParquetRecordWriter, Long> entry : writerList.entrySet()) {
          long newSize = (long) Math.floor(entry.getValue() * scale);
          if(scale < 1.0 && minMemoryAllocation > 0 && newSize < minMemoryAllocation) {
              throw new ParquetRuntimeException(String.format("New Memory allocation %d bytes" +
              " is smaller than the minimum allocation size of %d bytes.",
                  newSize, minMemoryAllocation)){};
          }
          entry.getKey().setRowGroupSizeThreshold(newSize);
          LOG.debug(String.format("Adjust block size from %,d to %,d for writer: %s",
                entry.getValue(), newSize, entry.getKey()));
        }
      }
    

    The check that throws the exception requires scale to be less than 1, minMemoryAllocation to be greater than 0, and the newly computed allocation to be smaller than minMemoryAllocation. Looking only at that condition, there are two straightforward ways to avoid it:
    1. Set minMemoryAllocation to 0, which disables the check entirely (a sketch of this option follows the fix below).
    2. Set minMemoryAllocation to a smaller value, below the newly requested allocation; the size printed in the exception message is a good reference point.

    The value of minMemoryAllocation is changed through the parquet.memory.min.chunk.size setting:

    hiveContext.setConf("parquet.memory.min.chunk.size", (1024 * 32).toString)
    

    Here I set it to 32 KB (option 2 above), and the problem went away.
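
    Alternatively, option 1 can be applied the same way. The line below is only a sketch, based purely on the minMemoryAllocation > 0 condition in updateAllocation: setting the value to 0 switches the minimum-allocation check off entirely.

    hiveContext.setConf("parquet.memory.min.chunk.size", "0")

    The trade-off is that with no floor at all, heavily scaled-down writers can end up producing very small row groups, which hurts read performance later, so a small positive value such as the 32 KB used above is usually the safer choice.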

    Digging deeper, though, scale being less than 1 is just as essential to triggering this exception. Why is that condition checked at all?

    Let's first look at how scale is computed:

        long totalAllocations = 0;
        double scale;
        for (Long allocation : writerList.values()) {
          totalAllocations += allocation;
        }
        if (totalAllocations <= totalMemoryPool) {
          scale = 1.0;
        } else {
          scale = (double) totalMemoryPool / totalAllocations;
          LOG.warn(String.format(
              "Total allocation exceeds %.2f%% (%,d bytes) of heap memory
    " +
              "Scaling row group sizes to %.2f%% for %d writers",
              100*memoryPoolRatio, totalMemoryPool, 100*scale, writerList.size()));
        }
    

    This shows that when the total memory the writers want to allocate exceeds the total memory pool, scale becomes smaller than 1. The total requested memory is the sum of the values in writerList, so what exactly is writerList?
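
    Before looking at that, here is a rough back-of-the-envelope version of the arithmetic above. The numbers are assumptions chosen for illustration (a 1 GiB heap, the default 0.95 memory pool ratio, the default 128 MB row-group size, and the 1 MiB minimum reported in the exception message), not values taken from the actual job:

    // illustrative numbers only -- not taken from the real job
    val totalMemoryPool  = (1L << 30) * 95 / 100   // ~0.95 of a 1 GiB heap
    val blockSize        = 128L * 1024 * 1024      // default parquet.block.size requested per writer
    val writers          = 1000                    // e.g. many dynamic partitions open at once
    val totalAllocations = writers * blockSize     // far more than the pool
    val scale            = totalMemoryPool.toDouble / totalAllocations  // well below 1
    val newSize          = math.floor(blockSize * scale).toLong         // effectively totalMemoryPool / writers, ~1,020,000 bytes
    // newSize is now below the 1,048,576-byte minimum, so updateAllocation throws

    In other words, each writer effectively gets totalMemoryPool / writers bytes, so once enough writers are open at the same time the per-writer allocation inevitably falls under the floor.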

    
      /**
       * Add a new writer and its memory allocation to the memory manager.
       * @param writer the new created writer
       * @param allocation the requested buffer size
       */
      synchronized void addWriter(InternalParquetRecordWriter writer, Long allocation) {
        Long oldValue = writerList.get(writer);
        if (oldValue == null) {
          writerList.put(writer, allocation);
        } else {
          throw new IllegalArgumentException("[BUG] The Parquet Memory Manager should not add an " +
              "instance of InternalParquetRecordWriter more than once. The Manager already contains " +
              "the writer: " + writer);
        }
        updateAllocation();
      }
    

    So adding a writer increases the total amount of memory requested. When is a new writer added?

      public ParquetRecordWriter(
          ParquetFileWriter w,
          WriteSupport<T> writeSupport,
          MessageType schema,
          Map<String, String> extraMetaData,
          long blockSize, int pageSize,
          BytesCompressor compressor,
          int dictionaryPageSize,
          boolean enableDictionary,
          boolean validating,
          WriterVersion writerVersion,
          MemoryManager memoryManager) {
        internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
            extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
            validating, writerVersion);
        this.memoryManager = checkNotNull(memoryManager, "memoryManager");
        memoryManager.addWriter(internalWriter, blockSize);
      }
      
      
      public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec)
            throws IOException, InterruptedException {
        final WriteSupport<T> writeSupport = getWriteSupport(conf);
    
        CodecFactory codecFactory = new CodecFactory(conf);
        long blockSize = getLongBlockSize(conf);
        if (INFO) LOG.info("Parquet block size to " + blockSize);
        int pageSize = getPageSize(conf);
        if (INFO) LOG.info("Parquet page size to " + pageSize);
        int dictionaryPageSize = getDictionaryPageSize(conf);
        if (INFO) LOG.info("Parquet dictionary page size to " + dictionaryPageSize);
        boolean enableDictionary = getEnableDictionary(conf);
        if (INFO) LOG.info("Dictionary is " + (enableDictionary ? "on" : "off"));
        boolean validating = getValidation(conf);
        if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));
        WriterVersion writerVersion = getWriterVersion(conf);
        if (INFO) LOG.info("Writer version is: " + writerVersion);
    
        WriteContext init = writeSupport.init(conf);
        ParquetFileWriter w = new ParquetFileWriter(conf, init.getSchema(), file);
        w.start();
    
        float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
            MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
        long minAllocation = conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION,
            MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);
        if (memoryManager == null) {
          memoryManager = new MemoryManager(maxLoad, minAllocation);
        } else if (memoryManager.getMemoryPoolRatio() != maxLoad) {
          LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not " +
              "be reset by the new value: " + maxLoad);
        }
    
        return new ParquetRecordWriter<T>(
            w,
            writeSupport,
            init.getSchema(),
            init.getExtraMetaData(),
            blockSize, pageSize,
            codecFactory.getCompressor(codec, pageSize),
            dictionaryPageSize,
            enableDictionary,
            validating,
            writerVersion,
            memoryManager);
      }
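
    As an aside, getRecordWriter also reads a second knob, ParquetOutputFormat.MEMORY_POOL_RATIO (as far as I can tell the key is parquet.memory.pool.ratio, defaulting to roughly 0.95). Raising it slightly, or simply giving the executors a larger heap, enlarges totalMemoryPool and delays the point at which scale drops below 1; note from the warning above that the ratio only takes effect when the MemoryManager is first created. A hedged sketch in the same style as before:

    // assumption: give the memory manager a slightly larger share of the heap
    hiveContext.setConf("parquet.memory.pool.ratio", "0.98")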
    

    From this we can see that as more and more writers get created, the memory they request ends up exceeding what the memory pool provides, which is exactly how scale drops below 1. In our case the job used dynamic partitioning, and the number of partitions created was far larger than we had anticipated, which is what triggered this problem. But why scale the allocations down at all?
    Let's look at the following code:

    entry.getKey().setRowGroupSizeThreshold(newSize);
    

    What ultimately gets adjusted is the writer's rowGroupSizeThreshold, which is checked here:

      private void checkBlockSizeReached() throws IOException {
        if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
          long memSize = columnStore.getBufferedSize();
          if (memSize > rowGroupSizeThreshold) {
            LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, rowGroupSizeThreshold, recordCount));
            flushRowGroupToStore();
            initStore();
            recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
          } else {
            float recordSize = (float) memSize / recordCount;
            recordCountForNextMemCheck = min(
                max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(rowGroupSizeThreshold / recordSize)) / 2), // will check halfway
                recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
                );
            if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
          }
        }
      }
    

    When the buffered size exceeds that threshold, the in-memory data is automatically flushed to disk, which keeps the writer from blowing up the heap. So the more writers there are, the smaller each writer's threshold becomes.
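
    To make the check-scheduling above concrete, here is a small sketch with assumed numbers (the MINIMUM/MAXIMUM constants are taken as 100 and 10,000, and the buffered sizes are invented for illustration):

    // assumed values, illustrating the "check halfway" heuristic only
    val MINIMUM_RECORD_COUNT_FOR_CHECK = 100L
    val MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000L
    val recordCount           = 2000L              // records written so far
    val memSize               = 400000L            // bytes currently buffered (200 bytes per record)
    val rowGroupSizeThreshold = 1L * 1024 * 1024   // a threshold scaled far down by the MemoryManager
    val recordSize            = memSize.toFloat / recordCount
    // estimate how many records fill a row group, then check again roughly halfway there
    val recordCountForNextMemCheck = math.min(
      math.max(MINIMUM_RECORD_COUNT_FOR_CHECK,
               (recordCount + (rowGroupSizeThreshold / recordSize).toLong) / 2),
      recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK)  // 3621 here: with a small threshold the next check comes quickly

    With the threshold pushed down toward 1 MB, a writer flushes after only a few thousand small records, so the output contains many tiny row groups; that is part of why lowering parquet.memory.min.chunk.size is only a workaround rather than a real fix.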

    To sum up:
    When Hive writes Parquet output, every additional partition creates another writer, and every new writer triggers a recalculation of the rowGroupSizeThreshold of all writers, so that the total buffered data stays within the memory pool and no out-of-memory error occurs.
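
    For reference, the scenario looks roughly like the following. The table and column names are made up; the point is that a dynamic-partition insert can open one Parquet writer per partition value inside a single task, and each of those writers registers its full block size with the MemoryManager:

    hiveContext.setConf("hive.exec.dynamic.partition", "true")
    hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
    hiveContext.sql(
      """INSERT OVERWRITE TABLE events PARTITION (dt)
        |SELECT user_id, action, dt FROM staging_events""".stripMargin)

    Besides lowering parquet.memory.min.chunk.size, keeping the number of partitions written by each task small (for example by repartitioning on the partition column before the insert) or giving the executors more heap both keep totalAllocations within the memory pool in the first place.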

    Original post: https://www.cnblogs.com/luckuan/p/6721940.html