zoukankan      html  css  js  c++  java
  • 大数据-05-Spark之读写HBase数据

    本文主要来自于 http://dblab.xmu.edu.cn/blog/1316-2/ 谢谢原作者

    准备工作一:创建一个HBase表

    这里依然是以student表为例进行演示。这里假设你已经成功安装了HBase数据库,如果你还没有安装,可以参考大数据-04-Hbase入门,进行安装,安装好以后,不要创建数据库和表,只要跟着本节后面的内容操作即可。

    因为hbase依赖于hadoop,因此启动和停止都是需要按照顺序进行
    如果安装了独立的zookeeper
    启动顺序: hadoop-> zookeeper-> hbase
    停止顺序:hbase-> zookeeper-> hadoop
    使用自带的zookeeper
    启动顺序: hadoop-> hbase
    停止顺序:hbase-> hadoop
    如下所示:

    cd /usr/local/hadoop
    ./sbin/start-all.sh
    cd /usr/local/hbase
    ./bin/start-hbase.sh //启动HBase
    ./bin/hbase shell  //启动hbase shell
    

    这样就可以进入hbase shell命令提示符状态。下面我们在HBase数据库中创建student表(注意:在关系型数据库MySQL中,需要首先创建数据库,然后再创建表,但是,在HBase数据库中,不需要创建数据库,只要直接创建表就可以):

    hbase> list                    # 查看所有表
    hbase> disable 'student'       # 禁用表
    hbase> drop 'student'          # 删除表
    

    下面让我们一起来创建一个student表,我们可以在hbase shell中使用下面命令创建:

    hbase> create 'student','info'
    hbase> describe 'student'
    //首先录入student表的第一个学生记录
    hbase> put 'student','1','info:name','Xueqian'
    hbase> put 'student','1','info:gender','F'
    hbase> put 'student','1','info:age','23'
    //然后录入student表的第二个学生记录
    hbase> put 'student','2','info:name','Weiliang'
    hbase> put 'student','2','info:gender','M'
    hbase> put 'student','2','info:age','24'
    

    数据录入结束后,可以用下面命令查看刚才已经录入的数据:

    //如果每次只查看一行,就用下面命令
    hbase> get 'student','1'
    //如果每次查看全部数据,就用下面命令
    hbase> scan 'student'
    

    准备工作二:配置Spark

    在开始编程操作HBase数据库之前,需要对做一些准备工作。
    (1)请新建一个终端,执行下面命令,把HBase的lib目录下的一些jar文件拷贝到Spark中,这些都是编程时需要引入的jar包,需要拷贝的jar文件包括:所有hbase开头的jar文件、guava-12.0.1.jar、htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar,可以打开一个终端按照以下命令来操作:

    cd /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/
    mkdir hbase
    cd hbase
    cp /usr/local/hbase/lib/hbase*.jar ./
    cp /usr/local/hbase/lib/guava-12.0.1.jar ./
    cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
    cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./
    

    只有这样,后面编译和运行过程才不会出错。

    编写程序读取HBase数据

    如果要让Spark读取HBase,就需要使用SparkContext提供的newAPIHadoopRDD API将表的内容以RDD的形式加载到Spark中。
    请在Linux系统中打开一个终端,然后执行以下命令:

    cd /usr/local/spark/mycode
    mkdir hbase
    cd hbase
    mkdir -p src/main/scala
    cd src/main/scala
    vim SparkOperateHBase.scala
    

    然后,在SparkOperateHBase.scala文件中输入以下代码:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase._
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    
    object SparkOperateHBase {
    def main(args: Array[String]) {
    
        val conf = HBaseConfiguration.create()
        val sc = new SparkContext(new SparkConf())
        //设置查询的表名
        conf.set(TableInputFormat.INPUT_TABLE, "student")
        val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
        val count = stuRDD.count()
        println("Students RDD Count:" + count)
        stuRDD.cache()
    
        //遍历输出
        stuRDD.foreach({ case (_,result) =>
            val key = Bytes.toString(result.getRow)
            val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes))
            val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes))
            val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes))
            println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age)
        })
    }
    }
    

    然后就可以用sbt打包编译。不过,在编译之前,需要新建一个simple.sbt文件,在simple.sbt配置文件中,需要知道scalaVersion、spark-core、hbase-client、hbase-common、hbase-server的版本号。在前面章节大数据-03-Spark入门的“编写Scala独立应用程序”部分,我们已经介绍了如何寻找scalaVersion和spark-core的版本号,这里不再赘述。现在介绍如何找到你自己电脑上安装的HBase的hbase-client、hbase-common、hbase-server的版本号。
    请在Linux系统中打开一个终端,输入下面命令:

    cd /usr/local/hbase    # 这是笔者电脑的hbase安装目录
    cd lib
    ls
    

    ls命令会把“/usr/local/hbase/lib”目录下的所有jar文件全部列出来,其中,就可以看到下面三个文件:

    hbase-client-1.1.2.jar          
    hbase-common-1.1.2.jar          
    hbase-server-1.1.2.jar
    

    根据上面三个文件,我们就可以得知hbase-client、hbase-common、hbase-server的版本号是1.1.5(当然,你的电脑上可能不是这个版本号,请以你自己的版本号为准)。
    有了这些版本号信息,我们就可以新建一个simple.sbt文件:

    cd /usr/local/spark/mycode/hbase
    vim simple.sbt
    

    然后在simple.sbt中录入下面内容:

    name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.11.8"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
    libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.1.2"
    libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.1.2"
    libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.1.2"
    

    保存该文件,退出vim编辑器。
    然后,输入下面命令:

    find . 
    

    应该可以看到类似下面的文件结构:

    .
    ./src
    ./src/main
    ./src/main/scala
    ./src/main/scala/SparkOperateHBase.scala
    ./simple.sbt
    

    下面就可以运行sbt打包命令:

    /usr/local/sbt/sbt package
    

    打包成功以后,生成的 jar 包的位置为 /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar。
    最后,通过 spark-submit 运行程序。我们就可以将生成的 jar 包通过 spark-submit 提交到 Spark 中运行了,命令如下:

    /usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit  --driver-class-path /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/hbase/*:/usr/local/hbase/conf --class "SparkOperateHBase"  /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
    

    特别强调,上面命令中,必须使用“–driver-class-path”参数指定依赖JAR包的路径,而且必须把”/usr/local/hbase/conf”也加到路径中。
    执行后得到如下结果:

    Students RDD Count:2
    Row key:1 Name:Xueqian Gender:F Age:23
    Row key:2 Name:Weiliang Gender:M Age:24

    编写程序向HBase写入数据

    下面编写程序向HBase中写入两行数据。
    请打开一个Linux终端,输入如下命令:

    cd /usr/local/spark/mycode/hbase
    vim src/main/scala/SparkWriteHBase.scala
    

    上面命令用vim编辑器新建了一个文件SparkWriteHBase.scala,然后,在SparkWriteHBase.scala文件中输入下面代码:

    import org.apache.hadoop.hbase.HBaseConfiguration  
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  
    import org.apache.spark._  
    import org.apache.hadoop.mapreduce.Job  
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
    import org.apache.hadoop.hbase.client.Result  
    import org.apache.hadoop.hbase.client.Put  
    import org.apache.hadoop.hbase.util.Bytes  
    
    object SparkWriteHBase {  
    
      def main(args: Array[String]): Unit = {  
        val sparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local")  
        val sc = new SparkContext(sparkConf)        
        val tablename = "student"        
        sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)  
    
        val job = new Job(sc.hadoopConfiguration)  
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])  
        job.setOutputValueClass(classOf[Result])    
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])    
    
        val indataRDD = sc.makeRDD(Array("3,Rongcheng,M,26","4,Guanhua,M,27")) //构建两行记录
        val rdd = indataRDD.map(_.split(',')).map{arr=>{  
          val put = new Put(Bytes.toBytes(arr(0))) //行健的值 
          put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))  //info:name列的值
          put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2)))  //info:gender列的值
                put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3).toInt))  //info:age列的值
          (new ImmutableBytesWritable, put)   
        }}        
        rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())  
      }    
    } 
    

    保存该文件退出vim编辑器,然后,使用sbt打包编译,命令如下:

    /usr/local/sbt/sbt package
    

    打包成功以后,生成的 jar 包的位置为 /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar。实际上,由于之前我们已经编写了另外一个代码文件SparkOperateHBase.scala,所以,simple-project_2.11-1.0.jar中实际包含了SparkOperateHBase.scala和SparkWriteHBase.scala两个代码文件的编译结果(class文件),在运行命令时,可以通过–class后面的名称参数来决定运行哪个程序, 这个名字就是scala文件名。
    最后,通过 spark-submit 运行程序。我们就可以将生成的 jar 包通过 spark-submit 提交到 Spark 中运行了,命令如下:

    /usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit  --driver-class-path /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/hbase/*:/usr/local/hbase/conf --class "SparkWriteHBase"  /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
    

    执行后,我们可以切换到刚才的HBase终端窗口,在HBase shell中输入如下命令查看结果:

    hbase> scan 'student'
    
  • 相关阅读:
    可视化工具Grafana:简介及安装
    数据采集工具Telegraf:简介及安装
    怒怼某些自媒体培训机构,吃相不要太难看了!!!
    时序数据库InfluxDB:简介及安装
    jmeter(二十五)linux环境运行jmeter并生成报告
    Linux:CentOS7.4新建用户并授权
    服务端监控工具:Nmon使用方法
    Locust:简介和基本用法
    Quant Finance Master’s Guide 2020
    数据科学入门前需要知道的10件事
  • 原文地址:https://www.cnblogs.com/freebird92/p/8886535.html
Copyright © 2011-2022 走看看