zoukankan      html  css  js  c++  java
  • 企业搜索引擎开发之连接器connector(九)

    接下来继续分析PusherFactory类 及Pusher类的相关源码实现

    先浏览一下相关UML模型图:

    我们从该图可以看到,连接器最终是通过FeedConnection类型对象向应用中心发送xmlfeed数据的

    (勘误:企业搜索引擎开发之连接器connector(六)中的UML图 这部分有点出入)

    PusherFactory类是Pusher类的工厂类,现在来看PusherFactory源码:

    /**
     * Interface for a factory that creates {@link Pusher} instances for use by a
     * {@link com.google.enterprise.connector.traversal.Traverser Traverser}.
     */
    public interface PusherFactory {
      /**
       * Create a new {@link Pusher} instance appropriate for the supplied
       * dataSource.
       *
       * @param dataSource a data source for a {@code Feed}, typically the name
       *        of a connector instance.
       * @return a {@link Pusher}
       * @throws PushException if no {@link Pusher} is assigned to the 
       *         {@code dataSource}.
       */
      public Pusher newPusher(String dataSource) throws PushException;
    }

    该接口就是生产Pusher类型对象,实现类DocPusherFactory如下:

    /**
     * Factory that creates {@link DocPusher} instances that feed
     * {@link FeedConnection}.
     */
    // TODO: Support multiple sinks where different connector instances
    // might feed different sinks.
    public class DocPusherFactory implements PusherFactory {
      /**
       * FeedConnection that is the sink for our generated XmlFeeds.
       */
      private final FeedConnection feedConnection;
    
      /**
       * Configured maximum document size and maximum feed file size supported.
       */
      private final FileSizeLimitInfo fileSizeLimit;
    
      /**
       * Creates a {@code DocPusherFactory} object from the specified
       * {@code feedConnection}.
       *
       * @param feedConnection a FeedConnection
       */
      public DocPusherFactory(FeedConnection feedConnection) {
        this(feedConnection, new FileSizeLimitInfo());
      }
    
      /**
       * Creates a {@code DocPusherFactory} object from the specified
       * {@code feedConnection}.  The supplied {@link FileSizeLimitInfo} specifies
       * constraints as to the size of a Document's content and the size of
       * generated Feed files.
       *
       * @param feedConnection a {@link FeedConnection} sink for documents.
       * @param fileSizeLimit {@link FileSizeLimitInfo} constraints on document
       *        content and feed size.
       */
      public DocPusherFactory(FeedConnection feedConnection,
                              FileSizeLimitInfo fileSizeLimit) {
        this.feedConnection = feedConnection;
        this.fileSizeLimit = fileSizeLimit;
      }
    
      //@Override
      public Pusher newPusher(String dataSource) {
        return new DocPusher(feedConnection, dataSource, fileSizeLimit);
      }
    }

     Pusher接口方法如下:

    /**
     * Interface for a Pusher - something that takes spi Documents
     * and sends them along on their way.
     */
    public interface Pusher {
    
      /**
       * Takes an spi Document and pushes it along, presumably to the GSA Feed.
       *
       * @param document A Document
       * @return true if Pusher may accept more documents, false otherwise.
       * @throws RepositoryException if transient error accessing the Repository
       * @throws RepositoryDocumentException if fatal error accessing the Document
       * @throws FeedException if a transient Feed error occurs in the Pusher
       * @throws PushException if a transient error occurs in the Pusher
       */
      public boolean take(Document document)
          throws PushException, FeedException, RepositoryException;
    
      /**
       * Finishes processing a document feed.  If the caller anticipates no
       * further calls to {@link #take(Document)} will be made,
       * this method should be called, so that the Pusher may send a cached,
       * accumulated Feed to the feed processor.
       *
       * @throws RepositoryException if transient error accessing the Repository
       * @throws RepositoryDocumentException if fatal error accessing the Document
       * @throws FeedException if a transient Feed error occurs in the Pusher
       * @throws PushException if a transient error occurs in the Pusher
       */
      public void flush()
          throws PushException, FeedException, RepositoryException;
    
      /**
       * Cancels a feed.  Discard any accumulated feed data.
       */
      public void cancel();
    }

    实现类DocPusher源码如下:

    /**
     * Class to generate xml feed for a document from the Document and send it
     * to GSA.
     */
    public class DocPusher implements Pusher {
      private static final Logger LOGGER =
          Logger.getLogger(DocPusher.class.getName());
    
      private static final byte[] SPACE_CHAR = { 0x20 };  // UTF-8 space
    
      /**
       * Separate Logger for Feed Logging.
       */
      private static final Logger FEED_WRAPPER_LOGGER =
          Logger.getLogger(LOGGER.getName() + ".FEED_WRAPPER");
      private static final Logger FEED_LOGGER =
          Logger.getLogger(FEED_WRAPPER_LOGGER.getName() + ".FEED");
      private static final Level FEED_LOG_LEVEL = Level.FINER;
    
      /**
       * Configured maximum document size and maximum feed file size supported.
       */
      private final FileSizeLimitInfo fileSizeLimit;
    
      /**
       * FeedConnection that is the sink for our generated XmlFeeds.
       */
      private final FeedConnection feedConnection;
    
      /**
       * Encoding method to use for Document content.
       */
      private String contentEncoding;
    
      /**
       * The Connector name that is the dataSource for this Feed.
       */
      private final String connectorName;
    
      /**
       * ExcecutorService that submits a Feed to the GSA in a separate thread.
       * This allows us to overlap I/O reading content from the Repository
       * in the traversal thread, and submitting content to the GSA in
       * a submitFeed thread.
       */
      private final ExecutorService feedSender;
    
      /**
       * This is the list of outstanding asynchronous feed submissions.
       */
      private final LinkedList<FutureTask<String>> submissions;
    
      /**
       * This is used to build up a multi-record feed.  Documents are added to the
       * feed until the size of the feed exceeds the FileSizeLimitInfo.maxFeedSize
       * or we are finished with the batch of documents. The feed is then
       * submitted to the feed connection.
       */
      private XmlFeed xmlFeed = null;
    
      /**
       * This field is used to construct a feed record in parallel to the main feed
       * InputStream construction.  It is only used if the feed logging level is set
       * to the appropriate level.  It only exists during the time the main feed is
       * being constructed.  Once sufficient information has been appended to this
       * buffer its contents will be logged and it will be nulled.
       */
      private StringBuilder feedLog = null;
    
      // For use by unit tests.
      private String gsaResponse;
    
      /**
       * Creates a {@code DocPusher} object from the specified
       * {@code feedConnection} and {@code connectorName}.  The supplied
       * {@link FileSizeLimitInfo} specifies constraints as to the size of a
       * Document's content and the size of generated Feed files.
       *
       * @param feedConnection a FeedConnection
       * @param connectorName The connector name that is the source of the feed
       * @param fileSizeLimitInfo FileSizeLimitInfo constraints on document content
       *        and feed size.
       */
      public DocPusher(FeedConnection feedConnection, String connectorName,
                       FileSizeLimitInfo fileSizeLimitInfo) {
        this.feedConnection = feedConnection;
        this.connectorName = connectorName;
        this.fileSizeLimit = fileSizeLimitInfo;
    
        // Check to see if the GSA supports compressed content feeds.
        String supportedEncodings =
            feedConnection.getContentEncodings().toLowerCase();
        this.contentEncoding =
            (supportedEncodings.indexOf(XmlFeed.XML_BASE64COMPRESSED) >= 0) ?
            XmlFeed.XML_BASE64COMPRESSED : XmlFeed.XML_BASE64BINARY;
    
        // Initialize background feed submission.
        this.submissions = new LinkedList<FutureTask<String>>();
        this.feedSender = Executors.newSingleThreadExecutor();
      }
    
      /**
       * Return the Feed Logger.
       */
      public static Logger getFeedLogger() {
        return FEED_WRAPPER_LOGGER;
      }
    
      /**
       * Gets the response from GSA when the feed is sent. For testing only.
       *
       * @return gsaResponse response from GSA.
       */
      protected String getGsaResponse() {
        return gsaResponse;
      }
    
      /**
       * Takes a Document and sends a the feed to the GSA.
       *
       * @param document Document corresponding to the document.
       * @return true if Pusher should accept more documents, false otherwise.
       * @throws PushException if Pusher problem
       * @throws FeedException if transient Feed problem
       * @throws RepositoryDocumentException if fatal Document problem
       * @throws RepositoryException if transient Repository problem
       */
      public boolean take(Document document)
          throws PushException, FeedException, RepositoryException {
        if (feedSender.isShutdown()) {
          throw new IllegalStateException("Pusher is shut down");
        }
        checkSubmissions();
    
        String feedType;
        try {
          feedType = DocUtils.getFeedType(document);
        } catch (RuntimeException e) {
          LOGGER.log(Level.WARNING,
              "Rethrowing RuntimeException as RepositoryDocumentException", e);
          throw new RepositoryDocumentException(e);
        }
    
        // All feeds in a feed file must be of the same type.
        // If the feed would change type, send the feed off to the GSA
        // and start a new one.
        if ((xmlFeed != null) && (feedType != xmlFeed.getFeedType())) {
          if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("A new feedType, " + feedType + ", requires a new feed for "
                + connectorName + ". Closing feed and sending to GSA.");
          }
          submitFeed();
        }
    
        if (xmlFeed == null) {
          if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("Creating new " + feedType + " feed for " + connectorName);
          }
          try {
            startNewFeed(feedType);
          } catch (OutOfMemoryError me) {
            throw new PushException("Unable to allocate feed buffer.  Try reducing"
                + " the maxFeedSize setting, reducing the number of connector"
                + " intances, or adjusting the JVM heap size parameters.", me);
          }
        }
    
        boolean isThrowing = false;
        int resetPoint = xmlFeed.size();
        InputStream contentStream = null;
        try {
          // Add this document to the feed.
          contentStream = getContentStream(document, feedType);
          xmlFeed.addRecord(document, contentStream, contentEncoding);
          if (LOGGER.isLoggable(Level.FINER)) {
            LOGGER.finer("Document "
                + DocUtils.getRequiredString(document, SpiConstants.PROPNAME_DOCID)
                + " from connector " + connectorName + " added to feed.");
          }
    
          // If the feed is full, send it off to the GSA.
          if (xmlFeed.isFull() || lowMemory()) {
            if (LOGGER.isLoggable(Level.FINE)) {
              LOGGER.fine("Feed for " + connectorName + " has grown to "
                  + xmlFeed.size() + " bytes. Closing feed and sending to GSA.");
            }
            submitFeed();
    
            // If we are running low on memory, don't start another feed -
            // tell the Traverser to finish this batch.
            if (lowMemory()) {
              return false;
            }
    
            // If the number of feeds waiting to be sent has backed up,
            // tell the Traverser to finish this batch.
            if ((checkSubmissions() > 10) || feedConnection.isBacklogged()) {
              return false;
            }
         }
    
          // Indicate that this Pusher may accept more documents.
          return true;
    
        } catch (OutOfMemoryError me) {
          xmlFeed.reset(resetPoint);
          throw new PushException("Out of memory building feed, retrying.", me);
        } catch (RuntimeException e) {
          xmlFeed.reset(resetPoint);
          LOGGER.log(Level.WARNING,
              "Rethrowing RuntimeException as RepositoryDocumentException", e);
          throw new RepositoryDocumentException(e);
        } catch (RepositoryDocumentException rde) {
          // Skipping this document, remove it from the feed.
          xmlFeed.reset(resetPoint);
          throw rde;
        } catch (IOException ioe) {
          LOGGER.log(Level.SEVERE, "IOException while reading: skipping", ioe);
          xmlFeed.reset(resetPoint);
          Throwable t = ioe.getCause();
          isThrowing = true;
          if (t != null && (t instanceof RepositoryException)) {
            throw (RepositoryException) t;
          } else {
            throw new RepositoryDocumentException("I/O error reading data", ioe);
          }
        } finally {
          if (contentStream != null) {
            try {
              contentStream.close();
            } catch (IOException e) {
              if (!isThrowing) {
                LOGGER.log(Level.WARNING,
                           "Rethrowing IOException as PushException", e);
                throw new PushException("IOException: " + e.getMessage(), e);
              }
            }
          }
        }
      }
    
      /**
       * Finish a feed.  No more documents are anticipated.
       * If there is an outstanding feed file, submit it to the GSA.
       *
       * @throws PushException if Pusher problem
       * @throws FeedException if transient Feed problem
       * @throws RepositoryException
       */
      public void flush() throws PushException, FeedException, RepositoryException {
        LOGGER.fine("Flushing accumulated feed to GSA");
        checkSubmissions();
        if (!feedSender.isShutdown()) {
          submitFeed();
          feedSender.shutdown();
        }
        while (!feedSender.isTerminated()) {
          try {
            feedSender.awaitTermination(10, TimeUnit.SECONDS);
          } catch (InterruptedException ie) {
            if (checkSubmissions() > 0) {
              throw new FeedException("Interrupted while waiting for feeds.");
            }
          }
        }
        checkSubmissions();
      }
    
      /**
       * Cancels any feed being constructed.  Any accumulated feed data is lost.
       */
      public void cancel() {
        // Discard any feed under construction.
        if (xmlFeed != null) {
          LOGGER.fine("Discarding accumulated feed for " + connectorName);
          xmlFeed = null;
        }
        if (feedLog != null) {
          feedLog = null;
        }
        // Cancel any feeds under asynchronous submission.
        feedSender.shutdownNow();
      }
    
      /**
       * Checks on asynchronously submitted feeds to see if they completed
       * or failed.  If any of the submissions failed, throw an Exception.
       *
       * @return number if items remaining in the submissions list
       */
      private int checkSubmissions()
          throws PushException, FeedException, RepositoryException {
        int count = 0;  // Count of outstanding items in the list.
        synchronized(submissions) {
          ListIterator<FutureTask<String>> iter = submissions.listIterator();
          while (iter.hasNext()) {
            FutureTask<String> future = iter.next();
            if (future.isDone()) {
              iter.remove();
              try {
                gsaResponse = future.get();
              } catch (InterruptedException ie) {
                // Shouldn't happen if isDone.
              } catch (ExecutionException ee) {
                Throwable cause = ee.getCause();
                if (cause == null) {
                  cause = ee;
                }
                if (cause instanceof PushException) {
                  throw (PushException) cause;
                } else if (cause instanceof FeedException) {
                  throw (FeedException) cause;
                } else if (cause instanceof RepositoryException) {
                  throw (RepositoryException) cause;
                } else {
                  throw new FeedException("Error submitting feed", cause);
                }
              }
            } else {
              count++;
            }
          }
        }
        return count;
      }
    
      /**
       * Checks for low available memory condition.
       *
       * @return true if free memory is running low.
       */
      private boolean lowMemory() {
        long threshold = ((fileSizeLimit.maxFeedSize() + fileSizeLimit.maxDocumentSize()) * 4) / 3;
        Runtime rt = Runtime.getRuntime();
        if ((rt.maxMemory() - (rt.totalMemory() - rt.freeMemory())) < threshold) {
          rt.gc();
          if ((rt.maxMemory() - (rt.totalMemory() - rt.freeMemory())) < threshold) {
            return true;
          }
        }
        return false;
      }
    
      /**
       * Allocates initial memory for a new XmlFeed and feed logger.
       *
       * @param feedType
       */
      private void startNewFeed(String feedType) throws PushException {
        // Allocate a buffer to construct the feed log.
        try {
          if (FEED_LOGGER.isLoggable(FEED_LOG_LEVEL) && feedLog == null) {
            feedLog = new StringBuilder(256 * 1024);
            feedLog.append("Records generated for ").append(feedType);
            feedLog.append(" feed of ").append(connectorName).append(":\n");
          }
        } catch (OutOfMemoryError me) {
          throw new OutOfMemoryError(
               "Unable to allocate feed log buffer for connector " + connectorName);
        }
    
        // Allocate XmlFeed of the target size.
        int feedSize = (int) fileSizeLimit.maxFeedSize();
        try {
          try {
            xmlFeed = new XmlFeed(connectorName, feedType, feedSize, feedLog);
          } catch (OutOfMemoryError me) {
            // We shouldn't even have gotten this far under a low memory condition.
            // However, try to allocate a tiny feed buffer.  It should fill up on
            // the first document, forcing it to be submitted.  DocPusher.take()
            // should then return a signal to the caller to terminate the batch.
            LOGGER.warning("Insufficient memory available to allocate an optimally"
                + " sized feed - retrying with a much smaller feed allocation.");
            feedSize = 1024;
            try {
              xmlFeed = new XmlFeed(connectorName, feedType, feedSize, feedLog);
            } catch (OutOfMemoryError oome) {
              throw new OutOfMemoryError(
                   "Unable to allocate feed buffer for connector " + connectorName);
            }
          }
        } catch (IOException ioe) {
          throw new PushException("Error creating feed", ioe);
        }
    
        LOGGER.fine("Allocated a new feed of size " + feedSize);
        return;
      }
    
      /**
       * Takes the accumulated XmlFeed and sends the feed to the GSA.
       *
       * @throws PushException if Pusher problem
       * @throws FeedException if transient Feed problem
       * @throws RepositoryException
       */
      private void submitFeed()
          throws PushException, FeedException, RepositoryException {
        if (xmlFeed == null) {
          return;
        }
    
        final XmlFeed feed = xmlFeed;
        xmlFeed = null;
        final String logMessage;
        if (feedLog != null) {
          logMessage = feedLog.toString();
          feedLog = null;
        } else {
          logMessage = null;
        }
    
        try {
          feed.close();
        } catch (IOException ioe) {
          throw new PushException("Error closing feed", ioe);
        }
    
        try {
          // Send the feed to the GSA in a separate thread.
          FutureTask<String> future = new FutureTask<String> (
              new Callable<String>() {
                public String call()
                    throws PushException, FeedException, RepositoryException {
                  try {
                    NDC.push("Feed " + feed.getDataSource());
                    return submitFeed(feed, logMessage);
                  } finally {
                    NDC.remove();
                  }
                }
              }
            );
          feedSender.execute(future);
          // Add the future to list of outstanding submissions.
          synchronized(submissions) {
            submissions.add(future);
          }
        } catch (RejectedExecutionException ree) {
          throw new FeedException("Asynchronous feed was rejected. ", ree);
        }
      }
    
      /**
       * Takes the supplied XmlFeed and sends that feed to the GSA.
       *
       * @param feed an XmlFeed
       * @param logMessage a Feed Log message
       * @return response String from GSA
       * @throws PushException if Pusher problem
       * @throws FeedException if transient Feed problem
       * @throws RepositoryException
       */
      private String submitFeed(XmlFeed feed, String logMessage)
          throws PushException, FeedException, RepositoryException {
    
        if (LOGGER.isLoggable(Level.FINE)) {
          LOGGER.fine("Submitting " + feed.getFeedType() + " feed for "
              + feed.getDataSource() + " to the GSA. " + feed.getRecordCount()
              + " records totaling " + feed.size() + " bytes.");
        }
    
        // Write the generated feedLog message to the feed logger.
        if (logMessage != null && FEED_LOGGER.isLoggable(FEED_LOG_LEVEL)) {
          FEED_LOGGER.log(FEED_LOG_LEVEL, logMessage);
        }
    
        // Write the Feed to the TeedFeedFile, if one was specified.
        String teedFeedFilename = Context.getInstance().getTeedFeedFile();
        if (teedFeedFilename != null) {
          boolean isThrowing = false;
          OutputStream os = null;
          try {
            os = new FileOutputStream(teedFeedFilename, true);
            feed.writeTo(os);
          } catch (IOException e) {
            isThrowing = true;
            throw new FeedException("Cannot write to file: " + teedFeedFilename, e);
          } finally {
            if (os != null) {
              try {
                os.close();
              } catch (IOException e) {
                if (!isThrowing) {
                  throw new FeedException(
                       "Cannot write to file: " + teedFeedFilename, e);
                }
              }
            }
          }
        }
    
        String gsaResponse = feedConnection.sendData(feed);
        if (!gsaResponse.equals(GsaFeedConnection.SUCCESS_RESPONSE)) {
          String eMessage = gsaResponse;
          if (GsaFeedConnection.UNAUTHORIZED_RESPONSE.equals(gsaResponse)) {
            eMessage += ": Client is not authorized to send feeds. Make "
                + "sure the GSA is configured to trust feeds from your host.";
          }
          if (GsaFeedConnection.INTERNAL_ERROR_RESPONSE.equals(gsaResponse)) {
            eMessage += ": Check GSA status or feed format.";
          }
          throw new PushException(eMessage);
        }
        return gsaResponse;
      }
    
      /**
       * Return an InputStream for the Document's content.
       */
      private InputStream getContentStream(Document document, String feedType)
          throws RepositoryException {
        InputStream contentStream = null;
        if (!feedType.equals(XmlFeed.XML_FEED_METADATA_AND_URL)) {
          InputStream encodedContentStream = getEncodedStream(
              new BigEmptyDocumentFilterInputStream(
                  DocUtils.getOptionalStream(document,
                  SpiConstants.PROPNAME_CONTENT), fileSizeLimit.maxDocumentSize()),
              (Context.getInstance().getTeedFeedFile() != null), 1024 * 1024);
    
          InputStream encodedAlternateStream = getEncodedStream(getAlternateContent(
              DocUtils.getOptionalString(document, SpiConstants.PROPNAME_TITLE)),
              false, 1024);
    
          contentStream = new AlternateContentFilterInputStream(
              encodedContentStream, encodedAlternateStream, xmlFeed);
        }
        return contentStream;
      }
    
      /**
       * Wrap the content stream with the suitable encoding (either
       * Base64 or Base64Compressed, based upon GSA encoding support.
       */
      // TODO: Don't compress tiny content or already compressed data
      // (based on mimetype).  This is harder than it sounds.
      private InputStream getEncodedStream(InputStream content, boolean wrapLines,
                                           int ioBufferSize) {
        if (XmlFeed.XML_BASE64COMPRESSED.equals(contentEncoding)) {
          return new Base64FilterInputStream(
              new CompressedFilterInputStream(content, ioBufferSize), wrapLines);
        } else {
          return new Base64FilterInputStream(content, wrapLines);
         }
      }
    
      /**
       * Construct the alternate content data for a feed item.  If the feed item
       * has null or empty content, or if the feed item has excessively large
       * content, substitute this data which will insure that the feed item gets
       * indexed by the GSA. The alternate content consists of the item's title,
       * or a single space, if it lacks a title.
       *
       * @param title from the feed item
       * @return an InputStream containing the alternate content
       */
      private static InputStream getAlternateContent(String title) {
        byte[] bytes = null;
        // Alternate content is a string that is substituted for null or empty
        // content streams, in order to make sure the GSA indexes the feed item.
        // If the feed item supplied a title property, we build an HTML fragment
        // containing that title.  This provides better looking search result
        // entries.
        if (title != null && title.trim().length() > 0) {
          try {
            String t = "<html><title>" + title.trim() + "</title></html>";
            bytes = t.getBytes("UTF-8");
          } catch (UnsupportedEncodingException uee) {
            // Don't be fancy.  Try the single space content.
          }
        }
        // If no title is available, we supply a single space as the content.
        if (bytes == null) {
          bytes = SPACE_CHAR;
        }
        return new ByteArrayInputStream(bytes);
      }
    
      /**
       * A FilterInput stream that protects against large documents and empty
       * documents.  If we have read more than FileSizeLimitInfo.maxDocumentSize
       * bytes from the input, we reset the feed to before we started reading
       * content, then provide the alternate content.  Similarly, if we get EOF
       * after reading zero bytes, we provide the alternate content.
       */
      private static class AlternateContentFilterInputStream
          extends FilterInputStream {
        private boolean useAlternate;
        private InputStream alternate;
        private final XmlFeed feed;
        private int resetPoint;
    
        /**
         * @param in InputStream containing raw document content
         * @param alternate InputStream containing alternate content to provide
         * @param feed XmlFeed under constructions (used for reseting size)
         */
        public AlternateContentFilterInputStream(InputStream in,
            InputStream alternate, XmlFeed feed) {
          super(in);
          this.useAlternate = false;
          this.alternate = alternate;
          this.feed = feed;
          this.resetPoint = -1;
        }
    
        // Reset the feed to its position when we started reading this stream,
        // and start reading from the alternate input.
        // TODO: WARNING: this will not work if using chunked HTTP transfer.
        private void switchToAlternate() {
          feed.reset(resetPoint);
          useAlternate = true;
        }
    
        @Override
        public int read() throws IOException {
          if (resetPoint == -1) {
            // If I have read nothing yet, remember the reset point in the feed.
            resetPoint = feed.size();
          }
          if (!useAlternate) {
            try {
              return super.read();
            } catch (EmptyDocumentException e) {
              switchToAlternate();
            } catch (BigDocumentException e) {
              LOGGER.finer("Document content exceeds the maximum configured "
                           + "document size, discarding content.");
              switchToAlternate();
            }
          }
          return alternate.read();
        }
    
        @Override
        public int read(byte b[], int off, int len) throws IOException {
          if (resetPoint == -1) {
            // If I have read nothing yet, remember the reset point in the feed.
            resetPoint = feed.size();
          }
          if (!useAlternate) {
            try {
              return super.read(b, off, len);
            } catch (EmptyDocumentException e) {
              switchToAlternate();
              return 0; // Return alternate content on subsequent call to read().
            } catch (BigDocumentException e) {
              LOGGER.finer("Document content exceeds the maximum configured "
                           + "document size, discarding content.");
              switchToAlternate();
              return 0; // Return alternate content on subsequent call to read().
            }
          }
          return alternate.read(b, off, len);
        }
    
        @Override
        public boolean markSupported() {
          return false;
        }
    
        @Override
        public void close() throws IOException {
          super.close();
          alternate.close();
        }
      }
    
      /**
       * A FilterInput stream that protects against large documents and empty
       * documents.  If we have read more than FileSizeLimitInfo.maxDocumentSize
       * bytes from the input, or if we get EOF after reading zero bytes,
       * we throw a subclass of IOException that is used as a signal for
       * AlternateContentFilterInputStream to switch to alternate content.
       */
      private static class BigEmptyDocumentFilterInputStream
          extends FilterInputStream {
        private final long maxDocumentSize;
        private long currentDocumentSize;
    
        /**
         * @param in InputStream containing raw document content
         * @param maxDocumentSize maximum allowed size in bytes of data read from in
         */
        public BigEmptyDocumentFilterInputStream(InputStream in,
                                                 long maxDocumentSize) {
          super(in);
          this.maxDocumentSize = maxDocumentSize;
          this.currentDocumentSize = 0;
        }
    
        @Override
        public int read() throws IOException {
          if (in == null) {
            throw new EmptyDocumentException();
          }
          int val = super.read();
          if (val == -1) {
            if (currentDocumentSize == 0) {
              throw new EmptyDocumentException();
            }
          } else if (++currentDocumentSize > maxDocumentSize) {
            throw new BigDocumentException();
          }
          return val;
        }
    
        @Override
        public int read(byte b[], int off, int len) throws IOException {
          if (in == null) {
            throw new EmptyDocumentException();
          }
          int bytesRead = super.read(b, off,
              (int) Math.min(len, maxDocumentSize - currentDocumentSize + 1));
          if (bytesRead == -1) {
            if (currentDocumentSize == 0) {
              throw new EmptyDocumentException();
            }
          } else if ((currentDocumentSize += bytesRead) > maxDocumentSize) {
            throw new BigDocumentException();
          }
          return bytesRead;
        }
    
        @Override
        public boolean markSupported() {
          return false;
        }
    
        @Override
        public void close() throws IOException {
          if (in != null) {
            super.close();
          }
        }
      }
    
      /**
       * Subclass of IOException that is thrown when maximumDocumentSize
       * is exceeded.
       */
      private static class BigDocumentException extends IOException {
        public BigDocumentException() {
          super("Maximum Document size exceeded.");
        }
      }
    
      /**
       * Subclass of IOException that is thrown when the document has
       * no content.
       */
      private static class EmptyDocumentException extends IOException {
        public EmptyDocumentException() {
          super("Document has no content.");
        }
      }
    }

    这个类的代码是有点长的,take方法首先是将Document转化为InputStream流然后封装到XmlFeed数据载体里面

     然后以线程池方式提交XmlFeed数据(调用feedConnection.sendData(feed)方法)

    我么可以发现提交xmlfeed数据前,如果我们在属性文件applicationContext.properties配置了teedFeedFile属性,连接器会同时将发送的xml数据写入该属性值的文件

    ---------------------------------------------------------------------------

    本系列企业搜索引擎开发之连接器connector系本人原创

    转载请注明出处 博客园 刺猬的温驯

    本文链接http://www.cnblogs.com/chenying99/archive/2013/03/19/2968423.html

  • 相关阅读:
    Elasticsearch集群优化实战
    Elasticsearch跨集群搜索(Cross Cluster Search)
    Elasticsearch集群运维
    Elasticsearch冷热集群搭建
    分布式系统事务一致性
    使用Vertx重构系统小结
    ThreadLocal原理分析与使用场景
    使用Vertx构建微服务
    使用 Python & Flask 实现 RESTful Web API
    Golang简单工厂方法demo
  • 原文地址:https://www.cnblogs.com/chenying99/p/2968423.html
Copyright © 2011-2022 走看看