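Tracing upward from the BatchResult runBatch(BatchSize batchSize) method of the QueryTraverser object discussed above, we reach the CancelableBatch class. It implements the TimedCancelable interface, which extends the Cancelable interface, which in turn extends Runnable.
Source of the Cancelable interface: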
/**
 * A {@link Runnable} that supports cancellation.
 */
public interface Cancelable extends Runnable {
  /**
   * Cancel the operation performed by this {@link Runnable}.
   * While this {@link Runnable#run} method is running in one thread this
   * may be called in another so implementors must provide any needed
   * synchronization.
   */
  public void cancel();
}
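The Javadoc asks implementors to make cancel() safe to call from another thread while run() is executing. As a minimal sketch of one common way to meet that requirement (CountingTask is a hypothetical class, not part of the connector manager), run() can poll a volatile flag that cancel() sets:

```java
// Same shape as the Cancelable interface above, repeated so the sketch compiles on its own.
interface Cancelable extends Runnable {
  void cancel();
}

// Hypothetical task: counts "documents" up to a limit unless cancelled.
class CountingTask implements Cancelable {
  // volatile makes a cancel() issued by one thread visible to the thread in run().
  private volatile boolean cancelled = false;
  private final int limit;
  int processed = 0; // only written by the thread executing run()

  CountingTask(int limit) {
    this.limit = limit;
  }

  @Override
  public void run() {
    while (!cancelled && processed < limit) {
      processed++; // stands in for processing one document
    }
  }

  @Override
  public void cancel() {
    cancelled = true;
  }
}
```

Because the flag is volatile, a cancel() from another thread becomes visible to the running loop without further locking; a task with more state than a single flag would need real synchronization.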
Source of the TimedCancelable interface:
/**
 * A {@link Runnable} that supports cancellation and timeout.
 */
public interface TimedCancelable extends Cancelable {
  /**
   * Complete the operations performed by this {@link Runnable} due to the
   * expiration of its time interval. While this {@link Runnable#run} method is
   * running in one thread this may be called in another so implementors must
   * provide any needed synchronization.
   */
  public void timeout(TaskHandle taskHandle);
}
Only now do we get to the CancelableBatch class itself, which implements TimedCancelable:
/**
 * A {@link TimedCancelable} for running a {@link Connector} batch using
 * a {@link Traverser}
 */
class CancelableBatch implements TimedCancelable {
  private static final Logger LOGGER =
      Logger.getLogger(CancelableBatch.class.getName());

  final Traverser traverser;
  final String traverserName;
  final BatchResultRecorder batchResultRecorder;
  final BatchTimeout batchTimeout;
  final BatchSize batchSize;

  /**
   * Construct a {@link CancelableBatch}.
   *
   * @param traverser {@link Traverser} for running the batch.
   * @param traverserName traverser name for logging purposes.
   * @param batchResultRecorder {@link BatchResultRecorder} for recording
   *        the result of running the batch.
   * @param batchTimeout {@link BatchTimeout} notified if the batch times out.
   * @param batchSize hint and constraints as to the number of documents
   *        to process in the batch.
   */
  public CancelableBatch(Traverser traverser, String traverserName,
      BatchResultRecorder batchResultRecorder, BatchTimeout batchTimeout,
      BatchSize batchSize) {
    this.traverser = traverser;
    this.traverserName = traverserName;
    this.batchResultRecorder = batchResultRecorder;
    this.batchSize = batchSize;
    this.batchTimeout = batchTimeout;
  }

  /**
   * Cancel execution.
   */
  public void cancel() {
    traverser.cancelBatch();
  }

  /**
   * Called when the run times out.
   */
  public void timeout(TaskHandle taskHandle) {
    batchTimeout.timeout();
  }

  public void run() {
    NDC.push("Traverse " + traverserName);
    try {
      LOGGER.fine("Begin runBatch; traverserName = " + traverserName
          + " " + batchSize);
      BatchResult batchResult = traverser.runBatch(batchSize);
      LOGGER.fine("Traverser " + traverserName
          + " batchDone with result = " + batchResult);
      batchResultRecorder.recordResult(batchResult);
    } finally {
      NDC.remove();
    }
  }

  @Override
  public String toString() {
    return "CancelableBatch: traverser = " + traverser
        + ", batchSize = " + batchSize;
  }
}
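In the run method above, the call BatchResult batchResult = traverser.runBatch(batchSize); fetches documents from the data source and pushes them to the server.
Note the other two methods as well; they will come up again later: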
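Stripped of logging, run() boils down to: push a diagnostic label, run one batch, record its result, and always unwind the label. The sketch below uses hypothetical simplified types (SimpleTraverser, SimpleRecorder, SimpleBatch) and a ThreadLocal stack as a stand-in for NDC; the original's NDC.remove() presumably clears the thread's whole diagnostic context, whereas this sketch only pops the label it pushed.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified stand-ins for Traverser and BatchResultRecorder.
interface SimpleTraverser {
  int runBatch(int batchHint); // returns the number of documents processed
}

interface SimpleRecorder {
  void recordResult(int documentsProcessed);
}

class SimpleBatch implements Runnable {
  // Stand-in for NDC: a per-thread stack of diagnostic labels.
  static final ThreadLocal<Deque<String>> CONTEXT =
      ThreadLocal.withInitial(ArrayDeque::new);

  private final SimpleTraverser traverser;
  private final String name;
  private final SimpleRecorder recorder;
  private final int batchHint;

  SimpleBatch(SimpleTraverser traverser, String name,
      SimpleRecorder recorder, int batchHint) {
    this.traverser = traverser;
    this.name = name;
    this.recorder = recorder;
    this.batchHint = batchHint;
  }

  @Override
  public void run() {
    CONTEXT.get().push("Traverse " + name);
    try {
      int result = traverser.runBatch(batchHint); // fetch and push documents
      recorder.recordResult(result);              // reached only if runBatch returns
    } finally {
      CONTEXT.get().pop();                        // runs even if runBatch throws
    }
  }
}
```

The try/finally is what guarantees the diagnostic label does not leak onto the pool thread when runBatch throws; in that case the result is simply never recorded.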
/**
 * Cancel execution.
 */
public void cancel() {
  traverser.cancelBatch();
}

/**
 * Called when the run times out.
 */
public void timeout(TaskHandle taskHandle) {
  batchTimeout.timeout();
}
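In short, CancelableBatch is a Runnable task; loosely speaking, a "thread class".
Continuing upward, we reach the ConnectorCoordinatorImpl class, which implements the ConnectorCoordinator interface. That interface declares a startBatch() method: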
/**
 * Starts running a batch for this {@link ConnectorCoordinator} if a batch is
 * not already running.
 *
 * @return true if this call started a batch
 * @throws ConnectorNotFoundException if this {@link ConnectorCoordinator}
 *         does not exist.
 */
public boolean startBatch() throws ConnectorNotFoundException;
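The first thing to understand is that each connector instance corresponds to exactly one ConnectorCoordinatorImpl object. ConnectorCoordinatorImpl is a very large class, so we start with the source of its startBatch() method: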
/**
 * Start traversal.
 * Starts running a batch for this {@link ConnectorCoordinator} if a batch is
 * not already running.
 *
 * @return true if this call started a batch
 */
/* @Override */
public synchronized boolean startBatch() {
  if (!shouldRun()) {
    return false;
  }

  BatchSize batchSize = loadManager.determineBatchSize();
  if (batchSize.getHint() == 0) {
    return false;
  }

  try {
    TraversalManager traversalManager = getTraversalManager();
    if (traversalManager == null) {
      return false;
    }
    // Token identifying the current batch.
    currentBatchKey = new Object();
    BatchCoordinator batchCoordinator = new BatchCoordinator(this);
    // batchCoordinator plays the TraversalStateStore (stateStore) role.
    Traverser traverser = new QueryTraverser(pusherFactory,
        traversalManager, batchCoordinator, name,
        Context.getInstance().getTraversalContext(), clock);
    // batchCoordinator also plays the BatchResultRecorder (batchResultRecorder)
    // and BatchTimeout (batchTimeout) roles:
    //   cancel() calls the Traverser's cancelBatch() method;
    //   the BatchResultRecorder records the run result [not to be called externally];
    //   the BatchTimeout supplies the timeout() method.
    TimedCancelable batch = new CancelableBatch(traverser, name,
        batchCoordinator, batchCoordinator, batchSize);
    taskHandle = threadPool.submit(batch);
    //threadPool.shutdown(interrupt, waitMillis)
    //taskHandle.cancel();
    return true;
  } catch (ConnectorNotFoundException cnfe) {
    LOGGER.log(Level.WARNING, "Connector not found - this is normal if you"
        + " recently reconfigured your connector instance: " + cnfe);
  } catch (InstantiatorException ie) {
    LOGGER.log(Level.WARNING,
        "Failed to perform connector content traversal.", ie);
    delayTraversal(TraversalDelayPolicy.ERROR);
  }
  return false;
}
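As we can see, this method first constructs a QueryTraverser object (which needs references to a PusherFactory pusherFactory, a TraversalManager traversalManager and a TraversalStateStore stateStore), then constructs a CancelableBatch object (passing in, among other things, the QueryTraverser and the BatchSize batchSize, i.e. the batch size), and finally submits the CancelableBatch to the thread pool for execution. So we now know that one run on the pool only traverses up to one batch's worth of data, not necessarily all of it.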
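To make the point about batch size concrete, here is a toy model (BatchedTraversalDemo is hypothetical, and its checkpoint is just a list index rendered as a string) in which each call traverses at most batchHint documents, resumes from the stored checkpoint and updates it afterwards, so covering the whole source takes several batches:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: each batch feeds at most batchHint documents,
// resuming from the stored checkpoint, so several batches cover the source.
class BatchedTraversalDemo {
  private final List<String> source;
  private String checkpoint = null; // null means "start from the beginning"
  final List<String> fed = new ArrayList<>();

  BatchedTraversalDemo(List<String> source) {
    this.source = source;
  }

  /** Runs one batch; returns the number of documents fed (0 = nothing left). */
  int runBatch(int batchHint) {
    int start = (checkpoint == null) ? 0 : Integer.parseInt(checkpoint);
    int end = Math.min(start + batchHint, source.size());
    for (int i = start; i < end; i++) {
      fed.add(source.get(i)); // stands in for pushing one document
    }
    checkpoint = Integer.toString(end); // analogous to storeTraversalState(...)
    return end - start;
  }
}
```

With 7 documents and a hint of 3, successive calls process 3, 3, 1 and then 0 documents. In the real connector manager the checkpoint format is defined by each connector, and startBatch() is presumably invoked repeatedly by a scheduler rather than in a loop like this.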
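The BatchCoordinator batchCoordinator = new BatchCoordinator(this) object plays several different roles in the method above; that is, it implements several different interfaces.
Its constructor receives the current object, i.e. the ConnectorCoordinatorImpl connectorCoordinator instance.
In BatchCoordinator's implementations of these interfaces, almost every method calls back into the connectorCoordinator instance. This roundabout "call back into the owner" design is presumably meant to keep responsibilities clearly separated; it may also be needed because these methods depend on connectorCoordinator's state (such as currentBatchKey).
Essentially every method batchCoordinator implements has a counterpart that must be implemented in connectorCoordinator; the design is a kind of wrapper pattern, or a proxy pattern.
One might guess that the interfaces BatchCoordinator implements are also implemented by ConnectorCoordinatorImpl, nominally or at least in substance (whereas BatchCoordinator need not implement the interfaces that ConnectorCoordinatorImpl implements).
Rather than keep guessing, let's take a look.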
Declaration of the BatchCoordinator class:
class BatchCoordinator implements TraversalStateStore, BatchResultRecorder, BatchTimeout
Declaration of the ConnectorCoordinatorImpl class:
class ConnectorCoordinatorImpl implements ConnectorCoordinator, ChangeHandler, BatchResultRecorder
So of the interfaces BatchCoordinator implements, only BatchResultRecorder is also nominally implemented by ConnectorCoordinatorImpl.
Let's go through them one by one.
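The "one object, several roles" arrangement can be reduced to the following sketch. All names here (StateStore, ResultRecorder, Timeout, Owner, Delegate) are hypothetical simplifications, not connector manager types: the Delegate is handed out in three roles, and each callback is forwarded to its owner under the owner's lock, much as BatchCoordinator synchronizes on connectorCoordinator.

```java
// Hypothetical role interfaces, reduced to one or two methods each.
interface StateStore {
  String getState();
  void storeState(String state);
}

interface ResultRecorder {
  void recordResult(int docs);
}

interface Timeout {
  void timeout();
}

// The "owner", playing the part of ConnectorCoordinatorImpl.
class Owner implements ResultRecorder {
  String state;
  int totalDocs;
  boolean reset;

  public synchronized void recordResult(int docs) {
    totalDocs += docs;
  }

  synchronized void resetBatch() {
    reset = true;
  }
}

// The per-batch delegate, playing the part of BatchCoordinator:
// one object, three roles, every call forwarded to the owner.
class Delegate implements StateStore, ResultRecorder, Timeout {
  private final Owner owner;

  Delegate(Owner owner) {
    this.owner = owner;
  }

  public String getState() {
    synchronized (owner) {
      return owner.state;
    }
  }

  public void storeState(String state) {
    synchronized (owner) {
      owner.state = state;
    }
  }

  public void recordResult(int docs) {
    owner.recordResult(docs); // the owner's method is already synchronized
  }

  public void timeout() {
    owner.resetBatch();
  }
}
```

The caller that receives the Delegate as a StateStore cannot see its ResultRecorder or Timeout methods, which is one plausible reading of the "clear responsibilities" motive mentioned above.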
BatchCoordinator batchCoordinator = new BatchCoordinator(this);
// batchCoordinator plays the TraversalStateStore (stateStore) role.
Traverser traverser = new QueryTraverser(pusherFactory,
    traversalManager, batchCoordinator, name,
    Context.getInstance().getTraversalContext(), clock);
Here batchCoordinator plays the TraversalStateStore stateStore role. Its implementations of the TraversalStateStore interface methods are:
public String getTraversalState() {
  synchronized (connectorCoordinator) {
    if (connectorCoordinator.currentBatchKey == requiredBatchKey) {
      return cachedState;
    } else {
      throw new BatchCompletedException();
    }
  }
}

public void storeTraversalState(String state) {
  synchronized (connectorCoordinator) {
    // Make sure our batch is still valid and that nobody has modified
    // the checkpoint while we were away.
    try {
      if ((connectorCoordinator.currentBatchKey == requiredBatchKey) &&
          isCheckpointUnmodified()) {
        connectorCoordinator.setConnectorState(state);
        cachedState = state;
      } else {
        throw new BatchCompletedException();
      }
    } catch (ConnectorNotFoundException cnfe) {
      // Connector disappeared while we were away.
      // Don't try to store results.
      throw new BatchCompletedException();
    }
  }
}
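These two methods respectively get and update the checkpoint (the traversal state). Both must handle synchronization, and both depend on the ConnectorCoordinatorImpl connectorCoordinator object: storeTraversalState() calls connectorCoordinator.setConnectorState(), while getTraversalState() returns the cached copy cachedState, which is presumably loaded from connectorCoordinator when the BatchCoordinator is created (its constructor is not shown here). The relevant connectorCoordinator methods are:
Getting the state: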
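Every callback compares connectorCoordinator.currentBatchKey with requiredBatchKey because startBatch() creates a fresh new Object() for each batch, so reference equality identifies exactly one batch. The sketch below uses hypothetical stand-ins (Coordinator for ConnectorCoordinatorImpl, Guard for BatchCoordinator, StaleBatchException for BatchCompletedException) and assumes, as seems likely, that the helper captures the key when it is constructed. A helper from an earlier batch is rejected once a newer batch has started:

```java
// Hypothetical stand-in for BatchCompletedException.
class StaleBatchException extends RuntimeException {
  private static final long serialVersionUID = 1L;
}

// Hypothetical stand-in for ConnectorCoordinatorImpl.
class Coordinator {
  Object currentBatchKey;
  String state;

  synchronized Guard startBatch() {
    currentBatchKey = new Object(); // identity token, compared with ==
    return new Guard(this);         // constructed while holding this lock
  }

  synchronized void resetBatch() {
    currentBatchKey = null;
  }
}

// Hypothetical stand-in for BatchCoordinator.
class Guard {
  private final Coordinator coordinator;
  private final Object requiredBatchKey;

  Guard(Coordinator coordinator) {
    this.coordinator = coordinator;
    this.requiredBatchKey = coordinator.currentBatchKey; // key of "our" batch
  }

  void storeState(String state) {
    synchronized (coordinator) {
      if (coordinator.currentBatchKey != requiredBatchKey) {
        throw new StaleBatchException(); // a newer batch, or a reset, happened
      }
      coordinator.state = state;
    }
  }
}
```

A plain new Object() works as the key because no two calls ever return the same reference, and after resetBatch() sets the key to null, no outstanding helper matches either.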
/**
 * Returns the Connector's traversal state.
 *
 * @return String representation of the stored state, or
 *         null if no state is stored.
 * @throws ConnectorNotFoundException if this {@link ConnectorCoordinator}
 *         does not exist.
 */
/* @Override */
public synchronized String getConnectorState()
    throws ConnectorNotFoundException {
  return getInstanceInfo().getConnectorState();
}
Updating the state:
/**
 * Set the Connector's traversal state.
 *
 * @param state a String representation of the state to store.
 *        If null, any previous stored state is discarded.
 * @throws ConnectorNotFoundException if this {@link ConnectorCoordinator}
 *         does not exist.
 */
/* @Override */
public synchronized void setConnectorState(String state)
    throws ConnectorNotFoundException {
  getInstanceInfo().setConnectorState(state);
  // Must not call ChangeDetector, as this is called from a synchronized
  // block in BatchCoordinator.
}
Next, let's look at the other roles played by batchCoordinator:
// batchCoordinator plays both the BatchResultRecorder (batchResultRecorder)
// and the BatchTimeout (batchTimeout) roles:
//   cancel() calls the Traverser's cancelBatch() method;
//   the BatchResultRecorder records the run result [not to be called externally];
//   the BatchTimeout supplies the timeout() method.
TimedCancelable batch = new CancelableBatch(traverser, name,
    batchCoordinator, batchCoordinator, batchSize);
The first batchCoordinator argument plays the BatchResultRecorder batchResultRecorder role, and the second plays the BatchTimeout batchTimeout role.
Its implementation of the BatchResultRecorder interface method is:
public void recordResult(BatchResult result) {
  synchronized (connectorCoordinator) {
    if (connectorCoordinator.currentBatchKey == requiredBatchKey) {
      connectorCoordinator.recordResult(result);
    } else {
      LOGGER.fine("Ignoring a BatchResult returned from a "
          + "previously canceled traversal batch. Connector = "
          + connectorCoordinator.getConnectorName()
          + " result = " + result + " batchKey = " + requiredBatchKey);
    }
  }
}
This in turn calls back the following method of the ConnectorCoordinatorImpl connectorCoordinator object:
/**
 * Records the supplied traversal batch results. Updates the
 * {@link LoadManager} with number of documents traversed,
 * and implements the requested {@link TraversalDelayPolicy}.
 *
 * @param result a BatchResult
 */
/* @Override */
public synchronized void recordResult(BatchResult result) {
  loadManager.recordResult(result);
  delayTraversal(result.getDelayPolicy());
}
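It records the execution result and applies the requested delay policy.
The two method names match because, as we saw earlier, both classes implement the BatchResultRecorder interface.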
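As a rough illustration of "record the result, then apply the delay policy", the sketch below keeps a running document count and computes the earliest time the next batch may start. ResultBookkeeper, the DelayPolicy values and the delay lengths are assumptions made for illustration; the real LoadManager, TraversalDelayPolicy and delayTraversal() may behave differently.

```java
// Hypothetical policy names; the real TraversalDelayPolicy values may differ.
enum DelayPolicy { IMMEDIATE, POLL, ERROR }

class ResultBookkeeper {
  // Hypothetical delays in milliseconds, chosen only for illustration.
  static final long POLL_DELAY_MS = 5_000;
  static final long ERROR_DELAY_MS = 60_000;

  long documentsTraversed;
  long nextRunAtMs; // no new batch should start before this time

  // The caller passes the current time so the logic is easy to test.
  synchronized void recordResult(int docs, DelayPolicy policy, long nowMs) {
    documentsTraversed += docs;          // analogous to loadManager.recordResult(result)
    switch (policy) {                    // analogous to delayTraversal(policy)
      case IMMEDIATE:
        nextRunAtMs = nowMs;
        break;
      case POLL:
        nextRunAtMs = nowMs + POLL_DELAY_MS;
        break;
      case ERROR:
        nextRunAtMs = nowMs + ERROR_DELAY_MS;
        break;
    }
  }

  synchronized boolean shouldRun(long nowMs) {
    return nowMs >= nextRunAtMs;
  }
}
```

After a POLL result recorded at time 1000, shouldRun() is false until time 6000; an IMMEDIATE result allows the next batch right away.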
Its implementation of the BatchTimeout interface method is:
public void timeout() {
  synchronized (connectorCoordinator) {
    if (connectorCoordinator.currentBatchKey == requiredBatchKey) {
      connectorCoordinator.resetBatch();
    } else {
      LOGGER.warning("Ignoring Timeout for previously canceled"
          + " or completed traversal batch. Connector = "
          + connectorCoordinator.getConnectorName()
          + " batchKey = " + requiredBatchKey);
    }
  }
}
This calls back the following method of the ConnectorCoordinatorImpl connectorCoordinator object (which resets the batch):
/**
 * Cancel the traversal.
 * Halts any in-progress traversals for this {@link Connector} instance.
 * Some or all of the information collected during the current traversal
 * may be discarded.
 */
synchronized void resetBatch() {
  if (taskHandle != null) {
    taskHandle.cancel();
  }
  taskHandle = null;
  currentBatchKey = null;
  interfaces = null;

  // Discard cached interface instances.
  traversalManager = null;
  retriever = null;
  traversalSchedule = null;
}
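Of the three roles, the first two handle state management and recording the execution result (plus the delay policy), while the third cancels the traversal (resetBatch() calls taskHandle.cancel()).
The call sequence is:
the timeout(TaskHandle taskHandle) method of the TimedCancelable object (the CancelableBatch) -->
the batchTimeout.timeout() method of the BatchTimeout object (i.e. BatchCoordinator batchCoordinator) -->
the resetBatch() method of ConnectorCoordinatorImpl connectorCoordinator -->
taskHandle.cancel() on TaskHandle taskHandle --> the cancel() method of the Cancelable object (the CancelableBatch) --> the cancelBatch() method of Traverser traverser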
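In other words, after going round in circles, CancelableBatch's timeout(TaskHandle taskHandle) method ends up calling its own cancel() method.
As we will see later, the purpose of this arrangement is that when a task runs past its time limit, a separate thread that monitors timeouts performs the cancellation; under normal circumstances this call sequence never happens.
TaskHandle taskHandle is a handle to the running task, used to control its execution: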
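The round trip can be reproduced with stripped-down hypothetical classes that only log their own names (ChainDemo and its nested classes are illustrations, not connector manager code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down versions of the classes in the call sequence;
// each step appends its name to a shared log so the order is visible.
class ChainDemo {
  static final List<String> LOG = new ArrayList<>();

  static class Traverser {
    void cancelBatch() { LOG.add("Traverser.cancelBatch"); }
  }

  static class Batch { // plays CancelableBatch
    final Traverser traverser;
    final BatchTimeout batchTimeout;
    Batch(Traverser traverser, BatchTimeout batchTimeout) {
      this.traverser = traverser;
      this.batchTimeout = batchTimeout;
    }
    void timeout(Handle taskHandle) { LOG.add("Batch.timeout"); batchTimeout.timeout(); }
    void cancel() { LOG.add("Batch.cancel"); traverser.cancelBatch(); }
  }

  static class Handle { // plays TaskHandle
    final Batch batch;
    Handle(Batch batch) { this.batch = batch; }
    void cancel() { LOG.add("TaskHandle.cancel"); batch.cancel(); }
  }

  static class Coordinator { // plays ConnectorCoordinatorImpl
    Handle taskHandle;
    void resetBatch() {
      LOG.add("Coordinator.resetBatch");
      if (taskHandle != null) {
        taskHandle.cancel();
      }
      taskHandle = null;
    }
  }

  static class BatchTimeout { // plays BatchCoordinator in its BatchTimeout role
    final Coordinator coordinator;
    BatchTimeout(Coordinator coordinator) { this.coordinator = coordinator; }
    void timeout() { LOG.add("BatchTimeout.timeout"); coordinator.resetBatch(); }
  }
}
```

Calling batch.timeout(handle) produces the log Batch.timeout, BatchTimeout.timeout, Coordinator.resetBatch, TaskHandle.cancel, Batch.cancel, Traverser.cancelBatch, matching the sequence listed above, and leaves the coordinator's taskHandle set to null.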
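A timeout monitor of this kind can be sketched with standard java.util.concurrent classes. This shows the general technique under our own assumptions, not the actual implementation behind the connector manager's threadPool; WatchdogDemo and runWithTimeout are hypothetical names. The task runs on a worker thread, while a separate scheduler thread wakes up after timeoutMs and, if the task has not finished, runs the timeout callback and then cancels the Future with interruption:

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class WatchdogDemo {
  /** Runs task; returns true if the watchdog thread had to cancel it. */
  static boolean runWithTimeout(Runnable task, Runnable onTimeout, long timeoutMs) {
    ExecutorService workers = Executors.newSingleThreadExecutor();
    ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
    AtomicBoolean timedOut = new AtomicBoolean(false);
    try {
      Future<?> future = workers.submit(task);
      watchdog.schedule(() -> {
        if (!future.isDone()) {   // still running after the deadline
          timedOut.set(true);
          onTimeout.run();        // analogous to CancelableBatch.timeout(...)
          future.cancel(true);    // interrupts the worker thread
        }
      }, timeoutMs, TimeUnit.MILLISECONDS);
      try {
        future.get();             // returns when the task completes...
      } catch (CancellationException | ExecutionException e) {
        // ...or ends up here if the watchdog cancelled it or the task threw
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the caller's interrupt
      }
      return timedOut.get();
    } finally {
      watchdog.shutdownNow();     // drops the pending check if the task was fast
      workers.shutdownNow();
    }
  }
}
```

In the normal case the task finishes first, the scheduled check is discarded by shutdownNow(), and the timeout callback never runs; this matches the remark above that the timeout sequence does not occur under normal circumstances.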
/**
 * Handle for the management of a {@link Cancelable} primary task.
 */
public class TaskHandle {
  /**
   * The primary {@link Cancelable} that is run by this task to
   * perform some useful work.
   */
  final Cancelable cancelable;

  /*
   * The {@link Future} for the primary task.
   */
  final Future<?> taskFuture;

  /*
   * The time the task starts.
   */
  final long startTime;

  /**
   * Create a TaskHandle.
   *
   * @param cancelable {@link Cancelable} for the primary task.
   * @param taskFuture {@link Future} for the primary task.
   * @param startTime startTime for the primary task.
   */
  TaskHandle(Cancelable cancelable, Future<?> taskFuture, long startTime) {
    this.cancelable = cancelable;
    this.taskFuture = taskFuture;
    this.startTime = startTime;
  }

  /**
   * Cancel the primary task and the time out task.
   */
  public void cancel() {
    cancelable.cancel();
    taskFuture.cancel(true);
  }

  /**
   * Return true if the primary task has completed.
   */
  public boolean isDone() {
    return taskFuture.isDone();
  }
}
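TaskHandle.cancel() does two things, and the following sketch suggests why both matter: Future.cancel(true) only interrupts the worker thread, which a task that never checks its interrupt status will ignore, whereas cancelable.cancel() lets the task stop itself cooperatively. SimpleTaskHandle is a simplified copy of TaskHandle, and SpinningTask is a hypothetical task that ignores interrupts:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Same shape as the Cancelable interface earlier in this article.
interface Cancelable extends Runnable {
  void cancel();
}

// Simplified copy of TaskHandle (startTime omitted).
class SimpleTaskHandle {
  final Cancelable cancelable;
  final Future<?> taskFuture;

  SimpleTaskHandle(Cancelable cancelable, Future<?> taskFuture) {
    this.cancelable = cancelable;
    this.taskFuture = taskFuture;
  }

  void cancel() {
    cancelable.cancel();     // cooperative: ask the task to stop itself
    taskFuture.cancel(true); // forceful: interrupt the worker thread
  }

  boolean isDone() {
    return taskFuture.isDone();
  }
}

// Hypothetical task that never checks its interrupt status,
// so only the cooperative cancel() can stop it.
class SpinningTask implements Cancelable {
  private volatile boolean cancelled;
  volatile boolean started;
  volatile boolean exited;

  @Override
  public void run() {
    started = true;
    while (!cancelled) {
      Thread.yield(); // busy-wait; does not react to interrupts
    }
    exited = true;
  }

  @Override
  public void cancel() {
    cancelled = true;
  }
}
```

Here future.cancel(true) alone would mark the Future as cancelled but leave the loop spinning on the pool thread; it is the cooperative flag that actually ends run().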
---------------------------------------------------------------------------
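This series, "Enterprise Search Engine Development: The Connector", is my original work.
If you repost it, please credit the source: 博客园 (cnblogs), 刺猬的温驯
My email: chenying998179@163#com (replace # with .)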