zoukankan      html  css  js  c++  java
  • Spark StructType 的应用:用于处理 MongoDB 中的 key-value 数据

    最近在处理 MongoDB 数据时,遇到了一种非常奇怪的格式:账号密码的日志全部追加在同一条记录里,要统计每个密码的有效时长以及所有密码的平均时长非常繁琐。

     尝试了各种迭代计算,都非常困难,而且用 printSchema 打印出来的结构也不规范。

    和同事一起研究后,改用 StructType 手动定义 schema(把 passwords 声明为 Map 类型,以便使用 explode 和 map_values),效率非常高。

    代码如下:

    import java.sql.{DriverManager, ResultSet}
    
    import mongoDb_foundation_data20180913.url
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.SparkSession
    /**
     * Spark job that reads an exported device file (JSON lines) with an explicit schema,
     * computes per-`uuid` password statistics and writes them to MySQL table
     * `saas.res_count_avg` (overwriting it).
     *
     * Usage:
     * {{{
     * spark-submit --driver-class-path /usr/local/jdk/lib/mysql-connector-java-5.1.46.jar \
     *   --class "devicests_20180916" /testdata/u3.jar [inputPath]
     * }}}
     */
    object devicests_20180916 {

      // NOTE(review): user name and password are embedded in the JDBC URL; consider moving
      // them to external configuration.
      val url = "jdbc:mysql://192.168.1.10:3306/saas?user=saas&password=saas2018"
      //val url = "jdbc:mysql://134.175.180.116:3306/saas?user=saas&password=saas2018"

      /**
       * JDBC connection to [[url]].
       *
       * Lazy so that loading this object (or running [[main]], which does not use it) no longer
       * opens a MySQL connection that is never closed. The Spark JDBC writer in `main` opens
       * and manages its own connections.
       */
      lazy val conn: java.sql.Connection = DriverManager.getConnection(url)

      /** Input file read when no path is passed as the first command-line argument. */
      private val DefaultInputPath = "file:///mysqldata/aasdata/2018-09-12/devices_2018-09-12"

      /**
       * Explicit schema for the device records.
       *
       * `states.passwords` is declared as a `Map[String, struct]` (keys presumably password
       * ids — confirm). Spark's schema inference treats `passwords` as a struct, which is
       * awkward to use with `explode` and `map_values`, so the type is defined by hand.
       */
      private val devicesSchema: StructType = {
        val versions = new StructType()
          .add("app_version", StringType)
          .add("hardware_version", StringType)
          .add("zigbee_version", StringType)

        val hardwareInfo = new StructType()
          .add("cid", StringType)
          .add("mac", StringType)
          .add("sn", StringType)
          .add("versions", versions)

        val permission = new StructType()
          .add("begin", StringType)
          .add("end", StringType)
          .add("status", StringType)

        val password = new StructType()
          .add("description", StringType)
          .add("id", StringType)
          .add("is_default", StringType)
          .add("name", StringType)
          .add("permission", permission)
          .add("status", IntegerType)
          .add("time", StringType)

        val states = new StructType()
          .add("onoff_line", StringType)
          .add("passwords", MapType(StringType, password))
          .add("power", StringType)

        new StructType()
          .add("__v", StringType)
          .add("_id", new StructType().add("$oid", StringType))
          .add("device_type", StringType)
          .add("hardware_info", hardwareInfo)
          .add("model_id", StringType)
          .add("name", StringType)
          .add("nickname", StringType)
          .add("parent", StringType)
          .add("services", ArrayType(StringType))
          .add("states", states)
          .add("status", IntegerType)
          .add("time", StringType)
          .add("uuid", StringType)
      }

      /**
       * Per-uuid statistics over the `devices` view.
       *
       * One row is produced per password (map value) for each device. Only passwords whose
       * permission has both `begin` and `end` are kept. The query then counts them and averages
       * `end - begin`.
       *
       * NOTE(review): `begin`/`end` are declared as StringType, so the subtraction relies on
       * Spark's implicit string-to-numeric cast; presumably they hold numeric timestamps — confirm.
       */
      private val statsQuery: String =
        """
          |SELECT uuid,
          |    COUNT(passwords.permission) AS count,
          |    AVG(passwords.permission.end - passwords.permission.begin) AS avg
          |FROM
          |    (
          |      SELECT uuid,explode(map_values(states.passwords)) AS passwords
          |      FROM devices
          |    )
          |WHERE
          |    passwords.permission.begin IS NOT NULL
          |    AND passwords.permission.end IS NOT NULL
          |GROUP BY uuid""".stripMargin

      /**
       * Entry point.
       *
       * @param args optional `args(0)`: input path; defaults to [[DefaultInputPath]].
       */
      def main(args: Array[String]): Unit = {
        val inputPath = args.headOption.getOrElse(DefaultInputPath)

        // Build the session directly instead of creating a separate, unused SparkContext first.
        // That way the options below are part of the configuration the context is created with.
        val spark = SparkSession.builder()
          .master("local")
          .appName("appName")
          .config("spark.debug.maxToStringFields", "200")
          .getOrCreate()

        try {
          // Selects database `saas` in Spark's catalog (not MySQL); presumably it must exist.
          spark.sql("use saas")

          spark.read.schema(devicesSchema)
            .json(inputPath)
            .createOrReplaceTempView("devices")

          val res = spark.sql(statsQuery)

          // The user name and password are part of `url`, so no extra connection properties are passed.
          res.write.mode("overwrite").jdbc(url, "saas.res_count_avg", new java.util.Properties)
        } finally {
          spark.stop()
        }
      }
    }
  • 相关阅读:
    【电子书】企业级IT运维宝典之GoldenGate实战下载
    10.Oracle Golden Date(ogg)的搭建和管理(转载)
    VMware Workstation 15 Pro 永久激活密钥
    oracle undo表空间增大不释放
    Oracle11g-BBED安装
    alter system/session set events相关知识
    DG环境的日常巡检
    nginx ----http强制跳转https
    转载:Zabbix-(五)监控Docker容器与自定义jvm监控项
    ORACLE备份保留策略(RETENTION POLICY)
  • 原文地址:https://www.cnblogs.com/canyangfeixue/p/9699886.html
Copyright © 2011-2022 走看看