  • Using HBase Coprocessors (Adding a Solr Secondary Index)

    Add a secondary index to HBase by combining an HBase coprocessor with Solr.

    The observer code is as follows:

    package com.hbase.coprocessor;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.UUID;
    
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.CoprocessorEnvironment;
    import org.apache.hadoop.hbase.client.Durability;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    /**
     * @author: FengZhen
     * @create: 2018-07-09
     */
    public class HbaseDataSyncSolrObserver extends BaseRegionObserver{
    	public static Logger log = LoggerFactory.getLogger(HbaseDataSyncSolrObserver.class);
        /**
         *  start
         * @param e
         * @throws IOException
         */
        @Override
        public void start(CoprocessorEnvironment e) throws IOException {
        }
    
        /**
         * stop
         * @param e
         * @throws IOException
         */
        @Override
        public void stop(CoprocessorEnvironment e) throws IOException {
        }
    
        /**
         * Called after the client stores a value.
         * After data is put to HBase, build a document and queue it for a bulk write to Solr.
         *
         * @param e
         * @param put
         * @param edit
         * @param durability
         * @throws IOException
         */
        @Override
        public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
            NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                String id = UUID.randomUUID().toString();
                String rowkey = Bytes.toString(CellUtil.cloneRow(entry.getValue().get(0)));
                List<String> tags = new ArrayList<String>();
                // Only index qualifiers that look like tag columns (tb_* / tm_*).
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    if (key.contains("tb_") || key.contains("tm_")) {
                        tags.add(key);
                    }
                }
                if (tags.isEmpty()) {
                    continue;
                }
                // Wrap the row key and tags in a VmMemory bean and hand it to the SolrWriter cache.
                VmMemory vmMemory = new VmMemory();
                vmMemory.setId(id);
                vmMemory.setRowkey(rowkey);
                vmMemory.setTags(tags);
                SolrWriter.addDocToCache(vmMemory);
            }
        }
    }
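
    Both the observer and the SolrWriter below use a VmMemory bean that the original post does not show. A minimal sketch of that class, inferred only from the getters and setters called in this code (the field set is an assumption), might look like this:

    package com.hbase.coprocessor;

    import java.util.List;

    /**
     * Hypothetical value object carrying one HBase row's data to Solr.
     * Only id, rowkey and tags are assumed, because those are the only
     * accessors used by the observer and by SolrWriter.
     */
    public class VmMemory {
        private String id;          // Solr document id
        private String rowkey;      // HBase row key
        private List<String> tags;  // tag column qualifiers collected in postPut

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }

        public String getRowkey() { return rowkey; }
        public void setRowkey(String rowkey) { this.rowkey = rowkey; }

        public List<String> getTags() { return tags; }
        public void setTags(List<String> tags) { this.tags = tags; }
    }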
    

    The Solr handling code is as follows:

    package com.hbase.coprocessor;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.Vector;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.response.QueryResponse;
    import org.apache.solr.common.SolrDocument;
    import org.apache.solr.common.SolrInputDocument;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * @author: FengZhen
     * @create: 2018-07-09
     */
    public class SolrWriter {
        public static Logger log = LoggerFactory.getLogger(SolrWriter.class);
    
    public static String urlSolr = "node244.qt:2181,node245.qt:2181,node246.qt:2181";     // Solr ZooKeeper address, e.g. 192.168.1.232:2181
    public static String defaultCollection = "socialSecurity";  // default collection (tagCollectionHDFS, socialSecurity)
    public static int zkClientTimeOut = 20000;  // ZK client request timeout (ms)
    public static int zkConnectTimeOut = 10000;  // ZK client connect timeout (ms)
    public static CloudSolrClient cloudSolrClient = null;

    public static int maxCacheCount = 200;   // cache size; commit once this limit is reached
    public static Vector<VmMemory> cache = null;   // cache of pending documents
    public static Vector<String> cacheRowkey = null;   // row keys currently in the cache
    public static Lock commitLock = new ReentrantLock();  // lock held while adding to the cache or committing

    public static int maxCommitTime = 60 * 1; // maximum commit interval (seconds)
    
        static {
            Configuration conf = HBaseConfiguration.create();
            urlSolr = conf.get("hbase.solr.zklist", "node244.qt:2181,node245.qt:2181,node246.qt:2181"); // 192.168.1.231:2181,192.168.1.232:2181,192.168.1.233:2181
            defaultCollection = conf.get("hbase.solr.collection","socialSecurity");
            zkClientTimeOut = conf.getInt("hbase.solr.zkClientTimeOut", 10000);
            zkConnectTimeOut = conf.getInt("hbase.solr.zkConnectTimeOut", 10000);
            maxCacheCount = conf.getInt("hbase.solr.maxCacheCount", 200);
            maxCommitTime =  conf.getInt("hbase.solr.maxCommitTime", 60*1);
    
            log.info("solr init param"+urlSolr+"  "+defaultCollection+"  "+zkClientTimeOut+"  "+zkConnectTimeOut+"  "+maxCacheCount+"  "+maxCommitTime);
            try {
                cache=new Vector<VmMemory>(maxCacheCount);
                cacheRowkey = new Vector<String>(maxCacheCount);
                cloudSolrClient = new CloudSolrClient(urlSolr);
                cloudSolrClient.setDefaultCollection(defaultCollection);
                cloudSolrClient.setZkClientTimeout(zkClientTimeOut);
                cloudSolrClient.setZkConnectTimeout(zkConnectTimeOut);
            // Start the timer task: first run after a 10-second delay, then repeat every maxCommitTime seconds.
            Timer timer = new Timer();
            timer.schedule(new CommitTimer(), 10 * 1000, maxCommitTime * 1000);
            } catch (Exception ex){
                ex.printStackTrace();
            }
        }
    /**
     * Bulk commit
     */
    public void inputDoc(List<VmMemory> vmMemoryList) throws IOException, SolrServerException {
        if (vmMemoryList == null || vmMemoryList.size() == 0) {
            log.info("==========inputDoc:return========");
            return;
        }
        List<SolrInputDocument> doclist = new ArrayList<SolrInputDocument>(vmMemoryList.size());
        for (VmMemory vm : vmMemoryList) {
                String id = vm.getId();
                String rowkey = vm.getRowkey();
                List<String> tags = vm.getTags();
                log.info("===id={}===rowkey={}=======",id,rowkey);
                Set<String> tagSet = new HashSet<String>();
            // Look up any existing document for this rowkey so its tags can be merged with the new ones.
            SolrQuery solrQuery = new SolrQuery();
            solrQuery.setQuery("rowkey:" + rowkey);
            QueryResponse queryResponse = cloudSolrClient.query(solrQuery);
            List<SolrDocument> rowkeys = queryResponse.getResults();
                SolrInputDocument document = new SolrInputDocument();
                
        		if (null != rowkeys && rowkeys.size() > 0) {
        			for(SolrDocument solrDocument : rowkeys) {
            			id = (String)solrDocument.get("id");
            			rowkey = (String)solrDocument.get("rowkey");
        	            List<String> solrTags = (List<String>)solrDocument.get("tags");
        	            tagSet.addAll(solrTags);
        			}
    			}
                tagSet.addAll(tags);
        		document.addField("id", id);
                document.addField("rowkey", rowkey);
                List<String> tagIds = new ArrayList<String>(tagSet);
                for (String tagId : tagIds) {
                	document.addField("tags", tagId);
    			}
                doclist.add(document);
            }
            cloudSolrClient.add(doclist);
            cloudSolrClient.commit(true, true, true);
        }
    
    /**
     * Commit a single document
     */
        public void inputDoc(VmMemory vm) throws IOException, SolrServerException {
            if (vm == null) {
                return;
            }
            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("id", vm.getId());
            doc.addField("rowkey", vm.getRowkey());
            List<String> tags = vm.getTags();
            for (String tag:tags) {
                doc.addField("tags", tag);
            }
            cloudSolrClient.add(doc);
            cloudSolrClient.commit(true, true, true);
        }
    
        public void deleteDoc(List<String> rowkeys) throws IOException, SolrServerException {
            if (rowkeys == null || rowkeys.size() == 0) {
                return;
            }
            cloudSolrClient.deleteById(rowkeys);
            cloudSolrClient.commit(true, true, true);
        }
    
        public void deleteDoc(String rowkey) throws IOException, SolrServerException {
            cloudSolrClient.deleteById(rowkey);
            cloudSolrClient.commit(true, true, true);
        }
    
    /**
     * Add a record to the cache; commit when the cache reaches maxCacheCount.
     * addDocToCache is called for every HBase put: it adds the record to the cache
     * and, if the size limit has been reached, flushes all cached documents to Solr.
     */
        public static void addDocToCache(VmMemory vmMemory) {
            commitLock.lock();
            try {
            // If the cache already holds this rowkey, flush and commit the cache first.
            	if (cacheRowkey.contains(vmMemory.getRowkey())) {
            		 new SolrWriter().inputDoc(cache);
                     cache.clear();
                     cacheRowkey.clear();
    			}
                cache.add(vmMemory);
                cacheRowkey.add(vmMemory.getRowkey());
                if (cache.size() >= maxCacheCount) {
                    new SolrWriter().inputDoc(cache);
                    cache.clear();
                    cacheRowkey.clear();
                }
            } catch (Exception ex) {
                log.info(ex.getMessage());
            } finally {
                commitLock.unlock();
            }
        }
    
    /**
     * Commit timer.
     * CommitTimer commits the cache at a fixed interval,
     * so that everything left in the cache is eventually written to Solr.
     */
        static class CommitTimer extends TimerTask {
            @Override
            public void run() {
                commitLock.lock();
                try {
                if (cache.size() > 0) { // commit whenever there is anything pending
                        log.info("timer commit count:"+cache.size());
                        new SolrWriter().inputDoc(cache);
                        cache.clear();
                        cacheRowkey.clear();
                    }
                } catch (Exception ex) {
                    log.info(ex.getMessage());
                } finally {
                    commitLock.unlock();
                }
            }
        }
    }
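
    Note that the target collection must define the fields this code writes: id, rowkey, and a multi-valued tags field. The schema is not shown in the original post; a hypothetical managed-schema fragment compatible with the code above could look like this:

    <field name="id" type="string" indexed="true" stored="true" required="true"/>
    <field name="rowkey" type="string" indexed="true" stored="true"/>
    <field name="tags" type="string" indexed="true" stored="true" multiValued="true"/>
    <uniqueKey>id</uniqueKey>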
    

    The steps to deploy and use the coprocessor are as follows:

    1. Package the code as a jar and upload it to HDFS.
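
    For example (the jar name and HDFS path below are the ones used by the alter command in step 2; adjust them to your environment):

    hdfs dfs -mkdir -p /user/solr/hbase/observer
    hdfs dfs -put -f HBaseCoprocessor.jar hdfs://nameservice/user/solr/hbase/observer/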

    2. Create the HBase table and attach the coprocessor, as follows:

    hbase(main):002:0> create 'socialSecurityTest','tags','userInfo'
    hbase(main):004:0> disable 'socialSecurityTest'
    hbase(main):010:0> alter 'socialSecurityTest',METHOD=>'table_att','coprocessor'=>'hdfs://nameservice/user/solr/hbase/observer/HBaseCoprocessor.jar|com.hbase.coprocessor.HbaseDataSyncSolrObserver|1001|collection=tagCollection' 
    hbase(main):027:0> enable 'socialSecurityTest'
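
    You can confirm that the coprocessor attribute was attached by describing the table:

    hbase(main):028:0> describe 'socialSecurityTest'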
    

    3. Test.

    hbase(main):016:0> put 'socialSecurityTest','rowkey-1','tags:0_1','1'

    At this point you can check the HBase logs to see how the coprocessor handled the put.

    If no errors occurred, the data should now also be in Solr.
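
    A quick way to verify is to query the collection directly. The Solr host and port below are assumptions (any Solr node in the cloud works); substitute your own:

    curl "http://node244.qt:8983/solr/socialSecurity/select?q=rowkey:rowkey-1&wt=json"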

    A problem encountered during use:

    2018-07-11 17:06:14,054 INFO  [LruBlockCacheStatsExecutor] hfile.LruBlockCache: totalSize=417.42 KB, freeSize=395.89 MB, max=396.30 MB, blockCount=0, accesses=0, hits=0, hitRatio=0, cachingAccesses=0, cachingHits=0, cachingHitsRatio=0,evictions=8069, evicted=0, evictedPerRun=0.0
    2018-07-11 17:06:23,523 ERROR [RpcServer.FifoWFPBQ.priority.handler=19,queue=1,port=16000] master.MasterRpcServices: Region server node231.qt,16020,1531219308266 reported a fatal error:
    ABORTING region server node231.qt,16020,1531219308266: The coprocessor com.hbase.coprocesser.HbaseDataSyncEsObserver threw java.lang.NoClassDefFoundError: org/apache/http/entity/mime/content/ContentBody
    Cause:
    java.lang.NoClassDefFoundError: org/apache/http/entity/mime/content/ContentBody
            at com.hbase.coprocesser.SolrUtil.insert(SolrUtil.java:53)
            at com.hbase.coprocesser.HbaseDataSyncEsObserver.postPut(HbaseDataSyncEsObserver.java:79)
            at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$32.call(RegionCoprocessorHost.java:923)
            at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$RegionOperation.call(RegionCoprocessorHost.java:1660)
            at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1734)
            at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1692)
            at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postPut(RegionCoprocessorHost.java:919)
            at org.apache.hadoop.hbase.regionserver.HRegion.doMiniBatchMutation(HRegion.java:3413)
            at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2986)
            at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2928)
            at org.apache.hadoop.hbase.regionserver.RSRpcServices.doBatchOp(RSRpcServices.java:748)
            at org.apache.hadoop.hbase.regionserver.RSRpcServices.doNonAtomicRegionMutation(RSRpcServices.java:708)
            at org.apache.hadoop.hbase.regionserver.RSRpcServices.multi(RSRpcServices.java:2124)
            at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32393)
            at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2141)
            at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
            at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
            at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)
    Caused by: java.lang.ClassNotFoundException: org.apache.http.entity.mime.content.ContentBody
            at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
            at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
            ... 18 more

    There are two ways to fix this:

    1. Put the missing jar into HBase's lib directory.
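
    For example (the path is illustrative; the jar has to be placed on every RegionServer, and the RegionServers restarted so it lands on their classpath):

    cp httpmime-4.3.2.jar $HBASE_HOME/lib/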

    2. Add the missing dependency and repackage the jar; the dependency is as follows:

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpmime</artifactId>
        <version>4.3.2</version>
    </dependency>

    Also add the following to the pom so the dependencies are bundled into the jar:

    <build>
        <finalName>SolrTest</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
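
    With the assembly plugin bound to the package phase, a regular build produces the fat jar (the output name below assumes the SolrTest finalName and the default jar-with-dependencies classifier):

    mvn clean package
    # produces target/SolrTest-jar-with-dependencies.jar; rename it (e.g. to HBaseCoprocessor.jar),
    # re-upload it to HDFS as in step 1, then disable/alter/enable the table again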


  • Original post: https://www.cnblogs.com/EnzoDin/p/9431158.html