zoukankan      html  css  js  c++  java
  • 实时同步到Hbase的优化-1

    最近没有管测试环境的,上去看了下,好家伙,kafka羁留了上百万数据,于是打算把数据同步到测试的Hbase库中,在这期间发现了插入性能问题

    def putMapData(tableName: String , columnFamily:String, key:String  , mapData:Map[String , String]) = {

    val startTime = System.currentTimeMillis()

    val table: Table = Init(tableName , columnFamily)
    val endTime = System.currentTimeMillis()
    Logger.getLogger("处理事务").info(s"插入的时间:${(endTime - startTime)}")
    try{
    //TODO rowKeyWithMD5Prefix

    val rowkey = HbaseTools.rowKeyByMD5(key)

    val put: Put = new Put(rowkey)
    if(mapData.size > 0){
    for((k , v) <- mapData){
    put.addColumn(Bytes.toBytes(columnFamily) ,Bytes.toBytes(k.toString) , Bytes.toBytes(v.toString))
    }
    }
    table.put(put)

    }catch{
    case e:Exception => e.printStackTrace()
    }finally {
    table.close()
    }

    }
     /**
        * @return 构建表的连接
        * */
      def Init(tableName: String , columnFamily:String):Table = {
        val hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName))
        val hColumnDescriptor = new HColumnDescriptor(columnFamily)
        hTableDescriptor.addFamily(hColumnDescriptor)
        if(!admin.tableExists(TableName.valueOf(tableName))){
    //      admin.createTable(hTableDescriptor)
          createHTable(conn , tableName , 10 , Array(columnFamily))
    
    
        }
        conn.getTable(TableName.valueOf(tableName))
      }

    发现一条数据过来,会进行一次init,就是判断这个表是不是存在的,如果不存在那么创建表,但是这个过程要50~70ms时间,海量数据下来,处理是非常慢的

    也就是说:这块儿代码及其耗费时间

    val hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName))
        val hColumnDescriptor = new HColumnDescriptor(columnFamily)
        hTableDescriptor.addFamily(hColumnDescriptor)
        if(!admin.tableExists(TableName.valueOf(tableName))){
          createHTable(conn , tableName , 10 , Array(columnFamily))
    
    
        }

    所以,尽量在实时处理时候,不要走这些从程序;

    因此,将上面的代码提升带object的成员变量处:

    private val config: Configuration = HBaseConfiguration.create()
      config.set("hbase.zookeeper.quorum" , GlobalConfigUtils.hbaseQuorem)
      config.set("hbase.master" , GlobalConfigUtils.hbaseMaster)
      config.set("hbase.zookeeper.property.clientPort" , GlobalConfigUtils.clientPort)
      config.set("hbase.rpc.timeout" , GlobalConfigUtils.rpcTimeout)
      config.set("hbase.client.operator.timeout" , GlobalConfigUtils.operatorTimeout)
      config.set("hbase.client.scanner.timeout.period" , GlobalConfigUtils.scannTimeout)
      private val conn: Connection = ConnectionFactory.createConnection(config)
      private val admin: Admin = conn.getAdmin
      val atomic = new AtomicInteger(0)
      var resultAtomic = 0
      val hTableDescriptor = new HTableDescriptor(TableName.valueOf(GlobalConfigUtils.tableOrderInfo))
      val hColumnDescriptor = new HColumnDescriptor(GlobalConfigUtils.tableColumnFamily)
      hTableDescriptor.addFamily(hColumnDescriptor)
      if(!admin.tableExists(TableName.valueOf(GlobalConfigUtils.tableOrderInfo))){
        createHTable(conn , GlobalConfigUtils.tableOrderInfo , 10 , Array(GlobalConfigUtils.tableColumnFamily))
      }private val config: Configuration = HBaseConfiguration.create()
      config.set("hbase.zookeeper.quorum" , GlobalConfigUtils.hbaseQuorem)
      config.set("hbase.master" , GlobalConfigUtils.hbaseMaster)
      config.set("hbase.zookeeper.property.clientPort" , GlobalConfigUtils.clientPort)
      config.set("hbase.rpc.timeout" , GlobalConfigUtils.rpcTimeout)
      config.set("hbase.client.operator.timeout" , GlobalConfigUtils.operatorTimeout)
      config.set("hbase.client.scanner.timeout.period" , GlobalConfigUtils.scannTimeout)
      private val conn: Connection = ConnectionFactory.createConnection(config)
      private val admin: Admin = conn.getAdmin
      val atomic = new AtomicInteger(0)
      var resultAtomic = 0
      val hTableDescriptor = new HTableDescriptor(TableName.valueOf(GlobalConfigUtils.tableOrderInfo))
      val hColumnDescriptor = new HColumnDescriptor(GlobalConfigUtils.tableColumnFamily)
      hTableDescriptor.addFamily(hColumnDescriptor)
      if(!admin.tableExists(TableName.valueOf(GlobalConfigUtils.tableOrderInfo))){
        createHTable(conn , GlobalConfigUtils.tableOrderInfo , 10 , Array(GlobalConfigUtils.tableColumnFamily))
      }

    然后我在提交代码。发现:

    以前的2000条数据插入时间大概需要140000ms的时间

    现在处理的时间:

    速度提升了大概140000 /10 倍

  • 相关阅读:
    ViewFlow增强onItemClick功能及ViewFlow AbsListView源代码分析
    函数递归与迭代
    UIPanGestureRecognizer上下左右滑动方向推断算法
    Eclipse中的Web项目自己主动部署到Tomcat
    《从程序猿到项目经理》读后感-职业瓶颈
    安卓开发之简单的短信操作模块
    [Swift通天遁地]二、表格表单-(15)自定义表单文本框内容的格式
    [Swift通天遁地]二、表格表单-(14)实时调整表单元素的激活和失效
    [Swift通天遁地]二、表格表单-(13)实时调整表单元素的显示和隐藏
    [Swift通天遁地]二、表格表单-(12)设置表单文字对齐方式以及自适应高度的文本区域TextArea
  • 原文地址:https://www.cnblogs.com/niutao/p/10785831.html
Copyright © 2011-2022 走看看