Adding a Secondary Index to HBase: Combining an HBase Coprocessor with Solr
The coprocessor code is as follows:
package com.hbase.coprocessor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author FengZhen
 * @create 2018-07-09
 */
public class HbaseDataSyncSolrObserver extends BaseRegionObserver {

    public static Logger log = LoggerFactory.getLogger(HbaseDataSyncSolrObserver.class);

    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
    }

    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
    }

    /**
     * Called after the client stores a value.
     * After the data is put to HBase, build an update and hand it to the Solr bulk writer.
     */
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
        for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
            String id = UUID.randomUUID().toString();
            String rowkey = Bytes.toString(CellUtil.cloneRow(entry.getValue().get(0)));
            // Collect the qualifiers of interest as tags
            List<String> tags = new ArrayList<String>();
            for (Cell cell : entry.getValue()) {
                String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                if (key.contains("tb_") || key.contains("tm_")) {
                    tags.add(key);
                }
            }
            if (null == tags || tags.size() <= 0) {
                continue;
            }
            VmMemory vmMemory = new VmMemory();
            vmMemory.setId(id);
            vmMemory.setRowkey(rowkey);
            vmMemory.setTags(tags);
            SolrWriter.addDocToCache(vmMemory);
        }
    }
}
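The VmMemory class referenced above is not shown in the original post. It is just a value object carrying the Solr document fields; the following is a minimal sketch of what it presumably looks like, with field and accessor names inferred from the calls above (an assumption, not the author's actual class):

package com.hbase.coprocessor;

import java.util.List;

// Minimal sketch of the value object used by the observer and SolrWriter (assumed, not from the original post).
// It only carries the three Solr document fields: id, rowkey and the list of tag qualifiers.
public class VmMemory {
    private String id;
    private String rowkey;
    private List<String> tags;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getRowkey() { return rowkey; }
    public void setRowkey(String rowkey) { this.rowkey = rowkey; }

    public List<String> getTags() { return tags; }
    public void setTags(List<String> tags) { this.tags = tags; }
}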
The Solr handling code is as follows:
package com.hbase.coprocessor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author FengZhen
 * @create 2018-07-09
 */
public class SolrWriter {

    public static Logger log = LoggerFactory.getLogger(SolrWriter.class);

    public static String urlSolr = "node244.qt:2181,node245.qt:2181,node246.qt:2181"; // Solr ZooKeeper address, e.g. 192.168.1.232:2181
    public static String defaultCollection = "socialSecurity";                        // default collection, e.g. tagCollectionHDFS / socialSecurity
    public static int zkClientTimeOut = 20000;   // ZooKeeper client request timeout
    public static int zkConnectTimeOut = 10000;  // ZooKeeper client connect timeout
    public static CloudSolrClient cloudSolrClient = null;

    public static int maxCacheCount = 200;               // cache size; commit once this limit is reached
    public static Vector<VmMemory> cache = null;         // cache
    public static Vector<String> cacheRowkey = null;
    public static Lock commitLock = new ReentrantLock(); // lock held while adding to the cache or committing
    public static int maxCommitTime = 60 * 1;            // maximum commit interval (seconds)

    static {
        Configuration conf = HBaseConfiguration.create();
        urlSolr = conf.get("hbase.solr.zklist", "node244.qt:2181,node245.qt:2181,node246.qt:2181"); // e.g. 192.168.1.231:2181,192.168.1.232:2181,192.168.1.233:2181
        defaultCollection = conf.get("hbase.solr.collection", "socialSecurity");
        zkClientTimeOut = conf.getInt("hbase.solr.zkClientTimeOut", 10000);
        zkConnectTimeOut = conf.getInt("hbase.solr.zkConnectTimeOut", 10000);
        maxCacheCount = conf.getInt("hbase.solr.maxCacheCount", 200);
        maxCommitTime = conf.getInt("hbase.solr.maxCommitTime", 60 * 1);
        log.info("solr init param " + urlSolr + " " + defaultCollection + " " + zkClientTimeOut + " " + zkConnectTimeOut + " " + maxCacheCount + " " + maxCommitTime);
        try {
            cache = new Vector<VmMemory>(maxCacheCount);
            cacheRowkey = new Vector<String>(maxCacheCount);
            cloudSolrClient = new CloudSolrClient(urlSolr);
            cloudSolrClient.setDefaultCollection(defaultCollection);
            cloudSolrClient.setZkClientTimeout(zkClientTimeOut);
            cloudSolrClient.setZkConnectTimeout(zkConnectTimeOut);
            // Start the timer task: first run after a 10-second delay, then once every maxCommitTime seconds
            Timer timer = new Timer();
            timer.schedule(new CommitTimer(), 10 * 1000, maxCommitTime * 1000);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Batch commit
     */
    public void inputDoc(List<VmMemory> vmMoneyList) throws IOException, SolrServerException {
        if (vmMoneyList == null || vmMoneyList.size() == 0) {
            log.info("==========inputDoc:return========");
            return;
        }
        List<SolrInputDocument> doclist = new ArrayList<SolrInputDocument>(vmMoneyList.size());
        for (VmMemory vm : vmMoneyList) {
            String id = vm.getId();
            String rowkey = vm.getRowkey();
            List<String> tags = vm.getTags();
            log.info("===id={}===rowkey={}=======", id, rowkey);
            Set<String> tagSet = new HashSet<String>();
            // Look up any existing document for this rowkey so its tags can be merged
            SolrQuery solrQuery = new SolrQuery();
            solrQuery.setQuery("rowkey:" + rowkey);
            QueryResponse queryResponse = cloudSolrClient.query(solrQuery);
            List<SolrDocument> rowkeys = queryResponse.getResults();
            SolrInputDocument document = new SolrInputDocument();
            if (null != rowkeys && rowkeys.size() > 0) {
                for (SolrDocument solrDocument : rowkeys) {
                    id = (String) solrDocument.get("id");
                    rowkey = (String) solrDocument.get("rowkey");
                    List<String> solrTags = (List<String>) solrDocument.get("tags");
                    tagSet.addAll(solrTags);
                }
            }
            tagSet.addAll(tags);
            document.addField("id", id);
            document.addField("rowkey", rowkey);
            List<String> tagIds = new ArrayList<String>(tagSet);
            for (String tagId : tagIds) {
                document.addField("tags", tagId);
            }
            doclist.add(document);
        }
        cloudSolrClient.add(doclist);
        cloudSolrClient.commit(true, true, true);
    }

    /**
     * Single-document commit
     */
    public void inputDoc(VmMemory vm) throws IOException, SolrServerException {
        if (vm == null) {
            return;
        }
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", vm.getId());
        doc.addField("rowkey", vm.getRowkey());
        List<String> tags = vm.getTags();
        for (String tag : tags) {
            doc.addField("tags", tag);
        }
        cloudSolrClient.add(doc);
        cloudSolrClient.commit(true, true, true);
    }

    public void deleteDoc(List<String> rowkeys) throws IOException, SolrServerException {
        if (rowkeys == null || rowkeys.size() == 0) {
            return;
        }
        cloudSolrClient.deleteById(rowkeys);
        cloudSolrClient.commit(true, true, true);
    }

    public void deleteDoc(String rowkey) throws IOException, SolrServerException {
        cloudSolrClient.deleteById(rowkey);
        cloudSolrClient.commit(true, true, true);
    }

    /**
     * Add a record to the cache; commit when the cache reaches maxCacheCount.
     * addDocToCache is called for every HBase put. It adds the record to the cache
     * and, if the limit is reached, flushes all cached data to Solr.
     */
    public static void addDocToCache(VmMemory vmMemory) {
        commitLock.lock();
        try {
            // If the cache already holds this rowkey, flush first so the tags get merged correctly
            if (cacheRowkey.contains(vmMemory.getRowkey())) {
                new SolrWriter().inputDoc(cache);
                cache.clear();
                cacheRowkey.clear();
            }
            cache.add(vmMemory);
            cacheRowkey.add(vmMemory.getRowkey());
            if (cache.size() >= maxCacheCount) {
                new SolrWriter().inputDoc(cache);
                cache.clear();
                cacheRowkey.clear();
            }
        } catch (Exception ex) {
            log.info(ex.getMessage());
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Commit timer.
     * CommitTimer commits at a fixed interval so that any data left in the cache
     * is eventually written to Solr.
     */
    static class CommitTimer extends TimerTask {
        @Override
        public void run() {
            commitLock.lock();
            try {
                if (cache.size() > 0) { // commit only when the cache is non-empty
                    log.info("timer commit count:" + cache.size());
                    new SolrWriter().inputDoc(cache);
                    cache.clear();
                    cacheRowkey.clear();
                }
            } catch (Exception ex) {
                log.info(ex.getMessage());
            } finally {
                commitLock.unlock();
            }
        }
    }
}
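SolrWriter assumes the target collection has id, rowkey and a multi-valued tags field, all indexed and stored (it queries on rowkey and reads id and tags back when merging). The original post does not show the schema; a possible managed-schema fragment, purely as an assumption, would be:

<!-- Assumed field definitions for the collection used above (not from the original post) -->
<field name="id"     type="string" indexed="true" stored="true" required="true"/>
<field name="rowkey" type="string" indexed="true" stored="true"/>
<field name="tags"   type="string" indexed="true" stored="true" multiValued="true"/>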
The steps for using the coprocessor are as follows:
1. Package the code into a jar and upload it to HDFS.
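For example (the jar name assumes the assembly build shown at the end of this post, and the HDFS path is illustrative and should match the alter command in step 2):

mvn clean package
hdfs dfs -mkdir -p /user/solr/hbase/observer
hdfs dfs -put -f target/SolrTest-jar-with-dependencies.jar hdfs://nameservice/user/solr/hbase/observer/HBaseCoprocessor.jar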
2. Create the HBase table and attach the coprocessor, as follows:
hbase(main):002:0> create 'socialSecurityTest','tags','userInfo'
hbase(main):004:0> disable 'socialSecurityTest'
hbase(main):010:0> alter 'socialSecurityTest',METHOD=>'table_att','coprocessor'=>'hdfs://nameservice/user/solr/hbase/observer/HBaseCoprocessor.jar|com.hbase.coprocessor.HbaseDataSyncSolrObserver|1001|collection=tagCollection'
hbase(main):027:0> enable 'socialSecurityTest'
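To confirm the coprocessor attribute took effect, describe the table; the jar path, class name and priority should appear in the table attributes:

hbase(main):028:0> describe 'socialSecurityTest'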
3. Test:
hbase(main):016:0> put 'socialSecurityTest','rowkey-1','tags:tb_1','1'
At this point you can check how the coprocessor handled the put in the HBase RegionServer logs. (Note that postPut only collects qualifiers containing "tb_" or "tm_", so the qualifier used in the test put has to match that filter.)
If there are no errors, the data should now be in Solr as well.
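To double-check from the client side, you can query the collection for the rowkey that was just written. A small verification sketch using the same (older) CloudSolrClient API as SolrWriter; the ZooKeeper list and collection name below are the defaults from the code above and should be adjusted to your environment:

package com.hbase.coprocessor;

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;

// Quick manual check that the coprocessor actually indexed the test row (illustrative only).
public class SolrSyncCheck {
    public static void main(String[] args) throws Exception {
        // Same ZooKeeper list / collection as SolrWriter; adjust to your environment.
        CloudSolrClient client = new CloudSolrClient("node244.qt:2181,node245.qt:2181,node246.qt:2181");
        client.setDefaultCollection("socialSecurity");
        try {
            QueryResponse response = client.query(new SolrQuery("rowkey:rowkey-1"));
            for (SolrDocument doc : response.getResults()) {
                System.out.println("id=" + doc.get("id") + ", rowkey=" + doc.get("rowkey") + ", tags=" + doc.get("tags"));
            }
        } finally {
            client.close();
        }
    }
}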
Problems encountered in use. In one case the RegionServer aborted because a class needed by the coprocessor was missing from the jar:
2018-07-11 17:06:14,054 INFO  [LruBlockCacheStatsExecutor] hfile.LruBlockCache: totalSize=417.42 KB, freeSize=395.89 MB, max=396.30 MB, blockCount=0, accesses=0, hits=0, hitRatio=0, cachingAccesses=0, cachingHits=0, cachingHitsRatio=0, evictions=8069, evicted=0, evictedPerRun=0.0
2018-07-11 17:06:23,523 ERROR [RpcServer.FifoWFPBQ.priority.handler=19,queue=1,port=16000] master.MasterRpcServices: Region server node231.qt,16020,1531219308266 reported a fatal error:
ABORTING region server node231.qt,16020,1531219308266: The coprocessor com.hbase.coprocesser.HbaseDataSyncEsObserver threw java.lang.NoClassDefFoundError: org/apache/http/entity/mime/content/ContentBody
Cause:
java.lang.NoClassDefFoundError: org/apache/http/entity/mime/content/ContentBody
	at com.hbase.coprocesser.SolrUtil.insert(SolrUtil.java:53)
	at com.hbase.coprocesser.HbaseDataSyncEsObserver.postPut(HbaseDataSyncEsObserver.java:79)
	at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$32.call(RegionCoprocessorHost.java:923)
	at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$RegionOperation.call(RegionCoprocessorHost.java:1660)
	at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1734)
	at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1692)
	at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postPut(RegionCoprocessorHost.java:919)
	at org.apache.hadoop.hbase.regionserver.HRegion.doMiniBatchMutation(HRegion.java:3413)
	at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2986)
	at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2928)
	at org.apache.hadoop.hbase.regionserver.RSRpcServices.doBatchOp(RSRpcServices.java:748)
	at org.apache.hadoop.hbase.regionserver.RSRpcServices.doNonAtomicRegionMutation(RSRpcServices.java:708)
	at org.apache.hadoop.hbase.regionserver.RSRpcServices.multi(RSRpcServices.java:2124)
	at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32393)
	at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2141)
	at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
	at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
	at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)
Caused by: java.lang.ClassNotFoundException: org.apache.http.entity.mime.content.ContentBody
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 18 more
There are two ways to fix this:
1. Put the missing jar into HBase's lib directory.
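For example (the jar version is illustrative and must match what your SolrJ build needs; the jar has to be copied to every RegionServer and the RegionServers restarted before it is picked up):

cp httpmime-4.3.2.jar $HBASE_HOME/lib/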
2. Add the missing dependency and rebuild the jar. The dependency is:
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpmime</artifactId>
    <version>4.3.2</version>
</dependency>
Also add the following to the pom so the coprocessor is packaged as a jar that includes its dependencies:
<build>
    <finalName>SolrTest</finalName>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>