zoukankan      html  css  js  c++  java
  • scala对复杂json的处理

    本次代码主要侧重于用Flink stream流解析canal-json,经过多次实验,发现还是阿里的fastjson较为好用,故在此做记录

    将依赖引入

    <!-- 1.2.83 is the final fastjson 1.x release and contains the autoType
         deserialization security fixes that 1.2.33 lacks. -->
    <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
    </dependency>

    案例数据,json数据:
    {
        "data": [
            {
                "name": "张三", 
                "age": "22", 
                "xb": ""
            }
        ], 
        "database": "test", 
        "es": 1623029629000, 
        "id": 1, 
        "isDdl": false, 
        "mysqlType": {
            "name": "varchar(500)", 
            "age": "int(11)", 
            "xb": "varchar(20)"
        }, 
        "old": null, 
        "pkNames": null, 
        "sql": "", 
        "sqlType": {
            "name": 12, 
            "age": 4, 
            "xb": 12
        }, 
        "table": "mysql_result1", 
        "ts": 1623029681690, 
        "type": "INSERT"
    }
    案例:本次将通过Flink stream读取kafka中的json数据,然后过滤出插入(INSERT)的数据,并且拿到数据,代码如下
    package it.bigdata.flink.study.test
    
    import java.util.Properties
    
    import com.alibaba.fastjson.JSON
    import it.bigdata.flink.study.test.entity.MysqlResult
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    
    
    /**
      * Flink streaming job that reads canal-json change events from the Kafka
      * topic `test_mysql_result1`, keeps the INSERT events and prints every
      * inserted row as a `MysqlResult`.
      */
    object SteamBinLog {

      /** Copies the `name`, `age` and `xb` fields of one `data` row into a `MysqlResult`. */
      private def toMysqlResult(row: com.alibaba.fastjson.JSONObject): MysqlResult = {
        val my = new MysqlResult()
        my.setName(row.getString("name"))
        my.setAge(row.getIntValue("age"))
        my.setXb(row.getString("xb"))
        my
      }

      /**
        * Turns one raw Kafka message into the rows it inserted.
        *
        * Yields nothing when fastjson parses the message to null, when `type` is
        * missing or not `INSERT`, and when there is no `data` array, so one odd
        * message does not fail the job with a NullPointerException. Every element
        * of `data` is emitted, not only the first one. Malformed JSON still raises
        * fastjson's parse exception.
        *
        * @param raw the message value read from Kafka
        * @return the inserted rows, possibly empty
        */
      private def insertedRows(raw: String): Seq[MysqlResult] = {
        val rows = for {
          event <- Option(JSON.parseObject(raw))
          // Constant-first, null-safe comparison of the event type.
          if "INSERT" == event.getString("type")
          data <- Option(event.getJSONArray("data"))
        } yield (0 until data.size).flatMap(i => Option(data.getJSONObject(i))).map(toMysqlResult)
        rows.getOrElse(Vector.empty)
      }

      def main(args: Array[String]): Unit = {
        // Create the execution environment.
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // Read the raw messages from Kafka, starting at the earliest offset.
        val props = new Properties()
        props.setProperty("bootstrap.servers","10.18.35.155:9092,10.18.35.156:9092,10.18.35.157:9092")
        //    props.setProperty("group.id","consumer-group")
        val consumer = new FlinkKafkaConsumer[String]("test_mysql_result1", new SimpleStringSchema(), props)
        consumer.setStartFromEarliest()
        val stream3 = env.addSource(consumer)

        // Parse, filter and convert in one step; the explicit parameter type
        // selects the `T => TraversableOnce[R]` overload of flatMap.
        val stream4 = stream3.flatMap((raw: String) => insertedRows(raw))

        stream4.print()

        env.execute("stream binlog")
      }

    }

    下边是json的相关操作,转载的原文内容:https://segmentfault.com/a/1190000039415392

    1.将数据转为json

    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._
    import org.json4s.jackson.Serialization._
    import org.json4s.jackson.Serialization
    // Target type of the json4s (de)serialization examples.
    case class WOE(col: String, woe: Map[String, String])
    // json4s needs an implicit Formats in scope for write/extract.
    implicit val formats: org.json4s.Formats = Serialization.formats(org.json4s.NoTypeHints)
    // Filled as a mutable map, then frozen with toMap because WOE takes the default immutable Map.
    val testMap = scala.collection.mutable.Map[String, String]()
    testMap += ("1" -> "1.1")
    val a = WOE("1", testMap.toMap)
    println(write(a)) 

    输出{"col":"1","woe":{"1":"1.1"}}

    2、解析json

    // json4s looks up an implicit Formats for extract and write.
    implicit val formats = Serialization.formats(NoTypeHints)
    // JSON text to decode.
    val js =
    """
    {"col":"1","woe":{"1":"1.1"}}
    """
    // Parse the text and extract it into the WOE case class.
    val ab = parse(js).extract[WOE]
    // Write it back out as JSON to show the round trip.
    println(write(ab)) 

    如果是List也可以

    implicit val formats: org.json4s.Formats = Serialization.formats(org.json4s.NoTypeHints)

    // Serialize a list of WOE values. Collection types are fully qualified so the
    // snippet does not depend on imports that are not shown.
    val b = new scala.collection.mutable.ListBuffer[WOE]
    val testMap = scala.collection.mutable.Map[String, String]()
    testMap += ("1" -> "1.1")
    b += WOE("1", testMap.toMap)
    b += WOE("3", testMap.toMap)
    println(write(b))

    // Parse a JSON array back into a List[WOE].
    val js =
          """
     [{"col":"1","woe":{"1":"1.1"}},{"col":"3","woe":{"1":"1.1"}}]
          """
    val ab = parse(js).extract[List[WOE]]
    println(ab.toString) 

    1、scala自带的Json解析
    scala 2.10(以上,其他版本不清楚)自带Json解析,scala.util.parsing.json.JSON
    object转json

    // JSONObject takes an immutable Map, so the entries are collected mutably and frozen with toMap.
    val testMap = scala.collection.mutable.Map[String, String]()
    testMap += ("1" -> "2.034")
    testMap += ("2" -> "2.0134")
    println(scala.util.parsing.json.JSONObject(testMap.toMap)) 

    但好像只能处理map,且map要转成immutable

    2、fastjson

    解析json

    import com.alibaba.fastjson.JSON
    /** Minimal fastjson example: parse a JSON object and print two of its fields. */
    object JsonDemo {
      def main(args: Array[String]): Unit = {
        // Triple quotes let the JSON keep its own double quotes without escaping.
        val text = """{"name":"name1", "age":55}"""
        val json = JSON.parseObject(text)
        println(json.get("name"))
        println(json.get("age"))
      }
    } 

    再例如

    import com.alibaba.fastjson.JSON
    
    /** fastjson example: untyped access, typed getters and nested-object access. */
    object Json {
      def main(args: Array[String]): Unit = {
        // Triple quotes let the JSON keep its own double quotes without escaping.
        val str2 = """{"et":"kanqiu_client_join","vtm":1435898329434,"body":{"client":"866963024862254","client_type":"android","room":"NBA_HOME","gid":"","type":"","roomid":""},"time":1435898329}"""
        val json = JSON.parseObject(str2)
        // Untyped member access.
        val fet = json.get("et")
        // String member.
        val etString = json.getString("et")
        // Integral member: 1435898329434 does not fit in an Int, so it is read as a Long.
        val vtm = json.getLong("vtm")
        println(vtm)
        // Nested member.
        val client = json.getJSONObject("body").get("client")
        println(client)
      }
    }

    在Spark Streaming中,使用fastjson更加稳定,json-lib经常出现莫名问题,而且fastjson的解析速度更快.

    object转json,首先必须要显式地指定参数类型,否则会报错

    ambiguous reference to overloaded definition

    例如:

    // Build a two-entry Scala Map.
    val testMap = Map[String, String]()
    testMap += ("1" -> "2.034")
    testMap += ("2" -> "2.0134")
    // The Scala Map is passed to fastjson unchanged; the second argument is `true`.
    // NOTE(review): fastjson presumably serializes the Scala Map as a plain bean rather than
    // as a java.util.Map, which would explain the property-style output shown below — confirm.
    val a = JSON.toJSONString(testMap, true)
    println(a) 

    不会报错,但是输出结果是奇怪的

    {
        "empty":false,
        "sizeMapDefined":false,
        "traversableAgain":true
    } 

    3、json4s

    object转json

    // Build a two-entry map.
    val testMap = Map[String, String]()
    testMap += ("1" -> "2.034")
    testMap += ("2" -> "2.0134")
    // Render the map to a json4s AST and print it in compact form.
    val jj = compact(render(testMap))
    println(jj) 

    输出

    [{"2":"2.0134"},{"1":"2.034"}]

    如果都是String,复杂的Map结构也可以解析

    // Outer map whose values are themselves String-to-String maps.
    val testMap = Map[String, Map[String, String]]()
    val subMap = Map[String, String]()
    subMap += ("1" -> "1.1")
    testMap += ("1" -> subMap)
    // Serialize the nested structure with json4s Serialization.write.
    println(write(testMap)) 

    输出{"1":{"1":"1.1"}}
    但这样的形式不利于解析

    再例如

    implicit val formats = Serialization.formats(NoTypeHints)
    // Values of mixed types (String, Int, Boolean, List of Maps).
    val m = Map(
          "name" -> "john doe",
          "age" -> 18,
          "hasChild" -> true,
          "childs" -> List(
            Map("name" -> "dorothy", "age" -> 5, "hasChild" -> false),
            Map("name" -> "bill", "age" -> 8, "hasChild" -> false)))
    // Map nested inside a map.
    val mm = Map(
          "1" -> Map ("1"->"1.2")
        )
    // Serialize the two maps built in this snippet.
    println(write(m))
    println(write(mm)) 

    TEST

    package com.dfssi.dataplatform.analysis.exhaust.alarm
    
    import java.sql.Timestamp
    import java.util
    
    import com.alibaba.fastjson.serializer.SerializerFeature
    import org.apache.spark.Logging
    import org.json4s.NoTypeHints
    
    /**
      * The data that is going to be parsed.
      *
      * `failureList` defaults to a fresh empty `java.util.ArrayList` for every
      * instance that does not supply one.
      * NOTE(review): `lon` / `lat` are presumably longitude / latitude and
      * `collectTime` an epoch timestamp — confirm against the producer.
      */
    case class NeedEntity(vin: String,
                          downoutput: Double,
                          collectTime: Long,
                          lon: Double,
                          lat: Double,
                          failureList: java.util.List[Integer] = new util.ArrayList[Integer]()
                         ) extends Serializable
    
    /**
      * State management: mutable state of one event, handled per event.
      *
      * The start fields are given at construction; the end fields and the
      * min/max values are refreshed by `updateByEntity`.
      */
    class OverLimitEvent(var vin: String,
                         var startTime: Long,
                         var startLon: Double,
                         var startLat: Double,
                         var eventType: String = "overlimit",
                         var endTime: Long = 0,
                         var endLon: Double = 0.0,
                         var endLat: Double = 0.0,
                         var minValue: Double = 0.0,
                         var maxValue: Double = 0.0
                        ) extends Serializable with Logging {

      /** Key/value pairs describing the start of the event (vin, start time and start position). */
      def getInsertMap(): Map[String, Any] = {
        Map(
          "vin" -> vin,
          "startTime" -> new Timestamp(startTime),
          "startLon" -> startLon,
          "startLat" -> startLat
        )
      }

      /**
        * Key/value pairs describing the current end of the event.
        *
        * The `end*` keys carry the `end*` fields, not a copy of the start values.
        */
      def getUpdateMap(): Map[String, Any] = {
        Map(
          "vin" -> vin,
          "startTime" -> new Timestamp(startTime),
          "endTime" -> new Timestamp(endTime),
          "endLon" -> endLon,
          "endLat" -> endLat,
          "maxValue" -> maxValue,
          "minValue" -> minValue
        )
      }

      /**
        * Moves the end of the event to `entity` and widens the min/max range
        * with `entity.downoutput`.
        *
        * NOTE(review): `minValue` / `maxValue` default to 0.0, so an event that
        * only ever sees positive readings keeps `minValue == 0.0` — confirm this
        * is intended.
        */
      def updateByEntity(entity: NeedEntity): Unit = {
        this.endTime = entity.collectTime
        this.endLat = entity.lat
        this.endLon = entity.lon
        // A Double can never be null, so only the range comparison is needed.
        if (this.maxValue < entity.downoutput) {
          this.maxValue = entity.downoutput
        }
        if (this.minValue > entity.downoutput) {
          this.minValue = entity.downoutput
        }
      }

      /** JSON rendering of this event produced by json4s. */
      override def toString(): String = {
        import org.json4s.jackson.Serialization._
        import org.json4s.jackson.Serialization
        implicit val formats = Serialization.formats(NoTypeHints)
        write(this)
      }

    }
    
    /** Factory methods for `OverLimitEvent`. */
    object OverLimitEvent {
      val ID_FIELD = Array("vin", "startTime")

      /** Builds an event with every start/end/min/max field given; `eventType` keeps its default. */
      def apply(
                 vin: String,
                 startTime: Long,
                 startLon: Double,
                 startLat: Double,
                 endTime: Long,
                 endLon: Double,
                 endLat: Double,
                 minValue: Double,
                 maxValue: Double
               ): OverLimitEvent =
        new OverLimitEvent(
          vin,
          startTime,
          startLon,
          startLat,
          endTime = endTime,
          endLon = endLon,
          endLat = endLat,
          minValue = minValue,
          maxValue = maxValue
        )

      /** Starts a new event at the entity's collect time and position. */
      def buildByEntity(entity: NeedEntity): OverLimitEvent = {
        val startedAt = entity.collectTime
        new OverLimitEvent(entity.vin, startedAt, entity.lon, entity.lat)
      }

      /** Deserializes an event from JSON text with fastjson. */
      def buildByJson(json: String): OverLimitEvent = {
        val target = classOf[OverLimitEvent]
        com.alibaba.fastjson.JSON.parseObject(json, target)
      }

      /** JSON rendering of this companion object produced by json4s. */
      override def toString(): String = {
        import org.json4s.jackson.Serialization
        Serialization.write(this)(Serialization.formats(NoTypeHints))
      }

    }
    
    /**
      * Alarm state record: a `vin`, the current over-limit event, a map of fault
      * events and a `lastTime` value. Both event fields default to null.
      * NOTE(review): the key of `faultEvent` is not documented in this file — confirm.
      */
    case class ExhaustAlarmStatus(vin: String,
                                  var overLimitEvent: OverLimitEvent = null,
                                  var faultEvent: Map[String, OverLimitEvent] = null,
                                  var lastTime: Long) {

      /** JSON rendering of this status produced by json4s. */
      override def toString(): String = {
        import org.json4s.jackson.Serialization
        Serialization.write(this)(Serialization.formats(NoTypeHints))
      }
    }
    
    /** JSON helpers and a small demo for `ExhaustAlarmStatus`. */
    object ExhaustAlarmStatus {

      /**
        * Deserializes a status from JSON text with fastjson.
        *
        * @param json JSON text, may be null
        * @return the parsed status, or null when `json` is null
        */
      def buildByJson(json: String): ExhaustAlarmStatus = {
        if (json != null) {
          com.alibaba.fastjson.JSON.parseObject(json, classOf[ExhaustAlarmStatus])
        } else {
          null
        }
      }

      /** Serializes a status to pretty-printed JSON with fastjson. */
      def toJSON(state: ExhaustAlarmStatus): String = com.alibaba.fastjson.JSON.toJSONString(state, SerializerFeature.PrettyFormat)

      /** Demo: parse a sample status with fastjson, then print it again through json4s. */
      def main(args: Array[String]): Unit = {
        // Triple quotes let the JSON keep its own double quotes without escaping.
        val json = """{"vin":"222", "OverLimitEvent":{ "vin":"222",  "startTime":123456789, "startLon":1.0, "startLat":1.0, "endTime":123456789, "endLon":1.0, "endLat":1.0, "minValue":1.0, "maxValue":1.0 },"lastTime":1556441242000}"""
        val state = buildByJson(json)
        println(state.overLimitEvent)
        import org.json4s.jackson.Serialization._
        import org.json4s.jackson.Serialization
        implicit val formats = Serialization.formats(NoTypeHints)
        val jsonstr = write(state)
        println(jsonstr)
      }

    }
     
    author@nohert
  • 相关阅读:
    向cmd中添加字体的方法
    学生成绩管理系统C++
    立即抢注我的免费1T云空间
    js原型
    step3 . day8数据结构之算法
    代码练习中的bug及修改方法
    step3 . day7数据结构之二叉顺序数的创建和二叉树的栈形式遍历
    step3 . day6数据结构之非线性表 满二叉树和不完全二叉树
    step3 . day5 数据结构之线性表 栈和队的应用-球钟问题
    step3 . day4 数据结构之线性表 栈和队
  • 原文地址:https://www.cnblogs.com/gzgBlog/p/14859689.html
Copyright © 2011-2022 走看看