zoukankan      html  css  js  c++  java
  • Spark和HBase整合

    写入HBase表代码示例:

    import org.apache.spark.SparkConf

    import org.apache.spark.SparkContext

    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

    import org.apache.hadoop.mapreduce.Job

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable

    import org.apache.hadoop.fs.shell.find.Result

    import org.apache.hadoop.hbase.client.Put

    import org.apache.hadoop.hbase.util.Bytes

       

    /**
     * Writes a few sample rows into the HBase table `tabx` via Spark's
     * new-Hadoop-API output path and HBase's `TableOutputFormat`.
     *
     * Each input line has the form "rowKey,name,age"; name and age are stored
     * as strings (Bytes.toBytes) in columns cf1:name and cf1:age.
     */
    object WriteDriver {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("writeHbase")
        val sc = new SparkContext(conf)

        try {
          // HBase client settings: ZooKeeper quorum/port and the target table.
          sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
          sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
          sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "tabx")

          // Job.getInstance replaces the deprecated `new Job(conf)` constructor.
          val job = Job.getInstance(sc.hadoopConfiguration)
          job.setOutputKeyClass(classOf[ImmutableBytesWritable])
          // The values written are HBase Puts (write-side mutations), not
          // read-side results, so declare Put as the output value class.
          job.setOutputValueClass(classOf[Put])
          job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

          val data = sc.makeRDD(Array("rk1,tom,23", "rk2,rose,25", "rk3,jary,30"))

          // Convert each "rowKey,name,age" line into an (empty key, Put) pair.
          val hbaseRDD = data.map { line =>
            val Array(rowKey, name, age) = line.split(",")
            val family = Bytes.toBytes("cf1")

            val put = new Put(Bytes.toBytes(rowKey))
            // addColumn supersedes the deprecated Put.add(family, qualifier, value).
            put.addColumn(family, Bytes.toBytes("name"), Bytes.toBytes(name))
            put.addColumn(family, Bytes.toBytes("age"), Bytes.toBytes(age))

            // The Put itself carries the row key; the pair key is left empty.
            (new ImmutableBytesWritable, put)
          }

          // Persist the RDD into HBase.
          hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
        } finally {
          // Release the Spark context even if the write fails.
          sc.stop()
        }
      }
    }

       

    读取HBase表代码:

    import org.apache.spark.SparkConf

    import org.apache.spark.SparkContext

    import org.apache.hadoop.hbase.HBaseConfiguration

    import org.apache.hadoop.hbase.mapreduce.TableInputFormat

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable

    import org.apache.hadoop.hbase.client.Result

    import org.apache.hadoop.hbase.util.Bytes

       

    /**
     * Scans the HBase table `tabx` and prints every row as "rowKey:name:age",
     * reading columns cf1:name and cf1:age. A missing cell prints as "null".
     */
    object ReadDriver {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("readHbase")
        val sc = new SparkContext(conf)

        try {
          // HBase client configuration: ZooKeeper quorum/port and the input table.
          val hbaseConf = HBaseConfiguration.create()
          hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
          hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
          hbaseConf.set(TableInputFormat.INPUT_TABLE, "tabx")

          val resultRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
            classOf[ImmutableBytesWritable], classOf[Result])

          // Each record is (ImmutableBytesWritable, Result); the row data is in the Result.
          resultRDD.foreach { case (_, result) =>
            val family = Bytes.toBytes("cf1")
            val rowKey = Bytes.toString(result.getRow)
            val name = Bytes.toString(result.getValue(family, Bytes.toBytes("name")))
            val age = Bytes.toString(result.getValue(family, Bytes.toBytes("age")))

            // NOTE(review): foreach runs on executors; output reaches this console
            // only because the master is "local".
            println(s"$rowKey:$name:$age")
          }
        } finally {
          // Release the Spark context even if the scan fails.
          sc.stop()
        }
      }
    }

       

    使用过滤器读取HBase表代码:

    import org.apache.spark.SparkConf

    import org.apache.spark.SparkContext

    import org.apache.hadoop.hbase.HBaseConfiguration

    import org.datanucleus.store.types.backed.Set

    import org.apache.hadoop.hbase.mapreduce.TableInputFormat

    import org.apache.hadoop.hbase.client.Scan

    import org.apache.hadoop.hbase.filter.RandomRowFilter

    import org.apache.hadoop.hbase.util.Base64

    import org.apache.hadoop.hbase.protobuf.ProtobufUtil

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable

    import org.apache.hadoop.hbase.client.Result

    import org.apache.hadoop.hbase.util.Bytes

       

    /**
     * Scans the HBase table `tabx` through a server-side RandomRowFilter and
     * prints each returned row as "rowKey:name:age" (columns cf1:name, cf1:age).
     * A missing cell prints as "null".
     */
    object ReadDriver2 {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("readHbaseFilter")
        val sc = new SparkContext(conf)

        try {
          // HBase client configuration: ZooKeeper quorum/port and the input table.
          val hbaseConf = HBaseConfiguration.create()
          hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
          hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
          hbaseConf.set(TableInputFormat.INPUT_TABLE, "tabx")

          // NOTE(review): RandomRowFilter(0.5f) presumably passes each row with
          // ~50% probability — confirm against the HBase filter docs.
          val scan = new Scan
          scan.setFilter(new RandomRowFilter(0.5f))
          // TableInputFormat only applies the filter when the Scan is handed over
          // in serialized (protobuf + Base64) form under TableInputFormat.SCAN.
          hbaseConf.set(TableInputFormat.SCAN,
            Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

          val resultRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
            classOf[ImmutableBytesWritable], classOf[Result])

          // Each record is (ImmutableBytesWritable, Result); the row data is in the Result.
          resultRDD.foreach { case (_, result) =>
            val family = Bytes.toBytes("cf1")
            val rowKey = Bytes.toString(result.getRow)
            val name = Bytes.toString(result.getValue(family, Bytes.toBytes("name")))
            val age = Bytes.toString(result.getValue(family, Bytes.toBytes("age")))

            println(s"$rowKey:$name:$age")
          }
        } finally {
          // Release the Spark context even if the scan fails.
          sc.stop()
        }
      }
    }

       

       

  • 相关阅读:
    为什么要使用Handler
    使用Java中的Timer和TimerTask
    Top子句对查询计划的影响
    一个单表死锁的示例
    tracer token 追踪标记
    DDL Trigger
    事务复制的troubleshooting 1
    在分发服务器上查看信息
    将windows 2003 sp2的cluster升级到windows 2008 r2
    DistributionDB过大的原因
  • 原文地址:https://www.cnblogs.com/shuzhiwei/p/11323148.html
Copyright © 2011-2022 走看看