package com.lg.blgdata.streaming

import org.apache.spark.streaming.StreamingContext
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.storage.StorageLevel
import org.apache.kafka.common.serialization.StringDeserializer
import kafka.serializer.StringDecoder
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.streaming.kafka010.PerPartitionConfig
import org.apache.spark.streaming.kafka010.PreferConsistent
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategy
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import java.text.SimpleDateFormat
import java.util.Calendar
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types.LongType
import java.util.Date
import scala.collection.mutable
import java.lang.Long
import org.apache.kafka.common.TopicPartition
import redis.clients.jedis.Jedis
import redis.clients.jedis.Pipeline
// NOTE(review): the two imports below use different roots ("blgdata" vs "bigdata");
// confirm both packages really exist.
import com.lg.blgdata.utils.JedisConnectionPool
import com.lg.bigdata.utils.JZWUtil

/**
 * Stateless Spark Streaming driver that reads vehicle records from Kafka and pushes
 * per-camera traffic counters to Redis.
 *
 * For every non-empty micro-batch the records newer than the start of the current
 * 5-second window are counted per camera, and the counts are added (HINCRBY) to Redis
 * hashes keyed by granularity and camera id:
 *
 *  - `S<camera>` field `yyyy-MM-dd HH:mm:ss` (5-second window end)
 *  - `M<camera>` field `yyyy-MM-dd HH:mm`
 *  - `H<camera>` field `yyyy-MM-dd HH`
 *  - `D<camera>` field `yyyy-MM-dd`
 *  - `allM`      field `yyyy-MM-dd HH:mm` (sum of the V158 and V005 counts)
 *
 * Cameras V158 (left line) and V005 (right line) get all granularities; V024 only gets
 * the hour and day hashes.
 *
 * Beware: `Long` in this file is `java.lang.Long` because of the explicit import above;
 * helpers that need the primitive type spell it `scala.Long`.
 */
object KafkaRedis {

  // Formatters for the Redis hash field names. SimpleDateFormat is not thread-safe; in
  // this file they are only used from the foreachRDD body.
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
  val hourSdf = new SimpleDateFormat("yyyy-MM-dd HH")
  val daysdf = new SimpleDateFormat("yyyy-MM-dd")
  // No longer used by getTime; kept because it is a public member of this object.
  val fmtScornd = new SimpleDateFormat("ss")

  /** Redis logical database that receives the counters. */
  private val RedisDbIndex = 3

  /** Cameras that get second/minute/hour/day counters and contribute to `allM`. */
  private val FullDetailCameras = Seq("V158", "V005")

  /** Camera that only gets hour/day counters. */
  private val HourDayOnlyCamera = "V024"

