  • Spark Study Notes: Reading and Writing HBase

    1. First, create a table named student in HBase.

    See the earlier post: HBase Study Notes: Basic CRUD Operations.

    The value of a cell is determined by its Row, Column Family, Column Qualifier, and Timestamp.

    HBase table structure (figure)
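
    In the HBase shell the table from step 1 can be created with create 'student', 'course'. For completeness, below is a minimal sketch of doing the same thing programmatically with the hbase-client 1.x Admin API; the table name student and the column family course are the ones used throughout this post, while the object name CreateStudentTable is just for illustration.

    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory
    
    object CreateStudentTable {
      def main(args: Array[String]): Unit = {
        // Picks up hbase-site.xml from the classpath (ZooKeeper quorum, data dir, ...)
        val conf = HBaseConfiguration.create()
        val connection = ConnectionFactory.createConnection(conf)
        val admin = connection.getAdmin
        try {
          val tableName = TableName.valueOf("student")
          if (!admin.tableExists(tableName)) {
            // One column family "course"; the columns (math, english) are created on write
            val descriptor = new HTableDescriptor(tableName)
            descriptor.addFamily(new HColumnDescriptor("course"))
            admin.createTable(descriptor)
          }
        } finally {
          admin.close()
          connection.close()
        }
      }
    }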

    2. Write data into HBase. Each write must specify the column family and the column qualifier.

    build.sbt

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "2.1.0",
      "mysql" % "mysql-connector-java" % "5.1.31",
      "org.apache.spark" %% "spark-sql" % "2.1.0",
      "org.apache.hbase" % "hbase-common" % "1.3.0",
      "org.apache.hbase" % "hbase-client" % "1.3.0",
      "org.apache.hbase" % "hbase-server" % "1.3.0"
    )
    

    When data is written from the HBase shell, the values are Strings. When writing from code in IDEA, however, converting an Int with Bytes.toBytes stores its 4-byte binary encoding, so the shell displays the value as \x00...
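
    The difference is easy to see directly with the Bytes utility: Bytes.toBytes(99) produces the 4-byte big-endian encoding of the Int, while Bytes.toBytes("99") produces the UTF-8 bytes of the string, which the shell can display. A small standalone sketch (only hbase-common is needed):

    import org.apache.hadoop.hbase.util.Bytes
    
    object BytesDemo {
      def main(args: Array[String]): Unit = {
        val asInt    = Bytes.toBytes(99)    // 4-byte big-endian encoding of the Int 99
        val asString = Bytes.toBytes("99")  // UTF-8 bytes of the String "99"
    
        // toStringBinary renders non-printable bytes as \xNN, just like the HBase shell
        println(Bytes.toStringBinary(asInt))    // \x00\x00\x00c  (0x63 is the letter 'c')
        println(Bytes.toStringBinary(asString)) // 99
      }
    }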

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql._
    import java.util.Properties
    
    import com.google.common.collect.Lists
    import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Get, Put, Result, Scan}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf
    
    /**
      * Created by mi on 17-4-11.
      */
    
    case class resultset(name: String,
                         info: String,
                         summary: String)
    
    case class IntroItem(name: String, value: String)
    
    
    case class BaikeLocation(name: String,
                             url: String = "",
                             info: Seq[IntroItem] = Seq(),
                             summary: Option[String] = None)
    
    case class MewBaikeLocation(name: String,
                                url: String = "",
                                info: Option[String] = None,
                                summary: Option[String] = None)
    
    
    object MysqlOpt {
    
      def main(args: Array[String]): Unit = {
    
        // Run in local mode for easy testing
        val conf = new SparkConf().setAppName("WordCount").setMaster("local")
        // Create the Spark context
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
    
        // MySQL database and table info (not used in this HBase example)
        val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8"
        val table = "baike_pages"
    
        // Local HBase data directory, as configured in /usr/local/hbase/conf/hbase-site.xml (not used below)
        val hbasePath = "file:///usr/local/hbase/hbase-tmp"
    
        // Create the HBase configuration
        val hBaseConf = HBaseConfiguration.create()
        hBaseConf.set(TableInputFormat.INPUT_TABLE, "student")
    
        // Initialize the JobConf; note that TableOutputFormat must come from the org.apache.hadoop.hbase.mapred package!
        val jobConf = new JobConf(hBaseConf)
        jobConf.setOutputFormat(classOf[TableOutputFormat])
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, "student")
    
        val indataRDD = sc.makeRDD(Array("1,99,98","2,97,96","3,95,94"))
    
        val rdd = indataRDD.map(_.split(',')).map{arr=>{
          /* One Put object is one row; the row key is passed to the constructor.
           * All values must be converted with org.apache.hadoop.hbase.util.Bytes.toBytes.
           * Put.addColumn takes three arguments: column family, column qualifier, value.
           */
          val put = new Put(Bytes.toBytes(arr(0)))
          put.addColumn(Bytes.toBytes("course"),Bytes.toBytes("math"),Bytes.toBytes(arr(1)))
          put.addColumn(Bytes.toBytes("course"),Bytes.toBytes("english"),Bytes.toBytes(arr(2)))
          // The RDD must hold (ImmutableBytesWritable, Put) pairs before saveAsHadoopDataset can be called
          (new ImmutableBytesWritable, put)
        }}
    
        rdd.saveAsHadoopDataset(jobConf)
    
        sc.stop()
    
      }
    
    }
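
    After the job has run, the rows can be checked in the HBase shell with scan 'student', or read back through the plain hbase-client API. A minimal sketch of the latter, fetching row "1" written above (the object name VerifyPut is just for illustration):

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
    import org.apache.hadoop.hbase.util.Bytes
    
    object VerifyPut {
      def main(args: Array[String]): Unit = {
        val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
        try {
          val table = connection.getTable(TableName.valueOf("student"))
          // Fetch row "1" and read the course:math cell written by the Spark job
          val result = table.get(new Get(Bytes.toBytes("1")))
          val math = Bytes.toString(result.getValue(Bytes.toBytes("course"), Bytes.toBytes("math")))
          println("row 1, course:math = " + math)
          table.close()
        } finally {
          connection.close()
        }
      }
    }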
    

    3. Read data from HBase.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql._
    import java.util.Properties
    
    import com.google.common.collect.Lists
    import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Get, Put, Result, Scan}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf
    
    /**
      * Created by mi on 17-4-11.
      */
    
    case class resultset(name: String,
                         info: String,
                         summary: String)
    
    case class IntroItem(name: String, value: String)
    
    
    case class BaikeLocation(name: String,
                             url: String = "",
                             info: Seq[IntroItem] = Seq(),
                             summary: Option[String] = None)
    
    case class MewBaikeLocation(name: String,
                                url: String = "",
                                info: Option[String] = None,
                                summary: Option[String] = None)
    
    
    object MysqlOpt {
    
      def main(args: Array[String]): Unit = {
    
        // Run in local mode for easy testing
        val conf = new SparkConf().setAppName("WordCount").setMaster("local")
        // Create the Spark context
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
    
        // MySQL database and table info (not used in this HBase example)
        val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8"
        val table = "baike_pages"
    
        // Local HBase data directory, as configured in /usr/local/hbase/conf/hbase-site.xml (not used below)
        val hbasePath = "file:///usr/local/hbase/hbase-tmp"
    
        // Create the HBase configuration
        val hBaseConf = HBaseConfiguration.create()
        hBaseConf.set(TableInputFormat.INPUT_TABLE, "student")
    
        // Read the table through TableInputFormat and turn it into an RDD of (row key, Result) pairs
        val hBaseRDD = sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    
        println(hBaseRDD.count())
    
        // Iterate over the results and print each row key and its course:math value
        // (a DataFrame version of this read is sketched after the output below)
        hBaseRDD.foreach{case (_,result) =>{
          // Get the row key
          val key = Bytes.toString(result.getRow)
          // Get a cell value by column family and column qualifier
          val math = Bytes.toString(result.getValue("course".getBytes,"math".getBytes))
          println("Row key:"+key+" Math:"+math)
        }}
    
        sc.stop()
    
      }
    
    
    }
    

    Output (the rows whose keys appear blank come from earlier writes that used Int row keys; their bytes do not print, as discussed in the \x00 note in step 2):

    Row key:    Math:99
    Row key:    Math:97
    Row key:    Math:95
    Row key:1 Math:99
    Row key:1000 Math:99
    Row key:2 Math:97
    Row key:3 Math:95
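
    The read example above only prints each row. If the goal is the DataFrame hinted at in its comment, the same RDD can be mapped through a case class and converted with toDF. A minimal sketch under the same Spark 2.1 / SQLContext setup; the Student case class and the object name HBaseToDataFrame are just for illustration.

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    
    // Defined at the top level so Spark can derive the DataFrame schema by reflection
    case class Student(id: String, math: String, english: String)
    
    object HBaseToDataFrame {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("HBaseToDataFrame").setMaster("local"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
    
        val hBaseConf = HBaseConfiguration.create()
        hBaseConf.set(TableInputFormat.INPUT_TABLE, "student")
    
        val hBaseRDD = sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat],
          classOf[ImmutableBytesWritable], classOf[Result])
    
        // One Student per HBase row; a missing cell comes back as null
        val studentDF = hBaseRDD.map { case (_, result) =>
          Student(
            Bytes.toString(result.getRow),
            Bytes.toString(result.getValue(Bytes.toBytes("course"), Bytes.toBytes("math"))),
            Bytes.toString(result.getValue(Bytes.toBytes("course"), Bytes.toBytes("english"))))
        }.toDF()
    
        studentDF.show()
        sc.stop()
      }
    }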
    
  • Original post: https://www.cnblogs.com/tonglin0325/p/6728874.html