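  • Enterprise Search Engine Development: The Connector (Part 29)
  • Where are the start and stop methods of the monitor manager object snapshotRepositoryMonitorManager called, and where is the resume method of the CheckpointAndChangeQueue object called to obtain the List<CheckpointAndChange> guaranteedChanges collection?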

    The trail leads to the DiffingConnectorTraversalManager class, whose methods operate on the monitor manager object snapshotRepositoryMonitorManager.

    private final DocumentSnapshotRepositoryMonitorManager
          snapshotRepositoryMonitorManager;
      private final TraversalContextManager traversalContextManager;
      /**
       * Boolean to mark TraversalManager as invalid.
       * It's possible for Connector Manager to keep a reference to
       * an outdated TraversalManager (after a new one has been given
       * previous TraversalManagers are invalid to use).
       */
      private boolean isActive = true;
    
      /**
       * Creates a {@link DiffingConnectorTraversalManager}.
       *
       * @param snapshotRepositoryMonitorManager the
       *        {@link DocumentSnapshotRepositoryMonitorManager}
       *        for use accessing a {@link ChangeSource}
       * @param traversalContextManager {@link TraversalContextManager}
       *        that holds the current {@link TraversalContext}
       */
      public DiffingConnectorTraversalManager(
          DocumentSnapshotRepositoryMonitorManager snapshotRepositoryMonitorManager,
          TraversalContextManager traversalContextManager) {
        this.snapshotRepositoryMonitorManager = snapshotRepositoryMonitorManager;
        this.traversalContextManager = traversalContextManager;
      }

    The resumeTraversal method starts the monitor manager object snapshotRepositoryMonitorManager if it is not already running, and returns a DocumentList.

    /* @Override */
      public synchronized DocumentList resumeTraversal(String checkpoint)
          throws RepositoryException {
        /* Exhaustive list of method's use:
         resumeTraversal(null) from startTraversal:
           monitors get started from null
         resumeTraversal(null) from Connector Manager sometime after startTraversal:
           monitors already started from previous resumeTraversal call
         resumeTraversal(cp) from Connector Manager without a startTraversal:
           means there was a shutdown or turn off
           monitors get started from cp; should use state
         resumeTraversal(cp) from Connector Manager sometime after some uses:
           is most common case; roll
        */
        if (isActive()) {
          // Start snapshotRepositoryMonitorManager if it is not already running
          if (!snapshotRepositoryMonitorManager.isRunning()) {
            snapshotRepositoryMonitorManager.start(checkpoint);
          }
          return newDocumentList(checkpoint);
        } else {
          throw new RepositoryException(
              "Inactive FileTraversalManager referanced.");
        }
      }
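
    The guard in resumeTraversal can be sketched in isolation. The following is a minimal model, not the connector's code: SketchTraversalManager and SketchMonitorManager are hypothetical stand-ins, the returned String stands in for a DocumentList, and IllegalStateException stands in for RepositoryException.

```java
// Hypothetical stand-in for DiffingConnectorTraversalManager (not the real class).
class SketchTraversalManager {
  private final SketchMonitorManager monitors;
  private boolean active = true;

  SketchTraversalManager(SketchMonitorManager monitors) {
    this.monitors = monitors;
  }

  // Marks this manager as outdated, as happens once a newer one is handed out.
  synchronized void deactivate() {
    active = false;
  }

  // Starts the monitors only on the first call; later calls reuse them.
  synchronized String resumeTraversal(String checkpoint) {
    if (!active) {
      throw new IllegalStateException("Inactive traversal manager referenced.");
    }
    if (!monitors.isRunning()) {
      monitors.start(checkpoint);
    }
    return "batch after " + checkpoint; // placeholder for a DocumentList
  }

  public static void main(String[] args) {
    SketchMonitorManager monitors = new SketchMonitorManager();
    SketchTraversalManager manager = new SketchTraversalManager(monitors);
    manager.resumeTraversal("cp1");
    manager.resumeTraversal("cp2");
    System.out.println("start calls: " + monitors.getStartCount());
  }
}

// Hypothetical stand-in for the monitor manager; it only records start calls.
class SketchMonitorManager {
  private boolean running = false;
  private int startCount = 0;
  private String startedFrom = null;

  boolean isRunning() {
    return running;
  }

  void start(String checkpoint) {
    running = true;
    startCount++;
    startedFrom = checkpoint;
  }

  int getStartCount() {
    return startCount;
  }

  String getStartedFrom() {
    return startedFrom;
  }
}
```

    In this sketch the checkpoint passed to the first call decides where the monitors start from; checkpoints passed to later calls only select the batch to return.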

    resumeTraversal then calls newDocumentList to build and return the DocumentList.

    private DocumentList newDocumentList(String checkpoint)
          throws RepositoryException {
        // Get the CheckpointAndChangeQueue (it is referenced only by
        // snapshotRepositoryMonitorManager)
        CheckpointAndChangeQueue checkpointAndChangeQueue =
            snapshotRepositoryMonitorManager.getCheckpointAndChangeQueue();
    
        try {
          DiffingConnectorDocumentList documentList = new DiffingConnectorDocumentList(
              checkpointAndChangeQueue,
              CheckpointAndChangeQueue.initializeCheckpointStringIfNull(
                  checkpoint));
          // Restart points (MonitorCheckpoint), keyed by monitor name
          Map<String, MonitorCheckpoint> guaranteesMade =
              checkpointAndChangeQueue.getMonitorRestartPoints();
          
          snapshotRepositoryMonitorManager.acceptGuarantees(guaranteesMade);
    
          return new ConfirmActiveDocumentList(documentList);
        } catch (IOException e) {
          throw new RepositoryException("Failure when making DocumentList.", e);
        }
      }

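    The constructor of DiffingConnectorDocumentList documentList receives the CheckpointAndChangeQueue checkpointAndChangeQueue and wraps the changes it reads from it.

    The complete implementation of the DiffingConnectorDocumentList class is as follows: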

    /**
     * An implementation of {@link DocumentList} for the {@link DiffingConnector}.
     *
     * @since 2.8
     */
    public class DiffingConnectorDocumentList implements DocumentList {
      private final Iterator<CheckpointAndChange> checkpointAndChangeIterator;
      private String checkpoint;
    
      /**
       * Creates a document list that returns a batch of documents from the provided
       * {@link CheckpointAndChangeQueue}.
       *
       * @param queue a CheckpointAndChangeQueue containing document changes
       * @param checkpoint point into the change queue after which to start
       *        returning documents
       * @throws IOException if persisting fails
       */
      public DiffingConnectorDocumentList(CheckpointAndChangeQueue queue,
          String checkpoint) throws IOException {
        // queue.resume(checkpoint) returns the List<CheckpointAndChange> for this batch;
        // from here on, the batch for this DocumentList is held in memory
        List<CheckpointAndChange> guaranteedChanges = queue.resume(checkpoint);
        checkpointAndChangeIterator = guaranteedChanges.iterator();
        this.checkpoint = checkpoint;
      }
      
      /**
       * The caller retrieves this value and persists it; once iteration has
       * finished, it is the final checkpoint of the batch.
       */
      /* @Override */
      public String checkpoint() {
        return checkpoint;
      }
    
      /* @Override */
      public Document nextDocument() throws RepositoryException {
        if (checkpointAndChangeIterator.hasNext()) {
          CheckpointAndChange checkpointAndChange =
            checkpointAndChangeIterator.next();
          // Advance the checkpoint to that of the change being returned
          checkpoint = checkpointAndChange.getCheckpoint().toString();
          return checkpointAndChange.getChange().getDocumentHandle().getDocument();
        } else {
          return null;
        }
      }
    }

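    The constructor calls resume on its CheckpointAndChangeQueue queue parameter to obtain the List<CheckpointAndChange> guaranteedChanges. nextDocument() then iterates over that list, returning the document for one CheckpointAndChange checkpointAndChange at a time and updating the checkpoint to match.

    Finally, newDocumentList obtains the map of MonitorCheckpoint objects associated with the monitors: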
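
    The way the checkpoint tracks the iteration can be shown with a reduced model. This is only a sketch under simplifying assumptions: SketchDocumentList and SketchChange are hypothetical names, documents and checkpoints are plain strings, and the batch is passed in directly rather than obtained from a queue's resume method.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified analogue of DiffingConnectorDocumentList.
class SketchDocumentList {
  private final Iterator<SketchChange> iterator;
  private String checkpoint;

  // The batch is fixed at construction time; the checkpoint starts at the
  // value the caller resumed from.
  SketchDocumentList(List<SketchChange> batch, String startCheckpoint) {
    this.iterator = batch.iterator();
    this.checkpoint = startCheckpoint;
  }

  // Returns the next document, or null when the batch is exhausted.
  // Each returned document moves the checkpoint forward.
  String nextDocument() {
    if (!iterator.hasNext()) {
      return null;
    }
    SketchChange change = iterator.next();
    checkpoint = change.checkpoint;
    return change.document;
  }

  // Checkpoint of the last document handed out so far.
  String checkpoint() {
    return checkpoint;
  }

  public static void main(String[] args) {
    SketchDocumentList list = new SketchDocumentList(
        Arrays.asList(new SketchChange("cp1", "doc-a"),
                      new SketchChange("cp2", "doc-b")),
        "cp0");
    String doc;
    while ((doc = list.nextDocument()) != null) {
      System.out.println(doc + " -> checkpoint " + list.checkpoint());
    }
  }
}

// Hypothetical pairing of a checkpoint with the document it covers.
class SketchChange {
  final String checkpoint;
  final String document;

  SketchChange(String checkpoint, String document) {
    this.checkpoint = checkpoint;
    this.document = document;
  }
}
```

    A caller that stops partway through the batch and persists checkpoint() would, in this model, resume after the last document it actually consumed.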

          // Restart points (MonitorCheckpoint), keyed by monitor name
          Map<String, MonitorCheckpoint> guaranteesMade =
              checkpointAndChangeQueue.getMonitorRestartPoints();

    It then calls acceptGuarantees on the monitor manager object snapshotRepositoryMonitorManager, and each corresponding monitor receives and acknowledges its MonitorCheckpoint.

     /**
       * The monitor manager receives feedback from the CheckpointAndChangeQueue
       * and dispatches each MonitorCheckpoint to the corresponding monitor.
       */
      /* @Override */
      public void acceptGuarantees(Map<String, MonitorCheckpoint> guarantees) {
        for (Map.Entry<String, MonitorCheckpoint> entry : guarantees.entrySet()) {
          String monitorName = entry.getKey();
          MonitorCheckpoint checkpoint = entry.getValue();
          DocumentSnapshotRepositoryMonitor monitor = fileSystemMonitorsByName.get(monitorName);
          if (monitor != null) {
            // Signal is asynch.  Let monitor figure out how to use.
            // Callback into the monitor
            monitor.acceptGuarantee(checkpoint);
          }
        }
      }
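
    The dispatch step amounts to a lookup by monitor name that silently skips names with no registered monitor. A minimal sketch follows, assuming hypothetical SketchGuaranteeDispatcher and SketchMonitor classes and using a plain snapshot number in place of MonitorCheckpoint; the delivered count is added only to make the behaviour observable.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical analogue of the manager's acceptGuarantees dispatch.
class SketchGuaranteeDispatcher {
  private final Map<String, SketchMonitor> monitorsByName = new HashMap<>();

  void register(String name, SketchMonitor monitor) {
    monitorsByName.put(name, monitor);
  }

  // Forwards each guarantee to the monitor with the same name and returns
  // how many were delivered. Entries for unknown monitors are ignored.
  int acceptGuarantees(Map<String, Long> guarantees) {
    int delivered = 0;
    for (Map.Entry<String, Long> entry : guarantees.entrySet()) {
      SketchMonitor monitor = monitorsByName.get(entry.getKey());
      if (monitor != null) {
        monitor.acceptGuarantee(entry.getValue());
        delivered++;
      }
    }
    return delivered;
  }

  public static void main(String[] args) {
    SketchGuaranteeDispatcher dispatcher = new SketchGuaranteeDispatcher();
    dispatcher.register("repo-a", new SketchMonitor());
    Map<String, Long> guarantees = new HashMap<>();
    guarantees.put("repo-a", 5L);
    guarantees.put("repo-gone", 9L);
    System.out.println("delivered: " + dispatcher.acceptGuarantees(guarantees));
  }
}

// Hypothetical monitor that just remembers the last guarantee it was given.
class SketchMonitor {
  private Long lastGuarantee = null;

  void acceptGuarantee(long snapshotNumber) {
    lastGuarantee = snapshotNumber;
  }

  Long getLastGuarantee() {
    return lastGuarantee;
  }
}
```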

    The concrete monitor for each repository then accepts the guarantee:

    /**
       * The monitor receives the feedback (acknowledges the MonitorCheckpoint).
       * @param cp
       */
      // Public for DocumentSnapshotRepositoryMonitorTest
      @VisibleForTesting
      public void acceptGuarantee(MonitorCheckpoint cp) {
        snapshotStore.acceptGuarantee(cp);
        guaranteeCheckpoint = cp;
      }

    The snapshot store for the repository sits at the end of this processing chain:

    /**
       * Handles the MonitorCheckpoint feedback.
       * @param cp
       */
      void acceptGuarantee(MonitorCheckpoint cp) {
        long readSnapshotNumber = cp.getSnapshotNumber();
        if (readSnapshotNumber < 0) {
          throw new IllegalArgumentException("Received invalid snapshot in: " + cp);
        }
        if (oldestSnapshotToKeep > readSnapshotNumber) {
          LOG.warning("Received an older snapshot than " + oldestSnapshotToKeep + ": " + cp);
        } else {
          oldestSnapshotToKeep = readSnapshotNumber;
        }
      }
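
    The rule the store applies is that oldestSnapshotToKeep never moves backwards. A small sketch of that rule follows, assuming a hypothetical SketchSnapshotStore; the initial value of 0 and the boolean return value are assumptions made for illustration, and the warning log of the original is replaced by returning false.

```java
// Hypothetical analogue of the snapshot store's acceptGuarantee logic.
class SketchSnapshotStore {
  // Assumed starting value; the original's initial value is not shown.
  private long oldestSnapshotToKeep = 0;

  // Returns true if the guarantee was applied, false if it was older than
  // the current value and therefore ignored.
  boolean acceptGuarantee(long snapshotNumber) {
    if (snapshotNumber < 0) {
      throw new IllegalArgumentException(
          "Received invalid snapshot in: " + snapshotNumber);
    }
    if (oldestSnapshotToKeep > snapshotNumber) {
      return false; // the original logs a warning here
    }
    oldestSnapshotToKeep = snapshotNumber;
    return true;
  }

  long getOldestSnapshotToKeep() {
    return oldestSnapshotToKeep;
  }

  public static void main(String[] args) {
    SketchSnapshotStore store = new SketchSnapshotStore();
    store.acceptGuarantee(3);
    store.acceptGuarantee(2); // older than 3, so ignored
    System.out.println("oldest to keep: " + store.getOldestSnapshotToKeep());
  }
}
```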

    ---------------------------------------------------------------------------

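    This series, Enterprise Search Engine Development: The Connector, is my own original work.

    When reposting, please credit the source: 博客园 (cnblogs), 刺猬的温驯

    My email: chenying998179@163#com (replace # with .)

    Link to this article: http://www.cnblogs.com/chenying99/p/3789650.html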
