1.先配置hosts的机器名和IP映射
需要先在运行Java程序的机器的hosts文件加入机器名和IP的映射。否则程序运行时会卡死或连接失败。
192.168.100.105 c1
192.168.100.110 c2
192.168.100.115 c3
192.168.100.120 c4
2.修改Maven配置文件(pom.xml)
在<dependencies></dependencies>节点内加入:
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.4.7</version> </dependency>
* 其中version节点的2.4.7对应HBase的版本。例如,安装的HBase是v2.4.0,那version节点就填2.4.0。
3.代码实现
public class HBaseHandler { public static void main(String[] args) { String quorum = "c1,c2,c3,c4"; //ZooKeeper的主机名或主机ip,默认端口是2181 String tableName = "Student"; String cf_info = "Info"; String cf_score = "Score"; String c_info_name = "Name", c_info_age = "Age"; String c_score_Math = "Math", c_score_English = "English"; // HBase API中要close的类实现了java.lang.AutoCloseable,可以直接在try里面创建实例 try (Connection conn = connect(quorum)) { System.out.println("------------------- create table -------------------"); createTable(conn, tableName, new String[]{cf_info, cf_score}); System.out.println("------------------- put -------------------"); put(conn, tableName, "row1", cf_info, c_info_name, "sam"); put(conn, tableName, "row1", cf_score, c_score_Math, "70"); put(conn, tableName, "row1", cf_score, c_score_English, "75"); put(conn, tableName, "row2", cf_score, c_score_English, "80"); System.out.println("------------------- get -------------------"); String value1 = getCell(conn, tableName, "row1", cf_score, c_score_Math); String value2 = getCell(conn, tableName, "row1", cf_score); System.out.println("------------------- scan -------------------"); List<Map<String, String>> values1 = scan(conn, tableName, "row1", "row3"); //[row1,row3) System.out.println("------------------- scan all table -------------------"); List<Map<String, String>> values2 = scan(conn, tableName); //[row1,row3) System.out.println("------------------- scan with filter -------------------"); Filter filter = new ColumnPrefixFilter("Eng".getBytes());//过滤列族以Eng开头的 List<Map<String, String>> values3 = scan(conn, tableName, filter); //[row1,row3) System.out.println("------------------- increment -------------------"); long value3 = increase(conn, tableName, "row2", cf_score, c_score_Math, 80L); long value4 = increase(conn, tableName, "row2", cf_score, c_score_Math, -5); System.out.println("------------------- delete table -------------------"); deleteTable(conn, tableName); } catch (Exception e) { System.out.println(e); 
} } /** * 连接HBase * * @param quorum * @return * @throws Exception */ public static Connection connect(String quorum) throws Exception { Configuration config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum", quorum); Connection connection = ConnectionFactory.createConnection(config); return connection; } /** * 创建表 * * @param connection * @param tableName 表名 * @param familyNames 列族 * @throws Exception */ public static void createTable(Connection connection, String tableName, String[] familyNames) throws Exception { try (Admin admin = connection.getAdmin()) { TableName tb = TableName.valueOf(tableName); if (!admin.tableExists(tb)) { TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tb); for (String name : familyNames) { ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.of(name); builder.setColumnFamily(cfd); } TableDescriptor desc = builder.build(); admin.createTable(desc); } } } /** * 插入数据 * * @param connection * @param tableName * @param rowKey * @param columnFamily * @param column * @param data * @throws Exception */ public static void put(Connection connection, String tableName, String rowKey, String columnFamily, String column, long data) throws Exception { try (Table table = connection.getTable(TableName.valueOf(tableName))) { Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data)); table.put(put); } } /** * 插入数据 * * @param connection * @param tableName * @param rowKey * @param columnFamily * @param column * @param data * @throws Exception */ public static void put(Connection connection, String tableName, String rowKey, String columnFamily, String column, String data) throws Exception { try (Table table = connection.getTable(TableName.valueOf(tableName))) { Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data)); table.put(put); } } /** * 根据rowkey获取一行数据 * * @param connection 
* @param tableName * @param rowKey * @return * @throws IOException */ public static Map<String, String> getRow(Connection connection, String tableName, String rowKey) throws IOException { try (Table table = connection.getTable(TableName.valueOf(tableName))) { Get get = new Get(Bytes.toBytes(rowKey)); Result result = table.get(get); List<Cell> cells = result.listCells(); if (CollectionUtils.isEmpty(cells)) { return Collections.emptyMap(); } Map<String, String> objectMap = new HashMap<>(); for (Cell cell : cells) { String qualifier = new String(CellUtil.cloneQualifier(cell)); String value = new String(CellUtil.cloneValue(cell), "UTF-8"); objectMap.put(qualifier, value); } return objectMap; } } /** * 获取指定列族的数据 * * @param connection * @param tableName * @param rowKey * @param columnFamily * @return * @throws IOException */ public static String getCell(Connection connection, String tableName, String rowKey, String columnFamily) throws IOException { return getCell(connection, tableName, rowKey, columnFamily, null); } /** * 获取指定行列的数据 * * @param connection * @param tableName * @param rowKey * @param columnFamily * @param column * @return * @throws IOException */ public static String getCell(Connection connection, String tableName, String rowKey, String columnFamily, String column) throws IOException { try (Table table = connection.getTable(TableName.valueOf(tableName))) { String value = null; Get get = new Get(Bytes.toBytes(rowKey)); get.addFamily(columnFamily.getBytes()); if (column != null) { get.addColumn(columnFamily.getBytes(), column.getBytes()); } // get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column)); Result result = table.get(get); List<Cell> cells = result.listCells(); if (!CollectionUtils.isEmpty(cells)) { value = new String(CellUtil.cloneValue(cells.get(0)), "UTF-8"); } return value; } } /** * 获取游标内容(start row和end row) * * @param connection * @param tableName * @param rowkeyStart * @param rowkeyEnd * @return * @throws IOException */ public static 
List<Map<String, String>> scan(Connection connection, String tableName, String rowkeyStart, String rowkeyEnd) throws IOException { return scan(connection, tableName, rowkeyStart, rowkeyEnd, null); } /** * 获取游标内容(filter) * * @param connection * @param tableName * @return * @throws IOException */ public static List<Map<String, String>> scan(Connection connection, String tableName) throws IOException { return scan(connection, tableName, null, null, null); } /** * 获取游标内容(filter) * * @param connection * @param tableName * @param filter * @return * @throws IOException */ public static List<Map<String, String>> scan(Connection connection, String tableName, Filter filter) throws IOException { return scan(connection, tableName, null, null, filter); } /** * 获取游标内容 * * @param connection * @param tableName * @param rowkeyStart * @param rowkeyEnd * @throws IOException */ public static List<Map<String, String>> scan(Connection connection, String tableName, String rowkeyStart, String rowkeyEnd, Filter filter) throws IOException { try (Table table = connection.getTable(TableName.valueOf(tableName))) { Scan scan = new Scan(); if (!StringUtils.isEmpty(rowkeyStart)) { scan.withStartRow(Bytes.toBytes(rowkeyStart)); } if (!StringUtils.isEmpty(rowkeyEnd)) { scan.withStopRow(Bytes.toBytes(rowkeyEnd)); } if (filter != null) { scan.setFilter(filter); } try (ResultScanner rs = table.getScanner(scan);) { List<Map<String, String>> dataList = new ArrayList<>(); for (Result r : rs) { Map<String, String> objectMap = new HashMap<>(); for (Cell cell : r.listCells()) { String qualifier = new String(CellUtil.cloneQualifier(cell)); String value = new String(CellUtil.cloneValue(cell), "UTF-8"); objectMap.put(qualifier, value); } dataList.add(objectMap); } return dataList; } } } /** * 对指定行列的数据加减值 * * @param connection * @param tableName * @param rowKey * @param columnFamily * @param column * @param incValue * @throws IOException */ public static long increase(Connection connection, String tableName, 
String rowKey, String columnFamily, String column, long incValue) throws IOException { Long value = null; try (Table table = connection.getTable(TableName.valueOf(tableName))) { Increment increment = new Increment(rowKey.getBytes()); increment.addColumn(columnFamily.getBytes(), column.getBytes(), incValue); Result result = table.increment(increment); for (Cell cell : result.rawCells()) { value = Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); // System.out.println("cell: " + cell + " - incValue: " + Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } } return value; } /** * 删除表 * * @param connection * @param tableName * @throws IOException */ public static void deleteTable(Connection connection, String tableName) throws IOException { TableName tn = TableName.valueOf(tableName); try (Admin admin = connection.getAdmin()) { if (admin.tableExists(tn)) { admin.disableTable(tn); admin.deleteTable(tn); } } } }