  • sparkonhbase

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    /**
      * Created by lq on 2017/9/7.
      */
    object Sparkonhbase {
      // appName and master are left blank in the original post; fill them in for your environment.
      val spark = SparkSession.builder().appName("").master("").getOrCreate()
      val sc = spark.sparkContext
      val conf = HBaseConfiguration.create()
      val hbaseContext = new HBaseContext(sc, conf)
    
    
      // Scan an HBase table, optionally bounded by start/end row keys.
      def scanHbaseTB(tableName: String)(implicit startKey: Option[String], endKey: Option[String]): RDD[(ImmutableBytesWritable, Result)] = {
        // If a start row key is provided, build a bounded scan; otherwise scan the whole table.
        startKey match {
          case Some(x) =>
            val scan = new Scan()
            scan.setStartRow(Bytes.toBytes(x))
            scan.setStopRow(Bytes.toBytes(endKey.getOrElse(x)))
            hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
          case None =>
            hbaseContext.hbaseRDD(TableName.valueOf(tableName), new Scan())
        }
      }
    
    
      def main(args: Array[String]): Unit = {
        // Traditional approach: configure TableInputFormat and read through newAPIHadoopRDD.
        conf.set(TableInputFormat.SCAN_ROW_START, "startrowkey")
        conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey")
        conf.set(TableInputFormat.INPUT_TABLE, "SparkHbase")
        val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

        // HBaseContext approach: no start/end keys are supplied, so the whole table is scanned.
        val sparkHbaseRDD = scanHbaseTB("SparkHbase")(None, None)
        sparkHbaseRDD.foreach { x =>
          val rowKey = Bytes.toString(x._1.copyBytes())
          val rs = x._2
          // Column family and qualifier are left blank in the original post; substitute your own.
          val cell = rs.getColumnLatestCell(Bytes.toBytes(""), Bytes.toBytes(""))
          println(s"the rowKey is $rowKey the value is $cell")
        }
      }
    }
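    A quick usage sketch: instead of relying on implicits, the helper above can also be called with explicit bounds from inside main. The table name and row keys below are hypothetical placeholders.

        // Bounded scan between two hypothetical row keys (the stop row is exclusive).
        val boundedRDD = scanHbaseTB("SparkHbase")(Some("row-0010"), Some("row-0020"))
        println(s"bounded scan returned ${boundedRDD.count()} rows")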
    Reference: http://blog.csdn.net/UnionIBM/article/details/77850979
         <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-spark</artifactId>
             <version>2.0.0-alpha2</version>
         </dependency>
    This dependency is for Spark 2.0; for lower versions, only the CDH (Cloudera) build of hbase-spark is available.
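    For reference, a lower-version setup would pull hbase-spark from Cloudera's repository; the snippet below is only an illustrative sketch, and the exact -cdh version string is an assumption that depends on your CDH release.

         <!-- Hypothetical CDH coordinates; adjust the version to match your CDH release. -->
         <repository>
             <id>cloudera</id>
             <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
         </repository>
         <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-spark</artifactId>
             <version>1.2.0-cdh5.13.0</version>
         </dependency>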
  • Original post: https://www.cnblogs.com/rocky-AGE-24/p/7489079.html