zoukankan      html  css  js  c++  java
  • 流式大数据计算实践(5)----HBase使用&SpringBoot集成

    一、前言

    1、上文中我们搭建好了一套HBase集群环境,这一文我们学习一下HBase的基本操作和客户端API的使用

    二、shell操作

    先通过命令进入HBase的命令行操作

    /work/soft/hbase-1.2.2/bin/hbase shell

    1、建表

    create 'test', 'cf'

    (1)以上命令是建立一个test表,里面有一个列族cf

    (2)与RDS不同,HBase的列不是必须的,当向列族中插入一个单元格数据时,才有了列

    2、查看所有表

    list

    3、查看表属性

    describe 'test'

    4、增加列族

    alter 'test', 'cf2'

    5、插入数据

    put 'test', 'row1', 'cf:name', 'jack'

    (1)命令解释:向test表中的row1行插入列族cf,列名name的数据jack

    6、查询行数据

    scan 'test', {STARTROW => 'row3'}
    scan 'test', {ENDROW => 'row4'}

    (1)命令解释:查找test表中rowkey大于等于row3的数据

    (2)命令解释:查找test表中rowkey小于row4的数据(不包括row4)

    7、查询单元格数据

    get 'test', 'row7', 'cf:name'

    8、删除数据

    delete 'test', 'row4', 'cf:name'

    (1)命令解释:删除test表中row4行的cf:name列的单元格数据

    三、客户端API

    正常开发中操作HBase多数情况下通过客户端API操作,我们这里使用Java来操作,jdk要求至少1.7以上,编译器我这里用的是IntelliJ IDEA

    (1)新建一个maven工程

    (2)打开pom文件,引入HBase的依赖

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.1.1</version>
    </dependency>

    (3)将HBase的相关配置文件引入到我们的maven项目中,拷贝HBase目录下的hbase-site.xml和Hadoop目录下的core-site.xml,将两个文件复制到src/main/resources目录下

    (4)记得将前文中虚拟机的IP和hostname映射配置到写代码这台机器的hosts文件中(比如win7的hosts目录为C:WindowsSystem32driversetc)

    (5)新建一个类,编写CRUD的示例代码,下面代码用了jdk1.7的一个语法糖:try-with-resources,在try()里面声明的对象,会自动帮你调用对象的close方法来关闭对象,不用手动调用close(),非常方便

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.io.compress.Compression;
    import org.apache.hadoop.hbase.util.Bytes;
    
    import java.io.IOException;
    import java.net.URISyntaxException;
    
    public class HelloHBase
    {
        public static void main(String[] args) throws URISyntaxException
        {
            // 加载HBase的配置
            Configuration configuration = HBaseConfiguration.create();
    
            // 读取配置文件
            configuration.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI()));
            configuration.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI()));
    
            try (// 创建一个HBase连接
                 Connection connection = ConnectionFactory.createConnection(configuration);
                 // 获得执行操作的管理接口
                 Admin admin = connection.getAdmin();)
            {
                // 新建一个表名为mytable的表
                TableName tableName = TableName.valueOf("mytable");
                HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
    
                // 新建一个列族名为mycf的列族
                HColumnDescriptor mycf = new HColumnDescriptor("mycf");
                // 将列族添加到表中
                tableDescriptor.addFamily(mycf);
                // 执行建表操作
                createOrOverwrite(admin, tableDescriptor);
    
                // 设置列族的压缩方式为GZ
                mycf.setCompactionCompressionType(Compression.Algorithm.GZ);
                // 设置最大版本数量(ALL_VERSIONS实际上就是Integer.MAX_VALUE)
                mycf.setMaxVersions(HConstants.ALL_VERSIONS);
                // 列族更新到表中
                tableDescriptor.modifyFamily(mycf);
                // 执行更新操作
                admin.modifyTable(tableName, tableDescriptor);
    
                // 新增一个列族
                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("newcf");
                hColumnDescriptor.setCompactionCompressionType(Compression.Algorithm.GZ);
                hColumnDescriptor.setMaxVersions(HConstants.ALL_VERSIONS);
                // 执行新增操作
                admin.addColumnFamily(tableName, hColumnDescriptor);
    
                // 获取表对象
                Table table = connection.getTable(tableName);
    
                // 创建一个put请求,用于添加数据或者更新数据
                Put put = new Put(Bytes.toBytes("row1"));
                put.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("name"), Bytes.toBytes("jack"));
                table.put(put);
    
                // 创建一个append请求,用于在数据后面添加内容
                Append append = new Append(Bytes.toBytes("row1"));
                append.add(Bytes.toBytes("mycf"), Bytes.toBytes("name"), Bytes.toBytes("son"));
                table.append(append);
    
                // 创建一个long类型的列
                Put put1 = new Put(Bytes.toBytes("row2"));
                put1.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("age"), Bytes.toBytes(6L));
                // 创建一个增值请求,将值增加10L
                Increment increment = new Increment(Bytes.toBytes("row2"));
                increment.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("age"), 10L);
                table.increment(increment);
    
                // 创建一个查询请求,查询一行数据
                Get get = new Get(Bytes.toBytes("row1"));
                // 由于HBase的一行可能非常大,所以限定要取出的列族
                get.addFamily(Bytes.toBytes("mycf"));
                // 创建一个结果请求
                Result result = table.get(get);
                // 从查询结果中取出name列,然后打印(这里默认取最新版本的值,如果要取其他版本要使用Cell对象)
                byte[] name = result.getValue(Bytes.toBytes("mycf"), Bytes.toBytes("name"));
                System.out.println(Bytes.toString(name));
    
                // 创建一个扫描请求,查询多行数据
                Scan scan = new Scan(Bytes.toBytes("row1"));
                // 设置扫描器的缓存数量,遍历数据时不用发多次请求,默认100,适当的缓存可以提高性能
                scan.setCaching(150);
                // 创建扫描结果,这个时候不会真正从HBase查询数据,下面的遍历才是去查询
                ResultScanner resultScanner = table.getScanner(scan);
                for (Result r : resultScanner)
                {
                    String data = Bytes.toString(r.getValue(Bytes.toBytes("mycf"), Bytes.toBytes("name")));
                    System.out.println(data);
                }
                // 使用完毕关闭
                resultScanner.close();
    
                // 创建一个删除请求
                Delete delete = new Delete(Bytes.toBytes("row2"));
                // 可以自定义一些筛选条件
                delete.addFamily(Bytes.toBytes("age"));
                table.delete(delete);
    
                // 停用表
                admin.disableTable(tableName);
                // 删除列族
                admin.deleteColumnFamily(tableName, "mycf".getBytes());
                // 删除表
                admin.deleteTable(tableName);
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
            System.out.println("ok");
        }
    
        public static void createOrOverwrite(Admin admin, HTableDescriptor table) throws IOException
        {
            // 获取table名
            TableName tableName = table.getTableName();
            // 判断table是否存在,如果存在则先停用并删除
            if (admin.tableExists(tableName))
            {
                // 停用表
                admin.disableTable(tableName);
                // 删除表
                admin.deleteTable(tableName);
            }
            // 创建表
            admin.createTable(table);
        }
    }

     四、API的高级用法

    上一章介绍了API的基本使用方法,这一章总结一些高级用法

    1、过滤器:通过get或者scan查找数据时,经常需要加入一些条件来查找

    (1)值过滤器:相当于传统sql的where column like '%jack%',但是会对所有的列都做过滤,如果需要对单个列过滤,可以使用SingleColumnValueFilter,如果需要查询值相等的过滤器,可以使用BinaryComparator

    CompareFilter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("jack"));
    scan.setFilter(filter);

    (2)分页过滤器:相当于传统sql的limit,但是不能指定起始页码,所以需要自己记录最后一个row key,并通过scan.setStartRow()设置,在做分页时有个小技巧,如果你通过scan.setStartRow()设置最后一个row key时,下一页的数据依然会包含上一页的最后一个数据,所以你可以将最后一个row key的末尾加一个0,就可以不包含最后一个数据了,因为row key是按照字典顺序排序的

    Filter filter1 = new PageFilter(10L);
    scan.setFilter(filter1);

    (3)过滤器列表:用于组合多个过滤器,实现复杂一些的查询场景,注意这个过滤器列表是有顺序的,FilterList的第一个参数用来指定多个条件的连接方式(and、or),MUST_PASS_ALL相当于and连接,MUST_PASS_ONE相当于or连接

    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    filterList.addFilter(filter);
    filterList.addFilter(filter1);
    scan.setFilter(filterList);

    (4)还有一些其他的过滤器,使用方法大同小异,比如行键过滤器、列过滤器、单元格过滤器,甚至可以自定义过滤器,其他高级用法可以等用到再看

    五、SpringBoot集成

    1、后台我们用SpringCloud的微服务搭建,本章是用SpringBoot集成HBase环境,SpringBoot项目的搭建非常简单

    2、首先将HBase的相关配置文件引入到我们已经搭建好的的SpringBoot项目中,拷贝HBase目录下的hbase-site.xml和Hadoop目录下的core-site.xml、hdfs-site.xml,将三个文件复制到src/main/resources目录下

    3、编写一个Java配置文件来集成HBase环境

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.hadoop.hbase.HbaseTemplate;
    
    import java.net.URISyntaxException;
    
    @Configuration
    public class HBaseConfig
    {
        @Bean
        public HbaseTemplate hbaseTemplate()
        {
            HbaseTemplate hbaseTemplate = new HbaseTemplate();
            hbaseTemplate.setConfiguration(getConfiguration());
            hbaseTemplate.setAutoFlush(true);
            return hbaseTemplate;
        }
    
        private org.apache.hadoop.conf.Configuration getConfiguration()
        {
            try
            {
                org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
                configuration.addResource(new Path(ClassLoader.getSystemResource("hdfs-site.xml").toURI()));
                configuration.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI()));
                configuration.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI()));
                return configuration;
            }
            catch (URISyntaxException e)
            {
                e.printStackTrace();
            }
            return null;
        }
    }

    4、上面的配置中将HBase操作的对象注入到Spring中,所以当我们需要操作HBase时,直接使用HbaseTemplate即可,下例中是将一条数据插入到HBase中,可以看出HbaseTemplate高度封装了CRUD,使用起来更加简单方便

    @Service
    public class HBaseService implements IHBaseService
    {
        @Autowired
        private HbaseTemplate hbaseTemplate;
    
        @Override
        public void saveDeviceHeartbeat(String uuid, JSONObject heartObject)
        {
            hbaseTemplate.put("mytable", "row1", "mycf", "uuid", Bytes.toBytes(uuid));
        }
    }
  • 相关阅读:
    【Mysql】Mysql在Linux操作系统下在线安装
    【Mysql】Mysql实战:分页查询、(批量)添加、修改、(批量)删除
    【Nginx】Ngnix在Linux操作系统下的安装及搭建
    【Linux】Shell-解压/压缩、软件安装(jdk、tomcat)
    【Linux】用户创建修改切换、文件权限管理
    【Linux】文件操作命令、管道命令、文件编辑命令(VI)
    kafka学习 回顾以及新知识
    Scale Up 和 Scale Out存储架构
    计算机五层网络模型--回顾
    kafka学习(一)
  • 原文地址:https://www.cnblogs.com/orange911/p/9999767.html
Copyright © 2011-2022 走看看