  • Spark SQL: flattening a DataFrame with nested JSON

    Reference: https://www.soinside.com/question/JjhZCytMUFpTNyk6W7ixZa

    (I could not find the real source; judging from the clumsy translation, it was probably lifted from Stack Overflow.)


    Flatten a DataFrame containing data like the following:

    val json_string = """{
                       "Total Value": 3,
                       "Topic": "Example",
                       "values": [
                                  {
                                    "value1": "#example1",
                                    "points": [
                                               [
                                               "123",
                                               "156"
                                              ]
                                        ],
                                    "properties": {
                                     "date": "12-04-19",
                                     "model": "Model example 1"
                                        }
                                     },
                                   {"value2": "#example2",
                                    "points": [
                                               [
                                               "124",
                                               "157"
                                              ]
                                        ],
                                    "properties": {
                                     "date": "12-05-19",
                                     "model": "Model example 2"
                                        }
                                     }
                                  ]
                           }"""

    The desired output is:

    +-----------+---------+----------+------------------+------------------+------------------------+-------------------------+
    |Total Value| Topic   | values 1 | values.points[0] | values.points[1] | values.properties.date | values.properties.model |
    +-----------+---------+----------+------------------+------------------+------------------------+-------------------------+
    | 3         | Example | example1 | 123              | 156              | 12-04-19               | Model example 1         |
    | 3         | Example | example2 | 124              | 157              | 12-05-19               | Model example 2         |
    +-----------+---------+----------+------------------+------------------+------------------------+-------------------------+

    Solution:

    Use Spark's built-in functions, in particular explode (found in the org.apache.spark.sql.functions package); a quick illustration of what explode does is shown below.
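
    A self-contained sketch of my own (not from the original post; it assumes a running SparkSession named spark, such as the one spark-shell provides) showing that explode turns each element of an array column into its own row:

    import org.apache.spark.sql.functions.explode
    import spark.implicits._   // imported automatically in spark-shell

    // one row whose "nums" column is an array of three elements
    val demo = Seq(("a", Seq(1, 2, 3))).toDF("id", "nums")

    // explode yields one output row per array element: (a,1), (a,2), (a,3)
    demo.select($"id", explode($"nums").as("num")).show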

    First, explode the first level:

    scala> import org.apache.spark.sql.functions.explode
    scala> val df = spark.read.json(Seq(json_string).toDS)
    scala> var dfd = df.select($"topic",$"total value",explode($"values").as("values"))

    Then select the desired columns from the second level:

    scala> dfd.select($"topic",$"total value",$"values.points".getItem(0)(0).as("point_0"),$"values.points".getItem(0)(1).as("point_1"),$"values.properties.date".as("_date"),$"values.properties.model".as("_model")).show
    +-------+-----------+-------+-------+--------+---------------+
    |  topic|total value|point_0|point_1|   _date|         _model|
    +-------+-----------+-------+-------+--------+---------------+
    |Example|          3|    123|    156|12-04-19|Model example 1|
    |Example|          3|    124|    157|12-05-19|Model example 2|
    +-------+-----------+-------+-------+--------+---------------+
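
    The desired output above also has a values 1 column holding value1/value2. One possible way to recover it (an untested sketch of mine, assuming the schema inferred above and reusing the same dfd) is coalesce, since each array element carries only one of the two fields:

    scala> import org.apache.spark.sql.functions.coalesce
    scala> dfd.select($"topic",$"total value",coalesce($"values.value1",$"values.value2").as("values_1"),$"values.points".getItem(0)(0).as("point_0"),$"values.points".getItem(0)(1).as("point_1"),$"values.properties.date".as("_date"),$"values.properties.model".as("_model")).show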

    A more detailed example:

    scala> val str = """{"avg_orders_count":[{"count":1.0,"days":3},{"count":0.6,"days":5},{"count":0.3,"days":10},{"count":0.2,"days":15},{"count":0.1,"days":30},{"count":0.066,"days":45},{"count":0.066,"days":60},{"count":0.053,"days":75},{"count":0.044,"days":90}],"m_hotel_id":"92500636"}"""
    str: String = {"avg_orders_count":[{"count":1.0,"days":3},{"count":0.6,"days":5},{"count":0.3,"days":10},{"count":0.2,"days":15},{"count":0.1,"days":30},{"count":0.066,"days":45},{"count":0.066,"days":60},{"count":0.053,"days":75},{"count":0.044,"days":90}],"m_hotel_id":"92500636"}
    
    scala> val rdd = sc.makeRDD(str::Nil)
    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at makeRDD at <console>:29
    
    
    scala> val df = spark.read.json(rdd)
    
    scala> df.show
    +--------------------+----------+
    |    avg_orders_count|m_hotel_id|
    +--------------------+----------+
    |[[1.0, 3], [0.6, ...|  92500636|
    +--------------------+----------+
    
    scala> df.select($"m_hotel_id",explode($"avg_orders_count")).show
    +----------+-----------+
    |m_hotel_id|        col|
    +----------+-----------+
    |  92500636|   [1.0, 3]|
    |  92500636|   [0.6, 5]|
    |  92500636|  [0.3, 10]|
    |  92500636|  [0.2, 15]|
    |  92500636|  [0.1, 30]|
    |  92500636|[0.066, 45]|
    |  92500636|[0.066, 60]|
    |  92500636|[0.053, 75]|
    |  92500636|[0.044, 90]|
    +----------+-----------+
    
    scala> val dfd = df.select($"m_hotel_id",explode($"avg_orders_count").as("ct"))

    scala> val dfs = dfd.select($"m_hotel_id",$"ct.count", $"ct.days")
    dfs: org.apache.spark.sql.DataFrame = [m_hotel_id: string, count: double ... 1 more field]
    
    scala> dfs.show
    +----------+-----+----+
    |m_hotel_id|count|days|
    +----------+-----+----+
    |  92500636|  1.0|   3|
    |  92500636|  0.6|   5|
    |  92500636|  0.3|  10|
    |  92500636|  0.2|  15|
    |  92500636|  0.1|  30|
    |  92500636|0.066|  45|
    |  92500636|0.066|  60|
    |  92500636|0.053|  75|
    |  92500636|0.044|  90|
    +----------+-----+----+
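
    When the schema has many levels of nested structs, listing every column by hand becomes tedious. Below is a generic, hedged sketch (my own helper, not part of the original answer) that recursively promotes struct fields to top-level columns named parent_field; array columns are left untouched and still need an explicit explode first:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.StructType

    // Repeatedly replaces every struct column with its fields (named parent_field)
    // until no struct columns remain; non-struct columns pass through unchanged.
    def flattenStructs(df: DataFrame): DataFrame = {
      if (!df.schema.fields.exists(_.dataType.isInstanceOf[StructType])) df
      else {
        val cols = df.schema.fields.flatMap { f =>
          f.dataType match {
            case st: StructType =>
              st.fieldNames.toSeq.map(c => col(s"`${f.name}`.`$c`").as(s"${f.name}_$c"))
            case _ => Seq(col(s"`${f.name}`"))
          }
        }
        flattenStructs(df.select(cols: _*))
      }
    }

    // Hypothetical usage, reusing the hotel DataFrame above:
    // flattenStructs(df.select($"m_hotel_id", explode($"avg_orders_count").as("ct"))).show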
  • Original post: https://www.cnblogs.com/144823836yj/p/14043732.html