案例一:
package com.lg.bigdata.streaming

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Bytes
import com.lg.bigdata.utils.JZWUtil
import java.util.Calendar
import java.text.SimpleDateFormat
import scala.collection.mutable
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.RowFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.CompareFilter
import org.apache.spark.SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.client.Connection
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import java.util.LinkedHashMap
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.DataFrame
import scala.util.control.NonFatal

/**
 * 5-minute aggregation job at lane level and section level.
 *
 * On every minute divisible by 5, the receiver reads the 1-minute lane-level rows of that minute
 * and the four minutes before it from `jzw_data:spot_jt_para_1min`. Each batch then writes:
 *  - 5-minute lane-level rows, grouped by camera + lane, to `jzw_data:spot_jt_para_5min`;
 *  - 5-minute section-level rows, grouped by camera, to `jzw_data:spot_jt_section_5min`.
 * The output keeps the same column layout as the 1-minute data:
 *  - car_num is summed;
 *  - the other metrics are averaged and rounded to 2 decimals.
 */
object HbaseSectionMinute5Data {
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val fmtminute = new SimpleDateFormat("mm")
  val conf = HBaseConfiguration.create()
  val tableNameInPut = "jzw_data:spot_jt_para_1min" // input table: 1-minute lane-level data
  val tableNameOutPut = "jzw_data:spot_jt_para_5min" // output table: 5-minute lane-level data
  val sectionTableNameOutPut = "jzw_data:spot_jt_section_5min" // output table: 5-minute section-level data
  var connection: Connection = null
  // Minute ("yyyy-MM-dd HH:mm:00", always a multiple of 5) of the data most recently emitted by
  // the receiver. The receiver thread writes it and the batch thread reads it, hence @volatile.
  // NOTE(review): this hand-off only works with a local master (receiver and driver in one JVM).
  @volatile var thism: String = null

  private val OneMinuteMillis = 60 * 1000L
  private val InfoFamily = Bytes.toBytes("info")
  // Columns copied from each HBase row into the JSON record handed to the stream.
  private val JsonColumns = Seq("car_num", "lane_position", "now_avg_speed", "now_avg_densit",
    "now_avg_passTime", "now_avg_TPI", "now_avg_space", "cameraId", "time")

  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HbaseSectionMinute5Data")
    // Delay (ms) before a receiver that called restart() is started again. receive() always ends
    // with restart(), so the receiver polls HBase about once a minute.
    config.set("spark.streaming.receiverRestartDelay", "60000")
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")
    // 60-second batches; the receiver only emits data on minutes divisible by 5.
    val ssc = new StreamingContext(sc, Seconds(60))
    val rddStream: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
    rddStream.print(1) // registers an output operation on the raw stream

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._

