Spark read/write HBase: performance comparison

I. Writing to HBase from Spark

The HBase client wraps writes in Put objects and supports both per-row and batched inserts. Spark additionally ships two built-in write paths for HBase, saveAsHadoopDataset and saveAsNewAPIHadoopDataset. To compare performance, the same data set is written with each of these approaches.
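For orientation, the raw HBase 1.x client calls that the per-row and batched approaches below boil down to look roughly like this (a minimal sketch; it assumes an open Connection named connection, as created by the HBaseUtils1x.init() helper listed in section III):

    import scala.collection.JavaConverters._
    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.util.Bytes

    val table = connection.getTable(TableName.valueOf("keyword1"))
    val put = new Put(Bytes.toBytes("rowKey"))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("keyword"), Bytes.toBytes("value"))
    table.put(put)                 // per-row insert: one RPC round trip per Put
    table.put(List(put).asJava)    // batched insert: the client groups the Puts by region server
    table.close()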

The Maven dependencies are as follows:

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.4.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>1.4.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.4.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-protocol -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-protocol</artifactId>
        <version>1.4.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/commons-cli/commons-cli -->
    <dependency>
        <groupId>commons-cli</groupId>
        <artifactId>commons-cli</artifactId>
        <version>1.4</version>
    </dependency>

1. Per-row Put inserts
1.1 Create the table in the HBase shell

    create 'keyword1',{NAME=>'info',BLOCKSIZE=>'16384',BLOCKCACHE=>'false'},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}


1.2 Code

           

    val start_time1 = new Date().getTime
    keyword.foreachPartition(records => {
      HBaseUtils1x.init()
      records.foreach(f => {
        val keyword = f.getString(0)
        val app_id = f.getString(1)
        val catalog_name = f.getString(2)
        val keyword_catalog_pv = f.getString(3)
        val keyword_catalog_pv_rate = f.getString(4)
        val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
        val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
        HBaseUtils1x.insertData(tableName1, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
      })
      HBaseUtils1x.closeConnection()
    })
    val end_time1 = new Date().getTime
    println("HBase per-row insert took: " + (end_time1 - start_time1))



2. Batched Put inserts
2.1 Create the table

    create 'keyword2',{NAME=>'info',BLOCKSIZE=>'16384',BLOCKCACHE=>'false'},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}


2.2 Code

          

    val start_time2 = new Date().getTime
    keyword.foreachPartition(records => {
      HBaseUtils1x.init()
      val puts = ArrayBuffer[Put]()
      records.foreach(f => {
        val keyword = f.getString(0)
        val app_id = f.getString(1)
        val catalog_name = f.getString(2)
        val keyword_catalog_pv = f.getString(3)
        val keyword_catalog_pv_rate = f.getString(4)
        val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
        val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
        try {
          puts.append(HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
        } catch {
          case e: Throwable => println(s"Failed to build Put for $f: ${e.getMessage}")
        }
      })
      import collection.JavaConverters._
      HBaseUtils1x.addDataBatchEx(tableName2, puts.asJava)
      HBaseUtils1x.closeConnection()
    })
    val end_time2 = new Date().getTime
    println("HBase batched insert took: " + (end_time2 - start_time2))


3. Writing with saveAsHadoopDataset

saveAsHadoopDataset writes the RDD to any Hadoop-supported storage system through the old Hadoop API, using a Hadoop JobConf object for that storage system. The JobConf specifies an OutputFormat and any required output settings (here the target HBase table), just as it would for a Hadoop MapReduce job.
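The JobConf passed to saveAsHadoopDataset below is built roughly like this (a condensed version of the getJobConf helper listed in full in section III; the quorum value "lee" is the author's test host):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.mapred.JobConf

    val jobConf = new JobConf(HBaseConfiguration.create())
    jobConf.set("hbase.zookeeper.quorum", "lee")
    jobConf.set("hbase.zookeeper.property.clientPort", "2181")
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "keyword3")   // target table
    jobConf.setOutputFormat(classOf[TableOutputFormat])       // old mapred API output format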
3.1 Create the table

    create 'keyword3',{NAME=>'info',BLOCKSIZE=>'16384',BLOCKCACHE=>'false'},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}


3.2 Code

    val start_time3 = new Date().getTime
    keyword.rdd.map(f => {
      val keyword = f.getString(0)
      val app_id = f.getString(1)
      val catalog_name = f.getString(2)
      val keyword_catalog_pv = f.getString(3)
      val keyword_catalog_pv_rate = f.getString(4)
      val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
      val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
      (new ImmutableBytesWritable, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
    }).saveAsHadoopDataset(HBaseUtils1x.getJobConf(tableName3))
    val end_time3 = new Date().getTime
    println("saveAsHadoopDataset write took: " + (end_time3 - start_time3))


4. Writing with saveAsNewAPIHadoopDataset

saveAsNewAPIHadoopDataset writes the RDD to any Hadoop-supported storage system through the new Hadoop API, using a Hadoop Configuration object for that storage system. The configuration specifies an OutputFormat and any required output settings, just as it would for a Hadoop MapReduce job.
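For the new API, the OutputFormat is registered on a Hadoop Configuration instead (a condensed sketch of the getNewJobConf helper in section III; note that TableOutputFormat here is the one from the mapreduce package):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Mutation
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.mapred.JobConf

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "lee")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableOutputFormat.OUTPUT_TABLE, "keyword4")
    // Register the new-API TableOutputFormat as the job's output format.
    conf.setClass("mapreduce.job.outputformat.class",
      classOf[TableOutputFormat[String]],
      classOf[org.apache.hadoop.mapreduce.OutputFormat[String, Mutation]])
    val jobConf = new JobConf(conf)   // what saveAsNewAPIHadoopDataset receives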
4.1 Create the table

    create 'keyword4',{NAME=>'info',BLOCKSIZE=>'16384',BLOCKCACHE=>'false'},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}


4.2 Code

       

    val start_time4 = new Date().getTime
    keyword.rdd.map(f => {
      val keyword = f.getString(0)
      val app_id = f.getString(1)
      val catalog_name = f.getString(2)
      val keyword_catalog_pv = f.getString(3)
      val keyword_catalog_pv_rate = f.getString(4)
      val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
      val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
      (new ImmutableBytesWritable, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
    }).saveAsNewAPIHadoopDataset(HBaseUtils1x.getNewJobConf(tableName4, spark.sparkContext))
    val end_time4 = new Date().getTime
    println("saveAsNewAPIHadoopDataset write took: " + (end_time4 - start_time4))



5. Performance comparison

As the measured run times show, the saveAsHadoopDataset and saveAsNewAPIHadoopDataset write paths outperform both per-row and batched Put inserts.
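The timings above are collected by taking new Date().getTime before and after each write. A small helper like the following (a sketch, not part of the original code) captures that pattern once:

    def time[T](label: String)(body: => T): T = {
      val start = System.currentTimeMillis()   // same millisecond resolution as new Date().getTime
      val result = body
      println(label + " took " + (System.currentTimeMillis() - start) + " ms")
      result
    }

    // Hypothetical usage:
    // time("saveAsHadoopDataset write") {
    //   rdd.saveAsHadoopDataset(HBaseUtils1x.getJobConf(tableName3))
    // }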


II. Reading from HBase with Spark

The newAPIHadoopRDD API turns an HBase table into an RDD. It is used as follows:

    val start_time1 = new Date().getTime
    val hbaseRdd = spark.sparkContext.newAPIHadoopRDD(HBaseUtils1x.getNewConf(tableName1),
      classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println(hbaseRdd.count())
    hbaseRdd.foreach {
      case (_, result) =>
        // Get the row key
        val rowKey = Bytes.toString(result.getRow)
        val keyword = Bytes.toString(result.getValue(cf.getBytes(), "keyword".getBytes()))
        // Values were written as string bytes, so decode as String before converting.
        val keyword_catalog_pv_rate = Bytes.toString(result.getValue(cf.getBytes(), "keyword_catalog_pv_rate".getBytes())).toDouble
        println(rowKey + "," + keyword + "," + keyword_catalog_pv_rate)
    }
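The Configuration handed to newAPIHadoopRDD above is built roughly like this (condensed from the getNewConf helper in section III); a Scan is serialized into the configuration, so filters or column restrictions could be pushed down the same way:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.Base64

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "lee")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableInputFormat.INPUT_TABLE, "keyword1")
    // Serialize a (possibly restricted) Scan into the configuration.
    val scan = new Scan()
    conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))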

III. Complete code (SparkRWHBase and the HBaseUtils1x helper)

       

    package com.sparkStudy.utils

    import java.util.Date

    import org.apache.hadoop.hbase.client.{Put, Result}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.{Bytes, MD5Hash}
    import org.apache.spark.sql.SparkSession

    import scala.collection.mutable.ArrayBuffer

    /**
      * @Author: JZ.lee
      * @Description: Compare Spark-to-HBase write paths and read the data back
      * @Date: 18-8-28 4:28 PM
      * @Modified By:
      */
    object SparkRWHBase {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("SparkRWHBase")
          .master("local[2]")
          .config("spark.some.config.option", "some-value")
          .getOrCreate()

        // Source data: a headerless CSV with five columns.
        val keyword = spark.read
          .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat") // equivalent to .format("csv")
          .option("header", false)
          .option("delimiter", ",")
          .load("file:/opt/data/keyword_catalog_day.csv")

        val tableName1 = "keyword1"
        val tableName2 = "keyword2"
        val tableName3 = "keyword3"
        val tableName4 = "keyword4"
        val cf = "info"
        val columns = Array("keyword", "app_id", "catalog_name", "keyword_catalog_pv", "keyword_catalog_pv_rate")

        // 1. Per-row Put inserts
        val start_time1 = new Date().getTime
        keyword.foreachPartition(records => {
          HBaseUtils1x.init()
          records.foreach(f => {
            val keyword = f.getString(0)
            val app_id = f.getString(1)
            val catalog_name = f.getString(2)
            val keyword_catalog_pv = f.getString(3)
            val keyword_catalog_pv_rate = f.getString(4)
            val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
            val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
            HBaseUtils1x.insertData(tableName1, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
          })
          HBaseUtils1x.closeConnection()
        })
        val end_time1 = new Date().getTime
        println("HBase per-row insert took: " + (end_time1 - start_time1))

        // 2. Batched Put inserts
        val start_time2 = new Date().getTime
        keyword.foreachPartition(records => {
          HBaseUtils1x.init()
          val puts = ArrayBuffer[Put]()
          records.foreach(f => {
            val keyword = f.getString(0)
            val app_id = f.getString(1)
            val catalog_name = f.getString(2)
            val keyword_catalog_pv = f.getString(3)
            val keyword_catalog_pv_rate = f.getString(4)
            val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
            val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
            try {
              puts.append(HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
            } catch {
              case e: Throwable => println(s"Failed to build Put for $f: ${e.getMessage}")
            }
          })
          import collection.JavaConverters._
          HBaseUtils1x.addDataBatchEx(tableName2, puts.asJava)
          HBaseUtils1x.closeConnection()
        })
        val end_time2 = new Date().getTime
        println("HBase batched insert took: " + (end_time2 - start_time2))

        // 3. saveAsHadoopDataset (old Hadoop API)
        val start_time3 = new Date().getTime
        keyword.rdd.map(f => {
          val keyword = f.getString(0)
          val app_id = f.getString(1)
          val catalog_name = f.getString(2)
          val keyword_catalog_pv = f.getString(3)
          val keyword_catalog_pv_rate = f.getString(4)
          val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
          val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
          (new ImmutableBytesWritable, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
        }).saveAsHadoopDataset(HBaseUtils1x.getJobConf(tableName3))
        val end_time3 = new Date().getTime
        println("saveAsHadoopDataset write took: " + (end_time3 - start_time3))

        // 4. saveAsNewAPIHadoopDataset (new Hadoop API)
        val start_time4 = new Date().getTime
        keyword.rdd.map(f => {
          val keyword = f.getString(0)
          val app_id = f.getString(1)
          val catalog_name = f.getString(2)
          val keyword_catalog_pv = f.getString(3)
          val keyword_catalog_pv_rate = f.getString(4)
          val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
          val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
          (new ImmutableBytesWritable, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
        }).saveAsNewAPIHadoopDataset(HBaseUtils1x.getNewJobConf(tableName4, spark.sparkContext))
        val end_time4 = new Date().getTime
        println("saveAsNewAPIHadoopDataset write took: " + (end_time4 - start_time4))

        // Read the table back as an RDD of (ImmutableBytesWritable, Result).
        val hbaseRdd = spark.sparkContext.newAPIHadoopRDD(HBaseUtils1x.getNewConf(tableName1),
          classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
        println(hbaseRdd.count())
        hbaseRdd.foreach {
          case (_, result) =>
            // Get the row key
            val rowKey = Bytes.toString(result.getRow)
            val keyword = Bytes.toString(result.getValue(cf.getBytes(), "keyword".getBytes()))
            // Values were written as string bytes, so decode as String before converting.
            val keyword_catalog_pv_rate = Bytes.toString(result.getValue(cf.getBytes(), "keyword_catalog_pv_rate".getBytes())).toDouble
            println(rowKey + "," + keyword + "," + keyword_catalog_pv_rate)
        }
      }
    }
         
    package com.sparkStudy.utils

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.client.BufferedMutator.ExceptionListener
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.SparkContext
    import org.slf4j.LoggerFactory

    /**
      * @Author: JZ.Lee
      * @Description: HBase 1.x CRUD helpers
      * @Date: Created at 11:02 AM 18-8-14
      * @Modified By:
      */
    object HBaseUtils1x {
      private val LOGGER = LoggerFactory.getLogger(this.getClass)
      private var connection: Connection = null
      private var conf: Configuration = null

      def init() = {
        conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "lee")
        connection = ConnectionFactory.createConnection(conf)
      }

      // JobConf for the old mapred API (saveAsHadoopDataset).
      def getJobConf(tableName: String) = {
        val conf = HBaseConfiguration.create()
        val jobConf = new JobConf(conf)
        jobConf.set("hbase.zookeeper.quorum", "lee")
        jobConf.set("hbase.zookeeper.property.clientPort", "2181")
        jobConf.set(org.apache.hadoop.hbase.mapred.TableOutputFormat.OUTPUT_TABLE, tableName)
        jobConf.setOutputFormat(classOf[org.apache.hadoop.hbase.mapred.TableOutputFormat])
        jobConf
      }

      // Configuration for reading a table with newAPIHadoopRDD.
      def getNewConf(tableName: String) = {
        conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "lee")
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName)
        val scan = new Scan()
        conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
        conf
      }

      // JobConf for the new mapreduce API (saveAsNewAPIHadoopDataset).
      // Takes the SparkContext to match the call site in SparkRWHBase; it is not otherwise used here.
      def getNewJobConf(tableName: String, sc: SparkContext) = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "lee")
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("hbase.defaults.for.version.skip", "true")
        conf.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE, tableName)
        conf.setClass("mapreduce.job.outputformat.class",
          classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[String]],
          classOf[org.apache.hadoop.mapreduce.OutputFormat[String, Mutation]])
        new JobConf(conf)
      }

      def closeConnection(): Unit = {
        connection.close()
      }

      def getGetAction(rowKey: String): Get = {
        val getAction = new Get(Bytes.toBytes(rowKey))
        getAction.setCacheBlocks(false)
        getAction
      }

      def getPutAction(rowKey: String, familyName: String, column: Array[String], value: Array[String]): Put = {
        val put: Put = new Put(Bytes.toBytes(rowKey))
        for (i <- column.indices) {
          put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column(i)), Bytes.toBytes(value(i)))
        }
        put
      }

      def insertData(tableName: String, put: Put) = {
        val name = TableName.valueOf(tableName)
        val table = connection.getTable(name)
        table.put(put)
        table.close()
      }

      def addDataBatchEx(tableName: String, puts: java.util.List[Put]): Unit = {
        val name = TableName.valueOf(tableName)
        val listener = new ExceptionListener {
          override def onException(e: RetriesExhaustedWithDetailsException, bufferedMutator: BufferedMutator): Unit = {
            for (i <- 0 until e.getNumExceptions) {
              LOGGER.info("Failed to write Put: " + e.getRow(i))
            }
          }
        }
        val params = new BufferedMutatorParams(name)
          .listener(listener)
          .writeBufferSize(4 * 1024 * 1024)
        try {
          val mutator = connection.getBufferedMutator(params)
          mutator.mutate(puts)
          mutator.close()
        } catch {
          case e: Throwable => e.printStackTrace()
        }
      }
    }


         
Reference: https://blog.csdn.net/baymax_007/article/details/82191188
