
    Spark SQL (3): Parser to Unresolved LogicalPlan

    In short, the Spark SQL parser turns a SQL statement into an operator tree; Spark SQL uses ANTLR 4 to carry out this step.

    When the spark.sql() method is executed, it calls

    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
    which in turn invokes:
    /** Creates LogicalPlan for a given SQL string. */
    override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
      val tmp = astBuilder.visitSingleStatement(parser.singleStatement())
      tmp match {
        case plan: LogicalPlan => plan
        case _ =>
          val position = Origin(None, None)
          throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
      }
    }
    The detailed parsing logic looks like this:
    protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
      logDebug(s"Parsing command: $command")
      val oo = CharStreams.fromString(command)
      logInfo(s"Parsing command: $oo")

      val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
      lexer.removeErrorListeners()
      lexer.addErrorListener(ParseErrorListener)

      val tokenStream = new CommonTokenStream(lexer)
      val parser = new SqlBaseParser(tokenStream)
      parser.addParseListener(PostProcessor)
      parser.removeErrorListeners()
      parser.addErrorListener(ParseErrorListener)

      try {
        try {
          // first, try parsing with potentially faster SLL mode
          parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
          toResult(parser)
        }
        catch {
          case e: ParseCancellationException =>
            // if we fail, parse with LL mode
            tokenStream.seek(0) // rewind input stream
            parser.reset()

            // Try Again.
            parser.getInterpreter.setPredictionMode(PredictionMode.LL)
            toResult(parser)
        }
      }
      catch {
        case e: ParseException if e.command.isDefined =>
          throw e
        case e: ParseException =>
          throw e.withCommand(command)
        case e: AnalysisException =>
          val position = Origin(e.line, e.startPosition)
          throw new ParseException(Option(command), e.message, position, position)
      }
    }

    Here ANTLR 4 parses the whole SQL statement. The parser first tries the potentially faster SLL prediction mode; if that fails, it rewinds the token stream and falls back to the more general LL mode. Once the SqlBaseParser is set up, calling singleStatement() builds the whole parse tree.
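The SLL-then-LL fallback is a generic "fast path first, complete path on failure" pattern. The sketch below illustrates it in plain Java; the names (TwoStageParse, parseFast, parseFull, FastPathException) and the fake failure condition are invented for illustration. The real code instead switches ANTLR's PredictionMode and rewinds the token stream with tokenStream.seek(0) and parser.reset().

```java
// Toy model of parse()'s two-stage strategy (not Spark's actual API).
public class TwoStageParse {
    // Stands in for ParseCancellationException thrown under SLL mode.
    static class FastPathException extends RuntimeException {}

    // Stands in for PredictionMode.SLL: fast, but may give up on valid input.
    static String parseFast(String sql) {
        if (sql.contains("WHERE")) throw new FastPathException(); // pretend SLL failed
        return "plan(" + sql + ")";
    }

    // Stands in for PredictionMode.LL: slower, but complete.
    static String parseFull(String sql) {
        return "plan(" + sql + ")";
    }

    static String parse(String sql) {
        try {
            return parseFast(sql);
        } catch (FastPathException e) {
            // On failure, retry from the start with the complete mode,
            // just as parse() rewinds the stream and resets the parser.
            return parseFull(sql);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("SELECT * FROM NAME WHERE AGE > 10"));
    }
}
```

Both paths produce the same result when they succeed; the point of the pattern is that the fast path handles the common case and the fallback only pays the extra cost when needed.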

    Next, AstBuilder.visitSingleStatement is called to walk each node recursively and return the generated LogicalPlan. This step relies on the code generated by ANTLR 4 and uses the visitor pattern to process the nodes one by one and return the corresponding results; the main implementation lives in the AstBuilder class, which extends SqlBaseBaseVisitor.

    Suppose we have the following SQL:

    SELECT * FROM NAME WHERE AGE > 10 

    After ANTLR 4 parses it, the tree structure looks like this:

    (figure: the ANTLR parse tree, rooted at SingleStatementContext; shown as an image in the original post)

    In the AstBuilder.visitSingleStatement method:

    override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
      val statement: StatementContext = ctx.statement
      printRuleContextInTreeStyle(statement, 1)
      visit(ctx.statement).asInstanceOf[LogicalPlan]
    }
    

    The input here is a SingleStatementContext, which is the root node of the tree shown above. The method then fetches the StatementContext beneath the root; from the figure, what it actually gets is a StatementDefaultContext. Now look at its accept method:

    @Override
    public <T> T accept(ParseTreeVisitor<? extends T> visitor) {
      if ( visitor instanceof SqlBaseVisitor ) return ((SqlBaseVisitor<? extends T>)visitor).visitStatementDefault(this);
      else return visitor.visitChildren(this);
    }
    

    Most nodes are implemented in this same way; a few special nodes, such as FromClauseContext, have dedicated handling logic in AstBuilder. So AstBuilder produces the LogicalPlan by traversing the entire tree.
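The double dispatch that accept() performs can be shown with a small self-contained sketch (the class names below are invented for illustration, not ANTLR's generated classes): a node with no special handling routes back to the generic visitChildren, while an overridden node routes to its dedicated visit method, just as FromClauseContext gets its own handler in AstBuilder.

```java
// Toy model of the visitor dispatch used by the generated *Context classes.
public class VisitorSketch {
    interface Visitor<T> {
        T visitChildren(Node n);          // generic fallback, like SqlBaseBaseVisitor
        T visitFromClause(FromClause n);  // node-specific override, like AstBuilder's
    }

    static abstract class Node {
        abstract <T> T accept(Visitor<T> v);
    }

    // A node with no special handling: falls through to visitChildren.
    static class Statement extends Node {
        <T> T accept(Visitor<T> v) { return v.visitChildren(this); }
    }

    // A node the builder handles specially: dispatches to its own visit method.
    static class FromClause extends Node {
        <T> T accept(Visitor<T> v) { return v.visitFromClause(this); }
    }

    static class Printer implements Visitor<String> {
        public String visitChildren(Node n) { return "default"; }
        public String visitFromClause(FromClause n) { return "from-clause"; }
    }

    public static void main(String[] args) {
        Visitor<String> v = new Printer();
        System.out.println(new Statement().accept(v));   // generic path
        System.out.println(new FromClause().accept(v));  // overridden path
    }
}
```

The node picks the method, the visitor supplies the behavior: that is why AstBuilder only needs to override the handful of visit methods where real work happens.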

    While walking the tree above, reaching the QuerySpecification node triggers the construction of the LogicalPlan:

     /**
       * Create a logical plan using a query specification.
       */
      override def visitQuerySpecification(
          ctx: QuerySpecificationContext): LogicalPlan = withOrigin(ctx) {
        val from = OneRowRelation().optional(ctx.fromClause) {
          visitFromClause(ctx.fromClause)
        }
        withQuerySpecification(ctx, from)
      }
    

    visitFromClause inspects the Relation under the FromClause node; if there are join operations, they are also resolved there, producing the FROM subtree.

    After the FROM clause is parsed, withQuerySpecification takes that subtree and parses the WHERE clause, aggregations, expressions, and the other child nodes, yielding a tree for the whole parsed SQL statement.

    /**
       * Add a query specification to a logical plan. The query specification is the core of the logical
       * plan, this is where sourcing (FROM clause), transforming (SELECT TRANSFORM/MAP/REDUCE),
       * projection (SELECT), aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) takes place.
       *
       * Note that query hints are ignored (both by the parser and the builder).
       */
      private def withQuerySpecification(
          ctx: QuerySpecificationContext,
          relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
        import ctx._
    
        // WHERE
        def filter(ctx: BooleanExpressionContext, plan: LogicalPlan): LogicalPlan = {
          Filter(expression(ctx), plan)
        }
    
        // Expressions.
        val expressions = Option(namedExpressionSeq).toSeq
          .flatMap(_.namedExpression.asScala)
          .map(typedVisit[Expression])
    
        // Create either a transform or a regular query.
        val specType = Option(kind).map(_.getType).getOrElse(SqlBaseParser.SELECT)
        specType match {
          case SqlBaseParser.MAP | SqlBaseParser.REDUCE | SqlBaseParser.TRANSFORM =>
            // Transform
    
            // Add where.
            val withFilter = relation.optionalMap(where)(filter)
    
            // Create the attributes.
            val (attributes, schemaLess) = if (colTypeList != null) {
              // Typed return columns.
              (createSchema(colTypeList).toAttributes, false)
            } else if (identifierSeq != null) {
              // Untyped return columns.
              val attrs = visitIdentifierSeq(identifierSeq).map { name =>
                AttributeReference(name, StringType, nullable = true)()
              }
              (attrs, false)
            } else {
              (Seq(AttributeReference("key", StringType)(),
                AttributeReference("value", StringType)()), true)
            }
    
            // Create the transform.
            ScriptTransformation(
              expressions,
              string(script),
              attributes,
              withFilter,
              withScriptIOSchema(
                ctx, inRowFormat, recordWriter, outRowFormat, recordReader, schemaLess))
    
          case SqlBaseParser.SELECT =>
            // Regular select
    
            // Add lateral views.
            val withLateralView = ctx.lateralView.asScala.foldLeft(relation)(withGenerate)
    
            // Add where.
            val withFilter = withLateralView.optionalMap(where)(filter)
    
            // Add aggregation or a project.
            val namedExpressions = expressions.map {
              case e: NamedExpression => e
              case e: Expression => UnresolvedAlias(e)
            }
            val withProject = if (aggregation != null) {
              withAggregation(aggregation, namedExpressions, withFilter)
            } else if (namedExpressions.nonEmpty) {
              Project(namedExpressions, withFilter)
            } else {
              withFilter
            }
    
            // Having
            val withHaving = withProject.optional(having) {
              // Note that we add a cast to non-predicate expressions. If the expression itself is
              // already boolean, the optimizer will get rid of the unnecessary cast.
              val predicate = expression(having) match {
                case p: Predicate => p
                case e => Cast(e, BooleanType)
              }
              Filter(predicate, withProject)
            }
    
            // Distinct
            val withDistinct = if (setQuantifier() != null && setQuantifier().DISTINCT() != null) {
              Distinct(withHaving)
            } else {
              withHaving
            }
    
            // Window
            val withWindow = withDistinct.optionalMap(windows)(withWindows)
    
            // Hint
            hints.asScala.foldRight(withWindow)(withHints)
        }
      }
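The order in which withQuerySpecification layers operators around the FROM subtree can be sketched with plain strings standing in for Catalyst nodes (a toy model, not Spark's API): FROM is innermost, then WHERE, then the aggregation, then HAVING, then DISTINCT.

```java
// Toy model of withQuerySpecification's wrapping order (strings, not Catalyst nodes).
public class ClauseOrder {
    static String filter(String cond, String child)  { return "Filter(" + cond + ", " + child + ")"; }
    static String aggregate(String grp, String child){ return "Aggregate(" + grp + ", " + child + ")"; }
    static String distinct(String child)             { return "Distinct(" + child + ")"; }

    static String build() {
        String plan = "UnresolvedRelation(T)";  // FROM T
        plan = filter("AGE > 10", plan);        // WHERE, added first around the relation
        plan = aggregate("NAME", plan);         // GROUP BY (carries the projection)
        plan = filter("count(1) > 1", plan);    // HAVING, a Filter above the Aggregate
        plan = distinct(plan);                  // DISTINCT, outermost of these
        return plan;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

Each later clause becomes a parent of everything built so far, which matches the code above: withFilter wraps the relation, withProject/withAggregation wraps withFilter, withHaving wraps withProject, and so on.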
    

    The operator tree produced at this point is still unbound; this is the unresolved LogicalPlan.
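For the running example, SELECT * FROM NAME WHERE AGE > 10, the shape of that unresolved plan can be sketched as follows (strings stand in for Catalyst nodes; at this stage the table NAME, the column AGE, and the star are still just names, not bound to any catalog or schema, which is what "unresolved" means).

```java
// Toy model of the unresolved plan for the example query (not Spark's API).
public class ExamplePlan {
    static String build() {
        String from  = "UnresolvedRelation(NAME)";        // FROM NAME: table not yet looked up
        String where = "Filter(AGE > 10, " + from + ")";  // WHERE: AGE is an unresolved attribute
        return "Project(*, " + where + ")";               // SELECT *: star expanded later by the analyzer
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

Binding these names to actual tables and columns is the analyzer's job in the next phase, which turns this tree into a resolved LogicalPlan.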

    To summarize, understanding this step comes down to three things: the grammar defined in Spark's SqlBase.g4 file, the default code ANTLR generates for the visitor pattern (most of it follows the same shape), and the node handlers that AstBuilder overrides. If you want to study this step closely, open SqlBase.g4 in the Spark source, open the ANTLR Preview panel in IDEA, type in a SQL statement, then select singleStatement in SqlBase.g4, right-click, and choose Test Rule. The generated tree corresponds exactly to the nodes AstBuilder visits from top to bottom: some nodes are not overridden and simply traverse their children, while others have custom logic in AstBuilder worth reading, mainly the QuerySpecification, FromClauseContext, booleanDefault, and namedExpression handlers discussed above.

  • Original article: https://www.cnblogs.com/ldsggv/p/13380370.html