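  • Spark SQL reading ORC files: fixing the long delay between Driver startup and the first Task (or stage), caused by slow Partition computation, and the excessive number of stripes in each output ORC file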

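    1. Background:

        The upstream job is limited to about 7,000 files per day. Each file is smaller than 256M, the data totals more than 5 billion rows, and the format is ORC. Each file contains roughly 500 stripes. Command to inspect a file and its blocks: hdfs fsck viewfs://hadoop/nn01/warehouse/…….db/……/partition_date=2017-11-11/part-06999 -files -blocks;

    Command to view the stripe count: hive --orcfiledump viewfs://hadoop/nn01/warehouse/…….db/table/partition_date=2017-11-11/part-06999 | less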
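
    To get a feel for the scale these figures imply, here is a rough calculation. It is only a sketch: the class and method names are made up for illustration, and the inputs are the approximate figures quoted above (7,000 files, about 500 stripes per file, files under 256M).

```java
// Back-of-the-envelope numbers based on the figures in the background section.
// StripeEstimate and its methods are hypothetical names used only for this sketch.
final class StripeEstimate {

    // Total number of stripes across one day's partition.
    static long totalStripes(long files, long stripesPerFile) {
        return files * stripesPerFile;
    }

    // Average stripe size in bytes for a file of the given size (integer division).
    static long avgStripeBytes(long fileBytes, long stripesPerFile) {
        return fileBytes / stripesPerFile;
    }

    public static void main(String[] args) {
        long files = 7000;                        // files per day (from the article)
        long stripesPerFile = 500;                // approximate stripes per file (from the article)
        long maxFileBytes = 256L * 1024 * 1024;   // upper bound on file size: 256M

        System.out.println("total stripes: " + totalStripes(files, stripesPerFile));
        System.out.println("avg stripe bytes (upper bound): "
                + avgStripeBytes(maxFileBytes, stripesPerFile));
    }
}
```

    That works out to about 3.5 million stripes per daily partition, each at most roughly 0.5M. The stripes are therefore far smaller than the stripe size settings discussed in the solution section.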

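    2. The problem:

        When Spark SQL reads the ORC files, there is a very long gap between submitting the Spark job and the point where the Partitions have been computed and Tasks start running.

        During that gap the following log lines are printed over and over: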
    17/11/11 03:52:01 INFO BlockManagerMasterEndpoint: Registering block manager gh-data-hdp-dn0640.---:11942 with 6.1 GB RAM, BlockManagerId(554, ----, 11942)
    17/11/11 03:52:29 INFO DFSClient: Firstly choose dn: DatanodeInfoWithStorage[10.20.--.--:50010,DS-32f8aaa5-c6ce-48a9-a2b1-3b169df193b9,DISK], --

    17/11/11 03:52:29 INFO DFSClient: Firstly choose dn: 

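        Reduced to its essence: even a simple SQL statement like the one below reproduces the problem. After submission the ApplicationMaster (Driver) starts, but no Task runs for a long time because the Partitions cannot be computed. The Spark UI does not render the DAG and shows no Stage information.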

    SELECT * from table where partition_date='2017-11-11' limit 1;

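    3. Analysis

        Initial analysis: the GC logs confirmed that the Driver was reading data from the DataNodes (ORC file metadata, i.e. the file footers), and that this caused full GCs in the Driver.

        Source code tracing: the behavior is tied to the split strategy Spark uses when reading ORC files.

        HiveConf.java shows that ORC reads use the HYBRID strategy by default.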

    HIVE_ORC_SPLIT_STRATEGY("hive.exec.orc.split.strategy", "HYBRID", new StringSet(new String[]{"HYBRID", "BI", "ETL"}),
        "This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation" +
        " as opposed to query execution (split generation does not read or cache file footers)." +
        " ETL strategy is used when spending little more time in split generation is acceptable" +
        " (split generation reads and caches file footers). HYBRID chooses between the above strategies" +
        " based on heuristics."),

        In OrcInputFormat.java, the code that selects the split strategy (including the HYBRID case) looks like this:

      public SplitStrategy call() throws IOException {
        final SplitStrategy splitStrategy;
        AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
            context.conf, context.transactionList);
        List<Long> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
        Path base = dirInfo.getBaseDirectory();
        List<FileStatus> original = dirInfo.getOriginalFiles();
        boolean[] covered = new boolean[context.numBuckets];
        boolean isOriginal = base == null;
        // if we have a base to work from
        if (base != null || !original.isEmpty()) {
          // find the base files (original or new style)
          List<FileStatus> children = original;
          if (base != null) {
            children = SHIMS.listLocatedStatus(fs, base,
                AcidUtils.hiddenFileFilter);
          }
          long totalFileSize = 0;
          for (FileStatus child : children) {
            totalFileSize += child.getLen();
            AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename
                (child.getPath(), context.conf);
            int b = opts.getBucket();
            // If the bucket is in the valid range, mark it as covered.
            // I wish Hive actually enforced bucketing all of the time.
            if (b >= 0 && b < covered.length) {
              covered[b] = true;
            }
          }
          int numFiles = children.size();
          long avgFileSize = totalFileSize / numFiles;
          switch(context.splitStrategyKind) {
            case BI:
              // BI strategy requested through config
              splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal,
                  deltas, covered);
              break;
            case ETL:
              // ETL strategy requested through config
              splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal,
                  deltas, covered);
              break;
            default:
              // HYBRID strategy
              if (avgFileSize > context.maxSize) {
                splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal, deltas,
                    covered);
              } else {
                splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal, deltas,
                    covered);
              }
              break;
          }
        } else {
          // no base, only deltas
          splitStrategy = new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered);
        }
        return splitStrategy;
      }
    }

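        HYBRID strategy: when the Spark Driver starts, it reads file metadata from the NameNode and computes the average file size from the total size and the number of files. If that average is greater than the default of 256M, the ETL strategy is chosen; otherwise BI is used. The ETL strategy reads the ORC file footers and related metadata from the DataNodes. If the files contain many stripes or the metadata is large, the Driver runs into full GC, which shows up as a very long gap between Driver startup and Task execution.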
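
        The default branch of the switch above comes down to a single comparison. Below is a minimal standalone sketch of that heuristic. It is not Hive's code: the class HybridSplitSketch, the enum Kind and the method choose are hypothetical names, and the 256M threshold is the default value quoted in this article.

```java
import java.util.Arrays;
import java.util.List;

// Standalone illustration of the HYBRID heuristic; all names here are hypothetical.
final class HybridSplitSketch {

    enum Kind { BI, ETL }

    // Mirrors the default branch: ETL if the average file size exceeds maxSize, else BI.
    static Kind choose(List<Long> fileSizes, long maxSize) {
        if (fileSizes.isEmpty()) {
            throw new IllegalArgumentException("need at least one file");
        }
        long totalFileSize = 0;
        for (long size : fileSizes) {
            totalFileSize += size;
        }
        long avgFileSize = totalFileSize / fileSizes.size();
        return avgFileSize > maxSize ? Kind.ETL : Kind.BI;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long maxSize = 256 * mb; // threshold assumed from the article

        // Average 200M: below the threshold, so footers are not read during split generation.
        System.out.println(choose(Arrays.asList(150 * mb, 250 * mb), maxSize));
        // Average 300M: above the threshold, so the footer-reading ETL strategy is chosen.
        System.out.println(choose(Arrays.asList(280 * mb, 320 * mb), maxSize));
    }
}
```

        Because the decision depends only on the average, a few very large files can push a directory of mostly small files over the threshold. That is one reason to set the strategy explicitly instead of relying on the heuristic.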

    4. Solution:

    Spark 1.6.2:

    import org.apache.spark.sql.hive.HiveContext

    // sc is an existing SparkContext
    val hiveContext = new HiveContext(sc)
    // The default is 64M: a stripe is written once 64M of data (measured before compression) has accumulated.
    // The related setting hive.exec.orc.default.row.index.stride=10000 sets the number of rows between row-index entries within a stripe.
    // Tuning the stripe size controls how many stripes each file contains. If it is left unset, files end up with too many stripes, which hurts
    // downstream readers: when the ETL split strategy is configured or chosen by the heuristic, the Driver reads too much metadata from the
    // DataNodes, GC becomes frequent, and computing the Partitions takes unacceptably long.
    hiveContext.setConf("hive.exec.orc.default.stripe.size","268435456")
    // There are three strategies, "HYBRID", "BI" and "ETL"; the default is "HYBRID". BI does not read or cache file footers during split
    // generation, ETL reads and caches them, and HYBRID chooses between the two based on heuristics.
    // If this is not set and the average ORC file size exceeds 256M, the ETL strategy is triggered and the Driver spends a long time
    // reading data from the DataNodes to generate splits.
    hiveContext.setConf("hive.exec.orc.split.strategy", "BI")

    Spark 2.2.0:

    import org.apache.spark.sql.SparkSession

    // Create a SparkSession with Hive support
    val sparkSession = SparkSession
      .builder()
      .appName("PvMvToBase")
      // The default is 64M: a stripe is written once 64M of data (measured before compression) has accumulated.
      // Tuning this controls how many stripes each file contains. Too many stripes per file hurts downstream readers whenever the ETL
      // split strategy is in effect, because the Driver then reads too much metadata from the DataNodes and spends its time in GC.
      .config("hive.exec.orc.default.stripe.size", 268435456L)
      // One of "HYBRID", "BI", "ETL"; the default is "HYBRID". BI skips reading file footers during split generation.
      // If this is not set and the average ORC file size exceeds 256M, the ETL strategy is triggered and split generation in the Driver becomes slow.
      .config("hive.exec.orc.split.strategy", "BI")
      .enableHiveSupport()
      .getOrCreate()

  • Original article: https://www.cnblogs.com/felixzh/p/8603426.html