zoukankan      html  css  js  c++  java
  • Spark DataFrame写入HBase的常用方式

    Spark是目前最流行的分布式计算框架,而HBase则是在HDFS之上的列式分布式存储引擎,基于Spark做离线或者实时计算,数据结果保存在HBase中是目前很流行的做法。例如用户画像、单品画像、推荐系统等都可以用HBase作为存储媒介,供客户端使用。

    因此Spark如何向HBase中写数据就成为很重要的一个环节了。本文将会介绍三种写入的方式,其中一种还在期待中,暂且官网即可...

    代码在spark 2.2.0版本亲测

    1. 基于HBase API批量写入

    第一种是最简单的使用方式了,就是基于RDD的分区,由于在spark中一个partition总是存储在一个excutor上,因此可以创建一个HBase连接,提交整个partition的内容。

    大致的代码是:

    rdd.foreachPartition { records =>
    	val config = HBaseConfiguration.create
        config.set("hbase.zookeeper.property.clientPort", "2181")
        config.set("hbase.zookeeper.quorum", "a1,a2,a3")
        val connection = ConnectionFactory.createConnection(config)
        val table = connection.getTable(TableName.valueOf("rec:user_rec"))
        
        // 举个例子而已,真实的代码根据records来
    	val list = new java.util.ArrayList[Put]
        for(i <- 0 until 10){
    		val put = new Put(Bytes.toBytes(i.toString))
            put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("aaaa"), Bytes.toBytes("1111"))
            list.add(put)
        }
        // 批量提交
    	table.put(list)
    	// 分区数据写入HBase后关闭连接
        table.close()
    }
    

    这样每次写的代码很多,显得不够友好,如果能跟dataframe保存parquet、csv之类的就好了。下面就看看怎么实现dataframe直接写入hbase吧!

    2. Hortonworks的SHC写入

    由于这个插件是hortonworks提供的,maven的中央仓库并没有直接可下载的版本。需要用户下载源码自己编译打包,如果有maven私库,可以上传到自己的maven私库里面。具体的步骤可以参考如下:

    2.1 下载源码、编译、上传

    去官网github下载即可:https://github.com/hortonworks-spark/shc
    可以直接按照下面的readme说明来,也可以跟着我的笔记走。

    下载完成后,如果有自己的私库,可以修改shc中的distributionManagement。然后点击旁边的maven插件deploy发布工程,如果只想打成jar包,那就直接install就可以了。

    2.2 引入

    在pom.xml中引入:

    <dependency>
        <groupId>com.hortonworks</groupId>
        <artifactId>shc-core</artifactId>
        <version>1.1.2-2.2-s_2.11-SNAPSHOT</version>
    </dependency>
    

    2.3

    首先创建应用程序,Application.scala

    object Application {
    	def main(args: Array[String]): Unit = {
    		val spark = SparkSession.builder().master("local").appName("normal").getOrCreate()
    	    spark.sparkContext.setLogLevel("warn")
    		val data = (0 to 255).map { i =>  HBaseRecord(i, "extra")}
    
    	    val df:DataFrame = spark.createDataFrame(data)
    	    df.write
    	      .mode(SaveMode.Overwrite)
    	      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
    	      .format("org.apache.spark.sql.execution.datasources.hbase")
    	      .save()
    	}
    	def catalog = s"""{
                       |"table":{"namespace":"rec", "name":"user_rec"},
                       |"rowkey":"key",
                       |"columns":{
                       |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                       |"col1":{"cf":"t", "col":"col1", "type":"boolean"},
                       |"col2":{"cf":"t", "col":"col2", "type":"double"},
                       |"col3":{"cf":"t", "col":"col3", "type":"float"},
                       |"col4":{"cf":"t", "col":"col4", "type":"int"},
                       |"col5":{"cf":"t", "col":"col5", "type":"bigint"},
                       |"col6":{"cf":"t", "col":"col6", "type":"smallint"},
                       |"col7":{"cf":"t", "col":"col7", "type":"string"},
                       |"col8":{"cf":"t", "col":"col8", "type":"tinyint"}
                       |}
                       |}""".stripMargin
    }
    case class HBaseRecord(
                      col0: String,
                      col1: Boolean,
                      col2: Double,
                      col3: Float,
                      col4: Int,
                      col5: Long,
                      col6: Short,
                      col7: String,
                      col8: Byte)
    
    object HBaseRecord
    {
      def apply(i: Int, t: String): HBaseRecord = {
        val s = s"""row${"%03d".format(i)}"""
        HBaseRecord(s,
          i % 2 == 0,
          i.toDouble,
          i.toFloat,
          i,
          i.toLong,
          i.toShort,
          s"String$i: $t",
          i.toByte)
      }
    }
    

    然后再resources目录下,添加hbase-site.xml、hdfs-site.xml、core-site.xml等配置文件。主要是获取Hbase中的一些连接地址。

    3. HBase 2.x+即将发布的hbase-spark

    如果有浏览官网习惯的同学,一定会发现,HBase官网的版本已经到了3.0.0-SNAPSHOT,并且早就在2.0版本就增加了一个hbase-spark模块,使用的方法跟上面hortonworks一样,只是format的包名不同而已,猜想就是把hortonworks给拷贝过来了。

    另外Hbase-spark 2.0.0-alpha4目前已经公开在maven仓库中了。

    http://mvnrepository.com/artifact/org.apache.hbase/hbase-spark

    不过,内部的spark版本是1.6.0,太陈旧了!!!!真心等不起了...

    期待hbase-spark官方能快点提供正式版吧。

    参考

    1. hortonworks-spark/shc github:https://github.com/hortonworks-spark/shc
    2. maven仓库地址: http://mvnrepository.com/artifact/org.apache.hbase/hbase-spark
    3. Hbase spark sql/ dataframe官方文档:https://hbase.apache.org/book.html#_sparksql_dataframes
  • 相关阅读:
    linux系统mysql数据库安装步骤
    uwsgi 配置文件
    服务器重启后,docker无法启动
    标准库functools.wraps的使用方法
    闭包函数延迟绑定问题
    python的面向对象编程
    python中包的介绍与常用模块
    drf知识整理一
    Django知识整理四(choices参数,MTV与MVC模型,ajax介绍,ajax传json文件,ajax传文件,contentType前后端传输数据编码格式)
    DRF序列化组件
  • 原文地址:https://www.cnblogs.com/xing901022/p/8486290.html
Copyright © 2011-2022 走看看