1.使用IndexWriter.java
// Calls the merge scheduler with this object, the EXPLICIT trigger (a merge
// requested explicitly by the user) and newMergesFound.
mergeScheduler.merge(this, MergeTrigger.EXPLICIT, newMergesFound);
2.定义MergeScheduler
/** <p>Expert: {@link IndexWriter} uses an instance
* implementing this interface to execute the merges
* selected by a {@link MergePolicy}. The default
* MergeScheduler is {@link ConcurrentMergeScheduler}.</p>
* @lucene.experimental
*/
3.MergeTrigger 触发merge的事件
/**
 * Identifies the event that caused a merge to be requested. A value of this type is handed to
 * {@link MergePolicy#findMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}.
 */
public enum MergeTrigger {

  /** A segment flush triggered the merge. */
  SEGMENT_FLUSH,

  /**
   * A full flush triggered the merge. A full flush can result from a commit, an NRT reader
   * reopen, or a close call on the index writer.
   */
  FULL_FLUSH,

  /** The user explicitly requested the merge. */
  EXPLICIT,

  /** A merge that finished successfully triggered this merge. */
  MERGE_FINISHED,

  /** An IndexWriter that is closing triggered the merge. */
  CLOSING
}
4.ConcurrentMergeScheduler默认实现
/** A {@link MergeScheduler} that runs each merge using a * separate thread. * * <p>Specify the max number of threads that may run at * once, and the maximum number of simultaneous merges * with {@link #setMaxMergesAndThreads}.</p> * * <p>If the number of merges exceeds the max number of threads * then the largest merges are paused until one of the smaller * merges completes.</p> * * <p>If more than {@link #getMaxMergeCount} merges are * requested then this class will forcefully throttle the * incoming threads by pausing until one or more merges * complete.</p> * * <p>This class attempts to detect whether the index is * on rotational storage (traditional hard drive) or not * (e.g. solid-state disk) and changes the default max merge * and thread count accordingly. This detection is currently * Linux-only, and relies on the OS to put the right value * into /sys/block/<dev>/block/rotational. For all * other operating systems it currently assumes a rotational * disk for backwards compatibility. To enable default * settings for spinning or solid state disks for such * operating systems, use {@link #setDefaultMaxMergesAndThreads(boolean)}. */
5.MergeThread执行merge任务
@Override public void run() { try { if (verbose()) { message(" merge thread: start"); } doMerge(writer, merge); if (verbose()) { message(" merge thread: done"); } // Let CMS run new merges if necessary: try { merge(writer, MergeTrigger.MERGE_FINISHED, true); } catch (AlreadyClosedException ace) { // OK } catch (IOException ioe) { throw new RuntimeException(ioe); } } catch (Throwable exc) { if (exc instanceof MergePolicy.MergeAbortedException) { // OK to ignore } else if (suppressExceptions == false) { // suppressExceptions is normally only set during // testing. handleMergeException(writer.getDirectory(), exc); } } finally { synchronized(ConcurrentMergeScheduler.this) { removeMergeThread(); updateMergeThreads(); // In case we had stalled indexing, we can now wake up // and possibly unstall: ConcurrentMergeScheduler.this.notifyAll(); } } } }
merge过程
/** * Merges the indicated segments, replacing them in the stack with a * single segment. * * @lucene.experimental */ public void merge(MergePolicy.OneMerge merge) throws IOException { boolean success = false; final long t0 = System.currentTimeMillis(); final MergePolicy mergePolicy = config.getMergePolicy(); try { try { try { mergeInit(merge); if (infoStream.isEnabled("IW")) { infoStream.message("IW", "now merge merge=" + segString(merge.segments) + " index=" + segString()); } mergeMiddle(merge, mergePolicy); mergeSuccess(merge); success = true; } catch (Throwable t) { handleMergeException(t, merge); } } finally { synchronized(this) { mergeFinish(merge); if (success == false) { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "hit exception during merge"); } } else if (!merge.isAborted() && (merge.maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS || (!closed && !closing))) { // This merge (and, generally, any change to the // segments) may now enable new merges, so we call // merge policy & update pending merges. updatePendingMerges(mergePolicy, MergeTrigger.MERGE_FINISHED, merge.maxNumSegments); } } } } catch (Throwable t) { // Important that tragicEvent is called after mergeFinish, else we hang // waiting for our merge thread to be removed from runningMerges: tragicEvent(t, "merge"); throw t; } if (merge.info != null && merge.isAborted() == false) { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.info.maxDoc() + " docs"); } } }
6.merge结束
/** Does fininishing for a merge, which is fast but holds * the synchronized lock on IndexWriter instance. */ final synchronized void mergeFinish(MergePolicy.OneMerge merge) { // forceMerge, addIndexes or waitForMerges may be waiting // on merges to finish. notifyAll(); // It's possible we are called twice, eg if there was an // exception inside mergeInit if (merge.registerDone) { final List<SegmentCommitInfo> sourceSegments = merge.segments; for (SegmentCommitInfo info : sourceSegments) { mergingSegments.remove(info); } merge.registerDone = false; } runningMerges.remove(merge); }