zoukankan      html  css  js  c++  java
  • Reading data from HBase with Spark

    def main(args: Array[String]): Unit = {
          // HBase client configuration: ZooKeeper quorum plus the table to scan.
          val hConf = HBaseConfiguration.create()
          hConf.set("hbase.zookeeper.quorum", "m6,m7,m8")
          val tableName = "t_person"
          hConf.set(TableInputFormat.INPUT_TABLE, tableName)
          // NOTE: the original code also built a `new HBaseAdmin(hConf)` here that
          // was never used and never closed (leaked resource) — removed.
          val conf = new SparkConf()
          conf.set("spark.master", "local")
          conf.set("spark.app.name", "spark demo")
          val sc = new SparkContext(conf)
          try {
            // Scan the HBase table as an RDD of (rowKey, Result) pairs via the
            // new-API Hadoop input format.
            val rs = sc.newAPIHadoopRDD(
              hConf,
              classOf[TableInputFormat],
              classOf[ImmutableBytesWritable],
              classOf[Result])
            rs.foreach { case (_, result) =>
              println(Bytes.toString(result.getRow))
              // Fetch column "age" of family "base_info". `getValue` returns null
              // when the cell is absent, so guard before decoding to avoid an NPE.
              val age = result.getValue(Bytes.toBytes("base_info"), Bytes.toBytes("age"))
              if (age != null) println(Bytes.toInt(age))
            }
          } finally {
            // Always release the SparkContext, even if the job fails.
            sc.stop()
          }
      }
    

     Writing data to an HBase table

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.set("spark.master", "local")
        conf.set("spark.app.name", "spark demo")
        // SparkSession is the unified entry point; its SparkContext backs the RDD API.
        val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate()
        val sc = spark.sparkContext
        try {
          // HBase client configuration: ZooKeeper quorum plus the target table.
          val hbaseConf = HBaseConfiguration.create()
          val tableName = "t_person"
          hbaseConf.set("hbase.zookeeper.quorum", "m6,m7,m8")
          hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)

          // Hadoop (old-API) job configuration that routes RDD output to the HBase table.
          val jobConf = new JobConf(hbaseConf, this.getClass)
          jobConf.setOutputFormat(classOf[TableOutputFormat])
          jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

          // Sample payload: one (rowKey, age-as-string) pair.
          val pairs = sc.parallelize(List(("p_0000010", "12")))

          // Convert a (rowKey, value) pair into the (key, Put) shape
          // that TableOutputFormat expects.
          def convert(data: (String, String)): (ImmutableBytesWritable, Put) = {
            val put = new Put(Bytes.toBytes(data._1))
            // `Put.add(byte[],byte[],byte[])` is deprecated since HBase 1.0
            // (removed in 2.0); `addColumn` is its direct replacement.
            put.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("age"), Bytes.toBytes(data._2))
            (new ImmutableBytesWritable, put)
          }

          // saveAsHadoopDataset is available directly on pair RDDs through the
          // standard implicit conversion — no explicit PairRDDFunctions wrapper needed.
          pairs.map(convert).saveAsHadoopDataset(jobConf)
        } finally {
          // Always release the session (and its SparkContext), even on failure.
          spark.stop()
        }
      }
    

      

  • 相关阅读:
    mmap和MappedByteBuffer
    Linux命令之TOP
    Linux命令之ss
    MySql Cluster
    HttpClient配置
    注释驱动的 Spring cache 缓存介绍
    Spring AOP 实现原理与 CGLIB 应用
    AspectJ本质剖析
    B树
    imagick-3.1.0RC2 安装错误
  • 原文地址:https://www.cnblogs.com/heml/p/6148139.html
Copyright © 2011-2022 走看看