zoukankan      html  css  js  c++  java
  • SparkStreaming消费Kafka数据并计算后往Kafka写数据案列(2)

    案列一:

    package com.lg.bigdata.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.catalyst.expressions.Concat
    import org.apache.spark.sql.Column
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.Milliseconds
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies
    import org.apache.spark.streaming.kafka010.ConsumerStrategies
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.types.StructField
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.sql.Row
    import scala.collection.mutable
    import java.lang.Double
    import java.util.UUID
    import org.apache.spark.sql.Dataset
    import org.apache.spark.rdd.RDD
    import com.google.gson.JsonObject
    import scala.util.parsing.json.JSONArray
    import org.apache.hadoop.mapred.KeyValueTextInputFormat
    import org.apache.hadoop.io.Text
    import org.apache.hadoop.mapreduce.Job
    import java.util.Properties
    import org.apache.kafka.clients.producer.KafkaProducer
    import java.util.concurrent.Future
    import org.apache.kafka.clients.producer.RecordMetadata
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.spark.broadcast.Broadcast
    import org.apache.kafka.common.serialization.StringSerializer
    import org.apache.spark.TaskContext
    import com.lg.bigdata.utils.JZWUtil
    
    /** 
     *  	弃用
     * 	一:模块功能介绍
     * 			(1) 功能介绍:轨迹推算
     */
    object KafkaAndJsonGJTS_back {
    	def main(args:Array[String]):Unit={
    			    val groupId = "jwz_GJ"
    			    //val groupId = "jwz_test"
    			    
    					//1.创建SparkConf并初始化SSC,.setMaster("local[*]")
    					val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaAndJsonGJTS_back")
    					val ssc = new StreamingContext(sparkConf, Milliseconds(500))
    					ssc.sparkContext.setLogLevel("WARN")
    					
    				  val   spark= SparkSession.builder().config(sparkConf).getOrCreate()
    					val   sc=spark.sparkContext
    					/*2.定义kafka参数将kafka参数映射为map
    					 * earliest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    					 * 						如果offect不存在,自动重置偏移量为最小偏移量
    					 * latest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    					 * 					如果offect不存在,自动重置偏移量为最大偏移量
    					 * none  topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    					 */
    
    					val kafkaParams = Map[String, Object](
    							"bootstrap.servers" -> "hadoop104:9092", //kafka链接地址
    							"key.deserializer" -> classOf[StringDeserializer], //序列化
    							"value.deserializer" -> classOf[StringDeserializer], //反序列化
    							"group.id" -> groupId, //主题
    							"auto.offset.reset" -> "earliest", //earliest latest
    							"enable.auto.commit" -> (true: java.lang.Boolean) //是否让消费者自己提交偏移量(默认true)
    							)
    
    					val topics = Array("car")
    
    					//3.通过KafkaUtil创建kafkaDSteam
    					//官方推荐的直连方式,使用kafka底层的API,效率更高
    					val kafkaDSteam = KafkaUtils.createDirectStream(
    							ssc,
    							LocationStrategies.PreferConsistent,
    							ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
    
    					//数据类型
    					val schema = StructType(List(
    							StructField("cameraId", StringType),
    							StructField("time", StringType),
    							StructField("lane_position", StringType),
    							StructField("carType", StringType),
    							StructField("speed", StringType),
    							StructField("space", StringType)))
    
    					//初始化轨迹的位置
    					var mapLeft: mutable.Map[String, String] = mutable.Map()
    					mapLeft("L")="in/left_lane/ZL.json"
    					mapLeft("M")="in/left_lane/ZM.json"
    					mapLeft("R")="in/left_lane/ZR.json"
    
    					var mapReght: mutable.Map[String, String] = mutable.Map()
    					mapReght("L")="in/reght_lane/YL.json"
    					mapReght("M")="in/reght_lane/YM.json"
    					mapReght("R")="in/reght_lane/YR.json"
    					
    					//变量往外抽
    					val init:Int=43200
    					var df:DataFrame=null
    					var dfV158:DataFrame=null
    					var dfV005:DataFrame=null
    					var seV158:Array[Row]=null
    					var seV005:Array[Row]=null
    					var json8:DataFrame=null
    					var json5:DataFrame=null
    					var newJson8:String=null
    					var newJson5:String=null
    					var rdd8:RDD[String]=null
    					var rdd5:RDD[String]=null
    					
    					//2.利用广播变量的形式,将kafkaProducer广播到每一个executor
    					//广播kafkasink
    				  val kafkaProducer:Broadcast[KafkaSink[String,String]]={
    							val kafkaProducerConfig = {
    									val p = new Properties()
    											p.setProperty("bootstrap.servers", "hadoop104:9092") //kafka地址
    											p.setProperty("key.serializer", classOf[StringSerializer].getName) //key序列化
    											p.setProperty("value.serializer", classOf[StringSerializer].getName) //value序列化
    											p
    							} 
    							sc.broadcast(KafkaSink[String,String](kafkaProducerConfig))
    					}
    
    					/**
    					 * 将reduceB
    					 * 处理JSON字符串为Row 生成RDD[Row] 然后通过schema创建DataFrame
    					 * 左线 :V158
    					 * 右线 :V005
    					 */
    					import org.apache.spark.sql.functions._
    					kafkaDSteam.map(record => JZWUtil.handlerMessage2Row(record.value())).foreachRDD(rdd => {
    						if (!rdd.isEmpty()) { //数据不为空
    						    df= spark.createDataFrame(rdd, schema)
    								//主线左
    								dfV158=df.filter("cameraId =='V158'").toDF()
    								/*
    								 * 第一步:拿到车辆的参数
    								 * 	筛选列
    								 *  	位置: lane_position:L,M,R
    								 *	     车型: carType      :car→1,bus→2
    								 *	      速度: speed        
    								 * 		摄像头编号: cameraId
    								 */
    								if(dfV158.count()>0){
    									    seV158=dfV158.select("lane_position","carType","cameraId","speed").collect()
    											//第二步:拿到车辆的参数根据车辆信息读取JSON
    											seV158.foreach(x⇒{
        												//读取对应车道的轨迹,缓存
        											  json8=spark.read.json(mapLeft.get(x.get(0).toString()).get).cache()
        											
    														//(1)车型赋值
    														var rowV158:DataFrame=null
    														if(x.get(1).toString().equals("car")){
    															rowV158=json8.withColumn("type",concat(json8.col("type"),lit("1")))
    														}else{
    															rowV158=json8.withColumn("type",concat(json8.col("type"),lit("2")))
    														}
    
    											    	//(2)车辆编号,唯一即可
    														rowV158=rowV158.withColumn("fz_car_id",concat(json8.col("fz_car_id"),lit(uuid)))
    
    														//(3)毫秒/12米
    														val time=Math.abs(scala.math.round(init/Double.valueOf(x.get(3).toString())))
    														rowV158=rowV158.withColumn("time",(rowV158("id")-1)*time)//取DataFrame中的id((id-1)*(12米的毫秒数))
    														rowV158=rowV158.withColumn("is_show",concat(json8.col("is_show"),lit(time)))
    
    														if(rowV158.count()>0){
    														  //把spark的json格式数据转java可用的json 
    													    newJson8=rowV158.toJSON.collectAsList().toString()
    													    //结果写入Kakfa
    													    kafkaProducer.value.send("GJTS_topic",newJson8)
    														}
    														
    											})
    								}
    
    									//主线右
    									dfV005=df.filter(" cameraId =='V005'").toDF()
    											//筛选两个列
    									if(dfV005.count()>0){
    											    seV005=dfV005.select("lane_position","carType","cameraId","speed").collect()
      											  seV005.foreach(x⇒{
    
          													//读取对应车道的轨迹,缓存
          													json5=spark.read.json(mapReght.get(x.get(0).toString()).get).cache()
    
      															//(1)车型赋值
      															var rowV005:DataFrame=null
      															if(x.get(1).toString().equals("car")){
      																rowV005=json5.withColumn("type",concat(json5.col("type"),lit("1")))
      															}else{
      																rowV005=json5.withColumn("type",concat(json5.col("type"),lit("2")))
      															}
    
          													//(2)车辆编号,唯一即可
          															rowV005=rowV005.withColumn("fz_car_id",concat(json5.col("fz_car_id"),lit(uuid)))
        
          													//(3)毫秒/12米
          													val time=Math.abs(scala.math.round(init/Double.valueOf(x.get(3).toString())))
          													    rowV005=rowV005.withColumn("time",(rowV005("id")-1)*time)//取DataFrame中的id((id-1)*(12米的毫秒数))
          													    rowV005=rowV005.withColumn("is_show",concat(json5.col("is_show"),lit(time)))
        
          													 if(rowV005.count()>0){
          													      //把spark的json格式数据转java可用的json 
          													      newJson5=rowV005.toJSON.collectAsList().toString() 
                													//结果写入Kakfa
                													 kafkaProducer.value.send("GJTS_topic",newJson5)
          													}
        											  })
    											}
    						}
    					})
    
    					//启动采集器
    					ssc.start()
    					//Driver等待采集器的执行,采集器终止,Driver也会终止
    					ssc.awaitTermination()
    	}
    
    	//车辆编号生成
    	def uuid():String={
    	 UUID.randomUUID().toString().replaceAll("-", "").toString()
    	}
    	class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
    		/* This is the key idea that allows us to work around running into
         NotSerializableExceptions. */
    		    lazy val producer = createProducer()
    				def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    				producer.send(new ProducerRecord[K, V](topic, key, value))
    				def send(topic: String, value: V): Future[RecordMetadata] =
    				producer.send(new ProducerRecord[K, V](topic, value))
    	}
    	object KafkaSink {
    		import scala.collection.JavaConversions._
    		def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    				val createProducerFunc = () => {
    					val producer = new KafkaProducer[K, V](config)
    							sys.addShutdownHook {
    						// Ensure that, on executor JVM shutdown, the Kafka producer sends
    						// any buffered messages to Kafka before shutting down.
    						producer.close()
    					}
    					producer
    				}
    				new KafkaSink(createProducerFunc)
    		}
    		def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
    	}
    }
    

      

    案列二:

    package com.lg.bigdata.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.catalyst.expressions.Concat
    import org.apache.spark.sql.Column
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.Milliseconds
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies
    import org.apache.spark.streaming.kafka010.ConsumerStrategies
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.types.StructField
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.sql.Row
    import scala.collection.mutable
    import java.lang.Double
    import java.util.UUID
    import org.apache.spark.sql.Dataset
    import org.apache.spark.rdd.RDD
    import com.google.gson.JsonObject
    import scala.util.parsing.json.JSONArray
    import org.apache.hadoop.mapred.KeyValueTextInputFormat
    import org.apache.hadoop.io.Text
    import org.apache.hadoop.mapreduce.Job
    import java.util.Properties
    import org.apache.kafka.clients.producer.KafkaProducer
    import java.util.concurrent.Future
    import org.apache.kafka.clients.producer.RecordMetadata
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.spark.broadcast.Broadcast
    import org.apache.kafka.common.serialization.StringSerializer
    import org.apache.spark.TaskContext
    import com.lg.bigdata.utils.JZWUtil
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.types.LongType
    import org.apache.spark.SparkException
    import org.apache.spark.streaming.kafka010.HasOffsetRanges
    import org.apache.spark.streaming.kafka010.CanCommitOffsets
    
    /**
     * 	一:模块功能介绍
     * 			(1) 功能介绍:轨迹推算
     */
    object KafkaAndJsonGJTS {
          	val groupId = "jwz_test"
    					//val groupId = "jwz_GJ"
    			def main(args: Array[String]): Unit = {
    				
    
    							//1.创建SparkConf并初始化SSC,.setMaster("local[*]")
    							val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaAndJsonGJTS")
    							//设置序列化器为KryoSerializer
    							//sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    
    							val ssc = new StreamingContext(sparkConf,Milliseconds(500)) //500毫秒:
    							ssc.sparkContext.setLogLevel("WARN")
    
    							val spark= SparkSession.builder().config(sparkConf).getOrCreate()
    							val sc=spark.sparkContext
    
    							/*2.定义kafka参数将kafka参数映射为map
    							 * earliest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    							 * 						如果offect不存在,自动重置偏移量为最小偏移量
    							 * latest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    							 * 					如果offect不存在,自动重置偏移量为最大偏移量
    							 * none  topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    							 */
    
    							val kafkaParams = Map[String, Object](
    									"bootstrap.servers" -> "hadoop104:9092", //kafka链接地址
    									"key.deserializer" -> classOf[StringDeserializer], //序列化
    									"value.deserializer" -> classOf[StringDeserializer], //反序列化
    									"group.id" -> groupId, //主题
    									"auto.offset.reset" -> "latest", //earliest latest
    									"enable.auto.commit" -> (true: java.lang.Boolean), //是否让消费者自己提交偏移量(默认true)
    									"auto.commit.interval.ms" -> "500" //自动提交的时间
    									)
    
    							val topics = Array("car")
    
    							//3.通过KafkaUtil创建kafkaDSteam
    							//官方推荐的直连方式,使用kafka底层的API,效率更高
    							val kafkaDSteam = KafkaUtils.createDirectStream(
    									ssc,
    									LocationStrategies.PreferConsistent,
    									ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
    
    							//数据类型
    							val schema = StructType(List(
    									StructField("cameraId", StringType),
    									StructField("time", StringType),
    									StructField("lane_position", StringType),
    									StructField("carType", StringType),
    									StructField("speed", StringType),
    									StructField("space", StringType)))
    
    
    							//变量往外抽
    							val init: Int = 43200
    							var carjson:Dataset[Row]=null
    							var singleCarTrack: DataFrame=null
    							var datacar: Array[Row] = null
    							var rddString:RDD[String]= null
    							var newJson:String =null
    							var singleCarTrack_1:DataFrame=null
    
    							//2.利用广播变量的形式,将kafkaProducer广播到每一个executor
    							//广播kafkasink
    							val kafkaProducer:Broadcast[KafkaSink[String,String]]={
    							val kafkaProducerConfig = {
    									val p = new Properties()
    											p.setProperty("bootstrap.servers", "hadoop104:9092") //kafka地址
    											p.setProperty("key.serializer", classOf[StringSerializer].getName) //key序列化
    											p.setProperty("value.serializer", classOf[StringSerializer].getName) //value序列化
    											p
    							} 
    							sc.broadcast(KafkaSink[String,String](kafkaProducerConfig))
    					}
    							/**
    							 *	 每个摄像头单独推
    							 * 	将reduceB
    							 * 	处理JSON字符串为Row 生成RDD[Row] 然后通过schema创建DataFrame
    							 *
    							 */
    							import org.apache.spark.sql.functions._
    							kafkaDSteam.map(record => JZWUtil.handlerMessage2Row(record.value())).foreachRDD(rdd => {
    							  //获取偏移量信息
    						
    								if (!rdd.isEmpty()) { //数据不为空
        									//第一步:得到原始数据	
        									datacar= spark.createDataFrame(rdd, schema).select("lane_position","carType","cameraId","speed").collect()
    											/*	拿到车辆的参数:
    											 * 	筛选列
    											 *  	位置: lane_position:L,M,R
    											 *	     车型: carType      :car→1,bus→2
    											 *	      速度: speed
    											 * 		摄像头编号: cameraId
    											 * */
    
    											if(datacar.size>0){
    												//第三步:原始数据与json的比对,得到对应的轨迹点
    												datacar.foreach(x⇒{
        													//第二步:拿到车辆的JSON信息
        													carjson=spark.read.json(getPath.get(x.apply(2)+""+x.apply(0)).get)
    											
    
    															//单个车辆单路摄像头的json轨迹
    															singleCarTrack=carjson.filter("cam_ID=='"+x.apply(2)+"' and lane=='"+x.apply(0)+"'").toDF()
    															
    															//(1)数据处理1:车型赋值
    															if(x.get(1).toString().equals("car")){
    																singleCarTrack_1=singleCarTrack.withColumn("type",concat(singleCarTrack.col("type"),lit("1")))
    															}else{
    																singleCarTrack_1=singleCarTrack.withColumn("type",concat(singleCarTrack.col("type"),lit("2")))
    															}
    
        													//(2)车辆编号,唯一即可
        													singleCarTrack_1=singleCarTrack_1.withColumn("fz_car_id",concat(singleCarTrack_1.col("fz_car_id"),lit(uuid)))
    
    															//(3) 给当前的json设置递增的新id
    															singleCarTrack_1=singleCarTrack_1.withColumn("newid",row_number().over(Window.partitionBy(lit(1)).orderBy(lit(1).cast(LongType))))
    
    															//(4) 数据处理2:根据当前速度算出每12米的毫秒数(毫秒/12米)
    															val time=Math.abs(scala.math.round(init/Double.valueOf(x.get(3).toString())))
    															singleCarTrack_1=singleCarTrack_1.withColumn("time",(singleCarTrack_1("newid")-1)*time)//取DataFrame中的id((id-1)*(12米的毫秒数))
    															
    															//把spark的json格式数据转java可用的json,追加 [  ]
    															if(singleCarTrack_1.count()>0){
    															  	newJson=singleCarTrack_1.toJSON.collectAsList().toString()
    															  	val rddrow=sc.makeRDD(Seq(newJson))
    															  	rddrow.foreach(record⇒{
    															  	  kafkaProducer.value.send("GJTS_topic",record)
    															  	})
    														      
    															}
    														
    												})
    											}
    								}
    							})
    
    							//启动采集器
    							ssc.start()
    							//Driver等待采集器的执行,采集器终止,Driver也会终止
    							ssc.awaitTermination()
    	}
    
    	def getPath():mutable.Map[String, String]={
    			var mapPath: mutable.Map[String, String] = mutable.Map()
    					mapPath("V140L")="in/left_lane/ZL.json"
    					mapPath("V153L")="in/left_lane/ZL.json"
    					mapPath("V108L")="in/left_lane/ZL.json"
    					mapPath("V158L")="in/left_lane/ZL.json"
    					mapPath("V122L")="in/left_lane/ZL.json"
    
    					mapPath("V098L")="in/left_lane/ZL.json"
    					mapPath("V150L")="in/left_lane/ZL.json"
    					mapPath("V134L")="in/left_lane/ZL.json"
    					mapPath("V085L")="in/left_lane/ZL.json"
    					mapPath("V114L")="in/left_lane/ZL.json"
    
    					mapPath("V146L")="in/left_lane/ZL.json"
    					mapPath("V125L")="in/left_lane/ZL.json"
    					mapPath("V143L")="in/left_lane/ZL.json"
    					mapPath("V131L")="in/left_lane/ZL.json"
    					mapPath("V102L")="in/left_lane/ZL.json"
    
    					mapPath("V137L")="in/left_lane/ZL.json"
    					mapPath("V089L")="in/left_lane/ZL.json"
    					mapPath("V128L")="in/left_lane/ZL.json"
    					mapPath("V093L")="in/left_lane/ZL.json"
    					mapPath("V118L")="in/left_lane/ZL.json"
    					//	------------------------------------------
    					mapPath("V140M")="in/left_lane/ZM.json"
    					mapPath("V153M")="in/left_lane/ZM.json"
    					mapPath("V108M")="in/left_lane/ZM.json"
    					mapPath("V158M")="in/left_lane/ZM.json"
    					mapPath("V122M")="in/left_lane/ZM.json"
    
    					mapPath("V098M")="in/left_lane/ZM.json"
    					mapPath("V150M")="in/left_lane/ZM.json"
    					mapPath("V134M")="in/left_lane/ZM.json"
    					mapPath("V085M")="in/left_lane/ZM.json"
    					mapPath("V114M")="in/left_lane/ZM.json"
    
    					mapPath("V146M")="in/left_lane/ZM.json"
    					mapPath("V125M")="in/left_lane/ZM.json"
    					mapPath("V143M")="in/left_lane/ZM.json"
    					mapPath("V131M")="in/left_lane/ZM.json"
    					mapPath("V102M")="in/left_lane/ZM.json"
    
    					mapPath("V137M")="in/left_lane/ZM.json"
    					mapPath("V089M")="in/left_lane/ZM.json"
    					mapPath("V128M")="in/left_lane/ZM.json"
    					mapPath("V093M")="in/left_lane/ZM.json"
    					mapPath("V118M")="in/left_lane/ZM.json"
    					//	------------------------------------------
    					mapPath("V140R")="in/left_lane/ZR.json"
    					mapPath("V153R")="in/left_lane/ZR.json"
    					mapPath("V108R")="in/left_lane/ZR.json"
    					mapPath("V158R")="in/left_lane/ZR.json"
    					mapPath("V122R")="in/left_lane/ZR.json"
    
    					mapPath("V098R")="in/left_lane/ZR.json"
    					mapPath("V150R")="in/left_lane/ZR.json"
    					mapPath("V134R")="in/left_lane/ZR.json"
    					mapPath("V085R")="in/left_lane/ZR.json"
    					mapPath("V114R")="in/left_lane/ZR.json"
    
    					mapPath("V146R")="in/left_lane/ZR.json"
    					mapPath("V125R")="in/left_lane/ZR.json"
    					mapPath("V143R")="in/left_lane/ZR.json"
    					mapPath("V131R")="in/left_lane/ZR.json"
    					mapPath("V102R")="in/left_lane/ZR.json"
    
    					mapPath("V137R")="in/left_lane/ZR.json"
    					mapPath("V089R")="in/left_lane/ZR.json"
    					mapPath("V128R")="in/left_lane/ZR.json"
    					mapPath("V093R")="in/left_lane/ZR.json"
    					mapPath("V118R")="in/left_lane/ZR.json"
    
    					//======================================
    					mapPath("V032L")="in/reght_lane/YL.json"
    					mapPath("V072L")="in/reght_lane/YL.json"
    					mapPath("V029L")="in/reght_lane/YL.json"
    					mapPath("V005L")="in/reght_lane/YL.json"
    					mapPath("V051L")="in/reght_lane/YL.json"
    					//--------------------------------------
    					mapPath("V009L")="in/reght_lane/YL.json"
    					mapPath("V027L")="in/reght_lane/YL.json"
    					mapPath("V062L")="in/reght_lane/YL.json"
    					mapPath("V039L")="in/reght_lane/YL.json"
    					mapPath("V067L")="in/reght_lane/YL.json"
    					//--------------------------------------
    					mapPath("V035L")="in/reght_lane/YL.json"
    					mapPath("V058L")="in/reght_lane/YL.json"
    					mapPath("V018L")="in/reght_lane/YL.json"
    					mapPath("V045L")="in/reght_lane/YL.json"
    					mapPath("V042L")="in/reght_lane/YL.json"
    					//--------------------------------------
    					mapPath("V048L")="in/reght_lane/YL.json"
    					mapPath("V014L")="in/reght_lane/YL.json"
    					mapPath("V024L")="in/reght_lane/YL.json"
    					mapPath("V076L")="in/reght_lane/YL.json"
    					mapPath("V054L")="in/reght_lane/YL.json"
    					//======================================
    					mapPath("V032M")="in/reght_lane/YM.json"
    					mapPath("V072M")="in/reght_lane/YM.json"
    					mapPath("V029M")="in/reght_lane/YM.json"
    					mapPath("V005M")="in/reght_lane/YM.json"
    					mapPath("V051M")="in/reght_lane/YM.json"
    					//--------------------------------------
    					mapPath("V009M")="in/reght_lane/YM.json"
    					mapPath("V027M")="in/reght_lane/YM.json"
    					mapPath("V062M")="in/reght_lane/YM.json"
    					mapPath("V039M")="in/reght_lane/YM.json"
    					mapPath("V067M")="in/reght_lane/YM.json"
    					//--------------------------------------
    					mapPath("V035M")="in/reght_lane/YM.json"
    					mapPath("V058M")="in/reght_lane/YM.json"
    					mapPath("V018M")="in/reght_lane/YM.json"
    					mapPath("V045M")="in/reght_lane/YM.json"
    					mapPath("V042M")="in/reght_lane/YM.json"
    					//--------------------------------------
    					mapPath("V048M")="in/reght_lane/YM.json"
    					mapPath("V014M")="in/reght_lane/YM.json"
    					mapPath("V024M")="in/reght_lane/YM.json"
    					mapPath("V076M")="in/reght_lane/YM.json"
    					mapPath("V054M")="in/reght_lane/YM.json"
    
    					//======================================
    					mapPath("V032R")="in/reght_lane/YR.json"
    					mapPath("V072R")="in/reght_lane/YR.json"
    					mapPath("V029R")="in/reght_lane/YR.json"
    					mapPath("V005R")="in/reght_lane/YR.json"
    					mapPath("V051R")="in/reght_lane/YR.json"
    					//--------------------------------------
    					mapPath("V009R")="in/reght_lane/YR.json"
    					mapPath("V027R")="in/reght_lane/YR.json"
    					mapPath("V062R")="in/reght_lane/YR.json"
    					mapPath("V039R")="in/reght_lane/YR.json"
    					mapPath("V067R")="in/reght_lane/YR.json"
    					//--------------------------------------
    					mapPath("V035R")="in/reght_lane/YR.json"
    					mapPath("V058R")="in/reght_lane/YR.json"
    					mapPath("V018R")="in/reght_lane/YR.json"
    					mapPath("V045R")="in/reght_lane/YR.json"
    					mapPath("V042R")="in/reght_lane/YR.json"
    					//--------------------------------------
    					mapPath("V048R")="in/reght_lane/YR.json"
    					mapPath("V014R")="in/reght_lane/YR.json"
    					mapPath("V024R")="in/reght_lane/YR.json"
    					mapPath("V076R")="in/reght_lane/YR.json"
    					mapPath("V054R")="in/reght_lane/YR.json"
    
    					mapPath
    	}
    
    	//车辆编号生成
    	def uuid(): String = {
    			UUID.randomUUID().toString().replaceAll("-", "").toString()
    	}
    
    	//1.首先需要将KafkaProducer利用lazy val的方式进行包装
    	class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
    		//这是一个关键的想法,使我们能够绕过运行到NotSerializableExceptions。
    		lazy val producer = createProducer()
    				def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    				producer.send(new ProducerRecord[K, V](topic, key, value))
    
    				def send(topic: String, value: V): Future[RecordMetadata] =
    				producer.send(new ProducerRecord[K, V](topic, value))
    	}
    	object KafkaSink {
    		import scala.collection.JavaConversions._
    		def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    				val createProducerFunc = () => {
    					val producer = new KafkaProducer[K, V](config)
    						sys.addShutdownHook {
    						//确保在executor JVM关闭时,Kafka生产者发送
    						//关闭前任何缓冲的消息。
    						producer.close()
    					}
    					producer
    				}
    				new KafkaSink(createProducerFunc)
    		}
    		def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
    	}
    }
    

      

    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
  • 相关阅读:
    osg::BlendFunc来设置透明度
    LCA(Tarjan)
    CODEVS1073 家族 (并查集)
    CODEVS1533 互斥的数(哈希表)
    2014-12-4
    BZOJ2661 连连看 (费用流)
    2014-11-30
    JAVA语法基础作业——动手动脑以及课后实验性问题
    课后作业01——相加
    再读大道至简第二章
  • 原文地址:https://www.cnblogs.com/KdeS/p/14307041.html
Copyright © 2011-2022 走看看