  [Original] Big Data Fundamentals of Spark (3): Spark Thrift Implementation Principles and Code

    spark 2.1.1

    1. Startup command

    Command to start Spark Thrift:

    $SPARK_HOME/sbin/start-thriftserver.sh

    which then runs:

    org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

    2. Startup process and code analysis

    For the analysis of the Hive Thrift code, see: https://www.cnblogs.com/barneywill/p/10185168.html

    HiveThriftServer2 is the core class of Spark Thrift; it extends Hive's HiveServer2:

    org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 extends org.apache.hive.service.server.HiveServer2

    Startup flow (a programmatic startup example follows the chain):

    HiveThriftServer2.main

             SparkSQLEnv.init (sparkConf sparkSession sparkContext sqlContext)

             HiveThriftServer2.init

                      addService(ThriftBinaryCLIService)

             HiveThriftServer2.start

                      ThriftBinaryCLIService.run

                              TServer.serve
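
    For reference, the same init/start path can be entered programmatically through HiveThriftServer2.startWithContext, so the Thrift server runs inside an existing Spark application. A hedged example (the session settings below are assumptions, not part of the startup script):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

    object StartThriftServerInProcess {
      def main(args: Array[String]): Unit = {
        // Illustrative session setup; Hive support is enabled so the server
        // can reach the Hive metastore.
        val spark = SparkSession.builder()
          .appName("thrift-server-demo")
          .enableHiveSupport()
          .getOrCreate()

        // Goes through the same HiveThriftServer2 init/start sequence as
        // start-thriftserver.sh; clients connect over the HiveServer2
        // Thrift (JDBC/ODBC) protocol.
        HiveThriftServer2.startWithContext(spark.sqlContext)
      }
    }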

    Class hierarchy (interface or parent class -> subclass):

    TServer->TThreadPoolServer

             TProcessorFactory->SQLPlainProcessorFactory

                      TProcessor->TSetIpAddressProcessor

                              ThriftCLIService->ThriftBinaryCLIService

                                        CLIService->SparkSQLCLIService (the core subclass)

    Service initialization flow (a sketch of the pattern follows the chain):

    CLIService.init

             SparkSQLCLIService.init

                      addService(SparkSQLSessionManager)

                      initCompositeService

                              SparkSQLSessionManager.init

                                       addService(SparkSQLOperationManager)

                                       initCompositeService

                                                SparkSQLOperationManager.init
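
    The addService/initCompositeService steps above follow Hive's CompositeService pattern: initializing a service also initializes every child registered via addService. A minimal, self-contained sketch of the idea (not the Hive/Spark source; class names are reused only for illustration):

    import scala.collection.mutable.ArrayBuffer

    // Plain service: init() just reports itself.
    class Service(val name: String) {
      def init(): Unit = println(s"init $name")
    }

    // Composite service: init() also initializes every registered child.
    class CompositeService(name: String) extends Service(name) {
      private val children = ArrayBuffer[Service]()
      def addService(s: Service): Unit = children += s
      override def init(): Unit = {
        super.init()
        children.foreach(_.init())   // the initCompositeService step
      }
    }

    object ServiceInitDemo {
      def main(args: Array[String]): Unit = {
        val cliService = new CompositeService("SparkSQLCLIService")
        val sessionManager = new CompositeService("SparkSQLSessionManager")
        sessionManager.addService(new Service("SparkSQLOperationManager"))
        cliService.addService(sessionManager)
        cliService.init()   // cascades down to SparkSQLOperationManager
      }
    }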

    3. DDL execution flow

    DDL execution needs to interact with the Hive metastore.

    Start from the execution plan (a programmatic equivalent follows the listing):

    spark-sql> explain create table test_table(id string);
    == Physical Plan ==
    ExecutedCommand
    +- CreateTableCommand CatalogTable(
    Table: `test_table`
    Created: Wed Dec 19 18:04:15 CST 2018
    Last Access: Thu Jan 01 07:59:59 CST 1970
    Type: MANAGED
    Schema: [StructField(id,StringType,true)]
    Provider: hive
    Storage(InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false
    Time taken: 0.28 seconds, Fetched 1 row(s)
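
    For reference, the same plan can be produced programmatically; a hedged equivalent of the spark-sql session above (the SparkSession setup is an assumption):

    import org.apache.spark.sql.SparkSession

    object ExplainCreateTable {
      def main(args: Array[String]): Unit = {
        // enableHiveSupport so that CREATE TABLE is handled by the Hive catalog,
        // matching the plan shown above.
        val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
        // EXPLAIN returns the plan as a one-row DataFrame; truncate = false prints it in full.
        spark.sql("explain create table test_table(id string)").show(truncate = false)
      }
    }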

    From the execution plan we can locate the concrete Command, in this case CreateTableCommand:

    org.apache.spark.sql.execution.command.tables

    case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {
    
      override def run(sparkSession: SparkSession): Seq[Row] = {
        sparkSession.sessionState.catalog.createTable(table, ifNotExists)
        Seq.empty[Row]
      }
    }

    Here we can see the request is dispatched directly to sparkSession.sessionState.catalog:

    org.apache.spark.sql.internal.SessionState

      /**
       * Internal catalog for managing table and database states.
       */
      lazy val catalog = new SessionCatalog(
        sparkSession.sharedState.externalCatalog,
        sparkSession.sharedState.globalTempViewManager,
        functionResourceLoader,
        functionRegistry,
        conf,
        newHadoopConf())

    What it uses is sparkSession.sharedState.externalCatalog:

    org.apache.spark.sql.internal.SharedState

      /**
       * A catalog that interacts with external systems.
       */
      val externalCatalog: ExternalCatalog =
        SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
          SharedState.externalCatalogClassName(sparkContext.conf),
          sparkContext.conf,
          sparkContext.hadoopConfiguration)
    ...
      private val HIVE_EXTERNAL_CATALOG_CLASS_NAME = "org.apache.spark.sql.hive.HiveExternalCatalog"
    
      private def externalCatalogClassName(conf: SparkConf): String = {
        conf.get(CATALOG_IMPLEMENTATION) match {
          case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME
          case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
        }
      }

    Here we can see the external catalog is instantiated by reflection via externalCatalogClassName; for the hive catalog implementation, the class name is hard-coded as org.apache.spark.sql.hive.HiveExternalCatalog.
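
    A simplified sketch of the reflective construction performed by SharedState.reflect, assuming (as in the snippet above) a (SparkConf, Configuration) constructor on the catalog class; illustrative, not the exact Spark helper:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf

    object ReflectCatalogSketch {
      // Look the class up by name and invoke its (SparkConf, Configuration)
      // constructor, mirroring SharedState.reflect[ExternalCatalog, SparkConf, Configuration].
      def reflect[T](className: String, conf: SparkConf, hadoopConf: Configuration): T = {
        val ctor = Class.forName(className)
          .getDeclaredConstructor(classOf[SparkConf], classOf[Configuration])
        ctor.newInstance(conf, hadoopConf).asInstanceOf[T]
      }
    }

    With spark.sql.catalogImplementation=hive, the resolved class name is org.apache.spark.sql.hive.HiveExternalCatalog, which is examined next.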

    org.apache.spark.sql.hive.HiveExternalCatalog

      /**
       * A Hive client used to interact with the metastore.
       */
      val client: HiveClient = {
        HiveUtils.newClientForMetadata(conf, hadoopConf)
      }
    
      private def withClient[T](body: => T): T = synchronized {
        try {
          body
        } catch {
          case NonFatal(exception) if isClientException(exception) =>
            val e = exception match {
              // Since we are using shim, the exceptions thrown by the underlying method of
              // Method.invoke() are wrapped by InvocationTargetException
              case i: InvocationTargetException => i.getCause
              case o => o
            }
            throw new AnalysisException(
              e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
        }
      }
    
      override def createDatabase(
          dbDefinition: CatalogDatabase,
          ignoreIfExists: Boolean): Unit = withClient {
        client.createDatabase(dbDefinition, ignoreIfExists)
      }

    Every DDL method in this class runs through withClient, and withClient is synchronized; the work itself is simply delegated to client.
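
    A self-contained, illustrative sketch of this withClient pattern (the isClientException check is omitted, and AnalysisLikeException is a stand-in for Spark's AnalysisException):

    import java.lang.reflect.InvocationTargetException
    import scala.util.control.NonFatal

    object WithClientPatternSketch {
      // Stand-in for Spark's AnalysisException, for illustration only.
      class AnalysisLikeException(msg: String, cause: Throwable) extends Exception(msg, cause)

      // Serialize all calls and unwrap exceptions thrown through reflection.
      private def withClient[T](body: => T): T = synchronized {
        try {
          body
        } catch {
          case NonFatal(exception) =>
            // Shim calls go through Method.invoke, so the real cause is wrapped
            // in an InvocationTargetException.
            val e = exception match {
              case i: InvocationTargetException => i.getCause
              case o => o
            }
            throw new AnalysisLikeException(e.getClass.getCanonicalName + ": " + e.getMessage, e)
        }
      }

      def main(args: Array[String]): Unit = {
        println(withClient { 1 + 1 })   // a successful body simply returns its result
      }
    }

    Now let's look at what client actually is: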

    org.apache.spark.sql.hive.client.IsolatedClientLoader

      /** The isolated client interface to Hive. */
      private[hive] def createClient(): HiveClient = {
        if (!isolationOn) {
          return new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this)
        }
        // Pre-reflective instantiation setup.
        logDebug("Initializing the logger to avoid disaster...")
        val origLoader = Thread.currentThread().getContextClassLoader
        Thread.currentThread.setContextClassLoader(classLoader)
    
        try {
          classLoader
            .loadClass(classOf[HiveClientImpl].getName)
            .getConstructors.head
            .newInstance(version, sparkConf, hadoopConf, config, classLoader, this)
            .asInstanceOf[HiveClient]
        } catch {

    So the client used here is org.apache.spark.sql.hive.client.HiveClientImpl.

    org.apache.spark.sql.hive.client.HiveClientImpl

      def withHiveState[A](f: => A): A = retryLocked {
        val original = Thread.currentThread().getContextClassLoader
        // Set the thread local metastore client to the client associated with this HiveClientImpl.
        Hive.set(client)
        // The classloader in clientLoader could be changed after addJar, always use the latest
        // classloader
        state.getConf.setClassLoader(clientLoader.classLoader)
        // setCurrentSessionState will use the classLoader associated
        // with the HiveConf in `state` to override the context class loader of the current
        // thread.
        shim.setCurrentSessionState(state)
        val ret = try f finally {
          Thread.currentThread().setContextClassLoader(original)
          HiveCatalogMetrics.incrementHiveClientCalls(1)
        }
        ret
      }
      private def retryLocked[A](f: => A): A = clientLoader.synchronized {
    ...
    
      override def createDatabase(
          database: CatalogDatabase,
          ignoreIfExists: Boolean): Unit = withHiveState {
        client.createDatabase(
          new HiveDatabase(
            database.name,
            database.description,
            database.locationUri,
            Option(database.properties).map(_.asJava).orNull),
            ignoreIfExists)
      }

    Every DDL method in this class runs through withHiveState, which calls retryLocked, and retryLocked is synchronized (on clientLoader). Here, too, the request is dispatched directly to client, which is Hive's own class org.apache.hadoop.hive.ql.metadata.Hive.
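
    A self-contained sketch of this locking pattern (not the Spark source; the retry logic in retryLocked and the Hive session state handling are omitted): each call runs under a single lock with the isolated classloader installed as the context classloader, and the original classloader is restored afterwards.

    object WithHiveStatePatternSketch {
      private val lock = new Object
      // Stand-in for clientLoader.classLoader (assumption, for illustration).
      private val isolatedClassLoader: ClassLoader = getClass.getClassLoader

      def withHiveState[A](f: => A): A = lock.synchronized {
        val original = Thread.currentThread().getContextClassLoader
        Thread.currentThread().setContextClassLoader(isolatedClassLoader)
        try f finally Thread.currentThread().setContextClassLoader(original)
      }

      def main(args: Array[String]): Unit = {
        println(withHiveState { "metastore call executed under the lock" })
      }
    }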

    4. DML execution flow

    DML execution eventually ends up calling spark.sql.

    SQL execution flow (a sketch of the final step follows the chain):

    CLIService.executeStatement (returns an OperationHandle)

             SessionManager.getSession

             SessionManager.openSession

                      SparkSQLSessionManager.openSession

                               SparkSQLOperationManager.sessionToContexts.set (at openSession time: maps the session to its sqlContext)

             HiveSession.executeStatement

                      HiveSessionImpl.executeStatementInternal

                              OperationManager.newExecuteStatementOperation

                                       SparkSQLOperationManager.newExecuteStatementOperation

                                                 SparkSQLOperationManager.sessionToContexts.get (looks the sqlContext up by session)

                              ExecuteStatementOperation.run

                                       SparkExecuteStatementOperation.run

                                                SparkExecuteStatementOperation.execute

                                                          SQLContext.sql (the familiar Spark SQL entry point)
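
    A highly simplified sketch of what SparkExecuteStatementOperation.execute boils down to (illustrative; the real class also handles the result schema, incremental fetching, and error states):

    import org.apache.spark.sql.{DataFrame, Row, SQLContext}

    class ExecuteStatementSketch(sqlContext: SQLContext, statement: String) {
      private var result: DataFrame = _
      private var rows: Iterator[Row] = Iterator.empty

      def execute(): Unit = {
        // The sqlContext is the one mapped to this session in sessionToContexts
        // (see the flow above).
        result = sqlContext.sql(statement)
        // The real class collects (or iterates) the rows and serves them back
        // to the client via getNextRowSet.
        rows = result.collect().iterator
      }
    }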

    From the initialization of SparkSQLCLIService onward, the implementation classes are replaced one by one with Spark subclasses, for example:

    org.apache.spark.sql.hive.thriftserver.SparkSQLSessionManager extends org.apache.hive.service.cli.session.SessionManager
    org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager extends org.apache.hive.service.cli.operation.OperationManager
    org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation extends org.apache.hive.service.cli.operation.ExecuteStatementOperation

    thereby swapping out the underlying implementation.

    As for why Hive's HiveServer2 can be extended so easily, see sql/hive-thriftserver in the Spark source tree: the Hive 1.2 code appears to have been modified quite heavily there, which will make future upgrades harder.
    As for why Spark goes to such lengths to extend HiveServer2 rather than reimplement it, the likely reason is to keep the interface identical, so that users of Hive Thrift can migrate smoothly to Spark Thrift: the only change needed is switching the connection URL. In practice, though, Spark Thrift and Hive Thrift still behave quite differently for the same SQL.
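
    To illustrate the "switch the URL" point: on the client side the same HiveServer2 JDBC driver works against either server, so migration is a connection-string change. Host, port, and credentials below are assumptions:

    import java.sql.DriverManager

    object ThriftJdbcClient {
      def main(args: Array[String]): Unit = {
        // Same JDBC driver as for HiveServer2.
        Class.forName("org.apache.hive.jdbc.HiveDriver")
        // Previously e.g. jdbc:hive2://hive-server-host:10000/default;
        // now point at the Spark Thrift server instead.
        val conn = DriverManager.getConnection(
          "jdbc:hive2://spark-thrift-host:10000/default", "user", "")
        val rs = conn.createStatement().executeQuery("select 1")
        while (rs.next()) println(rs.getInt(1))
        conn.close()
      }
    }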

  Original article: https://www.cnblogs.com/barneywill/p/10137672.html