zoukankan      html  css  js  c++  java
  • 实时同步到Hbase的优化-1

    最近没怎么管测试环境，上去看了一下，好家伙，Kafka 里积压了上百万条数据。于是打算把这些数据同步到测试环境的 HBase 库中，在此期间发现了插入性能问题。

    /**
     * Writes one row into `tableName`, storing every entry of `mapData` as a cell
     * (qualifier = map key, value = map value) under `columnFamily`.
     *
     * The row key is derived from `key` via `HbaseTools.rowKeyByMD5`. Non-fatal failures
     * during the write are printed and swallowed, so one bad record does not stop the stream;
     * fatal errors (OOM, interruption, ...) propagate.
     *
     * @param tableName    target table; `Init` creates it on first use if it does not exist
     * @param columnFamily column family every cell is written to
     * @param key          business key, hashed into the row key
     * @param mapData      qualifier -> value; an empty map writes nothing and touches no table
     */
    def putMapData(tableName: String , columnFamily:String, key:String  , mapData:Map[String , String]) = {
      // HBase rejects a Put that carries no cells, so an empty map would only cost an
      // Init round-trip followed by a guaranteed exception; skip it up front.
      if (mapData.nonEmpty) {
        val startTime = System.currentTimeMillis()
        val table: Table = Init(tableName, columnFamily)
        try {
          //TODO rowKeyWithMD5Prefix
          val put: Put = new Put(HbaseTools.rowKeyByMD5(key))
          val family = Bytes.toBytes(columnFamily) // identical for every cell; encode once
          for ((qualifier, value) <- mapData) {
            put.addColumn(family, Bytes.toBytes(qualifier), Bytes.toBytes(value))
          }
          table.put(put)
        } catch {
          case scala.util.control.NonFatal(e) => e.printStackTrace()
        } finally {
          table.close()
        }
        // Measured after the write so the logged "insert time" actually covers the insert
        // (previously it only covered Init).
        Logger.getLogger("处理事务").info(s"插入的时间:${System.currentTimeMillis() - startTime}")
      }
    }
     /**
      * Returns a `Table` handle for `tableName`, creating the table first if it does not exist.
      *
      * `admin.tableExists` is a round-trip to the cluster (50~70 ms per call in the write path),
      * so each table name is checked at most once. Names already confirmed to exist are
      * remembered in `knownTables`. Trade-off: if a remembered table is later dropped outside
      * this process, writes to it fail instead of the table being re-created.
      *
      * @param tableName    table to open
      * @param columnFamily single column family used if the table has to be created
      * @return a new `Table` handle; the caller is responsible for closing it
      */
    def Init(tableName: String , columnFamily:String):Table = {
      val name = TableName.valueOf(tableName)
      if (!knownTables.contains(tableName)) {
        if (!admin.tableExists(name)) {
          // The literal 10 is passed through unchanged; its meaning is defined by createHTable.
          createHTable(conn, tableName, 10, Array(columnFamily))
        }
        knownTables.add(tableName)
      }
      conn.getTable(name)
    }

    // Table names already confirmed to exist. A concurrent set in case puts arrive from several
    // threads (NOTE(review): confirm the threading model); lazy so it is initialized safely even
    // if Init is called while the enclosing object is still being constructed.
    private lazy val knownTables: java.util.Set[String] =
      java.util.concurrent.ConcurrentHashMap.newKeySet[String]()

    问题在于：每来一条数据都会调用一次 `Init`，即判断表是否存在、不存在则建表。这个过程每次需要 50~70 ms，在海量数据下处理会非常慢。（注意：上面代码中记录为“插入的时间”的日志，实际测量的正是 `Init` 的耗时。）

    也就是说，下面这段代码极其耗时：

    // Excerpt of Init's body: this runs once per incoming record, and the admin.tableExists
    // call below is a cluster round-trip, which is where the per-record cost goes.
    // Note: hTableDescriptor / hColumnDescriptor are built but never used by createHTable.
    val hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName))
        val hColumnDescriptor = new HColumnDescriptor(columnFamily)
        hTableDescriptor.addFamily(hColumnDescriptor)
        // Existence check against the cluster; create the table only when it is missing.
        if(!admin.tableExists(TableName.valueOf(tableName))){
          createHTable(conn , tableName , 10 , Array(columnFamily))
    
    
        }

    所以，在实时处理的热路径中，应尽量避免执行这类判断表是否存在、建表之类的逻辑。

    因此，将上面的代码提升到 object 的成员变量处，使其只在对象初始化时执行一次：

    // One-time setup: these members are evaluated once, when the enclosing object is first
    // initialized, so none of this work happens on the per-record write path.
    // (The original listing contained this whole block twice, i.e. duplicate definitions.)
    private val config: Configuration = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum" , GlobalConfigUtils.hbaseQuorem)
    config.set("hbase.master" , GlobalConfigUtils.hbaseMaster)
    config.set("hbase.zookeeper.property.clientPort" , GlobalConfigUtils.clientPort)
    config.set("hbase.rpc.timeout" , GlobalConfigUtils.rpcTimeout)
    // HBase reads "hbase.client.operation.timeout"; the former key
    // "hbase.client.operator.timeout" was silently ignored by the client.
    config.set("hbase.client.operation.timeout" , GlobalConfigUtils.operatorTimeout)
    config.set("hbase.client.scanner.timeout.period" , GlobalConfigUtils.scannTimeout)
    private val conn: Connection = ConnectionFactory.createConnection(config)
    private val admin: Admin = conn.getAdmin
    val atomic = new AtomicInteger(0)
    var resultAtomic = 0
    // Kept as public members for existing callers; the creation below does not use them.
    val hTableDescriptor = new HTableDescriptor(TableName.valueOf(GlobalConfigUtils.tableOrderInfo))
    val hColumnDescriptor = new HColumnDescriptor(GlobalConfigUtils.tableColumnFamily)
    hTableDescriptor.addFamily(hColumnDescriptor)
    // Ensure the order-info table exists once at startup instead of once per record.
    if(!admin.tableExists(TableName.valueOf(GlobalConfigUtils.tableOrderInfo))){
      createHTable(conn , GlobalConfigUtils.tableOrderInfo , 10 , Array(GlobalConfigUtils.tableColumnFamily))
    }

    然后我再次提交代码，发现：

    以前插入 2000 条数据大概需要 140000 ms；

    现在处理的时间：（原文此处的结果截图已缺失，具体数值无法还原）

    速度大约提升了 140000 / 10 ≈ 14000 倍（从这个算式看，现在的处理时间约为 10 ms）。

  • 相关阅读:
    从.Net到Java学习第十篇——Spring Boot文件上传和下载
    Access denied for user 'root'@'localhost' (using password:YES) Mysql5.7
    从.Net到Java学习第八篇——SpringBoot实现session共享和国际化
    从.Net到Java学习第九篇——SpringBoot下Thymeleaf
    从.Net到Java学习第七篇——SpringBoot Redis 缓存穿透
    从.Net到Java学习第六篇——SpringBoot+mongodb&Thymeleaf&模型验证
    从.Net到Java学习第五篇——Spring Boot &&Profile &&Swagger2
    从.Net到Java学习第四篇——spring boot+redis
    从.Net到Java学习第三篇——spring boot+mybatis+mysql
    从.Net到Java学习第一篇——开篇
  • 原文地址:https://www.cnblogs.com/niutao/p/10785831.html
Copyright © 2011-2022 走看看