Hive从0.14版本开始支持事务和行级更新,但缺省是不支持的,需要一些附加的配置。要想支持行级insert、update、delete,需要配置Hive支持事务。
一、Hive具有ACID语义事务的使用场景
1. 流式接收数据。
许多用户使用诸如Apache Flume、Apache Storm或Apache Kafka这样的工具将流数据灌入Hadoop集群。当这些工具以每秒数百行的频率写入时,Hive也许只能每15分钟到1小时添加一个分区,因为过于频繁地添加分区很快就会使一个表中的分区数量难以维护。而且这些工具还可能向已存在的分区中写数据,但是这样将会产生脏读(可能读到查询开始时间点以后写入的数据),还在这些分区的所在目录中遗留大量小文件,进而给NameNode造成压力。在这个使用场景下,事务支持可以获得数据的一致性视图同时避免产生过多的文件。
2. 缓慢变化维。
在一个典型的星型模式数据仓库中,维度表随时间的变化很缓慢。例如,一个零售商开了一家新商店,需要将新店数据加到商店表,或者一个已有商店的营业面积或其它需要跟踪的特性改变了。这些改变会导致插入或修改个别记录。从0.14版本开始,Hive支持行级更新。
3. 数据重述。
有时发现数据集合有错误并需要更正。或者当前数据只是个近似值(如只有全部数据的90%,得到全部数据会滞后)。或者业务业务规则可能需要根据后续事务重述特定事务(打个比方,一个客户购买了一些商品后又购买了一个会员资格,此时可以享受折扣价格,包括先前购买的商品)。或者一个客户可能按照合同在终止了合作关系后要求删除他们的客户数据。从Hive 0.14开始,这些使用场景可以通过INSERT、UPDATE和DELETE支持。
二、配置Hive支持事务(Hive 2.0版)
1. 在hive-site.xml文件中添加如下配置项
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
< property > < name >hive.support.concurrency</ name > < value >true</ value > </ property > < property > < name >hive.exec.dynamic.partition.mode</ name > < value >nonstrict</ value > </ property > < property > < name >hive.txn.manager</ name > < value >org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</ value > </ property > < property > < name >hive.compactor.initiator.on</ name > < value >true</ value > </ property > < property > < name >hive.compactor.worker.threads</ name > < value >1</ value > </ property > |
2. 添加Hive元数据(使用mysql存储)
1
2
3
4
|
INSERT INTO NEXT_LOCK_ID VALUES (1); INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES (1); INSERT INTO NEXT_TXN_ID VALUES (1); COMMIT ; |
说明:初始时这三个表没有数据,如果不添加数据,会报以下错误:
org.apache.hadoop.hive.ql.lockmgr.DbTxnManager FAILED: Error in acquiring locks: Error communicating with the metastore
三、测试
1. 启动hadoop集群和mysql
1
2
3
|
$HADOOP_HOME /sbin/start-dfs .sh $HADOOP_HOME /sbin/start-yarn .sh ~ /mysql/bin/mysqld & |
2. 建立测试表
1
2
3
4
|
use test ; create table t1( id int, name string) clustered by ( id ) into 8 buckets stored as orc TBLPROPERTIES ( 'transactional' = 'true' ); |
说明:建表语句必须带有into buckets子句和stored as orc TBLPROPERTIES ('transactional'='true')子句,并且不能带有sorted by子句。
3. 测试insert、update、delete
1
2
3
4
|
insert into t1 values (1, 'aaa' ); insert into t1 values (2, 'bbb' ); update t1 set name = 'ccc' where id=1; delete from t1 where id=2; |
执行结果分别如图1-3所示。
<ignore_js_op>
图1
<ignore_js_op>
图2
<ignore_js_op>
图3
说明:不能修改bucket列的值,否则会报以下错误:
FAILED: SemanticException [Error 10302]: Updating values of bucketing columns is not supported. Column id.
4. 已有非ORC表的转换
-- 在本地文件/home/grid/a.txt中写入以下4行数据
1,张三,US,CA
2,李四,US,CB
3,王五,CA,BB
4,赵六,CA,BC
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
-- 建立非分区表并加载数据 CREATE TABLE t1 (id INT , name STRING, cty STRING, st STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ; LOAD DATA LOCAL INPATH '/home/grid/a.txt' INTO TABLE t1; SELECT * FROM t1; -- 建立外部分区事务表并加载数据 CREATE EXTERNAL TABLE t2 (id INT , name STRING) PARTITIONED BY (country STRING, state STRING) CLUSTERED BY (id) INTO 8 BUCKETS STORED AS ORC TBLPROPERTIES ( 'transactional' = 'true' ); INSERT INTO T2 PARTITION (country, state) SELECT * FROM T1; SELECT * FROM t2; -- 修改数据 INSERT INTO TABLE t2 PARTITION (country, state) VALUES (5, '刘' , 'DD' , 'DD' ); UPDATE t2 SET name = '张' WHERE id=1; DELETE FROM t2 WHERE name = '李四' ; SELECT * FROM t2; |
修改前和修改后的数据分别如图4、图5所示。
<ignore_js_op>
图4
<ignore_js_op>
图5
说明:不能update分区键,否则会报以下错误:
FAILED: SemanticException [Error 10292]: Updating values of partition columns is not supported