zoukankan      html  css  js  c++  java
  • solr dataimport 数据导入源码分析(三)

    在介绍DocBuilder 类之前,我们先来解读数据导入对应实体处理器EntityProcessor,默认的实体处理器为SqlEntityProcessor

    EntityProcessor 为抽象类,具体方法由子类实现

     package org.apache.solr.handler.dataimport;


    import java.util.Map;
    public abstract class EntityProcessor {
    public abstract void init(Context context);
     public abstract Map<String, Object> nextRow();
    public abstract Map<String, Object> nextModifiedRowKey();
    public abstract Map<String, Object> nextDeletedRowKey();
    public abstract Map<String, Object> nextModifiedParentRowKey();
    public abstract void destroy();
    public void postTransform(Map<String, Object> r) {
      }
    public void close() {
        //no-op
      }
    EntityProcessorBase继承类封装公用逻辑,其中比较重要的是getNext()方法,用于遍历数据迭代器,供子类调用
    protected Iterator<Map<String, Object>> rowIterator;
    protected DIHCacheSupport cacheSupport = null

    protected Map<String, Object> getNext() {
        if(cacheSupport==null) {
          try {
            if (rowIterator == null)
              return null;
            if (rowIterator.hasNext())
              return rowIterator.next();
            query = null;
            rowIterator = null;
            return null;
          } catch (Exception e) {
            SolrException.log(log, "getNext() failed for query '" + query + "'", e);
            query = null;
            rowIterator = null;
            wrapAndThrow(DataImportHandlerException.WARN, e);
            return null;
          }
        } else  {
          return cacheSupport.getCacheData(context, query, rowIterator);
        }      
      }

    真正的数据处理器为SqlEntityProcessor,简要代码如下

     package org.apache.solr.handler.dataimport;


    import java.util.Iterator;
    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class SqlEntityProcessor extends EntityProcessorBase {

    protected DataSource<Iterator<Map<String, Object>>> dataSource;

      @Override
      @SuppressWarnings("unchecked")
      public void init(Context context) {
        super.init(context);
        dataSource = context.getDataSource();
      }

      protected void initQuery(String q) {
        try {
          DataImporter.QUERY_COUNT.get().incrementAndGet();
          rowIterator = dataSource.getData(q);
          this.query = q;
        } catch (DataImportHandlerException e) {
          throw e;
        } catch (Exception e) {
          LOG.error( "The query failed '" + q + "'", e);
          throw new DataImportHandlerException(DataImportHandlerException.SEVERE, e);
        }
      }

      @Override
      public Map<String, Object> nextRow() {    
        if (rowIterator == null) {
          String q = getQuery();
          initQuery(context.replaceTokens(q));
        }
        return getNext();
      }

      @Override
      public Map<String, Object> nextModifiedRowKey() {
        if (rowIterator == null) {
          String deltaQuery = context.getEntityAttribute(DELTA_QUERY);
          if (deltaQuery == null)
            return null;
          initQuery(context.replaceTokens(deltaQuery));
        }
        return getNext();
      }

      @Override
      public Map<String, Object> nextDeletedRowKey() {
        if (rowIterator == null) {
          String deletedPkQuery = context.getEntityAttribute(DEL_PK_QUERY);
          if (deletedPkQuery == null)
            return null;
          initQuery(context.replaceTokens(deletedPkQuery));
        }
        return getNext();
      }

      @Override
      public Map<String, Object> nextModifiedParentRowKey() {
        if (rowIterator == null) {
          String parentDeltaQuery = context.getEntityAttribute(PARENT_DELTA_QUERY);
          if (parentDeltaQuery == null)
            return null;
          LOG.info("Running parentDeltaQuery for Entity: "
                  + context.getEntityAttribute("name"));
          initQuery(context.replaceTokens(parentDeltaQuery));
        }
        return getNext();
      }
    }
  • 相关阅读:
    POJ 1328 Radar Installation
    POJ 1700 Crossing River
    POJ 1700 Crossing River
    poj 3253 Fence Repair (贪心,优先队列)
    poj 3253 Fence Repair (贪心,优先队列)
    poj 3069 Saruman's Army(贪心)
    poj 3069 Saruman's Army(贪心)
    Redis 笔记与总结2 String 类型和 Hash 类型
    数据分析方法有哪些_数据分析方法
    数据分析方法有哪些_数据分析方法
  • 原文地址:https://www.cnblogs.com/chenying99/p/2677743.html
Copyright © 2011-2022 走看看