zoukankan      html  css  js  c++  java
  • 企业搜索引擎开发之连接器connector(二十六)

    连接器通过监视器对象DocumentSnapshotRepositoryMonitor从上文提到的仓库对象SnapshotRepository(数据库仓库为DBSnapshotRepository)中迭代获取数据

    监视器类DocumentSnapshotRepositoryMonitor在其构造方法中初始化相关成员变量,这些成员变量都是与数据获取及数据处理逻辑相关的对象:

     /**
       * This connector instance's current traversal schedule. When it is
       * {@code null}, traversal is not restricted to a time window. Declared
       * {@code volatile}; presumably assigned from another thread — confirm.
       */
      private volatile TraversalSchedule traversalSchedule;
    
      /**
       * Store that holds the snapshot files. Each pass opens the most recent
       * snapshot for reading and a new snapshot for writing, and old snapshots
       * are deleted after a pass completes.
       */
      private final SnapshotStore snapshotStore;
    
      /**
       * The repository to monitor; iterated once per pass.
       * NOTE(review): the merge with the previous snapshot (processDeletes /
       * processDocument) assumes iteration yields documents in COMPARATOR order
       * — confirm with the SnapshotRepository implementation.
       */
      private final SnapshotRepository<? extends DocumentSnapshot> query;
    
      /**
       * Reader over the most recent (previous) snapshot for the pass in
       * progress. Set to {@code null} after a pass completes normally.
       */
      private SnapshotReader snapshotReader;
    
      /**
       * Callback notified when a pass begins, completes or pauses, and when a
       * document is found to be new, changed or deleted.
       */
      private final Callback callback;
    
      /**
       * The previous-snapshot record that the repository iteration is currently
       * being compared against. Becomes {@code null} once the previous snapshot
       * has been read to the end.
       */
      private DocumentSnapshot current;
    
      /**
       * Writer for the new snapshot produced by the current pass. Set to
       * {@code null} after a pass completes normally.
       */
      private OrderedSnapshotWriter snapshotWriter;
    
      /**
       * Name of this monitor (documented as a hash of the start path); used in
       * log messages and in the NDC entry pushed by run().
       */
      private final String name;
    
      /**
       * Factory for un-serializing {@link DocumentSnapshot} objects. Not
       * referenced by the methods shown in this excerpt.
       */
      private final DocumentSnapshotFactory documentSnapshotFactory;
    
      /**
       * Receives documents that could not be processed. A RepositoryException
       * during processing is recorded here with FilterReason.IO_EXCEPTION.
       */
      private final DocumentSink documentSink;
    
      /**
       * Checkpoint confirmation from CM (presumably the Connector Manager).
       * Initialized from the constructor's {@code initialCp}, which may be
       * {@code null}.
       */
      private MonitorCheckpoint guaranteeCheckpoint;
    
      /**
       * Set to {@code false} to ask the monitor to exit voluntarily. doOnePass()
       * checks it before each repository document and throws
       * InterruptedException when it is {@code false}.
       */
      private volatile boolean isRunning = true;
    
      /**
       * Builds a monitor that walks the documents of {@code query} and compares
       * them, pass after pass, with the snapshots kept in {@code snapshotStore}.
       *
       * @param name the name of this monitor (a hash of the start path)
       * @param query repository whose documents are traversed on every pass
       * @param snapshotStore where snapshots are stored
       * @param callback receiver of pass and document-change notifications
       * @param documentSink destination for filtered out file info
       * @param initialCp checkpoint when system initiated, could be {@code null}
       * @param documentSnapshotFactory for un-serializing
       *        {@link DocumentSnapshot} objects.
       */
      public DocumentSnapshotRepositoryMonitor(String name,
          SnapshotRepository<? extends DocumentSnapshot> query,
          SnapshotStore snapshotStore, Callback callback,
          DocumentSink documentSink, MonitorCheckpoint initialCp,
          DocumentSnapshotFactory documentSnapshotFactory) {
        // Identity and data source.
        this.name = name;
        this.query = query;

        // Snapshot persistence between passes.
        this.snapshotStore = snapshotStore;
        this.documentSnapshotFactory = documentSnapshotFactory;

        // Consumers of what the monitor discovers.
        this.callback = callback;
        this.documentSink = documentSink;

        // May legitimately be null on a fresh start.
        this.guaranteeCheckpoint = initialCp;
      }

    该类同时实现了Runnable接口,在重写(override)的run方法中实现数据的处理逻辑:

    /**
     * Monitor thread entry point. Repeatedly calls tryToRunForever(); each time
     * it returns (after logging a snapshot or repository failure),
     * performExceptionRecovery() runs and traversal starts again. The loop is
     * left only when an InterruptedException propagates, which is logged as a
     * stop signal.
     */
    @Override
    public void run() {
      // Push a nested diagnostic context entry (NDC.push via reflection, if
      // available) so this thread's log output carries the monitor name.
      invoke(ndcPush, "Monitor " + name);
      try {
        for (;;) {
          tryToRunForever();
          // TODO: Remove items from this monitor that are in queues.
          // Watch out for race conditions. The queues are potentially
          // giving docs to CM as bad things happen in monitor.
          // This TODO would be mitigated by a reconciliation with GSA.
          performExceptionRecovery();
        }
      } catch (InterruptedException stopSignal) {
        LOG.info("Repository Monitor " + name + " received stop signal. " + this);
      } finally {
        // Pop the diagnostic context pushed above (NDC.remove via reflection).
        invoke(ndcRemove);
      }
    }

    run方法进一步调用tryToRunForever()方法:

    /**
     * Runs traversal passes until one fails with a snapshot or repository
     * error. While outside the traversal window, the callback is asked to pause
     * for 15 minutes before the schedule is checked again. Failures of the
     * caught kinds are logged at SEVERE and the method then returns normally so
     * that the caller can perform recovery.
     *
     * @throws InterruptedException if the monitor is stopped or interrupted
     */
    private void tryToRunForever() throws InterruptedException {
      try {
        while (true) {
          // Read the volatile field once so the null check and shouldRun()
          // see the same schedule object.
          TraversalSchedule schedule = traversalSchedule;
          if (schedule == null || schedule.shouldRun()) {
            // Unscheduled or inside the traversal window: start a pass.
            doOnePass();
          } else {
            LOG.finest("Currently out of traversal window. "
                + "Sleeping for 15 minutes.");
            // TODO(nashi): Calculate when it should wake up while
            // handling TraversalScheduleAware events properly.
            // Outside the window: pause before checking again.
            callback.passPausing(15 * 60 * 1000);
          }
        }
      } catch (SnapshotWriterException e) {
        // The writer field is null before a pass opens it and after each
        // completed pass; guard it so this handler cannot itself throw an NPE
        // and kill the monitor thread.
        String msg = "Failed to write to snapshot file: "
            + (snapshotWriter == null ? "<not opened>" : snapshotWriter.getPath());
        LOG.log(Level.SEVERE, msg, e);
      } catch (SnapshotReaderException e) {
        // Same reasoning as above for the reader (e.g. a failure while opening
        // the most recent snapshot).
        String msg = "Failed to read snapshot file: "
            + (snapshotReader == null ? "<not opened>" : snapshotReader.getPath());
        LOG.log(Level.SEVERE, msg, e);
      } catch (SnapshotStoreException e) {
        String msg = "Problem with snapshot store.";
        LOG.log(Level.SEVERE, msg, e);
      } catch (SnapshotRepositoryRuntimeException e) {
        String msg = "Failed reading repository.";
        LOG.log(Level.SEVERE, msg, e);
      }
    }

    doOnePass()方法从仓库对象SnapshotRepository中获取数据,将数据快照持久化到新的快照文件,并实现相关的数据处理逻辑(判断数据是新增、删除还是更新等)。

    这些变更数据最后通过回调Callback接口添加到ChangeQueue对象中的阻塞队列。

    /**
     * Makes one pass through the repository, notifying the callback of any
     * changes. Each pass opens its own snapshot reader (over the most recent
     * snapshot) and writer (for a new snapshot), merges the repository
     * documents against the previous snapshot, and reports deleted, new and
     * changed documents through the callback.
     *
     * <p>If no change was enqueued during the pass, the snapshot just written
     * is deleted; failure to delete it is logged as a warning.
     *
     * @throws SnapshotStoreException if the snapshot store fails
     * @throws InterruptedException if the monitor is asked to stop
     *         ({@code isRunning == false}) or the thread is interrupted
     */
    private void doOnePass() throws SnapshotStoreException,
        InterruptedException {
      callback.passBegin();
      try {
        // Open the most recent snapshot and read the first record.
        this.snapshotReader = snapshotStore.openMostRecentSnapshot();
        current = snapshotReader.read();
        // Create a snapshot writer for this pass.
        this.snapshotWriter =
            new OrderedSnapshotWriter(snapshotStore.openNewSnapshotWriter());
        // Merge the repository contents against the previous snapshot.
        for (DocumentSnapshot ss : query) {
          // Honor a voluntary stop request.
          if (!isRunning) {
            LOG.log(Level.INFO, "Exiting the monitor thread " + name
                + " " + this);
            throw new InterruptedException();
          }

          if (Thread.currentThread().isInterrupted()) {
            throw new InterruptedException();
          }
          processDeletes(ss);
          safelyProcessDocumentSnapshot(ss);
        }

        // Take care of any trailing paths in the snapshot: entries after the
        // last repository document no longer exist in the repository.
        processDeletes(null);

      } finally {
        try {
          snapshotStore.close(snapshotReader, snapshotWriter);
        } catch (IOException e) {
          LOG.log(Level.WARNING, "Failed closing snapshot reader and writer.", e);
          // Try to proceed anyway.  Weird they are not closing.
        }
      }
      if (current != null) {
        throw new IllegalStateException(
            "Should not finish pass until entire read snapshot is consumed.");
      }
      // Report completion; the callback implementation may sleep here.
      callback.passComplete(getCheckpoint(-1));
      snapshotStore.deleteOldSnapshots();
      if (!callback.hasEnqueuedAtLeastOneChangeThisPass()) {
        // No monitor checkpoints from this pass went to queue because
        // there were no changes, so we can delete the snapshot we just wrote.
        java.io.File unchangedSnapshot = new java.io.File(snapshotWriter.getPath());
        if (!unchangedSnapshot.delete()) {
          LOG.log(Level.WARNING,
              "Failed to delete unchanged snapshot file: " + unchangedSnapshot);
        }
      }
      snapshotWriter = null;
      snapshotReader = null;
    }

    processDeletes方法负责处理数据删除的逻辑:

    /**
     * Reports as deleted every previous-snapshot entry that sorts before
     * {@code documentSnapshot}, advancing {@code current} past each one. When
     * {@code documentSnapshot} is {@code null}, every remaining snapshot entry
     * is reported as deleted.
     *
     * @param documentSnapshot the repository document to catch up to, or
     *        {@code null} to drain the rest of the snapshot
     * @throws SnapshotReaderException if reading the snapshot fails
     * @throws InterruptedException if the callback is interrupted
     */
    private void processDeletes(DocumentSnapshot documentSnapshot)
        throws SnapshotReaderException, InterruptedException {
      boolean drainAll = (documentSnapshot == null);
      for (; current != null; current = snapshotReader.read()) {
        // Stop once the snapshot has caught up with the repository document.
        if (!drainAll && COMPARATOR.compare(documentSnapshot, current) <= 0) {
          break;
        }
        callback.deletedDocument(
            new DeleteDocumentHandle(current.getDocumentId()), getCheckpoint());
      }
    }

    下面跟踪safelyProcessDocumentSnapshot方法

    /**
     * Processes one repository document. If processing throws a
     * {@link RepositoryException}, the exception is logged with its cause and
     * the document is recorded in the document sink with
     * {@code FilterReason.IO_EXCEPTION}. The exception is not propagated, so
     * the pass continues with the next document.
     *
     * @param snapshot the repository document to process
     * @throws InterruptedException if a callback is interrupted
     * @throws SnapshotReaderException if reading the previous snapshot fails
     * @throws SnapshotWriterException if writing the new snapshot fails
     */
    private void safelyProcessDocumentSnapshot(DocumentSnapshot snapshot)
        throws InterruptedException, SnapshotReaderException,
        SnapshotWriterException {
      try {
        processDocument(snapshot);
      } catch (RepositoryException re) {
        // The sink only records a reason code, so log the cause here to keep
        // the failure diagnosable.
        LOG.log(Level.WARNING, "Failed to process document "
            + snapshot.getDocumentId() + "; recording it as filtered.", re);
        documentSink.add(snapshot.getDocumentId(), FilterReason.IO_EXCEPTION);
      }
    }

    safelyProcessDocumentSnapshot方法进一步调用processDocument方法,其中包括新增数据和更新数据的处理逻辑:

    /**
     * Handles one document found in the repository. If it matches the current
     * previous-snapshot entry, it is checked for changes. Otherwise it is
     * treated as new: it is written to the new snapshot and reported to the
     * callback, unless {@code getUpdate(null)} returned {@code null}.
     *
     * @param documentSnapshot the repository document
     * @throws RepositoryException if computing the update fails
     * @throws InterruptedException if a callback is interrupted
     * @throws SnapshotReaderException if reading the previous snapshot fails
     * @throws SnapshotWriterException if writing the new snapshot fails
     */
    private void processDocument(DocumentSnapshot documentSnapshot)
        throws InterruptedException, RepositoryException, SnapshotReaderException,
            SnapshotWriterException {
      // At this point 'current' >= 'file', or possibly current == null if
      // we've processed the previous snapshot entirely.
      boolean seenBefore = current != null
          && COMPARATOR.compare(documentSnapshot, current) == 0;
      if (seenBefore) {
        processPossibleChange(documentSnapshot);
        return;
      }

      // This file didn't exist during the previous scan.
      DocumentHandle newHandle = documentSnapshot.getUpdate(null);
      snapshotWriter.write(documentSnapshot);

      // Null if filtered due to mime-type.
      if (newHandle != null) {
        callback.newDocument(newHandle, getCheckpoint(-1));
      }
    }

    processPossibleChange方法处理数据更新的情况:

    /**
     * Handles a repository document that also appeared in the previous scan.
     * The document is always written to the new snapshot. A change is sent to
     * the callback only when {@code getUpdate(current)} returns a handle.
     * {@code current} is then advanced to the next previous-snapshot record.
     *
     * @param documentSnapshot the repository document matching {@code current}
     * @throws RepositoryException if computing the update fails
     * @throws InterruptedException if the callback is interrupted
     * @throws SnapshotWriterException if writing the new snapshot fails
     * @throws SnapshotReaderException if reading the previous snapshot fails
     */
    private void processPossibleChange(DocumentSnapshot documentSnapshot)
        throws RepositoryException, InterruptedException, SnapshotWriterException,
               SnapshotReaderException {
      // Compare against the previous snapshot entry; a null handle means no change.
      DocumentHandle changeHandle = documentSnapshot.getUpdate(current);
      snapshotWriter.write(documentSnapshot);
      if (changeHandle != null) {
        // Normal change - send the gsa an update.
        callback.changedDocument(changeHandle, getCheckpoint());
      }
      // Move past the snapshot entry that was just matched.
      current = snapshotReader.read();
    }

    更新数据的快照和新增数据的快照首先持久化到最新的快照文件

    数据提交通过回调callback成员的相关方法,最后将数据提交到ChangeQueue队列对象

    Callback接口定义了数据处理的相关方法

    /**
     * Receives notifications from the monitor about traversal passes and
     * repository changes. The client provides an implementation of this
     * interface.
     */
    public interface Callback {
      /** Called at the start of every traversal pass. */
      void passBegin() throws InterruptedException;

      /** Reports a repository document absent from the previous snapshot. */
      void newDocument(DocumentHandle documentHandle,
          MonitorCheckpoint mcp) throws InterruptedException;

      /** Reports a previous-snapshot entry that is no longer in the repository. */
      void deletedDocument(DocumentHandle documentHandle,
          MonitorCheckpoint mcp) throws InterruptedException;

      /** Reports a document whose snapshot differs from the previous one. */
      void changedDocument(DocumentHandle documentHandle,
          MonitorCheckpoint mcp) throws InterruptedException;

      /** Called once a pass has finished traversing the repository. */
      void passComplete(MonitorCheckpoint mcp) throws InterruptedException;

      /**
       * Whether any change was enqueued during the current pass. When this is
       * {@code false}, the monitor deletes the snapshot it just wrote.
       */
      boolean hasEnqueuedAtLeastOneChangeThisPass();

      /**
       * Called when the monitor is outside its traversal window.
       *
       * @param sleepms requested pause in milliseconds
       */
      void passPausing(int sleepms) throws InterruptedException;
    }

    ChangeQueue队列类内部定义了内部类Callback来实现该接口,在其实现方法中将提交的数据添加到ChangeQueue类的阻塞队列成员pendingChanges之中:

    /**
     * Monitor callback that turns each document notification into a
     * {@link Change} placed on {@code pendingChanges}. It also records scan and
     * document activity in {@code activityLogger}.
     */
    private class Callback implements DocumentSnapshotRepositoryMonitor.Callback {
      /** Number of changes enqueued since the most recent passBegin(). */
      private int changeCount = 0;

      /** Current wall-clock time as a Timestamp for the activity logger. */
      private Timestamp now() {
        return new Timestamp(System.currentTimeMillis());
      }

      /** Counts one change, then hands it to the queue (put() may block). */
      private void enqueue(Change.FactoryType type, DocumentHandle dh,
          MonitorCheckpoint mcp) throws InterruptedException {
        ++changeCount;
        pendingChanges.put(new Change(type, dh, mcp));
      }

      public void passBegin() {
        changeCount = 0;
        activityLogger.scanBeginAt(now());
      }

      /* @Override */
      public void newDocument(DocumentHandle dh, MonitorCheckpoint mcp)
          throws InterruptedException {
        enqueue(Change.FactoryType.CLIENT, dh, mcp);
        activityLogger.gotNewDocument(dh.getDocumentId());
      }

      /* @Override */
      public void changedDocument(DocumentHandle dh, MonitorCheckpoint mcp)
          throws InterruptedException {
        enqueue(Change.FactoryType.CLIENT, dh, mcp);
        activityLogger.gotChangedDocument(dh.getDocumentId());
      }

      /* @Override */
      public void deletedDocument(DocumentHandle dh, MonitorCheckpoint mcp)
          throws InterruptedException {
        // Deletes use the INTERNAL factory type, unlike new/changed documents.
        enqueue(Change.FactoryType.INTERNAL, dh, mcp);
        activityLogger.gotDeletedDocument(dh.getDocumentId());
      }

      /* @Override */
      public void passComplete(MonitorCheckpoint mcp) throws InterruptedException {
        activityLogger.scanEndAt(now());
        // Sleep after every scan when configured to, or after an idle scan.
        if (introduceDelayAfterEveryScan || changeCount == 0) {
          Thread.sleep(sleepInterval);
        }
      }

      /* @Override */
      public void passPausing(int sleepms) throws InterruptedException {
        Thread.sleep(sleepms);
      }

      public boolean hasEnqueuedAtLeastOneChangeThisPass() {
        return changeCount > 0;
      }
    }

    ---------------------------------------------------------------------------

    本系列企业搜索引擎开发之连接器connector系本人原创

    转载请注明出处 博客园 刺猬的温驯

    本人邮箱: chenying998179@163#com (#改为.)

    本文链接 http://www.cnblogs.com/chenying99/p/3789505.html 

  • 相关阅读:
    WPF应用程序的生命周期
    问题解决(一)在ipad上通过safari浏览文档
    C#网络编程之TCP协议(一)
    认识webMethods
    sql存储过程与webMethods
    Start to study Introduction to Algorithms
    堆和栈的区别 (转贴)
    win7平台下使用MASMPlus搭建汇编环境
    makefile
    struct类型声明的疑问
  • 原文地址:https://www.cnblogs.com/chenying99/p/3789505.html
Copyright © 2011-2022 走看看