本次代码主要侧重于使用 Flink stream 解析 canal-json;经过多次实验,发现还是阿里的 fastjson 较为好用,故在此做记录
将依赖引入
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.33</version>
</dependency>
案例数据,json数据:
{ "data": [ { "name": "张三", "age": "22", "xb": "男" } ], "database": "test", "es": 1623029629000, "id": 1, "isDdl": false, "mysqlType": { "name": "varchar(500)", "age": "int(11)", "xb": "varchar(20)" }, "old": null, "pkNames": null, "sql": "", "sqlType": { "name": 12, "age": 4, "xb": 12 }, "table": "mysql_result1", "ts": 1623029681690, "type": "INSERT" }
案例:本次将通过 Flink stream 读取 Kafka 中的 json 数据,然后过滤出类型为 INSERT 的记录,并取出其中的行数据,代码如下
package it.bigdata.flink.study.test

import java.util.Properties

import com.alibaba.fastjson.JSON
import it.bigdata.flink.study.test.entity.MysqlResult
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * Reads canal-json change records from the Kafka topic `test_mysql_result1`,
 * keeps the records whose `type` is `INSERT`, and turns every row found in the
 * record's `data` array into a [[MysqlResult]] that is printed to stdout.
 */
object SteamBinLog {

  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read data from Kafka
    val props = new Properties()
    props.setProperty("bootstrap.servers", "10.18.35.155:9092,10.18.35.156:9092,10.18.35.157:9092")
    // props.setProperty("group.id","consumer-group")

    val stream3 = env.addSource(
      new FlinkKafkaConsumer[String]("test_mysql_result1", new SimpleStringSchema(), props)
        .setStartFromEarliest())

    val stream4 = stream3
      .map(data => JSON.parseObject(data))
      // Constant-first comparison: a record without a "type" field is filtered out
      // instead of raising a NullPointerException.
      .filter(json => "INSERT".equals(json.getString("type")))
      // One canal record may carry several inserted rows; emit all of them rather than
      // only the first one. A missing "data" array yields no output for that record.
      .flatMap { json =>
        Option(json.getJSONArray("data")).toList.flatMap { rows =>
          (0 until rows.size()).map { i =>
            // getJSONObject reads the row directly; no toString + re-parse round trip.
            val row = rows.getJSONObject(i)
            val my = new MysqlResult()
            my.setName(row.getString("name"))
            my.setAge(row.getIntValue("age"))
            my.setXb(row.getString("xb"))
            my
          }
        }
      }

    stream4.print()
    env.execute("stream binlog")
  }
}
下边是其他 json 的相关操作,内容复制自原文:https://segmentfault.com/a/1190000039415392
1.将数据转为json
import org.json4s.{Formats, NoTypeHints}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization._

import scala.collection.{immutable, mutable}

// Sample payload: a column name plus a string-to-string lookup table.
case class WOE(col: String, woe: Map[String, String])

// json4s needs an implicit Formats in scope for write(...).
implicit val formats: Formats = Serialization.formats(NoTypeHints)

// `+=` only exists on the mutable Map, so the mutable type is spelled out explicitly.
val testMap = mutable.Map[String, String]()
testMap += ("1" -> "1.1")

// WOE expects the default (immutable) Map, hence the copy.
val a = WOE("1", immutable.Map(testMap.toList: _*))
println(write(a))
输出{"col":"1","woe":{"1":"1.1"}}
2、解析json
// Parse a JSON document into the WOE case class, then serialise it back to JSON.
implicit val formats = Serialization.formats(NoTypeHints)

val rawJson = """ {"col":"1","woe":{"1":"1.1"}} """
val decoded = parse(rawJson).extract[WOE]

println(write(decoded))
如果是List也可以
// Round trip for a collection: write a buffer of WOE values, then read a JSON array
// back as List[WOE].
implicit val formats = Serialization.formats(NoTypeHints)

val testMap = Map[String, String]()
testMap += ("1" -> "1.1")

// Both entries carry the same content, so one immutable copy is shared between them.
val frozen = immutable.Map(testMap.toList: _*)

val b = new ListBuffer[WOE]
b += WOE("1", frozen)
b += WOE("3", frozen)
println(write(b))

val rawJson = """ [{"col":"1","woe":{"1":"1.1"}},{"col":"3","woe":{"1":"1.1"}}] """
val decoded = parse(rawJson).extract[List[WOE]]
println(decoded.toString)
1、scala自带的Json解析
scala 2.10 标准库自带 Json 解析:scala.util.parsing.json.JSON(从 2.11 起该包已被标记为废弃,并移入独立的 scala-parser-combinators 模块,使用前请确认所用版本)
object转json
// Build the entries in a growable map first.
val testMap = Map[String, String]()
testMap += ("1" -> "2.034")
testMap += ("2" -> "2.0134")

