- 案例一:
使用SparkCore模块监听UDP端口数据,经过逻辑处理后存入MySQL
package com.lg.bigdata.core

import org.apache.spark.SparkConf
import com.alibaba.fastjson.JSON
import java.util.LinkedHashMap
import org.apache.spark.sql.Row
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import java.util.Calendar
import java.text.SimpleDateFormat
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Table
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.Cell
import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.util.StringUtils
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable
import java.net.DatagramSocket
import java.net.DatagramPacket
import java.nio.ByteBuffer
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.CallableStatement
import com.lg.blgdata.utils.DruidConnectionPool
import scala.collection.mutable.Buffer
import java.lang.Double
import java.util.concurrent.Executors
import java.util.concurrent.Callable
import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.util.control.Breaks._
import org.apache.spark.SparkContext

/**
 * Queue-length prediction driven by traffic events received over UDP.
 *
 * Each UDP datagram is decoded for its event time, camera number and event type.
 * For every "parking" event (event byte == 1) whose camera is configured in MySQL
 * (table `lenque_cam`), a background task is started that
 *   (1) computes the predicted maximum queue length once, and
 *   (2) re-computes the real-time queue length every minute,
 * writing one row per minute to `jzwdata.warning_record_sum` until the alarm is cancelled.
 *
 * NOTE: `java.lang.Double` is imported in this file, so a bare `Double` is the Java
 * boxed type; the code below spells out `scala.Double` where a primitive is meant.
 */
object EventMonitorUdp {
  // Kept for callers elsewhere. SimpleDateFormat is not thread-safe, so the
  // pool tasks below create their own instances instead of sharing these.
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val TABLENAME = "jzw_data:section_1min"

  /**
   * Cached thread pool: creates threads on demand and reclaims threads that have
   * been idle for a while. There is no upper bound on the thread count, so a burst
   * of events starts one long-running tracking task per event.
   */
  val pool = Executors.newCachedThreadPool()

  /** Highest byte index read from a datagram (the event-type byte) plus one. */
  private val MinPacketLength = 54

  /** Formats a value with two decimals, exactly as the stored strings are formatted. */
  private def fmt2(v: scala.Double): String = "%.2f".format(v)

  /** Rounds a value to two decimals by formatting and re-parsing it. */
  private def round2(v: scala.Double): scala.Double = fmt2(v).toDouble

  /** Closes a resource if it was opened; a failure to close is logged, not propagated. */
  private def closeQuietly(resource: AutoCloseable): Unit =
    if (resource != null) {
      try resource.close()
      catch { case e: Exception => e.printStackTrace() }
    }

  /**
   * Listens on UDP port 9101 forever and dispatches every decoded event.
   * Any exception ends the loop; the pool, the socket and the Spark context are
   * then released in `finally` (the socket only if it was actually opened).
   */
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("EventMonitorUdp")
    val sc = new SparkContext(config)
    var ds: DatagramSocket = null
    try {
      ds = new DatagramSocket(9101)
      val buf = new Array[scala.Byte](700)
      while (true) {
        val dp = new DatagramPacket(buf, buf.length)
        ds.receive(dp) // blocks until a datagram arrives
        handlePacket(dp.getData(), dp.getLength())
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      pool.shutdown()
      if (ds != null) ds.close()
      sc.stop()
    }
  }

  /**
   * Decodes one datagram and, for a parking event on a configured camera,
   * submits a [[ThreadHGD]] tracking task to the pool.
   *
   * @param data   receive buffer (reused between datagrams)
   * @param length number of bytes actually received; shorter packets are ignored
   *               because the rest of the buffer still holds the previous datagram
   */
  private def handlePacket(data: Array[scala.Byte], length: Int): Unit = {
    // byte 2 == 1: protocol type; byte 3 == 2: message type "traffic event"
    if (length >= MinPacketLength && data(2) == 1 && data(3) == 2) {
      // bytes 16..33: event time as ASCII digits (yyyyMMddHHmmss...)
      val raw = data.slice(16, 34).map(_.toChar).mkString
      val fmtTime =
        s"${raw.substring(0, 4)}-${raw.substring(4, 6)}-${raw.substring(6, 8)} " +
          s"${raw.substring(8, 10)}:${raw.substring(10, 12)}:${raw.substring(12, 14)}"

      // bytes 10..13: big-endian camera index, zero based
      val camId: Int = ByteBuffer.wrap(data, 10, 4).getInt() + 1
      // pad to three characters after the "V" prefix; longer numbers are not supported
      val digits = String.valueOf(camId)
      val carmID =
        if (digits.length >= 1 && digits.length <= 3) "V" + ("0" * (3 - digits.length)) + digits
        else ""

      // byte 53: event type; only parking events (1) are handled
      val event: Int = data(53)
      if (event == 1 && carmID.nonEmpty) {
        val camInfo = getCameraData(carmID)
        if (camInfo.nonEmpty) {
          val datamap = mutable.Map[String, String](
            "uuid" -> UUID.randomUUID().toString().replaceAll("-", ""),
            "startTime" -> fmtTime,
            "event" -> "停车事件",
            "carmID" -> camInfo("curr_cam"))
          pool.execute(new ThreadHGD(camInfo, datamap))
        }
      }
    }
  }

  /**
   * Reads the minute record for `cam` at `ts1`; if that row has no data, falls back to `ts2`.
   * The row key is the epoch-millisecond minute followed by the camera number.
   */
  private def latestMinuteData(ts1: scala.Long, ts2: scala.Long, cam: String): mutable.Map[String, String] = {
    val current = getHbase(s"$ts1$cam")
    if (current.nonEmpty) current else getHbase(s"$ts2$cam")
  }