    // Schema of the rows produced by JZWUtil.handlerMessage2Row; every column is a string.
    val schema = StructType(List(
      StructField("car_num", StringType),
      StructField("lane_position", StringType),
      StructField("now_avg_speed", StringType),
      StructField("now_avg_densit", StringType),
      StructField("now_avg_passTime", StringType),
      StructField("now_avg_TPI", StringType),
      StructField("now_avg_space", StringType),
      StructField("cameraId", StringType),
      StructField("time", StringType)))

    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableNameOutPut)
    val sectionjobConf = new JobConf(conf)
    sectionjobConf.setOutputFormat(classOf[TableOutputFormat])
    sectionjobConf.set(TableOutputFormat.OUTPUT_TABLE, sectionTableNameOutPut)

    // Aggregates shared by both levels:
    //  - total car_num;
    //  - for each metric, sum / number of rows with a cameraId, rounded to 2 decimals.
    val aggregates = sum("car_num") +: Seq("now_avg_speed", "now_avg_densit", "now_avg_passTime",
      "now_avg_TPI", "now_avg_space").map(c => round(sum(c) / count("cameraId"), 2))

    rddStream.map(record => JZWUtil.handlerMessage2Row(record)).foreachRDD(rdd => {
      // The RDD feeds a count and two aggregations; cache it so it is computed once.
      rdd.cache()
      val total = rdd.count()
      if (total > 0) {
        println("数据条数=======:" + total)
        val time = thism
        val elong = parseMillis(time)
        val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
        val dataFrame = spark.createDataFrame(rdd, schema)

        // 1. Lane level: group by camera and lane.
        //    Result columns: cameraId, lane, car_num, speed, density, pass time, TPI, spacing.
        val line = dataFrame.groupBy("cameraId", "lane_position")
          .agg(aggregates.head, aggregates.tail: _*).collect()
        // 2. Section level: group by camera only.
        //    Result columns: cameraId, car_num, speed, density, pass time, TPI, spacing.
        val sectionline = dataFrame.groupBy("cameraId")
          .agg(aggregates.head, aggregates.tail: _*).collect()

        println("车道级别行数:" + line.size)
        val rsList: Seq[mutable.Map[String, String]] = line.toSeq.map { x =>
          val cameraId = x.get(0).toString
          val lanePosition = x.get(1).toString
          mutable.Map[String, String](
            // rowkey, e.g. "001" + "1" + "1607048520000":
            //   camera number without 'V' + lane code (L=1, M=2, other=3) + minute timestamp (ms).
            "key" -> (cameraId.replace("V", "") + laneCode(lanePosition) + elong),
            "car_num" -> x.get(2).toString,
            "time" -> time,
            "lane_position" -> lanePosition,
            "now_avg_speed" -> x.get(3).toString,
            "now_avg_densit" -> x.get(4).toString,
            "now_avg_passTime" -> x.get(5).toString,
            "now_avg_TPI" -> x.get(6).toString,
            "now_avg_space" -> x.get(7).toString,
            "cameraId" -> cameraId)
        }
        println("rsList条数:" + rsList.size)
        rdd.sparkContext.parallelize(rsList)
          .map(x => JZWUtil.convert(x, 5))
          .saveAsHadoopDataset(jobConf)

        val sectionList: Seq[mutable.Map[String, String]] = sectionline.toSeq.map { x =>
          val cameraId = x.get(0).toString
          mutable.Map[String, String](
            // rowkey: minute timestamp (ms) followed by the camera number without 'V'.
            "key" -> (elong + cameraId.replace("V", "")),
            "car_num" -> x.get(1).toString,
            "time" -> time,
            "now_avg_speed" -> x.get(2).toString,
            "now_avg_densit" -> x.get(3).toString,
            "now_avg_passTime" -> x.get(4).toString,
            "now_avg_TPI" -> x.get(5).toString,
            "now_avg_space" -> x.get(6).toString,
            "cameraId" -> cameraId)
        }
        println("sectionList条数:" + sectionList.size)
        rdd.sparkContext.parallelize(sectionList)
          .map(x => JZWUtil.convert(x, 6))
          .saveAsHadoopDataset(sectionjobConf)
      }
      rdd.unpersist()
    })

    ssc.start()
    ssc.awaitTermination()
    println("connection关闭")
    // The receiver creates the connection lazily, so it may never have been opened.
    if (connection != null) {
      connection.close()
    }
  }

  /** Lane code used in lane-level rowkeys: "L" -> 1, "M" -> 2, anything else -> 3. */
  private def laneCode(lanePosition: String): Int = lanePosition match {
    case "L" => 1
    case "M" => 2
    case _ => 3
  }

  /** Formats `date` as "yyyy-MM-dd HH:mm:00"; locked because SimpleDateFormat is not thread-safe. */
  private def minuteText(date: java.util.Date): String = sdf.synchronized { sdf.format(date) } + ":00"

  /** Parses "yyyy-MM-dd HH:mm:ss" into epoch milliseconds (locked for the same reason). */
  private def parseMillis(text: String): Long = format.synchronized { format.parse(text).getTime() }

  /** `endMillis` and the four minutes before it, newest first. */
  private def recentMinutes(endMillis: Long): Seq[Long] =
    (0 until 5).map(i => endMillis - i * OneMinuteMillis)

  /**
   * Receiver that polls HBase, because Spark has no built-in HBase stream source.
   *
   * Each run checks the current minute. When it is a multiple of 5, the run reads the 1-minute
   * rows of that minute and the four before it, and stores each row as a JSON string. It then
   * calls restart(), so Spark runs it again after spark.streaming.receiverRestartDelay.
   * NOTE(review): the next run starts a full delay after this one finishes, so the poll time
   * drifts and a minute divisible by 5 can be skipped. A scheduler aligned to minute
   * boundaries would be more reliable.
   */
  class MyReceiver() extends Receiver[String](StorageLevel.MEMORY_ONLY) {
    override def onStart(): Unit = {
      // onStart() must not block, so the HBase query runs on its own thread.
      new Thread("HbaseSectionMinute5Data-receiver") {
        override def run(): Unit = receive()
      }.start()
    }

    override def onStop(): Unit = {}

    private def receive(): Unit = {
      try {
        if (connection == null) {
          connection = ConnectionFactory.createConnection(conf)
        }
        val now = Calendar.getInstance()
        val currentMinute = minuteText(now.getTime())
        println("时间:" + currentMinute)
        if (now.get(Calendar.MINUTE) % 5 == 0) {
          // Publish the minute only when data is emitted. Later non-emitting runs then cannot
          // change the value before the batch holding this data is processed.
          thism = currentMinute
          println("IFSTART::>>>" + thism)
          storeRecentRows(parseMillis(currentMinute))
        }
        restart("Trying to connect again")
      } catch {
        case NonFatal(e) => restart("Error while reading " + tableNameInPut, e)
      }
    }

    /** Scans the input table for the five minutes ending at `endMillis` and stores each row as JSON. */
    private def storeRecentRows(endMillis: Long): Unit = {
      val table = connection.getTable(TableName.valueOf(tableNameInPut))
      try {
        val scan = new Scan()
        scan.setCacheBlocks(false)
        scan.addFamily(InfoFamily)
        // OR of one regex per minute: "." + timestamp matches a rowkey that contains the
        // timestamp after at least one character (presumably the camera/lane prefix).
        val filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE)
        recentMinutes(endMillis).foreach { ts =>
          filterList.addFilter(
            new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("." + ts)))
        }
        scan.setFilter(filterList)
        val scanner = table.getScanner(scan)
        try {
          val rows = scanner.iterator()
          while (rows.hasNext) {
            val result = rows.next()
            store(JsonColumns.map { column =>
              "\"" + column + "\":\"" +
                Bytes.toString(result.getValue(InfoFamily, Bytes.toBytes(column))) + "\""
            }.mkString("{", ",", "}"))
          }
        } finally {
          scanner.close()
        }
      } finally {
        table.close()
      }
    }
  }

  /**
   * Epoch-millisecond timestamps of the current minute and the four minutes before it:
   * "rowkey_1" is the current minute, "rowkey_2" is one minute earlier, up to "rowkey_5",
   * which is four minutes earlier.
   */
  def getTime(): mutable.Map[String, Long] = {
    val keys = mutable.Map[String, Long]()
    val currentMinute = parseMillis(minuteText(Calendar.getInstance().getTime()))
    for ((ts, i) <- recentMinutes(currentMinute).zipWithIndex) {
      keys("rowkey_" + (i + 1)) = ts
    }
    keys
  }
}
案例二:
package com.lg.bigdata.streaming

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Bytes
import com.lg.bigdata.utils.JZWUtil
import com.alibaba.fastjson.JSON
import java.util.Calendar
import java.text.SimpleDateFormat
import scala.collection.mutable
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.RowFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.CompareFilter
import org.apache.spark.SparkContext
import java.util.LinkedHashMap
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.client.Connection
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.DataFrame
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.RDD
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import java.lang.Long
import java.lang.Double
import org.apache.hadoop.hbase.client.Put
import scala.util.control.NonFatal

