zoukankan      html  css  js  c++  java
  • HBase协处理器实现两个表数据的同步插入步骤

    项目源码:https://github.com/cw1322311203/hbasedemo/tree/master/hbase-coprocesser

    1.协处理器实现两个表数据的同步插入步骤

    可以使用协处理器来实现两个表数据的同步插入

    协处理器(Hbase自己的功能)实现两表的同步数据插入步骤:

    1. 创建类,继承BaseRegionObserver

    2. 重写方法:postPut

    3. 实现逻辑:

      增加student的数据,同时增加cw:student中的数据

    4. 将项目打包(依赖)后上传到HBase安装目录的lib目录中(集群所有节点都需要上传)并重启hbase集群,让hbase可以识别协处理器

      Project Structure–>Artifacts–>+–>JAR–>Empty–>+–>Module Output–>选中Module–>Apply

      Build–>Build Artifacts–>Build

      上述操作完成后,可在out目录找到一个协处理器module的jar包,上传到hbase安装目录下的lib目录中,并重启HBase集群

    5. 删除原始表,在增加新表时,同时设定协处理器td.addCoprocessor("com.cw.bigdata.hbase.coprocesser.InsertCwStudentCoprocesser");(注意使用英文直引号,弯引号无法编译)

    2. 具体实现代码

    协处理器代码

    package com.cw.bigdata.hbase.coprocesser;
    
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Durability;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    
    import java.io.IOException;
    
    /**
     * 协处理器(Hbase自己的功能)实现两表的同步数据插入
     * 1)创建类,继承BaseRegionObserver
     * 2)重写方法:postPut
     * 3)实现逻辑:
     * 1. 增加student的数据
     * 2. 同时增加cw:student中的数据
     * 4)将项目打包(依赖)后上传到HBase安装目录的lib目录中(集群所有节点都需要上传),让hbase可以识别协处理器
     * 5)删除原始表,在增加新表时,同时设定协处理器td.addCoprocessor("com.cw.bigdata.hbase.coprocesser.InsertCwStudentCoprocesser");
     */
    /**
     * HBase coprocessor (region observer) that mirrors every Put applied to
     * the observed table into the "cw:student" table, keeping the two tables
     * synchronized on insert.
     *
     * Deployment (per the steps above): package this class into a jar, copy
     * it into HBase's lib directory on every node, restart the cluster, and
     * register it on the source table via
     * td.addCoprocessor("com.cw.bigdata.hbase.coprocesser.InsertCwStudentCoprocesser").
     *
     * Note: do NOT attach this coprocessor to cw:student itself, or each Put
     * would re-trigger the observer recursively.
     */
    public class InsertCwStudentCoprocesser extends BaseRegionObserver {

        // Fully-qualified name of the mirror table that receives a copy of every Put.
        private static final TableName MIRROR_TABLE = TableName.valueOf("cw:student");

        /**
         * Hook invoked after a Put has been applied to the observed region
         * (prePut fires before, postPut after the write completes).
         *
         * @param e          coprocessor environment for the hosting region
         * @param put        the mutation that was just applied; forwarded as-is
         * @param edit       WAL edit associated with the mutation (unused)
         * @param durability durability requested for the mutation (unused)
         * @throws IOException if writing to the mirror table fails
         */
        @Override
        public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
            // try-with-resources guarantees the Table handle is closed on every
            // exit path; the original leaked it when table.put(put) threw.
            try (Table table = e.getEnvironment().getTable(MIRROR_TABLE)) {
                // Replay the identical Put into the mirror table.
                table.put(put);
            }
        }
    }
    

    创建表的代码

    package com.cw.bigdata.hbase.coprocesser;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    
    
    /**
     * 测试Hbase API
     */
    /**
     * Demonstrates the HBase client API: ensures the "cw" namespace exists,
     * then either reads/inserts row 1001 of the "student" table, or creates
     * the table with the InsertCwStudentCoprocesser attached if it is missing.
     */
    public class TestHbaseAPI {
        public static void main(String[] args) throws Exception {

            // 0) Build the client configuration (reads hbase-site.xml from the classpath).
            Configuration conf = HBaseConfiguration.create();

            // try-with-resources closes the Connection and Admin on every exit
            // path -- the original left its step "5) close the connection"
            // unimplemented and leaked all three handles.
            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Admin admin = connection.getAdmin()) {

                // 3-1) Ensure the "cw" namespace exists.
                try {
                    admin.getNamespaceDescriptor("cw"); // throws when the namespace is absent
                } catch (NamespaceNotFoundException e) {
                    // Namespace missing: create it.
                    NamespaceDescriptor nd = NamespaceDescriptor.create("cw").build();
                    admin.createNamespace(nd);
                    System.out.println("命名空间创建成功...");
                }

                // 3-2) Check whether the "student" table already exists.
                TableName tableName = TableName.valueOf("student");
                boolean flag = admin.tableExists(tableName);
                System.out.println(flag);

                if (flag) {

                    // Table exists: look up row 1001, inserting it when absent.
                    // (Table is AutoCloseable; close it deterministically too.)
                    try (Table table = connection.getTable(tableName)) {

                        String rowkey = "1001";
                        // String -> byte[] conversion; beware of character encoding.
                        Get get = new Get(Bytes.toBytes(rowkey));

                        // Fetch the row.
                        Result result = table.get(get);
                        boolean empty = result.isEmpty();
                        System.out.println("1001数据是否存在= " + !empty);
                        if (empty) {
                            // Row missing: insert a sample record.
                            Put put = new Put(Bytes.toBytes(rowkey));

                            String family = "info";
                            String column = "name";
                            String val = "zhangsan";

                            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(val));

                            table.put(put);
                            System.out.println("增加数据...");
                        } else {
                            // Row present: print every cell of the result.
                            for (Cell cell : result.rawCells()) {
                                System.out.println("value = " + Bytes.toString(CellUtil.cloneValue(cell)));
                                System.out.println("rowkey = " + Bytes.toString(CellUtil.cloneRow(cell)));
                                System.out.println("family = " + Bytes.toString(CellUtil.cloneFamily(cell)));
                                System.out.println("column = " + Bytes.toString(CellUtil.cloneQualifier(cell)));
                            }
                        }
                    }
                } else {
                    // Table missing: create it.

                    // Table descriptor.
                    HTableDescriptor td = new HTableDescriptor(tableName);

                    // Register the coprocessor that mirrors Puts into cw:student.
                    td.addCoprocessor("com.cw.bigdata.hbase.coprocesser.InsertCwStudentCoprocesser");

                    // Add the "info" column family.
                    HColumnDescriptor cd = new HColumnDescriptor("info");
                    td.addFamily(cd);

                    admin.createTable(td);

                    System.out.println(tableName + "表创建成功...");
                }
            }
        }
    }
    
    

    3. 运行结果

    hbase(main):002:0> list
    TABLE
    cw:student
    1 row(s) in 0.0190 seconds
    
    # 运行完建表的api程序后
    
    hbase(main):004:0> describe 'student'
    Table student is ENABLED
    student, {TABLE_ATTRIBUTES => {coprocessor$1 => '|com.cw.bigdata.hbase.coprocesser.InsertCwStudentCoprocesser|1073741823|'}
    COLUMN FAMILIES DESCRIPTION
    {NAME => 'info', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '1', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_
    CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
    1 row(s) in 0.0810 seconds
    
    hbase(main):005:0> describe 'cw:student'
    Table cw:student is ENABLED
    cw:student
    COLUMN FAMILIES DESCRIPTION
    {NAME => 'info', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', KEEP_DELETED_
    CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
    1 row(s) in 0.0210 seconds
    
    hbase(main):006:0> put 'student','1001','info:name','zhangsan'
    0 row(s) in 0.1450 seconds
    
    hbase(main):007:0> scan 'student'
    ROW                                              COLUMN+CELL
     1001                                            column=info:name, timestamp=1574482734646, value=zhangsan
    1 row(s) in 0.0180 seconds
    
    hbase(main):008:0> scan 'cw:student'
    ROW                                              COLUMN+CELL
     1001                                            column=info:name, timestamp=1574482734646, value=zhangsan
    1 row(s) in 0.0110 seconds
    
    # 可以看到已经通过协处理器实现两表的同步插入了
    
  • 相关阅读:
    PAT (Advanced Level) Practice 1100 Mars Numbers (20分)
    PAT (Advanced Level) Practice 1107 Social Clusters (30分) (并查集)
    PAT (Advanced Level) Practice 1105 Spiral Matrix (25分)
    PAT (Advanced Level) Practice 1104 Sum of Number Segments (20分)
    PAT (Advanced Level) Practice 1111 Online Map (30分) (两次迪杰斯特拉混合)
    PAT (Advanced Level) Practice 1110 Complete Binary Tree (25分) (完全二叉树的判断+分享致命婴幼儿错误)
    PAT (Advanced Level) Practice 1109 Group Photo (25分)
    PAT (Advanced Level) Practice 1108 Finding Average (20分)
    P6225 [eJOI2019]异或橙子 树状数组 异或 位运算
    P4124 [CQOI2016]手机号码 数位DP
  • 原文地址:https://www.cnblogs.com/chenxiaoge/p/13335434.html
Copyright © 2011-2022 走看看