zoukankan      html  css  js  c++  java
  • solr DIH 知识梳理

    solr DIH 知识梳理

    web.xml中listener配置

    <listener>
            <listener-class>org.apache.solr.handler.dataimport.scheduler.ApplicationListener</listener-class>
    </listener>
    

    配置文件dataimport.properties

    #################################################
    #                                               #
    #       dataimport scheduler properties         #
    #                                               #
    #################################################
     
    #  to sync or not to sync
    #  1 - active; anything else - inactive
    syncEnabled=1
     
    #  which cores to schedule
    #  in a multi-core environment you can decide which cores you want synchronized
    #  leave empty or comment it out if using single-core deployment
    syncCores=core1,core2
     
    #  solr server name or IP address
    #  [defaults to localhost if empty]
    server=localhost
     
    #  solr server port
    #  [defaults to 8080 if empty]
    port=8080
     
    #  application name/context
    #  [defaults to current ServletContextListener's context (app) name]
    webapp=solr
     
    #  URL params [mandatory]
    #  remainder of URL
    params=/dataimport?command=delta-import&clean=false&commit=true
     
    #  schedule interval
    #  number of minutes between two runs
    #  [defaults to 30 if empty]
    interval=1
     
    #  重做索引的时间间隔,单位分钟,此处设为7200,即5天; 
    #  为空,为0,或者注释掉:表示永不重做索引
    reBuildIndexInterval=7200
     
    #  重做索引的参数
    reBuildIndexParams=/dataimport?command=full-import&clean=true&commit=true
     
    #  重做索引时间间隔的计时开始时间,第一次真正执行的时间=reBuildIndexBeginTime+reBuildIndexInterval*60*1000;
    #  两种格式:2012-04-11 03:10:00 或者  03:10:00,后一种会自动补全日期部分为服务启动时的日期
    reBuildIndexBeginTime=03:10:00
    
    • interval增量索引的频率,每隔interval分钟就启动一次task
     timer.scheduleAtFixedRate(task, startTime, 60000 * interval);
    
    • 关于reBuildIndexBeginTime,在代码中体现为fullImportStartTime(= reBuildIndexBeginTime + reBuildIndexInterval分钟,即全量索引第一次执行的时间)
    • 增量更新的请求参数params=/dataimport?command=delta-import&clean=false&commit=true
    fullImportTimer.scheduleAtFixedRate(fullImportTask, fullImportStartTime, 60000 * reBuildIndexInterval);
    

    data-config.xml配置

    • query是获取全部数据的SQL(solr从sql中获取那些数据),多列

    • deltaImportQuery是获取增量数据时使用的SQL(数据库新增数据追加到solr的数据),多列

    • deltaQuery是获取pk的SQL(数据库新增数据时,作为追加到solr的数据的条件,根据id
      ,条件是最后一次获取的时间,即${dataimporter.last_index_time}(最后获取的时间)),一列

    • deltaImportQuery是在增量时使用的修改语句,其中需要注意的是dataimporter.delta这个前缀一定要带
      pk,根据我艰苦卓绝的跟踪代码知道这个pk其实作用只是用来对deltaQuery查询出来的内容放入到一个map中的时候作为key用的
      如果你不想deltaQuery查询出来的结果最后出现混乱,那么最好保证pk是唯一的

    document中使用的是自己要被加入到索引中的field
    query,被用来做为全量导入的时候使用
    deltaImportQuery 这个是在增量时使用的修改语句,其中需要注意的是dataimporter.delta这个前缀一定要带
    pk,根据我艰苦卓绝的跟踪代码知道这个pk其实作用只是用来对deltaQuery查询出来的内容放入到一个map中的时候作为key用的
    如果你不想deltaQuery查询出来的结果最后出现混乱,那么最好保证pk是唯一的
    deltaQuery,这个是用来查询需要被更新的对象的主键,以便deltaImportQuery使用
    transformer:很多时候数据库中的字段不能满足你的需要,比如存储了用户生日,那么你需要将他的生肖存储,则此时需要对生日做自己的处理
    

    增量索引

    终止跑索引:http://localhost:8080/solr/collection1/dataimport?command=abort
    全量索引:http://localhost:8080/solr/collection1/dataimport?command=full-import
    增量索引 :http://localhost:8080/solr/collection1/dataimport?command=delta-import
    

    源代码apache-solr-dataimportscheduler-1.4.jar
    ApplicationListener

    package org.apache.solr.handler.dataimport.scheduler;
    
    import java.util.Calendar;
    import java.util.Date;
    import java.util.Timer;
    import javax.servlet.ServletContext;
    import javax.servlet.ServletContextEvent;
    import javax.servlet.ServletContextListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class ApplicationListener
      implements ServletContextListener
    {
      private static final Logger logger = LoggerFactory.getLogger(ApplicationListener.class)
        ;
    
      public void contextDestroyed(ServletContextEvent servletContextEvent)
      {
        ServletContext servletContext = servletContextEvent.getServletContext();
    
        Timer timer = (Timer)servletContext.getAttribute("timer");
    
        Timer fullImportTimer = (Timer)servletContext
          .getAttribute("fullImportTimer");
    
        if (timer != null)
          timer.cancel();
        if (fullImportTimer != null) {
          fullImportTimer.cancel();
        }
    
        servletContext.removeAttribute("timer");
        servletContext.removeAttribute("fullImportTimer");
      }
    
      public void contextInitialized(ServletContextEvent servletContextEvent)
      {
        ServletContext servletContext = servletContextEvent.getServletContext();
        try
        {
          Timer timer = new Timer();
        //增量索引task
          DeltaImportHTTPPostScheduler task = new DeltaImportHTTPPostScheduler(servletContext
            .getServletContextName(), timer);
        //配置的间隔时间分钟
          int interval = task.getIntervalInt();
    
          Calendar calendar = Calendar.getInstance();
    
          calendar.add(12, interval);
          Date startTime = calendar.getTime();
          //task调度
          timer.scheduleAtFixedRate(task, startTime, 60000 * interval);
    
          servletContext.setAttribute("timer", timer);
    
          Timer fullImportTimer = new Timer();
          //全量索引task
          FullImportHTTPPostScheduler fullImportTask = new FullImportHTTPPostScheduler(servletContext
            .getServletContextName(), fullImportTimer);
          //这里重建索引时间
          int reBuildIndexInterval = fullImportTask
            .getReBuildIndexIntervalInt();
          if (reBuildIndexInterval <= 0) {
            logger.warn("Full Import Schedule disabled");
            return;
          }
    
          Calendar fullImportCalendar = Calendar.getInstance();
          Date beginDate = fullImportTask.getReBuildIndexBeginTime();
          fullImportCalendar.setTime(beginDate);
          fullImportCalendar.add(12, reBuildIndexInterval);
          Date fullImportStartTime = fullImportCalendar.getTime();
          //fullImportStartTime这个跟配置文件里的reBuildIndexBeginTime相关,
          fullImportTimer.scheduleAtFixedRate(fullImportTask, fullImportStartTime, 60000 * reBuildIndexInterval);
    
          servletContext.setAttribute("fullImportTimer", fullImportTimer);
        }
        catch (Exception e) {
          if (e.getMessage().endsWith("disabled"))
            logger.warn("Schedule disabled");
          else
            logger.error("Problem initializing the scheduled task: ", e);
        }
      }
    }
    

    增量,全量索引task都继承自BaseTimerTask,主要都差不多,看BaseTimerTask的prepUrlSendHttpPost就好
    增量索引task

    package org.apache.solr.handler.dataimport.scheduler;
    
    import java.util.Timer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class DeltaImportHTTPPostScheduler extends BaseTimerTask
    {
      private static final Logger logger = LoggerFactory.getLogger(DeltaImportHTTPPostScheduler.class)
        ;
    
      public DeltaImportHTTPPostScheduler(String webAppName, Timer t) throws Exception
      {
        super(webAppName, t);
        logger.info("<index update process> DeltaImportHTTPPostScheduler init");
      }
    
      public void run()
      {
        try {
          if ((this.server.isEmpty()) || (this.webapp.isEmpty()) || (this.params == null) || 
            (this.params
            .isEmpty())) {
            logger.warn("<index update process> Insuficient info provided for data import");
            logger.info("<index update process> Reloading global dataimport.properties");
            reloadParams();
          }
          else if (this.singleCore) {
            prepUrlSendHttpPost(this.params);
          }
          else if ((this.syncCores.length == 0) || ((this.syncCores.length == 1) && 
            (this.syncCores[0]
            .isEmpty()))) {
            logger.warn("<index update process> No cores scheduled for data import");
            logger.info("<index update process> Reloading global dataimport.properties");
            reloadParams();
          }
          else {
            for (String core : this.syncCores)
              prepUrlSendHttpPost(core, this.params);
          }
        }
        catch (Exception e) {
          logger.error("Failed to prepare for sendHttpPost", e);
          reloadParams();
        }
      }
    }
    

    全量索引task

    package org.apache.solr.handler.dataimport.scheduler;
    
    import java.util.Timer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class FullImportHTTPPostScheduler extends BaseTimerTask
    {
      private static final Logger logger = LoggerFactory.getLogger(FullImportHTTPPostScheduler.class)
        ;
    
      public FullImportHTTPPostScheduler(String webAppName, Timer t) throws Exception
      {
        super(webAppName, t);
        logger.info("<index update process> DeltaImportHTTPPostScheduler init");
      }
    
      public void run()
      {
        try {
          if ((this.server.isEmpty()) || (this.webapp.isEmpty()) || (this.reBuildIndexParams == null) || 
            (this.reBuildIndexParams
            .isEmpty())) {
            logger.warn("<index update process> Insuficient info provided for data import, reBuildIndexParams is null");
            logger.info("<index update process> Reloading global dataimport.properties");
            reloadParams();
          }
          else if (this.singleCore) {
            prepUrlSendHttpPost(this.reBuildIndexParams);
          }
          else if ((this.syncCores.length == 0) || ((this.syncCores.length == 1) && 
            (this.syncCores[0]
            .isEmpty()))) {
            logger.warn("<index update process> No cores scheduled for data import");
            logger.info("<index update process> Reloading global dataimport.properties");
            reloadParams();
          }
          else {
            for (String core : this.syncCores)
              prepUrlSendHttpPost(core, this.reBuildIndexParams);
          }
        }
        catch (Exception e) {
          logger.error("Failed to prepare for sendHttpPost", e);
          reloadParams();
        }
      }
    }
    

    BaseTimerTask的源码,这里主要关注prepUrlSendHttpPost

    package org.apache.solr.handler.dataimport.scheduler;
    
    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.MalformedURLException;
    import java.net.URL;
    import java.text.DateFormat;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Timer;
    import java.util.TimerTask;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public abstract class BaseTimerTask extends TimerTask
    {
      protected String syncEnabled;
      protected String[] syncCores;
      protected String server;
      protected String port;
      protected String webapp;
      protected String params;
      protected String interval;
      protected String cores;
      protected SolrDataImportProperties p;
      protected boolean singleCore;
      protected String reBuildIndexParams;
      protected String reBuildIndexBeginTime;
      protected String reBuildIndexInterval;
      private String webAppName;
      protected static final Logger logger = LoggerFactory.getLogger(BaseTimerTask.class)
        ;
    
      public BaseTimerTask(String webAppName, Timer t) throws Exception
      {
        this.webAppName = webAppName;
    
        this.p = new SolrDataImportProperties();
        reloadParams();
    
        if (!this.syncEnabled.equals("1")) {
          throw new Exception("Schedule disabled");
        }
        if ((this.syncCores == null) || ((this.syncCores.length == 1) && 
          (this.syncCores[0]
          .isEmpty()))) {
          this.singleCore = true;
          logger.info("<index update process> Single core identified in dataimport.properties");
        } else {
          this.singleCore = false;
          logger.info(new StringBuilder().append("<index update process> Multiple cores identified in dataimport.properties. Sync active for: ").append(this.cores).toString());
        }
      }
    
      protected void reloadParams()
      {
        this.p.loadProperties(true);
        this.syncEnabled = this.p.getProperty("syncEnabled");
        this.cores = this.p.getProperty("syncCores");
        this.server = this.p.getProperty("server");
        this.port = this.p.getProperty("port");
        this.webapp = this.p.getProperty("webapp");
        this.params = this.p.getProperty("params");
        this.interval = this.p.getProperty("interval");
        this.syncCores = (this.cores != null ? this.cores.split(",") : null);
    
        this.reBuildIndexParams = this.p
          .getProperty("reBuildIndexParams");
    
        this.reBuildIndexBeginTime = this.p
          .getProperty("reBuildIndexBeginTime");
    
        this.reBuildIndexInterval = this.p
          .getProperty("reBuildIndexInterval");
    
        fixParams(this.webAppName);
      }
    
      protected void fixParams(String webAppName) {
        if ((this.server == null) || (this.server.isEmpty()))
          this.server = "localhost";
        if ((this.port == null) || (this.port.isEmpty()))
          this.port = "8080";
        if ((this.webapp == null) || (this.webapp.isEmpty()))
          this.webapp = webAppName;
        if ((this.interval == null) || (this.interval.isEmpty()) || (getIntervalInt() <= 0))
          this.interval = "30";
        if ((this.reBuildIndexBeginTime == null) || (this.reBuildIndexBeginTime.isEmpty()))
          this.interval = "00:00:00";
        if ((this.reBuildIndexInterval == null) || (this.reBuildIndexInterval.isEmpty()) || 
          (getReBuildIndexIntervalInt() <= 0))
          this.reBuildIndexInterval = "0";
      }
    
      protected void prepUrlSendHttpPost(String params)
      {
        sendHttpPost(null, params);
      }
    
      protected void prepUrlSendHttpPost(String coreName, String params)
      {
        sendHttpPost(coreName, params);
      }
    
      protected void sendHttpPost(String coreName, String params)
      {
        DateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss SSS");
        Date startTime = new Date();
    
        String core = coreName == null ? "" : new StringBuilder().append("[").append(coreName).append("] ").toString();
    
        logger.info(new StringBuilder().append(core).append("<index update process> Process started at .............. ")
          .append(df
          .format(startTime))
          .toString());
        try
        {
          String completeUrl = buildUrl(coreName, params);
          URL url = new URL(completeUrl);
          HttpURLConnection conn = (HttpURLConnection)url.openConnection();
    
          conn.setRequestMethod("POST");
          conn.setRequestProperty("type", "submit");
          conn.setDoOutput(true);
    
          conn.connect();
    
          logger.info(new StringBuilder().append(core).append("<index update process> Full URL				").append(conn.getURL()).toString());
          logger.info(new StringBuilder().append(core).append("<index update process> Response message			")
            .append(conn
            .getResponseMessage()).toString());
          logger.info(new StringBuilder().append(core).append("<index update process> Response code			")
            .append(conn
            .getResponseCode()).toString());
    
          if (conn.getResponseCode() != 200) {
            reloadParams();
          }
    
          conn.disconnect();
          logger.info(new StringBuilder().append(core).append("<index update process> Disconnected from server		").append(this.server).toString());
          Date endTime = new Date();
          logger.info(new StringBuilder().append(core).append("<index update process> Process ended at ................ ")
            .append(df
            .format(endTime))
            .toString());
        } catch (MalformedURLException mue) {
          logger.error("Failed to assemble URL for HTTP POST", mue);
        } catch (IOException ioe) {
          logger.error("Failed to connect to the specified URL while trying to send HTTP POST", ioe);
        }
        catch (Exception e)
        {
          logger.error("Failed to send HTTP POST", e);
        }
      }
    
      private String buildUrl(String coreName, String params) { StringBuilder sb = new StringBuilder();
    
        sb.append("http://").append(this.server).append(":").append(this.port);
    
        if (!this.webapp.startsWith("/"))
          sb.append("/");
        sb.append(this.webapp);
    
        if ((coreName != null) && (!coreName.isEmpty())) {
          if (!this.webapp.endsWith("/"))
            sb.append("/");
          sb.append(coreName);
        }
    
        if (sb.charAt(sb.length() - 1) == '/') {
          if (params.startsWith("/"))
            sb.setLength(sb.length() - 1);
        }
        else if (!params.startsWith("/")) {
          sb.append("/");
        }
        sb.append(params);
    
        return sb.toString(); }
    
      public int getIntervalInt() {
        try {
          return Integer.parseInt(this.interval);
        } catch (NumberFormatException e) {
          logger.warn("Unable to convert 'interval' to number. Using default value (30) instead", e);
        }
    
        return 30;
      }
    
      public int getReBuildIndexIntervalInt()
      {
        try {
          return Integer.parseInt(this.reBuildIndexInterval);
        } catch (NumberFormatException e) {
          logger.info("Unable to convert 'reBuildIndexInterval' to number. do't rebuild index.", e);
        }
    
        return 0;
      }
    
      public Date getReBuildIndexBeginTime()
      {
        Date beginDate = null;
        try {
          SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd");
          String dateStr = sdfDate.format(new Date());
          beginDate = sdfDate.parse(dateStr);
          if ((this.reBuildIndexBeginTime == null) || 
            (this.reBuildIndexBeginTime
            .isEmpty()))
            return beginDate;
          SimpleDateFormat sdf;
          if (this.reBuildIndexBeginTime.matches("\d{2}:\d{2}:\d{2}")) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
            beginDate = sdf.parse(new StringBuilder().append(dateStr).append(" ").append(this.reBuildIndexBeginTime).toString());
          }
          else if (this.reBuildIndexBeginTime
            .matches("\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"))
          {
            sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          }
          return sdf.parse(this.reBuildIndexBeginTime);
        }
        catch (ParseException e)
        {
          logger.warn("Unable to convert 'reBuildIndexBeginTime' to date. use now time.", e);
        }
    
        return beginDate;
      }
    }
    

    最后还有Properties文件相关的类SolrDataImportProperties

    package org.apache.solr.handler.dataimport.scheduler;
    
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.util.Properties;
    import org.apache.solr.core.SolrResourceLoader;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Holds the DataImportScheduler settings read from dataimport.properties in the Solr
     * configuration directory, plus the property key names the scheduler uses.
     */
    public class SolrDataImportProperties
    {
      private Properties properties;
      public static final String SYNC_ENABLED = "syncEnabled";
      public static final String SYNC_CORES = "syncCores";
      public static final String SERVER = "server";
      public static final String PORT = "port";
      public static final String WEBAPP = "webapp";
      public static final String PARAMS = "params";
      public static final String INTERVAL = "interval";
      public static final String REBUILDINDEXPARAMS = "reBuildIndexParams";
      public static final String REBUILDINDEXBEGINTIME = "reBuildIndexBeginTime";
      public static final String REBUILDINDEXINTERVAL = "reBuildIndexInterval";
      private static final Logger logger = LoggerFactory.getLogger(SolrDataImportProperties.class);

      /**
       * Loads dataimport.properties from the Solr config directory.
       *
       * <p>Does nothing when properties are already loaded and {@code force} is false.
       * Errors are logged rather than thrown; on failure any previously loaded properties
       * are kept.
       *
       * @param force reload even if properties were loaded before
       */
      public void loadProperties(boolean force)
      {
        if (!force && this.properties != null) {
          return;
        }
        try
        {
          SolrResourceLoader loader = new SolrResourceLoader(null);
          logger.info("Instance dir = " + loader.getInstanceDir());

          String configDir = SolrResourceLoader.normalizeDir(loader.getConfigDir());
          String dataImportPropertiesPath = configDir + "dataimport.properties";

          // Load into a fresh object and publish it only on success, so a failed reload does
          // not wipe the last good configuration.
          Properties loaded = new Properties();
          FileInputStream fis = new FileInputStream(dataImportPropertiesPath);
          try {
            loaded.load(fis);
          } finally {
            // release the file handle even if load() fails
            fis.close();
          }
          this.properties = loaded;
        } catch (FileNotFoundException fnfe) {
          logger.error("Error locating DataImportScheduler dataimport.properties file", fnfe);
        }
        catch (IOException ioe)
        {
          logger.error("Error reading DataImportScheduler dataimport.properties file", ioe);
        }
        catch (Exception e)
        {
          logger.error("Error loading DataImportScheduler properties", e);
        }
      }

      /**
       * Returns the value for {@code key}, or null when the key is absent or nothing has been
       * loaded successfully yet.
       */
      public String getProperty(String key) {
        return this.properties == null ? null : this.properties.getProperty(key);
      }
    }
    

    服务端流程主要流程

    handleRequestBody-->  importer.runCmd-->doFullImport/doDeltaImport--> docBuilder.execute()--> doDelta()-->collectDelta()-->Map<String, Object> row = epw.nextModifiedRowKey()/getModifiedParentRows-->entityProcessor.nextModifiedParentRowKey()-->initQuery()-->dataSource.getData(q)-->ResultSetIterator r = new ResultSetIterator(query)-->getARow()
    

    剩下的就跟正常的solr处理流程差不多了

    Solr中DIH模式的使用

    solr连接数据库配置

    Solr学习(五)DIH增量、定时导入并检索数据

  • 相关阅读:
    让CEF支持FLASH(非安装插件的形式)
    解决SQLServer 2008 日志无法收缩,收缩后大小不改变
    HTML Socket实现 .NET
    JS基础之BOM对象
    JavaScript对象
    JS函数
    JavaScript概述
    CSS块级元素和行内元素
    返回顶部示例
    CSS属性操作二
  • 原文地址:https://www.cnblogs.com/donganwangshi/p/4276015.html
Copyright © 2011-2022 走看看