zoukankan      html  css  js  c++  java
  • sparksql系列(七) Json转Map,多文件生成

            公司所有产品均是json数据上报给数仓使用,由于格式的不统一造成数据处理很麻烦,经过讨论将公共字段抽取出来,将业务线自己的字段放在 extends字段里面各个业务线的人自己写sql解析extends字段处理。里面涉及到一个json转map的知识点再此记录一下。        

    一:JSON转Map

    为什需要将JSON转Map

            公司里面产品很多,上报的数据很多,格式极其不规范同名的事情是常有的,对于解析来说是非常困难的,需要统一的脚本把字段解析出来。

            上报的数据类似:{"id":"7","sex":"7","data":{"sex":"13","class":"7"}}

    jar包导入

            我们使用fastjson来将json处理成Map的数据结构

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

    数据

            {"id":"7","sex":"7","da","data":{"name":"7","class":"7","data":{"name":"7","class":"7"}}}
            {"id":"8","name":"8","data":{"sex":"8","class":"8"},"data":{"sex":"8","class":"8"}}
            {"class":"9","data":{"name":"9","sex":"9"}}
            {"id":"10","name":"10","data":{"sex":"10","class":"10"}}
            {"id":"11","class":"11","data":{"name":"11","sex":"11"}}

    代码

            import org.apache.spark.sql.SparkSession
            import com.alibaba.fastjson.JSON
            import java.util

            //我们把例子中的id单独提取出来,将其余字段保留到extends里面
            val sparkSession= SparkSession.builder().master("local").getOrCreate()
            val nameRDD1df = sparkSession.read.textFile("/software/java/idea/data")

            import sparkSession.implicits._
            import org.apache.spark.sql.functions.col
            val finalResult = nameRDD1df.map(x=>{
                    var map:util.HashMap[String, Object] = new util.HashMap[String, Object]()
                    try{
                            map = JSON.parseObject(x, classOf[util.HashMap[String, Object]])
                    }catch {case e :Exception =>{ println(e.printStackTrace())}}

                    var finalMap:util.HashMap[String, Object] = if(map.containsKey("data")){

                            var dataMap:util.HashMap[String, Object] = new util.HashMap[String, Object]()
                            try{
                                    dataMap = JSON.parseObject(map.get("data").toString, classOf[util.HashMap[String, Object]])
                            }catch {case e :Exception =>{ println(e.printStackTrace())}}
                            dataMap.putAll(map);dataMap.remove("id");dataMap.remove("data");
                            dataMap
                    }else {new util.HashMap[String, Object]()}
                    val id = if(map.get("id") == null) "" else map.get("id").toString
                    (id,JSON.toJSONString(finalMap,false))
            })
            .toDF("id","extends")
            .filter(col("id") =!= "")

            finalResult.show(10,false)

    二:多文件生成

            很多时候我们使用sparksql,就是读取一个目录生成一个目录,但是真正使用的时候,会有读取多个目录生成多个目录的需求(数据里面有ID字段的作为区分),这次使用了这个,记录下来。其实本质是partitionBy

     sparksql--->>>partitionBy

        import org.apache.spark.sql.SparkSession
        val sparkSession= SparkSession.builder().master("local").getOrCreate()
    val nameRDD1df = sparkSession.read.json("/software/java/idea/data")
    .select("id","name")
    .write.mode(SaveMode.Append).partitionBy("id")
    .json("/software/java/idea/end")

     spark-core--->>>自定义函数

        import org.apache.spark.sql.SparkSession
    import org.apache.hadoop.fs.{FileSystem, Path}

    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    val sparkContext = sparkSession.sparkContext
    val fileSystem = FileSystem.get(sparkContext.hadoopConfiguration)
    fileSystem.delete(new Path("/software/java/idea/end"), true)

    sparkContext.textFile("/software/java/idea/data").map(x=>{
    val array = x.split("\|")
    ((array(0)+"="+array(1)),array(2))
    }).saveAsHadoopFile("/software/java/idea/end",classOf[String],classOf[String],classOf[RDDMultipleTextOutputFormat[_, _]])
        import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
    class RDDMultipleTextOutputFormat[K, V]() extends MultipleTextOutputFormat[K, V]() {
    override def generateFileNameForKeyValue(key: K, value: V, name: String) : String = {
    (key + "/" + name)
    }
    }
  • 相关阅读:
    I NEED A OFFER!
    水题 Codeforces Round #303 (Div. 2) A. Toy Cars
    模拟 HDOJ 5099 Comparison of Android versions
    模拟 HDOJ 5095 Linearization of the kernel functions in SVM
    贪心 HDOJ 5090 Game with Pearls
    Kruskal HDOJ 1863 畅通工程
    Kruskal HDOJ 1233 还是畅通工程
    并查集 HDOJ 1232 畅通工程
    DFS/并查集 Codeforces Round #286 (Div. 2) B
    水题 Codeforces Round #286 (Div. 2) A Mr. Kitayuta's Gift
  • 原文地址:https://www.cnblogs.com/wuxiaolong4/p/12590473.html
Copyright © 2011-2022 走看看