  • Spark Operations on HBase

    Writing to HBase from Spark

      To write data to HBase from Spark, we use the PairRDDFunctions.saveAsHadoopDataset method.

    package cn.com.win
    
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf
    import org.apache.log4j.Logger
    import org.apache.spark.{SparkConf, SparkContext}
    
    object TestHbase {
    
      def main(args: Array[String]) {
    
        val log = Logger.getLogger("TestHbase")
        // Initialize Spark
        val conf = new SparkConf().setMaster("local[2]").setAppName("testHbase")
        val sc = new SparkContext(conf)
    
        // Define the HBase configuration
        val hconf = HBaseConfiguration.create()
        val jobConf = new JobConf(hconf, this.getClass)
        // Specify the output format and the target table
        jobConf.setOutputFormat(classOf[TableOutputFormat])
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, "wifiTarget")

        val arr = Array(("tjiloaB#3#20190520", 10, 11),
          ("tjiloaB#3#20190521", 12, 22),
          ("tjiloaB#3#20190522", 13, 42))
        val rdd = sc.parallelize(arr)
        // Convert each tuple into an (ImmutableBytesWritable, Put) pair
        val localData = rdd.map(convert)
        localData.saveAsHadoopDataset(jobConf)
        sc.stop()
      }

      // Conversion function: RDD[(String, Int, Int)] -> RDD[(ImmutableBytesWritable, Put)]
      def convert(triple: (String, Int, Int)) = {
        val p = new Put(Bytes.toBytes(triple._1))
        p.addColumn(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("inNum"), Bytes.toBytes(triple._2))
        p.addColumn(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("outNum"), Bytes.toBytes(triple._3))
        (new ImmutableBytesWritable, p)
      }
    }

      Execution result:

      
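      Note that saveAsHadoopDataset writes into an existing table; it does not create one. If the wifiTarget table is missing, it can be created up front with the HBase Admin API. The following is a minimal sketch, assuming an HBase 1.x client (the same era as the utilities used elsewhere in this post); the object name CreateWifiTargetTable is made up for illustration:

    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory

    object CreateWifiTargetTable {
      def main(args: Array[String]): Unit = {
        val hconf = HBaseConfiguration.create()
        val connection = ConnectionFactory.createConnection(hconf)
        val admin = connection.getAdmin
        val tableName = TableName.valueOf("wifiTarget")
        // Create the table with the single column family the examples write to
        if (!admin.tableExists(tableName)) {
          val desc = new HTableDescriptor(tableName)
          desc.addFamily(new HColumnDescriptor("wifiTargetCF"))
          admin.createTable(desc)
        }
        admin.close()
        connection.close()
      }
    }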

     Reading HBase from Spark

      To read from HBase, we mainly use the newAPIHadoopRDD API provided by SparkContext to load the table's contents into Spark as an RDD.

    Reading specific columns:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.filter.PrefixFilter
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * AUTHOR Guozy
      * DATE   2020/2/7-0:33
      **/
    object TestHbase2 {
      def main(args: Array[String]): Unit = {
    
        // Initialize Spark
        val conf = new SparkConf().setMaster("local[2]").setAppName("testHbase")
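        // Kryo is set because ImmutableBytesWritable and Result are not Java-serializable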
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sc = new SparkContext(conf)
    
        val scan = new Scan()
        val filter = new PrefixFilter("tjiloaB#3#20190520".getBytes())
        scan.setFilter(filter)
        val hconf = HBaseConfiguration.create()
        hconf.set(TableInputFormat.INPUT_TABLE, "wifiTarget")
        hconf.set(TableInputFormat.SCAN, convertScanToString(scan))
    
        val dataRdd = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result])
    
        dataRdd.cache()
        val count = dataRdd.count()
        println("dataRdd Count is " + count)
    
        dataRdd.map(_._2).filter(!_.isEmpty).take(20).foreach { result =>
          val key = Bytes.toString(result.getRow)
          val innum = Bytes.toInt(result.getValue(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("inNum")))
          val outnum = Bytes.toInt(result.getValue(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("outNum")))
          println(s"key:${key},inNum:${innum},outNum:${outnum}")
        }
        sc.stop()
      }
    
      /**
        * Serialize the Scan into the Base64 string expected by TableInputFormat.SCAN
        */
      def convertScanToString(scan: Scan): String = {
        val proto = ProtobufUtil.toScan(scan)
        Base64.encodeBytes(proto.toByteArray())
      }
    }

    Run result:

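    The example above scans whole rows and picks the two columns on the client with Result.getValue. When only a few columns are needed, the Scan itself can name them so the selection happens on the region servers. A minimal sketch, reusing hconf and convertScanToString from the code above:

    // Push column selection into the Scan instead of reading whole rows
    val scan = new Scan()
    scan.setFilter(new PrefixFilter(Bytes.toBytes("tjiloaB#3#20190520")))
    scan.addColumn(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("inNum"))
    scan.addColumn(Bytes.toBytes("wifiTargetCF"), Bytes.toBytes("outNum"))
    hconf.set(TableInputFormat.SCAN, convertScanToString(scan))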
    Iterating over the columns of each row:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.filter.PrefixFilter
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * AUTHOR Guozy
      * DATE   2020/2/7-0:33
      **/
    object TestHbase2 {
      def main(args: Array[String]): Unit = {
    
        // Initialize Spark
        val conf = new SparkConf().setMaster("local[2]").setAppName("testHbase")
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sc = new SparkContext(conf)
    
        val scan = new Scan()
        val filter = new PrefixFilter("tjiloaB#3#20190520".getBytes())
        scan.setFilter(filter)
        val hconf = HBaseConfiguration.create()
        hconf.set(TableInputFormat.INPUT_TABLE, "wifiTarget")
        hconf.set(TableInputFormat.SCAN, convertScanToString(scan))
    
        val dataRdd = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result])
    
        dataRdd.cache()
        val count = dataRdd.count()
        println("dataRdd Count is " + count)
    
        dataRdd.map(_._2).filter(!_.isEmpty).take(20).foreach { result =>
          val key = Bytes.toString(result.getRow)
          val cells = result.listCells().iterator()
          while (cells.hasNext) {
            val cell = cells.next()
            // Read the qualifier and value directly from each cell
            val qualifier = Bytes.toString(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
            val value = Bytes.toInt(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
            println(s"key:${key},${qualifier}:${value}")
          }
        }
        sc.stop()
      }
    
      /**
        * Serialize the Scan into the Base64 string expected by TableInputFormat.SCAN
        */
      def convertScanToString(scan: Scan): String = {
        val proto = ProtobufUtil.toScan(scan)
        Base64.encodeBytes(proto.toByteArray())
      }
    }
    

    Run result:

    Note: when importing, TableInputFormat comes from the org.apache.hadoop.hbase.mapreduce package, not org.apache.hadoop.hbase.mapred.
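    Both packages exist, which is why the distinction matters: the write example above deliberately uses the old org.apache.hadoop.hbase.mapred.TableOutputFormat with saveAsHadoopDataset, while the read examples use the mapreduce package with newAPIHadoopRDD. For completeness, here is a sketch of writing through the new mapreduce API instead, reusing sc and the convert function from the write example:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.mapreduce.Job

    // New-API write: mapreduce.TableOutputFormat plus saveAsNewAPIHadoopDataset
    val hconf = HBaseConfiguration.create()
    hconf.set(TableOutputFormat.OUTPUT_TABLE, "wifiTarget")
    val job = Job.getInstance(hconf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    sc.parallelize(Array(("tjiloaB#3#20190523", 14, 52)))
      .map(convert)
      .saveAsNewAPIHadoopDataset(job.getConfiguration)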

     

  • Original post: https://www.cnblogs.com/Gxiaobai/p/12275218.html