zoukankan      html  css  js  c++  java
  • hbase 聚合操作

    hbase本身提供了 聚合方法可以服务端聚合操作

    hbase中的CoprocessorProtocol机制. 

    CoprocessorProtocol的原理比较简单,近似于一个mapreduce框架。由client将scan分解为面向多个region的请求,并行发送请求到多个region,然后client做一个reduce的操作,得到最后的结果。 


    先看一个例子,使用hbase的AggregationClient可以做到简单的面向单个column的统计。 

    Java代码  收藏代码
    1. @Test  
    2. public void testAggregationClient() throws Throwable {  
    3.   
    4.     LongColumnInterpreter columnInterpreter = new LongColumnInterpreter();  
    5.   
    6.     AggregationClient aggregationClient = new AggregationClient(  
    7.             CommonConfig.getConfiguration());  
    8.     Scan scan = new Scan();  
    9.   
    10.     scan.addColumn(ColumnFamilyName, QName1);  
    11.   
    12.     Long max = aggregationClient.max(TableNameBytes, columnInterpreter,  
    13.             scan);  
    14.     Assert.assertTrue(max.longValue() == 100);  
    15.   
    16.     Long min = aggregationClient.min(TableNameBytes, columnInterpreter,  
    17.             scan);  
    18.     Assert.assertTrue(min.longValue() == 20);  
    19.   
    20.     Long sum = aggregationClient.sum(TableNameBytes, columnInterpreter,  
    21.             scan);  
    22.     Assert.assertTrue(sum.longValue() == 120);  
    23.   
    24.     Long count = aggregationClient.rowCount(TableNameBytes,  
    25.             columnInterpreter, scan);  
    26.     Assert.assertTrue(count.longValue() == 4);  
    27.   
    28. }  



    看下hbase的源码。AggregateImplementation 

    Java代码  收藏代码
    1. @Override  
    2.   public <T, S> T getMax(ColumnInterpreter<T, S> ci, Scan scan)  
    3.       throws IOException {  
    4.     T temp;  
    5.     T max = null;  
    6.     InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())  
    7.         .getRegion().getScanner(scan);  
    8.     List<KeyValue> results = new ArrayList<KeyValue>();  
    9.     byte[] colFamily = scan.getFamilies()[0];  
    10.     byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();  
    11.     // qualifier can be null.  
    12.     try {  
    13.       boolean hasMoreRows = false;  
    14.       do {  
    15.         hasMoreRows = scanner.next(results);  
    16.         for (KeyValue kv : results) {  
    17.           temp = ci.getValue(colFamily, qualifier, kv);  
    18.           max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max;  
    19.         }  
    20.         results.clear();  
    21.       } while (hasMoreRows);  
    22.     } finally {  
    23.       scanner.close();  
    24.     }  
    25.     log.info("Maximum from this region is "  
    26.         + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()  
    27.             .getRegionNameAsString() + ": " + max);  
    28.     return max;  
    29.   }  


    这里由于 

    Java代码  收藏代码
    1. byte[] colFamily = scan.getFamilies()[0];  
    2. byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();  


    所以,hbase自带的Aggregate函数,只能面向单列进行统计。 

    当我们想对多列进行Aggregate,并同时进行countRow时,有以下选择。 
    1 scan出所有的row,程序自己进行Aggregate和count。 
    2 使用AggregationClient,调用多次,得到所有的结果。由于多次调用,有一致性问题。 
    3 自己扩展CoprocessorProtocol。 

    这个是github的hbase集成插件

    这个功能集成到simplehbase里面了。
    https://github.com/zhang-xzhi/simplehbase

  • 相关阅读:
    匿存函数,内存函数,递归函数,二分法查找
    内置函数
    生成器函数,推导式,生成器表达式
    函数名的应用,闭包,迭代器
    动态参数,作用域
    函数,返回值,参数
    文件操作
    什么是协程
    MYSQL允许远程访问
    phpstorm+xdebug搭建
  • 原文地址:https://www.cnblogs.com/yaohaitao/p/6789113.html
Copyright © 2011-2022 走看看