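The connector uses the monitor object DocumentSnapshotRepositoryMonitor to iterate over the data held by the SnapshotRepository object described earlier (DBSnapshotRepository for a database repository).
The monitor class DocumentSnapshotRepositoryMonitor initializes its member variables in its constructor; these members are all objects involved in fetching the data and in the data-processing logic.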
/** This connector instance's current traversal schedule. */
private volatile TraversalSchedule traversalSchedule;

/** Directory that contains snapshots. */
private final SnapshotStore snapshotStore;

/** The root of the repository to monitor */
private final SnapshotRepository<? extends DocumentSnapshot> query;

/** Reader for the current snapshot. */
private SnapshotReader snapshotReader;

/** Callback to invoke when a change is detected. */
private final Callback callback;

/** Current record from the snapshot. */
private DocumentSnapshot current;

/** The snapshot we are currently writing */
private OrderedSnapshotWriter snapshotWriter;

private final String name;

private final DocumentSnapshotFactory documentSnapshotFactory;

private final DocumentSink documentSink;

/* Contains a checkpoint confirmation from CM. */
private MonitorCheckpoint guaranteeCheckpoint;

/* The monitor should exit voluntarily if set to false */
private volatile boolean isRunning = true;

/**
 * Creates a DocumentSnapshotRepositoryMonitor that monitors the
 * Repository rooted at {@code root}.
 *
 * @param name the name of this monitor (a hash of the start path)
 * @param query query for files
 * @param snapshotStore where snapshots are stored
 * @param callback client callback
 * @param documentSink destination for filtered out file info
 * @param initialCp checkpoint when system initiated, could be {@code null}
 * @param documentSnapshotFactory for un-serializing
 *        {@link DocumentSnapshot} objects.
 */
public DocumentSnapshotRepositoryMonitor(String name,
    SnapshotRepository<? extends DocumentSnapshot> query,
    SnapshotStore snapshotStore, Callback callback,
    DocumentSink documentSink, MonitorCheckpoint initialCp,
    DocumentSnapshotFactory documentSnapshotFactory) {
  this.name = name;
  this.query = query;
  this.snapshotStore = snapshotStore;
  this.callback = callback;
  this.documentSnapshotFactory = documentSnapshotFactory;
  this.documentSink = documentSink;
  guaranteeCheckpoint = initialCp;
}
The class also implements the Runnable interface, and the overridden run method contains the data-processing logic.
@Override
public void run() {
  // Call NDC.push() via reflection, if possible.
  invoke(ndcPush, "Monitor " + name);
  try {
    while (true) {
      tryToRunForever();
      // TODO: Remove items from this monitor that are in queues.
      // Watch out for race conditions. The queues are potentially
      // giving docs to CM as bad things happen in monitor.
      // This TODO would be mitigated by a reconciliation with GSA.
      performExceptionRecovery();
    }
  } catch (InterruptedException ie) {
    LOG.info("Repository Monitor " + name + " received stop signal. " + this);
  } finally {
    // Call NDC.remove() via reflection, if possible.
    invoke(ndcRemove);
  }
}
The run method in turn calls tryToRunForever().
private void tryToRunForever() throws InterruptedException {
  try {
    while (true) {
      if (traversalSchedule == null || traversalSchedule.shouldRun()) {
        // Start traversal
        doOnePass();
      } else {
        LOG.finest("Currently out of traversal window. "
            + "Sleeping for 15 minutes.");
        // TODO(nashi): Calculate when it should wake up while
        // handling TraversalScheduleAware events properly.
        // Not inside the traversal window yet, so sleep.
        callback.passPausing(15*60*1000);
      }
    }
  } catch (SnapshotWriterException e) {
    String msg = "Failed to write to snapshot file: " + snapshotWriter.getPath();
    LOG.log(Level.SEVERE, msg, e);
  } catch (SnapshotReaderException e) {
    String msg = "Failed to read snapshot file: " + snapshotReader.getPath();
    LOG.log(Level.SEVERE, msg, e);
  } catch (SnapshotStoreException e) {
    String msg = "Problem with snapshot store.";
    LOG.log(Level.SEVERE, msg, e);
  } catch (SnapshotRepositoryRuntimeException e) {
    String msg = "Failed reading repository.";
    LOG.log(Level.SEVERE, msg, e);
  }
}
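The doOnePass() method fetches the data from the SnapshotRepository object, persists the document snapshots to a snapshot file, and applies the processing logic: it decides whether each document is new, deleted, or updated.
The resulting changes are finally added, through the Callback interface, to the blocking queue inside the ChangeQueue object.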
/**
 * doOnePass() creates its own snapshot reader and writer for each pass.
 * Makes one pass through the repository, notifying {@code visitor} of any
 * changes.
 *
 * @throws InterruptedException
 */
private void doOnePass() throws SnapshotStoreException,
    InterruptedException {
  callback.passBegin();
  try {
    // Snapshot reader.
    // Open the most recent snapshot and read the first record.
    this.snapshotReader = snapshotStore.openMostRecentSnapshot();
    current = snapshotReader.read();

    // Snapshot writer.
    // Create an snapshot writer for this pass.
    this.snapshotWriter =
        new OrderedSnapshotWriter(snapshotStore.openNewSnapshotWriter());

    // The loop below fetches the data from the repository.
    for (DocumentSnapshot ss : query) {
      // Check whether the monitor has been asked to stop.
      if (false == isRunning) {
        LOG.log(Level.INFO, "Exiting the monitor thread " + name
            + " " + this);
        throw new InterruptedException();
      }
      if (Thread.currentThread().isInterrupted()) {
        throw new InterruptedException();
      }
      processDeletes(ss);
      safelyProcessDocumentSnapshot(ss);
    }
    // After the iteration, treat whatever is left in the snapshot reader
    // as deleted (the data source removed those trailing documents).
    // Take care of any trailing paths in the snapshot.
    processDeletes(null);
  } finally {
    try {
      snapshotStore.close(snapshotReader, snapshotWriter);
    } catch (IOException e) {
      LOG.log(Level.WARNING, "Failed closing snapshot reader and writer.", e);
      // Try to proceed anyway. Weird they are not closing.
    }
  }
  if (current != null) {
    throw new IllegalStateException(
        "Should not finish pass until entire read snapshot is consumed.");
  }
  // The pass is done; the callback may rest here.
  callback.passComplete(getCheckpoint(-1));
  snapshotStore.deleteOldSnapshots();
  if (!callback.hasEnqueuedAtLeastOneChangeThisPass()) {
    // No monitor checkpoints from this pass went to queue because
    // there were no changes, so we can delete the snapshot we just wrote.
    new java.io.File(snapshotWriter.getPath()).delete();
    // TODO: Check return value; log trouble.
  }
  snapshotWriter = null;
  snapshotReader = null;
}
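The stop check inside the loop of doOnePass() can be illustrated in isolation. The following is only a minimal sketch of that cooperative-cancellation pattern: the class StoppablePassSketch, its shutdown() method, and the String documents are hypothetical stand-ins, not part of the connector source.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of the stop check in doOnePass(). */
class StoppablePassSketch {
  /** Mirrors the monitor's volatile isRunning flag. */
  private volatile boolean isRunning = true;
  private int processed = 0;

  /** Assumed way of asking the monitor to stop; the real setter is not shown in the source. */
  void shutdown() {
    isRunning = false;
  }

  int processedCount() {
    return processed;
  }

  /** Walks the documents, checking both stop signals before each one. */
  void doOnePass(Iterable<String> docs) throws InterruptedException {
    for (String doc : docs) {
      if (!isRunning) {
        throw new InterruptedException("stop requested for " + doc);
      }
      if (Thread.currentThread().isInterrupted()) {
        throw new InterruptedException("thread interrupted at " + doc);
      }
      processed++;  // stand-in for processDeletes + safelyProcessDocumentSnapshot
    }
  }

  public static void main(String[] args) {
    StoppablePassSketch monitor = new StoppablePassSketch();
    List<String> docs = Arrays.asList("a", "b", "c");
    try {
      monitor.doOnePass(docs);
      monitor.shutdown();
      monitor.doOnePass(docs);  // aborts at the first document
    } catch (InterruptedException e) {
      System.out.println("stopped after " + monitor.processedCount() + " documents");
    }
  }
}
```

Checking a volatile flag as well as the thread's interrupt status lets the pass end between two documents whichever way the stop request arrives, and the InterruptedException propagates up to run(), which logs the stop signal.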
The processDeletes method handles deletions.
/**
 * Process snapshot entries as deletes until {@code current} catches up with
 * {@code documentSnapshot}. Or, if {@code documentSnapshot} is {@code null},
 * process all remaining snapshot entries as deletes.
 *
 * @param documentSnapshot where to stop
 * @throws SnapshotReaderException
 * @throws InterruptedException
 */
private void processDeletes(DocumentSnapshot documentSnapshot)
    throws SnapshotReaderException, InterruptedException {
  // While documentSnapshot sorts after current, report current as deleted,
  // then advance to the next entry in the previous snapshot.
  while (current != null
      && (documentSnapshot == null
          || COMPARATOR.compare(documentSnapshot, current) > 0)) {
    callback.deletedDocument(
        new DeleteDocumentHandle(current.getDocumentId()), getCheckpoint());
    current = snapshotReader.read();
  }
}
Next, follow the safelyProcessDocumentSnapshot method.
private void safelyProcessDocumentSnapshot(DocumentSnapshot snapshot)
    throws InterruptedException, SnapshotReaderException,
        SnapshotWriterException {
  try {
    processDocument(snapshot);
  } catch (RepositoryException re) {
    // TODO Log the exception or its message? in document sink perhaps.
    // Record the snapshot that failed.
    documentSink.add(snapshot.getDocumentId(), FilterReason.IO_EXCEPTION);
  }
}
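The point of this wrapper is that a failure on one document is recorded and the pass carries on with the next one. A minimal sketch of that idea follows; SafeProcessingSketch, the "bad" id prefix, and the two lists are illustrative assumptions that stand in for processDocument and the DocumentSink.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of per-document failure isolation. */
class SafeProcessingSketch {
  /** Stand-in for the DocumentSink: ids skipped because of an error. */
  final List<String> skipped = new ArrayList<>();
  final List<String> processed = new ArrayList<>();

  /** Assumed processing step that fails for ids starting with "bad". */
  private void processDocument(String id) throws IOException {
    if (id.startsWith("bad")) {
      throw new IOException("cannot read " + id);
    }
    processed.add(id);
  }

  /** Catches the failure of a single document so the pass can continue. */
  void safelyProcess(String id) {
    try {
      processDocument(id);
    } catch (IOException e) {
      skipped.add(id);
    }
  }

  void doOnePass(Iterable<String> ids) {
    for (String id : ids) {
      safelyProcess(id);
    }
  }

  public static void main(String[] args) {
    SafeProcessingSketch pass = new SafeProcessingSketch();
    pass.doOnePass(Arrays.asList("a", "bad-b", "c"));
    System.out.println("processed=" + pass.processed + " skipped=" + pass.skipped);
  }
}
```

Note that in the original method only RepositoryException is absorbed this way; snapshot reader and writer exceptions still propagate and end the pass.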
It in turn calls processDocument, which contains the logic for both updated and new documents.
/**
 * Processes a document found in the document repository.
 *
 * @param documentSnapshot
 * @throws RepositoryException
 * @throws InterruptedException
 * @throws SnapshotReaderException
 * @throws SnapshotWriterException
 */
private void processDocument(DocumentSnapshot documentSnapshot)
    throws InterruptedException, RepositoryException,
        SnapshotReaderException, SnapshotWriterException {
  // At this point 'current' >= 'file', or possibly current == null if
  // we've processed the previous snapshot entirely.
  if (current != null
      && COMPARATOR.compare(documentSnapshot, current) == 0) {
    // Handle a possibly changed documentSnapshot and advance current.
    processPossibleChange(documentSnapshot);
  } else {
    // This file didn't exist during the previous scan.
    // (The documentSnapshot is not in the previous snapshot.)
    DocumentHandle documentHandle = documentSnapshot.getUpdate(null);
    snapshotWriter.write(documentSnapshot);

    // Null if filtered due to mime-type.
    if (documentHandle != null) {
      callback.newDocument(documentHandle, getCheckpoint(-1));
    }
  }
}
The update case is handled as follows.
/**
 * Processes a document found in the document repository that also appeared
 * in the previous scan. Determines whether the document has changed,
 * propagates changes to the client and writes the snapshot record.
 *
 * @param documentSnapshot
 * @throws RepositoryException
 * @throws InterruptedException
 * @throws SnapshotWriterException
 * @throws SnapshotReaderException
 */
private void processPossibleChange(DocumentSnapshot documentSnapshot)
    throws RepositoryException, InterruptedException,
        SnapshotWriterException, SnapshotReaderException {
  // Presumably compares hash values.
  DocumentHandle documentHandle = documentSnapshot.getUpdate(current);
  // Write to the snapshot file.
  snapshotWriter.write(documentSnapshot);
  if (documentHandle == null) {
    // No change.
    // Nothing is sent when the document is unchanged.
  } else {
    // Normal change - send the gsa an update.
    callback.changedDocument(documentHandle, getCheckpoint());
  }
  current = snapshotReader.read();
}
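Taken together, processDeletes, processDocument and processPossibleChange amount to a merge of two sorted sequences: the previous snapshot and the current repository contents. The sketch below models that comparison with sorted maps from document id to a content marker. SnapshotDiffSketch, the marker strings, and the text labels are assumptions made for illustration; the real code works on DocumentSnapshot objects, a COMPARATOR, and the Callback.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical, simplified model of one monitor pass. */
class SnapshotDiffSketch {

  /** Both maps iterate in ascending id order; values stand in for content state. */
  static List<String> diff(SortedMap<String, String> previous,
      SortedMap<String, String> repository) {
    List<String> changes = new ArrayList<>();
    Iterator<Map.Entry<String, String>> reader = previous.entrySet().iterator();
    Map.Entry<String, String> current = reader.hasNext() ? reader.next() : null;

    for (Map.Entry<String, String> doc : repository.entrySet()) {
      // Like processDeletes(ss): old entries that sort before doc are gone.
      while (current != null && doc.getKey().compareTo(current.getKey()) > 0) {
        changes.add("DELETE " + current.getKey());
        current = reader.hasNext() ? reader.next() : null;
      }
      if (current != null && doc.getKey().equals(current.getKey())) {
        // Like processPossibleChange: same id, report only if content differs.
        if (!doc.getValue().equals(current.getValue())) {
          changes.add("CHANGE " + doc.getKey());
        }
        current = reader.hasNext() ? reader.next() : null;
      } else {
        // Not present in the previous snapshot: a new document.
        changes.add("ADD " + doc.getKey());
      }
    }
    // Like processDeletes(null): every trailing old entry is a delete.
    while (current != null) {
      changes.add("DELETE " + current.getKey());
      current = reader.hasNext() ? reader.next() : null;
    }
    return changes;
  }

  public static void main(String[] args) {
    SortedMap<String, String> previous = new TreeMap<>();
    previous.put("a", "v1");
    previous.put("b", "v1");
    previous.put("d", "v1");
    SortedMap<String, String> repository = new TreeMap<>();
    repository.put("a", "v1");
    repository.put("b", "v2");
    repository.put("c", "v1");
    System.out.println(diff(previous, repository));
  }
}
```

Because both sides are consumed in the same order, one pass needs only a single cursor into the old snapshot (current) and never has to load either side fully into memory, which appears to be why the repository must return its documents in sorted order.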
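Snapshots of both updated and new documents are first persisted to the newest snapshot file.
The changes themselves are submitted through the methods of the callback member, which ultimately places them in the ChangeQueue object.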
The Callback interface defines the methods used to report these changes.
/**
 * Callback interface.
 * The client provides an implementation of this interface to receive
 * notification of changes to the repository.
 */
public static interface Callback {
  public void passBegin() throws InterruptedException;

  public void newDocument(DocumentHandle documentHandle,
      MonitorCheckpoint mcp) throws InterruptedException;

  public void deletedDocument(DocumentHandle documentHandle,
      MonitorCheckpoint mcp) throws InterruptedException;

  public void changedDocument(DocumentHandle documentHandle,
      MonitorCheckpoint mcp) throws InterruptedException;

  public void passComplete(MonitorCheckpoint mcp) throws InterruptedException;

  public boolean hasEnqueuedAtLeastOneChangeThisPass();

  public void passPausing(int sleepms) throws InterruptedException;
}
The ChangeQueue class defines an inner class Callback that implements this interface; its methods add the submitted changes to the blocking queue held as a member of ChangeQueue.
/**
 * Callback implementation: puts Change elements into the blocking queue
 * pendingChanges.
 * Adds {@link Change Changes} to this queue.
 */
private class Callback implements DocumentSnapshotRepositoryMonitor.Callback {
  private int changeCount = 0;

  public void passBegin() {
    changeCount = 0;
    activityLogger.scanBeginAt(new Timestamp(System.currentTimeMillis()));
  }

  /* @Override */
  public void changedDocument(DocumentHandle dh, MonitorCheckpoint mcp)
      throws InterruptedException {
    ++changeCount;
    pendingChanges.put(new Change(Change.FactoryType.CLIENT, dh, mcp));
    activityLogger.gotChangedDocument(dh.getDocumentId());
  }

  /* @Override */
  public void deletedDocument(DocumentHandle dh, MonitorCheckpoint mcp)
      throws InterruptedException {
    ++changeCount;
    pendingChanges.put(new Change(Change.FactoryType.INTERNAL, dh, mcp));
    activityLogger.gotDeletedDocument(dh.getDocumentId());
  }

  /* @Override */
  public void newDocument(DocumentHandle dh, MonitorCheckpoint mcp)
      throws InterruptedException {
    ++changeCount;
    pendingChanges.put(new Change(Change.FactoryType.CLIENT, dh, mcp));
    activityLogger.gotNewDocument(dh.getDocumentId());
  }

  /* @Override */
  public void passComplete(MonitorCheckpoint mcp) throws InterruptedException {
    activityLogger.scanEndAt(new Timestamp(System.currentTimeMillis()));
    if (introduceDelayAfterEveryScan || changeCount == 0) {
      Thread.sleep(sleepInterval);
    }
  }

  public boolean hasEnqueuedAtLeastOneChangeThisPass() {
    return changeCount > 0;
  }

  /* @Override */
  public void passPausing(int sleepms) throws InterruptedException {
    Thread.sleep(sleepms);
  }
}
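The producer side of this callback can be reduced to a small sketch: each reported change is counted and put into a bounded blocking queue, and the counter is reset at the start of every pass. ChangeQueueSketch, its String payloads, and the capacity argument are assumptions for illustration; the real class enqueues Change objects and also logs activity.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical, simplified model of the ChangeQueue callback. */
class ChangeQueueSketch {
  private final BlockingQueue<String> pendingChanges;
  private int changeCount = 0;

  ChangeQueueSketch(int capacity) {
    pendingChanges = new LinkedBlockingQueue<>(capacity);
  }

  /** Resets the per-pass counter, as passBegin() does. */
  void passBegin() {
    changeCount = 0;
  }

  void newDocument(String id) throws InterruptedException {
    enqueue("NEW " + id);
  }

  void changedDocument(String id) throws InterruptedException {
    enqueue("CHANGED " + id);
  }

  void deletedDocument(String id) throws InterruptedException {
    enqueue("DELETED " + id);
  }

  private void enqueue(String change) throws InterruptedException {
    ++changeCount;
    pendingChanges.put(change);  // blocks while the queue is full
  }

  boolean hasEnqueuedAtLeastOneChangeThisPass() {
    return changeCount > 0;
  }

  /** Consumer side: returns the next change, or null if none is queued. */
  String poll() {
    return pendingChanges.poll();
  }

  public static void main(String[] args) throws InterruptedException {
    ChangeQueueSketch queue = new ChangeQueueSketch(10);
    queue.passBegin();
    queue.newDocument("a");
    queue.deletedDocument("b");
    System.out.println(queue.poll() + ", " + queue.poll());
  }
}
```

Because put blocks when the queue is full, a slow consumer applies back-pressure to the monitor thread instead of letting changes pile up without bound.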
---------------------------------------------------------------------------
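This series, Enterprise Search Engine Development: the Connector, is my own original work.
Please credit the source when reposting: 博客园 (cnblogs), 刺猬的温驯
My email: chenying998179@163#com (replace # with .)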