zoukankan      html  css  js  c++  java
  • Hbase 项目

     需求分析

    1) 微博内容的浏览,数据库表设计
    2) 用户社交体现:关注用户,取关用户
    3) 拉取关注的人的微博内容

    表结构

    代码实现

    1) 创建命名空间以及表名的定义
    2) 创建微博内容表
    3) 创建用户关系表
    4) 创建用户微博内容接收邮件表
    5) 发布微博内容
    6) 添加关注用户
    7) 移除(取关)用户
    8) 获取关注的人的微博内容
    9) 测试

    项目结构

    pom.xml 文件:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>hbase-pro</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>2.8.2</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.10</version>
            </dependency>
    
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>org.jsoup</groupId>
                <artifactId>jsoup</artifactId>
                <version>1.11.3</version>
            </dependency>
    
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>2.4.4</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.4</version>
            </dependency>
    
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.70</version>
            </dependency>
    
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>compile</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>1.3.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>1.3.1</version>
            </dependency>
    
    
        </dependencies>
    
    </project>

     1. 常量类 Constants 

    package deng.com.constants;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    
    public class Constants {
        // HBase 的配置信息
        public static final Configuration CONFIGURATION = HBaseConfiguration.create();
        static {
            CONFIGURATION.set("hbase.zookeeper.quorum", "192.168.10.102");
            CONFIGURATION.set("hbase.zookeeper.property.clientPort", "2181");
        }
        // 命名空间
        public static final String NAMESPACE="weibo";
        // 微博内容表
        public static final String CONTENT_TABLE="weibo:content";
        public static final String CONTENT_TABLE_CF="info";
        public static final int CONTENT_TABLE_VERSIONS=1;
        // 用户关系表
        public static final String RELATION_TABLE="weibo:relation";
        public static final String RELATION_TABLE_CF1="attends"; // 关注
        public static final String RELATION_TABLE_CF2="fans";  // 粉丝
        public static final int RELATION_TABLE_VERSIONS=1;
        // 收件箱表
        public static final String INBOX_TABLE="weibo:info";
        public static final String INBOX_TABLE_CF="info";
        public static final int INBOX_TABLE_VERSIONS=2;
    
    }

    2. HBaseUtils 类(创建表,判断表是否存在等)

    package deng.com.utils;
    
    import deng.com.constants.Constants;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.NamespaceDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    
    import java.io.IOException;
    
    public class HBaseUtils {
    
        //创建命名空间
        public static void createNameSpace(String nameSpace) throws IOException {
            // 1. 获取连接对象
            Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
            // 2. 获取admin
            Admin admin = connection.getAdmin();
            // 3. 构建命名空间描述器
            NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(nameSpace).build();
            // 4. 创建命名空间
            admin.createNamespace(namespaceDescriptor);
            // 5. 关闭资源
            admin.close();
            connection.close();
        }
    
        // 2. 判断表是否存在
        private static boolean isTableExist(String tableName) throws IOException {
            // 1. 获取连接对象
            Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
            // 2. 获取admin
            Admin admin = connection.getAdmin();
            // 3.判读是否存在
            boolean b = admin.tableExists(TableName.valueOf(tableName));
            // 4. 关闭资源
            admin.close();
            connection.close();
            return b;
        }
    
        // 3. 创建表
        public static void createTable(String tableName, int versions, String... cf) throws IOException {
            // 判读是否传入了列族信息
            if (cf.length <= 0) {
                System.out.println("请传入列族信息");
                return;
            }
    
            // 判断表是否存在
            if (isTableExist(tableName)) {
                System.out.println(tableName + "表已存在");
                return;
            }
    
            // 1. 获取连接对象
            Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
    //        Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
            // 2. 获取admin
            Admin admin = connection.getAdmin();
            // 3. 创建表描述器
            HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
            // 4. 循环添加列族信息
            for (String s : cf) {
                // 获取列族描述器
                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(s);
                // 设置版本
                hColumnDescriptor.setMaxVersions(versions);
                hTableDescriptor.addFamily(hColumnDescriptor);
    
            }
    
            // 创建表
            admin.createTable(hTableDescriptor);
            admin.close();
            connection.close();
        }
    }

    3. HbaseDao类(业务操作)

    package deng.com.dao;
    
    /*
     * 1.发布微博
     * 2. 删除微博
     * 3. 关注用户
     * 4. 取关用户
     * 5.获取用户微博详情
     * 6.获取用户的初始页面
     */
    
    import com.sun.xml.internal.bind.v2.runtime.reflect.opt.Const;
    import deng.com.constants.Constants;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.RowFilter;
    import org.apache.hadoop.hbase.filter.SubstringComparator;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec;
    
    import java.io.IOException;
    import java.util.ArrayList;
    
    public class HBaseDao {
        // 1. 发布微博
        public static void publishWeiBo(String uuid, String content) throws IOException {
            // 获取Connection 对象
            Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
            // 第一部分 操作微博内容表
    
            // 获取微博内容表对象
            Table conTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
            // content 表的rowkey 是用户id+时间戳
            // 获取当前时间
            long ts = System.currentTimeMillis();
            // 拼接rowKey
            String rowKey = uuid + "_" + ts;
            // 创建Put 对象
            Put conPut = new Put(Bytes.toBytes(rowKey));
            // 给put 对象赋值
            conPut.addColumn(Bytes.toBytes(Constants.CONTENT_TABLE_CF), Bytes.toBytes("content"), Bytes.toBytes(content));
    
            // 插入数据
            conTable.put(conPut);
    
            // 第二部分 操作微博收件箱表
            // 1. 获取用户关系表对象
            Table relationTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));
            // 2. 获取当前发布微博人的fans列族数据
            Get get = new Get(Bytes.toBytes(uuid));
            get.addFamily(Bytes.toBytes(Constants.RELATION_TABLE_CF2)); // 指定fans 那个列族
            Result result = relationTable.get(get);
            // 创建一个集合用于存放微博内容表Put对象
            ArrayList<Put> inboxPuts = new ArrayList<>();
            Cell[] cells = result.rawCells();
            // 遍历粉丝
            for (Cell cell : cells) {
                // 构建微博收件箱表的put对象
                Put inboxPut = new Put(CellUtil.cloneQualifier(cell));
                // 给收件箱的putd对象赋值
                inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(uuid), Bytes.toBytes(rowKey));
                // 收件箱表的put对象存入表中
                inboxPuts.add(inboxPut);
    
            }
            // 判断是否有粉丝
            if (inboxPuts.size() > 0) {
                // 获取收件箱表对象
                Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
                //执行收件箱表数据插入
                inboxTable.put(inboxPuts);
                // 关闭收件箱表
                inboxTable.close();
            }
            // 关闭资源
            relationTable.close();
            conTable.close();
            connection.close();
    
        }
    
        public static void addAttends(String uuid, String... attends) throws IOException {
            // 校验
            if (attends.length <= 0) {
                System.out.println("请选择关注的人!");
                return;
            }
            // 第一部分:操作用户关系表
            //1. 获取用户关系表对象
            Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
            Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));
    
            //2. 创建一个集合用于存放用户关系表对象
            ArrayList<Put> relaPuts = new ArrayList<>();
            //3. 创建操作者的Put对象
            Put uuidPut = new Put(Bytes.toBytes(uuid));
            //4. 循环创建被关注者的put 对象
            for (String attend : attends) {
    
                //5. 给操作者put 对象赋值
                uuidPut.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF1), Bytes.toBytes(attend), Bytes.toBytes(attend));
                //6. 创建被关注者的put对象
                Put attendPut = new Put(Bytes.toBytes(attend));
                //7. 给被关注者对象赋值
                attendPut.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF2), Bytes.toBytes(uuid), Bytes.toBytes(uuid));
                //8. 将被关注者对象放入集合
                relaPuts.add(attendPut);
            }
    
            //9 .将操作者的put对象添加至集合
            relaPuts.add(uuidPut);
    
            //10. 执行用户关系表的插入操作
            relaTable.put(relaPuts);
    
    
            // 第二部分:收件箱表
            // 1. 获取微博内容表对象
            Table conTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
            // 2. 创建收件箱表的put对象
            Put inboxPut = new Put(Bytes.toBytes(uuid));
            // 3. 循环attends 获取每个被关注者近期发布的微博内容
            for (String attend : attends) {
                // 4. 获取当前被关注近期发布的微博内容
                Scan scan = new Scan(Bytes.toBytes(attend + "_"), Bytes.toBytes(attend + "|"));// startrow ,endrow
                ResultScanner resultScanner = conTable.getScanner(scan);
                // 定义一个时间戳
                long ts = System.currentTimeMillis();
                //5. 对获取的值进行遍历
                for (Result result : resultScanner) {
                    // 6. 给收件箱表put 对象赋值
                    inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(attend), ts++, result.getRow());
    
                }
                //7. 判断当前的put的对象是否为空
                if (!inboxPut.isEmpty()) {
                    // 获取收件箱表对象
                    Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
                    // 插入数据
                    inboxTable.put(inboxPut);
                    // 关闭资源
                    inboxTable.close();
                }
                // 关闭资源
                conTable.close();
                relaTable.close();
                connection.close();
    
            }
        }
    
        // 取关
        public static void delAttends(String uid, String... dels) throws IOException {
            if(dels.length<=0){
                System.out.println("请添加取关用户!");
                return;
            }
            //获取连接对象
            Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
            // 第一部分:操作用户关系表
            // 1. 获取用户关系表对象
            Table relTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));
            // 2. 创建一个集合,用户存放用户关系表的delete对象
            ArrayList<Delete> relDeletes = new ArrayList<>();
            // 3. 创建操作delete对象
            Delete uidDelete = new Delete(Bytes.toBytes(uid));
            // 4. 循环创建被取关的delete对象
            for (String del : dels) {
                // 5. 给操作者delete对象赋值
                uidDelete.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF1), Bytes.toBytes(del));
                // 6. 创建被取关者的delete对象
                Delete delDelete = new Delete(Bytes.toBytes(del));
                // 7. 给被取关者delete对象赋值
                delDelete.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF2), Bytes.toBytes(uid));
                // 8. 将被取关者delete对象添加至集合
                relDeletes.add(delDelete);
    
            }
            // 9. 将操作者的Delete对象添加至集合
            relDeletes.add(uidDelete);
            // 10.执行删除操作
            relTable.delete(relDeletes);
    
            // 第二部分:操作收件箱表
            // 1. 获取收件箱表对象
            Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
            // 2. 创建操作者的delete对象
            Delete inboxDelete = new Delete(Bytes.toBytes(uid));
    
            // 3. 给操作者的delete 对象赋值
            for (String del : dels) {
                inboxDelete.addColumns(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(del));
    
            }
            // 4. 执行收件箱表删除操作
            inboxTable.delete(inboxDelete);
    
            // 5. 关闭资源
            inboxTable.close();
            relTable.close();
            connection.close();
        }
        // 获取某人的初始化页面数据
        public static void getInt(String uid) throws IOException {
            //获取connection 对象
            Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
            // 1. 收件箱表对象
            Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
    
            // 2. 获取内容表对象
            Table conTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
            // 3. 创建收件箱表的Get对象,并获取数据(set最大版本获取)
            Get inboxGet = new Get(Bytes.toBytes(uid));
            inboxGet.setMaxVersions();
            // 4. 获取该get对象的内容
            Result result = inboxTable.get(inboxGet);
            // 5. 遍历获取到数据
            for (Cell cell : result.rawCells()) {
                // 6. 构建微博内容表get对象
                Get conGet = new Get(CellUtil.cloneValue(cell));
                // 7. 获取该get对象的内容
                Result result1 = conTable.get(conGet);
                // 8. 解析并打印
                for (Cell rawCell : result1.rawCells()) {
                    System.out.println(
                            "RK:"+Bytes.toString(CellUtil.cloneRow(rawCell))+","+
                            "CF:"+Bytes.toString(CellUtil.cloneFamily(rawCell))+","+
                            "cn:"+Bytes.toString(CellUtil.cloneQualifier(rawCell))+","+
                            "value:"+Bytes.toString(CellUtil.cloneValue(rawCell))
                    );
                }
    
            }
            //6. 关闭资源
            inboxTable.close();
            conTable.close();
            connection.close();
    
        }
        // 获取某人的所有微博详情
        public  static void getWeiBo(String uid) throws IOException {
            // 获取连接对象
            Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
            // 1. 获取内容表
    
            Table conTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
            // 2. 构建scan 对象
            Scan scan = new Scan();
            // 构建过滤器
            Filter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,new SubstringComparator(uid+"_"));
            scan.setFilter(rowFilter);
    
            // 3. 获取数据
            ResultScanner scanner = conTable.getScanner(scan);
            // 4. 解析数据并打印
            for (Result result : scanner) {
                for (Cell cell : result.rawCells()) {
                    System.out.println(
                            "RK:"+Bytes.toString(CellUtil.cloneRow(cell))+","+
                                    "CF:"+Bytes.toString(CellUtil.cloneFamily(cell))+","+
                                    "cn:"+Bytes.toString(CellUtil.cloneQualifier(cell))+","+
                                    "value:"+Bytes.toString(CellUtil.cloneValue(cell))
                    );
                }
            }
            // 5. 关闭资源
            conTable.close();
            connection.close();
    
        }
    }

    4. 测试类TestWeiBo

    package deng.com.test;
    
    import deng.com.constants.Constants;
    import deng.com.utils.HBaseUtils;
    
    import java.io.IOException;
    
    import static deng.com.dao.HBaseDao.*;
    
    public class TestWeiBO {
        public static void init() throws IOException {
            // 创建命名空间
    //        HBaseUtils.createNameSpace(Constants.NAMESPACE);
            // 创建微博内容表
            HBaseUtils.createTable(Constants.CONTENT_TABLE,Constants.CONTENT_TABLE_VERSIONS,Constants.CONTENT_TABLE_CF);
    
            // 创建用户关系表
            HBaseUtils.createTable(Constants.RELATION_TABLE,Constants.RELATION_TABLE_VERSIONS,Constants.RELATION_TABLE_CF1,Constants.RELATION_TABLE_CF2);
    
            // 创建收件箱表
            HBaseUtils.createTable(Constants.INBOX_TABLE,Constants.INBOX_TABLE_VERSIONS,Constants.INBOX_TABLE_CF);
        }
    
        public static void main(String[] args) throws IOException {
            // 初始化
            init();
            // 1001 发布微博
            publishWeiBo("1001","快来上课");
    
            // 1002 关注1001和1003
    //        addAttends("1002","1001","1003");
    
            // 获取1002 初始化页面
            getInt("1002");
            System.out.println("================1111================");
    //        getWeiBo("1001");
    
            // 1003 发布3条微博 。同时1001 发布两条微博v哦
            publishWeiBo("1003","我是1003 : 今天天气真好");
            publishWeiBo("1003","我是1003 : 一起去海边玩吧");
            publishWeiBo("1003","我是1003 : 走,开车去");
            publishWeiBo("1001","我是1001:我才不去");
            publishWeiBo("1001","我是1001:要去你自己去");
            // 获取1002 初始化页面
            getInt("1002");
            System.out.println("================2222================");
    
            // 1002 取关1003
            delAttends("1002","1003");
    
            // 获取1002 初始化页面
            getInt("1002");
            System.out.println("================3333================");
            // 1002 再次关注1003
            addAttends("1002","1003");
            System.out.println("================4444===============");
            // 获取1001 详情
            getWeiBo("1001");
    
    
    
        }
    }
  • 相关阅读:
    git 入门操作
    ubuntu apc 安装
    vps mysql自动关闭
    xdebug安装
    C#获取IP和主机名
    C#在类中用调用Form的方法
    luogu3181 [HAOI2016]找相同字符
    luogu6139 【模板】广义后缀自动机(广义SAM)
    广义后缀自动机小结
    Codeforces Round #620 (Div. 2) 题解
  • 原文地址:https://www.cnblogs.com/knighterrant/p/14746965.html
Copyright © 2011-2022 走看看