Hive 2.1
1 Problem
Recently I ran into a scenario where data had to be written into multiple partitions of the same table. To shorten the total execution time, the writes were done concurrently: multiple SQL statements ran at the same time, each writing a different partition, with dynamic partitioning enabled:
set hive.exec.dynamic.partition=true
insert overwrite table test_table partition(dt) select * from test_table_another where dt = 1;
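For concreteness, here is a sketch of the concurrent workload (the partition values 1 and 2 are illustrative; each statement runs in its own session):

-- session 1
insert overwrite table test_table partition(dt) select * from test_table_another where dt = 1;
-- session 2, running concurrently
insert overwrite table test_table partition(dt) select * from test_table_another where dt = 2;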
The result: only one SQL statement actually ran, and all the others were blocked.
A thread dump of the Hive thrift server showed that the blocked requests were all stuck in DbTxnManager. The relevant Hive configuration was:
hive.support.concurrency=true
hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
The default values and comments for these settings:
org.apache.hadoop.hive.conf.HiveConf
HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false,
    "Whether Hive supports concurrency control or not. " +
    "A ZooKeeper instance must be up and running when using zookeeper Hive lock manager "),
HIVE_TXN_MANAGER("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager",
    "Set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager as part of turning on Hive " +
    "transactions, which also requires appropriate settings for hive.compactor.initiator.on, " +
    "hive.compactor.worker.threads, hive.support.concurrency (true), hive.enforce.bucketing " +
    "(true), and hive.exec.dynamic.partition.mode (nonstrict). " +
    "The default DummyTxnManager replicates pre-Hive-0.13 behavior and provides " +
    "no transactions."),
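Put together, the HIVE_TXN_MANAGER comment above implies the following set of options for turning on Hive transactions (a sketch assembled from that comment; the worker-thread count is an illustrative value):

set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on=true;
set hive.compactor.worker.threads=1;  -- illustrative value, any positive count
set hive.enforce.bucketing=true;
set hive.exec.dynamic.partition.mode=nonstrict;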
2 Code Analysis
For a detailed walkthrough of how Hive executes SQL, see: https://www.cnblogs.com/barneywill/p/10185168.html
Executing SQL in Hive always ends up in Driver.run, which calls runInternal. Let's look at the runInternal code directly:
org.apache.hadoop.hive.ql.Driver
private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
    throws CommandNeedRetryException {
  ...
  if (requiresLock()) {
    // a checkpoint to see if the thread is interrupted or not before an expensive operation
    if (isInterrupted()) {
      ret = handleInterruption("at acquiring the lock.");
    } else {
      ret = acquireLocksAndOpenTxn(startTxnImplicitly);
    }
  ...

private boolean requiresLock() {
  if (!checkConcurrency()) {
    return false;
  }
  // Lock operations themselves don't require the lock.
  if (isExplicitLockOperation()) {
    return false;
  }
  if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_LOCK_MAPRED_ONLY)) {
    return true;
  }
  Queue<Task<? extends Serializable>> taskQueue = new LinkedList<Task<? extends Serializable>>();
  taskQueue.addAll(plan.getRootTasks());
  while (taskQueue.peek() != null) {
    Task<? extends Serializable> tsk = taskQueue.remove();
    if (tsk.requireLock()) {
      return true;
    }
  ...

private boolean checkConcurrency() {
  boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
  if (!supportConcurrency) {
    LOG.info("Concurrency mode is disabled, not creating a lock manager");
    return false;
  }
  return true;
}

private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) {
  ...
  txnMgr.acquireLocks(plan, ctx, userFromUGI);
  ...
runInternal calls requiresLock to decide whether a lock is needed; requiresLock makes two checks:
- it calls checkConcurrency, which only allows locking when hive.support.concurrency=true;
- it calls Task.requireLock; only some tasks require a lock.
If a lock is needed, runInternal calls acquireLocksAndOpenTxn, which in turn calls HiveTxnManager.acquireLocks to acquire it.
1) First, which tasks require a lock:
org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer
private void analyzeAlterTablePartMergeFiles(ASTNode ast, String tableName,
    HashMap<String, String> partSpec) throws SemanticException {
  ...
  DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), mergeDesc);
  ddlWork.setNeedLock(true);
  ...
So DDL operations like this one (merging a table's or partition's files) explicitly request a lock.
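For reference, the statement handled by analyzeAlterTablePartMergeFiles is Hive's small-file merge DDL; a sketch using the table from the example above (the partition value is illustrative):

alter table test_table partition(dt=1) concatenate;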
2) Next, how the lock is acquired:
org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
  try {
    acquireLocksWithHeartbeatDelay(plan, ctx, username, 0);
  ...

void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay)
    throws LockException {
  LockState ls = acquireLocks(plan, ctx, username, true);
  ...

LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking)
    throws LockException {
  ...
  switch (output.getType()) {
    case DATABASE:
      compBuilder.setDbName(output.getDatabase().getName());
      break;
    case TABLE:
    case DUMMYPARTITION:   // in case of dynamic partitioning lock the table
      t = output.getTable();
      compBuilder.setDbName(t.getDbName());
      compBuilder.setTableName(t.getTableName());
      break;
    case PARTITION:
      compBuilder.setPartitionName(output.getPartition().getName());
      t = output.getPartition().getTable();
      compBuilder.setDbName(t.getDbName());
      compBuilder.setTableName(t.getTableName());
      break;
    default:
      // This is a file or something we don't hold locks for.
      continue;
  }
  ...
  LockState lockState = lockMgr.lock(rqstBuilder.build(), queryId, isBlocking, locks);
  ctx.setHiveLocks(locks);
  return lockState;
}
As the DUMMYPARTITION case shows, when dynamic partitioning is used the lock granularity is DbName+TableName, i.e. the whole table. So of the concurrent SQL statements writing the same table, only one can obtain the lock and the others must wait. Note that the PARTITION case locks at partition granularity instead, which is what a static partition spec gets.
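A minimal sketch of the difference, assuming dt is the table's only partition column and c1, c2 stand in for its data columns:

-- dynamic partition spec: the write entity is DUMMYPARTITION, so the whole table is locked
insert overwrite table test_table partition(dt)
select * from test_table_another where dt = 1;

-- static partition spec: the write entity is a PARTITION, so only partition dt=1 is locked;
-- the select list must now exclude the partition column dt
insert overwrite table test_table partition(dt=1)
select c1, c2 from test_table_another where dt = 1;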
3 Summary
There are several ways to solve the problem:
- disable dynamic partitioning: set hive.exec.dynamic.partition=false
- disable concurrency: set hive.support.concurrency=false
- disable transactions: set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager
Any one of the three is enough. The first is recommended: in the scenario above each statement writes exactly one known partition, so dynamic partitioning is not actually needed; see the sketch below.
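A minimal sketch of the recommended fix, under the same assumptions as above (dt is the only partition column; c1, c2 are hypothetical data columns):

set hive.exec.dynamic.partition=false;

-- each concurrent session writes its own partition with a static spec,
-- so DbTxnManager locks only that partition and the sessions no longer block each other
insert overwrite table test_table partition(dt=1)
select c1, c2 from test_table_another where dt = 1;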