  /**
   * Tracking task for one alarm.
   *
   * @param map     camera configuration row returned by [[getCameraData]]
   * @param datamap alarm record: uuid, startTime, event, carmID; the task adds
   *                state_len, timelen, act_len_que, pred_len_que and gz_time
   */
  class ThreadHGD(map: mutable.Map[String, String], datamap: mutable.Map[String, String]) extends Runnable {
    override def run(): Unit = {
      // per-task formatters: SimpleDateFormat must not be shared between threads
      val minuteFmt = new SimpleDateFormat("yyyy-MM-dd HH:mm")
      val secondFmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val currCam = map("curr_cam").replace("V", "")
      val upstCam = map("upst_cam").replace("V", "")

      var qualifies = false // set once the alarm condition has been met; never reset
      var tracking = true   // false ends the task
      var predicted = false // the prediction is computed on the first qualifying pass only
      var t: String = null  // predicted duration until the maximum queue length
      var x: String = null  // predicted maximum queue length
      var cancelVotes = 0   // the alarm is cancelled at the third vote

      while (tracking) {
        val now = Calendar.getInstance()
        val index1 = minuteFmt.format(now.getTime()) + ":00"
        now.add(Calendar.MINUTE, -1)
        val index2 = minuteFmt.format(now.getTime()) + ":00"
        val ts1 = secondFmt.parse(index1).getTime()
        val ts2 = secondFmt.parse(index2).getTime()

        println(index1 + "==" + index2 + "==" + map)
        // flow and speed of this camera and of the upstream camera (current minute, else previous)
        val thisValue = latestMinuteData(ts1, ts2, currCam)
        val upstValue = latestMinuteData(ts1, ts2, upstCam)

        // alarm condition: this camera has data and the upstream flow exceeds 35 vehicles
        if (thisValue.nonEmpty && upstValue.getOrElse("car_num", "0").toDouble > 35) {
          qualifies = true
        }
        println("本次预警是否满足条件: " + qualifies)
        println("当前摄像头thisValue: " + thisValue)
        println("上游摄像头upstValue: " + upstValue)

        if (!qualifies) {
          tracking = false
        } else {
          if (!predicted) {
            // (1) density at this camera during the incident (capacity 2900 / 25)
            val thisDensity = round2(2900.0 / 25.0)
            // (2) density of this section after the incident is cleared
            val afterDensity = round2(map.get("cap_speed80").getOrElse("0").toDouble / 80.0)
            // (3) upstream density = flow / speed * 60; a zero speed yields Infinity here
            val upstDensity = round2(
              upstValue.getOrElse("car_num", "0").toDouble /
                upstValue.getOrElse("now_avg_speed", "0").toDouble * 60)
            // (4) forming-wave speed and (5) dissipating-wave speed, absolute values
            val w1 = math.abs(round2(80 * (1 - (upstDensity + thisDensity) / 180.0)))
            val w2 = math.abs(round2(80 * (1 - (afterDensity + thisDensity) / 180.0)))
            // (6) incident duration at the maximum queue length
            t = fmt2(math.abs(round2(w2 * (13 / 60.0) / (w1 - w2))))
            // (7) predicted maximum queue length
            x = math.abs("%.3f".format(w2 * (t.toDouble - 13 / 60.0)).toDouble).toString()
            predicted = true
          }

          val predLen = x.toDouble
          if (predLen < 0.1) {
            // predicted queue too short: no alarm is raised
            tracking = false
          } else {
            // real-time queue length over the upstream cameras, entries "Vnnn-<stake>" joined by "、"
            val cams: String = map.get("cam_7").getOrElse(null)
            val startDist = map.get("cam_zh").getOrElse("0").toDouble // stake number of this camera

            // A camera counts as queued when speed < 20 and headway < 15. A camera with no
            // data in either minute is treated as free flowing (the 60/60 default the
            // original intended but never reached), which ends the queue at that camera.
            def queued(camNo: String): Boolean = {
              val found = latestMinuteData(ts1, ts2, camNo)
              found.nonEmpty &&
                found.getOrElse("now_avg_speed", "0").toDouble < 20 &&
                found.getOrElse("now_avg_space", "0").toDouble < 15
            }

            val realTQL: scala.Double =
              if (cams == null) 300.0 // no upstream cameras configured: assume 300
              else
                cams.split("、").iterator
                  .map(_.split("-"))
                  .takeWhile(parts => queued(parts(0).replace("V", "")))
                  .map(parts => startDist - parts(1).toDouble)
                  .sum

            // cancel vote: |real - predicted| / predicted <= 0.3 (real length converted to km),
            // or a real length of at most 300
            val deviation = math.abs((realTQL / 1000 - predLen) / predLen)
            if (deviation <= 0.3 || realTQL <= 300) cancelVotes += 1
            if (cancelVotes == 3) tracking = false

            datamap("state_len") = if (tracking) "1" else "0"
            datamap("timelen") = t
            datamap("act_len_que") = (realTQL / 1000.0).toString()
            datamap("pred_len_que") = x
            datamap("gz_time") = index1
            println("datamap: " + datamap)
            updateCameraData(datamap)
            // follow up one minute later; no need to wait once the alarm is cancelled
            if (tracking) TimeUnit.MINUTES.sleep(1)
          }
        }
      }
    }
  }

  /**
   * Converts a comma-separated list of character codes into the string they spell.
   *
   * @param value e.g. "50,48" (no surrounding whitespace)
   * @return the decoded string, e.g. "20"
   * @throws NumberFormatException if an element is not an integer
   */
  def asciiToString(value: String): String =
    value.split(",").map(code => Integer.parseInt(code).toChar).mkString

