zoukankan      html  css  js  c++  java
  • SparkCore离线计算案例

    • 案例一:

           使用SparkCore模块监听UDP端口,数据经过逻辑处理后存入MySQL

            

    package com.lg.bigdata.core
    
    import org.apache.spark.SparkConf
    import com.alibaba.fastjson.JSON
    import java.util.LinkedHashMap
    import org.apache.spark.sql.Row
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import java.util.Calendar
    import java.text.SimpleDateFormat
    import org.apache.hadoop.hbase.client.Get
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.client.Table
    import org.apache.hadoop.hbase.client.ConnectionFactory
    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.Cell
    import org.apache.hadoop.hbase.CellUtil
    import org.apache.hadoop.util.StringUtils
    import scala.collection.mutable.ArrayBuffer
    import scala.collection.mutable
    import java.net.DatagramSocket
    import java.net.DatagramPacket
    import java.nio.ByteBuffer
    import java.sql.Connection
    import java.sql.PreparedStatement
    import java.sql.ResultSet
    import java.sql.CallableStatement
    import com.lg.blgdata.utils.DruidConnectionPool
    import scala.collection.mutable.Buffer
    import java.lang.Double
    import java.util.concurrent.Executors
    import java.util.concurrent.Callable
    import java.util.UUID
    import java.util.concurrent.TimeUnit
    import scala.util.control.Breaks._
    import org.apache.spark.SparkContext
    
    /** 
     *      一:此模块的功能(排队长度预测)
     *              作用:接收哈工大的事件信息结合我们自己的平台数据做排队长度预测
     *    UDP协议可解析的数据:事件发生时间,事件点位摄像头,事件类型,车道信息等其他信息未解析出      
     *    
     *    (1)计算预测排队长度
     *    (2)计算实时排队长度
     */
    object EventMonitorUdp {
    	/** Formatter for minute-resolution timestamps ("yyyy-MM-dd HH:mm"). */
    	val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")

    	/**
    	 * Formatter for second-resolution timestamps ("yyyy-MM-dd HH:mm:ss").
    	 * NOTE(review): SimpleDateFormat instances are not synchronized; do not share
    	 * `sdf` / `format` between threads without external locking.
    	 */
    	val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    	/** Name (namespace:table) of the HBase table read by getHbase. */
    	val TABLENAME: String = "jzw_data:section_1min"

    	/**
    	 * Cached thread pool used to run one tracking task per accepted event.
    	 *  (1) the number of threads it may create is effectively unbounded
    	 *  (2) idle threads are reclaimed automatically after about one minute
    	 *  (3) watch the number of submitted tasks: too many concurrent tasks can
    	 *      exhaust the machine
    	 */
    	val pool: java.util.concurrent.ExecutorService = Executors.newCachedThreadPool()
    	
    	/**
    	 * Entry point: listens on UDP port 9101 and, for every parking-event packet whose
    	 * camera is configured in MySQL, submits a [[ThreadHGD]] tracking task to `pool`.
    	 *
    	 * Byte offsets read from each datagram (as used by the code below):
    	 *   2      protocol type (must be 1)
    	 *   3      message type  (must be 2, traffic event)
    	 *   10-13  camera index, big-endian Int, zero based
    	 *   15     lane (1~10) - currently not used
    	 *   16-33  event time as 18 ASCII digits, of which yyyyMMddHHmmss is kept
    	 *   53     event type (1 = parking event, the only one handled)
    	 *
    	 * The loop only ends when an exception is thrown; the socket, the thread pool and
    	 * the Spark context are then released in `finally`.
    	 *
    	 * @param args command-line arguments (unused)
    	 */
    	def main(args: Array[String]): Unit = {
    	  val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("EventMonitorUdp")

    	  // Create the Spark context (it is only started and stopped here).
    	  val sc = new SparkContext(config)

    	  // The highest offset read below is 53, so shorter datagrams cannot be parsed.
    	  val minPacketLength = 54

    	  var ds: DatagramSocket = null
    	  try {
    	    // 1. Create the receiving socket bound to the listening port.
    	    ds = new DatagramSocket(9101)
    	    val buf: Array[scala.Byte] = new Array[scala.Byte](700)

    	    while (true) {
    	      // 2. Wrap the receive buffer in a fresh packet.
    	      val dp: DatagramPacket = new DatagramPacket(buf, buf.length)

    	      // 3. Blocking receive.
    	      ds.receive(dp)

    	      // getData returns the whole (reused) buffer; only the first dp.getLength bytes
    	      // belong to this datagram, so short packets are skipped instead of being parsed
    	      // with left-over bytes from an earlier one.
    	      val data: Array[scala.Byte] = dp.getData()

    	      // (1) protocol type: 1   (2) message type: traffic event
    	      if (dp.getLength() >= minPacketLength && data(2) == 1 && data(3) == 2) {
    	        // I. Event time: 18 ASCII bytes starting at offset 16.
    	        val sTime: Array[scala.Byte] = new Array[scala.Byte](18)
    	        Array.copy(data, 16, sTime, 0, 18)
    	        val startTime = asciiToString(sTime.mkString(","))

    	        // yyyyMMddHHmmss... -> yyyy-MM-dd HH:mm:ss
    	        val fmtTime: String = new java.lang.StringBuilder(startTime)
    	          .insert(4, "-")
    	          .insert(7, "-")
    	          .insert(10, " ")
    	          .insert(13, ":")
    	          .insert(16, ":")
    	          .toString()
    	          .substring(0, 19)

    	        // II. Camera number: zero-based Int at offset 10, converted to one-based.
    	        val camId: Int = ByteBuffer.wrap(data, 10, 4).getInt() + 1

    	        // Left-pad to three digits ("V007"); anything outside 0..999 is not a valid id.
    	        val carmID: String =
    	          if (camId < 0 || camId > 999) ""
    	          else {
    	            val digits = camId.toString
    	            "V" + ("0" * (3 - digits.length)) + digits
    	          }

    	        // III. Event type: data(52) is always 0, data(53) carries the code.
    	        val event: Int = data(53)

    	        // Condition 1: only parking events (code 1) from a known camera are tracked.
    	        if (event == 1 && carmID.nonEmpty) {
    	          val map: mutable.Map[String, String] = getCameraData(carmID)
    	          if (map.nonEmpty) {
    	            val uuid: String = UUID.randomUUID().toString().replaceAll("-", "")
    	            val datamap: mutable.Map[String, String] = mutable.Map(
    	              "uuid" -> uuid,
    	              "startTime" -> fmtTime,
    	              "event" -> "停车事件",
    	              "carmID" -> map("curr_cam")
    	            )
    	            pool.execute(new ThreadHGD(map, datamap))
    	          }
    	        }
    	      }
    	    }
    	  } catch {
    	    case e: Exception => e.printStackTrace()
    	  } finally {
    	    pool.shutdown()
    	    // ds stays null when the socket could not be bound.
    	    if (ds != null) {
    	      ds.close()
    	    }
    	    sc.stop()
    	  }
    	}
    	
    	/**
    	 * Tracking task for one parking-event alarm: predicts the queue length once, then
    	 * re-measures the real-time queue length every minute and writes one record per
    	 * pass through updateCameraData until the alarm is cancelled.
    	 *
    	 * @param map     camera configuration loaded from MySQL; keys read here are
    	 *                curr_cam, upst_cam, cap_speed80, cam_zh and cam_7
    	 * @param datamap alarm record; arrives with uuid, startTime, event and carmID and is
    	 *                extended with state_len, timelen, act_len_que, pred_len_que and
    	 *                gz_time before every write
    	 */
    	class ThreadHGD(map: mutable.Map[String, String],datamap: mutable.Map[String, String]) extends Runnable{

    	  override def run(): Unit = {
    	    // SimpleDateFormat is not thread-safe and this task runs on pool threads, so it
    	    // uses its own formatters instead of the ones shared by the enclosing object.
    	    val minuteFmt = new SimpleDateFormat("yyyy-MM-dd HH:mm")
    	    val secondFmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    	    def num(s: String): scala.Double = java.lang.Double.parseDouble(s)
    	    def fmt2(d: scala.Double): String = "%.2f".format(d)

    	    // HBase row key: epoch millis of the minute followed by the camera id without "V".
    	    def rowKey(minute: String, cameraId: String): String =
    	      secondFmt.parse(minute).getTime().toString + cameraId.replace("V", "")

    	    // Data of the current minute, falling back to the previous minute when empty.
    	    def latestData(cameraId: String, current: String, previous: String): mutable.Map[String, String] = {
    	      val now = getHbase(rowKey(current, cameraId))
    	      if (now.nonEmpty) now else getHbase(rowKey(previous, cameraId))
    	    }

    	    // Whether the prediction precondition has been met (stays true once set).
    	    var thbool: Boolean = false
    	    // Whether the alarm is still being tracked.
    	    var boo: Boolean = true
    	    // 0 until the one-off prediction has been computed.
    	    var num0: Int = 0
    	    // Event duration at maximum queue length.
    	    var t: String = null
    	    // Predicted queue length.
    	    var x: String = null
    	    // Number of passes that met the cancel condition; the third one cancels the alarm.
    	    var numberWN: Int = 0

    	    while (boo) {
    	      val date: Calendar = Calendar.getInstance()
    	      val index1: String = minuteFmt.format(date.getTime()) + ":00"
    	      date.add(Calendar.MINUTE, -1)
    	      val index2: String = minuteFmt.format(date.getTime()) + ":00"

    	      println(index1+"=="+index2+"=="+map)

    	      // curr_cam: camera of the current section, upst_cam: camera of the upstream section.
    	      val thisValue: mutable.Map[String, String] = latestData(map("curr_cam"), index1, index2)
    	      val upstValue: mutable.Map[String, String] = latestData(map("upst_cam"), index1, index2)

    	      // Condition 2: the current camera has data and the upstream flow exceeds 35.
    	      if (thisValue.nonEmpty && num(upstValue.getOrElse("car_num", "0")) > 35) {
    	        thbool = true
    	      }
    	      println("本次预警是否满足条件:   "+thbool)
    	      println("当前摄像头thisValue:   "+thisValue)
    	      println("上游摄像头upstValue:   "+upstValue)

    	      if (thbool) {
    	        // Condition 3: the prediction is computed only on the first pass of an alarm.
    	        if (num0 == 0) {
    	          // (1) density of the current section, from the capacity during the incident
    	          val thisDensity = fmt2(2900.0 / 25.0)

    	          // (2) density of the current section after the incident is cleared
    	          val afterDensity = fmt2(num(map.getOrElse("cap_speed80", "0")) / 80.0)

    	          // (3) density of the upstream camera (flow / speed)
    	          val nowAvgDensity = fmt2(
    	            (num(upstValue.getOrElse("car_num", "0")) / num(upstValue.getOrElse("now_avg_speed", "0"))) * 60)

    	          // (4) gathering-wave speed: |80 * (1 - (upstream + current density) / 180)|
    	          val w1 = fmt2(math.abs(num(fmt2(80 * (1 - (num(nowAvgDensity) + num(thisDensity)) / 180.0)))))

    	          // (5) dissipation-wave speed: |80 * (1 - (cleared + current density) / 180)|
    	          val w2 = fmt2(math.abs(num(fmt2(80 * (1 - (num(afterDensity) + num(thisDensity)) / 180.0)))))

    	          // (6) event duration at maximum queue: |w2 * (13/60) / (w1 - w2)|
    	          t = fmt2(math.abs(num(fmt2((num(w2) * (13 / 60.0)) / (num(w1) - num(w2))))))

    	          // (7) maximum (predicted) queue length: |w2 * (t - 13/60)|
    	          x = math.abs(num("%.3f".format(num(w2) * (num(t) - (13 / 60.0))))).toString()
    	          num0 += 1
    	        }

    	        if (num(x) < 0.1) {
    	          // Predicted queue is negligible: no alarm, stop without writing a record.
    	          boo = false
    	        } else {
    	          // Condition 4: real-time queue length, used to decide when to cancel.
    	          var realTQL: scala.Double = 0.0
    	          // Stake number of this camera.
    	          val startDist = num(map.getOrElse("cam_zh", "0"))

    	          // cam_7: upstream cameras as "id-stake" entries separated by "、".
    	          val cams: String = map.getOrElse("cam_7", null)
    	          if (cams != null) {
    	            val segments: Array[String] = cams.split("、")
    	            var i = 0
    	            var queued = true
    	            // Walk upstream in order and stop at the first camera that is not queued.
    	            while (queued && i < segments.length) {
    	              val parts = segments(i).split("-")
    	              val inValue = latestData(parts(0), index1, index2)
    	              if (inValue.isEmpty) {
    	                // No data for either minute: use the defaults, which count as "not queued".
    	                inValue("now_avg_space") = "60"
    	                inValue("now_avg_speed") = "60"
    	              }
    	              // Queued when speed < 20 and spacing < 15.
    	              if (num(inValue.getOrElse("now_avg_speed", "0")) < 20 &&
    	                  num(inValue.getOrElse("now_avg_space", "0")) < 15) {
    	                realTQL += startDist - num(parts(1))
    	                i += 1
    	              } else {
    	                queued = false
    	              }
    	            }
    	          } else {
    	            // No upstream cameras configured: assume 300 metres.
    	            realTQL = 300
    	          }

    	          // Cancel condition: |real - predicted| / predicted <= 0.3 (predicted is in
    	          // kilometres) or a real-time queue of at most 300; three hits cancel the alarm.
    	          val deviation = math.abs((realTQL / 1000 - num(x)) / num(x))
    	          if (deviation <= 0.3 || realTQL <= 300) {
    	            numberWN += 1
    	          }
    	          if (numberWN == 3) {
    	            boo = false
    	          }

    	          // state_len: "1" while tracking, "0" on the pass that cancels the alarm.
    	          datamap("state_len") = if (boo) "1" else "0"
    	          datamap("timelen") = t
    	          datamap("act_len_que") = (realTQL / 1000.0).toString()
    	          datamap("pred_len_que") = x
    	          datamap("gz_time") = index1
    	          println("datamap:  "+datamap)
    	          updateCameraData(datamap)
    	          // Re-evaluate one minute later.
    	          TimeUnit.MINUTES.sleep(1)
    	        }
    	      } else {
    	        // Precondition not met: nothing to track.
    	        boo = false
    	      }
    	    }
    	  }
    	}
    	
      /**
       * Decodes a comma-separated list of decimal character codes into a string,
       * e.g. "72,105" becomes "Hi".
       *
       * Null or empty input yields an empty string; empty tokens (such as the one
       * produced by a trailing or doubled comma) are skipped and surrounding
       * whitespace is ignored.
       *
       * @param value comma-separated decimal codes
       * @return the decoded characters, in order
       * @throws NumberFormatException if a non-empty token is not a decimal integer
       */
      def asciiToString(value:String) :String={
        val tokens: Array[String] = if (value == null) Array.empty[String] else value.split(",")
        tokens.iterator
          .map(_.trim)
          .filter(_.nonEmpty)
          .map(token => Integer.parseInt(token).toChar)
          .mkString
      }
    
    	
    	
    	/**
    	 * Reads one row from the HBase table TABLENAME and returns its `info` columns
    	 * car_num, now_avg_speed, time, now_avg_densit and now_avg_space.
    	 *
    	 * A new HBase connection is opened for every call and is now always released,
    	 * including when the lookup fails. Failures are printed and not rethrown.
    	 *
    	 * @param carId the row key to fetch (despite the name, callers pass a row key)
    	 * @return column qualifier -> value; empty when the row is missing or an error occurred
    	 */
    	def getHbase(carId:String):mutable.Map[String, String]={
    	  import scala.collection.JavaConverters._

    	  val map: mutable.Map[String, String] = mutable.Map()
    	  // Fully qualified: the bare name Connection refers to java.sql.Connection in this file.
    	  var hConn: org.apache.hadoop.hbase.client.Connection = null
    	  var hTable: Table = null

    	  // Closes a resource if it was opened; close failures are printed, like other errors here.
    	  def closeQuietly(resource: java.io.Closeable): Unit = {
    	    if (resource != null) {
    	      try {
    	        resource.close()
    	      } catch {
    	        case e: Exception => e.printStackTrace()
    	      }
    	    }
    	  }

    	  try {
    	    val conf = HBaseConfiguration.create()
    	    hConn = ConnectionFactory.createConnection(conf)
    	    hTable = hConn.getTable(TableName.valueOf(TABLENAME))

    	    val family = Bytes.toBytes("info")
    	    val get: Get = new Get(Bytes.toBytes(carId))
    	    // flow, speed, time, density, vehicle spacing
    	    Seq("car_num", "now_avg_speed", "time", "now_avg_densit", "now_avg_space")
    	      .foreach(qualifier => get.addColumn(family, Bytes.toBytes(qualifier)))

    	    val result = hTable.get(get)
    	    // listCells is only read when the row exists.
    	    if (!result.isEmpty()) {
    	      for (cell <- result.listCells().asScala) {
    	        map(Bytes.toString(CellUtil.cloneQualifier(cell))) = Bytes.toString(CellUtil.cloneValue(cell))
    	      }
    	    }
    	  } catch {
    	    case e: Exception => e.printStackTrace()
    	  } finally {
    	    closeQuietly(hTable)
    	    closeQuietly(hConn)
    	  }
    	  map
    	}
    	/**
    	 * Inserts one queue-length tracking record into jzwdata.warning_record_sum
    	 * inside its own transaction.
    	 *
    	 * Any failure (including a missing mandatory key) is printed and the transaction
    	 * is rolled back; nothing is rethrown from the insert itself.
    	 *
    	 * @param map record to store. Mandatory keys: uuid, startTime, carmID, timelen,
    	 *            act_len_que, pred_len_que, gz_time. Optional keys: state_len
    	 *            (default "0") and event (default "停车事件").
    	 */
    	def updateCameraData(map:mutable.Map[String, String]): Unit = {
    	  var connection: Connection = null
    	  var pstm: PreparedStatement = null

    	  try {
    	    connection = DruidConnectionPool.getConnection
    	    // Start a transaction: no auto-commit.
    	    connection.setAutoCommit(false)

    	    val sql: String = "insert into jzwdata.warning_record_sum(uuid,state_len,warning_event,happen_time,location_cam,timelen,act_len_que,pred_len_que,gz_time) values(?,?,?,?,?,?,?,?,?)"

    	    // A plain INSERT only needs a PreparedStatement.
    	    pstm = connection.prepareStatement(sql)
    	    pstm.setString(1, map("uuid"))                          // task group id
    	    pstm.setString(2, map.getOrElse("state_len", "0"))      // task state (0: alarm cleared, 1: alarming)
    	    pstm.setString(3, map.getOrElse("event", "停车事件"))     // event type
    	    pstm.setString(4, map("startTime"))                     // time the event happened
    	    pstm.setString(5, map("carmID"))                        // location (camera id)
    	    pstm.setString(6, map("timelen"))                       // duration
    	    pstm.setString(7, map("act_len_que"))                   // current queue length
    	    pstm.setString(8, map("pred_len_que"))                  // predicted queue length
    	    pstm.setString(9, map("gz_time"))                       // tracking time

    	    pstm.executeUpdate()
    	    connection.commit()
    	  } catch {
    	    case e: Exception =>
    	      e.printStackTrace()
    	      // connection is still null when the pool could not hand one out.
    	      if (connection != null) {
    	        try {
    	          connection.rollback()
    	        } catch {
    	          case rollbackError: Exception => rollbackError.printStackTrace()
    	        }
    	      }
    	  } finally {
    	    // Release the statement first; the connection goes back to the pool even if that fails.
    	    try {
    	      if (pstm != null) {
    	        pstm.close()
    	      }
    	    } finally {
    	      if (connection != null) {
    	        connection.close() // returns the connection to the pool
    	      }
    	    }
    	  }
    	}
    	
    	/**
    	 * Looks up the configuration row of a camera in the MySQL table lenque_cam
    	 * (the camera selection used for predicted / real-time queue length).
    	 *
    	 * @param carmID camera id as reported by the event source (column cam_ID_ac)
    	 * @return the row as a map with keys cam_ID_ac, curr_cam, upst_cam, down_cam,
    	 *         cap_speed80, cam_zh and cam_7; empty when no row matches
    	 * @throws RuntimeException when the query fails; the original exception is
    	 *                          attached as the cause
    	 */
    	def getCameraData(carmID:String): mutable.Map[String, String]={
    	  var connection: Connection = null
    	  var statement: PreparedStatement = null
    	  var resultSet: ResultSet = null
    	  val map: mutable.Map[String, String] = mutable.Map()

    	  try {
    	    // Borrow a connection from the pool.
    	    connection = DruidConnectionPool.getConnection
    	    val querySql = "select cam_ID_ac,curr_cam,upst_cam,down_cam,cap_speed80,cam_zh,cam_7 from lenque_cam where cam_ID_ac =? limit 1;"
    	    statement = connection.prepareStatement(querySql)
    	    statement.setString(1, carmID)
    	    resultSet = statement.executeQuery()

    	    // The query is limited to one row.
    	    if (resultSet.next()) {
    	      map.put("cam_ID_ac", resultSet.getString(1))   // camera id of the event source
    	      map.put("curr_cam", resultSet.getString(2))    // camera of the current section
    	      map.put("upst_cam", resultSet.getString(3))    // camera of the upstream section
    	      map.put("down_cam", resultSet.getString(4))    // camera of the downstream section
    	      map.put("cap_speed80", resultSet.getString(5)) // capacity at speed 80
    	      map.put("cam_zh", resultSet.getString(6))      // stake number
    	      map.put("cam_7", resultSet.getString(7))       // upstream cameras
    	    }
    	  } catch {
    	    case e: Exception =>
    	      // Keep the cause so the real failure is not lost.
    	      throw new RuntimeException("查询历史在网车出现异常!", e)
    	  } finally {
    	    // Nested so that one failing close() cannot leak the remaining resources.
    	    try {
    	      if (resultSet != null) {
    	        resultSet.close()
    	      }
    	    } finally {
    	      try {
    	        if (statement != null) {
    	          statement.close()
    	        }
    	      } finally {
    	        if (connection != null) {
    	          connection.close()
    	        }
    	      }
    	    }
    	  }
    	  map
    	}
    }
    

      

    • 案例二:

           使用SparkCore模块从Redis拉取数据处理后存入Hbase数据库

            

    package com.lg.bigdata.core
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import redis.clients.jedis.Jedis
    import java.util.Set
    import redis.clients.jedis.ScanParams
    import redis.clients.jedis.ScanResult
    import org.apache.spark.rdd.RDD
    import com.google.gson.Gson
    import java.util.Map
    import org.apache.hadoop.hbase.util.Bytes
    import com.google.gson.reflect.TypeToken
    import scala.collection.Seq
    import scala.collection.mutable.HashMap
    import java.text.SimpleDateFormat
    import java.util.Date
    import org.apache.spark.sql.DataFrame
    import scala.collection.mutable
    import com.lg.blgdata.utils.JedisConnectionPool
    import java.lang.Long
    import java.util.Calendar
    import java.lang.Double
    import scala.collection.mutable.ListBuffer
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import com.lg.bigdata.utils.JZWUtil
    
    /**
     *    一.1分钟数合成
     * 	从Redis合成分钟数据存入hbase
     *         要求:
     *         (1)断面分类
     *                       (2)车道划分
     *
     *  二.原始数据格式
     *         C:摄像头编号
     *         T:时间
     *         L:车道(L→左,M→中,R→右)
     *         N:车型(car→小轿车,bus→卡车)
     *         S1:速度(默认km/h)
     *         S2:车间距(默认m)
     */
    object MinuteData {
      // Formatter for minute-resolution timestamps ("yyyy-MM-dd HH:mm").
      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
      // Formatter for second-resolution timestamps ("yyyy-MM-dd HH:mm:ss").
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      // HBase setup: table name (namespace:table) and client configuration.
      val tableName="jzw_data:spot_jt_para_1min"
      val conf = HBaseConfiguration.create()
          // Registers tableName as the TableInputFormat input table on this configuration.
          conf.set(TableInputFormat.INPUT_TABLE, tableName)
      
      def main(args: Array[String]): Unit = {
        //.setMaster("local[*]")
        val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MinuteData")
        
        //创建spark上下文对象
        val sc = new SparkContext(config)
        
        //当前分钟开始
        val sdate: Calendar = Calendar.getInstance();
        sdate.add(Calendar.MINUTE, -1) // 当前时间减1分钟
        val startMinute = sdf.format(sdate.getTime()) + ":00"
        val slong = format.parse(startMinute).getTime();
    
        //结束时间
        val edate: Calendar = Calendar.getInstance()
        val endMinute = sdf.format(edate.getTime()) + ":00"
        val elong = format.parse(endMinute).getTime()
        
        //获取一个jedis连接池
        val jedis: Jedis = JedisConnectionPool.getConnections()
        jedis.select(0) //db,默认有16个
    
        //导入隐式转换
        import scala.collection.JavaConverters._
    
        //所有摄像头的RDD,原始数据
        val list = JZWUtil.getCameraId
    
        //gson转换把类型设置为String后时间戳不会被强制转换
        val gson: Gson = new Gson()
    
        var rsListL = new ListBuffer[mutable.Map[String, String]]()
        var rsListM = new ListBuffer[mutable.Map[String, String]]()
        var rsListR = new ListBuffer[mutable.Map[String, String]]()
    
        list.foreach {
          rdd ⇒
            { //获取一个摄像头key的最新100条数据(1分钟内数据目前没有超过这么多)
            //保留当前key最新的200条数据
            val boorm:String=jedis.ltrim(rdd, 0, 200)
            
              //获取最右端的100条数据
              val result: java.util.List[String] = jedis.lrange(rdd, 0, 150)
              val its: java.util.Iterator[String] = result.iterator()
              val carMaps = JZWUtil.contentMatch(rdd)
    
              if (carMaps != null) { //摄像头已经配置
                val spacing = Double.parseDouble(carMaps.get("dZLength").get.toString()) //dZLength:检测单元距离
                var booL: Boolean = false
                var booM: Boolean = false
                var booR: Boolean = false
    
                var mapL: mutable.Map[String, String] = mutable.Map() //L
                var mapM: mutable.Map[String, String] = mutable.Map() //M
                var mapR: mutable.Map[String, String] = mutable.Map() //R
    
                //默认左车流量(和)
                var sumL: Int = 0 // L
                var sumM: Int = 0 // M
                var sumR: Int = 0 // R
    
                //标准车流量PCU(和)
                var pcuL: Int = 0
                var pcuM: Int = 0
                var pcuR: Int = 0
    
                //速度(和)
                var speedL: Double = 0
                var speedM: Double = 0
                var speedR: Double = 0
    
                //车间距(和)
                var spacingL: Double = 0
                var spacingM: Double = 0
                var spacingR: Double = 0
    
                while (its.hasNext()) {
                  val coum: String = its.next()
                  
                  //数据扁平化
                  val map: Map[String, String] = gson.fromJson(coum, new TypeToken[Map[String, String]]() {}.getType())
                  //过滤出当前分钟内的数据,无效数据剔除
              
                  if (map.get("T") != null) {
                    val times = Long.parseLong(map.get("T").toString())
                    
                    if (times > 0 && slong <= times && times < elong) { //判断在时间段内
    
                      val laneType: String = map.get("L").toString() //车道类别
    
                      val carType: String = map.get("N").toString() //车型类别
    
                      var speed: Double = 0.0 //速度
                      if (map.get("S1") != null) {
                        speed = Double.parseDouble(map.get("S1").toString())
                      }
    
                      var spacing: Double = 0.0 //间距
                      if (map.get("S2") != null) {
                        spacing = Double.parseDouble(map.get("S2").toString())
                      }
    
                      if (laneType.equals("L")) { //左车道
                        sumL += 1
                        speedL += speed
                        spacingL += spacing
                        if (carType.equals("car")) {
                          pcuL += 1
                        } else if (carType.equals("bus")) {
                          pcuL += 2
                        }
                        booL = true
                      }
    
                      if (laneType.equals("M")) { //中车道
                        sumM += 1
                        speedM += speed
                        spacingM += spacing
                        if (carType.equals("car")) {
                          pcuM += 1
                        } else if (carType.equals("bus")) {
                          pcuM += 2
                        }
                        booM = true
                      }
                      if (laneType.equals("R")) { //右车道
                        sumR += 1
                        speedR += speed
                        spacingR += spacing
                        if (carType.equals("car")) {
                          pcuR += 1
                        } else if (carType.equals("bus")) {
                          pcuR += 2
                        }
                        booR = true
                      }
                    }
                  }
                }
                if (booL) {
                              /*
    													 * rowkey设计:
    													   *        列:00111607048520000
    													 *   001 :前三位为摄像头编号去'V'
    													 *   1  :第四位 (L:1  M:2  R:3)
    													 *   1607048520000:第四位之后的部分为精确到分钟的时间戳
    													 */
                  mapL("key") = (rdd.replace("V", "") + "1" + elong).toString() //hbase rowkey
                  mapL("car_num") = sumL.toString() //车流量
                  mapL("time") = endMinute //时间(2020-12-02 19:49:00)
                  mapL("lane_position") = "L" //车道类别
                  mapL("now_flow_pcu") = pcuL.toString() //标准车流量PCU(平均)
                  mapL("now_avg_speed") = (speedL / sumL).formatted("%.2f").toString() //平均速度(车道速度和/车道车辆数)(平均)
                  mapL("now_avg_densit") = (1 /(spacingL / sumL/1000)).formatted("%.2f").toString() //当前车道密度(1km/平均车间距)
                  mapL("now_avg_densit_pcu") = (1 /(spacingL / pcuL/1000)).formatted("%.2f").toString() //当前pcu车道密度(1km/平均车间距)
                  mapL("now_avg_passTime") = scala.math.round(spacing / (speedL / sumL) * 60).toString() //通行时间(车道检测单元长度/平均速度*60)
                  mapL("now_avg_TPI") = Math.abs(scala.math.round((1 - (speedL / sumL / 80)) * 10)).toString() //拥堵度((1-(平均速度/最高限速))*10)
                  mapL("now_avg_space") = (spacingL / sumL).formatted("%.2f").toString() //车间距(平均)
                  mapL("cameraId") = rdd
                  rsListL.append(mapL)
                }
                if (booM) {
                  mapM("key") = rdd.replace("V", "") + "2" + elong
                  mapM("car_num") = sumM.toString()
                  mapM("time") = endMinute
                  mapM("lane_position") = "M"
                  mapM("now_flow_pcu") = pcuM.toString()
                  mapM("now_avg_speed") = (speedM / sumM).formatted("%.2f").toString()
                  mapM("now_avg_densit") = (1 /( spacingM / sumM/1000)).formatted("%.2f").toString()
                  mapM("now_avg_densit_pcu") = (1 /( spacingM / pcuM/1000)).formatted("%.2f").toString()
                  mapM("now_avg_passTime") = scala.math.round(spacing / (speedM / sumM) * 60).toString()
                  mapM("now_avg_TPI") = Math.abs(scala.math.round((1 - (speedM / sumM / 80)) * 10)).toString()
                  mapM("now_avg_space") = (spacingM / sumM).formatted("%.2f").toString()
                  mapM("cameraId") = rdd
                  rsListM.append(mapM)
                }
                if (booR) {
                  mapR("key") = rdd.replace("V", "") + "3" + elong
                  mapR("car_num") = sumR.toString()
                  mapR("time") = endMinute
                  mapR("lane_position") = "R"
                  mapR("now_flow_pcu") = pcuR.toString()
                  mapR("now_avg_speed") = (speedR / sumR).formatted("%.2f").toString()
                  mapR("now_avg_densit") = (1 / (spacingR / pcuR/1000)).formatted("%.2f").toString()
                  mapR("now_avg_densit_pcu") = (1 / spacingR / sumR).formatted("%.2f").toString()
                  mapR("now_avg_passTime") = scala.math.round(spacing / (speedR / sumR) * 60).toString()
                  mapR("now_avg_TPI") = Math.abs(scala.math.round((1 - (speedR / sumR / 80)) * 10)).toString()
                  mapR("now_avg_space") = (spacingR / sumR).formatted("%.2f").toString()
                  mapR("cameraId") = rdd
                  rsListR.append(mapR)
                }
              }
            }
        }
        /**
         * 求排队长度后入库hbase
         * 参数计算条件:
         *  1.当前检测断面和他上游的检测断面的车速小于5km/h,且车间距小于10m
         *  2.满足条件一的前提下:排队长度=(1-(相邻上游断面的平均速度/80))*当前检测断面与相邻检测断面的距离)
         */
    
         rsListL.foreach(x ⇒ {
          //当前检测断面的检测断面的车速小于5km/h,且车间距小于10m
          var boos: Boolean = false
          val carMaps = JZWUtil.contentMatch(x.get("cameraId").get)
          if (Double.parseDouble(x.get("now_avg_speed").get) < 5 && Double.parseDouble(x.get("now_avg_space").get) < 10) {
    
            if (carMaps.get("cName").get != null) {
              //且他相邻上游的检测断面的车速小于5km/h,且车间距小于10m
              rsListL.foreach(y ⇒ {
                if (y.get("cameraId").get.equals(carMaps.get("cName").get)) {
                  if (Double.parseDouble(y.get("now_avg_speed").get) < 5 && Double.parseDouble(y.get("now_avg_space").get) < 10) {
                    boos = true
                  }
                }
              })
            }
          }
          if (boos) {
            val queueLength = ((1 - (Double.parseDouble(x.get("now_avg_speed").get) / 80)) * (Double.parseDouble(carMaps.get("cDLength").get)) ).formatted("%.2f").toString()
            x("nwo_len_que") = queueLength
          } else {
            x("nwo_len_que") = "0"
          }
        })
    
       rsListM.foreach(x ⇒ {
          var boos: Boolean = false
          val carMaps = JZWUtil.contentMatch(x.get("cameraId").get)
          if (Double.parseDouble(x.get("now_avg_speed").get) < 5 && Double.parseDouble(x.get("now_avg_space").get) < 10) {
            if (carMaps.get("cName").get != null) {
              //且他相邻上游的检测断面的车速小于5km/h,且车间距小于10m
              rsListM.foreach(y ⇒ {
                if (y.get("cameraId").get.equals(carMaps.get("cName").get)) {
                  if (Double.parseDouble(y.get("now_avg_speed").get) < 5 && Double.parseDouble(y.get("now_avg_space").get) < 10) {
                    boos = true
                  }
                }
              })
            }
          }
          if (boos) {
            val queueLength = ((1 - (Double.parseDouble(x.get("now_avg_speed").get) / 80)) * (Double.parseDouble(carMaps.get("cDLength").get))).formatted("%.2f").toString()
            x("nwo_len_que") = queueLength
          } else {
            x("nwo_len_que") = "0"
          }
        })
    
        rsListR.foreach(x ⇒ {
          var boos: Boolean = false
          val carMaps = JZWUtil.contentMatch(x.get("cameraId").get)
          if (Double.parseDouble(x.get("now_avg_speed").get) < 5 && Double.parseDouble(x.get("now_avg_space").get) < 10) {
            if (carMaps.get("cName").get != null) {
              //且他相邻上游的检测断面的车速小于5km/h,且车间距小于10m
              rsListR.foreach(y ⇒ {
                if (y.get("cameraId").get.equals(carMaps.get("cName").get)) {
                  if (Double.parseDouble(y.get("now_avg_speed").get) < 5 && Double.parseDouble(y.get("now_avg_space").get) < 10) {
                    boos = true
                  }
                }
              })
            }
          }
          if (boos) {
            val queueLength = ((1 - (Double.parseDouble(x.get("now_avg_speed").get) / 80)) * (Double.parseDouble(carMaps.get("cDLength").get)) * 1000).formatted("%.2f").toString()
            x("nwo_len_que") = queueLength
          } else {
            x("nwo_len_que") = "0"
          }
        })
           rsListL = rsListL.++=(rsListM).++=(rsListR) 
        val rdd = sc.parallelize(rsListL)
        
         val jobConf = new JobConf(conf)
          	jobConf.setOutputFormat(classOf[TableOutputFormat])//输出数据的类型
          	jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
     
          	rdd.foreach(println)
        val dataRDD=rdd.map(x⇒{
          JZWUtil.convert(x,1)
        })
       
        dataRDD.saveAsHadoopDataset(jobConf)
        sc.stop()
      }
      
       //查询指定库的key
      /**
       * Prints every key of one Redis logical database to standard output.
       *
       * The connection is borrowed from `JedisConnectionPool` and is always
       * closed again, even when `select`/`keys` throws, so repeated calls do not
       * drain the pool.
       *
       * NOTE(review): `KEYS *` walks the whole keyspace in one blocking command;
       * fine for debugging, consider `SCAN` before using this on a busy instance.
       *
       * @param db index of the logical database to list
       */
      def getRedisKey(db: Int) = {
        val jedis: Jedis = JedisConnectionPool.getConnections()
        try {
          jedis.select(db) // switch to the requested logical database
          val keys: Set[String] = jedis.keys("*")
          val it: java.util.Iterator[String] = keys.iterator()
          while (it.hasNext()) {
            println(it.next())
          }
        } finally {
          // Hand the connection back; the original never released it.
          jedis.close()
        }
      }
    }
    

      

    • 案例三:

           使用SparkCore从Hbase拉取数据合成后存入Hbase另一个表

            

    package com.lg.bigdata.core
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import java.text.SimpleDateFormat
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.spark.rdd.RDD
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import redis.clients.jedis.ScanResult
    import java.util.Calendar
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.filter.CompareFilter
    import org.apache.hadoop.hbase.filter.RowFilter
    import org.apache.hadoop.hbase.filter.Filter
    import org.apache.hadoop.hbase.filter.BinaryPrefixComparator
    import com.lg.bigdata.utils.JZWUtil
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
    import org.apache.hadoop.hbase.filter.RegexStringComparator
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
    import scala.collection.mutable
    import org.apache.hadoop.hbase.filter.FilterList
    import scala.collection.mutable.ListBuffer
    import java.lang.Double
    import org.apache.hadoop.mapred.JobConf
    import java.util.Date
    import java.lang.Long
    import org.apache.hadoop.hbase.client.Put
    
    /**
     * Lane-level 5-minute traffic forecast synthesis.
     *
     * Reads the five most recent 5-minute lane records of every camera from the
     * HBase table `jzw_data:spot_jt_para_5min`, combines them with a linearly
     * weighted moving average
     *
     *   Ft = (5*X(t-1) + 4*X(t-2) + 3*X(t-3) + 2*X(t-4) + X(t-5)) / 15
     *
     * and writes one forecast row per camera and lane to
     * `jzw_data:spot_jt_para_5min_pred`.
     *
     * Forecast columns: flow (car_num), speed, density and congestion index (TPI).
     */
    object PredMinute5Data {

      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val fmtminute = new SimpleDateFormat("mm")

      // HBase tables: forecast output and 5-minute input.
      val tableNameOutPut = "jzw_data:spot_jt_para_5min_pred"
      val tableNameInPut = "jzw_data:spot_jt_para_5min"
      val conf = HBaseConfiguration.create()

      // Table that the TableInputFormat scan reads from.
      conf.set(TableInputFormat.INPUT_TABLE, tableNameInPut)

      /** Speed limit (km/h) used by the congestion index formula. */
      private val MaxSpeedKmh = 80.0

      /** Lane letter paired with the digit that identifies the lane inside a row key. */
      private val Lanes = Seq("L" -> "1", "M" -> "2", "R" -> "3")

      /** Raw (still textual, possibly null) columns of one input row. */
      private final case class LaneSample(
        cameraId: String,
        lane: String,
        time: String,
        carNum: String,
        speed: String,
        density: String)

      /**
       * Runs the job: scan the five input time slots, forecast every
       * camera/lane pair that has data, and store the result in HBase.
       *
       * @param args unused
       */
      def main(args: Array[String]): Unit = {
        val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("PredMinute5Data")
        val sc = new SparkContext(config)
        try {
          // One snapshot of the time slots, so a run that straddles a minute
          // boundary cannot mix two different windows.
          val times = getTime()
          val predTime: scala.Long = times("rowkey_pred")
          val listId = JZWUtil.getCameraId

          val scan = new Scan()
          scan.setCacheBlocks(false)
          scan.addFamily(Bytes.toBytes("info"))

          // MUST_PASS_ONE = OR: keep a row when its key matches any of the five slots.
          val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ONE)
          (1 to 5).foreach { slot =>
            val pattern = "." + times("rowkey_" + slot)
            filterList.addFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(pattern)))
          }
          scan.setFilter(filterList)
          conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))

          val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
            conf,
            classOf[TableInputFormat],
            classOf[ImmutableBytesWritable],
            classOf[Result])

          // Extract the needed columns on the executors and bring the (small)
          // result to the driver in a single pass. The previous version appended
          // to broadcast buffers inside RDD.foreach, which is only visible to the
          // driver when everything runs in one JVM, and it re-scanned HBase once
          // per camera.
          val samples: List[LaneSample] = hbaseRDD.map {
            case (_, result) =>
              val family = Bytes.toBytes("info")
              def column(name: String): String =
                Bytes.toString(result.getValue(family, Bytes.toBytes(name)))
              LaneSample(
                column("cameraId"),
                column("lane_position"),
                column("time"),
                column("car_num"),
                column("now_avg_speed"),
                column("now_avg_densit"))
          }.collect().toList

          // Rows without camera, lane or time cannot be attributed or ordered.
          val byCameraAndLane = samples
            .filter(s => s.cameraId != null && s.lane != null && s.time != null)
            .groupBy(s => (s.cameraId, s.lane))

          // Same output order as before: all L rows, then M, then R.
          val predictions = new ListBuffer[mutable.Map[String, String]]()
          Lanes.foreach {
            case (lane, laneDigit) =>
              listId.foreach { cameraId =>
                byCameraAndLane.get((cameraId, lane)).foreach { laneSamples =>
                  predictions.append(predictLane(cameraId, lane, laneDigit, laneSamples, predTime))
                }
              }
          }

          val rdd = sc.parallelize(predictions)
          println("========")
          rdd.foreach(println)
          val dataRDD = rdd.map(x => convert(x))

          val jobConf = new JobConf(conf)
          jobConf.setOutputFormat(classOf[TableOutputFormat])
          jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableNameOutPut)

          dataRDD.saveAsHadoopDataset(jobConf)
        } finally {
          sc.stop()
        }
      }

      /**
       * Builds the forecast row of one camera lane.
       *
       * Samples are ordered newest first and weighted 5, 4, 3, 2, 1; when fewer
       * than five samples exist the divisor is the sum of the weights actually
       * used. At most five samples are consumed so a weight can never reach zero.
       *
       * @param cameraId    camera identifier, stored as-is in `cameraId`
       * @param lane        lane letter ("L", "M" or "R")
       * @param laneDigit   lane digit embedded in the row key
       * @param laneSamples non-empty input rows of this camera lane
       * @param predTime    forecast timestamp in epoch milliseconds
       * @return column name to value map accepted by [[convert]]
       */
      private def predictLane(
        cameraId: String,
        lane: String,
        laneDigit: String,
        laneSamples: Seq[LaneSample],
        predTime: scala.Long): mutable.Map[String, String] = {
        val weighted = laneSamples
          .sortBy(s => -format.parse(s.time).getTime) // newest first
          .take(5)
          .zip(5 to 1 by -1)
        val weightSum = weighted.map(_._2).sum

        def weightedMean(field: LaneSample => String): scala.Double =
          weighted.map { case (s, w) => parseNumber(field(s)) * w }.sum / weightSum

        val flow = math.abs(math.round(weightedMean(_.carNum))).toDouble // whole vehicles
        val speed = twoDecimals(weightedMean(_.speed))
        val density = twoDecimals(weightedMean(_.density))
        // Congestion index: (1 - speed / speed limit) * 10, computed from the
        // rounded speed and taken as an absolute value for every lane.
        val tpi = twoDecimals(math.abs((1 - speed.toDouble / MaxSpeedKmh) * 10))

        mutable.Map(
          "key" -> (cameraId.replace("V", "") + laneDigit + predTime),
          "time" -> format.format(new java.util.Date(predTime)),
          "car_num" -> flow.toString,
          "now_avg_speed" -> speed,
          "now_avg_densit" -> density,
          "now_avg_TPI" -> tpi,
          "cameraId" -> cameraId,
          "lane_position" -> lane)
      }

      /** Parses a numeric column; a missing or blank value counts as 0. */
      private def parseNumber(raw: String): scala.Double =
        Option(raw).map(_.trim).filter(_.nonEmpty).fold(0.0)(_.toDouble)

      /** Formats with two decimals using '.' as separator regardless of the JVM locale. */
      private def twoDecimals(value: scala.Double): String =
        "%.2f".formatLocal(java.util.Locale.ROOT, value)

      /**
       * Computes the input time slots and the forecast time, in epoch milliseconds.
       *
       * The latest usable slot is the current 5-minute boundary once more than two
       * minutes of it have elapsed, otherwise the boundary before it.
       *
       * @return `rowkey_1` (latest slot) to `rowkey_5` (oldest, 20 minutes earlier)
       *         and `rowkey_pred` (latest slot + 10 minutes)
       */
      def getTime(): mutable.Map[String, Long] = {
        val anchor: Calendar = Calendar.getInstance()
        val minute = anchor.get(Calendar.MINUTE)
        anchor.set(Calendar.SECOND, 0)
        anchor.set(Calendar.MILLISECOND, 0)
        anchor.set(Calendar.MINUTE, (minute / 5) * 5)
        // Calendar arithmetic rolls the hour/day back when needed, instead of
        // splicing an unpadded or negative minute into a formatted string.
        if (minute % 5 <= 2) {
          anchor.add(Calendar.MINUTE, -5)
        }

        val latest = anchor.getTimeInMillis
        val step = 5L * 60 * 1000

        val map: mutable.Map[String, Long] = mutable.Map()
        map("rowkey_pred") = latest + 2 * step
        (1 to 5).foreach { slot =>
          map("rowkey_" + slot) = latest - (slot - 1) * step
        }
        map
      }

      /**
       * Converts one forecast map into the key/value pair expected by
       * `saveAsHadoopDataset`.
       *
       * @param map must contain `key` plus every column written below;
       *            a missing entry raises `NoSuchElementException`
       * @return an empty writable and the `Put` for the row
       */
      def convert(map: mutable.Map[String, String]) = {
        val put = new Put(Bytes.toBytes(map("key"))) // row key
        val family = Bytes.toBytes("info")

        // flow, time, lane, average speed, density, congestion index, camera id
        Seq("car_num", "time", "lane_position", "now_avg_speed", "now_avg_densit", "now_avg_TPI", "cameraId")
          .foreach { column =>
            put.addImmutable(family, Bytes.toBytes(column), Bytes.toBytes(map(column)))
          }

        (new ImmutableBytesWritable, put)
      }
    }
    

      

    • 案例四:

           使用SparkCore模块从Redis拉取数据经过逻辑处理数据后存入Redis

            

    package com.lg.blgdata.core
    
    import java.text.SimpleDateFormat
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.Seconds
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.spark.SparkConf
    import java.util.Calendar
    import scala.collection.mutable
    import java.util.Date
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import com.lg.blgdata.utils.JedisConnectionPool
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.sql.types.StructField
    import org.apache.spark.sql.DataFrame
    import com.google.gson.Gson
    import com.google.gson.reflect.TypeToken
    import org.apache.spark.sql.Row
    import java.util.LinkedHashMap
    import com.alibaba.fastjson.JSON
    import org.apache.spark.SparkContext
    import scala.collection.mutable.ListBuffer
    import java.lang.Long
    import java.lang.Double
    import redis.clients.jedis.Pipeline
    import redis.clients.jedis.Jedis
    
    /**
     *     定时任务执行
     * 1. 短时预测
     * 	从redis获取最新5分钟的数据合成:合成未来5个一分钟时间节点的流量
     */
    object WriteRedis {
      // Shared date formatters, one per granularity used by this job.
      // Full timestamp down to the second.
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      // Timestamp truncated to the minute.
      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
      // Minute-of-hour only.
      val fmtminute = new SimpleDateFormat("mm")
      // Timestamp truncated to the hour.
      val hourSdf = new SimpleDateFormat("yyyy-MM-dd HH")
      // Calendar day only.
      val daysdf = new SimpleDateFormat("yyyy-MM-dd")
      // Second-of-minute only.
      val fmtScornd = new SimpleDateFormat("ss")
    
      /**
       * Short-term flow forecast, meant to be run by a scheduler.
       *
       * For each monitored line (Redis hashes `MV005` and `MV158`, database 3) the
       * per-minute counts inside the window returned by `getTime` are fitted with
       * a straight line S = a + b*x, and the values for x = 7..11 are written to
       * `predV005` / `predV158` under the five upcoming minute timestamps.
       *
       * @param args unused
       */
      def main(args: Array[String]): Unit = {
        val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WriteRedis")

        // The context is only created and stopped; all work happens on the driver.
        val sc = new SparkContext(config)
        try {
          val jedis: Jedis = JedisConnectionPool.getConnections()
          try {
            jedis.select(3)

            // One snapshot of the window and target timestamps. Evaluating
            // getTime per hash field could mix two different minutes.
            val times = getTime()
            val windowStart: scala.Long = times("sdate")
            val windowEnd: scala.Long = times("edate")
            val targetTimes = (1 to 5).map(slot => format.format(times("rowkey_" + slot)))

            // source hash -> forecast hash
            val lines = Seq("MV005" -> "predV005", "MV158" -> "predV158")
            val predictions = lines.map {
              case (sourceHash, targetHash) =>
                val counts = recentCounts(jedis, sourceHash, windowStart, windowEnd)
                targetHash -> targetTimes.zip(forecast(counts))
            }

            writePredictions(jedis, predictions)
          } finally {
            jedis.close()
          }
        } finally {
          sc.stop()
        }
      }

      /**
       * Reads one hash and returns the counts whose minute lies in
       * [from, until), oldest first.
       *
       * @param jedis   connection already switched to the right database
       * @param hashKey hash whose fields are "yyyy-MM-dd HH:mm" minutes
       * @param from    inclusive window start, epoch milliseconds
       * @param until   exclusive window end, epoch milliseconds
       */
      private def recentCounts(jedis: Jedis, hashKey: String, from: scala.Long, until: scala.Long): Seq[Int] = {
        import scala.collection.JavaConverters._
        jedis.hgetAll(hashKey).asScala.toSeq
          .map { case (minute, count) => (format.parse(minute + ":00").getTime, count) }
          .filter { case (timestamp, _) => from <= timestamp && timestamp < until }
          .sortBy(_._1)
          .map(_._2.toInt)
      }

      /**
       * Least-squares line through the counts at x = 1, 2, ..., evaluated at
       * x = 7..11 and returned as non-negative whole numbers.
       *
       * NOTE(review): the constants 5, 50 and 3 are n, n*sum(x^2) - sum(x)^2 and
       * mean(x) for exactly five samples; with fewer samples in the window the
       * fit is only approximate. Behaviour is kept as it was.
       */
      private def forecast(counts: Seq[Int]): Seq[scala.Long] = {
        val xs = 1 to counts.size
        val sumX = xs.sum
        val sumY = counts.sum
        val sumXY = xs.zip(counts).map { case (x, y) => x * y }.sum

        // Intermediate values are rounded to two decimals, as before.
        def round2(value: scala.Double): scala.Double =
          "%.2f".formatLocal(java.util.Locale.ROOT, value).toDouble

        val slope = round2((5 * sumXY - sumX * sumY) / 50.0)
        val meanY = round2(sumY / 5.0)
        val intercept = round2(meanY - 3 * slope)

        Seq(7, 8, 9, 10, 11).map(x => math.abs(math.round(intercept + slope * x)))
      }

      /**
       * Writes all forecasts in one MULTI/EXEC transaction sent through a pipeline.
       *
       * Failures are logged and the open transaction is discarded; they are not
       * rethrown, matching the previous best-effort behaviour.
       *
       * @param predictions forecast hash -> (timestamp, flow) pairs
       */
      private def writePredictions(jedis: Jedis, predictions: Seq[(String, Seq[(String, scala.Long)])]): Unit = {
        var pipeline: Pipeline = null
        var inMulti = false
        try {
          pipeline = jedis.pipelined()
          pipeline.multi()
          inMulti = true

          for ((hash, points) <- predictions; (time, flow) <- points) {
            pipeline.hset(hash, time, flow.toString)
          }

          // EXEC has to be queued before sync() flushes and reads the replies.
          pipeline.exec()
          inMulti = false
          pipeline.sync()
        } catch {
          case e: Exception =>
            e.printStackTrace()
            // Only discard a transaction that was really opened and not yet executed.
            if (pipeline != null && inMulti) {
              try {
                pipeline.discard()
                ()
              } catch {
                case inner: Exception => inner.printStackTrace()
              }
            }
        } finally {
          if (pipeline != null) {
            pipeline.close()
          }
        }
      }
      /**
       * Parses a JSON object string and packs its values, rendered as strings, into a Row.
       *
       * The text is parsed into a java.util.LinkedHashMap; every value of that map is
       * converted with String.valueOf and the resulting strings become the Row's fields.
       *
       * @param jsonStr JSON text to parse
       * @return a Row holding one String field per value of the parsed map
       */
      def handlerMessageRow(jsonStr: String): Row = {
        import scala.collection.JavaConverters._
        val parsed = JSON.parseObject(jsonStr, classOf[LinkedHashMap[String, Object]])
        val rendered = for (value <- parsed.asScala.values) yield String.valueOf(value)
        val cells: Array[String] = rendered.toArray
        Row(cells: _*)
      }
    
      //计算出需要拿取的数据时间节点及预测的时间点
      /**
       * Builds the epoch-millisecond time points used by the prediction logic.
       *
       * Every value is derived from ONE clock read, truncated to the start of the
       * current minute, so the entries are always mutually consistent:
       *   - "rowkey_1" .. "rowkey_5": current minute + 1 .. + 5 minutes (prediction points)
       *   - "edate": the current minute
       *   - "sdate": the current minute - 5 minutes
       *
       * Truncation is done arithmetically instead of formatting and re-parsing through
       * the object-level SimpleDateFormat instances: SimpleDateFormat is not thread-safe,
       * and a format/parse round trip is ambiguous during a DST overlap hour.
       * NOTE(review): this assumes the JVM default time zone's UTC offset is a whole
       * number of minutes, so epoch-minute boundaries match local-clock minute boundaries.
       *
       * @return a new mutable map from the keys listed above to epoch milliseconds
       */
      def getTime(): mutable.Map[String, Long] = {
        val minuteMs = 60L * 1000L
        val now = System.currentTimeMillis()
        // Start of the current minute; every other value is an offset from this.
        val currentMinute = now - (now % minuteMs)

        val map: mutable.Map[String, Long] = mutable.Map()
        // Prediction time points: the next five whole minutes.
        (1 to 5).foreach { step =>
          map("rowkey_" + step) = currentMinute + step * minuteMs
        }
        // Window end is the current minute; window start is five minutes earlier.
        map("edate") = currentMinute
        map("sdate") = currentMinute - 5 * minuteMs

        map
      }
    
    }
    

      

            

    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接,否则保留追究法律责任的权利。
  • 相关阅读:
    js幻灯片效果!
    构造函数和析构函数的简单说明
    ASP.NET接口的基础使用例子
    带预览图的js切换效果!
    在win7系统中安装sqlserver2005出现 [Microsoft][SQL Native Client]客户端不支持加密问题!
    Win7开启无线共享上网的方法
    C# 结构体 简明介绍
    C#访问修饰符简单说明
    C#不定长参数的使用
    研究了一下Google Ajax Search API, 给博客做了个样品
  • 原文地址:https://www.cnblogs.com/KdeS/p/14307197.html
Copyright © 2011-2022 走看看