zoukankan      html  css  js  c++  java
  • 读书笔记-HBase in Action-第二部分Advanced concepts-(2)Coprocessor

    Coprocessor是HBase 0.92.0引入的特性。使用Coprocessor。能够将一些计算逻辑下推到HBase节点,HBase由一个单纯的存储系统升级为分布式数据处理平台。

    Coprocessor分为两种:Observer和Endpoint。

    Observer能改动扩展已有的client操作功能。而Endpoint能引入新的client操作。

    Observer

    Observer的作用类似于数据库的触发器或者AOP中的advice。下图为Put操作添加Observer,当中1-2-4-6是一次正常的Put操作RPC调用过程,而3和5属于Observer,能够在Put操作之前和之后添加自己定义处理逻辑。


    Observer包含三种,RegionObserver(针对数据訪问和更新操作,执行在Region上)/WALObserver(针对WAL日志事件,执行在RegionServer上下文)/MasterObserver(针对DDL操作,执行在Master节点)。

    Endpoint

    Endpoint的作用则类似于数据库存储过程。实现机制是通过扩展HBase RPC协议,给client暴露新的操作接口。

    例如以下图,client负责发起调用和收集结果,服务端各节点负责并行计算。


    实战

    以上一章的follows表为例,通过Observer实现followedBy被关注表数据一致性维护。Endpoint实现关注人数量统计。

    由于要实如今插入follows表时自己主动插入followedBy表。须要用到关注人/被关注人username信息,所以首先升级schema。


    实现Observer

    代码中有三处凝视值得注意:

    1. postPut方法在put操作之后被调用。
    2. 假设通过hbase-site.xml安装Observer。会应用到全局全部表,所以这里推断put操作的是否follows表。

    3. 这里有点bad smell。Observer执行在服务器端。为了共用代码,又调用client代码,仅为演示作用。

    packageHBaseIA.TwitBase.coprocessors;
    //…
    publicclass FollowsObserver extends BaseRegionObserver {
        private HTablePool pool = null;
        @Override
        public void start(CoprocessorEnvironment env)throws IOException {
            pool = newHTablePool(env.getConfiguration(), Integer.MAX_VALUE);
        }
        @Override
        public void stop(CoprocessorEnvironment env)throws IOException {
            pool.close();
        }
        @Override
        public void postPut(//1,在Put操作之后调用
                finalObserverContext<RegionCoprocessorEnvironment> e,
                final Put put,
                final WALEdit edit,
                final boolean writeToWAL) throws IOException {
            byte[] table=e.getEnvironment().getRegion().getRegionInfo().getTableName();
            if (!Bytes.equals(table,FOLLOWS_TABLE_NAME))
                 return;  //2,推断表名
            KeyValue kv =put.get(RELATION_FAM, FROM).get(0);
            String from =Bytes.toString(kv.getValue());
            kv = put.get(RELATION_FAM,TO).get(0);
            String to =Bytes.toString(kv.getValue());
            RelationsDAO relations = newRelationsDAO(pool);
            relations.addFollowedBy(to,from);//3,插入followedBy表
        }
    }
    Observer的安装能够通过改动hbase-site.xml或者使用tableschema改动语句完毕,前者须要重新启动HBase服务,后者仅仅须要又一次上下线相应表。

    $ hbase shell
    HBaseShell; enter 'help<RETURN>' for list of supported commands.
    Type"exit<RETURN>" to leave the HBase Shell
    Version0.92.0, r1231986, Mon Jan 16 13:16:35 UTC 2012
    hbase(main):001:0>disable 'follows'
    0 row(s) in 7.0560 seconds
    hbase(main):002:0>alter 'follows', METHOD => 'table_att',
    'coprocessor'=>'file:///Users/ndimiduk/repos/hbaseiatwitbase/
    target/twitbase-1.0.0.jar
    |HBaseIA.TwitBase.coprocessors.FollowsObserver|1001|'
    Updatingall regions with the new schema...
    1/1regions updated.
    Done.
    0 row(s) in 1.0770 seconds
    hbase(main):003:0>enable 'follows'
    0 row(s) in 2.0760 seconds

    当中1001为优先级。当载入多个Observer时。依照优先级次序执行。

    实现Endpoint

    关注人数量统计能够通过clientScan实现,相比Endpoint方案。有两个待改进点:

    1. 传输全部关注人到client,不必要的网络I/O。

    2. 拿到全部关注人Result结果后。遍历实现计数是单线程的。

    实现Endpoint包含三部分

    定义PRC接口

    publicinterface RelationCountProtocol extends CoprocessorProtocol {
        public long followedByCount(String userId) throwsIOException;
    }

    服务端实现

    和client不同,InternalScanner执行在特定Region上。返回的是原始的KeyValue对象。

    packageHBaseIA.TwitBase.coprocessors;
    //…
    publicclass RelationCountImpl extends BaseEndpointCoprocessor implementsRelationCountProtocol {
        @Override
        public longfollowedByCount(String userId) throws IOException {
            byte[]startkey = Md5Utils.md5sum(userId);
            Scan scan = newScan(startkey);
            scan.setFilter(newPrefixFilter(startkey));
            scan.addColumn(RELATION_FAM,FROM);
            scan.setMaxVersions(1);
            RegionCoprocessorEnvironmentenv= (RegionCoprocessorEnvironment)getEnvironment();
            InternalScanner scanner =env.getRegion().getScanner(scan);//1,server端
            long sum = 0;
            List<KeyValue> results= new ArrayList<KeyValue>();
            boolean hasMore = false;
            do {
                hasMore =scanner.next(results);
                sum += results.size();
                results.clear();
            } while (hasMore);
            scanner.close();
            return sum;
        }
    }

    client代码

    參考凝视:

    1. 定义Call实例 
    2. 调用服务端Endpoint。
    3. 聚合全部RegionServer得到的结果

    public long followedByCount (final String userId) throws Throwable {
        HTableInterface followed =pool.getTable(FOLLOWED_TABLE_NAME);
        final byte[] startKey = Md5Utils.md5sum(userId);
        final byte[] endKey =Arrays.copyOf(startKey, startKey.length);
        endKey[endKey.length-1]++;
        Batch.Call<RelationCountProtocol,Long> callable =
            newBatch.Call<RelationCountProtocol, Long>() {
            @Override
            public Longcall(RelationCountProtocol instance) throws IOException {
                returninstance.followedByCount(userId);
            }
        };//1 call instance
        Map<byte[], Long>results = followed.coprocessorExec(
                                       RelationCountProtocol.class,
                                       startKey,
                                       endKey,
                                       callable);//2 invoke endpoint
        long sum = 0;
        for(Map.Entry<byte[],Long> e : results.entrySet()) {
            sum +=e.getValue().longValue();
        }//3 aggreagte results
        return sum;
    }
    Endpoint仅仅能通过配置文件部署,还须要将相关jar包增加到HBase classpath。

    <property>
        <name>hbase.coprocessor.region.classes</name>
        <value>HBaseIA.TwitBase.coprocessors.RelationCountImpl</value>
    </property>

  • 相关阅读:
    leetcode : Valid Sudoku
    leetcode : Longest Increasing Subsequence
    leetcode : Search for a Range
    leetcode : Search Insert Position
    leetcode : next permutation
    leetcode : Implement strStr()
    leetcode : Remove Element
    框架:Spring MVC
    笔试:在线编程相关
    J2EE:关系(一对多、多对一、多对多关系)
  • 原文地址:https://www.cnblogs.com/yxwkf/p/5249803.html
Copyright © 2011-2022 走看看