// JSONObject is given an immutable Map, so copy the entries over before wrapping them.
val frozen = scala.collection.immutable.Map(testMap.toList: _*)
val jsonObject = scala.util.parsing.json.JSONObject(frozen)
println(jsonObject)
但好像只能处理map,且map要转成immutable
2、fastjson
解析json
import com.alibaba.fastjson.JSON

/** Minimal fastjson example: parse a JSON object and print two of its members. */
object JsonDemo {

  def main(args: Array[String]): Unit = {
    // Triple quotes keep the embedded double quotes from terminating the Scala literal.
    val text = """{"name":"name1", "age":55}"""
    val json = JSON.parseObject(text)

    println(json.get("name"))
    println(json.get("age"))
  }
}
再例如
import com.alibaba.fastjson.JSON

/** fastjson example: read top-level and nested members of a parsed JSON object. */
object Json {

  def main(args: Array[String]): Unit = {
    // Triple quotes keep the embedded double quotes from terminating the Scala literal.
    val str2 = """{"et":"kanqiu_client_join","vtm":1435898329434,"body":{"client":"866963024862254","client_type":"android","room":"NBA_HOME","gid":"","type":"","roomid":""},"time":1435898329}"""
    val json = JSON.parseObject(str2)

    // Fetch a member as an untyped value
    val fet = json.get("et")

    // Fetch a member as a String
    val etString = json.getString("et")

    // Fetch an integral member. 1435898329434 does not fit in a 32-bit Int,
    // so it is read as a Long instead of via getInteger.
    val vtm = json.getLong("vtm")
    println(vtm)

    // Fetch a nested member
    val client = json.getJSONObject("body").get("client")
    println(client)
  }
}
在 spark-streaming 中,使用 fastjson 更加稳定,json-lib 经常出现莫名问题,而且 fastjson 的解析速度更快.
object转json,首先必须要显式的定义参数,否则会报错
ambiguous reference to overloaded definition
1
例如:
// Hand a Scala Map straight to fastjson with pretty-printing enabled.
// NOTE(review): fastjson presumably does not recognise the Scala collection as a map and
// serialises its bean-style properties instead of its entries - confirm.
val testMap = Map[String, String]()
testMap += ("1" -> "2.034")
testMap += ("2" -> "2.0134")

val prettyJson = JSON.toJSONString(testMap, true)
println(prettyJson)
不会报错,但是输出结果是奇怪的
{ "empty":false, "sizeMapDefined":false, "traversableAgain":true }
3、json4s
object转json
// Render a growable map through the json4s DSL and print the compact form.
val testMap = Map[String, String]()
testMap += ("1" -> "2.034")
testMap += ("2" -> "2.0134")

println(compact(render(testMap)))
输出
[{"2":"2.0134"},{"1":"2.034"}]
如果都是String,复杂的Map结构也可以解析
// Inner map first, then nest it under key "1" of the outer map.
val subMap = Map[String, String]()
subMap += ("1" -> "1.1")

val testMap = Map[String, Map[String, String]]()
testMap += ("1" -> subMap)

// Serialise the nested structure with json4s.
println(write(testMap))
输出{"1":{"1":"1.1"}}
但这样的形式不利于解析
再例如
// json4s needs an implicit Formats in scope for write(...).
implicit val formats = Serialization.formats(NoTypeHints)

// Heterogeneous map: strings, numbers, booleans and a nested list of maps.
val m = Map(
  "name" -> "john doe",
  "age" -> 18,
  "hasChild" -> true,
  "childs" -> List(
    Map("name" -> "dorothy", "age" -> 5, "hasChild" -> false),
    Map("name" -> "bill", "age" -> 8, "hasChild" -> false)))

// Nested string-only map.
val mm = Map("1" -> Map("1" -> "1.2"))

// Serialise the two maps built above; the snippet previously printed `a`,
// which is not defined here, and left both maps unused.
println(write(m))
println(write(mm))
TEST
package com.dfssi.dataplatform.analysis.exhaust.alarm

import java.sql.Timestamp
import java.util

import com.alibaba.fastjson.serializer.SerializerFeature
import org.apache.spark.Logging
import org.json4s.NoTypeHints

/**
 * The record to be analysed: one sample for a vehicle (`vin`) with its output value,
 * collection time, position and an optional list of failure codes.
 */
case class NeedEntity(vin: String,
                      downoutput: Double,
                      collectTime: Long,
                      lon: Double,
                      lat: Double,
                      failureList: java.util.List[Integer] = new util.ArrayList[Integer]()
                     ) extends Serializable

/**
 * Mutable state of a single event, handled per event: where and when it started,
 * where and when it was last updated, and the min/max value tracked by `updateByEntity`.
 */
