zoukankan      html  css  js  c++  java
  • Kylin Cube构建过程优化

    原文地址:https://kylin.apache.org/docs16/howto/howto_optimize_build.html

    Kylin将一个cube的build过程分解为若干个子步骤,然后串行执行这些子步骤。这些步骤包括Hive操作,MR任务和其他类型的工作。如果每天都有许多cube进行build操作,那么肯定会办法加速这一过程。这里有一些建议可以参考,我们就按照build的顺序依次介绍。

    Create Intermediate Flat Hive Table

    该步骤会从源Hive表中抽取数据(将所有的相关表join之后的数据),并且插入到一个临时的扁平表中。如果cube是带有分区列的,Kylin将会增加一个时间条件,这样就会保证只有符合条件的数据才会被抓取。可以在日志中查看与该步骤相关的Hive命令。如下所示:

    hive -e "USE default;
    DROP TABLE IF EXISTS kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34;
    
    CREATE EXTERNAL TABLE IF NOT EXISTS kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34
    (AIRLINE_FLIGHTDATE date,AIRLINE_YEAR int,AIRLINE_QUARTER int,...,AIRLINE_ARRDELAYMINUTES int)
    STORED AS SEQUENCEFILE
    LOCATION 'hdfs:///kylin/kylin200instance/kylin-0a8d71e8-df77-495f-b501-03c06f785b6c/kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34';
    
    SET dfs.replication=2;
    SET hive.exec.compress.output=true;
    SET hive.auto.convert.join.noconditionaltask=true;
    SET hive.auto.convert.join.noconditionaltask.size=100000000;
    SET mapreduce.job.split.metainfo.maxsize=-1;
    
    INSERT OVERWRITE TABLE kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34 SELECT
    AIRLINE.FLIGHTDATE
    ,AIRLINE.YEAR
    ,AIRLINE.QUARTER
    ,...
    ,AIRLINE.ARRDELAYMINUTES
    FROM AIRLINE.AIRLINE as AIRLINE
    WHERE (AIRLINE.FLIGHTDATE >= '1987-10-01' AND AIRLINE.FLIGHTDATE < '2017-01-01');
    "
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    当执行Hive命令的时候,Kylin会使用conf/kylin_hive_conf.xml中的配置项,例如,使用更少的副本数和允许Hive的mapper连接操作。如果有需要的话,也可以添加其他的配置项。

    如果Cube的分区列(这里是“FLIGHTDATE ”)与Hive表的分区列是同一个列,那么对于该列进行过滤将会使Hive非常快速地剔除不符合条件的分区。因此强烈建议使用Hive的分区列(如果该分区列是日期)作为Cube的分区列。这对于数据非常多的表来说几乎是必须的,否则在执行这一步骤的时候,Hive每次都需要扫描所有的文件,会耗费很多时间。

    如果你的Hive允许文件合并,你可以在“conf/kylin_hive_conf.xml”中通过配置项来禁用这一功能。因为Kylin有它自己的文件合并方法(我们将在下面介绍):

    <property>
        <name>hive.merge.mapfiles</name>
        <value>false</value>
        <description>Disable Hive's auto merge</description>
    </property>
    • 1
    • 2
    • 3
    • 4
    • 5

    Redistribute intermediate table

    经过上一个步骤之后,Hive会在HDFS的目录中生成一些数据文件,但是一些文件可能会很大,而另外一些文件可能会很小甚至是空的。文件大小分布的不均衡也会导致后续的MR任务执行的不平衡:一些mapper任务会执行的很快,而其他的mapper可能会执行的很慢。为了使这些数据分布的更均匀一些,Kylin增加了该步骤用来重新分配各个数据文件中的数据。下面是一个简单的输出:

    total input rows = 159869711
    expected input rows per mapper = 1000000
    num reducers for RedistributeFlatHiveTableStep = 160
    • 1
    • 2
    • 3

    重分配数据之后执行的Hive命令如下所示:

    hive -e "USE default;
    SET dfs.replication=2;
    SET hive.exec.compress.output=true;
    SET hive.auto.convert.join.noconditionaltask=true;
    SET hive.auto.convert.join.noconditionaltask.size=100000000;
    SET mapreduce.job.split.metainfo.maxsize=-1;
    set mapreduce.job.reduces=160;
    set hive.merge.mapredfiles=false;
    
    INSERT OVERWRITE TABLE kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34 SELECT * FROM kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34 DISTRIBUTE BY RAND();
    "
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    首先,Kylin会获取临时表的行数;然后,基于这个行数就可以获取需要进行数据重分配的文件的数量。Kylin默认一个文件包含1百万行的数据。在这个例子中,一共有1.6亿行数据和160个reducer,并且每个reducer写一个文件。在后续的MR步骤中,Hadoop将会启动相同数量的mapper来对这些文件进行处理(通常1百万行数据的大小会比一个HDFS块要小)。如果日常的数据规模不是很大或者Hadoop集群有足够的资源,可以通过在“conf/kylin.properties”中将配置项“kylin.job.mapreduce.mapper.input.rows”设置为更小的值来获取更高的并发度,如下所示:

    kylin.job.mapreduce.mapper.input.rows=500000
    • 1

    其次,Kylin通过运行一个HiveQL “INSERT OVERWIRTE TABLE … DISTRIBUTE BY …” 来为指定数目的reducer重新分配数据行。

    在大多数情况下,Kylin会要求Hive随机地为这些reducer重新分配数据行,以保证这些文件在大小上相近。此时,重分配语句就是 “DISTRIBUTE BY RAND()”

    如果Cube中确定了一个“shard by”维度列(在Cube的“Advanced setting”界面进行设置),该维度列是一个基数很大的维度列(例如“USER_ID”),那么Kylin会要求Hive通过该维度列的值来重新分配数据。对于该列拥有相同值的数据行将会被分配在同一个文件中。这比随机分配要好,因为数据不仅被重新分配了,而且在没有消耗额外成本的情况下就对数据进行了重新分类,这对于后续的Cube构建过程是有帮助的。在特定的情况下,这种优化可以减少40%的构建时间。这种情况下的重分配语句就是 “DISTRIBUTE BY USER_ID”

    请注意:
    1.设置为“shard by”的维度列应该是一个基数很大的维度列,并且它会出现在很多cuboid中(不会是仅仅出现在很少的cuboid中)。使用它进行重分配可能会在每一个时间间隔上获得等分布;反之则会造成数据倾斜,而这则会减少构建速度。典型的适合情景就是:“USER_ID”,“SELLER_ID”,“PRODUCT”,“CELL_NUMBER”等等,一般基数应该大于1000(应该远远大于reducer的数目)。
    2.使用“shard by”在Cube存储中还有其他的优势,但不在本文的讨论范围中。

    Extract Fact Table Distinct Columns

    在此步骤中,Kylin通过运行一个MR任务来获取维度列的distinct值,用于进行字典编码。

    实际上,该步骤还做了其他的工作:通过使用HyperLogLog计数器预估每个cuboid的行数,依次来收集cube的统计信息。如果你发现mapper任务执行非常慢,通过就意味着cube设计的太复杂,可以参考:Cube设计优化,对cube进行优化,使cube更加精简。如果reducer发生了OOM错误,通常意味着cuboid的维度组合数太多或者默认的yarn内存分配不能满足需求。如果此步骤不能在合理的时间内完成,请重新对cube进行设计,因为真正的build过程会花费更长的时间。

    你可以减少采样比例(通过在kylin.properties中设置kylin.job.cubing.inmem.sampling.percent配置项),来加速该步骤的执行,但是这可能不会有太大的效果,而且还会影响cube统计信息的准确性,因此一般不推荐这么做。

    Build Dimension Dictionary

    在上一步中已经获得了所有维度列的distinct值,接着Kylin将会在内存中构建字典(在下个版本中将会将此操作移到MR任务中)。通常这一步会很快,但是如果distinct值的集合很大,Kylin可能会报错,例如,“Too high cardinality is not suitable for dictionary”。对于UHC(超大容量)列,请使用其他编码方式,例如“fixed_length”,“integer”等。

    Save Cuboid Statistics and Create HTable

    这两个步骤是轻量级的,并且很快。

    Build Base Cuboid

    这一步是通过临时表构建基本的cuboid,这是逐层算法的第一轮MR任务。Mapper的数量等于步骤二中reducer的数量;而Reducer(这里指的是本步骤中启动的reducer)的数量是通过cube的统计信息预估出来的:默认每500M使用一个reducer。如果你发现reducer的数目很少,可以通过在kylin.properties中对配置项“kylin.job.mapreduce.default.reduce.input.mb”设置更小的值,来获取更多的集群资源,如下所示:
    kylin.job.mapreduce.default.reduce.input.mb=200

    Build N-Dimension Cuboid

    这些步骤是逐层算法的处理过程,每一步都使用前一步的输出作为输入,然后去除某个维度进行聚合,生成一个子cuboid。例如,对于cuboid ABCD,去除维度A可以获得cuboid BCD,去除维度B可以获得cuboid ACD等。

    有些cuboid可以通过一个以上的父cuboid聚合而成,在这种情况下,Kylin将会选择最小的父cuboid。例如,AB能够通过ABC(id:1110)和ABD(id:1101)聚合生成,因此ABD会被作为父cuboid使用,因为它的id比ABC要小。基于以上处理,如果D的基数很小,那么此次聚合操作就会花费很小的代价。因此,当设计cube的rowkey顺序的时候,请记住,将低基数的维度列放在尾部。这不仅对cube的构建过程有好处,而且对cube查询也有好处,因为后聚合(应该是指在HBase查找对应cuboid的过程)也遵循这个规则。

    通常从N-D到(N/2)-D的构建过程很慢,因为这是一个cuboid爆炸增长的过程:N-D有1个cuboid,(N-1)-D有N个cuboid,(N-1)-D有N*(N-1)个cuboid等等。在(N/2)-D步骤之后,构建过程会越来越快。

    Build Cube

    这一步骤使用了一种新的算法来构建cube:逐块算法(也称作“in-mem”算法)。该算法只使用一轮MR任务来构建所有的cuboid,但它比逐层算法需要更多占用更多的内存。该步骤在执行的时候会使用“conf/kylin_job_conf_inmem.xml”中的相关配置项。默认情况下,每个mapper需要3G的内存。如果集群有足够大的内存,可以在“conf/kylin_job_conf_inmem.xml”中通过修改配置来获取更大的内存,这样就可以处理更多的数据,并且性能也会更好。修改配置如下所示:

    <property>
        <name>mapreduce.map.memory.mb</name>
        <value>6144</value>
        <description></description>
    </property>
    
    <property>
        <name>mapreduce.map.java.opts</name>
        <value>-Xmx5632m</value>
        <description></description>
    </property>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    请注意,Kylin会根据数据分布(通过cube统计信息获取)自动地选择合适的cube构建算法。一般不需要显示地选择使用哪个算法。

    Convert Cuboid Data to HFile

    这一步会启动一个MR任务用来将cuboid文件(顺序文件格式)转换为Hbase的HFile文件。Kylin通过cube的统计信息来计算HBase的region个数,默认每个region大小是5G。Region数越多,就会使用更多的reducer。如果发现reducer的数目很少,并且性能很差,就可以在“conf/kylin.properties”中增加如下配置项:

    kylin.hbase.region.cut=2
    kylin.hbase.hfile.size.gb=1
    • 1
    • 2

    如果不能确定一个HBase的region该设置为多大,请联系HBase管理员。

    Load HFile to HBase Table

    这一步使用了HBase API将HFile导入到HBase的region中,这一步很简单,也很快。

    Update Cube Info

    将数据导入Hbase中之后,Kylin会将新生成的segment在元数据中的状态修改为ready。这一步也非常快。

    Cleanup

    这一步主要就是从Hive中删除临时表。由于在上一步中,已经将segment的状态修改为ready,所以这一步的操作不会对segment产生任何影响。即使这一步执行发生了错误,也不需要担心,因为所有的垃圾都会在Kylin执行StorageCleanupJob的时候进行回收。

  • 相关阅读:
    linux系统telnet端口不通能收到SYN但不回SYN+ACK响应问题排查(转载)
    leveldb
    SSTable and Log Structured Storage: LevelDB
    fio terse输出详解
    bash的循环中无法保存变量
    怎样当好一个师长
    共享变量的并发读写
    Storage Systems topics and related papers
    Storage System and File System Courses
    调试std::string
  • 原文地址:https://www.cnblogs.com/qqflying/p/9267154.html
Copyright © 2011-2022 走看看