/**
 * 5-minute section-level prediction job.
 *
 * On every minute divisible by 5, the receiver reads the five most recent 5-minute section rows
 * from `jzw_data:spot_jt_section_5min`. For each camera, the job predicts the next values with a
 * weighted moving average in which the newest bucket has weight 5:
 *   F(t) = (5*X(t-1) + 4*X(t-2) + 3*X(t-3) + 2*X(t-4) + X(t-5)) / 15
 * It writes time / camera / car_num / speed / density / TPI to
 * `jzw_data:spot_jt_section_5min_pred`.
 *
 * NOTE: `java.lang.Long` and `java.lang.Double` are imported above and shadow the Scala types in
 * this file, so primitives are written as `scala.Long` / `scala.Double` explicitly.
 */
object PredSectionMinute5Data {
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val fmtminute = new SimpleDateFormat("mm")
  var sc: SparkContext = null
  val conf = HBaseConfiguration.create()
  val tableNameInPut = "jzw_data:spot_jt_section_5min" // input table: 5-minute section-level data
  val predSectionTableNameOut = "jzw_data:spot_jt_section_5min_pred" // output table: section-level predictions
  var connection: Connection = null
  // The receiver thread writes both fields when it emits data, and the batch thread reads them.
  // NOTE(review): this hand-off only works with a local master (receiver and driver in one JVM).
  @volatile var thism: String = null // minute at which data was emitted, "yyyy-MM-dd HH:mm:00"
  @volatile var predTime: String = null // bucket being predicted, "yyyy-MM-dd HH:mm:ss"

