通常一个SnapshotRepository仓库对象对应一个DocumentSnapshotRepositoryMonitor监视器对象,同时也对应一个快照存储器对象,它们的关联是通过监视器管理对象DocumentSnapshotRepositoryMonitorManagerImpl实现的
DocumentSnapshotRepositoryMonitorManagerImpl类要实现那些行为,先查看其实现接口DocumentSnapshotRepositoryMonitorManager定义的方法规范
/** * Management interface to {@link DocumentSnapshotRepositoryMonitor} threads. * * @since 2.8 */ public interface DocumentSnapshotRepositoryMonitorManager { /** * Ensures all monitor threads are running. * * @param checkpoint for the last completed document or null if none have * been completed. * @throws RepositoryException */ void start(String checkpoint) throws RepositoryException; /** * Stops all the configured {@link DocumentSnapshotRepositoryMonitor} threads. */ void stop(); /** * Removes persisted state for {@link DocumentSnapshotRepositoryMonitor} * threads. After calling this {@link DocumentSnapshotRepositoryMonitor} * threads will no longer be able to resume from where they left off last * time. */ void clean(); /** * Returns the number of {@link DocumentSnapshotRepositoryMonitor} threads * that are alive. This method is for testing purposes. */ int getThreadCount(); /** * Returns the {@link CheckpointAndChangeQueue} for this * {@link DocumentSnapshotRepositoryMonitorManager} */ CheckpointAndChangeQueue getCheckpointAndChangeQueue(); /** Returns whether we are after a start() call and before a stop(). */ boolean isRunning(); /** * Receives information specifying what is guaranteed to be delivered to GSA. * Every entry in passed in Map is a monitor name and MonitorCheckpoint. * The monitor of that name can expect that all documents before and including * document related with MonitorCheckpoint will be delivered to GSA. * This information is for the convenience and efficiency of the Monitor so * that it knows how many changes it has to resend. It's valid for a monitor * to ignore these updates if it feels like it for some good reason. * FileConnectorSystemMonitor instances use this information to trim their * file system snapshots. */ void acceptGuarantees(Map<String, MonitorCheckpoint> guarantees); /** * Receives {@link TraversalSchedule} from TraversalManager which is * {@link TraversalScheduleAware}. */ void setTraversalSchedule(TraversalSchedule traversalSchedule); }
然后再来看DocumentSnapshotRepositoryMonitorManagerImpl类怎么实现上述接口中定义的行为
先来了解相关属性及如何初始化它们的
private volatile TraversalSchedule traversalSchedule; //监控器线程 private final List<Thread> threads = Collections.synchronizedList(new ArrayList<Thread>()); //监控器映射容器 private final Map<String, DocumentSnapshotRepositoryMonitor> fileSystemMonitorsByName = Collections.synchronizedMap(new HashMap<String, DocumentSnapshotRepositoryMonitor>()); private boolean isRunning = false; // Monitor threads start in off state. private final List<? extends SnapshotRepository<? extends DocumentSnapshot>> repositories; private final File snapshotDir; private final ChecksumGenerator checksumGenerator; //CheckpointAndChange对象容器(List) private final CheckpointAndChangeQueue checkpointAndChangeQueue; //Change对象容器(阻塞队列) private final ChangeQueue changeQueue; private final DocumentSnapshotFactory documentSnapshotFactory; /** * Constructs {@link DocumentSnapshotRepositoryMonitorManagerImpl} * for the {@link DiffingConnector}. * * @param repositories a {@code List} of {@link SnapshotRepository * SnapshotRepositorys} * @param documentSnapshotFactory a {@link DocumentSnapshotFactory} * @param snapshotDir directory to store {@link SnapshotRepository} * @param checksumGenerator a {@link ChecksumGenerator} used to * detect changes in a document's content * @param changeQueue a {@link ChangeQueue} * @param checkpointAndChangeQueue a * {@link CheckpointAndChangeQueue} */ public DocumentSnapshotRepositoryMonitorManagerImpl( List<? extends SnapshotRepository< ? extends DocumentSnapshot>> repositories, DocumentSnapshotFactory documentSnapshotFactory, File snapshotDir, ChecksumGenerator checksumGenerator, ChangeQueue changeQueue, CheckpointAndChangeQueue checkpointAndChangeQueue) { this.repositories = repositories; this.documentSnapshotFactory = documentSnapshotFactory; this.snapshotDir = snapshotDir; this.checksumGenerator = checksumGenerator; this.changeQueue = changeQueue; this.checkpointAndChangeQueue = checkpointAndChangeQueue; }
下面我们再来看它的start方法,在该方法中,主要动作为分别为调用checkpointAndChangeQueue对象的start方法,初始化各个仓库对象相关联的快照存储对象SnapshotStore,最后是启动各个仓库对象的监控器实例
/** * 启动方法 */ /** Go from "cold" to "warm" including CheckpointAndChangeQueue. */ public void start(String connectorManagerCheckpoint) throws RepositoryException { try { //启动 获取Change(主要动作:从json格式队列文件加载monitorPoints和checkpointAndChangeList队列) checkpointAndChangeQueue.start(connectorManagerCheckpoint); } catch (IOException e) { throw new RepositoryException("Failed starting CheckpointAndChangeQueue.", e); } //MonitorCheckpoint容器 Map<String, MonitorCheckpoint> monitorPoints = checkpointAndChangeQueue.getMonitorRestartPoints(); Map<String, SnapshotStore> snapshotStores = null; //加载monitorName与SnapshotStore映射容器 try { snapshotStores = recoverSnapshotStores(connectorManagerCheckpoint, monitorPoints); } catch (SnapshotStoreException e) { throw new RepositoryException("Snapshot recovery failed.", e); } catch (IOException e) { throw new RepositoryException("Snapshot recovery failed.", e); } catch (InterruptedException e) { throw new RepositoryException("Snapshot recovery interrupted.", e); } //启动监控线程 startMonitorThreads(snapshotStores, monitorPoints); isRunning = true; }
在初始化每个仓库对象的快照存储对象SnapshotStore时,同时传入相关联的MonitorCheckPoint对象实例,必要时修复快照文件
/* For each start path gets its monitor recovery files in state were monitor * can be started. */ /** * 加载monitorName与SnapshotStore映射容器 * @param connectorManagerCheckpoint * @param monitorPoints * @return * @throws IOException * @throws SnapshotStoreException * @throws InterruptedException */ private Map<String, SnapshotStore> recoverSnapshotStores( String connectorManagerCheckpoint, Map<String, MonitorCheckpoint> monitorPoints) throws IOException, SnapshotStoreException, InterruptedException { Map<String, SnapshotStore> snapshotStores = new HashMap<String, SnapshotStore>(); for (SnapshotRepository<? extends DocumentSnapshot> repository : repositories) { String monitorName = makeMonitorNameFromStartPath(repository.getName()); File dir = new File(snapshotDir, monitorName); boolean startEmpty = (connectorManagerCheckpoint == null) || (!monitorPoints.containsKey(monitorName)); if (startEmpty) { LOG.info("Deleting " + repository.getName() + " global checkpoint=" + connectorManagerCheckpoint + " monitor checkpoint=" + monitorPoints.get(monitorName)); //删除该快照目录 delete(dir); } else { //修复该快照目录 SnapshotStore.stitch(dir, monitorPoints.get(monitorName), documentSnapshotFactory); } SnapshotStore snapshotStore = new SnapshotStore(dir, documentSnapshotFactory); snapshotStores.put(monitorName, snapshotStore); } return snapshotStores; }
下面继续跟踪启动监控器线程的方法
/** * 启动监控线程(貌似MonitorCheckpoint与SnapshotStore与monitor有映射关系) * Creates a {@link DocumentSnapshotRepositoryMonitor} thread for each * startPath. * * @throws RepositoryDocumentException if any of the threads cannot be * started. */ private void startMonitorThreads(Map<String, SnapshotStore> snapshotStores, Map<String, MonitorCheckpoint> monitorPoints) throws RepositoryDocumentException { for (SnapshotRepository<? extends DocumentSnapshot> repository : repositories) { String monitorName = makeMonitorNameFromStartPath(repository.getName()); //monitorName snapshotStores映射 //快照存储器(读写器) SnapshotStore snapshotStore = snapshotStores.get(monitorName); //创建监控线程 Thread monitorThread = newMonitorThread(repository, snapshotStore, monitorPoints.get(monitorName)); threads.add(monitorThread); LOG.info("starting monitor for <" + repository.getName() + ">"); monitorThread.setName(repository.getName()); monitorThread.setDaemon(true); monitorThread.start(); } }
监控器对象的创建在下面的方法
/** * 创建监控线程 * Creates a {@link DocumentSnapshotRepositoryMonitor} thread for the provided * folder. * * @throws RepositoryDocumentException if {@code startPath} is not readable, * or if there is any problem reading or writing snapshots. */ private Thread newMonitorThread( SnapshotRepository<? extends DocumentSnapshot> repository, SnapshotStore snapshotStore, MonitorCheckpoint startCp) throws RepositoryDocumentException { //注意monitorName String monitorName = makeMonitorNameFromStartPath(repository.getName()); //document在监控线程里面处理 DocumentSnapshotRepositoryMonitor monitor = new DocumentSnapshotRepositoryMonitor(monitorName, repository, snapshotStore, changeQueue.newCallback(), DOCUMENT_SINK, startCp, documentSnapshotFactory); monitor.setTraversalSchedule(traversalSchedule); LOG.fine("Adding a new monitor for " + monitorName + ": " + monitor); fileSystemMonitorsByName.put(monitorName, monitor); return new Thread(monitor); }
stop方法实现监控器线程的停止
/** * 停止监控器 */ private void flagAllMonitorsToStop() { for (SnapshotRepository<? extends DocumentSnapshot> repository : repositories) { String monitorName = makeMonitorNameFromStartPath(repository.getName()); DocumentSnapshotRepositoryMonitor monitor = fileSystemMonitorsByName.get(monitorName); if (null != monitor) { monitor.shutdown(); } else { LOG.fine("Unable to stop non existent monitor thread for " + monitorName); } } } /** * 停止监控器线程 */ /* @Override */ public synchronized void stop() { for (Thread thread : threads) { thread.interrupt(); } for (Thread thread : threads) { try { thread.join(MAX_SHUTDOWN_MS); if (thread.isAlive()) { LOG.warning("failed to stop background thread: " + thread.getName()); } } catch (InterruptedException e) { // Mark this thread as interrupted so it can be dealt with later. Thread.currentThread().interrupt(); } } threads.clear(); /* in case thread.interrupt doesn't stop monitors */ flagAllMonitorsToStop(); fileSystemMonitorsByName.clear(); changeQueue.clear(); this.isRunning = false; }
在flagAllMonitorsToStop()方法中调用监控器对象的monitor.shutdown()方法,设置监控器对象 的标识属性
/* The monitor should exit voluntarily if set to false */ private volatile boolean isRunning = true;
---------------------------------------------------------------------------
本系列企业搜索引擎开发之连接器connector系本人原创
转载请注明出处 博客园 刺猬的温驯
本人邮箱: chenying998179@163#com (#改为.)