zoukankan      html  css  js  c++  java
  • HBase表操作

    相对于0.9x版本(如0.94、0.98),HBase 1.x版本对客户端API的改动比较大,例如连接相关类库的变更,如下:

      连接获取:org.apache.hadoop.hbase.client.HConnectionManager.createConnection(conf)已经过时,改为使用org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(conf);

      表操作:org.apache.hadoop.hbase.client.HTable已过时,改为通过Connection.getTable(TableName)获取并使用org.apache.hadoop.hbase.client.Table接口;

      

    第一部分:连接获取
    package com.mengyao.bigdata.hbase;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    
    /**
     * Builds HBase {@link Connection} instances from either a caller-supplied
     * {@link Configuration}, an explicit ZooKeeper quorum / client port, or both.
     *
     * <p>When a configuration was supplied, the explicit quorum / port are only used to fill in
     * settings that the configuration does not already define. When no configuration was supplied,
     * a fresh one is created on every {@link #getConnection()} call and the explicit values (if any)
     * are applied on top of it.
     *
     * @author mengyao
     */
    public class HBaseHelper {

        private static final String QUORUM = "hbase.zookeeper.quorum";
        private static final String ZK_PORT = "hbase.zookeeper.property.clientPort";
        private String quorum;
        private String zkPort;
        private final Configuration conf;

        /**
         * @param conf configuration to connect with; it is used as-is
         */
        public HBaseHelper(Configuration conf) {
            this(conf, null, null);
        }

        /**
         * @param quorum ZooKeeper quorum, written to {@code hbase.zookeeper.quorum}
         * @param zkPort ZooKeeper client port, written to {@code hbase.zookeeper.property.clientPort}
         */
        public HBaseHelper(String quorum, String zkPort) {
            this(null, quorum, zkPort);
        }

        /**
         * @param conf   configuration to connect with; may be {@code null}
         * @param quorum ZooKeeper quorum used when {@code conf} does not define one
         * @param zkPort ZooKeeper client port used when {@code conf} does not define one
         */
        public HBaseHelper(Configuration conf, String quorum, String zkPort) {
            this.conf = conf;
            this.quorum = quorum;
            this.zkPort = zkPort;
        }

        /**
         * Opens a new connection. The caller owns the returned connection and must close it.
         *
         * <p>If a configuration was supplied at construction time it is used (and, as before, any
         * missing quorum / port entry is written into it). Otherwise a new configuration is created
         * for this call, so values changed through {@link #setQuorum(String)} /
         * {@link #setZkPort(String)} are honoured by subsequent calls.
         *
         * @return a newly created connection
         * @throws IOException if the connection cannot be created
         */
        public Connection getConnection() throws IOException {
            Configuration effective;
            if (null == conf) {
                effective = HBaseConfiguration.create();
                // Explicit values take precedence over whatever the freshly created configuration holds.
                setIfGiven(effective, QUORUM, quorum);
                setIfGiven(effective, ZK_PORT, zkPort);
            } else {
                effective = conf;
                // A supplied configuration wins; explicit values only fill the gaps.
                if (isEmpty(effective.get(QUORUM))) {
                    setIfGiven(effective, QUORUM, quorum);
                }
                if (isEmpty(effective.get(ZK_PORT))) {
                    setIfGiven(effective, ZK_PORT, zkPort);
                }
            }
            return ConnectionFactory.createConnection(effective);
        }

        /**
         * Closes the given connection if it is still open. A {@code null} connection is ignored.
         *
         * @param connection connection to close; may be {@code null}
         * @throws IOException if closing fails
         */
        public void closeAll(Connection connection) throws IOException {
            if (null != connection && !connection.isClosed()) {
                connection.close();
            }
        }

        public String getQuorum() {
            return quorum;
        }

        public void setQuorum(String quorum) {
            this.quorum = quorum;
        }

        public String getZkPort() {
            return zkPort;
        }

        public void setZkPort(String zkPort) {
            this.zkPort = zkPort;
        }

        /** Writes {@code value} under {@code key} unless the value is null or empty (Configuration.set rejects null values). */
        private static void setIfGiven(Configuration target, String key, String value) {
            if (!isEmpty(value)) {
                target.set(key, value);
            }
        }

        /** Returns true when the string is null or has zero length. */
        private static boolean isEmpty(String value) {
            return null == value || value.isEmpty();
        }

    }
    
    
    
    第二部分:表数据增删改查
    package com.mengyao.bigdata.hbase;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.log4j.Logger;
    
    /**
     * Demonstrates basic create / read / delete operations against HBase using the 1.x client API
     * ({@link Connection} and {@link Table}). Written against HBase version 1.0.1.1.
     *
     * <p>All operations share a single static {@link Connection} that is opened when the class is
     * initialized. Each operation obtains its own {@link Table} handle and closes it before returning.
     *
     * <p>Query results are returned as string maps. Every map contains the entries
     * {@code "timestamp"}, {@code "rowKey"} and {@code "family"} plus one entry per column
     * (qualifier -&gt; value). Because the column entry is written last, a column whose qualifier is
     * literally {@code timestamp}, {@code rowKey} or {@code family} overwrites that metadata entry.
     *
     * @author mengyao
     */
    public class HBaseV1011Template {

        // Currently not used by any method in this class.
        Logger logger = Logger.getLogger(getClass());

        private static final Connection connection;
        static {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "bdata200,bdata202,bdata203");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            HBaseHelper hBaseHelper = new HBaseHelper(conf);
            try {
                connection = hBaseHelper.getConnection();
            } catch (IOException e) {
                // Fail class initialization with the real cause instead of leaving the
                // connection null and hitting a NullPointerException on first use.
                throw new ExceptionInInitializerError(e);
            }
        }

        /**
         * Runs every operation once against the table {@code test}, then closes the shared connection.
         *
         * @param args unused
         * @throws IOException if any HBase operation fails
         */
        public static void main(String[] args) throws IOException {
            try {
                // Add a single row
                Map<String, String> keyValue = new HashMap<>();
                keyValue.put("name", "xiaogang");
                keyValue.put("sex", "男");
                keyValue.put("addr", "北京");
                add("test", "0005", "info", keyValue);

                // Add several rows at once; each key is "<rowKey>_<family>"
                Map<String, Map<String, String>> keyValues = new HashMap<String, Map<String, String>>();
                Map<String, String> keyValue1 = new HashMap<>();
                keyValue1.put("name", "ligang");
                keyValue1.put("sex", "男");
                keyValue1.put("addr", "北京");
                keyValues.put("0008_info", keyValue1);
                Map<String, String> keyValue2 = new HashMap<>();
                keyValue2.put("name", "zhaojun");
                keyValue2.put("sex", "男");
                keyValue2.put("addr", "北京");
                keyValues.put("0009_info", keyValue2);
                adds("test", "_", keyValues);

                // Query everything in a column family
                System.out.println("1:"+queryForScan("test", "info"));

                // Query by row key
                System.out.println("2:"+queryForRowKey("test", "0005", "info"));

                // Query by timestamp range (lower bound inclusive, upper bound exclusive)
                System.out.println("3:"+queryForTimeRange("test", "info", 1492510703521L, 1492664183955L));

                // Query by row key range (start row inclusive, stop row exclusive)
                System.out.println("4:"+queryForRowKeyRange("test", "info", "0001", "0003"));

                // Query by column name and column value
                System.out.println("5:"+queryForQuilfier("test", "info", "name", "xiaoming"));

                // Query by column name and column value
                System.out.println("6:"+queryForQuilfier("test", "info", "sex", "男"));

                // Query by column name and column value
                System.out.println("7:"+queryForQuilfier("test", "info", "sex", "女"));

                // Delete by row key
                deleteByRowKey("test", "0005", "info");
            } finally {
                // The demo is finished; release the shared connection.
                connection.close();
            }
        }

        /**
         * Writes one row.
         *
         * @param tableName table to write to
         * @param rowKey    row key of the row
         * @param family    column family that receives every column
         * @param keyValue  qualifier -&gt; value pairs to store; not modified by this method
         * @throws IOException if the write fails
         */
        private static void add(String tableName, String rowKey, String family, Map<String, String> keyValue) throws IOException {
            byte[] familyBytes = Bytes.toBytes(family);
            Put put = new Put(Bytes.toBytes(rowKey));
            for (Entry<String, String> column : keyValue.entrySet()) {
                put.addColumn(familyBytes, Bytes.toBytes(column.getKey()), Bytes.toBytes(column.getValue()));
            }
            try (Table table = connection.getTable(TableName.valueOf(tableName))) {
                table.put(put);
            }
        }

        /**
         * Writes several rows in one batch.
         *
         * @param tableName          table to write to
         * @param rowFamilySeparator literal separator between row key and family in each map key;
         *                           {@code "_"} is used when null or empty
         * @param keyValues          map from {@code <rowKey><separator><family>} to the
         *                           qualifier -&gt; value pairs of that row; not modified by this method
         * @throws IOException              if the write fails
         * @throws IllegalArgumentException if a key does not contain both a row key and a family
         */
        private static void adds(String tableName, String rowFamilySeparator, Map<String, Map<String, String>> keyValues) throws IOException {
            String separator = (null == rowFamilySeparator || rowFamilySeparator.isEmpty()) ? "_" : rowFamilySeparator;
            // String.split interprets its argument as a regex; quote it so separators such as "." or "|" work literally.
            String separatorRegex = java.util.regex.Pattern.quote(separator);
            List<Put> puts = new ArrayList<Put>();
            for (Entry<String, Map<String, String>> rowEntry : keyValues.entrySet()) {
                String[] parts = rowEntry.getKey().split(separatorRegex);
                if (parts.length < 2) {
                    throw new IllegalArgumentException(
                            "Expected key of the form <rowKey>" + separator + "<family> but got: " + rowEntry.getKey());
                }
                byte[] familyBytes = Bytes.toBytes(parts[1]);
                Put put = new Put(Bytes.toBytes(parts[0]), System.currentTimeMillis());
                for (Entry<String, String> column : rowEntry.getValue().entrySet()) {
                    put.addColumn(familyBytes, Bytes.toBytes(column.getKey()), Bytes.toBytes(column.getValue()));
                }
                puts.add(put);
            }
            try (Table table = connection.getTable(TableName.valueOf(tableName))) {
                table.put(puts);
            }
        }

        /**
         * Deletes one row.
         *
         * <p>NOTE(review): {@code family} is accepted but not applied, so the delete is issued for
         * the row key alone rather than being restricted to one column family. Confirm whether a
         * family-scoped delete was intended.
         *
         * @param tableName table to delete from
         * @param rowKey    row key of the row to delete
         * @param family    currently ignored
         * @throws IOException if the delete fails
         */
        private static void deleteByRowKey(String tableName, String rowKey, String family) throws IOException {
            try (Table table = connection.getTable(TableName.valueOf(tableName))) {
                table.delete(new Delete(Bytes.toBytes(rowKey)));
            }
        }

        /**
         * Scans a whole column family.
         *
         * @param tableName table to scan
         * @param family    column family to read
         * @return one map per cell (not per row), in scan order
         * @throws IOException if the scan fails
         */
        public static List<Map<String, String>> queryForScan(String tableName, String family) throws IOException {
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes(family));
            return scanCells(tableName, scan);
        }

        /**
         * Scans a column family restricted to a timestamp range.
         *
         * @param tableName table to scan
         * @param family    column family to read
         * @param minStamp  lower timestamp bound, inclusive
         * @param maxStamp  upper timestamp bound, exclusive
         * @return one map per cell (not per row), in scan order
         * @throws IOException if the scan fails
         */
        public static List<Map<String, String>> queryForTimeRange(String tableName, String family, long minStamp, long maxStamp) throws IOException {
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes(family));
            scan.setTimeRange(minStamp, maxStamp);
            return scanCells(tableName, scan);
        }

        /**
         * Reads one row of a column family.
         *
         * @param tableName table to read from
         * @param rowKey    row key to look up
         * @param family    column family to read
         * @return a single map holding every column of the row, or {@code null} when the row has no
         *         cells in that family; {@code "timestamp"} holds the timestamp of the last cell read
         * @throws IOException if the read fails
         */
        public static Map<String, String> queryForRowKey(String tableName, String rowKey, String family) throws IOException {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addFamily(Bytes.toBytes(family));
            try (Table table = connection.getTable(TableName.valueOf(tableName))) {
                // A point lookup does not need a scanner.
                Result result = table.get(get);
                if (result.isEmpty()) {
                    return null;
                }
                return toRowMap(result);
            }
        }

        /**
         * Scans a column family restricted to a row key range.
         *
         * @param tableName table to scan
         * @param family    column family to read
         * @param startRow  first row key, inclusive
         * @param stopRow   last row key, exclusive
         * @return one map per cell (not per row), in scan order
         * @throws IOException if the scan fails
         */
        public static List<Map<String, String>> queryForRowKeyRange(String tableName, String family, String startRow, String stopRow) throws IOException {
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes(family));
            scan.setStartRow(Bytes.toBytes(startRow));
            scan.setStopRow(Bytes.toBytes(stopRow));
            return scanCells(tableName, scan);
        }

        /**
         * Finds the rows whose given column equals the given value. Rows that do not have the column
         * at all are excluded.
         *
         * @param tableName table to scan
         * @param family    column family of the column to test
         * @param qualifier qualifier of the column to test
         * @param value     value the column must equal
         * @return one map per matching row, in scan order; each map holds every returned column of
         *         that row (the scan is not restricted to {@code family})
         * @throws IOException if the scan fails
         */
        public static Collection<Map<String, String>> queryForQuilfier(String tableName, String family, String qualifier, String value) throws IOException {
            SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(qualifier), CompareOp.EQUAL, Bytes.toBytes(value));
            filter.setFilterIfMissing(true);
            Scan scan = new Scan();
            scan.setFilter(filter);

            List<Map<String, String>> rows = new ArrayList<Map<String, String>>();
            try (Table table = connection.getTable(TableName.valueOf(tableName));
                    ResultScanner scanner = table.getScanner(scan)) {
                // With this scan's default settings each Result is taken to be one complete row.
                for (Result result = scanner.next(); result != null; result = scanner.next()) {
                    rows.add(toRowMap(result));
                }
            }
            return rows;
        }

        /** Runs the scan and returns one map per cell, closing the scanner and the table handle afterwards. */
        private static List<Map<String, String>> scanCells(String tableName, Scan scan) throws IOException {
            List<Map<String, String>> cellMaps = new ArrayList<Map<String, String>>();
            try (Table table = connection.getTable(TableName.valueOf(tableName));
                    ResultScanner scanner = table.getScanner(scan)) {
                for (Result result = scanner.next(); result != null; result = scanner.next()) {
                    for (Cell cell : result.rawCells()) {
                        Map<String, String> cellMap = new HashMap<String, String>();
                        putCell(cellMap, cell);
                        cellMaps.add(cellMap);
                    }
                }
            }
            return cellMaps;
        }

        /** Folds every cell of a non-empty result into a single map. */
        private static Map<String, String> toRowMap(Result result) {
            Map<String, String> row = new HashMap<String, String>();
            for (Cell cell : result.rawCells()) {
                putCell(row, cell);
            }
            return row;
        }

        /**
         * Copies one cell into the map. Bytes.toString decodes UTF-8, matching the Bytes.toBytes
         * encoding used on write, so the result does not depend on the platform default charset.
         */
        private static void putCell(Map<String, String> target, Cell cell) {
            target.put("timestamp", cell.getTimestamp() + "");
            target.put("rowKey", Bytes.toString(CellUtil.cloneRow(cell)));
            target.put("family", Bytes.toString(CellUtil.cloneFamily(cell)));
            target.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
        }

    }
  • 相关阅读:
    java socket HTTPClient GET
    Spring的JdbcTemplate使用update或insert操作的三种使用例子
    windows XP 解决QQ和360软件冲突的办法
    J2EE 应用服务器集群常用方法
    javaeye被关闭了 被政 府和谐感
    P6SPY(JDBC SQL拦截)的安装和使用
    让QQ与360并存,不卸载360正常使用QQ
    Spring2.5注解(标注)学习笔记(使用annotation代替XML)
    centos vnc配置
    HTTP协议内容具体含义
  • 原文地址:https://www.cnblogs.com/mengyao/p/6742476.html
Copyright © 2011-2022 走看看