  private val BucketMillis: scala.Long = 5 * 60 * 1000L
  // Weight of the newest bucket; older buckets get 4, 3, 2, 1.
  private val MaxWeight = 5
  // Speed limit in the TPI formula (1 - speed / limit) * 10. NOTE(review): unit assumed km/h.
  private val SpeedLimit: scala.Double = 80.0
  private val InfoFamily = Bytes.toBytes("info")
  // Columns copied from each HBase row into the JSON record handed to the stream.
  private val JsonColumns = Seq("car_num", "now_avg_speed", "now_avg_densit", "now_avg_passTime",
    "now_avg_TPI", "now_avg_space", "cameraId", "time")
  // Columns written to the prediction table, in addition to the rowkey.
  private val PredColumns = Seq("car_num", "time", "now_avg_speed", "now_avg_densit", "now_avg_TPI", "cameraId")

  def main(args: Array[String]): Unit = {
    // 1. Spark context.
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("PredSectionMinute5Data")
    // Delay (ms) before a receiver that called restart() is started again. receive() always ends
    // with restart(), so the receiver polls HBase about once a minute.
    config.set("spark.streaming.receiverRestartDelay", "60000")
    sc = new SparkContext(config)
    sc.setLogLevel("WARN")
    // 60-second batches; the receiver only emits data on minutes divisible by 5.
    val ssc = new StreamingContext(sc, Seconds(60))
    // 2. Stream of JSON records read from HBase.
    val rddStream: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
    rddStream.print(1) // registers an output operation on the raw stream

    // 3. Schema of the rows produced by JZWUtil.handlerMessage2Row; every column is a string.
    import org.apache.spark.sql.types._
    val schema = StructType(List(
      StructField("car_num", StringType),
      StructField("now_avg_speed", StringType),
      StructField("now_avg_densit", StringType),
      StructField("now_avg_passTime", StringType),
      StructField("now_avg_TPI", StringType),
      StructField("now_avg_space", StringType),
      StructField("cameraId", StringType),
      StructField("time", StringType)))

    // 4. Output table.
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, predSectionTableNameOut)

    // 5. Predict per camera and write the results.
    rddStream.map(record => JZWUtil.handlerMessage2Row(record)).foreachRDD(rdd => {
      // The RDD feeds a count and a collect; cache it so it is computed once.
      rdd.cache()
      val total = rdd.count()
      if (total > 0) {
        println("数据条数=======:" + total)
        val predlong: scala.Long = parseMillis(predTime)
        val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
        val df = spark.createDataFrame(rdd, schema)
        // Each camera has at most one row per bucket (five buckets), so grouping on the driver
        // is cheap and avoids running one filtered Spark job per camera.
        val byCamera = df.collect().groupBy(row => row.getAs[String]("cameraId"))
        val resultData = byCamera.toSeq.collect {
          case (cameraId, rows) if cameraId != null => predictRow(cameraId, rows, predlong)
        }
        println("resultData数量:" + resultData.size)
        sc.parallelize(resultData).map(x => convert(x)).saveAsHadoopDataset(jobConf)
      }
      rdd.unpersist()
    })

