What SolrDispatchFilter does

This filter looks at the incoming URL and maps it to the handlers defined in solrconfig.xml.

The body of the filter's doFilter method:

    HttpSolrCall call = getHttpSolrCall((HttpServletRequest) request, (HttpServletResponse) response, retry);
    try {
      Action result = call.call();
      switch (result) {
        case PASSTHROUGH:
          chain.doFilter(request, response);
          break;
        case RETRY:
          doFilter(request, response, chain, true);
          break;
        case FORWARD:
          request.getRequestDispatcher(call.getPath()).forward(request, response);
          break;
      }
    } finally {
      call.destroy();
    }
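The switch above is a small dispatch over the Action returned by call(). The following is a minimal sketch of that control flow only. ActionDispatchSketch, its Action enum and the string event log are hypothetical stand-ins, not Solr classes, and the call is modelled as a plain function of the retry flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the PASSTHROUGH / RETRY / FORWARD branches in doFilter.
class ActionDispatchSketch {
    enum Action { PASSTHROUGH, RETRY, FORWARD, RETURN }

    // Runs one request and returns the sequence of steps that were taken.
    static List<String> dispatch(Function<Boolean, Action> call) {
        List<String> events = new ArrayList<>();
        dispatch(call, false, events);
        return events;
    }

    private static void dispatch(Function<Boolean, Action> call, boolean retry, List<String> events) {
        try {
            Action result = call.apply(retry);
            switch (result) {
                case PASSTHROUGH:
                    events.add("chain.doFilter"); // hand over to the next filter
                    break;
                case RETRY:
                    dispatch(call, true, events); // run again with retry = true
                    break;
                case FORWARD:
                    events.add("forward");        // forward to another path
                    break;
                default:
                    break;                        // the call already wrote the response
            }
        } finally {
            events.add("destroy");                // cleanup runs once per attempt
        }
    }
}
```

A call that asks for one retry and then passes through produces the steps chain.doFilter, destroy, destroy, which shows that cleanup happens for both attempts.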

The call method of HttpSolrCall handles the request:

    case PROCESS:
      final Method reqMethod = Method.getMethod(req.getMethod());
      HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod);
      // unless we have been explicitly told not to, do cache validation
      // if we fail cache validation, execute the query
      if (config.getHttpCachingConfig().isNever304() ||
          !HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) {
        SolrQueryResponse solrRsp = new SolrQueryResponse();
        /* even for HEAD requests, we need to execute the handler to
         * ensure we don't get an error (and to make sure the correct
         * QueryResponseWriter is selected and we get the correct
         * Content-Type)
         */
        SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp));
        execute(solrRsp);
        HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
        Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders();
        while (headers.hasNext()) {
          Map.Entry<String, String> entry = headers.next();
          resp.addHeader(entry.getKey(), entry.getValue());
        }
        QueryResponseWriter responseWriter = core.getQueryResponseWriter(solrReq);
        if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
        writeResponse(solrRsp, responseWriter, reqMethod);
      }
      return RETURN;

It builds the request and response, then calls SolrCore to process them:

    public void execute(SolrRequestHandler handler, SolrQueryRequest req, SolrQueryResponse rsp) {
      if (handler==null) {
        String msg = "Null Request Handler '" +
            req.getParams().get(CommonParams.QT) + "'";
        if (log.isWarnEnabled()) log.warn(logid + msg + ":" + req);
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, msg);
      }
      preDecorateResponse(req, rsp);
      if (requestLog.isDebugEnabled() && rsp.getToLog().size() > 0) {
        // log request at debug in case something goes wrong and we aren't able to log later
        requestLog.debug(rsp.getToLogAsString(logid));
      }
      // TODO: this doesn't seem to be working correctly and causes problems with the example server and distrib (for example /spell)
      // if (req.getParams().getBool(ShardParams.IS_SHARD,false) && !(handler instanceof SearchHandler))
      //   throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,"isShard is only acceptable with search handlers");
      handler.handleRequest(req,rsp);
      postDecorateResponse(handler, req, rsp);
      if (rsp.getToLog().size() > 0) {
        if (requestLog.isInfoEnabled()) {
          requestLog.info(rsp.getToLogAsString(logid));
        }
        if (log.isWarnEnabled() && slowQueryThresholdMillis >= 0) {
          final long qtime = (long) (req.getRequestTimer().getTime());
          if (qtime >= slowQueryThresholdMillis) {
            log.warn("slow: " + rsp.getToLogAsString(logid));
          }
        }
      }
    }

RequestHandlerBase then hands the request to the matching handler. The Javadoc below (from its init method) describes the defaults, appends, and invariants parameter sets that handleRequest applies:

    /**
     * Initializes the {@link org.apache.solr.request.SolrRequestHandler} by creating three {@link org.apache.solr.common.params.SolrParams} named.
     * <table border="1" summary="table of parameters">
     * <tr><th>Name</th><th>Description</th></tr>
     * <tr><td>defaults</td><td>Contains all of the named arguments contained within the list element named "defaults".</td></tr>
     * <tr><td>appends</td><td>Contains all of the named arguments contained within the list element named "appends".</td></tr>
     * <tr><td>invariants</td><td>Contains all of the named arguments contained within the list element named "invariants".</td></tr>
     * </table>
     *
     * Example:
     * <pre>
     * &lt;lst name="defaults"&gt;
     * &lt;str name="echoParams"&gt;explicit&lt;/str&gt;
     * &lt;str name="qf"&gt;text^0.5 features^1.0 name^1.2 sku^1.5 id^10.0&lt;/str&gt;
     * &lt;str name="mm"&gt;2&lt;-1 5&lt;-2 6&lt;90%&lt;/str&gt;
     * &lt;str name="bq"&gt;incubationdate_dt:[* TO NOW/DAY-1MONTH]^2.2&lt;/str&gt;
     * &lt;/lst&gt;
     * &lt;lst name="appends"&gt;
     * &lt;str name="fq"&gt;inStock:true&lt;/str&gt;
     * &lt;/lst&gt;
     *
     * &lt;lst name="invariants"&gt;
     * &lt;str name="facet.field"&gt;cat&lt;/str&gt;
     * &lt;str name="facet.field"&gt;manu_exact&lt;/str&gt;
     * &lt;str name="facet.query"&gt;price:[* TO 500]&lt;/str&gt;
     * &lt;str name="facet.query"&gt;price:[500 TO *]&lt;/str&gt;
     * &lt;/lst&gt;
     * </pre>
     *
     *
     * @param args The {@link org.apache.solr.common.util.NamedList} to initialize from
     *
     * @see #handleRequest(org.apache.solr.request.SolrQueryRequest, org.apache.solr.response.SolrQueryResponse)
     * @see #handleRequestBody(org.apache.solr.request.SolrQueryRequest, org.apache.solr.response.SolrQueryResponse)
     * @see org.apache.solr.util.SolrPluginUtils#setDefaults(org.apache.solr.request.SolrQueryRequest, org.apache.solr.common.params.SolrParams, org.apache.solr.common.params.SolrParams, org.apache.solr.common.params.SolrParams)
     * @see SolrParams#toSolrParams(org.apache.solr.common.util.NamedList)
     *
     * See also the example solrconfig.xml located in the Solr codebase (example/solr/conf).
     */

    @Override
    public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {
      numRequests.incrementAndGet();
      TimerContext timer = requestTimes.time();
      try {
        if(pluginInfo != null && pluginInfo.attributes.containsKey(USEPARAM)) req.getContext().put(USEPARAM,pluginInfo.attributes.get(USEPARAM));
        SolrPluginUtils.setDefaults(this, req, defaults, appends, invariants);
        req.getContext().remove(USEPARAM);
        rsp.setHttpCaching(httpCaching);
        handleRequestBody( req, rsp );
        // count timeouts
        NamedList header = rsp.getResponseHeader();
        if(header != null) {
          Object partialResults = header.get("partialResults");
          boolean timedOut = partialResults == null ? false : (Boolean)partialResults;
          if( timedOut ) {
            numTimeouts.incrementAndGet();
            rsp.setHttpCaching(false);
          }
        }
      } catch (Exception e) {
        if (e instanceof SolrException) {
          SolrException se = (SolrException)e;
          if (se.code() == SolrException.ErrorCode.CONFLICT.code) {
            // TODO: should we allow this to be counted as an error (numErrors++)?
          } else {
            SolrException.log(SolrCore.log,e);
          }
        } else {
          SolrException.log(SolrCore.log,e);
          if (e instanceof SyntaxError) {
            e = new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
          }
        }
        rsp.setException(e);
        numErrors.incrementAndGet();
      }
      finally {
        timer.stop();
      }
    }
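The three parameter sets named in the Javadoc combine with the request parameters in a fixed precedence: defaults apply only when the request omits a parameter, appends are added to whatever the request sent, and invariants replace everything else. The following is a minimal sketch of that precedence using plain maps. ParamMergeSketch and its merge method are hypothetical and do not use Solr's SolrParams classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of defaults / appends / invariants precedence.
class ParamMergeSketch {
    static Map<String, List<String>> merge(Map<String, List<String>> request,
                                           Map<String, List<String>> defaults,
                                           Map<String, List<String>> appends,
                                           Map<String, List<String>> invariants) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        // defaults go in first, so any request value replaces them
        defaults.forEach((k, v) -> out.put(k, new ArrayList<>(v)));
        request.forEach((k, v) -> out.put(k, new ArrayList<>(v)));
        // appends are added after the values already present
        appends.forEach((k, v) -> out.computeIfAbsent(k, x -> new ArrayList<>()).addAll(v));
        // invariants win over everything, including request values
        invariants.forEach((k, v) -> out.put(k, new ArrayList<>(v)));
        return out;
    }
}
```

With the example configuration from the Javadoc, a request that sends fq=cat:a and facet.field=x ends up with both fq values (cat:a and inStock:true) and with the configured facet.field values instead of x.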

The matching handler, SearchHandler, processes the request body:

    @Override
    public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception
    {
      List<SearchComponent> components  = getComponents();
      ResponseBuilder rb = new ResponseBuilder(req, rsp, components);
      if (rb.requestInfo != null) {
        rb.requestInfo.setResponseBuilder(rb);
      }

      boolean dbg = req.getParams().getBool(CommonParams.DEBUG_QUERY, false);
      rb.setDebug(dbg);
      if (dbg == false){//if it's true, we are doing everything anyway.
        SolrPluginUtils.getDebugInterests(req.getParams().getParams(CommonParams.DEBUG), rb);
      }

      final RTimerTree timer = rb.isDebug() ? req.getRequestTimer() : null;

      final ShardHandler shardHandler1 = getAndPrepShardHandler(req, rb); // creates a ShardHandler object only if it's needed

      if (timer == null) {
        // non-debugging prepare phase
        for( SearchComponent c : components ) {
          c.prepare(rb);
        }
      } else {
        // debugging prepare phase
        RTimerTree subt = timer.sub( "prepare" );
        for( SearchComponent c : components ) {
          rb.setTimer( subt.sub( c.getName() ) );
          c.prepare(rb);
          rb.getTimer().stop();
        }
        subt.stop();
      }

      if (!rb.isDistrib) {
        // a normal non-distributed request
        long timeAllowed = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1L);
        if (timeAllowed > 0L) {
          SolrQueryTimeoutImpl.set(timeAllowed);
        }
        try {
          // The semantics of debugging vs not debugging are different enough that
          // it makes sense to have two control loops
          if(!rb.isDebug()) {
            // Process
            for( SearchComponent c : components ) {
              c.process(rb);
            }
          }
          else {
            // Process
            RTimerTree subt = timer.sub( "process" );
            for( SearchComponent c : components ) {
              rb.setTimer( subt.sub( c.getName() ) );
              c.process(rb);
              rb.getTimer().stop();
            }
            subt.stop();

            // add the timing info
            if (rb.isDebugTimings()) {
              rb.addDebugInfo("timing", timer.asNamedList() );
            }
          }
        } catch (ExitableDirectoryReader.ExitingReaderException ex) {
          log.warn( "Query: " + req.getParamString() + "; " + ex.getMessage());
          SolrDocumentList r = (SolrDocumentList) rb.rsp.getValues().get("response");
          if(r == null)
            r = new SolrDocumentList();
          r.setNumFound(0);
          rb.rsp.add("response", r);
          if(rb.isDebug()) {
            NamedList debug = new NamedList();
            debug.add("explain", new NamedList());
            rb.rsp.add("debug", debug);
          }
          rb.rsp.getResponseHeader().add("partialResults", Boolean.TRUE);
        } finally {
          SolrQueryTimeoutImpl.reset();
        }
      } else {
        // a distributed request
        if (rb.outgoing == null) {
          rb.outgoing = new LinkedList<>();
        }
        rb.finished = new ArrayList<>();

        int nextStage = 0;
        do {
          rb.stage = nextStage;
          nextStage = ResponseBuilder.STAGE_DONE;

          // call all components
          for( SearchComponent c : components ) {
            // the next stage is the minimum of what all components report
            nextStage = Math.min(nextStage, c.distributedProcess(rb));
          }

          // check the outgoing queue and send requests
          while (rb.outgoing.size() > 0) {

            // submit all current request tasks at once
            while (rb.outgoing.size() > 0) {
              ShardRequest sreq = rb.outgoing.remove(0);
              sreq.actualShards = sreq.shards;
              if (sreq.actualShards==ShardRequest.ALL_SHARDS) {
                sreq.actualShards = rb.shards;
              }
              sreq.responses = new ArrayList<>(sreq.actualShards.length); // presume we'll get a response from each shard we send to

              // TODO: map from shard to address[]
              for (String shard : sreq.actualShards) {
                ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
                params.remove(ShardParams.SHARDS);      // not a top-level request
                params.set(CommonParams.DISTRIB, "false");               // not a top-level request
                params.remove("indent");
                params.remove(CommonParams.HEADER_ECHO_PARAMS);
                params.set(ShardParams.IS_SHARD, true);  // a sub (shard) request
                params.set(ShardParams.SHARDS_PURPOSE, sreq.purpose);
                params.set(ShardParams.SHARD_URL, shard); // so the shard knows what was asked
                if (rb.requestInfo != null) {
                  // we could try and detect when this is needed, but it could be tricky
                  params.set("NOW", Long.toString(rb.requestInfo.getNOW().getTime()));
                }
                String shardQt = params.get(ShardParams.SHARDS_QT);
                if (shardQt != null) {
                  params.set(CommonParams.QT, shardQt);
                } else {
                  // for distributed queries that don't include shards.qt, use the original path
                  // as the default but operators need to update their luceneMatchVersion to enable
                  // this behavior since it did not work this way prior to 5.1
                  if (req.getCore().getSolrConfig().luceneMatchVersion.onOrAfter(Version.LUCENE_5_1_0)) {
                    String reqPath = (String) req.getContext().get(PATH);
                    if (!"/select".equals(reqPath)) {
                      params.set(CommonParams.QT, reqPath);
                    } // else if path is /select, then the qt gets passed thru if set
                  } else {
                    // this is the pre-5.1 behavior, which translates to sending the shard request to /select
                    params.remove(CommonParams.QT);
                  }
                }
                shardHandler1.submit(sreq, shard, params);
              }
            }

            // now wait for replies, but if anyone puts more requests on
            // the outgoing queue, send them out immediately (by exiting
            // this loop)
            boolean tolerant = rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false);
            while (rb.outgoing.size() == 0) {
              ShardResponse srsp = tolerant ?
                  shardHandler1.takeCompletedIncludingErrors():
                  shardHandler1.takeCompletedOrError();
              if (srsp == null) break;  // no more requests to wait for

              // Was there an exception?
              if (srsp.getException() != null) {
                // If things are not tolerant, abort everything and rethrow
                if(!tolerant) {
                  shardHandler1.cancelAll();
                  if (srsp.getException() instanceof SolrException) {
                    throw (SolrException)srsp.getException();
                  } else {
                    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());
                  }
                } else {
                  if(rsp.getResponseHeader().get("partialResults") == null) {
                    rsp.getResponseHeader().add("partialResults", Boolean.TRUE);
                  }
                }
              }

              rb.finished.add(srsp.getShardRequest());

              // let the components see the responses to the request
              for(SearchComponent c : components) {
                c.handleResponses(rb, srsp.getShardRequest());
              }
            }
          }

          for(SearchComponent c : components) {
            c.finishStage(rb);
          }

          // we are done when the next stage is MAX_VALUE
        } while (nextStage != Integer.MAX_VALUE);
      }

      // SOLR-5550: still provide shards.info if requested even for a short circuited distrib request
      if(!rb.isDistrib && req.getParams().getBool(ShardParams.SHARDS_INFO, false) && rb.shortCircuitedURL != null) {
        NamedList<Object> shardInfo = new SimpleOrderedMap<Object>();
        SimpleOrderedMap<Object> nl = new SimpleOrderedMap<Object>();
        if (rsp.getException() != null) {
          Throwable cause = rsp.getException();
          if (cause instanceof SolrServerException) {
            cause = ((SolrServerException)cause).getRootCause();
          } else {
            if (cause.getCause() != null) {
              cause = cause.getCause();
            }
          }
          nl.add("error", cause.toString() );
          StringWriter trace = new StringWriter();
          cause.printStackTrace(new PrintWriter(trace));
          nl.add("trace", trace.toString() );
        }
        else {
          nl.add("numFound", rb.getResults().docList.matches());
          nl.add("maxScore", rb.getResults().docList.maxScore());
        }
        nl.add("shardAddress", rb.shortCircuitedURL);
        nl.add("time", req.getRequestTimer().getTime()); // elapsed time of this request so far

        int pos = rb.shortCircuitedURL.indexOf("://");
        String shardInfoName = pos != -1 ? rb.shortCircuitedURL.substring(pos+3) : rb.shortCircuitedURL;
        shardInfo.add(shardInfoName, nl);
        rsp.getValues().add(ShardParams.SHARDS_INFO,shardInfo);
      }
    }
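In the distributed branch, each component reports the next stage it needs, the loop advances to the smallest reported stage, and it stops once every component reports the done marker (Integer.MAX_VALUE). The following is a minimal sketch of that loop alone, with shard requests and responses left out. StageLoopSketch and its Component interface are hypothetical, and the stage numbers in the usage note are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the stage loop used for distributed requests.
class StageLoopSketch {
    static final int STAGE_DONE = Integer.MAX_VALUE;

    interface Component {
        // Returns the next stage this component wants to run, or STAGE_DONE.
        int distributedProcess(int currentStage);
    }

    // Returns the stages that were visited, in order.
    static List<Integer> run(List<Component> components) {
        List<Integer> visited = new ArrayList<>();
        int nextStage = 0;
        do {
            int stage = nextStage;
            visited.add(stage);
            nextStage = STAGE_DONE;
            for (Component c : components) {
                // the next stage is the minimum of what all components report
                nextStage = Math.min(nextStage, c.distributedProcess(stage));
            }
        } while (nextStage != STAGE_DONE);
        return visited;
    }
}
```

With one component that needs stage 2000 and another that needs stage 3000, the loop visits stages 0, 2000 and 3000 and then stops, so no component's stage is skipped.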

1. Single node: QueryComponent processes the request:

    /**
     * Actually run the query
     */
    @Override
    public void process(ResponseBuilder rb) throws IOException
    {
      LOG.debug("process: {}", rb.req.getParams());

      SolrQueryRequest req = rb.req;
      SolrParams params = req.getParams();
      if (!params.getBool(COMPONENT_NAME, true)) {
        return;
      }
      SolrIndexSearcher searcher = req.getSearcher();
      StatsCache statsCache = req.getCore().getStatsCache();

      int purpose = params.getInt(ShardParams.SHARDS_PURPOSE, ShardRequest.PURPOSE_GET_TOP_IDS);
      if ((purpose & ShardRequest.PURPOSE_GET_TERM_STATS) != 0) {
        statsCache.returnLocalStats(rb, searcher);
        return;
      }
      // check if we need to update the local copy of global dfs
      if ((purpose & ShardRequest.PURPOSE_SET_TERM_STATS) != 0) {
        // retrieve from request and update local cache
        statsCache.receiveGlobalStats(req);
      }

      SolrQueryResponse rsp = rb.rsp;
      IndexSchema schema = searcher.getSchema();

      // Optional: This could also be implemented by the top-level searcher sending
      // a filter that lists the ids... that would be transparent to
      // the request handler, but would be more expensive (and would preserve score
      // too if desired).
      String ids = params.get(ShardParams.IDS);
      if (ids != null) {
        SchemaField idField = schema.getUniqueKeyField();
        List<String> idArr = StrUtils.splitSmart(ids, ",", true);
        int[] luceneIds = new int[idArr.size()];
        int docs = 0;
        for (int i=0; i<idArr.size(); i++) {
          int id = searcher.getFirstMatch(
              new Term(idField.getName(), idField.getType().toInternal(idArr.get(i))));
          if (id >= 0)
            luceneIds[docs++] = id;
        }

        DocListAndSet res = new DocListAndSet();
        res.docList = new DocSlice(0, docs, luceneIds, null, docs, 0);
        if (rb.isNeedDocSet()) {
          // TODO: create a cache for this!
          List<Query> queries = new ArrayList<>();
          queries.add(rb.getQuery());
          List<Query> filters = rb.getFilters();
          if (filters != null) queries.addAll(filters);
          res.docSet = searcher.getDocSet(queries);
        }
        rb.setResults(res);

        ResultContext ctx = new ResultContext();
        ctx.docs = rb.getResults().docList;
        ctx.query = null; // anything?
        rsp.add("response", ctx);
        return;
      }

      // -1 as flag if not set.
      long timeAllowed = params.getLong(CommonParams.TIME_ALLOWED, -1L);
      if (null != rb.getCursorMark() && 0 < timeAllowed) {
        // fundamentally incompatible
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Can not search using both " +
            CursorMarkParams.CURSOR_MARK_PARAM + " and " + CommonParams.TIME_ALLOWED);
      }

      SolrIndexSearcher.QueryCommand cmd = rb.getQueryCommand();
      cmd.setTimeAllowed(timeAllowed);

      req.getContext().put(SolrIndexSearcher.STATS_SOURCE, statsCache.get(req));

      SolrIndexSearcher.QueryResult result = new SolrIndexSearcher.QueryResult();

      //
      // grouping / field collapsing
      //
      GroupingSpecification groupingSpec = rb.getGroupingSpec();
      if (groupingSpec != null) {
        try {
          boolean needScores = (cmd.getFlags() & SolrIndexSearcher.GET_SCORES) != 0;
          if (params.getBool(GroupParams.GROUP_DISTRIBUTED_FIRST, false)) {
            CommandHandler.Builder topsGroupsActionBuilder = new CommandHandler.Builder()
                .setQueryCommand(cmd)
                .setNeedDocSet(false) // Order matters here
                .setIncludeHitCount(true)
                .setSearcher(searcher);

            for (String field : groupingSpec.getFields()) {
              topsGroupsActionBuilder.addCommandField(new SearchGroupsFieldCommand.Builder()
                  .setField(schema.getField(field))
                  .setGroupSort(groupingSpec.getGroupSort())
                  .setTopNGroups(cmd.getOffset() + cmd.getLen())
                  .setIncludeGroupCount(groupingSpec.isIncludeGroupCount())
                  .build());
            }

            CommandHandler commandHandler = topsGroupsActionBuilder.build();
            commandHandler.execute();
            SearchGroupsResultTransformer serializer = new SearchGroupsResultTransformer(searcher);
            rsp.add("firstPhase", commandHandler.processResult(result, serializer));
            rsp.add("totalHitCount", commandHandler.getTotalHitCount());
            rb.setResult(result);
            return;
          } else if (params.getBool(GroupParams.GROUP_DISTRIBUTED_SECOND, false)) {
            CommandHandler.Builder secondPhaseBuilder = new CommandHandler.Builder()
                .setQueryCommand(cmd)
                .setTruncateGroups(groupingSpec.isTruncateGroups() && groupingSpec.getFields().length > 0)
                .setSearcher(searcher);

            for (String field : groupingSpec.getFields()) {
              SchemaField schemaField = schema.getField(field);
              String[] topGroupsParam = params.getParams(GroupParams.GROUP_DISTRIBUTED_TOPGROUPS_PREFIX + field);
              if (topGroupsParam == null) {
                topGroupsParam = new String[0];
              }

              List<SearchGroup<BytesRef>> topGroups = new ArrayList<>(topGroupsParam.length);
              for (String topGroup : topGroupsParam) {
                SearchGroup<BytesRef> searchGroup = new SearchGroup<>();
                if (!topGroup.equals(TopGroupsShardRequestFactory.GROUP_NULL_VALUE)) {
                  searchGroup.groupValue = new BytesRef(schemaField.getType().readableToIndexed(topGroup));
                }
                topGroups.add(searchGroup);
              }

              secondPhaseBuilder.addCommandField(new TopGroupsFieldCommand.Builder()
                  .setField(schemaField)
                  .setGroupSort(groupingSpec.getGroupSort())
                  .setSortWithinGroup(groupingSpec.getSortWithinGroup())
                  .setFirstPhaseGroups(topGroups)
                  .setMaxDocPerGroup(groupingSpec.getGroupOffset() + groupingSpec.getGroupLimit())
                  .setNeedScores(needScores)
                  .setNeedMaxScore(needScores)
                  .build());
            }

            for (String query : groupingSpec.getQueries()) {
              secondPhaseBuilder.addCommandField(new QueryCommand.Builder()
                  .setDocsToCollect(groupingSpec.getOffset() + groupingSpec.getLimit())
                  .setSort(groupingSpec.getGroupSort())
                  .setQuery(query, rb.req)
                  .setDocSet(searcher)
                  .build());
            }

            CommandHandler commandHandler = secondPhaseBuilder.build();
            commandHandler.execute();
            TopGroupsResultTransformer serializer = new TopGroupsResultTransformer(rb);
            rsp.add("secondPhase", commandHandler.processResult(result, serializer));
            rb.setResult(result);
            return;
          }

          int maxDocsPercentageToCache = params.getInt(GroupParams.GROUP_CACHE_PERCENTAGE, 0);
          boolean cacheSecondPassSearch = maxDocsPercentageToCache >= 1 && maxDocsPercentageToCache <= 100;
          Grouping.TotalCount defaultTotalCount = groupingSpec.isIncludeGroupCount() ?
              Grouping.TotalCount.grouped : Grouping.TotalCount.ungrouped;
          int limitDefault = cmd.getLen(); // this is normally from "rows"
          Grouping grouping =
              new Grouping(searcher, result, cmd, cacheSecondPassSearch, maxDocsPercentageToCache, groupingSpec.isMain());
          grouping.setSort(groupingSpec.getGroupSort())
              .setGroupSort(groupingSpec.getSortWithinGroup())
              .setDefaultFormat(groupingSpec.getResponseFormat())
              .setLimitDefault(limitDefault)
              .setDefaultTotalCount(defaultTotalCount)
              .setDocsPerGroupDefault(groupingSpec.getGroupLimit())
              .setGroupOffsetDefault(groupingSpec.getGroupOffset())
              .setGetGroupedDocSet(groupingSpec.isTruncateGroups());

          if (groupingSpec.getFields() != null) {
            for (String field : groupingSpec.getFields()) {
              grouping.addFieldCommand(field, rb.req);
            }
          }

          if (groupingSpec.getFunctions() != null) {
            for (String groupByStr : groupingSpec.getFunctions()) {
              grouping.addFunctionCommand(groupByStr, rb.req);
            }
          }

          if (groupingSpec.getQueries() != null) {
            for (String groupByStr : groupingSpec.getQueries()) {
              grouping.addQueryCommand(groupByStr, rb.req);
            }
          }

          if (rb.doHighlights || rb.isDebug() || params.getBool(MoreLikeThisParams.MLT, false)) {
            // we need a single list of the returned docs
            cmd.setFlags(SolrIndexSearcher.GET_DOCLIST);
          }

          grouping.execute();
          if (grouping.isSignalCacheWarning()) {
            rsp.add(
                "cacheWarning",
                String.format(Locale.ROOT, "Cache limit of %d percent relative to maxdoc has exceeded. Please increase cache size or disable caching.", maxDocsPercentageToCache));
          }
          rb.setResult(result);

          if (grouping.mainResult != null) {
            ResultContext ctx = new ResultContext();
            ctx.docs = grouping.mainResult;
            ctx.query = null; // TODO? add the query?
            rsp.add("response", ctx);
            rsp.getToLog().add("hits", grouping.mainResult.matches());
          } else if (!grouping.getCommands().isEmpty()) { // Can never be empty since grouping.execute() checks for this.
            rsp.add("grouped", result.groupedResults);
            rsp.getToLog().add("hits", grouping.getCommands().get(0).getMatches());
          }
          return;
        } catch (SyntaxError e) {
          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
        }
      }

      // normal search result
      searcher.search(result, cmd);
      rb.setResult(result);

      ResultContext ctx = new ResultContext();
      ctx.docs = rb.getResults().docList;
      ctx.query = rb.getQuery();
      rsp.add("response", ctx);
      rsp.getToLog().add("hits", rb.getResults().docList.matches());

      if ( ! rb.req.getParams().getBool(ShardParams.IS_SHARD,false) ) {
        if (null != rb.getNextCursorMark()) {
          rb.rsp.add(CursorMarkParams.CURSOR_MARK_NEXT,
              rb.getNextCursorMark().getSerializedTotem());
        }
      }

      if(rb.mergeFieldHandler != null) {
        rb.mergeFieldHandler.handleMergeFields(rb, searcher);
      } else {
        doFieldSortValues(rb, searcher);
      }

      doPrefetch(rb);
    }

This calls SolrIndexSearcher, which adds schema awareness and caching functionality on top of the Lucene IndexSearcher:

    public QueryResult search(QueryResult qr, QueryCommand cmd) throws IOException {
      getDocListC(qr,cmd);
      return qr;
    }

On a cache miss, getDocListC falls through to the uncached path, getDocListNC:

    private void getDocListNC(QueryResult qr,QueryCommand cmd) throws IOException {
      int len = cmd.getSupersetMaxDoc();
      int last = len;
      if (last < 0 || last > maxDoc()) last=maxDoc();
      final int lastDocRequested = last;
      int nDocsReturned;
      int totalHits;
      float maxScore;
      int[] ids;
      float[] scores;

      boolean needScores = (cmd.getFlags() & GET_SCORES) != 0;

      Query query = QueryUtils.makeQueryable(cmd.getQuery());

      ProcessedFilter pf = getProcessedFilter(cmd.getFilter(), cmd.getFilterList());
      if (pf.filter != null) {
        query = new FilteredQuery(query, pf.filter);
      }

      // handle zero case...
      if (lastDocRequested<=0) {
        final float[] topscore = new float[] { Float.NEGATIVE_INFINITY };
        final int[] numHits = new int[1];

        Collector collector;

        if (!needScores) {
          collector = new SimpleCollector () {
            @Override
            public void collect(int doc) {
              numHits[0]++;
            }

            @Override
            public boolean needsScores() {
              return false;
            }
          };
        } else {
          collector = new SimpleCollector() {
            Scorer scorer;

            @Override
            public void setScorer(Scorer scorer) {
              this.scorer = scorer;
            }

            @Override
            public void collect(int doc) throws IOException {
              numHits[0]++;
              float score = scorer.score();
              if (score > topscore[0]) topscore[0]=score;
            }

            @Override
            public boolean needsScores() {
              return true;
            }
          };
        }

        buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter);

        nDocsReturned=0;
        ids = new int[nDocsReturned];
        scores = new float[nDocsReturned];
        totalHits = numHits[0];
        maxScore = totalHits>0 ? topscore[0] : 0.0f;
        // no docs on this page, so cursor doesn't change
        qr.setNextCursorMark(cmd.getCursorMark());
      } else {
        final TopDocsCollector topCollector = buildTopDocsCollector(len, cmd);
        Collector collector = topCollector;
        buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter);

        totalHits = topCollector.getTotalHits();
        TopDocs topDocs = topCollector.topDocs(0, len);
        populateNextCursorMarkFromTopDocs(qr, cmd, topDocs);

        maxScore = totalHits>0 ? topDocs.getMaxScore() : 0.0f;
        nDocsReturned = topDocs.scoreDocs.length;
        ids = new int[nDocsReturned];
        scores = (cmd.getFlags()&GET_SCORES)!=0 ? new float[nDocsReturned] : null;
        for (int i=0; i<nDocsReturned; i++) {
          ScoreDoc scoreDoc = topDocs.scoreDocs[i];
          ids[i] = scoreDoc.doc;
          if (scores != null) scores[i] = scoreDoc.score;
        }
      }

      int sliceLen = Math.min(lastDocRequested,nDocsReturned);
      if (sliceLen < 0) sliceLen=0;
      qr.setDocList(new DocSlice(0,sliceLen,ids,scores,totalHits,maxScore));
    }
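The relationship between a cached entry point and an uncached search can be illustrated with a small cache-in-front wrapper. This is a rough sketch only, assuming that getDocListC consults a query result cache and calls getDocListNC when nothing is cached. CachedSearchSketch, its string query key and the int[] result are hypothetical simplifications, not Solr's actual cache types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: a cache placed in front of an uncached search function.
class CachedSearchSketch {
    private final Map<String, int[]> cache = new HashMap<>();
    private final Function<String, int[]> uncachedSearch;
    int misses = 0; // number of times the uncached path was taken

    CachedSearchSketch(Function<String, int[]> uncachedSearch) {
        this.uncachedSearch = uncachedSearch;
    }

    int[] search(String queryKey) {
        int[] cached = cache.get(queryKey);
        if (cached != null) {
            return cached;                           // cache hit: skip the real search
        }
        misses++;
        int[] ids = uncachedSearch.apply(queryKey);  // cache miss: run the real search
        cache.put(queryKey, ids);
        return ids;
    }
}
```

Running the same query key twice invokes the uncached function once, and the second call returns the stored array.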

2. Cluster (distributed) search

HttpShardHandler is invoked:

    @Override
    public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
      // do this outside of the callable for thread safety reasons
      final List<String> urls = getURLs(sreq, shard);

      Callable<ShardResponse> task = new Callable<ShardResponse>() {
        @Override
        public ShardResponse call() throws Exception {

          ShardResponse srsp = new ShardResponse();
          if (sreq.nodeName != null) {
            srsp.setNodeName(sreq.nodeName);
          }
          srsp.setShardRequest(sreq);
          srsp.setShard(shard);
          SimpleSolrResponse ssr = new SimpleSolrResponse();
          srsp.setSolrResponse(ssr);
          long startTime = System.nanoTime();

          try {
            params.remove(CommonParams.WT); // use default (currently javabin)
            params.remove(CommonParams.VERSION);

            QueryRequest req = makeQueryRequest(sreq, params, shard);
            req.setMethod(SolrRequest.METHOD.POST);

            // no need to set the response parser as binary is the default
            // req.setResponseParser(new BinaryResponseParser());

            // if there are no shards available for a slice, urls.size()==0
            if (urls.size()==0) {
              // TODO: what's the right error code here? We should use the same thing when
              // all of the servers for a shard are down.
              throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
            }

            if (urls.size() <= 1) {
              String url = urls.get(0);
              srsp.setShardAddress(url);
              try (SolrClient client = new HttpSolrClient(url, httpClient)) {
                ssr.nl = client.request(req);
              }
            } else {
              LBHttpSolrClient.Rsp rsp = httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);
              ssr.nl = rsp.getResponse();
              srsp.setShardAddress(rsp.getServer());
            }
          }
          catch( ConnectException cex ) {
            srsp.setException(cex); //????
          } catch (Exception th) {
            srsp.setException(th);
            if (th instanceof SolrException) {
              srsp.setResponseCode(((SolrException)th).code());
            } else {
              srsp.setResponseCode(-1);
            }
          }

          ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

          return transfomResponse(sreq, srsp, shard);
        }
      };

      try {
        if (shard != null)  {
          MDC.put("ShardRequest.shards", shard);
        }
        if (urls != null && !urls.isEmpty())  {
          MDC.put("ShardRequest.urlList", urls.toString());
        }
        pending.add( completionService.submit(task) );
      } finally {
        MDC.remove("ShardRequest.shards");
        MDC.remove("ShardRequest.urlList");
      }
    }

When a shard has more than one URL, the request goes through LBHttpSolrClient.

The class Javadoc describes what it does:

    /**
     * LBHttpSolrClient or "LoadBalanced HttpSolrClient" is a load balancing wrapper around
     * {@link HttpSolrClient}. This is useful when you
     * have multiple Solr servers and the requests need to be Load Balanced among them.
     *
     * Do <b>NOT</b> use this class for indexing in master/slave scenarios since documents must be sent to the
     * correct master; no inter-node routing is done.
     *
     * In SolrCloud (leader/replica) scenarios, it is usually better to use
     * {@link CloudSolrClient}, but this class may be used
     * for updates because the server will forward them to the appropriate leader.
     *
     * <p>
     * It offers automatic failover when a server goes down and it detects when the server comes back up.
     * <p>
     * Load balancing is done using a simple round-robin on the list of servers.
     * <p>
     * If a request to a server fails by an IOException due to a connection timeout or read timeout then the host is taken
     * off the list of live servers and moved to a 'dead server list' and the request is resent to the next live server.
     * This process is continued till it tries all the live servers. If at least one server is alive, the request succeeds,
     * and if not it fails.
     * <blockquote><pre>
     * SolrClient lbHttpSolrClient = new LBHttpSolrClient("http://host1:8080/solr/", "http://host2:8080/solr", "http://host2:8080/solr");
     * //or if you wish to pass the HttpClient do as follows
     * HttpClient httpClient = new HttpClient();
     * SolrClient lbHttpSolrClient = new LBHttpSolrClient(httpClient, "http://host1:8080/solr/", "http://host2:8080/solr", "http://host2:8080/solr");
     * </pre></blockquote>
     * This detects if a dead server comes alive automatically. The check is done in fixed intervals in a dedicated thread.
     * This interval can be set using {@link #setAliveCheckInterval} , the default is set to one minute.
     * <p>
     * <b>When to use this?</b><br> This can be used as a software load balancer when you do not wish to setup an external
     * load balancer. Alternatives to this code are to use
     * a dedicated hardware load balancer or using Apache httpd with mod_proxy_balancer as a load balancer. See <a
     * href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load balancing on Wikipedia</a>
     *
     * @since solr 1.4
     */

The method that performs the call is:

    /**
     * Makes a request to one or more of the given urls, using the configured load balancer.
     *
     * @param req The solr search request that should be sent through the load balancer
     * @param urls The list of solr server urls to load balance across
     * @return The response from the request
     */
    public LBHttpSolrClient.Rsp makeLoadBalancedRequest(final QueryRequest req, List<String> urls)
        throws SolrServerException, IOException {
      return loadbalancer.request(new LBHttpSolrClient.Req(req, urls));
    }

It calls request to process the request:

    /**
     * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
     * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
     * time, or until a test request on that server succeeds.
     *
     * Servers are queried in the exact order given (except servers currently in the dead pool are skipped).
     * If no live servers from the provided list remain to be tried, a number of previously skipped dead servers will be tried.
     * Req.getNumDeadServersToTry() controls how many dead servers will be tried.
     *
     * If no live servers are found a SolrServerException is thrown.
     *
     * @param req contains both the request as well as the list of servers to query
     *
     * @return the result of the request
     *
     * @throws IOException If there is a low-level I/O error.
     */
    public Rsp request(Req req) throws SolrServerException, IOException {
      Rsp rsp = new Rsp();
      Exception ex = null;
      boolean isUpdate = req.request instanceof IsUpdateRequest;
      List<ServerWrapper> skipped = null;

      long timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
      long timeOutTime = System.nanoTime() + timeAllowedNano;
      for (String serverStr : req.getServers()) {
        if(isTimeExceeded(timeAllowedNano, timeOutTime)) {
          break;
        }

        serverStr = normalize(serverStr);
        // if the server is currently a zombie, just skip to the next one
        ServerWrapper wrapper = zombieServers.get(serverStr);
        if (wrapper != null) {
          // System.out.println("ZOMBIE SERVER QUERIED: " + serverStr);
          final int numDeadServersToTry = req.getNumDeadServersToTry();
          if (numDeadServersToTry > 0) {
            if (skipped == null) {
              skipped = new ArrayList<>(numDeadServersToTry);
              skipped.add(wrapper);
            }
            else if (skipped.size() < numDeadServersToTry) {
              skipped.add(wrapper);
            }
          }
          continue;
        }
        rsp.server = serverStr;
        try {
          MDC.put("LBHttpSolrClient.url", serverStr);
          HttpSolrClient client = makeSolrClient(serverStr);

          ex = doRequest(client, req, rsp, isUpdate, false, null);
          if (ex == null) {
            return rsp; // SUCCESS
          }
        } finally {
          MDC.remove("LBHttpSolrClient.url");
        }
      }

      // try the servers we previously skipped
      if (skipped != null) {
        for (ServerWrapper wrapper : skipped) {
          if(isTimeExceeded(timeAllowedNano, timeOutTime)) {
            break;
          }

          ex = doRequest(wrapper.client, req, rsp, isUpdate, true, wrapper.getKey());
          if (ex == null) {
            return rsp; // SUCCESS
          }
        }
      }

      if (ex == null) {
        throw new SolrServerException("No live SolrServers available to handle this request");
      } else {
        throw new SolrServerException("No live SolrServers available to handle this request:" + zombieServers.keySet(), ex);
      }
    }
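The failover order in request can be condensed to: try the listed servers in order, skip the ones in the dead pool, then give a limited number of skipped dead servers one more chance. The following is a minimal sketch of that order. FailoverSketch, its zombies set and the send function are hypothetical. Unlike the real method, it treats any RuntimeException as a server failure and leaves out the time budget and the background alive check.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch of "live servers first, then a few dead ones" failover.
class FailoverSketch {
    final Set<String> zombies = new HashSet<>(); // servers currently considered dead

    String request(List<String> servers, int numDeadToTry, Function<String, String> send) {
        List<String> skipped = new ArrayList<>();
        RuntimeException last = null;
        for (String server : servers) {
            if (zombies.contains(server)) {
                // remember up to numDeadToTry dead servers for a last-resort attempt
                if (skipped.size() < numDeadToTry) {
                    skipped.add(server);
                }
                continue;
            }
            try {
                return send.apply(server);       // success on a live server
            } catch (RuntimeException e) {
                last = e;
                zombies.add(server);             // move the failed server to the dead pool
            }
        }
        for (String server : skipped) {
            try {
                String response = send.apply(server);
                zombies.remove(server);          // the server answered, so it is alive again
                return response;
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw new IllegalStateException("No live servers available to handle this request", last);
    }
}
```

If server a fails on the first request, the response comes from b and a lands in the dead pool. If b then fails on the next request, a is retried as a skipped dead server and returns to the live set once it answers.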

Reposted from: https://www.cnblogs.com/davidwang456/p/4988775.html
