zoukankan      html  css  js  c++  java
  • Hive优化

    Hive优化

    目标:①横向增加并发,②纵向较少依赖

    Map的优化

    • 作业会通过input的目录产生一个或者多个map任务。set dfs.block.size(=128)

    • 是不是每个map处理接近文件块的大小?

    • 如何合并小文件,减少map数

      set mapred.max.split.size=100000000;
      set mapred.min.split.size.per.node=100000000;
      set mapred.min.split.size.per.rack=100000000;
      set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
    • 如何适当的增加map数?

      set mapred.map.tasks=10;
    • map聚合:mapreduce里面有个combiner

      hive.map.aggr=true

    Reduce优化

    • 调整Reduce个数

      set hive.exec.reducers.bytes.per.reduce=10
      set mapred.reduce.tasks=100(优先)
    • MR中有几个痛点

      • 什么情况会存在reduce只有1个的情况

        • 没有 group by,优化添加 group by

          select count(1) from tablename where pt='2012-07-04';
          -- 优化后
          select pt,count(1) from talename where pt='2012-07-04' group by pt;
        • 使用order by:对输入的数据做全局排序,优化:distribute by和sort by结合起来

          • order by:全局排序

          • sort by:不是全局排序,数据进入reduce之前做的排序

          • distribute by:控制map端如何拆分数据给reduce

        • 笛卡儿积:join的时候用on就能避免一个reduce

          join(两张表及以上)的时候不加on条件或者无效的on条件,Hive只能使用1个reduce来完成笛卡儿积

      • 加块查询速度

        • partition

        • Map join

          按照join顺序中最后一个表应该尽量是大表 ,因为join前一阶段生成的数据会存在于Reduce的buffer中,通过stream最后面的表,直接从Reduce的buffer中读取已经缓存的中间结果数据(这个中间结果数据可能时join顺序中,前面表连接的结果的key,数据量相对较小,内存开销小),这样,与后面的大表进行连接时,只需要从buffer中读取缓存的key,与大表中指定key进行连接,速度会更快,也可能避免内存缓冲区溢出。

          • 通常大大表做join,依赖MR的框架的sort功能

          • 通常大小表做join,依赖MR的框架的sort功能(不优化)

            -- 优化
            -- mapjoin会把小表的数据全部读入内存中,在map阶段直接拿另一个表的数据和内存中表数据做匹配,由于map时进行join操作,省去了reduce,运行的效率也会高很多;b小表
            select /*+mapjoin(b)*/ a.key,a.value
            from a
            join b on a.key=b.key;
            -- a表视为大表
            select /*+streamtable(a)*/ a.val,b.val,c.val
            from a
            join b on a.key=b.key
            join c on c.key=b.key;
        • Union all(不去重)比Union (去重)更优化

          先做union all再做join或group by等操作可以有效减少MR过程,尽管是多个select,最终只有一个mr

        • Multi-insert & Multi-group by

          from tablename 
          insert overwrite table tablename1 [partiton(partcol1=val1)]
          select_statement1 group by key1
          insert overwrite table tablename2 [partiton(partcol2=val2)]
          select_statement2 group by key2
        • Automatic merge

          当文件大小比阀值小时,hive会启动一个mr进行合并

          -- 是否合并Map输出文件,默认为true
          hive.merge.mapfiles=true
          -- 是否合并Reduce输出文件,默认为False
          hive.merge.mapredfiles=false
          -- 合并文件的大小
          hive.merge.size.per.task=256*1000*1000
        • Multi-Count Distinct

          -- 目的负载均衡
          set hive.groupby.skewindata=true;
          select dt,count(distinct uniq_id),count(distinct ip)
          from ods_log where dt='20170301' group by dt

    join优化

    • 一个MR job

      select a.val,b.val,c.val 
      from a
      join b on a.key = b.key1
      join c on a.key = c.key2
    • 生成多个MR job

      select a.val,b.val,c.val 
      from a
      join b on a.key = b.key1
      join c on c.key = b.key2
    • 表连接顺序

      • 左连接时,左表中出现的join字段都保留,右表没连接上的都为空

        -- 执行顺序:首先完成2表join,然后再通过where条件进行过滤,这样在join过程中可能会输出大量结果,再对这些结果进行过滤,比较耗时。
        select a.val,b.val
        from a
        left outer join b on a.key = b.key
        where a.dt='2009-07-07' and b.dt='2009-07-07'
        -- 优化:将where条件放再on后,在join过程中,就对不满足条件的记录进行了预先过滤
        select a.val,b.val
        from a
        left outer join b
        on (a.key = b.key and a.dt='2009-07-07' and b.dt='2009-07-07')
    • 并行执行

      -- 开启
      set live.exec.parallel = true;

    数据倾斜

    • 操作

      • join

      • group by

      • count distinct

    • 原因

      • key分布不均导致

      • 人为设计表

      • 业务数据特点

    • 症状

      • 任务进度长时间维持在99%(或者100%),查看任务监控页面,发现只有少量(1个或几个)reduce字任务未完成

      • 查看未完成的子任务,可以看到本地读数据量积累非常大,通常超过10G可以认为发生数据倾斜

    • 倾斜度

      • 平均记录数超过50w且最大记录数时超过平均记录数的4倍

      • 最长时长比平均时长超过4分钟,且最大时长超过平均时长的2倍

    • 万能方法

      hive,groupby.skewindata=true;

    数据导出导入

    • 导出到本地

      insert overwrite local directory '/root/badou/hadoop/hive_test/behavior.txt'
      select userId,title from test.behavior;
    • 导出到hdfs

      insert overwrite directory '/rawdata/database/test/behavior_table'
      select userId,title from test.behavior;
    • 本地数据导入表中

      load data local inpath '/root/badou/hadoop/hive_test/2008-08'
      overwrite into table test.rating_p partition(dt='2008-08');
    • hdfs数据导入表中

      load data inpath '/root/badou/hadoop/hive_test/2010-08'
      overwrite into table test.rating_p partition(dt='2010-08');

    format

    • Hive的UDF都是通过Java语言编写的,hive提供了另外一种方式,也达到了类似的目的,但是方法更加简单transform支持多种语言

      -- 添加一个shell脚本
      hive> add file /root/badou/hadoop/hive_test/transform.awk;
      Added resources: [/root/badou/hadoop/hive_test/transform.awk]
      -- 使用transform.awk
      select transform(movieid , title) using "awk -f transform.awk" from test.movie limit 10;
      select transform(movieid , title) using "awk -f transform.awk" as uu from test.movie limit 10;

      -- 添加一个python脚本
      hive> add file /root/badou/hadoop/hive_test/transform.py;
      hive> select transform(movieid , title) using "python transform.py" as uu from test.movie limit 10;
      -- wordcount
      hive> add file /root/badou/hadoop/hive_test/map.py;
      Added resources: [/root/badou/hadoop/hive_test/map.py]
      hive> add file /root/badou/hadoop/hive_test/red.py;
      Added resources: [/root/badou/hadoop/hive_test/red.py]
      select transform(line) using 'python map.py' as word,count from test.docs limit 10;
      -- 结果
      Preface 1
      “The    1
      Forsyte 1
      Saga”   1
      was     1
      the     1
      title   1
      originally      1
      destined        1
      for     1
      -- map和reduce
      select transform(wc.word , wc.count) using 'python red.py' as w,c
      from
      (select transform(line) using 'python map.py' as word,count from test.docs cluster by word) as wc limit 100;
      -- 结果插入到表中
      insert overwrite table test.word_count
      select transform(wc.word , wc.count) using 'python red.py' as w,c
      from
      (select transform(line) using 'python map.py' as word,count from test.docs cluster by word) as wc limit 100;
  • 相关阅读:
    为什么 PCB 生产时推荐出 Gerber 给工厂?
    Fedora Redhat Centos 有什么区别和关系?
    【KiCad】 如何给元件给元件的管脚加上划线?
    MCU ADC 进入 PD 模式后出现错误的值?
    FastAdmin 生产环境升级注意
    EMC EMI 自行评估记录
    如何让你的 KiCad 在缩放时不眩晕?
    KiCad 5.1.0 正式版终于发布
    一次单片机 SFR 页引发的“事故”
    java基础之集合
  • 原文地址:https://www.cnblogs.com/zxbdboke/p/10465971.html
Copyright © 2011-2022 走看看