  • HBase Introduction and Operations

    1. HBase Overview

    1.1 What is HBase

    • HBase is a distributed, column-oriented database built on top of HDFS. It stores data as key-value pairs and does not natively support standard SQL. It is an open-source Apache project and scales horizontally.
    • HBase provides fast random access to huge amounts of structured data, relying on the fault tolerance of the Hadoop Distributed File System (HDFS).
    • Unlike a typical relational database, HBase is well suited to storing unstructured data, and its schema is organized by column rather than by row.

    1.2 HBase Data Model

    RowKey: a byte array; the "primary key" of each record in a table. Rows are sorted by RowKey in dictionary (lexicographic) order for fast lookups, so RowKey design is critically important.
    Column Family: has a name (string) and contains one or more related columns.
    Column: belongs to a column family and is addressed as familyName:columnName; columns can be added dynamically per record.
    Version Number: of type long; defaults to the system timestamp but can be set by the user. It distinguishes different versions of the same cell.
    Value (Cell): a byte array.
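    Because rowkeys are compared as raw bytes in dictionary order, numeric keys usually need fixed-width zero-padding. A minimal sketch of the ordering pitfall (plain Java, no HBase dependency; the keys are made up) — a `TreeMap` over ASCII strings sorts the same way an HBase table would:

    ```java
    import java.util.TreeMap;

    // Sketch: HBase sorts rowkeys byte-lexicographically, which String's
    // natural ordering mirrors for ASCII keys. Unpadded numbers sort "wrong".
    public class RowKeyOrder {
        public static void main(String[] args) {
            TreeMap<String, String> rows = new TreeMap<>(); // sorted like an HBase table
            rows.put("row2", "v");
            rows.put("row10", "v");
            rows.put("row010", "v");
            // Dictionary order puts "row10" before "row2"; zero-padding fixes this.
            System.out.println(rows.keySet()); // [row010, row10, row2]
        }
    }
    ```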

    2. HBase Architecture

    2.1 Architecture Components

    HBase clusters follow a master/slave architecture. As part of the Hadoop ecosystem, a cluster consists of HMaster nodes, HRegionServer nodes, and a ZooKeeper ensemble. The data itself is stored in HDFS, so the NameNode and DataNode of HDFS are also involved at the storage layer.

    Component descriptions:
    Client

    • Communicates with HMaster and HRegionServer via the HBase RPC mechanism;
    • Talks to HMaster for administrative operations;
    • Talks to HRegionServer for data reads and writes;

    HMaster

    • HMaster is not a single point of failure: multiple HMasters can be started, and ZooKeeper ensures that exactly one master is active at any time.

    HMaster is mainly responsible for managing tables and regions:

    • Handles administrative operations on tables (create, drop, alter, and so on);
    • Balances load across HRegionServers by adjusting region placement;
    • Assigns new regions after a region split;
    • Migrates the regions of a failed HRegionServer to other servers;

    HRegionServer
    The core module of HBase;

    • Hosts regions and serves I/O requests against them;
    • Splits regions that grow too large at runtime;

    HRegion

    • HBase splits a table horizontally into multiple HRegions by RowKey. Each HRegion records its StartKey and EndKey (the first region's StartKey is empty and the last region's EndKey is empty). Because RowKeys are sorted, a client can quickly determine which HRegion holds any given RowKey.
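    The StartKey/EndKey lookup described above can be sketched with a sorted map (plain Java, no HBase dependency; the region names are hypothetical): since each region covers the half-open range [StartKey, EndKey), the owning region is the one with the greatest StartKey not exceeding the rowkey.

    ```java
    import java.util.Map;
    import java.util.TreeMap;

    // Sketch of region location: floorEntry finds the region whose StartKey
    // is the greatest key <= the rowkey, i.e. the region covering it.
    public class RegionLocator {
        public static String locate(TreeMap<String, String> regionsByStartKey, String rowKey) {
            Map.Entry<String, String> e = regionsByStartKey.floorEntry(rowKey);
            return e == null ? null : e.getValue();
        }

        public static void main(String[] args) {
            TreeMap<String, String> regions = new TreeMap<>();
            regions.put("", "region-1");      // first region: empty StartKey
            regions.put("id10", "region-2");
            regions.put("id20", "region-3");  // last region: empty EndKey
            System.out.println(locate(regions, "id05")); // region-1
            System.out.println(locate(regions, "id15")); // region-2
            System.out.println(locate(regions, "id20")); // region-3
        }
    }
    ```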

    2.2 Viewing the HBase Web UI

    http://master:16010/

    3. HBase Shell Operations

    3.1 DDL Operations

    # start the hbase shell
    hbase shell
    # show cluster status
    status
    # show the hbase version
    version
    # create a namespace
    create_namespace '<namespace>'
    # list all namespaces
    list_namespace
    # drop a namespace; it must not contain any tables, or an error is raised
    drop_namespace '<namespace>'
    # create a table in the default namespace
    create '<table>', '<column family 1>','<column family 2>','<column family N>'
    # create a table in a specific namespace
    create '<namespace>:<table>', '<column family 1>','<column family 2>','<column family N>'
    # list all tables
    list
    # describe a table
    describe '<table>'
    # delete a single column family from table 'table'
    alter 'table',{NAME=>'<column family 1>',METHOD=>'delete'}
    # delete multiple column families
    alter 'table', {NAME => '<column family 1>', METHOD => 'delete'},{NAME => '<column family 2>', METHOD => 'delete'}
    # disable the table first
    disable '<table>'
    # then drop it
    drop '<table>'

    3.2 DML Operations

    # insert data
    # syntax: put <table>,<rowkey>,<family:column>,<value>,[<timestamp>]
    # if timestamp is omitted, the server's current time is used
    put 'table','id01', 'c_f1:name','111'

    # read data
    # get: fetches a single row; it cannot scan the whole table
    # syntax: get <table>,<rowkey>,[<family:column>,....]
    get 'table','id01'

    # update data
    # syntax: simply put again; the new put overwrites the old value
    put 'table','id01', 'c_f1:name','222'

    # scan
    # syntax: scan <table> ,{COLUMNS => [ <family:column>,.... ], LIMIT => num}
    # full-table scan; avoid this on large tables
    scan 'table'
    # first two rows of the table
    scan 'table', {LIMIT => 2}
    # scan a specific column family
    scan 'table', {COLUMNS => 'c_f1'}
    # scan a specific column within a column family
    scan 'table', {COLUMNS => 'c_f2:cert_no'}
    # scan for cells whose value equals 222
    scan 'table', FILTER=>"ValueFilter(=,'binary:222')"
    # filter rows by rowkey range [STARTROW, STOPROW)
    scan 'table', {STARTROW =>'id01' , STOPROW => 'id03'}

    # delete one column of a row
    # syntax: delete <table>, <rowkey>, <family:column>
    # the column name is required
    # deletes all versions of the specified column
    delete 'table', 'id04',  'c_f2:name'

    # delete an entire row
    # syntax: deleteall <table>, <rowkey>
    deleteall 'table', 'id05'

    # truncate the table
    # syntax: truncate <table>
    truncate 'table'

    # count the rows in a table
    # syntax: count <table>, {INTERVAL => intervalNum, CACHE => cacheNum}
    # INTERVAL: report progress (current count and rowkey) every N rows; default 1000
    # CACHE: rows fetched per batch, default 10; raising it speeds up the count
    # count the rows
    count 'table'
    # count, reporting progress every 2 rows
    count 'table', {INTERVAL => 2} 
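    The STARTROW/STOPROW range above is half-open: STOPROW itself is excluded from the results. A minimal sketch of the same semantics (plain Java, no HBase dependency; the table contents are made up), using `NavigableMap.subMap` over sorted keys:

    ```java
    import java.util.TreeMap;

    // Sketch: an HBase scan with STARTROW/STOPROW returns rows in the
    // half-open interval [STARTROW, STOPROW), mirrored here with subMap.
    public class ScanRange {
        public static void main(String[] args) {
            TreeMap<String, String> table = new TreeMap<>();
            table.put("id01", "a");
            table.put("id02", "b");
            table.put("id03", "c");
            table.put("id04", "d");
            // equivalent of: scan 'table', {STARTROW => 'id01', STOPROW => 'id03'}
            System.out.println(table.subMap("id01", true, "id03", false).keySet()); // [id01, id02]
        }
    }
    ```

    Note that `id03` is excluded; to include it, STOPROW must be set to the next possible key.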

    4. Integrating HBase with Spring Boot

    4.1 pom.xml

            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>1.3.6</version>
            </dependency>

    4.2 application.properties

    hbase.conf.confMaps.'hbase.zookeeper.quorum'=master,slave1,slave2

    4.3 HbaseConfig custom configuration class

    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.Map;
    
    /**
     * HBase configuration properties
     *
     * @Author: suyuan
     */
    @Configuration
    @ConfigurationProperties(prefix = HbaseConfig.CONF_PREFIX)
    public class HbaseConfig {
    
        public static final String CONF_PREFIX = "hbase.conf";
    
        private Map<String,String> confMaps;
    
        public Map<String, String> getconfMaps() {
            return confMaps;
        }
        public void setconfMaps(Map<String, String> confMaps) {
            this.confMaps = confMaps;
        }
    }
    

    4.4 HBaseUtils utility class (and its SpringContextHolder helper)

    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.stereotype.Component;
    
    /**
     * Holder for Spring's ApplicationContext; exposes static methods for
     * fetching beans from the Spring container.
     */
    @Component
    public class SpringContextHolder implements ApplicationContextAware {
    
        private static ApplicationContext applicationContext;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            SpringContextHolder.applicationContext = applicationContext;
        }
    
        public static ApplicationContext getApplicationContext() {
            assertApplicationContext();
            return applicationContext;
        }
    
        @SuppressWarnings("unchecked")
        public static <T> T getBean(String beanName) {
            assertApplicationContext();
            return (T) applicationContext.getBean(beanName);
        }
    
        public static <T> T getBean(Class<T> requiredType) {
            assertApplicationContext();
            return applicationContext.getBean(requiredType);
        }
    
        private static void assertApplicationContext() {
            if (SpringContextHolder.applicationContext == null) {
                throw new RuntimeException("applicationContext is null; check that SpringContextHolder has been registered as a bean!");
            }
        }
    
    }
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
    import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
    import org.apache.hadoop.hbase.filter.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.DependsOn;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StopWatch;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @DependsOn("springContextHolder")        // ensure springContextHolder is initialized before this class
    @Component
    public class HBaseUtils {
    
        private Logger logger = LoggerFactory.getLogger(this.getClass());
    
        // fetch the HbaseConfig bean manually
        private static HbaseConfig hbaseConfig = SpringContextHolder.getBean("hbaseConfig");

        private static Configuration conf = HBaseConfiguration.create();
        private static ExecutorService pool = Executors.newScheduledThreadPool(20);    // thread pool used by the connection
        private static Connection connection = null;
        private static HBaseUtils instance = null;
        private static Admin admin = null;
    
        private HBaseUtils(){
            if(connection == null){
                try {
                    // apply every property from the HbaseConfig map to the connection configuration
                    Map<String, String> confMap = hbaseConfig.getconfMaps();
                    for (Map.Entry<String,String> confEntry : confMap.entrySet()) {
                        conf.set(confEntry.getKey(), confEntry.getValue());
                    }
                    connection = ConnectionFactory.createConnection(conf, pool);
                    admin = connection.getAdmin();
                } catch (IOException e) {
                    logger.error("HBaseUtils initialization failed: " + e.getMessage(), e);
                }
            }
        }
    
        // simple singleton accessor; unnecessary if the bean is injected with @Autowired
        public static synchronized HBaseUtils getInstance(){
            if(instance == null){
                instance = new HBaseUtils();
            }
            return instance;
        }
    
    
        /**
         * Create a table.
         *
         * @param tableName         table name
         * @param columnFamily      column families (array)
         */
        public void createTable(String tableName, String[] columnFamily) throws IOException{
            TableName name = TableName.valueOf(tableName);
            // drop the table first if it already exists, then (re)create it
            if (admin.tableExists(name)) {
                logger.warn("table {} already exists, dropping and recreating it", name);
                admin.disableTable(name);
                admin.deleteTable(name);
            }
            HTableDescriptor desc = new HTableDescriptor(name);
            for (String cf : columnFamily) {
                desc.addFamily(new HColumnDescriptor(cf));
            }
            admin.createTable(desc);
        }
    
        /**
         * Insert a record (one row, one column family, multiple columns/values).
         *
         * @param tableName         table name
         * @param row               row key
         * @param columnFamilys     column family name
         * @param columns           column names (array)
         * @param values            values (array, one per column)
         */
        public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values) throws IOException {
            TableName name = TableName.valueOf(tableName);
            Table table = connection.getTable(name);
            Put put = new Put(Bytes.toBytes(row));
            for (int i = 0; i < columns.length; i++) {
                put.addColumn(Bytes.toBytes(columnFamilys), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
            }
            // issue a single put after all columns have been added
            table.put(put);
        }
    
        /**
         * Insert a record (one row, one column family, one column/value).
         *
         * @param tableName         table name
         * @param row               row key
         * @param columnFamily      column family name
         * @param column            column name
         * @param value             value
         */
        public void insertOneRecord(String tableName, String row, String columnFamily, String column, String value) throws IOException {
            TableName name = TableName.valueOf(tableName);
            Table table = connection.getTable(name);
            Put put = new Put(Bytes.toBytes(row));
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
            table.put(put);
        }
    
        /**
         * Delete an entire row.
         *
         * @param tablename         table name
         * @param rowkey            row key
         */
        public void deleteRow(String tablename, String rowkey) throws IOException {
            TableName name = TableName.valueOf(tablename);
            Table table = connection.getTable(name);
            Delete d = new Delete(Bytes.toBytes(rowkey));
            table.delete(d);
        }
    
        /**
         * Delete one column family of a row.
         * @param tablename         table name
         * @param rowkey            row key
         * @param columnFamily      column family name
         */
        public void deleteColumnFamily(String tablename, String rowkey, String columnFamily) throws IOException {
            TableName name = TableName.valueOf(tablename);
            Table table = connection.getTable(name);
            Delete d = new Delete(Bytes.toBytes(rowkey)).addFamily(Bytes.toBytes(columnFamily));
            table.delete(d);
        }
    
        /**
         * Delete one column of a row.
         *
         * @param tablename         table name
         * @param rowkey            row key
         * @param columnFamily      column family name
         * @param column            column name
         */
        public void deleteColumn(String tablename, String rowkey, String columnFamily, String column) throws IOException {
            TableName name = TableName.valueOf(tablename);
            Table table = connection.getTable(name);
            Delete d = new Delete(Bytes.toBytes(rowkey)).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
            table.delete(d);
        }
    
    
        /**
         * Fetch a single row.
         *
         * @param tablename         table name
         * @param rowKey            row key
         */
        public static String selectRow(String tablename, String rowKey) throws IOException {
            StringBuilder record = new StringBuilder();
            TableName name = TableName.valueOf(tablename);
            Table table = connection.getTable(name);
            Get g = new Get(Bytes.toBytes(rowKey));
            Result rs = table.get(g);
            for (Cell cell : rs.rawCells()) {
                record.append(Bytes.toString(CellUtil.cloneRow(cell))).append("\t")
                        .append(Bytes.toString(CellUtil.cloneFamily(cell))).append("\t")
                        .append(Bytes.toString(CellUtil.cloneQualifier(cell))).append("\t")
                        .append(Bytes.toString(CellUtil.cloneValue(cell))).append("\n");
            }
            return record.toString();
        }
    
        /**
         * Fetch the value of a single column of a row.
         *
         * @param tablename         table name
         * @param rowKey            row key
         * @param columnFamily      column family name
         * @param column            column name
         * @return the cell value as a string
         */
        public static String selectValue(String tablename, String rowKey, String columnFamily, String column) throws IOException {
            TableName name = TableName.valueOf(tablename);
            Table table = connection.getTable(name);
            Get g = new Get(Bytes.toBytes(rowKey));
            g.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
            Result rs = table.get(g);
            return Bytes.toString(rs.value());
        }
    
        /**
         * Scan all rows of a table.
         *
         * @param tablename table name
         * @return all cells, one per line
         */
        public String scanAllRecord(String tablename) throws IOException {
            StringBuilder record = new StringBuilder();
            TableName name = TableName.valueOf(tablename);
            Table table = connection.getTable(name);
            Scan scan = new Scan();
            ResultScanner scanner = table.getScanner(scan);
            try {
                for(Result result : scanner){
                    for (Cell cell : result.rawCells()) {
                        record.append(Bytes.toString(CellUtil.cloneRow(cell))).append("\t")
                                .append(Bytes.toString(CellUtil.cloneFamily(cell))).append("\t")
                                .append(Bytes.toString(CellUtil.cloneQualifier(cell))).append("\t")
                                .append(Bytes.toString(CellUtil.cloneValue(cell))).append("\n");
                    }
                }
            } finally {
                if (scanner != null) {
                    scanner.close();
                }
            }

            return record.toString();
        }
    
        /**
         * Query records whose rowkey contains the given keyword.
         *
         * @param tablename  table name
         * @param rowKeyword substring to match against rowkeys
         * @return matched records
         */
        public List scanReportDataByRowKeyword(String tablename, String rowKeyword) throws IOException {
            ArrayList<Object> list = new ArrayList<>();
    
            Table table = connection.getTable(TableName.valueOf(tablename));
            Scan scan = new Scan();
    
            // row filter: match rowkeys that contain the keyword
            RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));
            scan.setFilter(rowFilter);
    
            ResultScanner scanner = table.getScanner(scan);
            try {
                for (Result result : scanner) {
                    // TODO: map each Result to a domain object as required by the business
                    list.add(null);
                }
            } finally {
                if (scanner != null) {
                    scanner.close();
                }
            }
    
            return list;
        }
    
        /**
         * Query records whose rowkey contains the given keyword, within a timestamp range.
         *
         * @param tablename  table name
         * @param rowKeyword substring to match against rowkeys
         * @param minStamp   lower bound of the timestamp range (inclusive)
         * @param maxStamp   upper bound of the timestamp range (exclusive)
         * @return matched records
         */
        public List scanReportDataByRowKeywordTimestamp(String tablename, String rowKeyword, Long minStamp, Long maxStamp) throws IOException {
            ArrayList<Object> list = new ArrayList<>();
    
            Table table = connection.getTable(TableName.valueOf(tablename));
            Scan scan = new Scan();
            // restrict the scan to the timestamp range [minStamp, maxStamp)
            scan.setTimeRange(minStamp, maxStamp);
    
            RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));
            scan.setFilter(rowFilter);
    
            ResultScanner scanner = table.getScanner(scan);
            try {
                for (Result result : scanner) {
                    // TODO: map each Result to a domain object as required by the business
                    list.add(null);
                }
            } finally {
                if (scanner != null) {
                    scanner.close();
                }
            }
    
            return list;
        }
    
    
        /**
         * Drop a table.
         *
         * @param tablename table name
         */
        public void deleteTable(String tablename) throws IOException {
            TableName name=TableName.valueOf(tablename);
            if(admin.tableExists(name)) {
                admin.disableTable(name);
                admin.deleteTable(name);
            }
        }
    
        /**
         * Full-table row count using the Aggregation coprocessor.
         *
         * @param tablename table name
         */
        public Long countRowsWithCoprocessor(String tablename) throws Throwable {
            TableName name=TableName.valueOf(tablename);
            HTableDescriptor descriptor = admin.getTableDescriptor(name);
    
            String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
            if (! descriptor.hasCoprocessor(coprocessorClass)) {
                admin.disableTable(name);
                descriptor.addCoprocessor(coprocessorClass);
                admin.modifyTable(name, descriptor);
                admin.enableTable(name);
            }
    
            // time the count
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
    
            Scan scan = new Scan();
            AggregationClient aggregationClient = new AggregationClient(conf);
    
            Long count = aggregationClient.rowCount(name, new LongColumnInterpreter(), scan);
    
            stopWatch.stop();
            System.out.println("RowCount: " + count + ", full-table count took " + stopWatch.getTotalTimeMillis() + " ms");
    
            return count;
        }
    
    }
    
    

    4.5 Test cases

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @RequestMapping("/hbase")
    public class HbaseController {
    
        @Autowired
        private HBaseUtils hBaseUtils;
    
        /**
         * Create a table.
         * @author suyuan
         * @date 2021/6/22 17:17 
         */
        @RequestMapping("/createTable")
        public String createTable() throws Exception {
            String[] arr = {"name","age"};
            hBaseUtils.createTable("sy:syy",arr);
            return "ok";
        }
    
        /**
         * Drop a table.
         * @author suyuan
         * @date 2021/6/22 17:18 
         */
        @RequestMapping("/deleteTable")
        public String deleteTable() throws Exception {
            hBaseUtils.deleteTable("sy:syy");
            return "ok";
        }
    
        /**
         * Insert records (a put to existing columns of the same row is an update).
         * @author suyuan
         * @date 2021/6/22 17:18 
         */
        @RequestMapping("/insertRecords")
        public String insertRecords() throws Exception {
            hBaseUtils.insertRecords("sy:syy","20210622","name", new String[]{"name1","name2"}, new String[]{"111","222"});
            return "ok";
        }
    
        /**
         * Fetch a single row.
         * @author suyuan
         * @date 2021/6/22 17:18 
         */
        @RequestMapping("/selectRow")
        public String selectRow() throws Exception {
            String s = hBaseUtils.selectRow("sy:syy", "20210622");
            System.out.println(s);
            return s;
        }
    
    }
    

    Running the code locally requires the following configuration:

    hosts entries

    192.168.10.160  master
    192.168.10.161  slave1
    192.168.10.162  slave2

    Cluster services that must be running:

    zookeeper, hadoop, hbase

  • Original article: https://www.cnblogs.com/symkmk123/p/14963789.html