  • Working with HBase from Spark

    Writing to HBase from Spark

      To write data into HBase from Spark, we use PairRDDFunctions.saveAsHadoopDataset.

    package cn.com.win
    
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf
    import org.apache.log4j.Logger
    import org.apache.spark.{SparkConf, SparkContext}
    
    object TestHbase {
    
      def main(args: Array[String]) {
    
        val log = Logger.getLogger("TestHbase")
        // initialize Spark
        val conf = new SparkConf().setMaster("local[2]").setAppName("testHbase")
        val sc = new SparkContext(conf)
    
        // define the HBase configuration
        val hconf = HBaseConfiguration.create()
        val jobConf = new JobConf(hconf, this.getClass)
        // specify the output format and the output table name
        jobConf.setOutputFormat(classOf[TableOutputFormat])
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, "wifiTarget")

        val arr = Array(("tjiloaB#3#20190520", 10, 11), ("tjiloaB#3#20190521", 12, 22), ("tjiloaB#3#20190522", 13, 42))
        val rdd = sc.parallelize(arr)
        val localData = rdd.map(convert)
        localData.saveAsHadoopDataset(jobConf)
        sc.stop()
      }

      // convert one record into the (ImmutableBytesWritable, Put) pair expected by TableOutputFormat
      def convert(triple: (String, Int, Int)) = {
        val p = new Put(Bytes.toBytes(triple._1))
        p.addColumn(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("inNum"), Bytes.toBytes(triple._2))
        p.addColumn(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("outNum"), Bytes.toBytes(triple._3))
        (new ImmutableBytesWritable, p)
      }
    }
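      Note that TableOutputFormat only writes into an existing table, it does not create one. If wifiTarget (with column family wifiTargetCF) does not exist yet, it can be created up front with the HBase Admin API. The snippet below is a minimal sketch of that step, assuming the HBase 1.x client API used in this post and an hbase-site.xml on the classpath; it is not part of the original example.

    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory

    object CreateWifiTargetTable {
      def main(args: Array[String]): Unit = {
        val hconf = HBaseConfiguration.create()
        // connections are heavyweight: open one, reuse it, close it when done
        val conn = ConnectionFactory.createConnection(hconf)
        val admin = conn.getAdmin
        try {
          val table = TableName.valueOf("wifiTarget")
          if (!admin.tableExists(table)) {
            // one column family is enough for the inNum/outNum qualifiers used above
            val desc = new HTableDescriptor(table)
            desc.addFamily(new HColumnDescriptor("wifiTargetCF"))
            admin.createTable(desc)
          }
        } finally {
          admin.close()
          conn.close()
        }
      }
    }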

      Execution result:

      

     Reading HBase from Spark

      To read from HBase, we mainly use the newAPIHadoopRDD API provided by SparkContext, which loads the table contents into Spark as an RDD.

    Reading specific columns:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.filter.PrefixFilter
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * AUTHOR Guozy
      * DATE   2020/2/7-0:33
      **/
    object TestHbase2 {
      def main(args: Array[String]): Unit = {
    
        // initialize Spark
        val conf = new SparkConf().setMaster("local[2]").setAppName("testHbase")
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sc = new SparkContext(conf)
    
        val scan = new Scan()
        val filter = new PrefixFilter("tjiloaB#3#20190520".getBytes())
        scan.setFilter(filter)
        val hconf = HBaseConfiguration.create()
        hconf.set(TableInputFormat.INPUT_TABLE, "wifiTarget")
        hconf.set(TableInputFormat.SCAN, convertScanToString(scan))
    
        val dataRdd = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result])
    
        val count = dataRdd.count()
        println("dataRdd Count is " + count)
        dataRdd.cache()
    
        dataRdd.map(_._2).filter(!_.isEmpty).take(20).foreach { result =>
          val key = Bytes.toString(result.getRow)
          val innum = Bytes.toInt(result.getValue(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("inNum")))
          val outnum = Bytes.toInt(result.getValue(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("outNum")))
          println(s"key:${key},inNum:${innum},outNum:${outnum}")
        }
        sc.stop()
      }
    
      /**
        * serialize the Scan into the Base64 string expected by TableInputFormat.SCAN
        */
      def convertScanToString(scan: Scan): String = {
        val proto = ProtobufUtil.toScan(scan)
        Base64.encodeBytes(proto.toByteArray())
      }
    }

    Run result:
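    In this example the PrefixFilter only restricts the row keys; the two qualifiers are still picked out client-side with result.getValue. As a small sketch (my addition, not part of the original code), the column selection can also be pushed to the region servers by listing the wanted qualifiers on the Scan before it is serialized:

    // add right after scan.setFilter(filter), before convertScanToString(scan):
    // only cells from these two qualifiers are shipped back to the Spark executors
    scan.addColumn(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("inNum"))
    scan.addColumn(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("outNum"))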

    Iterating over all columns:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.filter.PrefixFilter
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * AUTHOR Guozy
      * DATE   2020/2/7-0:33
      **/
    object TestHbase2 {
      def main(args: Array[String]): Unit = {
    
        // initialize Spark
        val conf = new SparkConf().setMaster("local[2]").setAppName("testHbase")
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sc = new SparkContext(conf)
    
        val scan = new Scan()
        val filter = new PrefixFilter("tjiloaB#3#20190520".getBytes())
        scan.setFilter(filter)
        val hconf = HBaseConfiguration.create()
        hconf.set(TableInputFormat.INPUT_TABLE, "wifiTarget")
        hconf.set(TableInputFormat.SCAN, convertScanToString(scan))
    
        val dataRdd = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result])
    
        val count = dataRdd.count()
        println("dataRdd Count is " + count)
        dataRdd.cache()
    
        dataRdd.map(_._2).filter(!_.isEmpty).take(20).foreach { result =>
          val key = Bytes.toString(result.getRow)
          // walk every cell (one per column) returned for this row
          val cells = result.listCells().iterator()
          while (cells.hasNext) {
            val cell = cells.next()
            val qualifier = Bytes.toString(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
            val value = Bytes.toInt(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
            println(s"key:${key},${qualifier}:${value}")
          }
        }
        sc.stop()
      }
    
      /**
        * serialize the Scan into the Base64 string expected by TableInputFormat.SCAN
        */
      def convertScanToString(scan: Scan): String = {
        val proto = ProtobufUtil.toScan(scan)
        Base64.encodeBytes(proto.toByteArray())
      }
    }
    

    Run result:
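    Beyond printing, the (ImmutableBytesWritable, Result) pairs returned by newAPIHadoopRDD are usually mapped into a plain typed RDD before further processing. A minimal sketch (the variable name is illustrative, and it assumes every row carries both qualifiers):

    // turn each Result into (rowKey, inNum, outNum) for downstream Spark transformations
    val wifiTargetRdd = dataRdd.map { case (_, result) =>
      val key = Bytes.toString(result.getRow)
      val in = Bytes.toInt(result.getValue(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("inNum")))
      val out = Bytes.toInt(result.getValue(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("outNum")))
      (key, in, out)
    }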

    Note: when importing, TableInputFormat lives in the org.apache.hadoop.hbase.mapreduce package, not in org.apache.hadoop.hbase.mapred.
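    The same split exists on the write side: the first example uses the old org.apache.hadoop.hbase.mapred.TableOutputFormat with saveAsHadoopDataset, while the mapreduce variant goes through saveAsNewAPIHadoopDataset. The following is only a sketch of the new-API write, reusing rdd and convert from the write example above:

    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.mapreduce.Job

    val hconf = HBaseConfiguration.create()
    hconf.set(TableOutputFormat.OUTPUT_TABLE, "wifiTarget")
    val job = Job.getInstance(hconf)
    // the mapreduce TableOutputFormat is generic in its key type
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])

    // convert: (String, Int, Int) => (ImmutableBytesWritable, Put), as defined in the write example
    rdd.map(convert).saveAsNewAPIHadoopDataset(job.getConfiguration)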

     

  • Original article: https://www.cnblogs.com/Gxiaobai/p/12275218.html