  /**
   * Reads one row of the HBase table [[TABLENAME]].
   *
   * @param carId row key
   * @return column qualifier -> value for the `info` columns car_num, now_avg_speed,
   *         time, now_avg_densit and now_avg_space; empty when the row does not exist
   *         or the read failed (the failure is printed)
   */
  def getHbase(carId: String): mutable.Map[String, String] = {
    import scala.collection.JavaConverters._
    val map = mutable.Map[String, String]()
    var hConn: org.apache.hadoop.hbase.client.Connection = null
    var hTable: Table = null
    try {
      hConn = ConnectionFactory.createConnection(HBaseConfiguration.create())
      hTable = hConn.getTable(TableName.valueOf(TABLENAME))
      val get = new Get(Bytes.toBytes(carId))
      val family = Bytes.toBytes("info")
      Seq("car_num", "now_avg_speed", "time", "now_avg_densit", "now_avg_space")
        .foreach(qualifier => get.addColumn(family, Bytes.toBytes(qualifier)))
      val result = hTable.get(get)
      if (!result.isEmpty()) {
        for (kv <- result.listCells().asScala) {
          map(Bytes.toString(CellUtil.cloneQualifier(kv))) = Bytes.toString(CellUtil.cloneValue(kv))
        }
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // released even when the read throws
      closeQuietly(hTable)
      closeQuietly(hConn)
    }
    map
  }

  /**
   * Inserts one tracking row into `jzwdata.warning_record_sum` inside a transaction.
   * Failures are printed and rolled back; nothing is thrown to the caller.
   *
   * @param map alarm record; uuid, startTime, carmID, timelen, act_len_que,
   *            pred_len_que and gz_time must be present
   */
  def updateCameraData(map: mutable.Map[String, String]): Unit = {
    val sql: String = "insert into jzwdata.warning_record_sum(uuid,state_len,warning_event,happen_time,location_cam,timelen,act_len_que,pred_len_que,gz_time) values(?,?,?,?,?,?,?,?,?)"
    var connection: Connection = null
    var pstm: PreparedStatement = null
    try {
      connection = DruidConnectionPool.getConnection
      connection.setAutoCommit(false)
      pstm = connection.prepareStatement(sql)
      pstm.setString(1, map.get("uuid").get)                    // task group id
      pstm.setString(2, map.get("state_len").getOrElse("0"))    // 0: alarm cancelled, 1: alarm active
      pstm.setString(3, map.get("event").getOrElse("停车事件")) // event type
      pstm.setString(4, map.get("startTime").get)               // time the event happened
      pstm.setString(5, map.get("carmID").get)                  // camera id
      pstm.setString(6, map.get("timelen").get)                 // predicted duration
      pstm.setString(7, map.get("act_len_que").get)             // current queue length
      pstm.setString(8, map.get("pred_len_que").get)            // predicted queue length
      pstm.setString(9, map.get("gz_time").get)                 // tracking time
      pstm.executeUpdate()
      connection.commit()
    } catch {
      case e: Exception =>
        e.printStackTrace()
        // the connection may never have been obtained
        if (connection != null) {
          try connection.rollback()
          catch { case rollbackError: Exception => rollbackError.printStackTrace() }
        }
    } finally {
      closeQuietly(pstm)
      closeQuietly(connection) // hands the connection back to the pool
    }
  }

  /**
   * Looks up the queue-length camera configuration for an event camera.
   *
   * @param carmID event camera id, e.g. "V012"
   * @return cam_ID_ac, curr_cam, upst_cam, down_cam, cap_speed80, cam_zh and cam_7 of the
   *         matching `lenque_cam` row, or an empty map when there is none
   * @throws RuntimeException if the query fails (the original exception is the cause)
   */
  def getCameraData(carmID: String): mutable.Map[String, String] = {
    var connection: Connection = null
    var statement: PreparedStatement = null
    var resultSet: ResultSet = null
    val map = mutable.Map[String, String]()
    try {
      connection = DruidConnectionPool.getConnection
      val querySql = "select cam_ID_ac,curr_cam,upst_cam,down_cam,cap_speed80,cam_zh,cam_7 from lenque_cam where cam_ID_ac =? limit 1;"
      statement = connection.prepareStatement(querySql)
      statement.setString(1, carmID)
      resultSet = statement.executeQuery()
      if (resultSet.next()) {
        map.put("cam_ID_ac", resultSet.getString(1))   // event-source camera id
        map.put("curr_cam", resultSet.getString(2))    // camera of the current section
        map.put("upst_cam", resultSet.getString(3))    // camera of the upstream section
        map.put("down_cam", resultSet.getString(4))    // camera of the downstream section
        map.put("cap_speed80", resultSet.getString(5)) // capacity at speed 80
        map.put("cam_zh", resultSet.getString(6))      // stake number
        map.put("cam_7", resultSet.getString(7))       // upstream cameras used for the real-time queue
      }
    } catch {
      case e: Exception => throw new RuntimeException("查询历史在网车出现异常!", e)
    } finally {
      closeQuietly(resultSet)
      closeQuietly(statement)
      closeQuietly(connection)
    }
    map
  }
}
- 案例二:
使用SparkCore模块从Redis拉取数据,处理后存入HBase数据库
package com.lg.bigdata.core

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import redis.clients.jedis.Jedis
import java.util.Set
import redis.clients.jedis.ScanParams
import redis.clients.jedis.ScanResult
import org.apache.spark.rdd.RDD
import com.google.gson.Gson
import java.util.Map
import org.apache.hadoop.hbase.util.Bytes
import com.google.gson.reflect.TypeToken
import scala.collection.Seq
import scala.collection.mutable.HashMap
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.DataFrame
import scala.collection.mutable
import com.lg.blgdata.utils.JedisConnectionPool
import java.lang.Long
import java.util.Calendar
import java.lang.Double
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import com.lg.bigdata.utils.JZWUtil

/**
 * One-minute aggregation: reads the raw per-vehicle records of every camera from
 * Redis, aggregates the previous minute per lane and writes the result to HBase.
 *
 * Raw record fields (JSON):
 *   C  camera id
 *   T  timestamp (epoch milliseconds)
 *   L  lane (L = left, M = middle, R = right)
 *   N  vehicle type (car, bus)
 *   S1 speed (km/h by default)
 *   S2 headway (m by default)
 *
 * NOTE: this file imports `java.util.Map`, `java.util.Set`, `java.lang.Long` and
 * `java.lang.Double`, so those bare names are the Java types here.
 */
object MinuteData {
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  val tableName = "jzw_data:spot_jt_para_1min"
  val conf = HBaseConfiguration.create()
  conf.set(TableInputFormat.INPUT_TABLE, tableName)

  /** Lane id -> digit used as the fourth character of the row key, in output order. */
  private val lanes = Seq("L" -> "1", "M" -> "2", "R" -> "3")

  /** Running totals of one lane of one camera inside the minute window. */
  private final class LaneStats {
    var count = 0      // vehicles
    var pcu = 0        // passenger-car units: car = 1, bus = 2, anything else = 0
    var speedSum = 0.0 // sum of speeds
    var gapSum = 0.0   // sum of headways

    def add(carType: String, speed: scala.Double, gap: scala.Double): Unit = {
      count += 1
      speedSum += speed
      gapSum += gap
      if ("car".equals(carType)) pcu += 1
      else if ("bus".equals(carType)) pcu += 2
    }
  }

  /** Formats a value with two decimals. */
  private def fmt2(v: scala.Double): String = "%.2f".format(v)

