  • Spark reads HBase into an RDD, then stores it in Hive or analyzes it with Spark SQL

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession

    object SparkReadHbase {
      var total: Int = 0

      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .master("local[2]")
          .appName("Spark Read Hbase")
          .enableHiveSupport() // required whenever Hive tables are read or written
          .getOrCreate()
        val sc = spark.sparkContext

        // ZooKeeper settings -- HBase keeps its meta information there
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set(TableInputFormat.INPUT_TABLE, "event_logs_20190218")

        // Read the table and turn it into an RDD
        val hBaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
          conf,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable], // key class
          classOf[Result])                 // value class
        val count = hBaseRDD.count()
        println("row count: " + count)

        import spark.implicits._
        val logRDD: RDD[EventLog] = hBaseRDD.map { case (_, result) =>
          // Row key
          val rowKey = Bytes.toString(result.getRow)
          val api_v    = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("api_v")))
          val app_id   = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("app_id")))
          val c_time   = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("c_time")))
          val ch_id    = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("ch_id")))
          val city     = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city")))
          val province = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("province")))
          val country  = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("country")))
          val en       = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("en")))
          val ip       = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("ip")))
          val net_t    = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("net_t")))
          val pl       = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("pl")))
          val s_time   = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("s_time")))
          val user_id  = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("user_id")))
          val uuid     = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("uuid")))
          val ver      = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("ver")))
          // Build the schema with a case class (a sketch of EventLog follows this listing).
          // A tuple or case class is limited to 22 fields; with more columns, build Row objects
          // plus an explicit StructType schema instead.
          EventLog(rowKey, api_v, app_id, c_time, ch_id, city, province, country, en, ip,
            net_t, pl, s_time, user_id, uuid, ver)
        }

        // Convert to a Dataset and store it in Hive as a wide table, or analyze it directly with Spark
        val logds = logRDD.toDS()
        logds.createTempView("event_logs")
        val sq = spark.sql("select * from event_logs limit 1")
        sq.explain()
        sq.show()

        sc.stop()
        spark.stop()
      }
    }
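
    The EventLog case class referenced above is not shown in the original listing. Below is a minimal sketch of it, assuming every column is carried as a plain String, which matches the Bytes.toString calls above:

    // Assumed definition of the Dataset schema: one String field per HBase column plus the row key.
    case class EventLog(
        rowKey: String, api_v: String, app_id: String, c_time: String,
        ch_id: String, city: String, province: String, country: String,
        en: String, ip: String, net_t: String, pl: String,
        s_time: String, user_id: String, uuid: String, ver: String)

    To persist the wide table into Hive rather than only registering a temporary view, one option is to write the Dataset through the Hive-enabled session. This is a sketch only; the target table name dw.event_logs_wide is hypothetical:

    // Inside main, after logds is created (hypothetical Hive table name):
    logds.write
      .mode("overwrite")
      .saveAsTable("dw.event_logs_wide")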


    // Write to HBase
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * @created by imp ON 2018/2/19
      */
    object SparkWriteHbase {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local")
        val sc = new SparkContext(sparkConf)
        sc.setLogLevel("ERROR")

        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set(TableOutputFormat.OUTPUT_TABLE, "test")

        // Describe the output through a MapReduce Job so TableOutputFormat knows the target table
        val job = Job.getInstance(conf)
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Put])
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

        val arrResult: Array[String] = Array("1,3000000000")
        // arrResult(0) = "1,100,11"

        // Turn each "rowkey,value" string into an (ImmutableBytesWritable, Put) pair
        val resultRDD = sc.makeRDD(arrResult)
        val saveRDD = resultRDD.map(_.split(',')).map { arr =>
          val put = new Put(Bytes.toBytes(arr(0)))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("total"), Bytes.toBytes(arr(1)))
          (new ImmutableBytesWritable, put)
        }

        saveRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)

        sc.stop()
      }
    }
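
    A quick way to confirm that the write landed is to read the row back with the plain HBase client API. A minimal sketch, assuming the same ZooKeeper quorum and the "test" table written above (the object name VerifyHbaseWrite is made up for illustration):

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
    import org.apache.hadoop.hbase.util.Bytes

    object VerifyHbaseWrite {
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
        conf.set("hbase.zookeeper.property.clientPort", "2181")

        // Open a connection and fetch the row written by SparkWriteHbase (row key "1", column info:total)
        val connection = ConnectionFactory.createConnection(conf)
        try {
          val table = connection.getTable(TableName.valueOf("test"))
          val result = table.get(new Get(Bytes.toBytes("1")))
          val total = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("total")))
          println(s"info:total = $total")
          table.close()
        } finally {
          connection.close()
        }
      }
    }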
     
  • Original post: https://www.cnblogs.com/hejunhong/p/10403572.html