zoukankan      html  css  js  c++  java
  • spark读写hbase的几种方式,及读写相关问题

    读写方式

    其实个人最近访问hbase 都是通过shc df/sql 来访问的

    df的读写方式,足够覆盖90%的场景,但该方案有一个前提是,明确的知道hbase 的列族和列信息,新项目都会规范这一点,可以使用

    但有些历史包袱的项目,列族是明确的,但是列族里的列信息是不明确的,正好要做一个旧项目列的标准化

    每行数据列信息都不一样,部分多列,部分少列,必须读一条,解析一条,因此df之类的方案不适用

    也借此,整理下读写hbase的实现方案-其实各方案网上有很多介绍了

    Which Spark HBase Connector to use in 2020? — SparkByExamples

    方式一

    纯粹调用java api实现,这种在spark任务时很少用,都有spark环境了,通过java api读写是可行,但是性能一般,spark hbase的集成方案,底层实现是直接读的region下HFile文件,性能比纯粹的java api 高些。像spark-streaming,flink之类流处理写入hbase可以用这种api类的方案,但批处理大量读写的场景,并不推荐

    sc.makeRDD(
          Seq(("10003", "page#10003", 20), ("10004", "page#10004", 60))
        ).foreachPartition(iterator => {
          val conf = HBaseConfiguration.create()
          conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
          conf.set("hbase.zookeeper.quorum", zkHost)
          conf.set("zookeeper.znode.parent", "/hbase-unsecure")
          val connection = ConnectionFactory.createConnection(conf)
          val table = connection.getTable(TableName.valueOf(tableName))
          iterator.foreach(t => {
            val put = new Put(Bytes.toBytes(t._1))
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(t._2))
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("b"), Bytes.toBytes(t._3))
            table.put(put)
          })
          table.close()
          connection.close()
        })
    
    方式二 类似m/r的方式,最基础的读写方法,直接读写regin hfile性能较高,对hbase的负担也小
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkReadHBaseDemo")
        val sc = new SparkContext(sparkConf)
        val scan = new Scan()
        scan.addFamily(Bytes.toBytes("cf"))
        val proto = ProtobufUtil.toScan(scan)
        val scanToString = Base64.encodeBytes(proto.toByteArray)
        val conf = HBaseConfiguration.create()
        conf.set(TableInputFormat.INPUT_TABLE, option.getTableName)
        conf.set(TableInputFormat.SCAN, scanToString)
        conf.set("hbase.zookeeper.quorum", option.getZkHost)
        conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    
    //读
        sc.newAPIHadoopRDD(conf,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result]
        ).foreach(t => {
          val result = t._2
          println("====================row=========================>>" + Bytes.toString(result.getRow))
          println("====================cf:a=========================>>" + Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("a"))))
          println("====================cf:b=========================>>" + Bytes.toInt(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("b"))))
        })
    //写
        sc.makeRDD(
          Seq(("10001", "page#10001", 30), ("10002", "page#10002", 50))
        ).map(t => {
          val put = new Put(Bytes.toBytes(t._1))
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(t._2))
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("b"), Bytes.toBytes(t._3))
          (new ImmutableBytesWritable(), put)
        }).saveAsNewAPIHadoopDataset(job.getConfiguration)
    

    方式三 connectors 这个方案有

    https://mvnrepository.com/artifact/org.apache.hbase.connectors.spark/hbase-spark
            <dependency>
                <groupId>org.apache.hbase.connectors.spark</groupId>
                <artifactId>hbase-spark</artifactId>
                <version>replace version here</version>
            </dependency>        
            或
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-spark</artifactId>
                <version>1.2.0-cdh5.16.99</version>
            <dependency>                           
    

    主要是版本问题,spark版本更新较快,大版本1.6,2.x,3.x,但是因为很多大公司的底层大数据解决方案是cdh之类,考虑稳定性和升级成本,几年都不更新的,即使更新了,其集成的hbase版本也依然较低,因此业内很多hbase版本都比较低,还是1.0,1.2的样子,1.x版本间兼容性良好,但是2.0和1.0 就有问题了

    org.apache.hbase.connectors.spark 提供的包 原生支持spark 2.x 但是只支持hbase 2.0 对hbase1.x 会有异常

    org.apache.hbase的包呢,正相反,原生支持的spark是1.6(太老了),但是支持hbase 1.0 spark2.x 调度会有些问题,不过问题也好解决

    个人环境是spark 2.x hbase 1.x 因此选择 org.apache.hbase 的 hbase-spark

        val sparkConf = new SparkConf().setAppName("HBaseDistributedScanExample " + tableName )
        val sc = new SparkContext(sparkConf)
        val conf = HBaseConfiguration.create()
         val scan = new Scan()
        scan.setCaching(1000)
        val hbaseContext = new HBaseContext(sc, conf)
    //读
        val rdd=hbaseContext.hbaseRDD(TableName.valueOf("ns_old:"+tableName),scan).map(row=>{
          val index_name=row._2.getValue(cf,qualifier_doc_index)
          val type_name=row._2.getValue(cf,qualifier_doc_type)
          (Bytes.toString(index_name),Bytes.toString(type_name),row)
        }).hbaseForeachPartition(hbaseContext,(it,connection)=>{
                //写
            val m=connection.getBufferedMutator(targetTable)      
            it.foreach(r=>{
              val put = new Put(hbase_result.getRow)
              for (cell <- hbase_result.listCells()) {
                put.add(cell)
              }          
              m.mutate(put)
            })
              m.flush()
              m.close()
          })
    

    方式四 shc df方案

    之前有写过相关的博客

    spark 使用shc 访问hbase超时问题解决办法 - 资本主义接班人 - 博客园 (cnblogs.com)

    df shc 见 Use Spark to read and write HBase data - Azure HDInsight | Microsoft Docs

    Create Spark DataFrame from HBase using Hortonworks — SparkByExamples

    object HBaseSparkRead {
    
      def main(args: Array[String]): Unit = {
    
        def catalog =
          s"""{
             |"table":{"namespace":"default", "name":"employee"},
             |"rowkey":"key",
             |"columns":{
             |"key":{"cf":"rowkey", "col":"key", "type":"string"},
             |"fName":{"cf":"person", "col":"firstName", "type":"string"},
             |"lName":{"cf":"person", "col":"lastName", "type":"string"},
             |"mName":{"cf":"person", "col":"middleName", "type":"string"},
             |"addressLine":{"cf":"address", "col":"addressLine", "type":"string"},
             |"city":{"cf":"address", "col":"city", "type":"string"},
             |"state":{"cf":"address", "col":"state", "type":"string"},
             |"zipCode":{"cf":"address", "col":"zipCode", "type":"string"}
             |}
             |}""".stripMargin
    
        val sparkSession: SparkSession = SparkSession.builder()
          .master("local[1]")
          .appName("SparkByExamples.com")
          .getOrCreate()
    
        import sparkSession.implicits._
    
        val hbaseDF = sparkSession.read
          .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
          .format("org.apache.spark.sql.execution.datasources.hbase")
          .load()
    
        hbaseDF.printSchema()
    
        hbaseDF.show(false)
    
        hbaseDF.filter($"key" === "1" && $"state" === "FL")
          .select("key", "fName", "lName")
          .show()
    
        //Create Temporary Table on DataFrame
        hbaseDF.createOrReplaceTempView("employeeTable")
    
        //Run SQL
        sparkSession.sql("select * from employeeTable where fName = 'Amaya' ").show
    
      }
    }
    

    明确结构的数据使用 方式四 df ,结构不明确的使用方式二 或 方式三(个人更倾向方式三)

  • 相关阅读:
    element input搜索框探索
    Github网站css加载不出来的处理方法(转,亲测有效)
    通过用axios发送请求,全局拦截请求,获取到错误弄明白promise对象
    vuex和localStorage/sessionStorage 区别
    leetcode刷题笔记十一 盛最多水的容器 Scala版本
    leetcode刷题笔记十 正则表达式 Scala版本
    leetcode刷题笔记九 回文数 Scala版本
    leetcode刷题笔记八 字符串转整性 Scala版本
    leetcode刷题笔记七 整数反转 Scala版本
    leetcode刷题笔记六 Z字型转换 Scala版本
  • 原文地址:https://www.cnblogs.com/zihunqingxin/p/14916131.html
Copyright © 2011-2022 走看看