zoukankan      html  css  js  c++  java
  • solr源码解读(转)

    solr源码解读(转)原文地址:http://blog.csdn.net/duck_genuine/article/details/6962624

    配置

    solr 对一个搜索请求的的流程

    在solrconfig.xml会配置一个handler。配置了前置处理组件preParams,还有后置处理组件filterResult,当然还有默认的组件

    [html] view plaincopy
     
    1. <requestHandler name="standard" class="solr.SearchHandler" default="true">  
    2.   
    3.      <arr name="first-components">  
    4.         <str>preParams</str>  
    5.      </arr>  
    6.         <lst name="defaults">  
    7.           <str name="echoParams">explicit</str>  
    8.           <int name="rows">10</int>  
    9.           <int name="start">0</int>  
    10.          <str name="q">*:*</str>  
    11.         </lst>      
    12.   
    13.      <arr name="last-components">  
    14.         <str>filterResult</str>  
    15.      </arr>       
    16.   
    17.    </requestHandler>  



    http请求控制器

    当一个查询请求过来的时候,先到类SolrDispatchFilter,由这个分发器寻找对应的handler来处理。
     

    [java] view plaincopy
     
    1. String qt = solrReq.getParams().get( CommonParams.QT );  
    2. handler = core.getRequestHandler( qt );  



    ---------------------------------------------------------------------------------------------------

    [java] view plaincopy
     
    1. this.execute( req, handler, solrReq, solrRsp );  
    2. HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);  



    -----------------------------------------------------------------------------------------------

    从上面的代码里看出是由solrCore留下的接口来处理请求。从代码框架上,从此刻开始进入solr的核心代码。

    [java] view plaincopy
     
    1. protected void execute( HttpServletRequest req, SolrRequestHandler handler, SolrQueryRequest sreq, SolrQueryResponse rsp) {  
    2.   sreq.getContext().put( "webapp", req.getContextPath() );  
    3.   sreq.getCore().execute( handler, sreq, rsp );  
    4. }  



    看一下solrCore代码execute的方法 的主要代码

    [java] view plaincopy
     
    1. public void execute(SolrRequestHandler handler, SolrQueryRequest req, SolrQueryResponse rsp) {  
    2. 。。。。。  
    3.     handler.handleRequest(req,rsp);  
    4.     setResponseHeaderValues(handler,req,rsp);  
    5.  。。。。。。。  
    6.   }  



    主要实现对请求的处理,并将请求结果的状态信息写到响应的头部

    SolrRequestHandler 处理器

    再看一下对请求的处理。。先看定义该请求处理器的接口,可以更好理解。只有两个方法,一个是初始化信息,主要是配置时的默认参数,另一个就是处理请求的接口。

    [java] view plaincopy
     
    1. public interface SolrRequestHandler extends SolrInfoMBean {  
    2.   public void init(NamedList args);  
    3.   public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp);  
    4. }  



    先看一下实现该接口的类RequestHandlerBase

    [java] view plaincopy
     
    1. public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {  
    2.     numRequests++;  
    3.     try {  
    4.       SolrPluginUtils.setDefaults(req,defaults,appends,invariants);  
    5.       rsp.setHttpCaching(httpCaching);  
    6.       handleRequestBody( req, rsp );  
    7.       // count timeouts  
    8.       NamedList header = rsp.getResponseHeader();  
    9.       if(header != null) {  
    10.         Object partialResults = header.get("partialResults");  
    11.         boolean timedOut = partialResults == null ? false : (Boolean)partialResults;  
    12.         if( timedOut ) {  
    13.           numTimeouts++;  
    14.           rsp.setHttpCaching(false);  
    15.         }  
    16.       }  
    17.     } catch (Exception e) {  
    18.       SolrException.log(SolrCore.log,e);  
    19.       if (e instanceof ParseException) {  
    20.         e = new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);  
    21.       }  
    22.       rsp.setException(e);  
    23.       numErrors++;  
    24.     }  
    25.     totalTime += rsp.getEndTime() - req.getStartTime();  
    26.   }  



    主要记录该请求处理的状态与处理时间记录。真正的实现方法交由各个子类      handleRequestBody( req, rsp );

    现在看一下SearchHandler对于搜索处理的实现方法

    首先是将solrconfig.xml上配置的各个处理组件按一定顺序组装起来,先是first-Component,默认的component,last-component.这些处理组件会按照它们的顺序来执行,以下是searchHandler的实现主体。方法handleRequestBody

    [java] view plaincopy
     
    1. @Override  
    2. public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception, ParseException, InstantiationException, IllegalAccessException  
    3. {  
    4.   // int sleep = req.getParams().getInt("sleep",0);  
    5.   // if (sleep > 0) {log.error("SLEEPING for " + sleep);  Thread.sleep(sleep);}  
    6.   ResponseBuilder rb = new ResponseBuilder();  
    7.   rb.req = req;  
    8.   rb.rsp = rsp;  
    9.   rb.components = components;  
    10.   rb.setDebug(req.getParams().getBool(CommonParams.DEBUG_QUERY, false));  
    11.   
    12.   final RTimer timer = rb.isDebug() ? new RTimer() : null;  
    13.   
    14.   if (timer == null) {  
    15.     // non-debugging prepare phase  
    16.     for( SearchComponent c : components ) {  
    17.       c.prepare(rb);  
    18.     }  
    19.   } else {  
    20.     // debugging prepare phase  
    21.     RTimer subt = timer.sub( "prepare" );  
    22.     for( SearchComponent c : components ) {  
    23.       rb.setTimer( subt.sub( c.getName() ) );  
    24.       c.prepare(rb);  
    25.       rb.getTimer().stop();  
    26.     }  
    27.     subt.stop()<span style="color:#FF0000;">;</span>  
    28.   }  
    29.    //单机版  
    30.   if (rb.shards == null) {  
    31.     // a normal non-distributed request  
    32.   
    33.     // The semantics of debugging vs not debugging are different enough that  
    34.     // it makes sense to have two control loops  
    35.     if(!rb.isDebug()) {  
    36.       // Process  
    37.       for( SearchComponent c : components ) {  
    38.         c.process(rb);  
    39.       }  
    40.     }  
    41.     else {  
    42.       // Process  
    43.       RTimer subt = timer.sub( "process" );  
    44.       for( SearchComponent c : components ) {  
    45.         rb.setTimer( subt.sub( c.getName() ) );  
    46.         c.process(rb);  
    47.         rb.getTimer().stop();  
    48.       }  
    49.       subt.stop();  
    50.       timer.stop();  
    51.   
    52.       // add the timing info  
    53.       if( rb.getDebugInfo() == null ) {  
    54.         rb.setDebugInfo( new SimpleOrderedMap<Object>() );  
    55.       }  
    56.       rb.getDebugInfo().add( "timing", timer.asNamedList() );  
    57.     }  
    58.   
    59.   } else {//分布式请求  
    60.     // a distributed request  
    61.   
    62.     HttpCommComponent comm = new HttpCommComponent();  
    63.   
    64.     if (rb.outgoing == null) {  
    65.       rb.outgoing = new LinkedList<ShardRequest>();  
    66.     }  
    67.     rb.finished = new ArrayList<ShardRequest>();  
    68.   
    69.     //起始状态为0,结束状态为整数的最大值  
    70.     int nextStage = 0;  
    71.     do {  
    72.       rb.stage = nextStage;  
    73.       nextStage = ResponseBuilder.STAGE_DONE;  
    74.   
    75.       // call all components  
    76.       for( SearchComponent c : components ) {  
    77.         //得到所有组件运行后返回的下一个状态,并取最小值  
    78.         nextStage = Math.min(nextStage, c.distributedProcess(rb));  
    79.       }  
    80.   
    81.   
    82.       // 如果有需要向子机发送请求  
    83.       while (rb.outgoing.size() > 0) {  
    84.   
    85.         // submit all current request tasks at once  
    86.         while (rb.outgoing.size() > 0) {  
    87.           ShardRequest sreq = rb.outgoing.remove(0);  
    88.           sreq.actualShards = sreq.shards;  
    89.           if (sreq.actualShards==ShardRequest.ALL_SHARDS) {  
    90.             sreq.actualShards = rb.shards;  
    91.           }  
    92.           sreq.responses = new ArrayList<ShardResponse>();  
    93.   
    94.           // 向各个子机发送请求  
    95.           for (String shard : sreq.actualShards) {  
    96.             ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);  
    97.             params.remove(ShardParams.SHARDS);      // not a top-level request  
    98.             params.remove("indent");  
    99.             params.remove(CommonParams.HEADER_ECHO_PARAMS);  
    100.             params.set(ShardParams.IS_SHARD, true);  // a sub (shard) request  
    101.             String shardHandler = req.getParams().get(ShardParams.SHARDS_QT);  
    102.             if (shardHandler == null) {  
    103.               params.remove(CommonParams.QT);  
    104.             } else {  
    105.               params.set(CommonParams.QT, shardHandler);  
    106.             }  
    107.           //提交子请求  
    108.            comm.submit(sreq, shard, params);  
    109.           }  
    110.         }  
    111.   
    112.   
    113.         // now wait for replies, but if anyone puts more requests on  
    114.         // the outgoing queue, send them out immediately (by exiting  
    115.         // this loop)  
    116.         while (rb.outgoing.size() == 0) {  
    117.           ShardResponse srsp = comm.takeCompletedOrError();  
    118.           if (srsp == null) break;  // no more requests to wait for  
    119.   
    120.           // Was there an exception?  If so, abort everything and  
    121.           // rethrow  
    122.           if (srsp.getException() != null) {  
    123.             comm.cancelAll();  
    124.             if (srsp.getException() instanceof SolrException) {  
    125.               throw (SolrException)srsp.getException();  
    126.             } else {  
    127.               throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());  
    128.             }  
    129.           }  
    130.   
    131.           rb.finished.add(srsp.getShardRequest());  
    132.   
    133.           //每个组件都对于返回的数据处理  
    134.           for(SearchComponent c : components) {  
    135.             c.handleResponses(rb, srsp.getShardRequest());  
    136.           }  
    137.         }  
    138.       }//请求队列结束  
    139.   
    140.       //再对该轮请求进行收尾工作  
    141.       for(SearchComponent c : components) {  
    142.           c.finishStage(rb);  
    143.        }  
    144.   
    145.       //如果状态未到结束,则继续循环  
    146.     } while (nextStage != Integer.MAX_VALUE);  
    147.   }  
    148. }  

    首先运行的是各个组件的方法prepare

    [java] view plaincopy
     
    1. for( SearchComponent c : components ) {  
    2.   c.prepare(rb);  
    3. }  



    再则如果不是分布式搜索,则比较简单的运行

    [java] view plaincopy
     
    1. for( SearchComponent c : components ) {  
    2.         c.process(rb);  
    3.       }  



    就结束!

    如果是分布式搜索,过程会比较复杂些,对于每个组件处理都会返回一个状态,对于以下几个方法循环执行,直到状态结束 。  

    在类ResponseBuilder定义了几个状态。

      

    [java] view plaincopy
     
    1. public static int STAGE_START           = 0;  
    2. public static int STAGE_PARSE_QUERY     = 1000;  
    3. public static int STAGE_EXECUTE_QUERY   = 2000;  
    4. public static int STAGE_GET_FIELDS      = 3000;  
    5. public static int STAGE_DONE            = Integer.MAX_VALUE;  

    从STAGE_START---->STAGE_PARSE_QUERY------>STAGE_EXECUTE_QUERY--------------->STAGE_GET_FIELDS------------>STAGE_DONE

    从这些状态名称可以猜得出整个对应的过程。

    每个组件先调用方法distributeProcess,并返回下一个状态

    [java] view plaincopy
     
    1. for( SearchComponent c : components ) {  
    2.      // the next stage is the minimum of what all components report  
    3.      nextStage = Math.min(nextStage, c.distributedProcess(rb));  
    4.    }  

    而方法handleResponse主要处理返回来的数据

         

    [java] view plaincopy
     
    1. for(SearchComponent c : components) {  
    2.         c.handleResponses(rb, srsp.getShardRequest());  
    3.       }  


    然后交由finishStage方法来对每一个状态的过程作结束动作。

    ------------------------------

    [java] view plaincopy
     
    1. for(SearchComponent c : components) {  
    2.           c.finishStage(rb);  
    3.        }  



    -----------------------------

    了解这个流程有助于扩展solr。比如有个业务是要我对搜索的自然结果排序进行干预,而这个干预只针对前几页结果,所以我不得不做个组件来对其中结果进行处理。

    所以我想可以添加一个组件放在最后-------------》

    1)如果是分布式搜索:

           这个组件可以在重写finsihStage做处理。算是对最终结果的排序处理即可。

    2)如果只是单机:

          这个组件可以在重写process做处理

    组件

    现在看一下其中一个主要的组件QueryComponent

    prepare

    对于QueryComponent主要解析用户传送的语法解析参数defType,以及过滤查询fq,返回字段集fl.排序字段Sort

    单机处理

    process

       分布式搜索过程中的某一步,这里应该是主机要合并文档,取出对应的文档的过程,

    主机发出指定的solr主键ids来取文档集,首先取出对应的lucene的内部id集。如果某些文档已不在则弃掉。

    [java] view plaincopy
     
    1. String ids = params.get(ShardParams.IDS);  
    2.     if (ids != null) {//将传过来的ids,放进结果集中,并在后面取出对应的结果文档  
    3.      SchemaField idField = req.getSchema().getUniqueKeyField();  
    4.       List<String> idArr = StrUtils.splitSmart(ids, ",", true);  
    5.       int[] luceneIds = new int[idArr.size()];  
    6.       int docs = 0;  
    7.       for (int i=0; i<idArr.size(); i++) {  
    8.       //solr主键id对应的文档lucene内部的id  
    9.        int id = req.getSearcher().getFirstMatch(  
    10.                 new Term(idField.getName(), idField.getType().toInternal(idArr.get(i))));  
    11.         if (id >= 0)  
    12.           luceneIds[docs++] = id;  
    13.       }  
    14.        
    15.       DocListAndSet res = new DocListAndSet();  
    16.   
    17.       //这里并没有传入scores[]  
    18.   
    19.   res.docList = new DocSlice(0, docs, luceneIds, null, docs, 0);  
    20. //需要另一种doc集合处理。  
    21.  if (rb.isNeedDocSet()) {  
    22.  List<Query> queries = new ArrayList<Query>();  
    23.   queries.add(rb.getQuery());  
    24. List<Query> filters = rb.getFilters();   
    25. if (filters != null)  
    26.  queries.addAll(filters);  
    27.   res.docSet = searcher.getDocSet(queries);  
    28.  }   
    29. rb.setResults(res);  
    30.  rsp.add("response",rb.getResults().docList);  
    31.  return;   
    32. }  
    [java] view plaincopy
     
      1. <pre name="code" class="java">  //封装搜索值对象与封装结果值对象   
      2.    SolrIndexSearcher.QueryCommand cmd = rb.getQueryCommand();  
      3.    //设置超时最大值  
      4.     cmd.setTimeAllowed(timeAllowed);  
      5.     SolrIndexSearcher.QueryResult result = new SolrIndexSearcher.QueryResult();  
      6.     //搜索  
      7.     searcher.search(result,cmd);  
      8.     //设置搜索结果  
      9.     rb.setResult( result );  
      10.     rsp.add("response",rb.getResults().docList);  
      11.     rsp.getToLog().add("hits", rb.getResults().docList.matches());  
      12.     //对含有字段排序处理  
      13.     doFieldSortValues(rb, searcher);  
      14.    //非分布查询过程,且搜索结果数小于50,进行缓存  
      15.     doPrefetch(rb);  
      16.   
      17.   
      18. <pre name="code" class="java"><p>目前看到真实获取文档内容的是在</p><p>QueryResponseWriter</p><p>例如xml的输出格式类XMLWriter</p></pre><p></p>  
      19. <pre></pre>  
      20. <pre></pre>  
      21. <br>  
      22. <p></p>  
      23. <h2><a name="t10"></a>分布式处理<br>  
      24. </h2>  
      25. <h3><a name="t11"></a>1)distributedProcess</h3>  
      26. <p></p><pre name="code" class="java">  @Override    
      27.   public int distributedProcess(ResponseBuilder rb) throws IOException {  
      28.     if (rb.stage < ResponseBuilder.STAGE_PARSE_QUERY)  
      29.       return ResponseBuilder.STAGE_PARSE_QUERY;  
      30.     if (rb.stage == ResponseBuilder.STAGE_PARSE_QUERY) {  
      31.       createDistributedIdf(rb);  
      32.       return ResponseBuilder.STAGE_EXECUTE_QUERY;  
      33.     }  
      34.     if (rb.stage < ResponseBuilder.STAGE_EXECUTE_QUERY) return ResponseBuilder.STAGE_EXECUTE_QUERY;  
      35.     if (rb.stage == ResponseBuilder.STAGE_EXECUTE_QUERY) {  
      36. //分布式查询  
      37.      createMainQuery(rb);  
      38.       return ResponseBuilder.STAGE_GET_FIELDS;  
      39.     }  
      40.     if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS) return ResponseBuilder.STAGE_GET_FIELDS;  
      41.     if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {  
      42.    
      43.     //这里就会去对应的主机拿取需要的字段,封装请求字段的参数,放进请求队列里,可以由外部的searchHandler提交该请求,最后结果放在ShardResponse类里。  
      44.      createRetrieveDocs(rb);  
      45.       return ResponseBuilder.STAGE_DONE;  
      46.     }  
      47.     return ResponseBuilder.STAGE_DONE;  
      48.   }</pre><br>  
      49. <br>  
      50. <p></p>  
      51. <p>   <br>  
      52. </p>  
      53. <p><br>  
      54. </p>  
      55. <h3><a name="t12"></a> 2) handleResponses<br>  
      56. </h3>  
      57. <pre name="code" class="java"> public void handleResponses(ResponseBuilder rb, ShardRequest sreq) {    
      58.   
      59.          if ((sreq.purpose & ShardRequest.PURPOSE_GET_TOP_IDS) != 0) {  
      60.   
      61.                       //合并ids   
      62.   
      63.                mergeIds(rb, sreq);  
      64.   
      65.               //合并groupCount     
      66.   
      67.             mergeGroupCounts(rb, sreq);   
      68.   
      69.            }      
      70.   
      71.        if ((sreq.purpose & ShardRequest.PURPOSE_GET_FIELDS) != 0) {  
      72.   
      73.                //获取文档的字段,并将结题组装起来放到最终结果列表对应的位置里      
      74.   
      75.              returnFields(rb, sreq);      
      76.   
      77.             return;    
      78.   
      79.        }  
      80.   
      81.   }</pre><br>  
      82. <br>  
      83. <h3><a name="t13"></a>   3)  finishStage</h3>  
      84. <p><br>  
      85. </p>  
      86. <p> </p><pre name="code" class="java"> @Override  
      87.   public void finishStage(ResponseBuilder rb) {  
      88.    //这里说是==获取文档内容的值,在  
      89.    if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {  
      90.        //有些文档可能已不存在了,则忽略掉  
      91.       for (Iterator<SolrDocument> iter = rb._responseDocs.iterator(); iter.hasNext();) {  
      92.         if (iter.next() == null) {  
      93.           iter.remove();  
      94.           rb._responseDocs.setNumFound(rb._responseDocs.getNumFound()-1);  
      95.         }          
      96.       }  
      97.   
      98.       rb.rsp.add("response", rb._responseDocs);  
      99.     }  
      100.   }  
      101. </pre><br>  
      102. <p></p>  
      103. <p><span style="color:#FF0000"><br>  
      104. </span></p>  
      105. <p><span style="color:#FF0000">同样最后的结果是保存在<br>  
      106. <br>  
      107. ResponseBuilder <br>  
      108. <br>  
      109.      ResponseBuilder <br>  
      110.          NamedList values = new SimpleOrderedMap();<br>  
      111. <br>  
      112. 这个字段里,以键为"response",单机存储的是lucene 的内部id列表<br>  
      113. 如果是分布式,则存储的是SolrDocumentList,不用再去索引拿出对应的存储字段,<br>  
      114. 这个在QueryResponseWriter里有对应的处理</span><br>  
      115. </p>  
      116. <p></p>  
      117. <p><br>  
      118. </p>  
      119. <p><br>  
      120. </p>  
      121. <p><br>  
      122. </p>  
      123. <p><br>  
      124. </p>  
      125. <p><br>  
      126. </p>  
      127. <p><br>  
      128. </p>  
      129. <p><br>  
      130. </p>  
      131. <p><br>  
      132. </p>  
      133. <p></p>  
      134.   
      135. </pre>  
  • 相关阅读:
    【转】系统缓存全解析二:动态缓存(4)-第三方分布式缓存解决方案 Velocity
    DevExpress.XtraTreeList.TreeList 的一些解决办法
    【转】系统缓存全解析二:动态缓存(4)-Discuz!NT中集成Memcached分布式缓存
    c#遍历Dictionary
    【转】memcached 命令概述
    WSAWaitforMultEvent使用
    创建线程是否调用CloseHandle
    小序
    select模式学习(二)之:客户端
    CoInitlize使用
  • 原文地址:https://www.cnblogs.com/strive-for-freedom/p/4229208.html
Copyright © 2011-2022 走看看