zoukankan      html  css  js  c++  java
  • 【原创】大数据基础之Hive(5)性能调优Performance Tuning

    1 compress & mr

    hive默认的execution engine是mr

    hive> set hive.execution.engine;
    hive.execution.engine=mr

    所以针对mr的优化就是hive的优化,比如压缩和临时目录

    mapred-site.xml

        <property>
            <name>mapreduce.map.output.compress</name>  
            <value>true</value>
        </property>
        <property>
            <name>mapred.map.output.compress.codec</name>  
            <value>org.apache.hadoop.io.compress.SnappyCodec</value>
        </property>

    yarn-site.xml

        <property>
            <name>yarn.nodemanager.local-dirs</name>
            <value>${hadoop.tmp.dir}/nm-local-dir</value>
        </property>

    1.1 hive启用压缩

    set hive.exec.compress.output=true;

    This controls whether the final outputs of a query (to a local/hdfs file or a Hive table) is compressed. The compression codec and other options are determined from Hadoop configuration variables mapred.output.compress* .

    set hive.exec.compress.intermediate=true;

    This controls whether intermediate files produced by Hive between multiple map-reduce jobs are compressed. The compression codec and other options are determined from Hadoop configuration variables mapred.output.compress*.

    2 decompose

    decomposing table data sets into more manageable parts

    将表数据分解成多个部分(文件或目录),这样就可以根据where条件跳过不需要的数据,有3种方式

    2.1 partition

    分区

    Hive Partitioning provides a way of segregating hive table data into multiple files/directories.

    Partitioned tables can be created using the PARTITIONED BY clause. A table can have one or more partition columns and a separate data directory is created for each distinct value combination in the partition columns. Further, tables or partitions can be bucketed using CLUSTERED BY columns, and data can be sorted within that bucket via SORT BY columns. This can improve performance on certain kinds of queries.

    常见的比如按时间分区(time Partitioning)

    PARTITIONED BY(dt STRING, country STRING)

    2.2 bucket

    分桶

    Bucketed tables are fantastic in that they allow much more efficient sampling than do non-bucketed tables, and they may later allow for time saving operations such as mapside joins. However, the bucketing specified at table creation is not enforced when the table is written to, and so it is possible for the table's metadata to advertise properties which are not upheld by the table's actual layout. This should obviously be avoided.

    How does Hive distribute the rows across the buckets? In general, the bucket number is determined by the expression hash_function(bucketing_column) mod num_buckets.

    Generally, in the table directory, each bucket is just a file, and Bucket numbering is 1-based.

    CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS

    set hive.enforce.bucketing = true; (for Hive 0.x and 1.x)

    参考:
    https://cwiki-test.apache.org/confluence/display/Hive/LanguageManual+DDL+BucketedTables
    https://data-flair.training/blogs/bucketing-in-hive/

    2.3 skewed

    倾斜

    This feature can be used to improve performance for tables where one or more columns have skewed values. By specifying the values that appear very often (heavy skew) Hive will split those out into separate files (or directories in case of list bucketing) automatically and take this fact into account during queries so that it can skip or include the whole file (or directory in case of list bucketing) if possible.

    SKEWED BY (key) ON (1,5,6) [STORED AS DIRECTORIES]

    3 index

    Indexing Is Removed since 3.0

    索引在3.0版本中被删除,替代的方式是物化视图(Materialized View,熟悉oracle的人应该不陌生),或者使用列存储格式(比如parquet、orc);

    There are alternate options which might work similarily to indexing:

    • Materialized views with automatic rewriting can result in very similar results. Hive 2.3.0 adds support for materialzed views.
    • Using columnar file formats (Parquet, ORC) – they can do selective scanning; they may even skip entire files/blocks.

    3.1 Materialized view

    物化视图

    Traditionally, one of the most powerful techniques used to accelerate query processing in data warehouses is the pre-computation of relevant summaries or materialized views.

    Using a materialized view, the optimizer can compare old and new tables, rewrite queries to accelerate processing, and manage maintenance of the materialized view when data updates occur. The optimizer can use a materialized view to fully or partially rewrite projections, filters, joins, and aggregations. Hive stores materialized views in the Hive warehouse or Druid.

    CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db_name.]materialized_view_name AS <query>;

    参考:
    https://cwiki.apache.org/confluence/display/Hive/Materialized+views
    https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Indexing

    4 format

    数据存储格式

    4.1 lzo

    STORED AS INPUTFORMAT
    'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
    OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

    4.2 orc

    STORED AS ORC
    STORED AS ORC TBLPROPERTIES ("orc.compression"="ZLIB")

    注意设置orc压缩格式前一定要先设置:

    set hive.exec.orc.compression.strategy=COMPRESSION;

    否则压缩不生效;

    4.2.1

    set hive.orc.zerocopy=true;

    ORC can use the new HDFS Caching APIs and the ZeroCopy readers to avoid extra data copies into memory while scanning files.

    4.3 parquet

    STORED AS PARQUET

    4.4 snappy

    STORED AS PARQUET TBLPROPERTIES ("parquet.compression"="SNAPPY")
    STORED AS ORC TBLPROPERTIES ("orc.compression"="SNAPPY")

    lzo支持详见:https://www.cnblogs.com/barneywill/p/10439181.html
    有关格式的对比测试详见:https://www.cnblogs.com/barneywill/p/10109508.html

    5 vectorization

    Vectorized query execution is a Hive feature that greatly reduces the CPU usage for typical query operations like scans, filters, aggregates, and joins. A standard query execution system processes one row at a time. This involves long code paths and significant metadata interpretation in the inner loop of execution. Vectorized query execution streamlines operations by processing a block of 1024 rows at a time. Within the block, each column is stored as a vector (an array of a primitive data type). Simple operations like arithmetic and comparisons are done by quickly iterating through the vectors in a tight loop, with no or very few function calls or conditional branches inside the loop. These loops compile in a streamlined way that uses relatively few instructions and finishes each instruction in fewer clock cycles, on average, by effectively using the processor pipeline and cache memory.

    set hive.vectorized.execution.enabled=true;
    set hive.vectorized.execution.reduce.enabled=true;
    set hive.vectorized.execution.reduce.groupby.enabled=true;

    参考:https://cwiki.apache.org/confluence/display/Hive/Vectorized+Query+Execution

    6 join

    6.1 Common Join

    即shuffle

    Use Mappers to do the parallel sort of the tables on the join keys, which are then passed on to reducers. All of the tuples with same key is given to same reducer. A reducer may get tuples for more than one key. Key for tuple will also include table id, thus sorted output from two different tables with same key can be recognized. Reducers will merge the sorted stream to get join output.

    6.2 Map Join

    Useful for star schema joins, this joining algorithm keeps all of the small tables (dimension tables) in memory in all of the mappers and big table (fact table) is streamed over it in the mapper. This avoids shuffling cost that is inherent in Common-Join. For each of the small table (dimension table) a hash table would be created using join key as the hash table key.

    set hive.auto.convert.join=true;
    set hive.auto.convert.join.noconditionaltask = true;
    set hive.auto.convert.join.noconditionaltask.size = 10000000;

    MAPJOINs are processed by loading the smaller table into an in-memory hash map and matching keys with the larger table as they are streamed through. The prior implementation has this division of labor:

    • Local work:
      • read records via standard table scan (including filters and projections) from source on local machine
      • build hashtable in memory
      • write hashtable to local disk
      • upload hashtable to dfs
      • add hashtable to distributed cache
    • Map task
      • read hashtable from local disk (distributed cache) into memory
      • match records' keys against hashtable
      • combine matches and write to output
    • No reduce task

    6.3 Bucket Map Join

    If the joining keys of map-join are bucketed then instead of keeping whole of small table (dimension table) in every mapper, only the matching buckets will be kept. This reduces the memory footprint of the map-join.

    set hive.auto.convert.sortmerge.join=true;
    set hive.optimize.bucketmapjoin = true;

    6.4 SMB Join

    This is an optimization on Bucket Map Join; if data to be joined is already sorted on joining keys then hash table creation is avoided and instead a sort merge join algorithm is used.

    set hive.optimize.bucketmapjoin.sortedmerge = true;

    SMB joins are used wherever the tables are sorted and bucketed. The join boils down to just merging the already sorted tables, allowing this operation to be faster than an ordinary map-join. However, if the tables are partitioned, there could be a slow down as each mapper would need to get a very small chunk of a partition which has a single key.

    6.5 Skew Join

    If the distribution of data is skewed for some specific values, then join performance may suffer since some of the instances of join operators (reducers in map-reduce world) may get over loaded and others may get under utilized. On user hint, hive would rewrite a join query around skew value as union of joins.

    set hive.optimize.skewjoin=true;
    set hive.skewjoin.key=100000;

    Whether to enable skew join optimization. The algorithm is as follows: At runtime, detect the keys with a large skew. Instead of processing those keys, store them temporarily in an HDFS directory. In a follow-up map-reduce job, process those skewed keys. The same key need not be skewed for all the tables, and so, the follow-up map-reduce job (for the skewed keys) would be much faster, since it would be a map-join.

    6.6 semi join

    LEFT SEMI JOIN implements the uncorrelated IN/EXISTS subquery semantics in an efficient way. As of Hive 0.13 the IN/NOT IN/EXISTS/NOT EXISTS operators are supported using subqueries so most of these JOINs don't have to be performed manually anymore.

    参考:
    https://medium.com/hotels-com-technology/skew-join-optimization-in-hive-b66a1f4cc6ba
    https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins

    7 merge small files (too many mappers)

    set hive.hadoop.supports.splittable.combineinputformat=true;

    Whether to combine small input files so that fewer mappers are spawned.

    set mapreduce.input.fileinputformat.split.minsize=1;
    set mapreduce.input.fileinputformat.split.maxsize=256000000;

    The minimum size chunk that map input should be split into. Note that some file formats may have minimum split sizes that take priority over this setting.

    set hive.merge.mapfiles=true;

    Merge small files at the end of a map-only job.

    set hive.merge.mapredfiles=true;

    Merge small files at the end of a map-reduce job.

    8 Stats & CBO(Cost-Based Optimizer)

    Most of the existing query optimizations in Hive are about minimizing shuffling cost. Currently user would have to submit an optimized query to Hive with right join order for query to be executed efficiently. Logical optimizations in Hive are limited to filter push down, projection pruning and partition pruning. Cost based logical optimizations can significantly improve Apache Hive’s query latency and ease of use.

    Join reordering and join algorithm selection are few of the optimizations that can benefit from a cost based optimizer. Cost based optimizer would free up user from having to rearrange joins in the right order or from having to specify join algorithm by using query hints and configuration options. This can potentially free up users to model their reporting and ETL needs close to business process without having to worry about query optimizations.

    Calcite is an open source cost based query optimizer and query execution framework. Calcite currently has more than fifty query optimization rules that can rewrite query tree, and an efficient plan pruner that can select cheapest query plan in an optimal manner.

    CBO will be introduced in to Hive in a Phased manner. In the first phase, Calcite would be used to reorder joins and to pick right join algorithm so as to reduce query latency. Table cardinality and Boundary statistics will be used for this cost based optimizations.


    Hive’s Cost-Based Optimizer (CBO) is a core component in Hive’s query processing engine. Powered by Apache Calcite, the CBO optimizes and calculates the cost of various plans for a query.

    The main goal of a CBO is to generate efficient execution plans by examining the tables and conditions specified in the query, ultimately cutting down on query execution time and reducing resource utilization. After parsing, a query gets converted to a logical tree (Abstract Syntax Tree) that represents the operations that the query must perform, such as reading a particular table or performing an inner JOIN.

    Calcite applies various optimizations such as query rewrite, JOIN reordering, and deriving implied predicates and JOIN elimination to produce logically equivalent plans. The current model prefers bushy plans for maximum parallelism. Each logical plan is assigned a cost based in number of distinct value based heuristics.

    Calcite has an efficient plan pruner that can select the cheapest query plan. The chosen logical plan is then converted by Hive to a physical operator tree, optimized and converted to Tez jobs, and then executed on the Hadoop cluster.


    Enabling Cost-Based Optimization

    set hive.cbo.enable=true;
    set hive.stats.autogather=true;
    set hive.compute.query.using.stats=true;
    set hive.stats.fetch.partition.stats=true;
    set hive.stats.fetch.column.stats=true;

    Generating Hive Statistics

    ANALYZE TABLE [table_name] COMPUTE STATISTICS;
    ANALYZE TABLE [table_name] PARTITION(partition_column) COMPUTE STATISTICS;
    ANALYZE TABLE [table_name] COMPUTE STATISTICS for COLUMNS [comma_separated_column_list];

    Viewing Generated Statistics

    DESCRIBE [EXTENDED] table_name;
    DESCRIBE FORMATTED [db_name.]table_name.column_name;

    参考:https://cwiki.apache.org/confluence/display/Hive/Cost-based+optimization+in+Hive

    9 correlation

    set hive.optimize.correlation=true;

    In Hadoop environments, an SQL query submitted to Hive will be evaluated in distributed systems. Thus, after generating a query operator tree representing the submitted SQL query, Hive needs to determine what operations can be executed in a task which will be evalauted in a single node. Also, since a MapReduce job can shuffle data data once, Hive also needs to cut the tree to multiple MapReduce jobs. It is important to cut an operator tree to multiple MapReduce in a good way, so the generated plan can evaluate the query efficiently.

    In a more complex query, correlation-unaware query planning can generate a very inefficient execution plan and result in poor performance.

    参考:https://cwiki.apache.org/confluence/display/Hive/Correlation+Optimizer

    10 write a good sql

    10.1 execution plan

    看懂执行计划

    hive> explain $sql;

    10.2 good practice

    11 engine

    切换engine

    set hive.query.engine=spark;

    12 other

    12.1 parallel

    set hive.exec.parallel=true;

    Whether to execute jobs in parallel. Applies to MapReduce jobs that can run in parallel, for example jobs processing different source tables before a join. As of Hive 0.14, also applies to move tasks that can run in parallel, for example moving files to insert targets during multi-insert.

    12.2 limit is very slow

    limit默认会被转换为本地fetch执行,如果在大表上加复杂查询条件的limit需要指定提交到yarn执行

    set hive.fetch.task.conversion=more;

    Some select queries can be converted to a single FETCH task, minimizing latency. Currently the query should be single sourced not having any subquery and should not have any aggregations or distincts (which incur RS – ReduceSinkOperator, requiring a MapReduce task), lateral views and joins.

    Supported values are none, minimal and more.
    0. none: Disable hive.fetch.task.conversion
    1. minimal: SELECT *, FILTER on partition columns (WHERE and HAVING clauses), LIMIT only
    2. more: SELECT, FILTER, LIMIT only (including TABLESAMPLE, virtual columns)

    set hive.fetch.task.conversion.threshold=1073741824;

    Input threshold (in bytes) for applying hive.fetch.task.conversion. If target table is native, input length is calculated by summation of file lengths. If it's not native, the storage handler for the table can optionally implement the org.apache.hadoop.hive.ql.metadata.InputEstimator interface. A negative threshold means hive.fetch.task.conversion is applied without any input length threshold.

    12.3 sample

    只想快速查看部分数据

    set hive.limit.optimize.enable=true;

    Whether to enable to optimization to trying a smaller subset of data for simple LIMIT first.

    set hive.optimize.sampling.orderby=true;

    Uses sampling on order-by clause for parallel execution.

    调优

    set hive.exec.compress.output=true;
    set hive.exec.compress.intermediate=true;
    set hive.vectorized.execution.enabled = true;
    set hive.optimize.skewjoin=true;
    set hive.hadoop.supports.splittable.combineinputformat=true;
    set hive.optimize.correlation=true;
    set hive.exec.parallel=true;
    set hive.orc.zerocopy=true;


    set hive.cbo.enable=true;
    set hive.stats.autogather=true;
    set hive.compute.query.using.stats=true;
    set hive.stats.fetch.partition.stats=true;
    set hive.stats.fetch.column.stats=true;
    set hive.auto.convert.join=true;
    set hive.auto.convert.join.noconditionaltask = true;
    set hive.map.aggr=true;
    set hive.optimize.reducededuplication=true;
    set hive.optimize.distinct.rewrite=true;
    set hive.optimize.groupby=true;
    set hive.merge.mapfiles=true;
    set hive.merge.mapredfiles=true;
    set hive.merge.orcfile.stripe.level=true;
    set hive.optimize.sort.dynamic.partition=true;
    set hive.mapjoin.lazy.hashtable=true;


    参考:
    https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
    https://streever.atlassian.net/wiki/spaces/HADOOP/pages/1933314/Hive+Performance+Tips
    https://hortonworks.com/blog/5-ways-make-hive-queries-run-faster/
    https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.5/bk_hive-performance-tuning/content/ch_cost-based-optimizer.html

  • 相关阅读:
    2020软件工程个人作业06——软件工程实践总结作业
    git上传文件夹内容
    git常用命令(部分)
    java命令行输入参数
    2020软件工程作业05
    软件工程——问题清单
    2020软件工程作业04
    2020软件工程作业03
    2020软件工程作业02
    问题清单
  • 原文地址:https://www.cnblogs.com/barneywill/p/10445642.html
Copyright © 2011-2022 走看看