zoukankan      html  css  js  c++  java
  • HBase协处理器统计表数据量

    1.Java代码实现

    import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
    import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
    import org.apache.hadoop.hbase.coprocessor.AggregateImplementation;
    
    /**
    * <p>
    * 协处理器统计HBase表数据量
    * </p>
    * 
    */
    public class HBaseRecordsCounter {
    
    /**
    * HBase API添加协处理器
    * */
    public static void addCoprocessor(Configuration conf, String tableName) {
    try {
    

      byte[] tableNameBytes = Bytes.toBytes(tableName);
      HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
      HTableDescriptor htd = hbaseAdmin.getTableDescriptor(tableNameBytes);
      if (!htd.hasCoprocessor(AggregateImplementation.class.getName())) {
        hbaseAdmin.disableTable(tableNameBytes);
        htd.addCoprocessor(AggregateImplementation.class.getName());
        hbaseAdmin.modifyTable(tableNameBytes, htd);
        hbaseAdmin.enableTable(tableNameBytes);
      }

      hbaseAdmin.close();

    } catch (MasterNotRunningException e) {
    e.printStackTrace();
    } catch (ZooKeeperConnectionException e) {
    e.printStackTrace();
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    
    /**
    * 统计表数量
    * 
    */
    public static void exeCount(Configuration conf, String tableName, String family) {
    
    try {
      // 使用hbase提供的聚合coprocessor
      AggregationClient aggregationClient = new AggregationClient(conf);
      Scan scan = new Scan();
      // 指定扫描列族,唯一值
      scan.addFamily(Bytes.toBytes(family));
      long start = System.currentTimeMillis();
      long rowCount = aggregationClient.rowCount(TableName.valueOf(tableName), new LongColumnInterpreter(), scan);
      System.out
      .println("Row count: " + rowCount + "; time cost: " + (System.currentTimeMillis() - start) + "ms");
    } catch (Throwable e) {
      e.printStackTrace();
    }
    }
    
    public static void main(String[] args) {
    
      String tableName = "test";
      Configuration conf = new Configuration();
      conf.set("hbase.zookeeper.quorum", "host1,host2,host3");
      conf.set("hbase.rootdir", "hdfs://host:8020/hbase");
      // 提高RPC通信时长
      conf.setLong("hbase.rpc.timeout", 600000);
      // 设置Scan缓存
      conf.setLong("hbase.client.scanner.caching", 1000);
      addCoprocessor(conf, tableName);
      exeCount(conf, tableName, "info");
    
    }
    }

    2. 启用协处理器

    启用协处理器方法1.

    启动全局aggregation,能够操纵所有表上的数据。通过修改hbase-site.xml这个文件来实现,只需要添加如下代码:

    <property>
       <name>hbase.coprocessor.user.region.classes</name>
       <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
     </property>

    启用协处理器方法2.

    hbase shell添加coprocessor:

    disable 'member'
    alter 'member',METHOD => 'table_att','coprocessor' => 'hdfs://master24:9000/user/hadoop/jars/test.jar|mycoprocessor.SampleCoprocessor|1001|'
    enable 'member'

    hbase shell 删除coprocessor:

    disable 'member'
    alter 'member',METHOD => 'table_att_unset',NAME =>'coprocessor$1'
    enable 'member'

  • 相关阅读:
    CSS3 盒模型
    前端Vue框架使用思路
    request+正则爬猫眼电影榜top100
    最大似然估计(Maximum likelihood estimation)
    pandas-resample按时间聚合
    Sklearn调参之sklearn.model_selection.GridSearchCV
    sklearn.metrics中的评估方法介绍(accuracy_score, recall_score, roc_curve, roc_auc_score, confusion_matrix,classification_report)
    【vivado】安装丢失组件和无法启动
    【电路】连接器选型
    【IP分析】合并信号concat,拆分总线slice
  • 原文地址:https://www.cnblogs.com/warmingsun/p/4916606.html
Copyright © 2011-2022 走看看