zoukankan      html  css  js  c++  java
  • HBase基础知识(8):扫描操作之缓存与批量处理

    每一个next()调用都会为每行数据生成一个单独RPC请求,即使使用next(int nbRows)方法,也是如此,因为该方法仅仅是在客户端循环地调用next()方法。很显然,当单元格数据较小时,这样做的性能不会很好。因此,如果一次RPC请求可以获取多行数据,这样更会有意义。这样的方法可以由扫描器缓存实现,默认情况下,这个缓存是关闭的。
    可以在两个层面上打开它:在表的层面,这个表所有扫描实例的缓存都会生效;也可以在扫描层面,这样便只会影响当前的扫描实例。用户可以使用以下的HTable方法设置表级的扫描器缓存:

    void setScannerCaching(int scannerCaching)
    int getScannerCaching()
    

    用户可以修改整个HBase集群的默认值1。只要把下面的配置项添加到hbase-site.xml中即可:

    <property>
     <name>hbase.client.scanner.caching</name>
     <value>10</value>
    </property>

    这样所有的Scan实例的扫描器缓存大小就设置为10了。用户还可以从表或扫描两个层面覆盖默认配置,但是需要明确这样做的目的。
    setScannerCaching()可以设置缓存大小,getScannerCaching()可以返回当前缓存大小的值。每次用户调用getScanner(scan)之后,API都会把设定值配置到扫描实例中——除非用户使用了扫描层面的配置并覆盖了表层面的配置,扫描层面的配置优先级最高。可以使用下列Scan类方法设置扫描级的缓存:

    void setCaching(int caching)
    int getCaching()
    

    这两个方法的作用和表层面方法一样,能控制RPC调用取回的行数。两种next()方法都回受这些配置影响。

    用户需要为少量的RPC请求次数和客户端以及服务端的内存消耗找到平衡点。很多时候,将扫描器缓存设置的比较提高扫描的性能,不过设得太高就会产生不良影响:每次next()调用将会占用更长的时间,因为要获取更多的文件并传输到客户端,如果返回给客户端的数据超出了其堆的大小,程序就会终止并抛出OutOfMemoryException异常。
    当传输和处理数据的时间超过配置的扫描器租约超时时间时,用户将会收到一个ScanerTimeoutException形式抛出的租约过期错误。
    下边是使用扫描器时超时示例代码:

    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    
    public class HBaseResultScanner
    {
        public static void main(String[] args) throws IOException
        {
            Configuration conf = HBaseConfiguration.create();
            HTable table = new HTable(conf, "testtable");
            Scan scan = new Scan();
            ResultScanner scanner = table.getScanner(scan);
            int scannerTimeout = (int) conf.getLong(
                                     HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, -1);
            try
            {
                Thread.sleep(scannerTimeout + 5000);
    
            }
            catch (Exception e)
            {
                // TODO: handle exception
            }
            while (true)
            {
                try
                {
                    Result result = scanner.next();
                    if (result == null)
                        break;
                    System.out.println(result);
    
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                    break;
                }
            }
            scanner.close();
    
        }
    
    }

    这段代码得到了当前配置的租约时间,休眠了比这个时间更长的时间,然后服务器端感知租约超时并触发租约恢复操作。

    用户可能会尝试向配置中添加如下信息:

    Configuration conf = HBaseConfiguration.create();
            conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, 12000);

    假如这个修改吧超时时间延长了,由于这个值是在客户端应用中配置的,不会被传递到远程region服务器,所以这样的修改是无效的。
    如果对于数据量非常大的行,这些行很有可能超过客户端进程的内存容量。HBase和它的客户端API对这个问题有一个解决方法:批量。用户可以使用以下方法控制获取批量操作:

    void setBatch(int batch)
    int getBatch()

    缓存是面向行一级的操作,而批量是面向列一级的操作。批量可以让用户选择每一次ResultScanner()实例的next()操作要取回多少列。
    **如果一行包括的列数超过了批量中设置的值,则可以将这一行分片,每次next操作返回一片。
    当一行的列数不能被批量中设置的值整除时,最后一次返回的Result实例会包含比较少的列,例如,如果有一行有17列,用户把batch的值设为5,则一共会返回4个result实例,这4个实例中包括的列数应当为5、5、5和2。**
    组合使用扫描器缓存和批量大小,可以让用户方便地控制扫描每一个范围内的行键时所需要的RPC调用次数。
    方法如下:

    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.log4j.Appender;
    import org.apache.log4j.AppenderSkeleton;
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.log4j.spi.LoggingEvent;
    
    public class HBaseResultScanner {
        private static void scan(int caching, int batch) throws IOException {
            Configuration conf = HBaseConfiguration.create();
            HTable table = new HTable(conf, "testtable");
            Logger log = Logger.getLogger("org.apache.hadoop");
            final int[] counters = { 0, 0 };
            Appender appender = new AppenderSkeleton() {
                @Override
                protected void append(LoggingEvent event) {
                    String msg = event.getMessage().toString();
                    if (msg != null && msg.contains("Call: next")) {
                        counters[0]++;
                    }
                }
    
                @Override
                public boolean requiresLayout() {
                    return false;
                }
    
                @Override
                public void close() {
                }
    
            };
            log.removeAllAppenders();
            log.setAdditivity(false);
            log.addAppender(appender);
            log.setLevel(Level.DEBUG);
            Scan scan = new Scan();
            scan.setCaching(caching);
            scan.setBatch(batch);
            ResultScanner scanner = table.getScanner(scan);
            for (Result result : scanner) {
                counters[1]++;
            }
            scanner.close();
            System.out.println("Caching: " + caching + ",Batch:" + batch
                    + ",Results:" + counters[1] + ",RPCs:" + counters[0]);
    
        }
    
        public static void main(String[] args) throws IOException {
            scan(1, 1);
            scan(200, 1);
            scan(2000, 100);
            scan(2, 100);
            scan(2, 10);
            scan(5, 100);
            scan(5, 20);
            scan(10, 10);
        }

    运行结果如下:

    Caching:1,Batch:1,Results:200,RPCs:201
    Caching:200,Batch:1,Results:200,RPCs:2
    Caching:2000,Batch:100,Results:10,RPCs:1
    Caching:2,Batch:100,Results:10,RPCs:6
    Caching:2,Batch:10,Results:20,RPCs:11
    Caching:5,Batch:100,Results:10,RPCs:3
    Caching:5,Batch:20,Results:10,RPCs:3
    Caching:10,Batch:10,Results:20,RPCs:3

    用户可以修改调整这两个参数来看它们对输出结果的影响。如下表所示

    缓存 批量处理 Result个数 RPC次数 说明
    1 1 200 201 每个列都作为一个Result实例返回。最后还多一个RPC确认扫描完成
    200 1 200 2 每个Result实例都只包含一列的值,不过它们都被一次RPC请求取回
    2 10 20 11 批量参数是一行所包含的列数的一半,所以200列除以10,需要20个result实例。同时需要10次RPC请求取回。
    5 100 10 3 对一行来讲,这个批量参数实在是太大了,所以一行的20列都被放入到了一个Result实例中。同时缓存为5,所以10个Result实例被两次RPC请求取回。
    5 20 10 3 同上,不过这次的批量值与一行列数正好相同,所以输出与上面一种情况相同
    10 10 20 3 这次把表分成了较小的result实例,但使用了较大的缓存值,所以也是只用了两次RPC请求就返回了数据

    要计算一次扫描操作的RPC请求的次数,用户需要先计算出行数和每行列数的乘积。然后用这个值除以批量大小和每行列数中较小的那个值。最后再用除得的结果除以扫描器缓存值。 用数学公式表示如下:

     RPC请求的次数=(行数x每行的列数)/
      Min(每行的列数,批量大小)/扫描器缓存

    此外,还需要一些请求来打开和关闭扫描器。用户或许需要把这两次请求也考虑在内。
    下图展示了缓存和批量两个参数如何联动。


    扫描器缓存和批量两个参数控制RPC的次数

    小的批量值使服务器端把3个列装入一个Result实例,同时扫描器缓存为6,使每个RPC请求传输6行,即6个被批量封装的Result实例。如果没有指定批量大小,而是指定了扫描器缓存,那么一个调用结果就能包含所有的行,因为每一行都包含在一个Result实例中。只有当用户使用批量模式后,行内(intra-row)扫描功能才会启用。

  • 相关阅读:
    机器学习中数据缺失的处理及建模方法
    小样本学习(Few-Shot Learning)
    常见文本相似度计算法
    【FPGA ZYNQ Ultrascale+ MPSOC教程】33.BRAM实现PS与PL交互
    【紫光同创国产FPGA教程】【第十章】DDR3读写测试实验
    【紫光同创国产FPGA教程】【第九章】HDMI编程测试实验
    【紫光同创国产FPGA教程】【第八章】SD卡读写实验
    【紫光同创国产FPGA教程】【第七章】I2C接口EEPROM实验
    【紫光同创国产FPGA教程】【第六章】PDS下按键消抖实验
    【紫光同创国产FPGA教程】【第五章】串口收发实验
  • 原文地址:https://www.cnblogs.com/ainima/p/6331833.html
Copyright © 2011-2022 走看看