  • Parquet && troubleshooting a Spark and Hive issue

    Troubleshooting a Parquet exception
    The exception is as follows:

    Caused by: parquet.hadoop.MemoryManager$1: New Memory allocation 1044273 bytes is smaller than the minimum allocation size of 1048576 bytes.
            at parquet.hadoop.MemoryManager.updateAllocation(MemoryManager.java:125)
            at parquet.hadoop.MemoryManager.addWriter(MemoryManager.java:82)
            at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:104)
            at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:303)
            at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:267)
            at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:65)
            at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:125)
            at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:114)
            at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:261)
            at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:246)
            ... 19 more
    

    The code where the exception is thrown:

    private void updateAllocation() {
        // ... other code omitted
        for (Map.Entry<InternalParquetRecordWriter, Long> entry : writerList.entrySet()) {
          long newSize = (long) Math.floor(entry.getValue() * scale);
          if(scale < 1.0 && minMemoryAllocation > 0 && newSize < minMemoryAllocation) {
              throw new ParquetRuntimeException(String.format("New Memory allocation %d bytes" +
              " is smaller than the minimum allocation size of %d bytes.",
                  newSize, minMemoryAllocation)){};
          }
          entry.getKey().setRowGroupSizeThreshold(newSize);
          LOG.debug(String.format("Adjust block size from %,d to %,d for writer: %s",
                entry.getValue(), newSize, entry.getKey()));
        }
      }
    

    The exception is thrown when scale is less than 1, minMemoryAllocation is greater than 0, and the newly computed allocation is smaller than minMemoryAllocation. Viewed simply, there are two ways to work around it:
    1. Set minMemoryAllocation to 0.
    2. Set minMemoryAllocation to something smaller than the newly computed allocation; the size reported in the exception message is a good reference value.

    The value of minMemoryAllocation can be changed with:

    hiveContext.setConf("parquet.memory.min.chunk.size", (1024 * 32).toString)
    

    Here I set it to 32 KB, and the problem was resolved.
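
    Alternatively, for the first workaround, a minimal sketch (assuming the same hiveContext as above) is to set the same key to 0, which makes the minMemoryAllocation > 0 condition in the code above false and disables the check entirely:

    // Sketch: "parquet.memory.min.chunk.size" is the key behind ParquetOutputFormat.MIN_MEMORY_ALLOCATION.
    // Setting it to 0 disables the minimum-allocation check, at the cost of allowing very small row groups.
    hiveContext.setConf("parquet.memory.min.chunk.size", "0")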

    Digging deeper, however, scale being less than 1 is just as essential to triggering this exception. Why is that condition checked at all?

    Let's first look at how scale is computed:

        long totalAllocations = 0;
        double scale;
        for (Long allocation : writerList.values()) {
          totalAllocations += allocation;
        }
        if (totalAllocations <= totalMemoryPool) {
          scale = 1.0;
        } else {
          scale = (double) totalMemoryPool / totalAllocations;
          LOG.warn(String.format(
              "Total allocation exceeds %.2f%% (%,d bytes) of heap memory
    " +
              "Scaling row group sizes to %.2f%% for %d writers",
              100*memoryPoolRatio, totalMemoryPool, 100*scale, writerList.size()));
        }
    

    This shows that scale drops below 1 when the total memory requested by the writers exceeds the memory pool (totalMemoryPool). The requested memory is the sum of the values accumulated in writerList, so what exactly is writerList?

    
      /**
       * Add a new writer and its memory allocation to the memory manager.
       * @param writer the new created writer
       * @param allocation the requested buffer size
       */
      synchronized void addWriter(InternalParquetRecordWriter writer, Long allocation) {
        Long oldValue = writerList.get(writer);
        if (oldValue == null) {
          writerList.put(writer, allocation);
        } else {
          throw new IllegalArgumentException("[BUG] The Parquet Memory Manager should not add an " +
              "instance of InternalParquetRecordWriter more than once. The Manager already contains " +
              "the writer: " + writer);
        }
        updateAllocation();
      }
    

    Adding a writer adds to the total requested allocation. So how is a new writer created?

      public ParquetRecordWriter(
          ParquetFileWriter w,
          WriteSupport<T> writeSupport,
          MessageType schema,
          Map<String, String> extraMetaData,
          long blockSize, int pageSize,
          BytesCompressor compressor,
          int dictionaryPageSize,
          boolean enableDictionary,
          boolean validating,
          WriterVersion writerVersion,
          MemoryManager memoryManager) {
        internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
            extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
            validating, writerVersion);
        this.memoryManager = checkNotNull(memoryManager, "memoryManager");
        memoryManager.addWriter(internalWriter, blockSize);
      }
      
      
      public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec)
            throws IOException, InterruptedException {
        final WriteSupport<T> writeSupport = getWriteSupport(conf);
    
        CodecFactory codecFactory = new CodecFactory(conf);
        long blockSize = getLongBlockSize(conf);
        if (INFO) LOG.info("Parquet block size to " + blockSize);
        int pageSize = getPageSize(conf);
        if (INFO) LOG.info("Parquet page size to " + pageSize);
        int dictionaryPageSize = getDictionaryPageSize(conf);
        if (INFO) LOG.info("Parquet dictionary page size to " + dictionaryPageSize);
        boolean enableDictionary = getEnableDictionary(conf);
        if (INFO) LOG.info("Dictionary is " + (enableDictionary ? "on" : "off"));
        boolean validating = getValidation(conf);
        if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));
        WriterVersion writerVersion = getWriterVersion(conf);
        if (INFO) LOG.info("Writer version is: " + writerVersion);
    
        WriteContext init = writeSupport.init(conf);
        ParquetFileWriter w = new ParquetFileWriter(conf, init.getSchema(), file);
        w.start();
    
        float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
            MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
        long minAllocation = conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION,
            MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);
        if (memoryManager == null) {
          memoryManager = new MemoryManager(maxLoad, minAllocation);
        } else if (memoryManager.getMemoryPoolRatio() != maxLoad) {
          LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not " +
              "be reset by the new value: " + maxLoad);
        }
    
        return new ParquetRecordWriter<T>(
            w,
            writeSupport,
            init.getSchema(),
            init.getExtraMetaData(),
            blockSize, pageSize,
            codecFactory.getCompressor(codec, pageSize),
            dictionaryPageSize,
            enableDictionary,
            validating,
            writerVersion,
            memoryManager);
      }
    

    From this we can see that as more and more writers are created, the memory they request in total exceeds the pool the memory manager was given, which drives scale below 1. In our case the job used dynamic partitioning, and the number of partitions created exceeded what we had anticipated, which is what triggered this exception. Why is the writer designed this way?
    Let's look at the following line:

    entry.getKey().setRowGroupSizeThreshold(newSize);
    

    What is ultimately adjusted is each writer's rowGroupSizeThreshold:

      private void checkBlockSizeReached() throws IOException {
        if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
          long memSize = columnStore.getBufferedSize();
          if (memSize > rowGroupSizeThreshold) {
            LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, rowGroupSizeThreshold, recordCount));
            flushRowGroupToStore();
            initStore();
            recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
          } else {
            float recordSize = (float) memSize / recordCount;
            recordCountForNextMemCheck = min(
                max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(rowGroupSizeThreshold / recordSize)) / 2), // will check halfway
                recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
                );
            if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
          }
        }
      }
    

    When the buffered size exceeds this threshold, the in-memory data is flushed to disk, which prevents the writers from running out of memory. The more writers there are, therefore, the smaller each writer's threshold becomes.
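
    To get a feel for the numbers, here is a back-of-the-envelope sketch of the scaling. The heap size, pool ratio, block size, and writer count below are illustrative assumptions (0.95 and 128 MB are the usual library defaults), not values taken from the log above:

    // Illustrative numbers only.
    val blockSize        = 128L * 1024 * 1024                       // parquet.block.size requested per writer
    val totalMemoryPool  = (2L * 1024 * 1024 * 1024 * 0.95).toLong  // e.g. a 2 GB executor heap * 0.95 pool ratio
    val writers          = 100                                      // e.g. 100 open dynamic partitions in one task
    val totalAllocations = writers * blockSize                      // ≈ 12.5 GB requested in total
    val scale            = totalMemoryPool.toDouble / totalAllocations   // ≈ 0.15
    val newSize          = math.floor(blockSize * scale).toLong          // ≈ 20 MB row group per writer
    // Because every writer requests the same blockSize, newSize is roughly totalMemoryPool / writers;
    // once the writer count exceeds totalMemoryPool / 1 MB, newSize drops below the default
    // 1,048,576-byte minimum and the exception at the top of this post is thrown.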

    To summarize:
    During a Hive write, every new partition gets its own writer, and every added writer triggers a recalculation of the rowGroupSizeThreshold of all writers, so that the total buffered data stays within the memory pool and no out-of-memory error occurs.
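
    For completeness, here is a hedged sketch of the kind of write that got us here (the table and column names are made up for illustration). A dynamic-partition insert like the one below creates one Parquet writer per partition value encountered in each task, so a high-cardinality partition column can open enough writers to push each writer's share of the memory pool below the 1 MB minimum:

    // Hypothetical example; target_parquet_table, source_table, and the columns are illustrative only.
    hiveContext.setConf("hive.exec.dynamic.partition", "true")
    hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
    hiveContext.sql(
      """INSERT OVERWRITE TABLE target_parquet_table PARTITION (dt)
        |SELECT col1, col2, dt FROM source_table""".stripMargin)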

  • Original article: https://www.cnblogs.com/luckuan/p/6721940.html