  • Spark operations on HBase
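
    Both examples in this post assume the Spark and HBase client jars are on the classpath. The build.sbt below is a minimal sketch; the Scala, Spark, and HBase versions are assumptions and should be matched to your cluster.

    // build.sbt -- minimal sketch; versions are assumptions, match them to your cluster
    name := "spark-hbase-examples"

    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"   % "1.3.1" % "provided",
      "org.apache.hbase" %  "hbase-client" % "1.0.0",
      "org.apache.hbase" %  "hbase-common" % "1.0.0",
      // hbase-server carries the mapreduce TableInputFormat / TableOutputFormat used below
      "org.apache.hbase" %  "hbase-server" % "1.0.0"
    )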

    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName, HBaseConfiguration}
    import org.apache.hadoop.hbase.client._
    import org.apache.spark.SparkContext
    import scala.collection.JavaConversions._
    
    /**
     * Basic CRUD examples with the new HBase 1.0.0 API
     **/
    object HBaseNewAPI {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "SparkHBase")
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("hbase.zookeeper.quorum", "master")
    
    
        // Creating a Connection is heavyweight; it is thread-safe and is the entry point for all HBase operations
        val conn = ConnectionFactory.createConnection(conf)
    
        // Obtain an Admin object from the Connection (replaces the old HBaseAdmin)
        val admin = conn.getAdmin
    
        // Name of the table this example operates on
        val userTable = TableName.valueOf("user")
    
        // Create the user table (drop it first if it already exists)
        val tableDescr = new HTableDescriptor(userTable)
        tableDescr.addFamily(new HColumnDescriptor("basic".getBytes))
        println("Creating table `user`. ")
        if (admin.tableExists(userTable)) {
          admin.disableTable(userTable)
          admin.deleteTable(userTable)
        }
        admin.createTable(tableDescr)
        println("Done!")
    
        try{
          // Get the user table
          val table = conn.getTable(userTable)
    
          try{
            // Prepare to insert a row with row key id001
            val p = new Put("id001".getBytes)
            // Specify the column and value for the Put (the old put.add method is deprecated)
            p.addColumn("basic".getBytes,"name".getBytes, "wuchong".getBytes)
            // Submit the Put
            table.put(p)
    
            // Get a single row
            val g = new Get("id001".getBytes)
            val result = table.get(g)
            val value = Bytes.toString(result.getValue("basic".getBytes,"name".getBytes))
            println("GET id001 :"+value)
    
            // Scan rows
            val s = new Scan()
            s.addColumn("basic".getBytes,"name".getBytes)
            val scanner = table.getScanner(s)
    
            try{
              for(r <- scanner){
                println("Found row: "+r)
                println("Found value: "+Bytes.toString(r.getValue("basic".getBytes,"name".getBytes)))
              }
            }finally {
              // Make sure the scanner is closed
              scanner.close()
            }
    
            // Delete a row; the pattern is similar to Put
            val d = new Delete("id001".getBytes)
            d.addColumn("basic".getBytes,"name".getBytes)
            table.delete(d)
    
          }finally {
            if(table != null) table.close()
          }
    
        }finally {
          conn.close()
        }
      }
    }
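
    Because a Connection is heavyweight and thread-safe, it should be created once and shared. For bulk writes, the same Connection can also hand out a BufferedMutator, which buffers Puts client-side and sends them in batches. The sketch below is not from the original post; it assumes the same `user` table and `basic` column family created above.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
    import org.apache.hadoop.hbase.util.Bytes

    object HBaseBufferedWrite {
      def main(args: Array[String]) {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("hbase.zookeeper.quorum", "master")

        val conn = ConnectionFactory.createConnection(conf)
        try {
          // BufferedMutator buffers mutations client-side and flushes them in batches
          val mutator = conn.getBufferedMutator(TableName.valueOf("user"))
          try {
            for (i <- 1 to 100) {
              val p = new Put(Bytes.toBytes("id%03d".format(i)))
              p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes("user" + i))
              mutator.mutate(p)
            }
            // Force any remaining buffered mutations out to the region servers
            mutator.flush()
          } finally {
            mutator.close()
          }
        } finally {
          conn.close()
        }
      }
    }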
    

      

    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.SparkContext
    import org.apache.hadoop.hbase.client._
    
    /**
     * Reading from and writing to HBase with Spark
     **/
    object SparkOnHBase {
    
    
      def convertScanToString(scan: Scan) = {
        val proto = ProtobufUtil.toScan(scan)
        Base64.encodeBytes(proto.toByteArray)
      }
    
      def main(args: Array[String]) {
        val sc = new SparkContext("local","SparkOnHBase")
    
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("hbase.zookeeper.quorum", "master")
    
    
        // ======Save RDD to HBase========
        // step 1: JobConf setup
        val jobConf = new JobConf(conf,this.getClass)
        jobConf.setOutputFormat(classOf[TableOutputFormat])
        jobConf.set(TableOutputFormat.OUTPUT_TABLE,"user")
    
    
        // step 2: rdd mapping to table
    
        // In HBase a table schema generally looks like
        // *row   cf:col_1    cf:col_2
        // In Spark, however, we work with RDDs of tuples, e.g. (1,"lilei",14), (2,"hanmei",18)
        // We need to turn *RDD[(uid:Int, name:String, age:Int)]* into *RDD[(ImmutableBytesWritable, Put)]*
        // The convert function below does this conversion
        def convert(triple: (Int, String, Int)) = {
          val p = new Put(Bytes.toBytes(triple._1))
          p.addColumn(Bytes.toBytes("basic"),Bytes.toBytes("name"),Bytes.toBytes(triple._2))
          p.addColumn(Bytes.toBytes("basic"),Bytes.toBytes("age"),Bytes.toBytes(triple._3))
          (new ImmutableBytesWritable, p)
        }
    
        // step 3: read RDD data from somewhere and convert
        val rawData = List((1,"lilei",14), (2,"hanmei",18), (3,"someone",38))
        val localData = sc.parallelize(rawData).map(convert)
    
        //step 4: use `saveAsHadoopDataset` to save RDD to HBase
        localData.saveAsHadoopDataset(jobConf)
        // =================================
    
    
        // ======Load RDD from HBase========
        // use `newAPIHadoopRDD` to load RDD from HBase
        // Read data from HBase and turn it into an RDD[K,V] that Spark can operate on directly
    
        // Set the table to read from
        conf.set(TableInputFormat.INPUT_TABLE, "user")
    
        // Add a filter: keep rows where age >= 18
        val scan = new Scan()
        scan.setFilter(new SingleColumnValueFilter("basic".getBytes,"age".getBytes,
          CompareOp.GREATER_OR_EQUAL,Bytes.toBytes(18)))
        conf.set(TableInputFormat.SCAN,convertScanToString(scan))
    
        val usersRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result])
    
        val count = usersRDD.count()
        println("Users RDD Count:" + count)
        usersRDD.cache()
    
        // Iterate over the results and print them
        usersRDD.foreach{ case (_,result) =>
          val key = Bytes.toInt(result.getRow)
          val name = Bytes.toString(result.getValue("basic".getBytes,"name".getBytes))
          val age = Bytes.toInt(result.getValue("basic".getBytes,"age".getBytes))
          println("Row key:"+key+" Name:"+name+" Age:"+age)
        }
        // =================================
      }
    }
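
    The save path above goes through the old mapred API (JobConf plus saveAsHadoopDataset). The same write can also be expressed with the new mapreduce API and saveAsNewAPIHadoopDataset. The sketch below is not from the original post; it reuses the table layout above, and the object name is made up.

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.SparkContext

    object SparkWriteHBaseNewAPI {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "SparkWriteHBaseNewAPI")

        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("hbase.zookeeper.quorum", "master")
        conf.set(TableOutputFormat.OUTPUT_TABLE, "user")

        // The Job object is only used to carry the output-format configuration
        val job = Job.getInstance(conf)
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Put])

        val rawData = List((1, "lilei", 14), (2, "hanmei", 18), (3, "someone", 38))
        val localData = sc.parallelize(rawData).map { case (uid, name, age) =>
          val p = new Put(Bytes.toBytes(uid))
          p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes(name))
          p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age"), Bytes.toBytes(age))
          (new ImmutableBytesWritable(Bytes.toBytes(uid)), p)
        }

        localData.saveAsNewAPIHadoopDataset(job.getConfiguration)
        sc.stop()
      }
    }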
    

      

    Source: https://gist.github.com/wuchong/95630f80966d07d7453b#file-hbasenewapi-scala

    http://wuchong.me/blog/2015/04/04/spark-on-yarn-cluster-deploy/

  • Original post: https://www.cnblogs.com/seaspring/p/5631112.html