相对于0.9.X版本,HBase 1.X版本对客户端API的改动比较大,例如连接获取和表操作相关的类库都发生了变更,如下:
连接获取:org.apache.hadoop.hbase.client.HConnectionManager.createConnection(conf)已经过时,改为使用org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(conf);
表操作:org.apache.hadoop.hbase.client.HTable已过时,改为使用org.apache.hadoop.hbase.client.Table接口(通过Connection.getTable(TableName)获取);
第一部分:连接获取
package com.mengyao.bigdata.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

/**
 * Builds HBase {@link Connection} instances from a Hadoop {@link Configuration}
 * and/or an explicit ZooKeeper quorum and client port.
 *
 * <p>Precedence of the ZooKeeper settings:
 * <ul>
 *   <li>When no configuration was supplied, a default one is created with
 *       {@link HBaseConfiguration#create()} and the explicit quorum / port
 *       (if any) are written into it.</li>
 *   <li>When a configuration was supplied, its own values win; the explicit
 *       quorum / port are only used to fill in keys that are missing or
 *       empty in that configuration.</li>
 * </ul>
 *
 * <p>Note that {@link #getConnection()} may write into the configuration
 * object handed to the constructor.
 *
 * @author mengyao
 */
public class HBaseHelper {

    /** Configuration key holding the comma separated ZooKeeper hosts. */
    private static final String QUORUM = "hbase.zookeeper.quorum";
    /** Configuration key holding the ZooKeeper client port. */
    private static final String ZK_PORT = "hbase.zookeeper.property.clientPort";

    private String quorum;
    private String zkPort;
    private Configuration conf;

    /**
     * Creates a helper that connects using the given configuration only.
     *
     * @param conf configuration to connect with; may be {@code null}, in which
     *             case a default HBase configuration is created on first use
     */
    public HBaseHelper(Configuration conf) {
        this.conf = conf;
    }

    /**
     * Creates a helper that connects using a default HBase configuration
     * overridden with the given ZooKeeper settings.
     *
     * @param quorum comma separated ZooKeeper hosts
     * @param zkPort ZooKeeper client port
     */
    public HBaseHelper(String quorum, String zkPort) {
        this.quorum = quorum;
        this.zkPort = zkPort;
    }

    /**
     * Creates a helper from a configuration plus fallback ZooKeeper settings.
     *
     * @param conf   configuration to connect with
     * @param quorum ZooKeeper hosts used when {@code conf} has none
     * @param zkPort ZooKeeper client port used when {@code conf} has none
     */
    public HBaseHelper(Configuration conf, String quorum, String zkPort) {
        this.conf = conf;
        this.quorum = quorum;
        this.zkPort = zkPort;
    }

    /**
     * Opens a new connection with the effective configuration.
     *
     * <p>A {@code null} or empty quorum / port is never written into the
     * configuration: the original code did so unconditionally, which makes
     * {@code Configuration.set} fail on {@code null} values.
     *
     * @return a newly created connection; the caller is expected to close it
     *         (for example through {@link #closeAll(Connection)})
     * @throws IOException if the connection cannot be created
     */
    public Connection getConnection() throws IOException {
        if (null == conf) {
            // No configuration supplied: start from the defaults and let the
            // explicit settings override them.
            conf = HBaseConfiguration.create();
            applySetting(conf, QUORUM, quorum, true);
            applySetting(conf, ZK_PORT, zkPort, true);
        } else {
            // Configuration supplied: only fill in what it does not define.
            applySetting(conf, QUORUM, quorum, false);
            applySetting(conf, ZK_PORT, zkPort, false);
        }
        return ConnectionFactory.createConnection(conf);
    }

    /**
     * Closes the given connection if it is still open.
     *
     * @param connection connection to close; {@code null} is ignored
     * @throws IOException if closing the connection fails
     */
    public void closeAll(Connection connection) throws IOException {
        if (null != connection && !connection.isClosed()) {
            connection.close();
        }
    }

    public String getQuorum() {
        return quorum;
    }

    public void setQuorum(String quorum) {
        this.quorum = quorum;
    }

    public String getZkPort() {
        return zkPort;
    }

    public void setZkPort(String zkPort) {
        this.zkPort = zkPort;
    }

    /**
     * Writes {@code value} under {@code key} when there is a usable value and
     * either {@code override} is set or the target has no value for the key.
     */
    private static void applySetting(Configuration target, String key, String value, boolean override) {
        if (isNullOrEmpty(value)) {
            return;
        }
        if (override || isNullOrEmpty(target.get(key))) {
            target.set(key, value);
        }
    }

