zoukankan      html  css  js  c++  java
  • 使用Observer实现HBase到Elasticsearch的数据同步

    •     最近在公司做统一日志收集处理平台,技术选型肯定要选择elasticsearch,因为可以快速检索系统日志,日志问题排查及功业务链调用可以被快速检索,公司各个应用的日志有些字段比如说content是不需要在es中作为存储的,当时考虑使用一种keyValue形式的数据库作存储,然后使用hbase的Rowkey作为es的docId,实现数据检索在es中,存储在hbase中,这样可以大大减轻es的存储压力。

    • 什么是 Observer

    HBase 0.92 版本引入了协处理器(Coprocessor),可以使开发者将自己的代码嵌入到 HBase 中,其中协处理器分为两大块,一个是终端(Endpoint),另一个是本文将要介绍的观察者(Observer)。

    Observer 有些类似于 MySQL 中的触发器(Trigger),它可以为 HBase 中的操作添加钩子,并在事件发生后实现自己的的业务逻辑。

    • Observer 主要分为三种:

    RegionObserver:增删改查相关,例如 Get、Put、Delete、Scan 等 WALObserver:WAL 操作相关 MasterObserver:DDL-类型相关,例如创建、删除、修改数据表等

    数据同步将会使用 RegionObserver 监听 Put 和 Delete 事件。

    • 如何实现自定义的的 Observer

    每一个 Observer 都是一个 Jar 包。首先需要引入hbase-server包,并实现如BaseRegionObserver等 HBase 提供的相关接口,重写需要监听对应事件的方法。

    实现数据同步功能可以重写postPut和putDelete方法监听 Put 和 Delete 事件。

    下面就是一个最简单的例子,在这两个方法中分别得到 hbsae表名和 RowKey 分别对应着es中的indexName和docId

    public class HbaseToEsObserver extends BaseRegionObserver {
        private static Client client = null;
        private static final Log LOG = LogFactory.getLog(HbaseToEsObserver.class);
        public static final String SEARCH_INDICE_PATTERN = "idx_%s_%s";
        /**
         * 读取HBase Shell的指令参数
         * @param env
         */
        private void readConfiguration(CoprocessorEnvironment env) {
            Configuration conf = env.getConfiguration();
            EsConfig.clusterName = conf.get("es_cluster");
            EsConfig.nodeHost = conf.get("es_host");
            EsConfig.nodePort = conf.getInt("es_port", 9300);
            EsConfig.indexName = conf.get("es_index");
            EsConfig.typeName = conf.get("es_type");
            LOG.info("observer -- started with config: " + EsConfig.getInfo());
        }
     
        @Override
        public void start(CoprocessorEnvironment env) throws IOException {
            readConfiguration(env);
            client = EsSearchManager.getInstance().getClient();
        }
        
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,                       Durability durability) {
           try {
                LOG.debug("es 索引开始 begin");
                String indexId = new String(put.getRow());
                Map<byte[], List<Cell>> familyMap =  put.getFamilyCellMap();
                Map<String, Object> json = new HashMap<String, Object>();
                for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                    for (Cell cell : entry.getValue()) {
                        String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                        String value = Bytes.toString(CellUtil.cloneValue(cell));
                        json.put(key, value);
                        LOG.info("key="+key+"value="+value);
                    }
                }
               //es中索引表的名称是idx_xxx_xxx
               String tableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
               String indexName = String.format(SEARCH_INDICE_PATTERN, EsConfig.indexName,tableName).toLowerCase();
               ElasticSearchUtil.addUpdateBuilderToBulk(client.prepareUpdate(indexName, EsConfig.typeName, indexId).setUpsert(json));
            } catch (Exception ex) {
                LOG.error(ex);
            }
        }
     
    public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
            try {
                String indexId = new String(delete.getRow());
                ElasticSearchUtil.addDeleteBuilderToBulk(client.prepareDelete(EsConfig.indexName, EsConfig.typeName, indexId));
                LOG.info("observer -- delete a doc: " + indexId);
            } catch (Exception ex) {
                LOG.error(ex);
            }
        }

    当日志hbase中一条条插入到hbase中的时候就会触发协处理器动作,为了减轻es服务器操作的压力我们批量操作es中的数据,先将索引数据存储到BulkRequestBuilder,当缓冲池中的索引数据为10条或者当提交间隔达到最大提交间隔的时候批量将索引数据发送到es服务器中。下面看下ElasticSearchUtil中的代码

    public class ElasticSearchUtil {
        private static final Log LOG = LogFactory.getLog(ElasticSearchUtil.class);
        // 缓冲池容量
        private static final int MAX_BULK_COUNT = 10;
        // 最大提交间隔(秒)
        private static final int MAX_COMMIT_INTERVAL = 60 * 2;
        private static Client client = null;
        private static BulkRequestBuilder bulkRequestBuilder = null;
        private static Lock commitIndexLock= new ReentrantLock();
     
        static {
            try {
               client = EsSearchManager.getInstance().getClient();
               bulkRequestBuilder = client.prepareBulk();
               bulkRequestBuilder.setRefresh(true);
               ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
               executor.scheduleWithFixedDelay(
                       new CommitIndexTimer(),
                       30 * 1000,
                       MAX_COMMIT_INTERVAL * 1000,
                       TimeUnit.MILLISECONDS);
            }catch(Exception e){
                LOG.error(e.getMessage());
             }
        }
     
        /**
         * 判断缓存池是否已满,批量提交
         *
         * @param threshold
         */
        private static void bulkRequest(int threshold) {
            if (bulkRequestBuilder.numberOfActions() > threshold) {
                LOG.info("执行索引程序,当前池中待索引数量="+bulkRequestBuilder.numberOfActions());
                BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
                if (!bulkResponse.hasFailures()) {
                    LOG.info("es索引程序成功!");
                    bulkRequestBuilder = client.prepareBulk();
                }
                if (bulkResponse.hasFailures()) {
                    LOG.error("es索引异常:"+bulkResponse.buildFailureMessage());
                }
            }
        }
     
        /**
         * 定时任务,避免RegionServer迟迟无数据更新,导致ElasticSearch没有与HBase同步
         * 定时执行
         */
        static class CommitIndexTimer implements Runnable {
            @Override
            public void run() {
                commitIndexLock.lock();
                try {
                    bulkRequest(0);
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    commitIndexLock.unlock();
                }
            }
        }
    }

    然后将项目打成jar包,提交到hdfs中,然后使用 HBase Shell 创建一个表,将这个 Observer 挂到该表中:

    create 'businessslog','info'
    disable 'businessslog'
     
    alter 'businessslog',METHOD =>'table_att','coprocessor' => 'hdfs://hadoop26:9000/observer.jar|com.github.hbase.observer.HbaseToEsObserver|1001|es_cluster=myes,es_type=loginfo,es_index=test,es_port=9300,es_host=114.55.253.15'
     
    enable 'businessslog'        
    describe 'businessslog'

    最后使用 describe 'businessslog' 命令就可以查看协处理器是否挂载成功,使用命令挂载协处理器还是有点麻烦,为此 封装了hbase创建表的时候自动建立协处理器的代码如下,不用在使用麻烦的命令建立协处理器了,直接调用Java 方法创建,方便了许多

     public void createTableWithCoprocessor(String tableName,String oberverName,String path,Map<String,String> map, String...familyColumn) throws Exception {
            TableName table = TableName.valueOf(tableName);
            Admin admin = getConn().getAdmin();
            boolean isExists = admin.tableExists(table);
            if(isExists){
                return ;
            }else{
                try {
                    HTableDescriptor htd = new HTableDescriptor(table);
                    for (String fc : familyColumn) {
                        HColumnDescriptor hcd = new HColumnDescriptor(fc);
                        htd.addFamily(hcd);
                    }
                    admin.createTable(htd);
                    admin.disableTable(table);
                    HTableDescriptor hTableDescriptor = new HTableDescriptor(table);
                    for (String fc : familyColumn) {
                        HColumnDescriptor hcd = new HColumnDescriptor(fc);
                        hTableDescriptor.addFamily(hcd);
                    }
                    hTableDescriptor.addCoprocessor(oberverName, new Path(path), Coprocessor.PRIORITY_USER, map);
                    admin.modifyTable(table, hTableDescriptor);
                    admin.enableTable(table);
                    admin.close();
                } catch (IOException e) {
                    logger.error(e.getMessage());
                }
            }
        }
     

    总结: es:可以实现复杂快速查询,但是不适合存储海量数据(针对一些大字段,不存储) hbase:可以实现海量数据存储,但是不适合进行复杂查询 es+hbase可以实现海量数据的复杂快速查询,在这里es可以认为是hbase的二级索引

    es中还需要将mapping映射配置正确,确保某些大字段建立索引 不存储,这里就在赘述,如上就可以实现当检索的时候还是在es中查询,当查询具体能容的时候再去hbase根据rowkey也就是es中的docId定位具体日志内容。

    以上总结了部分代码,详细的代码请查看github地址 https://github.com/winstonelei/BigDataTools ,包括了一些大数据组件的基本操作,包含了hbase,hadoop,es,hive等

    转载于:https://my.oschina.net/u/1792341/blog/915850

  • 相关阅读:
    [C++] static member variable and static const member variable
    [C++] const inside class VS const outside class
    [C++] OOP
    [C++] Function Template
    [C++] right value reference
    [C++] advanced reference
    [C++] const and char*
    [C++] c Struct VS c++ Struct
    [C++] decltype(auto) C++ 11 feature
    easyui-validatebox 的简单长度验证
  • 原文地址:https://www.cnblogs.com/qfdy123/p/13787806.html
Copyright © 2011-2022 走看看