  • [Original] Big Data Basics: Hive (1) The Code Flow of Hive SQL Execution

    hive 2.1

    Hive can execute SQL in two ways:

    • by running the hive command, which further breaks down into hive -e, hive -f, and the interactive hive shell;
    • by running the beeline command, which connects to a remote thrift server;
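
    For illustration, the typical invocations look like this (table name, file path, and host are placeholders; 10000 is the default HiveServer2 port):

    $HIVE_HOME/bin/hive -e "select * from t limit 10"
    $HIVE_HOME/bin/hive -f /path/to/query.sql
    $HIVE_HOME/bin/hive
    $HIVE_HOME/bin/beeline -u jdbc:hive2://localhost:10000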

    Let's look at how SQL gets executed in each of these scenarios:

    1 The hive command

    Startup command

    The command that starts the hive client,

    $HIVE_HOME/bin/hive

    is equivalent to

    $HIVE_HOME/bin/hive --service cli

    which invokes

    $HIVE_HOME/bin/ext/cli.sh

    The actual startup class is: org.apache.hadoop.hive.cli.CliDriver

    Code walkthrough

    org.apache.hadoop.hive.cli.CliDriver

      public static void main(String[] args) throws Exception {
        int ret = new CliDriver().run(args);
        System.exit(ret);
      }
    
      public  int run(String[] args) throws Exception {
    ...
        // execute cli driver work
        try {
          return executeDriver(ss, conf, oproc);
        } finally {
          ss.resetThreadName();
          ss.close();
        }
    ...
    
      private int executeDriver(CliSessionState ss, HiveConf conf, OptionsProcessor oproc)
          throws Exception {
    ...
        if (ss.execString != null) {
          int cmdProcessStatus = cli.processLine(ss.execString);
          return cmdProcessStatus;
        }
    ...
        try {
          if (ss.fileName != null) {
            return cli.processFile(ss.fileName);
          }
        } catch (FileNotFoundException e) {
          System.err.println("Could not open input file for reading. (" + e.getMessage() + ")");
          return 3;
        }
    ...
        while ((line = reader.readLine(curPrompt + "> ")) != null) {
          if (!prefix.equals("")) {
            prefix += '\n';
          }
          if (line.trim().startsWith("--")) {
            continue;
          }
          if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
            line = prefix + line;
            ret = cli.processLine(line, true);
    ...
    
      public int processFile(String fileName) throws IOException {
    ...
          rc = processReader(bufferReader);
    ...
    
      public int processReader(BufferedReader r) throws IOException {
        String line;
        StringBuilder qsb = new StringBuilder();
    
        while ((line = r.readLine()) != null) {
          // Skipping through comments
          if (! line.startsWith("--")) {
            qsb.append(line + "
    ");
          }
        }
    
        return (processLine(qsb.toString()));
      }
      
      public int processLine(String line, boolean allowInterrupting) {
    ...
            ret = processCmd(command);
    ...
    
      public int processCmd(String cmd) {
    ...
            CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);
            ret = processLocalCmd(cmd, proc, ss);
    ...
    
      int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
        int tryCount = 0;
        boolean needRetry;
        int ret = 0;
    
        do {
          try {
            needRetry = false;
            if (proc != null) {
              if (proc instanceof Driver) {
                Driver qp = (Driver) proc;
                PrintStream out = ss.out;
                long start = System.currentTimeMillis();
                if (ss.getIsVerbose()) {
                  out.println(cmd);
                }
    
                qp.setTryCount(tryCount);
                ret = qp.run(cmd).getResponseCode();
    ...
                  while (qp.getResults(res)) {
                    for (String r : res) {
                      out.println(r);
                    }
    ...

    CliDriver.main calls run, which calls executeDriver; executeDriver handles the three cases mentioned above:

    • hive -e runs a SQL string: ss.execString is non-null, and the process exits after execution;
    • hive -f runs a SQL file: ss.fileName is non-null, and the process exits after execution;
    • the interactive hive shell keeps reading lines via reader.readLine, then executes each statement and prints its results;

    All three cases eventually call processLine, which calls processLocalCmd. processLocalCmd first calls Driver.run to execute the SQL, and then calls Driver.getResults to print the results. These are also the two most important interfaces of Driver; the Driver implementation is covered later.
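
    To make this concrete, here is a minimal embedded sketch (not Hive source) that performs the same two calls processLocalCmd ends up making; it assumes Hive 2.1 client jars and a valid hive-site.xml on the classpath, and the table name src is a placeholder:

      import java.util.ArrayList;
      import java.util.List;
      import org.apache.hadoop.hive.conf.HiveConf;
      import org.apache.hadoop.hive.ql.Driver;
      import org.apache.hadoop.hive.ql.session.SessionState;

      public class EmbeddedDriverSketch {
        public static void main(String[] args) throws Exception {
          HiveConf conf = new HiveConf();
          SessionState.start(new SessionState(conf)); // Driver expects a started session
          Driver driver = new Driver(conf);
          driver.run("select * from src limit 10");   // compile + execute, as in processLocalCmd
          List<String> res = new ArrayList<String>();
          while (driver.getResults(res)) {            // fetch result batches, as in processLocalCmd
            for (String row : res) {
              System.out.println(row);
            }
            res.clear();                              // getResults appends; clear between batches
          }
          driver.close();
        }
      }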

    2 The beeline command

    beeline connects to the hive thrift server, so let's first see how the thrift server is started:

    hive thrift server

    Startup command

    The command that starts the hive thrift server,

    $HIVE_HOME/bin/hiveserver2

    is equivalent to

    $HIVE_HOME/bin/hive --service hiveserver2

    which invokes

    $HIVE_HOME/bin/ext/hiveserver2.sh

    The actual startup class is: org.apache.hive.service.server.HiveServer2

    Startup sequence

    HiveServer2.main
      startHiveServer2
        init
          addService: CLIService, ThriftBinaryCLIService
        start
          Service.start
            CLIService.start
            ThriftBinaryCLIService.start
              TThreadPoolServer.serve

    Class structure [interface or parent class -> subclass]:

    TServer -> TThreadPoolServer
      TProcessorFactory -> SQLPlainProcessorFactory
        TProcessor -> TSetIpAddressProcessor
          ThriftCLIService -> ThriftBinaryCLIService
            CLIService
              HiveSession

    Code walkthrough

    org.apache.hive.service.cli.thrift.ThriftBinaryCLIService

      public ThriftBinaryCLIService(CLIService cliService, Runnable oomHook) {
        super(cliService, ThriftBinaryCLIService.class.getSimpleName());
        this.oomHook = oomHook;
      }

    ThriftBinaryCLIService is a core class: it actually starts the thrift server and wraps a CLIService, and every request is ultimately handled by the underlying CLIService. Let's look at the CLIService code:

    org.apache.hive.service.cli.CLIService

      @Override
      public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
          Map<String, String> confOverlay) throws HiveSQLException {
        OperationHandle opHandle =
            sessionManager.getSession(sessionHandle).executeStatement(statement, confOverlay);
        LOG.debug(sessionHandle + ": executeStatement()");
        return opHandle;
      }
      
      @Override
      public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
                                 long maxRows, FetchType fetchType) throws HiveSQLException {
        RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle)
            .getParentSession().fetchResults(opHandle, orientation, maxRows, fetchType);
        LOG.debug(opHandle + ": fetchResults()");
        return rowSet;
      }
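
    Seen from the client side, these are exactly the two RPCs a JDBC client such as beeline drives: Statement.execute ends up in executeStatement, and ResultSet.next ends up in fetchResults. A minimal JDBC sketch (host, port, user, and table are placeholders; 10000 is the default HiveServer2 port):

      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.ResultSet;
      import java.sql.Statement;

      public class JdbcClientSketch {
        public static void main(String[] args) throws Exception {
          Class.forName("org.apache.hive.jdbc.HiveDriver");
          try (Connection conn =
                   DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "");
               Statement stmt = conn.createStatement();
               ResultSet rs = stmt.executeQuery("select * from src limit 10")) { // -> executeStatement
            while (rs.next()) {                                                  // -> fetchResults
              System.out.println(rs.getString(1));
            }
          }
        }
      }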

    CLIService's two most important interfaces are executeStatement and fetchResults; both forward to HiveSession. Let's look at the HiveSession implementation class:

    org.apache.hive.service.cli.session.HiveSessionImpl

      @Override
      public OperationHandle executeStatement(String statement, Map<String, String> confOverlay) throws HiveSQLException {
        return executeStatementInternal(statement, confOverlay, false, 0);
      }
    
      private OperationHandle executeStatementInternal(String statement,
          Map<String, String> confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException {
        acquire(true, true);
    
        ExecuteStatementOperation operation = null;
        OperationHandle opHandle = null;
        try {
          operation = getOperationManager().newExecuteStatementOperation(getSession(), statement,
              confOverlay, runAsync, queryTimeout);
          opHandle = operation.getHandle();
          operation.run();
    ...
      @Override
      public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
          long maxRows, FetchType fetchType) throws HiveSQLException {
        acquire(true, false);
        try {
          if (fetchType == FetchType.QUERY_OUTPUT) {
            return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
          }
          return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows, sessionConf);
        } finally {
          release(true, false);
        }
      }

    As we can see:

    • HiveSessionImpl.executeStatement calls ExecuteStatementOperation.run (ExecuteStatementOperation is a kind of Operation);
    • HiveSessionImpl.fetchResults calls OperationManager.getOperationNextRowSet, which in turn calls Operation.getNextRowSet;

    org.apache.hive.service.cli.operation.OperationManager

      public RowSet getOperationNextRowSet(OperationHandle opHandle,
          FetchOrientation orientation, long maxRows)
              throws HiveSQLException {
        return getOperation(opHandle).getNextRowSet(orientation, maxRows);
      }

    Now let's look at Operation's run and getNextRowSet in detail:

    org.apache.hive.service.cli.operation.Operation

      public void run() throws HiveSQLException {
        beforeRun();
        try {
          Metrics metrics = MetricsFactory.getInstance();
          if (metrics != null) {
            try {
              metrics.incrementCounter(MetricsConstant.OPEN_OPERATIONS);
            } catch (Exception e) {
              LOG.warn("Error Reporting open operation to Metrics system", e);
            }
          }
          runInternal();
        } finally {
          afterRun();
        }
      }
      
      public RowSet getNextRowSet() throws HiveSQLException {
        return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS);
      }

    Operation is an abstract class:

    • run calls the abstract method runInternal;
    • the no-argument getNextRowSet delegates to the abstract getNextRowSet(FetchOrientation, long);

    Below we'll see these two abstract methods implemented in subclasses, ultimately relying on Driver's run and getResults;
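
    This run/runInternal split is the classic template method pattern; a standalone sketch (not Hive source) of the same shape:

      // The parent class fixes the lifecycle; subclasses only supply runInternal.
      abstract class OperationSketch {
        public final void run() {
          beforeRun();       // fixed pre-hook (state transitions, metrics in Hive)
          try {
            runInternal();   // subclass-specific work
          } finally {
            afterRun();      // fixed post-hook, always runs
          }
        }
        protected void beforeRun() {}
        protected void afterRun() {}
        protected abstract void runInternal();
      }

      class HelloOperation extends OperationSketch {
        @Override
        protected void runInternal() {
          System.out.println("hello");
        }
      }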

    1) First, runInternal as implemented in the subclass HiveCommandOperation:

    org.apache.hive.service.cli.operation.HiveCommandOperation

      @Override
      public void runInternal() throws HiveSQLException {
        setState(OperationState.RUNNING);
        try {
          String command = getStatement().trim();
          String[] tokens = statement.split("\\s");
          String commandArgs = command.substring(tokens[0].length()).trim();
    
          CommandProcessorResponse response = commandProcessor.run(commandArgs);
    ...

    Here CommandProcessor.run is invoked, which in practice ends up in Driver.run (Driver is an implementation of CommandProcessor);
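
    A small sketch of that dispatch (it assumes a HiveConf and a started SessionState, as in the embedded example above): plain SQL has no matching HiveCommand and is routed to a Driver, while commands such as set get lightweight processors, which is why runInternal strips the first token before calling commandProcessor.run:

      import org.apache.hadoop.hive.conf.HiveConf;
      import org.apache.hadoop.hive.ql.Driver;
      import org.apache.hadoop.hive.ql.processors.CommandProcessor;
      import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
      import org.apache.hadoop.hive.ql.session.SessionState;

      public class DispatchSketch {
        public static void main(String[] args) throws Exception {
          HiveConf conf = new HiveConf();
          SessionState.start(new SessionState(conf));
          // A SQL statement is not a HiveCommand, so the factory hands back a Driver.
          CommandProcessor sql = CommandProcessorFactory.get(new String[] {"select"}, conf);
          System.out.println(sql instanceof Driver);          // true
          // "set" matches a HiveCommand and gets a lightweight SetProcessor instead.
          CommandProcessor set = CommandProcessorFactory.get(new String[] {"set"}, conf);
          System.out.println(set.getClass().getSimpleName()); // SetProcessor
        }
      }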

    2) Next, getNextRowSet as implemented in the subclass SQLOperation:

    org.apache.hive.service.cli.operation.SQLOperation

      public RowSet getNextRowSet(FetchOrientation orientation, long maxRows)
        throws HiveSQLException {
    ...
          driver.setMaxRows((int) maxRows);
          if (driver.getResults(convey)) {
            return decode(convey, rowSet);
          }
    ...

    Here Driver.getResults is invoked;

    3 Driver

    The code analysis above shows that whether SQL runs through the hive command line or through beeline connected to the thrift server, everything ultimately depends on Driver.

    Driver's two core interfaces are:

    • run
    • getResults

    Code walkthrough

    org.apache.hadoop.hive.ql.Driver

      @Override
      public CommandProcessorResponse run(String command)
          throws CommandNeedRetryException {
        return run(command, false);
      }
      
      public CommandProcessorResponse run(String command, boolean alreadyCompiled)
            throws CommandNeedRetryException {
        CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
    ...
      private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
          throws CommandNeedRetryException {
    ...
            ret = compileInternal(command, true);
    ...
          ret = execute(true);
    ...
      private int compileInternal(String command, boolean deferClose) {
    ...
          ret = compile(command, true, deferClose);
    ...
      public int compile(String command, boolean resetTaskIds, boolean deferClose) {
    ...
          plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
            queryState.getHiveOperation(), schema);
    ...
      public int execute(boolean deferClose) throws CommandNeedRetryException {
    ...
          // Add root Tasks to runnable
          for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
            // This should never happen, if it does, it's a bug with the potential to produce
            // incorrect results.
            assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
            driverCxt.addToRunnable(tsk);
          }
    ...
          // Loop while you either have tasks running, or tasks queued up
          while (driverCxt.isRunning()) {
    
            // Launch upto maxthreads tasks
            Task<? extends Serializable> task;
            while ((task = driverCxt.getRunnable(maxthreads)) != null) {
              TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
              if (!runner.isRunning()) {
                break;
              }
            }
    
            // poll the Tasks to see which one completed
            TaskRunner tskRun = driverCxt.pollFinished();
            if (tskRun == null) {
              continue;
            }
            hookContext.addCompleteTask(tskRun);
            queryDisplay.setTaskResult(tskRun.getTask().getId(), tskRun.getTaskResult());
    
            Task<? extends Serializable> tsk = tskRun.getTask();
            TaskResult result = tskRun.getTaskResult();
    ...
            if (tsk.getChildTasks() != null) {
              for (Task<? extends Serializable> child : tsk.getChildTasks()) {
                if (DriverContext.isLaunchable(child)) {
                  driverCxt.addToRunnable(child);
                }
              }
            }
          }
    
      public boolean getResults(List res) throws IOException, CommandNeedRetryException {
        if (driverState == DriverState.DESTROYED || driverState == DriverState.CLOSED) {
          throw new IOException("FAILED: query has been cancelled, closed, or destroyed.");
        }
    
        if (isFetchingTable()) {
          /**
           * If resultset serialization to thrift object is enabled, and if the destination table is
           * indeed written using ThriftJDBCBinarySerDe, read one row from the output sequence file,
           * since it is a blob of row batches.
           */
          if (fetchTask.getWork().isUsingThriftJDBCBinarySerDe()) {
            maxRows = 1;
          }
          fetchTask.setMaxRows(maxRows);
          return fetchTask.fetch(res);
        }
    ...
    • Driver.run calls runInternal, which first calls compileInternal to compile the SQL and build a QueryPlan, and then calls execute to run all tasks in the QueryPlan (see the simplified scheduling sketch after this list);
    • Driver.getResults calls FetchTask.fetch to retrieve the results;
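
    A much-simplified sketch (not Hive source) of the scheduling loop inside execute(): root tasks seed a runnable queue, and each completed task releases its children. The real loop additionally launches tasks on TaskRunner threads up to maxthreads and checks DriverContext.isLaunchable so that a child with several parents runs only once; this sketch runs serially and assumes a tree-shaped plan:

      import java.util.ArrayDeque;
      import java.util.Collections;
      import java.util.List;
      import java.util.Queue;

      public class MiniTaskLoop {
        static class Task {
          final String name;
          final List<Task> children;
          Task(String name, List<Task> children) { this.name = name; this.children = children; }
          void execute() { System.out.println("running " + name); } // Driver wraps this in a TaskRunner
        }

        public static void run(List<Task> rootTasks) {
          Queue<Task> runnable = new ArrayDeque<Task>(rootTasks); // like driverCxt.addToRunnable
          while (!runnable.isEmpty()) {                           // like driverCxt.isRunning()
            Task task = runnable.poll();
            task.execute();
            runnable.addAll(task.children);                       // a finished task releases its children
          }
        }

        public static void main(String[] args) {
          Task move = new Task("MoveTask", Collections.<Task>emptyList());
          Task mr = new Task("MapRedTask", Collections.singletonList(move));
          run(Collections.singletonList(mr));
        }
      }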

    The Hive SQL parsing process is covered in detail at: https://www.cnblogs.com/barneywill/p/10186644.html

  • Original post: https://www.cnblogs.com/barneywill/p/10185168.html