  • The Solr server's query process

    What SolrDispatchFilter does

    This filter looks at the incoming URL and maps it to a handler defined in solrconfig.xml.
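
    With SolrJ, the handler is selected by the path suffix of the request URL. A minimal client-side sketch (host and core names are hypothetical) of querying the "/select" handler that the filter resolves:

    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.impl.HttpSolrClient;
    import org.apache.solr.client.solrj.response.QueryResponse;

    // SolrDispatchFilter resolves "/select" against the handlers in solrconfig.xml
    HttpSolrClient client = new HttpSolrClient("http://localhost:8983/solr/collection1");
    SolrQuery q = new SolrQuery("*:*");
    q.setRequestHandler("/select");
    QueryResponse rsp = client.query(q);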

    The main body of the filter's doFilter method:

    HttpSolrCall call = getHttpSolrCall((HttpServletRequest) request, (HttpServletResponse) response, retry);
    try {
      Action result = call.call();
      switch (result) {
        case PASSTHROUGH:
          // not a Solr request: hand off to the rest of the filter chain
          chain.doFilter(request, response);
          break;
        case RETRY:
          // re-run the filter once with the retry flag set
          doFilter(request, response, chain, true);
          break;
        case FORWARD:
          // internal dispatch to the path resolved by HttpSolrCall
          request.getRequestDispatcher(call.getPath()).forward(request, response);
          break;
      }
    } finally {
      call.destroy();
    }

    HttpSolrCall's call() method processes the request:

    case PROCESS:
      final Method reqMethod = Method.getMethod(req.getMethod());
      HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod);
      // unless we have been explicitly told not to, do cache validation
      // if we fail cache validation, execute the query
      if (config.getHttpCachingConfig().isNever304() ||
          !HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) {
        SolrQueryResponse solrRsp = new SolrQueryResponse();
        /* even for HEAD requests, we need to execute the handler to
         * ensure we don't get an error (and to make sure the correct
         * QueryResponseWriter is selected and we get the correct
         * Content-Type)
         */
        SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp));
        execute(solrRsp);
        HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
        Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders();
        while (headers.hasNext()) {
          Map.Entry<String, String> entry = headers.next();
          resp.addHeader(entry.getKey(), entry.getValue());
        }
        QueryResponseWriter responseWriter = core.getQueryResponseWriter(solrReq);
        if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
        writeResponse(solrRsp, responseWriter, reqMethod);
      }
      return RETURN;
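
    On the client side, this branch enables conditional GETs. A hedged sketch (it assumes HTTP caching is enabled in solrconfig.xml, i.e. never304="false", and an ETag captured from an earlier response; the URL is hypothetical):

    import java.net.HttpURLConnection;
    import java.net.URL;

    String previousEtag = "\"abc123\"";  // hypothetical ETag from an earlier response
    HttpURLConnection conn = (HttpURLConnection) new URL(
        "http://localhost:8983/solr/collection1/select?q=*:*").openConnection();
    conn.setRequestProperty("If-None-Match", previousEtag);
    if (conn.getResponseCode() == HttpURLConnection.HTTP_NOT_MODIFIED) {
      // doCacheHeaderValidation answered 304 Not Modified; reuse the cached body
    }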

    It builds the Solr request and response objects and calls SolrCore to execute the handler:

    public void execute(SolrRequestHandler handler, SolrQueryRequest req, SolrQueryResponse rsp) {
        if (handler==null) {
          String msg = "Null Request Handler '" +
            req.getParams().get(CommonParams.QT) + "'";
    
          if (log.isWarnEnabled()) log.warn(logid + msg + ":" + req);
    
          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, msg);
        }
    
        preDecorateResponse(req, rsp);
    
        if (requestLog.isDebugEnabled() && rsp.getToLog().size() > 0) {
          // log request at debug in case something goes wrong and we aren't able to log later
          requestLog.debug(rsp.getToLogAsString(logid));
        }
    
        // TODO: this doesn't seem to be working correctly and causes problems with the example server and distrib (for example /spell)
        // if (req.getParams().getBool(ShardParams.IS_SHARD,false) && !(handler instanceof SearchHandler))
        //   throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,"isShard is only acceptable with search handlers");
    
    
        handler.handleRequest(req,rsp);
        postDecorateResponse(handler, req, rsp);
    
        if (rsp.getToLog().size() > 0) {
          if (requestLog.isInfoEnabled()) {
            requestLog.info(rsp.getToLogAsString(logid));
          }
    
          if (log.isWarnEnabled() && slowQueryThresholdMillis >= 0) {
            final long qtime = (long) (req.getRequestTimer().getTime());
            if (qtime >= slowQueryThresholdMillis) {
              log.warn("slow: " + rsp.getToLogAsString(logid));
            }
          }
        }
      }

    RequestHandlerBase maps the request to the corresponding handler and processes it:

    /**
       * Initializes the {@link org.apache.solr.request.SolrRequestHandler} by creating three {@link org.apache.solr.common.params.SolrParams} named.
       * <table border="1" summary="table of parameters">
       * <tr><th>Name</th><th>Description</th></tr>
       * <tr><td>defaults</td><td>Contains all of the named arguments contained within the list element named "defaults".</td></tr>
       * <tr><td>appends</td><td>Contains all of the named arguments contained within the list element named "appends".</td></tr>
       * <tr><td>invariants</td><td>Contains all of the named arguments contained within the list element named "invariants".</td></tr>
       * </table>
       *
       * Example:
       * <pre>
       * &lt;lst name="defaults"&gt;
       * &lt;str name="echoParams"&gt;explicit&lt;/str&gt;
       * &lt;str name="qf"&gt;text^0.5 features^1.0 name^1.2 sku^1.5 id^10.0&lt;/str&gt;
       * &lt;str name="mm"&gt;2&lt;-1 5&lt;-2 6&lt;90%&lt;/str&gt;
       * &lt;str name="bq"&gt;incubationdate_dt:[* TO NOW/DAY-1MONTH]^2.2&lt;/str&gt;
       * &lt;/lst&gt;
       * &lt;lst name="appends"&gt;
       * &lt;str name="fq"&gt;inStock:true&lt;/str&gt;
       * &lt;/lst&gt;
       *
       * &lt;lst name="invariants"&gt;
       * &lt;str name="facet.field"&gt;cat&lt;/str&gt;
       * &lt;str name="facet.field"&gt;manu_exact&lt;/str&gt;
       * &lt;str name="facet.query"&gt;price:[* TO 500]&lt;/str&gt;
       * &lt;str name="facet.query"&gt;price:[500 TO *]&lt;/str&gt;
       * &lt;/lst&gt;
       * </pre>
       *
       *
       * @param args The {@link org.apache.solr.common.util.NamedList} to initialize from
       *
       * @see #handleRequest(org.apache.solr.request.SolrQueryRequest, org.apache.solr.response.SolrQueryResponse)
       * @see #handleRequestBody(org.apache.solr.request.SolrQueryRequest, org.apache.solr.response.SolrQueryResponse)
       * @see org.apache.solr.util.SolrPluginUtils#setDefaults(org.apache.solr.request.SolrQueryRequest, org.apache.solr.common.params.SolrParams, org.apache.solr.common.params.SolrParams, org.apache.solr.common.params.SolrParams)
       * @see SolrParams#toSolrParams(org.apache.solr.common.util.NamedList)
       *
       * See also the example solrconfig.xml located in the Solr codebase (example/solr/conf).
       */
    @Override
      public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {
        numRequests.incrementAndGet();
        TimerContext timer = requestTimes.time();
        try {
          if(pluginInfo != null && pluginInfo.attributes.containsKey(USEPARAM)) req.getContext().put(USEPARAM,pluginInfo.attributes.get(USEPARAM));
          SolrPluginUtils.setDefaults(this, req, defaults, appends, invariants);
          req.getContext().remove(USEPARAM);
          rsp.setHttpCaching(httpCaching);
          handleRequestBody( req, rsp );
          // count timeouts
          NamedList header = rsp.getResponseHeader();
          if(header != null) {
            Object partialResults = header.get("partialResults");
            boolean timedOut = partialResults == null ? false : (Boolean)partialResults;
            if( timedOut ) {
              numTimeouts.incrementAndGet();
              rsp.setHttpCaching(false);
            }
          }
        } catch (Exception e) {
          if (e instanceof SolrException) {
            SolrException se = (SolrException)e;
            if (se.code() == SolrException.ErrorCode.CONFLICT.code) {
              // TODO: should we allow this to be counted as an error (numErrors++)?
    
            } else {
              SolrException.log(SolrCore.log,e);
            }
          } else {
            SolrException.log(SolrCore.log,e);
            if (e instanceof SyntaxError) {
              e = new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
            }
          }
    
          rsp.setException(e);
          numErrors.incrementAndGet();
        }
        finally {
          timer.stop();
        }
      }
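
    The defaults/appends/invariants merge that SolrPluginUtils.setDefaults performs can be pictured with the SolrParams wrappers Solr provides. A minimal sketch with illustrative values (not the actual implementation):

    ModifiableSolrParams defaults = new ModifiableSolrParams();
    defaults.set("echoParams", "explicit");     // used only if the request omits it
    ModifiableSolrParams appends = new ModifiableSolrParams();
    appends.set("fq", "inStock:true");          // added alongside any request fq
    ModifiableSolrParams invariants = new ModifiableSolrParams();
    invariants.set("facet.field", "cat");       // always wins over the request

    SolrParams p = req.getParams();
    p = SolrParams.wrapDefaults(p, defaults);   // request values take precedence over defaults
    p = SolrParams.wrapAppended(p, appends);    // appends contribute additional values
    p = SolrParams.wrapDefaults(invariants, p); // invariants take precedence over everything
    req.setParams(p);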

    The matching handler, here SearchHandler, processes the request body:

    @Override
      public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception
      {
        List<SearchComponent> components  = getComponents();
        ResponseBuilder rb = new ResponseBuilder(req, rsp, components);
        if (rb.requestInfo != null) {
          rb.requestInfo.setResponseBuilder(rb);
        }
    
        boolean dbg = req.getParams().getBool(CommonParams.DEBUG_QUERY, false);
        rb.setDebug(dbg);
        if (dbg == false){//if it's true, we are doing everything anyway.
          SolrPluginUtils.getDebugInterests(req.getParams().getParams(CommonParams.DEBUG), rb);
        }
    
        final RTimerTree timer = rb.isDebug() ? req.getRequestTimer() : null;
    
        final ShardHandler shardHandler1 = getAndPrepShardHandler(req, rb); // creates a ShardHandler object only if it's needed
        
        if (timer == null) {
          // non-debugging prepare phase
          for( SearchComponent c : components ) {
            c.prepare(rb);
          }
        } else {
          // debugging prepare phase
          RTimerTree subt = timer.sub( "prepare" );
          for( SearchComponent c : components ) {
            rb.setTimer( subt.sub( c.getName() ) );
            c.prepare(rb);
            rb.getTimer().stop();
          }
          subt.stop();
        }
    
        if (!rb.isDistrib) {
          // a normal non-distributed request
    
          long timeAllowed = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1L);
          if (timeAllowed > 0L) {
            SolrQueryTimeoutImpl.set(timeAllowed);
          }
          try {
            // The semantics of debugging vs not debugging are different enough that
            // it makes sense to have two control loops
            if(!rb.isDebug()) {
              // Process
              for( SearchComponent c : components ) {
                c.process(rb);
              }
            }
            else {
              // Process
              RTimerTree subt = timer.sub( "process" );
              for( SearchComponent c : components ) {
                rb.setTimer( subt.sub( c.getName() ) );
                c.process(rb);
                rb.getTimer().stop();
              }
              subt.stop();
    
              // add the timing info
              if (rb.isDebugTimings()) {
                rb.addDebugInfo("timing", timer.asNamedList() );
              }
            }
          } catch (ExitableDirectoryReader.ExitingReaderException ex) {
            log.warn( "Query: " + req.getParamString() + "; " + ex.getMessage());
            SolrDocumentList r = (SolrDocumentList) rb.rsp.getValues().get("response");
            if(r == null)
              r = new SolrDocumentList();
            r.setNumFound(0);
            rb.rsp.add("response", r);
            if(rb.isDebug()) {
              NamedList debug = new NamedList();
              debug.add("explain", new NamedList());
              rb.rsp.add("debug", debug);
            }
            rb.rsp.getResponseHeader().add("partialResults", Boolean.TRUE);
          } finally {
            SolrQueryTimeoutImpl.reset();
          }
        } else {
          // a distributed request
    
          if (rb.outgoing == null) {
            rb.outgoing = new LinkedList<>();
          }
          rb.finished = new ArrayList<>();
    
          int nextStage = 0;
          do {
            rb.stage = nextStage;
            nextStage = ResponseBuilder.STAGE_DONE;
    
            // call all components
            for( SearchComponent c : components ) {
              // the next stage is the minimum of what all components report
              nextStage = Math.min(nextStage, c.distributedProcess(rb));
            }
    
    
            // check the outgoing queue and send requests
            while (rb.outgoing.size() > 0) {
    
              // submit all current request tasks at once
              while (rb.outgoing.size() > 0) {
                ShardRequest sreq = rb.outgoing.remove(0);
                sreq.actualShards = sreq.shards;
                if (sreq.actualShards==ShardRequest.ALL_SHARDS) {
                  sreq.actualShards = rb.shards;
                }
                sreq.responses = new ArrayList<>(sreq.actualShards.length); // presume we'll get a response from each shard we send to
    
                // TODO: map from shard to address[]
                for (String shard : sreq.actualShards) {
                  ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
                  params.remove(ShardParams.SHARDS);      // not a top-level request
                  params.set(CommonParams.DISTRIB, "false");               // not a top-level request
                  params.remove("indent");
                  params.remove(CommonParams.HEADER_ECHO_PARAMS);
                  params.set(ShardParams.IS_SHARD, true);  // a sub (shard) request
                  params.set(ShardParams.SHARDS_PURPOSE, sreq.purpose);
                  params.set(ShardParams.SHARD_URL, shard); // so the shard knows what was asked
                  if (rb.requestInfo != null) {
                    // we could try and detect when this is needed, but it could be tricky
                    params.set("NOW", Long.toString(rb.requestInfo.getNOW().getTime()));
                  }
                  String shardQt = params.get(ShardParams.SHARDS_QT);
                  if (shardQt != null) {
                    params.set(CommonParams.QT, shardQt);
                  } else {
                    // for distributed queries that don't include shards.qt, use the original path
                    // as the default but operators need to update their luceneMatchVersion to enable
                    // this behavior since it did not work this way prior to 5.1
                    if (req.getCore().getSolrConfig().luceneMatchVersion.onOrAfter(Version.LUCENE_5_1_0)) {
                      String reqPath = (String) req.getContext().get(PATH);
                      if (!"/select".equals(reqPath)) {
                        params.set(CommonParams.QT, reqPath);
                      } // else if path is /select, then the qt gets passed thru if set
                    } else {
                      // this is the pre-5.1 behavior, which translates to sending the shard request to /select
                      params.remove(CommonParams.QT);
                    }
                  }
                  shardHandler1.submit(sreq, shard, params);
                }
              }
    
    
              // now wait for replies, but if anyone puts more requests on
              // the outgoing queue, send them out immediately (by exiting
              // this loop)
              boolean tolerant = rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false);
              while (rb.outgoing.size() == 0) {
                ShardResponse srsp = tolerant ? 
                    shardHandler1.takeCompletedIncludingErrors():
                    shardHandler1.takeCompletedOrError();
                if (srsp == null) break;  // no more requests to wait for
    
                // Was there an exception?  
                if (srsp.getException() != null) {
                  // If things are not tolerant, abort everything and rethrow
                  if(!tolerant) {
                    shardHandler1.cancelAll();
                    if (srsp.getException() instanceof SolrException) {
                      throw (SolrException)srsp.getException();
                    } else {
                      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());
                    }
                  } else {
                    if(rsp.getResponseHeader().get("partialResults") == null) {
                      rsp.getResponseHeader().add("partialResults", Boolean.TRUE);
                    }
                  }
                }
    
                rb.finished.add(srsp.getShardRequest());
    
                // let the components see the responses to the request
                for(SearchComponent c : components) {
                  c.handleResponses(rb, srsp.getShardRequest());
                }
              }
            }
    
            for(SearchComponent c : components) {
              c.finishStage(rb);
            }
    
            // we are done when the next stage is MAX_VALUE
          } while (nextStage != Integer.MAX_VALUE);
        }
        
        // SOLR-5550: still provide shards.info if requested even for a short circuited distrib request
        if(!rb.isDistrib && req.getParams().getBool(ShardParams.SHARDS_INFO, false) && rb.shortCircuitedURL != null) {  
          NamedList<Object> shardInfo = new SimpleOrderedMap<Object>();
          SimpleOrderedMap<Object> nl = new SimpleOrderedMap<Object>();        
          if (rsp.getException() != null) {
            Throwable cause = rsp.getException();
            if (cause instanceof SolrServerException) {
              cause = ((SolrServerException)cause).getRootCause();
            } else {
              if (cause.getCause() != null) {
                cause = cause.getCause();
              }          
            }
            nl.add("error", cause.toString() );
            StringWriter trace = new StringWriter();
            cause.printStackTrace(new PrintWriter(trace));
            nl.add("trace", trace.toString() );
          }
          else {
            nl.add("numFound", rb.getResults().docList.matches());
            nl.add("maxScore", rb.getResults().docList.maxScore());
          }
          nl.add("shardAddress", rb.shortCircuitedURL);
          nl.add("time", req.getRequestTimer().getTime()); // elapsed time of this request so far
          
          int pos = rb.shortCircuitedURL.indexOf("://");        
          String shardInfoName = pos != -1 ? rb.shortCircuitedURL.substring(pos+3) : rb.shortCircuitedURL;
          shardInfo.add(shardInfoName, nl);   
          rsp.getValues().add(ShardParams.SHARDS_INFO,shardInfo);            
        }
      }
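
    Every component registered on the handler flows through the prepare/process (or distributedProcess) phases above. A minimal, hypothetical component showing the hook points (the class name and response key are illustrative only; it would be registered in solrconfig.xml):

    import java.io.IOException;
    import org.apache.solr.common.params.CommonParams;
    import org.apache.solr.handler.component.ResponseBuilder;
    import org.apache.solr.handler.component.SearchComponent;

    public class EchoQueryComponent extends SearchComponent {
      @Override
      public void prepare(ResponseBuilder rb) throws IOException {
        // runs on every registered component before any process() call
      }

      @Override
      public void process(ResponseBuilder rb) throws IOException {
        rb.rsp.add("echoedQuery", rb.req.getParams().get(CommonParams.Q));
      }

      @Override
      public String getDescription() { return "echoes the q parameter"; }

      @Override
      public String getSource() { return null; }
    }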

    1. Single-node case: QueryComponent processes the request:

    /**
       * Actually run the query
       */
      @Override
      public void process(ResponseBuilder rb) throws IOException
      {
        LOG.debug("process: {}", rb.req.getParams());
      
        SolrQueryRequest req = rb.req;
        SolrParams params = req.getParams();
        if (!params.getBool(COMPONENT_NAME, true)) {
          return;
        }
        SolrIndexSearcher searcher = req.getSearcher();
    
        StatsCache statsCache = req.getCore().getStatsCache();
        
        int purpose = params.getInt(ShardParams.SHARDS_PURPOSE, ShardRequest.PURPOSE_GET_TOP_IDS);
        if ((purpose & ShardRequest.PURPOSE_GET_TERM_STATS) != 0) {
          statsCache.returnLocalStats(rb, searcher);
          return;
        }
        // check if we need to update the local copy of global dfs
        if ((purpose & ShardRequest.PURPOSE_SET_TERM_STATS) != 0) {
          // retrieve from request and update local cache
          statsCache.receiveGlobalStats(req);
        }
    
        SolrQueryResponse rsp = rb.rsp;
        IndexSchema schema = searcher.getSchema();
    
        // Optional: This could also be implemented by the top-level searcher sending
        // a filter that lists the ids... that would be transparent to
        // the request handler, but would be more expensive (and would preserve score
        // too if desired).
        String ids = params.get(ShardParams.IDS);
        if (ids != null) {
          SchemaField idField = schema.getUniqueKeyField();
          List<String> idArr = StrUtils.splitSmart(ids, ",", true);
          int[] luceneIds = new int[idArr.size()];
          int docs = 0;
          for (int i=0; i<idArr.size(); i++) {
            int id = searcher.getFirstMatch(
                    new Term(idField.getName(), idField.getType().toInternal(idArr.get(i))));
            if (id >= 0)
              luceneIds[docs++] = id;
          }
    
          DocListAndSet res = new DocListAndSet();
          res.docList = new DocSlice(0, docs, luceneIds, null, docs, 0);
          if (rb.isNeedDocSet()) {
            // TODO: create a cache for this!
            List<Query> queries = new ArrayList<>();
            queries.add(rb.getQuery());
            List<Query> filters = rb.getFilters();
            if (filters != null) queries.addAll(filters);
            res.docSet = searcher.getDocSet(queries);
          }
          rb.setResults(res);
    
          ResultContext ctx = new ResultContext();
          ctx.docs = rb.getResults().docList;
          ctx.query = null; // anything?
          rsp.add("response", ctx);
          return;
        }
    
        // -1 as flag if not set.
        long timeAllowed = params.getLong(CommonParams.TIME_ALLOWED, -1L);
        if (null != rb.getCursorMark() && 0 < timeAllowed) {
          // fundamentally incompatible
          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Can not search using both " +
                                  CursorMarkParams.CURSOR_MARK_PARAM + " and " + CommonParams.TIME_ALLOWED);
        }
    
        SolrIndexSearcher.QueryCommand cmd = rb.getQueryCommand();
        cmd.setTimeAllowed(timeAllowed);
    
        req.getContext().put(SolrIndexSearcher.STATS_SOURCE, statsCache.get(req));
        
        SolrIndexSearcher.QueryResult result = new SolrIndexSearcher.QueryResult();
    
        //
        // grouping / field collapsing
        //
        GroupingSpecification groupingSpec = rb.getGroupingSpec();
        if (groupingSpec != null) {
          try {
            boolean needScores = (cmd.getFlags() & SolrIndexSearcher.GET_SCORES) != 0;
            if (params.getBool(GroupParams.GROUP_DISTRIBUTED_FIRST, false)) {
              CommandHandler.Builder topsGroupsActionBuilder = new CommandHandler.Builder()
                  .setQueryCommand(cmd)
                  .setNeedDocSet(false) // Order matters here
                  .setIncludeHitCount(true)
                  .setSearcher(searcher);
    
              for (String field : groupingSpec.getFields()) {
                topsGroupsActionBuilder.addCommandField(new SearchGroupsFieldCommand.Builder()
                    .setField(schema.getField(field))
                    .setGroupSort(groupingSpec.getGroupSort())
                    .setTopNGroups(cmd.getOffset() + cmd.getLen())
                    .setIncludeGroupCount(groupingSpec.isIncludeGroupCount())
                    .build()
                );
              }
    
              CommandHandler commandHandler = topsGroupsActionBuilder.build();
              commandHandler.execute();
              SearchGroupsResultTransformer serializer = new SearchGroupsResultTransformer(searcher);
              rsp.add("firstPhase", commandHandler.processResult(result, serializer));
              rsp.add("totalHitCount", commandHandler.getTotalHitCount());
              rb.setResult(result);
              return;
            } else if (params.getBool(GroupParams.GROUP_DISTRIBUTED_SECOND, false)) {
              CommandHandler.Builder secondPhaseBuilder = new CommandHandler.Builder()
                  .setQueryCommand(cmd)
                  .setTruncateGroups(groupingSpec.isTruncateGroups() && groupingSpec.getFields().length > 0)
                  .setSearcher(searcher);
    
              for (String field : groupingSpec.getFields()) {
                SchemaField schemaField = schema.getField(field);
                String[] topGroupsParam = params.getParams(GroupParams.GROUP_DISTRIBUTED_TOPGROUPS_PREFIX + field);
                if (topGroupsParam == null) {
                  topGroupsParam = new String[0];
                }
    
                List<SearchGroup<BytesRef>> topGroups = new ArrayList<>(topGroupsParam.length);
                for (String topGroup : topGroupsParam) {
                  SearchGroup<BytesRef> searchGroup = new SearchGroup<>();
                  if (!topGroup.equals(TopGroupsShardRequestFactory.GROUP_NULL_VALUE)) {
                    searchGroup.groupValue = new BytesRef(schemaField.getType().readableToIndexed(topGroup));
                  }
                  topGroups.add(searchGroup);
                }
    
                secondPhaseBuilder.addCommandField(
                    new TopGroupsFieldCommand.Builder()
                        .setField(schemaField)
                        .setGroupSort(groupingSpec.getGroupSort())
                        .setSortWithinGroup(groupingSpec.getSortWithinGroup())
                        .setFirstPhaseGroups(topGroups)
                        .setMaxDocPerGroup(groupingSpec.getGroupOffset() + groupingSpec.getGroupLimit())
                        .setNeedScores(needScores)
                        .setNeedMaxScore(needScores)
                        .build()
                );
              }
    
              for (String query : groupingSpec.getQueries()) {
                secondPhaseBuilder.addCommandField(new QueryCommand.Builder()
                    .setDocsToCollect(groupingSpec.getOffset() + groupingSpec.getLimit())
                    .setSort(groupingSpec.getGroupSort())
                    .setQuery(query, rb.req)
                    .setDocSet(searcher)
                    .build()
                );
              }
    
              CommandHandler commandHandler = secondPhaseBuilder.build();
              commandHandler.execute();
              TopGroupsResultTransformer serializer = new TopGroupsResultTransformer(rb);
              rsp.add("secondPhase", commandHandler.processResult(result, serializer));
              rb.setResult(result);
              return;
            }
    
            int maxDocsPercentageToCache = params.getInt(GroupParams.GROUP_CACHE_PERCENTAGE, 0);
            boolean cacheSecondPassSearch = maxDocsPercentageToCache >= 1 && maxDocsPercentageToCache <= 100;
            Grouping.TotalCount defaultTotalCount = groupingSpec.isIncludeGroupCount() ?
                Grouping.TotalCount.grouped : Grouping.TotalCount.ungrouped;
            int limitDefault = cmd.getLen(); // this is normally from "rows"
            Grouping grouping =
                new Grouping(searcher, result, cmd, cacheSecondPassSearch, maxDocsPercentageToCache, groupingSpec.isMain());
            grouping.setSort(groupingSpec.getGroupSort())
                .setGroupSort(groupingSpec.getSortWithinGroup())
                .setDefaultFormat(groupingSpec.getResponseFormat())
                .setLimitDefault(limitDefault)
                .setDefaultTotalCount(defaultTotalCount)
                .setDocsPerGroupDefault(groupingSpec.getGroupLimit())
                .setGroupOffsetDefault(groupingSpec.getGroupOffset())
                .setGetGroupedDocSet(groupingSpec.isTruncateGroups());
    
            if (groupingSpec.getFields() != null) {
              for (String field : groupingSpec.getFields()) {
                grouping.addFieldCommand(field, rb.req);
              }
            }
    
            if (groupingSpec.getFunctions() != null) {
              for (String groupByStr : groupingSpec.getFunctions()) {
                grouping.addFunctionCommand(groupByStr, rb.req);
              }
            }
    
            if (groupingSpec.getQueries() != null) {
              for (String groupByStr : groupingSpec.getQueries()) {
                grouping.addQueryCommand(groupByStr, rb.req);
              }
            }
    
            if (rb.doHighlights || rb.isDebug() || params.getBool(MoreLikeThisParams.MLT, false)) {
              // we need a single list of the returned docs
              cmd.setFlags(SolrIndexSearcher.GET_DOCLIST);
            }
    
            grouping.execute();
            if (grouping.isSignalCacheWarning()) {
              rsp.add(
                  "cacheWarning",
                  String.format(Locale.ROOT, "Cache limit of %d percent relative to maxdoc has exceeded. Please increase cache size or disable caching.", maxDocsPercentageToCache)
              );
            }
            rb.setResult(result);
    
            if (grouping.mainResult != null) {
              ResultContext ctx = new ResultContext();
              ctx.docs = grouping.mainResult;
              ctx.query = null; // TODO? add the query?
              rsp.add("response", ctx);
              rsp.getToLog().add("hits", grouping.mainResult.matches());
            } else if (!grouping.getCommands().isEmpty()) { // Can never be empty since grouping.execute() checks for this.
              rsp.add("grouped", result.groupedResults);
              rsp.getToLog().add("hits", grouping.getCommands().get(0).getMatches());
            }
            return;
          } catch (SyntaxError e) {
            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
          }
        }
    
        // normal search result
        searcher.search(result, cmd);
        rb.setResult(result);
    
        ResultContext ctx = new ResultContext();
        ctx.docs = rb.getResults().docList;
        ctx.query = rb.getQuery();
        rsp.add("response", ctx);
        rsp.getToLog().add("hits", rb.getResults().docList.matches());
    
        if ( ! rb.req.getParams().getBool(ShardParams.IS_SHARD,false) ) {
          if (null != rb.getNextCursorMark()) {
            rb.rsp.add(CursorMarkParams.CURSOR_MARK_NEXT,
                       rb.getNextCursorMark().getSerializedTotem());
          }
        }
    
        if(rb.mergeFieldHandler != null) {
          rb.mergeFieldHandler.handleMergeFields(rb, searcher);
        } else {
          doFieldSortValues(rb, searcher);
        }
    
        doPrefetch(rb);
      }
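
    The grouping branch above is driven by ordinary request parameters. A hedged example of the parameters that populate GroupingSpecification (the field name is illustrative):

    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.common.params.GroupParams;

    SolrQuery q = new SolrQuery("*:*");
    q.set(GroupParams.GROUP, true);               // turns on the grouping branch above
    q.set(GroupParams.GROUP_FIELD, "manu_exact"); // populates GroupingSpecification.getFields()
    q.set(GroupParams.GROUP_LIMIT, 3);            // docs per group (getGroupLimit)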

    QueryComponent then calls SolrIndexSearcher, which, per its javadoc, "adds schema awareness and caching functionality over the lucene IndexSearcher":

      public QueryResult search(QueryResult qr, QueryCommand cmd) throws IOException {
      public QueryResult search(QueryResult qr, QueryCommand cmd) throws IOException {
        getDocListC(qr,cmd);
        return qr;
      }

    search delegates to getDocListC, which consults the queryResultCache first; on a miss it falls through to getDocListNC, which runs the actual Lucene collectors:

    private void getDocListNC(QueryResult qr, QueryCommand cmd) throws IOException {
        int len = cmd.getSupersetMaxDoc();
        int last = len;
        if (last < 0 || last > maxDoc()) last=maxDoc();
        final int lastDocRequested = last;
        int nDocsReturned;
        int totalHits;
        float maxScore;
        int[] ids;
        float[] scores;
    
        boolean needScores = (cmd.getFlags() & GET_SCORES) != 0;
        
        Query query = QueryUtils.makeQueryable(cmd.getQuery());
    
        ProcessedFilter pf = getProcessedFilter(cmd.getFilter(), cmd.getFilterList());
        if (pf.filter != null) {
          query = new FilteredQuery(query, pf.filter);
        }
    
        // handle zero case...
        if (lastDocRequested<=0) {
          final float[] topscore = new float[] { Float.NEGATIVE_INFINITY };
          final int[] numHits = new int[1];
    
          Collector collector;
    
          if (!needScores) {
            collector = new SimpleCollector () {
              @Override
              public void collect(int doc) {
                numHits[0]++;
              }
              
              @Override
              public boolean needsScores() {
                return false;
              }
            };
          } else {
            collector = new SimpleCollector() {
              Scorer scorer;
              @Override
              public void setScorer(Scorer scorer) {
                this.scorer = scorer;
              }
              @Override
              public void collect(int doc) throws IOException {
                numHits[0]++;
                float score = scorer.score();
                if (score > topscore[0]) topscore[0]=score;            
              }
              
              @Override
              public boolean needsScores() {
                return true;
              }
            };
          }
          
          buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter);
    
          nDocsReturned=0;
          ids = new int[nDocsReturned];
          scores = new float[nDocsReturned];
          totalHits = numHits[0];
          maxScore = totalHits>0 ? topscore[0] : 0.0f;
          // no docs on this page, so cursor doesn't change
          qr.setNextCursorMark(cmd.getCursorMark());
        } else {
          final TopDocsCollector topCollector = buildTopDocsCollector(len, cmd);
          Collector collector = topCollector;
          buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter);
    
          totalHits = topCollector.getTotalHits();
          TopDocs topDocs = topCollector.topDocs(0, len);
          populateNextCursorMarkFromTopDocs(qr, cmd, topDocs);
    
          maxScore = totalHits>0 ? topDocs.getMaxScore() : 0.0f;
          nDocsReturned = topDocs.scoreDocs.length;
          ids = new int[nDocsReturned];
          scores = (cmd.getFlags()&GET_SCORES)!=0 ? new float[nDocsReturned] : null;
          for (int i=0; i<nDocsReturned; i++) {
            ScoreDoc scoreDoc = topDocs.scoreDocs[i];
            ids[i] = scoreDoc.doc;
            if (scores != null) scores[i] = scoreDoc.score;
          }
        }
    
        int sliceLen = Math.min(lastDocRequested,nDocsReturned);
        if (sliceLen < 0) sliceLen=0;
        qr.setDocList(new DocSlice(0,sliceLen,ids,scores,totalHits,maxScore));
      }
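
    Tying back to the timeAllowed handling in handleRequestBody, a hedged client-side view ("client" is a hypothetical SolrClient pointing at the core being queried):

    SolrQuery q = new SolrQuery("*:*");
    q.set(CommonParams.TIME_ALLOWED, 100);  // ms budget; installs SolrQueryTimeoutImpl
    QueryResponse rsp = client.query(q);
    // if the ExitingReaderException path fired, the header carries partialResults=true
    Object partial = rsp.getResponseHeader().get("partialResults");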

    2. Distributed (cluster) search

    HttpShardHandler submits the per-shard requests:

    @Override
      public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
        // do this outside of the callable for thread safety reasons
        final List<String> urls = getURLs(sreq, shard);
    
        Callable<ShardResponse> task = new Callable<ShardResponse>() {
          @Override
          public ShardResponse call() throws Exception {
    
            ShardResponse srsp = new ShardResponse();
            if (sreq.nodeName != null) {
              srsp.setNodeName(sreq.nodeName);
            }
            srsp.setShardRequest(sreq);
            srsp.setShard(shard);
            SimpleSolrResponse ssr = new SimpleSolrResponse();
            srsp.setSolrResponse(ssr);
            long startTime = System.nanoTime();
    
            try {
              params.remove(CommonParams.WT); // use default (currently javabin)
              params.remove(CommonParams.VERSION);
    
              QueryRequest req = makeQueryRequest(sreq, params, shard);
              req.setMethod(SolrRequest.METHOD.POST);
    
              // no need to set the response parser as binary is the default
              // req.setResponseParser(new BinaryResponseParser());
    
              // if there are no shards available for a slice, urls.size()==0
              if (urls.size()==0) {
                // TODO: what's the right error code here? We should use the same thing when
                // all of the servers for a shard are down.
                throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
              }
    
              if (urls.size() <= 1) {
                String url = urls.get(0);
                srsp.setShardAddress(url);
                try (SolrClient client = new HttpSolrClient(url, httpClient)) {
                  ssr.nl = client.request(req);
                }
              } else {
                LBHttpSolrClient.Rsp rsp = httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);
                ssr.nl = rsp.getResponse();
                srsp.setShardAddress(rsp.getServer());
              }
            }
            catch( ConnectException cex ) {
              srsp.setException(cex); //????
            } catch (Exception th) {
              srsp.setException(th);
              if (th instanceof SolrException) {
                srsp.setResponseCode(((SolrException)th).code());
              } else {
                srsp.setResponseCode(-1);
              }
            }
    
            ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
    
            return transfomResponse(sreq, srsp, shard);
          }
        };
    
        try {
          if (shard != null)  {
            MDC.put("ShardRequest.shards", shard);
          }
          if (urls != null && !urls.isEmpty())  {
            MDC.put("ShardRequest.urlList", urls.toString());
          }
          pending.add( completionService.submit(task) );
        } finally {
          MDC.remove("ShardRequest.shards");
          MDC.remove("ShardRequest.urlList");
        }
      }
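
    The shard list that submit() iterates over ultimately comes from the shards parameter (or, in SolrCloud, from cluster state). A hedged manual example with hypothetical hosts:

    SolrQuery q = new SolrQuery("*:*");
    q.set(ShardParams.SHARDS, "host1:8983/solr/core1,host2:8983/solr/core1");
    q.set(ShardParams.SHARDS_TOLERANT, true);  // keep partial results if a shard fails
    QueryResponse rsp = client.query(q);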

    The shard request is then handled by LBHttpSolrClient:

    Its class javadoc describes the functionality:

    /**
     * LBHttpSolrClient or "LoadBalanced HttpSolrClient" is a load balancing wrapper around
     * {@link HttpSolrClient}. This is useful when you
     * have multiple Solr servers and the requests need to be Load Balanced among them.
     *
     * Do <b>NOT</b> use this class for indexing in master/slave scenarios since documents must be sent to the
     * correct master; no inter-node routing is done.
     *
     * In SolrCloud (leader/replica) scenarios, it is usually better to use
     * {@link CloudSolrClient}, but this class may be used
     * for updates because the server will forward them to the appropriate leader.
     *
     * <p>
     * It offers automatic failover when a server goes down and it detects when the server comes back up.
     * <p>
     * Load balancing is done using a simple round-robin on the list of servers.
     * <p>
     * If a request to a server fails by an IOException due to a connection timeout or read timeout then the host is taken
     * off the list of live servers and moved to a 'dead server list' and the request is resent to the next live server.
     * This process is continued till it tries all the live servers. If at least one server is alive, the request succeeds,
     * and if not it fails.
     * <blockquote><pre>
     * SolrClient lbHttpSolrClient = new LBHttpSolrClient("http://host1:8080/solr/", "http://host2:8080/solr", "http://host2:8080/solr");
     * //or if you wish to pass the HttpClient do as follows
     * HttpClient httpClient = new HttpClient();
     * SolrClient lbHttpSolrClient = new LBHttpSolrClient(httpClient, "http://host1:8080/solr/", "http://host2:8080/solr", "http://host2:8080/solr");
     * </pre></blockquote>
     * This detects if a dead server comes alive automatically. The check is done in fixed intervals in a dedicated thread.
     * This interval can be set using {@link #setAliveCheckInterval} , the default is set to one minute.
     * <p>
     * <b>When to use this?</b><br> This can be used as a software load balancer when you do not wish to setup an external
     * load balancer. Alternatives to this code are to use
     * a dedicated hardware load balancer or using Apache httpd with mod_proxy_balancer as a load balancer. See <a
     * href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load balancing on Wikipedia</a>
     *
     * @since solr 1.4
     */

    The concrete dispatch method is:

      /**
       * Makes a request to one or more of the given urls, using the configured load balancer.
       *
       * @param req The solr search request that should be sent through the load balancer
       * @param urls The list of solr server urls to load balance across
       * @return The response from the request
       */
      public LBHttpSolrClient.Rsp makeLoadBalancedRequest(final QueryRequest req, List<String> urls)
        throws SolrServerException, IOException {
        return loadbalancer.request(new LBHttpSolrClient.Req(req, urls));
      }

    which calls request() to process the request:

     /**
       * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
       * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
       * time, or until a test request on that server succeeds.
       *
       * Servers are queried in the exact order given (except servers currently in the dead pool are skipped).
       * If no live servers from the provided list remain to be tried, a number of previously skipped dead servers will be tried.
       * Req.getNumDeadServersToTry() controls how many dead servers will be tried.
       *
       * If no live servers are found a SolrServerException is thrown.
       *
       * @param req contains both the request as well as the list of servers to query
       *
       * @return the result of the request
       *
       * @throws IOException If there is a low-level I/O error.
       */
      public Rsp request(Req req) throws SolrServerException, IOException {
        Rsp rsp = new Rsp();
        Exception ex = null;
        boolean isUpdate = req.request instanceof IsUpdateRequest;
        List<ServerWrapper> skipped = null;
    
        long timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
        long timeOutTime = System.nanoTime() + timeAllowedNano;
        for (String serverStr : req.getServers()) {
          if(isTimeExceeded(timeAllowedNano, timeOutTime)) {
            break;
          }
          
          serverStr = normalize(serverStr);
          // if the server is currently a zombie, just skip to the next one
          ServerWrapper wrapper = zombieServers.get(serverStr);
          if (wrapper != null) {
            // System.out.println("ZOMBIE SERVER QUERIED: " + serverStr);
            final int numDeadServersToTry = req.getNumDeadServersToTry();
            if (numDeadServersToTry > 0) {
              if (skipped == null) {
                skipped = new ArrayList<>(numDeadServersToTry);
                skipped.add(wrapper);
              }
              else if (skipped.size() < numDeadServersToTry) {
                skipped.add(wrapper);
              }
            }
            continue;
          }
          rsp.server = serverStr;
          try {
            MDC.put("LBHttpSolrClient.url", serverStr);
            HttpSolrClient client = makeSolrClient(serverStr);
    
            ex = doRequest(client, req, rsp, isUpdate, false, null);
            if (ex == null) {
              return rsp; // SUCCESS
            }
          } finally {
            MDC.remove("LBHttpSolrClient.url");
          }
        }
    
        // try the servers we previously skipped
        if (skipped != null) {
          for (ServerWrapper wrapper : skipped) {
            if(isTimeExceeded(timeAllowedNano, timeOutTime)) {
              break;
            }
    
            ex = doRequest(wrapper.client, req, rsp, isUpdate, true, wrapper.getKey());
            if (ex == null) {
              return rsp; // SUCCESS
            }
          }
        }
    
    
        if (ex == null) {
          throw new SolrServerException("No live SolrServers available to handle this request");
        } else {
          throw new SolrServerException("No live SolrServers available to handle this request:" + zombieServers.keySet(), ex);
        }
    
      }
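
    A short usage sketch based on the javadoc above (hypothetical hosts):

    LBHttpSolrClient lb = new LBHttpSolrClient(
        "http://host1:8983/solr/core1", "http://host2:8983/solr/core1");
    lb.setAliveCheckInterval(60 * 1000);  // ms between zombie-revival checks (default one minute)
    QueryResponse rsp = lb.query(new SolrQuery("*:*"));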