zoukankan      html  css  js  c++  java
  • Hbase小案例之小新

    1、HBase的namespace介绍

    namespace:名字空间或命名空间,相当于mysql中的database。

    2、namespace的作用

      1)配额管理:限制一个namespace可以使用的资源,包括region和table

      2)命名空间安全管理:提供了另一个层面的多租户安全管理

      3)Region服务器组:一个命名或一张表,可以被固定到一组RegionServers上,从而保证了数据隔离性

    3、namespace的基本操作

      1)列出所有的namespace

    list_namespace
    

      2)创建namespace

    create_namespace 'nametest'  

    3)查看namespace

    describe_namespace 'nametest'

    4)在namespace下创建表

    create 'nametest:testtable', 'fm1' 

    5)查看namespace下的表

    list_namespace_tables 'nametest'  

    6)删除namespace(删除空间的时候,命名空间下的表必须是空,否则先删除表)

    drop_namespace 'nametest'  

    4、Hbase的数据版本的确界以及TTL(数据存活的时间)

    hbase中有版本version 的概念

    hbase中针对版本有一个上界和下届的概念

    1)版本的下届:最小的版本数据,下届的默认值是0,说明最小版本数是1

    2)版本的上界:数据最大的版本,上届的默认值是1。

    TTL:数据的存活周期

    可以设置在某一个列族上或者是设置在一条数据上(rowkey的数据)

    5、通过代码实现版本设定以及数据的TTL

    1)创建maven工程并添加jar包

    2)代码实现

    public class HBaseVersionAndTTL {
        public static void main(String[] args) throws IOException, InterruptedException {
    // 创建数据库的连接        
    Configuration configuration = HBaseConfiguration.create();      configuration.set("hbase.zookeeper.quorum","hadoop01:2181,hadoop02:2181,hadoop03:2181");
    Connection connection = ConnectionFactory.createConnection();
    Admin admin = connection.getAdmin();
    
    // 创建一张表
            if(!admin.tableExists(TableName.valueOf("version_hbase"))){
                HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf("version_hbase"));
                HColumnDescriptor f1 = new HColumnDescriptor("f1");
    // 设置版本
                f1.setMinVersions(3);
                f1.setMaxVersions(5);
                //针对某一个列族下面所有的列设置TTL
                f1.setTimeToLive(30);  // 存活周期
                hTableDescriptor.addFamily(f1);
                admin.createTable(hTableDescriptor);
            }
            Table version_hbase = connection.getTable(TableName.valueOf("version_hbase"));
            Put put = new Put("1".getBytes());
            //针对某一条具体的数据设置TTL
            //put.setTTL(3000);
      // 指定数据的插入时间      put.addColumn("f1".getBytes(),"name".getBytes(),System.currentTimeMillis(),"zhangsan".getBytes());
            version_hbase.put(put);
            Thread.sleep(1000);
            Put put2 = new Put("1".getBytes());
            put2.addColumn("f1".getBytes(),"name".getBytes(),System.currentTimeMillis(),"zhangsan2".getBytes());
            version_hbase.put(put2);
            Get get = new Get("1".getBytes());
            get.setMaxVersions();
            Result result = version_hbase.get(get);
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
            }
            version_hbase.close();
            connection.close();
        }
    }
    

    6、微博案列

    1)三张表

    2)创建maven工程导入包

    3)拷贝配置文件daomaven下

    /export/servers/hbase-1.2.0-cdh5.14.0/conf 这个路径下的三个配置文件,分别是 core-site.xml、hdfs-site.xml、hbase-site.xml三个配置文件,拷贝到maven工程的resources资源目录下

    4)创建命名空间以及表名的定义

     private String WEIBO_CONTENT = "weibo:content";
        private String WEIBO_RELATIONS = "weibo:relations";
        private String WEIBO_CONTENT_EMAILS = "weibo:receive_content_email";
    
        private Connection connection;
        private Admin admin;
    
        @Before
        public void init() throws IOException {
            //构建config对象
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181");
            //数据库的连接
            connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();
        }
    
        // 创建命名空间
        @Test
        public void createNameSpace() throws IOException {
            // 创建命名空间描述符
            NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("weibo").addConfiguration("ceator", "wang").build();
            // 创建命名空间
            admin.createNamespace(namespaceDescriptor);
        }
    

    5)创建微博内容表

        // 创建微博content表
        @Test
        public void createTableContent() throws IOException {
            if(!admin.tableExists(TableName.valueOf(WEIBO_CONTENT))){
                // 创建表的描述符
                HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(WEIBO_CONTENT));
                HColumnDescriptor info = new HColumnDescriptor("info");
                info.setMinVersions(1);
                info.setMaxVersions(1);
                info.setBlockCacheEnabled(true); //是否开启缓存
                info.setBlocksize(2048*1024);
                tableDescriptor.addFamily(info);
                admin.createTable(tableDescriptor);
            }
        }

    6)创建用户关系表

    * 创建用户关系表
     * 方法名 createTableRelations
     Table Name    weibo:relations
     RowKey    用户ID
     ColumnFamily  attends、fans
     ColumnLabel   关注用户ID,粉丝用户ID
     ColumnValue   用户ID
     Version   1个版本
    
     */
        // 创建relations表
        @Test
        public void createTableRelations() throws IOException {
            if (!admin.tableExists(TableName.valueOf(WEIBO_RELATIONS))) {
                HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(WEIBO_RELATIONS));
                // 添加列族
                HColumnDescriptor attends = new HColumnDescriptor("attends");
                attends.setMinVersions(1);
                attends.setMaxVersions(1);
                attends.setBlockCacheEnabled(true);
                attends.setBlocksize(2048*1024);
                HColumnDescriptor fans = new HColumnDescriptor("fans");
                fans.setMinVersions(1);
                fans.setMaxVersions(1);
                fans.setBlockCacheEnabled(true);
                fans.setBlocksize(2048*1024);
    
                tableDescriptor.addFamily(attends);
                tableDescriptor.addFamily(fans);
                admin.createTable(tableDescriptor);
            }
        }

    7)创建微博收件箱表

    /**
     * 表结构:
     方法名   createTableReceiveContentEmails
     Table Name    weibo:receive_content_email
     RowKey    用户ID
     ColumnFamily  info
     ColumnLabel   用户ID
     ColumnValue   取微博内容的RowKey
     Version   1000
     */
    
        // 创建email表
        @Test
        public void createTableEmile() throws IOException {
            if(!admin.tableExists(TableName.valueOf(WEIBO_CONTENT_EMAILS))){
    
                HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(WEIBO_CONTENT_EMAILS));
               // 创建列族
                HColumnDescriptor info = new HColumnDescriptor("info");
                info.setMinVersions(1000);
                info.setMaxVersions(1000);
                info.setBlockCacheEnabled(true);
                info.setBlocksize(2018*1024);
                // 把列族加入表中
                tableDescriptor.addFamily(info);
                // 创建表
                admin.createTable(tableDescriptor);
    
            }
        }

    8)发布微博内容

      a、微博内容表中添加1条数据

      b、微博收件箱表对所有粉丝用户添加数据

        // 用户发布微博
        public void publishWeibo(String uid, String content) throws IOException {
            Connection connection = getConnection();
            // 1、将发布的内容保存起来 weibo:connect
            Table weibo_content = connection.getTable(TableName.valueOf(WEIBO_CONTENT));
            // 设定rowkey
            long time = System.currentTimeMillis();
            String rowkey = uid + "_" + time;
            Put put = new Put(rowkey.getBytes());
            put.addColumn("info".getBytes(), "content".getBytes(), time, content.getBytes());
            weibo_content.put(put);
    
            // 2、获取当前的所有fans查询
            Table weibo_relations = connection.getTable(TableName.valueOf(WEIBO_RELATIONS));
            Get get = new Get(uid.getBytes());
            // 只要fans的列族数据
            get.addFamily("fans".getBytes());
            Result result = weibo_relations.get(get);
            // 如果没有fans 直接结束
            if (result.isEmpty()) {
                weibo_content.close();
                weibo_relations.close();
                connection.close();
                return;
            }
            ArrayList<String> uids = new ArrayList<>();
            //要是是fans的uid
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                String fans_uid = Bytes.toString(CellUtil.cloneQualifier(cell));
                uids.add(fans_uid);
            }
            // 3、将用户发布的消息rowkey
            Table weibo_receive_content_emails = connection.getTable(TableName.valueOf(WEIBO_CONTENT_EMAILS));
            for (String fanid : uids) {
                Put put1 = new Put(fanid.getBytes());
                put1.addColumn("info".getBytes(), uid.getBytes(), time, rowkey.getBytes());
                weibo_receive_content_emails.put(put1);
            }
            weibo_content.close();
            weibo_relations.close();
            weibo_receive_content_emails.close();
            connection.close();
        }
    

    9)添加关注用户

        // 添加关注用户
        public void addAttends(String uid, String... attends) throws IOException {
            Connection connection = getConnection();
            Table weibo_relations = connection.getTable(TableName.valueOf(WEIBO_RELATIONS));
            //1、1的用户保存关注的用户
            Put put = new Put(uid.getBytes());
            for (String attend : attends) {
                put.addColumn("attends".getBytes(), attend.getBytes(), attend.getBytes());
            }
            weibo_relations.put(put);
            //2、被关注的用户同时多了一个fans
            for (String attend : attends) {
                Put put1 = new Put(attend.getBytes());
                put1.addColumn("fans".getBytes(), uid.getBytes(), uid.getBytes());
                weibo_relations.put(put1);
            }
            //3、维护当前用户和被关乎用户的微博查看关系 1关注了23m  此时1就可以看到23m发布的消息
            Table weibo_content = connection.getTable(TableName.valueOf(WEIBO_CONTENT));
            Table weibo_content_emails = connection.getTable(TableName.valueOf(WEIBO_CONTENT_EMAILS));
            ArrayList<String> strings = new ArrayList<>();//用于接收微博rowkey
            for (String attend : attends) {
                Scan scan = new Scan();
                //用于检索当前用户发布微博的rowkey
                PrefixFilter prefixFilter = new PrefixFilter((attend + "_").getBytes());
                scan.setFilter(prefixFilter);
                ResultScanner scanner = weibo_content.getScanner(scan);
    
                //遍历获取微博的rowkey
                for (Result result : scanner) {
                    List<Cell> cells = result.listCells();
                    for (Cell cell : cells) {
                        //获取微博rowkey
                        String weiborowkey = Bytes.toString(CellUtil.cloneRow(cell));
    
                        strings.add(weiborowkey);
                    }
                }
            }
    
            //把微博内容的rowkey保存到收件箱中
            //传入的是当前用户的uid
            Put put2 = new Put(uid.getBytes());
            for (String weiborowkey : strings) {
                //2_13235345436   2_13452346436
                String[] split = weiborowkey.split("_");
                put2.addColumn("info".getBytes(), split[0].getBytes(), System.currentTimeMillis(), weiborowkey.getBytes());
                weibo_content_emails.put(put2);
            }
            weibo_content.close();
            weibo_relations.close();
            weibo_content_emails.close();
            connection.close();
        }

    10)取消关注用户

        //7.取消关注用户
        public  void cancelAttends(String uid,String...  cancelattends) throws IOException {
    
            Connection connection = getConnection();
            Table weibo_relations = connection.getTable(TableName.valueOf(WEIBO_RELATIONS));
    
            Delete delete = new Delete(uid.getBytes());
            //1.移除当前用户被关注的用户  1.  取消3 M
            for (String cancelattend : cancelattends) {
                delete.addColumns("attends".getBytes(),cancelattend.getBytes());
            }
            weibo_relations.delete(delete);
    
            //2.被移除的用户同时取消粉丝
            for (String cancelattend : cancelattends) {
                //被取消关注的用户,取消粉丝
                Delete delete1 = new Delete(cancelattend.getBytes());
                delete1.addColumn("fans".getBytes(),uid.getBytes());
                weibo_relations.delete(delete1);
            }
    
            Table weibo_content_emails = connection.getTable(TableName.valueOf(WEIBO_CONTENT_EMAILS));
            //3.删除收件箱中相关的微博rowkey
            for (String cancelattend : cancelattends) {
                Delete delete2 = new Delete(uid.getBytes());
                //删除全部版本
                delete2.addColumns("info".getBytes(),cancelattend.getBytes());
                weibo_content_emails.delete(delete2);
            }
    
            weibo_content_emails.close();
            weibo_relations.close();
            connection.close();
    
        }

    11)拉取关注人的微博消息

     //8.拉取关注人的微博消息
        public void getAttendsContent(String uid) throws IOException {
            Connection connection = getConnection();
            Table weibo_content_emails = connection.getTable(TableName.valueOf(WEIBO_CONTENT_EMAILS));
    
            //从email表检索关注的用户发布的微博rowkey
            Get get = new Get(uid.getBytes());
            get.setMaxVersions(5);
            Result result = weibo_content_emails.get(get);
    
    
            //创建一个get集合用于接收get对象
            List<Get> gets = new ArrayList<Get>();
    
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                //获取微博的rowkey
                byte[] weiborowkey = CellUtil.cloneValue(cell);
                //这个get就是用于在content进行检索数据deget对象
                Get get1 = new Get(weiborowkey);
                gets.add(get1);
            }
    
            //上微博表检索发布的内容
            Table weibo_content = connection.getTable(TableName.valueOf(WEIBO_CONTENT));
            //获取检索数据的每一个结果集
            Result[] results = weibo_content.get(gets);
            for (Result result1 : results) {
                List<Cell> cells1 = result1.listCells();
                for (Cell cell : cells1) {
                    //打印发布的内容
                    System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
    
                }
            }
    
            weibo_content.close();
            weibo_content_emails.close();
            connection.close();
        }
  • 相关阅读:
    [Javascript] Javascript numeric separators
    [RxJS] Extend Promises by Adding Custom Behavior
    [RxJS] Building an RxJS Operator
    [RxJS] Build an Event Combo Observable with RxJS (takeWhile, takeUntil, take, skip)
    [RxJS] Encapsulate complex imperative logic in a simple observable
    Android之Android软键盘的隐藏显示研究
    Android之LogUtil
    Android之使用XMLPull解析xml(二)
    Android之多媒体扫描过程
    Android之在Tab更新两个ListView,让一个listview有按下另个一个listview没有的效果
  • 原文地址:https://www.cnblogs.com/haojia/p/12386214.html
Copyright © 2011-2022 走看看