zoukankan      html  css  js  c++  java
  • Hbase 笔记(3) 客户端API基础

    1、概述

    (1)、HBase 所有修改数据的操作保证行级别的原子性。

    (2)、用户应该尽量使用批处理(batch)更新,减少单独操作同一行数据的次数。

    (3)、创建表有代价,所以只创建一个HTable,一般在应用程序开始时创建

    (4)、使用HTablePool,复用多个实例,例子

            int maxSize = 20;
            HTablePool pool = new HTablePool(conf,maxSize);
            
            HTableInterface tableInterface = pool.getTable(strTableName);
            Put put = new Put(Bytes.toBytes("row-11"));
            put.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-a"));
            put.add(Bytes.toBytes("cf2"), Bytes.toBytes("b"), Bytes.toBytes("value-b"));
            tableInterface.put(put);

    2、PUT操作

    (1) .  多个版本

    查看多版本:scan  'testtable',  { VERSIONS => 3 }

    (2). KeyValue 类,元数据内容

    row-key / family:qualifier / version / type / value-length

    (3). 客户端的写缓冲区

    每一个put操作都是一个RPC操作,启用写缓冲区可将多个put一次RPC送往服务器:

    table.setAutoFlush(false);

    若需要将缓冲区数据强制写入服务端:

    table.flushCommits()

    设置写缓冲区大小

    table.setWriteBufferSize(long writeBufferSize)  // 默认为 2 M

    也可以设置hbase-site.xml 中的 hbase.client.write.buffer

    (4). 错误处理

     如果Put 一个列表,里面有一个错误的(使用了不存在的列族);

            Put put1 = new Put(Bytes.toBytes("11111"), System.currentTimeMillis());
            put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x11")); 
            
            Put put2 = new Put(Bytes.toBytes("22222"), System.currentTimeMillis()); 
            put2.add(Bytes.toBytes("noExist"), Bytes.toBytes("a"), Bytes.toBytes("value-y22"));
            
            Put put3 = new Put(Bytes.toBytes("33333"), System.currentTimeMillis());
            put3.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x33")); 
            
            list.add(put1);
            list.add(put2);
            list.add(put3);
    
            tableInterface.put(list);

    抛出以下异常:

    org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException: Column family noExist does not exist in region testtable

    但是其余两个Put成功

    hbase(main):005:0> scan 'testtable'
    ROW                   COLUMN+CELL                                                 
     11111                column=cf1:a, timestamp=1416194410866, value=value-x11      
     33333                column=cf1:a, timestamp=1416194410866, value=value-x33 


     如果Put 一个列表,里面有一个为空  ;

            Put put1 = new Put(Bytes.toBytes("11111"), System.currentTimeMillis());
            put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x11"));
    
            Put put2 = new Put(Bytes.toBytes("22222"), System.currentTimeMillis());
            put2.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-y22"));
    
            Put put3 = new Put(Bytes.toBytes("33333"), System.currentTimeMillis());
            put3.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x33"));
    
            Put put4 = new Put(Bytes.toBytes("44444"), System.currentTimeMillis());
    
            list.add(put1);
            list.add(put2);
            list.add(put3);
            list.add(put4);
    
            tableInterface.put(list);

    将抛出异常:Exception in thread "main" java.lang.IllegalArgumentException: No columns to insert

    因为这个错误由客户端检查发现,因此将没有内容写入到数据库。

    (5). Put 写入List, 不保证写入的顺序。

    (6). 原子操作

    Put put1 = new Put(Bytes.toBytes("11111"), System.currentTimeMillis());
    put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x11"));
    tableInterface.checkAndPut(Bytes.toBytes("11111"), 
                   Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x33"), put1);
    
    

    3. Get 操作

    只获取一个

            Get get = new Get(Bytes.toBytes("row-11"));
            get.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
            get.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
    
            Result result = table.get(get);
            
            byte[] value1 = result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
            byte[] value2 = result.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
            System.out.println("cf1:a=" + Bytes.toString(value1));
            System.out.println("cf2:b=" + Bytes.toString(value2));


    获取多个

            Get get1 = new Get(Bytes.toBytes("row-11"));
            get1.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
            get1.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
    
            Get get2 = new Get(Bytes.toBytes("row-22"));
            get2.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
            get2.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
    
            List<Get> listGet = new ArrayList<Get>();
            listGet.add(get1);
            listGet.add(get2);
            
            Result[] resutls = table.get(listGet);
            
            for (Result result : resutls) {
                byte[] row = result.getRow();
                byte[] value1 = result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
                byte[] value2 = result.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
                System.out.println("rowkey="  + Bytes.toString(row)+", cf1:a=" + Bytes.toString(value1));
                System.out.println("rowkey=" +  Bytes.toString(row)+", cf2:b=" + Bytes.toString(value2));
            }


    如果  table.get(listGet) 操作中, listGet 之中有一个错误,则整个操作终止并抛出异常。


    查找某行或者某行之前的一行:

            Result rowOrBefore = table.getRowOrBefore(Bytes.toBytes("row-22"), Bytes.toBytes("cf1"));
    


    4、DELETE 操作

    同样也可以delete某1行或者多行,删除特定版本或者多个版本,整个列族或者某个列

    也有原子操作: 

            Delete delete = new Delete(Bytes.toBytes("row-11"));
            table.checkAndDelete(Bytes.toBytes("row-11"),Bytes.toBytes("cf1"), 
                    Bytes.toBytes("a"), Bytes.toBytes("value-x33"), delete);

    5、批量处理 Batch

    可以同时进行Put、Get、Delete 操作

            List<Row> list = new ArrayList<Row>();
            Put put1 = new Put(Bytes.toBytes("11111"), System.currentTimeMillis());
            put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x11"));
            put1.add(Bytes.toBytes("cf2"), Bytes.toBytes("b"), Bytes.toBytes("value-y11"));
            Get get1 = new Get(Bytes.toBytes("11111"));
            get1.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));    
            Get get2 = new Get(Bytes.toBytes("33333"));
            get1.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));    
            list.add(put1);
            list.add(get1); 
            list.add(get2); 
    
            Object[] results = new Object[list.size()];
            try {
                tableInterface.batch(list, results);
            } catch (InterruptedException ex) {
                Logger.getLogger(BatchTest.class.getName()).log(Level.SEVERE, null, ex);
            }
            for(Object result:results)
            {
                System.out.println(result);
            }

    输出:

    keyvalues=NONE
    keyvalues={11111/cf1:a/1416199179786/Put/vlen=9/mvcc=0}
    keyvalues={33333/cf1:a/1416194410866/Put/vlen=9/mvcc=0}

    Object[]  results = new Object[list.size()];
    tableInterface.batch(list, results);                      // 如果出错,此方法可以访问部分结果        
    Object[]  results2 = tableInterface.batch(list);   // 如果出错,此方法不会有任何结果


    6、行锁

    设置 hbase.regionserver.lease.period 可修改锁超时时间。

    在行上创建一个锁,该锁阻塞索引的并发读取。


    7、扫描scan

    设置 hbase.regionserver.lease.period 可修改扫描器超时时间。

    Scan 例子: 

            Scan scan = new Scan(); 
            scan.setStartRow(Bytes.toBytes("11111"));
            scan.setStopRow(Bytes.toBytes("333333"));
            scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
            scan.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
            ResultScanner scanner = htable.getScanner(scan);        
            for(Result result : scanner)
            {
                 System.out.println(result);
            }
            scanner.close();


    以上的Scan方法,获取每一个Result 都会生成一个单独的RPC 请求,性能将不会很高。

    可用两种方法打开扫描器缓存:

    (1). 修改 hbase.client.scanner.caching

    (2).  代码中设置 

            Scan scan = new Scan();
            scan.setCaching(10);
            scan.setStartRow(Bytes.toBytes("11111"));
            scan.setStopRow(Bytes.toBytes("333333"));
            scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
            scan.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
            ResultScanner scanner = htable.getScanner(scan);        
            for(Result result : scanner)
            {
                 System.out.println(result);
            }
            scanner.close();

    注意:当数据量非常大的行,这些行有可能超过客户算进程的内存容量,可以如下控制批量获取:

            Scan scan = new Scan();
            scan.setCaching(10);
            scan.setBatch(5);
            scan.setStartRow(Bytes.toBytes("11111"));
            scan.setStopRow(Bytes.toBytes("333333"));
            scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
            scan.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
            ResultScanner scanner = htable.getScanner(scan);        
            for(Result result : scanner)
            {
                 System.out.println(result);
            }
            scanner.close();
     


    setCaching(10): 每次服务器端向客户端传输 10 行 Result 

    setBatch(5):每次返回的 Result  包含5 个列


  • 相关阅读:
    cmd 进入不同的驱动盘及上下级目录
    子网/ip/子网掩码
    虚拟化技术与"云"
    OSI 7层模型
    第一天的CI笔记
    在本地Apache服务器配置虚拟主机站点
    phpmailer 发送邮件
    mysql 中执行的 sql 注意字段之间的反向引号和单引号
    手动部署LNMT(Nginx+Tomcat)并实现Nginx反向代理Tomcat
    JS 创建元素的三种方法
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276352.html
Copyright © 2011-2022 走看看