zoukankan      html  css  js  c++  java
  • scala对复杂json的处理

    本次代码主要侧重于使用 Flink stream 解析 canal-json,经过多次实验,发现还是阿里的 fastjson 较为好用,故在此做记录

    将依赖引入

    <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.33</version>
    </dependency>

    案例数据,json数据:
    {
        "data": [
            {
                "name": "张三", 
                "age": "22", 
                "xb": ""
            }
        ], 
        "database": "test", 
        "es": 1623029629000, 
        "id": 1, 
        "isDdl": false, 
        "mysqlType": {
            "name": "varchar(500)", 
            "age": "int(11)", 
            "xb": "varchar(20)"
        }, 
        "old": null, 
        "pkNames": null, 
        "sql": "", 
        "sqlType": {
            "name": 12, 
            "age": 4, 
            "xb": 12
        }, 
        "table": "mysql_result1", 
        "ts": 1623029681690, 
        "type": "INSERT"
    }
    案例:本次将通过 Flink stream 读取 kafka 中的 json 数据,然后过滤出插入(INSERT)的数据,并且拿到数据,代码如下
    package it.bigdata.flink.study.test
    
    import java.util.Properties
    
    import com.alibaba.fastjson.JSON
    import it.bigdata.flink.study.test.entity.MysqlResult
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    
    
    /**
      * Flink streaming job: reads canal-json change records (as strings) from Kafka,
      * keeps the INSERT events and converts the first inserted row of each event
      * into a [[MysqlResult]] bean, which is then printed.
      */
    object SteamBinLog {
      def main(args: Array[String]): Unit = {
        // Create the execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // Read the raw records from Kafka, starting from the earliest offset
        val props = new Properties()
        props.setProperty("bootstrap.servers", "10.18.35.155:9092,10.18.35.156:9092,10.18.35.157:9092")
        //    props.setProperty("group.id","consumer-group")
        val consumer = new FlinkKafkaConsumer[String]("test_mysql_result1", new SimpleStringSchema(), props)
        consumer.setStartFromEarliest()
        val stream3 = env.addSource(consumer)

        val stream4 = stream3
          .map(data => JSON.parseObject(data))
          .filter { json =>
            // Constant-first comparison: a record without a "type" field is dropped
            // instead of throwing a NullPointerException. Records whose "data" array
            // is null or empty are dropped too, so the row access below cannot fail.
            json != null && "INSERT".equals(json.getString("type")) && {
              val rows = json.getJSONArray("data")
              rows != null && !rows.isEmpty
            }
          }
          .map { event =>
            // Read the row as a JSONObject directly; no toString + re-parse round trip.
            // NOTE(review): only the first row of "data" is used, as in the original.
            val row = event.getJSONArray("data").getJSONObject(0)
            val my = new MysqlResult()
            my.setName(row.getString("name"))
            my.setAge(row.getIntValue("age"))
            my.setXb(row.getString("xb"))
            my
          }

        stream4.print()

        env.execute("stream binlog")
      }
    }

    下边是报错json的相关操作,复制的原文内容:https://segmentfault.com/a/1190000039415392

    1.将数据转为json

    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._
    import org.json4s.jackson.Serialization._
    import org.json4s.jackson.Serialization
    // Local imports so the snippet resolves every name it uses.
    import org.json4s.NoTypeHints
    import scala.collection.mutable
    // A column name together with a string-to-string mapping.
    case class WOE(col: String, woe: Map[String, String])
    implicit val formats = Serialization.formats(NoTypeHints)
    // `+=` needs a mutable map; the default `Map` is immutable and would not compile here.
    val testMap = mutable.Map[String, String]()
    testMap += ("1" -> "1.1")
    // WOE.woe is an immutable Map, so convert before constructing the case class.
    val a = WOE("1", testMap.toMap)
    println(write(a))

    输出{"col":"1","woe":{"1":"1.1"}}

    2、解析json

    // Parse a JSON document into the WOE case class, then print it serialized again.
    implicit val formats = Serialization.formats(NoTypeHints)
    // Raw JSON text describing a single WOE record.
    val rawJson = """
    {"col":"1","woe":{"1":"1.1"}}
    """
    val parsedWoe = parse(rawJson).extract[WOE]
    val roundTripped = write(parsedWoe)
    println(roundTripped)

    如果是List也可以

    // Local imports so the snippet resolves every name it uses.
    import org.json4s.NoTypeHints
    import scala.collection.mutable
    implicit val formats = Serialization.formats(NoTypeHints)

    // Serialize a buffer of WOE records.
    val b = mutable.ListBuffer[WOE]()
    // `+=` needs a mutable map; the default `Map` is immutable.
    val testMap = mutable.Map[String, String]()
    testMap += ("1" -> "1.1")
    b += WOE("1", testMap.toMap)
    b += WOE("3", testMap.toMap)
    println(write(b))

    // Parse a JSON array back into a List[WOE].
    val js =
          """
     [{"col":"1","woe":{"1":"1.1"}},{"col":"3","woe":{"1":"1.1"}}]
          """
    val ab = parse(js).extract[List[WOE]]
    println(ab.toString)

    1、scala自带的Json解析
    scala 2.10(以上,其他版本不清楚)自带Json解析,scala.util.parsing.json.JSON
    object转json

    import scala.collection.mutable
    // `+=` needs a mutable map; the default `Map` is immutable.
    val testMap = mutable.Map[String, String]()
    testMap += ("1" -> "2.034")
    testMap += ("2" -> "2.0134")
    // JSONObject expects an immutable Map, hence the conversion.
    // NOTE(review): scala.util.parsing.json is deprecated in newer Scala releases — confirm availability.
    println(scala.util.parsing.json.JSONObject(testMap.toMap))

    但好像只能处理map,且map要转成immutable

    2、fastjson

    解析json

    import com.alibaba.fastjson.JSON
    /** Minimal fastjson example: parse a JSON object and print two of its members. */
    object JsonDemo {
      def main(args: Array[String]): Unit = {
        // Triple quotes keep the embedded double quotes from terminating the literal.
        val text = """{"name":"name1", "age":55}"""
        val json = JSON.parseObject(text)
        println(json.get("name"))
        println(json.get("age"))
      }
    }

    再例如

    import com.alibaba.fastjson.JSON
    
    /** fastjson example: typed accessors and nested-object access on a parsed JSON object. */
    object Json {
      def main(args: Array[String]): Unit = {
        // Triple quotes keep the embedded double quotes from terminating the literal.
        val str2 = """{"et":"kanqiu_client_join","vtm":1435898329434,"body":{"client":"866963024862254","client_type":"android","room":"NBA_HOME","gid":"","type":"","roomid":""},"time":1435898329}"""
        val json = JSON.parseObject(str2)
        // Get a member as a plain Object
        val fet = json.get("et")
        // Get a member as a String
        val etString = json.getString("et")
        // Get an integral member; "vtm" is a 13-digit value that does not fit in an Int,
        // so it is read as a Long.
        val vtm = json.getLong("vtm")
        println(vtm)
        // Get a nested member
        val client = json.getJSONObject("body").get("client")
        println(client)
      }
    }

    在spark-streaming中,使用fastjson更加稳定,json-lib经常出现莫名问题,而且fastjson的解析速度更快.

    object转json,首先必须要显式的定义参数,否则会报错

    ambiguous reference to overloaded definition
    1
    例如:

    import scala.collection.mutable
    // `+=` needs a mutable map; the default `Map` is immutable.
    val testMap = mutable.Map[String, String]()
    testMap += ("1" -> "2.034")
    testMap += ("2" -> "2.0134")
    // The second argument is passed explicitly so that a single toJSONString overload is selected.
    // The printed result contains bean-style properties of the Scala collection rather than
    // its entries (see the output shown after this snippet).
    val a = JSON.toJSONString(testMap, true)
    println(a)

    不会报错,但是输出结果是奇怪的

    {
        "empty":false,
        "sizeMapDefined":false,
        "traversableAgain":true
    } 

    3、json4s

    object转json

    import scala.collection.mutable
    // `+=` needs a mutable map; the default `Map` is immutable.
    val testMap = mutable.Map[String, String]()
    testMap += ("1" -> "2.034")
    testMap += ("2" -> "2.0134")
    // Render through the json4s DSL and print the compact form.
    val jj = compact(render(testMap))
    println(jj)

    输出

    [{"2":"2.0134"},{"1":"2.034"}]

    如果都是String,复杂的Map结构也可以解析

    import scala.collection.mutable
    // `+=` needs mutable maps; the default `Map` is immutable.
    val testMap = mutable.Map[String, mutable.Map[String, String]]()
    val subMap = mutable.Map[String, String]()
    subMap += ("1" -> "1.1")
    testMap += ("1" -> subMap)
    // `write` needs an implicit json4s Formats in scope.
    println(write(testMap))

    输出{"1":{"1":"1.1"}}
    但这样的形式不利于解析

    再例如

    import org.json4s.NoTypeHints
    implicit val formats = Serialization.formats(NoTypeHints)
    // A heterogeneous map: strings, numbers, booleans and a nested list of maps.
    val m = Map(
      "name" -> "john doe",
      "age" -> 18,
      "hasChild" -> true,
      "childs" -> List(
        Map("name" -> "dorothy", "age" -> 5, "hasChild" -> false),
        Map("name" -> "bill", "age" -> 8, "hasChild" -> false)))
    // A map nested inside a map.
    val mm = Map(
      "1" -> Map("1" -> "1.2")
    )
    // Serialize the maps built above (the original printed `a`, which this snippet never defines).
    println(write(m))
    println(write(mm))

    TEST

    package com.dfssi.dataplatform.analysis.exhaust.alarm
    
    import java.sql.Timestamp
    import java.util
    
    import com.alibaba.fastjson.serializer.SerializerFeature
    import org.apache.spark.Logging
    import org.json4s.NoTypeHints
    
    /**
      * The record that is going to be parsed.
      *
      * @param vin         identifier string of the record (presumably a vehicle id — confirm)
      * @param downoutput  measured value compared against an event's min/max
      * @param collectTime collection time as a Long
      * @param lon         longitude
      * @param lat         latitude
      * @param failureList list of integer codes; a fresh empty list when not supplied
      */
    case class NeedEntity(
        vin: String,
        downoutput: Double,
        collectTime: Long,
        lon: Double,
        lat: Double,
        failureList: java.util.List[Integer] = new util.ArrayList[Integer]()
    ) extends Serializable
    
    /**
      * State management: one instance holds the mutable state of a single event.
      *
      * @param vin       identifier string of the event
      * @param startTime start time; passed to `java.sql.Timestamp`, i.e. epoch milliseconds
      * @param startLon  longitude at the start of the event
      * @param startLat  latitude at the start of the event
      * @param eventType event type label, "overlimit" by default
      * @param endTime   end time, same unit as `startTime`; 0 until updated
      * @param endLon    longitude at the end of the event
      * @param endLat    latitude at the end of the event
      * @param minValue  smallest value recorded so far
      * @param maxValue  largest value recorded so far
      */
    class OverLimitEvent(var vin: String,
                         var startTime: Long,
                         var startLon: Double,
                         var startLat: Double,
                         var eventType: String = "overlimit",
                         var endTime: Long = 0,
                         var endLon: Double = 0.0,
                         var endLat: Double = 0.0,
                         var minValue: Double = 0.0,
                         var maxValue: Double = 0.0
                        ) extends Serializable with Logging {

      /** Column values used when the event is first inserted. */
      def getInsertMap(): Map[String, Any] = {
        Map(
          "vin" -> vin,
          "startTime" -> new Timestamp(startTime),
          "startLon" -> startLon,
          "startLat" -> startLat
        )
      }

      /**
        * Column values used when the event is updated. The end* keys carry the
        * end* fields (the original copied the start* fields into them).
        */
      def getUpdateMap(): Map[String, Any] = {
        Map(
          "vin" -> vin,
          "startTime" -> new Timestamp(startTime),
          "endTime" -> new Timestamp(endTime),
          "endLon" -> endLon,
          "endLat" -> endLat,
          "maxValue" -> maxValue,
          "minValue" -> minValue
        )
      }

      /** Moves the end of the event to `entity` and widens the min/max range with its value. */
      def updateByEntity(entity: NeedEntity): Unit = {
        this.endTime = entity.collectTime
        this.endLat = entity.lat
        this.endLon = entity.lon
        // Double fields can never be null, so the former `!= null` guards were dropped.
        if (this.maxValue < entity.downoutput) {
          this.maxValue = entity.downoutput
        }
        // NOTE(review): minValue starts at 0.0 by default, so it only moves for negative
        // readings — confirm whether it should be seeded from the first reading.
        if (this.minValue > entity.downoutput) {
          this.minValue = entity.downoutput
        }
      }

      /** Serializes this event with json4s. */
      override def toString(): String = {
        import org.json4s.jackson.Serialization._
        import org.json4s.jackson.Serialization
        implicit val formats = Serialization.formats(NoTypeHints)
        write(this)
      }

    }
    
    /** Factory methods and constants for [[OverLimitEvent]]. */
    object OverLimitEvent {
      // Names of the fields listed as the id of an event.
      val ID_FIELD = Array("vin", "startTime")

      /** Builds an event with every start/end/min/max field given; `eventType` keeps its default. */
      def apply(
                 vin: String,
                 startTime: Long,
                 startLon: Double,
                 startLat: Double,
                 endTime: Long,
                 endLon: Double,
                 endLat: Double,
                 minValue: Double,
                 maxValue: Double
               ): OverLimitEvent =
        new OverLimitEvent(
          vin,
          startTime,
          startLon,
          startLat,
          endTime = endTime,
          endLon = endLon,
          endLat = endLat,
          minValue = minValue,
          maxValue = maxValue
        )

      /** Starts a new event at the time and position of `entity`. */
      def buildByEntity(entity: NeedEntity): OverLimitEvent =
        new OverLimitEvent(entity.vin, entity.collectTime, entity.lon, entity.lat)

      /** Parses an event from a JSON string with fastjson. */
      def buildByJson(json: String): OverLimitEvent = {
        import com.alibaba.fastjson.JSON
        JSON.parseObject(json, classOf[OverLimitEvent])
      }

      /** Serializes this companion object with json4s. */
      override def toString(): String = {
        import org.json4s.jackson.Serialization
        implicit val formats = Serialization.formats(NoTypeHints)
        Serialization.write(this)
      }

    }
    
    /**
      * Alarm state kept per `vin`.
      *
      * @param vin            identifier string the state belongs to
      * @param overLimitEvent current over-limit event; null by default
      * @param faultEvent     events keyed by a string; null by default
      * @param lastTime       a Long timestamp; must always be supplied
      */
    case class ExhaustAlarmStatus(
        vin: String,
        var overLimitEvent: OverLimitEvent = null,
        var faultEvent: Map[String, OverLimitEvent] = null,
        var lastTime: Long
    ) {
      /** Serializes this state with json4s. */
      override def toString(): String = {
        import org.json4s.jackson.Serialization
        implicit val formats = Serialization.formats(NoTypeHints)
        Serialization.write(this)
      }
    }
    
    /** JSON helpers for [[ExhaustAlarmStatus]] plus a small manual round-trip check. */
    object ExhaustAlarmStatus {
      /** Parses a state from JSON with fastjson; returns null when `json` is null. */
      def buildByJson(json: String): ExhaustAlarmStatus =
        Option(json)
          .map(text => com.alibaba.fastjson.JSON.parseObject(text, classOf[ExhaustAlarmStatus]))
          .orNull

      /** Serializes a state with fastjson using pretty formatting. */
      def toJSON(state: ExhaustAlarmStatus): String = com.alibaba.fastjson.JSON.toJSONString(state, SerializerFeature.PrettyFormat)

      /** Parses a sample document with fastjson, then prints it re-serialized with json4s. */
      def main(args: Array[String]): Unit = {
        // Triple quotes keep the embedded double quotes from terminating the literal.
        val json = """{"vin":"222", "OverLimitEvent":{ "vin":"222",  "startTime":123456789, "startLon":1.0, "startLat":1.0, "endTime":123456789, "endLon":1.0, "endLat":1.0, "minValue":1.0, "maxValue":1.0 },"lastTime":1556441242000}"""
        val state = com.alibaba.fastjson.JSON.parseObject(json,
          classOf[ExhaustAlarmStatus])
        println(state.overLimitEvent)
        import org.json4s.jackson.Serialization._
        import org.json4s.jackson.Serialization
        implicit val formats = Serialization.formats(NoTypeHints)
        val jsonstr = write(state)
        println(jsonstr)
      }

    }
     
    author@nohert
  • 相关阅读:
    Leetcode NO.110 Balanced Binary Tree 平衡二叉树
    Leetcode NO.226 Invert Binary Tree 翻转二叉树
    Leetcode NO.215 Kth Largest Element In An Array 数组中的第K个最大元素
    根据特征的浏览器判断
    Cygwin在打开在当前目录
    【转帖】科学对待 健康养猫 打造快乐孕妇
    解决chrome浏览器安装扩展、应用程序一直处在“检查中”的问题
    对【SQL SERVER 分布式事务解决方案】的心得补充
    关于“点击这里继续访问您选择的百度XXX”
    VBA一例:如何保持文本框焦点
  • 原文地址:https://www.cnblogs.com/gzgBlog/p/14859689.html
Copyright © 2011-2022 走看看