    Spark SQL (3): Parser to Unresolved LogicalPlan

    Spark SQL's parser, simply put, turns a SQL statement into an operator tree. Spark SQL uses ANTLR4 for this step.

    When spark.sql() is executed, it calls

    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))

    which in turn invokes:
    /** Creates LogicalPlan for a given SQL string. */
    override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
      val tmp = astBuilder.visitSingleStatement(parser.singleStatement())
      tmp match {
        case plan: LogicalPlan => plan
        case _ =>
          val position = Origin(None, None)
          throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
      }
    }
    The detailed parsing logic looks like this:
    protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
      logDebug(s"Parsing command: $command")
      val oo = CharStreams.fromString(command)
      logInfo(s"Parsing command: $oo")

      val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
      lexer.removeErrorListeners()
      lexer.addErrorListener(ParseErrorListener)

      val tokenStream = new CommonTokenStream(lexer)
      val parser = new SqlBaseParser(tokenStream)
      parser.addParseListener(PostProcessor)
      parser.removeErrorListeners()
      parser.addErrorListener(ParseErrorListener)

      try {
        try {
          // first, try parsing with potentially faster SLL mode
          parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
          toResult(parser)
        }
        catch {
          case e: ParseCancellationException =>
            // if we fail, parse with LL mode
            tokenStream.seek(0) // rewind input stream
            parser.reset()

            // Try Again.
            parser.getInterpreter.setPredictionMode(PredictionMode.LL)
            toResult(parser)
        }
      }
      catch {
        case e: ParseException if e.command.isDefined =>
          throw e
        case e: ParseException =>
          throw e.withCommand(command)
        case e: AnalysisException =>
          val position = Origin(e.line, e.startPosition)
          throw new ParseException(Option(command), e.message, position, position)
      }
    }

          Here ANTLR4 parses the whole SQL string. It first tries the faster SLL prediction mode; if that fails, it falls back to the LL mode. Once configured, the SqlBaseParser calls singleStatement(), which builds the entire parse tree.
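The SLL-then-LL fallback above is a standard ANTLR idiom: try the fast mode, and only on failure rewind and re-parse in the complete mode. A minimal, self-contained sketch of the retry pattern (FastModeException and parseWithFallback are made-up names standing in for ParseCancellationException and the real try/catch; no ANTLR here):

```scala
// Hypothetical sketch of the two-stage parsing idiom: try the fast path
// (SLL prediction) first; if it bails out, fall back to the slow but
// complete path (LL prediction).
case class FastModeException(msg: String) extends Exception(msg)

def parseWithFallback[T](fast: () => T, full: () => T): T = {
  try {
    fast()                        // SLL: faster, but may wrongly reject valid input
  } catch {
    case _: FastModeException =>  // corresponds to ParseCancellationException
      full()                      // LL: slower, always correct
  }
}

// The fast path rejects this "ambiguous" input; the fallback succeeds.
val result = parseWithFallback(
  fast = () => throw FastModeException("ambiguous"),
  full = () => "parsed with LL")
```

In the real code, the fallback additionally rewinds the token stream (`tokenStream.seek(0)`) and resets the parser before retrying.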

    Next, AstBuilder.visitSingleStatement recursively visits each node and returns the generated LogicalPlan. This step relies mostly on the ANTLR4-generated code, using the visitor pattern to process each node in turn and return a result. The main implementation is the AstBuilder class, which extends SqlBaseBaseVisitor.
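How the generated visitor dispatch works can be modeled with a stripped-down sketch. All class names below are invented stand-ins for the generated SqlBaseVisitor, SingleStatementContext, and so on, not the real generated code:

```scala
// Minimal model of ANTLR's generated visitor pattern (illustrative only).
// Each context node's accept() dispatches to the matching visitXxx method,
// just as SingleStatementContext.accept dispatches to visitSingleStatement.
sealed trait Context { def accept[T](v: Visitor[T]): T }
case class SingleStatementCtx(child: Context) extends Context {
  def accept[T](v: Visitor[T]): T = v.visitSingleStatement(this)
}
case class StatementDefaultCtx(sql: String) extends Context {
  def accept[T](v: Visitor[T]): T = v.visitStatementDefault(this)
}

trait Visitor[T] {
  def visit(ctx: Context): T = ctx.accept(this)  // double dispatch
  def visitSingleStatement(ctx: SingleStatementCtx): T
  def visitStatementDefault(ctx: StatementDefaultCtx): T
}

// An AstBuilder-like visitor that walks down to the leaf and builds a result.
object MiniAstBuilder extends Visitor[String] {
  def visitSingleStatement(ctx: SingleStatementCtx): String = visit(ctx.child)
  def visitStatementDefault(ctx: StatementDefaultCtx): String = s"LogicalPlan(${ctx.sql})"
}

val plan = MiniAstBuilder.visit(SingleStatementCtx(StatementDefaultCtx("SELECT 1")))
```

The key point is the double dispatch: `visit` calls `ctx.accept(this)`, and each context subclass routes back to its own `visitXxx` method, so AstBuilder only needs to override the node types it cares about.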

    Suppose we have the following SQL:

    SELECT * FROM NAME WHERE AGE > 10 

    After ANTLR4 parses it, the result is a tree of rule contexts rooted at singleStatement.
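The original post showed this tree as an IDEA ANTLR Preview screenshot. A rough textual sketch of the rule-context tree for this statement (node names follow SqlBase.g4; the exact chain of query* nodes varies between Spark versions):

```
singleStatement
└─ statement                      (StatementDefaultContext)
   └─ query
      └─ queryNoWith
         └─ queryTerm / queryPrimary
            └─ querySpecification
               ├─ SELECT  namedExpressionSeq: *
               ├─ fromClause
               │  └─ relation → tableIdentifier: NAME
               └─ WHERE   booleanExpression: AGE > 10
```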

    In the AstBuilder.visitSingleStatement method:

    override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
      val statement: StatementContext = ctx.statement
      printRuleContextInTreeStyle(statement, 1)
      visit(ctx.statement).asInstanceOf[LogicalPlan]
    }
    

      The input is a SingleStatementContext, i.e. the root node of the tree above. It then takes the StatementContext under the root which, as the tree shows, is actually a StatementDefaultContext. Take a look at its accept method:

    @Override
    public <T> T accept(ParseTreeVisitor<? extends T> visitor) {
      if (visitor instanceof SqlBaseVisitor) return ((SqlBaseVisitor<? extends T>) visitor).visitStatementDefault(this);
      else return visitor.visitChildren(this);
    }
    

      Most nodes are implemented this way; for certain special nodes, such as FromClauseContext, AstBuilder provides dedicated handling logic. So AstBuilder produces the LogicalPlan by traversing the entire tree.

    While walking the tree above, reaching the QuerySpecification node triggers the construction of the LogicalPlan:

     /**
       * Create a logical plan using a query specification.
       */
      override def visitQuerySpecification(
          ctx: QuerySpecificationContext): LogicalPlan = withOrigin(ctx) {
        val from = OneRowRelation().optional(ctx.fromClause) {
          visitFromClause(ctx.fromClause)
        }
        withQuerySpecification(ctx, from)
      }
    

      visitFromClause inspects the Relation under the FromClause node; if there is a join, it is resolved here as well, producing the FROM subtree.

          After the FROM clause is parsed, withQuerySpecification takes the FROM subtree and parses the WHERE, aggregation, expression, and other child nodes, producing a tree for the whole SQL statement.

    /**
       * Add a query specification to a logical plan. The query specification is the core of the logical
       * plan, this is where sourcing (FROM clause), transforming (SELECT TRANSFORM/MAP/REDUCE),
       * projection (SELECT), aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) takes place.
       *
       * Note that query hints are ignored (both by the parser and the builder).
       */
      private def withQuerySpecification(
          ctx: QuerySpecificationContext,
          relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
        import ctx._
    
        // WHERE
        def filter(ctx: BooleanExpressionContext, plan: LogicalPlan): LogicalPlan = {
          Filter(expression(ctx), plan)
        }
    
        // Expressions.
        val expressions = Option(namedExpressionSeq).toSeq
          .flatMap(_.namedExpression.asScala)
          .map(typedVisit[Expression])
    
        // Create either a transform or a regular query.
        val specType = Option(kind).map(_.getType).getOrElse(SqlBaseParser.SELECT)
        specType match {
          case SqlBaseParser.MAP | SqlBaseParser.REDUCE | SqlBaseParser.TRANSFORM =>
            // Transform
    
            // Add where.
            val withFilter = relation.optionalMap(where)(filter)
    
            // Create the attributes.
            val (attributes, schemaLess) = if (colTypeList != null) {
              // Typed return columns.
              (createSchema(colTypeList).toAttributes, false)
            } else if (identifierSeq != null) {
              // Untyped return columns.
              val attrs = visitIdentifierSeq(identifierSeq).map { name =>
                AttributeReference(name, StringType, nullable = true)()
              }
              (attrs, false)
            } else {
              (Seq(AttributeReference("key", StringType)(),
                AttributeReference("value", StringType)()), true)
            }
    
            // Create the transform.
            ScriptTransformation(
              expressions,
              string(script),
              attributes,
              withFilter,
              withScriptIOSchema(
                ctx, inRowFormat, recordWriter, outRowFormat, recordReader, schemaLess))
    
          case SqlBaseParser.SELECT =>
            // Regular select
    
            // Add lateral views.
            val withLateralView = ctx.lateralView.asScala.foldLeft(relation)(withGenerate)
    
            // Add where.
            val withFilter = withLateralView.optionalMap(where)(filter)
    
            // Add aggregation or a project.
            val namedExpressions = expressions.map {
              case e: NamedExpression => e
              case e: Expression => UnresolvedAlias(e)
            }
            val withProject = if (aggregation != null) {
              withAggregation(aggregation, namedExpressions, withFilter)
            } else if (namedExpressions.nonEmpty) {
              Project(namedExpressions, withFilter)
            } else {
              withFilter
            }
    
            // Having
            val withHaving = withProject.optional(having) {
              // Note that we add a cast to non-predicate expressions. If the expression itself is
              // already boolean, the optimizer will get rid of the unnecessary cast.
              val predicate = expression(having) match {
                case p: Predicate => p
                case e => Cast(e, BooleanType)
              }
              Filter(predicate, withProject)
            }
    
            // Distinct
            val withDistinct = if (setQuantifier() != null && setQuantifier().DISTINCT() != null) {
              Distinct(withHaving)
            } else {
              withHaving
            }
    
            // Window
            val withWindow = withDistinct.optionalMap(windows)(withWindows)
    
            // Hint
            hints.asScala.foldRight(withWindow)(withHints)
        }
      }
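For the example SELECT * FROM NAME WHERE AGE > 10, the inside-out construction order above (FROM relation, then Filter, then Project) can be sketched with toy case classes. These are simplified stand-ins, not Catalyst's real node definitions (real nodes carry Expression trees, not strings):

```scala
// Toy stand-ins for Catalyst's unresolved logical plan nodes (illustrative).
sealed trait LogicalPlan
case class UnresolvedRelation(table: String) extends LogicalPlan              // FROM NAME
case class Filter(condition: String, child: LogicalPlan) extends LogicalPlan  // WHERE ...
case class Project(exprs: Seq[String], child: LogicalPlan) extends LogicalPlan // SELECT ...

// withQuerySpecification builds the plan inside-out: the FROM relation is
// the leaf, WHERE wraps it, and the SELECT list wraps the result.
def buildPlan(table: String, where: Option[String], select: Seq[String]): LogicalPlan = {
  val from = UnresolvedRelation(table)
  val withFilter = where.map(Filter(_, from)).getOrElse(from)
  if (select.nonEmpty) Project(select, withFilter) else withFilter
}

val plan = buildPlan("NAME", Some("AGE > 10"), Seq("*"))
// plan == Project(Seq("*"), Filter("AGE > 10", UnresolvedRelation("NAME")))
```

Note that the leaf is an *unresolved* relation: at this stage nothing has been checked against the catalog, which is exactly why the output of this phase is called an Unresolved LogicalPlan.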
    

      The operator tree produced here is still unbound, i.e. an Unresolved LogicalPlan.

          To sum up, understanding this step comes down to three things: the grammar defined in Spark's SqlBase.g4 file, the default code generated for the visitor pattern (most of it follows the same structure), and the node-specific overrides in the AstBuilder class. If you want to study this step closely, open Spark's SqlBase.g4 file, open the ANTLR Preview panel in IDEA, enter a SQL statement, then select singleStatement in SqlBase.g4, right-click, and choose Test Rule. The generated tree corresponds exactly to the nodes AstBuilder traverses top-down: for some nodes AstBuilder has no override and simply visits the children, while for others it does, and their logic is worth reading, chiefly the ones discussed above: QuerySpecification, FromClauseContext, booleanDefault, namedExpression, and so on.

  • Original post: https://www.cnblogs.com/ldsggv/p/13380370.html