  • Spark Study Notes: Reading and Writing HBase

    1. First, create a table named student in HBase

    Refer to the earlier post HBase Study Notes: Basic CRUD Operations

    The value of a cell is determined by its Row, Column Family, Column Qualifier, and Timestamp

    HBase table structure
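
    The student table can also be created programmatically instead of through the HBase shell. The following is only a minimal sketch using the HBase 1.x client API; it assumes hbase-site.xml is on the classpath, and the table name student and column family course match the write example in step 2:

    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory
    
    object CreateStudentTable {
      def main(args: Array[String]): Unit = {
        // Reads hbase-site.xml from the classpath
        val conf = HBaseConfiguration.create()
        val connection = ConnectionFactory.createConnection(conf)
        val admin = connection.getAdmin
        val tableName = TableName.valueOf("student")
        if (!admin.tableExists(tableName)) {
          // One column family named "course", used by the write example below
          val descriptor = new HTableDescriptor(tableName)
          descriptor.addFamily(new HColumnDescriptor("course"))
          admin.createTable(descriptor)
        }
        admin.close()
        connection.close()
      }
    }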

    2. Write data into HBase. When writing, you must specify the column family and the column qualifier

    build.sbt

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "2.1.0",
      "mysql" % "mysql-connector-java" % "5.1.31",
      "org.apache.spark" %% "spark-sql" % "2.1.0",
      "org.apache.hbase" % "hbase-common" % "1.3.0",
      "org.apache.hbase" % "hbase-client" % "1.3.0",
      "org.apache.hbase" % "hbase-server" % "1.3.0",
      "org.apache.hbase" % "hbase" % "1.2.1"
    )
    

    When writing data in the HBase shell the values are Strings, but when writing code in IDEA, a value written as an Int shows up as \x00... in the table, as the sketch below illustrates
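
    This happens because Bytes.toBytes on an Int produces the raw 4-byte big-endian encoding, while Bytes.toBytes on a String produces its UTF-8 bytes. A minimal sketch (Bytes.toStringBinary renders bytes the same way the HBase shell does):

    import org.apache.hadoop.hbase.util.Bytes
    
    object BytesDemo {
      def main(args: Array[String]): Unit = {
        // An Int is stored as 4 raw bytes, which the shell shows as \x00\x00\x00\x01
        println(Bytes.toStringBinary(Bytes.toBytes(1)))    // \x00\x00\x00\x01
        // A String is stored as its UTF-8 bytes, which the shell shows as readable text
        println(Bytes.toStringBinary(Bytes.toBytes("1")))  // 1
      }
    }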

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql._
    import java.util.Properties
    
    import com.google.common.collect.Lists
    import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Get, Put, Result, Scan}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf
    
    /**
      * Created by mi on 17-4-11.
      */
    
    case class resultset(name: String,
                         info: String,
                         summary: String)
    
    case class IntroItem(name: String, value: String)
    
    
    case class BaikeLocation(name: String,
                             url: String = "",
                             info: Seq[IntroItem] = Seq(),
                             summary: Option[String] = None)
    
    case class MewBaikeLocation(name: String,
                                url: String = "",
                                info: Option[String] = None,
                                summary: Option[String] = None)
    
    
    object MysqlOpt {
    
      def main(args: Array[String]): Unit = {
    
        // Run in local mode for easy testing
        val conf = new SparkConf().setAppName("WordCount").setMaster("local")
        // Create the Spark context
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
    
        // Define the MySQL database and table info (not used in this write example)
        val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8"
        val table = "baike_pages"
    
        // HBase data directory, as configured in /usr/local/hbase/conf/hbase-site.xml (not used directly below)
        val hbasePath = "file:///usr/local/hbase/hbase-tmp"
    
        // Create the HBase configuration
        val hBaseConf = HBaseConfiguration.create()
        hBaseConf.set(TableInputFormat.INPUT_TABLE, "student")
    
        // Initialize the JobConf; TableOutputFormat must be the one from the org.apache.hadoop.hbase.mapred package!
        val jobConf = new JobConf(hBaseConf)
        jobConf.setOutputFormat(classOf[TableOutputFormat])
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, "student")
    
        val indataRDD = sc.makeRDD(Array("1,99,98","2,97,96","3,95,94"))
    
        val rdd = indataRDD.map(_.split(',')).map{arr=>{
          /* A Put object represents one row; the row key is passed to its constructor.
           * All inserted values must be converted with org.apache.hadoop.hbase.util.Bytes.toBytes.
           * Put.add takes three arguments: column family, column qualifier, value.
           */
          val put = new Put(Bytes.toBytes(arr(0)))
          put.add(Bytes.toBytes("course"),Bytes.toBytes("math"),Bytes.toBytes(arr(1)))
          put.add(Bytes.toBytes("course"),Bytes.toBytes("english"),Bytes.toBytes(arr(2)))
          // Convert to RDD[(ImmutableBytesWritable, Put)] so that saveAsHadoopDataset can be called
          (new ImmutableBytesWritable, put)
        }}
    
        rdd.saveAsHadoopDataset(jobConf)
    
        sc.stop()
    
      }
    
    }
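
    Note: in the HBase 1.x client API, Put.add(family, qualifier, value) is deprecated in favour of Put.addColumn. A minimal sketch of the same row construction using addColumn, with the same course family and column names as above:

    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes
    
    // Builds the same (ImmutableBytesWritable, Put) pair as the map above,
    // but with the non-deprecated addColumn method
    def toPut(arr: Array[String]): (ImmutableBytesWritable, Put) = {
      val put = new Put(Bytes.toBytes(arr(0)))
      put.addColumn(Bytes.toBytes("course"), Bytes.toBytes("math"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("course"), Bytes.toBytes("english"), Bytes.toBytes(arr(2)))
      (new ImmutableBytesWritable, put)
    }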
    

    3. Read data from HBase

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql._
    import java.util.Properties
    
    import com.google.common.collect.Lists
    import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Get, Put, Result, Scan}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf
    
    /**
      * Created by mi on 17-4-11.
      */
    
    case class resultset(name: String,
                         info: String,
                         summary: String)
    
    case class IntroItem(name: String, value: String)
    
    
    case class BaikeLocation(name: String,
                             url: String = "",
                             info: Seq[IntroItem] = Seq(),
                             summary: Option[String] = None)
    
    case class MewBaikeLocation(name: String,
                                url: String = "",
                                info: Option[String] = None,
                                summary: Option[String] = None)
    
    
    object MysqlOpt {
    
      def main(args: Array[String]): Unit = {
    
        // Run in local mode for easy testing
        val conf = new SparkConf().setAppName("WordCount").setMaster("local")
        // Create the Spark context
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
    
        // Define the MySQL database and table info (not used in this read example)
        val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8"
        val table = "baike_pages"
    
        // HBase data directory, as configured in /usr/local/hbase/conf/hbase-site.xml (not used directly below)
        val hbasePath = "file:///usr/local/hbase/hbase-tmp"
    
        // Create the HBase configuration
        val hBaseConf = HBaseConfiguration.create()
        hBaseConf.set(TableInputFormat.INPUT_TABLE, "student")
    
        // Read from HBase with TableInputFormat and convert the result to an RDD
        val hBaseRDD = sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    
        println(hBaseRDD.count())
    
        // Map the data to a table, i.e. convert the RDD to a DataFrame with a schema
        // (here each row is simply printed; a DataFrame sketch follows after the output below)
        hBaseRDD.foreach{case (_,result) =>{
          // Get the row key
          val key = Bytes.toString(result.getRow)
          // Get the cell value by column family and column qualifier
          val math = Bytes.toString(result.getValue("course".getBytes,"math".getBytes))
          println("Row key:"+key+" Math:"+math)
        }}
    
        sc.stop()
    
      }
    
    
    }
    

    Output

    Row key:    Math:99
    Row key:    Math:97
    Row key:    Math:95
    Row key:1 Math:99
    Row key:1000 Math:99
    Row key:2 Math:97
    Row key:3 Math:95
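
    The comment in the read example mentions converting the RDD to a DataFrame, but the code above only prints each row. A minimal sketch of one way to do the conversion, reusing hBaseRDD and sqlContext from the read program; the column names below are assumptions based on the course:math and course:english columns written in step 2:

    val schema = StructType(Seq(
      StructField("row_key", StringType),
      StructField("math", StringType),
      StructField("english", StringType)
    ))
    
    // getValue returns null for a missing cell and Bytes.toString(null) returns null,
    // so missing columns simply become null values in the DataFrame
    val rowRDD = hBaseRDD.map { case (_, result) =>
      Row(
        Bytes.toString(result.getRow),
        Bytes.toString(result.getValue(Bytes.toBytes("course"), Bytes.toBytes("math"))),
        Bytes.toString(result.getValue(Bytes.toBytes("course"), Bytes.toBytes("english")))
      )
    }
    
    val studentDF = sqlContext.createDataFrame(rowRDD, schema)
    studentDF.show()

    This snippet would go before sc.stop() in the read program; StructType, StructField, StringType, and Row are already covered by the imports at the top of that program.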
    