  • Optimizing real-time sync to HBase (part 1)

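    I had not been looking after the test environment for a while. When I finally checked, Kafka had a backlog of more than a million messages, so I decided to sync that data into the test HBase instance. Along the way I ran into an insert performance problem.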

    import org.apache.hadoop.hbase.client.{Put, Table}
    import org.apache.hadoop.hbase.util.Bytes

    def putMapData(tableName: String, columnFamily: String, key: String, mapData: Map[String, String]): Unit = {
      // Time the Init call, which runs once per record
      val startTime = System.currentTimeMillis()
      val table: Table = Init(tableName, columnFamily)
      val endTime = System.currentTimeMillis()
      Logger.getLogger("处理事务").info(s"Init time: ${endTime - startTime} ms")
      try {
        // TODO rowKeyWithMD5Prefix
        val rowkey = HbaseTools.rowKeyByMD5(key)
        val put: Put = new Put(rowkey)
        // Add every map entry as a column of the same row
        for ((k, v) <- mapData) {
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(k), Bytes.toBytes(v))
        }
        // Only write when there is at least one column to store
        if (mapData.nonEmpty) table.put(put)
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        table.close()
      }
    }
    /**
      * @return a Table handle; the table is created first if it does not exist
      */
    def Init(tableName: String, columnFamily: String): Table = {
      val hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName))
      val hColumnDescriptor = new HColumnDescriptor(columnFamily)
      hTableDescriptor.addFamily(hColumnDescriptor)
      if (!admin.tableExists(TableName.valueOf(tableName))) {
        // admin.createTable(hTableDescriptor)
        createHTable(conn, tableName, 10, Array(columnFamily))
      }
      conn.getTable(TableName.valueOf(tableName))
    }

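    It turned out that every incoming record triggers one Init call, which checks whether the table exists and creates it if it does not. That step takes 50~70 ms per record, so with a large volume of data the processing becomes very slow.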

    In other words, this block of code is extremely expensive:

    val hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName))
    val hColumnDescriptor = new HColumnDescriptor(columnFamily)
    hTableDescriptor.addFamily(hColumnDescriptor)
    if (!admin.tableExists(TableName.valueOf(tableName))) {
      createHTable(conn, tableName, 10, Array(columnFamily))
    }
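The 50~70 ms figure came from wrapping the Init call in two System.currentTimeMillis() reads. A minimal sketch of the same measurement as a reusable helper is shown here; `Timing.timed` is a hypothetical name, not part of the original code or of HBase, and the sleep only stands in for an expensive call.

```scala
object Timing {
  // Runs `body` once and returns its result together with the elapsed time in milliseconds.
  def timed[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    val elapsedMs = (System.nanoTime() - start) / 1000000L
    (result, elapsedMs)
  }

  def main(args: Array[String]): Unit = {
    // Thread.sleep is a placeholder for a slow step such as a table-existence check
    val (value, ms) = timed { Thread.sleep(30); "done" }
    println(s"result=$value elapsed=${ms} ms")
  }
}
```

Timing the Init call and the table.put call separately in this way would show which of the two dominates the per-record cost.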

    So in real-time processing, keep routines like this off the per-record path as far as possible.

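    I therefore hoisted the table-existence check into the member fields of the object, so that it runs only once, when the object is initialized: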

    private val config: Configuration = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum", GlobalConfigUtils.hbaseQuorem)
    config.set("hbase.master", GlobalConfigUtils.hbaseMaster)
    config.set("hbase.zookeeper.property.clientPort", GlobalConfigUtils.clientPort)
    config.set("hbase.rpc.timeout", GlobalConfigUtils.rpcTimeout)
    config.set("hbase.client.operation.timeout", GlobalConfigUtils.operatorTimeout)
    config.set("hbase.client.scanner.timeout.period", GlobalConfigUtils.scannTimeout)
    // One shared connection and Admin for the whole object
    private val conn: Connection = ConnectionFactory.createConnection(config)
    private val admin: Admin = conn.getAdmin
    val atomic = new AtomicInteger(0)
    var resultAtomic = 0
    val hTableDescriptor = new HTableDescriptor(TableName.valueOf(GlobalConfigUtils.tableOrderInfo))
    val hColumnDescriptor = new HColumnDescriptor(GlobalConfigUtils.tableColumnFamily)
    hTableDescriptor.addFamily(hColumnDescriptor)
    // Runs once during object initialization instead of once per record
    if (!admin.tableExists(TableName.valueOf(GlobalConfigUtils.tableOrderInfo))) {
      createHTable(conn, GlobalConfigUtils.tableOrderInfo, 10, Array(GlobalConfigUtils.tableColumnFamily))
    }
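The hoisted initializer handles one fixed table. Where the table name varies from record to record, the same idea can be kept by remembering which tables have already been checked. The following is only a sketch under that assumption: `EnsureOnce`, `ensureExists`, and the table names are hypothetical, and the counter stands in for the real admin.tableExists / createHTable step so that the example runs without HBase.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

// Hypothetical helper: runs an expensive per-table setup step at most once per table name.
final class EnsureOnce(ensureExists: String => Unit) {
  private val checked = new ConcurrentHashMap[String, java.lang.Boolean]()

  def apply(tableName: String): Unit = {
    checked.computeIfAbsent(tableName, new JFunction[String, java.lang.Boolean] {
      // Invoked only when tableName has not been recorded yet
      override def apply(name: String): java.lang.Boolean = {
        ensureExists(name)
        java.lang.Boolean.TRUE
      }
    })
    ()
  }
}

object EnsureOnceDemo {
  def main(args: Array[String]): Unit = {
    var calls = 0
    // Stand-in for the table-existence check and table creation
    val ensure = new EnsureOnce(_ => calls += 1)
    Seq("order_info", "order_info", "user_info").foreach(ensure(_))
    println(s"expensive checks performed: $calls") // prints "expensive checks performed: 2"
  }
}
```

If the setup step throws, no entry is recorded for that table, so the next record for the same table retries the check.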

    Then I resubmitted the code and found:

    Previously, inserting 2000 records took roughly 140000 ms.

    Processing time now:

    That is a speedup of roughly 140000 / 10, or about 14000 times.
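The 140000 ms figure is consistent with the per-record Init cost of 50~70 ms reported earlier. A small sketch of that arithmetic, with `InitCost.totalMs` as a hypothetical helper:

```scala
object InitCost {
  // Total time spent in a fixed per-record step, in milliseconds.
  def totalMs(records: Int, perRecordMs: Long): Long = records.toLong * perRecordMs

  def main(args: Array[String]): Unit = {
    println(totalMs(2000, 50)) // prints 100000, the lower end of the measured range
    println(totalMs(2000, 70)) // prints 140000, matching the reported total
  }
}
```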

  • Original article: https://www.cnblogs.com/niutao/p/10785831.html