  • Reading and writing HBase with Spark

    An example of reading and writing HBase with Spark

    1. Inspect the table schemas in the HBase shell

    hbase(main):002:0> desc 'SDAS_Person'
    Table SDAS_Person is ENABLED                                                                               
    SDAS_Person                                                                                                
    COLUMN FAMILIES DESCRIPTION                                                                                
    {NAME => 'cf0', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE',
     DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE =>
     'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}                                                   
    {NAME => 'cf1', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE',
     DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE =>
     'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}                                                   
    {NAME => 'cf2', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE',
     DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE =>
     'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}                                                   
    3 row(s) in 0.0810 seconds
    hbase(main):006:0> desc 'RESULT'
    Table RESULT is ENABLED 
    RESULT 
    COLUMN FAMILIES DESCRIPTION 
    {NAME => 'cf0', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE',
    DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE =>
    'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'} 
    1 row(s) in 0.0250 seconds
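
    The two tables can also be created programmatically. Below is a minimal sketch, assuming the HBase 1.2.x client API; the object name CreateTables is just illustrative, and the ZooKeeper settings are the same ones used by the Spark job in step 4:

    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory

    object CreateTables {
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "master,slave1,slave2")
        conf.set("hbase.zookeeper.property.clientPort", "2181")

        val connection = ConnectionFactory.createConnection(conf)
        val admin = connection.getAdmin
        try {
          //SDAS_Person with the three column families cf0, cf1, cf2 shown above
          val person = new HTableDescriptor(TableName.valueOf("SDAS_Person"))
          Seq("cf0", "cf1", "cf2").foreach(cf => person.addFamily(new HColumnDescriptor(cf)))
          if (!admin.tableExists(person.getTableName)) admin.createTable(person)

          //RESULT with a single column family cf0
          val result = new HTableDescriptor(TableName.valueOf("RESULT"))
          result.addFamily(new HColumnDescriptor("cf0"))
          if (!admin.tableExists(result.getTableName)) admin.createTable(result)
        } finally {
          admin.close()
          connection.close()
        }
      }
    }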

    2. Insert data in the HBase shell (the scan below shows the rows already inserted)

    hbase(main):007:0> scan 'SDAS_Person'
    ROW                         COLUMN+CELL                                                                    
     SDAS_1#1                   column=cf0:Age, timestamp=1505209254407, value=33                              
     SDAS_1#1                   column=cf0:CompanyID, timestamp=1505209254407, value=1                         
     SDAS_1#1                   column=cf0:InDate, timestamp=1505209254407, value=2017-08-02 16:02:08.49       
     SDAS_1#1                   column=cf0:Money, timestamp=1505209254407, value=5.20                          
     SDAS_1#1                   column=cf0:Name, timestamp=1505209254407, value=zhangsan                       
     SDAS_1#1                   column=cf0:PersonID, timestamp=1505209254407, value=1                          
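
    The put commands themselves are not shown above, only the resulting scan. For reference, an equivalent row could also be written through the Scala client API. This is a rough sketch, not part of the original post: the object name PutSample is illustrative, and the values (stored as strings, so the job below can parse Age with toInt) are taken from the scan output:

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
    import org.apache.hadoop.hbase.util.Bytes

    object PutSample {
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "master,slave1,slave2")
        conf.set("hbase.zookeeper.property.clientPort", "2181")

        val connection = ConnectionFactory.createConnection(conf)
        val table = connection.getTable(TableName.valueOf("SDAS_Person"))
        try {
          //one row keyed SDAS_1#1, with a few of the cf0 columns shown in the scan above
          val put = new Put(Bytes.toBytes("SDAS_1#1"))
          put.addColumn(Bytes.toBytes("cf0"), Bytes.toBytes("Name"), Bytes.toBytes("zhangsan"))
          put.addColumn(Bytes.toBytes("cf0"), Bytes.toBytes("Age"), Bytes.toBytes("33"))
          put.addColumn(Bytes.toBytes("cf0"), Bytes.toBytes("PersonID"), Bytes.toBytes("1"))
          table.put(put)
        } finally {
          table.close()
          connection.close()
        }
      }
    }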

    3. pom.xml (key dependencies):

        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_${scala.binary.version}</artifactId>
          <version>${spark.version}</version>
          <scope>provided</scope>
        </dependency>
        <!-- HBase APIs used below (TableInputFormat/TableOutputFormat, Put, Result).
             hbase-server pulls in hbase-client; the version is assumed to match the
             cluster, e.g. 1.2.6 as in the jars passed to spark-submit in step 6. -->
        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-server</artifactId>
          <version>1.2.6</version>
        </dependency>

    4. Source code:

    package com.zxth.sdas.spark.apps
    import org.apache.spark._
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Put, Result}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    
    object HBaseOp {
      var total:Int = 0
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("HBaseOp").setMaster("local")  
        val sc = new SparkContext(sparkConf)
    
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum","master,slave1,slave2")  
        conf.set("hbase.zookeeper.property.clientPort", "2181")  
        conf.set(TableInputFormat.INPUT_TABLE, "SDAS_Person")
    
        //read the HBase table into an RDD of (rowkey, Result) pairs
        val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result])  
      
        val count = hBaseRDD.count()  
        println("
    
    
    :" + count)
        //note: accumulating into the driver-side var "total" works here only because master is "local"
        hBaseRDD.foreach{case (_,result) =>{
          //get the row key
          val key = Bytes.toString(result.getRow)
          //read a cell value by column family and column qualifier
          var obj = result.getValue("cf0".getBytes,"Name".getBytes)
          val name = if(obj==null) "" else Bytes.toString(obj)
          
          obj = result.getValue("cf0".getBytes,"Age".getBytes);
          val age:Int = if(obj == null) 0 else Bytes.toString(obj).toInt
          
          total = total + age
          println("Row key:"+key+" Name:"+name+" Age:"+age+" total:"+total)
        }} 
        val average:Double = total.toDouble/count.toDouble
        println("sum of ages: " + total + " / rows: " + count + " average age: " + average)
        
        //write the aggregated result back to the RESULT table
        conf.set(TableOutputFormat.OUTPUT_TABLE, "RESULT")
        val job = Job.getInstance(conf)
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Put])
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
        
        val arrResult:Array[String] = new Array[String](1)
        arrResult(0) = "1," + total + "," + average
        //arrResult(0) = "1,100,11"
    
        val resultRDD = sc.makeRDD(arrResult)
        val saveRDD = resultRDD.map(_.split(',')).map{arr=>{
          //rowkey is the first field; total and average go into column family cf0
          val put = new Put(Bytes.toBytes(arr(0)))
          put.addColumn(Bytes.toBytes("cf0"),Bytes.toBytes("total"),Bytes.toBytes(arr(1)))
          put.addColumn(Bytes.toBytes("cf0"),Bytes.toBytes("average"),Bytes.toBytes(arr(2)))
          (new ImmutableBytesWritable, put)
        }}
        println("getConfiguration")
        var c = job.getConfiguration()
        println("save")
        saveRDD.saveAsNewAPIHadoopDataset(c)  
        
        sc.stop()
      }  
    }
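
    A note on how the two halves fit together: newAPIHadoopRDD with TableInputFormat yields one (ImmutableBytesWritable, Result) pair per HBase row, while saveAsNewAPIHadoopDataset expects the RDD to already be shaped as the output format's key/value pairs, which for TableOutputFormat means (ImmutableBytesWritable, Put).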

    5. Package with Maven

    mvn clean scala:compile compile package

    6. Submit the job

    bin/spark-submit \
      --jars $(echo /opt/hbase-1.2.6/lib/*.jar | tr ' ' ',') \
      --class com.zxth.sdas.spark.apps.HBaseOp \
      --master local \
      sdas-spark-1.0.0.jar
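
    Once the job finishes, the aggregated row can be checked with scan 'RESULT' in the HBase shell; it should contain a single row with key 1 and the columns cf0:total and cf0:average written above.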
  • Original post: https://www.cnblogs.com/unreal/p/7610872.html