Solr索引操作(包括新增、更新、删除、提交、合并等)相关的UML类图如下
从上面的类图我们可以发现,其中体现了工厂方法模式及责任链模式的运用
UpdateRequestProcessor相当于责任链模式中的处理器角色,下面的对象图也许更能反映多个UpdateRequestProcessor类型的处理器之间的活动行为
UpdateRequestProcessorChain为请求处理器链,供客户端调用(内部依赖处理器工厂数组生成不同的处理器)
/**
 * An ordered set of {@link UpdateRequestProcessorFactory} instances that is turned into a
 * linked sequence of {@link UpdateRequestProcessor}s for each request.
 *
 * <p>The factories are either supplied directly through the two-argument constructor or
 * loaded from the "processor" children of a {@link PluginInfo} in {@link #init(PluginInfo)}.
 */
public final class UpdateRequestProcessorChain implements PluginInfoInitialized {

  private final SolrCore solrCore;
  private UpdateRequestProcessorFactory[] chain;

  /**
   * Creates a chain whose factories are populated later by {@link #init(PluginInfo)}.
   *
   * @param solrCore core used to instantiate the configured factories
   */
  public UpdateRequestProcessorChain(SolrCore solrCore) {
    this.solrCore = solrCore;
  }

  /**
   * Creates a chain from an already-built factory array.
   *
   * @param chain    factories, in the order their processors should run
   * @param solrCore owning core
   */
  public UpdateRequestProcessorChain(UpdateRequestProcessorFactory[] chain, SolrCore solrCore) {
    this.chain = chain;
    this.solrCore = solrCore;
  }

  /**
   * Loads the factories declared as "processor" children of {@code info}.
   *
   * @param info plugin configuration for this chain
   * @throws RuntimeException if no processor is configured
   */
  public void init(PluginInfo info) {
    List<UpdateRequestProcessorFactory> factories =
        solrCore.initPlugins(info.getChildren("processor"), UpdateRequestProcessorFactory.class, null);
    if (factories.isEmpty()) {
      throw new RuntimeException("updateRequestProcessorChain require at least one processor");
    }
    chain = factories.toArray(new UpdateRequestProcessorFactory[factories.size()]);
  }

  /**
   * Builds the processors for one request.
   *
   * <p>Factories are visited from last to first so that each new processor can be handed the
   * processor that follows it. A factory that returns {@code null} is skipped.
   *
   * @param req current request
   * @param rsp current response
   * @return the first processor to invoke, or {@code null} if no factory produced one
   */
  public UpdateRequestProcessor createProcessor(SolrQueryRequest req, SolrQueryResponse rsp) {
    UpdateRequestProcessor head = null;
    for (int idx = chain.length - 1; idx >= 0; idx--) {
      UpdateRequestProcessor created = chain[idx].getInstance(req, rsp, head);
      if (created != null) {
        head = created;
      }
    }
    return head;
  }

  /**
   * @return the factory array backing this chain (not copied)
   */
  public UpdateRequestProcessorFactory[] getFactories() {
    return chain;
  }
}
UpdateRequestProcessorFactory为请求处理器抽象工厂类,用于实例化请求处理器(UpdateRequestProcessor),而具体的实例化过程延迟到子类实现
/** * A factory to generate an UpdateRequestProcessor for each request. * * If the factory needs access to {@link SolrCore} in initialization, it could * implement {@link SolrCoreAware} * * @since solr 1.3 */ public abstract class UpdateRequestProcessorFactory implements NamedListInitializedPlugin { public void init( NamedList args ) { // could process the Node } abstract public UpdateRequestProcessor getInstance( SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next ); }
请求处理器抽象类UpdateRequestProcessor,持有对自身类型的引用(责任链模式的体现)
/**
 * Base class for a link in the update processing chain.
 *
 * <p>Subclasses can inspect or modify a command before it is indexed, for example adding or
 * removing fields, or checking whether the requesting user may update the document. They may
 * also record an error without indexing, or throw to halt indexing.
 *
 * <p>Every method here simply forwards the call to the next processor, if there is one.
 *
 * @since solr 1.3
 */
public abstract class UpdateRequestProcessor {

  protected final Logger log = LoggerFactory.getLogger(getClass());

  /** The processor that follows this one; {@code null} at the end of the chain. */
  protected final UpdateRequestProcessor next;

  /**
   * @param next processor to delegate to, or {@code null} if this is the last one
   */
  public UpdateRequestProcessor(UpdateRequestProcessor next) {
    this.next = next;
  }

  /** Forwards an add command to the next processor. */
  public void processAdd(AddUpdateCommand cmd) throws IOException {
    if (next == null) {
      return;
    }
    next.processAdd(cmd);
  }

  /** Forwards a delete command to the next processor. */
  public void processDelete(DeleteUpdateCommand cmd) throws IOException {
    if (next == null) {
      return;
    }
    next.processDelete(cmd);
  }

  /** Forwards a merge-indexes command to the next processor. */
  public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
    if (next == null) {
      return;
    }
    next.processMergeIndexes(cmd);
  }

  /** Forwards a commit command to the next processor. */
  public void processCommit(CommitUpdateCommand cmd) throws IOException {
    if (next == null) {
      return;
    }
    next.processCommit(cmd);
  }

  /**
   * Forwards a rollback command to the next processor.
   *
   * @since Solr 1.4
   */
  public void processRollback(RollbackUpdateCommand cmd) throws IOException {
    if (next == null) {
      return;
    }
    next.processRollback(cmd);
  }

  /** Forwards the finish notification to the next processor. */
  public void finish() throws IOException {
    if (next == null) {
      return;
    }
    next.finish();
  }
}
具体工厂类RunUpdateProcessorFactory及具体请求处理器RunUpdateProcessor
/** * Pass the command to the UpdateHandler without any modifications * * @since solr 1.3 */ public class RunUpdateProcessorFactory extends UpdateRequestProcessorFactory { @Override public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { return new RunUpdateProcessor(req, next); } } class RunUpdateProcessor extends UpdateRequestProcessor { private final SolrQueryRequest req; private final UpdateHandler updateHandler; public RunUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next) { super( next ); this.req = req; this.updateHandler = req.getCore().getUpdateHandler(); } @Override public void processAdd(AddUpdateCommand cmd) throws IOException { cmd.doc = DocumentBuilder.toDocument(cmd.getSolrInputDocument(), req.getSchema()); updateHandler.addDoc(cmd); super.processAdd(cmd); } @Override public void processDelete(DeleteUpdateCommand cmd) throws IOException { if( cmd.id != null ) { updateHandler.delete(cmd); } else { updateHandler.deleteByQuery(cmd); } super.processDelete(cmd); } @Override public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException { updateHandler.mergeIndexes(cmd); super.processMergeIndexes(cmd); } @Override public void processCommit(CommitUpdateCommand cmd) throws IOException { updateHandler.commit(cmd); super.processCommit(cmd); } /** * @since Solr 1.4 */ @Override public void processRollback(RollbackUpdateCommand cmd) throws IOException { updateHandler.rollback(cmd); super.processRollback(cmd); } }
命令参数基类UpdateCommand(命令模式的体现)
/**
 * Base type for index update commands; each command is represented as an object
 * (Command pattern) identified by a name.
 *
 * @version $Id: UpdateCommand.java 1065312 2011-01-30 16:08:25Z rmuir $
 */
