zoukankan      html  css  js  c++  java
  • Spark读取分析在ES中存储的SQL

    用户通过elasticsearch-sql对存储在elasticsearch中的数据进行查询,假设事先会把查询语句保存在elasticsearch中,那么如何对这些sql语句中涉及到的表进行统计?

    Spark读取Elasticsearch

    import org.elasticsearch.spark._
    val esOptions = Map("es.nodes"->"localhost", "es.port"->"9200","es.mapping.date.rich"->"false")
    val esRDD = spark.sparkContext.esRDD("collectorapimetricslog2-2020.12/logs", esOptions)
    esRDD.take(20).foreach(println)
    val esJsonRDD = esRDD.map(x=>{
      import org.json4s._
      import org.json4s.JsonDSL._
      import org.json4s.jackson.JsonMethods._
      import org.json4s.jackson.Serialization
      import org.json4s.DefaultFormats
      implicit val json4sFormats = DefaultFormats
      val origM = x._2
      Serialization.writePretty(origM)
    })
    val esDF = spark.read.json(esRDD)
    

    用RDD方式把query语句从es中读取出来,转换为json串之后,再转换为DataFrame。

    那为什么不直接采用Elasticsearch-Hadoop中提供的Dataframe接口方式, 原因在于使用DataFrame方式直接读取,会有多种格式不匹配或出错的问题出现,elasticsearch-hadoop在兼容性方面,还有许多细节考虑不周。

    JSqlParser

    使用JSqlParser把query语句中涉及到的表找出来

    第一步, 加载jsqlparser库

    bin/spark-shell --packages "com.github.jsqlparser:jsqlparser:3.1"
    

    第二步, 分析使用的代码,先去除识别上错误,然后parse

    import net.sf.jsqlparser.util.TablesNamesFinder._
    import net.sf.jsqlparser.util.TablesNamesFinder
    import net.sf.jsqlparser.parser.CCJSqlParserUtil
    import net.sf.jsqlparser.statement.select._
    val stmt = CCJSqlParserUtil.parse("select * from tabl1 a join tab2 b on a.id=b.id")
    val sel = stmt.asInstanceOf[Select]
    val tblFinder = new TablesNamesFinder()
    tblFinder.getTableList(sel)
    
    val esQueryContentDF = esDF.filter("engine=='es'").select("queryContent")
    val parsedQueryDF = esQueryContentDF.map(r => {
        import net.sf.jsqlparser.util.TablesNamesFinder._
        import net.sf.jsqlparser.util.TablesNamesFinder
        import net.sf.jsqlparser.parser.CCJSqlParserUtil
        import net.sf.jsqlparser.statement.select._
        import spark.implicits._
        import scala.collection.JavaConverters._
        var targetTable:String = "exception"
        val originalQuery = r.getString(0)
        try {
            val sQuery = r.getString(0)
            val dateHistoPattern = "date_histogram(?:.*[)])".r
            val sQuery2 = dateHistoPattern.replaceAllIn(sQuery,"date_histogram()")
            val qPattern = raw"(w+-[d.]+)".r
            val queryStr = qPattern.replaceAllIn(sQuery2,"`$1`")
            val stmt = CCJSqlParserUtil.parse(queryStr)
            val sel = stmt.asInstanceOf[Select]
            val tblNamesFinder = new TablesNamesFinder()
            val tblLst = tblNamesFinder.getTableList(sel)
            targetTable = tblLst.asScala.mkString(",")
        }catch {
            case ex: Exception => {
                targetTable = "exception: " + originalQuery
            }
        }
        targetTable
    })
    
    parsedQueryDF.filter(" value not like 'exception%'").createOrReplaceTempView("parsed_query")
    spark.sql("select split(replace(value,'`',''),'-')[0] from parsed_query").distinct.collect.foreach(println)
    
  • 相关阅读:
    前端总结数据结构与算法基础
    Linux 常用命令
    mariadb下载二进制包源码包地址(使用清华)
    centos7添加永久静态路由
    登录普通用户会报错-bash: ulimit: open files: cannot modify limit: Operation not permitted
    编写二进制安装mariadb10.2的ansible-playbook剧本
    su
    URL后面加不加“/”有区别吗?
    RocketMQ在面试中那些常见问题及答案+汇总
    通过实现网站访问计数器带你理解 轻量级锁CAS原理,还学不会算我输!!!
  • 原文地址:https://www.cnblogs.com/hseagle/p/14173712.html
Copyright © 2011-2022 走看看