Solr 4.8.0 Source Code Analysis (14): SolrCloud Indexing in Depth (1)
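The previous chapter, "Solr In Action Notes (4): SolrCloud Distributed Indexing Basics", gave a brief overview of how SolrCloud indexes documents. Starting with this section, we dig deeper into the SolrCloud indexing process by reading the source code.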
1. SolrCloud indexing flowchart
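The flowchart here is borrowed from "solrCloud Update Request Handling" (update indexing flow):
As the figure shows, SolrCloud indexing is carried out mainly through an update processor chain, so this section focuses on that chain and on the DistributedUpdateProcessor stage.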
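2. The SolrCloud update processor chain
- The class that represents the SolrCloud update chain is UpdateRequestProcessorChain; chains are set up during Solr initialization.
- The makeup of the update chain can be customized flexibly in solrconfig.xml, for example: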
```xml
<updateRequestProcessorChain name="key" default="true">
  <processor class="package.Class1" />
  <processor class="package.Class2" >
    <str name="someInitParam1">value</str>
    <int name="someInitParam2">42</int>
  </processor>
  <processor class="solr.LogUpdateProcessorFactory" >
    <int name="maxNumToLog">100</int>
  </processor>
  <processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
```
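- If no UpdateRequestProcessorChain is defined, Solr builds a default chain from the following three stages, in this order:
  - LogUpdateProcessorFactory, whose processor is LogUpdateProcessor; it is responsible for logging.
  - DistributedUpdateProcessorFactory, whose processor is DistributedUpdateProcessor; it handles document versions and distributes (forwards) requests.
  - RunUpdateProcessorFactory, whose processor is RunUpdateProcessor; it hands each command to the core's update handler (DirectUpdateHandler2 by default), which adds the documents to this shard's local Lucene index.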
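- If a chain contains RunUpdateProcessorFactory but no DistributingUpdateProcessorFactory (the interface that DistributedUpdateProcessorFactory implements), Solr automatically inserts a DistributedUpdateProcessorFactory immediately before the first RunUpdateProcessorFactory when the chain is initialized;
- Every update request builds a new chain instance (createProcessor), and the request then passes through its processors in order;
- Update operations include add, delete and commit, so each of the UpdateProcessors above has processAdd, processDelete and processCommit methods; accordingly, the chain can be viewed as an add chain, a delete chain and a commit chain.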
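The points above describe a chain of responsibility: each processor handles add, delete and commit, then forwards the same command to the next one. Below is a minimal sketch of that pattern in plain Java. It is a simplified model, not Solr's code: `Processor`, `LoggingProcessor`, `DistributingProcessor`, `RunProcessor` and `ChainDemo` are hypothetical stand-ins for UpdateRequestProcessor and the three default processors, and commands are plain document-ID strings rather than AddUpdateCommand objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for UpdateRequestProcessor: by default every
// operation is simply forwarded to the next processor (if any).
abstract class Processor {
    protected final Processor next;
    protected final List<String> trace; // shared record of what ran, for illustration

    Processor(Processor next, List<String> trace) {
        this.next = next;
        this.trace = trace;
    }

    void processAdd(String docId) { if (next != null) next.processAdd(docId); }

    void processDelete(String docId) { if (next != null) next.processDelete(docId); }

    void processCommit() { if (next != null) next.processCommit(); }
}

// Plays the role of LogUpdateProcessor: records the op, then passes it on.
class LoggingProcessor extends Processor {
    LoggingProcessor(Processor next, List<String> trace) { super(next, trace); }

    @Override void processAdd(String docId) {
        trace.add("log:add " + docId);
        super.processAdd(docId);
    }

    @Override void processCommit() {
        trace.add("log:commit");
        super.processCommit();
    }
}

// Plays the role of DistributedUpdateProcessor (versioning/forwarding omitted).
class DistributingProcessor extends Processor {
    DistributingProcessor(Processor next, List<String> trace) { super(next, trace); }

    @Override void processAdd(String docId) {
        trace.add("distrib:add " + docId);
        super.processAdd(docId);
    }
}

// Plays the role of RunUpdateProcessor: the end of the chain, writes locally.
class RunProcessor extends Processor {
    RunProcessor(List<String> trace) { super(null, trace); }

    @Override void processAdd(String docId) { trace.add("run:add " + docId); }

    @Override void processCommit() { trace.add("run:commit"); }
}

class ChainDemo {
    // A fresh chain per request, mirroring one createProcessor call per update.
    static Processor newChain(List<String> trace) {
        Processor run = new RunProcessor(trace);
        Processor distrib = new DistributingProcessor(run, trace);
        return new LoggingProcessor(distrib, trace);
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        Processor chain = newChain(trace);
        chain.processAdd("doc1");
        chain.processCommit();
        System.out.println(trace);
        // [log:add doc1, distrib:add doc1, run:add doc1, log:commit, run:commit]
    }
}
```

Building a new chain in `newChain` for every request mirrors createProcessor; in Solr the processors are also constructed with the current request and response, so a per-request chain is the natural fit.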
With these points in mind, let's look at the source code of the chain.
- First, the chain's init code. UpdateRequestProcessorChain.init() is called while SolrCore is being initialized:
```java
public void init(PluginInfo info) {
  final String infomsg = "updateRequestProcessorChain \"" +
    (null != info.name ? info.name : "") + "\"" +
    (info.isDefault() ? " (default)" : "");

  log.info("creating " + infomsg);

  // wrap in an ArrayList so we know we can do fast index lookups
  // and that add(int,Object) is supported
  // Fetch the chain members from SolrCore as a list; in solrconfig.xml
  // they are declared as plugins (<processor> elements)
  List<UpdateRequestProcessorFactory> list = new ArrayList<>
    (solrCore.initPlugins(info.getChildren("processor"),UpdateRequestProcessorFactory.class,null));

  if(list.isEmpty()){
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
        infomsg + " require at least one processor");
  }

  int numDistrib = 0;
  int runIndex = -1;
  // hi->lo incase multiple run instances, add before first one
  // (no idea why someone might use multiple run instances, but just in case)
  // Walk the list back to front, looking for DistributingUpdateProcessorFactory
  // and RunUpdateProcessorFactory
  for (int i = list.size()-1; 0 <= i; i--) {
    UpdateRequestProcessorFactory factory = list.get(i);
    if (factory instanceof DistributingUpdateProcessorFactory) {
      numDistrib++; // at most one DistributingUpdateProcessorFactory is allowed
    }
    if (factory instanceof RunUpdateProcessorFactory) {
      runIndex = i; // ends up as the index of the first RunUpdateProcessorFactory
    }
  }
  if (1 < numDistrib) {
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
        infomsg + " may not contain more then one " +
        "instance of DistributingUpdateProcessorFactory");
  }
  // If there is a RunUpdateProcessorFactory but no DistributingUpdateProcessorFactory,
  // insert a DistributedUpdateProcessorFactory right before the RunUpdateProcessorFactory
  if (0 <= runIndex && 0 == numDistrib) {
    // by default, add distrib processor immediately before run
    DistributedUpdateProcessorFactory distrib
      = new DistributedUpdateProcessorFactory();
    distrib.init(new NamedList());
    list.add(runIndex, distrib);

    log.info("inserting DistributedUpdateProcessorFactory into "
             + infomsg);
  }

  chain = list.toArray(new UpdateRequestProcessorFactory[list.size()]);
}
```
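The insertion rule in init() boils down to a small list transformation. The sketch below isolates it under stated assumptions: `Factory`, `Distributing`, `PlainFactory`, `DistribFactory`, `RunFactory` and `ChainInit` are hypothetical classes standing in for Solr's factory types, and IllegalStateException replaces SolrException. Only the counting and insertion logic follows the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Solr's factory classes.
abstract class Factory {
    final String name;
    Factory(String name) { this.name = name; }
}

// Marker playing the role of DistributingUpdateProcessorFactory.
interface Distributing {}

class PlainFactory extends Factory {
    PlainFactory(String name) { super(name); }
}

class DistribFactory extends Factory implements Distributing {
    DistribFactory() { super("distributed"); }
}

class RunFactory extends Factory {
    RunFactory() { super("run"); }
}

class ChainInit {
    // Mirrors init(): reject more than one distributing factory, and insert
    // one before the first run factory if none is configured.
    static List<Factory> normalize(List<Factory> configured) {
        List<Factory> list = new ArrayList<>(configured); // copy so add(int, E) works
        if (list.isEmpty()) {
            throw new IllegalStateException("require at least one processor");
        }
        int numDistrib = 0;
        int runIndex = -1;
        // Back to front, so runIndex ends up at the FIRST run factory.
        for (int i = list.size() - 1; i >= 0; i--) {
            Factory f = list.get(i);
            if (f instanceof Distributing) numDistrib++;
            if (f instanceof RunFactory) runIndex = i;
        }
        if (numDistrib > 1) {
            throw new IllegalStateException("more than one distributing factory");
        }
        if (runIndex >= 0 && numDistrib == 0) {
            list.add(runIndex, new DistribFactory());
        }
        return list;
    }

    static List<String> names(List<Factory> list) {
        List<String> out = new ArrayList<>();
        for (Factory f : list) out.add(f.name);
        return out;
    }

    public static void main(String[] args) {
        List<Factory> configured = new ArrayList<>();
        configured.add(new PlainFactory("log"));
        configured.add(new RunFactory());
        System.out.println(names(normalize(configured))); // [log, distributed, run]
    }
}
```

Scanning from high to low index is what makes the inserted factory land before the first run factory when several are configured, matching the "add before first one" comment in the Solr source.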
- Every update request triggers the creation of a chain, in UpdateRequestProcessorChain.createProcessor():
```java
public UpdateRequestProcessor createProcessor(SolrQueryRequest req,
                                              SolrQueryResponse rsp)
{
  UpdateRequestProcessor processor = null;
  UpdateRequestProcessor last = null;
  // Read distribPhase: if it is set, the processors before
  // DistributingUpdateProcessorFactory (i.e. LogUpdateProcessor) are skipped.
  // This parameter is not sent by the client
  final String distribPhase = req.getParams().get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);
  final boolean skipToDistrib = distribPhase != null;
  boolean afterDistrib = true;  // we iterate backwards, so true to start
  // Build the chain back to front, so that with the default chain:
  //   LogUpdateProcessor.next         = DistributedUpdateProcessor
  //   DistributedUpdateProcessor.next = RunUpdateProcessor
  //   RunUpdateProcessor.next         = null
  for (int i = chain.length-1; i>=0; i--) {
    UpdateRequestProcessorFactory factory = chain[i];

    if (skipToDistrib) {
      if (afterDistrib) {
        // Processors after the distributing one, and the distributing one itself,
        // are kept; once it is reached, everything before it is skipped
        if (factory instanceof DistributingUpdateProcessorFactory) {
          afterDistrib = false;
        }
      } else if (!(factory instanceof UpdateRequestProcessorFactory.RunAlways)) {
        // skip anything that doesn't have the marker interface
        continue;
      }
    }
    // Create this factory's processor and link it in front of the previous one;
    // a factory that returns null is simply left out of the chain
    processor = factory.getInstance(req, rsp, last);
    last = processor == null ? last : processor;
  }

  return last;
}
```
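The back-to-front linking and the skipToDistrib rule can be modeled without Solr. In this sketch `Proc`, `Fac`, `DistributingFac`, `RunAlwaysFac` and `ChainBuilder` are hypothetical: the two sub-interfaces play the roles of DistributingUpdateProcessorFactory and the UpdateRequestProcessorFactory.RunAlways marker, and a boolean stands in for the presence of the distrib request parameter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal processor: a name plus a link to its successor.
class Proc {
    final String name;
    final Proc next;
    Proc(String name, Proc next) { this.name = name; this.next = next; }
}

// Hypothetical factory; returning null leaves it out of the chain.
interface Fac { Proc getInstance(Proc next); }
interface DistributingFac extends Fac {}  // role of DistributingUpdateProcessorFactory
interface RunAlwaysFac extends Fac {}     // role of UpdateRequestProcessorFactory.RunAlways

class ChainBuilder {
    static Proc createProcessor(Fac[] chain, boolean skipToDistrib) {
        Proc last = null;
        boolean afterDistrib = true; // iterating backwards, so start "after" distrib
        for (int i = chain.length - 1; i >= 0; i--) {
            Fac factory = chain[i];
            if (skipToDistrib) {
                if (afterDistrib) {
                    // Kept: factories after distrib, and distrib itself.
                    if (factory instanceof DistributingFac) afterDistrib = false;
                } else if (!(factory instanceof RunAlwaysFac)) {
                    continue; // before distrib and not RunAlways: skip it
                }
            }
            Proc processor = factory.getInstance(last);
            last = processor == null ? last : processor;
        }
        return last;
    }

    static Fac plain(String name) { return next -> new Proc(name, next); }
    static DistributingFac distrib(String name) { return next -> new Proc(name, next); }
    static RunAlwaysFac runAlways(String name) { return next -> new Proc(name, next); }

    static List<String> names(Proc head) {
        List<String> out = new ArrayList<>();
        for (Proc p = head; p != null; p = p.next) out.add(p.name);
        return out;
    }

    public static void main(String[] args) {
        Fac[] chain = { plain("log"), distrib("distrib"), plain("run") };
        System.out.println(names(createProcessor(chain, false))); // [log, distrib, run]
        System.out.println(names(createProcessor(chain, true)));  // [distrib, run]
    }
}
```

In this model a request that already carries the distrib parameter starts at the distributing stage, so the log stage runs only when the parameter is absent, while anything marked RunAlways is kept either way.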
- UpdateRequestProcessorChain.createProcessor calls each UpdateRequestProcessorFactory's getInstance to create the corresponding processor. For the three default factories:
  - LogUpdateProcessorFactory (it returns null, so no LogUpdateProcessor joins the chain, when INFO logging is disabled):
```java
public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
  return LogUpdateProcessor.log.isInfoEnabled() ? new LogUpdateProcessor(req, rsp, this, next) : null;
}
```
  - DistributedUpdateProcessorFactory:
```java
public DistributedUpdateProcessor getInstance(SolrQueryRequest req,
    SolrQueryResponse rsp, UpdateRequestProcessor next) {

  return new DistributedUpdateProcessor(req, rsp, next);
}
```
  - RunUpdateProcessorFactory:
```java
public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next)
{
  return new RunUpdateProcessor(req, next);
}
```
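Because a getInstance call may return null, the linking loop in createProcessor keeps the previous head whenever a factory opts out. The sketch below shows just that behavior; `Node`, `NullSkippingDemo`, `logFactory` and `named` are hypothetical, and a boolean replaces the real `LogUpdateProcessor.log.isInfoEnabled()` check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical processor node: a name plus its successor.
class Node {
    final String name;
    final Node next;
    Node(String name, Node next) { this.name = name; this.next = next; }
}

class NullSkippingDemo {
    // Each factory maps "the processor after me" to a new node, or to null to opt out.
    static Node link(List<UnaryOperator<Node>> factories) {
        Node last = null;
        for (int i = factories.size() - 1; i >= 0; i--) {
            Node processor = factories.get(i).apply(last);
            last = processor == null ? last : processor; // null: keep the previous head
        }
        return last;
    }

    // Like LogUpdateProcessorFactory: contributes a node only when INFO is enabled.
    static UnaryOperator<Node> logFactory(boolean infoEnabled) {
        return next -> infoEnabled ? new Node("log", next) : null;
    }

    static UnaryOperator<Node> named(String name) {
        return next -> new Node(name, next);
    }

    static List<String> names(Node head) {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) out.add(n.name);
        return out;
    }

    public static void main(String[] args) {
        List<UnaryOperator<Node>> chain = new ArrayList<>();
        chain.add(logFactory(false));
        chain.add(named("distrib"));
        chain.add(named("run"));
        System.out.println(names(link(chain))); // [distrib, run]
    }
}
```

With logging disabled, the chain simply starts at the distributing stage; no placeholder processor is needed.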
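Summary: this section covered the overall flow of SolrCloud distributed indexing and how the update processor chain is assembled during indexing. The next section will examine each stage of the chain in detail.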