  • Solr source code analysis: tracing data import through DataImporter

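      When the information to be searched is stored in a database, Solr cannot query the database directly. Instead, a Solr component imports the data and indexes it on the search server, where clients can then search it.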

    1. SolrDispatchFilter

    SolrDispatchFilter maps the requested URL to a request handler defined in solrconfig.xml.

    The possible actions are:

      enum Action {
        PASSTHROUGH, FORWARD, RETURN, RETRY, ADMIN, REMOTEQUERY, PROCESS
      }

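    PASSTHROUGH: pass the request through the webapp to Restlet.

    FORWARD: forward the rewritten URI (without the path prefix and core/collection name) to Restlet.

    RETURN: return control; no further specific processing is needed. This typically happens when an error has been set and returned.

    RETRY: retry the request. This is set when no working core is found.

    Note: "core" here refers to a SolrCore managed by the CoreContainer. The remaining actions (ADMIN, REMOTEQUERY, PROCESS) are handled inside HttpSolrCall.call(), shown in section 2.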

    SolrDispatchFilter indirectly implements javax.servlet.Filter; the work is done in doFilter():

    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain, boolean retry) throws IOException, ServletException {
        if (!(request instanceof HttpServletRequest)) return;
    
        AtomicReference<ServletRequest> wrappedRequest = new AtomicReference();
        if (!authenticateRequest(request, response, wrappedRequest)) { // the response and status code have already been sent
          return;
        }
        if (wrappedRequest.get() != null) {
          request = wrappedRequest.get();
        }
        if (cores.getAuthenticationPlugin() != null) {
          log.debug("User principal: {}", ((HttpServletRequest)request).getUserPrincipal());
        }
    
        // No need to even create the HttpSolrCall object if this path is excluded.
        if(excludePatterns != null) {
          String servletPath = ((HttpServletRequest) request).getServletPath();
          for (Pattern p : excludePatterns) {
            Matcher matcher = p.matcher(servletPath);
            if (matcher.lookingAt()) {
              chain.doFilter(request, response);
              return;
            }
          }
        }
        
        HttpSolrCall call = getHttpSolrCall((HttpServletRequest) request, (HttpServletResponse) response, retry);
        try {
          Action result = call.call();
          switch (result) {
            case PASSTHROUGH:
              chain.doFilter(request, response);
              break;
            case RETRY:
              doFilter(request, response, chain, true);
              break;
            case FORWARD:
              request.getRequestDispatcher(call.getPath()).forward(request, response);
              break;
          }  
        } finally {
          call.destroy();
        }
      }

    SolrDispatchFilter delegates the actual handling to HttpSolrCall's call() method.
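
    The exclusion check and the action switch in doFilter() can be tried in isolation. The following is a minimal sketch, not Solr code: the class name DispatchSketch, the sample patterns, and the returned description strings are assumptions made for illustration. It shows that Matcher.lookingAt() matches from the start of the servlet path without requiring the whole path to match, and that only three of the actions need any work in the filter itself.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class DispatchSketch {
    // Mirrors the Action enum quoted earlier.
    enum Action { PASSTHROUGH, FORWARD, RETURN, RETRY, ADMIN, REMOTEQUERY, PROCESS }

    // Hypothetical exclusion patterns; a real deployment configures its own.
    static final List<Pattern> EXCLUDES = Arrays.asList(
            Pattern.compile("/css/.+"),
            Pattern.compile("/img/.+"));

    // True when the path should bypass request handling entirely.
    static boolean isExcluded(String servletPath) {
        for (Pattern p : EXCLUDES) {
            // lookingAt() anchors at the start but does not require a full match.
            if (p.matcher(servletPath).lookingAt()) {
                return true;
            }
        }
        return false;
    }

    // Describes what the filter does for each result of call().
    static String dispatch(Action result) {
        switch (result) {
            case PASSTHROUGH: return "continue filter chain";
            case RETRY:       return "re-enter doFilter with retry=true";
            case FORWARD:     return "forward to rewritten path";
            default:          return "nothing further";
        }
    }

    public static void main(String[] args) {
        System.out.println(isExcluded("/css/site.css"));
        System.out.println(isExcluded("/collection1/select"));
        System.out.println(dispatch(Action.RETRY));
    }
}
```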

    2. Handling the request with HttpSolrCall

    The HttpSolrCall constructor:

      public HttpSolrCall(SolrDispatchFilter solrDispatchFilter, CoreContainer cores,
                   HttpServletRequest request, HttpServletResponse response, boolean retry) {
        this.solrDispatchFilter = solrDispatchFilter;
        this.cores = cores;
        this.req = request;
        this.response = response;
        this.retry = retry;
        this.requestType = RequestType.UNKNOWN;
        queryParams = SolrRequestParsers.parseQueryString(req.getQueryString());
      }

    The call() method carries out the complete request handling:

     /**
       * This method processes the request.
       */
      public Action call() throws IOException {
        MDCLoggingContext.reset();
        MDCLoggingContext.setNode(cores);
    
        if (cores == null) {
          sendError(503, "Server is shutting down or failed to initialize");
          return RETURN;
        }
    
        if (solrDispatchFilter.abortErrorMessage != null) {
          sendError(500, solrDispatchFilter.abortErrorMessage);
          return RETURN;
        }
    
        try {
          init();
          /* Authorize the request if
           1. Authorization is enabled, and
           2. The requested resource is not a known static file
            */
          if (cores.getAuthorizationPlugin() != null) {
            AuthorizationContext context = getAuthCtx();
            log.info(context.toString());
            AuthorizationResponse authResponse = cores.getAuthorizationPlugin().authorize(context);
            if (!(authResponse.statusCode == HttpStatus.SC_ACCEPTED) && !(authResponse.statusCode == HttpStatus.SC_OK)) {
              sendError(authResponse.statusCode,
                  "Unauthorized request, Response code: " + authResponse.statusCode);
              return RETURN;
            }
          }
    
          HttpServletResponse resp = response;
          switch (action) {
            case ADMIN:
              handleAdminRequest();
              return RETURN;
            case REMOTEQUERY:
              remoteQuery(coreUrl + path, resp);
              return RETURN;
            case PROCESS:
              final Method reqMethod = Method.getMethod(req.getMethod());
              HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod);
              // unless we have been explicitly told not to, do cache validation
              // if we fail cache validation, execute the query
              if (config.getHttpCachingConfig().isNever304() ||
                  !HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) {
                SolrQueryResponse solrRsp = new SolrQueryResponse();
                  /* even for HEAD requests, we need to execute the handler to
                   * ensure we don't get an error (and to make sure the correct
                   * QueryResponseWriter is selected and we get the correct
                   * Content-Type)
                   */
                SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp));
                execute(solrRsp);
                HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
                Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders();
                while (headers.hasNext()) {
                  Map.Entry<String, String> entry = headers.next();
                  resp.addHeader(entry.getKey(), entry.getValue());
                }
                QueryResponseWriter responseWriter = core.getQueryResponseWriter(solrReq);
                if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
                writeResponse(solrRsp, responseWriter, reqMethod);
              }
              return RETURN;
            default: return action;
          }
        } catch (Throwable ex) {
          sendError(ex);
          // walk the entire cause chain to search for an Error
          Throwable t = ex;
          while (t != null) {
            if (t instanceof Error) {
              if (t != ex) {
                SolrDispatchFilter.log.error("An Error was wrapped in another exception - please report complete stacktrace on SOLR-6161", ex);
              }
              throw (Error) t;
            }
            t = t.getCause();
          }
          return RETURN;
        } finally {
          MDCLoggingContext.clear();
        }
    
      }
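
    The catch block in call() walks the cause chain so that a java.lang.Error wrapped inside an ordinary exception is still rethrown. A minimal sketch of that walk is given below; the class and method names (CauseChainSketch, findError) are hypothetical, and the sketch returns the Error instead of throwing it so the result can be inspected.

```java
class CauseChainSketch {
    // Returns the first Error found in the cause chain, or null if there is none.
    static Error findError(Throwable ex) {
        Throwable t = ex;
        while (t != null) {
            if (t instanceof Error) {
                return (Error) t;
            }
            t = t.getCause();
        }
        return null;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("outer",
                new IllegalStateException("middle", new StackOverflowError("inner")));
        System.out.println(findError(wrapped));
        System.out.println(findError(new RuntimeException("plain")));
    }
}
```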

    3. Getting the handler

    RequestHandlerBase looks up the handler:

    /**
       * Get the request handler registered to a given name.
       *
       * This function is thread safe.
       */
      public static SolrRequestHandler getRequestHandler(String handlerName, PluginBag<SolrRequestHandler> reqHandlers) {
        if(handlerName == null) return null;
        SolrRequestHandler handler = reqHandlers.get(handlerName);
        int idx = 0;
        if(handler == null) {
          for (; ; ) {
            idx = handlerName.indexOf('/', idx+1);
            if (idx > 0) {
              String firstPart = handlerName.substring(0, idx);
              handler = reqHandlers.get(firstPart);
              if (handler == null) continue;
              if (handler instanceof NestedRequestHandler) {
                return ((NestedRequestHandler) handler).getSubHandler(handlerName.substring(idx));
              }
            } else {
              break;
            }
          }
        }
        return handler;
      }
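
    The lookup first tries the full handler name, then tries successively longer prefixes ending at each '/' and hands the remainder of the path to a nested handler. The sketch below reproduces that idea with plain maps. Handler, NestedHandler, and the sample registry are hypothetical stand-ins for SolrRequestHandler, NestedRequestHandler, and PluginBag, and the sketch simplifies one detail: it returns null when no nested handler is found on a prefix.

```java
import java.util.HashMap;
import java.util.Map;

class HandlerLookupSketch {
    interface Handler { String name(); }
    interface NestedHandler extends Handler { Handler getSubHandler(String subPath); }

    static Handler lookup(String handlerName, Map<String, Handler> registry) {
        if (handlerName == null) return null;
        Handler exact = registry.get(handlerName);
        if (exact != null) return exact;          // exact match wins
        int idx = 0;
        // Try each prefix that ends just before a '/' (skipping the leading one).
        while ((idx = handlerName.indexOf('/', idx + 1)) > 0) {
            Handler prefixHandler = registry.get(handlerName.substring(0, idx));
            if (prefixHandler instanceof NestedHandler) {
                return ((NestedHandler) prefixHandler).getSubHandler(handlerName.substring(idx));
            }
        }
        return null;
    }

    // Sample registry: one plain handler and one nested handler.
    static Map<String, Handler> sampleRegistry() {
        Map<String, Handler> registry = new HashMap<>();
        registry.put("/dataimport", () -> "dataimport");
        registry.put("/admin", new NestedHandler() {
            @Override public String name() { return "admin"; }
            @Override public Handler getSubHandler(String subPath) {
                return () -> "admin" + subPath;
            }
        });
        return registry;
    }

    public static void main(String[] args) {
        Map<String, Handler> registry = sampleRegistry();
        System.out.println(lookup("/dataimport", registry).name());
        System.out.println(lookup("/admin/ping", registry).name());
        System.out.println(lookup("/missing/x", registry));
    }
}
```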

    4. Handling the request: handleRequest

    RequestHandlerBase's handleRequest() method processes the request:

    public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {
        numRequests.incrementAndGet();
        TimerContext timer = requestTimes.time();
        try {
          if(pluginInfo != null && pluginInfo.attributes.containsKey(USEPARAM)) req.getContext().put(USEPARAM,pluginInfo.attributes.get(USEPARAM));
          SolrPluginUtils.setDefaults(this, req, defaults, appends, invariants);
          req.getContext().remove(USEPARAM);
          rsp.setHttpCaching(httpCaching);
          handleRequestBody( req, rsp );
          // count timeouts
          NamedList header = rsp.getResponseHeader();
          if(header != null) {
            Object partialResults = header.get("partialResults");
            boolean timedOut = partialResults == null ? false : (Boolean)partialResults;
            if( timedOut ) {
              numTimeouts.incrementAndGet();
              rsp.setHttpCaching(false);
            }
          }
        } catch (Exception e) {
          if (e instanceof SolrException) {
            SolrException se = (SolrException)e;
            if (se.code() == SolrException.ErrorCode.CONFLICT.code) {
              // TODO: should we allow this to be counted as an error (numErrors++)?
    
            } else {
              SolrException.log(SolrCore.log,e);
            }
          } else {
            SolrException.log(SolrCore.log,e);
            if (e instanceof SyntaxError) {
              e = new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
            }
          }
    
          rsp.setException(e);
          numErrors.incrementAndGet();
        }
        finally {
          timer.stop();
        }
      }
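
    Stripped of Solr specifics, handleRequest() is a bookkeeping wrapper: count the request, time it, run the handler body, and record timeouts and errors. The sketch below shows that shape only. RequestStatsSketch and its Body interface are hypothetical, and the timing uses System.nanoTime() in place of the metrics timer in the quoted code.

```java
import java.util.concurrent.atomic.AtomicLong;

class RequestStatsSketch {
    final AtomicLong numRequests = new AtomicLong();
    final AtomicLong numErrors = new AtomicLong();
    final AtomicLong numTimeouts = new AtomicLong();
    final AtomicLong totalNanos = new AtomicLong();

    // Hypothetical stand-in for handleRequestBody(): returns true if results were partial.
    interface Body { boolean run() throws Exception; }

    // Returns the exception that would be recorded on the response, or null on success.
    Exception handle(Body body) {
        numRequests.incrementAndGet();
        long start = System.nanoTime();
        try {
            if (body.run()) {
                numTimeouts.incrementAndGet();   // partial results count as a timeout
            }
            return null;
        } catch (Exception e) {
            numErrors.incrementAndGet();
            return e;
        } finally {
            totalNanos.addAndGet(System.nanoTime() - start);  // always stop the clock
        }
    }

    public static void main(String[] args) {
        RequestStatsSketch stats = new RequestStatsSketch();
        stats.handle(() -> false);
        stats.handle(() -> true);
        stats.handle(() -> { throw new IllegalStateException("boom"); });
        System.out.println(stats.numRequests + " requests, " + stats.numTimeouts
                + " timeouts, " + stats.numErrors + " errors");
    }
}
```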

    5. The request finally lands in each handler's handleRequestBody() method. Taking DataImportHandler as an example:

     @Override
      @SuppressWarnings("unchecked")
      public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp)
              throws Exception {
        rsp.setHttpCaching(false);
        
        //TODO: figure out why just the first one is OK...
        ContentStream contentStream = null;
        Iterable<ContentStream> streams = req.getContentStreams();
        if(streams != null){
          for (ContentStream stream : streams) {
              contentStream = stream;
              break;
          }
        }
        SolrParams params = req.getParams();
        NamedList defaultParams = (NamedList) initArgs.get("defaults");
        RequestInfo requestParams = new RequestInfo(req, getParamsMap(params), contentStream);
        String command = requestParams.getCommand();
        
        if (DataImporter.SHOW_CONF_CMD.equals(command)) {    
          String dataConfigFile = params.get("config");
          String dataConfig = params.get("dataConfig");
          if(dataConfigFile != null) {
            dataConfig = SolrWriter.getResourceAsString(req.getCore().getResourceLoader().openResource(dataConfigFile));
          }
          if(dataConfig==null)  {
            rsp.add("status", DataImporter.MSG.NO_CONFIG_FOUND);
          } else {
            // Modify incoming request params to add wt=raw
            ModifiableSolrParams rawParams = new ModifiableSolrParams(req.getParams());
            rawParams.set(CommonParams.WT, "raw");
            req.setParams(rawParams);
            ContentStreamBase content = new ContentStreamBase.StringStream(dataConfig);
            rsp.add(RawResponseWriter.CONTENT, content);
          }
          return;
        }
    
        rsp.add("initArgs", initArgs);
        String message = "";
    
        if (command != null) {
          rsp.add("command", command);
        }
        // If importer is still null
        if (importer == null) {
          rsp.add("status", DataImporter.MSG.NO_INIT);
          return;
        }
    
        if (command != null && DataImporter.ABORT_CMD.equals(command)) {
          importer.runCmd(requestParams, null);
        } else if (importer.isBusy()) {
          message = DataImporter.MSG.CMD_RUNNING;
        } else if (command != null) {
          if (DataImporter.FULL_IMPORT_CMD.equals(command)
                  || DataImporter.DELTA_IMPORT_CMD.equals(command) ||
                  IMPORT_CMD.equals(command)) {
            importer.maybeReloadConfiguration(requestParams, defaultParams);
            UpdateRequestProcessorChain processorChain =
                    req.getCore().getUpdateProcessorChain(params);
            UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
            SolrResourceLoader loader = req.getCore().getResourceLoader();
            DIHWriter sw = getSolrWriter(processor, loader, requestParams, req);
            
            if (requestParams.isDebug()) {
              if (debugEnabled) {
                // Synchronous request for the debug mode
                importer.runCmd(requestParams, sw);
                rsp.add("mode", "debug");
                rsp.add("documents", requestParams.getDebugInfo().debugDocuments);
                if (requestParams.getDebugInfo().debugVerboseOutput != null) {
                  rsp.add("verbose-output", requestParams.getDebugInfo().debugVerboseOutput);
                }
              } else {
                message = DataImporter.MSG.DEBUG_NOT_ENABLED;
              }
            } else {
              // Asynchronous request for normal mode
              if(requestParams.getContentStream() == null && !requestParams.isSyncMode()){
                importer.runAsync(requestParams, sw);
              } else {
                importer.runCmd(requestParams, sw);
              }
            }
          } else if (DataImporter.RELOAD_CONF_CMD.equals(command)) { 
            if(importer.maybeReloadConfiguration(requestParams, defaultParams)) {
              message = DataImporter.MSG.CONFIG_RELOADED;
            } else {
              message = DataImporter.MSG.CONFIG_NOT_RELOADED;
            }
          }
        }
        rsp.add("status", importer.isBusy() ? "busy" : "idle");
        rsp.add("importResponse", message);
        rsp.add("statusMessages", importer.getStatusMessages());
      }
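
    The branch order in handleRequestBody() matters: abort is honoured even while an import is running, a busy importer rejects every other command, and only then is the command itself examined. The sketch below condenses those branches into one pure function. DihDispatchSketch and its Outcome enum are hypothetical names, the command strings are the values of the DataImporter constants as I understand them, and the earlier show-config and uninitialized-importer branches are left out.

```java
class DihDispatchSketch {
    enum Outcome {
        ABORT, REJECTED_BUSY, RUN_SYNC_DEBUG, DEBUG_NOT_ENABLED,
        RUN_ASYNC, RUN_SYNC, RELOAD_CONFIG, STATUS_ONLY
    }

    static Outcome decide(String command, boolean busy, boolean debug,
                          boolean debugEnabled, boolean hasContentStream, boolean syncMode) {
        if ("abort".equals(command)) return Outcome.ABORT;      // checked before the busy test
        if (busy) return Outcome.REJECTED_BUSY;                  // another import is running
        if (command == null) return Outcome.STATUS_ONLY;
        switch (command) {
            case "full-import":
            case "delta-import":
            case "import":
                if (debug) {
                    // Debug imports run synchronously, and only if debugging is enabled.
                    return debugEnabled ? Outcome.RUN_SYNC_DEBUG : Outcome.DEBUG_NOT_ENABLED;
                }
                // Normal imports run in the background unless a stream or sync mode is given.
                return (!hasContentStream && !syncMode) ? Outcome.RUN_ASYNC : Outcome.RUN_SYNC;
            case "reload-config":
                return Outcome.RELOAD_CONFIG;
            default:
                return Outcome.STATUS_ONLY;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide("full-import", false, false, false, false, false));
        System.out.println(decide("full-import", true, false, false, false, false));
        System.out.println(decide("abort", true, false, false, false, false));
    }
}
```

    In every case the handler still appends the status, importResponse, and statusMessages entries to the response, which is why a plain status request needs no branch of its own.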

    6. Importing the data

    Imports are either full or delta (incremental):

    void runCmd(RequestInfo reqParams, DIHWriter sw) {
        String command = reqParams.getCommand();
        if (command.equals(ABORT_CMD)) {
          if (docBuilder != null) {
            docBuilder.abort();
          }
          return;
        }
        if (!importLock.tryLock()){
          LOG.warn("Import command failed . another import is running");      
          return;
        }
        try {
          if (FULL_IMPORT_CMD.equals(command) || IMPORT_CMD.equals(command)) {
            doFullImport(sw, reqParams);
          } else if (command.equals(DELTA_IMPORT_CMD)) {
            doDeltaImport(sw, reqParams);
          }
        } finally {
          importLock.unlock();
        }
      }
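
    runCmd() uses tryLock() so that a second import request is dropped immediately instead of queueing behind the running one. The sketch below demonstrates that behaviour with a ReentrantLock and two threads. ImportLockSketch and its methods are hypothetical names, and the latches exist only to make the overlap deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

class ImportLockSketch {
    private final ReentrantLock importLock = new ReentrantLock();

    // Runs the task only if no other thread holds the lock; returns whether it ran.
    boolean runExclusive(Runnable task) {
        if (!importLock.tryLock()) {
            return false;                 // another import is running, so this one is dropped
        }
        try {
            task.run();
            return true;
        } finally {
            importLock.unlock();          // released even if the task throws
        }
    }

    // True if a second caller is rejected while the first is still running.
    static boolean secondCallerRejected() {
        ImportLockSketch sketch = new ImportLockSketch();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread first = new Thread(() -> sketch.runExclusive(() -> {
            started.countDown();
            try {
                release.await();          // keep holding the lock until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        first.start();
        try {
            started.await();              // the first import now holds the lock
            boolean ran = sketch.runExclusive(() -> { });
            release.countDown();
            first.join();
            return !ran;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("second caller rejected: " + secondCallerRejected());
    }
}
```

    Because ReentrantLock is reentrant, a nested call from the thread that already holds the lock would succeed; the guard only protects against a different thread starting a second import.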

    Taking a full import as an example:

      public void doFullImport(DIHWriter writer, RequestInfo requestParams) {
        LOG.info("Starting Full Import");
        setStatus(Status.RUNNING_FULL_DUMP);
        try {
          DIHProperties dihPropWriter = createPropertyWriter();
          setIndexStartTime(dihPropWriter.getCurrentTimestamp());
          docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams);
          checkWritablePersistFile(writer, dihPropWriter);
          docBuilder.execute();
          if (!requestParams.isDebug())
            cumulativeStatistics.add(docBuilder.importStatistics);
        } catch (Exception e) {
          SolrException.log(LOG, "Full Import failed", e);
          docBuilder.handleError("Full Import failed", e);
        } finally {
          setStatus(Status.IDLE);
          DocBuilder.INSTANCE.set(null);
        }
    
      }

    7. SqlEntityProcessor, the SQL-handling implementation wrapped by EntityProcessorWrapper

      @Override
      public void init(Context context) {
        rowcache = null;
        this.context = context;
        resolver = (VariableResolver) context.getVariableResolver();
        if (entityName == null) {
          onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
          if (onError == null) onError = ABORT;
          entityName = context.getEntityAttribute(ConfigNameConstants.NAME);
        }
        delegate.init(context);
    
      }

    During initialization the wrapper delegates to SqlEntityProcessor's own init():

      public void init(Context context) {
        super.init(context);
        dataSource = context.getDataSource();
      }

    ContextImpl supplies the data source:

      @Override
      public DataSource getDataSource() {
        if (ds != null) return ds;
        if(epw==null) { return null; }
        if (epw!=null && epw.getDatasource() == null) {
          epw.setDatasource(dataImporter.getDataSourceInstance(epw.getEntity(), epw.getEntity().getDataSourceName(), this));
        }
        if (epw!=null && epw.getDatasource() != null && docBuilder != null && docBuilder.verboseDebug &&
                 Context.FULL_DUMP.equals(currentProcess())) {
          //debug is not yet implemented properly for deltas
          epw.setDatasource(docBuilder.getDebugLogger().wrapDs(epw.getDatasource()));
        }
        return epw.getDatasource();
      }

    DataImporter obtains the database configuration:

    public DataSource getDataSourceInstance(Entity key, String name, Context ctx) {
        Map<String,String> p = requestLevelDataSourceProps.get(name);
        if (p == null)
          p = config.getDataSources().get(name);
        if (p == null)
          p = requestLevelDataSourceProps.get(null);// for default data source
        if (p == null)
          p = config.getDataSources().get(null);
        if (p == null)  
          throw new DataImportHandlerException(SEVERE,
                  "No dataSource :" + name + " available for entity :" + key.getName());
        String type = p.get(TYPE);
        DataSource dataSrc = null;
        if (type == null) {
          dataSrc = new JdbcDataSource();
        } else {
          try {
            dataSrc = (DataSource) DocBuilder.loadClass(type, getCore()).newInstance();
          } catch (Exception e) {
            wrapAndThrow(SEVERE, e, "Invalid type for data source: " + type);
          }
        }
        try {
          Properties copyProps = new Properties();
          copyProps.putAll(p);
          Map<String, Object> map = ctx.getRequestParameters();
          if (map.containsKey("rows")) {
            int rows = Integer.parseInt((String) map.get("rows"));
            if (map.containsKey("start")) {
              rows += Integer.parseInt((String) map.get("start"));
            }
            copyProps.setProperty("maxRows", String.valueOf(rows));
          }
          dataSrc.init(ctx, copyProps);
        } catch (Exception e) {
          wrapAndThrow(SEVERE, e, "Failed to initialize DataSource: " + key.getDataSourceName());
        }
        return dataSrc;
      }
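
    Two details of getDataSourceInstance() are easy to miss: the four-step fallback order for locating the data source properties, and the way rows plus start from the request becomes the maxRows limit. The sketch below isolates both using plain maps. DataSourceLookupSketch, the sample data source names, and the "driver" values are hypothetical; the default data source is stored under a null key, as in the quoted code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class DataSourceLookupSketch {
    // Order: request-level by name, config by name, request-level default, config default.
    static Map<String, String> resolve(String name,
                                       Map<String, Map<String, String>> requestLevel,
                                       Map<String, Map<String, String>> configLevel) {
        Map<String, String> p = requestLevel.get(name);
        if (p == null) p = configLevel.get(name);
        if (p == null) p = requestLevel.get(null);   // HashMap permits a null key
        if (p == null) p = configLevel.get(null);
        if (p == null) throw new IllegalArgumentException("No dataSource: " + name);
        return p;
    }

    // rows (plus start, if present) becomes the row limit; null when rows is absent.
    static Integer maxRows(Map<String, Object> requestParams) {
        if (!requestParams.containsKey("rows")) return null;
        int rows = Integer.parseInt((String) requestParams.get("rows"));
        if (requestParams.containsKey("start")) {
            rows += Integer.parseInt((String) requestParams.get("start"));
        }
        return rows;
    }

    // Resolves against a fixed sample setup and returns the chosen "driver" value.
    static String driverFor(String name) {
        Map<String, Map<String, String>> request = new HashMap<>();
        request.put("orders", Collections.singletonMap("driver", "override-driver"));
        Map<String, Map<String, String>> config = new HashMap<>();
        config.put("orders", Collections.singletonMap("driver", "orders-driver"));
        config.put("users", Collections.singletonMap("driver", "users-driver"));
        config.put(null, Collections.singletonMap("driver", "default-driver"));
        return resolve(name, request, config).get("driver");
    }

    public static void main(String[] args) {
        System.out.println(driverFor("orders"));
        System.out.println(driverFor("users"));
        System.out.println(driverFor("unknown"));
    }
}
```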

    8. Query results

    public ResultSetIterator(String query) {
    
          try {
            Connection c = getConnection();
            stmt = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            stmt.setFetchSize(batchSize);
            stmt.setMaxRows(maxRows);
            LOG.debug("Executing SQL: " + query);
            long start = System.nanoTime();
            if (stmt.execute(query)) {
              resultSet = stmt.getResultSet();
            }
            LOG.trace("Time taken for sql :"
                    + TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
            colNames = readFieldNames(resultSet.getMetaData());
          } catch (Exception e) {
            wrapAndThrow(SEVERE, e, "Unable to execute query: " + query);
          }
          if (resultSet == null) {
            rSetIterator = new ArrayList<Map<String, Object>>().iterator();
            return;
          }
    
          rSetIterator = new Iterator<Map<String, Object>>() {
            @Override
            public boolean hasNext() {
              return hasnext();
            }
    
            @Override
            public Map<String, Object> next() {
              return getARow();
            }
    
            @Override
            public void remove() {/* do nothing */
            }
          };
        }
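
    ResultSetIterator exposes a forward-only JDBC result set as an Iterator of column-name to value maps. The hasnext() and getARow() helpers are not shown in the excerpt, so the sketch below is only one plausible way to build such an adapter, reading one row ahead. RowIteratorSketch and its Cursor interface are hypothetical; Cursor stands in for java.sql.ResultSet so that the example runs without a database.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

class RowIteratorSketch {
    // Hypothetical forward-only cursor, standing in for java.sql.ResultSet.
    interface Cursor {
        boolean next();              // advance; false when exhausted
        Object get(String column);   // value of a column in the current row
    }

    // Adapts a cursor to an Iterator of row maps, reading one row ahead.
    static Iterator<Map<String, Object>> rows(Cursor cursor, List<String> columns) {
        return new Iterator<Map<String, Object>>() {
            private boolean hasRow = cursor.next();

            @Override
            public boolean hasNext() {
                return hasRow;
            }

            @Override
            public Map<String, Object> next() {
                if (!hasRow) throw new NoSuchElementException();
                Map<String, Object> row = new LinkedHashMap<>();
                for (String c : columns) {
                    row.put(c, cursor.get(c));
                }
                hasRow = cursor.next();   // position on the following row
                return row;
            }
        };
    }

    // In-memory cursor over fixed rows, used only to exercise the adapter.
    static Cursor cursorOf(List<Map<String, Object>> data) {
        Iterator<Map<String, Object>> it = data.iterator();
        return new Cursor() {
            private Map<String, Object> current;

            @Override
            public boolean next() {
                if (!it.hasNext()) return false;
                current = it.next();
                return true;
            }

            @Override
            public Object get(String column) {
                return current.get(column);
            }
        };
    }
}
```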

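    Solr supports both full and delta (incremental) index builds from a database. The code above traces a full import from start to finish; a delta import follows much the same path, so it is not repeated here.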

  • Original article: https://www.cnblogs.com/davidwang456/p/4754628.html