The Role of SolrDispatchFilter
This filter inspects the incoming URL and maps the request to a handler defined in solrconfig.xml.
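For reference, a handler mapping in solrconfig.xml typically looks like this (the handler name and defaults here are illustrative):

<!-- requests whose path matches /select are dispatched to this SearchHandler -->
<requestHandler name="/select" class="solr.SearchHandler">
  <lst name="defaults">
    <str name="echoParams">explicit</str>
    <int name="rows">10</int>
  </lst>
</requestHandler>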
The main body of the filter's doFilter method:
HttpSolrCall call = getHttpSolrCall((HttpServletRequest) request, (HttpServletResponse) response, retry);
try {
  Action result = call.call();
  switch (result) {
    case PASSTHROUGH:
      chain.doFilter(request, response);
      break;
    case RETRY:
      doFilter(request, response, chain, true);
      break;
    case FORWARD:
      request.getRequestDispatcher(call.getPath()).forward(request, response);
      break;
  }
} finally {
  call.destroy();
}
HttpSolrCall's call method then processes the request:
case PROCESS:
  final Method reqMethod = Method.getMethod(req.getMethod());
  HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod);
  // unless we have been explicitly told not to, do cache validation
  // if we fail cache validation, execute the query
  if (config.getHttpCachingConfig().isNever304() ||
      !HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) {
    SolrQueryResponse solrRsp = new SolrQueryResponse();
    /* even for HEAD requests, we need to execute the handler to
     * ensure we don't get an error (and to make sure the correct
     * QueryResponseWriter is selected and we get the correct
     * Content-Type)
     */
    SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp));
    execute(solrRsp);
    HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
    Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders();
    while (headers.hasNext()) {
      Map.Entry<String, String> entry = headers.next();
      resp.addHeader(entry.getKey(), entry.getValue());
    }
    QueryResponseWriter responseWriter = core.getQueryResponseWriter(solrReq);
    if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
    writeResponse(solrRsp, responseWriter, reqMethod);
  }
  return RETURN;
It constructs the request and response objects and invokes SolrCore to handle them:
public void execute(SolrRequestHandler handler, SolrQueryRequest req, SolrQueryResponse rsp) {
  if (handler == null) {
    String msg = "Null Request Handler '" + req.getParams().get(CommonParams.QT) + "'";
    if (log.isWarnEnabled()) log.warn(logid + msg + ":" + req);
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, msg);
  }

  preDecorateResponse(req, rsp);

  if (requestLog.isDebugEnabled() && rsp.getToLog().size() > 0) {
    // log request at debug in case something goes wrong and we aren't able to log later
    requestLog.debug(rsp.getToLogAsString(logid));
  }

  // TODO: this doesn't seem to be working correctly and causes problems with the example server and distrib (for example /spell)
  // if (req.getParams().getBool(ShardParams.IS_SHARD,false) && !(handler instanceof SearchHandler))
  //   throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,"isShard is only acceptable with search handlers");

  handler.handleRequest(req, rsp);
  postDecorateResponse(handler, req, rsp);

  if (rsp.getToLog().size() > 0) {
    if (requestLog.isInfoEnabled()) {
      requestLog.info(rsp.getToLogAsString(logid));
    }

    if (log.isWarnEnabled() && slowQueryThresholdMillis >= 0) {
      final long qtime = (long) (req.getRequestTimer().getTime());
      if (qtime >= slowQueryThresholdMillis) {
        log.warn("slow: " + rsp.getToLogAsString(logid));
      }
    }
  }
}
RequestHandlerBase implements the common handleRequest logic, applying the configured defaults before delegating to the concrete handler:
/**
 * Initializes the {@link org.apache.solr.request.SolrRequestHandler} by creating three {@link org.apache.solr.common.params.SolrParams} named.
 * <table border="1" summary="table of parameters">
 * <tr><th>Name</th><th>Description</th></tr>
 * <tr><td>defaults</td><td>Contains all of the named arguments contained within the list element named "defaults".</td></tr>
 * <tr><td>appends</td><td>Contains all of the named arguments contained within the list element named "appends".</td></tr>
 * <tr><td>invariants</td><td>Contains all of the named arguments contained within the list element named "invariants".</td></tr>
 * </table>
 *
 * Example:
 * <pre>
 * <lst name="defaults">
 *   <str name="echoParams">explicit</str>
 *   <str name="qf">text^0.5 features^1.0 name^1.2 sku^1.5 id^10.0</str>
 *   <str name="mm">2<-1 5<-2 6<90%</str>
 *   <str name="bq">incubationdate_dt:[* TO NOW/DAY-1MONTH]^2.2</str>
 * </lst>
 * <lst name="appends">
 *   <str name="fq">inStock:true</str>
 * </lst>
 *
 * <lst name="invariants">
 *   <str name="facet.field">cat</str>
 *   <str name="facet.field">manu_exact</str>
 *   <str name="facet.query">price:[* TO 500]</str>
 *   <str name="facet.query">price:[500 TO *]</str>
 * </lst>
 * </pre>
 *
 * @param args The {@link org.apache.solr.common.util.NamedList} to initialize from
 *
 * @see #handleRequest(org.apache.solr.request.SolrQueryRequest, org.apache.solr.response.SolrQueryResponse)
 * @see #handleRequestBody(org.apache.solr.request.SolrQueryRequest, org.apache.solr.response.SolrQueryResponse)
 * @see org.apache.solr.util.SolrPluginUtils#setDefaults(org.apache.solr.request.SolrQueryRequest, org.apache.solr.common.params.SolrParams, org.apache.solr.common.params.SolrParams, org.apache.solr.common.params.SolrParams)
 * @see SolrParams#toSolrParams(org.apache.solr.common.util.NamedList)
 *
 * See also the example solrconfig.xml located in the Solr codebase (example/solr/conf).
 */
@Override
public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {
  numRequests.incrementAndGet();
  TimerContext timer = requestTimes.time();
  try {
    if (pluginInfo != null && pluginInfo.attributes.containsKey(USEPARAM))
      req.getContext().put(USEPARAM, pluginInfo.attributes.get(USEPARAM));
    SolrPluginUtils.setDefaults(this, req, defaults, appends, invariants);
    req.getContext().remove(USEPARAM);
    rsp.setHttpCaching(httpCaching);
    handleRequestBody(req, rsp);
    // count timeouts
    NamedList header = rsp.getResponseHeader();
    if (header != null) {
      Object partialResults = header.get("partialResults");
      boolean timedOut = partialResults == null ? false : (Boolean) partialResults;
      if (timedOut) {
        numTimeouts.incrementAndGet();
        rsp.setHttpCaching(false);
      }
    }
  } catch (Exception e) {
    if (e instanceof SolrException) {
      SolrException se = (SolrException) e;
      if (se.code() == SolrException.ErrorCode.CONFLICT.code) {
        // TODO: should we allow this to be counted as an error (numErrors++)?
      } else {
        SolrException.log(SolrCore.log, e);
      }
    } else {
      SolrException.log(SolrCore.log, e);
      if (e instanceof SyntaxError) {
        e = new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
      }
    }
    rsp.setException(e);
    numErrors.incrementAndGet();
  } finally {
    timer.stop();
  }
}
The matching handler, SearchHandler, implements handleRequestBody:
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
  List<SearchComponent> components = getComponents();
  ResponseBuilder rb = new ResponseBuilder(req, rsp, components);
  if (rb.requestInfo != null) {
    rb.requestInfo.setResponseBuilder(rb);
  }

  boolean dbg = req.getParams().getBool(CommonParams.DEBUG_QUERY, false);
  rb.setDebug(dbg);
  if (dbg == false) { // if it's true, we are doing everything anyway.
    SolrPluginUtils.getDebugInterests(req.getParams().getParams(CommonParams.DEBUG), rb);
  }

  final RTimerTree timer = rb.isDebug() ? req.getRequestTimer() : null;

  final ShardHandler shardHandler1 = getAndPrepShardHandler(req, rb); // creates a ShardHandler object only if it's needed

  if (timer == null) {
    // non-debugging prepare phase
    for (SearchComponent c : components) {
      c.prepare(rb);
    }
  } else {
    // debugging prepare phase
    RTimerTree subt = timer.sub("prepare");
    for (SearchComponent c : components) {
      rb.setTimer(subt.sub(c.getName()));
      c.prepare(rb);
      rb.getTimer().stop();
    }
    subt.stop();
  }

  if (!rb.isDistrib) {
    // a normal non-distributed request
    long timeAllowed = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1L);
    if (timeAllowed > 0L) {
      SolrQueryTimeoutImpl.set(timeAllowed);
    }
    try {
      // The semantics of debugging vs not debugging are different enough that
      // it makes sense to have two control loops
      if (!rb.isDebug()) {
        // Process
        for (SearchComponent c : components) {
          c.process(rb);
        }
      } else {
        // Process
        RTimerTree subt = timer.sub("process");
        for (SearchComponent c : components) {
          rb.setTimer(subt.sub(c.getName()));
          c.process(rb);
          rb.getTimer().stop();
        }
        subt.stop();

        // add the timing info
        if (rb.isDebugTimings()) {
          rb.addDebugInfo("timing", timer.asNamedList());
        }
      }
    } catch (ExitableDirectoryReader.ExitingReaderException ex) {
      log.warn("Query: " + req.getParamString() + "; " + ex.getMessage());
      SolrDocumentList r = (SolrDocumentList) rb.rsp.getValues().get("response");
      if (r == null) r = new SolrDocumentList();
      r.setNumFound(0);
      rb.rsp.add("response", r);
      if (rb.isDebug()) {
        NamedList debug = new NamedList();
        debug.add("explain", new NamedList());
        rb.rsp.add("debug", debug);
      }
      rb.rsp.getResponseHeader().add("partialResults", Boolean.TRUE);
    } finally {
      SolrQueryTimeoutImpl.reset();
    }
  } else {
    // a distributed request

    if (rb.outgoing == null) {
      rb.outgoing = new LinkedList<>();
    }
    rb.finished = new ArrayList<>();

    int nextStage = 0;
    do {
      rb.stage = nextStage;
      nextStage = ResponseBuilder.STAGE_DONE;

      // call all components
      for (SearchComponent c : components) {
        // the next stage is the minimum of what all components report
        nextStage = Math.min(nextStage, c.distributedProcess(rb));
      }

      // check the outgoing queue and send requests
      while (rb.outgoing.size() > 0) {

        // submit all current request tasks at once
        while (rb.outgoing.size() > 0) {
          ShardRequest sreq = rb.outgoing.remove(0);
          sreq.actualShards = sreq.shards;
          if (sreq.actualShards == ShardRequest.ALL_SHARDS) {
            sreq.actualShards = rb.shards;
          }
          sreq.responses = new ArrayList<>(sreq.actualShards.length); // presume we'll get a response from each shard we send to

          // TODO: map from shard to address[]
          for (String shard : sreq.actualShards) {
            ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
            params.remove(ShardParams.SHARDS);         // not a top-level request
            params.set(CommonParams.DISTRIB, "false"); // not a top-level request
            params.remove("indent");
            params.remove(CommonParams.HEADER_ECHO_PARAMS);
            params.set(ShardParams.IS_SHARD, true);    // a sub (shard) request
            params.set(ShardParams.SHARDS_PURPOSE, sreq.purpose);
            params.set(ShardParams.SHARD_URL, shard);  // so the shard knows what was asked
            if (rb.requestInfo != null) {
              // we could try and detect when this is needed, but it could be tricky
              params.set("NOW", Long.toString(rb.requestInfo.getNOW().getTime()));
            }
            String shardQt = params.get(ShardParams.SHARDS_QT);
            if (shardQt != null) {
              params.set(CommonParams.QT, shardQt);
            } else {
              // for distributed queries that don't include shards.qt, use the original path
              // as the default but operators need to update their luceneMatchVersion to enable
              // this behavior since it did not work this way prior to 5.1
              if (req.getCore().getSolrConfig().luceneMatchVersion.onOrAfter(Version.LUCENE_5_1_0)) {
                String reqPath = (String) req.getContext().get(PATH);
                if (!"/select".equals(reqPath)) {
                  params.set(CommonParams.QT, reqPath);
                } // else if path is /select, then the qt gets passed thru if set
              } else {
                // this is the pre-5.1 behavior, which translates to sending the shard request to /select
                params.remove(CommonParams.QT);
              }
            }
            shardHandler1.submit(sreq, shard, params);
          }
        }

        // now wait for replies, but if anyone puts more requests on
        // the outgoing queue, send them out immediately (by exiting
        // this loop)
        boolean tolerant = rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false);
        while (rb.outgoing.size() == 0) {
          ShardResponse srsp = tolerant ?
              shardHandler1.takeCompletedIncludingErrors() :
              shardHandler1.takeCompletedOrError();
          if (srsp == null) break; // no more requests to wait for

          // Was there an exception?
          if (srsp.getException() != null) {
            // If things are not tolerant, abort everything and rethrow
            if (!tolerant) {
              shardHandler1.cancelAll();
              if (srsp.getException() instanceof SolrException) {
                throw (SolrException) srsp.getException();
              } else {
                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());
              }
            } else {
              if (rsp.getResponseHeader().get("partialResults") == null) {
                rsp.getResponseHeader().add("partialResults", Boolean.TRUE);
              }
            }
          }

          rb.finished.add(srsp.getShardRequest());

          // let the components see the responses to the request
          for (SearchComponent c : components) {
            c.handleResponses(rb, srsp.getShardRequest());
          }
        }
      }

      for (SearchComponent c : components) {
        c.finishStage(rb);
      }

      // we are done when the next stage is MAX_VALUE
    } while (nextStage != Integer.MAX_VALUE);
  }

  // SOLR-5550: still provide shards.info if requested even for a short circuited distrib request
  if (!rb.isDistrib && req.getParams().getBool(ShardParams.SHARDS_INFO, false) && rb.shortCircuitedURL != null) {
    NamedList<Object> shardInfo = new SimpleOrderedMap<Object>();
    SimpleOrderedMap<Object> nl = new SimpleOrderedMap<Object>();
    if (rsp.getException() != null) {
      Throwable cause = rsp.getException();
      if (cause instanceof SolrServerException) {
        cause = ((SolrServerException) cause).getRootCause();
      } else {
        if (cause.getCause() != null) {
          cause = cause.getCause();
        }
      }
      nl.add("error", cause.toString());
      StringWriter trace = new StringWriter();
      cause.printStackTrace(new PrintWriter(trace));
      nl.add("trace", trace.toString());
    } else {
      nl.add("numFound", rb.getResults().docList.matches());
      nl.add("maxScore", rb.getResults().docList.maxScore());
    }
    nl.add("shardAddress", rb.shortCircuitedURL);
    nl.add("time", req.getRequestTimer().getTime()); // elapsed time of this request so far

    int pos = rb.shortCircuitedURL.indexOf("://");
    String shardInfoName = pos != -1 ? rb.shortCircuitedURL.substring(pos + 3) : rb.shortCircuitedURL;
    shardInfo.add(shardInfoName, nl);
    rsp.getValues().add(ShardParams.SHARDS_INFO, shardInfo);
  }
}
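The component list iterated above is itself configured in solrconfig.xml; by default it includes components such as query, facet, highlight, and debug. A sketch of extending the chain (the component name and class here are hypothetical):

<searchComponent name="myComponent" class="com.example.MyComponent"/>

<requestHandler name="/select" class="solr.SearchHandler">
  <arr name="last-components">
    <str>myComponent</str>
  </arr>
</requestHandler>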
1. Standalone mode: QueryComponent processes the request:
/**
 * Actually run the query
 */
@Override
public void process(ResponseBuilder rb) throws IOException {
  LOG.debug("process: {}", rb.req.getParams());

  SolrQueryRequest req = rb.req;
  SolrParams params = req.getParams();
  if (!params.getBool(COMPONENT_NAME, true)) {
    return;
  }
  SolrIndexSearcher searcher = req.getSearcher();
  StatsCache statsCache = req.getCore().getStatsCache();

  int purpose = params.getInt(ShardParams.SHARDS_PURPOSE, ShardRequest.PURPOSE_GET_TOP_IDS);
  if ((purpose & ShardRequest.PURPOSE_GET_TERM_STATS) != 0) {
    statsCache.returnLocalStats(rb, searcher);
    return;
  }
  // check if we need to update the local copy of global dfs
  if ((purpose & ShardRequest.PURPOSE_SET_TERM_STATS) != 0) {
    // retrieve from request and update local cache
    statsCache.receiveGlobalStats(req);
  }

  SolrQueryResponse rsp = rb.rsp;
  IndexSchema schema = searcher.getSchema();

  // Optional: This could also be implemented by the top-level searcher sending
  // a filter that lists the ids... that would be transparent to
  // the request handler, but would be more expensive (and would preserve score
  // too if desired).
  String ids = params.get(ShardParams.IDS);
  if (ids != null) {
    SchemaField idField = schema.getUniqueKeyField();
    List<String> idArr = StrUtils.splitSmart(ids, ",", true);
    int[] luceneIds = new int[idArr.size()];
    int docs = 0;
    for (int i = 0; i < idArr.size(); i++) {
      int id = searcher.getFirstMatch(
          new Term(idField.getName(), idField.getType().toInternal(idArr.get(i))));
      if (id >= 0) luceneIds[docs++] = id;
    }

    DocListAndSet res = new DocListAndSet();
    res.docList = new DocSlice(0, docs, luceneIds, null, docs, 0);
    if (rb.isNeedDocSet()) {
      // TODO: create a cache for this!
      List<Query> queries = new ArrayList<>();
      queries.add(rb.getQuery());
      List<Query> filters = rb.getFilters();
      if (filters != null) queries.addAll(filters);
      res.docSet = searcher.getDocSet(queries);
    }
    rb.setResults(res);

    ResultContext ctx = new ResultContext();
    ctx.docs = rb.getResults().docList;
    ctx.query = null; // anything?
    rsp.add("response", ctx);
    return;
  }

  // -1 as flag if not set.
  long timeAllowed = params.getLong(CommonParams.TIME_ALLOWED, -1L);
  if (null != rb.getCursorMark() && 0 < timeAllowed) {
    // fundamentally incompatible
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Can not search using both " +
        CursorMarkParams.CURSOR_MARK_PARAM + " and " + CommonParams.TIME_ALLOWED);
  }

  SolrIndexSearcher.QueryCommand cmd = rb.getQueryCommand();
  cmd.setTimeAllowed(timeAllowed);

  req.getContext().put(SolrIndexSearcher.STATS_SOURCE, statsCache.get(req));

  SolrIndexSearcher.QueryResult result = new SolrIndexSearcher.QueryResult();

  //
  // grouping / field collapsing
  //
  GroupingSpecification groupingSpec = rb.getGroupingSpec();
  if (groupingSpec != null) {
    try {
      boolean needScores = (cmd.getFlags() & SolrIndexSearcher.GET_SCORES) != 0;
      if (params.getBool(GroupParams.GROUP_DISTRIBUTED_FIRST, false)) {
        CommandHandler.Builder topsGroupsActionBuilder = new CommandHandler.Builder()
            .setQueryCommand(cmd)
            .setNeedDocSet(false) // Order matters here
            .setIncludeHitCount(true)
            .setSearcher(searcher);

        for (String field : groupingSpec.getFields()) {
          topsGroupsActionBuilder.addCommandField(new SearchGroupsFieldCommand.Builder()
              .setField(schema.getField(field))
              .setGroupSort(groupingSpec.getGroupSort())
              .setTopNGroups(cmd.getOffset() + cmd.getLen())
              .setIncludeGroupCount(groupingSpec.isIncludeGroupCount())
              .build()
          );
        }

        CommandHandler commandHandler = topsGroupsActionBuilder.build();
        commandHandler.execute();
        SearchGroupsResultTransformer serializer = new SearchGroupsResultTransformer(searcher);
        rsp.add("firstPhase", commandHandler.processResult(result, serializer));
        rsp.add("totalHitCount", commandHandler.getTotalHitCount());
        rb.setResult(result);
        return;
      } else if (params.getBool(GroupParams.GROUP_DISTRIBUTED_SECOND, false)) {
        CommandHandler.Builder secondPhaseBuilder = new CommandHandler.Builder()
            .setQueryCommand(cmd)
            .setTruncateGroups(groupingSpec.isTruncateGroups() && groupingSpec.getFields().length > 0)
            .setSearcher(searcher);

        for (String field : groupingSpec.getFields()) {
          SchemaField schemaField = schema.getField(field);
          String[] topGroupsParam = params.getParams(GroupParams.GROUP_DISTRIBUTED_TOPGROUPS_PREFIX + field);
          if (topGroupsParam == null) {
            topGroupsParam = new String[0];
          }

          List<SearchGroup<BytesRef>> topGroups = new ArrayList<>(topGroupsParam.length);
          for (String topGroup : topGroupsParam) {
            SearchGroup<BytesRef> searchGroup = new SearchGroup<>();
            if (!topGroup.equals(TopGroupsShardRequestFactory.GROUP_NULL_VALUE)) {
              searchGroup.groupValue = new BytesRef(schemaField.getType().readableToIndexed(topGroup));
            }
            topGroups.add(searchGroup);
          }

          secondPhaseBuilder.addCommandField(
              new TopGroupsFieldCommand.Builder()
                  .setField(schemaField)
                  .setGroupSort(groupingSpec.getGroupSort())
                  .setSortWithinGroup(groupingSpec.getSortWithinGroup())
                  .setFirstPhaseGroups(topGroups)
                  .setMaxDocPerGroup(groupingSpec.getGroupOffset() + groupingSpec.getGroupLimit())
                  .setNeedScores(needScores)
                  .setNeedMaxScore(needScores)
                  .build()
          );
        }

        for (String query : groupingSpec.getQueries()) {
          secondPhaseBuilder.addCommandField(new QueryCommand.Builder()
              .setDocsToCollect(groupingSpec.getOffset() + groupingSpec.getLimit())
              .setSort(groupingSpec.getGroupSort())
              .setQuery(query, rb.req)
              .setDocSet(searcher)
              .build()
          );
        }

        CommandHandler commandHandler = secondPhaseBuilder.build();
        commandHandler.execute();
        TopGroupsResultTransformer serializer = new TopGroupsResultTransformer(rb);
        rsp.add("secondPhase", commandHandler.processResult(result, serializer));
        rb.setResult(result);
        return;
      }

      int maxDocsPercentageToCache = params.getInt(GroupParams.GROUP_CACHE_PERCENTAGE, 0);
      boolean cacheSecondPassSearch = maxDocsPercentageToCache >= 1 && maxDocsPercentageToCache <= 100;
      Grouping.TotalCount defaultTotalCount = groupingSpec.isIncludeGroupCount() ?
          Grouping.TotalCount.grouped : Grouping.TotalCount.ungrouped;
      int limitDefault = cmd.getLen(); // this is normally from "rows"
      Grouping grouping =
          new Grouping(searcher, result, cmd, cacheSecondPassSearch, maxDocsPercentageToCache, groupingSpec.isMain());
      grouping.setSort(groupingSpec.getGroupSort())
          .setGroupSort(groupingSpec.getSortWithinGroup())
          .setDefaultFormat(groupingSpec.getResponseFormat())
          .setLimitDefault(limitDefault)
          .setDefaultTotalCount(defaultTotalCount)
          .setDocsPerGroupDefault(groupingSpec.getGroupLimit())
          .setGroupOffsetDefault(groupingSpec.getGroupOffset())
          .setGetGroupedDocSet(groupingSpec.isTruncateGroups());

      if (groupingSpec.getFields() != null) {
        for (String field : groupingSpec.getFields()) {
          grouping.addFieldCommand(field, rb.req);
        }
      }

      if (groupingSpec.getFunctions() != null) {
        for (String groupByStr : groupingSpec.getFunctions()) {
          grouping.addFunctionCommand(groupByStr, rb.req);
        }
      }

      if (groupingSpec.getQueries() != null) {
        for (String groupByStr : groupingSpec.getQueries()) {
          grouping.addQueryCommand(groupByStr, rb.req);
        }
      }

      if (rb.doHighlights || rb.isDebug() || params.getBool(MoreLikeThisParams.MLT, false)) {
        // we need a single list of the returned docs
        cmd.setFlags(SolrIndexSearcher.GET_DOCLIST);
      }

      grouping.execute();
      if (grouping.isSignalCacheWarning()) {
        rsp.add(
            "cacheWarning",
            String.format(Locale.ROOT, "Cache limit of %d percent relative to maxdoc has exceeded. Please increase cache size or disable caching.", maxDocsPercentageToCache)
        );
      }
      rb.setResult(result);

      if (grouping.mainResult != null) {
        ResultContext ctx = new ResultContext();
        ctx.docs = grouping.mainResult;
        ctx.query = null; // TODO? add the query?
        rsp.add("response", ctx);
        rsp.getToLog().add("hits", grouping.mainResult.matches());
      } else if (!grouping.getCommands().isEmpty()) { // Can never be empty since grouping.execute() checks for this.
        rsp.add("grouped", result.groupedResults);
        rsp.getToLog().add("hits", grouping.getCommands().get(0).getMatches());
      }
      return;
    } catch (SyntaxError e) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
    }
  }

  // normal search result
  searcher.search(result, cmd);
  rb.setResult(result);

  ResultContext ctx = new ResultContext();
  ctx.docs = rb.getResults().docList;
  ctx.query = rb.getQuery();
  rsp.add("response", ctx);
  rsp.getToLog().add("hits", rb.getResults().docList.matches());

  if (!rb.req.getParams().getBool(ShardParams.IS_SHARD, false)) {
    if (null != rb.getNextCursorMark()) {
      rb.rsp.add(CursorMarkParams.CURSOR_MARK_NEXT,
          rb.getNextCursorMark().getSerializedTotem());
    }
  }

  if (rb.mergeFieldHandler != null) {
    rb.mergeFieldHandler.handleMergeFields(rb, searcher);
  } else {
    doFieldSortValues(rb, searcher);
  }

  doPrefetch(rb);
}
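The branches in process above are driven by request parameters. A few illustrative request URLs (host and core names are placeholders):

http://localhost:8983/solr/core1/select?q=name:solr                       (normal search)
http://localhost:8983/solr/core1/select?q=*:*&group=true&group.field=cat  (grouping / field collapsing)
http://localhost:8983/solr/core1/select?q=*:*&ids=doc1,doc2&isShard=true  (shard fetch by unique-key ids)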
This invokes SolrIndexSearcher, which, per its javadoc, "adds schema awareness and caching functionality over the Lucene IndexSearcher". Its search method delegates to getDocListC, which consults the query result cache and falls back to the uncached getDocListNC shown below:
public QueryResult search(QueryResult qr, QueryCommand cmd) throws IOException {
  getDocListC(qr, cmd);
  return qr;
}
private void getDocListNC(QueryResult qr, QueryCommand cmd) throws IOException {
  int len = cmd.getSupersetMaxDoc();
  int last = len;
  if (last < 0 || last > maxDoc()) last = maxDoc();
  final int lastDocRequested = last;
  int nDocsReturned;
  int totalHits;
  float maxScore;
  int[] ids;
  float[] scores;

  boolean needScores = (cmd.getFlags() & GET_SCORES) != 0;

  Query query = QueryUtils.makeQueryable(cmd.getQuery());

  ProcessedFilter pf = getProcessedFilter(cmd.getFilter(), cmd.getFilterList());
  if (pf.filter != null) {
    query = new FilteredQuery(query, pf.filter);
  }

  // handle zero case...
  if (lastDocRequested <= 0) {
    final float[] topscore = new float[] { Float.NEGATIVE_INFINITY };
    final int[] numHits = new int[1];

    Collector collector;

    if (!needScores) {
      collector = new SimpleCollector() {
        @Override
        public void collect(int doc) {
          numHits[0]++;
        }

        @Override
        public boolean needsScores() {
          return false;
        }
      };
    } else {
      collector = new SimpleCollector() {
        Scorer scorer;

        @Override
        public void setScorer(Scorer scorer) {
          this.scorer = scorer;
        }

        @Override
        public void collect(int doc) throws IOException {
          numHits[0]++;
          float score = scorer.score();
          if (score > topscore[0]) topscore[0] = score;
        }

        @Override
        public boolean needsScores() {
          return true;
        }
      };
    }

    buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter);

    nDocsReturned = 0;
    ids = new int[nDocsReturned];
    scores = new float[nDocsReturned];
    totalHits = numHits[0];
    maxScore = totalHits > 0 ? topscore[0] : 0.0f;
    // no docs on this page, so cursor doesn't change
    qr.setNextCursorMark(cmd.getCursorMark());
  } else {
    final TopDocsCollector topCollector = buildTopDocsCollector(len, cmd);
    Collector collector = topCollector;
    buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter);

    totalHits = topCollector.getTotalHits();
    TopDocs topDocs = topCollector.topDocs(0, len);
    populateNextCursorMarkFromTopDocs(qr, cmd, topDocs);

    maxScore = totalHits > 0 ? topDocs.getMaxScore() : 0.0f;
    nDocsReturned = topDocs.scoreDocs.length;
    ids = new int[nDocsReturned];
    scores = (cmd.getFlags() & GET_SCORES) != 0 ? new float[nDocsReturned] : null;
    for (int i = 0; i < nDocsReturned; i++) {
      ScoreDoc scoreDoc = topDocs.scoreDocs[i];
      ids[i] = scoreDoc.doc;
      if (scores != null) scores[i] = scoreDoc.score;
    }
  }

  int sliceLen = Math.min(lastDocRequested, nDocsReturned);
  if (sliceLen < 0) sliceLen = 0;
  qr.setDocList(new DocSlice(0, sliceLen, ids, scores, totalHits, maxScore));
}
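The caches consulted on the getDocListC path are configured per-core in the <query> section of solrconfig.xml; a typical setup (sizes are illustrative):

<query>
  <!-- caches ordered doc-id lists keyed by (query, sort, filters) -->
  <queryResultCache class="solr.LRUCache" size="512" initialSize="512" autowarmCount="128"/>
  <!-- caches unordered DocSets for fq filters -->
  <filterCache class="solr.LRUCache" size="512" initialSize="512" autowarmCount="128"/>
  <!-- caches stored fields of documents -->
  <documentCache class="solr.LRUCache" size="512" initialSize="512" autowarmCount="0"/>
</query>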
2. Distributed (cluster) search
HttpShardHandler submits each shard request:
@Override
public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
  // do this outside of the callable for thread safety reasons
  final List<String> urls = getURLs(sreq, shard);

  Callable<ShardResponse> task = new Callable<ShardResponse>() {
    @Override
    public ShardResponse call() throws Exception {

      ShardResponse srsp = new ShardResponse();
      if (sreq.nodeName != null) {
        srsp.setNodeName(sreq.nodeName);
      }
      srsp.setShardRequest(sreq);
      srsp.setShard(shard);
      SimpleSolrResponse ssr = new SimpleSolrResponse();
      srsp.setSolrResponse(ssr);
      long startTime = System.nanoTime();

      try {
        params.remove(CommonParams.WT); // use default (currently javabin)
        params.remove(CommonParams.VERSION);

        QueryRequest req = makeQueryRequest(sreq, params, shard);
        req.setMethod(SolrRequest.METHOD.POST);

        // no need to set the response parser as binary is the default
        // req.setResponseParser(new BinaryResponseParser());

        // if there are no shards available for a slice, urls.size()==0
        if (urls.size() == 0) {
          // TODO: what's the right error code here? We should use the same thing when
          // all of the servers for a shard are down.
          throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
        }

        if (urls.size() <= 1) {
          String url = urls.get(0);
          srsp.setShardAddress(url);
          try (SolrClient client = new HttpSolrClient(url, httpClient)) {
            ssr.nl = client.request(req);
          }
        } else {
          LBHttpSolrClient.Rsp rsp = httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);
          ssr.nl = rsp.getResponse();
          srsp.setShardAddress(rsp.getServer());
        }
      } catch (ConnectException cex) {
        srsp.setException(cex); //????
      } catch (Exception th) {
        srsp.setException(th);
        if (th instanceof SolrException) {
          srsp.setResponseCode(((SolrException) th).code());
        } else {
          srsp.setResponseCode(-1);
        }
      }

      ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

      return transfomResponse(sreq, srsp, shard);
    }
  };

  try {
    if (shard != null) {
      MDC.put("ShardRequest.shards", shard);
    }
    if (urls != null && !urls.isEmpty()) {
      MDC.put("ShardRequest.urlList", urls.toString());
    }
    pending.add(completionService.submit(task));
  } finally {
    MDC.remove("ShardRequest.shards");
    MDC.remove("ShardRequest.urlList");
  }
}
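For reference, this non-SolrCloud distributed mode is typically triggered by listing shard addresses in the shards parameter (hosts are placeholders):

http://host1:8983/solr/core1/select?q=solr&shards=host1:8983/solr/core1,host2:8983/solr/core1

Each shard then receives a sub-request with isShard=true, built as shown in SearchHandler above.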
When a shard has multiple replicas, LBHttpSolrClient carries out the request. Its class-level javadoc describes what it does:
/**
 * LBHttpSolrClient or "LoadBalanced HttpSolrClient" is a load balancing wrapper around
 * {@link HttpSolrClient}. This is useful when you
 * have multiple Solr servers and the requests need to be Load Balanced among them.
 *
 * Do <b>NOT</b> use this class for indexing in master/slave scenarios since documents must be sent to the
 * correct master; no inter-node routing is done.
 *
 * In SolrCloud (leader/replica) scenarios, it is usually better to use
 * {@link CloudSolrClient}, but this class may be used
 * for updates because the server will forward them to the appropriate leader.
 *
 * <p>
 * It offers automatic failover when a server goes down and it detects when the server comes back up.
 * <p>
 * Load balancing is done using a simple round-robin on the list of servers.
 * <p>
 * If a request to a server fails by an IOException due to a connection timeout or read timeout then the host is taken
 * off the list of live servers and moved to a 'dead server list' and the request is resent to the next live server.
 * This process is continued till it tries all the live servers. If at least one server is alive, the request succeeds,
 * and if not it fails.
 * <blockquote><pre>
 * SolrClient lbHttpSolrClient = new LBHttpSolrClient("http://host1:8080/solr/", "http://host2:8080/solr", "http://host2:8080/solr");
 * //or if you wish to pass the HttpClient do as follows
 * httpClient httpClient = new HttpClient();
 * SolrClient lbHttpSolrClient = new LBHttpSolrClient(httpClient, "http://host1:8080/solr/", "http://host2:8080/solr", "http://host2:8080/solr");
 * </pre></blockquote>
 * This detects if a dead server comes alive automatically. The check is done in fixed intervals in a dedicated thread.
 * This interval can be set using {@link #setAliveCheckInterval} , the default is set to one minute.
 * <p>
 * <b>When to use this?</b><br> This can be used as a software load balancer when you do not wish to setup an external
 * load balancer. Alternatives to this code are to use
 * a dedicated hardware load balancer or using Apache httpd with mod_proxy_balancer as a load balancer. See <a
 * href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load balancing on Wikipedia</a>
 *
 * @since solr 1.4
 */
The entry point is as follows:
/**
 * Makes a request to one or more of the given urls, using the configured load balancer.
 *
 * @param req The solr search request that should be sent through the load balancer
 * @param urls The list of solr server urls to load balance across
 * @return The response from the request
 */
public LBHttpSolrClient.Rsp makeLoadBalancedRequest(final QueryRequest req, List<String> urls)
    throws SolrServerException, IOException {
  return loadbalancer.request(new LBHttpSolrClient.Req(req, urls));
}
This delegates to the request method to process the request:
/**
 * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
 * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
 * time, or until a test request on that server succeeds.
 *
 * Servers are queried in the exact order given (except servers currently in the dead pool are skipped).
 * If no live servers from the provided list remain to be tried, a number of previously skipped dead servers will be tried.
 * Req.getNumDeadServersToTry() controls how many dead servers will be tried.
 *
 * If no live servers are found a SolrServerException is thrown.
 *
 * @param req contains both the request as well as the list of servers to query
 *
 * @return the result of the request
 *
 * @throws IOException If there is a low-level I/O error.
 */
public Rsp request(Req req) throws SolrServerException, IOException {
  Rsp rsp = new Rsp();
  Exception ex = null;
  boolean isUpdate = req.request instanceof IsUpdateRequest;
  List<ServerWrapper> skipped = null;

  long timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
  long timeOutTime = System.nanoTime() + timeAllowedNano;
  for (String serverStr : req.getServers()) {
    if (isTimeExceeded(timeAllowedNano, timeOutTime)) {
      break;
    }

    serverStr = normalize(serverStr);
    // if the server is currently a zombie, just skip to the next one
    ServerWrapper wrapper = zombieServers.get(serverStr);
    if (wrapper != null) {
      // System.out.println("ZOMBIE SERVER QUERIED: " + serverStr);
      final int numDeadServersToTry = req.getNumDeadServersToTry();
      if (numDeadServersToTry > 0) {
        if (skipped == null) {
          skipped = new ArrayList<>(numDeadServersToTry);
          skipped.add(wrapper);
        } else if (skipped.size() < numDeadServersToTry) {
          skipped.add(wrapper);
        }
      }
      continue;
    }
    rsp.server = serverStr;
    try {
      MDC.put("LBHttpSolrClient.url", serverStr);
      HttpSolrClient client = makeSolrClient(serverStr);

      ex = doRequest(client, req, rsp, isUpdate, false, null);
      if (ex == null) {
        return rsp; // SUCCESS
      }
    } finally {
      MDC.remove("LBHttpSolrClient.url");
    }
  }

  // try the servers we previously skipped
  if (skipped != null) {
    for (ServerWrapper wrapper : skipped) {
      if (isTimeExceeded(timeAllowedNano, timeOutTime)) {
        break;
      }

      ex = doRequest(wrapper.client, req, rsp, isUpdate, true, wrapper.getKey());
      if (ex == null) {
        return rsp; // SUCCESS
      }
    }
  }

  if (ex == null) {
    throw new SolrServerException("No live SolrServers available to handle this request");
  } else {
    throw new SolrServerException("No live SolrServers available to handle this request:" + zombieServers.keySet(), ex);
  }
}
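To tie the load-balancing behavior together, a minimal standalone sketch (the URLs are placeholders) that exercises the round-robin and failover logic described above, using only the public SolrJ API shown in the javadoc:

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;

public class LBExample {
  public static void main(String[] args) throws Exception {
    // round-robins across the listed servers; a server that fails with an
    // IOException is moved to the zombie list and retried periodically
    try (LBHttpSolrClient lb = new LBHttpSolrClient(
        "http://host1:8983/solr/core1", "http://host2:8983/solr/core1")) {
      QueryResponse r = lb.query(new SolrQuery("*:*"));
      System.out.println("hits: " + r.getResults().getNumFound());
    }
  }
}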