  /**
   * Builds the HBase record of one lane.
   *
   * Row key layout, e.g. 00111607048520000: camera id without the "V" (001),
   * lane digit (L:1 M:2 R:3), then the epoch-millisecond timestamp of the minute.
   *
   * @param unitLength detection-unit length of the camera (config key dZLength)
   */
  private def laneRecord(cameraId: String, laneId: String, laneDigit: String, stats: LaneStats,
                         unitLength: scala.Double, endMinute: String,
                         elong: scala.Long): mutable.Map[String, String] = {
    val avgSpeed = stats.speedSum / stats.count
    val avgGap = stats.gapSum / stats.count
    mutable.Map[String, String](
      "key" -> (cameraId.replace("V", "") + laneDigit + elong),
      "car_num" -> stats.count.toString,
      "time" -> endMinute,
      "lane_position" -> laneId,
      "now_flow_pcu" -> stats.pcu.toString,
      "now_avg_speed" -> fmt2(avgSpeed),
      // density = 1 km / average headway; the same formula is now used for all three lanes
      "now_avg_densit" -> fmt2(1 / (avgGap / 1000)),
      "now_avg_densit_pcu" -> fmt2(1 / (stats.gapSum / stats.pcu / 1000)),
      // pass time = detection-unit length / average speed * 60
      "now_avg_passTime" -> math.round(unitLength / avgSpeed * 60).toString,
      // congestion index = (1 - average speed / speed limit 80) * 10
      "now_avg_TPI" -> math.abs(math.round((1 - avgSpeed / 80) * 10)).toString,
      "now_avg_space" -> fmt2(avgGap),
      "cameraId" -> cameraId)
  }

  /**
   * Sets `nwo_len_que` (queue length) on every record of one lane.
   *
   * A record gets a queue length when both it and the record of its adjacent camera
   * (config key cName) have an average speed below 5 and an average headway below 10:
   * (1 - average speed / 80) * distance to the adjacent section (config key cDLength).
   * Otherwise the value is "0".
   */
  private def fillQueueLength(records: ListBuffer[mutable.Map[String, String]]): Unit = {
    def jammed(r: mutable.Map[String, String]): Boolean =
      r("now_avg_speed").toDouble < 5 && r("now_avg_space").toDouble < 10

    records.foreach { rec =>
      val carMaps = JZWUtil.contentMatch(rec("cameraId"))
      val queued = jammed(rec) && carMaps.get("cName").get != null &&
        records.exists(up => up("cameraId").equals(carMaps.get("cName").get) && jammed(up))
      rec("nwo_len_que") =
        if (queued) {
          fmt2((1 - rec("now_avg_speed").toDouble / 80) *
            java.lang.Double.parseDouble(carMaps.get("cDLength").get))
        } else "0"
    }
  }

  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MinuteData")
    val sc = new SparkContext(config)

    // Window = [previous minute, current minute). Both ends come from the same clock
    // reading so the window is always exactly one minute long.
    val clock: Calendar = Calendar.getInstance()
    val endMinute = sdf.format(clock.getTime()) + ":00"
    val elong = format.parse(endMinute).getTime()
    clock.add(Calendar.MINUTE, -1)
    val startMinute = sdf.format(clock.getTime()) + ":00"
    val slong = format.parse(startMinute).getTime()

    import scala.collection.JavaConverters._
    // values are read as String so that timestamps are not converted to floating point
    val gson: Gson = new Gson()
    val recordType = new TypeToken[java.util.Map[String, String]]() {}.getType()
    val perLane = lanes.map { case (laneId, _) =>
      laneId -> new ListBuffer[mutable.Map[String, String]]()
    }.toMap

    val jedis: Jedis = JedisConnectionPool.getConnections()
    try {
      jedis.select(0)
      JZWUtil.getCameraId.foreach { cameraId =>
        // keep only list positions 0..200 of this camera, then read positions 0..150
        jedis.ltrim(cameraId, 0, 200)
        val rows: java.util.List[String] = jedis.lrange(cameraId, 0, 150)
        val carMaps = JZWUtil.contentMatch(cameraId)
        if (carMaps != null) { // the camera is configured
          val unitLength = java.lang.Double.parseDouble(carMaps.get("dZLength").get.toString())
          val stats = lanes.map { case (laneId, _) => laneId -> new LaneStats }.toMap

          rows.asScala.foreach { json =>
            val rec: java.util.Map[String, String] = gson.fromJson(json, recordType)
            if (rec != null && rec.get("T") != null) {
              val times = rec.get("T").toLong
              if (times > 0 && slong <= times && times < elong) {
                val speed = Option(rec.get("S1")).map(_.toDouble).getOrElse(0.0)
                val gap = Option(rec.get("S2")).map(_.toDouble).getOrElse(0.0)
                // records without a lane, or with an unknown lane, are ignored
                Option(rec.get("L")).flatMap(stats.get).foreach(_.add(rec.get("N"), speed, gap))
              }
            }
          }

          lanes.foreach { case (laneId, laneDigit) =>
            val laneStats = stats(laneId)
            if (laneStats.count > 0) {
              perLane(laneId).append(
                laneRecord(cameraId, laneId, laneDigit, laneStats, unitLength, endMinute, elong))
            }
          }
        }
      }
    } finally {
      jedis.close() // hand the connection back to the pool
    }

    lanes.foreach { case (laneId, _) => fillQueueLength(perLane(laneId)) }

    val merged = new ListBuffer[mutable.Map[String, String]]()
    lanes.foreach { case (laneId, _) => merged ++= perLane(laneId) }

    val rdd = sc.parallelize(merged)
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    rdd.foreach(println)
    val dataRDD = rdd.map(x => {
      JZWUtil.convert(x, 1)
    })
    dataRDD.saveAsHadoopDataset(jobConf)
    sc.stop()
  }

