zoukankan      html  css  js  c++  java
  • Hbase 操作工具类

    依赖jar

            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>2.0.5</version>
            </dependency>

     

    HbaseUtils.java

    package javax.utils;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    
    /**
     * Hbase 操作工具类
     * 
     * @author Logan
     * @version 1.0.0
     * @createDate 2019-05-03
     *
     */
    public class HbaseUtils {
    
        // ===============Common=====================================
    
        /**
         * 根据表名获取Table对象
         * 
         * @param name 表名,必要时可指定命名空间,比如:“default:user”
         * @return Hbase Table 对象
         * @throws IOException 有异常抛出,由调用者捕获处理
         */
        public static Table getTable(String name) throws IOException {
            TableName tableName = TableName.valueOf(name);
            Connection connection = ConnectionFactory.createConnection();
            return connection.getTable(tableName);
        }
    
        // =============== Put =====================================
    
        /**
         * 根据rowKey生成一个Put对象
         * 
         * @param rowKey rowKey
         * @return Put对象
         */
        public static Put createPut(String rowKey) {
            return new Put(Bytes.toBytes(rowKey));
        }
    
        /**
         * 在Put对象上增加Cell
         * 
         * @param put Put对象
         * @param cell cell对象
         * @throws IOException 有异常抛出,由调用者捕获处理
         */
        public static void addCellOnPut(Put put, Cell cell) throws IOException {
            put.add(cell);
        }
    
        /**
         * 在Put对象上增加值
         * 
         * @param put Put对象
         * @param family 列簇
         * @param qualifier 列
         * @param value 字符串类型的值
         */
        public static void addValueOnPut(Put put, String family, String qualifier, String value) {
            addValueOnPut(put, family, qualifier, Bytes.toBytes(value));
        }
    
        /**
         * 在Put对象上增加值
         * 
         * @param put Put对象
         * @param family 列簇
         * @param qualifier 列
         * @param value 字节数组类型的值,可以是任意对象序列化而成
         */
        public static void addValueOnPut(Put put, String family, String qualifier, byte[] value) {
            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
        }
    
        /**
         * 在Put对象上增加值
         * 
         * @param put Put对象
         * @param family 列簇
         * @param qualifier 列
         * @param ts Timestamp时间戳
         * @param value 字符串类型的值
         */
        public static void addValueOnPut(Put put, String family, String qualifier, long ts, String value) {
            addValueOnPut(put, family, qualifier, ts, Bytes.toBytes(value));
        }
    
        /**
         * 在Put对象上增加值
         * 
         * @param put Put对象
         * @param family 列簇
         * @param qualifier 列
         * @param ts Timestamp时间戳
         * @param value 字节数组类型的值,可以是任意对象序列化而成
         */
        public static void addValueOnPut(Put put, String family, String qualifier, long ts, byte[] value) {
            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), ts, value);
        }
    
        /**
         * 按表名插入一个Put对象包含的数据
         * 
         * @param tableName 表名,必要时可指定命名空间,比如:“default:user”
         * @param put 要插入的数据对象
         * @throws IOException 有异常抛出,由调用者捕获处理
         */
        public static void put(String tableName, Put put) throws IOException {
            try (
                    Table table = getTable(tableName);
            ) {
    
                table.put(put);
            }
        }
    
        /**
         * 按表名批量插入Put对象包含的数据
         * 
         * @param tableName 表名,必要时可指定命名空间,比如:“default:user”
         * @param puts 要插入的数据对象集合
         * @throws IOException 有异常抛出,由调用者捕获处理
         */
        public static void put(String tableName, List<Put> puts) throws IOException {
            try (
                    Table table = getTable(tableName);
            ) {
    
                table.put(puts);
            }
        }
    
        // =============== Get =====================================
    
        /**
         * 根据rowKey生成一个查询的Get对象
         * 
         * @param rowKey rowKey
         * @return Get 对象
         */
        public static Get createGet(String rowKey) {
            return new Get(Bytes.toBytes(rowKey));
        }
    
        /**
         * 对查询的Get对象增加指定列簇
         * 
         * @param get
         * @param family
         */
        public static void addFamilyOnGet(Get get, String family) {
            get.addFamily(Bytes.toBytes(family));
        }
    
        /**
         * 对查询的Get对象增加指定列簇和列
         * 
         * @param get
         * @param family
         * @param qualifier
         */
        public static void addColumnOnGet(Get get, String family, String qualifier) {
            get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        }
    
        /**
         * 根据表名和rowKey查询结果(包含全部列簇和列)
         * 
         * @param tableName 表名,必要时可指定命名空间,比如:“default:user”
         * @param rowKey 查询rowKey
         * @return 查询结果Result
         * @throws IOException 有异常抛出,由调用者捕获处理
         */
        public static Result get(String tableName, String rowKey) throws IOException {
            Get get = createGet(rowKey);
            return get(tableName, get);
        }
    
        /**
         * 根据表名和rowKey数组批量查询结果(包含全部列簇和列)
         * 
         * @param tableName 表名,必要时可指定命名空间,比如:“default:user”
         * @param rowKeys 查询rowKey数组
         * @return 查询结果Result数组
         * @throws IOException 有异常抛出,由调用者捕获处理
         */
        public static Result[] get(String tableName, String[] rowKeys) throws IOException {
            List<Get> gets = new ArrayList<Get>();
            for (String rowKey : rowKeys) {
                gets.add(createGet(rowKey));
            }
            return get(tableName, gets);
        }
    
        /**
         * 根据表名和Get对象查询结果
         * 
         * @param tableName 表名,必要时可指定命名空间,比如:“default:user”
         * @param get Hbase查询对象
         * @return 查询结果Result
         * @throws IOException 有异常抛出,由调用者捕获处理
         */
        public static Result get(String tableName, Get get) throws IOException {
            try (
                    Table table = getTable(tableName);
            ) {
    
                return table.get(get);
            }
        }
    
        /**
         * 根据表名和Get对象数组查询结果
         * 
         * @param tableName 表名,必要时可指定命名空间,比如:“default:user”
         * @param gets 多个Hbase查询对象组成的数组
         * @return 查询结果Result数组
         * @throws IOException 有异常抛出,由调用者捕获处理
         */
        public static Result[] get(String tableName, List<Get> gets) throws IOException {
            try (
                    Table table = getTable(tableName);
            ) {
                return table.get(gets);
            }
        }
    
        // =============== Scan =====================================
    
        /**
         * 根据startRow和stopRow创建扫描对象
         * 
         * @param startRow 扫描开始行,结果包含该行
         * @param stopRow 扫描结束行,结果不包含该行
         * @return Scan对象
         */
        public static Scan createScan(String startRow, String stopRow) {
            Scan scan = new Scan();
            scan.withStartRow(Bytes.toBytes(startRow));
            scan.withStopRow(Bytes.toBytes(stopRow));
            return scan;
        }
    
        /**
         * 对扫描对象设置列簇
         * 
         * @param scan 扫描对象
         * @param family 列簇
         */
        public static void addFamilyOnScan(Scan scan, String family) {
            scan.addFamily(Bytes.toBytes(family));
        }
    
        /**
         * 对扫描对象设置列
         * 
         * @param scan 扫描对象
         * @param family 列簇
         * @param qualifier 列簇下对应的列
         */
        public static void addColumnOnScan(Scan scan, String family, String qualifier) {
            scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        }
    
        /**
         * 根据表名和扫描对象扫描数据
         * 
         * @param tableName 表名,必要时可指定命名空间,比如:“default:user”
         * @param scan 扫描对象
         * @return 扫描结果集对象ResultScanner
         * @throws IOException 有异常抛出,由调用者捕获处理
         */
        public static ResultScanner scan(String tableName, Scan scan) throws IOException {
            try (
                    Table table = getTable(tableName);
            ) {
                return table.getScanner(scan);
            }
        }
    
        /**
         * 根据表名、开始行和结束行扫描数据(结果包含开始行,不包含结束行,半开半闭区间[startRow, stopRow))
         * 
         * @param tableName 表名,必要时可指定命名空间,比如:“default:user”
         * @param startRow 扫描开始行
         * @param stopRow 扫描结束行
         * @return 扫描结果集对象ResultScanner
         * @throws IOException 有异常抛出,由调用者捕获处理
         */
        public static ResultScanner scan(String tableName, String startRow, String stopRow) throws IOException {
            return scan(tableName, createScan(startRow, stopRow));
        }
    
        // =============== Delete =====================================
    
        /**
         * 根据rowKey生成一个查询的Delete对象
         * 
         * @param rowKey rowKey
         * @return Delete对象
         */
        public static Delete createDelete(String rowKey) {
            return new Delete(Bytes.toBytes(rowKey));
        }
    
        /**
         * 在Delete对象上增加Cell
         * 
         * @param delete Delete对象
         * @param cell cell对象
         * @throws IOException 有异常抛出,由调用者捕获处理
         */
        public static void addCellOnDelete(Delete delete, Cell cell) throws IOException {
            delete.add(cell);
        }
    
        /**
         * 对删除对象增加指定列簇
         * 
         * @param delete Delete对象
         * @param family 列簇
         */
        public static void addFamilyOnDelete(Delete delete, String family) {
            delete.addFamily(Bytes.toBytes(family));
        }
    
        /**
         * 对删除对象增加指定列簇和列
         * 
         * @param delete Delete对象
         * @param family 列簇
         * @param qualifier 列
         */
        public static void addColumnOnDelete(Delete delete, String family, String qualifier) {
            delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        }
    
        /**
         * 按表名删除一个Delete对象指定的数据
         * 
         * @param tableName 表名,必要时可指定命名空间,比如:“default:user”
         * @param delete Delete对象
         * @throws IOException 有异常抛出,由调用者捕获处理
         */
        public static void delete(String tableName, Delete delete) throws IOException {
            try (
                    Table table = getTable(tableName);
            ) {
                table.delete(delete);
            }
        }
    
        /**
         * 按表名批量删除Delete对象集合包含的指定数据
         * 
         * @param tableName 表名,必要时可指定命名空间,比如:“default:user”
         * @param deletes Delete对象集合
         * @throws IOException 有异常抛出,由调用者捕获处理
         */
        public static void delete(String tableName, List<Delete> deletes) throws IOException {
            try (
                    Table table = getTable(tableName);
            ) {
                table.delete(deletes);
            }
        }
    
    }

    以下是测试类

    HbaseClientDemo.java

    package com.java.demo;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import javax.utils.HbaseUtils;
    
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.junit.Test;
    
    /**
     * Hbase 客户端测试
     * 
     * @author Logan
     * @version 1.0.0
     * @createDate 2019-05-03
     *
     */
    public class HbaseClientDemo {
    
        /**
         * 向user表中插入数据
         */
        @Test
        public void put() {
    
            String tableName = "default:user";
            try {
    
                List<Put> puts = new ArrayList<Put>();
                Put put = HbaseUtils.createPut("key1005");
                HbaseUtils.addValueOnPut(put, "info", "name", "孙悟空");
                HbaseUtils.addValueOnPut(put, "info", "age", "500");
                HbaseUtils.addValueOnPut(put, "info", "address", "花果山");
                // HbaseUtils.put(tableName, put);
                puts.add(put);
    
                put = HbaseUtils.createPut("key1006");
                HbaseUtils.addValueOnPut(put, "info", "name", "沙悟净");
                HbaseUtils.addValueOnPut(put, "info", "age", "1000");
                HbaseUtils.addValueOnPut(put, "info", "address", "流沙河");
                puts.add(put);
    
                HbaseUtils.put(tableName, puts);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 按rowKey批量查询user表中全部列簇全部列的值
         */
        @Test
        public void getAllFamily() {
            try {
                String tableName = "default:user";
                String[] rowKeys = { "key1001", "key1002", "key1003", "key1005", "key1006" };
    
                // 按表名和rowKey查询所有列
                Result[] results = HbaseUtils.get(tableName, rowKeys);
                for (Result result : results) {
    
                    // 打印查询结果
                    printResult(result);
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 按rowKey查询user表中指定列簇指定列的值
         */
        @Test
        public void get() {
            try {
                String tableName = "default:user";
                String rowKey = "key1002";
    
                Get get = HbaseUtils.createGet(rowKey);
    
                HbaseUtils.addColumnOnGet(get, "info", "name");
                HbaseUtils.addColumnOnGet(get, "info", "age");
    
                // 不存在的列,查询结果不显示
                HbaseUtils.addColumnOnGet(get, "info", "address");
    
                // 如果在增加列后增加已有的列簇,会返回该列簇的全部列数据,覆盖前边的增加列
                // HbaseUtils.addFamilyOnGet(get, "info");
    
                Result result = HbaseUtils.get(tableName, get);
                printResult(result);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Test
        public void scan() {
            try {
                String tableName = "default:user";
                String startRow = "key1001";
                String stopRow = "key1006";
                ResultScanner resultScanner = HbaseUtils.scan(tableName, startRow, stopRow);
                for (Result result : resultScanner) {
                    printResult(result);
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 打印查询结果
         * 
         * @param result 查询结果对象
         */
        private void printResult(Result result) {
            Cell[] cells = result.rawCells();
    
            // 从Result中读取 rowKey
            System.out.println(Bytes.toString(result.getRow()));
    
            String print = "%s	 %s:%s 	 %s";
            for (Cell cell : cells) {
    
                // 从Cell中取rowKey
                String row = Bytes.toString(CellUtil.cloneRow(cell));
                String family = Bytes.toString(CellUtil.cloneFamily(cell));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                String value = Bytes.toString(CellUtil.cloneValue(cell));
    
                System.out.println(String.format(print, row, family, qualifier, value));
    
            }
        }
    
        /**
         * 删除指定列
         */
        @Test
        public void deleteColumn() {
            try {
                String tableName = "default:user";
                List<Delete> deletes = new ArrayList<Delete>();
                Delete delete = HbaseUtils.createDelete("key1005");
                HbaseUtils.addColumnOnDelete(delete, "info", "age");
                HbaseUtils.addColumnOnDelete(delete, "info", "address");
                // HbaseUtils.delete(tableName, delete);
                deletes.add(delete);
    
                delete = HbaseUtils.createDelete("key1006");
                HbaseUtils.addColumnOnDelete(delete, "info", "address");
                deletes.add(delete);
    
                HbaseUtils.delete(tableName, deletes);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 删除指定列簇
         */
        @Test
        public void deleteFamily() {
            try {
                String tableName = "default:user";
                List<Delete> deletes = new ArrayList<Delete>();
                Delete delete = HbaseUtils.createDelete("key1005");
                HbaseUtils.addFamilyOnDelete(delete, "info");
                // HbaseUtils.delete(tableName, delete);
                deletes.add(delete);
    
                delete = HbaseUtils.createDelete("key1006");
                HbaseUtils.addFamilyOnDelete(delete, "info");
                deletes.add(delete);
    
                HbaseUtils.delete(tableName, deletes);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }

     

      

     

    Hbase 操作工具类

    .

  • 相关阅读:
    pyenv: python2.7: command not found The `python2.7' command exists in these Python versions: 2.7.5
    Gazebo_02_build_a_robot
    Gazebo_01_getting_start
    vscode等编辑器中报Exception has occurred: ModuleNotFoundError No module named 'requests'
    Ubuntu16.04安装Python3.8以后出现lsb_release/No LSB modules are available的错误
    C语言字符串定义(数组&指针)
    电脑软件更新管理
    VS2017自定义新建模板
    《SQL必知必会-第四版》--学习摘抄
    实体类封装数据库查询信息(工具一)
  • 原文地址:https://www.cnblogs.com/jonban/p/hbase-client.html
Copyright © 2011-2022 走看看