zoukankan      html  css  js  c++  java
  • 使用Coprocessor实现hbase+solr数据交互

    HBase可以通过协处理器(Coprocessor)向Solr发出请求,Solr对接收到的数据做相应的同步操作:增加、删除、修改索引。这样就可以把Solr作为HBase的二级索引,构建基于Solr+HBase的多条件复杂快速查询。

    查询时,先根据条件在Solr中查找符合条件的rowkey,再根据rowkey从HBase中取数据。根据测试,分页查询时基本可以实现毫秒(ms)级的快速查询。

    1. 编写SolrIndexCoprocessorObserver代码

    package cn.ac.ict.solr.server;
    
    import cn.ac.ict.solr.utils.SolrWriter;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Durability;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.solr.common.SolrInputDocument;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
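    /**
     * Listens to HBase writes: after every put (postPut) the row is handed to Solr through
     * SolrWriter's cache, and after every delete (postDelete) the matching Solr document is removed.
     * HBase offers two kinds of coprocessor: the Observer, similar to a trigger in a relational
     * database, and the Endpoint, similar to a stored procedure.
     * All writes to Solr go through SolrWriter.
     * User: zhaop
     * Date: 15-4-7
     * Time: 2:16 PM
     */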
    public class SolrIndexCoprocessorObserver extends BaseRegionObserver {
        private static final Logger logger = LoggerFactory.getLogger(SolrIndexCoprocessorObserver.class);

        /** Column family that holds every column copied into Solr. */
        private static final byte[] FAMILY = Bytes.toBytes("values");

        /**
         * Qualifiers (in family {@code values}) copied into Solr. Each one is indexed under a Solr
         * field with the same name, in this order, after the {@code rowkey} field.
         */
        private static final String[] INDEXED_QUALIFIERS = {"did", "dvid", "value", "timestamp", "model"};

        /**
         * Runs after a Put has been applied to the region and queues the row for indexing.
         * Indexing is best-effort: failures are logged and never propagated to the HBase client.
         */
        @Override
        public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
            // Per-put logging sits on the hot write path, so keep it at DEBUG.
            logger.debug("postPut 向solr中插入数据");
            inputSolr(put);
        }

        /**
         * Runs after a Delete has been applied and removes the Solr document for the row.
         * Failures are logged and swallowed so that Solr problems never fail the HBase delete.
         *
         * NOTE(review): the whole Solr document is removed even when the Delete only targets some
         * columns of the row; confirm this is the intended behaviour.
         */
        @Override
        public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
            String rowKey = Bytes.toString(delete.getRow());
            try {
                logger.info("postDelete 删除solr中的数据");
                new SolrWriter().deleteDoc(rowKey);
            } catch (Exception ex) {
                logger.error("postDelete delete rowKey = " + rowKey + " from solr fail:" + ex.getMessage(), ex);
            }
        }

        /**
         * Builds a Solr document from the Put and adds it to SolrWriter's cache.
         *
         * The document gets a {@code rowkey} field plus one field per entry of
         * {@link #INDEXED_QUALIFIERS}. All values are decoded as UTF-8, the same way the row key is.
         * If any of those columns is absent from the Put, the row is not indexed and a warning is
         * logged. Any other failure is logged and swallowed.
         *
         * NOTE(review): a Put that updates only some of these columns is therefore not indexed at
         * all, so Solr keeps the previous values for that row.
         *
         * @param put the mutation that was just applied
         */
        public void inputSolr(Put put) {
            String rowKey = Bytes.toString(put.getRow());
            try {
                SolrInputDocument doc = new SolrInputDocument();
                doc.addField("rowkey", rowKey);
                for (String qualifier : INDEXED_QUALIFIERS) {
                    String value = firstValue(put, qualifier);
                    if (value == null) {
                        logger.warn("postPut skip rowKey = " + rowKey + ", column values:" + qualifier + " is missing");
                        return;
                    }
                    doc.addField(qualifier, value);
                }

                SolrWriter.addDocToCache(doc);
                logger.debug("postPut 向solr缓存中插入数据成功,rowKey = " + rowKey);
            } catch (Exception e) {
                logger.error("postPut write rowKey = " + rowKey + " to solr fail:" + e.getMessage(), e);
            }
        }

        /**
         * Returns the value of the first cell the Put carries for {@code values:qualifier},
         * decoded as UTF-8, or {@code null} when the Put has no such cell.
         */
        private static String firstValue(Put put, String qualifier) {
            java.util.List<Cell> cells = put.get(FAMILY, Bytes.toBytes(qualifier));
            if (cells == null || cells.isEmpty()) {
                return null;
            }
            // Bytes.toString decodes UTF-8, unlike new String(byte[]) which uses the platform charset.
            return Bytes.toString(CellUtil.cloneValue(cells.get(0)));
        }
    }
    View Code
    package cn.ac.ict.solr.utils;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.common.SolrInputDocument;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.Vector;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
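    /**
     * Writes documents to Solr in batches: documents are buffered in an in-memory cache and
     * flushed when the cache is full or periodically by a timer.
     * User: zhaop
     * Date: 15-4-9
     * Time: 8:50 PM
     */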
    public class SolrWriter {
        private static final Logger logger = LoggerFactory.getLogger(SolrWriter.class);

        public static String urlSolr = "";     // ZooKeeper address list passed to CloudSolrClient
        private static String defaultCollection = "";  // default Solr collection
        private static int zkClientTimeOut = 0; // passed to CloudSolrClient.setZkClientTimeout
        private static int zkConnectTimeOut = 0; // passed to CloudSolrClient.setZkConnectTimeout
        private static CloudSolrClient cloudSolrClient = null;

        private static int maxCacheCount = 0;   // cache size at which addDocToCache flushes
        private static Vector<SolrInputDocument> cache = null;   // documents not yet sent to Solr
        public static Lock commitLock = new ReentrantLock();  // held while the cache is modified or flushed

        private static int maxCommitTime = 60; // timer flush period, in seconds

        static {
            Configuration conf = HBaseConfiguration.create();
            urlSolr = conf.get("hbase.solr.zklist", "192.168.0.177:2181");
            defaultCollection = conf.get("hbase.solr.collection", "dev_values");
            zkClientTimeOut = conf.getInt("hbase.solr.zkClientTimeOut", 10000);
            zkConnectTimeOut = conf.getInt("hbase.solr.zkConnectTimeOut", 10000);
            maxCacheCount = conf.getInt("hbase.solr.maxCacheCount", 10000);
            maxCommitTime = conf.getInt("hbase.solr.maxCommitTime", 1);

            logger.info("solr init param " + urlSolr + "  " + defaultCollection + "  " + zkClientTimeOut + "  " + zkConnectTimeOut + "  " + maxCacheCount + "  " + maxCommitTime);

            // Created outside the try block so that a failing Solr client can never leave the
            // cache null. Vector rejects a negative initial capacity, hence the clamp.
            cache = new Vector<SolrInputDocument>(Math.max(maxCacheCount, 0));
            try {
                cloudSolrClient = new CloudSolrClient(urlSolr);
                cloudSolrClient.setDefaultCollection(defaultCollection);
                cloudSolrClient.setZkClientTimeout(zkClientTimeOut);
                cloudSolrClient.setZkConnectTimeout(zkConnectTimeOut);

                // Periodic flush: first run after 1 s, then every maxCommitTime seconds.
                // Timer.schedule rejects a non-positive period, so clamp to at least 1 s; the long
                // arithmetic avoids int overflow. The timer thread is a daemon so that it never keeps
                // the hosting JVM alive on shutdown (documents still in the cache at that point are lost).
                long periodMs = Math.max(maxCommitTime, 1) * 1000L;
                Timer timer = new Timer("solr-commit-timer", true);
                timer.schedule(new CommitTimer(), 1000L, periodMs);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }

        /**
         * Sends a batch of documents to Solr and commits.
         *
         * @param docList documents to index; {@code null} or empty is a no-op
         * @throws IllegalStateException if the Solr client could not be created during class init
         */
        public void inputDoc(List<SolrInputDocument> docList) throws IOException, SolrServerException {
            if (docList == null || docList.isEmpty()) {
                return;
            }
            CloudSolrClient client = client();
            client.add(docList);
            client.commit();
        }

        /**
         * Sends a single document to Solr and commits.
         *
         * @param doc document to index; {@code null} is a no-op
         * @throws IllegalStateException if the Solr client could not be created during class init
         */
        public void inputDoc(SolrInputDocument doc) throws IOException, SolrServerException {
            if (doc == null) {
                return;
            }
            CloudSolrClient client = client();
            client.add(doc);
            client.commit();
        }

        /**
         * Deletes the given ids from Solr and commits.
         *
         * Cached documents whose {@code rowkey} field matches one of the ids are discarded first,
         * under {@link #commitLock}. Without this, a later flush would re-add a row that has just
         * been deleted.
         * NOTE(review): assumes the Solr uniqueKey holds the same value as the {@code rowkey} field.
         *
         * @param rowkeys ids to delete; {@code null} or empty is a no-op
         */
        public void deleteDoc(List<String> rowkeys) throws IOException, SolrServerException {
            if (rowkeys == null || rowkeys.isEmpty()) {
                return;
            }
            commitLock.lock();
            try {
                purgeFromCache(new HashSet<String>(rowkeys));
                CloudSolrClient client = client();
                client.deleteById(rowkeys);
                client.commit();
            } finally {
                commitLock.unlock();
            }
        }

        /**
         * Deletes one id from Solr and commits, after discarding any cached document with the same
         * {@code rowkey} (see {@link #deleteDoc(List)}).
         *
         * @param rowkey id to delete; {@code null} is a no-op
         */
        public void deleteDoc(String rowkey) throws IOException, SolrServerException {
            if (rowkey == null) {
                return;
            }
            commitLock.lock();
            try {
                purgeFromCache(Collections.singleton(rowkey));
                CloudSolrClient client = client();
                client.deleteById(rowkey);
                client.commit();
            } finally {
                commitLock.unlock();
            }
        }

        /**
         * Adds a document to the cache and flushes the cache once it holds at least
         * {@code maxCacheCount} documents.
         *
         * Errors are logged, not thrown. If a flush fails, the cache keeps its documents so they
         * are sent again on the next flush.
         *
         * @param doc document to queue; {@code null} is ignored (it would make every later flush fail)
         */
        public static void addDocToCache(SolrInputDocument doc) {
            if (doc == null) {
                return;
            }
            commitLock.lock();
            try {
                cache.add(doc);
                logger.debug("cache size:{}", cache.size());
                if (cache.size() >= maxCacheCount) { // cache full: flush
                    flushCache("cache");
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                commitLock.unlock();
            }
        }

        /**
         * Sends every cached document to Solr, commits, then empties the cache. The cache is only
         * cleared after a successful send. Caller must hold {@link #commitLock}.
         *
         * @param trigger label used in the log line ("cache" or "timer")
         */
        private static void flushCache(String trigger) throws IOException, SolrServerException {
            logger.info(trigger + " commit, count:" + cache.size());
            new SolrWriter().inputDoc(cache);
            cache.clear();
        }

        /**
         * Removes cached, not-yet-flushed documents whose {@code rowkey} field value is in
         * {@code rowkeys}. Caller must hold {@link #commitLock}.
         */
        private static void purgeFromCache(Set<String> rowkeys) {
            // Walk backwards so removals do not shift the indexes still to be visited.
            for (int i = cache.size() - 1; i >= 0; i--) {
                Object key = cache.get(i).getFieldValue("rowkey");
                if (key != null && rowkeys.contains(key.toString())) {
                    cache.remove(i);
                }
            }
        }

        /** Returns the shared Solr client, or fails with a descriptive error if class init could not create it. */
        private static CloudSolrClient client() {
            if (cloudSolrClient == null) {
                throw new IllegalStateException("Solr client is not initialised, check the hbase.solr.* settings");
            }
            return cloudSolrClient;
        }

        /**
         * Timer task that flushes the cache whenever it is non-empty. It runs on the timer thread
         * started in the static initializer. Errors are logged so the timer keeps running.
         */
        static class CommitTimer extends TimerTask {
            @Override
            public void run() {
                commitLock.lock();
                try {
                    if (!cache.isEmpty()) { // only flush when something is pending
                        flushCache("timer");
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {
                    commitLock.unlock();
                }
            }
        }

    }
    View Code


    2. 打成jar包上传到hadoop中

    目录为hdfs:///lib/coprocessor-solr-1.0-SNAPSHOT.jar

    进入hadoop bin目录
    创建lib目录
    hadoop fs -mkdir /lib 
    上传文件
    hadoop fs -put coprocessor-solr-1.0-SNAPSHOT.jar /lib
    查看是否已存在
    hadoop fs -ls -R /lib
    

     3. hbase shell中添加coprocessor

    对表增加coprocessor

    disable 'dev_values'
    alter 'dev_values', METHOD => 'table_att', 'coprocessor'=>'hdfs:///lib/coprocessor-solr-1.0-SNAPSHOT.jar|cn.ac.ict.solr.server.SolrIndexCoprocessorObserver|1001|'
    enable 'dev_values'

    查看是否已添加成功:
    describe 'dev_values'

    输出示例(表状态 ENABLED 为 true):
    'dev_values', {TABLE_ATTRIBUTES => {coprocessor$2 => 'hdfs:///lib/coprocessor-solr-1.0-SNAPSHOT.jar|cn.ac.ict.solr.server.SolrIndexCoprocessorObserver|1001|'}, {NAME => 'values', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}


    4. 大功告成,进行测试

    向hbase中插入一条数据,查看日志是否有记录,solr中查看数据是否已存在

    5.hbase shell 删除coprocessor

    disable 'dev_values' 
    
    alter 'dev_values',METHOD => 'table_att_unset',NAME =>'coprocessor$1'

    (注意:NAME 必须与 describe 输出中显示的属性名一致,例如上面示例中为 'coprocessor$2'。)
    
    enable 'dev_values'
  • 相关阅读:
    Java 多线程(一) 基础知识与概念
    hashMap和treeMap
    转:Java IO流学习总结
    hibernate缓存
    java aio nio bio
    java1.8新特性
    LeetCode Contiguous Array
    LeetCode Sort Characters By Frequency
    LeetCode Subarray Sum Equals K
    LeetCode Group Anagrams
  • 原文地址:https://www.cnblogs.com/iiot/p/4415653.html
Copyright © 2011-2022 走看看