Enterprise Search Engine Development: Connectors (Part 8)

    Next, we continue with the TimedCancelable and QueryTraverser classes.

    The source of CancelableBatch, the TimedCancelable implementation that wraps a batch run, is as follows:

    /**
     * A {@link TimedCancelable} for running a {@link Connector} batch using
     * a {@link Traverser}
     */
    class CancelableBatch implements TimedCancelable {
      private static final Logger LOGGER =
        Logger.getLogger(CancelableBatch.class.getName());
    
      final Traverser traverser;
      final String traverserName;
      final BatchResultRecorder batchResultRecorder;
      final BatchTimeout batchTimeout;
      final BatchSize batchSize;
    
      /**
       * Construct a {@link CancelableBatch}.
       *
       * @param traverser {@link Traverser} for running the batch.
       * @param traverserName traverser name for logging purposes.
       * @param batchResultRecorder {@link BatchResultRecorder} for recording
       *        the result of running the batch.
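       * @param batchTimeout {@link BatchTimeout} to invoke if the batch runs
       *        past its time limit.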
       * @param batchSize hint and constraints as to the number of documents
       *        to process in the batch.
       */
      public CancelableBatch(Traverser traverser, String traverserName,
          BatchResultRecorder batchResultRecorder, BatchTimeout batchTimeout,
          BatchSize batchSize) {
        this.traverser = traverser;
        this.traverserName = traverserName;
        this.batchResultRecorder = batchResultRecorder;
        this.batchSize = batchSize;
        this.batchTimeout = batchTimeout;
      }
    
      public void cancel() {
        traverser.cancelBatch();
      }
    
      public void timeout(TaskHandle taskHandle) {
        batchTimeout.timeout();
      }
    
      public void run() {
        NDC.push("Traverse " + traverserName);
        try {
          LOGGER.fine("Begin runBatch; traverserName = " + traverserName
              + "  " + batchSize);
          BatchResult batchResult = traverser.runBatch(batchSize);
          LOGGER.fine("Traverser " + traverserName + " batchDone with result = "
              + batchResult);
          batchResultRecorder.recordResult(batchResult);
        } finally {
          NDC.remove();
        }
      }
    
      @Override
      public String toString() {
        return "CancelableBatch traverser: " + traverser + "  " + batchSize;
      }
    }

    Apart from the cancellation-related methods it exposes for the connector, the heart of this class is the run method executed on the worker thread (the cancel method is invoked mainly through the task-handle class, TaskHandle).

    Inside run, it calls traverser.runBatch(batchSize), where traverser is a QueryTraverser object, an implementation of the Traverser interface.

    When the batch finishes, the BatchResultRecorder object records the result of the run.
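    To make the threading model concrete, here is a minimal sketch of how such a batch is typically driven: a worker thread runs it and a watchdog cancels it when the time limit expires. It assumes CancelableBatch is visible to the calling code and that TimedCancelable extends Runnable (which its run method suggests); the real scheduling lives in the connector manager's TaskHandle and thread pool, not in this snippet.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class BatchDriverSketch {
      /** Runs one batch on a worker thread and cancels it if it exceeds the time limit. */
      public static void runOneBatch(final CancelableBatch batch, long timeoutSeconds) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();

        worker.submit(batch);                 // executes CancelableBatch.run() on the worker
        watchdog.schedule(new Runnable() {
          public void run() {
            batch.cancel();                   // delegates to traverser.cancelBatch()
          }
        }, timeoutSeconds, TimeUnit.SECONDS);

        worker.shutdown();                    // let the running batch finish; accept no more
        watchdog.shutdown();
      }
    }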

    Now let's look at the source of the QueryTraverser class:

    /**
     * Traverser for a repository implemented using a TraversalManager
     */
    public class QueryTraverser implements Traverser {
      private static final Logger LOGGER =
          Logger.getLogger(QueryTraverser.class.getName());
    
      private final PusherFactory pusherFactory;
      private final TraversalManager queryTraversalManager;
      private final TraversalStateStore stateStore;
      private final String connectorName;
      private final TraversalContext traversalContext;
    
      // Synchronize access to cancelWork.
      private final Object cancelLock = new Object();
      private boolean cancelWork = false;
    
      public QueryTraverser(PusherFactory pusherFactory,
          TraversalManager traversalManager, TraversalStateStore stateStore,
          String connectorName, TraversalContext traversalContext) {
        this.pusherFactory = pusherFactory;
        this.queryTraversalManager = traversalManager;
        this.stateStore = stateStore;
        this.connectorName = connectorName;
        this.traversalContext = traversalContext;
        if (queryTraversalManager instanceof TraversalContextAware) {
          TraversalContextAware contextAware =
              (TraversalContextAware)queryTraversalManager;
          try {
            contextAware.setTraversalContext(traversalContext);
          } catch (Exception e) {
            LOGGER.log(Level.WARNING, "Unable to set TraversalContext", e);
          }
        }
      }
    
      //@Override
      public void cancelBatch() {
        synchronized(cancelLock) {
          cancelWork = true;
        }
        LOGGER.fine("Cancelling traversal for connector " + connectorName);
      }
    
      public boolean isCancelled() {
        synchronized(cancelLock) {
          return cancelWork;
        }
      }
    
      //@Override
      public BatchResult runBatch(BatchSize batchSize) {
        final long startTime = System.currentTimeMillis();
        final long timeoutTime = startTime
          + traversalContext.traversalTimeLimitSeconds() * 1000;
    
        if (isCancelled()) {
            LOGGER.warning("Attempting to run a cancelled QueryTraverser");
          return new BatchResult(TraversalDelayPolicy.ERROR);
        }
        try {
          queryTraversalManager.setBatchHint(batchSize.getHint());
        } catch (RepositoryException e) {
          LOGGER.log(Level.WARNING, "Unable to set batch hint", e);
        }
    
        String connectorState;
        try {
          if (stateStore != null) {
            connectorState = stateStore.getTraversalState();
          } else {
            throw new IllegalStateException("null TraversalStateStore");
          }
        } catch (IllegalStateException ise) {
          // We get here if the ConnectorStateStore for connector is disabled.
          // That happens if the connector was deleted while we were asleep.
          // Our connector seems to have been deleted.  Don't process a batch.
          LOGGER.finer("Halting traversal..." + ise.getMessage());
          return new BatchResult(TraversalDelayPolicy.ERROR);
        }
    
        DocumentList resultSet = null;
        if (connectorState == null) {
          try {
            LOGGER.finer("Starting traversal...");
            resultSet = queryTraversalManager.startTraversal();
          } catch (Exception e) {
            LOGGER.log(Level.WARNING, "startTraversal threw exception: ", e);
            return new BatchResult(TraversalDelayPolicy.ERROR);
          }
        } else {
          try {
            LOGGER.finer("Resuming traversal...");
            resultSet = queryTraversalManager.resumeTraversal(connectorState);
          } catch (Exception e) {
            LOGGER.log(Level.WARNING, "resumeTraversal threw exception: ", e);
            return new BatchResult(TraversalDelayPolicy.ERROR);
          }
        }
    
        // If the traversal returns null, that means that the repository has
        // no new content to traverse.
        if (resultSet == null) {
          LOGGER.finer("Result set is NULL, no documents returned for traversal.");
          return new BatchResult(TraversalDelayPolicy.POLL, 0);
        }
    
        Pusher pusher = null;
        BatchResult result = null;
        int counter = 0;
        try {
          // Get a Pusher for feeding the returned Documents.
          pusher = pusherFactory.newPusher(connectorName);
    
          while (counter < batchSize.getMaximum()) {
            if (Thread.currentThread().isInterrupted() || isCancelled()) {
              LOGGER.fine("Traversal for connector " + connectorName
                          + " has been interrupted...breaking out of batch run.");
              break;
            }
            if (System.currentTimeMillis() >= timeoutTime) {
              LOGGER.fine("Traversal for connector " + connectorName
                  + " is completing due to time limit.");
              break;
            }
    
            Document nextDocument = null;
            String docid = null;
            try {
              LOGGER.finer("Pulling next document from connector " + connectorName);
              nextDocument = resultSet.nextDocument();
              if (nextDocument == null) {
                break;
              } else {
                // Since there are a couple of places below that could throw
                // exceptions but not exit the while loop, the counter should be
                // incremented here to insure it represents documents returned from
                // the list.  Note the call to nextDocument() could also throw a
                // RepositoryDocumentException signaling a skipped document in which
                // case the call will not be counted against the batch maximum.
                counter++;
                // Fetch DocId to use in messages.
                try {
                  docid = Value.getSingleValueString(nextDocument,
                                                     SpiConstants.PROPNAME_DOCID);
                } catch (IllegalArgumentException e1) {
                    LOGGER.fine("Unable to get document id for document ("
                                + nextDocument + "): " + e1.getMessage());
                } catch (RepositoryException e1) {
                    LOGGER.fine("Unable to get document id for document ("
                                + nextDocument + "): " + e1.getMessage());
                }
              }
              LOGGER.finer("Sending document (" + docid + ") from connector "
                  + connectorName + " to Pusher");
    
              if (!pusher.take(nextDocument)) {
                LOGGER.fine("Traversal for connector " + connectorName
                    + " is completing at the request of the Pusher.");
                break;
              }
    
            } catch (SkippedDocumentException e) {
              /* TODO (bmj): This is a temporary solution and should be replaced.
               * It uses Exceptions for non-exceptional cases.
               */
              // Skip this document.  Proceed on to the next one.
              if (LOGGER.isLoggable(Level.FINER)) {
                LOGGER.log(Level.FINER, "Skipping document (" + docid
                    + ") from connector " + connectorName + ": " + e.getMessage());
              }
            } catch (RepositoryDocumentException e) {
              // Skip individual documents that fail.  Proceed on to the next one.
              LOGGER.log(Level.WARNING, "Skipping document (" + docid
                  + ") from connector " + connectorName, e);
            } catch (RuntimeException e) {
              // Skip individual documents that fail.  Proceed on to the next one.
              LOGGER.log(Level.WARNING, "Skipping document (" + docid
                  + ") from connector " + connectorName, e);
            }
          }
          // No more documents. Wrap up any accumulated feed data and send it off.
          if (!isCancelled()) {
            pusher.flush();
          }
        } catch (OutOfMemoryError e) {
          pusher.cancel();
          System.runFinalization();
          System.gc();
          result = new BatchResult(TraversalDelayPolicy.ERROR);
          try {
            LOGGER.severe("Out of JVM Heap Space.  Will retry later.");
            LOGGER.log(Level.FINEST, e.getMessage(), e);
          } catch (Throwable t) {
            // OutOfMemory state may prevent us from logging the error.
            // Don't make matters worse by rethrowing something meaningless.
          }
        } catch (RepositoryException e) {
          // Drop the entire batch on the floor.  Do not call checkpoint
          // (as there is a discrepancy between what the Connector thinks
          // it has fed, and what actually has been pushed).
          LOGGER.log(Level.SEVERE, "Repository Exception during traversal.", e);
          result = new BatchResult(TraversalDelayPolicy.ERROR);
        } catch (PushException e) {
          LOGGER.log(Level.SEVERE, "Push Exception during traversal.", e);
          // Drop the entire batch on the floor.  Do not call checkpoint
          // (as there is a discrepancy between what the Connector thinks
          // it has fed, and what actually has been pushed).
          result = new BatchResult(TraversalDelayPolicy.ERROR);
        } catch (FeedException e) {
          LOGGER.log(Level.SEVERE, "Feed Exception during traversal.", e);
          // Drop the entire batch on the floor.  Do not call checkpoint
          // (as there is a discrepancy between what the Connector thinks
          // it has fed, and what actually has been pushed).
          result = new BatchResult(TraversalDelayPolicy.ERROR);
        } catch (Throwable t) {
          LOGGER.log(Level.SEVERE, "Uncaught Exception during traversal.", t);
          // Drop the entire batch on the floor.  Do not call checkpoint
          // (as there is a discrepancy between what the Connector thinks
          // it has fed, and what actually has been pushed).
          result = new BatchResult(TraversalDelayPolicy.ERROR);
       } finally {
          // If we have cancelled the work, abandon the batch.
          if (isCancelled()) {
            result = new BatchResult(TraversalDelayPolicy.ERROR);
          }
    
          // Checkpoint completed work as well as skip past troublesome documents
          // (e.g. documents that are too large and will always fail).
          if ((result == null) && (checkpointAndSave(resultSet) == null)) {
            // Unable to get a checkpoint, so wait a while, then retry batch.
            result = new BatchResult(TraversalDelayPolicy.ERROR);
          }
        }
        if (result == null) {
          result = new BatchResult(TraversalDelayPolicy.IMMEDIATE, counter,
                                   startTime, System.currentTimeMillis());
        } else if (pusher != null) {
          // We are returning an error from this batch. Cancel any feed that
          // might be in progress.
          pusher.cancel();
        }
        return result;
      }
    
      private String checkpointAndSave(DocumentList pm) {
        String connectorState = null;
        LOGGER.finest("Checkpointing for connector " + connectorName + " ...");
        try {
          connectorState = pm.checkpoint();
        } catch (RepositoryException re) {
          // If checkpoint() throws RepositoryException, it means there is no
          // new checkpoint.
          return null;
        } catch (Exception e) {
          // If checkpoint() throws some general Exception, it is probably
          // an older connector that doesn't understand the newer empty
          // DocumentList and Exception handling from runBatch() model.
          return null;
        }
        try {
          if (connectorState != null) {
            if (stateStore != null) {
              stateStore.storeTraversalState(connectorState);
            } else {
              throw new IllegalStateException("null TraversalStateStore");
            }
            LOGGER.finest("...checkpoint " + connectorState + " created.");
          }
          return connectorState;
        } catch (IllegalStateException ise) {
          // We get here if the ConnectorStateStore for connector is disabled.
          // That happens if the connector was deleted while we were working.
          // Our connector seems to have been deleted.  Don't save a checkpoint.
          LOGGER.finest("...checkpoint " + connectorState + " discarded.");
        }
        return null;
      }
    }

    The void cancelBatch() method cancels a running batch. As we saw above, it is invoked through the CancelableBatch class: TaskHandle calls CancelableBatch's cancel() method, and the whole chain is ultimately triggered by ConnectorCoordinatorImpl's void resetBatch() method.
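    Distilled from the source above, the cancellation mechanism is just a boolean flag guarded by a lock: cancelBatch() sets it from the scheduler's thread, and runBatch() polls it between documents. Here is a minimal stand-alone sketch of that pattern (class and method names are illustrative, not part of the connector manager):

    public class CancellableWorker {
      // Guards cancelWork, mirroring QueryTraverser's cancelLock/cancelWork pair.
      private final Object cancelLock = new Object();
      private boolean cancelWork = false;

      /** Called from another thread to request that the current batch stop. */
      public void cancelBatch() {
        synchronized (cancelLock) {
          cancelWork = true;
        }
      }

      public boolean isCancelled() {
        synchronized (cancelLock) {
          return cancelWork;
        }
      }

      /** Processes up to max items, checking the flag before each one. */
      public int runBatch(int max) {
        int counter = 0;
        while (counter < max) {
          if (Thread.currentThread().isInterrupted() || isCancelled()) {
            break;  // abandon the remainder of the batch
          }
          // ... pull the next document and push it to the feed ...
          counter++;
        }
        return counter;
      }
    }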

    Let's still walk through the key method, BatchResult runBatch(BatchSize batchSize); this is where the concrete connector's TraversalManager implementation is called to poll the repository for data.

    It first checks whether the batch has been cancelled; if it has, it returns an ERROR result immediately.

    Next it reads the stored traversal state and calls queryTraversalManager.startTraversal() (no prior state) or queryTraversalManager.resumeTraversal(connectorState) to obtain the result set, DocumentList resultSet (a connector-side sketch of this interface follows after these steps).

    It then obtains a Pusher object from the PusherFactory and calls the Pusher's take method to send the documents to the application center.

    Finally it checkpoints the result set's state and returns the result of this data-collection run:

    new BatchResult(TraversalDelayPolicy.IMMEDIATE, counter,startTime, System.currentTimeMillis());
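    For context, the connector side implements this polling contract. Below is a minimal, hypothetical sketch of a TraversalManager; the class name FooTraversalManager and the fetchBatchAfter helper are illustrative only, and it assumes the connector manager's SPI package com.google.enterprise.connector.spi is on the classpath.

    import com.google.enterprise.connector.spi.DocumentList;
    import com.google.enterprise.connector.spi.RepositoryException;
    import com.google.enterprise.connector.spi.TraversalManager;

    public class FooTraversalManager implements TraversalManager {
      private int batchHint = 100;

      /** The batch hint is advisory: return roughly this many documents per batch. */
      public void setBatchHint(int hint) {
        this.batchHint = hint;
      }

      /** Called when there is no saved state: start from the beginning of the repository. */
      public DocumentList startTraversal() throws RepositoryException {
        return fetchBatchAfter(null);
      }

      /** Called with the last saved checkpoint: continue from that position. */
      public DocumentList resumeTraversal(String checkpoint) throws RepositoryException {
        return fetchBatchAfter(checkpoint);
      }

      private DocumentList fetchBatchAfter(String checkpoint) throws RepositoryException {
        // Query the repository for up to batchHint documents changed after `checkpoint`
        // and wrap them in a DocumentList whose checkpoint() reports the new position.
        // Returning null tells QueryTraverser there is no new content (POLL delay policy).
        return null;
      }
    }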

    ---------------------------------------------------------------------------

    This series, Enterprise Search Engine Development: Connectors, is my original work.

    When reposting, please credit the source: 博客园 (cnblogs), 刺猬的温驯.

    Link to this post: http://www.cnblogs.com/chenying99/archive/2013/03/19/2968411.html
