zoukankan      html  css  js  c++  java
  • HBase分布式数据库——谷粒微博实战

    第一章 需求分析

    1. 微博内容的浏览,数据库表设计
    2. 用户社交体现:关注用户,取关用户
    3. 拉取关注的人的微博内容

    第二章 数据库设计

     设计成三张表微博内容表、用户关系表和微博收件箱表。

     

    第三章 代码实现

    3.1 创建工程

    maven工程依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>HBase</artifactId>
            <groupId>org.example</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>guli-weibo</artifactId>
        <dependencies>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>1.3.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>1.3.1</version>
            </dependency>
        </dependencies>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
    </project>

    工程文件夹:

    3.2 constants包

    在这里我们定义的是一些常量,例如是命名空间、表明、列名等,以便后面我们需要使用的时候反复重写,防止写错,也可以进行一个解耦合。

    public class Constants {
        public static Configuration CONFIGURATION = null;
        // HBase setting 这里不知道为什么要这要写才可以,视频里面没有写这个也可以运行,
        // 但是我就会报错,说无法连接,希望有大佬可以解释一下。
        // 那个set里面第二个放的是你们集群的名称,可能会不一样,看你们自己的命名了。
        static {
            CONFIGURATION = HBaseConfiguration.create();
            CONFIGURATION.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
        }
        // 命名空间
        public static final String NAMESPACE = "weibo";
        // weibo内容表
        public static final String CONTENT_TABLE = "weibo:content";
        public static final String CONTENT_TABLE_CF = "info";
        public static final int CONTENT_TABLE_VERSIONS = 1;
        // 用户关系表
        public static final String RELATION_TABLE = "weibo:relation";
        // 第一个列族
        public static final String RELATION_TABLE_CF1 = "attends";
        // 第二个列族
        public static final String RELATION_TABLE_CF2 = "fans";
        public static final int RELATION_TABLE_VERSIONS = 1;
        //收件箱表
        public static final String INBOX_TABLE = "weibo:inbox";
        public static final String INBOX_TABLE_CF = "info";
        public static final int INBOX_TABLE_VERSIONS = 2;
    }

    3.3 utils包

    这里主要是进行DDL的操作,例如创建命名空间,以及对表之类的操作。

    public class HBaseUtil {
        // 1 创建命名空间
        public static void createNameSpace(String namespace) throws IOException {
        }
        // 2 判断表是否存在
        private static boolean isTableExist(String tableName) throws IOException {
        }
        // 3 创建表
        public static void createTable(String tableName, int versions, String... cfs) throws IOException {
        }
    }

    创建命名空间

    // 1 创建命名空间
        public static void createNameSpace(String namespace) throws IOException {
            // 1 获取Connection对象
            Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
            // 2 获取admin对象
            Admin admin = connection.getAdmin();
            // 3 构建命名空间描述器
            NamespaceDescriptor build = NamespaceDescriptor.create(namespace).build();
            // 4 创建命名空间
            admin.createNamespace(build);
            // 5 关闭资源
            admin.close();
            connection.close();
        }

    判断表是否存在

     // 2 判断表是否存在
        private static boolean isTableExist(String tableName) throws IOException {
            // 1 获取Connection对象
            Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
            // 2 获取admin对象
            Admin admin = connection.getAdmin();
            // 3 判断是否存在
            boolean result = admin.tableExists(TableName.valueOf(tableName));
            // 4 关闭资源
            admin.close();
            connection.close();
            // 5 返回结果
            return result;
        }

    创建表

     // 3 创建表
        public static void createTable(String tableName, int versions, String... cfs) throws IOException {
            // 1 判断是否传入了列族信息
            if (cfs.length <= 0) {
                System.out.println("column information is not available.");
                return ;
            }
            // 2 判断表是否存在
            if (isTableExist(tableName)) {
                System.out.println(tableName + "table is exist.");
                return;
            }
            // 3 获取Connection对象
            Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
            // 4 获取Admin对象
            Admin admin = connection.getAdmin();
            // 5 创建表描述器
            HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
            // 6 循环添加列族信息
            for (String cf : cfs) {
                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf);
                // 7 设置版本
                hColumnDescriptor.setMaxVersions(versions);
                hTableDescriptor.addFamily(hColumnDescriptor);
            }
            // 8 创建表操作
            admin.createTable(hTableDescriptor);
            // 关闭资源
            admin.close();
            connection.close();
        }

    3.4 dao包

    这里是一些DML的操作,例如一些在表中插入数据,或者是删除数据的操作。

    public class HBaseDao {
        // 1 发布微博
        public static void publishWeiBo(String uid, String content) throws IOException {
        }
        // 2 关注用户
        public static void addAttends(String uid, String... attends) throws IOException {
        }
        // 3 取关
        public static void deleteAttends(String uid, String... dels) throws IOException {
        }
        // 4 获取
        public static void getInit(String uid) throws IOException {
        }
        // 5 获取某个人的所有微博详情
        public static void getWeibo(String uid) throws IOException {
        }
    }

    发微博功能

    这一个功能比较难,因为不仅对于当前发布微博的人的内容表需要进行更新,并且需要对当前这个人的粉丝的收件表也需要更新,因为粉丝需要看到关注的人的最新动态。所以这里需要对两张表,多个人进行操作。

    // 1 发布微博
        public static void publishWeiBo(String uid, String content) throws IOException {
            // 1 获取Connection对象
            Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
            // 2 get the admin object
            Admin admin = connection.getAdmin();
            // 第一部分:操纵微博内容表
            // 1.获取微博内容表对象
            Table conTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
            // 2. 获取当前时间戳
            long ts = System.currentTimeMillis();
            // 3 获取Rowkey
            String rowKey = uid + "_" + ts;
            // 4 创建Put对象
            Put conPut = new Put(Bytes.toBytes(rowKey));
            // 5 给Put对象赋值
            conPut.addColumn(Bytes.toBytes(Constants.CONTENT_TABLE_CF), Bytes.toBytes("content"), Bytes.toBytes(content));
            // 6 执行插入数据操作
            conTable.put(conPut);
            // 第二部分:操纵微博收件箱表
            // 1 获取用户关系表对象
            Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));
            // 2 获取当前发布微博人的fans列族数据
            Get get = new Get(Bytes.toBytes(uid));
            // 指定relation关系表的第二个列族
            get.addFamily(Bytes.toBytes(Constants.RELATION_TABLE_CF2));
            Result result = relaTable.get(get);
            // 3 创建一个集合,用于存放微博内容表的put对象
            ArrayList<Put> inboxPuts = new ArrayList<>();
            // 4 遍历粉丝
            for (Cell cell : result.rawCells()) {
                // 5 构建微博收件箱表的Put对象
                Put inboxPut = new Put(CellUtil.cloneQualifier(cell));
                // 6 给收件箱表的Put对象赋值
                inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(uid), Bytes.toBytes(rowKey));
                // 7 将收件箱表的Put对象存入集合
                inboxPuts.add(inboxPut);
            }
            // 8 判断是否有粉丝
            if (inboxPuts.size() > 0) {
                // 获取收件箱表对象
                Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
                // 执行收件箱表数据插入操作
                inboxTable.put(inboxPuts);
                // 关闭收件箱表
                inboxTable.close();
            }
            // 关闭资源
            relaTable.close();
            conTable.close();
            connection.close();
        }

    关注功能

    关注某个人之后,需要获得这个人的最近发布的微博,以及在微博关系表当中需要添加新的关系。

      // 2 关注用户
        public static void addAttends(String uid, String... attends) throws IOException {
            // 校验是否添加了待关注的人
            if (attends.length <= 0) {
                System.out.println("please choose attention person");
                return ;
            }
            // 获取Connection对象
            Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
            // 第一部分:操作用户关系表
            // 1 获取用户关系表对象
            Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));
            // 2 创建一个集合,用于存放用户关系表的Put对象
            ArrayList<Put> relaPuts  = new ArrayList<>();
            // 3 创建操作者的Put对象
            Put uidPut = new Put(Bytes.toBytes(uid));
            // 4 循环创建被关注者的Put对象
            for (String attend : attends) {
                // 5 给操作者的Put对象赋值
                uidPut.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF1), Bytes.toBytes(attend), Bytes.toBytes(attend));
                // 6 创建被关注者的Put对象
                Put attendPut = new Put(Bytes.toBytes(attend));
                // 7 给被关注者的Put对象赋值
                attendPut.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF2), Bytes.toBytes(uid), Bytes.toBytes(uid));
                // 8 将被关注者的Put对象放入集合
                relaPuts.add(attendPut);
            }
            // 9 将操作者的Put对象添加至集合
            relaPuts.add(uidPut);
            // 10 执行用户关系表的插入数据操作
            relaTable.put(relaPuts);
            // 第二部分:操作收件箱表
            // 1 获取微博内容表对象
            Table contTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
            // 2 创建收件箱表的Put对象
            Put inboxPut = new Put(Bytes.toBytes(uid));
            // 3 循环attends,获取每个被关注者的近期发布的微博
            for (String attend : attends) {
                // 4 获取当前被关注者的近期发布的微博(scan)->集合
                // 这里获取只能用scan,因为get要指定rowKey,而在关注时是不知道rowKey的时间戳
                // 不能全表扫描 这里使用startRow 和stopRow
                Scan scan = new Scan(Bytes.toBytes(attend + "_"), Bytes.toBytes(attend + "|"));
                ResultScanner resultScanner = contTable.getScanner(scan);
                // define a time stamp
                long ts = System.currentTimeMillis();
                // 5 对获取的值进行遍历
                // 按rowKey的比较规则时间戳最小的微博最先出来,先放入列中,最后才能拿到最新的
                // 这里其实有问题 如果关注的人发布太多的微博,只为了得到最新的三天数据。而去插入大量无用的数据
                // 可以在发布微博函数中反转时间戳解决
                for (Result result : resultScanner) {
                    // 6 给收件箱表的Put对象赋值
                    inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(attend), ts++, result.getRow());
                }
            }
            // 7 判断当前的Put对象是否为空
            if (!inboxPut.isEmpty()) {
                // 获取收件箱表对象
                Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
                // 插入数据
                inboxTable.put(inboxPut);
                // 关闭收件箱表连接
                inboxTable.close();
            }
            // 关闭资源
            relaTable.close();
            contTable.close();
            connection.close();
        }

    取消关注

    // 3 取关
        public static void deleteAttends(String uid, String... dels) throws IOException {
    
            if (dels.length <= 0) {
                System.out.println("please choose deletion person");
                return;
            }
            // 获取Connection对象
            Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
            // 第一部分:操作用户关系表
            // 1 获取用户关系表对象
            Table relatTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));
            // 2 创建一个集合,用户存放用户关系表的Delete对象
            ArrayList<Delete> relatDelete = new ArrayList<>();
            // 3 创建操作者的Delete对象
            Delete uidDelete = new Delete(Bytes.toBytes(uid));
            // 4 循环创建被取关者的Delete对象
            for (String del : dels) {
                // 5 给操作者的Delete对象赋值
                uidDelete.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF1), Bytes.toBytes(del));
                // 6 创建被取关者的Delete对象
                Delete delDelte = new Delete(Bytes.toBytes(del));
                // 7 给被取关者的Delete对象赋值
                delDelte.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF2), Bytes.toBytes(uid));
                // 8 将被取关者的Delete对象添加至集合
                relatDelete.add(delDelte);
            }
            // 9 将操作者的Delete对象添加至集合
            relatDelete.add(uidDelete);
            // 10 执行用户关系表的删除操作
            relatTable.delete(relatDelete);
    // 第二部分:操作收件箱表 // 1 获取收件箱表对象 Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE)); // 2 创建操作者的Delete对象 Delete inboxDelete = new Delete(Bytes.toBytes(uid)); // 3 给操作者的Delete对象赋值 for (String del : dels) { inboxDelete.addColumns(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(del)); } // 4 执行收件箱表的删除操作 inboxTable.delete(inboxDelete); // 关闭资源 relatTable.close(); inboxTable.close(); connection.close(); }

    获得用户初始页

     // 4 获取
        public static void getInit(String uid) throws IOException {
            // 1 获取Connection对象
            Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
            // 2 获取收件箱表对象
            Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
            // 3 获取微博内容表对象
            Table conTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
            // 4 创建收件箱表Get对象,并获取数据(设置最大版本)
            Get inboxGet = new Get(Bytes.toBytes(uid));
            inboxGet.setMaxVersions();
            Result result = inboxTable.get(inboxGet);
            // 5 遍历获取的数据
            for (Cell cell : result.rawCells()) {
    
                // 6 构建微博内容表Get对象
                Get conGet = new Get(CellUtil.cloneValue(cell));
    
                // 7 获取Get对象的数据内容
                Result conResult = conTable.get(conGet);
    
                // 8 解析内容并打印
                for (Cell conCell : conResult.rawCells()) {
                    System.out.println("RK:" + Bytes.toString(CellUtil.cloneRow(conCell)) +
                            ", CF:" + Bytes.toString(CellUtil.cloneFamily(conCell)) +
                            ", CN:" + Bytes.toString(CellUtil.cloneQualifier(conCell)) +
                            ", Value:" + Bytes.toString(CellUtil.cloneValue(conCell)));
                }
            }
            // 9 关闭资源
            inboxTable.close();
            conTable.close();
            connection.close();
        }

    获得用户全部微博内容

     // 5 获取某个人的所有微博详情
        public static void getWeibo(String uid) throws IOException {
            // 1 获取Connection对象
            Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
            // 2 获取微博内容表对象
            Table conTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
            // 构建Scan对象
            Scan scan = new Scan();
            // 构建过滤器
            RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(uid + "_"));
            scan.setFilter(rowFilter);
            // 4 获取数据
            ResultScanner resultScanner = conTable.getScanner(scan);
            // 5 解析数据并打印
            for (Result result : resultScanner) {
                for (Cell cell : result.rawCells()) {
                    System.out.println("RK:" + Bytes.toString(CellUtil.cloneRow(cell)) +
                            ", CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
                            ", CN:" + Bytes.toString(CellUtil.cloneQualifier(cell)) +
                            ", Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
            // 6 关闭资源
            conTable.close();
            connection.close();
        }

    3.5 test包 测试

    public class TestWeiBo {
    
        public static void init() {
    
            try {
                // 创建命名空间
                HBaseUtil.createNameSpace(Constants.NAMESPACE);
                // 创建微博内容表
                HBaseUtil.createTable(Constants.CONTENT_TABLE, Constants.CONTENT_TABLE_VERSIONS, Constants.CONTENT_TABLE_CF);
                // 创建用户关系表
                HBaseUtil.createTable(Constants.RELATION_TABLE, Constants.RELATION_TABLE_VERSIONS,
                        Constants.RELATION_TABLE_CF1, Constants.RELATION_TABLE_CF2);
                // 创建收件箱表
                HBaseUtil.createTable(Constants.INBOX_TABLE, Constants.INBOX_TABLE_VERSIONS, Constants.INBOX_TABLE_CF);
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws IOException, InterruptedException {
            // 初始化
            init();
            System.out.println("init done");
    
            // 1001 发布微博
            HBaseDao.publishWeiBo("1001","这是1001的微博,Hello World");
            System.out.println("publish done");
    
            // 1002 关注 1001, 1003
            HBaseDao.addAttends("1002", "1001", "1003");
            System.out.println("attend done");
    
            // 获取1002 初始化页面
            HBaseDao.getInit("1002");
            System.out.println("*********************111**********************");
    
            // 1003发布三条微博 同时1001发布两条
            HBaseDao.publishWeiBo("1003", "谁说的赶紧下课!!");
            Thread.sleep(10);
            HBaseDao.publishWeiBo("1001", "我没说话!!");
            Thread.sleep(10);
            HBaseDao.publishWeiBo("1003", "谁说的!!");
            Thread.sleep(10);
            HBaseDao.publishWeiBo("1001", "反正飞机是下线了!!");
            Thread.sleep(10);
            HBaseDao.publishWeiBo("1003", "你们爱咋咋地!!");
    
            // 获取1002 初始页面
            HBaseDao.getInit("1002");
            System.out.println("*********************222***********************");
            // 1002 取关 1003
            HBaseDao.deleteAttends("1002", "1003");
            //获取1002初始化页面
            HBaseDao.getInit("1002");
            System.out.println("*********************333***********************");
            // 1002再次关注1003
            HBaseDao.addAttends("1002", "1003");
            // 获取1002初始化页面
            HBaseDao.getInit("1002");
            System.out.println("*********************444***********************");
            // get 1001 detail initial page
            HBaseDao.getWeibo("1001");
            System.out.println("*********************555***********************");
        }
    }
    作者:王陸

    -------------------------------------------

    个性签名:罔谈彼短,靡持己长。做一个谦逊爱学的人!

    本站使用「署名 4.0 国际」创作共享协议,转载请在文章明显位置注明作者及出处。鉴于博主处于考研复习期间,有什么问题请在评论区中提出,博主尽可能当天回复,加微信好友请注明原因

  • 相关阅读:
    HDFS源码分析(六)-----租约
    YARN源码分析(一)-----ApplicationMaster
    YARN源码分析(一)-----ApplicationMaster
    YARN源码分析(一)-----ApplicationMaster
    YARN源码分析(二)-----ResourceManager中的NM节点管理
    YARN源码分析(二)-----ResourceManager中的NM节点管理
    Confluence 6 如何备份和恢复
    Confluence 6 那些文件需要备份
    Confluence 6 确定一个生产系统备份方案
    Confluence 6 生产环境备份策略
  • 原文地址:https://www.cnblogs.com/wkfvawl/p/15772759.html
Copyright © 2011-2022 走看看