  /** Prints every key of the given Redis database. */
  def getRedisKey(db: Int): Unit = {
    import scala.collection.JavaConverters._
    val jedis: Jedis = JedisConnectionPool.getConnections()
    try {
      jedis.select(db)
      jedis.keys("*").asScala.foreach(println)
    } finally {
      jedis.close()
    }
  }
}
- 案例三:
使用SparkCore从HBase拉取数据,合成后存入HBase的另一个表
package com.lg.bigdata.core import org.apache.spark.SparkConf import org.apache.spark.SparkContext import java.text.SimpleDateFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.spark.rdd.RDD import org.apache.hadoop.hbase.io.ImmutableBytesWritable import redis.clients.jedis.ScanResult import java.util.Calendar import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.filter.CompareFilter import org.apache.hadoop.hbase.filter.RowFilter import org.apache.hadoop.hbase.filter.Filter import org.apache.hadoop.hbase.filter.BinaryPrefixComparator import com.lg.bigdata.utils.JZWUtil import org.apache.hadoop.hbase.client.Scan import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.hbase.filter.RegexStringComparator import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp import scala.collection.mutable import org.apache.hadoop.hbase.filter.FilterList import scala.collection.mutable.ListBuffer import java.lang.Double import org.apache.hadoop.mapred.JobConf import java.util.Date import java.lang.Long import org.apache.hadoop.hbase.client.Put /** * 一.预测5分钟数据合成 ,车道级 * 从hbase拿取5分钟车道数据合成5分钟车道预测数据存入hbase * 要求: * (1)流量 * (2)速度 * (3)密度 * (4)拥堵度 * * */ object PredMinute5Data { val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm") val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") val fmtminute = new SimpleDateFormat("mm") //创建Hbase连接 val tableNameOutPut = "jzw_data:spot_jt_para_5min_pred" //预测5分钟数据合成 val tableNameInPut = "jzw_data:spot_jt_para_5min" val conf = HBaseConfiguration.create() //查询时候的输出类型 conf.set(TableInputFormat.INPUT_TABLE, tableNameInPut) def main(args: Array[String]): Unit = { //.setMaster("local[*]") val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("PredMinute5Data") //创建spark上下文对象 val sc = new 
SparkContext(config) //5分钟从HBase读取数据形成RDD val listId = JZWUtil.getCameraId val rowkey_5 = "." + getTime.get("rowkey_1").get //权重5 val rowkey_4 = "." + getTime.get("rowkey_2").get //权重4 val rowkey_3 = "." + getTime.get("rowkey_3").get //权重3 val rowkey_2 = "." + getTime.get("rowkey_4").get //权重2 val rowkey_1 = "." + getTime.get("rowkey_5").get //权重1 val scan = new Scan() scan.setCacheBlocks(false) scan.addFamily(Bytes.toBytes("info")) /* * 摄像头前缀获取数据, * MUST_PASS_ONE: 取并集 相当于or 操作 * MUST_PASS_ALL: 取交集 相当一and操作 */ val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); val rowFilter1: RowFilter = new RowFilter( CompareFilter.CompareOp.EQUAL, new RegexStringComparator(rowkey_1)) filterList.addFilter(rowFilter1) val rowFilter2: RowFilter = new RowFilter( CompareFilter.CompareOp.EQUAL, new RegexStringComparator(rowkey_2)) filterList.addFilter(rowFilter2) val rowFilter3: RowFilter = new RowFilter( CompareFilter.CompareOp.EQUAL, new RegexStringComparator(rowkey_3)) filterList.addFilter(rowFilter3) val rowFilter4: RowFilter = new RowFilter( CompareFilter.CompareOp.EQUAL, new RegexStringComparator(rowkey_4)) filterList.addFilter(rowFilter4) val rowFilter5: RowFilter = new RowFilter( CompareFilter.CompareOp.EQUAL, new RegexStringComparator(rowkey_5)) filterList.addFilter(rowFilter5) scan.setFilter(filterList) //将scan类转化成string类型 val scan_str = TableMapReduceUtil.convertScanToString(scan) conf.set(TableInputFormat.SCAN, scan_str) val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD( conf, //配置文件 classOf[TableInputFormat], //classOf:取这个类的类型 classOf[ImmutableBytesWritable], //rowkey的类型 classOf[Result]) //结果value的类型 var resultDataL = new ListBuffer[mutable.Map[String, String]]() //L var resultDataM = new ListBuffer[mutable.Map[String, String]]() //M var resultDataR = new ListBuffer[mutable.Map[String, String]]() //R listId.foreach(x ⇒ { //循环摄像头 var rsListL = new ListBuffer[mutable.Map[String, String]]() var rsListM = new 
ListBuffer[mutable.Map[String, String]]() var rsListR = new ListBuffer[mutable.Map[String, String]]() val variableL = sc.broadcast(rsListL) val variableM = sc.broadcast(rsListM) val variableR = sc.broadcast(rsListR) hbaseRDD.foreach { case (rowkey, result) => val key: String = Bytes.toString(result.getRow) val time: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("time"))) val car_num: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("car_num"))) val lane_position: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("lane_position"))) val now_avg_speed: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("now_avg_speed"))) val now_avg_densit: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("now_avg_densit"))) val now_avg_passTime: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("now_avg_passTime"))) val now_avg_TPI: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("now_avg_TPI"))) val now_avg_space: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("now_avg_space"))) val cameraId: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("cameraId"))) if(x.equals(cameraId)){ var mapL: mutable.Map[String, String] = mutable.Map() //L var mapM: mutable.Map[String, String] = mutable.Map() //M var mapR: mutable.Map[String, String] = mutable.Map() //R var carNum: Double = 0 if (car_num != null || !car_num.equals("0")) { //流量 carNum = Double.valueOf(car_num) } var nowAvgSpeedNum: Double = 0 if (now_avg_speed != null || !now_avg_speed.equals("0")) { //速度 nowAvgSpeedNum = Double.parseDouble(now_avg_speed) } var nowAvgDensitNum: Double = 0 if (now_avg_densit != null || !now_avg_densit.equals("0")) { //当前车道密度 nowAvgDensitNum = Double.parseDouble(now_avg_densit) } var nowAvgTPI: Double = 0 if (now_avg_TPI != null || !now_avg_TPI.equals("0")) 
{ //拥堵度 nowAvgTPI = Double.parseDouble(now_avg_TPI) } if(lane_position.equals("L")){ mapL("time")=time //时间 mapL("car_num")=car_num //流量 mapL("now_avg_speed")=now_avg_speed //速度 mapL("now_avg_densit")=now_avg_densit //密度 } if(lane_position.equals("M")){ mapM("time")=time mapM("car_num")=car_num mapM("now_avg_speed")=now_avg_speed mapM("now_avg_densit")=now_avg_densit } if(lane_position.equals("R")){ mapR("time")=time mapR("car_num")=car_num mapR("now_avg_speed")=now_avg_speed mapR("now_avg_densit")=now_avg_densit } //把一路摄像头的左,中,右的信息保存入List if(mapL.size>0){ variableL.value.append(mapL) } if(mapM.size>0){ variableM.value.append(mapM) } if(mapR.size>0){ variableR.value.append(mapR) } } } //保存可用数据 var DataPL: mutable.Map[String, String] = mutable.Map() //L var DataPM: mutable.Map[String, String] = mutable.Map() //M var DataPR: mutable.Map[String, String] = mutable.Map() //R if(variableL.value.size>0){//L //时间升序 val dataL=variableL.value.sortWith{ case (a,b)=>{ Long.valueOf(format.parse(a.get("time").get.toString()).getTime())> Long.valueOf(format.parse(b.get("time").get.toString()).getTime()) } } //套用公式计算 //Ft=(5Xt-1+4Xt-2+3Xt-3+2Xt-4+Xt-5)/15 var carNumL:Int=6 //权重 var carDataL:Double=0 //流量值(分子) var speedL:Double=0 //速度值(分子) var densitL:Double=0 //密度值(分子) var rsNumL=0 //权重之和(分母) dataL.foreach(x⇒{ carNumL=carNumL-1 rsNumL+=carNumL carDataL+=Double.valueOf(x.get("car_num").get)*(carNumL) speedL+=Double.valueOf(x.get("now_avg_speed").get)*(carNumL) densitL+=Double.valueOf(x.get("now_avg_densit").get)*(carNumL) }) val car_numL:Double=Math.abs(Double.valueOf(scala.math.round(carDataL/rsNumL))) //流量(取整) val now_avg_speedL=(speedL/rsNumL).formatted("%.2f") //(速度)四舍五入保留两位小数 val now_avg_densitL=(densitL/rsNumL).formatted("%.2f") //(密度)四舍五入保留两位小数 DataPL("key")=x.replace("V", "") + "1" + getTime.get("rowkey_pred").get //hbase rowkey DataPL("time")=format.format(getTime.get("rowkey_pred").get) DataPL("car_num")= car_numL.toString() DataPL("now_avg_speed")=now_avg_speedL 
DataPL("now_avg_densit")= now_avg_densitL DataPL("now_avg_TPI") =Math.abs(((1 - (Double.valueOf(now_avg_speedL) / 80)) * 10)).formatted("%.2f") //拥堵度((1-(平均速度/最高限速))*10) DataPL("cameraId")=x //摄像头编号 DataPL("lane_position") = "L" //车道类别 resultDataL.append(DataPL) } if(variableM.value.size>0){//M val dataM=variableM.value.sortWith{ case (a,b)=>{ Long.valueOf(format.parse(a.get("time").get.toString()).getTime())> Long.valueOf(format.parse(b.get("time").get.toString()).getTime()) } } var carNumM:Int=6 //权重 var carDataM:Double=0 //流量值(分子) var speedM:Double=0 //速度值(分子) var densitM:Double=0 //密度值(分子) var rsNumM=0 //权重之和(分母) dataM.foreach(x⇒{ carNumM=carNumM-1 rsNumM+=carNumM carDataM+=Double.valueOf(x.get("car_num").get)*(carNumM) speedM+=Double.valueOf(x.get("now_avg_speed").get)*(carNumM) densitM+=Double.valueOf(x.get("now_avg_densit").get)*(carNumM) }) val car_numM:Double=Math.abs(Double.valueOf(scala.math.round(carDataM/rsNumM))) //流量(取整) val now_avg_speedM=(speedM/rsNumM).formatted("%.2f") //(速度)四舍五入保留两位小数 val now_avg_densitM=(densitM/rsNumM).formatted("%.2f") //(密度)四舍五入保留两位小数 DataPM("key")=x.replace("V", "") + "2" + getTime.get("rowkey_pred").get DataPM("time")=format.format(getTime.get("rowkey_pred").get) DataPM("car_num")= car_numM.toString() DataPM("now_avg_speed")=now_avg_speedM DataPM("now_avg_densit")= now_avg_densitM DataPM("now_avg_TPI") = Math.abs(((1 - (Double.valueOf(now_avg_speedM) / 80)) * 10)).formatted("%.2f") //拥堵度((1-(平均速度/最高限速))*10) DataPM("cameraId")=x DataPM("lane_position") = "M" //车道类别 resultDataM.append(DataPM) } if(variableR.value.size>0){//R val dataR=variableR.value.sortWith{ case (a,b)=>{ Long.valueOf(format.parse(a.get("time").get.toString()).getTime())> Long.valueOf(format.parse(b.get("time").get.toString()).getTime()) } } var carNumR:Int=6 //权重 var carDataR:Double=0 //流量值(分子) var speedR:Double=0 //速度值(分子) var densitR:Double=0 //密度值(分子) var rsNumR=0 //权重之和(分母) dataR.foreach(x⇒{ carNumR=carNumR-1 rsNumR+=carNumR 
carDataR+=Double.valueOf(x.get("car_num").get)*(carNumR) speedR+=Double.valueOf(x.get("now_avg_speed").get)*(carNumR) densitR+=Double.valueOf(x.get("now_avg_densit").get)*(carNumR) }) val car_numR:Double=Math.abs(Double.valueOf(scala.math.round(carDataR/rsNumR))) val now_avg_speedR=(speedR/rsNumR).formatted("%.2f") val now_avg_densitR=(densitR/rsNumR).formatted("%.2f") DataPR("key")=x.replace("V", "") + "3" + getTime.get("rowkey_pred").get DataPR("time")=format.format(getTime.get("rowkey_pred").get) DataPR("car_num")=car_numR.toString() DataPR("now_avg_speed")=now_avg_speedR DataPR("now_avg_densit")=now_avg_densitR DataPR("now_avg_TPI") =((1 - (Double.valueOf(now_avg_speedR)/ 80)) * 10).formatted("%.2f") DataPR("cameraId")=x DataPR("lane_position") = "R" //车道类别 resultDataR.append(DataPR) } }) //合并数据 resultDataL=resultDataL.++=(resultDataM).++=(resultDataR) val rdd = sc.parallelize(resultDataL) println("========") rdd.foreach(println) val dataRDD=rdd.map(x⇒{ convert(x) }) val jobConf = new JobConf(conf) jobConf.setOutputFormat(classOf[TableOutputFormat])//输出数据的类型 jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableNameOutPut) dataRDD.saveAsHadoopDataset(jobConf) sc.stop() } //计算出需要拿取的数据时间节点及预测的时间点 def getTime(): mutable.Map[String, Long] = { //计算出最新的5分钟时间节点 val date: Calendar = Calendar.getInstance() val indexMinute = sdf.format(date.getTime()) + ":00" var dt: String = null val minute = fmtminute.format(date.getTime) val rs: Int = Integer.valueOf(minute) / 5 if (Integer.valueOf(minute) % 5 != 0 && Integer.valueOf(minute) % 5 > 2) { val min = (rs * 5).toString() val builderDate = new StringBuilder(indexMinute).replace(14, 16, min) dt = builderDate.toString() } else { val min = ((rs * 5) - 5).toString() val builderDate = new StringBuilder(indexMinute).replace(14, 16, min) dt = builderDate.toString() } var map: mutable.Map[String, Long] = mutable.Map() //预测的时间点 val preddate: Calendar = Calendar.getInstance() preddate.setTime(format.parse(dt)) preddate.add(Calendar.MINUTE, 
+(2 * 5)) val Minute_pred = sdf.format(preddate.getTime()) + ":00" val dlong_pred: Long = format.parse(Minute_pred).getTime() map("rowkey_pred") = dlong_pred map("rowkey_1") = format.parse(dt).getTime().longValue() //5 val newdate: Calendar = Calendar.getInstance() newdate.setTime(format.parse(dt)) newdate.add(Calendar.MINUTE, -(1 * 5)) val Minute_2 = sdf.format(newdate.getTime()) + ":00" val dlong_2: Long = format.parse(Minute_2).getTime() map("rowkey_2") = dlong_2 newdate.add(Calendar.MINUTE, -(1 * 5)) val Minute_3 = sdf.format(newdate.getTime()) + ":00" val dlong_3: Long = format.parse(Minute_3).getTime() map("rowkey_3") = dlong_3 newdate.add(Calendar.MINUTE, -(1 * 5)) val Minute_4 = sdf.format(newdate.getTime()) + ":00" val dlong_4: Long = format.parse(Minute_4).getTime() map("rowkey_4") = dlong_4 newdate.add(Calendar.MINUTE, -(1 * 5)) val Minute_5 = sdf.format(newdate.getTime()) + ":00" val dlong_5: Long = format.parse(Minute_5).getTime() map("rowkey_5") = dlong_5 (map) } //定义往Hbase插入数据的方法 def convert(map: mutable.Map[String, String])= { //1.获取表对像 val put = new Put(Bytes.toBytes(map.get("key").get)) //rowkey //车流量 put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("car_num"), Bytes.toBytes(map.get("car_num").get)) //时间 put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("time"), Bytes.toBytes(map.get("time").get)) //车道 put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("lane_position"), Bytes.toBytes(map.get("lane_position").get)) //平均速度(车道速度和/车道车辆数)(平均) put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("now_avg_speed"), Bytes.toBytes(map.get("now_avg_speed").get)) //当前车道密度(1km/平均车间距) put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("now_avg_densit"), Bytes.toBytes(map.get("now_avg_densit").get)) //拥堵度((1-(平均速度/最高限速))*10) put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("now_avg_TPI"), Bytes.toBytes(map.get("now_avg_TPI").get)) //摄像头编号 put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("cameraId"), 
Bytes.toBytes(map.get("cameraId").get)) (new ImmutableBytesWritable, put) } }
- 案例四:
使用SparkCore模块从Redis拉取数据,经过逻辑处理后再存入Redis
package com.lg.blgdata.core

