zoukankan      html  css  js  c++  java
  • hbase 基本的JavaApi 数据操作及数据过滤(filter)

    本文主要介绍HBase的表操作、数据操作以及数据查询过滤等;如果对JDBC或ADO有所了解,就很容易理解HBase API。

    hbase版本是2.0。

    1、为了方便先贴helper的部分代码(文末git上有完整的测试代码),主要是为了复用Connection。

    public class HBaseHelper implements Closeable {
    
        private Configuration configuration = null;
        private Connection connection = null;
        private Admin admin = null;
    
        /**
         * Opens a connection to HBase with the given configuration and obtains an Admin from it.
         *
         * @param configuration HBase client configuration used to create the connection
         * @throws IOException if the connection or the Admin cannot be created; if only the Admin
         *                     fails, the already-opened connection is closed before rethrowing
         */
        private HBaseHelper(Configuration configuration) throws IOException {
            this.configuration = configuration;
            this.connection = ConnectionFactory.createConnection(this.configuration);
            try {
                this.admin = this.connection.getAdmin();
            } catch (IOException | RuntimeException e) {
                // The caller never receives this instance, so it could never close the
                // connection itself -- release it here instead of leaking it.
                try {
                    this.connection.close();
                } catch (IOException suppressed) {
                    e.addSuppressed(suppressed);
                }
                throw e;
            }
        }
    
        /**
         * Static factory: builds a helper whose Connection and Admin are created from
         * {@code configuration}.
         *
         * @param configuration HBase client configuration
         * @return a new, open helper; the caller is responsible for closing it
         * @throws IOException if the connection or Admin cannot be created
         */
        public static HBaseHelper getHBaseHelper(Configuration configuration) throws IOException {
            HBaseHelper created = new HBaseHelper(configuration);
            return created;
        }
    
        /**
         * Closes the Admin and then the Connection.
         * The connection is closed even when closing the Admin throws, so it is never leaked.
         *
         * @throws IOException if closing either resource fails
         */
        @Override
        public void close() throws IOException {
            try {
                if (admin != null) {
                    admin.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    
        /**
         * Returns the shared connection created in the constructor.
         * It remains owned by this helper and is closed by {@link #close()}.
         *
         * @return the helper's HBase connection
         */
        public Connection getConnection() {
            return this.connection;
        }
    
        /**
         * Returns the configuration this helper's connection was created from.
         *
         * @return the HBase client configuration
         */
        public Configuration getConfiguration() {
            return this.configuration;
        }
    ... ...

    初始化:用于初始化HBase配置、连接HBase,并获取本文使用的HBase辅助操作类HBaseHelper。

    // Initialization
        /**
         * Builds the HBase client configuration and creates the shared {@code helper}.
         * Master and ZooKeeper quorum both point at 192.168.31.10. No ports are set, so the
         * client uses whatever the configuration resources provide. If needed, they can be set
         * via hbase.master.port, hbase.master.info.port, hbase.regionserver.port,
         * hbase.regionserver.info.port and hbase.zookeeper.property.clientPort.
         *
         * @throws IOException if the connection cannot be established
         */
        private void setUp() throws IOException {
            final String host = "192.168.31.10";

            conf = HBaseConfiguration.create();
            conf.set("hbase.master", host);
            conf.set("hbase.zookeeper.quorum", host);

            // Standalone (single-node) settings
            conf.set("hbase.rootdir", "file:///opt/hbase_data");
            conf.set("hbase.zookeeper.property.dataDir", "/opt/hbase_data/zookeeper");

            helper = HBaseHelper.getHBaseHelper(conf);
        }

    2、命名空间、表创建、删除、exist等

    /**
     * Creates a namespace with default settings.
     * Failures are not propagated; only the exception message is printed to stderr.
     *
     * @param namespace name of the namespace to create
     */
    public void createNamespace(String namespace) {
        try {
            admin.createNamespace(NamespaceDescriptor.create(namespace).build());
        } catch (Exception e) {
            System.err.println("Error: " + e.getMessage());
        }
    }
    
        /**
         * Deletes a namespace. Failures are printed to stderr rather than thrown.
         *
         * @param namespace name of the namespace to delete
         * @param force     when true, every table in the namespace is first disabled (if it is
         *                  still enabled) and deleted, so that the namespace is empty
         */
        public void dropNamespace(String namespace, boolean force) {
            if (force) {
                TableName[] tableNames;
                try {
                    tableNames = admin.listTableNamesByNamespace(namespace);
                } catch (IOException e) {
                    System.err.println("Error: " + e.getMessage());
                    tableNames = new TableName[0];
                }
                for (TableName name : tableNames) {
                    // Handle each table independently. Calling disableTable on an
                    // already-disabled table throws; with a single outer try that aborted the
                    // loop and left the remaining tables behind.
                    try {
                        if (admin.isTableEnabled(name)) {
                            admin.disableTable(name);
                        }
                        admin.deleteTable(name);
                    } catch (IOException e) {
                        System.err.println("Error: " + e.getMessage());
                    }
                }
            }
            try {
                admin.deleteNamespace(namespace);
            } catch (IOException e) {
                System.err.println("Error: " + e.getMessage());
            }
        }
    
        /**
         * Checks whether a table exists.
         *
         * @param table table name, converted with {@code TableName.valueOf}
         * @return true if the table exists
         * @throws IOException if the Admin call fails
         */
        public boolean existsTable(String table)
                throws IOException {
            TableName tableName = TableName.valueOf(table);
            return existsTable(tableName);
        }

        /**
         * Checks whether a table exists.
         *
         * @param table table name
         * @return true if the table exists
         * @throws IOException if the Admin call fails
         */
        public boolean existsTable(TableName table)
                throws IOException {
            boolean exists = admin.tableExists(table);
            return exists;
        }
    
        /** Creates a table keeping 1 version per column family, without pre-split regions. */
        public void createTable(String table, String... colfams)
                throws IOException {
            createTable(TableName.valueOf(table), 1, (byte[][]) null, colfams);
        }

        /** Creates a table keeping 1 version per column family, without pre-split regions. */
        public void createTable(TableName table, String... colfams)
                throws IOException {
            createTable(table, 1, (byte[][]) null, colfams);
        }

        /** Creates a table keeping {@code maxVersions} versions per column family, without pre-split regions. */
        public void createTable(String table, int maxVersions, String... colfams)
                throws IOException {
            createTable(TableName.valueOf(table), maxVersions, (byte[][]) null, colfams);
        }

        /** Creates a table keeping {@code maxVersions} versions per column family, without pre-split regions. */
        public void createTable(TableName table, int maxVersions, String... colfams)
                throws IOException {
            createTable(table, maxVersions, (byte[][]) null, colfams);
        }

        /** Creates a table keeping 1 version per column family, pre-split at {@code splitKeys}. */
        public void createTable(String table, byte[][] splitKeys, String... colfams)
                throws IOException {
            createTable(TableName.valueOf(table), 1, splitKeys, colfams);
        }
    
        /**
         * Creates a table with one descriptor per column family.
         *
         * @param table       name of the table to create
         * @param maxVersions number of versions each column family keeps
         * @param splitKeys   region split keys, or null to create the table without pre-splitting
         * @param colfams     column family names
         * @throws IOException if the Admin call fails
         */
        public void createTable(TableName table, int maxVersions, byte[][] splitKeys,
                                String... colfams)
                throws IOException {
            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table);
            for (String family : colfams) {
                ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder
                        .newBuilder(Bytes.toBytes(family))
                        .setMaxVersions(maxVersions)
                        .build();
                builder.setColumnFamily(familyDescriptor);
            }
            TableDescriptor descriptor = builder.build();

            if (splitKeys == null) {
                admin.createTable(descriptor);
            } else {
                admin.createTable(descriptor, splitKeys);
            }
        }
    
    /** Disables the table with the given name. */
    public void disableTable(String table) throws IOException {
        disableTable(TableName.valueOf(table));
    }

    /** Disables the given table. */
    public void disableTable(TableName table) throws IOException {
        admin.disableTable(table);
    }

    /** Deletes the table with the given name if it exists (see {@link #dropTable(TableName)}). */
    public void dropTable(String table) throws IOException {
        dropTable(TableName.valueOf(table));
    }

    /**
     * Deletes the table if it exists. A table that is still enabled is disabled first,
     * because a table must be disabled before it can be deleted.
     */
    public void dropTable(TableName table) throws IOException {
        if (!existsTable(table)) {
            return;
        }
        if (admin.isTableEnabled(table)) {
            disableTable(table);
        }
        admin.deleteTable(table);
    }

    样例:

    // Sample: populate the testtable table
        /**
         * Recreates "testtable" with families info, ex and memo, and inserts two sample rows
         * (row1, row2). Afterwards the shared helper is closed.
         *
         * @throws IOException if any HBase operation fails
         */
        private void initTestTable() throws IOException {
            String tableNameString = "testtable";
            // dropTable checks for existence and disables an enabled table itself. Calling
            // disableTable here would throw when the table exists but is already disabled.
            helper.dropTable(tableNameString);

            helper.createTable(tableNameString, "info", "ex", "memo");
            helper.put(tableNameString, "row1", "info", "username", "admin");
            helper.put(tableNameString, "row1", "ex", "addr", "北京大道");
            helper.put(tableNameString, "row1", "memo", "detail", "超级用户,地址:北京大道");

            helper.put(tableNameString, "row2", "info", "username", "guest");
            helper.put(tableNameString, "row2", "ex", "addr", "全国各地");
            helper.put(tableNameString, "row2", "memo", "detail", "游客,地址:全国到处都是");

            // NOTE: this closes the shared connection; the helper cannot be used after this call.
            helper.close();
        }

    2、插入(或是更新)数据

        /** Inserts or updates a single cell; table given by name. */
        public void put(String table, String row, String fam, String qual,
                        String val) throws IOException {
            put(TableName.valueOf(table), row, fam, qual, val);
        }

        /**
         * Inserts or updates a single cell (row, family:qualifier) without an explicit timestamp.
         * The Table handle is released even if the write fails.
         */
        public void put(TableName table, String row, String fam, String qual,
                        String val) throws IOException {
            Put put = new Put(Bytes.toBytes(row));
            put.addColumn(Bytes.toBytes(fam), Bytes.toBytes(qual), Bytes.toBytes(val));
            try (Table tbl = connection.getTable(table)) {
                tbl.put(put);
            }
        }

        /** Inserts or updates a single cell with an explicit timestamp; table given by name. */
        public void put(String table, String row, String fam, String qual, long ts,
                        String val) throws IOException {
            put(TableName.valueOf(table), row, fam, qual, ts, val);
        }

        /**
         * Inserts or updates a single cell with the explicit timestamp {@code ts}.
         * The Table handle is released even if the write fails.
         */
        public void put(TableName table, String row, String fam, String qual, long ts,
                        String val) throws IOException {
            Put put = new Put(Bytes.toBytes(row));
            put.addColumn(Bytes.toBytes(fam), Bytes.toBytes(qual), ts,
                    Bytes.toBytes(val));
            try (Table tbl = connection.getTable(table)) {
                tbl.put(put);
            }
        }

        /**
         * Writes a caller-assembled Put: one row key, possibly several families and columns.
         * A null or empty Put is ignored. The Table handle is released even if the write fails.
         */
        public void put(String tableNameString, Put put) throws IOException {
            TableName tableName = TableName.valueOf(tableNameString);
            try (Table table = connection.getTable(tableName)) {
                if (put != null && put.size() > 0) {
                    table.put(put);
                }
            }
        }

    2.1、批量插入:根据实际业务组装数据,最终都是把Put放入列表,再通过API一次性写入

     // Batch inserts; both methods release the Table handle via try-with-resources.
        /**
         * Batch-inserts cells, one cell per map.
         * Each map must contain the keys "rowKey", "columnFamily", "columnName" and
         * "columnValue"; values are converted with {@code toString()}. A null or empty list
         * writes nothing.
         *
         * @throws IOException if the write fails
         */
        public void bulkInsert(String tableNameString, List<Map<String, Object>> list) throws IOException {
            List<Put> puts = new ArrayList<>();
            if (list != null) {
                for (Map<String, Object> map : list) {
                    Put put = new Put(Bytes.toBytes(map.get("rowKey").toString()));
                    put.addColumn(Bytes.toBytes(map.get("columnFamily").toString()),
                            Bytes.toBytes(map.get("columnName").toString()),
                            Bytes.toBytes(map.get("columnValue").toString()));
                    puts.add(put);
                }
            }
            // The Puts are built before the table is opened, so a malformed map can no longer
            // leak an open Table handle.
            try (Table table = connection.getTable(TableName.valueOf(tableNameString))) {
                if (!puts.isEmpty()) {
                    table.put(puts);
                }
            }
        }

        /**
         * Batch-inserts Puts assembled by the caller. A null or empty list writes nothing.
         *
         * @throws IOException if the write fails
         */
        public void bulkInsert2(String tableNameString, List<Put> puts) throws IOException {
            try (Table table = connection.getTable(TableName.valueOf(tableNameString))) {
                if (puts != null && !puts.isEmpty()) {
                    table.put(puts);
                }
            }
        }

    样例:

    // Sample: batch insert via HBaseHelper.bulkInsert
        /**
         * Inserts rows testKey1..testKey200 into "testtable", creating the table if needed.
         * Each row gets three cells: info:username, ex:addr and memo:detail.
         *
         * @throws IOException if any HBase operation fails
         */
        private void bulkInsertTestTable() throws IOException {
            String tableNameString = "testtable";
            if (!helper.existsTable(tableNameString)) {
                helper.createTable(tableNameString, "info", "ex", "memo");
            }

            System.out.println(".........批量插入数据start.........");
            // bulkInsert takes ONE cell per map. Reusing a single map per row overwrites the
            // keys, so only the last cell (memo:detail) would ever be written.
            List<Map<String, Object>> mapList = new ArrayList<>();
            for (int i = 1; i < 201; i++) {
                String rowKey = "testKey" + i;
                mapList.add(bulkInsertCell(rowKey, "info", "username", "guest" + i));
                mapList.add(bulkInsertCell(rowKey, "ex", "addr", "北京路" + i + "号"));
                mapList.add(bulkInsertCell(rowKey, "memo", "detail", "联合国地球村北京路第" + i + "号"));
            }

            helper.bulkInsert(tableNameString, mapList);

            System.out.println(".........批量插入数据end.........");
        }

        /** Builds one bulkInsert cell map (keys rowKey / columnFamily / columnName / columnValue). */
        private static Map<String, Object> bulkInsertCell(String rowKey, String family,
                                                          String qualifier, String value) {
            Map<String, Object> map = new HashMap<>();
            map.put("rowKey", rowKey);
            map.put("columnFamily", family);
            map.put("columnName", qualifier);
            map.put("columnValue", value);
            return map;
        }
    
        // Sample: many columns in a single Put
        /**
         * Writes one Put for {@code rowKey} containing 30 cells: info:username0..9 = "user111",
         * ex:addr0..9 = "street 111" and memo:detail0..9 = "sssss zzz 111222 ". Prints the
         * Put's cell count first.
         */
        private void insertByRowKey(String table, String rowKey) throws IOException {
            byte[] info = Bytes.toBytes("info");
            byte[] ex = Bytes.toBytes("ex");
            byte[] memo = Bytes.toBytes("memo");

            Put put = new Put(Bytes.toBytes(rowKey));
            for (int idx = 0; idx < 10; idx++) {
                put.addColumn(info, Bytes.toBytes("username" + idx), Bytes.toBytes("user111"));
                put.addColumn(ex, Bytes.toBytes("addr" + idx), Bytes.toBytes("street 111"));
                put.addColumn(memo, Bytes.toBytes("detail" + idx), Bytes.toBytes("sssss zzz 111222 "));
            }
            System.out.println("----> put size:" + put.size());

            helper.put(table, put);
        }
    
        /**
         * Inserts rows rowKey0..rowKey9 into {@code tableNameString}. If the table is missing,
         * it is first created with families info, ex and memo. Each row gets
         * info:username2 = "user<i>", ex:addr2 = "street <i>" and memo:detail2 = "aazzdd <i>".
         * Prints each Put's cell count.
         */
        private void bulkInsertTestTable2(String tableNameString) throws IOException {
            if (!helper.existsTable(tableNameString)) {
                helper.createTable(tableNameString, "info", "ex", "memo");
            }

            List<Put> batch = new ArrayList<>();
            for (int n = 0; n < 10; n++) {
                Put row = new Put(Bytes.toBytes("rowKey" + n));
                row.addColumn(Bytes.toBytes("info"), Bytes.toBytes("username2"), Bytes.toBytes("user" + n));
                row.addColumn(Bytes.toBytes("ex"), Bytes.toBytes("addr2"), Bytes.toBytes("street " + n));
                row.addColumn(Bytes.toBytes("memo"), Bytes.toBytes("detail2"), Bytes.toBytes("aazzdd " + n));
                System.out.println("put size:" + row.size());
                batch.add(row);
            }
            helper.bulkInsert2(tableNameString, batch);
        }

    3、删除数据:由于HBase数据按行键、列族、列名三个维度组织,所以删除数据也有多种粒度(整行、整个列族、指定的列)。

      // Deletes at three granularities: whole row, one column family, selected columns.
        /** Deletes every column family and column of row {@code rowKey}. */
        public void deleteByKey(String tableNameString, String rowKey) throws IOException {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            try (Table table = connection.getTable(TableName.valueOf(tableNameString))) {
                table.delete(delete);
            }
        }

        /** Deletes all columns of {@code columnFamily} in row {@code rowKey}. */
        public void deleteByKeyAndFamily(String tableNameString, String rowKey, String columnFamily) throws IOException {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            delete.addFamily(Bytes.toBytes(columnFamily));
            try (Table table = connection.getTable(TableName.valueOf(tableNameString))) {
                table.delete(delete);
            }
        }

        /**
         * Deletes all versions of the listed columns of {@code columnFamily} in row {@code rowKey}.
         * A null or empty {@code columnNames} is a no-op. A Delete with no column added would
         * remove the entire row.
         */
        public void deleteByKeyAndFC(String tableNameString, String rowKey,
                                     String columnFamily, List<String> columnNames) throws IOException {
            if (columnNames == null || columnNames.isEmpty()) {
                return;
            }
            byte[] family = Bytes.toBytes(columnFamily);
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            for (String columnName : columnNames) {
                delete.addColumns(family, Bytes.toBytes(columnName));
            }
            try (Table table = connection.getTable(TableName.valueOf(tableNameString))) {
                table.delete(delete);
            }
        }

    4、基本的查询,唯一要注意的是:从Cell中取value时必须按偏移量(offset)和长度(length)来取,直接使用getValueArray()整个数组得到的数据是不正确的。

        /**
         * Fetches all column families and columns of one row.
         *
         * @return the row's cells, or null when the row has no cells
         *         ({@code Result.listCells()} returns null for an empty result)
         * @throws IOException if the read fails
         */
        public List<Cell> getRowByKey(String tableNameString, String rowKey) throws IOException {
            Get get = new Get(Bytes.toBytes(rowKey));
            // try-with-resources: the Table is released even when get() throws
            try (Table table = connection.getTable(TableName.valueOf(tableNameString))) {
                Result result = table.get(get);
                return result.listCells();
            }
        }


        /**
         * Prints every cell of {@code result} together with its value decoded as a string.
         * The value has to be sliced out with getValueOffset()/getValueLength(). Decoding the
         * whole getValueArray() yields wrong data.
         */
        public void dumpResult(Result result) {
            Cell[] cells = result.rawCells();
            for (int i = 0; i < cells.length; i++) {
                Cell c = cells[i];
                String value = Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
                System.out.println("Cell: " + c + ", Value: " + value);
            }
        }

    5、过滤,这个是HBASE查询的重要部分

    5.1、根据rowKey来过滤

        /**
         * Scans the table and keeps the rows whose row key matches {@code rowKey} under
         * {@code operator}. {@code rowKey} is a Java regular expression (RegexStringComparator).
         * As the comparator notes explain, use it with EQUAL or NOT_EQUAL only.
         *
         * @return map from row key (as string) to that row's cells
         * @throws IOException if the scan fails
         */
        public Map<String,List<Cell>> filterByRowKeyRegex(String tableNameString,String rowKey,CompareOperator operator) throws IOException{
            Scan scan = new Scan();
            // Regex match; for a case-insensitive substring match use
            // new RowFilter(operator, new SubstringComparator(rowKey)) instead.
            scan.setFilter(new RowFilter(operator, new RegexStringComparator(rowKey)));

            Map<String, List<Cell>> map = new HashMap<>();
            // Close both the scanner and the table, even if iteration fails.
            try (Table table = connection.getTable(TableName.valueOf(tableNameString));
                 ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    map.put(Bytes.toString(result.getRow()), result.listCells());
                }
            }
            return map;
        }

    5.2、根据列值、列值正则等方式过滤

        /**
         * Finds rows whose {@code family:colName} value matches the regular expression
         * {@code value} under {@code operator}.
         * For every matching row, ALL of its cells are returned, including those of families
         * and columns that were not part of the condition. Rows lacking the column are excluded
         * (setFilterIfMissing(true)).
         *
         * @return map from row key (as string) to that row's cells
         * @throws IOException if the scan fails
         */
        public Map<String,List<Cell>> filterByValueRegex(String tableNameString,String family,String colName,
                                                    String value,CompareOperator operator) throws IOException{
            // Regex match. For an exact match pass Bytes.toBytes(value) instead of the comparator.
            // SingleColumnValueExcludeFilter works the same but omits the tested column itself.
            SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(family),
                    Bytes.toBytes(colName), operator, new RegexStringComparator(value));
            // Without this, rows that do not have the column at all would also be returned.
            filter.setFilterIfMissing(true);

            Scan scan = new Scan();
            scan.setFilter(filter);

            Map<String, List<Cell>> map = new HashMap<>();
            // Previously neither the scanner nor the table was ever closed.
            try (Table table = connection.getTable(TableName.valueOf(tableNameString));
                 ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    map.put(Bytes.toString(result.getRow()), result.listCells());
                }
            }
            return map;
        }

    5.3、根据列名前缀、列名正则、多个列名等过滤

     // Column-name (qualifier) prefix filtering
        /**
         * Scans the table keeping only columns whose qualifier starts with {@code prefix}.
         * Alternatives: QualifierFilter (flexible qualifier matching, e.g. with
         * SubstringComparator) or MultipleColumnPrefixFilter (several prefixes).
         *
         * @return map from row key (as string) to that row's returned cells
         * @throws IOException if the scan fails
         */
        public Map<String,List<Cell>> filterByColumnPrefix(String tableNameString,String prefix) throws IOException{
            Scan scan = new Scan();
            scan.setFilter(new ColumnPrefixFilter(Bytes.toBytes(prefix)));

            Map<String, List<Cell>> map = new HashMap<>();
            // Previously neither the scanner nor the table was ever closed.
            try (Table table = connection.getTable(TableName.valueOf(tableNameString));
                 ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    map.put(Bytes.toString(result.getRow()), result.listCells());
                }
            }
            return map;
        }

    5.4、过滤器集合,多个过滤器同时按通过策略来过滤

        /**
         * Scans the table keeping columns whose qualifier both starts with {@code colPrefix} and
         * lies in [{@code minCol}, {@code maxCol}] (both bounds inclusive). The two filters are
         * combined with FilterList.Operator.MUST_PASS_ALL.
         *
         * @return map from row key (as string) to that row's returned cells
         * @throws IOException if the scan fails
         */
        public Map<String,List<Cell>> filterByPrefixAndRange(String tableNameString,String colPrefix,
                                                                 String minCol,String maxCol) throws IOException{
            FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
            filterList.addFilter(new ColumnPrefixFilter(Bytes.toBytes(colPrefix)));
            filterList.addFilter(new ColumnRangeFilter(Bytes.toBytes(minCol), true,
                    Bytes.toBytes(maxCol), true));

            Scan scan = new Scan();
            scan.setFilter(filterList);

            Map<String, List<Cell>> map = new HashMap<>();
            // Previously neither the scanner nor the table was ever closed.
            try (Table table = connection.getTable(TableName.valueOf(tableNameString));
                 ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    map.put(Bytes.toString(result.getRow()), result.listCells());
                }
            }
            return map;
        }

    6、过滤器介绍

    6.1、比较操作,如等于、大于、小于

    /**
     * Comparison operators accepted by HBase filters such as RowFilter and
     * SingleColumnValueFilter (HBase's own enum, reproduced here for reference).
     */
    public enum CompareOperator {
      // Intentionally keeps the same names as the constants of the filter's CompareOp enum:
      // an operator is converted to its protobuf representation by comparing names.
      /** less than */
      LESS,
      /** less than or equal to */
      LESS_OR_EQUAL,
      /** equals */
      EQUAL,
      /** not equal */
      NOT_EQUAL,
      /** greater than or equal to */
      GREATER_OR_EQUAL,
      /** greater than */
      GREATER,
      /** no operation */
      NO_OP,
    }

    6.2、比较器,主要是继承ByteArrayComparable的类

    RegexStringComparator 支持正则表达式的值比较
    
    Scan scan = new Scan();
    // Values starting with "you". The regex is applied with find(), so it must be anchored with ^.
    RegexStringComparator comp = new RegexStringComparator("^you");
    SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("family"), Bytes.toBytes("qualifier"), CompareOperator.EQUAL, comp);
    scan.setFilter(filter);
    SubstringComparator 用于判断一个子串是否存在于值中,并且不区分大小写。
    
    Scan scan = new Scan();
    SubstringComparator comp = new SubstringComparator("substr"); // values containing "substr" (case-insensitive)
    SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("family"), Bytes.toBytes("qualifier"), CompareOperator.EQUAL, comp);
    scan.setFilter(filter);
    BinaryComparator 二进制比较器,不用反序列化直接进行字节比较,比较高效。
    
    Scan scan = new Scan();
    BinaryComparator comp = new BinaryComparator(Bytes.toBytes("my hbase")); // value equals "my hbase" byte-for-byte
    ValueFilter filter = new ValueFilter(CompareOperator.EQUAL, comp);
    scan.setFilter(filter);
     BinaryPrefixComparator 前缀二进制比较器。只比较前缀是否相同。
    
    Scan scan = new Scan();
    BinaryPrefixComparator comp = new BinaryPrefixComparator(Bytes.toBytes("test")); // values whose bytes start with "test"
    SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("family"), Bytes.toBytes("qualifier"), CompareOperator.EQUAL, comp);
    scan.setFilter(filter);

    注意:BitComparator、RegexStringComparator、SubstringComparator只能与EQUAL和NOT_EQUAL搭配使用,因为这些比较器的compareTo()方法在匹配时返回0,不匹配时返回1;如果和LESS或GREATER搭配,就会得到错误的结果。

    基于字符串的比较器比基于字节的比较器更慢,也更消耗资源。

    6.3、过滤器,部分介绍

    行键过滤器
    RowFilter 对某一行的过滤。
    
    Scan scan = new Scan();
    RowFilter filter = new RowFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("row1"))); // row key is row1
    scan.setFilter(filter);
    列族过滤器
    FamilyFilter 用于过滤列族(也可以在Scan 过程中通过设定某些列族来实现该功能)
    
    Scan scan = new Scan();
    FamilyFilter filter = new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("info"))); // column family is info
    scan.setFilter(filter);
    列名过滤器
    QualifierFilter 列名全匹配 Scan scan
    = new Scan(); QualifierFilter filter = new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("username"))); // 列名为 username scan.setFilter(filter); ColumnPrefixFilter 用于列名(Qualifier)前缀过滤,即包含某个前缀的所有列名。 Scan scan = new Scan(); ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("addr")); // 前缀为 addr scan.setFilter(filter); MultipleColumnPrefixFilter MultipleColumnPrefixFilter 与 ColumnPrefixFilter 的行为类似,但可以指定多个列名(Qualifier)前缀。 Scan scan = new Scan(); byte[][] prefixes = new byte[][]{Bytes.toBytes("my-prefix-1"), Bytes.toBytes("my-prefix-2")}; MultipleColumnPrefixFilter filter = new MultipleColumnPrefixFilter(prefixes); scan.setFilter(filter); ColumnRangeFilter 列名范围过滤器可以进行高效的列名内部扫描。关键字:已排序 Scan scan = new Scan(); ColumnRangeFilter filter = new ColumnRangeFilter(Bytes.toBytes("minColumn"), true, Bytes.toBytes("maxColumn"), false); scan.setFilter(filter); DependentColumnFilter 尝试找到该列所在的每一行,并返回该行具有相同时间戳的全部键值对。 Scan scan = new Scan(); DependentColumnFilter filter = new DependentColumnFilter(Bytes.toBytes("family"), Bytes.toBytes("qualifier")); scan.setFilter(filter);
    列值过滤器
    SingleColumnValueFilter 列值比较
    
    列族 info 下的列 username的列值和字符串 "admin" 相等的数据 : 
    Scan scan = new Scan();
    SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("username"), CompareOperator.EQUAL, Bytes.toBytes("admin"));
    // Without this, rows that have no info:username column are returned as well.
    filter.setFilterIfMissing(true);
    scan.setFilter(filter);

    6.4、代码:

    https://github.com/asker124143222/hbaseHello

    https://github.com/asker124143222/hbaseDemo

  • 相关阅读:
    hisi3516/3519开发(二)—xshell连接串口
    linux svn使用
    IdentityServer4 源码介绍
    想写博客
    # VS2019 快捷键插入当前时间
    # 使用 vscode markdown 遇到的问题
    # 学Vue
    teXt使用
    Linux基础
    NopCommerce(Core)学习目录
  • 原文地址:https://www.cnblogs.com/asker009/p/10626508.html
Copyright © 2011-2022 走看看