zoukankan      html  css  js  c++  java
  • 企业搜索引擎开发之连接器connector(二十)

    在连接器中，负责衔接数据源与数据推送对象的是 QueryTraverser 类，该类实现了 Traverser 接口：

    /**
     * Contract the Scheduler uses to drive a traversal: run one batch of
     * documents at a time, or abandon the batch that is currently running.
     */
    public interface Traverser {
    
      /**
       * Milliseconds (15 minutes) to wait after a transient error before a
       * traversal is retried.
       */
      int ERROR_WAIT_MILLIS = 15 * 60 * 1000;
    
      /**
       * Traverses one batch of documents and hands them to the feed.
       *
       * <p>While it is executing, {@code runBatch()} may be hard (perhaps
       * impossible) to interrupt. A thread driving the traversal is expected to
       * loop: call {@code runBatch()}, check for {@link InterruptedException},
       * decide whether to stop (by itself, for scheduling reasons, or for a
       * clean shutdown), optionally re-adjust the batch hint, and repeat.
       *
       * @param  batchSize asks the traversal to process roughly
       *         {@code batchSize.getHint()} documents, and never more than
       *         {@code batchSize.getMaximum()}, in this batch.
       * @return a {@link BatchResult} holding the number of documents from this
       *         batch actually given to the feed, plus a possible policy for how
       *         long to delay before another batch is requested.
       */
      BatchResult runBatch(BatchSize batchSize);
    
      /**
       * Cancels the batch in progress and discards it. This may be called when
       * the work item times out, when the connector is deleted or reconfigured,
       * or during shutdown.
       */
      void cancelBatch();
    }

    核心方法即上面的 BatchResult runBatch(BatchSize batchSize)，其参数 batchSize 表示批次大小：getHint() 为本批次建议处理的文档数，getMaximum() 为本批次的文档数上限。

    QueryTraverser 对象通过其引用的 TraversalManager queryTraversalManager 实例从数据源获取数据，同时通过引用的 PusherFactory pusherFactory 实例创建 docPusher 对象来发送 document 数据；成员变量 TraversalStateStore stateStore 用于读取和保存遍历状态（即断点，用于断点续传）。

     @Override
      public BatchResult runBatch(BatchSize batchSize) {
        // Wall-clock start of this batch; also reported in the returned BatchResult.
        final long startTime = clock.getTimeMillis();
        // Deadline for this batch. The long literal keeps the seconds-to-millis
        // conversion in long arithmetic so a large time limit cannot overflow int.
        final long timeoutTime = startTime
            + traversalContext.traversalTimeLimitSeconds() * 1000L;
        // A cancelled traverser must not start another batch.
        if (isCancelled()) {
          LOGGER.warning("Attempting to run a cancelled QueryTraverser");
          return new BatchResult(TraversalDelayPolicy.ERROR);
        }
        try {
          // Pass the requested batch size on to the connector; failure is not fatal.
          queryTraversalManager.setBatchHint(batchSize.getHint());
        } catch (RepositoryException e) {
          LOGGER.log(Level.WARNING, "Unable to set batch hint", e);
        }

        // Without a state store the connector is gone; don't process a batch.
        if (stateStore == null) {
          LOGGER.fine("Halting traversal for connector " + connectorName
                      + ": null TraversalStateStore");
          return new BatchResult(TraversalDelayPolicy.ERROR);
        }
        // Load the saved checkpoint. A null state starts a fresh traversal;
        // otherwise the traversal resumes from that checkpoint (see below).
        String connectorState;
        try {
          connectorState = stateStore.getTraversalState();
        } catch (IllegalStateException ise) {
          // We get here if the store for the connector is disabled.
          // That happens if the connector was deleted while we were asleep.
          // Our connector seems to have been deleted.  Don't process a batch.
          LOGGER.fine("Halting traversal for connector " + connectorName
                      + ": " + ise.getMessage());
          return new BatchResult(TraversalDelayPolicy.ERROR);
        }

        DocumentList resultSet = null;
        if (connectorState == null) {
          try {
            LOGGER.fine("START TRAVERSAL: Starting traversal for connector "
                        + connectorName);
            resultSet = queryTraversalManager.startTraversal();
          } catch (Exception e) {
            LOGGER.log(Level.WARNING, "startTraversal threw exception: ", e);
            return new BatchResult(TraversalDelayPolicy.ERROR);
          }
        } else {
          try {
            LOGGER.fine("RESUME TRAVERSAL: Resuming traversal for connector "
                + connectorName + " from checkpoint " + connectorState);
            resultSet = queryTraversalManager.resumeTraversal(connectorState);
          } catch (Exception e) {
            LOGGER.log(Level.WARNING, "resumeTraversal threw exception: ", e);
            return new BatchResult(TraversalDelayPolicy.ERROR);
          }
        }

        // If the traversal returns null, that means that the repository has
        // no new content to traverse.
        if (resultSet == null) {
          LOGGER.fine("Result set from connector " + connectorName
                      + " is NULL, no documents returned for traversal.");
          return new BatchResult(TraversalDelayPolicy.POLL, 0);
        }

        Pusher pusher = null;
        // Outcome of the batch. Stays null on success; any error path sets it.
        BatchResult result = null;
        int counter = 0;
        try {
          // One Pusher instance feeds every document of this batch.
          pusher = pusherFactory.newPusher(connectorName);
          // Upper bound on documents pulled in this batch (see Traverser.runBatch).
          // NOTE(review): a non-positive maximum is treated as "no limit" here;
          // confirm against BatchSize semantics.
          final long maximum = batchSize.getMaximum();

          while (true) {
            if (Thread.currentThread().isInterrupted() || isCancelled()) {
              LOGGER.fine("Traversal for connector " + connectorName
                          + " has been interrupted; breaking out of batch run.");
              break;
            }
            if (maximum > 0 && counter >= maximum) {
              LOGGER.fine("Traversal batch for connector " + connectorName
                  + " is completing due to batch size limit.");
              break;
            }
            if (clock.getTimeMillis() >= timeoutTime) {
              LOGGER.fine("Traversal batch for connector " + connectorName
                  + " is completing due to time limit.");
              break;
            }

            String docid = null;
            try {
              LOGGER.finer("Pulling next document from connector " + connectorName);
              Document nextDocument = resultSet.nextDocument();
              if (nextDocument == null) {
                // This batch's result set is exhausted.
                LOGGER.finer("Traversal batch for connector " + connectorName
                    + " at end after processing " + counter + " documents.");
                break;
              }
              // Since there are a couple of places below that could throw
              // exceptions but not exit the while loop, the counter should be
              // incremented here to insure it represents documents returned from
              // the list.  Note the call to nextDocument() could also throw a
              // RepositoryDocumentException signaling a skipped document in which
              // case the call will not be counted against the batch maximum.
              counter++;
              docid = fetchDocIdForLogging(nextDocument);
              LOGGER.finer("Sending document (" + docid + ") from connector "
                  + connectorName + " to Pusher");
              // Hand the document to the Pusher; anything but OK ends the batch.
              if (pusher.take(nextDocument) != PusherStatus.OK) {
                LOGGER.fine("Traversal batch for connector " + connectorName
                    + " is completing at the request of the Pusher,"
                    + " after processing " + counter + " documents.");
                break;
              }
            } catch (SkippedDocumentException e) {
              /* TODO (bmj): This is a temporary solution and should be replaced.
               * It uses Exceptions for non-exceptional cases.
               */
              // Skip this document.  Proceed on to the next one.
              logSkippedDocument(docid, e);
            } catch (RepositoryDocumentException e) {
              // Skip individual documents that fail.  Proceed on to the next one.
              logSkippedDocument(docid, e);
            } catch (RuntimeException e) {
              // Skip individual documents that fail.  Proceed on to the next one.
              logSkippedDocument(docid, e);
            }
          }
          // No more documents. Wrap up any accumulated feed data and send it off.
          if (!isCancelled()) {
            pusher.flush();
          }
        } catch (OutOfMemoryError e) {
          // pusher is still null if newPusher() itself ran out of memory.
          if (pusher != null) {
            pusher.cancel();
          }
          System.runFinalization();
          System.gc();
          result = new BatchResult(TraversalDelayPolicy.ERROR);
          try {
            LOGGER.severe("Out of JVM Heap Space.  Will retry later.");
            LOGGER.log(Level.FINEST, e.getMessage(), e);
          } catch (Throwable t) {
            // OutOfMemory state may prevent us from logging the error.
            // Don't make matters worse by rethrowing something meaningless.
          }
        } catch (RepositoryException e) {
          // Drop the entire batch on the floor.  Do not call checkpoint
          // (as there is a discrepancy between what the Connector thinks
          // it has fed, and what actually has been pushed).
          LOGGER.log(Level.SEVERE, "Repository Exception during traversal.", e);
          result = new BatchResult(TraversalDelayPolicy.ERROR);
        } catch (PushException e) {
          // Drop the entire batch; see the RepositoryException case above.
          LOGGER.log(Level.SEVERE, "Push Exception during traversal.", e);
          result = new BatchResult(TraversalDelayPolicy.ERROR);
        } catch (FeedException e) {
          // Drop the entire batch; see the RepositoryException case above.
          LOGGER.log(Level.SEVERE, "Feed Exception during traversal.", e);
          result = new BatchResult(TraversalDelayPolicy.ERROR);
        } catch (Throwable t) {
          // Drop the entire batch; see the RepositoryException case above.
          LOGGER.log(Level.SEVERE, "Uncaught Exception during traversal.", t);
          result = new BatchResult(TraversalDelayPolicy.ERROR);
        } finally {
          // If we have cancelled the work, abandon the batch.
          if (isCancelled()) {
            result = new BatchResult(TraversalDelayPolicy.ERROR);
          }
          // Only a successful batch is checkpointed. The checkpoint covers
          // completed work as well as skipping past troublesome documents
          // (e.g. documents that are too large and will always fail).
          if ((result == null) && (checkpointAndSave(resultSet) == null)) {
            // Unable to get a checkpoint, so wait a while, then retry batch.
            result = new BatchResult(TraversalDelayPolicy.ERROR);
          }
        }
        if (result == null) {
          result = new BatchResult(TraversalDelayPolicy.IMMEDIATE, counter,
                                   startTime, clock.getTimeMillis());
        } else if (pusher != null) {
          // We are returning an error from this batch. Cancel any feed that
          // might be in progress.
          pusher.cancel();
        }
        return result;
      }

      /**
       * Reads the single-valued {@code SpiConstants.PROPNAME_DOCID} property of
       * {@code document} for use in log messages.
       *
       * @param document the document whose id is wanted
       * @return the document id, or {@code null} if it could not be read (the
       *         failure is logged at FINER and otherwise ignored)
       */
      private String fetchDocIdForLogging(Document document) {
        try {
          return Value.getSingleValueString(document, SpiConstants.PROPNAME_DOCID);
        } catch (IllegalArgumentException e) {
          LOGGER.finer("Unable to get document id for document ("
                       + document + "): " + e.getMessage());
        } catch (RepositoryException e) {
          LOGGER.finer("Unable to get document id for document ("
                       + document + "): " + e.getMessage());
        }
        return null;
      }

    关键代码我已添加注释：遍历当前批次的数据集合，逐个向 docPusher 对象提交 document 对象；遍历结束后更新断点状态，以便下次从该断点继续从数据源获取数据。

    /**
       * Obtains the current checkpoint from {@code pm} and, if there is one,
       * persists it through {@code stateStore} so that a later batch can
       * resume from it.
       *
       * @param pm the document list produced by the current traversal batch
       * @return the checkpoint that was stored, or {@code null} if no checkpoint
       *         could be obtained, {@code pm.checkpoint()} returned {@code null},
       *         or the state store is missing or disabled
       */
      private String checkpointAndSave(DocumentList pm) {
        LOGGER.fine("CHECKPOINT: Generating checkpoint for connector "
                    + connectorName);
        String connectorState;
        try {
          connectorState = pm.checkpoint();
        } catch (RepositoryException re) {
          // If checkpoint() throws RepositoryException, it means there is no
          // new checkpoint.
          LOGGER.log(Level.FINE, "Failed to obtain checkpoint for connector "
                     + connectorName, re);
          return null;
        } catch (Exception e) {
          LOGGER.log(Level.INFO, "Failed to obtain checkpoint for connector "
                     + connectorName, e);
          return null;
        }
        if (connectorState == null) {
          // Nothing to save.
          return null;
        }
        if (stateStore == null) {
          // No store to write to; treat it like a disabled store.
          LOGGER.fine("Checkpoint discarded: " + connectorState);
          return null;
        }
        try {
          stateStore.storeTraversalState(connectorState);
        } catch (IllegalStateException ise) {
          // We get here if the store for the connector is disabled.
          // That happens if the connector was deleted while we were working.
          // Our connector seems to have been deleted.  Don't save a checkpoint.
          LOGGER.fine("Checkpoint discarded: " + connectorState);
          return null;
        }
        LOGGER.fine("CHECKPOINT: " + connectorState);
        return connectorState;
      }

    取消方法通过设置布尔标志 cancelWork 来实现，注意对该标志的写入需要加锁同步（此处使用 cancelLock）。

    /**
       * Requests cancellation of the batch in progress by raising the
       * {@code cancelWork} flag while holding {@code cancelLock}.
       * NOTE(review): presumably isCancelled() reads the flag under the same
       * lock; confirm.
       */
      @Override
      public void cancelBatch() {
        synchronized (cancelLock) {
          this.cancelWork = true;
        }
        LOGGER.fine("Cancelling traversal for connector " + this.connectorName);
      }

    ---------------------------------------------------------------------------

    本系列企业搜索引擎开发之连接器connector系本人原创

    转载请注明出处 博客园 刺猬的温驯

    本人邮箱: chenying998179@163#com (#改为.)

    本文链接 http://www.cnblogs.com/chenying99/p/3775534.html 

  • 相关阅读:
    团队编程规范
    软工小组:我们都是水果
    Github与SmartGit使用说明与建议
    Github for Windows使用图文教程
    SQL语句实现mysql数据库快速插入1000w条数据
    dijkstra+relax修改
    Kuchiguse (20)简单字符串处理,输入坑
    1098. Insertion or Heap Sort (25)堆排序
    Consecutive Factors(求n的连续约数)
    Dijkstra(第二关键词最优),路径保存DFS
  • 原文地址:https://www.cnblogs.com/chenying99/p/3775534.html
Copyright © 2011-2022 走看看