zoukankan      html  css  js  c++  java
  • 实时同步到Hbase的优化-1

    最近没有管测试环境,上去看了下,好家伙,Kafka 积压了上百万条数据,于是打算把数据同步到测试环境的 HBase 库中,在这期间发现了插入性能问题

    /**
      * Writes one row to an HBase table, storing each entry of `mapData` as a column.
      *
      * The row key is derived from `key` via `HbaseTools.rowKeyByMD5`; every entry is written
      * under `columnFamily` with the map key as qualifier and the map value as cell value.
      * Exceptions raised while building or sending the put are printed and not rethrown
      * (best-effort write). The table handle obtained from `Init` is always closed.
      *
      * @param tableName    name of the target table, passed to `Init`
      * @param columnFamily column family that receives every entry
      * @param key          key from which the row key is derived
      * @param mapData      qualifier -> value pairs to store; an empty map writes nothing
      */
    def putMapData(tableName: String , columnFamily:String, key:String  , mapData:Map[String , String]) = {
      val startTime = System.currentTimeMillis()
      val table: Table = Init(tableName , columnFamily)
      val endTime = System.currentTimeMillis()
      // NOTE: the interval logged here covers only the Init call above, not the put below.
      Logger.getLogger("处理事务").info(s"插入的时间:${(endTime - startTime)}")
      try {
        // A Put without any column is rejected by the HBase client, so skip empty maps
        // instead of provoking (and then swallowing) that exception.
        if (mapData.nonEmpty) {
          //TODO rowKeyWithMD5Prefix
          val put: Put = new Put(HbaseTools.rowKeyByMD5(key))
          // The family bytes are identical for every column; encode them once.
          val family = Bytes.toBytes(columnFamily)
          mapData.foreach { case (k, v) =>
            put.addColumn(family, Bytes.toBytes(k), Bytes.toBytes(v))
          }
          table.put(put)
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        table.close()
      }
    }
     /**
        * Returns a handle to `tableName`, creating the table first when it does not exist.
        *
        * The existence check goes through the shared `admin` and runs on every call.
        *
        * @param tableName    name of the table to open
        * @param columnFamily column family handed to `createHTable` if the table is missing
        * @return a table handle from the shared connection `conn`
        */
      def Init(tableName: String , columnFamily:String):Table = {
        // Resolve the TableName once instead of re-parsing the string for every use.
        val name = TableName.valueOf(tableName)
        if (!admin.tableExists(name)) {
          // 10 is passed straight through to createHTable
          // (presumably a region / pre-split count -- TODO confirm against createHTable).
          createHTable(conn , tableName , 10 , Array(columnFamily))
        }
        conn.getTable(name)
      }

    发现一条数据过来,会进行一次init,就是判断这个表是不是存在的,如果不存在那么创建表,但是这个过程要50~70ms时间,海量数据下来,处理是非常慢的

    也就是说:这块儿代码极其耗费时间

    // Excerpt from Init: the descriptor construction and table-existence check
    // that were executed once for every incoming record.
    val hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName))
        val hColumnDescriptor = new HColumnDescriptor(columnFamily)
        hTableDescriptor.addFamily(hColumnDescriptor)
        // hTableDescriptor is built above but is not passed to any call in this excerpt.
        // Ask the cluster whether the table exists; create it only when it is missing.
        if(!admin.tableExists(TableName.valueOf(tableName))){
          createHTable(conn , tableName , 10 , Array(columnFamily))
    
    
        }

    所以,在实时处理的时候,尽量不要让每条数据都走这些流程(构建描述符、检查表是否存在、建表);

    因此,将上面的代码提升到object的成员变量处:

    // Shared HBase client state. These statements are meant to live in the body of the
    // enclosing object, so they run once at initialisation rather than once per record.
    private val config: Configuration = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum" , GlobalConfigUtils.hbaseQuorem)
    config.set("hbase.master" , GlobalConfigUtils.hbaseMaster)
    config.set("hbase.zookeeper.property.clientPort" , GlobalConfigUtils.clientPort)
    config.set("hbase.rpc.timeout" , GlobalConfigUtils.rpcTimeout)
    // The HBase key is "hbase.client.operation.timeout"; the former "operator" spelling
    // is not a recognised property, so the configured value never took effect.
    config.set("hbase.client.operation.timeout" , GlobalConfigUtils.operatorTimeout)
    config.set("hbase.client.scanner.timeout.period" , GlobalConfigUtils.scannTimeout)

    // One connection and one Admin handle, created once and reused by every call.
    private val conn: Connection = ConnectionFactory.createConnection(config)
    private val admin: Admin = conn.getAdmin

    val atomic = new AtomicInteger(0)
    var resultAtomic = 0

    // Descriptors for the order-info table; kept as public members for existing callers.
    val hTableDescriptor = new HTableDescriptor(TableName.valueOf(GlobalConfigUtils.tableOrderInfo))
    val hColumnDescriptor = new HColumnDescriptor(GlobalConfigUtils.tableColumnFamily)
    hTableDescriptor.addFamily(hColumnDescriptor)

    // Create the order-info table once, at initialisation, when it does not exist yet.
    if (!admin.tableExists(TableName.valueOf(GlobalConfigUtils.tableOrderInfo))) {
      createHTable(conn , GlobalConfigUtils.tableOrderInfo , 10 , Array(GlobalConfigUtils.tableColumnFamily))
    }

    然后我再提交代码,发现:

    以前的2000条数据插入时间大概需要140000ms的时间

    现在处理的时间:

    速度提升了大概140000 /10 倍

  • 相关阅读:
    414. Third Maximum Number 第三大的数字
    java 正则表达式
    将含有逻辑运算符的字符串解析为逻辑符号
    ora-01830:日期格式图片在转换整个输入字符串之前结束
    mysql的字符拼接
    oracle执行计划详解
    oracle获取执行计划及优缺点 详解
    kmp算法中的nextval实例解释
    kmp算法中的next数组实例解释
    哈夫曼实例解释(哈夫曼编码)
  • 原文地址:https://www.cnblogs.com/niutao/p/10785831.html
Copyright © 2011-2022 走看看