Hive增量更新方案
方案一(总结出来业界可行方案):
1、Hive原始表提前规划好以时间分区,初始化装载源库记录为base_table(最新数据)
2、每个相关表都会有一个timestamp列,对每一行操作做了修改,都会重置这列timestamp为当前时间戳;
3、新增数据通过sqoop(支持当天抽取)或者导出当天文件两种形式,抽取或导入到hive表,记录为新增表incremental_table
4、(1)如果incremental_table表中数据不涉及到更新,直接导入到以时间分区的base_table表中 (2)如果某几个分区涉及到更新操作,将这段时间分区内的base_table和incremental_table数据进行合并,有相同主键的取timestamp最新的那条,合并后新数据重新写回base_table分区中;
(3)如果涉及到删除数据操作,分软删除(打标签)和硬删除;如果是软删除,记录数据也会存在incremental_table,只是在合并base_table和incremental_table表中过滤掉此记录写回base_table即可。如果是硬删除,需将删除的数据行放入额外审计表中audit_table,与base_table和incremental_table一起进行合并过滤处理后,再写回base_table。
采用Oozie、NiFi或者Shell脚本等方式,将上述流程统一做成一个工作流,方便调度。
方案二(如有业务诉求,需要对Hive表具体某条记录进行delete或update操作):
如果一个HIVE表要实现update和delete功能,该表就必须支持ACID,需开启ACID,同时必须满足以下条件:
1、表的存储格式必须是ORC(STORED AS ORC);
2、表必须进行分桶(CLUSTERED BY (col_name, col_name, ...) INTO num_buckets BUCKETS);
3、Table property中参数transactional必须设定为True(tblproperties('transactional'='true'));
4、以下配置项必须被设定:
Client端:
hive.support.concurrency – true
hive.enforce.bucketing – true
hive.exec.dynamic.partition.mode – nonstrict
hive.txn.manager – org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
服务端:
hive.compactor.initiator.on – true
hive.compactor.worker.threads – 1
hive.txn.manager – org.apache.hadoop.hive.ql.lockmgr.DbTxnManager(经过测试,服务端也需要设定该配置项)