zoukankan      html  css  js  c++  java
  • Solr DIH JDBC 源码解析

    Solr DIH 源码解析

    入口在DataImportHandler.handleRequestBody()中:导入命令最终由importer.runCmd(requestParams, sw)执行(debug模式或同步模式下直接调用,否则通过importer.runAsync()异步执行)

    if (DataImporter.FULL_IMPORT_CMD.equals(command)
          || DataImporter.DELTA_IMPORT_CMD.equals(command) ||
          IMPORT_CMD.equals(command)) {
    importer.maybeReloadConfiguration(requestParams, defaultParams);
    UpdateRequestProcessorChain processorChain =
            req.getCore().getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));
    UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
    SolrResourceLoader loader = req.getCore().getResourceLoader();
    DIHWriter sw = getSolrWriter(processor, loader, requestParams, req);//创建DIHWriter
    
    if (requestParams.isDebug()) {
      if (debugEnabled) {
        // Synchronous request for the debug mode
        importer.runCmd(requestParams, sw);
        rsp.add("mode", "debug");
        rsp.add("documents", requestParams.getDebugInfo().debugDocuments);
        if (requestParams.getDebugInfo().debugVerboseOutput != null) {
          rsp.add("verbose-output", requestParams.getDebugInfo().debugVerboseOutput);
        }
      } else {
        message = DataImporter.MSG.DEBUG_NOT_ENABLED;
      }
    } else {
      // Asynchronous request for normal mode
      if(requestParams.getContentStream() == null && !requestParams.isSyncMode()){
        importer.runAsync(requestParams, sw);
      } else {
        importer.runCmd(requestParams, sw);
      }
    

    这里会先创建DIHWriter

    DIHWriter sw = getSolrWriter(processor, loader, requestParams, req);//创建DIHWriter
    

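    getSolrWriter在DataImportHandler中的实现如下:如果请求参数没有指定自定义的writer实现(或指定的就是默认writer),就实例化一个SolrWriter的匿名子类,其upload()会捕获RuntimeException并记录日志;否则通过反射加载并实例化指定的writer类。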

      /**
       * Creates the DIHWriter that an import run sends its documents to.
       *
       * <p>If the request names a writer class through PARAM_WRITER_IMPL, and that name is
       * neither DEFAULT_WRITER_NAME nor its package-qualified form, the class is loaded and
       * instantiated reflectively through its (UpdateRequestProcessor, SolrQueryRequest)
       * constructor. Otherwise a SolrWriter is returned whose upload() logs any
       * RuntimeException and reports failure as {@code false}.
       *
       * @param processor     the update processor documents are handed to
       * @param loader        not used by this method
       * @param requestParams not used by this method
       * @param req           the current request; supplies the params and the core
       * @return the writer to use for this import
       * @throws DataImportHandlerException (SEVERE) if a custom writer cannot be created
       */
      private DIHWriter getSolrWriter(final UpdateRequestProcessor processor,
          final SolrResourceLoader loader, final RequestInfo requestParams,
          SolrQueryRequest req) {
        SolrParams reqParams = req.getParams();
        String writerClassStr =
            (reqParams == null) ? null : (String) reqParams.get(PARAM_WRITER_IMPL);

        // Short-circuit order matters: the qualified name is only built when needed.
        boolean useDefaultWriter = writerClassStr == null
            || writerClassStr.equals(DEFAULT_WRITER_NAME)
            || writerClassStr.equals(DocBuilder.class.getPackage().getName() + "."
                + DEFAULT_WRITER_NAME);

        if (useDefaultWriter) {
          return new SolrWriter(processor, req) {
            @Override
            public boolean upload(SolrInputDocument document) {
              try {
                // invoked from DocBuilder.buildDocument for every built document
                return super.upload(document);
              } catch (RuntimeException e) {
                LOG.error("Exception while adding: " + document, e);
                return false;
              }
            }
          };
        }

        try {
          @SuppressWarnings("unchecked")
          Class<DIHWriter> customWriterClass = DocBuilder.loadClass(writerClassStr, req.getCore());
          Constructor<DIHWriter> ctor =
              customWriterClass.getConstructor(UpdateRequestProcessor.class, SolrQueryRequest.class);
          return ctor.newInstance(processor, req);
        } catch (Exception e) {
          throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
              "Unable to load Writer implementation:" + writerClassStr, e);
        }
      }
    

    DataImporter中的runCmd()方法:

    // Only one import may run at a time. tryLock() does not wait: if another import
    // holds the lock, this request is logged and dropped rather than queued.
    if (!importLock.tryLock()){
      LOG.warn("Import command failed . another import is running");      
      return;
    }
    try {
      // FULL_IMPORT_CMD and IMPORT_CMD both take the full-import path;
      // DELTA_IMPORT_CMD takes the delta path; any other command does nothing here.
      if (FULL_IMPORT_CMD.equals(command) || IMPORT_CMD.equals(command)) {
        doFullImport(sw, reqParams);
      // NOTE(review): unlike the checks above, this dereferences `command` and would
      // throw NullPointerException if command were null.
      } else if (command.equals(DELTA_IMPORT_CMD)) {
        doDeltaImport(sw, reqParams);
      }
    } finally {
      // Always release the lock, even if the import throws.
      importLock.unlock();
    }
    

    这里importLock是一个独占锁(ReentrantLock)。注意使用的是tryLock():如果已有导入在运行,新的导入命令不会等待,而是打印警告后直接返回。

    private ReentrantLock importLock = new ReentrantLock();
    

    主要逻辑自然在doFullImport()和doDeltaImport()中,全量和增量这两个方法几乎一样:

    /**
     * Runs a delta import with the given writer.
     *
     * <p>Sets the status to RUNNING_DELTA_DUMP, creates the property writer, records the
     * index start time, builds a new DocBuilder and executes it. Outside debug mode, the
     * builder's statistics are added to the cumulative statistics. Any exception is
     * logged and reported through DocBuilder.handleError. The status is always reset to
     * IDLE and the thread-local DocBuilder.INSTANCE is cleared afterwards.
     *
     * @param writer        destination for the documents produced by the import
     * @param requestParams parameters of the current import request
     */
    public void doDeltaImport(DIHWriter writer, RequestInfo requestParams) {
      LOG.info("Starting Delta Import");
      setStatus(Status.RUNNING_DELTA_DUMP);
      try {
        DIHProperties dihPropWriter = createPropertyWriter();
        setIndexStartTime(dihPropWriter.getCurrentTimestamp());
        docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams);
        checkWritablePersistFile(writer, dihPropWriter);
        docBuilder.execute();
        if (!requestParams.isDebug()) {
          cumulativeStatistics.add(docBuilder.importStatistics);
        }
      } catch (Exception e) {
        LOG.error("Delta Import Failed", e);
        // docBuilder is only assigned after createPropertyWriter()/setIndexStartTime()
        // succeed. If either of them threw, the field can still be null (e.g. on the first
        // import), and dereferencing it would replace the real error with an NPE.
        // NOTE(review): it may also still reference a builder from an earlier run.
        if (docBuilder != null) {
          docBuilder.handleError("Delta Import Failed", e);
        }
      } finally {
        setStatus(Status.IDLE);
        DocBuilder.INSTANCE.set(null);
      }
    }
    
    /**
     * Runs a full import with the given writer.
     *
     * <p>Sets the status to RUNNING_FULL_DUMP, creates the property writer, records the
     * index start time, builds a new DocBuilder and executes it. Outside debug mode, the
     * builder's statistics are added to the cumulative statistics. Any exception is
     * logged via SolrException.log and reported through DocBuilder.handleError. The status
     * is always reset to IDLE and the thread-local DocBuilder.INSTANCE is cleared afterwards.
     *
     * @param writer        destination for the documents produced by the import
     * @param requestParams parameters of the current import request
     */
    public void doFullImport(DIHWriter writer, RequestInfo requestParams) {
      LOG.info("Starting Full Import");
      setStatus(Status.RUNNING_FULL_DUMP);
      try {
        DIHProperties dihPropWriter = createPropertyWriter();
        setIndexStartTime(dihPropWriter.getCurrentTimestamp());
        docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams);
        checkWritablePersistFile(writer, dihPropWriter);
        docBuilder.execute();
        if (!requestParams.isDebug()) {
          cumulativeStatistics.add(docBuilder.importStatistics);
        }
      } catch (Exception e) {
        SolrException.log(LOG, "Full Import failed", e);
        // docBuilder is only assigned after createPropertyWriter()/setIndexStartTime()
        // succeed. If either of them threw, the field can still be null (e.g. on the first
        // import), and dereferencing it would replace the real error with an NPE.
        // NOTE(review): it may also still reference a builder from an earlier run.
        if (docBuilder != null) {
          docBuilder.handleError("Full Import failed", e);
        }
      } finally {
        setStatus(Status.IDLE);
        DocBuilder.INSTANCE.set(null);
      }
    }
    
    

    两者都是实例化一个DocBuilder,然后调用其execute()方法;execute()中再调用doDelta()或doFullDump()。以doFullDump()为例:

    /**
     * Performs a full dump: adds a "Full Dump Started" status message, then builds
     * documents starting from currentEntityProcessorWrapper (presumably the root
     * entity's wrapper — confirm).
     */
    private void doFullDump() {
    addStatusMessage("Full Dump Started");    
    // Root call: no parent document, no primary key (a null pk yields a FULL_DUMP
    // context when ContextImpl is created), isRoot = true.
    buildDocument(getVariableResolver(), null, null, currentEntityProcessorWrapper, true, null);
    }
    

    buildDocument的调用

    buildDocument-->DocBuilder.buildDocument-->buildDocument-->boolean result = writer.upload(doc)
    

      private void buildDocument(VariableResolver vr, DocWrapper doc,
                                 Map<String, Object> pk, EntityProcessorWrapper epw, boolean isRoot,
                                 ContextImpl parentCtx, List<EntityProcessorWrapper> entitiesToDestroy)
    

    其中会实例化ContextImpl、初始化EntityProcessorWrapper,并调用

    Map<String, Object> arow = epw.nextRow();
    

    获取数据,最后调用SolrWriter的upload()

    实例化ContextImpl

    ContextImpl ctx = new ContextImpl(epw, vr, null,
                pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,
                session, parentCtx, this);
    

    初始化EntityProcessorWrapper:其init()最后调用delegate.init(context),进入SqlEntityProcessor,并在那里获取dataSource

    /**
     * Binds this wrapper to a context and initializes the wrapped entity processor.
     *
     * Clears the cached rows and stores the context and its variable resolver. While
     * entityName is still null (presumably only the first init of this wrapper), it also
     * resolves the onError attribute, defaulting to ABORT, and records the entity name.
     * Finally it calls init on the wrapped processor.
     *
     * @param context the context for the current entity run
     */
    public void init(Context context) {
    rowcache = null;
    this.context = context;
    resolver = (VariableResolver) context.getVariableResolver();
    // Resolved only while entityName is unset; later inits keep the earlier values.
    if (entityName == null) {
      onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
      if (onError == null) onError = ABORT;
      entityName = context.getEntityAttribute(ConfigNameConstants.NAME);
    }
    delegate.init(context);// wrapped EntityProcessor, e.g. SqlEntityProcessor for JDBC entities
    
    }
    

    在SqlEntityProcessor中getDataSource()的调用

    ContextImpl.getDataSource()-->dataImporter.getDataSourceInstance-->JdbcDataSource.init-->createConnectionFactory
    

    最后在createConnectionFactory中看到了url、driver,总算出现这货了。

    再看看epw.nextRow()是怎么获取数据的,它内部调用的是

       arow = delegate.nextRow();
    

    nextRow()里主要是两个方法:initQuery()与getNext()

    /**
     * Returns the next row of this entity's query result.
     *
     * <p>On the first call (no row iterator yet), the entity query is fetched,
     * its tokens are resolved against the context, and the query is run.
     * Every call then delegates to getNext().
     *
     * @return the next row as a column-name to value map
     */
    @Override
    public Map<String, Object> nextRow() {
      boolean queryNotStarted = rowIterator == null;
      if (queryNotStarted) {
        String resolvedQuery = context.replaceTokens(getQuery());
        initQuery(resolvedQuery);
      }
      return getNext();
    }
    

    initQuery()

    try {
      DataImporter.QUERY_COUNT.get().incrementAndGet();
      rowIterator = dataSource.getData(q);
      this.query = q;
    

    在dataSource.getData()(即JdbcDataSource.getData())中会实例化一个ResultSetIterator

    /**
     * Returns an iterator over the rows produced by {@code query}, one
     * column-name to value map per row. The SQL itself is run by the
     * ResultSetIterator created here.
     *
     * @param query the SQL to execute
     * @return iterator over the result rows
     */
    public Iterator<Map<String, Object>> getData(String query) {
      return new ResultSetIterator(query).getIterator();
    }
    

    真正执行SQL的其实是ResultSetIterator:

    // Forward-only, read-only statement. Drivers may use these hints, together with the
    // fetch size, to stream rows instead of buffering the whole result (driver-specific).
    Connection c = getConnection();
    stmt = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    // batchSize defaults to FETCH_SIZE; a configured batchSize of -1 becomes Integer.MIN_VALUE.
    stmt.setFetchSize(batchSize);
    stmt.setMaxRows(maxRows);
    LOG.debug("Executing SQL: " + query);
    // Start time; presumably used outside this excerpt to time the query.
    long start = System.nanoTime();
    // execute() returns true only when the first result is a ResultSet;
    // otherwise resultSet is left unchanged.
    if (stmt.execute(query)) {
      resultSet = stmt.getResultSet();
    }
    

    这里注意batchSize的设定:对于MySQL,最好配置为batchSize=-1,具体参考《正确使用MySQL JDBC setFetchSize()方法解决JDBC处理大结果集 java.lang.OutOfMemoryError》。
    batchSize的初始值为FETCH_SIZE=500

     private int batchSize = FETCH_SIZE;
    

    配置项batchSize的解析和赋值如下(注意-1会被转换为Integer.MIN_VALUE):

      // Optional "batchSize" property; when present it overrides the default fetch size.
      String bsz = initProps.getProperty("batchSize");
      if (bsz != null) {
        // Resolve any variable tokens in the configured value (presumably ${...} placeholders).
        bsz = context.replaceTokens(bsz);
        try {
          batchSize = Integer.parseInt(bsz);
          // -1 is a sentinel mapped to Integer.MIN_VALUE, the fetch size MySQL Connector/J
          // treats as "stream row by row". NOTE(review): the JDBC spec requires a fetch size
          // >= 0, so other drivers may reject this value — confirm per driver.
          if (batchSize == -1)
            batchSize = Integer.MIN_VALUE;
        } catch (NumberFormatException e) {
          // Non-numeric value: only warn; batchSize keeps its previous value.
          LOG.warn("Invalid batch size: " + bsz);
        }
      }    
    

    由于获取数据时stmt配置为

    Connection c = getConnection();
    stmt = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    stmt.setFetchSize(batchSize);
    stmt.setMaxRows(maxRows);
    

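    为什么在这种方式下需要把FetchSize设为Integer.MIN_VALUE才能避免OOM?因为MySQL Connector/J默认会把整个结果集一次性读入内存;只有当Statement是TYPE_FORWARD_ONLY、CONCUR_READ_ONLY,并且fetchSize为Integer.MIN_VALUE时,驱动才会逐行流式读取。详见《正确使用MySQL JDBC setFetchSize()方法解决JDBC处理大结果集 java.lang.OutOfMemoryError》。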

    最后返回一个rSetIterator供后面的getNext()使用。看看getNext()中是怎么用的:

    if (rowIterator.hasNext())
          return rowIterator.next();
    

    这里如果rowIterator.hasNext()为true,就会调用next()方法。rSetIterator中next()的实现:

    /** Returns the current row as a column-name to value map (see getARow()). */
    public Map<String, Object> next() {
      Map<String, Object> row = getARow();
      return row;
    }
    

    getARow()的实现:其实就是从resultSet中逐列读取当前行的数据,包装成一个Map。

    
    /**
     * Reads the current row of {@code resultSet} into a map keyed by column name.
     *
     * <p>When {@code convertType} is false, each value comes straight from getObject,
     * except BigDecimal and BigInteger. Those are stored as strings because they cannot
     * be serialized by JavaBin/XML (see SOLR-6165).
     *
     * <p>When {@code convertType} is true, the JDBC type recorded in
     * {@code fieldNameVsType} picks the getter; unknown columns default to VARCHAR.
     * The primitive getters (getInt/getFloat/getLong/getDouble/getBoolean) return 0 or
     * false for SQL NULL. Each of them is therefore followed by a wasNull() check, so
     * NULL is stored as {@code null}, consistent with the getObject path.
     *
     * <p>A SQLException while reading a column is logged and then passed to wrapAndThrow
     * with SEVERE.
     *
     * @return the row map, or {@code null} if there is no result set
     */
    private Map<String, Object> getARow() {
      if (resultSet == null)
        return null;
      Map<String, Object> result = new HashMap<>();
      for (String colName : colNames) {
        try {
          if (!convertType) {
            // Use underlying database's type information except for BigDecimal and BigInteger
            // which cannot be serialized by JavaBin/XML. See SOLR-6165
            Object value = resultSet.getObject(colName);
            if (value instanceof BigDecimal || value instanceof BigInteger) {
              result.put(colName, value.toString());
            } else {
              result.put(colName, value);
            }
            continue;
          }

          Integer type = fieldNameVsType.get(colName);
          if (type == null)
            type = Types.VARCHAR;
          switch (type) {
            case Types.INTEGER:
              result.put(colName, nullIfWasNull(resultSet.getInt(colName)));
              break;
            case Types.FLOAT:
              result.put(colName, nullIfWasNull(resultSet.getFloat(colName)));
              break;
            case Types.BIGINT:
              result.put(colName, nullIfWasNull(resultSet.getLong(colName)));
              break;
            case Types.DOUBLE:
              result.put(colName, nullIfWasNull(resultSet.getDouble(colName)));
              break;
            case Types.DATE:
              // DATE columns are read through getTimestamp (not getDate).
              result.put(colName, resultSet.getTimestamp(colName));
              break;
            case Types.BOOLEAN:
              result.put(colName, nullIfWasNull(resultSet.getBoolean(colName)));
              break;
            case Types.BLOB:
              result.put(colName, resultSet.getBytes(colName));
              break;
            default:
              result.put(colName, resultSet.getString(colName));
              break;
          }
        } catch (SQLException e) {
          logError("Error reading data ", e);
          wrapAndThrow(SEVERE, e, "Error reading data from database");
        }
      }
      return result;
    }

    /**
     * Returns {@code value}, or {@code null} if the column read immediately before this
     * call was SQL NULL. Pass the result of the getter directly, so that wasNull() refers
     * to that read.
     */
    private Object nullIfWasNull(Object value) throws SQLException {
      return resultSet.wasNull() ? null : value;
    }
    

    再回来看看upload

     boolean result = writer.upload(doc);
    

    SolrWriter的upload()方法

    /**
     * Sends one document to the update processor chain as an add command.
     *
     * <p>The command carries this writer's commitWithin setting. Any exception is
     * logged as a warning and reported as failure instead of being propagated.
     *
     * @param d the document to add
     * @return {@code true} if processAdd completed, {@code false} if anything threw
     */
    @Override
    public boolean upload(SolrInputDocument d) {
      boolean added;
      try {
        AddUpdateCommand addCmd = new AddUpdateCommand(req);
        addCmd.solrDoc = d;
        addCmd.commitWithin = commitWithin;
        processor.processAdd(addCmd);
        added = true;
      } catch (Exception e) {
        log.warn("Error creating document : " + d, e);
        added = false;
      }
      return added;
    }
    

    这里SolrWriter使用的processor来自update处理链(UpdateRequestProcessorChain),之后的处理就和Solr普通的更新请求一样了,如RunUpdateProcessor、LogUpdateProcessor:

    UpdateRequestProcessorChain processorChain =
                    req.getCore().getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));
            UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
            SolrResourceLoader loader = req.getCore().getResourceLoader();
            DIHWriter sw = getSolrWriter(processor, loader, requestParams, req);
    
    
  • 相关阅读:
    Python常用模块之sys
    python操作zip文件
    python的os模块
    [Python模块学习]用qrcode模块生成二维码
    os模块os.walk() 方法和os.path.join()的简单使用
    python操作redis详解
    成员变量和局部变量
    类和对象 引用属性和方法举例
    Java String字符串/==和equals区别,str。toCharAt(),getBytes,indexOf过滤存在字符,trim()/String与StringBuffer多线程安全/StringBuilder单线程—— 14.0
    泛型--面向对象8
  • 原文地址:https://www.cnblogs.com/donganwangshi/p/4379219.html
Copyright © 2011-2022 走看看