zoukankan      html  css  js  c++  java
  • Spark SQL 处理嵌套 JSON 数据

    一、数据准备:

    {
    "dc_id": "dc-101",
    "source": {
        "sensor-igauge": {
          "id": 10,
          "ip": "68.28.91.22",
          "description": "Sensor attached to the container ceilings",
          "temp":35,
          "c02_level": 1475,
          "geo": {"lat":38.00, "long":97.00}                        
        },
        "sensor-ipad": {
          "id": 13,
          "ip": "67.185.72.1",
          "description": "Sensor ipad attached to carbon cylinders",
          "temp": 34,
          "c02_level": 1370,
          "geo": {"lat":47.41, "long":-122.00}
        },
        "sensor-inest": {
          "id": 8,
          "ip": "208.109.163.218",
          "description": "Sensor attached to the factory ceilings",
          "temp": 40,
          "c02_level": 1346,
          "geo": {"lat":33.61, "long":-111.89}
        },
        "sensor-istick": {
          "id": 5,
          "ip": "204.116.105.67",
          "description": "Sensor embedded in exhaust pipes in the ceilings",
          "temp": 40,
          "c02_level": 1574,
          "geo": {"lat":35.93, "long":-85.46}
        }
      }
    }

    代码示例:

    package spark.project_1
    
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import org.apache.spark.{SparkConf, sql}
    
    /**
      * Flat, one-row-per-device view of a sensor reading.
      *
      * Author Mr. Guo
      * Create 2018/10/19 - 14:36
      *
      * @param dcId       data-center identifier (taken from the "dc_id" column)
      * @param deviceType device name (the key of the exploded "source" map)
      * @param ip         the sensor's "ip" field
      * @param deviceId   the sensor's "id" field
      * @param temp       the sensor's "temp" field
      * @param c02_level  the sensor's "c02_level" field
      * @param lat        the "lat" field of the sensor's "geo" struct
      * @param lon        the "long" field of the sensor's "geo" struct
      */
    case class DeviceAlert(
      dcId: String,
      deviceType: String,
      ip: String,
      deviceId: Long,
      temp: Long,
      c02_level: Long,
      lat: Double,
      lon: Double
    )
    
    /**
      * Example 1: parses a nested JSON document whose "source" field maps a device
      * name to a sensor record, then flattens it into one [[DeviceAlert]] row per device.
      */
    object dispose_json {

      /**
        * Entry point: builds a local two-thread SparkSession, runs the example and
        * always stops the session afterwards.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        // SparkSession.builder() is the documented factory; no need to instantiate Builder by hand.
        val ssc = sql.SparkSession
          .builder()
          .config(conf)
          .master("local[2]")
          .appName("dispose_json")
          .getOrCreate()

        // Stop the session even when the job throws, so the local context is released.
        try {
          ssc.sparkContext.setLogLevel("error")
          println("--------------------------------------------------------------------")
          run(ssc)
        } finally {
          ssc.stop()
        }
      }

      /**
        * Reads the embedded JSON sample with an explicit schema, explodes the "source"
        * map into (key, value) rows and projects them onto [[DeviceAlert]].
        *
        * @param ssc the active session, used for reading and for its implicit conversions
        */
      private def run(ssc: sql.SparkSession): Unit = {
        // Import the implicit conversions (toDS, the $"col" interpolator, encoders).
        import ssc.implicits._
        val dataDS1 = Seq(
          """
            |{
            |"dc_id": "dc-101",
            |"source": {
            |    "sensor-igauge": {
            |      "id": 10,
            |      "ip": "68.28.91.22",
            |      "description": "Sensor attached to the container ceilings",
            |      "temp":35,
            |      "c02_level": 1475,
            |      "geo": {"lat":38.00, "long":97.00}
            |    },
            |    "sensor-ipad": {
            |      "id": 13,
            |      "ip": "67.185.72.1",
            |      "description": "Sensor ipad attached to carbon cylinders",
            |      "temp": 34,
            |      "c02_level": 1370,
            |      "geo": {"lat":47.41, "long":-122.00}
            |    },
            |    "sensor-inest": {
            |      "id": 8,
            |      "ip": "208.109.163.218",
            |      "description": "Sensor attached to the factory ceilings",
            |      "temp": 40,
            |      "c02_level": 1346,
            |      "geo": {"lat":33.61, "long":-111.89}
            |    },
            |    "sensor-istick": {
            |      "id": 5,
            |      "ip": "204.116.105.67",
            |      "description": "Sensor embedded in exhaust pipes in the ceilings",
            |      "temp": 40,
            |      "c02_level": 1574,
            |      "geo": {"lat":35.93, "long":-85.46}
            |    }
            |  }
            |}
          """.stripMargin).toDS()

        // Define the schema: "source" is a map from device name to a sensor struct.
        val sensorSchema = new StructType()
          .add("description", StringType)
          .add("ip", StringType)
          .add("id", LongType)
          .add("temp", LongType)
          .add("c02_level", LongType)
          .add("geo",
            new StructType()
              .add("lat", DoubleType)
              .add("long", DoubleType)
          )
        val schema1 = new StructType()
          .add("dc_id", StringType)
          .add("source", MapType(StringType, sensorSchema))

        // NOTE(review): on Spark 2.2+ `json(dataDS1)` accepts the Dataset directly and the
        // RDD overload is deprecated; the RDD form is kept because the Spark version in use
        // is not visible from this snippet.
        val df1 = ssc.read.schema(schema1).json(dataDS1.rdd)
        df1.printSchema()
        df1.show(false)
        println("=======================================")

        // Exploding the map column produces the "key" and "value" columns selected below.
        val explodeDF = df1.select($"dc_id", explode($"source"))
        explodeDF.printSchema()
        explodeDF.show(10, false)
        println("=======================================")

        // Rename/flatten the columns so they line up with the DeviceAlert fields.
        val notifydevicesDS = explodeDF.select(
          $"dc_id".as("dcId"),
          $"key".as("deviceType"),
          $"value".getItem("ip").as("ip"),
          $"value".getItem("id").as("deviceId"),
          $"value".getItem("c02_level").as("c02_level"),
          $"value".getItem("temp").as("temp"),
          $"value".getItem("geo").getItem("lat").as("lat"),
          $"value".getItem("geo").getItem("long").as("lon")
        ).as[DeviceAlert]
        notifydevicesDS.printSchema()
        notifydevicesDS.show(20, false)
      }
    }
    

    二、数据准备:

    {
      "devices": {
         "thermostats": {
            "peyiJNo0IldT2YlIVtYaGQ": {
              "device_id": "peyiJNo0IldT2YlIVtYaGQ",
              "locale": "en-US",
              "software_version": "4.0",
              "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
              "where_name": "Hallway Upstairs",
              "last_connection": "2016-10-31T23:59:59.000Z",
              "is_online": true,
              "can_cool": true,
              "can_heat": true,
              "is_using_emergency_heat": true,
              "has_fan": true,
              "fan_timer_active": true,
              "fan_timer_timeout": "2016-10-31T23:59:59.000Z",
              "temperature_scale": "F",
              "target_temperature_f": 72,
              "target_temperature_high_f": 80,
              "target_temperature_low_f": 65,
              "eco_temperature_high_f": 80,
              "eco_temperature_low_f": 65,
              "away_temperature_high_f": 80,
              "away_temperature_low_f": 65,
              "hvac_mode": "heat",
              "humidity": 40,
              "hvac_state": "heating",
              "is_locked": true,
              "locked_temp_min_f": 65,
              "locked_temp_max_f": 80
              }
            },
            "smoke_co_alarms": {
              "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs": {
                "device_id": "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs",
                "locale": "en-US",
                "software_version": "1.01",
                "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
                "where_name": "Jane's Room",
                "last_connection": "2016-10-31T23:59:59.000Z",
                "is_online": true,
                "battery_health": "ok",
                "co_alarm_state": "ok",
                "smoke_alarm_state": "ok",
                "is_manual_test_active": true,
                "last_manual_test_time": "2016-10-31T23:59:59.000Z",
                "ui_color_state": "gray"
                }
              },
           "cameras": {
            "awJo6rH0IldT2YlIVtYaGQ": {
              "device_id": "awJo6rH",
              "software_version": "4.0",
              "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
              "where_name": "Foyer",
              "is_online": true,
              "is_streaming": true,
              "is_audio_input_enabled": true,
              "last_is_online_change": "2016-12-29T18:42:00.000Z",
              "is_video_history_enabled": true,
              "web_url": "https://home.nest.com/cameras/device_id?auth=access_token",
              "app_url": "nestmobile://cameras/device_id?auth=access_token",
              "is_public_share_enabled": true,
              "activity_zones": { "name": "Walkway", "id": 244083 },
              "last_event": "2016-10-31T23:59:59.000Z"
              }
            }
          }
         }

    代码示例:

    package spark.project_1
    
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import org.apache.spark.{SparkConf, sql}
    
    /**
      * Flat record with one device's identifiers, readings and coordinates.
      * It is declared here but not referenced by the `dispose_json` object that follows.
      *
      * Author Mr. Guo
      * Create 2018/10/19 - 14:36
      *
      * @param dcId       data-center identifier
      * @param deviceType device type / name
      * @param ip         device IP address
      * @param deviceId   numeric device id
      * @param temp       temperature reading
      * @param c02_level  c02 level reading
      * @param lat        latitude
      * @param lon        longitude
      */
    case class DeviceAlert(
      dcId: String,
      deviceType: String,
      ip: String,
      deviceId: Long,
      temp: Long,
      c02_level: Long,
      lat: Double,
      lon: Double
    )
    
    /**
      * Example 2: parses a JSON document with three device maps under "devices"
      * (thermostats, smoke_co_alarms, cameras), explodes each map, projects a few
      * fields per device kind and joins thermostats with cameras on software version.
      */
    object dispose_json {

      /**
        * Entry point: builds a local two-thread SparkSession, runs the example and
        * always stops the session afterwards.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        // SparkSession.builder() is the documented factory; no need to instantiate Builder by hand.
        val ssc = sql.SparkSession
          .builder()
          .config(conf)
          .master("local[2]")
          .appName("dispose_json")
          .getOrCreate()

        // Stop the session even when the job throws, so the local context is released.
        try {
          ssc.sparkContext.setLogLevel("error")
          println("--------------------------------------------------------------------")
          run(ssc)
        } finally {
          ssc.stop()
        }
      }

      /**
        * Reads the embedded JSON sample with an explicit schema and derives the
        * per-device-kind DataFrames; prints the camera rows and the thermostat/camera join.
        *
        * @param ssc the active session, used for reading and for its implicit conversions
        */
      private def run(ssc: sql.SparkSession): Unit = {
        // Import the implicit conversions (toDS and the $"col" interpolator).
        import ssc.implicits._
        val dataDS2 = Seq(
          """
            |{
            |  "devices": {
            |     "thermostats": {
            |        "peyiJNo0IldT2YlIVtYaGQ": {
            |          "device_id": "peyiJNo0IldT2YlIVtYaGQ",
            |          "locale": "en-US",
            |          "software_version": "4.0",
            |          "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
            |          "where_name": "Hallway Upstairs",
            |          "last_connection": "2016-10-31T23:59:59.000Z",
            |          "is_online": true,
            |          "can_cool": true,
            |          "can_heat": true,
            |          "is_using_emergency_heat": true,
            |          "has_fan": true,
            |          "fan_timer_active": true,
            |          "fan_timer_timeout": "2016-10-31T23:59:59.000Z",
            |          "temperature_scale": "F",
            |          "target_temperature_f": 72,
            |          "target_temperature_high_f": 80,
            |          "target_temperature_low_f": 65,
            |          "eco_temperature_high_f": 80,
            |          "eco_temperature_low_f": 65,
            |          "away_temperature_high_f": 80,
            |          "away_temperature_low_f": 65,
            |          "hvac_mode": "heat",
            |          "humidity": 40,
            |          "hvac_state": "heating",
            |          "is_locked": true,
            |          "locked_temp_min_f": 65,
            |          "locked_temp_max_f": 80
            |          }
            |        },
            |        "smoke_co_alarms": {
            |          "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs": {
            |            "device_id": "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs",
            |            "locale": "en-US",
            |            "software_version": "1.01",
            |            "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
            |            "where_name": "Jane's Room",
            |            "last_connection": "2016-10-31T23:59:59.000Z",
            |            "is_online": true,
            |            "battery_health": "ok",
            |            "co_alarm_state": "ok",
            |            "smoke_alarm_state": "ok",
            |            "is_manual_test_active": true,
            |            "last_manual_test_time": "2016-10-31T23:59:59.000Z",
            |            "ui_color_state": "gray"
            |            }
            |          },
            |       "cameras": {
            |        "awJo6rH0IldT2YlIVtYaGQ": {
            |          "device_id": "awJo6rH",
            |          "software_version": "4.0",
            |          "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
            |          "where_name": "Foyer",
            |          "is_online": true,
            |          "is_streaming": true,
            |          "is_audio_input_enabled": true,
            |          "last_is_online_change": "2016-12-29T18:42:00.000Z",
            |          "is_video_history_enabled": true,
            |          "web_url": "https://home.nest.com/cameras/device_id?auth=access_token",
            |          "app_url": "nestmobile://cameras/device_id?auth=access_token",
            |          "is_public_share_enabled": true,
            |          "activity_zones": { "name": "Walkway", "id": 244083 },
            |          "last_event": "2016-10-31T23:59:59.000Z"
            |          }
            |        }
            |      }
            |     }
          """.stripMargin).toDS()

        // Each device kind is a map from device key to a struct with that kind's fields.
        val schema2 = new StructType()
          .add("devices",
            new StructType()
              .add("thermostats", MapType(StringType,
                new StructType()
                  .add("device_id", StringType)
                  .add("locale", StringType)
                  .add("software_version", StringType)
                  .add("structure_id", StringType)
                  .add("where_name", StringType)
                  .add("last_connection", StringType)
                  .add("is_online", BooleanType)
                  .add("can_cool", BooleanType)
                  .add("can_heat", BooleanType)
                  .add("is_using_emergency_heat", BooleanType)
                  .add("has_fan", BooleanType)
                  .add("fan_timer_active", BooleanType)
                  .add("fan_timer_timeout", StringType)
                  .add("temperature_scale", StringType)
                  .add("target_temperature_f", DoubleType)
                  .add("target_temperature_high_f", DoubleType)
                  .add("target_temperature_low_f", DoubleType)
                  .add("eco_temperature_high_f", DoubleType)
                  .add("eco_temperature_low_f", DoubleType)
                  .add("away_temperature_high_f", DoubleType)
                  .add("away_temperature_low_f", DoubleType)
                  .add("hvac_mode", StringType)
                  .add("humidity", DoubleType)
                  .add("hvac_state", StringType)
                  .add("is_locked", BooleanType)
                  .add("locked_temp_min_f", DoubleType)
                  .add("locked_temp_max_f", DoubleType)))
              .add("smoke_co_alarms", MapType(StringType,
                new StructType()
                  .add("device_id", StringType)
                  .add("locale", StringType)
                  .add("software_version", StringType)
                  .add("structure_id", StringType)
                  .add("where_name", StringType)
                  .add("last_connection", StringType)
                  .add("is_online", BooleanType)
                  .add("battery_health", StringType)
                  .add("co_alarm_state", StringType)
                  .add("smoke_alarm_state", StringType)
                  .add("is_manual_test_active", BooleanType)
                  .add("last_manual_test_time", StringType)
                  .add("ui_color_state", StringType)))
              .add("cameras", MapType(StringType,
                new StructType()
                  .add("device_id", StringType)
                  .add("software_version", StringType)
                  .add("structure_id", StringType)
                  .add("where_name", StringType)
                  .add("is_online", BooleanType)
                  .add("is_streaming", BooleanType)
                  .add("is_audio_input_enabled", BooleanType)
                  .add("last_is_online_change", StringType)
                  .add("is_video_history_enabled", BooleanType)
                  .add("web_url", StringType)
                  .add("app_url", StringType)
                  .add("is_public_share_enabled", BooleanType)
                  .add("activity_zones",
                    new StructType()
                      .add("name", StringType)
                      .add("id", LongType))
                  .add("last_event", StringType))))

        // NOTE(review): on Spark 2.2+ `json(dataDS2)` accepts the Dataset directly and the
        // RDD overload is deprecated; the RDD form is kept because the Spark version in use
        // is not visible from this snippet.
        val df2 = ssc.read.schema(schema2).json(dataDS2.rdd)

        // Whole row re-serialised as a single JSON string column; built for illustration,
        // never displayed by this example.
        val stringJsonDF = df2.select(to_json(struct($"*"))).toDF("nestDevice")

        // One map column per device kind.
        val mapColumnsDF = df2.select(
          $"devices".getItem("smoke_co_alarms").alias("smoke_alarms"),
          $"devices".getItem("cameras").alias("cameras"),
          $"devices".getItem("thermostats").alias("thermostats"))

        // Exploding a map column yields the "value" struct column projected below.
        val explodeThermostatsDF = mapColumnsDF.select(explode($"thermostats"))
        val explodeCamerasDF = mapColumnsDF.select(explode($"cameras"))
        // Read from mapColumnsDF like the two siblings above (same map as devices.smoke_co_alarms).
        val explodeSmokedAlarmsDF = mapColumnsDF.select(explode($"smoke_alarms"))
        explodeSmokedAlarmsDF.printSchema()

        val thermostateDF = explodeThermostatsDF.select(
          $"value".getItem("device_id").alias("device_id"),
          $"value".getItem("locale").alias("locale"),
          $"value".getItem("where_name").alias("location"),
          $"value".getItem("last_connection").alias("last_connection"),
          $"value".getItem("humidity").alias("humidity"),
          $"value".getItem("target_temperature_f").alias("target_temperature_f"),
          $"value".getItem("hvac_mode").alias("mode"),
          $"value".getItem("software_version").alias("version"))

        val cameraDF = explodeCamerasDF.select(
          $"value".getItem("device_id").alias("device_id"),
          $"value".getItem("where_name").alias("location"),
          $"value".getItem("software_version").alias("version"),
          $"value".getItem("activity_zones").getItem("name").alias("name"),
          $"value".getItem("activity_zones").getItem("id").alias("id"))

        // Built for illustration; this example never displays it.
        val smokedAlarmsDF = explodeSmokedAlarmsDF.select(
          $"value".getItem("device_id").alias("device_id"),
          $"value".getItem("where_name").alias("location"),
          $"value".getItem("software_version").alias("version"),
          $"value".getItem("last_connection").alias("last_connected"),
          $"value".getItem("battery_health").alias("battery_health"))

        cameraDF.show()

        // Joined on "version"; note both sides also carry "device_id" and "location",
        // so the result contains those column names twice.
        val joineDFs = thermostateDF.join(cameraDF, "version")
        joineDFs.show(10, false)
      }
    }
    

      

  • 相关阅读:
    正则表达式
    centos搭建好了lamp,能访问.html文件,无法访问.php文件
    错误:rpmdb: BDB0113 Thread/process 15381/140029102753600 failed: BDB1507 Thread died in Berkeley DB library 错误:db5 错误(-30973) 来自 dbenv->failchk:BDB0087 DB_RUNRECOVERY: Fatal error, run dat
    wget和yum下载慢,更换阿里镜像源
    TypeError: Cannot read property 'getUserMedia' of undefined
    vscode使用SFTP同步代码到Windows上vmvare搭建的centos服务器上
    react-native-vector-icons的使用说明
    react-native安装和使用tabbar
    Super expression must either be null or a function, not undefined _inherits
    用链表排序,并删除指定数字
  • 原文地址:https://www.cnblogs.com/Gxiaobai/p/9818334.html
Copyright © 2011-2022 走看看