Hive 是 Facebook 开源的一款基于 Hadoop 的数据仓库工具,它能完美支持 SQL 查询功能,将 SQL 查询转变为 MapReduce 任务执行。这使得大数据统计得以实现。Hive 是最早的也是目前应用最广泛的大数据处理解决方案。
一、基本操作
1、创建表
创建表的语法如下:
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
[
[ROW FORMAT row_format] [STORED AS file_format]
| STORED BY 'storage.handler.class.name' [ WITH SERDEPROPERTIES (...) ]
]
[LOCATION hdfs_path]
创建内部表,创建一个内部表,按天分区,字段直接以' '分割:
create table table_test_1 -- 这里没有标注external创建的表, 就是内部表 uid string comment '用户id', age string comment '用户年龄', gender string comment '用户性别' PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS Textfile
本机本地数据导入刚创建的表方式如下:
LOAD DATA LOCAL INPATH '本机文件路径' OVERWRITE INTO TABLE table_test_1 PARTITION(dt='20200822');
HDFS 路径加载数据方式如下:
LOAD DATA INPATH 'hdfs文件路径' OVERWRITE INTO TABLE table_test_1 PARTITION(dt='20200822');
外部表创建,即为某路径下的文件指定为一个 Hive 表结构,这种外部表的创建,不涉及数据的移动。
外部表创建有两种方式,第一种,就是在建表的时候就指定外部表的数据依赖路径:
create external table table_test_2 -- 这里标注external创建的表, 就是外部表 uid string comment '用户id', age string comment '用户年龄', gender string comment '用户性别' ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LOCATION 'HDFS路径' -- 这里指定集群中某个路径的文件为该外部表数据源
第二种,是先创建一个外部表结构,然后再指定路径:
create external table table_test_2 -- 这里标注external创建的表, 就是外部表 uid string comment '用户id', age string comment '用户年龄', gender string comment '用户性别' ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
2、复合类型数据
对于 Hive 的基础数据结果,比如 string、bigint 等比较简单,这里直接跳过。下面经哥讲下 Hive 的复合类型数据用法。即 map、array、json,这三个复合数据类型要相对有一点点难度,可在工作中我们会经常使用。
map
map 的数据结构是 key-value 格式。数据语法为 map(k1,v1,k2,v2,…)。使用给定的 key-value 对,构造一个 map 数据结构。
hive> select map('k1','v1','k2','v2'), map('k1,'v1','k2','v2')['k1'] -- 获取k1对应值, map'k1',v1','k2','v2')['k2'] -- 获取k2对应值from tble_tet;OK{"k2""v","k1:"1"} v1 v2
工作中,map 样式数据被建表的同学声明为 string。这时候如果要使用 map 类型的特性,需要先将 string 转化为 map 类型。那么首先,我们要将下面字符串 'zhang:101&wang:102&li:103' 转化为 map,就需要使用 str_to_map 这个函数:
- str_to_map(your_string, delimiter1, delimiter2) -- your_string: 是准备要转化的字符串 -- delimiter1: 将字符串切分成KV格式 -- 每个KV的分隔符
举个例子,我们可以看一下这个代码:
select str_to_map('zhang:101&wang:102&li:103', '&', ':'); -- 这里注意第2、3个参数 >>> 输出如下: {"zhang":"101","wang":"102","li":"103"}
array
array 的语法为 array(val1,val2,val3,…),操作类型为 array。我们可以使用给定的表达式,构造一个 array 数据结构。举例:
hive> select array(100,200,300) , array(100,200,300)[0] , array(100,200,300)[1] from table_test; OK [100,200,300] 100 200
刚才我们说到实际工作的表,几乎都是默认使用 string 存储,很少在创建表的时候使用 array,往往都是基于 string 生成 array 进行使用的。那么,什么时候会用到 array 类型呢?下面我们举两个使用 array 类型的实例:
split 切分字符串返回 array 类型:
hive> select split('zhang:wang:li', ':') -- 即:将字符串切分为array , split('zhang:wang:li', ':')[0] -- 即:将字符串切分为array, 并获取array第一个元素 , split('zhang:wang:li', ':')[1] -- 即:将字符串切分为array, 并获取array第二个元素 from table_test; OK ["zhang","wang","li"] zhang wang
collect_list 聚合某字段返回 array 类型。我们假设一张记录用户登录使用手机型号的表,如下所示:
hive> select userId, phone from table_test; OK 10001 MI_1 10001 MI_2 10001 MI_3 10001 MI_4 10002 MIX_2 10002 IPHONE11 10002 HUAWEI
随后,我们对不同用户的不同终端类型以及终端数进行统计:
select userId , sort_array(collect_set(phone)) as phone_array -- 这里将同一个用户使用的终端类型情况聚合到一个数组内并去除重复的手机型号, 最后再排序 , size(collect_set(phone)) as phone_size -- 将用一个用户使用手机类型去重后求数组大小, 即为用户使用不同终端数量 from table_test group by userId
需要注意的是collect_set、collect_list。这里生成 array、sort_array 和 size 函数,是为了对生成的 array 进行处理。
json
json 同样是我们使用较多的数据。因为 json 具有良好的可读性和便于快速编写的特点,可以在不同平台之间进行数据交换。所以 json 数据类型在开发接口的时候应用最广。
第一个例子相对简单,只解析 json 第一层:
hive> select get_json_object('{"data": {"teachers":[{"gender":1,"age":"30"},{"gender":0,"age":"35"}], "students":[{"gender":1,"age":"10"},{"gender":0,"age":"11"}] },"city_name":"beijing"', '$.city_name'); >>> 输出: beijing
第二个例子相对复杂。这是一个嵌套 json array 的解析过程。需要你花时间学习、理解。以后再遇到 json 嵌套问题,都可以用这种方法解决。
这个例子中的数据如下:
hive> select json_sample from table_test; OK -- 返回json字符串,记录信息为用户在某个时间访问了不同城市旅游产品信息,为方便阅读,已经过json格式化处理: { "time": 1597920700558, "from": "104", "uid": "124789", "ip": "198.168.0.52", "data": [{ "post_id": "{"city_id":"10001","time":1597920700558"}", "type": 3 }, { "post_id": "{"city_id":"10002","time":1597920700558"}", "type": 8 }] }
这里的目的是将用户访问过的城市 id 提取出来,并进行行转列操作。
hive> select uid, split(get_json_object(get_json_object(json_sample, '$.post_id'),'$.city_id'),'_')[0] as city_id from table_test lateral view explode(split( regexp_replace( regexp_replace( get_json_object(data, '$.data'), '\[|\]',''), --将 json 数组两边的中括号去掉。 这里需要特别注意反斜杠在hive中的用法。 '\}\,\{' --将 json 数组元素之间的逗号换成分号, 因为data内部是多个 json 串,不同 json 串也是用逗号分隔,这个容易和下一层分隔符混淆 ,'\};\{'), '\;')) tb as json_sample -- 将 json 数组切分成一个个 json 结构的字符串,以使用 get_json_object 进行解析 -- 最后输出结果如下: 124789 10001 124789 10002
可以看出,上面 SQL 解析出 json 中 uid、city_id, 并将一行转为两行输出。这个案例分享的是对于嵌套 json 的解析过程。如果可以理解并掌握,那么以后 Hive 中涉及 json 数据解析都不会是问题。
二、特殊数据处理场景
除去上述场景外,还有一些特殊的数据处理场景。下面我们介绍一个较为典型的时间函数。
时间函数
首先,我讲下日期格式转化:
时间戳转化日期:from_unixtime(bigint unixtime, string format)。
参数1:unixtime 为时间戳,即从 19700101 开始至今累计的秒数。
参数2:是希望转行日期的格式,比如年月日可表示为"yyyy-MM-dd"。
-- 注意下面:将字符串可以按照第二个参数定制的样式输出 hive> select from_unixtime(1598360886, 'yyyy-MM-dd HH:mm:ss') , from_unixtime(1598360886, 'yyyy-MM-dd HH') , from_unixtime(1598360886, 'yyyy-MM-dd') , from_unixtime(1598360886, 'yyyyMMdd') OK
日期转化时间戳:unix_timestamp(string date, string format)。
参数1:date为日期字符串,如"2020-01-01"。
参数2:是对应参数 1 的格式,格式必须保持一致,即前面是年月日,这里也要对应年月日。
-- 根据第一个参数格式, 第二个参数格式也需要一致才可以运行 hive> select unix_timestamp('2020-08-23', 'yyyy-MM-dd') , unix_timestamp('20200823', 'yyyyMMdd') , unix_timestamp('2020-08-23 21', 'yyyy-MM-dd') , unix_timestamp('2020-08-23 21:30:30', 'yyyy-MM-dd HH:mm:ss') , unix_timestamp('20200823 21:30:30', 'yyyyMMdd HH:mm:ss') -- 注意与上一行格式上区别, 以及两个参数对应的调整 OK 1598112000 1598112000 1598112000 1598189430
yyyyMMdd 转化为 yyyy-MM-dd。
hive> select from_unixtime(unix_timestamp('20200823','yyyyMMdd'), 'yyyy-MM-dd') OK 2020-08-23
月份转化季度。即给出 1~12 整数,划分到不同季度,提取月份转化为整数。
hive> select floor((month('2020-08-23')-1)/3)+1 -- 季度 date=yyyy-MM-dd , floor((cast(substr('20200823',5,2) as int)-1)/3)+1 -- 季度 date=yyyyMMd , pmod(datediff('2020-08-23','2012-01-01'),7) -- 星期几, 0-6, 0表示周日, 表示周六 , weekofyear('2020-08-23') -- 一年中第几周 , year('2020-08-23') -- 取出年的数字部分 , month('2020-08-23') -- 取出月的数字部分 , day('2020-08-23') -- 取出日的数字部分 , hour('2020-08-23 21:30:35') -- 取出日的数字部分 , minute('2020-08-23 21:30:35') -- 取出日的数字部分 , second('2020-08-23 21:30:35') -- 取出日的数字部分 OK 3 3 0 34 2020 8 23 21 30 35
下面直接列举函数再附上实例,大家会一目了然。
hive> select datediff('2020-08-23','2020-08-20') OK 3 hive> select date_add('2020-08-23',3) -- 注意这里也可以填写负数, 即返回前几天日期 OK 2020-08-26 hive> select date_sub('2020-08-23',3) -- 这里也可以为负数,即返回后几天日期 OK 2020-08-20
三、查询性能优化
1、合并小文件
对于 Hive 来说,就是将输入的小文件进行合并,从而减少 mapper 任务数量。
例如,当查询数据时,如果已知对应数据源有很多小文件,可以先进行如下设置:
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- Map端输入、合并文件之后按照block的大小分割(默认) set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; -- Map端输入,不合并
当自己在建立中间表时,输出数据为了避免小文件过多,可以进行如下设置:
set hive.merge.mapredfiles = true; set hive.merge.size.per.task=256000000; -- 256MB set hive.merge.smallfiles.avgsize=256000000; -- 256MB
2、数据倾斜
现在解决数据倾斜的方案更加成熟,社区考虑 Hive 使用过程中会出现这种情况,进而提供了比较好的解决方案。现在,仅仅需要你配置如下参数分两部分进行。
第一部分:有些聚合可以先在 Map 端进行,然后进入 Reduce 端,这时数据量会小很多。我们再得出最终结果即可。
set hive.map.aggr=true; -- 开启Map端聚合参数设置 set hive.grouby.mapaggr.checkinterval=100000; -- 在Map端进行聚合操作的条目数目
第二部分:如下参数将会添加一个 MapReduce 任务,目的就是让数据平均分配到各个 reduce 上,这样基本能解决数据倾斜问题。
set hive.groupby.skewindata = true; -- 有数据倾斜的时候进行负载均衡(默认是false)
上面通过实例解释了 Hive 进行数据统计中,为什么会遇到数据倾斜,以及如何解决数据倾斜。最后我们来分享下 Hive 如何合理控制 map 和 reduce 数量来优化数据处理效率。
四、控制 map/reduce 任务数量
1、控制 map 数量
第一种解决办法是增加 mapper 个数。可以设置 set mapred.map.tasks= 一个很大的数值, 需要比系统默认的 map 数量大。
第二种解决办法是减少 mapper 个数。set maperd.min.split.size= 一个数字,该数值单位是字节,比如设置 1GB,即为 1024000000,因为默认一个 mapper 是 64MB,这样设置就可以让一个 mapper 处理 1GB 数据,自然 mapper 的数量也就减少了。
2、控制 reducer 数量
相比调整 mapper 数量,调整 reducer 数量的场景在工作中相对要多一些。当然,一般情况下我们也是不需要调整的。有些同学认为设置更多 reducer 会加快计算任务,但其实结果却不尽人意。
方式一:通过设置每个 reducer 处理的数量大小,最多 reducer 数量来间接控制 reducer 数据。
set hive.exec.reducers.bytes.per.reducer=一个数字(默认1GB, 即1024000000) set hive.exec.reducers.max=一个数字(默认为999)
方式二:直接设置 reducer 数量。比如下面设置 reducer 为 500,无论数据大小,都会强制启动 500 个 reducer 处理数据。
set mapred.reduce.tasks=500
上面分享的对 reducer 数量的控制,可能工作中会遇到。尤其在多个大表进行 join 操作的时候,希望你可以掌握使用的场景和方法。