  • Enterprise Search Engine Development: The Connector (Part 26)

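    The connector uses a monitor object, DocumentSnapshotRepositoryMonitor, to iterate over the data in the repository object SnapshotRepository described in the previous post (for a database repository, this is DBSnapshotRepository).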

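    The monitor class DocumentSnapshotRepositoryMonitor initializes its member variables in the constructor. These members are the objects involved in fetching and processing the data.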

     /** This connector instance's current traversal schedule. */
      private volatile TraversalSchedule traversalSchedule;
    
      /** Directory that contains snapshots. */
      private final SnapshotStore snapshotStore;
    
      /** The root of the repository to monitor */
      private final SnapshotRepository<? extends DocumentSnapshot> query;
    
      /** Reader for the current snapshot. */
      private SnapshotReader snapshotReader;
    
      /** Callback to invoke when a change is detected. */
      private final Callback callback;
    
      /** Current record from the snapshot. */
      private DocumentSnapshot current;
    
      /** The snapshot we are currently writing */
      private OrderedSnapshotWriter snapshotWriter;
    
      private final String name;
    
      private final DocumentSnapshotFactory documentSnapshotFactory;
    
      private final DocumentSink documentSink;
    
      /* Contains a checkpoint confirmation from CM. */
      private MonitorCheckpoint guaranteeCheckpoint;
    
      /* The monitor should exit voluntarily if set to false */
      private volatile boolean isRunning = true;
    
      /**
       * Creates a DocumentSnapshotRepositoryMonitor that monitors the
       * Repository rooted at {@code root}.
       *
       * @param name the name of this monitor (a hash of the start path)
       * @param query query for files
       * @param snapshotStore where snapshots are stored
       * @param callback client callback
       * @param documentSink destination for filtered out file info
       * @param initialCp checkpoint when system initiated, could be {@code null}
       * @param documentSnapshotFactory for un-serializing
       *        {@link DocumentSnapshot} objects.
       */
      public DocumentSnapshotRepositoryMonitor(String name,
          SnapshotRepository<? extends DocumentSnapshot> query,
          SnapshotStore snapshotStore, Callback callback,
          DocumentSink documentSink, MonitorCheckpoint initialCp,
          DocumentSnapshotFactory documentSnapshotFactory) {
        this.name = name;
        this.query = query;
        this.snapshotStore = snapshotStore;
        this.callback = callback;
        this.documentSnapshotFactory = documentSnapshotFactory;
        this.documentSink = documentSink;
        guaranteeCheckpoint = initialCp;
      }

    The class also implements the Runnable interface, and the overridden run method contains the data-processing logic.

    @Override
      public void run() {
        // Call NDC.push() via reflection, if possible.
        invoke(ndcPush, "Monitor " + name);
        try {
          while (true) {
            tryToRunForever();
            // TODO: Remove items from this monitor that are in queues.
            // Watch out for race conditions. The queues are potentially
            // giving docs to CM as bad things happen in monitor.
            // This TODO would be mitigated by a reconciliation with GSA.
            performExceptionRecovery();
          }
        } catch (InterruptedException ie) {
          LOG.info("Repository Monitor " + name + " received stop signal. " + this);
        } finally {
          // Call NDC.remove() via reflection, if possible.
          invoke(ndcRemove);
        }
      }
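
    The run loop above keeps retrying until the thread is interrupted. A minimal sketch of that pattern follows. The Task interface and the recovery counter are hypothetical stand-ins for tryToRunForever() and performExceptionRecovery(), and restoring the interrupt flag is a choice made for this sketch, not something the original method does.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: retry a unit of work until the thread is interrupted. */
final class RetryLoopSketch implements Runnable {
  /** Stand-in for tryToRunForever(); returns normally after a logged failure. */
  interface Task {
    void runOnce() throws InterruptedException;
  }

  private final Task task;
  private final AtomicInteger recoveries = new AtomicInteger();

  RetryLoopSketch(Task task) {
    this.task = task;
  }

  /** Number of times the loop went through the recovery step. */
  int recoveryCount() {
    return recoveries.get();
  }

  @Override
  public void run() {
    try {
      while (true) {
        task.runOnce();
        // Stand-in for performExceptionRecovery(): this sketch only counts.
        recoveries.incrementAndGet();
      }
    } catch (InterruptedException ie) {
      // Stop signal: leave the loop and let the thread finish.
      // Restore the flag so the caller can still observe the interruption.
      Thread.currentThread().interrupt();
    }
  }
}
```

    The only way out of the loop is an InterruptedException, which matches how the monitor treats interruption as its stop signal.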

    The run method in turn calls tryToRunForever().

    private void tryToRunForever() throws InterruptedException {
        try {
          while (true) {
            if (traversalSchedule == null || traversalSchedule.shouldRun()) {
              // Start traversal
              doOnePass();
            }
            else {
              LOG.finest("Currently out of traversal window. "
                  + "Sleeping for 15 minutes.");
              // TODO(nashi): Calculate when it should wake up while
              // handling TraversalScheduleAware events properly.
              // Outside the traversal window, so sleep.
              callback.passPausing(15*60*1000);
            }
          }
        } catch (SnapshotWriterException e) {
          String msg = "Failed to write to snapshot file: " + snapshotWriter.getPath();
          LOG.log(Level.SEVERE, msg, e);
        } catch (SnapshotReaderException e) {
          String msg = "Failed to read snapshot file: " + snapshotReader.getPath();
          LOG.log(Level.SEVERE, msg, e);
        } catch (SnapshotStoreException e) {
          String msg = "Problem with snapshot store.";
          LOG.log(Level.SEVERE, msg, e);
        } catch (SnapshotRepositoryRuntimeException e) {
          String msg = "Failed reading repository.";
          LOG.log(Level.SEVERE, msg, e);
        }
      }

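    The doOnePass() method fetches data from the SnapshotRepository object, persists the data snapshots to a snapshot file, and decides whether each document is new, deleted, or updated. The resulting changes are finally added, through the Callback interface, to the blocking queue inside the ChangeQueue object.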

    /**
       * Creates a separate snapshot reader and writer for each pass.
       * Makes one pass through the repository, notifying {@code visitor} of any
       * changes.
       *
       * @throws InterruptedException
       */
      private void doOnePass() throws SnapshotStoreException,
          InterruptedException {
        callback.passBegin();
        try {
          // Snapshot reader.
          // Open the most recent snapshot and read the first record.
          this.snapshotReader = snapshotStore.openMostRecentSnapshot();
          current = snapshotReader.read();
          // Snapshot writer.
          // Create a snapshot writer for this pass.
          this.snapshotWriter =
              new OrderedSnapshotWriter(snapshotStore.openNewSnapshotWriter());
          // Fetch the data from the repository.
          for (DocumentSnapshot ss : query) {
            // Check whether the monitor has been asked to stop.
            if (!isRunning) {
              LOG.log(Level.INFO, "Exiting the monitor thread " + name
                  + " " + this);
              throw new InterruptedException();
            }
    
            if (Thread.currentThread().isInterrupted()) {
              throw new InterruptedException();
            }
            processDeletes(ss);
            safelyProcessDocumentSnapshot(ss);
          }
          
          // After the iteration, the entries still left in the old snapshot
          // were removed from the source. Take care of those trailing paths.
          processDeletes(null);
    
        } finally {
          try {
            snapshotStore.close(snapshotReader, snapshotWriter);
          } catch (IOException e) {
            LOG.log(Level.WARNING, "Failed closing snapshot reader and writer.", e);
            // Try to proceed anyway.  Weird they are not closing.
          }
        }
        if (current != null) {
          throw new IllegalStateException(
              "Should not finish pass until entire read snapshot is consumed.");
        }
        // The pass is done; the callback may sleep before the next one.
        callback.passComplete(getCheckpoint(-1));
        snapshotStore.deleteOldSnapshots();
        if (!callback.hasEnqueuedAtLeastOneChangeThisPass()) {
          // No monitor checkpoints from this pass went to queue because
          // there were no changes, so we can delete the snapshot we just wrote.
          new java.io.File(snapshotWriter.getPath()).delete();
          // TODO: Check return value; log trouble.
        }
        snapshotWriter = null;
        snapshotReader = null;
      }
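
    The TODO near the end of doOnePass() notes that the result of File.delete() is never checked. One possible way to address it, shown only as a sketch, is java.nio.file.Files.deleteIfExists, which reports both outcomes. The class name, method name, and log messages below are hypothetical and not part of the connector.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch: delete an unused snapshot file and log any trouble. */
final class SnapshotCleanupSketch {
  private static final Logger LOG =
      Logger.getLogger(SnapshotCleanupSketch.class.getName());

  /** Returns true only if the file existed and was removed. */
  static boolean deleteUnusedSnapshot(Path snapshotFile) {
    try {
      boolean deleted = Files.deleteIfExists(snapshotFile);
      if (!deleted) {
        LOG.log(Level.WARNING, "Snapshot file was already gone: " + snapshotFile);
      }
      return deleted;
    } catch (IOException e) {
      // For example: missing permissions, or the path is a non-empty directory.
      LOG.log(Level.WARNING, "Failed to delete snapshot file: " + snapshotFile, e);
      return false;
    }
  }
}
```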

    The processDeletes method handles deleted documents.

    /**
       * Process snapshot entries as deletes until {@code current} catches up with
       * {@code documentSnapshot}. Or, if {@code documentSnapshot} is {@code null},
       * process all remaining snapshot entries as deletes.
       *
       * @param documentSnapshot where to stop
       * @throws SnapshotReaderException
       * @throws InterruptedException
       */
      private void processDeletes(DocumentSnapshot documentSnapshot)
          throws SnapshotReaderException, InterruptedException {
        // While documentSnapshot sorts after current, current no longer exists in
        // the repository: report it as deleted and read the next snapshot entry.
        while (current != null
            && (documentSnapshot == null
                || COMPARATOR.compare(documentSnapshot, current) > 0)) {
          callback.deletedDocument(
              new DeleteDocumentHandle(current.getDocumentId()), getCheckpoint());
          current = snapshotReader.read();
        }
      }

    Next, we follow the safelyProcessDocumentSnapshot method.

    private void safelyProcessDocumentSnapshot(DocumentSnapshot snapshot)
          throws InterruptedException, SnapshotReaderException,
          SnapshotWriterException {
        try {
          processDocument(snapshot);
        } catch (RepositoryException re) {
          //TODO Log the exception or its message? in document sink perhaps.
          // Record the snapshot that failed in the document sink.
          documentSink.add(snapshot.getDocumentId(), FilterReason.IO_EXCEPTION);
        }
      }

    It in turn calls processDocument, which contains the logic for updated and new documents.

    /**
       * Processes a document found in the document repository.
       *
       * @param documentSnapshot
       * @throws RepositoryException
       * @throws InterruptedException
       * @throws SnapshotReaderException
       * @throws SnapshotWriterException
       */
      private void processDocument(DocumentSnapshot documentSnapshot)
          throws InterruptedException, RepositoryException, SnapshotReaderException,
              SnapshotWriterException {
        // At this point 'current' >= 'file', or possibly current == null if
        // we've processed the previous snapshot entirely.
        if (current != null
            && COMPARATOR.compare(documentSnapshot, current) == 0) {
          // Seen in the previous scan: check for a change and advance current.
          processPossibleChange(documentSnapshot);
        } else {
          // This file didn't exist during the previous scan.
          // The previous snapshot has no entry for this documentSnapshot.
          DocumentHandle documentHandle  = documentSnapshot.getUpdate(null);     
          snapshotWriter.write(documentSnapshot);
    
          // Null if filtered due to mime-type.
          if (documentHandle != null) {
            callback.newDocument(documentHandle, getCheckpoint(-1));
          }
        }
      }

    Handling the update case:

    /**
       * Processes a document found in the document repository that also appeared
       * in the previous scan. Determines whether the document has changed,
       * propagates changes to the client and writes the snapshot record.
       *
       * @param documentSnapshot
       * @throws RepositoryException
       * @throws InterruptedException
       * @throws SnapshotWriterException
       * @throws SnapshotReaderException
       */
      private void processPossibleChange(DocumentSnapshot documentSnapshot)
          throws RepositoryException, InterruptedException, SnapshotWriterException,
                 SnapshotReaderException {
        // Presumably compares hash values.
        DocumentHandle documentHandle = documentSnapshot.getUpdate(current);
        // Write the record to the new snapshot file.
        snapshotWriter.write(documentSnapshot);
        if (documentHandle == null) {
          // No change, so nothing is sent.
        } else {
          // Normal change - send the gsa an update.
          callback.changedDocument(documentHandle, getCheckpoint());
        }
        current = snapshotReader.read();
      }

    Snapshots of both updated and new documents are first persisted to the newest snapshot file.

    Changes are submitted by calling the corresponding methods of the callback member, which ultimately places them in the ChangeQueue object.
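
    Taken together, processDeletes, processDocument and processPossibleChange amount to a sorted merge of the previous snapshot against the current repository contents. The sketch below is a simplified illustration of that idea, not the connector's code: it assumes document ids are strings in natural sort order, stands in a plain string fingerprint for DocumentSnapshot.getUpdate, and collects the results in a list instead of calling a callback. All names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of a sorted-merge diff between two snapshots. */
final class SnapshotDiffSketch {
  /**
   * Compares the previous snapshot with the current repository contents.
   * Both maps go from document id to a non-null content fingerprint.
   * TreeMap iterates both in the same sorted order, which the merge needs.
   */
  static List<String> diff(TreeMap<String, String> previous,
      TreeMap<String, String> repository) {
    List<String> changes = new ArrayList<>();
    Iterator<Map.Entry<String, String>> reader = previous.entrySet().iterator();
    Map.Entry<String, String> current = reader.hasNext() ? reader.next() : null;

    for (Map.Entry<String, String> doc : repository.entrySet()) {
      // Like processDeletes(ss): old entries that sort before doc are gone.
      while (current != null && doc.getKey().compareTo(current.getKey()) > 0) {
        changes.add("DELETE " + current.getKey());
        current = reader.hasNext() ? reader.next() : null;
      }
      if (current != null && doc.getKey().equals(current.getKey())) {
        // Like processPossibleChange: same id, report only if content differs.
        if (!doc.getValue().equals(current.getValue())) {
          changes.add("CHANGE " + doc.getKey());
        }
        current = reader.hasNext() ? reader.next() : null;
      } else {
        // Not in the previous snapshot: a new document.
        changes.add("ADD " + doc.getKey());
      }
    }
    // Like processDeletes(null): whatever is left in the old snapshot is deleted.
    while (current != null) {
      changes.add("DELETE " + current.getKey());
      current = reader.hasNext() ? reader.next() : null;
    }
    return changes;
  }
}
```

    Because both sides are sorted the same way, a single forward pass over each is enough, which is why the monitor can stream the old snapshot from disk instead of loading it into memory.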

    The Callback interface defines the methods for handling these changes.

    /**
       * Callback interface.
       * The client provides an implementation of this interface to receive
       * notification of changes to the repository.
       */
      public static interface Callback {
        public void passBegin() throws InterruptedException;
        
        public void newDocument(DocumentHandle documentHandle,
            MonitorCheckpoint mcp) throws InterruptedException;
    
        public void deletedDocument(DocumentHandle documentHandle,
            MonitorCheckpoint mcp) throws InterruptedException;
    
        public void changedDocument(DocumentHandle documentHandle,
            MonitorCheckpoint mcp) throws InterruptedException;
    
        public void passComplete(MonitorCheckpoint mcp) throws InterruptedException;
    
        public boolean hasEnqueuedAtLeastOneChangeThisPass();
    
        public void passPausing(int sleepms) throws InterruptedException;
      }

    The ChangeQueue class defines an inner class Callback that implements this interface. Its methods add the submitted changes to the blocking queue held as a member of ChangeQueue.

    /**
       * Callback implementation: adds Change elements to the blocking queue pendingChanges.
       * Adds {@link Change Changes} to this queue.
       */
      private class Callback implements DocumentSnapshotRepositoryMonitor.Callback {
        private int changeCount = 0;
    
        public void passBegin() {
          changeCount = 0;
          activityLogger.scanBeginAt(new Timestamp(System.currentTimeMillis()));
        }
    
        /* @Override */
        public void changedDocument(DocumentHandle dh, MonitorCheckpoint mcp)
            throws InterruptedException {
          ++changeCount;
          pendingChanges.put(new Change(Change.FactoryType.CLIENT, dh, mcp));
          activityLogger.gotChangedDocument(dh.getDocumentId());
        }
    
         /* @Override */
        public void deletedDocument(DocumentHandle dh, MonitorCheckpoint mcp)
            throws InterruptedException {
          ++changeCount;
          pendingChanges.put(new Change(Change.FactoryType.INTERNAL, dh, mcp));
          activityLogger.gotDeletedDocument(dh.getDocumentId());
        }
    
        /* @Override */
        public void newDocument(DocumentHandle dh, MonitorCheckpoint mcp)
            throws InterruptedException {
          ++changeCount;
          pendingChanges.put(new Change(Change.FactoryType.CLIENT, dh, mcp));
          activityLogger.gotNewDocument(dh.getDocumentId());
        }
    
        /* @Override */
        public void passComplete(MonitorCheckpoint mcp) throws InterruptedException {
          activityLogger.scanEndAt(new Timestamp(System.currentTimeMillis()));
          if (introduceDelayAfterEveryScan || changeCount == 0) {
            Thread.sleep(sleepInterval);
          }
        }
    
        public boolean hasEnqueuedAtLeastOneChangeThisPass() {
          return changeCount > 0;
        }
    
        /* @Override */
        public void passPausing(int sleepms) throws InterruptedException {
          Thread.sleep(sleepms);
        }
      }
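
    The effect of pendingChanges.put(...) is easier to see in isolation. The sketch below assumes the queue is a bounded LinkedBlockingQueue of strings; the post does not show how pendingChanges is declared, so the queue type, the capacity parameter, and the class itself are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch: a callback that feeds changes into a bounded queue. */
final class ChangeQueueSketch {
  private final BlockingQueue<String> pendingChanges;
  private int changeCount = 0;

  ChangeQueueSketch(int capacity) {
    pendingChanges = new LinkedBlockingQueue<>(capacity);
  }

  /** Resets the per-pass counter, as passBegin() does above. */
  void passBegin() {
    changeCount = 0;
  }

  /** Blocks while the queue is full, which throttles the producing thread. */
  void newDocument(String documentId) throws InterruptedException {
    ++changeCount;
    pendingChanges.put("ADD " + documentId);
  }

  boolean hasEnqueuedAtLeastOneChangeThisPass() {
    return changeCount > 0;
  }

  /** Consumer side: returns the next change, or null if nothing is queued. */
  String poll() {
    return pendingChanges.poll();
  }
}
```

    With a bounded queue, put() makes the monitor thread wait whenever the consumer falls behind, so changes are never dropped and memory use stays limited.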

    ---------------------------------------------------------------------------

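    This series, Enterprise Search Engine Development: The Connector, is my own original work.

    When reposting, please credit the source: 博客园 (cnblogs), 刺猬的温驯

    Email: chenying998179@163#com (replace # with .)

    Link to this post: http://www.cnblogs.com/chenying99/p/3789505.html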
