  • Using SolrJ for DIH Operations

    Background: In one of our projects we needed to import data from MongoDB into Solr to power search. For MySQL, Solr ships a DIH (DataImportHandler) package that can import data via configured SQL statements, and there is also an open-source tool for importing from MongoDB. After reading its documentation, however, it offered little flexibility for customizing the data, and it is written in Python, so it did not really fit the project's requirements. In the end we decided to use SolrJ to implement the import ourselves.

    I. Problems Encountered

    1. The full data set is large, so neither a full nor a delta import can fetch everything in one shot: fetch the data page by page. (The performance trade-offs between the two paging approaches will be covered separately later.)

    2. A full import must first clean out the old data, while a delta import does not: clean simply means deleting all documents.

    3. Because the data is fetched in pages, the clean must run before the full import starts. To guarantee that the old data stays searchable while the full import is in progress, the delete-all request is issued with commit==false, commit stays false throughout the whole import, and a single commit is issued only after the full import completes.

    4. Delta and full imports are driven by separately configured triggers, e.g. the delta runs every five minutes while the full import runs once a day. Without coordination the two jobs could end up running at the same time, and since, as noted above, commit==false for the entire full import, the delta and full jobs must be made mutually exclusive with a lock. A sketch of the full-import flow that these constraints produce follows below.
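
    To make points 1 through 4 concrete, below is a minimal sketch of the full-import flow, written against the CloudSolrDIHClient wrapper shown in the next section; the FullImportJob class, the fetchPage() helper, and the page size are hypothetical stand-ins for the real MongoDB query layer.

    import java.util.Collections;
    import java.util.List;
    
    import org.apache.solr.common.SolrInputDocument;
    
    public class FullImportJob {
    
        private static final int PAGE_SIZE = 500;
        private final CloudSolrDIHClient client;
    
        public FullImportJob(CloudSolrDIHClient client) {
            this.client = client;
        }
    
        // hypothetical paged fetch from MongoDB; returns an empty list when exhausted
        private List<SolrInputDocument> fetchPage(int pageNo, int pageSize) {
            return Collections.emptyList(); // replace with the real MongoDB query
        }
    
        public void importAll() throws Exception {
            // 1. delete the old data but do NOT commit: searchers keep seeing it
            client.clean(false);
            // 2. page through the source; every add is likewise left uncommitted
            int pageNo = 0;
            List<SolrInputDocument> page;
            while (!(page = fetchPage(pageNo++, PAGE_SIZE)).isEmpty()) {
                client.addDocs(page);
            }
            // 3. a single commit at the end makes the delete and all adds visible at once
            client.commit();
        }
    }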

    II. Implementation

    package com.meizu.galaxy2.solr;
    
    import org.apache.log4j.Logger;
    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.request.UpdateRequest;
    import org.apache.solr.common.SolrInputDocument;
    import org.apache.solr.common.params.ModifiableSolrParams;
    
    import java.io.IOException;
    import java.lang.reflect.Field;
    import java.util.List;
    
    /**
     * Created by ltao on 2015-7-16.
     */
    public class CloudSolrDIHClient {
    
        private static final Logger logger = Logger.getLogger(CloudSolrDIHClient.class);
        private static final String ID_FIELD_NAME = "id";
        private CloudSolrClient solrClient;
    
        private static final int BATCH_SIZE=500;
    
        private String defaultCollection;
    
    
        public CloudSolrDIHClient(String zkHost, String zkNodePath, int zkClientTimeout, int zkConnectTimeout, String defaultCollection) {

            if (!zkHost.startsWith("zookeeper://")) {
                logger.error("zk host must start with zookeeper://");
                return;
            } else {
                // strip the "zookeeper://" scheme and append the chroot node path
                String hosts = zkHost.substring("zookeeper://".length());
                hosts = hosts + zkNodePath;
                solrClient = new CloudSolrClient(hosts);
                solrClient.setZkClientTimeout(zkClientTimeout);
                solrClient.setZkConnectTimeout(zkConnectTimeout);
                this.defaultCollection = defaultCollection;
            }
        }
    
    
        public void connect() throws Exception {
            solrClient.connect();
        }
    
        public void addDoc(SolrInputDocument doc) {
            if (this.defaultCollection != null) {
                this.addDoc(defaultCollection, doc);
            } else {
                logger.error("default collection should not be null");
            }
    
        }
    
        public void addDoc(String collection, SolrInputDocument doc) {
            try {
                solrClient.add(collection, doc);
            } catch (Exception e) {
                logger.error("add Doc occurs an error,collection:" + collection + ",doc_id:" + doc.getFieldValue(ID_FILED_NAME), e);
            }
        }
    
    
        public void addDocs(List<SolrInputDocument> docs) {
            if (this.defaultCollection != null) {
                this.addDocs(defaultCollection, docs);
            } else {
                logger.error("default collection should not be null");
            }
        }
    
    
        public void addDocs(String collection, List<SolrInputDocument> docs) {
            if (docs == null || docs.isEmpty()) {
                return;
            }
            // send the documents in batches of at most BATCH_SIZE
            int size = docs.size();
            for (int start = 0; start < size; start += BATCH_SIZE) {
                int end = Math.min(start + BATCH_SIZE, size);
                List<SolrInputDocument> batch = docs.subList(start, end);
                try {
                    solrClient.add(collection, batch);
                } catch (Exception e) {
                    logger.error("add Docs occurs an error,collection:" + collection, e);
                }
            }
        }
    
        public void deleteDocByIds(List<String> ids) {
            if (this.defaultCollection != null) {
                this.deleteDocByIds(defaultCollection, ids);
            } else {
                logger.error("default collection should not be null");
            }
        }
    
        public void deleteDocByIds(String collection, List<String> ids) {
    
            try {
                solrClient.deleteById(collection,ids);
            } catch (Exception e) {
                logger.error("delete Docs occurs an error,collection:" + collection ,e);
            }
    
        }
    
    
        public void deleteDocById(String collection, String id) {
            try {
                solrClient.deleteById(collection, id);
            } catch (Exception e) {
                logger.error("delete Doc occurs an error,collection:" + collection + ",doc_id:" + id, e);
            }
        }
    
        public void deleteDocById(String id) {
            if (this.defaultCollection != null) {
                this.deleteDocById(defaultCollection, id);
    
            } else {
                logger.error("default collection should not be null");
            }
        }
    
    
        public void addBean(String collection, Object obj) {
            try {
                solrClient.addBean(collection, obj);
            } catch (Exception e) {
                // recover the bean's id via reflection, purely for the error log
                String id = null;
                try {
                    Field idField = obj.getClass().getDeclaredField(ID_FIELD_NAME);
                    idField.setAccessible(true);
                    Object idFieldValue = idField.get(obj);
                    id = idFieldValue != null ? idFieldValue.toString() : "";
                } catch (Exception e1) {
                    logger.error("get id field occurs an error", e1);
                }
                logger.error("add bean occurs an error,collection:" + collection + ",bean_id:" + id, e);
            }
        }
    
        public void addBean(Object obj) throws SolrServerException, IOException {
            if (this.defaultCollection != null) {
                this.addBean(defaultCollection, obj);
            } else {
                logger.error("default collection should not be null");
            }
    
        }
    
        public void addBeans(List<Object> objs) throws SolrServerException, IOException {
            if (this.defaultCollection != null) {
                this.addBeans(defaultCollection, objs);
            } else {
                logger.error("default collection should not be null");
            }
        }
    
    
        public void addBeans(String collection, List<Object> objs) {
            if (objs == null || objs.isEmpty()) {
                return;
            }
            // send the beans in batches of at most BATCH_SIZE
            int size = objs.size();
            for (int start = 0; start < size; start += BATCH_SIZE) {
                int end = Math.min(start + BATCH_SIZE, size);
                List<Object> batch = objs.subList(start, end);
                try {
                    solrClient.addBeans(collection, batch);
                } catch (Exception e) {
                    logger.error("addBeans occurs an error,collection:" + collection, e);
                }
            }
        }
    
    
        public void commit() throws SolrServerException, IOException {
            this.commit(defaultCollection);
        }
    
        public void commit(String collection) throws SolrServerException, IOException {
            solrClient.commit(collection);
        }
    
        public void clean(Boolean commit) throws SolrServerException, IOException {
            this.clean(defaultCollection, commit);
        }
    
        public void clean(String collection, Boolean commit) throws SolrServerException, IOException {
            // "clean" deletes every document in the collection; the commit flag
            // controls whether the deletion becomes visible immediately, so a full
            // import can clean with commit=false and commit once at the very end
            UpdateRequest updateRequest = new UpdateRequest();
            updateRequest.setParam("stream.body", "<delete><query>*:*</query></delete>");
            updateRequest.setParam("commit", Boolean.toString(commit));
            solrClient.request(updateRequest, collection);
        }
    
    
        public void close() throws IOException {
            if (solrClient != null) {
                solrClient.close();
            }
        }
    
    
    }

    The class above is a thin wrapper around the Solr client covering add, delete, and clean.
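
    For example, here is a hedged usage sketch (to be placed inside a method that may throw Exception); the ZooKeeper address, chroot path, timeouts, and collection name are placeholders:

    CloudSolrDIHClient client =
            new CloudSolrDIHClient("zookeeper://zk1:2181,zk2:2181,zk3:2181", "/solr", 10000, 10000, "hotel");
    client.connect();
    
    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("id", "42");
    doc.addField("name", "example hotel");
    client.addDoc(doc);   // buffered on the Solr side until the next commit
    
    client.commit();      // make the new document visible to searchers
    client.close();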

    Making the full and delta imports mutually exclusive with a lock:

    // requires java.util.concurrent.locks.Lock and java.util.concurrent.locks.ReentrantLock
    private static Lock lock = new ReentrantLock();

    public void importDelta() {
        boolean hasGotLock = false;
        try {
            // tryLock(): skip this delta run if a full import currently holds the lock
            hasGotLock = lock.tryLock();
            if (hasGotLock) {
                logger.info("start import delta hotel data ");
                long start = System.currentTimeMillis();
                hotelService.importDelta();
                long end = System.currentTimeMillis();
                logger.info("finish import delta hotel data ,spend " + (end - start) + " ms");
            }
        } finally {
            if (hasGotLock) {
                lock.unlock();
            }
        }
    }

    public void importAll() {
        // lock(): block until any running delta import releases the lock
        lock.lock();
        try {
            logger.info("start import all hotel data ");
            long start = System.currentTimeMillis();
            hotelService.importAll();
            long end = System.currentTimeMillis();
            logger.info("finish import all hotel data ,spend " + (end - start) + " ms");
        } finally {
            lock.unlock();
        }
    }

    public DataImportService getHotelService() {
        return hotelService;
    }

    The delta import uses Lock's tryLock(): it attempts to acquire the lock and simply gives up the run if the lock is already held, whereas lock() blocks until the lock becomes available. This makes it highly likely that the full import always gets to run. (In theory, if delta imports ran continuously, the full import could block for a long time, but this never happens in practice; and if by some coincidence consecutive delta runs kept grabbing the lock one after another and starved the full import, a fair lock should fix that, though it is really not necessary.) A fair-lock sketch follows for reference.
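
    For reference, making the lock fair is a one-line change: ReentrantLock accepts a fairness flag, and a fair lock grants access to the longest-waiting thread, which rules out the starvation scenario above at a small throughput cost.

    // fair variant: waiting threads acquire the lock in FIFO order, so a blocked
    // full import cannot be starved by a stream of back-to-back delta imports
    private static Lock lock = new ReentrantLock(true);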

  • Original article: https://www.cnblogs.com/limingluzhu/p/5120284.html