import java.text.SimpleDateFormat
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.SparkConf
import java.util.Calendar
import scala.collection.mutable
import java.util.Date
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import com.lg.blgdata.utils.JedisConnectionPool
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.DataFrame
import com.google.gson.Gson
import com.google.gson.reflect.TypeToken
import org.apache.spark.sql.Row
import java.util.LinkedHashMap
import com.alibaba.fastjson.JSON
import org.apache.spark.SparkContext
import scala.collection.mutable.ListBuffer
import java.lang.Long
import java.lang.Double
import redis.clients.jedis.Pipeline
import redis.clients.jedis.Jedis
import java.util.Locale
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

/**
 * Scheduled job: short-term traffic forecast.
 *
 * Reads the per-minute vehicle counts of the last five minutes from Redis,
 * fits a straight line through them and writes the forecast for the next
 * five one-minute time nodes back to Redis.
 *
 * NOTE: `java.lang.Long` and `java.lang.Double` are imported above, so inside
 * this file the bare names `Long` / `Double` mean the boxed Java types.
 * Primitive values are therefore spelled `scala.Long` / `scala.Double`.
 */
object WriteRedis {
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
  val fmtminute = new SimpleDateFormat("mm")
  val hourSdf = new SimpleDateFormat("yyyy-MM-dd HH")
  val daysdf = new SimpleDateFormat("yyyy-MM-dd")
  val fmtScornd = new SimpleDateFormat("ss")

