zoukankan      html  css  js  c++  java
  • hive调优之SQL语法和运行参数

    hive语法和运行参数层面,主要写出高效运行SQL,并且利用一些运行参数进行调优SQL执行

    查看hive执行计划

    hive的SQL语句在执行之前需要将SQL语句转换成MapReduce任务,因此需要了解转换过程,可以再SQL语句中输入如下命令查看具体的执行计划。

    explain [extended] query 	--查看执行计划,添加extended关键字可以查看更详细的查询计划
    

    示例如下

    explain select department, count(*) as total from student where age >= 18 group
    by department order by total desc limit 3;
    

    关于 Hive 的执行计划中的 Operator 的概念:(逻辑执行计划:Operator Tree)

    select ... from ... where ... group by ... having ... order by .... limit ....
    select: FetchOperator
    from: TableScanOperator
    where+having: FilterOperator
    .....
    

    列裁剪

    列裁剪就是在查询时只读取需要列,当列很多或者数据量很大时,如果查询时不指定字段(如:select * from table)执行全表扫描效率很低。

    hive在读取数据时,可以只读取计算中所需要用到的列的数据,忽略不需要计算使用数据列,这样可以节省数据读取开销、中间表存储开销和数据整合开销。

    set hive.optimize.op = true;   --开启列裁剪,读取数据只加载所需要计算数据,此设置默认为true
    

    谓词下推

    将SQL语句中的where谓词逻辑都尽可能提前执行,减少下游处理的数据量。对应逻辑优化器是PredicatePushDown。

    set hive.optimize.ppd = true;	--此设置默认为true
    

    示例如下

    select a.*,b.* from a join b on a.id = b.id where b.age > 20;
    

    优化后:

    select a.*,c.* from a join (select * from b where age > 20)c on a.id = c.id
    

    分区裁剪

    分区裁剪就是只读取所需要的分区。当分区很多且数量特别大时,如果不指定分区执行全表扫描效率就很低。所以在查询过程中只选择所需分区数据可以减少读入数据量提高查询效率。

    set hive.optimize.pruner = true;	--此设置默认为true
    

    在hive SQL解析阶段对应的则是ColumnPruner逻辑优化器。

    合并小文件

    大数据技术组件主要存在问题

    1. 存储组件主要是海量小文件
    2. 计算组件主要是数据倾斜

    如果MapReduce job 碰到一堆小文件作为输入,一个文件启动一个Task任务,在MapReduce编程模型中CombineFileInputFormat 可以将一个节点或者同一个机架上的多个小文件划分为同一个切片,hive默认使用InputFormat的TextInputFormat处理。

    1. Map 输入合并

      在执行 MapReduce 程序的时候,一般情况是一个文件的一个数据分块需要一个 mapTask 来处理。但是如果数据源是大量的小文件,这样就会启动大量的 mapTask 任务,这样会浪费大量资源。可以将输入的小文件进行合并,从而减少 mapTask 任务数量。

      --Map端输入合并文件之后按照block的大小分割(默认设置)
      set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
      
      --Map端输入不合并小文件
      set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
      
    2. Map和Reduce输出合并

      大量的小文件会给 HDFS 带来压力,影响处理效率。可以通过合并 Map 和 Reduce 的结果文件来消除影响。如果多个节点都是一个小文件,这种情况主要是在输出并没有进行文件合并,且输入切片为256M,则需要通过进行合并小文件进行输出合并。

      set hive.merge.mapfiles=true;	--Map输出文件合并,默认设置为true
      set hive.merge.mapredfiles=true;	--Reduce端输出文件合并,默认设置为false
      set hive.merge.size.per.task=256000000;	--合并文件的大小,默认值为256000000(256M)
      set mapred.max.split.size=256000000;	--每个Map 最大分割大小
      set mapred.min.split.size.per.node=1; 	--节点上split的最少值,服务器节点
      set mapred.min.split.size.per.rack=1; // 服务器机架
      

      hive。merge.size.per.task 和 mapred.min.split.size.per.node联合配置

      1. 默认情况把这个节点上的所有数据进行合并,如果合并的那个文件超过了256M就需要开启另外一个文件继续合并
      2. 如果当前节点上的数据不足256M,那么久合并成一个逻辑切片。

    MapTask并行度

    当Mapper阶段计算数据来源

    1. MapReduce中的MapTask的并行度机制

      现有100个节点,每个任务512M的数据,但是只有40个节点运行一个任务

      • Map数过大:当输入文件特别大,MapTask 特别多,每个计算节点分配执行的MapTask都很多,这时候可以考虑减少MapTask的数量,增大MapTask处理的数据量,而且MapTask过多最终生成的结果文件数也会增多。
        1. Map 阶段输出文件太小,产生大量小文件
        2. 初始化和创建Map开销很大
      • Map数过小:当输入文件很大,任务逻辑复杂,MapTask执行非常慢时,可以考虑增加MapTask数,来使得每个MapTask处理的数据量减少,从而提高任务的执行效率。
        1. 文件处理或查询并发度小,Job 执行时间过长
        2. 大量作业时,容易阻塞集群

      在MapReduce编程实例中,一个MapReduce Job的MapTask数量是有输入切片InputSplit 决定的,而输入分片是由FileInputFormat.getSplit() 决定,一个输入分片对应一个MapTask,而输入分片是由下列参数决定

      参数 默认值 意义
      dfs.blocksize 128M HDFS默认数据块大小
      mapreduce.input.fileinputformat.split.minsize 1M 最小分片大小(MR)
      mapreduce.input.fileinputformat.split.maxsize 256M 最大分片大小(MR)

      输入分片大小计算:针对数据是原始数据情况,最终调整splitsize的大小最好是blocksize的整数倍,计算规则如下

      long split = Math.max(minsize,Math.min(maxsize,blocksize))
      

      默认情况下,输入分片大小和HDFS集群默认数据块大小一致,即默认一个数据块启动一个MapTask处理,可以避免服务器节点之间数据传输提高Job处理效率。

      控制MapTask数方案

      • 减少MapTask 数通过合并小文件,主要是针对数据源做处理
      • 增加MapTask数可以控制Job的ReduceTask个数
      • 推荐使用HDFS默认切块大小,若要调整需将调整成为HDFS默认切块大小的N倍
    2. 合理控制MapTask数量

      • 减少MapTask数可以通过合并小文件来实现
      • 增加MapTask数可以通过控制ReduceTask默认MapTask个数

      计算方式如下:

      输入文件总数为:total_size
      HDFS设置数据块大小:dfs_block_size
      默认MapTask个数为 default_mapper_num = total_size/dfs_block_size
      

      MapReduce 中提供了如下参数来控制 map 任务个数,从字面上看,貌似是可以直接设置 MapTask 个数的样子,但是很遗憾不行,这个参数设置只有在大于 default_mapper_num 的时候,才会生效。

      set mapred.map.tasks=10;	--默认值为2
      

      如果要减少MapTask数量可以通过设置mapred.min.split.size设置每个任务处理文件的大小,设置文件大小只有大于dfs_block_size时才会生效。计算方式如下:

      split_size = max(mapred.min.split.size, dfs_block_size)
      split_num = total_size / split_size
      compute_map_num = Math.min(split_num, Math.max(default_mapper_num,mapred.map.tasks))
      

      控制Mapper个数方法

      • 若想增加MapTask个数,设置mapred.map.tasks为一个较大值
      • 若想减少MapTask个数,设置map.min.split.size为一个较大值
      • 如果输入大量小文件想减少Mapper个数,可以通过设置hive.input.format 合并小文件

    如果想要调整 mapper 个数,在调整之前,需要确定处理的文件大概大小以及文件的存在形式(是大量小文件,还是单个大文件),然后再设置合适的参数。不能盲目进行暴力设置,不然适得其反。

    MapTask 数量与输入文件的 split 数息息相关,在 Hadoop 源码org.apache.hadoop.mapreduce.lib.input.FileInputFormat 类中可以看到 split 划分的具体逻辑。可以直接通过参数 mapred.map.tasks (默认值2)来设定 MapTask 数的期望值,但它不一定会生效。

    ReduceTask并行度

    如果 ReduceTask 数量过多,一个 ReduceTask 会产生一个结果文件,这样就会生成很多小文件,那么如果这些结果文件会作为下一个 Job 的输入,则会出现小文件需要进行合并的问题,而且启动和初始化ReduceTask 需要耗费资源。
    如果 ReduceTask 数量过少,这样一个 ReduceTask 就需要处理大量的数据,并且还有可能会出现数据倾斜的问题,使得整个查询耗时长。 默认情况下,Hive 分配的 reducer 个数由下列参数决定:
    Hadoop MapReduce 程序中,ReducerTask 个数的设定极大影响执行效率,ReducerTask 数量与输出文件的数量相关。如果 ReducerTask 数太多,会产生大量小文件,对HDFS造成压力。如果ReducerTask 数太少,每个ReducerTask 要处理很多数据,容易拖慢运行时间或者造成 OOM。这使得Hive 怎样决定 ReducerTask 个数成为一个关键问题。遗憾的是 Hive 的估计机制很弱,不指定ReducerTask 个数的情况下,Hive 会猜测确定一个ReducerTask 个数,基于以下两个设定:

    参数1:hive.exec.reducers.bytes.per.reducer	--默认为256M
    参数2:mapreduce.exec.reduces.max				--默认为1009
    参数3:mapreduce.job.reduces					--默认值为-1,标识未进行设置,按照设置为2执行
    

    ReduceTask计算公式

    N = Math.min(参数2,输入数据大小/参数1)
    

    可以通过改变上述两个参数的值来控制 ReduceTask 的数量。 也可以通过下列参数进行设置

    set mapred.map.tasks=10;
    set mapreduce.job.reduces=10;
    

    通常情况下,有必要手动指定 ReduceTask 个数。考虑到 Mapper 阶段的输出数据量通常会比输入有大幅减少,因此即使不设定 ReduceTask 个数,重设 参数2 还是必要的。
    依据经验,可以将 参数2 设定为 M * (0.95 * N) (N为集群中 NodeManager 个数)。一般来说,NodeManage 和 DataNode 的个数是一样的。

    Join优化

    • Join优化整体原则

      1. 优先考虑再进行Join操作,最大限度地减少参与Join的数据量(where能用就用)
      2. 小表join大表,最好启动mapjoin,hive自动启动mapjoin,小表不能超过25M(默认25M可以进行更改)
      3. join on 条件相同最好放入同一个Job中,并且join表的排列顺序从小到大
      4. 多张表join操作,多个连接条件都相同会进行转化到同一个Job中
    • 有限过滤数据

      尽量减少每个阶段数据量,对于分区表能用分区字段过滤尽量使用,同时只选择后面需要使用到的列,最大限度减少参与join的数据量。

    • 小表join大表原则

      小表join大表时应遵循小表join大表原则,因为join操作reduce阶段位于join左边表的内容会加载到内存中,将数据量小的表放在左边,可以有效减少发生内存溢出几率,join执行是从左到右生成join,应该保证连续查询中标的大小从左到右依次递增的顺序。

    • 使用相同连接键

      在hive中,当对 3 个或更多张表进行 join 时,如果 on 条件使用相同字段,那么它们会合并为一个MapReduce Job,利用这种特性,可以将相同的 join on 放入一个 job 来节省执行时间。

    • 尽量原子操作

      尽量避免一个SQL包含复杂的逻辑,可以使用中间表来完成复杂的逻辑。

    • 大表join大表

      1. 空key过滤:有时join超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的reducer上,从而导致内存不够。此时我们应该仔细分析这些异常的key,很多情况下,这些key对应的数据是异常数据,我们需要在SQL语句中进行过滤。
      2. 空key转换:有时虽然某个key为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join的结果中,此时我们可以表a中key为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的reducer上

    启动MapJoin

    关于hive调优中,能用就用的原则和方法:

    1. where 能用则用原则,数据join执行能进行数据过滤就进行数据过滤
    2. mapjoin 在资源充足情况下进行开启
    3. 再不改变实际业务计算的情况下,可以进行局部combiner

    MapJoin 是将 join 双方比较小的表直接分发到各个 map 进程的内存中,在 map 进程中进行 join 操作,这样就不用进行 reduce 步骤,从而提高了速度。只有 join 操作才能启用 MapJoin。参数设置如下:

    --是否根据输入小表的大小,自动将reduce端的common join 转化为map join,将小表刷入内存中
    set hive.auto.convert.join = true;	--对应逻辑优化器是MapJoinProcessor
    set hive.mapjoin.smalltable.filesize = 25000000;	--刷入内存表的大小(字节) 25M = 2G
    set hive.auto.convert.join.noconditionaltask=true;	--hive会基于表的size自动的将普通join转换成mapjoin
    set hive.auto.convert.join.noconditionaltask.size=10000000;	--多大的表可以自动触发放到内层LocalTask中,默认大小10M
    

    Hive 可以进行多表 Join。Join 操作尤其是 Join 大表的时候代价是非常大的。MapJoin 特别适合大小表join的情况。在Hive join场景中,一般总有一张相对小的表和一张相对大的表,小表叫 build table,大表叫 probe table。Hive 在解析带 join 的 SQL 语句时,会默认将最后一个表作为 probe table,将前面的表作为 build table 并试图将它们读进内存。如果表顺序写反,probe table 在前面,引发 OOM 的风险就高了。在维度建模数据仓库中,事实表就是 probe table,维度表就是 build table。这种 Join 方式在 map 端直接完成 join 过程,消灭了 reduce,效率很高。而且 MapJoin 还支持非等值连接。

    当 Hive 执行 Join 时,需要选择哪个表被流式传输(stream),哪个表被缓存(cache)。Hive 将JOIN 语句中的最后一个表用于流式传输,因此我们需要确保这个流表在两者之间是最大的。如果要在不同的 key 上 join 更多的表,那么对于每个 join 集,只需在 ON 条件右侧指定较大的表。可以进行手动开启MapJoin如下:

    --SQL方式,在SQL语句中添加MapJoin标记(mapjoin hint),将小表放到内存中,省去shffle操作
    SELECT /*MAPJOIN(smalltable)*/ smalltable.key,bigtable.value FROM smalltable JOIN bigtable ON smalltable.key = bigtable.key
    

    Sort-Merge-Bucket(SMB)Map Join是一种Hive Join 优化技术,使用这个技术的前提是所有的表都必须是分桶表(bucket)和分桶排序的(sort),并对分桶表进行优化。具体实现如下:

    1. 针对参与join的这两张做相同的hash散列,每个桶里面的数据还要排序
    2. 两张表数据个数必须成倍数
    3. 开启SMB Join

    具体参数设置如下:

    --当用户执行bucket map join的时候,发现不能执行时,禁止查询
    set hive.enforce.sortmergebucketmapjoin=false; 
    -- join的表通过sort merge join的条件,join是否会自动转换为sort merge join
    set hive.auto.convert.sortmerge.join=true;	
    --当两个分桶表 join 时,如果 join on的是分桶字段,小表的分桶数是大表的倍数时,可以启用mapjoin 来提高效率。
    set hive.optimize.bucketmapjoin=false;	--bucket map join优化,默认值是 false
    set hive.optimize.bucketmapjoin.sortedmerge=false;	--bucket map join 优化,默认值是 false
    

    join数据倾斜优化

    在编写join查询语句时,如果确定是由于join出现的数据倾斜,那么请做如下设置

    --join的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
    set hive.skewjoin.key=100000;
    
    set hive.optimize.skewjoin=false;	--如果是join过程出现倾斜应该设置为true
    

    如果开启了,在 Join 过程中 Hive 会将计数超过阈值 hive.skewjoin.key(默认100000)的倾斜 key 对应的行临时写进文件中,然后再启动另一个 job 做 map join 生成结果。

    通过 hive.skewjoin.mapjoin.map.tasks 参数还可以控制第二个 job 的 mapper 数量,默认10000。

    set hive.skewjoin.mapjoin.map.tasks=10000;
    

    CBO优化

    join的时候表的顺序关系:前面的表都会被夹在到内存中,后面的表进行磁盘扫描

    select a.*,b.*,c.* from a join b on a.id = b.id join c on a.id = c.id;
    

    Hive 自 0.14.0 开始,加入了一项 "Cost based Optimizer" 来对 HQL 执行计划进行优化,这个功能通过 "hive.cbo.enable" 来开启。在 Hive 1.1.0 之后,这个 feature 是默认开启的,它可以 自动优化 HQL中多个 Join 的顺序,并选择合适的 Join 算法。

    CBO成本优化器,代价最小的执行计划就是最好的执行计划。传统的数据库,成本优化器做出最优化的执行计划是依据统计信息来计算的。Hive 的成本优化器也一样。

    Hive 在提供最终执行前,优化每个查询的执行逻辑和物理执行计划。这些优化工作是交给底层来完成的。根据查询成本执行进一步的优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连接,并行度等等。

    要使用基于成本的优化(也称为 CBO),请在查询开始设置以下参数:

    set hive.cbo.enable=true;
    set hive.compute.query.using.stats=true;
    set hive.stats.fetch.column.stats=true;
    set hive.stats.fetch.partition.stats=true;
    

    笛卡尔积处理

    需求:笛卡尔积操作,默认情况下是不允许使用的!底层使用一个ReduceTask来做。笛卡尔积实现方式如下:

    select a.*, b.* from a, b;
    select a.*, b.* from a, b where a.id = b.id;
    select a.*, b.* from a join b on a.id = b.id;
    select a.*, b.* from a cross join b
    

    当 Hive 设定为严格模式(hive.mapred.mode=strict,nonstrict)时,不允许在 HQL 语句中出现笛卡尔积,这实际说明了 Hive 对笛卡尔积支持较弱。因为找不到 Join key,Hive 只能使用 1 个 reducer 来完成笛卡尔积。
    当然也可以使用 limit 的办法来减少某个表参与 join 的数据量,但对于需要笛卡尔积语义的需求来说,经常是一个大表和一个小表的 Join 操作,结果仍然很大(以至于无法用单机处理),这时 MapJoin 才是最好的解决办法。MapJoin,顾名思义,会在 Map 端完成 Join 操作。这需要将 Join 操作的一个或多个表完全读入内存。
    PS:MapJoin 在子查询中可能出现未知 BUG。在大表和小表做笛卡尔积时,规避笛卡尔积的方法是,给 Join 添加一个 Join key,原理很简单:将小表扩充一列 join key,并将小表的条目复制数倍,joinkey 各不相同;将大表扩充一列 join key 为随机数。

    精髓就在于复制几倍,最后就有几个 reduce 来做,而且大表的数据是前面小表扩张 key 值范围里面随机出来的,所以复制了几倍 n,就相当于这个随机范围就有多大 n,那么相应的,大表的数据就被随机的分为了 n 份。并且最后处理所用的 reduce 数量也是 n,而且也不会出现数据倾斜。

    两张表:
    table_a(小表):a,b,c,d
    table_b(大表):1,2,3,4,5,6,7,8,9
    

    image-20210227103949504

    group by 优化

    默认情况下,Map 阶段同一个 Key 的数据会分发到一个 Reduce 上,当一个 Key 的数据过大时会产生数据倾斜。进行 group by 操作时可以从以下两个方面进行优化:

    1. Map端部分预聚合

      事实上并不是所有的聚合操作都需要在 Reduce 部分进行,很多聚合操作都可以先在 Map 端进行部分聚合,然后在 Reduce 端的得出最终结果。

      set hive.map.aggr=true;		//开启Map端聚合参数设置
      
      //设置Map端聚合的行数阈值,超过此值时则会拆分job,默认值为100000
      set hive.groupby.mapaggr.checkinterval=100000;
      
    2. 有数据倾斜的时候进行负载均衡

      当HQL语句使用group by 时数据出现倾斜,如果该设置为true,那么hive会自动进行负载均衡。策略就是把MapReduce任务拆分两个:

      • 第一个做预汇总
      • 第二个做最终汇总
      // 自动优化,有数据倾斜的时候进行负载均衡(默认false)
      set hive.groupby.skewindata=false;
      

      当设定为true时,生成查询计划有两个MapReduce任务

      1. 在第一个MapReduce任务中,map 的输出结果会随机分布到 reduce 中,每个 reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的group by key有可能分发到不同的 reduce 中,从而达到负载均衡的目的;
      2. 第二个 MapReduce 任务再根据预处理的数据结果按照 group by key 分布到各个 reduce 中,最后完成最终的聚合操作。

      Map 端部分聚合:并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进行部分聚合,最后在 Reduce 端得出最终结果,对应的优化器为 GroupByOptimizer。例如:

      select t.a, sum(t.b), count(t.c), count(t.d) from table t group by t.a;
      

      优化后SQL:

      select t.a, sum(t.b), count(t.c), count(t.d) from (
      select a,b,null c,null d from table
      union all
      select a,0 b,c,null d from table group by a,c
      union all
      select a,0 b,null c,d from table group by a,d
      ) t;
      

    order by优化

    cluster by = distribute by + sort by 可以使用多个ReduceTask运行

    order by 只能使用一个ReduceTask运行

    order by 只能是在一个 reduce 进程中进行,所以如果对一个大数据集进行 order by ,会导致一个reduce 进程中处理的数据相当大,造成查询执行缓慢。

    1. 在最终结果上进行order by,不要在中间的大数据集上进行排序。如果最终结果较少,可以在一个reduce上进行排序时,那么就在最后的结果集上进行order by。
    2. 如果是取排序后的前N条数据,可以使用distribute by和sort by在各个reduce上进行排序后前N条,然后再对各个reduce的结果集合合并后在一个reduce中全局排序,再取前N条,因为参与全局排序的order by的数据量最多是reduce个数 * N,所以执行效率会有很大提升。

    在Hive中,关于数据排序,提供了四种语法,一定要区分这四种排序的使用方式和适用场景。

    1. order by:全局排序,缺陷是只能使用一个reduce
    2. sort by:单机排序,单个reduce结果有序
    3. cluster by:对同一字段分桶并排序,不能和sort by连用
    4. distribute by + sort by:分桶,保证同一字段值只存在一个结果文件当中,结合sort by保证每个reduceTask结果有序

    Hive HQL 中的 order by 与其他 SQL 方言中的功能一样,就是将结果按某字段全局排序,这会导致所有 map 端数据都进入一个 reducer 中,在数据量大时可能会长时间计算不完。

    如果使用 sort by,那么还是会视情况启动多个 reducer 进行排序,并且保证每个 reducer 内局部有序。为了控制map 端数据分配到 reducer 的 key,往往还要配合 distribute by 一同使用。如果不加distribute by 的话,map 端数据就会随机分配到 reducer,根据优化提供两种全局排序方式如下

    建表导入数据

    create table if not exists student(id int, name string, sex string, age int,
    department string) row format delimited fields terminated by ",";
    load data local inpath "/home/bigdata/students.txt" into table student;
    
    1. 第一种直接使用order by来做。如果结果数据量很大,这个任务的执行效率会非常低,实现SQL如下

      select id,name,age from student order by age desc limit 3;
      
    2. 使用distribute by + sort by 多个reduceTask,每个reduceTask分别有序

      set mapreduce.job.reduces=3;
      
      create table student_orderby_result as select * from student distribute by (case
      when age > 20 then 0 when age < 18 then 2 else 1 end) sort by (age desc);
      

      关于上述实现SQL中分界值的确定可以通过采样的方式来估计数据分布规律。

    Count Distinct 优化

    当要统计某一列去重数时,如果数据量很大,count(distinct) 就会非常慢,原因与 group by 类似,count(distinct) 逻辑只会有很少的 reducer 来处理。这时可以用 group by 来改写:

    // 直接使用 count(distinct xx)
    select count(distinct id) from student;
    
    //使用group by进行统计替代distinct
    select count(1) from (
    select age from student
    where department >= "MA"
    group by age
    ) t;
    

    具体示例如下:

    优化前 ,一个普通的只使用一个reduceTask来进行count(distinct) 操作,实现SQL如下

    // 优化前(只有一个reduce,先去重再count负担比较大)
    select count(distinct id) from tablename;
    

    优化后 ,但是这样写会启动两个MR job(单纯 distinct 只会启动一个),所以要确保数据量大到启动job 的 overhead 远小于计算耗时,才考虑这种方法。当数据集很小或者 key 的倾斜比较明显时,group by 还可能会比 distinct 慢。

    -- 优化后(启动两个job,一个job负责子查询(可以有多个reduce),另一个job负责count(1)):
    select count(1) from (select distinct id from tablename) tmp;
    select count(1) from (select id from tablename group by id) tmp; // 推荐使用这种
    

    怎样写in/exists语句

    在hive的早期版本中,in/exists语法是不支持的,但是从hive-0.8X以后的版本开始支持此语法,但是不推荐使用。虽然经过测验,Hive-2.3.6 也支持 in/exists 操作,但还是推荐使用 Hive 的一个高效替代方案:left semi join,例如下列SQL实现

    -- in / exists 实现
    select a.id, a.name from a where a.id in (select b.id from b);
    select a.id, a.name from a where exists (select id from b where a.id = b.id);
    

    使用join改写后SQL实现

    select a.id, a.name from a join b on a.id = b.id;
    

    最终使用left semi join 实现

    select a.id, a.name from a left semi join b on a.id = b.id;
    

    vectorization技术

    vectorization:矢量计算技术,在计算类似 scan, filter, aggregation 的时候, vectorization 技术以设置批处理的增量大小为 1024 行单次来达到比单条记录单次获得更高的效率。开启设置如下

    set hive.vectorized.execution.enabled=true ;
    set hive.vectorized.execution.reduce.enabled=true;
    

    多重模式

    如果 碰到一堆SQL且这一堆SQL的模式一样,都是从同一个表进行扫描做不通逻辑。

    可优化的地方:如果有n条SQL,每个SQL执行都会扫描一次这张表,如下执行SQL多次扫描同一张表进行存储

    insert .... select id,name,sex, age from student where age > 17;
    insert .... select id,name,sex, age from student where age > 18;
    insert .... select id,name,sex, age from student where age > 19;
    

    上述SQL执行插入会扫描多次同一张表,导致执行效率下降,针对此类型SQL执行优化过程如下

    // 原SQL执行根据不同值插入不同分区
    insert .... select id,name,sex, age from student where age > 17;
    insert .... select id,name,sex, age from student where age > 18;
    insert .... select id,name,sex, age from student where age > 19;
    
    优化后如下
    from student
    insert int t_ptn partition(city=A) select id,name,sex, age where city= A
    insert int t_ptn partition(city=B) select id,name,sex, age where city= B
    insert int t_ptn partition(city=C) select id,name,sex, age where city= C
    

    如果一个 HQL 底层要执行 10 个 Job,那么能优化成 8 个一般来说,肯定能有所提高,多重插入就是一个非常实用的技能。一次读取,多次插入,有些场景是从一张表读取数据后,要多次利用,这时可以使用 multi insert 语法,执行SQL示例如下:

    from sale_detail
    insert overwrite table sale_detail_multi partition (sale_date='2019',region='china' )
    select shop_name, customer_id, total_price where .....
    insert overwrite table sale_detail_multi partition (sale_date='2020',region='china' )
    select shop_name, customer_id, total_price where .....;
    

    说明:multi insert语法有一些限制

    1. 一般情况下,单个SQL中最多可以写128路输出,超过128路,则报语法错误。
    2. 在一个multi insert中,对于分区表,同一个目标分区不允许出现多次;对于未分区表,该表不能出现多次。
    3. 对于同一张分区表的不同分区,不能同时有insert overwrite和insert into操作,否则报错返回。

    Multi-Group by 是 Hive 的一个非常好的特性,它使得 Hive 中利用中间结果变得非常方便。例如:

    FROM (SELECT a.status, b.school, b.gender FROM status_updates a JOIN profiles b
    ON (a.userid = b.userid and a.ds='2019-03-20' )) subq1
    INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2019-03-20')
    SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
    INSERT OVERWRITE TABLE school_summary PARTITION(ds='2019-03-20')
    SELECT subq1.school, COUNT(1) GROUP BY subq1.school;
    

    上述查询语句使用了 Multi-Group by 特性连续 group by 了 2 次数据,使用不同的 Multi-Group by。这一特性可以减少一次 MapReduce 操作。

    中间结果压缩

    map输出压缩设置参数如下:

    set mapreduce.map.output.compress=true;
    set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
    

    中检结果压缩

    ​ 中间数据压缩就是对 hive 查询的多个 Job 之间的数据进行压缩。最好是选择一个节省CPU耗时的压缩方式。可以采用 snappy 压缩算法,该算法的压缩和解压效率都非常高。参数设置如下

    set hive.exec.compress.intermediate=true;
    set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
    set hive.intermediate.compression.type=BLOCK;
    

    结果数据压缩

    最终的结果数据(Reducer输出数据)也是可以进行压缩的,可以选择一个压缩效果比较好的,可以减少数据的大小和数据的磁盘读写时间; 注:常用的 gzip,snappy 压缩算法是不支持并行处理的,如果数据源是 gzip/snappy压缩文件大文件,这样只会有有个 mapper 来处理这个文件,会严重影响查询效率。 所以如果结果数据需要作为其他查询任务的数据源,可以选择支持 splitable 的 LZO 算法,这样既能对结果文件进行压缩,还可以并行的处理,这样就可以大大的提高 job 执行的速度了。参数设置如下

    set hive.exec.compress.output=true;
    set mapreduce.output.fileoutputformat.compress=true;
    set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
    set mapreduce.output.fileoutputformat.compress.type=BLOCK;
    

    在Hadoop集群中支持的压缩算法如下

    org.apache.hadoop.io.compress.DefaultCodec
    org.apache.hadoop.io.compress.GzipCodec
    org.apache.hadoop.io.compress.BZip2Codec
    org.apache.hadoop.io.compress.DeflateCodec
    org.apache.hadoop.io.compress.SnappyCodec
    org.apache.hadoop.io.compress.Lz4Codec
    com.hadoop.compression.lzo.LzoCodec
    com.hadoop.compression.lzo.LzopCodec
    
  • 相关阅读:
    Django基于Form之登录和注册
    python中函数参数*args和**kw的区别
    django用户认证系统——自定义认证后台8
    django用户认证系统——重置密码7
    django用户认证系统——修改密码6
    django用户认证系统——注销和页面跳转5
    关于Shiro的盐值加密法的使用
    Shiro的登录验证【基于SpringMVC框架下】
    Shrio的登录验证过程中的密码验证,过程,及relam在applicationContext.xml的配置
    taglib自定义标签的使用
  • 原文地址:https://www.cnblogs.com/starzy/p/14473400.html
Copyright © 2011-2022 走看看