zoukankan      html  css  js  c++  java
  • 企业搜索引擎开发之连接器connector(二十一)

    从上文中的QueryTraverser对象的BatchResult runBatch(BatchSize batchSize)方法上溯到CancelableBatch类,该类实现了TimedCancelable接口,后者又extends了TimedCancelable接口,后者又extends了Cancelable接口,后者又extends了Runnable接口

    Cancelable接口源码

    /**
     * A {@link Runnable} that supports cancellation.
     */
    public interface Cancelable extends Runnable {
      /**
       * Cancel the operation performed by this {@link Runnable}.
       * While this {@link Runnable#run} method is running in one thread this
       * may be called in another so implementors must provide any needed
       * synchronization.
       */
      public void cancel();
    }

    TimedCancelable接口源码

    /**
     * A {@link Runnable} that supports cancellation and timeout.
     */
    public interface TimedCancelable extends Cancelable {
      /**
       * Complete the operations performed by this {@link Runnable} due to the
       * expiration of its time interval. While this {@link Runnable#run} method is
       * running in one thread this may be called in another so implementors must
       * provide any needed synchronization.
       */
      public void timeout(TaskHandle taskHandle);
    }

    下面才是CancelableBatch类,实现了TimedCancelable接口

    /**
     * A {@link TimedCancelable} for running a {@link Connector} batch using
     * a {@link Traverser}
     */
    class CancelableBatch implements TimedCancelable {
      private static final Logger LOGGER =
        Logger.getLogger(CancelableBatch.class.getName());
    
      final Traverser traverser;
      final String traverserName;
      final BatchResultRecorder batchResultRecorder;
      final BatchTimeout batchTimeout;
      final BatchSize batchSize;
    
      /**
       * Construct a {@link CancelableBatch}.
       *
       * @param traverser {@link Traverser} for running the batch.
       * @param traverserName traverser name for logging purposes.
       * @param batchResultRecorder {@link BatchResultRecorder} for recording
       *        the result of running the batch.
       * @param batchSize hint and constraints as to the number of documents
       *        to process in the batch.
       */
      public CancelableBatch(Traverser traverser, String traverserName,
          BatchResultRecorder batchResultRecorder, BatchTimeout batchTimeout,
          BatchSize batchSize) {
        this.traverser = traverser;
        this.traverserName = traverserName;
        this.batchResultRecorder = batchResultRecorder;
        this.batchSize = batchSize;
        this.batchTimeout = batchTimeout;
      }
      
      /**
       * 取消执行
       */
      public void cancel() {
       traverser.cancelBatch();
      }
      
      /**
       * 运行超时
       */
      public void timeout(TaskHandle taskHandle) {
         batchTimeout.timeout();
      }
    
      public void run() {
        NDC.push("Traverse " + traverserName);
        try {
          LOGGER.fine("Begin runBatch; traverserName = " + traverserName
              + "  " + batchSize);
          BatchResult batchResult = traverser.runBatch(batchSize);
          LOGGER.fine("Traverser " + traverserName + " batchDone with result = "
              + batchResult);
          batchResultRecorder.recordResult(batchResult);
        } finally {
          NDC.remove();
        }
      }
    
      @Override
      public String toString() {
        return "CancelableBatch: traverser = " + traverser
               + ", batchSize = " + batchSize;
      }
    }

    在上面的run方法里面,调用了BatchResult batchResult = traverser.runBatch(batchSize);方法,用于获取数据源数据并向服务器推送数据

    另外两方法注意下,后面会用到

    /**
       * 取消执行
       */
      public void cancel() {
       traverser.cancelBatch();
      }
      
      /**
       * 运行超时
       */
      public void timeout(TaskHandle taskHandle) {
         batchTimeout.timeout();
      }

    综上所述,CancelableBatch是一个实现了Runnable接口的线程类,姑且如是说

    继续上溯到ConnectorCoordinatorImpl类,该类实现了ConnectorCoordinator接口,该接口声明了一个startBatch()方法

    /**
       * Starts running a batch for this {@link ConnectorCoordinator} if a batch is
       * not already running.
       *
       * @return true if this call started a batch
       * @throws ConnectorNotFoundException if this {@link ConnectorCoordinator}
       *         does not exist.
       */
      public boolean startBatch() throws ConnectorNotFoundException;

    首先需要明白的是,一个连接器实例对应一个ConnectorCoordinatorImpl实例对象,ConnectorCoordinatorImpl类实在庞大,我们先分析startBatch()方法源码

    /**
       * 开始采集
       * Starts running a batch for this {@link ConnectorCoordinator} if a batch is
       * not already running.
       *
       * @return true if this call started a batch
       */
      /* @Override */
      public synchronized boolean startBatch() {
        if (!shouldRun()) {
          return false;
        }
    
        BatchSize batchSize = loadManager.determineBatchSize();
        if (batchSize.getHint() == 0) {
          return false;
        }
    
        try {
          TraversalManager traversalManager = getTraversalManager();
          if (traversalManager == null) {
            return false;
          }
          //当前标识
          currentBatchKey = new Object();
          
          BatchCoordinator batchCoordinator = new BatchCoordinator(this);
          
          //batchCoordinator 作为 TraversalStateStore stateStore角色
          Traverser traverser = new QueryTraverser(pusherFactory,
              traversalManager, batchCoordinator, name,
              Context.getInstance().getTraversalContext(), clock);
          
          //batchCoordinator 作为 BatchResultRecorder batchResultRecorder, BatchTimeout batchTimeout角色
          
          //调用Traverser traverser的取消方法   
          //BatchResultRecorder batchResultRecorder记录运行结果;[不能由外部调用]
          //BatchTimeout batchTimeout的超时方法
          TimedCancelable batch =  new CancelableBatch(traverser, name,
              batchCoordinator, batchCoordinator, batchSize);
          taskHandle = threadPool.submit(batch);
          //threadPool.shutdown(interrupt, waitMillis)
          //taskHandle.cancel();
          
          return true;
        } catch (ConnectorNotFoundException cnfe) {
          LOGGER.log(Level.WARNING, "Connector not found - this is normal if you "
              + " recently reconfigured your connector instance: " + cnfe);
        } catch (InstantiatorException ie) {
          LOGGER.log(Level.WARNING,
              "Failed to perform connector content traversal.", ie);
          delayTraversal(TraversalDelayPolicy.ERROR);
        }
        return false;
      }

    我们可以看到,在该方法里面首先构造QueryTraverser对象(需要构造引用PusherFactory pusherFactory、TraversalManager traversalManager、TraversalStateStore stateStore实例),然后构造CancelableBatch对象(构造函数传入QueryTraverser对象和BatchSize batchSize对象*批次尺寸),最后线程池对象提交CancelableBatch对象执行(到现在我们知道,一次线程执行只执行批次尺寸的数据采集,而并不一定是全部数据)

    这里的BatchCoordinator batchCoordinator = new BatchCoordinator(this)对象在上面方法中充当不同的角色,即该对象实现了不同的接口

    其构造函数传入了当前对象,即ConnectorCoordinatorImpl connectorCoordinator实例对象

    在BatchCoordinator batchCoordinator对象实现不同接口的实现方法里面,基本上都是回调ConnectorCoordinatorImpl connectorCoordinator实例对象的方法,采用这种迂回的回马枪策略,大概是出于职责分明考虑吧,另外可能需要用到ConnectorCoordinatorImpl connectorCoordinator实例对象的状态

    基本上BatchCoordinator batchCoordinator对象实现的方法在ConnectorCoordinatorImpl connectorCoordinator实例对象里面都要实现,这里设计方法采用的是一种包装器模式、或者是代理模式

    可以猜想到,BatchCoordinator batchCoordinator对象实现了的接口实际上ConnectorCoordinatorImpl connectorCoordinator实例对象名义上甚至实际上也同样实现了(而类ConnectorCoordinatorImpl 实现的接口BatchCoordinator并不一定要实现)

    我们先睹为快,不先做瞎猜了

    BatchCoordinator类实现

    class BatchCoordinator implements TraversalStateStore,
    BatchResultRecorder, BatchTimeout

    ConnectorCoordinatorImpl类实现

    class ConnectorCoordinatorImpl implements
         ConnectorCoordinator, ChangeHandler, BatchResultRecorder

    貌似BatchCoordinator类实现的接口之中,只有BatchResultRecorder接口ConnectorCoordinatorImpl类名义上也实现了

    下面逐一分析

    BatchCoordinator batchCoordinator = new BatchCoordinator(this);
          
          //batchCoordinator 作为 TraversalStateStore stateStore角色
          Traverser traverser = new QueryTraverser(pusherFactory,
              traversalManager, batchCoordinator, name,
              Context.getInstance().getTraversalContext(), clock);

    这里batchCoordinator 作为 TraversalStateStore stateStore角色,实现raversalStateStore接口方法为:

     public String getTraversalState() {
        synchronized (connectorCoordinator) {
          if (connectorCoordinator.currentBatchKey == requiredBatchKey) {
            return cachedState;
          } else {
            throw new BatchCompletedException();
          }
        }
      }
    
      public void storeTraversalState(String state) {
        synchronized (connectorCoordinator) {
          // Make sure our batch is still valid and that nobody has modified
          // the checkpoint while we were away.
          try {
            if ((connectorCoordinator.currentBatchKey == requiredBatchKey) &&
                isCheckpointUnmodified()) {
              connectorCoordinator.setConnectorState(state);
              cachedState = state;
            } else {
              throw new BatchCompletedException();
            }
          } catch (ConnectorNotFoundException cnfe) {
            // Connector disappeared while we were away.
            // Don't try to store results.
            throw new BatchCompletedException();
          }
        }
      }

    上面方法分别为获取断点状态和更新断点状态,需要考虑同步问题,两者都回调了ConnectorCoordinatorImpl connectorCoordinator对象的方法

    获取状态

    /**
       * Returns the Connector's traversal state.
       *
       * @return String representation of the stored state, or
       *         null if no state is stored.
       * @throws ConnectorNotFoundException if this {@link ConnectorCoordinator}
       *         does not exist.
       */
      /* @Override */
      public synchronized String getConnectorState()
          throws ConnectorNotFoundException {
        return getInstanceInfo().getConnectorState();
      }

    更新状态

    /**
       * Set the Connector's traversal state.
       *
       * @param state a String representation of the state to store.
       *        If null, any previous stored state is discarded.
       * @throws ConnectorNotFoundException if this {@link ConnectorCoordinator}
       *         does not exist.
       */
      /* @Override */
      public synchronized void setConnectorState(String state)
          throws ConnectorNotFoundException {
        getInstanceInfo().setConnectorState(state);
        // Must not call ChangeDetector, as this is called from a synchronized
        // block in BatchCoordinator.
      }

    接下来分析BatchCoordinator batchCoordinator对象充当的其他角色

     //batchCoordinator 作为 BatchResultRecorder batchResultRecorder, BatchTimeout batchTimeout角色
          
          //调用Traverser traverser的取消方法   
          //BatchResultRecorder batchResultRecorder记录运行结果;[不能由外部调用]
          //BatchTimeout batchTimeout的超时方法
          TimedCancelable batch =  new CancelableBatch(traverser, name,
              batchCoordinator, batchCoordinator, batchSize);

    前者作为BatchResultRecorder batchResultRecorder角色,后者作为BatchTimeout batchTimeout角色

    实现BatchResultRecorder接口的方法

    public void recordResult(BatchResult result) {
        synchronized (connectorCoordinator) {
          if (connectorCoordinator.currentBatchKey == requiredBatchKey) {
            connectorCoordinator.recordResult(result);
          } else {
            LOGGER.fine("Ignoring a BatchResult returned from a "
                + "prevously canceled traversal batch.  Connector = "
                + connectorCoordinator.getConnectorName()
                + "  result = " + result + "  batchKey = " + requiredBatchKey);
          }
        }
      }

    里面进一步回调了ConnectorCoordinatorImpl connectorCoordinator对象如下方法

    /**
       * Records the supplied traversal batch results.  Updates the
       * {@link LoadManager} with number of documents traversed,
       * and implements the requested {@link TraversalDelayPolicy}.
       *
       * @param result a BatchResult
       */
      /* @Override */
      public synchronized void recordResult(BatchResult result) {
        loadManager.recordResult(result);
        delayTraversal(result.getDelayPolicy());
      }

    记录执行结果及决定延迟策略

    这里调用的方法名一致,我们再前面已经看到,两者都实现了BatchResultRecorder接口

    实现BatchTimeout接口方法

    public void timeout() {
        synchronized (connectorCoordinator) {
          if (connectorCoordinator.currentBatchKey == requiredBatchKey) {
            connectorCoordinator.resetBatch();
          } else {
            LOGGER.warning("Ignoring Timeout for previously prevously canceled"
                + " or completed traversal batch.  Connector = "
                + connectorCoordinator.getConnectorName()
                + "  batchKey = "+ requiredBatchKey);
          }
        }
      }

    回调ConnectorCoordinatorImpl connectorCoordinator对象如下方法(重置采集)

    /**
       * 取消采集
       * Halts any in-progess traversals for this {@link Connector} instance.
       * Some or all of the information collected during the current traversal
       * may be discarded.
       */
      synchronized void resetBatch() {
        if (taskHandle != null) {
          taskHandle.cancel();
        }
        taskHandle = null;
        currentBatchKey = null;
        interfaces = null;
    
        // Discard cached interface instances.
        traversalManager = null;
        retriever = null;
        traversalSchedule = null;
      }

    前两者分别用于状态管理和记录执行结果及延迟策略,第三者用于取消采集(里面调用了taskHandle.cancel()方法)

    执行序列为

    TimedCancelable类型对象(CancelableBatch对象)的timeout(TaskHandle taskHandle)方法-->

    BatchTimeout类型对象(即BatchCoordinator batchCoordinator)的batchTimeout.timeout()方法-->

    ConnectorCoordinatorImpl  connectorCoordinator对象的resetBatch()方法-->

    TaskHandle taskHandle的taskHandle.cancel()方法-->Cancelable类型对象(CancelableBatch对象)的cancel()方法-->Traverser traverser的 cancelBatch()方法

    即CancelableBatch对象的timeout(TaskHandle taskHandle)方法绕来绕去最终接上了自己的cancel()方法

    从下文中我们可以看到,这样处理的目的在于当一个线程超时时,由另一个监控超时的线程执行取消操作;在正常情况下,该执行序列将不会发生

    TaskHandle taskHandle是一个任务执行句柄,用于对执行过程进行操控

    /**
     * Handle for the management of a {@link Cancelable} primary task.
     */
    public class TaskHandle {
      /**
       * The primary {@link Cancelable} that is run by this task to
       * perform some useful work.
       */
      final Cancelable cancelable;
    
      /*
       * The {@link future} for the primary task.
       */
      final Future<?> taskFuture;
    
      /*
       * The time the task starts.
       */
      final long startTime;
    
      /**
       * Create a TaskHandle.
       *
       * @param cancelable {@link Cancelable} for the primary task.
       * @param taskFuture {@link Future} for the primary task.
       * @param startTime startTime for the primary task.
       */
      TaskHandle(Cancelable cancelable, Future<?> taskFuture, long startTime) {
        this.cancelable = cancelable;
        this.taskFuture = taskFuture;
        this.startTime = startTime;
      }
    
      /**
       * Cancel the primary task and the time out task.
       */
      public void cancel() {
        cancelable.cancel();
        taskFuture.cancel(true);
      }
    
      /**
       * Return true if the primary task has completed.
       */
      public boolean isDone() {
        return taskFuture.isDone();
      }
    }

    ---------------------------------------------------------------------------

    本系列企业搜索引擎开发之连接器connector系本人原创

    转载请注明出处 博客园 刺猬的温驯

    本人邮箱: chenying998179@163#com (#改为.)

    本文链接 http://www.cnblogs.com/chenying99/p/3775591.html 

  • 相关阅读:
    转linux ln命令详解 简单
    商业智能哲学思索
    IE8 RTM 和 IE8 RC1 版本比较。
    汇编语言发展树.
    IE8卸载后遗症,不得不继续当小白鼠。。。
    商务智能系统实现数据管理的关键技术
    商务智能 是什么? 不是什么?
    Standard and Custom Toolset Configurations
    统一论:3G手机、云计算、SaaS、业务开发平台、SOA、BPEL [转]
    JAVA Atm测试实验心得
  • 原文地址:https://www.cnblogs.com/chenying99/p/3775591.html
Copyright © 2011-2022 走看看