  • HBase: Introduction and Operations

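    1. HBase Overview

    1.1 What HBase Is

    • HBase is a distributed, column-oriented database built on top of HDFS. It stores data as key-value pairs and does not natively support standard SQL. It is an open-source Apache project and scales horizontally.
    • HBase provides fast random access to large volumes of structured data and relies on the fault tolerance of the Hadoop file system (HDFS).
    • Unlike a typical relational database, HBase is also suited to storing unstructured data, and its storage model is column-based rather than row-based.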

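    1.2 HBase Data Units

    RowKey: a byte array that acts as the "primary key" of each record in a table. Rows are sorted by RowKey in lexicographic (byte) order, which makes lookups fast, so RowKey design is very important.
    Column Family: has a name (a string) and contains one or more related columns.
    Column: belongs to a column family and is addressed as familyName:columnName. Columns can be added dynamically per record.
    Version Number: a Long. It defaults to the system timestamp and can also be set by the user. It distinguishes different versions of the same cell.
    Value (Cell): a byte array.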
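    The data units above can be pictured as nested sorted maps: row key, then column, then version, then value. The sketch below is only an illustrative model in plain Java, not the HBase client API. The class name CellModelSketch is hypothetical, and Strings stand in for byte arrays under the assumption that all keys are ASCII (so String ordering matches byte ordering).

```java
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative model only; this is not the HBase client API.
class CellModelSketch {
    // row key -> "family:qualifier" -> version (timestamp, newest first) -> value
    private final NavigableMap<String, NavigableMap<String, NavigableMap<Long, String>>> rows =
            new TreeMap<String, NavigableMap<String, NavigableMap<Long, String>>>();

    // Stores a value as one more version of the cell; older versions are kept.
    void put(String rowKey, String column, long timestamp, String value) {
        rows.computeIfAbsent(rowKey, k -> new TreeMap<String, NavigableMap<Long, String>>())
            .computeIfAbsent(column, k -> new TreeMap<Long, String>(Comparator.<Long>reverseOrder()))
            .put(timestamp, value);
    }

    // Returns the newest version of a cell, or null if the cell does not exist.
    String get(String rowKey, String column) {
        NavigableMap<String, NavigableMap<Long, String>> row = rows.get(rowKey);
        if (row == null) {
            return null;
        }
        NavigableMap<Long, String> versions = row.get(column);
        return versions == null ? null : versions.firstEntry().getValue();
    }

    // Rows are kept sorted by key, so the "first" row is the lexicographically smallest.
    String firstRowKey() {
        return rows.firstKey();
    }

    public static void main(String[] args) {
        CellModelSketch table = new CellModelSketch();
        table.put("row2", "c_f1:name", 1L, "111");
        table.put("row10", "c_f1:name", 1L, "aaa");
        table.put("row2", "c_f1:name", 2L, "222");
        System.out.println(table.firstRowKey());            // row10 ('1' sorts before '2')
        System.out.println(table.get("row2", "c_f1:name")); // 222 (newest version)
    }
}
```

    The first printed line shows why RowKey design matters: "row10" sorts before "row2" because comparison is character by character, so numeric parts of a key are usually zero-padded.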

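    2. HBase Architecture

    2.1 Architecture Components

    HBase clusters use a Master/Slave architecture and belong to the Hadoop ecosystem. A cluster consists of HMaster nodes, HRegionServer nodes, and a ZooKeeper ensemble. At the bottom layer the data is stored in HDFS, so the HDFS NameNode and DataNodes are involved as well.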

    Components:
    Client

    • Uses the HBase RPC mechanism to communicate with HMaster and HRegionServer;
    • Talks to HMaster for administrative operations;
    • Talks to HRegionServer for data reads and writes.

    HMaster

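    • HMaster is not a single point of failure: several HMasters can be started, and ZooKeeper ensures that one of them is always active.

    HMaster is mainly responsible for managing tables and regions:

    • Handles users' administrative operations on tables (create, delete, alter, describe);
    • Balances load across HRegionServers and adjusts the distribution of regions;
    • Assigns the new regions after a region split;
    • Reassigns the regions of a failed HRegionServer after it goes down.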

    HRegionServer
    The core module of HBase:

    • Maintains regions and serves the I/O requests for them;
    • Splits regions that grow too large at runtime.

    HRegion

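    • HBase splits a table horizontally by RowKey into multiple HRegions. Each HRegion records its StartKey and EndKey (the first HRegion has an empty StartKey and the last has an empty EndKey). Because RowKeys are sorted, the HRegion that holds a given RowKey can be found quickly. In current HBase versions the client performs this lookup through the hbase:meta table, whose location it obtains from ZooKeeper, rather than by asking HMaster on every request.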
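    Because regions are contiguous key ranges, finding the region for a RowKey amounts to finding the greatest StartKey that is less than or equal to it. The sketch below illustrates that idea with a sorted map. It is a simplified model, not HBase's actual lookup code; the class RegionLookupSketch and the region names are hypothetical, and Strings stand in for byte-array keys.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative model only; this is not the HBase client API.
class RegionLookupSketch {
    // Region start key -> region name. The first region has the empty start key "".
    private final TreeMap<String, String> regionsByStartKey = new TreeMap<>();

    void addRegion(String startKey, String regionName) {
        regionsByStartKey.put(startKey, regionName);
    }

    // A row belongs to the region with the greatest start key <= rowKey.
    String locate(String rowKey) {
        Map.Entry<String, String> entry = regionsByStartKey.floorEntry(rowKey);
        if (entry == null) {
            throw new IllegalStateException("no region covers row key " + rowKey);
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        RegionLookupSketch table = new RegionLookupSketch();
        table.addRegion("", "region-1");      // covers ["", "id03")
        table.addRegion("id03", "region-2");  // covers ["id03", "id06")
        table.addRegion("id06", "region-3");  // covers ["id06", end of table)
        System.out.println(table.locate("id01")); // region-1
        System.out.println(table.locate("id03")); // region-2
        System.out.println(table.locate("id99")); // region-3
    }
}
```

    The empty StartKey of the first region sorts before every other key, which is why every RowKey maps to some region in this model.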

    2.2 Viewing the HBase Web UI

    http://master:16010/

    3. HBase Shell Operations

    3.1 DDL Operations

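    # Start the HBase shell
    hbase shell
    # Show the cluster status
    status
    # Show the HBase version
    version
    # Create a namespace
    create_namespace 'namespace_name'
    # List all namespaces
    list_namespace
    # Drop a namespace. The namespace must not contain any tables, otherwise the command fails
    drop_namespace 'namespace_name'
    # Create a table in the default namespace
    create 'table_name', 'family1', 'family2', 'familyN'
    # Create a table in a specific namespace
    create 'namespace_name:table_name', 'family1', 'family2', 'familyN'
    # List all tables
    list
    # Describe a table
    describe 'table_name'
    # Delete column family family1 from the table named 'table'
    alter 'table', {NAME => 'family1', METHOD => 'delete'}
    # Delete several column families
    alter 'table', {NAME => 'family1', METHOD => 'delete'}, {NAME => 'family2', METHOD => 'delete'}
    # Disable the table first
    disable 'table_name'
    # Then drop it
    drop 'table_name'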

    3.2 DML Operations

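    # Insert data
    # Syntax: put <table>, <rowkey>, <family:column>, <value>, [<timestamp>]
    # If the timestamp is omitted, the system default (current time) is used
    put 'table', 'id01', 'c_f1:name', '111'

    # Read data
    # get returns a single row; it cannot scan the whole table
    # Syntax: get <table>, <rowkey>, [<family:column>, ...]
    get 'table', 'id01'

    # Update data
    # There is no separate update command: put again. The new value is written as a newer version and replaces the old one in reads
    put 'table', 'id01', 'c_f1:name', '222'

    # Scan
    # Syntax: scan <table>, {COLUMNS => [<family:column>, ...], LIMIT => num}
    # Scan the whole table (avoid this on large tables)
    scan 'table'
    # Return the first two rows
    scan 'table', {LIMIT => 2}
    # Scan a specific column family
    scan 'table', {COLUMNS => 'c_f1'}
    # Scan a specific column within a column family
    scan 'table', {COLUMNS => 'c_f2:cert_no'}
    # Scan for cells whose value equals 222 (the comparator takes the form type:value, e.g. binary:222)
    scan 'table', {FILTER => "ValueFilter(=, 'binary:222')"}
    # Select rows by rowkey range [STARTROW, STOPROW)
    scan 'table', {STARTROW => 'id01', STOPROW => 'id03'}

    # Delete one column of a row
    # Syntax: delete <table>, <rowkey>, <family:column>
    # The column name is required
    # All versions of the specified column are deleted
    delete 'table', 'id04', 'c_f2:name'

    # Delete a whole row
    # Syntax: deleteall <table>, <rowkey>
    deleteall 'table', 'id05'

    # Remove all data from a table
    # Syntax: truncate <table>
    truncate 'table'

    # Count the rows in a table
    # Syntax: count <table>, {INTERVAL => intervalNum, CACHE => cacheNum}
    # INTERVAL: print the running count and the current rowkey every N rows (default 1000)
    # CACHE: number of rows fetched per round trip (default 10); raising it can speed up the count
    # Count the rows of the table
    count 'table'
    # Count, printing progress every 2 rows
    count 'table', {INTERVAL => 2}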
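    The [STARTROW, STOPROW) range and LIMIT used by scan behave like a half-open slice of a sorted map: the start row is included and the stop row is excluded. The following is a minimal sketch of that semantics in plain Java, not HBase code; the class ScanRangeSketch and its scan helper are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative model only; this is not the HBase client API.
class ScanRangeSketch {
    // Returns at most `limit` row keys from the half-open range [startRow, stopRow).
    static List<String> scan(NavigableMap<String, String> rows, String startRow, String stopRow, int limit) {
        List<String> keys = new ArrayList<>();
        for (String key : rows.subMap(startRow, true, stopRow, false).keySet()) {
            if (keys.size() == limit) {
                break;
            }
            keys.add(key);
        }
        return keys;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> rows = new TreeMap<>();
        rows.put("id01", "111");
        rows.put("id02", "222");
        rows.put("id03", "333");
        rows.put("id04", "444");
        System.out.println(scan(rows, "id01", "id03", 10)); // [id01, id02]
        System.out.println(scan(rows, "id01", "id05", 3));  // [id01, id02, id03]
    }
}
```

    Note that the row 'id03' is not returned by the first call, matching the shell example where STOPROW is exclusive.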

    4. Integrating HBase with Spring Boot

    4.1 pom

            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>1.3.6</version>
            </dependency>

    4.2 application.properties

    hbase.conf.confMaps.'hbase.zookeeper.quorum'=master,slave1,slave2

    4.3 HbaseConfig custom configuration class

    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.Map;
    
    /**
     * HBase client configuration
     *
     * @author suyuan
     */
    @Configuration
    @ConfigurationProperties(prefix = HbaseConfig.CONF_PREFIX)
    public class HbaseConfig {
    
        public static final String CONF_PREFIX = "hbase.conf";
    
        private Map<String,String> confMaps;
    
        public Map<String, String> getconfMaps() {
            return confMaps;
        }
        public void setconfMaps(Map<String, String> confMaps) {
            this.confMaps = confMaps;
        }
    }
    

    4.4 HBaseUtils utility class

    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.stereotype.Component;
    
    /**
     * Holds the Spring ApplicationContext so that beans can be fetched from the container through static methods
     */
    @Component
    public class SpringContextHolder implements ApplicationContextAware {
    
        private static ApplicationContext applicationContext;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            SpringContextHolder.applicationContext = applicationContext;
        }
    
        public static ApplicationContext getApplicationContext() {
            assertApplicationContext();
            return applicationContext;
        }
    
        @SuppressWarnings("unchecked")
        public static <T> T getBean(String beanName) {
            assertApplicationContext();
            return (T) applicationContext.getBean(beanName);
        }
    
        public static <T> T getBean(Class<T> requiredType) {
            assertApplicationContext();
            return applicationContext.getBean(requiredType);
        }
    
        private static void assertApplicationContext() {
            if (SpringContextHolder.applicationContext == null) {
                throw new RuntimeException("applicationContext is null; check that SpringContextHolder is registered as a bean!");
            }
        }
    
    }
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
    import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
    import org.apache.hadoop.hbase.filter.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.DependsOn;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StopWatch;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @DependsOn("springContextHolder")        // Controls initialization order so that the SpringContextHolder bean is created first
    @Component
    public class HBaseUtils {
    
        private Logger logger = LoggerFactory.getLogger(this.getClass());
    
        // Fetch the HbaseConfig bean manually from the Spring context
        private static HbaseConfig hbaseConfig = SpringContextHolder.getBean("hbaseConfig");
    
        private static Configuration conf = HBaseConfiguration.create();
        private static ExecutorService pool = Executors.newScheduledThreadPool(20);    // Thread pool the Connection uses for batch operations (not a connection pool)
        private static Connection connection = null;
        private static HBaseUtils instance = null;
        private static Admin admin = null;
    
        private HBaseUtils(){
            if(connection == null){
                try {
                    // Copy the settings defined in HbaseConfig into the Configuration used to create the connection
                    Map<String, String> confMap = hbaseConfig.getconfMaps();
                    for (Map.Entry<String,String> confEntry : confMap.entrySet()) {
                        conf.set(confEntry.getKey(), confEntry.getValue());
                    }
                    connection = ConnectionFactory.createConnection(conf, pool);
                    admin = connection.getAdmin();
                } catch (IOException e) {
                    logger.error("Failed to initialize HBaseUtils: " + e.getMessage(), e);
                }
            }
        }
    
        // Simple singleton accessor; not needed when the bean is injected with @Autowired
        public static synchronized HBaseUtils getInstance(){
            if(instance == null){
                instance = new HBaseUtils();
            }
            return instance;
        }
    
    
        /**
         * Create a table
         *
         * @param tableName         table name
         * @param columnFamily      column families (array)
         */
        public void createTable(String tableName, String[] columnFamily) throws IOException{
            TableName name = TableName.valueOf(tableName);
            // If the table already exists, report it and leave the existing table and its data untouched
            if (admin.tableExists(name)) {
                logger.error("create htable error! this table {} already exists!", name);
            } else {
                HTableDescriptor desc = new HTableDescriptor(name);
                for (String cf : columnFamily) {
                    desc.addFamily(new HColumnDescriptor(cf));
                }
                admin.createTable(desc);
            }
        }
    
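        /**
         * Insert a record (one row, one column family, several columns and values)
         *
         * @param tableName         table name
         * @param row               row key
         * @param columnFamilys     column family name
         * @param columns           column names (array)
         * @param values            values (array), one per column in the same order
         */
        public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values) throws IOException {
            TableName name = TableName.valueOf(tableName);
            // Table instances are lightweight: obtain one per call and close it afterwards
            try (Table table = connection.getTable(name)) {
                Put put = new Put(Bytes.toBytes(row));
                for (int i = 0; i < columns.length; i++) {
                    put.addColumn(Bytes.toBytes(columnFamilys), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
                }
                // Send all columns of the row in a single put instead of once per column
                table.put(put);
            }
        }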
    
        /**
         * Insert a record (one row, one column family, one column, one value)
         *
         * @param tableName         table name
         * @param row               row key
         * @param columnFamily      column family name
         * @param column            column name
         * @param value             value
         */
        public void insertOneRecord(String tableName, String row, String columnFamily, String column, String value) throws IOException {
            TableName name = TableName.valueOf(tableName);
            try (Table table = connection.getTable(name)) {
                Put put = new Put(Bytes.toBytes(row));
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
                table.put(put);
            }
        }
    
        /**
         * Delete one row
         *
         * @param tablename         table name
         * @param rowkey            row key
         */
        public void deleteRow(String tablename, String rowkey) throws IOException {
            TableName name = TableName.valueOf(tablename);
            try (Table table = connection.getTable(name)) {
                // Bytes.toBytes encodes as UTF-8, independent of the platform default charset
                Delete d = new Delete(Bytes.toBytes(rowkey));
                table.delete(d);
            }
        }
    
        /**
         * Delete one column family of a row
         *
         * @param tablename         table name
         * @param rowkey            row key
         * @param columnFamily      column family name
         */
        public void deleteColumnFamily(String tablename, String rowkey, String columnFamily) throws IOException {
            TableName name = TableName.valueOf(tablename);
            try (Table table = connection.getTable(name)) {
                // addFamily replaces the deprecated deleteFamily
                Delete d = new Delete(Bytes.toBytes(rowkey)).addFamily(Bytes.toBytes(columnFamily));
                table.delete(d);
            }
        }
    
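        /**
         * Delete one column of one column family of a row
         *
         * @param tablename         table name
         * @param rowkey            row key
         * @param columnFamily      column family name
         * @param column            column name
         */
        public void deleteColumn(String tablename, String rowkey, String columnFamily, String column) throws IOException {
            TableName name = TableName.valueOf(tablename);
            try (Table table = connection.getTable(name)) {
                // addColumn (which replaces the deprecated deleteColumn) removes only the latest version;
                // use addColumns to remove every version of the column
                Delete d = new Delete(Bytes.toBytes(rowkey)).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
                table.delete(d);
            }
        }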
    
    
        /**
         * Look up one row
         *
         * @param tablename         table name
         * @param rowKey            row key
         */
        public static String selectRow(String tablename, String rowKey) throws IOException {
            StringBuilder record = new StringBuilder();
            TableName name = TableName.valueOf(tablename);
            try (Table table = connection.getTable(name)) {
                Get g = new Get(Bytes.toBytes(rowKey));
                Result rs = table.get(g);
                // One output line per cell: row key, family, qualifier, and value, separated by tabs
                if (!rs.isEmpty()) {
                    for (Cell cell : rs.rawCells()) {
                        record.append(Bytes.toString(CellUtil.cloneRow(cell))).append("\t")
                                .append(Bytes.toString(CellUtil.cloneFamily(cell))).append("\t")
                                .append(Bytes.toString(CellUtil.cloneQualifier(cell))).append("\t")
                                .append(Bytes.toString(CellUtil.cloneValue(cell))).append("\n");
                    }
                }
            }
            return record.toString();
        }
    
        /**
         * Look up the value of one column of one column family of a row
         *
         * @param tablename         table name
         * @param rowKey            row key
         * @param columnFamily      column family name
         * @param column            column name
         * @return the value as a String, or null if the cell does not exist
         */
        public static String selectValue(String tablename, String rowKey, String columnFamily, String column) throws IOException {
            TableName name = TableName.valueOf(tablename);
            try (Table table = connection.getTable(name)) {
                Get g = new Get(Bytes.toBytes(rowKey));
                g.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
                Result rs = table.get(g);
                return Bytes.toString(rs.value());
            }
        }
    
        /**
         * Read all rows of a table (using Scan)
         *
         * @param tablename         table name
         * @return one line per cell: row key, family, qualifier, and value, separated by tabs
         */
        public String scanAllRecord(String tablename) throws IOException {
            StringBuilder record = new StringBuilder();
            TableName name = TableName.valueOf(tablename);
            Scan scan = new Scan();
            // Both the table and the scanner hold resources and are closed automatically
            try (Table table = connection.getTable(name);
                 ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    for (Cell cell : result.rawCells()) {
                        record.append(Bytes.toString(CellUtil.cloneRow(cell))).append("\t")
                                .append(Bytes.toString(CellUtil.cloneFamily(cell))).append("\t")
                                .append(Bytes.toString(CellUtil.cloneQualifier(cell))).append("\t")
                                .append(Bytes.toString(CellUtil.cloneValue(cell))).append("\n");
                    }
                }
            }
    
            return record.toString();
        }
    
        /**
         * Query report records whose row key contains a keyword
         *
         * @param tablename         table name
         * @param rowKeyword        keyword to look for in the row key
         * @return the matching records
         */
        public List<Object> scanReportDataByRowKeyword(String tablename, String rowKeyword) throws IOException {
            ArrayList<Object> list = new ArrayList<>();
    
            Scan scan = new Scan();
    
            // Row-key filter that keeps rows whose key contains the keyword
            RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));
            scan.setFilter(rowFilter);
    
            try (Table table = connection.getTable(TableName.valueOf(tablename));
                 ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    // TODO: map each Result to a business object here
                    list.add(null);
                }
            }
    
            return list;
        }
    
        /**
         * Query report records by row-key keyword and timestamp range
         *
         * @param tablename         table name
         * @param rowKeyword        keyword to look for in the row key
         * @param minStamp          start of the time range (inclusive)
         * @param maxStamp          end of the time range (exclusive)
         * @return the matching records
         */
        public List<Object> scanReportDataByRowKeywordTimestamp(String tablename, String rowKeyword, Long minStamp, Long maxStamp) throws IOException {
            ArrayList<Object> list = new ArrayList<>();
    
            Scan scan = new Scan();
            // Restrict the scan to cells whose timestamp is in [minStamp, maxStamp)
            scan.setTimeRange(minStamp, maxStamp);
    
            RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));
            scan.setFilter(rowFilter);
    
            try (Table table = connection.getTable(TableName.valueOf(tablename));
                 ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    // TODO: map each Result to a business object here
                    list.add(null);
                }
            }
    
            return list;
        }
    
    
        /**
         * Delete a table
         *
         * @param tablename         table name
         */
        public void deleteTable(String tablename) throws IOException {
            TableName name = TableName.valueOf(tablename);
            // A table must be disabled before it can be deleted
            if (admin.tableExists(name)) {
                admin.disableTable(name);
                admin.deleteTable(name);
            }
        }
    
        /**
         * Count all rows of a table using a coprocessor
         *
         * Note: in HBase 1.x, AggregationClient is expected to come from the hbase-server
         * artifact rather than hbase-client, so an extra dependency may be needed.
         *
         * @param tablename         table name
         */
        public Long countRowsWithCoprocessor(String tablename) throws Throwable {
            TableName name = TableName.valueOf(tablename);
            HTableDescriptor descriptor = admin.getTableDescriptor(name);
    
            // Register the aggregation coprocessor on the table if it is not there yet
            String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
            if (! descriptor.hasCoprocessor(coprocessorClass)) {
                admin.disableTable(name);
                descriptor.addCoprocessor(coprocessorClass);
                admin.modifyTable(name, descriptor);
                admin.enableTable(name);
            }
    
            // Time the count
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
    
            Scan scan = new Scan();
            AggregationClient aggregationClient = new AggregationClient(conf);
    
            Long count = aggregationClient.rowCount(name, new LongColumnInterpreter(), scan);
    
            stopWatch.stop();
            System.out.println("RowCount:" + count +  ", full-table count took (ms): " + stopWatch.getTotalTimeMillis());
    
            return count;
        }
    
    }
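    The keyword queries above use RowFilter with SubstringComparator. Unless the scan is also given start and stop rows, such a filter is evaluated against every row key in the table, whereas a prefix match can exploit the sorted key order and touch only a contiguous range. The sketch below models that difference in plain Java. It is not HBase code: the class RowKeyMatchSketch is hypothetical, Strings stand in for ASCII byte-array keys, and the case-insensitive comparison reflects how SubstringComparator is documented to behave.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.NavigableSet;
import java.util.TreeSet;

// Illustrative model only; this is not the HBase client API.
class RowKeyMatchSketch {
    // Substring match: every row key has to be examined.
    static List<String> bySubstring(NavigableSet<String> rowKeys, String keyword) {
        String needle = keyword.toLowerCase(Locale.ROOT);
        List<String> hits = new ArrayList<>();
        for (String key : rowKeys) {
            if (key.toLowerCase(Locale.ROOT).contains(needle)) {
                hits.add(key);
            }
        }
        return hits;
    }

    // Prefix match: sorted keys allow jumping straight to [prefix, prefix + '\uffff').
    static List<String> byPrefix(NavigableSet<String> rowKeys, String prefix) {
        return new ArrayList<>(rowKeys.subSet(prefix, true, prefix + Character.MAX_VALUE, false));
    }

    public static void main(String[] args) {
        NavigableSet<String> rowKeys = new TreeSet<>();
        rowKeys.add("20210622_report");
        rowKeys.add("20210623_report");
        rowKeys.add("20210701_summary");
        System.out.println(bySubstring(rowKeys, "report")); // [20210622_report, 20210623_report]
        System.out.println(byPrefix(rowKeys, "202106"));    // [20210622_report, 20210623_report]
    }
}
```

    Both calls return the same rows here, but only the prefix version avoids walking the whole key set, which is one reason to put the most frequently queried field at the front of the RowKey.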
    
    

    4.5 Test cases

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @RequestMapping("/hbase")
    public class HbaseController {
    
        @Autowired
        private HBaseUtils hBaseUtils;
    
        /**
         * Create a table
         * @author suyuan
         * @date 2021/6/22 17:17
         */
        @RequestMapping("/createTable")
        public String createTable() throws Exception {
            // The "sy" namespace must already exist (create_namespace 'sy')
            String[] arr = {"name","age"};
            hBaseUtils.createTable("sy:syy",arr);
            return "ok";
        }
    
        /**
         * Delete a table
         * @author suyuan
         * @date 2021/6/22 17:18
         */
        @RequestMapping("/deleteTable")
        public String deleteTable() throws Exception {
            // Use the same namespace-qualified name that createTable used
            hBaseUtils.deleteTable("sy:syy");
            return "ok";
        }
    
        /**
         * Insert records (writing to a row and column that already exist acts as an update)
         * @author suyuan
         * @date 2021/6/22 17:18
         */
        @RequestMapping("/insertRecords")
        public String insertRecords() throws Exception {
            hBaseUtils.insertRecords("sy:syy","20210622","name", new String[]{"name1","name2"}, new String[]{"111","222"});
            return "ok";
        }
    
        /**
         * Look up one row
         * @author suyuan
         * @date 2021/6/22 17:18
         */
        @RequestMapping("/selectRow")
        public String selectRow() throws Exception {
            String s = hBaseUtils.selectRow("sy:syy", "20210622");
            System.out.println(s);
            return s;
        }
    
    }
    

    For local development, configure the following:

    hosts file entries

    192.168.10.160  master
    192.168.10.161  slave1
    192.168.10.162  slave2

    The following cluster services must be running:

    ZooKeeper, Hadoop, HBase

  • Original post: https://www.cnblogs.com/symkmk123/p/14963789.html