项目源码:https://github.com/cw1322311203/hbasedemo/tree/master/hbase-coprocesser
1.协处理器实现两个表数据的同步插入步骤
可以使用协处理器来实现两个表数据的同步插入
协处理器(Hbase自己的功能)实现两表的同步数据插入步骤:
-
创建类,继承
BaseRegionObserver
-
重写方法:
postPut
-
实现逻辑:
增加student的数据,同时增加cw:student中的数据
-
将项目打包(依赖)后上传到HBase安装目录的lib目录中(集群所有节点都需要上传)并重启hbase集群,让hbase可以识别协处理器
ProjectStructure–>Artifacts–>±->JAR–>Empty–>±->ModuleOutput–>选中Module–>Apply
Build–>Build Artifacts–>Build
上述操作完成后,可在out目录找到一个协处理器module的jar包,上传到hbase安装目录下的lib目录中,并重启HBase集群
-
删除原始表,在增加新表时,同时设定协处理器td.addCoprocessor(“com.cw.bigdata.hbase.coprocesser.InsertCwStudentCoprocesser”);
2. 具体实现代码
协处理器代码
package com.cw.bigdata.hbase.coprocesser;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import java.io.IOException;
/**
* 协处理器(Hbase自己的功能)实现两表的同步数据插入
* 1)创建类,继承BaseRegionObserver
* 2)重写方法:postPut
* 3)实现逻辑:
* 1. 增加student的数据
* 2. 同时增加cw:student中的数据
* 4)将项目打包(依赖)后上传到HBase安装目录的lib目录中(集群所有节点都需要上传),让hbase可以识别协处理器
* 5)删除原始表,在增加新表时,同时设定协处理器td.addCoprocessor("com.cw.bigdata.hbase.coprocesser.InsertCwStudentCoprocesser");
*/
public class InsertCwStudentCoprocesser extends BaseRegionObserver {
// prePut:之前
// doPut:正在执行
// postPut:之后,执行完插入操作之后执行什么操作
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
// 获取表
Table table = e.getEnvironment().getTable(TableName.valueOf("cw:student"));
// 增加数据
table.put(put);
// 关闭表
table.close();
}
}
创建表的代码
package com.cw.bigdata.hbase.coprocesser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
/**
* 测试Hbase API
*/
public class TestHbaseAPI {
public static void main(String[] args) throws Exception {
// 通过java代码访问hbase数据库
// 0) 创建配置对象,获取hbase的连接
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
// 2) 获取操作对象 : Admin
Admin admin = connection.getAdmin();
// 3) 操作数据库
// 3-1) 判断命名空间是否存在
try {
admin.getNamespaceDescriptor("cw");// 不存在会抛异常
} catch (NamespaceNotFoundException e) {
//创建表空间
NamespaceDescriptor nd = NamespaceDescriptor.create("cw").build();
admin.createNamespace(nd);
System.out.println("命名空间创建成功...");
}
// 3-2) 判断hbase中是否存在某张表
TableName tableName = TableName.valueOf("student");
boolean flag = admin.tableExists(tableName);
System.out.println(flag);
if (flag) {
// 获取指定的表对象
Table table = connection.getTable(tableName);
// 查询数据
// Admin负责DDL(create drop alter) DML(update insert delete) DQL(select)
String rowkey = "1001";
// string==>byte[]
// 字符编码问题
Get get = new Get(Bytes.toBytes(rowkey));
// 查询结果
Result result = table.get(get);
boolean empty = result.isEmpty();
System.out.println("1001数据是否存在= " + !empty);
if (empty) {
// 新增数据
Put put = new Put(Bytes.toBytes(rowkey));
String family = "info";
String column = "name";
String val = "zhangsan";
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(val));
table.put(put);
System.out.println("增加数据...");
} else {
// 展示数据
for (Cell cell : result.rawCells()) {
System.out.println("value = " + Bytes.toString(CellUtil.cloneValue(cell)));
System.out.println("rowkey = " + Bytes.toString(CellUtil.cloneRow(cell)));
System.out.println("family = " + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("column = " + Bytes.toString(CellUtil.cloneQualifier(cell)));
}
}
} else {
//创建表
// 创建表描述对象
HTableDescriptor td = new HTableDescriptor(tableName);
// 增加协处理器
td.addCoprocessor("com.cw.bigdata.hbase.coprocesser.InsertCwStudentCoprocesser");
// 增加列族
HColumnDescriptor cd = new HColumnDescriptor("info");
td.addFamily(cd);
admin.createTable(td);
System.out.println(tableName + "表创建成功...");
}
// 4) 获取操作结果
// 5) 关闭数据库连接
}
}
3. 运行结果
hbase(main):002:0> list
TABLE
cw:student
1 row(s) in 0.0190 seconds
# 运行完建表的api程序后
hbase(main):004:0> describe 'student'
Table student is ENABLED
student, {TABLE_ATTRIBUTES => {coprocessor$1 => '|com.cw.bigdata.hbase.coprocesser.InsertCwStudentCoprocesser|1073741823|'}
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '1', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_
CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
1 row(s) in 0.0810 seconds
hbase(main):005:0> describe 'cw:student'
Table cw:student is ENABLED
cw:student
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', KEEP_DELETED_
CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
1 row(s) in 0.0210 seconds
hbase(main):006:0> put 'student','1001','info:name','zhangsan'
0 row(s) in 0.1450 seconds
hbase(main):007:0> scan 'student'
ROW COLUMN+CELL
1001 column=info:name, timestamp=1574482734646, value=zhangsan
1 row(s) in 0.0180 seconds
hbase(main):008:0> scan 'cw:student'
ROW COLUMN+CELL
1001 column=info:name, timestamp=1574482734646, value=zhangsan
1 row(s) in 0.0110 seconds
# 可以看到已经通过协处理器实现两表的同步插入了