HBase协处理器的简单使用
一、hbas协处理器介绍
hbase在0.92版本之前是没有协处理器的,之所以引入协处理器是为了能够让用户能够可以扩展服务端的类库,并直接在服务端完成特定任务二不需要跟客户端之间有IO操作.
hbase可以实现的功能
- 代码可以运行在每个表服务器的每个表上
- 提供高层调用接口给客户端使用
- 提供一个非常灵活的模型来构建分布式服务
- 为每一个应用提供自动化的扩展,负载均衡,请求路由的功能
hbase协处理器的分类
- EndPoint:
用来实现类似关系型数据库中的存储过程的功能
- Observers:
用来实现类似关系型数据库中的触发器的功能
HBase提供了一些基本的抽象类来简单的实现接口中的方法,便于用户编写协处理器的时候不需要手动实现每一个方法
观察者(Observer)
- RegionObserver:
针对Region的观察者,可以监听关于Region的操作
- RegionServerObserver:
针对RegionServer的观察者,可以监听整个RegionServer的操作
- MasterObserver:
针对Master观察者,可以监听Master进行的DDL操作
- WALObserver:
针对WAL的观察者,可以监听WAL的所有读写操作
- BulkLoadObserver:
BulkLoad是采用MapReduce将大量数据快读导入HBase的一种方式。BulkLoadObserver可以监听BulkLoad行为
- EndPointObserver:
可以监听EndPoint的执行过程
常用接口:
- BaseRegionObserver:
实现了RegionObserver接口的所有需要实现的方法,并给出了最简单的实现
- BaseMasterObserver:
实现了MasterObserver接口的所有需要实现的方法,并给出了最简单的实现
终端程序(EndPoint)
只实现了一个接口CoprocessorService,并且没有提供最基本的实现类。该接口只有一个方法需要实现:getServer,该方法主要返回ProtocolBuffers的实例,所以实现EndPoint之前,需要了解一下Protocol Buffers的相关知识作为基础
二、Observer协处理器的实现
上面讲了一堆概念,下面我们实现一个写出里器并加载到集群中进行测试一下
需求1:
有一张表winshop_attention,现在需要实现在每向里面put一条数据的时候,就会更新modifyTime为当前最新时间。之前的做法是,通过两条put语句来实现,现在通过写出里器交给协处理器来进行修改时间的更新:
STEP 1
新建一个maven工程,打包方式选择jar,然后添加项目的依赖。因为协处理器相关的接口都在hbase-server包里面,所有,我们只需要添加hbase-server的依赖即可,注意:最好不好同时添加hbase-server和hbase-client的依赖
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.1.2</version> </dependency>
STEP 2
新建 AttentionObserver,直接继承 BaseRegionObserver类,这样相对于实现Coprocessor接口来说,可以避免实现很多不必要的方法
STEP 3
在该类中我实现我们的逻辑,需要重写BaseRegionObserver当中的方法,方法很多,足以满足我们的业务逻辑需求,在再次我实现:数据put之后进行更新modifyTime
package com.winner.count; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.time.LocalDateTime; /** * @AUTHOR Guozy * @DATE 2020/9/14-0:18 **/ public class AttentionObserver extends BaseRegionObserver { @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { String currentTime = String.valueOf(LocalDateTime.now()).replace("T", " "); put.addColumn(Bytes.toBytes("i"), Bytes.toBytes("mt"), Bytes.toBytes(currentTime)); } }
当然,这里面可以编写我们的业务逻辑,且 ObserverContext<RegionCoprocessorEnvironment> e 更是包含了更多的有用的信息。
STEP 4
打包该项目为jar包,并将jar包上传至hdfs目录下,具体打包方式,可以直接使用maven自带的package方式:
上传jar包到hdfs目录,我这里指定目录为:/winner/hadoop/hbase_config 下
STEP 5
在HBase中针对winshop_attention这张表启用这个观察者。登录hbase shell依次执行以下命令:
hbase(main):215:0> disable 'winshop_attention' hbase(main):216:0> alter 'winshop_attention',METHOD=>'table_att','coprocessor'=>'/winner/hadoop/hbase_config/hbaseComprocess.jar|com.winner.count.AttentionObserver||' hbase(main):217:0> enable 'winshop_attention'
协处理器加载格式命令行说明:
alter 'winshop_attention',METHOD=>'table_att','coprocessor'=>'①|②|③|④'
- table_att:固定词组,意思是调用setValue()方法给表设置属性
- coprocessor: 代表协处理器的意思
-
①:协处理器的jar包路径,要保证所有regionServer都可以读取到,可以在本地(每台机器的都要有),不过建议在hdfs上
-
②:协处理器的完成类名
-
③:协处理器的优先级,用整数表示,整数越小,优先级越高。可以为空
-
④:协处理器运行需要的参数,可以为空,多个参数用逗号分隔,例如:arg1=1,arg2=2
注:①|②|③|④ 之间不要有空格
STEP 6
验证协处理器是否加载成功:
- 将HBase表进行上线
- 执行put命令,查看modifytime是否会更新为最新时间
可以看到,我们的协处理器是生效了。
这里有一个坑:对于同一张表来说,如果想要卸载原有的协处理器,并加载新的协处理器的话,就必须更改jar名称,否则会包如下错误: