import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
  * Created by lq on 2017/9/7.
  */
object Sparkonhbase {

  // Fill in the application name and master URL for your environment.
  val spark = SparkSession.builder().appName("").master("").getOrCreate()
  val sc = spark.sparkContext
  val conf = HBaseConfiguration.create()
  val hbaseContext = new HBaseContext(sc, conf)

  /**
    * Scan an HBase table through HBaseContext.
    * If a start row key is provided, the scan is bounded by it and by the stop key
    * (which falls back to the start key when absent); otherwise the whole table is scanned.
    */
  def scanHbaseTB(tableName: String,
                  startKey: Option[String] = None,
                  endKey: Option[String] = None): RDD[(ImmutableBytesWritable, Result)] = {
    startKey match {
      case Some(x) =>
        val scan = new Scan()
        scan.setStartRow(Bytes.toBytes(x))
        scan.setStopRow(Bytes.toBytes(endKey.getOrElse(x)))
        hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
      case None =>
        hbaseContext.hbaseRDD(TableName.valueOf(tableName), new Scan())
    }
  }

  def main(args: Array[String]): Unit = {
    // Traditional approach: configure TableInputFormat and read via newAPIHadoopRDD.
    conf.set(TableInputFormat.SCAN_ROW_START, "startrowkey")
    conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey")
    conf.set(TableInputFormat.INPUT_TABLE, "SparkHbase")
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    // HBaseContext approach: scan the table through hbaseRDD.
    val sparkHbaseRDD = scanHbaseTB("SparkHbase")
    sparkHbaseRDD.foreach { case (key, rs) =>
      val rowKey = Bytes.toString(key.get())
      // Replace the empty strings with the real column family and qualifier.
      val cell = rs.getColumnLatestCell(Bytes.toBytes(""), Bytes.toBytes(""))
      println(s"the rowKey is $rowKey the values is $cell")
    }
  }
}
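For a bounded scan, scanHbaseTB can also be called with an explicit start and stop key. The snippet below is a minimal usage sketch, not part of the original code: the row-key range and the "cf"/"name" column family and qualifier are made-up placeholders, and CellUtil.cloneValue is used to copy the cell value out as bytes.

// Sketch only: row keys, column family "cf" and qualifier "name" are placeholders.
// Requires: import org.apache.hadoop.hbase.CellUtil
val boundedRDD = scanHbaseTB("SparkHbase", Some("row-0001"), Some("row-1000"))
boundedRDD.map { case (key, rs) =>
  val rowKey = Bytes.toString(key.get())
  val cell = rs.getColumnLatestCell(Bytes.toBytes("cf"), Bytes.toBytes("name"))
  val value = if (cell != null) Bytes.toString(CellUtil.cloneValue(cell)) else null
  (rowKey, value)
}.take(10).foreach(println)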
http://blog.csdn.net/UnionIBM/article/details/77850979
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-spark</artifactId>
    <version>2.0.0-alpha2</version>
</dependency>
This is the Spark 2.0 artifact; for lower versions only the CDH-packaged dependency is available.
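For those lower versions, the module is only published in the Cloudera repository. The snippet below is a sketch assuming a CDH 5.13 cluster; the exact version string is an assumption and must be matched to your CDH release.

<!-- Sketch: version string must match your CDH release -->
<repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-spark</artifactId>
    <version>1.2.0-cdh5.13.0</version>
</dependency>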