  • Solr DataImport source code analysis (13)

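    This article analyzes the classes related to EntityProcessor, which we can call entity processors. Each kind of data source has its own entity processor, and together they hide the differences between data sources.

    Only the entity processor for database data sources is covered here; the other entity processors work in a similar way.

    EntityProcessor is an abstract class. It defines the methods that fetch Map-typed rows from the data source (for added, modified, and deleted data).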

    /**
     * <p>
     * An instance of entity processor serves an entity. It is reused throughout the
     * import process.
     * </p>
     * <p/>
     * <p>
     * Implementations of this abstract class must provide a public no-args constructor.
     * </p>
     * <p/>
     * <p>
     * Refer to <a
     * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
     * for more details.
     * </p>
     * <p/>
     * <b>This API is experimental and may change in the future.</b>
     *
     * @version $Id: EntityProcessor.java 824359 2009-10-12 14:31:54Z ehatcher $
     * @since solr 1.3
     */
    public abstract class EntityProcessor {
    
      /**
       * This method is called when it starts processing an entity. When it comes
       * back to the entity it is called again. So it can reset anything at that point.
       * For a rootmost entity this is called only once for an ingestion. For sub-entities, this
       * is called once for each row from its parent entity
       *
       * @param context The current context
       */
      public abstract void init(Context context);
    
      /**
       * This method helps streaming the data for each row . The implementation
       * would fetch as many rows as needed and gives one 'row' at a time. Only this
       * method is used during a full import
       *
       * @return A 'row'.  The 'key' for the map is the column name and the 'value'
       *         is the value of that column. If there are no more rows to be
       *         returned, return 'null'
       */
      public abstract Map<String, Object> nextRow();
    
      /**
       * This is used for delta-import. It gives the pks of the changed rows in this
       * entity
       *
       * @return the pk vs value of all changed rows
       */
      public abstract Map<String, Object> nextModifiedRowKey();
    
      /**
       * This is used during delta-import. It gives the primary keys of the rows
       * that are deleted from this entity. If this entity is the root entity, solr
       * document is deleted. If this is a sub-entity, the Solr document is
       * considered as 'changed' and will be recreated
       *
       * @return the pk vs value of all changed rows
       */
      public abstract Map<String, Object> nextDeletedRowKey();
    
      /**
       * This is used during delta-import. This gives the primary keys and their
       * values of all the rows changed in a parent entity due to changes in this
       * entity.
       *
       * @return the pk vs value of all changed rows in the parent entity
       */
      public abstract Map<String, Object> nextModifiedParentRowKey();
    
      /**
       * Invoked for each parent-row after the last row for this entity is processed. If this is the root-most
       * entity, it will be called only once in the import, at the very end.
       * 
       */
      public abstract void destroy();
    
      /**
       * Invoked after the transformers are invoked. EntityProcessors can add, remove or modify values
       * added by Transformers in this method.
       *
       * @param r The transformed row
       * @since solr 1.4
       */
      public void postTransform(Map<String, Object> r) {
      }
    
      /**
       * Invoked when the Entity processor is destroyed towards the end of import.
       *
       * @since solr 1.4
       */
      public void close() {
        //no-op
      }
    }
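
    The contract described in the Javadoc above (nextRow() hands out one row at a time and returns null when nothing is left) can be illustrated with a minimal sketch. MiniEntityProcessor and ListEntityProcessor are hypothetical, reduced stand-ins invented for this example; they are not Solr classes, and the real driver in DocBuilder does considerably more.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NextRowContractSketch {

  /** Hypothetical, reduced stand-in for EntityProcessor (init/nextRow/destroy only). */
  abstract static class MiniEntityProcessor {
    abstract void init();
    abstract Map<String, Object> nextRow();
    abstract void destroy();
  }

  /** Serves rows from an in-memory list instead of a real data source. */
  static class ListEntityProcessor extends MiniEntityProcessor {
    private final List<Map<String, Object>> rows;
    private Iterator<Map<String, Object>> it;

    ListEntityProcessor(List<Map<String, Object>> rows) {
      this.rows = rows;
    }

    @Override void init() { it = rows.iterator(); }

    // One row per call; null signals "no more rows".
    @Override Map<String, Object> nextRow() { return it.hasNext() ? it.next() : null; }

    @Override void destroy() { it = null; }
  }

  /** Pulls rows until nextRow() returns null, the way a full import consumes an entity. */
  static List<Map<String, Object>> drain(MiniEntityProcessor p) {
    List<Map<String, Object>> out = new ArrayList<>();
    p.init();
    Map<String, Object> row;
    while ((row = p.nextRow()) != null) {
      out.add(row);
    }
    p.destroy();
    return out;
  }

  /** Builds a row keyed by column name. */
  static Map<String, Object> row(Object id, Object name) {
    Map<String, Object> m = new LinkedHashMap<>();
    m.put("id", id);
    m.put("name", name);
    return m;
  }

  public static void main(String[] args) {
    List<Map<String, Object>> rows = new ArrayList<>();
    rows.add(row(1, "a"));
    rows.add(row(2, "b"));
    System.out.println(drain(new ListEntityProcessor(rows)));
  }
}
```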

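    The subclass EntityProcessorBase is the base class of all concrete entity processors and defines their shared methods. The most important one is Map<String, Object> getNext(), which fetches Map-typed records from the row iterator Iterator<Map<String, Object>> rowIterator (the DIHCacheSupport cacheSupport object is used for caching).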

    protected Map<String, Object> getNext() {
        if(cacheSupport==null) {
          try {
            if (rowIterator == null)
              return null;
            if (rowIterator.hasNext())
              return rowIterator.next();
            query = null;
            rowIterator = null;
            return null;
          } catch (Exception e) {
            SolrException.log(log, "getNext() failed for query '" + query + "'", e);
            query = null;
            rowIterator = null;
            wrapAndThrow(DataImportHandlerException.WARN, e);
            return null;
          }
        } else  {
          return cacheSupport.getCacheData(context, query, rowIterator);
        }      
      }
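
    The non-cached branch of getNext() is worth a closer look: once the iterator is exhausted, both rowIterator and query are cleared, so a later call that finds rowIterator == null knows it has to issue a fresh query. The sketch below reproduces only that behavior. The fields are hypothetical stand-ins for the ones in EntityProcessorBase, and the exception handling and cache branch are left out.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

public class GetNextSketch {
  // Hypothetical stand-ins for the rowIterator and query fields of EntityProcessorBase.
  Iterator<Map<String, Object>> rowIterator;
  String query;

  /** Returns the next row, or clears the state and returns null when exhausted. */
  Map<String, Object> getNext() {
    if (rowIterator == null) {
      return null;
    }
    if (rowIterator.hasNext()) {
      return rowIterator.next();
    }
    query = null;
    rowIterator = null;
    return null;
  }

  public static void main(String[] args) {
    GetNextSketch p = new GetNextSketch();
    p.query = "select id from item"; // illustrative query text only
    p.rowIterator = Arrays.<Map<String, Object>>asList(
        Collections.<String, Object>singletonMap("id", 1),
        Collections.<String, Object>singletonMap("id", 2)).iterator();

    System.out.println(p.getNext());           // {id=1}
    System.out.println(p.getNext());           // {id=2}
    System.out.println(p.getNext());           // null
    System.out.println(p.rowIterator == null); // true
  }
}
```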

    SqlEntityProcessor is the entity processor for database data sources.

    /**
     * <p>
     * An {@link EntityProcessor} instance which provides support for reading from
     * databases. It is used in conjunction with {@link JdbcDataSource}. This is the default
     * {@link EntityProcessor} if none is specified explicitly in data-config.xml
     * </p>
     * <p/>
     * <p>
     * Refer to <a
     * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
     * for more details.
     * </p>
     * <p/>
     * <b>This API is experimental and may change in the future.</b>
     *
     * @version $Id: SqlEntityProcessor.java 1065312 2011-01-30 16:08:25Z rmuir $
     * @since solr 1.3
     */
    public class SqlEntityProcessor extends EntityProcessorBase {
      private static final Logger LOG = LoggerFactory.getLogger(SqlEntityProcessor.class);
      // the data source
      protected DataSource<Iterator<Map<String, Object>>> dataSource;
      // initialize the data source
      @Override
      @SuppressWarnings("unchecked")
      public void init(Context context) {
        super.init(context);
        dataSource = context.getDataSource();
      }
      // initialize the row iterator (fetched from the data source using the query)
      protected void initQuery(String q) {
        try {
          DataImporter.QUERY_COUNT.get().incrementAndGet();
          rowIterator = dataSource.getData(q);
          this.query = q;
        } catch (DataImportHandlerException e) {
          throw e;
        } catch (Exception e) {
          LOG.error( "The query failed '" + q + "'", e);
          throw new DataImportHandlerException(DataImportHandlerException.SEVERE, e);
        }
      }
    
      @Override
      public Map<String, Object> nextRow() {    
        if (rowIterator == null) {
          String q = getQuery();
          initQuery(context.replaceTokens(q));
        }
        return getNext();
      }
    
      @Override
      public Map<String, Object> nextModifiedRowKey() {
        if (rowIterator == null) {
          String deltaQuery = context.getEntityAttribute(DELTA_QUERY);
          if (deltaQuery == null)
            return null;
          initQuery(context.replaceTokens(deltaQuery));
        }
        return getNext();
      }
    
      @Override
      public Map<String, Object> nextDeletedRowKey() {
        if (rowIterator == null) {
          String deletedPkQuery = context.getEntityAttribute(DEL_PK_QUERY);
          if (deletedPkQuery == null)
            return null;
          initQuery(context.replaceTokens(deletedPkQuery));
        }
        return getNext();
      }
    
      @Override
      public Map<String, Object> nextModifiedParentRowKey() {
        if (rowIterator == null) {
          String parentDeltaQuery = context.getEntityAttribute(PARENT_DELTA_QUERY);
          if (parentDeltaQuery == null)
            return null;
          LOG.info("Running parentDeltaQuery for Entity: "
                  + context.getEntityAttribute("name"));
          initQuery(context.replaceTokens(parentDeltaQuery));
        }
        return getNext();
      }
      
      public String getQuery() {
        String queryString = context.getEntityAttribute(QUERY);
        if (Context.FULL_DUMP.equals(context.currentProcess())) {
          return queryString;
        }
        if (Context.DELTA_DUMP.equals(context.currentProcess())) {
          String deltaImportQuery = context.getEntityAttribute(DELTA_IMPORT_QUERY);
          if(deltaImportQuery != null) return deltaImportQuery;
        }
        LOG.warn("'deltaImportQuery' attribute is not specified for entity : "+ entityName);
        return getDeltaImportQuery(queryString);
      }
    
      public String getDeltaImportQuery(String queryString) {    
        StringBuilder sb = new StringBuilder(queryString);
        if (SELECT_WHERE_PATTERN.matcher(queryString).find()) {
          sb.append(" and ");
        } else {
          sb.append(" where ");
        }
        boolean first = true;
        String[] primaryKeys = context.getEntityAttribute("pk").split(",");
        for (String primaryKey : primaryKeys) {
          if (!first) {
            sb.append(" and ");
          }
          first = false;
          Object val = context.resolve("dataimporter.delta." + primaryKey);
          if (val == null) {
            Matcher m = DOT_PATTERN.matcher(primaryKey);
            if (m.find()) {
              val = context.resolve("dataimporter.delta." + m.group(1));
            }
          }
          sb.append(primaryKey).append(" = ");
          if (val instanceof Number) {
            sb.append(val.toString());
          } else {
            sb.append("'").append(val.toString()).append("'");
          }
        }
        return sb.toString();
      }
    
      private static Pattern SELECT_WHERE_PATTERN = Pattern.compile(
              "^\\s*(select\\b.*?\\b)(where).*", Pattern.CASE_INSENSITIVE);
    
      public static final String QUERY = "query";
    
      public static final String DELTA_QUERY = "deltaQuery";
    
      public static final String DELTA_IMPORT_QUERY = "deltaImportQuery";
    
      public static final String PARENT_DELTA_QUERY = "parentDeltaQuery";
    
      public static final String DEL_PK_QUERY = "deletedPkQuery";
    
      public static final Pattern DOT_PATTERN = Pattern.compile(".*?\\.(.*)$");
    }
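
    To see what getDeltaImportQuery() produces when no deltaImportQuery attribute is configured, here is a standalone sketch of the same string-building logic. The two regular expressions are copied from the listing above. The delta map is an assumption made for this example: it stands in for context.resolve("dataimporter.delta." + key), which is not available outside Solr.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DeltaImportQuerySketch {
  // Same regular expressions as in SqlEntityProcessor.
  static final Pattern SELECT_WHERE_PATTERN = Pattern.compile(
      "^\\s*(select\\b.*?\\b)(where).*", Pattern.CASE_INSENSITIVE);
  static final Pattern DOT_PATTERN = Pattern.compile(".*?\\.(.*)$");

  /**
   * Appends one "pk = value" condition per primary key to the query.
   * The delta map is a hypothetical replacement for context.resolve(...).
   */
  static String deltaImportQuery(String query, String pkAttr, Map<String, Object> delta) {
    StringBuilder sb = new StringBuilder(query);
    // Continue an existing WHERE clause with "and", otherwise start one.
    sb.append(SELECT_WHERE_PATTERN.matcher(query).find() ? " and " : " where ");
    boolean first = true;
    for (String pk : pkAttr.split(",")) {
      if (!first) {
        sb.append(" and ");
      }
      first = false;
      Object val = delta.get(pk);
      if (val == null) {
        // For a qualified key such as "item.id", retry with the part after the dot.
        Matcher m = DOT_PATTERN.matcher(pk);
        if (m.find()) {
          val = delta.get(m.group(1));
        }
      }
      sb.append(pk).append(" = ");
      // As in the listing above, an unresolved key leads to a NullPointerException here.
      if (val instanceof Number) {
        sb.append(val.toString());
      } else {
        sb.append("'").append(val.toString()).append("'");
      }
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    Map<String, Object> delta = new HashMap<>();
    delta.put("id", 5);
    // prints: select * from item where id = 5
    System.out.println(deltaImportQuery("select * from item", "id", delta));
    // prints: select * from item where deleted = 0 and item.id = 5
    System.out.println(
        deltaImportQuery("select * from item where deleted = 0", "item.id", delta));
  }
}
```

    Note that values are concatenated directly into the SQL text, numbers bare and everything else wrapped in single quotes, so the generated query is only as safe as the primary key values themselves.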

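    Next we analyze the EntityProcessorWrapper class. It extends the abstract class EntityProcessor and decorates a concrete entity processor (the decorator pattern).

    Its important members are as follows:

      // the decorated entity processor
      EntityProcessor delegate;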
      private DocBuilder docBuilder;
    
      String onError;
      Context context;
      protected VariableResolverImpl resolver;
      String entityName;
    
      protected List<Transformer> transformers;
    
      protected List<Map<String, Object>> rowcache;

    Its constructor stores the decorated object in a member field:

    public EntityProcessorWrapper(EntityProcessor delegate, DocBuilder docBuilder) {
        this.delegate = delegate;
        this.docBuilder = docBuilder;
      }

    The init method calls the init method of the decorated object (which obtains the data source):

    @Override
      public void init(Context context) {
        rowcache = null;
        this.context = context;
        resolver = (VariableResolverImpl) context.getVariableResolver();
        //context has to be set correctly . keep the copy of the old one so that it can be restored in destroy
        if (entityName == null) {
          onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
          if (onError == null) {
              onError = ABORT;
          }
          entityName = context.getEntityAttribute(DataConfig.NAME);
        }
        delegate.init(context);
    
      }

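    The remaining methods all call the corresponding methods of the decorated concrete entity processor and add features such as data transformation on top. They are not analyzed in detail here.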
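
    The decorator idea can be sketched in a few lines: the wrapper exposes the same interface as the processor it wraps, forwards each call, and applies transformations to the returned row. RowSource and TransformingWrapper below are hypothetical names made up for this illustration. The real EntityProcessorWrapper handles more than this sketch does, for example the onError setting and the rowcache member shown above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class WrapperSketch {

  /** Hypothetical, reduced processor interface (the real EntityProcessor has more methods). */
  interface RowSource {
    Map<String, Object> nextRow();
  }

  /** Decorator: same interface as the delegate, plus a transformation step. */
  static class TransformingWrapper implements RowSource {
    private final RowSource delegate;
    private final List<UnaryOperator<Map<String, Object>>> transformers;

    TransformingWrapper(RowSource delegate,
                        List<UnaryOperator<Map<String, Object>>> transformers) {
      this.delegate = delegate;
      this.transformers = transformers;
    }

    @Override
    public Map<String, Object> nextRow() {
      Map<String, Object> row = delegate.nextRow();
      if (row == null) {
        return null; // end of data passes through unchanged
      }
      for (UnaryOperator<Map<String, Object>> t : transformers) {
        row = t.apply(row);
      }
      return row;
    }
  }

  /** Wraps an in-memory list as a RowSource. */
  static RowSource fromList(List<Map<String, Object>> rows) {
    Iterator<Map<String, Object>> it = rows.iterator();
    return () -> it.hasNext() ? it.next() : null;
  }

  /** Example transformer: upper-cases the "name" column in a copy of the row. */
  static UnaryOperator<Map<String, Object>> upperCaseName() {
    return r -> {
      Map<String, Object> copy = new HashMap<>(r);
      copy.put("name", String.valueOf(r.get("name")).toUpperCase());
      return copy;
    };
  }

  public static void main(String[] args) {
    Map<String, Object> row = new HashMap<>();
    row.put("name", "solr");
    List<Map<String, Object>> rows = new ArrayList<>();
    rows.add(row);

    List<UnaryOperator<Map<String, Object>>> ts = new ArrayList<>();
    ts.add(upperCaseName());

    RowSource wrapped = new TransformingWrapper(fromList(rows), ts);
    System.out.println(wrapped.nextRow()); // {name=SOLR}
    System.out.println(wrapped.nextRow()); // null
  }
}
```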

    ---------------------------------------------------------------------------

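    This series, "Solr DataImport source code analysis", is my own original work.

    If you repost it, please credit the source: 博客园 (cnblogs), 刺猬的温驯

    Link to this article: http://www.cnblogs.com/chenying99/archive/2013/05/04/3059397.html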

  • Original article: https://www.cnblogs.com/chenying99/p/3059397.html