  • Spark read/write HBase: performance comparison

    I. Writing to HBase from Spark

        The HBase client wraps each row as a Put object and supports both single-row and batch inserts. Spark itself ships with two built-in ways of writing to HBase: saveAsHadoopDataset and saveAsNewAPIHadoopDataset. Below, the same data set is written with each of these approaches to compare their performance.
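    As a quick illustration of the Put-based client API mentioned above, the sketch below shows a single-row put and a batched put against an HBase 1.x Table handle. It is only a minimal example: the table name, column family and values are placeholders, not part of the benchmark code that follows.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
    import org.apache.hadoop.hbase.util.Bytes
    import scala.collection.JavaConverters._

    object PutSketch {
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()                    // picks up hbase-site.xml from the classpath
        val connection = ConnectionFactory.createConnection(conf)
        val table = connection.getTable(TableName.valueOf("demo_table"))  // placeholder table name

        // single-row insert: one Put, one call
        val put = new Put(Bytes.toBytes("row-1"))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("col"), Bytes.toBytes("value"))
        table.put(put)

        // batch insert: a list of Puts submitted in one call
        val puts = (2 to 10).map { i =>
          val p = new Put(Bytes.toBytes(s"row-$i"))
          p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("col"), Bytes.toBytes(s"value-$i"))
          p
        }
        table.put(puts.asJava)

        table.close()
        connection.close()
      }
    }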

    The Maven dependencies are as follows:

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- spark-sql is needed for SparkSession, which the code below uses -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-protocol -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-protocol</artifactId>
            <version>1.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/commons-cli/commons-cli -->
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.4</version>
        </dependency>

    1. Single-row insert with Put
    1.1 Create the table from the HBase shell

    create 'keyword1',{NAME=>'info',BLOCKSIZE=>'16384',BLOCKCACHE=>'false'},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}
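    If you would rather create the table from code than from the shell, the sketch below is a rough equivalent using the HBase 1.x Admin API. The family settings and region count mirror the DDL above; the object name is made up for this example.

    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory
    import org.apache.hadoop.hbase.util.RegionSplitter

    object CreateKeywordTable {
      def main(args: Array[String]): Unit = {
        val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
        val admin = connection.getAdmin

        // column family 'info' with a 16 KB block size and the block cache disabled
        val family = new HColumnDescriptor("info")
          .setBlocksize(16384)
          .setBlockCacheEnabled(false)

        val descriptor = new HTableDescriptor(TableName.valueOf("keyword1"))
        descriptor.addFamily(family)

        // pre-split into 10 regions with the HexStringSplit algorithm
        val splitKeys = new RegionSplitter.HexStringSplit().split(10)
        admin.createTable(descriptor, splitKeys)

        admin.close()
        connection.close()
      }
    }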


    1.2 Code

           

    val start_time1 = new Date().getTime
    keyword.foreachPartition(records => {
      HBaseUtils1x.init()
      records.foreach(f => {
        val keyword = f.getString(0)
        val app_id = f.getString(1)
        val catalog_name = f.getString(2)
        val keyword_catalog_pv = f.getString(3)
        val keyword_catalog_pv_rate = f.getString(4)
        val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
        val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
        HBaseUtils1x.insertData(tableName1, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
      })
      HBaseUtils1x.closeConnection()
    })
    val end_time1 = new Date().getTime
    println("Elapsed time for single-row HBase inserts (ms): " + (end_time1 - start_time1))



    2. Batch insert with Put
    2.1 Create the table

    create 'keyword2',{NAME=>'info',BLOCKSIZE=>'16384',BLOCKCACHE=>'false'},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}


    2.2 Code

          

    val start_time2 = new Date().getTime
    keyword.foreachPartition(records => {
      HBaseUtils1x.init()
      val puts = ArrayBuffer[Put]()
      records.foreach(f => {
        val keyword = f.getString(0)
        val app_id = f.getString(1)
        val catalog_name = f.getString(2)
        val keyword_catalog_pv = f.getString(3)
        val keyword_catalog_pv_rate = f.getString(4)
        val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
        val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
        try {
          puts.append(HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
        } catch {
          case e: Throwable => println("Failed to build Put for record: " + f)
        }
      })
      import collection.JavaConverters._
      HBaseUtils1x.addDataBatchEx(tableName2, puts.asJava)
      HBaseUtils1x.closeConnection()
    })
    val end_time2 = new Date().getTime
    println("Elapsed time for batched HBase inserts (ms): " + (end_time2 - start_time2))


    3. Writing with saveAsHadoopDataset

        saveAsHadoopDataset outputs the RDD to any Hadoop-supported storage system using the old Hadoop API, taking a Hadoop JobConf object for that storage system. The JobConf should set an OutputFormat and any required output paths, just as you would when configuring a Hadoop MapReduce job.
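    Concretely, for HBase the "output path" is the target table and the OutputFormat is the old-API TableOutputFormat. The sketch below shows the kind of JobConf that the getJobConf helper in section III builds; the ZooKeeper quorum value here is a placeholder.

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.mapred.TableOutputFormat   // old (mapred) API
    import org.apache.hadoop.mapred.JobConf

    // JobConf suitable for RDD[(ImmutableBytesWritable, Put)].saveAsHadoopDataset
    def oldApiJobConf(tableName: String): JobConf = {
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "zk-host")            // placeholder quorum
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      val jobConf = new JobConf(conf)
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)   // the "output path" is the table
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      jobConf
    }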
    3.1 Create the table

    create 'keyword3',{NAME=>'info',BLOCKSIZE=>'16384',BLOCKCACHE=>'false'},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}


    3.2 Code

    val start_time3 = new Date().getTime
    keyword.rdd.map(f => {
      val keyword = f.getString(0)
      val app_id = f.getString(1)
      val catalog_name = f.getString(2)
      val keyword_catalog_pv = f.getString(3)
      val keyword_catalog_pv_rate = f.getString(4)
      val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
      val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
      (new ImmutableBytesWritable, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
    }).saveAsHadoopDataset(HBaseUtils1x.getJobConf(tableName3))
    val end_time3 = new Date().getTime
    println("Elapsed time for the saveAsHadoopDataset write (ms): " + (end_time3 - start_time3))


    4. Writing with saveAsNewAPIHadoopDataset

        saveAsNewAPIHadoopDataset outputs the RDD to any Hadoop-supported storage system using the new Hadoop API, taking a Hadoop Configuration object for that storage system. The Configuration should set an OutputFormat and any required output paths, just as for a Hadoop MapReduce job.
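    With the new API the configuration is usually assembled through a mapreduce Job so that the OutputFormat class is registered on the Configuration. The sketch below is roughly what getNewJobConf in section III achieves; the ZooKeeper quorum is again a placeholder.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Mutation
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat   // new (mapreduce) API
    import org.apache.hadoop.mapreduce.Job

    // Configuration suitable for RDD[(ImmutableBytesWritable, Put)].saveAsNewAPIHadoopDataset
    def newApiConf(tableName: String): Configuration = {
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "zk-host")              // placeholder quorum
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
      val job = Job.getInstance(conf)                            // registers the output format on the conf
      job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setOutputValueClass(classOf[Mutation])
      job.getConfiguration
    }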
    4.1 Create the table

    create 'keyword4',{NAME=>'info',BLOCKSIZE=>'16384',BLOCKCACHE=>'false'},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}


    4.2 Code

       

    val start_time4 = new Date().getTime
    keyword.rdd.map(f => {
      val keyword = f.getString(0)
      val app_id = f.getString(1)
      val catalog_name = f.getString(2)
      val keyword_catalog_pv = f.getString(3)
      val keyword_catalog_pv_rate = f.getString(4)
      val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
      val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
      (new ImmutableBytesWritable, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
    }).saveAsNewAPIHadoopDataset(HBaseUtils1x.getNewJobConf(tableName4, spark.sparkContext))
    val end_time4 = new Date().getTime
    println("Elapsed time for the saveAsNewAPIHadoopDataset write (ms): " + (end_time4 - start_time4))



    5. Performance comparison

    In the measured runs, the saveAsHadoopDataset and saveAsNewAPIHadoopDataset writes outperformed both the single-row and the batched Put inserts.
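    The timings above are simply new Date().getTime taken before and after each write. If you rerun the comparison, a small helper keeps the measurements uniform; this is a convenience sketch only, not part of the original code.

    // Run a block, print its wall-clock time in milliseconds, and return its result.
    def timed[T](label: String)(body: => T): T = {
      val start = System.currentTimeMillis()
      val result = body
      println(s"$label took ${System.currentTimeMillis() - start} ms")
      result
    }

    // Example:
    // timed("saveAsHadoopDataset write") {
    //   rdd.saveAsHadoopDataset(HBaseUtils1x.getJobConf(tableName3))
    // }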


    II. Reading from HBase with Spark

    The newAPIHadoopRDD API turns an HBase table into an RDD; it is used as follows:

    val start_time1 = new Date().getTime
    val hbaseRdd = spark.sparkContext.newAPIHadoopRDD(HBaseUtils1x.getNewConf(tableName1), classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println(hbaseRdd.count())
    hbaseRdd.foreach {
      case (_, result) => {
        // row key
        val rowKey = Bytes.toString(result.getRow)
        val keyword = Bytes.toString(result.getValue(cf.getBytes(), "keyword".getBytes()))
        // the value was written as a string, so parse it back instead of reading raw double bytes
        val keyword_catalog_pv_rate = Bytes.toString(result.getValue(cf.getBytes(), "keyword_catalog_pv_rate".getBytes())).toDouble
        println(rowKey + "," + keyword + "," + keyword_catalog_pv_rate)
      }
    }
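    If downstream processing expects a DataFrame rather than raw Result objects, the RDD obtained above can be mapped into Rows and given an explicit schema. The sketch below reuses hbaseRdd, cf and spark from the snippet above and keeps every field as a string.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Turn each (rowkey, Result) pair into a Row and attach a schema.
    val schema = StructType(Seq(
      StructField("rowkey", StringType),
      StructField("keyword", StringType),
      StructField("keyword_catalog_pv_rate", StringType)
    ))
    val rowRdd = hbaseRdd.map { case (_, result) =>
      Row(
        Bytes.toString(result.getRow),
        Bytes.toString(result.getValue(cf.getBytes(), "keyword".getBytes())),
        Bytes.toString(result.getValue(cf.getBytes(), "keyword_catalog_pv_rate".getBytes()))
      )
    }
    val keywordDF = spark.createDataFrame(rowRdd, schema)
    keywordDF.show(10, truncate = false)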

    III. Complete code

       

    package com.sparkStudy.utils

    import java.util.Date
    import org.apache.hadoop.hbase.client.{Put, Result}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.{Bytes, MD5Hash}
    import org.apache.spark.sql.SparkSession
    import scala.collection.mutable.ArrayBuffer

    /**
      * @Author: JZ.lee
      * @Description: TODO
      * @Date: 4:28 PM, 18-8-28
      * @Modified By:
      */
    object SparkRWHBase {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("SparkRWHBase")
          .master("local[2]")
          .config("spark.some.config.option", "some-value")
          .getOrCreate()

        val keyword = spark.read
          .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
          .option("header", false)
          .option("delimiter", ",")
          .load("file:/opt/data/keyword_catalog_day.csv")

        val tableName1 = "keyword1"
        val tableName2 = "keyword2"
        val tableName3 = "keyword3"
        val tableName4 = "keyword4"
        val cf = "info"
        val columns = Array("keyword", "app_id", "catalog_name", "keyword_catalog_pv", "keyword_catalog_pv_rate")

        // 1. single-row Put inserts
        val start_time1 = new Date().getTime
        keyword.foreachPartition(records => {
          HBaseUtils1x.init()
          records.foreach(f => {
            val keyword = f.getString(0)
            val app_id = f.getString(1)
            val catalog_name = f.getString(2)
            val keyword_catalog_pv = f.getString(3)
            val keyword_catalog_pv_rate = f.getString(4)
            val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
            val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
            HBaseUtils1x.insertData(tableName1, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
          })
          HBaseUtils1x.closeConnection()
        })
        val end_time1 = new Date().getTime
        println("Elapsed time for single-row HBase inserts (ms): " + (end_time1 - start_time1))

        // 2. batched Put inserts
        val start_time2 = new Date().getTime
        keyword.foreachPartition(records => {
          HBaseUtils1x.init()
          val puts = ArrayBuffer[Put]()
          records.foreach(f => {
            val keyword = f.getString(0)
            val app_id = f.getString(1)
            val catalog_name = f.getString(2)
            val keyword_catalog_pv = f.getString(3)
            val keyword_catalog_pv_rate = f.getString(4)
            val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
            val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
            try {
              puts.append(HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
            } catch {
              case e: Throwable => println("Failed to build Put for record: " + f)
            }
          })
          import collection.JavaConverters._
          HBaseUtils1x.addDataBatchEx(tableName2, puts.asJava)
          HBaseUtils1x.closeConnection()
        })
        val end_time2 = new Date().getTime
        println("Elapsed time for batched HBase inserts (ms): " + (end_time2 - start_time2))

        // 3. saveAsHadoopDataset (old Hadoop API)
        val start_time3 = new Date().getTime
        keyword.rdd.map(f => {
          val keyword = f.getString(0)
          val app_id = f.getString(1)
          val catalog_name = f.getString(2)
          val keyword_catalog_pv = f.getString(3)
          val keyword_catalog_pv_rate = f.getString(4)
          val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
          val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
          (new ImmutableBytesWritable, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
        }).saveAsHadoopDataset(HBaseUtils1x.getJobConf(tableName3))
        val end_time3 = new Date().getTime
        println("Elapsed time for the saveAsHadoopDataset write (ms): " + (end_time3 - start_time3))

        // 4. saveAsNewAPIHadoopDataset (new Hadoop API)
        val start_time4 = new Date().getTime
        keyword.rdd.map(f => {
          val keyword = f.getString(0)
          val app_id = f.getString(1)
          val catalog_name = f.getString(2)
          val keyword_catalog_pv = f.getString(3)
          val keyword_catalog_pv_rate = f.getString(4)
          val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
          val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
          (new ImmutableBytesWritable, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
        }).saveAsNewAPIHadoopDataset(HBaseUtils1x.getNewJobConf(tableName4, spark.sparkContext))
        val end_time4 = new Date().getTime
        println("Elapsed time for the saveAsNewAPIHadoopDataset write (ms): " + (end_time4 - start_time4))

        // 5. read the table back as an RDD
        val hbaseRdd = spark.sparkContext.newAPIHadoopRDD(HBaseUtils1x.getNewConf(tableName1), classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
        println(hbaseRdd.count())
        hbaseRdd.foreach {
          case (_, result) => {
            // row key
            val rowKey = Bytes.toString(result.getRow)
            val keyword = Bytes.toString(result.getValue(cf.getBytes(), "keyword".getBytes()))
            // the value was written as a string, so parse it back instead of reading raw double bytes
            val keyword_catalog_pv_rate = Bytes.toString(result.getValue(cf.getBytes(), "keyword_catalog_pv_rate".getBytes())).toDouble
            println(rowKey + "," + keyword + "," + keyword_catalog_pv_rate)
          }
        }
      }
    }
         
    package com.sparkStudy.utils

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.client.BufferedMutator.ExceptionListener
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.SparkContext
    import org.slf4j.LoggerFactory

    /**
      * @Author: JZ.Lee
      * @Description: HBase 1.x create/read/update/delete helpers
      * @Date: Created at 11:02 AM, 18-8-14
      * @Modified By:
      */
    object HBaseUtils1x {
      private val LOGGER = LoggerFactory.getLogger(this.getClass)
      private var connection: Connection = null
      private var conf: Configuration = null

      def init() = {
        conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "lee")
        connection = ConnectionFactory.createConnection(conf)
      }

      // JobConf for the old (mapred) API, used by saveAsHadoopDataset
      def getJobConf(tableName: String) = {
        val conf = HBaseConfiguration.create()
        val jobConf = new JobConf(conf)
        jobConf.set("hbase.zookeeper.quorum", "lee")
        jobConf.set("hbase.zookeeper.property.clientPort", "2181")
        jobConf.set(org.apache.hadoop.hbase.mapred.TableOutputFormat.OUTPUT_TABLE, tableName)
        jobConf.setOutputFormat(classOf[org.apache.hadoop.hbase.mapred.TableOutputFormat])
        jobConf
      }

      // Configuration for reading a table through newAPIHadoopRDD
      def getNewConf(tableName: String) = {
        conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "lee")
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName)
        val scan = new Scan()
        conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
        conf
      }

      // JobConf for the new (mapreduce) API, used by saveAsNewAPIHadoopDataset.
      // The SparkContext parameter matches the call site but is not used here.
      def getNewJobConf(tableName: String, sc: SparkContext) = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "lee")
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("hbase.defaults.for.version.skip", "true")
        conf.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE, tableName)
        conf.setClass("mapreduce.job.outputformat.class", classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[String]],
          classOf[org.apache.hadoop.mapreduce.OutputFormat[String, Mutation]])
        new JobConf(conf)
      }

      def closeConnection(): Unit = {
        connection.close()
      }

      def getGetAction(rowKey: String): Get = {
        val getAction = new Get(Bytes.toBytes(rowKey))
        getAction.setCacheBlocks(false)
        getAction
      }

      // Build a Put that writes one value per column of the given family
      def getPutAction(rowKey: String, familyName: String, column: Array[String], value: Array[String]): Put = {
        val put: Put = new Put(Bytes.toBytes(rowKey))
        for (i <- 0 until column.length) {
          put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column(i)), Bytes.toBytes(value(i)))
        }
        put
      }

      // Single-row insert
      def insertData(tableName: String, put: Put) = {
        val name = TableName.valueOf(tableName)
        val table = connection.getTable(name)
        table.put(put)
      }

      // Batch insert through a BufferedMutator with an exception listener
      def addDataBatchEx(tableName: String, puts: java.util.List[Put]): Unit = {
        val name = TableName.valueOf(tableName)
        val table = connection.getTable(name)
        val listener = new ExceptionListener {
          override def onException(e: RetriesExhaustedWithDetailsException, bufferedMutator: BufferedMutator): Unit = {
            for (i <- 0 until e.getNumExceptions) {
              LOGGER.info("Failed to write Put: " + e.getRow(i))
            }
          }
        }
        val params = new BufferedMutatorParams(name)
          .listener(listener)
          .writeBufferSize(4 * 1024 * 1024)
        try {
          val mutator = connection.getBufferedMutator(params)
          mutator.mutate(puts)
          mutator.close()
        } catch {
          case e: Throwable => e.printStackTrace()
        }
      }
    }


         
     Reference: https://blog.csdn.net/baymax_007/article/details/82191188

  • Original article: https://www.cnblogs.com/felixzh/p/10251200.html