  • Spark: analyzing task lineage

Got a new requirement: add instrumentation inside Spark to parse each task's lineage, covering both SQL and code-based jobs, and excluding intermediate temporary views (createOrReplaceTempView).

A colleague has already done the Hive-side parsing at https://www.cnblogs.com/wuxilc/p/9326130.html, but that approach does not work for the Spark part, because Hive's ParseDriver cannot parse Spark SQL.

So let's do it inside Spark itself.
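
To make the goal concrete, here is a hedged sketch of the desired behaviour (the tables dw.orders, dw.users and dm.order_stats are invented for illustration). With the rule described below installed, this job should report dw.orders and dw.users as inputs, dm.order_stats as the output, and nothing for the temp view:

import org.apache.spark.sql.SparkSession

object LineageDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("lineage-demo").enableHiveSupport().getOrCreate()

    // Expected log: src table -> dw.orders,dw.users target table -> dm.order_stats
    spark.sql(
      """INSERT OVERWRITE TABLE dm.order_stats
        |SELECT u.city, count(*) AS cnt
        |FROM dw.orders o JOIN dw.users u ON o.user_id = u.id
        |GROUP BY u.city""".stripMargin)

    // Temporary views are deliberately excluded from lineage.
    spark.table("dw.orders").createOrReplaceTempView("tmp_orders")
    spark.stop()
  }
}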

Spark's BaseSessionStateBuilder class:

/**
 * Build the [[SessionState]].
 */
def build(): SessionState = {
  new SessionState(
    session.sharedState,
    conf,
    experimentalMethods,
    functionRegistry,
    udfRegistration,
    () => catalog,
    sqlParser,
    () => analyzer,
    () => optimizer,
    planner,
    streamingQueryManager,
    listenerManager,
    () => resourceLoader,
    createQueryExecution,
    createClone)
}

Here optimizer is Spark's logical-plan optimizer, and HiveSessionStateBuilder extends the BaseSessionStateBuilder class.

All Spark optimizers extend the abstract class Optimizer:

/**
 * Abstract class all optimizers should inherit of, contains the standard batches (extending
 * Optimizers can override this.
 */
abstract class Optimizer(sessionCatalog: SessionCatalog)
  extends RuleExecutor[LogicalPlan] {
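
For orientation: every optimization is a Rule[LogicalPlan], and RuleExecutor groups rules into named Batches that run either Once or to a fixed point. A minimal observation-only rule (illustrative, not from Spark's source) has this shape; DependencyCollect below follows the same pattern:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Logs every plan node's name and returns the plan unchanged.
object LogPlanNodes extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan.foreach(node => logInfo(node.nodeName))
    plan // an observation-only rule must return the plan as-is
  }
}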


To add a new matching rule to the optimizer, override optimizer in the HiveSessionStateBuilder class and append a Batch:
override lazy val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods) {
  override def batches: Seq[Batch] = super.batches :+
    Batch("Determine stats of partitionedTable", Once,
      DeterminePartitionedTableStats(sparkSession)) :+
    Batch("Collect read and write tables", Once, DependencyCollect(sparkSession))
}
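
Note that overriding HiveSessionStateBuilder means patching and rebuilding Spark itself. If that is undesirable, the same rule can be injected through the SparkSessionExtensions API (available since Spark 2.2) without touching Spark's source; a minimal sketch, where com.example.LineageExtensions is a made-up class name:

import org.apache.spark.sql.SparkSessionExtensions

// Enable with: spark-submit --conf spark.sql.extensions=com.example.LineageExtensions
class LineageExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Appends DependencyCollect to the optimizer's extra operator-optimization rules.
    extensions.injectOptimizerRule(session => DependencyCollect(session))
  }
}

The trade-off is that injected rules run inside the standard operator-optimization batch rather than as a dedicated batch at the end.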


Then add the following rule, which matches INSERT, CREATE and similar operators and extracts the input and output tables:
// Imports assume Spark 2.3.x; some of these classes move between versions.
import scala.collection.mutable

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.CreateTableCommand
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable

case class DependencyCollect(sparkSession: SparkSession) extends Rule[LogicalPlan] {

  def apply(plan: LogicalPlan): LogicalPlan = {
    // Collection can be switched off per job with spark.collectDependencies=false.
    if (sparkSession.sparkContext.conf.getBoolean("spark.collectDependencies", true)) {
      val readTables = mutable.HashSet[String]()
      val writeTables = mutable.HashSet[String]()
      plan transformDown {
        // INSERT INTO / INSERT OVERWRITE a Hive table -> output table
        case a @ InsertIntoHiveTable(table: CatalogTable, _, _, _, _, _) =>
          writeTables += s"${fillBlankDatabase(table)}.${table.identifier.table}"
          a
        case i @ InsertIntoTable(table: HiveTableRelation, _, _, _, _) =>
          writeTables += s"${table.tableMeta.database}.${table.tableMeta.identifier.table}"
          i
        // CREATE TABLE (AS SELECT) -> output table
        case c @ CreateTable(table: CatalogTable, _, _) =>
          writeTables += s"${fillBlankDatabase(table)}.${table.identifier.table}"
          c
        case d @ CreateTableCommand(table: CatalogTable, _) =>
          writeTables += s"${fillBlankDatabase(table)}.${table.identifier.table}"
          d
        // Any scan of a Hive table -> input table
        case p @ PhysicalOperation(_, _, table: HiveTableRelation) =>
          readTables += s"${table.tableMeta.database}.${table.tableMeta.identifier.table}"
          p
      }
      if (readTables.nonEmpty || writeTables.nonEmpty) {
        logInfo(s"src table -> ${readTables.mkString(",")} target table -> ${writeTables.mkString(",")}")
        // AsyncExecution / CallChain are in-house reporting utilities, not Spark classes.
        AsyncExecution.AsycnHandle(new CallChain.Event(
          s"${readTables.mkString(",")}#${writeTables.mkString(",")}",
          AsyncExecution.getSparkAppName(sparkSession.sparkContext.conf),
          "bloodlineage"))
        // listenerBus is private[spark]; this compiles because the rule lives in Spark's source tree.
        sparkSession.sparkContext.listenerBus.post(DependencyEvent(readTables, writeTables))
      }
    }
    plan
  }

  // Qualify a table with the current database when none was specified.
  private def fillBlankDatabase(table: CatalogTable): String = {
    if (table.database.isEmpty) {
      sparkSession.sessionState.catalog.getCurrentDatabase
    } else {
      table.database
    }
  }
}
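
The post never shows DependencyEvent or its consumer. A minimal sketch, assuming the event simply carries the two table sets (AsyncExecution and CallChain are the author's in-house helpers and are not reproduced here):

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// Custom events posted on the listener bus arrive via onOtherEvent.
case class DependencyEvent(readTables: mutable.HashSet[String],
                           writeTables: mutable.HashSet[String]) extends SparkListenerEvent

class DependencyListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case DependencyEvent(reads, writes) =>
      // Forward the lineage to a metadata service, Kafka, etc.
      println(s"reads=${reads.mkString(",")} writes=${writes.mkString(",")}")
    case _ => // ignore other events
  }
}

Register the listener with sparkContext.addSparkListener(...) or via the spark.extraListeners configuration.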





  • Original post: https://www.cnblogs.com/songchaolin/p/12807224.html