Example of writing to an HBase table from Spark:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

object WriteDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("writeHbase")
    val sc = new SparkContext(conf)
    //--Point the job at the HBase ZooKeeper quorum and the target table
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "tabx")
    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    val data = sc.makeRDD(Array("rk1,tom,23", "rk2,rose,25", "rk3,jary,30"))
    //--Turn each "rowKey,name,age" line into an (ImmutableBytesWritable, Put) pair
    val hbaseRDD = data.map { line =>
      val infos = line.split(",")
      val rowKey = infos(0)
      val name = infos(1)
      val age = infos(2)
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(name))
      put.add(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(age))
      (new ImmutableBytesWritable, put)
    }
    //--Save the RDD into HBase
    hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}
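The write example assumes the target table tabx with column family cf1 already exists. If it does not, a minimal sketch of creating it from Scala (assuming the HBase 1.x client API; the connection settings reuse the quorum and port from the example above) might look like this:

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateTable {
  def main(args: Array[String]): Unit = {
    //--Hypothetical helper: creates "tabx" with column family "cf1" if it does not exist yet
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    try {
      val tableName = TableName.valueOf("tabx")
      if (!admin.tableExists(tableName)) {
        val desc = new HTableDescriptor(tableName)
        desc.addFamily(new HColumnDescriptor("cf1"))
        admin.createTable(desc)
      }
    } finally {
      admin.close()
      connection.close()
    }
  }
}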
Example of reading an HBase table:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

object ReadDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("readHbase")
    val sc = new SparkContext(conf)
    //--Create the HBase configuration
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "tabx")
    //--Read the table as an RDD of (ImmutableBytesWritable, Result) pairs
    val resultRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    resultRDD.foreach { x =>
      //--The row data is in the second element of the (ImmutableBytesWritable, Result) pair
      val result = x._2
      //--Get the row key
      val rowKey = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")))
      val age = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("age")))
      println(rowKey + ":" + name + ":" + age)
    }
  }
}
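Instead of only printing the rows, the Result objects can be mapped into plain Scala values for further Spark processing. A minimal sketch, continuing inside ReadDriver's main after resultRDD is defined (same column family and qualifiers as above):

    //--Sketch: extract (rowKey, name, age) tuples so later transformations work on plain strings
    val rowRDD = resultRDD.map { case (_, result) =>
      val rowKey = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")))
      val age = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("age")))
      (rowKey, name, age)
    }
    //--For example, count the rows that were read
    println(rowRDD.count())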
Filter example:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.RandomRowFilter
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

object ReadDriver2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("readHbaseFilter")
    val sc = new SparkContext(conf)
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "tabx")
    //--RandomRowFilter keeps each row with the given probability (here 50%)
    val scan = new Scan
    scan.setFilter(new RandomRowFilter(0.5f))
    //--Serialize the Scan into the configuration so the filter takes effect
    hbaseConf.set(TableInputFormat.SCAN,
      Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
    val resultRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    resultRDD.foreach { x =>
      //--The row data is in the second element of the (ImmutableBytesWritable, Result) pair
      val result = x._2
      //--Get the row key
      val rowKey = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")))
      val age = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("age")))
      println(rowKey + ":" + name + ":" + age)
    }
  }
}
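Other HBase filters can be plugged in through the same Scan-serialization step. A sketch using PrefixFilter instead of RandomRowFilter, reusing hbaseConf and the imports from ReadDriver2 (the "rk" prefix is just an illustrative value matching the sample row keys above):

    import org.apache.hadoop.hbase.filter.PrefixFilter

    //--Keep only rows whose row key starts with "rk"
    val prefixScan = new Scan
    prefixScan.setFilter(new PrefixFilter(Bytes.toBytes("rk")))
    hbaseConf.set(TableInputFormat.SCAN,
      Base64.encodeBytes(ProtobufUtil.toScan(prefixScan).toByteArray))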