zoukankan      html  css  js  c++  java
  • Solr4.8.0源码分析(14)之SolrCloud索引深入(1)

    Solr4.8.0源码分析(14) 之 SolrCloud索引深入(1)

         上一章节《Solr In Action 笔记(4) 之 SolrCloud分布式索引基础》简要学习了SolrCloud的索引过程,本节开始将通过阅读源码来深入学习下SolrCloud的索引过程。

    1. SolrCloud的索引过程流程图

          这里借用下《solrCloud Update Request Handling 更新索引流程》流程图:

        由上图可以看出,SolrCloud的索引过程主要通过一个索引链过程来实现的,那么本节主要讲述下索引链以及DistributedUpdateProcessor这个过程。

    2. SolrCloud update索引链

    • SolrCloud的Update索引链的类是UpdateRequestProcessorChain,这个类在Solr初始化的时候就会进行定义。
    • SolrCloud的Update索引链的组成可以通过solrconfig.xml进行自定义,比较灵活,例如:
     1  <updateRequestProcessorChain name="key" default="true">
     2    <processor class="package.Class1" />
     3    <processor class="package.Class2" >
     4      <str name="someInitParam1">value</str>
     5      <int name="someInitParam2">42</int>
     6    </processor>
     7    <processor class="solr.LogUpdateProcessorFactory" >
     8      <int name="maxNumToLog">100</int>
     9    </processor>
    10    <processor class="solr.RunUpdateProcessorFactory" />
    11  </updateRequestProcessorChain>
    • 如果未自定义UpdateRequestProcessorChain,那么Solr就默认以下三个过程组成索引链,依次如下:
      • LogUpdateProcessorFactory, 它对应的处理过程为LogUpdateProcessor , 主要负责日志的记录;

      • DistributedUpdateProcessorFactory, 它对应的处理过程为DistributedUpdateProcessor ,主要负责对version的处理以及request的分发;

      • RunUpdateProcessorFactory,  它对应的处理过程为DirectUpdateHandler2 ,主要负责将记录add进本shard的lucene中;

    • 如果最后一个索引链包含了RunUpdateProcessorFactory,但是没有包含DistributedUpdateProcessorFactory,那么Solr会在索引链init的时候自动在RunUpdateProcessorFactory前面加入DistributedUpdateProcessorFactory;
    • 每一次update操作,都会重新创建一个索引链,即createProcessor,然后依次进行下去;
    • update操作包含add,delete,commit,所以上述UpdateProcessor都会分别包含processAdd,processDelete,processCommit,依次可以细分成add 索引链,delete索引链,commit索引链。

        明白了以上几点,那么开始学习索引链的源码

    • 首先来查看索引链的init代码,UpdateRequestProcessorChain.init()在初始化SolrCore的时候就调用了
     1  public void init(PluginInfo info) {
     2     final String infomsg = "updateRequestProcessorChain "" + 
     3       (null != info.name ? info.name : "") + """ + 
     4       (info.isDefault() ? " (default)" : "");
     5 
     6     log.info("creating " + infomsg);
     7 
     8     // wrap in an ArrayList so we know we know we can do fast index lookups 
     9     // and that add(int,Object) is supported
    10     //从solrcore获取索引链的成员,存放成列表形式。索引链在Solrconfig.xml中是以插件的形式加入的
    11     List<UpdateRequestProcessorFactory> list = new ArrayList
    12       (solrCore.initPlugins(info.getChildren("processor"),UpdateRequestProcessorFactory.class,null));
    13 
    14     if(list.isEmpty()){
    15       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
    16                               infomsg + " require at least one processor");
    17     }
    18 
    19     int numDistrib = 0;
    20     int runIndex = -1;
    21     // hi->lo incase multiple run instances, add before first one
    22     // (no idea why someone might use multiple run instances, but just in case)
    23     //从后往前遍历索引链列表,寻找DistributingUpdateProcessorFactory和RunUpdateProcessorFactory
    24     for (int i = list.size()-1; 0 <= i; i--) {
    25       UpdateRequestProcessorFactory factory = list.get(i);
    26       if (factory instanceof DistributingUpdateProcessorFactory) {
    27         numDistrib++; //DistributingUpdateProcessorFactory的个数不能超过1
    28       }
    29       if (factory instanceof RunUpdateProcessorFactory) {
    30         runIndex = i; //RunUpdateProcessorFactory的编号
    31       }
    32     }
    33     if (1 < numDistrib) {
    34       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
    35                               infomsg + " may not contain more then one " +
    36                               "instance of DistributingUpdateProcessorFactory");
    37     }
    38     //如果存在RunUpdateProcessorFactory且没有DistributingUpdateProcessorFactory,
    39     //那么会在RunUpdateProcessorFactory之前加入DistributingUpdateProcessorFactory
    40     if (0 <= runIndex && 0 == numDistrib) {
    41       // by default, add distrib processor immediately before run
    42       DistributedUpdateProcessorFactory distrib 
    43         = new DistributedUpdateProcessorFactory();
    44       distrib.init(new NamedList());
    45       list.add(runIndex, distrib);
    46 
    47       log.info("inserting DistributedUpdateProcessorFactory into " + infomsg);
    48     }
    49 
    50     chain = list.toArray(new UpdateRequestProcessorFactory[list.size()]); 
    51   }
    • 每当有update请求时候就会触发索引链的创建,UpdateRequestProcessorChain.createProcessor。
     1 public UpdateRequestProcessor createProcessor(SolrQueryRequest req, 
     2                                                 SolrQueryResponse rsp) 
     3   {
     4     UpdateRequestProcessor processor = null;
     5     UpdateRequestProcessor last = null;
     6     //获取distribPhase 是否需要跳过DistributingUpdateProcessorFactory前面那一过程(即LogUpdateProcessor),该参数不是由客户端生成
     7     final String distribPhase = req.getParams().get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);
     8     final boolean skipToDistrib = distribPhase != null;
     9     boolean afterDistrib = true;  // we iterate backwards, so true to start
    10     //从后往前组件索引链即
    11     //LogUpdateProcessor.next = DistributedUpdateProcessor
    12     //DistributedUpdateProcessor.next = DirectUpdateHandler2
    13     //DirectUpdateHandler2.next = null
    14     for (int i = chain.length-1; i>=0; i--) {
    15       UpdateRequestProcessorFactory factory = chain[i];
    16 
    17       if (skipToDistrib) {
    18         if (afterDistrib) {
    19           // 跳过DistributingUpdateProcessorFactory前面的索引链过程
    20           if (factory instanceof DistributingUpdateProcessorFactory) {
    21             afterDistrib = false;
    22           }
    23         } else if (!(factory instanceof UpdateRequestProcessorFactory.RunAlways)) {
    24           // skip anything that doesn't have the marker interface
    25           continue;
    26         }
    27       }
    28       //创建UpdateRequestProcessorFactory对应的UpdateProcessor,并进行连接
    29       processor = factory.getInstance(req, rsp, last);
    30       last = processor == null ? last : processor;
    31     }
    32 
    33     return last;
    34   }
    • 在UpdateRequestProcessorChain.createProcessor会调用UpdateRequestProcessorFactory的getInstance创建对应的updateProcessor,
      • LogUpdateProcessor

    1   public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
    2     return LogUpdateProcessor.log.isInfoEnabled() ? new LogUpdateProcessor(req, rsp, this, next) : null;
    3   }
      • DistributedUpdateProcessor
    1   public DistributedUpdateProcessor getInstance(SolrQueryRequest req,
    2       SolrQueryResponse rsp, UpdateRequestProcessor next) {
    3 
    4     return new DistributedUpdateProcessor(req, rsp, next);
    5   }
      • RunUpdateProcessorFactory

     

    1   public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) 
    2   {
    3     return new RunUpdateProcessor(req, next);
    4   }

    总结:本节主要学习了SolrCloud分布式索引的整体流程,以及SolrCloud建索引时候索引链的情况,下一节将详细介绍索引链的具体过程。

  • 相关阅读:
    C#发送邮件
    C# MD5加密
    html实现艺术字
    sql日期转换比较问题
    web 抓取
    NHibernate主要数据操作方法
    写日志
    备忘 sql分页
    自我介绍
    企业级应用和互联网应用的区别
  • 原文地址:https://www.cnblogs.com/rcfeng/p/4088506.html
Copyright © 2011-2022 走看看