    /** Returns {@code true} for {@code null} and for the empty string. */
    private static boolean isNullOrEmpty(String text) {
        return null == text || text.isEmpty();
    }
}

第二部分:表数据增删改查
package com.mengyao.bigdata.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.Logger; /** * HBase version:1.0.1.1 * @author mengyao * */ public class HBaseV1011Template { Logger logger = Logger.getLogger(getClass()); private static Connection connection; static { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "bdata200,bdata202,bdata203"); conf.set("hbase.zookeeper.property.clientPort", "2181"); HBaseHelper hBaseHelper = new HBaseHelper(conf); try { connection = hBaseHelper.getConnection(); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException { //单行添加 Map<String, String> keyValue = new HashMap<>(); keyValue.put("name", "xiaogang"); keyValue.put("sex", "男"); keyValue.put("addr", "北京"); add("test", "0005", "info", keyValue); //多行添加 Map<String, Map<String, String>> keyValues = new HashMap<String, Map<String, String>>(); Map<String, String> keyValue1 = new HashMap<>(); keyValue1.put("name", "ligang"); keyValue1.put("sex", "男"); keyValue1.put("addr", "北京"); keyValues.put("0008_info", keyValue1); Map<String, String> keyValue2 = new HashMap<>(); keyValue2.put("name", "zhaojun"); keyValue2.put("sex", "男"); keyValue2.put("addr", "北京"); keyValues.put("0009_info", keyValue2); adds("test", "_", keyValues); //查询所有 System.out.println("1:"+queryForScan("test", "info")); 
//根据rowKey查询 System.out.println("2:"+queryForRowKey("test", "0005", "info")); //根据时间戳范围查询(默认包钱不包后) System.out.println("3:"+queryForTimeRange("test", "info", 1492510703521L, 1492664183955L)); //根据rowKey查询(默认包前不包后) System.out.println("4:"+queryForRowKeyRange("test", "info", "0001", "0003")); //根据指定列名和列值查询1 System.out.println("5:"+queryForQuilfier("test", "info", "name", "xiaoming")); //根据指定列名和列值查询1 System.out.println("6:"+queryForQuilfier("test", "info", "sex", "男")); //根据指定列名和列值查询1 System.out.println("7:"+queryForQuilfier("test", "info", "sex", "女")); //根据rowKey删除 deleteByRowKey("test", "0005", "info"); } /** * 单行添加 * @param tableName * @param rowKey * @param family * @param keyValue * @throws IOException */ private static void add(String tableName, String rowKey, String family, Map<String, String> keyValue) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); Put put = new Put(Bytes.toBytes(rowKey)); for (Entry<String, String> entry : keyValue.entrySet()) { put.addColumn(Bytes.toBytes(family), Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue())); } table.put(put); table.close(); keyValue.clear(); } private static void adds(String tableName, String rowFamilySeparator, Map<String, Map<String, String>> keyValues) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); List<Put> puts = new ArrayList<Put>(); for (Entry<String, Map<String, String>> entry : keyValues.entrySet()) { String key = entry.getKey(); if (null == rowFamilySeparator || rowFamilySeparator.isEmpty()) { rowFamilySeparator = "_"; } String rowKey = key.split(rowFamilySeparator)[0]; String family = key.split(rowFamilySeparator)[1]; Map<String, String> keyValue = entry.getValue(); Put put = new Put(Bytes.toBytes(rowKey), System.currentTimeMillis()); for (Entry<String, String> entry2 : keyValue.entrySet()) { put.addColumn(Bytes.toBytes(family), Bytes.toBytes(entry2.getKey()), Bytes.toBytes(entry2.getValue())); } puts.add(put); } 
table.put(puts); table.close(); keyValues.clear(); } /** * 单行删除 * @param tableName * @param rowKey * @param family * @throws IOException */ private static void deleteByRowKey(String tableName, String rowKey, String family) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); Delete delete = new Delete(Bytes.toBytes(rowKey)); table.delete(delete); table.close(); } /** * 查询所有 * @param tableName * @param family * @return * @throws IOException */ public static List<Map<String, String>> queryForScan(String tableName, String family) throws IOException { List<Map<String, String>> rows = new ArrayList<Map<String, String>>(); Table table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); scan.addFamily(Bytes.toBytes(family)); ResultScanner rs = table.getScanner(scan); Map<String, String> row = null; try { for (Result r = rs.next(); r != null; r = rs.next()) { Cell[] cells = r.rawCells(); for (Cell cell : cells) { row = new HashMap<String, String>(); row.put("timestamp", cell.getTimestamp() + ""); row.put("rowKey", new String(CellUtil.cloneRow(cell))); row.put("family", new String(CellUtil.cloneFamily(cell))); row.put(new String(CellUtil.cloneQualifier(cell)), new String(CellUtil.cloneValue(cell))); rows.add(row); } } } finally { rs.close(); } return rows; } /** * 根据时间范围 * @param tableName * @param family * @param minStamp * @param maxStamp * @return * @throws IOException */ public static List<Map<String, String>> queryForTimeRange(String tableName, String family, long minStamp, long maxStamp) throws IOException { List<Map<String, String>> rows = new ArrayList<Map<String, String>>(); Table table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); scan.addFamily(Bytes.toBytes(family)); scan.setTimeRange(minStamp, maxStamp); ResultScanner rs = table.getScanner(scan); Map<String, String> row = null; try { for (Result r = rs.next(); r != null; r = rs.next()) { Cell[] cells = r.rawCells(); for 
(Cell cell : cells) { row = new HashMap<String, String>(); row.put("timestamp", cell.getTimestamp() + ""); row.put("rowKey", new String(CellUtil.cloneRow(cell))); row.put("family", new String(CellUtil.cloneFamily(cell))); row.put(new String(CellUtil.cloneQualifier(cell)), new String(CellUtil.cloneValue(cell))); rows.add(row); } } } finally { rs.close(); } return rows; } /** * 根据RowKey查询 * @param tableName * @param rowKey * @param family * @return * @throws IOException */ public static Map<String, String> queryForRowKey(String tableName, String rowKey, String family) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); get.addFamily(Bytes.toBytes(family)); Scan scan = new Scan(get); ResultScanner rs = table.getScanner(scan); Map<String, String> row = null; try { for (Result r = rs.next(); r != null; r = rs.next()) { Cell[] cells = r.rawCells(); row = new HashMap<String, String>(); for (Cell cell : cells) { row.put("timestamp", cell.getTimestamp() + ""); row.put("rowKey", new String(CellUtil.cloneRow(cell))); row.put("family", new String(CellUtil.cloneFamily(cell))); row.put(new String(CellUtil.cloneQualifier(cell)), new String(CellUtil.cloneValue(cell))); } } } finally { rs.close(); } return row; } /** * 根据RowKey范围查询 * @param tableName * @param family * @param startRow * @param stopRow * @return * @throws IOException */ public static List<Map<String, String>> queryForRowKeyRange(String tableName, String family, String startRow, String stopRow) throws IOException { List<Map<String, String>> rows = new ArrayList<Map<String, String>>(); Table table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); scan.addFamily(Bytes.toBytes(family)); scan.setStartRow(Bytes.toBytes(startRow)); scan.setStopRow(Bytes.toBytes(stopRow)); ResultScanner rs = table.getScanner(scan); Map<String, String> row = null; try { for (Result r = rs.next(); r != null; r = rs.next()) { Cell[] cells = 
r.rawCells(); for (Cell cell : cells) { row = new HashMap<String, String>(); row.put("timestamp", cell.getTimestamp() + ""); row.put("rowKey", new String(CellUtil.cloneRow(cell))); row.put("family", new String(CellUtil.cloneFamily(cell))); row.put(new String(CellUtil.cloneQualifier(cell)), new String(CellUtil.cloneValue(cell))); rows.add(row); } } } finally { rs.close(); } return rows; } /** * 根据指定列名匹配列值 * @param tableName * @param family * @param qualifier * @param value * @return * @throws IOException */ public static Collection<Map<String, String>> queryForQuilfier(String tableName, String family, String qualifier, String value) throws IOException { Map<String, Map<String, String>> rows = new HashMap<String, Map<String, String>>(); Table table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(qualifier), CompareOp.EQUAL, Bytes.toBytes(value)); filter.setFilterIfMissing(true); scan.setFilter(filter); ResultScanner rs = table.getScanner(scan); Map<String, String> row = null; try { for (Result r = rs.next(); r != null; r = rs.next()) { Cell[] cells = r.rawCells(); for (Cell cell : cells) { String rowKey = new String(CellUtil.cloneRow(cell)); if (null == row || !rows.containsKey(rowKey)) { row = new HashMap<String, String>(); } row.put("timestamp", cell.getTimestamp() + ""); row.put("rowKey", rowKey); row.put("family", new String(CellUtil.cloneFamily(cell))); row.put(new String(CellUtil.cloneQualifier(cell)), new String(CellUtil.cloneValue(cell))); rows.put(rowKey,row); } } } finally { rs.close(); } return rows.values(); } }