public class UpdateCommand {

  /** Name of this command, as passed to the constructor. */
  protected String commandName;

  /**
   * @param commandName name identifying the kind of command
   */
  public UpdateCommand(String commandName) {
    this.commandName = commandName;
  }

  /**
   * @return the command name
   */
  @Override
  public String toString() {
    return this.commandName;
  }
}
AddUpdateCommand命令
/** * @version $Id: AddUpdateCommand.java 1145201 2011-07-11 15:18:47Z yonik $ */ public class AddUpdateCommand extends UpdateCommand { // optional id in "internal" indexed form... if it is needed and not supplied, // it will be obtained from the doc. public String indexedId; // The Lucene document to be indexed public Document doc; // Higher level SolrInputDocument, normally used to construct the Lucene Document // to index. public SolrInputDocument solrDoc; public boolean allowDups; public boolean overwritePending; public boolean overwriteCommitted; public Term updateTerm; public int commitWithin = -1; /** Reset state to reuse this object with a different document in the same request */ public void clear() { doc = null; solrDoc = null; indexedId = null; } public SolrInputDocument getSolrInputDocument() { return solrDoc; } public Document getLuceneDocument(IndexSchema schema) { if (doc == null && solrDoc != null) { // TODO?? build the doc from the SolrDocument? } return doc; } public String getIndexedId(IndexSchema schema) { if (indexedId == null) { SchemaField sf = schema.getUniqueKeyField(); if (sf != null) { if (doc != null) { schema.getUniqueKeyField(); Fieldable storedId = doc.getFieldable(sf.getName()); indexedId = sf.getType().storedToIndexed(storedId); } if (solrDoc != null) { SolrInputField field = solrDoc.getField(sf.getName()); if (field != null) { indexedId = sf.getType().toInternal( field.getFirstValue().toString() ); } } } } return indexedId; } public String getPrintableId(IndexSchema schema) { SchemaField sf = schema.getUniqueKeyField(); if (indexedId != null && sf != null) { return sf.getType().indexedToReadable(indexedId); } if (doc != null) { return schema.printableUniqueKey(doc); } if (solrDoc != null && sf != null) { SolrInputField field = solrDoc.getField(sf.getName()); if (field != null) { return field.getFirstValue().toString(); } } return "(null)"; } public AddUpdateCommand() { super("add"); } @Override public String toString() { 
StringBuilder sb = new StringBuilder(commandName); sb.append(':'); if (indexedId !=null) sb.append("id=").append(indexedId); sb.append(",allowDups=").append(allowDups); sb.append(",overwritePending=").append(overwritePending); sb.append(",overwriteCommitted=").append(overwriteCommitted); return sb.toString(); } }
DeleteUpdateCommand命令
/** * @version $Id: DeleteUpdateCommand.java 1235304 2012-01-24 15:39:17Z janhoy $ */ public class DeleteUpdateCommand extends UpdateCommand { public String id; // external (printable) id, for delete-by-id public String query; // query string for delete-by-query public boolean fromPending; public boolean fromCommitted; public int commitWithin = -1; public DeleteUpdateCommand() { super("delete"); } @Override public String toString() { StringBuilder sb = new StringBuilder(commandName); sb.append(':'); if (id!=null) sb.append("id=").append(id); else sb.append("query=`").append(query).append('`'); sb.append(",fromPending=").append(fromPending); sb.append(",fromCommitted=").append(fromCommitted); sb.append(",commitWithin=").append(commitWithin); return sb.toString(); } }
---------------------------------------------------------------------------
本系列solr&lucene3.6.0源码解析系本人原创
转载请注明出处 博客园 刺猬的温驯
本人邮箱: chenying998179#163.com (#改为@)