class OverLimitEvent(var vin: String,
                     var startTime: Long,
                     var startLon: Double,
                     var startLat: Double,
                     var eventType: String = "overlimit",
                     var endTime: Long = 0,
                     var endLon: Double = 0.0,
                     var endLat: Double = 0.0,
                     var minValue: Double = 0.0,
                     var maxValue: Double = 0.0
                    ) extends Serializable with Logging {

  /** Column values written when the event is first stored. */
  def getInsertMap(): Map[String, Any] =
    Map(
      "vin" -> vin,
      "startTime" -> new Timestamp(startTime),
      "startLon" -> startLon,
      "startLat" -> startLat
    )

  /**
   * Column values written when the stored event is updated.
   * The `end*` keys carry the end time and end position; they previously repeated
   * the start values.
   */
  def getUpdateMap(): Map[String, Any] =
    Map(
      "vin" -> vin,
      "startTime" -> new Timestamp(startTime),
      "endTime" -> new Timestamp(endTime),
      "endLon" -> endLon,
      "endLat" -> endLat,
      "maxValue" -> maxValue,
      "minValue" -> minValue
    )

  /** Moves the end of the event to `entity` and widens the min/max range with its value. */
  def updateByEntity(entity: NeedEntity): Unit = {
    this.endTime = entity.collectTime
    this.endLat = entity.lat
    this.endLon = entity.lon

    // A Double is never null, so the former `!= null` guards were always true and are dropped.
    if (this.maxValue < entity.downoutput) {
      this.maxValue = entity.downoutput
    }
    // NOTE(review): minValue defaults to 0.0, so it only moves for values below 0.0
    // unless the caller initialises it - confirm this is intended.
    if (this.minValue > entity.downoutput) {
      this.minValue = entity.downoutput
    }
  }

  /** JSON rendering of this event via json4s. */
  override def toString(): String = {
    import org.json4s.jackson.Serialization._
    import org.json4s.jackson.Serialization
    implicit val formats = Serialization.formats(NoTypeHints)
    write(this)
  }
}

object OverLimitEvent {

  val ID_FIELD = Array("vin", "startTime")

  /** Builds an event with every start, end and min/max field given explicitly. */
  def apply(vin: String,
            startTime: Long,
            startLon: Double,
            startLat: Double,
            endTime: Long,
            endLon: Double,
            endLat: Double,
            minValue: Double,
            maxValue: Double): OverLimitEvent = {
    val event = new OverLimitEvent(vin, startTime, startLon, startLat)
    event.endTime = endTime
    event.endLat = endLat
    event.endLon = endLon
    event.maxValue = maxValue
    event.minValue = minValue
    event
  }

  /** Starts a new event at the time and position of `entity`. */
  def buildByEntity(entity: NeedEntity): OverLimitEvent =
    new OverLimitEvent(entity.vin, entity.collectTime, entity.lon, entity.lat)

  /** Parses an event from its JSON form with fastjson. */
  def buildByJson(json: String): OverLimitEvent =
    com.alibaba.fastjson.JSON.parseObject(json, classOf[OverLimitEvent])

  // NOTE(review): this serialises the companion object itself, not an event - confirm it is needed.
  override def toString(): String = {
    import org.json4s.jackson.Serialization._
    import org.json4s.jackson.Serialization
    implicit val formats = Serialization.formats(NoTypeHints)
    write(this)
  }
}

/** Alarm state kept per vehicle: the current over-limit event, fault events by key, and the last update time. */
case class ExhaustAlarmStatus(val vin: String,
                              var overLimitEvent: OverLimitEvent = null,
                              var faultEvent: Map[String, OverLimitEvent] = null,
                              var lastTime: Long) {

  /** JSON rendering of this state via json4s. */
  override def toString(): String = {
    import org.json4s.jackson.Serialization._
    import org.json4s.jackson.Serialization
    implicit val formats = Serialization.formats(NoTypeHints)
    write(this)
  }
}

object ExhaustAlarmStatus {

  /** Parses a state from JSON with fastjson; a `null` input yields `null`. */
  def buildByJson(json: String): ExhaustAlarmStatus =
    if (json != null) {
      com.alibaba.fastjson.JSON.parseObject(json, classOf[ExhaustAlarmStatus])
    } else {
      null
    }

  /** Pretty-printed fastjson rendering of `state`. */
  def toJSON(state: ExhaustAlarmStatus): String =
    com.alibaba.fastjson.JSON.toJSONString(state, SerializerFeature.PrettyFormat)

  /** Manual check: parse a sample state with fastjson and print it back with json4s. */
  def main(args: Array[String]): Unit = {
    // Triple quotes keep the embedded double quotes from terminating the Scala literal.
    val json = """{"vin":"222", "OverLimitEvent":{ "vin":"222", "startTime":123456789, "startLon":1.0, "startLat":1.0, "endTime":123456789, "endLon":1.0, "endLat":1.0, "minValue":1.0, "maxValue":1.0 },"lastTime":1556441242000}"""

    val state = com.alibaba.fastjson.JSON.parseObject(json, classOf[ExhaustAlarmStatus])
    println(state.overLimitEvent)

    import org.json4s.jackson.Serialization._
    import org.json4s.jackson.Serialization
    implicit val formats = Serialization.formats(NoTypeHints)

    val jsonstr = write(state)
    println(jsonstr)
  }
}