    ssc.start()
    ssc.awaitTermination()
    println("connection关闭")
    // The receiver creates the connection lazily, so it may never have been opened.
    if (connection != null) {
      connection.close()
    }
  }

  /**
   * Weighted-moving-average prediction for one camera.
   *
   * Rows are ordered newest first by their "time" string ("yyyy-MM-dd HH:mm:ss", so the strings
   * sort chronologically). At most five rows are used, with weights 5, 4, 3, 2, 1, normalized by
   * the sum of the weights actually used.
   *
   * @param cameraId camera id as stored in the input rows, e.g. with a leading 'V'
   * @param rows     the camera's input rows
   * @param predlong epoch millis of the bucket being predicted; used for both the rowkey and "time"
   */
  private def predictRow(cameraId: String, rows: Array[Row], predlong: scala.Long): mutable.Map[String, String] = {
    val newestFirst = rows
      .sortBy(r => Option(r.getAs[String]("time")).getOrElse(""))(Ordering[String].reverse)
      .take(MaxWeight)
    val weighted = newestFirst.zipWithIndex.map { case (row, i) => (row, MaxWeight - i) }
    val weightSum: scala.Double = weighted.map(_._2).sum
    def weightedAvg(column: String): scala.Double =
      weighted.map { case (row, w) => java.lang.Double.parseDouble(row.getAs[String](column)) * w }.sum / weightSum

    val carNum = math.abs(math.round(weightedAvg("car_num")).toDouble) // rounded to a whole number
    // Locale.ROOT guarantees a '.' decimal separator, so the speed string can be parsed back below.
    val nowAvgSpeed = "%.2f".formatLocal(java.util.Locale.ROOT, weightedAvg("now_avg_speed"))
    val nowAvgDensit = "%.2f".formatLocal(java.util.Locale.ROOT, weightedAvg("now_avg_densit"))
    // Congestion index: (1 - speed / speed limit) * 10, computed from the rounded speed.
    val nowAvgTpi = math.abs((1 - java.lang.Double.parseDouble(nowAvgSpeed) / SpeedLimit) * 10)
    mutable.Map[String, String](
      // rowkey: predicted bucket timestamp followed by the camera number without 'V'.
      "key" -> (predlong + cameraId.replace("V", "")),
      "time" -> formatMillis(predlong),
      "car_num" -> carNum.toString,
      "now_avg_speed" -> nowAvgSpeed,
      "now_avg_densit" -> nowAvgDensit,
      "now_avg_TPI" -> "%.2f".formatLocal(java.util.Locale.ROOT, nowAvgTpi),
      "cameraId" -> cameraId)
  }

  /**
   * Converts one prediction map into an HBase Put on column family "info".
   *
   * @return an (empty ImmutableBytesWritable, Put) pair for saveAsHadoopDataset
   * @throws NoSuchElementException if "key" or one of the written columns is missing
   */
  def convert(map: mutable.Map[String, String]): (ImmutableBytesWritable, Put) = {
    val put = new Put(Bytes.toBytes(map("key")))
    PredColumns.foreach { column =>
      put.addImmutable(InfoFamily, Bytes.toBytes(column), Bytes.toBytes(map(column)))
    }
    (new ImmutableBytesWritable, put)
  }

  /**
   * Receiver that polls HBase, because Spark has no built-in HBase stream source.
   *
   * Each run checks the current minute. When it is a multiple of 5, the run publishes the
   * prediction time, reads the five most recent 5-minute section rows, and stores each row as
   * a JSON string. It then calls restart(), so Spark runs it again after
   * spark.streaming.receiverRestartDelay.
   * NOTE(review): the next run starts a full delay after this one finishes, so the poll time
   * drifts and a minute divisible by 5 can be skipped.
   */
  class MyReceiver() extends Receiver[String](StorageLevel.MEMORY_ONLY) {
    override def onStart(): Unit = {
      // onStart() must not block, so the HBase query runs on its own thread.
      new Thread("PredSectionMinute5Data-receiver") {
        override def run(): Unit = receive()
      }.start()
    }

    override def onStop(): Unit = {}

    private def receive(): Unit = {
      try {
        // 1. Connection.
        if (connection == null) {
          connection = ConnectionFactory.createConnection(conf)
        }
        // 2. Derive every key from one instant so the rowkeys and the prediction time agree.
        val now = Calendar.getInstance()
        val currentMinute = minuteText(now.getTime())
        val minute = now.get(Calendar.MINUTE)
        val keys = timeKeys(now.getTime())
        println("%02d".format(minute) + "<++>时间:" + currentMinute)
        // 3. Predict only on 5-minute boundaries. Publishing the times only here keeps them
        //    stable until the batch holding this data is processed.
        if (minute % 5 == 0) {
          thism = currentMinute
          predTime = formatMillis(keys("rowkey_pred"))
          storeBucketRows(keys)
        }
        restart("Trying to connect again")
      } catch {
        case NonFatal(e) => restart("Error while reading " + tableNameInPut, e)
      }
    }

    /** Scans the input table for the buckets "rowkey_1" .. "rowkey_5" in `keys` and stores each row as JSON. */
    private def storeBucketRows(keys: mutable.Map[String, Long]): Unit = {
      val table = connection.getTable(TableName.valueOf(tableNameInPut))
      try {
        val scan = new Scan()
        scan.setCacheBlocks(false)
        scan.addFamily(InfoFamily)
        // OR of one regex per bucket: timestamp + "." matches a rowkey that contains the
        // timestamp followed by at least one character (presumably the camera number).
        val filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE)
        for (i <- 1 to 5) {
          val pattern = keys("rowkey_" + i).toString + "."
          filterList.addFilter(
            new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(pattern)))
        }
        scan.setFilter(filterList)
        val scanner = table.getScanner(scan)
        try {
          val rows = scanner.iterator()
          while (rows.hasNext) {
            val result = rows.next()
            store(JsonColumns.map { column =>
              "\"" + column + "\":\"" +
                Bytes.toString(result.getValue(InfoFamily, Bytes.toBytes(column))) + "\""
            }.mkString("{", ",", "}"))
          }
        } finally {
          scanner.close()
        }
      } finally {
        table.close()
      }
    }
  }

  /** Formats `date` as "yyyy-MM-dd HH:mm:00"; locked because SimpleDateFormat is not thread-safe. */
  private def minuteText(date: java.util.Date): String = sdf.synchronized { sdf.format(date) } + ":00"

  /** Parses "yyyy-MM-dd HH:mm:ss" into epoch milliseconds (locked for the same reason). */
  private def parseMillis(text: String): scala.Long = format.synchronized { format.parse(text).getTime() }

  /** Formats epoch milliseconds as "yyyy-MM-dd HH:mm:ss" (locked for the same reason). */
  private def formatMillis(millis: scala.Long): String =
    format.synchronized { format.format(new java.util.Date(millis)) }

  /** Current minute formatted as "yyyy-MM-dd HH:mm:00". */
  def thisMinute(): String = minuteText(new java.util.Date())

  /** Bucket timestamps (epoch ms) to read and to predict, for the current time; see timeKeys. */
  def getTime(): mutable.Map[String, Long] = timeKeys(new java.util.Date())

  /**
   * Bucket timestamps (epoch ms) derived from `now`.
   *
   *  - "rowkey_1": start of the latest 5-minute bucket. The minute is floored to a multiple of 5,
   *    and one more bucket is subtracted unless minute % 5 > 2. Examples: 10:07 -> 10:00,
   *    10:08 -> 10:05, 10:10 -> 10:05, 10:01 -> 09:55.
   *  - "rowkey_2" .. "rowkey_5": 5, 10, 15 and 20 minutes before "rowkey_1".
   *  - "rowkey_pred": 10 minutes after "rowkey_1"; this is the bucket being predicted.
   */
  private def timeKeys(now: java.util.Date): mutable.Map[String, Long] = {
    val cal = Calendar.getInstance()
    cal.setTime(now)
    cal.set(Calendar.SECOND, 0)
    cal.set(Calendar.MILLISECOND, 0)
    val minute = cal.get(Calendar.MINUTE)
    val floored = minute / 5 * 5
    val bucketMinute = if (minute % 5 > 2) floored else floored - 5
    // Calendar arithmetic handles the roll-back into the previous hour/day (e.g. 10:01 -> 09:55).
    cal.add(Calendar.MINUTE, bucketMinute - minute)
    val latest: scala.Long = cal.getTimeInMillis
    val keys = mutable.Map[String, Long]()
    keys("rowkey_pred") = latest + 2 * BucketMillis
    for (i <- 0 until 5) {
      keys("rowkey_" + (i + 1)) = latest - i * BucketMillis
    }
    keys
  }
}