zoukankan      html  css  js  c++  java
  • 19.Hive函数、存储压缩

    一、函数

    1.1 系统内置函数

    查看系统自带的函数hive> show functions;

    显示自带的函数的用法hive> desc function upper;

    详细显示自带的函数的用法hive> desc function extended upper;

    1.2 自定义函数

    Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(UDF:user-defined function

    UDF(User-Defined-Function)
    一进一出

    UDAF(User-Defined Aggregation Function)
    聚集函数,多进一出,如countmaxmin

    UDTF(User-Defined Table-Generating Functions)
    一进多出,如lateral view explore()

    编程步骤:

    1. 引入依赖
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.2.1</version>
    </dependency>
    
    1. 继承org.apache.hadoop.hive.ql.exec.UDF。需要实现evaluate函数,evaluate函数支持重载。
    public class Upper extends UDF {
        public String evaluate(String original) {
            if (original == null) {
                return null;
            }
            return original.toUpperCase();
        }
    }
    
    1. 打包命名为h.jar后上传至/opt/module/hive-1.2.1/lib文件下
    2. 在hive的命令行窗口创建函数
      添加Jaradd jar /opt/module/hive-1.2.1/lib/h.jar
      创建functioncreate temporary function myupper as "com.hucheng.hive.Upper"
    3. 删除命令:Drop temporary function if exists myupper;

    二、压缩和存储

    Hadoop数据压缩见https://hucheng.blog.csdn.net/article/details/106520311

    2.1 开启Map输出阶段压缩

    开启Map输出阶段压缩可以减少JobMapReduceTask间数据传输量,具体配置如下:

    1. 开启Hive中间传输数据压缩功能
      hive (default)>set hive.exec.compress.intermediate=true;
    2. 开启MapReduceMap输出压缩功能
      hive (default)>set mapreduce.map.output.compress=true;
    3. 设置MapReduceMap输出数据的压缩方式
      hive (default)>set mapreduce.map.output.compress.codec= org.apache.hadoop.io.compress.SnappyCodec;
    4. 执行查询语句
      hive (default)> select count(ename) name from emp;

    2.2 开启Reduce输出阶段压缩

    Hive将输出写入到表中时,输出内容同样可以进行压缩。属性hive.exec.compress.output控制着这个功能。用户可能需要保持默认设置文件中的默认值false,这样默认的输出就是非压缩的纯文本文件了。用户可以通过在查询语句或执行脚本中设置这个值为true,来开启输出结果压缩功能。

    1. 开启Hive最终输出数据压缩功能
      hive (default)>set hive.exec.compress.output=true;
    2. 开启MapReduce最终输出数据压缩
      hive (default)>set mapreduce.output.fileoutputformat.compress=true;
    3. 设置MapReduce最终数据输出压缩方式
      hive (default)> set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
    4. 设置MapReduce最终数据输出压缩为块压缩
      hive (default)> set mapreduce.output.fileoutputformat.compress.type=BLOCK;
    5. 测试一下输出结果是否是压缩文件
      hive (default)> insert overwrite local directory '/opt/module/datas/distribute-result' select * from emp distribute by deptno sort by empno desc;

    2.3 Hive文件存储格式

    Hive支持的存储数据的格式主要有:TEXTFILE SEQUENCEFILEORCPARQUET

    ①列式存储和行式存储
    在这里插入图片描述
    行存储的特点:查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。

    列存储的特点:因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法。

    TEXTFILESEQUENCEFILE的存储格式都是基于行存储的;ORCPARQUET是基于列式存储的。

    ②TextFile格式

    默认格式,数据不做压缩,磁盘开销大,数据解析开销大。可结合GzipBzip2使用,但使用Gzip这种方式,Hive不会对数据进行切分,从而无法对数据进行并行操作。

    ③Orc格式

    Orc (Optimized Row Columnar)Hive 0.11版里引入的新的存储格式。

    如图所示可以看到每个Orc文件由1个或多个Stripe组成,每个Stripe一般为HDFS的块大小,每一个Stripe包含多条记录,这些记录按照列进行独立存储,对应到Parquet中的Row Group的概念。每个Stripe里有三部分组成,分别是Index DataRow DataStripe Footer
    在这里插入图片描述

    • Index Data:一个轻量级的Index,默认是每隔1W行做一个索引。这里做的索引应该只是记录某行的各字段在Row Data中的Offset
    • Row Data:存的是具体的数据,先取部分行,然后对这些行按列进行存储。对每个列进行了编码,分成多个Stream来存储。
    • Stripe Footer:存的是各个Stream的类型,长度等信息。

    每个文件有一个File Footer,这里面存的是每个Stripe的行数,每个Column的数据类型信息等;每个文件的尾部是一个PostScript,这里面记录了整个文件的压缩类型以及FileFooter的长度信息等。在读取文件时,会seek到文件尾部读PostScript,从里面解析到File Footer长度,再读FileFooter,从里面解析到各个Stripe信息,再读各个Stripe,即从后往前读。

    ④Parquet格式

    Parquet文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此Parquet格式文件是自解析的。

    • 行组(Row Group):每一个行组包含一定的行数,在一个HDFS文件中至少存储一个行组,类似于orcstripe的概念。
    • 列块(Column Chunk):在一个行组中每一列保存在一个列块中,行组中的所有列连续的存储在这个行组文件中。一个列块中的值都是相同类型的,不同的列块可能使用不同的算法进行压缩。
    • 页(Page):每一个列块划分为多个页,一个页是最小的编码的单位,在同一个列块的不同页可能使用不同的编码方式。

    通常情况下,在存储Parquet数据的时候会按照Block大小设置行组的大小,由于一般情况下每一个Mapper任务处理数据的最小单位是一个Block,这样可以把每一个行组由一个Mapper任务处理,增大任务执行并行度。Parquet文件的格式如图所示:
    在这里插入图片描述
    上图展示了一个Parquet文件的内容,一个文件中可以存储多个行组,文件的首位都是该文件的Magic Code,用于校验它是否是一个Parquet文件,Footer length记录了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和该文件存储数据的Schema信息。除了文件中每一个行组的元数据,每一页的开始都会存储该页的元数据,在Parquet中,有三种类型的页:数据页、字典页和索引页。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引,目前Parquet中还不支持索引页。

    2.4 主流文件存储格式对比实验

    ①存储文件的压缩比测试:

    TextFile

    hive (default)> create table log_text (
    track_time string,
    url string,
    session_id string,
    referer string,
    ip string,
    end_user_id string,
    city_id string
    )
    row format delimited fields terminated by '	'
    stored as textfile ;
    
    hive (default)> load data local inpath '/root/datas/log.data' into table log_text ;
    
    hive (default)> dfs -du -h /user/hive/warehouse/log_text;
    18.1 M  /user/hive/warehouse/log_text/log.data
    

    ORC

    create table log_orc(
    track_time string,
    url string,
    session_id string,
    referer string,
    ip string,
    end_user_id string,
    city_id string
    )
    row format delimited fields terminated by '	'
    stored as orc ;
    
    hive (default)> insert into table log_orc select * from log_text ;
    
    hive (default)> dfs -du -h /user/hive/warehouse/log_orc/ ;
    2.8 M  /user/hive/warehouse/log_orc/000000_0
    

    Parquet

    hive (default)> create table log_parquet(
    track_time string,
    url string,
    session_id string,
    referer string,
    ip string,
    end_user_id string,
    city_id string
    )
    row format delimited fields terminated by '	'
    stored as parquet ;
    
    hive (default)> insert into table log_parquet select * from log_text ;
    
    hive (default)> dfs -du -h /user/hive/warehouse/log_parquet/ ;
    13.1 M  /user/hive/warehouse/log_parquet/000000_0
    

    存储文件的压缩比总结:ORC > Parquet > textFile

    ②存储文件的查询速度测试:
    TextFile

    hive (default)> select count(*) from log_text;
    _c0
    100000
    Time taken: 21.54 seconds, Fetched: 1 row(s)
    Time taken: 21.08 seconds, Fetched: 1 row(s)
    Time taken: 19.298 seconds, Fetched: 1 row(s)
    

    ORC

    hive (default)> select count(*) from log_orc;
    _c0
    100000
    Time taken: 20.867 seconds, Fetched: 1 row(s)
    Time taken: 22.667 seconds, Fetched: 1 row(s)
    Time taken: 18.36 seconds, Fetched: 1 row(s)
    

    Parquet

    hive (default)> select count(*) from log_parquet;
    _c0
    100000
    Time taken: 22.922 seconds, Fetched: 1 row(s)
    Time taken: 21.074 seconds, Fetched: 1 row(s)
    Time taken: 18.384 seconds, Fetched: 1 row(s)
    

    存储文件的查询速度总结:查询速度相近

    2.5 存储和压缩结合

    修改Hadoop集群具有Snappy压缩方式

    ①将编译好的支持Snappy压缩的Hadoop-2.7.2.tar.gz包导入到Hadoop100/opt/software

    ②解压Hadoop-2.7.2.tar.gz到当前路径:tar -zxvf hadoop-2.7.2.tar.gz

    ③将hadoop-2.7.2/lib/native路径下的Snappy压缩的动态链接拷贝到开发集群中
    cp ../native/* /opt/module/hadoop-2.7.2/lib/native/

    ④分发集群:xsync native

    ⑤查看hadoop支持的压缩类型(此时已经支持snappy):

    [root@hadoop100 hadoop-2.7.2]$ hadoop checknative
    17/12/24 20:45:02 WARN bzip2.Bzip2Factory: Failed to load/initialize native-bzip2 library system-native, will use pure-Java version
    17/12/24 20:45:02 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
    Native library checking:
    hadoop:  true /opt/module/hadoop-2.7.2/lib/native/libhadoop.so
    zlib:    true /lib64/libz.so.1
    snappy:  true /opt/module/hadoop-2.7.2/lib/native/libsnappy.so.1
    lz4:     true revision:99
    bzip2:   false
    

    测试存储和压缩

    ORC存储方式的压缩:

    Key Default Notes
    orc.compress ZLIB high level compression (one of NONE, ZLIB, SNAPPY)
    orc.compress.size 262,144 number of bytes in each compression chunk
    orc.stripe.size 268,435,456 number of bytes in each stripe
    orc.row.index.stride 10,000 number of rows between index entries (must be >= 1000)
    orc.create.index true whether to create row indexes
    orc.bloom.filter.columns "" comma separated list of column names for which bloom filter should be created
    orc.bloom.filter.fpp 0.05 false positive probability for bloom filter (must >0.0 and <1.0)
    注意:所有关于ORCFile的参数都是在HQL语句的TBLPROPERTIES字段里面出现

    ①创建一个非压缩的的ORC存储方式

    hive (default)> create table log_orc_none(
    track_time string,
    url string,
    session_id string,
    referer string,
    ip string,
    end_user_id string,
    city_id string
    )
    row format delimited fields terminated by '	'
    stored as orc tblproperties ("orc.compress"="NONE");
    
    hive (default)> insert into table log_orc_none select * from log_text ;
    
    hive (default)> dfs -du -h /user/hive/warehouse/log_orc_none/ ;
    7.7 M  /user/hive/warehouse/log_orc_none/000000_0
    

    ②创建一个SNAPPY压缩的ORC存储方式

    hive (default)> create table log_orc_snappy(
    track_time string,
    url string,
    session_id string,
    referer string,
    ip string,
    end_user_id string,
    city_id string
    )
    row format delimited fields terminated by '	'
    stored as orc tblproperties ("orc.compress"="SNAPPY");
    
    hive (default)> insert into table log_orc_snappy select * from log_text ;
    
    hive (default)> dfs -du -h /user/hive/warehouse/log_orc_snappy/ ;
    3.8 M  /user/hive/warehouse/log_orc_snappy/000000_0
    

    上一节中默认创建的ORC存储方式,导入数据后的大小为2.8M,比Snappy压缩的还小。原因是ORC存储文件默认采用ZLIB压缩,ZLIB采用的是deflate压缩算法。比Snappy压缩的小。

    在实际的项目开发当中,hive表的数据存储格式一般选择:orc或parquet。压缩方式一般选择snappy,lzo。

  • 相关阅读:
    mysql之四.表介绍
    mysql之三.mysql的工作流程
    mysql之二.mysql中的存储引擎
    mysql之一.初识mysql
    数据及表结构的导出
    迭代器和生成器
    python字符串格式化的几种方式
    关于global 和 nonlocal你需要注意的问题
    请编写一个函数实现将IP地址转换成一个整数
    Python中__repr__和__str__区别
  • 原文地址:https://www.cnblogs.com/hucheng1997/p/13450896.html
Copyright © 2011-2022 走看看