zoukankan      html  css  js  c++  java
  • spark(2.1.0) 操作hbase(1.0.2)

    一、写操作

    1、spark中引入外部jar包

      1)创建/usr/software/spark_jars目录,把hbase里的lib里的以下七个jar放入/usr/software/spark_jars里:

        guava-12.0.1.jar

        hbase-common-1.0.2.jar

        hbase-protocol-1.0.2.jar

        htrace-core-3.1.0-incubating.jar

        hbase-client-1.0.2.jar

        hbase-prefix-tree-1.0.2.jar

        hbase-server-1.0.2.jar

      2)修改spark-default.conf文件,加入以下两行: 

        spark.executor.extraClassPath=/usr/software/spark_jars/*
        spark.driver.extraClassPath=/usr/software/spark_jars/*

    2、进入hbase事先创建好表

        create 'test','f1'

    2、用spark-shell进行操作hbase。

    3、代码部分:

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = HBaseConfiguration.create()
    var jobConf = new JobConf(conf)
    jobConf.set("hbase.zookeeper.quorum", "localhost")
    jobConf.set("zookeeper.znode.parent", "/hbase")
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "test")
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    val rdd = sc.makeRDD(Array(1)).flatMap(_ => 0 to 100000)
    rdd.map(x => {
    var put = new Put(Bytes.toBytes(x.toString))
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x.toString))
    (new ImmutableBytesWritable, put)
    }).saveAsHadoopDataset(jobConf)

    二、读操作

    1、用shell操作

    import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}  

    import org.apache.hadoop.hbase.client.HBaseAdmin  

    import org.apache.hadoop.hbase.mapreduce.TableInputFormat  

    import org.apache.spark._  

    import org.apache.hadoop.hbase.client.HTable  

    import org.apache.hadoop.hbase.client.Put  

    import org.apache.hadoop.hbase.util.Bytes  

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable  

    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  

    import org.apache.hadoop.mapred.JobConf  

    import org.apache.hadoop.io._ 

     

    val tablename = "test"

    val conf = HBaseConfiguration.create()  

    conf.set("hbase.zookeeper.quorum","hadoop01")

    conf.set("hbase.zookeeper.property.clientPort", "2181") 

    conf.set(TableInputFormat.INPUT_TABLE, tablename)

    val admin = new HBaseAdmin(conf)  

    if (!admin.isTableAvailable(tablename)) {  

    val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))  

    admin.createTable(tableDesc)  

    }  

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],  

    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],

    classOf[org.apache.hadoop.hbase.client.Result]) 

    val count = hBaseRDD.count()

    hBaseRDD.foreach{case (_,result) =>{

    val rowKey = Bytes.toString(result.getRow)

    val value= Bytes.toString(result.getValue("f1".getBytes,"c1".getBytes))

    println("rowKey:"+rowKey+" Value:"+value)

    }}  

  • 相关阅读:
    SharePoint 2010 不用工作流模拟工作流表单审批的解决方案
    SharePoint 2010 传入电子邮件和传出电子邮件做成收件箱和发件箱(理解可行性)
    SharePoint 2010 准备虚拟机开发环境出现的问题和解决方式
    使用Shell脚本对Linux系统和进程资源进行监控(转)
    你需要知道的16个Linux服务器监控命令(转)
    数字音频采样率与码率(转)
    ffmpeg libx264 编码 "use an encoding preset (e.g. vpre medium)" 错误解决
    一致性哈希算法consistent hashing
    TIME_WAIT 数量增加解决办法
    linux bc命令使用(转)
  • 原文地址:https://www.cnblogs.com/runnerjack/p/7858241.html
Copyright © 2011-2022 走看看