zoukankan      html  css  js  c++  java
  • 企业搜索引擎开发之连接器connector(二十六)



     /**
       * This connector instance's current traversal schedule. A {@code null}
       * value means no schedule restriction: passes run whenever the schedule is
       * {@code null} or its {@code shouldRun()} returns true.
       * NOTE(review): declared volatile, presumably because it is replaced from
       * another thread while the monitor runs — confirm.
       */
      private volatile TraversalSchedule traversalSchedule;
      /** Directory that contains snapshots. */
      private final SnapshotStore snapshotStore;
      /**
       * The repository to monitor ("query for files"). Each pass iterates it
       * once, yielding one {@link DocumentSnapshot} per document.
       */
      private final SnapshotRepository<? extends DocumentSnapshot> query;
      /**
       * Reader for the most recent stored snapshot. Opened at the start of each
       * pass and reset to {@code null} after a pass completes normally.
       */
      private SnapshotReader snapshotReader;
      /** Callback to invoke when a change is detected. */
      private final Callback callback;
      /**
       * Current record from the previous snapshot, i.e. the next record not yet
       * matched against the repository; {@code null} once that snapshot has
       * been fully consumed.
       */
      private DocumentSnapshot current;
      /**
       * The snapshot we are currently writing. Opened at the start of each pass
       * and reset to {@code null} after a pass completes normally.
       */
      private OrderedSnapshotWriter snapshotWriter;
      /** Name of this monitor (a hash of the start path); used in log messages. */
      private final String name;
      /** Factory for un-serializing {@link DocumentSnapshot} objects. */
      private final DocumentSnapshotFactory documentSnapshotFactory;
      /**
       * Destination for information about filtered-out documents, e.g. those
       * whose processing failed with a {@code RepositoryException}.
       */
      private final DocumentSink documentSink;
      /**
       * Contains a checkpoint confirmation from CM (presumably the connector
       * manager — confirm). Initialized from the constructor's {@code initialCp},
       * which may be {@code null}.
       */
      private MonitorCheckpoint guaranteeCheckpoint;
      /**
       * The monitor should exit voluntarily if set to false. Each pass checks it
       * before every document and aborts with an {@code InterruptedException}
       * when it is false. Volatile so that a write from another thread becomes
       * visible to the monitor thread.
       */
      private volatile boolean isRunning = true;
       * Creates a DocumentSnapshotRepositoryMonitor that monitors the
       * Repository rooted at {@code root}.
       * @param name the name of this monitor (a hash of the start path)
       * @param query query for files
       * @param snapshotStore where snapshots are stored
       * @param callback client callback
       * @param documentSink destination for filtered out file info
       * @param initialCp checkpoint when system initiated, could be {@code null}
       * @param documentSnapshotFactory for un-serializing
       *        {@link DocumentSnapshot} objects.
      // Only stores the supplied collaborators. No validation or I/O is done
      // here, so null arguments are accepted as-is; initialCp is explicitly
      // allowed to be null.
      // NOTE(review): the Javadoc above mentions {@code root}, but there is no
      // such parameter. The repository to monitor is passed as {@code query}.
      public DocumentSnapshotRepositoryMonitor(String name,
          SnapshotRepository<? extends DocumentSnapshot> query,
          SnapshotStore snapshotStore, Callback callback,
          DocumentSink documentSink, MonitorCheckpoint initialCp,
          DocumentSnapshotFactory documentSnapshotFactory) {
        this.name = name;
        this.query = query;
        this.snapshotStore = snapshotStore;
        this.callback = callback;
        this.documentSnapshotFactory = documentSnapshotFactory;
        this.documentSink = documentSink;
        // The initial checkpoint doubles as the first "guarantee" checkpoint.
        guaranteeCheckpoint = initialCp;


      // Thread entry point for this monitor. Pushes "Monitor <name>" via
      // NDC.push() (reflectively, if possible), loops until an
      // InterruptedException arrives, logs a stop message, and finally removes
      // the NDC entry.
      // NOTE(review): the loop body is elided in this excerpt; presumably it
      // drives tryToRunForever(). Confirm against the full source.
      public void run() {
        // Call NDC.push() via reflection, if possible.
        invoke(ndcPush, "Monitor " + name);
        try {
          while (true) {
            // TODO: Remove items from this monitor that are in queues.
            // Watch out for race conditions. The queues are potentially
            // giving docs to CM as bad things happen in monitor.
            // This TODO would be mitigated by a reconciliation with GSA.
        } catch (InterruptedException ie) {
          // InterruptedException is the stop signal. In the visible code the
          // interrupt flag is not re-set; the thread simply finishes.
          LOG.info("Repository Monitor " + name + " received stop signal. " + this);
        } finally {
          // Call NDC.remove() via reflection, if possible.
          // (The corresponding invoke(...) call is elided in this excerpt.)


    // Runs traversal passes in a loop for as long as the schedule allows; a
    // null traversalSchedule means "always run". Outside the traversal window
    // it logs at FINEST and, per the log message, sleeps for 15 minutes.
    // Snapshot and repository errors are caught outside the loop, logged at
    // SEVERE, and end this method normally. InterruptedException is not caught
    // here and propagates to the caller.
    private void tryToRunForever() throws InterruptedException {
        try {
          while (true) {
            if (traversalSchedule == null || traversalSchedule.shouldRun()) {
              // Start traversal
            else {
              LOG.finest("Currently out of traversal window. "
                  + "Sleeping for 15 minutes.");
              // TODO(nashi): Calculate when it should wake up while
              // handling TraversalScheduleAware events properly.
        } catch (SnapshotWriterException e) {
          // NOTE(review): snapshotWriter is reset to null after every completed
          // pass. It is reassigned only once openNewSnapshotWriter() succeeds, so
          // it may be null here. Guard before calling getPath().
          String msg = "Failed to write to snapshot file: " + snapshotWriter.getPath();
          LOG.log(Level.SEVERE, msg, e);
        } catch (SnapshotReaderException e) {
          // NOTE(review): snapshotReader is also reset to null after each
          // completed pass; same possible NullPointerException as above.
          String msg = "Failed to read snapshot file: " + snapshotReader.getPath();
          LOG.log(Level.SEVERE, msg, e);
        } catch (SnapshotStoreException e) {
          String msg = "Problem with snapshot store.";
          LOG.log(Level.SEVERE, msg, e);
        } catch (SnapshotRepositoryRuntimeException e) {
          String msg = "Failed reading repository.";
          LOG.log(Level.SEVERE, msg, e);

    doOnePass() 方法从仓库对象 SnapshotRepository 中获取数据，将数据快照持久化到快照文件，并实现相关的数据处理逻辑（判断文档是新增、删除还是更新等）。


       * 在doOnePass()方法中生成独立的快照读写器
       * Makes one pass through the repository, notifying {@code visitor} of any
       * changes.
       * @throws InterruptedException
      // English translation of the first line above: a fresh snapshot reader
      // and writer are created for every pass.
      // NOTE(review): changes are reported to {@code callback}; there is no
      // {@code visitor} in this class.
      // Visible flow: open the latest snapshot and read its first record, open
      // a new ordered snapshot writer, and walk the repository, checking for a
      // stop request before each document. Then close both files and verify
      // that the old snapshot was fully consumed. If the callback enqueued
      // nothing this pass, the newly written snapshot file is deleted.
      private void doOnePass() throws SnapshotStoreException,
          InterruptedException {
        try {
          // Open the most recent snapshot and read the first record.
          this.snapshotReader = snapshotStore.openMostRecentSnapshot();
          current = snapshotReader.read();
          // Create an snapshot writer for this pass.
          this.snapshotWriter =
              new OrderedSnapshotWriter(snapshotStore.openNewSnapshotWriter());
          for(DocumentSnapshot ss : query) {
            // Cooperative shutdown: isRunning is volatile and re-read per document.
            if (false == isRunning) {
              LOG.log(Level.INFO, "Exiting the monitor thread " + name
                  + " " + this);
              throw new InterruptedException();
            // isInterrupted() only tests the flag; it does not clear it.
            if (Thread.currentThread().isInterrupted()) {
              throw new InterruptedException();
           // Take care of any trailing paths in the snapshot.
        } finally {
          // Runs even if the pass failed. If opening failed, the reader or writer
          // may still be null (e.g. on the very first pass). Presumably
          // SnapshotStore.close() tolerates null arguments; confirm.
          try {
            snapshotStore.close(snapshotReader, snapshotWriter);
          } catch (IOException e) {
            LOG.log(Level.WARNING, "Failed closing snapshot reader and writer.", e);
            // Try to proceed anyway.  Weird they are not closing.
        // Invariant: by the end of the pass every record of the previous
        // snapshot must have been matched or reported as deleted.
        if (current != null) {
          throw new IllegalStateException(
              "Should not finish pass until entire read snapshot is consumed.");
        if (!callback.hasEnqueuedAtLeastOneChangeThisPass()) {
          // No monitor checkpoints from this pass went to queue because
          // there were no changes, so we can delete the snapshot we just wrote.
          new java.io.File(snapshotWriter.getPath()).delete();
          // TODO: Check return value; log trouble.
        // Reset only after a normally completed pass; on an exception these
        // fields keep their last values.
        snapshotWriter = null;
        snapshotReader = null;


       * Process snapshot entries as deletes until {@code current} catches up with
       * {@code documentSnapshot}. Or, if {@code documentSnapshot} is {@code null},
       * process all remaining snapshot entries as deletes.
       * @param documentSnapshot where to stop
       * @throws SnapshotReaderException
       * @throws InterruptedException
      // Merge-style step. Previous-snapshot records that sort before
      // documentSnapshot (per COMPARATOR) were not seen in this scan, so each is
      // reported as a deletion and the reader advances to the next record.
      // NOTE(review): assumes both the repository iteration and the stored
      // snapshot are ordered by COMPARATOR; confirm.
      private void processDeletes(DocumentSnapshot documentSnapshot)
          throws SnapshotReaderException, InterruptedException {
        while (current != null
            && (documentSnapshot == null
                || COMPARATOR.compare(documentSnapshot, current) > 0)) {
          // NOTE(review): the start of the next statement is elided in this
          // excerpt; presumably it is callback.deletedDocument(...).
              new DeleteDocumentHandle(current.getDocumentId()), getCheckpoint());
          current = snapshotReader.read();


    // Processes one repository document, treating a RepositoryException as a
    // per-document failure. The document id is recorded in documentSink with
    // FilterReason.IO_EXCEPTION and the method returns normally, so the pass
    // continues. The other declared exceptions propagate.
    // NOTE(review): the try body is elided in this excerpt; presumably it calls
    // processDocument(snapshot). Confirm.
    private void safelyProcessDocumentSnapshot(DocumentSnapshot snapshot)
          throws InterruptedException, SnapshotReaderException,
          SnapshotWriterException {
        try {
        } catch (RepositoryException re) {
          // The exception itself is not logged here, only the document id.
          //TODO Log the exception or its message? in document sink perhaps.
          documentSink.add(snapshot.getDocumentId(), FilterReason.IO_EXCEPTION);


       * Processes a document found in the document repository.
       * @param documentSnapshot
       * @throws RepositoryException
       * @throws InterruptedException
       * @throws SnapshotReaderException
       * @throws SnapshotWriterException
      // Two cases: the document has the same key (per COMPARATOR) as the current
      // previous-snapshot record, or it did not exist in the previous scan.
      private void processDocument(DocumentSnapshot documentSnapshot)
          throws InterruptedException, RepositoryException, SnapshotReaderException,
              SnapshotWriterException {
        // At this point 'current' >= 'file', or possibly current == null if
        // we've processed the previous snapshot entirely.
        if (current != null
            && COMPARATOR.compare(documentSnapshot, current) == 0) {
          // NOTE(review): body elided in this excerpt; presumably it is
          // processPossibleChange(documentSnapshot). Confirm.
        } else {
          // This file didn't exist during the previous scan.
          DocumentHandle documentHandle  = documentSnapshot.getUpdate(null);     
          // Null if filtered due to mime-type.
          if (documentHandle != null) {
            // NOTE(review): the meaning of the -1 argument is defined by
            // getCheckpoint(int), which is not shown here.
            callback.newDocument(documentHandle, getCheckpoint(-1));


       * Processes a document found in the document repository that also appeared
       * in the previous scan. Determines whether the document has changed,
       * propagates changes to the client and writes the snapshot record.
       * @param documentSnapshot
       * @throws RepositoryException
       * @throws InterruptedException
       * @throws SnapshotWriterException
       * @throws SnapshotReaderException
      // Asks the new snapshot for an update relative to the matching previous
      // record ('current'). A null result means unchanged; otherwise the change
      // goes to the callback. Either way, the reader then advances past the
      // matched record.
      // NOTE(review): the snapshot-writing step described above is not visible
      // in this excerpt.
      private void processPossibleChange(DocumentSnapshot documentSnapshot)
          throws RepositoryException, InterruptedException, SnapshotWriterException,
                 SnapshotReaderException {
        DocumentHandle documentHandle = documentSnapshot.getUpdate(current);
        if (documentHandle == null) {
          // No change.
        } else {
          // Normal change - send the gsa an update.
          callback.changedDocument(documentHandle, getCheckpoint());
        current = snapshotReader.read();




       * 回调接口
       * The client provides an implementation of this interface to receive
       * notification of changes to the repository.
      // (English translation of the first line above: "Callback interface".)
      public static interface Callback {
        // Presumably invoked at the start of each pass; the implementation
        // resets its per-pass change counter here.
        public void passBegin() throws InterruptedException;
        // Reports a document that was absent from the previous snapshot.
        public void newDocument(DocumentHandle documentHandle,
            MonitorCheckpoint mcp) throws InterruptedException;
        // Reports a previous-snapshot record not found in the current scan.
        public void deletedDocument(DocumentHandle documentHandle,
            MonitorCheckpoint mcp) throws InterruptedException;
        // Reports a document whose state differs from its previous-snapshot record.
        public void changedDocument(DocumentHandle documentHandle,
            MonitorCheckpoint mcp) throws InterruptedException;
        // Presumably invoked when a pass finishes, with that pass's checkpoint.
        public void passComplete(MonitorCheckpoint mcp) throws InterruptedException;
        // When this returns false, doOnePass() deletes the snapshot it just wrote.
        public boolean hasEnqueuedAtLeastOneChangeThisPass();
        // NOTE(review): sleepms is presumably a pause length in milliseconds; confirm.
        public void passPausing(int sleepms) throws InterruptedException;


       * 回调接口实现:向阻塞队列pendingChanges加入Change元素
       * Adds {@link Change Changes} to this queue.
      // (English translation of the first line above: "Callback interface
      // implementation: adds Change elements to the blocking queue
      // pendingChanges".)
      // This class shares the simple name Callback with the interface it
      // implements. The qualified DocumentSnapshotRepositoryMonitor.Callback
      // disambiguates them.
      private class Callback implements DocumentSnapshotRepositoryMonitor.Callback {
        // Per-pass counter read by hasEnqueuedAtLeastOneChangeThisPass().
        // NOTE(review): no increment of changeCount is visible in this excerpt.
        // If the change methods really never increment it, that method always
        // returns false, and doOnePass() deletes every snapshot it writes.
        // Presumably the increments were elided; confirm.
        private int changeCount = 0;
        // Resets the per-pass counter and records the scan start time.
        public void passBegin() {
          changeCount = 0;
          activityLogger.scanBeginAt(new Timestamp(System.currentTimeMillis()));
        // The @Override markers below are commented out, likely for Java 5
        // compatibility (Java 5 rejected @Override on interface methods); confirm.
        // pendingChanges is described above as a blocking queue, so put() may
        // wait for space; hence the InterruptedException.
        /* @Override */
        public void changedDocument(DocumentHandle dh, MonitorCheckpoint mcp)
            throws InterruptedException {
          pendingChanges.put(new Change(Change.FactoryType.CLIENT, dh, mcp));
         /* @Override */
        public void deletedDocument(DocumentHandle dh, MonitorCheckpoint mcp)
            throws InterruptedException {
          // Deletions use FactoryType.INTERNAL, unlike new/changed (CLIENT).
          pendingChanges.put(new Change(Change.FactoryType.INTERNAL, dh, mcp));
        /* @Override */
        public void newDocument(DocumentHandle dh, MonitorCheckpoint mcp)
            throws InterruptedException {
          pendingChanges.put(new Change(Change.FactoryType.CLIENT, dh, mcp));
        /* @Override */
        public void passComplete(MonitorCheckpoint mcp) throws InterruptedException {
          activityLogger.scanEndAt(new Timestamp(System.currentTimeMillis()));
          // Body of this branch is elided in the excerpt.
          if (introduceDelayAfterEveryScan || changeCount == 0) {
        public boolean hasEnqueuedAtLeastOneChangeThisPass() {
          return changeCount > 0;
        /* @Override */
        public void passPausing(int sleepms) throws InterruptedException {



    转载请注明出处 博客园 刺猬的温驯

    本人邮箱: chenying998179@163#com (#改为.)

    本文链接 http://www.cnblogs.com/chenying99/p/3789505.html 

  • 相关阅读:
    LeetCode 252. Meeting Rooms
    LeetCode 161. One Edit Distance
    LeetCode 156. Binary Tree Upside Down
    LeetCode 173. Binary Search Tree Iterator
    LeetCode 285. Inorder Successor in BST
    LeetCode 305. Number of Islands II
    LeetCode 272. Closest Binary Search Tree Value II
    LeetCode 270. Closest Binary Search Tree Value
    LeetCode 329. Longest Increasing Path in a Matrix
    LintCode Subtree
  • 原文地址:https://www.cnblogs.com/chenying99/p/3789505.html
Copyright © 2011-2022 走看看