zoukankan      html  css  js  c++  java
  • Solr4.8.0源码分析(14)之SolrCloud索引深入(1)

    Solr4.8.0源码分析(14) 之 SolrCloud索引深入(1)

         上一章节《Solr In Action 笔记(4) 之 SolrCloud分布式索引基础》简要学习了SolrCloud的索引过程,本节开始将通过阅读源码来深入学习下SolrCloud的索引过程。

    1. SolrCloud的索引过程流程图

          这里借用下《solrCloud Update Request Handling 更新索引流程》流程图:

        由上图可以看出,SolrCloud的索引过程主要通过一个索引链过程来实现的,那么本节主要讲述下索引链以及DistributedUpdateProcessor这个过程。

    2. SolrCloud update索引链

    • SolrCloud的Update索引链的类是UpdateRequestProcessorChain,这个类在Solr初始化的时候就会进行定义。
    • SolrCloud的Update索引链的组成可以通过solrconfig.xml进行自定义,比较灵活,例如:
     1  <updateRequestProcessorChain name="key" default="true">
     2    <processor class="package.Class1" />
     3    <processor class="package.Class2" >
     4      <str name="someInitParam1">value</str>
     5      <int name="someInitParam2">42</int>
     6    </processor>
     7    <processor class="solr.LogUpdateProcessorFactory" >
     8      <int name="maxNumToLog">100</int>
     9    </processor>
    10    <processor class="solr.RunUpdateProcessorFactory" />
    11  </updateRequestProcessorChain>
    • 如果未自定义UpdateRequestProcessorChain,那么Solr就默认以下三个过程组成索引链,依次如下:
      • LogUpdateProcessorFactory, 它对应的处理过程为LogUpdateProcessor , 主要负责日志的记录;

      • DistributedUpdateProcessorFactory, 它对应的处理过程为DistributedUpdateProcessor ,主要负责对version的处理以及request的分发;

      • RunUpdateProcessorFactory,  它对应的处理过程为DirectUpdateHandler2 ,主要负责将记录add进本shard的lucene中;

    • 如果最后一个索引链包含了RunUpdateProcessorFactory,但是没有包含DistributedUpdateProcessorFactory,那么Solr会在索引链init的时候自动在RunUpdateProcessorFactory前面加入DistributedUpdateProcessorFactory;
    • 每一次update操作,都会重新创建一个索引链,即createProcessor,然后依次进行下去;
    • update操作包含add,delete,commit,所以上述UpdateProcessor都会分别包含processAdd,processDelete,processCommit,依次可以细分成add 索引链,delete索引链,commit索引链。

        明白了以上几点,那么开始学习索引链的源码

    • 首先来查看索引链的init代码,UpdateRequestProcessorChain.init()在初始化SolrCore的时候就调用了
     1  public void init(PluginInfo info) {
     2     final String infomsg = "updateRequestProcessorChain "" + 
     3       (null != info.name ? info.name : "") + """ + 
     4       (info.isDefault() ? " (default)" : "");
     5 
     6     log.info("creating " + infomsg);
     7 
     8     // wrap in an ArrayList so we know we know we can do fast index lookups 
     9     // and that add(int,Object) is supported
    10     //从solrcore获取索引链的成员,存放成列表形式。索引链在Solrconfig.xml中是以插件的形式加入的
    11     List<UpdateRequestProcessorFactory> list = new ArrayList
    12       (solrCore.initPlugins(info.getChildren("processor"),UpdateRequestProcessorFactory.class,null));
    13 
    14     if(list.isEmpty()){
    15       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
    16                               infomsg + " require at least one processor");
    17     }
    18 
    19     int numDistrib = 0;
    20     int runIndex = -1;
    21     // hi->lo incase multiple run instances, add before first one
    22     // (no idea why someone might use multiple run instances, but just in case)
    23     //从后往前遍历索引链列表,寻找DistributingUpdateProcessorFactory和RunUpdateProcessorFactory
    24     for (int i = list.size()-1; 0 <= i; i--) {
    25       UpdateRequestProcessorFactory factory = list.get(i);
    26       if (factory instanceof DistributingUpdateProcessorFactory) {
    27         numDistrib++; //DistributingUpdateProcessorFactory的个数不能超过1
    28       }
    29       if (factory instanceof RunUpdateProcessorFactory) {
    30         runIndex = i; //RunUpdateProcessorFactory的编号
    31       }
    32     }
    33     if (1 < numDistrib) {
    34       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
    35                               infomsg + " may not contain more then one " +
    36                               "instance of DistributingUpdateProcessorFactory");
    37     }
    38     //如果存在RunUpdateProcessorFactory且没有DistributingUpdateProcessorFactory,
    39     //那么会在RunUpdateProcessorFactory之前加入DistributingUpdateProcessorFactory
    40     if (0 <= runIndex && 0 == numDistrib) {
    41       // by default, add distrib processor immediately before run
    42       DistributedUpdateProcessorFactory distrib 
    43         = new DistributedUpdateProcessorFactory();
    44       distrib.init(new NamedList());
    45       list.add(runIndex, distrib);
    46 
    47       log.info("inserting DistributedUpdateProcessorFactory into " + infomsg);
    48     }
    49 
    50     chain = list.toArray(new UpdateRequestProcessorFactory[list.size()]); 
    51   }
    • 每当有update请求时候就会触发索引链的创建,UpdateRequestProcessorChain.createProcessor。
     1 public UpdateRequestProcessor createProcessor(SolrQueryRequest req, 
     2                                                 SolrQueryResponse rsp) 
     3   {
     4     UpdateRequestProcessor processor = null;
     5     UpdateRequestProcessor last = null;
     6     //获取distribPhase 是否需要跳过DistributingUpdateProcessorFactory前面那一过程(即LogUpdateProcessor),该参数不是由客户端生成
     7     final String distribPhase = req.getParams().get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);
     8     final boolean skipToDistrib = distribPhase != null;
     9     boolean afterDistrib = true;  // we iterate backwards, so true to start
    10     //从后往前组件索引链即
    11     //LogUpdateProcessor.next = DistributedUpdateProcessor
    12     //DistributedUpdateProcessor.next = DirectUpdateHandler2
    13     //DirectUpdateHandler2.next = null
    14     for (int i = chain.length-1; i>=0; i--) {
    15       UpdateRequestProcessorFactory factory = chain[i];
    16 
    17       if (skipToDistrib) {
    18         if (afterDistrib) {
    19           // 跳过DistributingUpdateProcessorFactory前面的索引链过程
    20           if (factory instanceof DistributingUpdateProcessorFactory) {
    21             afterDistrib = false;
    22           }
    23         } else if (!(factory instanceof UpdateRequestProcessorFactory.RunAlways)) {
    24           // skip anything that doesn't have the marker interface
    25           continue;
    26         }
    27       }
    28       //创建UpdateRequestProcessorFactory对应的UpdateProcessor,并进行连接
    29       processor = factory.getInstance(req, rsp, last);
    30       last = processor == null ? last : processor;
    31     }
    32 
    33     return last;
    34   }
    • 在UpdateRequestProcessorChain.createProcessor会调用UpdateRequestProcessorFactory的getInstance创建对应的updateProcessor,
      • LogUpdateProcessor

    1   public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
    2     return LogUpdateProcessor.log.isInfoEnabled() ? new LogUpdateProcessor(req, rsp, this, next) : null;
    3   }
      • DistributedUpdateProcessor
    1   public DistributedUpdateProcessor getInstance(SolrQueryRequest req,
    2       SolrQueryResponse rsp, UpdateRequestProcessor next) {
    3 
    4     return new DistributedUpdateProcessor(req, rsp, next);
    5   }
      • RunUpdateProcessorFactory

     

    1   public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) 
    2   {
    3     return new RunUpdateProcessor(req, next);
    4   }

    总结:本节主要学习了SolrCloud分布式索引的整体流程,以及SolrCloud建索引时候索引链的情况,下一节将详细介绍索引链的具体过程。

  • 相关阅读:
    Linux IO接口 监控 (iostat)
    linux 防火墙 命令
    _CommandPtr 添加参数 0xC0000005: Access violation writing location 0xcccccccc 错误
    Visual Studio自动关闭
    Linux vsftpd 安装 配置
    linux 挂载外部存储设备 (mount)
    myeclipse 9.0 激活 for win7 redhat mac 亲测
    英文操作系统 Myeclipse Console 乱码问题
    Linux 基本操作命令
    linux 查看系统相关 命令
  • 原文地址:https://www.cnblogs.com/rcfeng/p/4088506.html
Copyright © 2011-2022 走看看