zoukankan      html  css  js  c++  java
  • lucene源码分析(8)MergeScheduler

    1.使用IndexWriter.java

        // Hand this writer to the merge scheduler, flagging the trigger as
        // EXPLICIT; newMergesFound is forwarded as the third argument.
        mergeScheduler.merge(this, MergeTrigger.EXPLICIT, newMergesFound);

    2.定义MergeScheduler

    /** <p>Expert: {@link IndexWriter} uses an instance
     *  implementing this interface to execute the merges
     *  selected by a {@link MergePolicy}.  The default
     *  MergeScheduler is {@link ConcurrentMergeScheduler}.</p>
     * @lucene.experimental
    */

    3.MergeTrigger 触发merge的事件

    /**
     * Identifies the event that caused a merge to be requested. A value of this
     * type is handed to
     * {@link MergePolicy#findMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}.
     */
    public enum MergeTrigger {

      /** A single segment was flushed. */
      SEGMENT_FLUSH,

      /**
       * A full flush took place. Commits, NRT reader reopens and closing the
       * index writer can all cause a full flush.
       */
      FULL_FLUSH,

      /** The user asked for the merge explicitly. */
      EXPLICIT,

      /** A previous merge finished successfully. */
      MERGE_FINISHED,

      /** The IndexWriter is closing. */
      CLOSING
    }

    4.ConcurrentMergeScheduler默认实现

    /** A {@link MergeScheduler} that runs each merge using a
     *  separate thread.
     *
     *  <p>Specify the max number of threads that may run at
     *  once, and the maximum number of simultaneous merges
     *  with {@link #setMaxMergesAndThreads}.</p>
     *
     *  <p>If the number of merges exceeds the max number of threads 
     *  then the largest merges are paused until one of the smaller
     *  merges completes.</p>
     *
     *  <p>If more than {@link #getMaxMergeCount} merges are
     *  requested then this class will forcefully throttle the
     *  incoming threads by pausing until one or more merges
     *  complete.</p>
     *
     *  <p>This class attempts to detect whether the index is
     *  on rotational storage (traditional hard drive) or not
     *  (e.g. solid-state disk) and changes the default max merge
     *  and thread count accordingly.  This detection is currently
     *  Linux-only, and relies on the OS to put the right value
     *  into /sys/block/&lt;dev&gt;/block/rotational.  For all
     *  other operating systems it currently assumes a rotational
     *  disk for backwards compatibility.  To enable default
     *  settings for spinning or solid state disks for such
     *  operating systems, use {@link #setDefaultMaxMergesAndThreads(boolean)}.
     */ 

    5.MergeThread执行merge任务

        /**
         * Thread body: executes this thread's merge, then asks the scheduler
         * whether further merges should be started. Whatever happens, the
         * thread is deregistered and waiters on the scheduler are notified.
         */
        @Override
        public void run() {
          try {
            if (verbose()) {
              message("  merge thread: start");
            }
            doMerge(writer, merge);
            if (verbose()) {
              message("  merge thread: done");
            }

            // The finished merge may have made new merges possible; let the
            // scheduler pick them up.
            try {
              merge(writer, MergeTrigger.MERGE_FINISHED, true);
            } catch (AlreadyClosedException ignored) {
              // The writer was closed in the meantime; nothing left to do.
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
          } catch (Throwable failure) {
            // An aborted merge is expected and silently dropped. Anything else
            // is reported unless exception suppression (normally only enabled
            // by tests) is switched on.
            final boolean aborted = failure instanceof MergePolicy.MergeAbortedException;
            if (!aborted && !suppressExceptions) {
              handleMergeException(writer.getDirectory(), failure);
            }
          } finally {
            synchronized (ConcurrentMergeScheduler.this) {
              removeMergeThread();
              updateMergeThreads();

              // Indexing may have been stalled waiting on merges; wake the
              // waiters so they can re-check.
              ConcurrentMergeScheduler.this.notifyAll();
            }
          }
        }
      }

    merge过程

     /**
       * Merges the indicated segments, replacing them in the stack with a
       * single segment.
       * 
       * @lucene.experimental
       */
      public void merge(MergePolicy.OneMerge merge) throws IOException {
    
        boolean success = false;
    
        final long t0 = System.currentTimeMillis();
    
        final MergePolicy mergePolicy = config.getMergePolicy();
        try {
          try {
            try {
              mergeInit(merge);
    
              if (infoStream.isEnabled("IW")) {
                infoStream.message("IW", "now merge
      merge=" + segString(merge.segments) + "
      index=" + segString());
              }
    
              mergeMiddle(merge, mergePolicy);
              mergeSuccess(merge);
              success = true;
            } catch (Throwable t) {
              handleMergeException(t, merge);
            }
          } finally {
            synchronized(this) {
    
              mergeFinish(merge);
    
              if (success == false) {
                if (infoStream.isEnabled("IW")) {
                  infoStream.message("IW", "hit exception during merge");
                }
              } else if (!merge.isAborted() && (merge.maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS || (!closed && !closing))) {
                // This merge (and, generally, any change to the
                // segments) may now enable new merges, so we call
                // merge policy & update pending merges.
                updatePendingMerges(mergePolicy, MergeTrigger.MERGE_FINISHED, merge.maxNumSegments);
              }
            }
          }
        } catch (Throwable t) {
          // Important that tragicEvent is called after mergeFinish, else we hang
          // waiting for our merge thread to be removed from runningMerges:
          tragicEvent(t, "merge");
          throw t;
        }
    
        if (merge.info != null && merge.isAborted() == false) {
          if (infoStream.isEnabled("IW")) {
            infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.info.maxDoc() + " docs");
          }
        }
      }

    6.merge结束

      /** Performs the final bookkeeping for a merge. This is fast, but it
       *  holds the synchronized lock on the IndexWriter instance.
       *
       *  @param merge the merge whose bookkeeping is being released
       */
      final synchronized void mergeFinish(MergePolicy.OneMerge merge) {

        // Wake anything blocked on this monitor: forceMerge, addIndexes or
        // waitForMerges may be waiting for merges to finish.
        notifyAll();

        // This method can run twice for the same merge (e.g. after an
        // exception inside mergeInit), so the source segments are released
        // only while the merge is still flagged as registered.
        if (merge.registerDone) {
          for (SegmentCommitInfo mergedSource : merge.segments) {
            mergingSegments.remove(mergedSource);
          }
          merge.registerDone = false;
        }

        runningMerges.remove(merge);
      }
  • 相关阅读:
    122. Best Time to Buy and Sell Stock II
    121. Best Time to Buy and Sell Stock
    72. Edit Distance
    583. Delete Operation for Two Strings
    582. Kill Process
    indexDB基本用法
    浏览器的渲染原理
    js实现txt/excel文件下载
    git 常用命令
    nginx进入 配置目录时
  • 原文地址:https://www.cnblogs.com/davidwang456/p/10059134.html
Copyright © 2011-2022 走看看