第一章 需求分析
- 微博内容的浏览,数据库表设计
- 用户社交体现:关注用户,取关用户
- 拉取关注的人的微博内容
第二章 数据库设计
设计成三张表微博内容表、用户关系表和微博收件箱表。
第三章 代码实现
3.1 创建工程
maven工程依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>HBase</artifactId> <groupId>org.example</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>guli-weibo</artifactId> <dependencies> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.3.1</version> </dependency> </dependencies> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> </project>
工程文件夹:
3.2 constants包
在这里我们定义的是一些常量,例如是命名空间、表明、列名等,以便后面我们需要使用的时候反复重写,防止写错,也可以进行一个解耦合。
public class Constants { public static Configuration CONFIGURATION = null; // HBase setting 这里不知道为什么要这要写才可以,视频里面没有写这个也可以运行, // 但是我就会报错,说无法连接,希望有大佬可以解释一下。 // 那个set里面第二个放的是你们集群的名称,可能会不一样,看你们自己的命名了。 static { CONFIGURATION = HBaseConfiguration.create(); CONFIGURATION.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104"); } // 命名空间 public static final String NAMESPACE = "weibo"; // weibo内容表 public static final String CONTENT_TABLE = "weibo:content"; public static final String CONTENT_TABLE_CF = "info"; public static final int CONTENT_TABLE_VERSIONS = 1; // 用户关系表 public static final String RELATION_TABLE = "weibo:relation"; // 第一个列族 public static final String RELATION_TABLE_CF1 = "attends"; // 第二个列族 public static final String RELATION_TABLE_CF2 = "fans"; public static final int RELATION_TABLE_VERSIONS = 1; //收件箱表 public static final String INBOX_TABLE = "weibo:inbox"; public static final String INBOX_TABLE_CF = "info"; public static final int INBOX_TABLE_VERSIONS = 2; }
3.3 utils包
这里主要是进行DDL的操作,例如创建命名空间,以及对表之类的操作。
public class HBaseUtil { // 1 创建命名空间 public static void createNameSpace(String namespace) throws IOException { } // 2 判断表是否存在 private static boolean isTableExist(String tableName) throws IOException { } // 3 创建表 public static void createTable(String tableName, int versions, String... cfs) throws IOException { } }
创建命名空间
// 1 创建命名空间 public static void createNameSpace(String namespace) throws IOException { // 1 获取Connection对象 Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION); // 2 获取admin对象 Admin admin = connection.getAdmin(); // 3 构建命名空间描述器 NamespaceDescriptor build = NamespaceDescriptor.create(namespace).build(); // 4 创建命名空间 admin.createNamespace(build); // 5 关闭资源 admin.close(); connection.close(); }
判断表是否存在
// 2 判断表是否存在 private static boolean isTableExist(String tableName) throws IOException { // 1 获取Connection对象 Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION); // 2 获取admin对象 Admin admin = connection.getAdmin(); // 3 判断是否存在 boolean result = admin.tableExists(TableName.valueOf(tableName)); // 4 关闭资源 admin.close(); connection.close(); // 5 返回结果 return result; }
创建表
// 3 创建表 public static void createTable(String tableName, int versions, String... cfs) throws IOException { // 1 判断是否传入了列族信息 if (cfs.length <= 0) { System.out.println("column information is not available."); return ; } // 2 判断表是否存在 if (isTableExist(tableName)) { System.out.println(tableName + "table is exist."); return; } // 3 获取Connection对象 Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION); // 4 获取Admin对象 Admin admin = connection.getAdmin(); // 5 创建表描述器 HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName)); // 6 循环添加列族信息 for (String cf : cfs) { HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf); // 7 设置版本 hColumnDescriptor.setMaxVersions(versions); hTableDescriptor.addFamily(hColumnDescriptor); } // 8 创建表操作 admin.createTable(hTableDescriptor); // 关闭资源 admin.close(); connection.close(); }
3.4 dao包
这里是一些DML的操作,例如一些在表中插入数据,或者是删除数据的操作。
public class HBaseDao { // 1 发布微博 public static void publishWeiBo(String uid, String content) throws IOException { } // 2 关注用户 public static void addAttends(String uid, String... attends) throws IOException { } // 3 取关 public static void deleteAttends(String uid, String... dels) throws IOException { } // 4 获取 public static void getInit(String uid) throws IOException { } // 5 获取某个人的所有微博详情 public static void getWeibo(String uid) throws IOException { } }
发微博功能
这一个功能比较难,因为不仅对于当前发布微博的人的内容表需要进行更新,并且需要对当前这个人的粉丝的收件表也需要更新,因为粉丝需要看到关注的人的最新动态。所以这里需要对两张表,多个人进行操作。
// 1 发布微博 public static void publishWeiBo(String uid, String content) throws IOException { // 1 获取Connection对象 Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION); // 2 get the admin object Admin admin = connection.getAdmin(); // 第一部分:操纵微博内容表 // 1.获取微博内容表对象 Table conTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE)); // 2. 获取当前时间戳 long ts = System.currentTimeMillis(); // 3 获取Rowkey String rowKey = uid + "_" + ts; // 4 创建Put对象 Put conPut = new Put(Bytes.toBytes(rowKey)); // 5 给Put对象赋值 conPut.addColumn(Bytes.toBytes(Constants.CONTENT_TABLE_CF), Bytes.toBytes("content"), Bytes.toBytes(content)); // 6 执行插入数据操作 conTable.put(conPut); // 第二部分:操纵微博收件箱表 // 1 获取用户关系表对象 Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE)); // 2 获取当前发布微博人的fans列族数据 Get get = new Get(Bytes.toBytes(uid)); // 指定relation关系表的第二个列族 get.addFamily(Bytes.toBytes(Constants.RELATION_TABLE_CF2)); Result result = relaTable.get(get); // 3 创建一个集合,用于存放微博内容表的put对象 ArrayList<Put> inboxPuts = new ArrayList<>(); // 4 遍历粉丝 for (Cell cell : result.rawCells()) { // 5 构建微博收件箱表的Put对象 Put inboxPut = new Put(CellUtil.cloneQualifier(cell)); // 6 给收件箱表的Put对象赋值 inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(uid), Bytes.toBytes(rowKey)); // 7 将收件箱表的Put对象存入集合 inboxPuts.add(inboxPut); } // 8 判断是否有粉丝 if (inboxPuts.size() > 0) { // 获取收件箱表对象 Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE)); // 执行收件箱表数据插入操作 inboxTable.put(inboxPuts); // 关闭收件箱表 inboxTable.close(); } // 关闭资源 relaTable.close(); conTable.close(); connection.close(); }
关注功能
关注某个人之后,需要获得这个人的最近发布的微博,以及在微博关系表当中需要添加新的关系。
// 2 关注用户 public static void addAttends(String uid, String... attends) throws IOException { // 校验是否添加了待关注的人 if (attends.length <= 0) { System.out.println("please choose attention person"); return ; } // 获取Connection对象 Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION); // 第一部分:操作用户关系表 // 1 获取用户关系表对象 Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE)); // 2 创建一个集合,用于存放用户关系表的Put对象 ArrayList<Put> relaPuts = new ArrayList<>(); // 3 创建操作者的Put对象 Put uidPut = new Put(Bytes.toBytes(uid)); // 4 循环创建被关注者的Put对象 for (String attend : attends) { // 5 给操作者的Put对象赋值 uidPut.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF1), Bytes.toBytes(attend), Bytes.toBytes(attend)); // 6 创建被关注者的Put对象 Put attendPut = new Put(Bytes.toBytes(attend)); // 7 给被关注者的Put对象赋值 attendPut.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF2), Bytes.toBytes(uid), Bytes.toBytes(uid)); // 8 将被关注者的Put对象放入集合 relaPuts.add(attendPut); } // 9 将操作者的Put对象添加至集合 relaPuts.add(uidPut); // 10 执行用户关系表的插入数据操作 relaTable.put(relaPuts); // 第二部分:操作收件箱表 // 1 获取微博内容表对象 Table contTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE)); // 2 创建收件箱表的Put对象 Put inboxPut = new Put(Bytes.toBytes(uid)); // 3 循环attends,获取每个被关注者的近期发布的微博 for (String attend : attends) { // 4 获取当前被关注者的近期发布的微博(scan)->集合 // 这里获取只能用scan,因为get要指定rowKey,而在关注时是不知道rowKey的时间戳 // 不能全表扫描 这里使用startRow 和stopRow Scan scan = new Scan(Bytes.toBytes(attend + "_"), Bytes.toBytes(attend + "|")); ResultScanner resultScanner = contTable.getScanner(scan); // define a time stamp long ts = System.currentTimeMillis(); // 5 对获取的值进行遍历 // 按rowKey的比较规则时间戳最小的微博最先出来,先放入列中,最后才能拿到最新的 // 这里其实有问题 如果关注的人发布太多的微博,只为了得到最新的三天数据。而去插入大量无用的数据 // 可以在发布微博函数中反转时间戳解决 for (Result result : resultScanner) { // 6 给收件箱表的Put对象赋值 inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(attend), ts++, result.getRow()); } } // 7 判断当前的Put对象是否为空 if (!inboxPut.isEmpty()) { // 获取收件箱表对象 Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE)); // 插入数据 inboxTable.put(inboxPut); // 关闭收件箱表连接 inboxTable.close(); } // 关闭资源 relaTable.close(); contTable.close(); connection.close(); }
取消关注
// 3 取关 public static void deleteAttends(String uid, String... dels) throws IOException { if (dels.length <= 0) { System.out.println("please choose deletion person"); return; } // 获取Connection对象 Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION); // 第一部分:操作用户关系表 // 1 获取用户关系表对象 Table relatTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE)); // 2 创建一个集合,用户存放用户关系表的Delete对象 ArrayList<Delete> relatDelete = new ArrayList<>(); // 3 创建操作者的Delete对象 Delete uidDelete = new Delete(Bytes.toBytes(uid)); // 4 循环创建被取关者的Delete对象 for (String del : dels) { // 5 给操作者的Delete对象赋值 uidDelete.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF1), Bytes.toBytes(del)); // 6 创建被取关者的Delete对象 Delete delDelte = new Delete(Bytes.toBytes(del)); // 7 给被取关者的Delete对象赋值 delDelte.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF2), Bytes.toBytes(uid)); // 8 将被取关者的Delete对象添加至集合 relatDelete.add(delDelte); } // 9 将操作者的Delete对象添加至集合 relatDelete.add(uidDelete); // 10 执行用户关系表的删除操作 relatTable.delete(relatDelete);
// 第二部分:操作收件箱表 // 1 获取收件箱表对象 Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE)); // 2 创建操作者的Delete对象 Delete inboxDelete = new Delete(Bytes.toBytes(uid)); // 3 给操作者的Delete对象赋值 for (String del : dels) { inboxDelete.addColumns(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(del)); } // 4 执行收件箱表的删除操作 inboxTable.delete(inboxDelete); // 关闭资源 relatTable.close(); inboxTable.close(); connection.close(); }
获得用户初始页
// 4 获取 public static void getInit(String uid) throws IOException { // 1 获取Connection对象 Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION); // 2 获取收件箱表对象 Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE)); // 3 获取微博内容表对象 Table conTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE)); // 4 创建收件箱表Get对象,并获取数据(设置最大版本) Get inboxGet = new Get(Bytes.toBytes(uid)); inboxGet.setMaxVersions(); Result result = inboxTable.get(inboxGet); // 5 遍历获取的数据 for (Cell cell : result.rawCells()) { // 6 构建微博内容表Get对象 Get conGet = new Get(CellUtil.cloneValue(cell)); // 7 获取Get对象的数据内容 Result conResult = conTable.get(conGet); // 8 解析内容并打印 for (Cell conCell : conResult.rawCells()) { System.out.println("RK:" + Bytes.toString(CellUtil.cloneRow(conCell)) + ", CF:" + Bytes.toString(CellUtil.cloneFamily(conCell)) + ", CN:" + Bytes.toString(CellUtil.cloneQualifier(conCell)) + ", Value:" + Bytes.toString(CellUtil.cloneValue(conCell))); } } // 9 关闭资源 inboxTable.close(); conTable.close(); connection.close(); }
获得用户全部微博内容
// 5 获取某个人的所有微博详情 public static void getWeibo(String uid) throws IOException { // 1 获取Connection对象 Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION); // 2 获取微博内容表对象 Table conTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE)); // 构建Scan对象 Scan scan = new Scan(); // 构建过滤器 RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(uid + "_")); scan.setFilter(rowFilter); // 4 获取数据 ResultScanner resultScanner = conTable.getScanner(scan); // 5 解析数据并打印 for (Result result : resultScanner) { for (Cell cell : result.rawCells()) { System.out.println("RK:" + Bytes.toString(CellUtil.cloneRow(cell)) + ", CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ", CN:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + ", Value:" + Bytes.toString(CellUtil.cloneValue(cell))); } } // 6 关闭资源 conTable.close(); connection.close(); }
3.5 test包 测试
public class TestWeiBo { public static void init() { try { // 创建命名空间 HBaseUtil.createNameSpace(Constants.NAMESPACE); // 创建微博内容表 HBaseUtil.createTable(Constants.CONTENT_TABLE, Constants.CONTENT_TABLE_VERSIONS, Constants.CONTENT_TABLE_CF); // 创建用户关系表 HBaseUtil.createTable(Constants.RELATION_TABLE, Constants.RELATION_TABLE_VERSIONS, Constants.RELATION_TABLE_CF1, Constants.RELATION_TABLE_CF2); // 创建收件箱表 HBaseUtil.createTable(Constants.INBOX_TABLE, Constants.INBOX_TABLE_VERSIONS, Constants.INBOX_TABLE_CF); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException, InterruptedException { // 初始化 init(); System.out.println("init done"); // 1001 发布微博 HBaseDao.publishWeiBo("1001","这是1001的微博,Hello World"); System.out.println("publish done"); // 1002 关注 1001, 1003 HBaseDao.addAttends("1002", "1001", "1003"); System.out.println("attend done"); // 获取1002 初始化页面 HBaseDao.getInit("1002"); System.out.println("*********************111**********************"); // 1003发布三条微博 同时1001发布两条 HBaseDao.publishWeiBo("1003", "谁说的赶紧下课!!"); Thread.sleep(10); HBaseDao.publishWeiBo("1001", "我没说话!!"); Thread.sleep(10); HBaseDao.publishWeiBo("1003", "谁说的!!"); Thread.sleep(10); HBaseDao.publishWeiBo("1001", "反正飞机是下线了!!"); Thread.sleep(10); HBaseDao.publishWeiBo("1003", "你们爱咋咋地!!"); // 获取1002 初始页面 HBaseDao.getInit("1002"); System.out.println("*********************222***********************"); // 1002 取关 1003 HBaseDao.deleteAttends("1002", "1003"); //获取1002初始化页面 HBaseDao.getInit("1002"); System.out.println("*********************333***********************"); // 1002再次关注1003 HBaseDao.addAttends("1002", "1003"); // 获取1002初始化页面 HBaseDao.getInit("1002"); System.out.println("*********************444***********************"); // get 1001 detail initial page HBaseDao.getWeibo("1001"); System.out.println("*********************555***********************"); } }