  /** Milliseconds in one minute. */
  private val MinuteMillis = 60000L

  /** Length of the observation window, in minutes. */
  private val WindowMinutes = 5

  /** Number of one-minute nodes that are forecast. */
  private val ForecastSteps = 5

  /**
   * x positions of the forecast nodes. The window occupies x = 1..5 and the
   * current (still incomplete) minute would be x = 6, so the nodes
   * now+1 .. now+5 minutes sit at x = 7..11.
   */
  private val ForecastPositions = (WindowMinutes + 2) to (WindowMinutes + 1 + ForecastSteps)

  /** Source hash (per-minute counts) -> target hash (forecast): left line, then right line. */
  private val Sources = List("MV005" -> "predV005", "MV158" -> "predV158")

  /**
   * Entry point. Creates the Spark context, reads both source hashes from
   * Redis db 3, forecasts each of them and stores the result in one
   * transaction.
   *
   * The Redis connection and the Spark context are released even when reading
   * or parsing fails; such a failure still propagates to the caller. A failure
   * while writing is printed and the job ends normally.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WriteRedis")
    // Create the Spark context
    val sc = new SparkContext(config)
    try {
      // Obtain a connection from the Jedis pool
      val jedis: Jedis = JedisConnectionPool.getConnections()
      try {
        jedis.select(3) // db index

        // One snapshot of the clock for the whole run, so the window filter and
        // the forecast labels cannot drift apart when a minute boundary passes.
        val nodes = getTime()
        val windowStart: scala.Long = nodes("sdate")
        val windowEnd: scala.Long = nodes("edate")
        val forecastTimes = (1 to ForecastSteps).map(k => format.format(nodes("rowkey_" + k)))

        val forecasts = Sources.map { case (sourceHash, targetHash) =>
          val samples = samplesInWindow(jedis.hgetAll(sourceHash), windowStart, windowEnd)
          targetHash -> forecastTimes.zip(predict(samples))
        }
        store(jedis, forecasts)
      } finally {
        if (jedis != null) jedis.close()
      }
    } finally {
      sc.stop()
    }
  }

  /**
   * Extracts the samples that fall into `[windowStart, windowEnd)`.
   *
   * Every hash field is expected to be a minute stamp in `yyyy-MM-dd HH:mm`
   * form. A field that cannot be parsed raises `java.text.ParseException`,
   * and an in-window value that is not an integer raises
   * `NumberFormatException`.
   *
   * @param entries     all fields of a source hash (minute stamp -> count)
   * @param windowStart inclusive window start, epoch millis
   * @param windowEnd   exclusive window end, epoch millis
   * @return `(x, count)` pairs where `x` is the 1-based minute offset inside the window
   */
  private def samplesInWindow(
      entries: java.util.Map[String, String],
      windowStart: scala.Long,
      windowEnd: scala.Long): Seq[(Int, Int)] =
    entries.asScala.toSeq.flatMap { case (minuteKey, count) =>
      val instant = format.parse(minuteKey + ":00").getTime
      if (windowStart <= instant && instant < windowEnd) {
        // Position by time rather than by rank, so a missing minute leaves a
        // gap instead of shifting all later samples to the left.
        Some((((instant - windowStart) / MinuteMillis).toInt + 1, count.toInt))
      } else {
        None
      }
    }

