  • SparkSQL series (10): Hive map nesting struct, struct nesting array, array nesting struct

Hive's simple data types behave just like primitive types and pose no difficulty. But Hive also has complex types such as struct, map, and array, which are trickier to work with. Below is a brief walkthrough of handling Hive's complex types with Spark.

The struct type

Readers who know C will find this familiar: any moderately complex C type is built out of structs. A struct can contain primitive types as well as complex types, and it is one of the more commonly used Hive data types.

Hive table DDL

    drop table appopendetail;
    create table if not exists appopendetail
    (
    username String,
    appname String,
    opencount INT
    )
    row format delimited fields terminated by '|'
    location '/hive/table/appopendetail';

    drop table appopentable;
    create table if not exists appopentable
    (
    username String,
    appopen struct<appname:String,opencount:INT>
    )
    row format delimited fields terminated by '|'
    COLLECTION ITEMS TERMINATED BY ':'
    location '/hive/table/appopen';

Generating a struct in SQL

    insert into appopentable
    select username,named_struct('appname',appname,'opencount',opencount) as appopen from appopendetail;

Parsing a struct in SQL

    select appopen.appname as appname,appopen.opencount as opencount from appopentable ;

Generating a struct in Spark

    import java.util
    import org.apache.spark.api.java.JavaSparkContext
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    val sparkSession= SparkSession.builder().master("local").appName("AppName").enableHiveSupport().getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)
    val nameRDD = javasc.parallelize(util.Arrays.asList("{'username':'wangwu','opencount':'18','appname':'weixin'}",
    "{'username':'sunliu','opencount':'19','appname':'qq'}","{'username':'zhangsan','opencount':'20','appname':'brower'}"));
    val namedf = sparkSession.read.json(nameRDD)
    namedf.select(col("username"),concat_ws(":",col("appname"),col("opencount")))
    .write.option("sep","|").csv("/software/java/data/mm")

In short: the two fields are concatenated into one column, joined with ':'.

SparkSQL does have a built-in struct function, but because the Hive table's files are ultimately plain text, SparkSQL cannot save a struct column in a text format.
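As a minimal sketch of that built-in function (reusing the sparkSession, imports, and namedf defined above; the output path is hypothetical), functions.struct builds a real StructType column, which a nested-type-friendly format such as Parquet can persist:

// struct() yields a genuine StructType column; Parquet, unlike delimited
// text, can store it as-is.
namedf
.select(col("username"), struct(col("appname"), col("opencount")) as "appopen")
.write.mode("overwrite")
.parquet("/software/java/data/appopen_struct_parquet")  // hypothetical path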

Parsing a struct in Spark

1. spark.sql("select appopen.appname as appname,appopen.opencount as opencount from appopentable")
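The same lookup can be written with the DataFrame API (a sketch, assuming the table above exists); dot notation reaches into struct fields:

sparkSession.table("appopentable")
.select(col("appopen.appname") as "appname", col("appopen.opencount") as "opencount")
.show()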

The map type

Essentially it is much the same as the struct type.

Hive table DDL

    drop table appopendetail;
    create table if not exists appopendetail
    (
    username String,
    appname String,
    opencount INT
    )
    row format delimited fields terminated by '|'
    location '/hive/table/appopendetail';

    drop table appopentable;
    create table if not exists appopentable
    (
    username String,
    appopen map<String,INT>
    )
    row format delimited fields terminated by '|'
    COLLECTION ITEMS TERMINATED BY ':'
    MAP KEYS TERMINATED BY ' '
    location '/hive/table/appopen';

Generating a map in SQL

    insert into appopentable select username,map('appname',appname,'opencount',opencount) as appopen from appopendetail;

Parsing a map in SQL

    select appopen["appname"],appopen["opencount"] from appopentable ;

Generating a map in Spark

    import java.util
    import org.apache.spark.api.java.JavaSparkContext
    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.sql.functions._
    val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)
    val nameRDD = javasc.parallelize(util.Arrays.asList("{'username':'wangwu','opencount':'18','appname':'weixin'}",
    "{'username':'sunliu','opencount':'19','appname':'qq'}","{'username':'zhangsan','opencount':'20','appname':'brower'}"));
    val namedf = sparkSession.read.json(nameRDD).select(col("appname"),lit("appname") as "appnamekey",col("opencount"),lit("opencount") as "opencountkey" ,col("username"))
// Map entries are joined with ':' (the collection delimiter) and each
// key/value pair with ' ' (the map key delimiter), matching the DDL above.
namedf.select(col("username"),concat_ws(":",
concat_ws(" ",col("appnamekey"),col("appname")),
concat_ws(" ",col("opencountkey"),col("opencount"))
))
.write.mode(SaveMode.Overwrite).option("sep","|").csv("/software/java/data/mm")

SparkSQL also has a built-in map function, but because the Hive table's files are ultimately plain text, SparkSQL cannot save a map column in a text format.
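A sketch of that built-in function (reusing the namedf above; the Parquet path is hypothetical): functions.map takes alternating key and value columns and yields a real MapType column, which Parquet can store:

// map(key1, value1, key2, value2, ...) builds a MapType column.
namedf
.select(col("username"),
map(lit("appname"), col("appname"), lit("opencount"), col("opencount")) as "appopen")
.write.mode("overwrite")
.parquet("/software/java/data/appopen_map_parquet")  // hypothetical path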

Parsing a map in Spark

1. spark.sql("""select appopen["appname"],appopen["opencount"] from appopentable""")
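The DataFrame-API equivalent (a sketch, assuming a Hive-enabled session as in the struct example); getItem looks a key up in a map column:

sparkSession.table("appopentable")
.select(col("appopen").getItem("appname"), col("appopen").getItem("opencount"))
.show()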

The array type

Hive table DDL

    drop table appopendetail;
    create table if not exists appopendetail
    (
    username String,
    appname String,
    opencount INT
    )
    row format delimited fields terminated by '|'
    location '/hive/table/appopendetail';

    drop table appopentable;
    create table if not exists appopentable
    (
    username String,
appopen array<String>
    )
    row format delimited fields terminated by '|'
    COLLECTION ITEMS TERMINATED BY ':'
    MAP KEYS TERMINATED BY ' '
    location '/hive/table/appopen';

Generating an array in SQL

    insert into appopentable select username,collect_list(appname) as appopen from appopendetail group by username;

Parsing an array in SQL

    select appopen[0] from appopentable ;

Generating an array in Spark

Method 1:

    import java.util
    import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.sql.functions._
    val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)
    val nameRDD = javasc.parallelize(util.Arrays.asList("{'username':'wangwu','opencount':'18','appname':'weixin'}",
    "{'username':'wangwu','opencount':'19','appname':'qq'}","{'username':'wangwu','opencount':'20','appname':'brower'}"));
val namedf = sparkSession.read.json(nameRDD).select("username","appname")
.groupBy("username")
.agg(collect_list("appname") as "appname")
// concat_ws on an array column joins its elements with ':'
.select(col("username"),concat_ws(":",col("appname")))
namedf.repartition(1).write.mode(SaveMode.Overwrite).option("sep","|").csv("/software/java/data/mm")

In short: the collected values are concatenated, joined with ':'.
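Note that collect_list already yields a real ArrayType column; a sketch (reusing the nameRDD above; the output path is hypothetical) that keeps the array intact by writing Parquet instead of text:

sparkSession.read.json(nameRDD)
.groupBy("username")
.agg(collect_list("appname") as "appname")
.write.mode("overwrite")
.parquet("/software/java/data/appopen_array_parquet")  // hypothetical path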

Parsing an array in Spark

1. spark.sql("select appopen[0] from appopentable")
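Or with the DataFrame API (a sketch); getItem(0) takes the element at index 0 of an array column:

sparkSession.table("appopentable")
.select(col("appopen").getItem(0) as "first_app")
.show()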

struct combined with map / array

Hive table DDL

    drop table appopendetail;
    create table if not exists appopendetail
    (
    username String,
    appname String,
    opencount INT
    )
    row format delimited fields terminated by '|'
    location '/hive/table/appopendetail';

    create table if not exists appopentablestruct_map
    (
    struct_map struct<appname:String,opencount:map<String,String>>
    )
    row format delimited fields terminated by '|'
    COLLECTION ITEMS TERMINATED BY ':'
    MAP KEYS TERMINATED BY ' '
    location '/hive/table/appopentablestruct_map';

    create table if not exists appopentablestruct_array
    (
    struct_array struct<appname:String,opencount:array<INT>>
    )
    row format delimited fields terminated by '|'
    COLLECTION ITEMS TERMINATED BY ':'
    MAP KEYS TERMINATED BY ' '
    location '/hive/table/appopentablestruct_array';

Generating nested structs in SQL

    insert into appopentablestruct_map select named_struct('appname',appname,'opencount',map('appname',appname,'opencount',opencount)) as struct_map from appopendetail;
insert into appopentablestruct_array select named_struct('appname',appname,'opencount',collect_list(opencount)) as struct_array from appopendetail group by appname;

Parsing nested structs in SQL

    select struct_map.appname,struct_map.opencount,struct_map.opencount["appname"],struct_map.opencount["opencount"] from appopentablestruct_map;
    select struct_array.appname,struct_array.opencount,struct_array.opencount[0] from appopentablestruct_array;

Generating nested structs in Spark

Method 1:

    import java.util
    import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.sql.functions._
    val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)
    val nameRDD = javasc.parallelize(util.Arrays.asList("{'username':'wangwu','opencount':'18','appname':'weixin'}",
    "{'username':'wangwu','opencount':'19','appname':'qq'}","{'username':'wangwu','opencount':'20','appname':'brower'}"));
    val namedf = sparkSession.read.json(nameRDD)
// Struct fields are joined with ':'; inside the nested map, entries are
// joined with ' ' and each key/value pair with '\u0004' (the next delimiter level).
namedf.select(col("appname"),col("username"),lit("appname") as "appnamekey",
col("opencount"),lit("opencount") as "opencountkey")
.select(concat_ws(":",col("appname"),
concat_ws(" ",
concat_ws("\u0004",col("appnamekey"),col("appname")),
concat_ws("\u0004",col("opencountkey"),col("opencount"))
)))
.repartition(1).write.mode(SaveMode.Overwrite).option("sep","|").csv("/software/java/data/appopentablestruct_map")

    namedf.select(col("appname"),col("username"),lit("appname") as "appnamekey",
    col("opencount"),lit("opencount") as "opencountkey")
    .groupBy("appname")
    .agg(concat_ws(":",col("appname"),concat_ws(" ",collect_list("opencount") as "opencount")) as "opencount" )
    .select("opencount")
    .repartition(1).write.mode(SaveMode.Overwrite).option("sep","|").csv("/software/java/data/appopentablestruct_array")

Again the idea is string concatenation, but each nesting level is joined with its own delimiter (':', then ' ', then '\u0004').
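If the target format supports nested types, the same shape can be built directly by composing struct and map instead of concatenating strings (a minimal sketch reusing the namedf above; the Parquet path is hypothetical):

namedf
.select(struct(col("appname"),
map(lit("appname"), col("appname"), lit("opencount"), col("opencount")) as "opencount") as "struct_map")
.write.mode("overwrite")
.parquet("/software/java/data/struct_map_parquet")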

Parsing nested structs in Spark

1. spark.sql("""select struct_map.appname,struct_map.opencount,struct_map.opencount["appname"],struct_map.opencount["opencount"] from appopentablestruct_map""")

2. spark.sql("select struct_array.appname,struct_array.opencount,struct_array.opencount[0] from appopentablestruct_array")
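The DataFrame-API equivalents (a sketch): dot notation reaches the struct field, then getItem resolves the map key or array index:

sparkSession.table("appopentablestruct_map")
.select(col("struct_map.appname"), col("struct_map.opencount").getItem("appname"))
.show()
sparkSession.table("appopentablestruct_array")
.select(col("struct_array.appname"), col("struct_array.opencount").getItem(0))
.show()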

map combined with struct / array

Hive table DDL

    drop table appopendetail;
    create table if not exists appopendetail
    (
    username String,
    appname String,
    opencount INT
    )
    row format delimited fields terminated by '|'
    location '/hive/table/appopendetail';

    drop table appopentablemap_struct;
    drop table appopentablestruct_array;
    create table if not exists appopentablemap_struct
    (
    map_struct map<String,struct<appname:String,opencount:INT>>
    )
    row format delimited fields terminated by '|'
    COLLECTION ITEMS TERMINATED BY ':'
    MAP KEYS TERMINATED BY ' '
    location '/hive/table/appopentablemap_struct';

    create table if not exists appopentablestruct_array
    (
    map_array map<String,array<INT>>
    )
    row format delimited fields terminated by '|'
    COLLECTION ITEMS TERMINATED BY ':'
    MAP KEYS TERMINATED BY ' '
    location '/hive/table/appopentablestruct_array';

Generating nested maps in SQL

insert into appopentablemap_struct
select map('map_struct',map_struct) from (select named_struct('appname',appname,'opencount',opencount) as map_struct from appopendetail)t;

    insert into appopentablestruct_array
    select map('map_array',appopen) from (select username,collect_list(opencount) as appopen from appopendetail group by username)t;

Parsing nested maps in SQL

    select map_struct["map_struct"].appname,map_struct["map_struct"].opencount from appopentablemap_struct;
    select map_array["map_array"][0],map_array["map_array"][1] from appopentablestruct_array;

Generating nested maps in Spark

Method 1:

    import java.util
    import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.sql.functions._
    val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)
    val nameRDD = javasc.parallelize(util.Arrays.asList("{'username':'wangwu','opencount':'18','appname':'weixin'}",
    "{'username':'wangwu','opencount':'19','appname':'qq'}","{'username':'wangwu','opencount':'20','appname':'brower'}"));
    val namedf = sparkSession.read.json(nameRDD)
// The map entry's key and value are joined with ' ' (the map key delimiter);
// the nested struct's fields are joined with '\u0004' (the next delimiter level).
namedf.select(col("appname"),col("username"),lit("appname") as "appnamekey",
col("opencount"),lit("opencount") as "opencountkey")
.select(concat_ws(" ",col("appnamekey"),
concat_ws("\u0004",col("appname"),col("opencount")))
)
.repartition(1).write.mode(SaveMode.Overwrite).option("sep","|").csv("/software/java/data/appopentablemap_struct")

namedf
.select(col("appname"),col("username"),lit("appname") as "appnamekey",
col("opencount"),lit("opencount") as "opencountkey")
.groupBy("appname")
// The key is joined to the collected array with ' '; array elements with '\u0004'.
.agg(concat_ws(" ",col("appname"),concat_ws("\u0004",collect_list("opencount") as "opencount")) as "opencount" )
.select("opencount")
.repartition(1).write.mode(SaveMode.Overwrite).option("sep","|").csv("/software/java/data/appopentablestruct_array")

Again, each nesting level is joined with its own delimiter (' ', then '\u0004').
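As before, when the output format allows nested types, map and struct compose directly (a sketch reusing the namedf above; the path is hypothetical):

namedf
.select(map(lit("map_struct"), struct(col("appname"), col("opencount"))) as "map_struct")
.write.mode("overwrite")
.parquet("/software/java/data/map_struct_parquet")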

Parsing nested maps in Spark

1. spark.sql("""select map_struct["map_struct"].appname,map_struct["map_struct"].opencount from appopentablemap_struct""")

2. spark.sql("""select map_array["map_array"][0],map_array["map_array"][1] from appopentablestruct_array""")
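DataFrame-API equivalents (a sketch): getItem resolves the map key first, then getField / getItem digs into the nested struct or array:

sparkSession.table("appopentablemap_struct")
.select(col("map_struct").getItem("map_struct").getField("appname"))
.show()
sparkSession.table("appopentablestruct_array")
.select(col("map_array").getItem("map_array").getItem(0))
.show()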

array combined with struct / map

Hive table DDL

    drop table appopendetail;
    create table if not exists appopendetail
    (
    username String,
    appname String,
    opencount INT
    )
    row format delimited fields terminated by '|'
    location '/hive/table/appopendetail';

    drop table appopentablearray_map;
    create table if not exists appopentablearray_map
    (
    array_map array<map<String,String>>
    )
    row format delimited fields terminated by '|'
    COLLECTION ITEMS TERMINATED BY ':'
    MAP KEYS TERMINATED BY ' '
    location '/hive/table/appopentablearray_map';
    drop table appopentablearray_struct;
    create table if not exists appopentablearray_struct
    (
    array_struct array<struct<appname:String,opencount:String>>
    )
    row format delimited fields terminated by '|'
    COLLECTION ITEMS TERMINATED BY ':'
    MAP KEYS TERMINATED BY ' '
    location '/hive/table/appopentablearray_struct';

Generating nested arrays in SQL

    insert into appopentablearray_map
    select collect_list(openmap) as openmap from
    (select appname,map('appname',appname,'opencount',opencount) as openmap from appopendetail)t;

    insert into appopentablearray_struct
    select collect_list(openmap) as openmap from
    (select appname,named_struct('appname',appname,'opencount',cast(opencount as string)) as openmap from appopendetail)t;

Parsing nested arrays in SQL

    select array_map[0]["appname"],array_map[0]["opencount"] from appopentablearray_map;
    select array_struct[0].appname,array_struct[0].opencount from appopentablearray_struct;

Generating nested arrays in Spark

Method 1:

    import java.util
    import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.sql.functions._
    val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)
    val nameRDD = javasc.parallelize(util.Arrays.asList("{'username':'wangwu','opencount':'18','appname':'weixin'}",
    "{'username':'wangwu','opencount':'19','appname':'qq'}","{'username':'wangwu','opencount':'20','appname':'brower'}"));
    val namedf = sparkSession.read.json(nameRDD)
// array_map: array elements are joined with ':'; within each map, entries are
// joined with ' ' and each key/value pair with '\u0004'.
namedf.select(col("appname"),col("username"),lit("appname") as "appnamekey",
col("opencount"),lit("opencount") as "opencountkey")
.agg(concat_ws(":",collect_list(
concat_ws(" ",
concat_ws("\u0004",col("appnamekey"),col("appname")),
concat_ws("\u0004",col("opencountkey"),col("opencount"))
)
)) as "openmap")
.select("openmap")
.repartition(1).write.mode(SaveMode.Overwrite).option("sep","|").csv("/software/java/data/appopentablearray_map")

// array_struct: struct elements are joined with ':'; each struct's fields with ' '.
namedf.select(col("appname"),col("username"),lit("appname") as "appnamekey",
col("opencount"),lit("opencount") as "opencountkey")
.agg(concat_ws(":",collect_list(
concat_ws(" ",col("appname"),col("opencount"))
)) as "openmap")
.select("openmap")
.repartition(1).write.mode(SaveMode.Overwrite).option("sep","|").csv("/software/java/data/appopentablearray_struct")

Parsing nested arrays in Spark

1. spark.sql("""select array_map[0]["appname"],array_map[0]["opencount"] from appopentablearray_map""")

2. spark.sql("select array_struct[0].appname,array_struct[0].opencount from appopentablearray_struct")
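DataFrame-API equivalents (a sketch): getItem(0) selects the element, then getItem / getField resolves the inner map key or struct field:

sparkSession.table("appopentablearray_map")
.select(col("array_map").getItem(0).getItem("appname"))
.show()
sparkSession.table("appopentablearray_struct")
.select(col("array_struct").getItem(0).getField("appname"))
.show()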
