  • Notes on bulk loading HBase from Spark

    1. The existing third-party packages don't fully cover our needs
    - Official hbase-spark connector: cannot set the cell timestamp
    - unicredit/hbase-rdd: the interface is too complex, and it cannot write multiple column families at the same time
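
    For context, the capability at issue is writing each cell with an explicit timestamp. With the plain HBase client API this is set on the Put itself; a minimal sketch (the row key, family and qualifier names here are made up):

    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.util.Bytes

    // One cell with an explicit timestamp (milliseconds) instead of a server-assigned one.
    val put = new Put(Bytes.toBytes("row1"))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), 1546300800000L, Bytes.toBytes("v"))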

    2. HFiles must be sorted, and the sort order is defined by KeyValue.KVComparator. So we define our own Comparator that delegates to KeyValue.KVComparator internally (see MyKeyValue below).

    3. Without a custom partitioner, the following exception is very likely:
    ERROR: "java.io.IOException: Retry attempted 10 times without completing, bailing out"
    https://community.hortonworks.com/content/supportkb/150138/error-javaioioexception-retry-attempted-10-times-w.html
    (It occurs when an HFile spans more than one region: LoadIncrementalHFiles keeps splitting and re-queueing it until the retry limit is reached.)

    The custom partitioner below is based on: https://github.com/unicredit/hbase-rdd/blob/master/src/main/scala/unicredit/spark/hbase/HFileSupport.scala

    4. Many blog posts include the following code. At first I assumed it could be used to partition the RDD, but it has no such effect: configureIncrementalLoad sets MapReduce job parameters, which do not take effect in a Spark job (see the sketch after the snippet for the part that is still useful).
    val job = Job.getInstance(hbaseConfig)
    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor, regionLocator)
    job.getConfiguration
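
    If configureIncrementalLoad is called anyway, the one part worth reusing in Spark is the resulting job configuration: it carries the table's per-column-family compression, bloom filter and data block encoding settings, which HFileOutputFormat2 reads when writing HFiles. The reducer and TotalOrderPartitioner it also configures apply only to MapReduce. A hedged sketch (the helper name and arguments are assumptions; the RDD must already be partitioned and sorted, as in BulkloadHelper below):

    import org.apache.hadoop.hbase.{KeyValue, TableName}
    import org.apache.hadoop.hbase.client.Connection
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.rdd.RDD

    def saveAsHFiles(sortedKvRdd: RDD[(ImmutableBytesWritable, KeyValue)],
                     connection: Connection,
                     tableName: TableName,
                     outputPath: String): Unit = {
      val table = connection.getTable(tableName)
      val regionLocator = connection.getRegionLocator(tableName)
      val job = Job.getInstance(connection.getConfiguration)
      // Copies the per-family compression / bloom / encoding settings into job.getConfiguration;
      // the partitioner it also sets is MapReduce-only and is ignored here.
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor, regionLocator)
      sortedKvRdd.saveAsNewAPIHadoopFile(
        outputPath,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        job.getConfiguration)
    }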


    Other notes:
    1. Implementing the Serializable interface in Scala (see MyKeyValue below)
    2. HFilePartitioner: uses HBase's regionLocator.getStartKeys to split the Puts in the RDD into partitions by rowkey; each partition produces one HFile, matching one HBase region


    Code (to be tidied up later):

    import java.time.LocalDate
    import java.util.UUID

    import scala.collection.JavaConverters._

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.hbase.{KeyValue, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, HTable, Put}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
    import org.apache.log4j.Logger
    import org.apache.spark.rdd.RDD

    import HFilePartitionerHelper.HFilePartitioner  // defined further below, assumed to be in the same package

    object BulkloadHelper {
      private val logger = Logger.getLogger(this.getClass)
    
      def bulkloadWrite(rdd: RDD[Put], hbaseConfig: Configuration, thisTableName: TableName): Unit = {
        val hbaseConnection = ConnectionFactory.createConnection(hbaseConfig)
        val regionLocator = hbaseConnection.getRegionLocator(thisTableName)
        val myPartitioner = HFilePartitioner.apply(hbaseConfig, regionLocator.getStartKeys, 1)
    
        logger.info(s"regionLocator.getStartKeys.length = ${regionLocator.getStartKeys.length}")
        regionLocator.getStartKeys.foreach(keys => logger.info("regionLocator.getStartKeys: " + new String(keys)))
    
        val hFilePath = getHFilePath()
        logger.info(s"bulkload, begin to write to hdfs path: $hFilePath")
    
        /**
          * HFile sort order -> KeyValue.KVComparator
          *                     (CellComparator in newer HBase versions)
          */
        rdd.flatMap(put => putToKeyValueList(put))
          .map(c => (c, 1))
          .repartitionAndSortWithinPartitions(myPartitioner) // repartition so each hfile can match the hbase region
          .map(tuple => (new ImmutableBytesWritable(tuple._1.row), tuple._1.getKeyValue()))
          .saveAsNewAPIHadoopFile(
            hFilePath,
            classOf[ImmutableBytesWritable],
            classOf[KeyValue],
            classOf[HFileOutputFormat2],
            hbaseConfig)
    
        //  Bulk load Hfiles to Hbase
        logger.info("bulkload, begin to load to hbase")
        val bulkLoader = new LoadIncrementalHFiles(hbaseConfig)
        bulkLoader.doBulkLoad(new Path(hFilePath), new HTable(hbaseConfig, thisTableName))
    
        logger.info("bulkload, delete hdfs path")
        val hadoopConf = new Configuration()
        val fileSystem = FileSystem.get(hadoopConf)
        fileSystem.delete(new Path(hFilePath), true)
        hbaseConnection.close()
        fileSystem.close()
        logger.info("bulkload, done")
      }
    
      def getHFilePath():String = "hdfs:///user/hadoop/hbase/bulkload/hfile/" + LocalDate.now().toString + "-" + UUID.randomUUID().toString
    
      /**
        * Flatten a Put into one MyKeyValue per cell, across all column families.
        * @param put the Put to convert
        */
      def putToKeyValueList(put: Put): Seq[MyKeyValue] = {
        put.getFamilyCellMap.asScala
          .flatMap(_._2.asScala) // list cells
          .map(cell => new MyKeyValue(put.getRow, cell.getFamily, cell.getQualifier, cell.getTimestamp, cell.getValue))
          .toSeq
      }
    }
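
    Usage sketch (the SparkContext `sc`, the table name and the sample data are assumptions; each Put carries an explicit timestamp per cell, the capability missing from the official hbase-spark connector noted above):

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.rdd.RDD

    val hbaseConfig = HBaseConfiguration.create()
    val tableName = TableName.valueOf("my_table")   // hypothetical, ideally pre-split table

    val puts: RDD[Put] = sc.parallelize(Seq("row1", "row2")).map { rk =>
      val put = new Put(Bytes.toBytes(rk))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), System.currentTimeMillis(), Bytes.toBytes("v"))
      put
    }

    BulkloadHelper.bulkloadWrite(puts, hbaseConfig, tableName)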
    

      

    import org.apache.hadoop.hbase.KeyValue

    // Serializable wrapper around an HBase cell: KeyValue itself is not Serializable,
    // so the KeyValue instance is cached lazily and rebuilt after deserialization.
    class MyKeyValue(var row: Array[Byte], var family: Array[Byte], var qualifier: Array[Byte], var timestamp: Long, var value: Array[Byte])
      extends Serializable with Ordered[MyKeyValue] {
    
      import java.io.IOException
      import java.io.ObjectInputStream
      import java.io.ObjectOutputStream
    
      var keyValue: KeyValue = _
    
      def getKeyValue(): KeyValue = {
        if (keyValue == null) {
          keyValue = new KeyValue(row, family, qualifier, timestamp, value)
        }
        keyValue
      }
    
      @throws[IOException]
      private def writeObject(out: ObjectOutputStream) {
        // KeyValue is not Serializable, so drop the cached instance before default serialization;
        // it is rebuilt from the plain fields after deserialization.
        keyValue = null
        out.defaultWriteObject()
      }
    
      @throws[IOException]
      @throws[ClassNotFoundException]
      private def readObject(in: ObjectInputStream) {
        in.defaultReadObject()
        // Rebuild the cached KeyValue from the restored fields.
        getKeyValue()
      }
    
      // Wrap KVComparator (the HFile sort order) so it can be shipped inside Spark tasks.
      class MyComparator extends KeyValue.KVComparator with Serializable {}
      val comparator = new MyComparator()
    
      override def compare(that: MyKeyValue): Int = {
        comparator.compare(this.getKeyValue(), that.getKeyValue())
      }
    
      override def toString: String = {
        getKeyValue().toString
      }
    }
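
    A quick round-trip check of the custom serialization (a sketch; the cell contents are made up):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import org.apache.hadoop.hbase.util.Bytes

    val kv = new MyKeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"), Bytes.toBytes("q"), 1L, Bytes.toBytes("v"))

    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(kv)   // the non-serializable KeyValue cache is dropped here
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val restored = in.readObject().asInstanceOf[MyKeyValue]
    assert(restored.getKeyValue().toString == kv.getKeyValue().toString)  // KeyValue is rebuilt on demand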
    

      

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.Partitioner

    object HFilePartitionerHelper {
      object HFilePartitioner {
        def apply(conf: Configuration, splits: Array[Array[Byte]], numFilesPerRegionPerFamily: Int): HFilePartitioner = {
          if (numFilesPerRegionPerFamily == 1)
            new SingleHFilePartitioner(splits)
          else {
            val fraction = 1 max numFilesPerRegionPerFamily min conf.getInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 32)
            new MultiHFilePartitioner(splits, fraction)
          }
        }
      }
    
      protected abstract class HFilePartitioner extends Partitioner {
        def extractKey(n: Any): Array[Byte] = {
    //      println(s"n = $n")
          n match {
            case kv: MyKeyValue => kv.row
          }
        }
      }
    
      private class MultiHFilePartitioner(splits: Array[Array[Byte]], fraction: Int) extends HFilePartitioner {
        override def getPartition(key: Any): Int = {
          val k = extractKey(key)
          val h = (k.hashCode() & Int.MaxValue) % fraction
          for (i <- 1 until splits.length)
            if (Bytes.compareTo(k, splits(i)) < 0) return (i - 1) * fraction + h
    
          (splits.length - 1) * fraction + h
        }
    
        override def numPartitions: Int = splits.length * fraction
      }
    
      private class SingleHFilePartitioner(splits: Array[Array[Byte]]) extends HFilePartitioner {
        override def getPartition(key: Any): Int = {
          val p = selfGetPartition(key)
    //      println(s"p = $p")
          p
        }
    
        def selfGetPartition(key: Any): Int = {
          val k = extractKey(key)
          for (i <- 1 until splits.length)
            if (Bytes.compareTo(k, splits(i)) < 0) return i - 1
    
          splits.length - 1
        }
    
        override def numPartitions: Int = splits.length
      }
    }
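
    A small worked example of how the region start keys map to partitions (the split keys here are invented for illustration; the first start key of a real table is always the empty byte array):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.Partitioner

    // Suppose the table has three regions: [start, "b"), ["b", "d"), ["d", end).
    // regionLocator.getStartKeys would then return the byte arrays "", "b", "d".
    val splits: Array[Array[Byte]] = Array(Array.empty[Byte], Bytes.toBytes("b"), Bytes.toBytes("d"))
    val partitioner: Partitioner =
      HFilePartitionerHelper.HFilePartitioner(HBaseConfiguration.create(), splits, numFilesPerRegionPerFamily = 1)

    // rowkey "a" -> partition 0, "c" -> partition 1, "x" -> partition 2,
    // i.e. each rowkey lands in the partition of the region that owns it.
    Seq("a", "c", "x").foreach { rk =>
      val kv = new MyKeyValue(Bytes.toBytes(rk), Bytes.toBytes("cf"), Bytes.toBytes("q"), 1L, Bytes.toBytes("v"))
      println(s"$rk -> partition ${partitioner.getPartition(kv)}")
    }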
    

      

  • Original post: https://www.cnblogs.com/keepthinking/p/10364869.html