  /**
   * Entry point: builds the streaming context, subscribes to the `car` topic and, for each
   * non-empty batch, counts records per camera and writes the counters to Redis.
   *
   * If the Redis write fails, the stack trace is printed and the streaming context
   * (including the SparkContext) is stopped.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val groupId = "jwz"

    // 1. Create the SparkConf and initialise the StreamingContext (1-second batches).
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("CarCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.sparkContext.setLogLevel("WARN")

    /* 2. Kafka consumer parameters.
     * auto.offset.reset:
     *   earliest - resume from the committed offset if one exists, otherwise from the beginning
     *   latest   - resume from the committed offset if one exists, otherwise only new records
     *   none     - resume from the committed offsets; throw if any partition has none
     */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop104:9092", // Kafka broker address
      "key.deserializer" -> classOf[StringDeserializer], // key deserializer
      "value.deserializer" -> classOf[StringDeserializer], // value deserializer
      "group.id" -> groupId, // consumer group id
      "auto.offset.reset" -> "latest", // earliest | latest | none
      "enable.auto.commit" -> (true: java.lang.Boolean) // let the consumer commit offsets itself
    )
    val topics = Array("car")

    // 3. Direct (receiver-less) Kafka stream.
    val kafkaDSteam = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    // Schema of the rows produced by JZWUtil.handlerMessage2Row; every column is a string.
    val schema = StructType(List(
      StructField("cameraId", StringType),
      StructField("time", StringType),
      StructField("lane_position", StringType),
      StructField("carType", StringType),
      StructField("speed", StringType),
      StructField("space", StringType)))

    // 4. Turn each JSON message into a Row, count per camera, push the result to Redis.
    kafkaDSteam
      .map(record => JZWUtil.handlerMessage2Row(record.value()))
      .foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
          // getOrCreate returns the already-running session after the first batch.
          val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
          val df = spark.createDataFrame(rdd, schema)

          val window = getTime()
          val counts = countByCamera(df, window("sdate").longValue())

          try {
            writeCounts(new Date(window("edate").longValue()), counts)
          } catch {
            case e: Exception =>
              e.printStackTrace()
              // Same effect as the one-argument stop(true): the SparkContext is stopped too
              // and the shutdown is NOT graceful (in-flight batches are not drained).
              ssc.stop(stopSparkContext = true, stopGracefully = false)
          }
        }
      }

    // Start the stream and block the driver until it terminates.
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Counts, in a single Spark job, the records of each tracked camera whose `time` is
   * greater than `windowStartMillis`.
   *
   * @param df                batch data with at least the `cameraId` and `time` columns
   * @param windowStartMillis exclusive lower bound applied to the `time` column
   * @return camera id -> record count; cameras without matching records are absent
   */
  private def countByCamera(df: DataFrame, windowStartMillis: scala.Long): Map[String, scala.Long] = {
    val cameraList = (FullDetailCameras :+ HourDayOnlyCamera).map(id => s"'$id'").mkString(", ")
    // `time` is a string column compared with a number; presumably it holds epoch
    // milliseconds as text - TODO confirm against JZWUtil.handlerMessage2Row.
    df.filter(s"cameraId in ($cameraList) and time > $windowStartMillis")
      .groupBy("cameraId")
      .count()
      .collect()
      .map(row => row.getString(0) -> row.getLong(1))
      .toMap
  }

  /**
   * Adds the per-camera counts to the Redis hashes inside one MULTI/EXEC transaction.
   *
   * A connection is taken from the pool for this call and always released afterwards, so
   * a connection that was already handed back to the pool is never reused.
   *
   * @param windowEnd end of the current 5-second window; formatted into the hash fields
   * @param counts    camera id -> count; a missing camera is written as an increment of 0
   * @throws Exception whatever the Redis client raises; an open transaction is discarded first
   */
  private def writeCounts(windowEnd: Date, counts: Map[String, scala.Long]): Unit = {
    val secondField = format.format(windowEnd)
    val minuteField = sdf.format(windowEnd)
    val hourField = hourSdf.format(windowEnd)
    val dayField = daysdf.format(windowEnd)

    val jedis: Jedis = JedisConnectionPool.getConnections()
    try {
      jedis.select(RedisDbIndex)
      val pipeline: Pipeline = jedis.pipelined()
      try {
        pipeline.multi()
        try {
          // hincrBy(hash, field, delta): adds to the field, creating it when missing.
          FullDetailCameras.foreach { camera =>
            val delta = counts.getOrElse(camera, 0L)
            pipeline.hincrBy("S" + camera, secondField, delta)
            pipeline.hincrBy("M" + camera, minuteField, delta)
            pipeline.hincrBy("H" + camera, hourField, delta)
            pipeline.hincrBy("D" + camera, dayField, delta)
            // Whole-line total per minute.
            pipeline.hincrBy("allM", minuteField, delta)
          }

          val hourDayDelta = counts.getOrElse(HourDayOnlyCamera, 0L)
          pipeline.hincrBy("H" + HourDayOnlyCamera, hourField, hourDayDelta)
          pipeline.hincrBy("D" + HourDayOnlyCamera, dayField, hourDayDelta)

          // Queue EXEC before sync(): sync() is what sends the queued commands and reads
          // their replies, so EXEC has to be part of what it flushes.
          pipeline.exec()
        } catch {
          case e: Exception =>
            // Abandon the open transaction, but never let a failing DISCARD hide the
            // original error.
            try pipeline.discard()
            catch { case discardError: Exception => e.addSuppressed(discardError) }
            throw e
        }
        pipeline.sync()
      } finally {
        pipeline.close()
      }
    } finally {
      jedis.close()
    }
  }

  /**
   * Computes the boundaries of the current 5-second window.
   *
   * The current time is truncated to whole seconds and rounded up to the next multiple of
   * five seconds (left unchanged when it already is one); that instant is the window end
   * and the window start is five seconds earlier.
   *
   * @return a mutable map with two entries, both epoch milliseconds boxed as
   *         `java.lang.Long`: `"sdate"` (window start, used for the time comparison) and
   *         `"edate"` (window end, formatted into the Redis hash fields)
   */
  def getTime(): mutable.Map[String, Long] = {
    val windowSeconds = 5L
    val nowSeconds = System.currentTimeMillis() / 1000L
    // Integer ceiling to the next multiple of windowSeconds.
    val endSeconds = ((nowSeconds + windowSeconds - 1L) / windowSeconds) * windowSeconds
    val endMillis = endSeconds * 1000L
    val startMillis = endMillis - windowSeconds * 1000L

    mutable.Map[String, Long](
      "sdate" -> java.lang.Long.valueOf(startMillis),
      "edate" -> java.lang.Long.valueOf(endMillis))
  }
}