数据仓库特性
面向主题性
集成性
非易失性
时变性
OLTP/OLAP
OLTP:面向事务处理 注重的事务 注重响应的时间 也就是我们所说的RDBMS(关系型数据库):比如mysql,oracle,注意和非关系型数据库(noSQL)的区分:比如redis mongodb
OLAP:面向分析处理 注重分析 数仓就是一个OLAP.比如:Apache Hive
数仓的三层架构
ODS(源数据层)其数据来自于各个不同数据源 之间存异 临时存储 不做分析
DW(数据仓库层)来自于ODS数据进行ETL操作 数据是格式整洁统一的模式
DA(数据应用层)最终消费数据的,其消费的数据来源于DW层的数据模型
为什么分层
集中数据整合/一步变多步降低维护成本/用空间换时间
hive安装部署
hive中没有所谓的主从角色 主备角色的概念 不是分布的软件。类似于mysql单机部署即可。
但是根据访问形式分为:本地模式 远程模式
本地模式:访问hive的客户端和hive服务在一台机器上
远程模式:启动hive服务,其他机器上使用客户端远程连接访问
根据元数据存储介质不同,分为:
内置derby:优点是不需要配置/缺点是不同路径下元数据无法共享
mysql版:由第三方mysql统一管理元数据 任何路径 远程还是本地 元数据一致
hive启动
本地模式:
bin/hive 进入hive的交互式命令行中 (在启动的时候还可以添加参数 -e -f)
远程模式:
bin/hiveServer2 启动hive服务
bin/beeline 客户端远程连接
beeline> ! connect jdbc:hive2://node-1:10000
scan complete in 8ms
Connecting to jdbc:hive2://node-1:10000
Enter username for jdbc:hive2://node-1:10000: root hive服务所在机器的linux用户名
Enter password for jdbc:hive2://node-1:10000: ******* 对应的密码
hive语法
create [external] table [if not exist] table_name
[(col_name data_type[comment col_comment],...,...)]
[comment table_comment]
[partition by(col_name data_type[comment col_comment],...,...)]
[clustered by(col_name,col_name,...)]
[sort by(col_name [ASC|DESC],...)into num_buckets buckets]
[row format row_rowformat]
[srored as file_format]
[location hdfs_path]
like运行用户复制现有的表结构但是不复制数据
create [external] table [if not exists] [db_name.]table_name like exiting_table
hive映射表
create table test_1(id int,name string,age int); 映射失败 null
--hdfs上自动生成的路径 /user/hive/warehouse/test.db/test_1
create table test_2(id int,name string,age int) row format delimited fields terminated by ','; -- 指定分隔符
--hdfs上自动生成的路径 /user/hive/warehouse/test.db/test_2
表的字段类型和顺序需要和结构化文件对应吗? 必须对应
hive数据类型
- hive除了sql数据类型还支持Java数据类型 且大小写不敏感
- 除了基本的数据类型 还支持复杂类型(往往需要结合分隔符进行操作)
- 在定义表的时候 需要注意自己定义的数据类型符合结构化数据文件中的类型 如果不一样 hive会尝试转
hive分隔符
hive默认读取文件加载映射的机制
inputformat(默认TextInputformat)负责读取数据默认是一行一行读取
serDe(lazysimpleSerDe)负责按照指定的分隔符进行切割
lazySimpleSerDe就是语法中的delimited
row format delimited | serde
row format表明开始制定分隔符
delimited表明使用内置的分隔符类进行数据切割(lazySimpleSerDe)
serDe表明使用其他的分隔符类来进行数据切割
需要根据结构化数据之间的分隔符进行分隔符的指定 如果数据是复杂类型 需要更多的参数指定清除
fields terminated by char --指定字段之间分隔符
collection items terminated by char --指定集合元素之间的分隔符
map keysterminated by char --指定集合元素kv之间的分隔符
line terminated by char --指定换行的分隔符
举个栗子:
1|allen|18
2|tom|19
create table t_test(id int,name string,age int) row format delimited fields terminated by '|';
hive默认分隔符( 01)
hive内部表外部表
create external table t_ext(id int, name string, age int)row format delimited fields terminated by ','locatioin'/hello';
内部表 数据文件默认映射的路径下 /user/hive/warehouse/数据库名.db/表名
外部表 数据可以在hdfs任意路径下 建表需要locatin 指定清楚
内部表外部表区别
drop删除表时
内部表除了删除hive表中信息还会删除位于hdfs默认路径下的文件
外部表只会删除hive中表的信息
分区表
优化类型表 减少查询的时候全表的扫描
create table t_user_part (id int, name string,country string)
partition by(country string)
row format delimited fields terminated by ',';
--分区字段不能与表中字段重复 错误
create table t_user_part (id int, name string,country string)
partition by(guojia string)
row format delimited fields terminated by ',';
--分区字段会以虚拟字段的形式出现在表结构上
分区表数据加载
使用load进行数据加载 加载的同时需要指定分组字段的值
load data local inpath '/root/hivedata/china.txt' into table
t_user_part partition(guojia='zhongguo')
load data local inpath '/root/hivedata/usa.txt' into table
t_user_part partition(guojia='meiguo')
分区表使用
select * from t_user_part where country ="china";普通查询 使用表中字段过滤全表扫描
select *from t_user_part where guojia="zhongguo";根据分区字段 扫名分区字段对应的文件夹 减少全表扫描
/user/hive/warehouse/test.db/t_user_part/guojia=zhongguo
分区表特点
-
优化表非必须
-
本质就是把数据按照不同文佳佳进行划分管理(文件名就是分区字段和其值guojia="zhongguo")
-
分区字段不能是表中已有的
-
显示在查询语句中 是一个虚拟的字段 其数值并不会位于文件中
-
查询时可以指定分区就不会扫描其他分区减少全区扫描
分桶(簇)表
clustered by xxx into N buckets
把结构化数据按照xxx字段分成N个部分
如何分
数值类型字段hashfunc(xxx)=xxx%N 余数为几就去哪桶中
其他类型字段hashfunc(xxx)=xxx.hashcode % N 余数为几就去哪桶中
分桶作用 减少join查询时候笛卡尔积(交叉相乘)的数量
分桶创建
开启分桶功能 指定分桶的个数要等于建表的个数
set hive.enforce.bucketing=true;
set mapreduce.job.reduces=4
create table stu_buck(Sno int,Sname string,Sex string,Sage int,Sdept string)
clustered by(sno)
into 4 buckets
row format delimited
fileds terminated by ',';
分桶表加载数据
直接hdfs fs -put虽然把数据加载到表中但没有分桶 违背分桶意义 错误
load data 错误
insert + into(插入表的数据来源于后续查询的结果)意味着分桶表无法直接获取 需要间接查询插入
先创建临时表
create table stu_tmp(Sno int,Sname string,Sex string,Sage int,Sdept string)row format delimited fields terminated by ',';
hadoop fs -put students.txt /user/hive/warehouse/test.db/stu_tmp
最后
insert overwrite table stu_buck
select * from stu_tmp cluster by(Sno);
这是一个mr过程
分桶表特点
- 优化表非必须
- 把文件按照指定字段分成若跟部分
- 指定字段必须表中已有
- 意义体现在join查询时笛卡尔积的数量优化
表的修改
alter table t_user_part partiton(guojia='riben')rename to partition(guojia='xiaoriben');
表的信息查看
desc formatted table_name;--格式美观
MANAGE_TABLE 受管理的表即内部表
EXTERNAL_TABLE 外部表
load加载数据(DML)
把数据从其他路径加载到hive表指定的路径下./user/hive/warehouse/数据库.db/表名
load 针对于内部表
locatition 针对于外部表
--本地加载数据(local)在hive服务器本地文件系统(数据复制操作)
load data local inpath '/root/hivedata/students.txt' into table stu_local;
日志信息:INFO : Loading data to table test.stu_local from file:/root/hivedata/students.txt
本质上是 hadoop fs -put
--非本地加载 数据位于hdfs上(数据移动)
load data inpath 'hello/student.txt' into table stu_no_local;
日志
Loading data to table test.stu_no_local from hdfs://node01:9000/hello/students.txt
本质 hadoop fs -mv
后续加载表数据的时会经常使用load data命令
insert(DML)
mysql INSERT INTO table_name (列1, 列2,...) VALUES (值1, 值2,....)
hive insert +select 插入内容来自后续查询结果
多重插入 在一次扫描中完成多次插入语句的执行,减少扫描
from source_table
insert overwrite table tablename1 [partition (partcol1=val1,partclo2=val2)]
select_statement1
insert overwrite table tablename2 [partition (partcol1=val1,partclo2=val2)]
select_statement2..
动态分区 分区表中字段并不是有自己手动指定 而是通过动态查询获取的
INSERT OVERWRITE TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...)
select_statement FROM from_statement
多分区表
create table t_user_part_duo (id int, name string,country string) partitioned by (guojia string,sheng string ) row format delimited fields terminated by ',';
load data local inpath '/root/hivedata/china.txt' into table t_user_part_duo partition(guojia='zhongguo',sheng="beijing");
load data local inpath '/root/hivedata/china.txt' into table t_user_part_duo partition(guojia='zhongguo',sheng="shanghai");
load data local inpath '/root/hivedata/usa.txt' into table t_user_part_duo partition(guojia='meiguo',sheng="jiazhou");
select * from t_user_part_duo where guojia='zhongguo' and sheng='beijing';
所谓多重分区指的是前一个分组的基础上继续进行分区 从而形成一种递进关系
企业中常见的所分区有
时间维度管理的分区 partition(month='xx',day='xx');
地域维度管理的分区 partition(provice='xxx',city='xxx');
数据导出
insert overwrite local directory '/root/12345'
select * from t_user where country="use";--导出到hive服务器所在本地文件
insert overwrite directory '/hello/12345'
select * from t_user where country="use";--导出到hdfs
select(分桶查询 cluster by)
首先需要开启分桶功能
set hive.enforce.buckteting = true;
分为几桶,如何确定
默认情况下:
Number of reduce tasks not specified. Estimated from input data size: 1
人为指定分桶个数
set mapreduce.job.reduces=2;
Number of reduce tasks not specified. Defaulting to jobconf value of: 2
Number of reduce tasks not specified. Defaulting to jobconf value of: 3
总结分桶查询就是根据指定的字段把数据分开至于分成几份如果不指定hive会自己根据数据量评估,如果指定就根据指定的来
分桶查询还会根据该字段排序(分且排序 默认升序)
distribute by(分)+sort by(排序)
select * from stu_tmp cluster by(sno) sort by (sage desc); 报错
Cannot have both CLUSTER BY and SORT BY clauses
select * from stu_tmp distribute by(sno) sort by (sage desc);
如果 DISTRIBUTE BY(分) +SORT BY(排序)的字段是同一个字段:
cluster by(分且排序)= DISTRIBUTE BY(分) +SORT BY(排序)
order by是全局排序
既然是全局排序 意味着只能有一个reducetask 这时候不管你设置几个 程序执行的时候只使用一个。
SORT BY是负责每个分桶内部的排序。
hive的执行模式
hive sql--->mapreduce---->yarn---->hdfs(数据)---->结果
hive sql--->mapreduce---->local Hadoop model--->hdfs(数据)---->结果
hive智能切换mr的集群或者本地模式
set hive.exec.model.local.auto=true;
切换标准
The total input size of the job is lower than: hive.exec.mode.local.auto.inputbytes.max (128MB by default)
The total number of map-tasks is less than: hive.exec.mode.local.auto.tasks.max (4 by default)
The total number of reduce tasks required is 1 or 0.
hive中的join
- inner join内关联:只显示两边关联上数据
- full outer join :外关联 所有数据都显示
- left join :左关联 以左表为准 左表都显示 右表关联上的 显示 关联不上的显示null
- right join:右关联 以右表为准 右表都显示 左表关联上的 显示 关联不上的显示null
hive命令行
bin/hive #本地交互式shell
bin/hive -e #启动执行后边指定的sql语句
bin/hive -f #启动执行后面指定的sql脚本文件 (企业中生产环境常用的方式)
hive中几种参数配置
conf/hive-site.xml #全局有效影响该安装包的任何启动方式
--hiveconf 启动会话有效 谁启动设置谁生效
set xxx- 会话级别 谁连接 谁开启会话 谁设置 谁生效
set hive.exec.reducers.bytes.per.reducer=<number> 每个 reduce task的平均负载数据量
set hive.exec.reducers.max=<number> 设置 reduce task 数量的上限
set mapreduce.job.reduces=<number> 指定固定的 reduce task 数量
这三种方式范围一次递减 优先级依次递增 最为开发中 用的最多的是set命令行设置。
作为基于hadoop的数据仓库 hive在加载的时候 也会把同机器上hadoop的配置文件加载进来。
某些系统级的参数,例如 log4j 相关的设定,必须用前两种方式设定,因
为那些参数的读取在 Session 建立以前已经完成了
UDF开发步骤
写类继承UDF
重载evaluate方法,里面就是自定义函数的具体逻辑
把工程打成jar 添加至hive的环境变量中
add jar path;
注册自定义函数
create temporary function 自定义函数 as '类的全路径';
行存储与列存储
行式存储
优点
- 符合面向对象思维,一行数据就是一条记录
- 方便进行更新insert/update
缺点
- 查询只涉及几列的话影响性能
- 每一行数据类型不同不易获得极高压缩比/空间利用率不高
- 不是所有列都适合作为索引
列式存储
优点
- 只涉及到列才会被查询,效率高
- 高效压缩比,节省存储空间和CPU
- 任何列都可作为索引
缺点
- 插入/更新数据麻烦
- 不适合扫描小量数据
hive常见存储格式
-
默认普通文本文件 textfile
-
还支持sequencefile rcfile OrcFile(0.11后出现)
SEQUENCEFILE,RCFILE,ORCFILE格式的表不能直接从本地文件导入数据,数据要先导入到textfile格式的
表中, 然后再从表中用insert导入SequenceFile,RCFile,ORCFile表中。 -
如果需要还可以结合压缩编码功能一起使用 压缩使cpu压力过大换来磁盘空间
hive优化--压缩设置
提高数据吞吐量和性能的一种手段
减少磁盘存储空间
减少IO
但会加大CPU开销
- hive中间数据压缩
hive.exec.compress.intermediate=true
--默认为false,中间数据压缩功能 hql语句最终转化为mr job 开启中间数据压缩功能就是在mrshuffle阶段对mapper产生的中间结果数据压缩
SnappyCOdec比较适合作为压缩算法 很好地压缩性能较低的CPU开销
mapred.map.output.compression.codec --设置具体的压缩算法的参数
具体设置
set hive.exec.compress.intermediate=true;
set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set mapred.map.output.compression.codec=com.hadoop.copression.lzo.lzoCodec;
- hive最终数据压缩
hive.exec.compress.output=true;--对最终生成的hive表数据压缩
set hive.exec.compress.output=true;
set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
-
压缩模式评价
压缩比:压缩的越小越好 大小 中快慢
压缩时间:越快越好
已经压缩的是否可在分割:可分隔的文件可以有多个mr处理,更好地并行化
压缩方式 | 压缩后大小 | 压缩速度 | 是否可分割 |
---|---|---|---|
Gzip | 中 | 中 | 否 |
Bzip2 | 小 | 慢 | 是 |
LZO | 大 | 快 | 是 |
Snappy | 大 | 快 | 是 |
总结
Bzip2 压缩比最高 CPU占用更高 GZIP次之可以更好的提高磁盘利用率和降低IO
LZO和Snappy 算法 更快的解压缩速度 压缩速度相当,解压速度Snappy较快
hadoop将大文件分割成HDFS block默认64mb的splits分片 每个分片对应一个mapper程序.
其中Gzip不支持分割
压缩格式 | 对应编码器/解码器 |
---|---|
Deflate | org.apache.hadoop.io.compress.DefaultCodec |
Gzip | org.apache.hadoop.io.compress.GzipCodec |
BZip2 | org.apache.hadoop.io.compress.BZipCodec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec(中间输出使用) |
LZO | com.hadoop.compression.lzo.LzoCodec(中间输出使用) |