  • Reading and Writing HBase with Spark

    An example of reading data from HBase and writing results back with Spark.

    1. Inspect the table schemas in the HBase shell

    hbase(main):002:0> desc 'SDAS_Person'
    Table SDAS_Person is ENABLED                                                                               
    SDAS_Person                                                                                                
    COLUMN FAMILIES DESCRIPTION                                                                                
    {NAME => 'cf0', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE',
     DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE =>
     'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}                                                   
    {NAME => 'cf1', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE',
     DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE =>
     'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}                                                   
    {NAME => 'cf2', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE',
     DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE =>
     'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}                                                   
    3 row(s) in 0.0810 seconds
    hbase(main):006:0> desc 'RESULT'
    Table RESULT is ENABLED 
    RESULT 
    COLUMN FAMILIES DESCRIPTION 
    {NAME => 'cf0', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE',
    DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE =>
    'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'} 
    1 row(s) in 0.0250 seconds
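
    The RESULT table receives the output of the Spark job in step 4, so both tables have to exist before the job runs. A minimal way to create them in the HBase shell, using only the column families shown in the desc output above, is:

    hbase(main):001:0> create 'SDAS_Person', 'cf0', 'cf1', 'cf2'
    hbase(main):002:0> create 'RESULT', 'cf0'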

    2. Insert data via the HBase shell

    hbase(main):007:0> scan 'SDAS_Person'
    ROW                         COLUMN+CELL                                                                    
     SDAS_1#1                   column=cf0:Age, timestamp=1505209254407, value=33                              
     SDAS_1#1                   column=cf0:CompanyID, timestamp=1505209254407, value=1                         
     SDAS_1#1                   column=cf0:InDate, timestamp=1505209254407, value=2017-08-02 16:02:08.49       
     SDAS_1#1                   column=cf0:Money, timestamp=1505209254407, value=5.20                          
     SDAS_1#1                   column=cf0:Name, timestamp=1505209254407, value=zhangsan                       
     SDAS_1#1                   column=cf0:PersonID, timestamp=1505209254407, value=1                          
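
    The scan above shows the cells stored for row SDAS_1#1. Rows like this are inserted one cell at a time with put; for example, the Name and Age cells (values taken from the scan output) could have been written as:

    hbase(main):008:0> put 'SDAS_Person', 'SDAS_1#1', 'cf0:Name', 'zhangsan'
    hbase(main):009:0> put 'SDAS_Person', 'SDAS_1#1', 'cf0:Age', '33'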

    3. pom.xml:

        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_${scala.binary.version}</artifactId>
          <version>${spark.version}</version>
          <scope>provided</scope>
        </dependency>
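
    The listing above only covers Scala and Spark core. Compiling the source in step 4 also needs the HBase client classes (for HBase 1.x, TableInputFormat and TableOutputFormat live in hbase-server). A sketch of the extra entries, assuming HBase 1.2.6 to match the cluster libraries used in step 6 and an hbase.version property defined alongside the existing ones:

        <!-- hbase.version is assumed to be 1.2.6, matching /opt/hbase-1.2.6 in step 6 -->
        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-client</artifactId>
          <version>${hbase.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-server</artifactId>
          <version>${hbase.version}</version>
        </dependency>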

    4. Source code:

    package com.zxth.sdas.spark.apps
    import org.apache.spark._  
    import org.apache.spark.rdd.NewHadoopRDD  
    import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}  
    import org.apache.hadoop.hbase.client.HBaseAdmin  
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes  
    import org.apache.hadoop.hbase.client.Put  
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.mapreduce.Job 
    import org.apache.hadoop.hbase.client.Result  
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    
    object HBaseOp {
      var total:Int = 0
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("HBaseOp").setMaster("local")  
        val sc = new SparkContext(sparkConf)
    
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum","master,slave1,slave2")  
        conf.set("hbase.zookeeper.property.clientPort", "2181")  
        conf.set(TableInputFormat.INPUT_TABLE, "SDAS_Person")
    
        // read the HBase table into an RDD of (ImmutableBytesWritable, Result) pairs
        val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result])  
      
        val count = hBaseRDD.count()  
        println("
    
    
    :" + count)
        // NOTE: `total` is a driver-side var mutated inside this foreach closure; this only
        // works because the job runs with a local master (see the note after the listing)
        hBaseRDD.foreach{case (_,result) =>{
          // row key
          val key = Bytes.toString(result.getRow)  
          // fetch a cell value by column family and qualifier
          var obj = result.getValue("cf0".getBytes,"Name".getBytes)
          val name = if(obj==null) "" else Bytes.toString(obj)
          
          obj = result.getValue("cf0".getBytes,"Age".getBytes);
          val age:Int = if(obj == null) 0 else Bytes.toString(obj).toInt
          
          total = total + age
          println("Row key:"+key+" Name:"+name+" Age:"+age+" total:"+total)
        }} 
        val average:Double = total.toDouble/count.toDouble
        println("total age: " + total + " / rows: " + count + " average age: " + average)
        
        // write the aggregated result back to the RESULT table
        conf.set(TableOutputFormat.OUTPUT_TABLE, "RESULT")
        val job = Job.getInstance(conf)
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Put])
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
        
        val arrResult:Array[String] = new Array[String](1)
        arrResult(0) = "1," + total + "," + average
        //arrResult(0) = "1,100,11"
    
        val resultRDD = sc.makeRDD(arrResult)
        // build (rowkey, Put) pairs for TableOutputFormat
        val saveRDD = resultRDD.map(_.split(',')).map{arr=>{
          val put = new Put(Bytes.toBytes(arr(0)))
          // addColumn replaces the deprecated Put.add(family, qualifier, value)
          put.addColumn(Bytes.toBytes("cf0"),Bytes.toBytes("total"),Bytes.toBytes(arr(1)))
          put.addColumn(Bytes.toBytes("cf0"),Bytes.toBytes("average"),Bytes.toBytes(arr(2)))
          (new ImmutableBytesWritable, put)
        }}
        println("getConfiguration")
        var c = job.getConfiguration()
        println("save")
        saveRDD.saveAsNewAPIHadoopDataset(c)  
        
        sc.stop()
      }  
    }
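
    As noted in the code, `total` is a plain driver-side var incremented inside `hBaseRDD.foreach`; those increments run in executor tasks, so they are only visible to the driver because the job uses a local master. On a real cluster the sum would stay 0. A minimal cluster-safe sketch, using the same table and column names and plain RDD transformations, would be:

    // extract the Age column from each Result, then aggregate on the executors
    val ages = hBaseRDD.map { case (_, result) =>
      val obj = result.getValue(Bytes.toBytes("cf0"), Bytes.toBytes("Age"))
      if (obj == null) 0 else Bytes.toString(obj).toInt
    }
    val totalAge = ages.reduce(_ + _)   // sum of ages, computed in parallel
    val rows = ages.count()
    val averageAge = totalAge.toDouble / rows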

    5. Package with Maven

    mvn clean scala:compile compile package
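
    The scala:compile goal above assumes a Scala compiler plugin is declared in the pom (it is not shown in the snippet in step 3). A minimal sketch using scala-maven-plugin, whose goal prefix is scala:

        <!-- scala-maven-plugin provides the scala:compile goal used above -->
        <plugin>
          <groupId>net.alchim31.maven</groupId>
          <artifactId>scala-maven-plugin</artifactId>
          <version>3.2.2</version>
          <executions>
            <execution>
              <goals>
                <goal>compile</goal>
                <goal>testCompile</goal>
              </goals>
            </execution>
          </executions>
        </plugin>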

    6. Submit the job

    bin/spark-submit \
    --jars $(echo /opt/hbase-1.2.6/lib/*.jar | tr ' ' ',') \
    --class com.zxth.sdas.spark.apps.HBaseOp \
    --master local \
    sdas-spark-1.0.0.jar
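
    Because spark-core is marked as provided and the HBase jars are not bundled into sdas-spark-1.0.0.jar, the --jars option expands every jar under /opt/hbase-1.2.6/lib into a comma-separated list so the HBase classes are available to the driver and executors at runtime.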
  • Original post: https://www.cnblogs.com/unreal/p/7610872.html