  /**
   * Rounds to two decimals through a `%.2f` rendering, independent of the JVM
   * default locale (a comma decimal separator would not parse back).
   */
  private def round2(value: scala.Double): scala.Double =
    "%.2f".formatLocal(Locale.ROOT, value).toDouble

  /**
   * Least-squares fit of `S = a + b*x` through the samples.
   *
   * Coefficients are kept at two decimals, as this job has always done, so the
   * forecast for a complete five-sample window is unchanged. Sums are derived
   * from the samples actually present instead of assuming exactly five of them.
   *
   * @param samples `(x, count)` pairs
   * @return `(a, b)`; `(0, 0)` without samples, a flat line through the mean
   *         when the slope is undefined (fewer than two distinct x values)
   */
  private def fitLine(samples: Seq[(Int, Int)]): (scala.Double, scala.Double) = {
    val n = samples.size
    if (n == 0) {
      (0.0, 0.0)
    } else {
      val sumX = samples.map(_._1.toLong).sum
      val sumY = samples.map(_._2.toLong).sum
      val sumXY = samples.map { case (x, y) => x.toLong * y }.sum
      val sumXX = samples.map { case (x, _) => x.toLong * x }.sum
      val spread = n * sumXX - sumX * sumX // equals 50 for x = 1..5
      val slope =
        if (spread == 0L) 0.0
        else round2((n * sumXY - sumX * sumY).toDouble / spread)
      val meanY = round2(sumY.toDouble / n)
      val meanX = sumX.toDouble / n
      (round2(meanY - meanX * slope), slope)
    }
  }

  /**
   * Forecast values for the five future nodes, in chronological order.
   *
   * The rounded value is stored as its absolute value, as before.
   * NOTE(review): a negative extrapolation is mirrored rather than clamped to
   * zero - confirm this is what consumers expect.
   *
   * @param samples `(x, count)` pairs of the observation window
   * @return five integer forecasts rendered as strings
   */
  private def predict(samples: Seq[(Int, Int)]): Seq[String] = {
    val (intercept, slope) = fitLine(samples)
    ForecastPositions.map(x => Math.abs(scala.math.round(intercept + slope * x)).toString)
  }

  /**
   * Writes all forecasts inside one MULTI/EXEC transaction on a pipeline.
   *
   * A failure is printed and, if the transaction is still open, discarded;
   * it is not rethrown.
   *
   * @param jedis     connection to write through
   * @param forecasts target hash -> (time label, forecast value) pairs
   */
  private def store(jedis: Jedis, forecasts: Seq[(String, Seq[(String, String)])]): Unit = {
    var pipeline: Pipeline = null
    var transactionOpen = false
    try {
      // Open the pipeline and start the transaction
      pipeline = jedis.pipelined()
      pipeline.multi()
      transactionOpen = true
      for ((hash, points) <- forecasts; (time, value) <- points) {
        pipeline.hset(hash, time, value)
      }
      // Queue EXEC first, then flush, so the commit reply is read here and not
      // only later while closing the pipeline.
      pipeline.exec()
      transactionOpen = false
      pipeline.sync()
    } catch {
      case NonFatal(e) =>
        e.printStackTrace()
        // Drop the queued commands; only valid while MULTI is still open.
        if (pipeline != null && transactionOpen) pipeline.discard()
    } finally {
      if (pipeline != null) pipeline.close()
    }
  }

  /**
   * Turns a JSON object into a [[Row]] whose cells are the object's values,
   * in document order, each rendered with `String.valueOf`.
   *
   * @param jsonStr JSON text of a single object
   * @return a row with one string cell per JSON field
   */
  def handlerMessageRow(jsonStr: String): Row = {
    val parsed = JSON.parseObject(jsonStr, classOf[LinkedHashMap[String, Object]])
    val cells = parsed.asScala.values.map(x => String.valueOf(x)).toArray
    Row(cells: _*)
  }

  /**
   * Computes the time nodes of the current run, all as epoch millis truncated
   * to the minute:
   *  - `rowkey_1` .. `rowkey_5`: the forecast nodes, now+1 .. now+5 minutes
   *  - `edate`: the current minute (exclusive end of the observation window)
   *  - `sdate`: five minutes before `edate` (inclusive start of the window)
   *
   * @return a fresh mutable map holding the seven entries above
   */
  def getTime(): mutable.Map[String, Long] = timeNodes(System.currentTimeMillis())

  /** Builds the time nodes for one given instant; see [[getTime]]. */
  private def timeNodes(nowMillis: scala.Long): mutable.Map[String, Long] = {
    val clock = Calendar.getInstance()
    clock.setTimeInMillis(nowMillis)
    clock.set(Calendar.SECOND, 0)
    clock.set(Calendar.MILLISECOND, 0)
    val currentMinute = clock.getTimeInMillis

    val nodes = mutable.Map[String, Long]()
    (1 to ForecastSteps).foreach { k =>
      nodes("rowkey_" + k) = Long.valueOf(currentMinute + k * MinuteMillis)
    }
    nodes("edate") = Long.valueOf(currentMinute)
    nodes("sdate") = Long.valueOf(currentMinute - WindowMinutes * MinuteMillis)
    nodes
  }
}