zoukankan      html  css  js  c++  java
  • Spark读取HBase的数据

    // Driver-side script: scans the HBase table "FLOW" via Spark, groups each row's F:DBYTES
    // value by "ip_port_protocol" and by a "startH:startM_endH:endM" window, and prints
    // mean +/- 1.960 * stddev for every (key, window) group.
    // NOTE(review): relies on an existing SparkContext `sc` (e.g. spark-shell) — confirm.
    val conf = HBaseConfiguration.create()
    conf.addResource(new Path("/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p0.4/lib/hbase/conf/hbase-site.xml"))
    conf.addResource(new Path("/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p0.4/lib/hadoop/etc/hadoop/core-site.xml"))
    conf.set(TableInputFormat.INPUT_TABLE, "FLOW")

    // Optional server-side filter, left over from an example that keeps rows with
    // basic:age >= 18 (GREATER_OR_EQUAL). It does not match the FLOW table's columns as-is.
    //val scan = new Scan()
    //conf.set(TableInputFormat.SCAN, convertScanToString(scan))
    /*
    scan.setFilter(new SingleColumnValueFilter("basic".getBytes, "age".getBytes,
      CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(18)))
    */

    val usersRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    // Decode the needed cells on the executors and collect plain values once.
    // ImmutableBytesWritable / Result are not java.io.Serializable, so collecting them
    // directly fails under the default serializer; a separate count() would also rescan
    // the whole table.
    val flows = usersRDD.map { case (_, result) =>
      val family = "F".getBytes
      // Bytes.toString returns null when the cell is absent.
      def column(qualifier: String): String =
        Bytes.toString(result.getValue(family, qualifier.getBytes))
      (Bytes.toInt(result.getRow), column("SADDR"), column("SPORT"), column("STIME"),
        column("LTIME"), column("PROTO"), column("DBYTES"))
    }.collect()

    val data1 = flows.length

    // Only hour and minute are used, so parse up to whole seconds. DateFormat.parse(String)
    // ignores the trailing fractional part. The old "ss.SSSSS" pattern read a 5-digit
    // fraction as milliseconds (".12345" -> +12.345 s), which could shift the minute/hour.
    val sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    println("data length:" + data1)

    // "ip_port_protocol" -> ("startH:startM_endH:endM" -> DBYTES values falling in that window)
    val map = collection.mutable.HashMap[String, collection.mutable.HashMap[String, collection.mutable.ArrayBuffer[Double]]]()

    flows.foreach { case (key, ip, port, startTimeLong, endTimeLong, protocol, bytes) =>
      println("Key:" + key)
      // NOTE(review): a missing DBYTES/STIME/LTIME cell is null here and will throw.
      val sum = bytes.toDouble

      println("ip:" + ip + ",port:" + port + ",startTime:" + startTimeLong + ",endTime:" + endTimeLong + ",protocol:" + protocol + ",sum:" + sum)

      // Key layout examples:
      //   ip+port+udp, 14:02 14:07 -> List
      //   ip+port+tcp, 15:02 15:07 -> List
      val startTimeDate = sf.parse(startTimeLong)
      val endTimeLongDate = sf.parse(endTimeLong)

      val key1 = ip + "_" + port + "_" + protocol
      println("key1:" + key1)

      val key2 = startTimeDate.getHours + ":" + startTimeDate.getMinutes + "_" + endTimeLongDate.getHours + ":" + endTimeLongDate.getMinutes
      println("key2:" + key2)

      val windows = map.get(key1) match {
        case Some(existing) =>
          println("--------------------map is not null:" + existing.size + "--------------------")
          existing
        case None =>
          println("--------------------map is null--------------------")
          // The key is not present yet: this is a brand-new ip/port/protocol.
          val created = collection.mutable.HashMap[String, collection.mutable.ArrayBuffer[Double]]()
          map += (key1 -> created)
          created
      }
      // Append to the window's values, creating the window when it is new. (Before, a new
      // window under an already-known key1 was silently dropped.)
      windows.getOrElseUpdate(key2, collection.mutable.ArrayBuffer[Double]()) += sum

      println("map size-----------------:" + map.size)
    }

    println("map size:" + map.size)

    map.foreach { case (resultKey1, resultVal1) =>
      println("--------------------Statistics start --------------------")
      println("resultKey1:" + resultKey1)
      resultVal1.foreach { case (resultKey2, resultVal2) =>
        println("resultKey2:" + resultKey2)
        println("-----------------resultVal2:" + resultVal2.length)

        resultVal2.foreach(value => println("------------------------f:" + value))

        // NOTE(review): this launches one Spark job per window for a handful of doubles;
        // computing mean/variance locally would be much cheaper.
        val dataArray = resultVal2.map(value => Vectors.dense(value))
        val summary: MultivariateStatisticalSummary = Statistics.colStats(sc.parallelize(dataArray))

        println("--------------------mean:" + summary.mean + " --------------------")
        println("--------------------variance:" + summary.variance + " --------------------")

        val mean = summary.mean.apply(0)
        val variance = summary.variance.apply(0)
        println("--------------------mean apply 0:" + mean + " --------------------")
        println("--------------------variance apply 0:" + variance + " --------------------")

        // 1.960 is the standard-normal z value for a two-sided 95% band
        // (presumably the intended baseline width).
        val upbase = mean + 1.960 * Math.sqrt(variance)
        val downbase = mean - 1.960 * Math.sqrt(variance)
        println("------------------- " + upbase + " ---------- " + downbase)
        println("ip port:" + resultKey1 + ",time:" + resultKey2 + ",upbase:" + upbase + ",downbase:" + downbase)
      }
    }

    println("--------------------baseLine end --------------------")
    sc.stop()
    
  • 相关阅读:
    移动端解决fixed和input弹出虚拟键盘时样式错位
    JS的面向对象
    js计算两个时间范围间的间隔秒数
    使用js过滤字符串前后的空格
    C#时间格式-摘自http://www.cnblogs.com/xiaogongzhu/p/3825600.html
    [dp/贪心]435. 无重叠区间-----经典问题
    【dp】Leetcode面试题 17.16. 按摩师
    [dp]Leetcode.376.摆动序列
    Leetcode 945 使数组唯一的最小增量
    LeetCode 365.水壶问题
  • 原文地址:https://www.cnblogs.com/qq27271609/p/4941921.html
Copyright © 2011-2022 走看看