  • Enterprise Search Engine Development: The Connector (Part 8)

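    Next, we continue with CancelableBatch (an implementation of the TimedCancelable interface) and the QueryTraverser class.

    The source of CancelableBatch, which implements TimedCancelable, is as follows: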

    /**
     * A {@link TimedCancelable} for running a {@link Connector} batch using
     * a {@link Traverser}
     */
    class CancelableBatch implements TimedCancelable {
      private static final Logger LOGGER =
        Logger.getLogger(CancelableBatch.class.getName());
    
      final Traverser traverser;
      final String traverserName;
      final BatchResultRecorder batchResultRecorder;
      final BatchTimeout batchTimeout;
      final BatchSize batchSize;
    
      /**
       * Construct a {@link CancelableBatch}.
       *
       * @param traverser {@link Traverser} for running the batch.
       * @param traverserName traverser name for logging purposes.
       * @param batchResultRecorder {@link BatchResultRecorder} for recording
       *        the result of running the batch.
       * @param batchTimeout {@link BatchTimeout} notified when the batch
       *        times out.
       * @param batchSize hint and constraints as to the number of documents
       *        to process in the batch.
       */
      public CancelableBatch(Traverser traverser, String traverserName,
          BatchResultRecorder batchResultRecorder, BatchTimeout batchTimeout,
          BatchSize batchSize) {
        this.traverser = traverser;
        this.traverserName = traverserName;
        this.batchResultRecorder = batchResultRecorder;
        this.batchSize = batchSize;
        this.batchTimeout = batchTimeout;
      }
    
      public void cancel() {
        traverser.cancelBatch();
      }
    
      public void timeout(TaskHandle taskHandle) {
        batchTimeout.timeout();
      }
    
      public void run() {
        NDC.push("Traverse " + traverserName);
        try {
          LOGGER.fine("Begin runBatch; traverserName = " + traverserName
              + "  " + batchSize);
          BatchResult batchResult = traverser.runBatch(batchSize);
          LOGGER.fine("Traverser " + traverserName + " batchDone with result = "
              + batchResult);
          batchResultRecorder.recordResult(batchResult);
        } finally {
          NDC.remove();
        }
      }
    
      @Override
      public String toString() {
        return "CancelableBatch traverser: " + traverser + "  " + batchSize;
      }
    }

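    Besides providing the methods for cancelling a connector batch, this class mainly supplies the run() method executed by a worker thread. (The cancel() method is invoked chiefly through the thread's task handle, the TaskHandle class.)

    Inside run(), it calls traverser.runBatch(batchSize), where traverser is a QueryTraverser object, the implementation of the Traverser interface.

    When the batch finishes, the BatchResultRecorder object records the result.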
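The run/cancel split described above can be sketched in isolation. This is only an illustration: SketchTraverser, SketchRecorder, SketchBatch and SketchBatchDemo are hypothetical stand-ins invented here, with plain int values in place of BatchSize and BatchResult. They are not part of the connector manager.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Traverser: runs one batch, can be told to stop. */
interface SketchTraverser {
  int runBatch(int maxDocs);  // returns how many documents were processed
  void cancelBatch();
}

/** Hypothetical stand-in for BatchResultRecorder. */
interface SketchRecorder {
  void recordResult(int docsProcessed);
}

/** Same shape as CancelableBatch: run() delegates, then records the result. */
class SketchBatch implements Runnable {
  private final SketchTraverser traverser;
  private final SketchRecorder recorder;
  private final int maxDocs;

  SketchBatch(SketchTraverser traverser, SketchRecorder recorder, int maxDocs) {
    this.traverser = traverser;
    this.recorder = recorder;
    this.maxDocs = maxDocs;
  }

  /** Called by whoever holds the task handle; only forwards the request. */
  public void cancel() {
    traverser.cancelBatch();
  }

  @Override
  public void run() {
    int processed = traverser.runBatch(maxDocs);
    recorder.recordResult(processed);
  }
}

class SketchBatchDemo {
  /** Runs one batch against a fake repository holding `available` documents. */
  static List<Integer> runOnce(final int available, int maxDocs) {
    final List<Integer> recorded = new ArrayList<>();
    SketchTraverser traverser = new SketchTraverser() {
      @Override public int runBatch(int max) { return Math.min(available, max); }
      @Override public void cancelBatch() { /* nothing to stop in this fake */ }
    };
    new SketchBatch(traverser, n -> recorded.add(n), maxDocs).run();
    return recorded;
  }
}
```

The batch object holds no traversal logic of its own. It only wires a traverser to a recorder, which is why cancelling it amounts to forwarding the request.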

    Next, let's look at the source of the QueryTraverser class:

    /**
     * Traverser for a repository implemented using a TraversalManager
     */
    public class QueryTraverser implements Traverser {
      private static final Logger LOGGER =
          Logger.getLogger(QueryTraverser.class.getName());
    
      private final PusherFactory pusherFactory;
      private final TraversalManager queryTraversalManager;
      private final TraversalStateStore stateStore;
      private final String connectorName;
      private final TraversalContext traversalContext;
    
      // Synchronize access to cancelWork.
      private final Object cancelLock = new Object();
      private boolean cancelWork = false;
    
      public QueryTraverser(PusherFactory pusherFactory,
          TraversalManager traversalManager, TraversalStateStore stateStore,
          String connectorName, TraversalContext traversalContext) {
        this.pusherFactory = pusherFactory;
        this.queryTraversalManager = traversalManager;
        this.stateStore = stateStore;
        this.connectorName = connectorName;
        this.traversalContext = traversalContext;
        if (queryTraversalManager instanceof TraversalContextAware) {
          TraversalContextAware contextAware =
              (TraversalContextAware)queryTraversalManager;
          try {
            contextAware.setTraversalContext(traversalContext);
          } catch (Exception e) {
            LOGGER.log(Level.WARNING, "Unable to set TraversalContext", e);
          }
        }
      }
    
      //@Override
      public void cancelBatch() {
        synchronized(cancelLock) {
          cancelWork = true;
        }
        LOGGER.fine("Cancelling traversal for connector " + connectorName);
      }
    
      public boolean isCancelled() {
        synchronized(cancelLock) {
          return cancelWork;
        }
      }
    
      //@Override
      public BatchResult runBatch(BatchSize batchSize) {
        final long startTime = System.currentTimeMillis();
        final long timeoutTime = startTime
          + traversalContext.traversalTimeLimitSeconds() * 1000;
    
        if (isCancelled()) {
          LOGGER.warning("Attempting to run a cancelled QueryTraverser");
          return new BatchResult(TraversalDelayPolicy.ERROR);
        }
        try {
          queryTraversalManager.setBatchHint(batchSize.getHint());
        } catch (RepositoryException e) {
          LOGGER.log(Level.WARNING, "Unable to set batch hint", e);
        }
    
        String connectorState;
        try {
          if (stateStore != null) {
            connectorState = stateStore.getTraversalState();
          } else {
            throw new IllegalStateException("null TraversalStateStore");
          }
        } catch (IllegalStateException ise) {
          // We get here if the ConnectorStateStore for connector is disabled.
          // That happens if the connector was deleted while we were asleep.
          // Our connector seems to have been deleted.  Don't process a batch.
          LOGGER.finer("Halting traversal..." + ise.getMessage());
          return new BatchResult(TraversalDelayPolicy.ERROR);
        }
    
        DocumentList resultSet = null;
        if (connectorState == null) {
          try {
            LOGGER.finer("Starting traversal...");
            resultSet = queryTraversalManager.startTraversal();
          } catch (Exception e) {
            LOGGER.log(Level.WARNING, "startTraversal threw exception: ", e);
            return new BatchResult(TraversalDelayPolicy.ERROR);
          }
        } else {
          try {
            LOGGER.finer("Resuming traversal...");
            resultSet = queryTraversalManager.resumeTraversal(connectorState);
          } catch (Exception e) {
            LOGGER.log(Level.WARNING, "resumeTraversal threw exception: ", e);
            return new BatchResult(TraversalDelayPolicy.ERROR);
          }
        }
    
        // If the traversal returns null, that means that the repository has
        // no new content to traverse.
        if (resultSet == null) {
          LOGGER.finer("Result set is NULL, no documents returned for traversal.");
          return new BatchResult(TraversalDelayPolicy.POLL, 0);
        }
    
        Pusher pusher = null;
        BatchResult result = null;
        int counter = 0;
        try {
          // Get a Pusher for feeding the returned Documents.
          pusher = pusherFactory.newPusher(connectorName);
    
          while (counter < batchSize.getMaximum()) {
            if (Thread.currentThread().isInterrupted() || isCancelled()) {
              LOGGER.fine("Traversal for connector " + connectorName
                          + " has been interrupted...breaking out of batch run.");
              break;
            }
            if (System.currentTimeMillis() >= timeoutTime) {
              LOGGER.fine("Traversal for connector " + connectorName
                  + " is completing due to time limit.");
              break;
            }
    
            Document nextDocument = null;
            String docid = null;
            try {
              LOGGER.finer("Pulling next document from connector " + connectorName);
              nextDocument = resultSet.nextDocument();
              if (nextDocument == null) {
                break;
              } else {
                // Since there are a couple of places below that could throw
                // exceptions but not exit the while loop, the counter should be
                // incremented here to insure it represents documents returned from
                // the list.  Note the call to nextDocument() could also throw a
                // RepositoryDocumentException signaling a skipped document in which
                // case the call will not be counted against the batch maximum.
                counter++;
                // Fetch DocId to use in messages.
                try {
                  docid = Value.getSingleValueString(nextDocument,
                                                     SpiConstants.PROPNAME_DOCID);
                } catch (IllegalArgumentException e1) {
                  LOGGER.fine("Unable to get document id for document ("
                              + nextDocument + "): " + e1.getMessage());
                } catch (RepositoryException e1) {
                  LOGGER.fine("Unable to get document id for document ("
                              + nextDocument + "): " + e1.getMessage());
                }
              }
              LOGGER.finer("Sending document (" + docid + ") from connector "
                  + connectorName + " to Pusher");
    
              if (!pusher.take(nextDocument)) {
                LOGGER.fine("Traversal for connector " + connectorName
                    + " is completing at the request of the Pusher.");
                break;
              }
    
            } catch (SkippedDocumentException e) {
              /* TODO (bmj): This is a temporary solution and should be replaced.
               * It uses Exceptions for non-exceptional cases.
               */
              // Skip this document.  Proceed on to the next one.
              if (LOGGER.isLoggable(Level.FINER)) {
                LOGGER.log(Level.FINER, "Skipping document (" + docid
                    + ") from connector " + connectorName + ": " + e.getMessage());
              }
            } catch (RepositoryDocumentException e) {
              // Skip individual documents that fail.  Proceed on to the next one.
              LOGGER.log(Level.WARNING, "Skipping document (" + docid
                  + ") from connector " + connectorName, e);
            } catch (RuntimeException e) {
              // Skip individual documents that fail.  Proceed on to the next one.
              LOGGER.log(Level.WARNING, "Skipping document (" + docid
                  + ") from connector " + connectorName, e);
            }
          }
          // No more documents. Wrap up any accumulated feed data and send it off.
          if (!isCancelled()) {
            pusher.flush();
          }
        } catch (OutOfMemoryError e) {
          pusher.cancel();
          System.runFinalization();
          System.gc();
          result = new BatchResult(TraversalDelayPolicy.ERROR);
          try {
            LOGGER.severe("Out of JVM Heap Space.  Will retry later.");
            LOGGER.log(Level.FINEST, e.getMessage(), e);
          } catch (Throwable t) {
            // OutOfMemory state may prevent us from logging the error.
            // Don't make matters worse by rethrowing something meaningless.
          }
        } catch (RepositoryException e) {
          // Drop the entire batch on the floor.  Do not call checkpoint
          // (as there is a discrepancy between what the Connector thinks
          // it has fed, and what actually has been pushed).
          LOGGER.log(Level.SEVERE, "Repository Exception during traversal.", e);
          result = new BatchResult(TraversalDelayPolicy.ERROR);
        } catch (PushException e) {
          LOGGER.log(Level.SEVERE, "Push Exception during traversal.", e);
          // Drop the entire batch on the floor.  Do not call checkpoint
          // (as there is a discrepancy between what the Connector thinks
          // it has fed, and what actually has been pushed).
          result = new BatchResult(TraversalDelayPolicy.ERROR);
        } catch (FeedException e) {
          LOGGER.log(Level.SEVERE, "Feed Exception during traversal.", e);
          // Drop the entire batch on the floor.  Do not call checkpoint
          // (as there is a discrepancy between what the Connector thinks
          // it has fed, and what actually has been pushed).
          result = new BatchResult(TraversalDelayPolicy.ERROR);
        } catch (Throwable t) {
          LOGGER.log(Level.SEVERE, "Uncaught Exception during traversal.", t);
          // Drop the entire batch on the floor.  Do not call checkpoint
          // (as there is a discrepancy between what the Connector thinks
          // it has fed, and what actually has been pushed).
          result = new BatchResult(TraversalDelayPolicy.ERROR);
        } finally {
          // If we have cancelled the work, abandon the batch.
          if (isCancelled()) {
            result = new BatchResult(TraversalDelayPolicy.ERROR);
          }
    
          // Checkpoint completed work as well as skip past troublesome documents
          // (e.g. documents that are too large and will always fail).
          if ((result == null) && (checkpointAndSave(resultSet) == null)) {
            // Unable to get a checkpoint, so wait a while, then retry batch.
            result = new BatchResult(TraversalDelayPolicy.ERROR);
          }
        }
        if (result == null) {
          result = new BatchResult(TraversalDelayPolicy.IMMEDIATE, counter,
                                   startTime, System.currentTimeMillis());
        } else if (pusher != null) {
          // We are returning an error from this batch. Cancel any feed that
          // might be in progress.
          pusher.cancel();
        }
        return result;
      }
    
      private String checkpointAndSave(DocumentList pm) {
        String connectorState = null;
        LOGGER.finest("Checkpointing for connector " + connectorName + " ...");
        try {
          connectorState = pm.checkpoint();
        } catch (RepositoryException re) {
          // If checkpoint() throws RepositoryException, it means there is no
          // new checkpoint.
          return null;
        } catch (Exception e) {
          // If checkpoint() throws some general Exception, it is probably
          // an older connector that doesn't understand the newer empty
          // DocumentList and Exception handling from runBatch() model.
          return null;
        }
        try {
          if (connectorState != null) {
            if (stateStore != null) {
              stateStore.storeTraversalState(connectorState);
            } else {
              throw new IllegalStateException("null TraversalStateStore");
            }
            LOGGER.finest("...checkpoint " + connectorState + " created.");
          }
          return connectorState;
        } catch (IllegalStateException ise) {
          // We get here if the ConnectorStateStore for connector is disabled.
          // That happens if the connector was deleted while we were working.
          // Our connector seems to have been deleted.  Don't save a checkpoint.
          LOGGER.finest("...checkpoint " + connectorState + " discarded.");
        }
        return null;
      }
    }

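    The void cancelBatch() method cancels a running batch. As we saw above, it is called from CancelableBatch.cancel(), which in turn is called by TaskHandle; the chain ultimately traces back to the void resetBatch() method of the ConnectorCoordinatorImpl class.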
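The cancellation mechanism itself is a lock-guarded flag that the batch loop polls. A minimal sketch follows, assuming a hypothetical class CancelFlagSketch whose process method cancels itself after a given number of items. That self-cancel only stands in for a cancelBatch() call that would normally arrive from another thread.

```java
/** Hypothetical sketch of the cancelLock / cancelWork pattern in QueryTraverser. */
class CancelFlagSketch {
  // All reads and writes of cancelWork go through this lock.
  private final Object cancelLock = new Object();
  private boolean cancelWork = false;

  void cancelBatch() {
    synchronized (cancelLock) {
      cancelWork = true;
    }
  }

  boolean isCancelled() {
    synchronized (cancelLock) {
      return cancelWork;
    }
  }

  /**
   * Processes up to max items, checking the flag before each one.
   * The flag is set after cancelAfter items to simulate an outside request.
   */
  int process(int max, int cancelAfter) {
    int done = 0;
    while (done < max) {
      if (isCancelled()) {
        break;  // abandon the rest of the batch
      }
      done++;
      if (done == cancelAfter) {
        cancelBatch();  // stands in for a call from another thread
      }
    }
    return done;
  }
}
```

Cancellation here is cooperative: setting the flag does not interrupt anything, and the work only stops at the next check.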

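    Now for the key method, BatchResult runBatch(BatchSize batchSize). This is where the TraversalManager implementation of the specific connector is called to poll the repository for data.

    It first checks whether the task has been cancelled; if so, it returns immediately with an ERROR result.

    It then reads the stored traversal state and calls queryTraversalManager.startTraversal() when no state is stored, or queryTraversalManager.resumeTraversal(connectorState) otherwise, to obtain the result set DocumentList resultSet.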
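The choice between starting and resuming can be illustrated with a toy repository. The sketch below assumes a hypothetical ResumeSketch class in which the checkpoint is simply the id of the last document handed out; real connectors define their own checkpoint format.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: a null checkpoint starts over, otherwise resume after it. */
class ResumeSketch {
  // A fixed, ordered "repository" used only for illustration.
  static final List<String> DOCS = Arrays.asList("doc1", "doc2", "doc3", "doc4");

  /**
   * Returns up to hint documents following the checkpoint,
   * or from the beginning when no checkpoint has been stored.
   */
  static List<String> nextBatch(String checkpoint, int hint) {
    int start = (checkpoint == null) ? 0 : DOCS.indexOf(checkpoint) + 1;
    int end = Math.min(DOCS.size(), start + hint);
    return DOCS.subList(start, end);
  }
}
```

For example, ResumeSketch.nextBatch(null, 2) plays the role of startTraversal(), and ResumeSketch.nextBatch("doc2", 2) plays the role of resumeTraversal(connectorState).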

    Next, it obtains a Pusher object from the PusherFactory and calls the Pusher's take method on each document to send the data to the application center.
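The feed loop has three exits: the batch maximum is reached, the document list runs out, or the Pusher declines further documents. Here is a minimal sketch of that control flow, assuming a hypothetical SketchPusher interface and an Iterator in place of DocumentList. Exception handling, timeouts and cancellation are left out.

```java
import java.util.Iterator;

/** Hypothetical stand-in for Pusher: returns false to request the batch end. */
interface SketchPusher {
  boolean take(String doc);
}

class PushLoopSketch {
  /** Feeds documents to the pusher and returns how many were pulled. */
  static int feed(Iterator<String> docs, SketchPusher pusher, int maximum) {
    int counter = 0;
    while (counter < maximum) {
      if (!docs.hasNext()) {
        break;  // no more documents in this result set
      }
      String doc = docs.next();
      // Count the document as soon as it is pulled, as runBatch does.
      counter++;
      if (!pusher.take(doc)) {
        break;  // the pusher asked to finish the batch early
      }
    }
    return counter;
  }
}
```

Note that a document the pusher declines after is still counted, because the counter tracks documents pulled from the list rather than documents accepted.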

    Finally, it checkpoints the result set, saves that state, and returns the outcome of the batch. On success the result is:

    new BatchResult(TraversalDelayPolicy.IMMEDIATE, counter, startTime, System.currentTimeMillis());
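How the final outcome is chosen can be condensed into a small decision function. This is a sketch under assumptions: OutcomeSketch and its Policy enum are hypothetical simplifications of BatchResult and TraversalDelayPolicy, and the three inputs summarize what the catch and finally blocks of runBatch establish.

```java
/** Hypothetical condensation of the result selection at the end of runBatch. */
class OutcomeSketch {
  enum Policy { IMMEDIATE, POLL, ERROR }

  /**
   * @param cancelled  whether cancelBatch() was called during the run
   * @param failed     whether a batch-level exception was caught
   * @param checkpoint the checkpoint string, or null if none could be obtained
   */
  static Policy decide(boolean cancelled, boolean failed, String checkpoint) {
    if (cancelled || failed) {
      return Policy.ERROR;      // abandon the batch, retry later
    }
    if (checkpoint == null) {
      return Policy.ERROR;      // no checkpoint, so progress cannot be saved
    }
    return Policy.IMMEDIATE;    // progress saved, ask for the next batch now
  }
}
```

POLL does not appear in this function because runBatch returns it earlier, when the traversal yields a null result set.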

    ---------------------------------------------------------------------------

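    This series, Enterprise Search Engine Development: The Connector, is my own original work.

    When reposting, please credit the source: cnblogs (博客园), 刺猬的温驯

    Article link: http://www.cnblogs.com/chenying99/archive/2013/03/19/2968411.html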

  • Original article: https://www.cnblogs.com/chenying99/p/2968411.html