zoukankan      html  css  js  c++  java
  • spark读写hbase的几种方式,及读写相关问题

    读写方式

    其实个人最近访问hbase 都是通过shc df/sql 来访问的

    df的读写方式,足够覆盖90%的场景,但该方案有一个前提是,明确的知道hbase 的列族和列信息,新项目都会规范这一点,可以使用

    但有些历史包袱的项目,列族是明确的,但是列族里的列信息是不明确的,正好要做一个旧项目列的标准化

    每行数据列信息都不一样,部分多列,部分少列,必须读一条,解析一条,因此df之类的方案不适用

    也借此,整理下读写hbase的实现方案-其实各方案网上有很多介绍了

    Which Spark HBase Connector to use in 2020? — SparkByExamples

    方式一

    纯粹调用java api实现,这种在spark任务时很少用,都有spark环境了,通过java api读写是可行,但是性能一般,spark hbase的集成方案,底层实现是直接读的region下HFile文件,性能比纯粹的java api 高些。像spark-streaming,flink之类流处理写入hbase可以用这种api类的方案,但批处理大量读写的场景,并不推荐

    sc.makeRDD(
          Seq(("10003", "page#10003", 20), ("10004", "page#10004", 60))
        ).foreachPartition(iterator => {
          val conf = HBaseConfiguration.create()
          conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
          conf.set("hbase.zookeeper.quorum", zkHost)
          conf.set("zookeeper.znode.parent", "/hbase-unsecure")
          val connection = ConnectionFactory.createConnection(conf)
          val table = connection.getTable(TableName.valueOf(tableName))
          iterator.foreach(t => {
            val put = new Put(Bytes.toBytes(t._1))
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(t._2))
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("b"), Bytes.toBytes(t._3))
            table.put(put)
          })
          table.close()
          connection.close()
        })
    
    方式二 类似m/r的方式,最基础的读写方法,直接读写regin hfile性能较高,对hbase的负担也小
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkReadHBaseDemo")
        val sc = new SparkContext(sparkConf)
        val scan = new Scan()
        scan.addFamily(Bytes.toBytes("cf"))
        val proto = ProtobufUtil.toScan(scan)
        val scanToString = Base64.encodeBytes(proto.toByteArray)
        val conf = HBaseConfiguration.create()
        conf.set(TableInputFormat.INPUT_TABLE, option.getTableName)
        conf.set(TableInputFormat.SCAN, scanToString)
        conf.set("hbase.zookeeper.quorum", option.getZkHost)
        conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    
    //读
        sc.newAPIHadoopRDD(conf,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result]
        ).foreach(t => {
          val result = t._2
          println("====================row=========================>>" + Bytes.toString(result.getRow))
          println("====================cf:a=========================>>" + Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("a"))))
          println("====================cf:b=========================>>" + Bytes.toInt(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("b"))))
        })
    //写
        sc.makeRDD(
          Seq(("10001", "page#10001", 30), ("10002", "page#10002", 50))
        ).map(t => {
          val put = new Put(Bytes.toBytes(t._1))
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(t._2))
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("b"), Bytes.toBytes(t._3))
          (new ImmutableBytesWritable(), put)
        }).saveAsNewAPIHadoopDataset(job.getConfiguration)
    

    方式三 connectors 这个方案有

    https://mvnrepository.com/artifact/org.apache.hbase.connectors.spark/hbase-spark
            <dependency>
                <groupId>org.apache.hbase.connectors.spark</groupId>
                <artifactId>hbase-spark</artifactId>
                <version>replace version here</version>
            </dependency>        
            或
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-spark</artifactId>
                <version>1.2.0-cdh5.16.99</version>
            <dependency>                           
    

    主要是版本问题,spark版本更新较快,大版本1.6,2.x,3.x,但是因为很多大公司的底层大数据解决方案是cdh之类,考虑稳定性和升级成本,几年都不更新的,即使更新了,其集成的hbase版本也依然较低,因此业内很多hbase版本都比较低,还是1.0,1.2的样子,1.x版本间兼容性良好,但是2.0和1.0 就有问题了

    org.apache.hbase.connectors.spark 提供的包 原生支持spark 2.x 但是只支持hbase 2.0 对hbase1.x 会有异常

    org.apache.hbase的包呢,正相反,原生支持的spark是1.6(太老了),但是支持hbase 1.0 spark2.x 调度会有些问题,不过问题也好解决

    个人环境是spark 2.x hbase 1.x 因此选择 org.apache.hbase 的 hbase-spark

        val sparkConf = new SparkConf().setAppName("HBaseDistributedScanExample " + tableName )
        val sc = new SparkContext(sparkConf)
        val conf = HBaseConfiguration.create()
         val scan = new Scan()
        scan.setCaching(1000)
        val hbaseContext = new HBaseContext(sc, conf)
    //读
        val rdd=hbaseContext.hbaseRDD(TableName.valueOf("ns_old:"+tableName),scan).map(row=>{
          val index_name=row._2.getValue(cf,qualifier_doc_index)
          val type_name=row._2.getValue(cf,qualifier_doc_type)
          (Bytes.toString(index_name),Bytes.toString(type_name),row)
        }).hbaseForeachPartition(hbaseContext,(it,connection)=>{
                //写
            val m=connection.getBufferedMutator(targetTable)      
            it.foreach(r=>{
              val put = new Put(hbase_result.getRow)
              for (cell <- hbase_result.listCells()) {
                put.add(cell)
              }          
              m.mutate(put)
            })
              m.flush()
              m.close()
          })
    

    方式四 shc df方案

    之前有写过相关的博客

    spark 使用shc 访问hbase超时问题解决办法 - 资本主义接班人 - 博客园 (cnblogs.com)

    df shc 见 Use Spark to read and write HBase data - Azure HDInsight | Microsoft Docs

    Create Spark DataFrame from HBase using Hortonworks — SparkByExamples

    object HBaseSparkRead {
    
      def main(args: Array[String]): Unit = {
    
        def catalog =
          s"""{
             |"table":{"namespace":"default", "name":"employee"},
             |"rowkey":"key",
             |"columns":{
             |"key":{"cf":"rowkey", "col":"key", "type":"string"},
             |"fName":{"cf":"person", "col":"firstName", "type":"string"},
             |"lName":{"cf":"person", "col":"lastName", "type":"string"},
             |"mName":{"cf":"person", "col":"middleName", "type":"string"},
             |"addressLine":{"cf":"address", "col":"addressLine", "type":"string"},
             |"city":{"cf":"address", "col":"city", "type":"string"},
             |"state":{"cf":"address", "col":"state", "type":"string"},
             |"zipCode":{"cf":"address", "col":"zipCode", "type":"string"}
             |}
             |}""".stripMargin
    
        val sparkSession: SparkSession = SparkSession.builder()
          .master("local[1]")
          .appName("SparkByExamples.com")
          .getOrCreate()
    
        import sparkSession.implicits._
    
        val hbaseDF = sparkSession.read
          .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
          .format("org.apache.spark.sql.execution.datasources.hbase")
          .load()
    
        hbaseDF.printSchema()
    
        hbaseDF.show(false)
    
        hbaseDF.filter($"key" === "1" && $"state" === "FL")
          .select("key", "fName", "lName")
          .show()
    
        //Create Temporary Table on DataFrame
        hbaseDF.createOrReplaceTempView("employeeTable")
    
        //Run SQL
        sparkSession.sql("select * from employeeTable where fName = 'Amaya' ").show
    
      }
    }
    

    明确结构的数据使用 方式四 df ,结构不明确的使用方式二 或 方式三(个人更倾向方式三)

  • 相关阅读:
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    《EffectiveJava中文第二版》 高清PDF下载
    《MoreEffectiveC++中文版》 pdf 下载
    《啊哈c语言》 高清 PDF 下载
  • 原文地址:https://www.cnblogs.com/zihunqingxin/p/14916131.html
Copyright © 2011-2022 走看看