zoukankan      html  css  js  c++  java
  • sparksql系列(七) Json转Map,多文件生成

            公司所有产品均是json数据上报给数仓使用,由于格式的不统一造成数据处理很麻烦,经过讨论将公共字段抽取出来,将业务线自己的字段放在 extends字段里面各个业务线的人自己写sql解析extends字段处理。里面涉及到一个json转map的知识点再此记录一下。        

    一:JSON转Map

    为什需要将JSON转Map

            公司里面产品很多,上报的数据很多,格式极其不规范同名的事情是常有的,对于解析来说是非常困难的,需要统一的脚本把字段解析出来。

            上报的数据类似:{"id":"7","sex":"7","data":{"sex":"13","class":"7"}}

    jar包导入

            我们使用fastjson来将json处理成Map的数据结构

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

    数据

            {"id":"7","sex":"7","da","data":{"name":"7","class":"7","data":{"name":"7","class":"7"}}}
            {"id":"8","name":"8","data":{"sex":"8","class":"8"},"data":{"sex":"8","class":"8"}}
            {"class":"9","data":{"name":"9","sex":"9"}}
            {"id":"10","name":"10","data":{"sex":"10","class":"10"}}
            {"id":"11","class":"11","data":{"name":"11","sex":"11"}}

    代码

            import org.apache.spark.sql.SparkSession
            import com.alibaba.fastjson.JSON
            import java.util

            //我们把例子中的id单独提取出来,将其余字段保留到extends里面
            val sparkSession= SparkSession.builder().master("local").getOrCreate()
            val nameRDD1df = sparkSession.read.textFile("/software/java/idea/data")

            import sparkSession.implicits._
            import org.apache.spark.sql.functions.col
            val finalResult = nameRDD1df.map(x=>{
                    var map:util.HashMap[String, Object] = new util.HashMap[String, Object]()
                    try{
                            map = JSON.parseObject(x, classOf[util.HashMap[String, Object]])
                    }catch {case e :Exception =>{ println(e.printStackTrace())}}

                    var finalMap:util.HashMap[String, Object] = if(map.containsKey("data")){

                            var dataMap:util.HashMap[String, Object] = new util.HashMap[String, Object]()
                            try{
                                    dataMap = JSON.parseObject(map.get("data").toString, classOf[util.HashMap[String, Object]])
                            }catch {case e :Exception =>{ println(e.printStackTrace())}}
                            dataMap.putAll(map);dataMap.remove("id");dataMap.remove("data");
                            dataMap
                    }else {new util.HashMap[String, Object]()}
                    val id = if(map.get("id") == null) "" else map.get("id").toString
                    (id,JSON.toJSONString(finalMap,false))
            })
            .toDF("id","extends")
            .filter(col("id") =!= "")

            finalResult.show(10,false)

    二:多文件生成

            很多时候我们使用sparksql,就是读取一个目录生成一个目录,但是真正使用的时候,会有读取多个目录生成多个目录的需求(数据里面有ID字段的作为区分),这次使用了这个,记录下来。其实本质是partitionBy

     sparksql--->>>partitionBy

        import org.apache.spark.sql.SparkSession
        val sparkSession= SparkSession.builder().master("local").getOrCreate()
    val nameRDD1df = sparkSession.read.json("/software/java/idea/data")
    .select("id","name")
    .write.mode(SaveMode.Append).partitionBy("id")
    .json("/software/java/idea/end")

     spark-core--->>>自定义函数

        import org.apache.spark.sql.SparkSession
    import org.apache.hadoop.fs.{FileSystem, Path}

    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    val sparkContext = sparkSession.sparkContext
    val fileSystem = FileSystem.get(sparkContext.hadoopConfiguration)
    fileSystem.delete(new Path("/software/java/idea/end"), true)

    sparkContext.textFile("/software/java/idea/data").map(x=>{
    val array = x.split("\|")
    ((array(0)+"="+array(1)),array(2))
    }).saveAsHadoopFile("/software/java/idea/end",classOf[String],classOf[String],classOf[RDDMultipleTextOutputFormat[_, _]])
        import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
    class RDDMultipleTextOutputFormat[K, V]() extends MultipleTextOutputFormat[K, V]() {
    override def generateFileNameForKeyValue(key: K, value: V, name: String) : String = {
    (key + "/" + name)
    }
    }
  • 相关阅读:
    特征工程
    TensorFlow学习之路1-TensorFlow介绍
    深度学习中数据的augmentation
    求解矩阵特征值及特征向量
    Faster R-CNN
    python的浅拷贝和深拷贝
    AirSim的搭建和使用
    C++11 binary Tree
    win10 开启ubuntu
    c++ priority_queue
  • 原文地址:https://www.cnblogs.com/wuxiaolong4/p/12590473.html
Copyright © 2011-2022 走看看