zoukankan      html  css  js  c++  java
  • Heritrix 3.1.0 源码解析(三十四)

    本文主要分析FetchFTP处理器。该处理器用于下载ftp文件,其实现方式是对commons-net-2.0.jar组件进行封装。

    在FetchFTP处理器里面定义了内部类SocketFactoryWithTimeout(SOCKET工厂),用于创建SOCKET

    /**
         * A {@link SocketFactory} much like {@link javax.net.DefaultSocketFactory},
         * except that the createSocket() methods that open connections support a
         * connect timeout.
         *
         * <p>If binding or connecting fails, the half-opened socket is closed
         * before the failure is propagated, so a failed attempt does not leak
         * the socket.
         */
        public class SocketFactoryWithTimeout extends SocketFactory {
            /**
             * Connect timeout in milliseconds, handed unchanged to
             * {@link Socket#connect(java.net.SocketAddress, int)}.
             */
            protected int connectTimeoutMs = 0;

            /**
             * @return the connect timeout in milliseconds
             */
            public int getConnectTimeoutMs() {
                return connectTimeoutMs;
            }

            /**
             * @param connectTimeoutMs the connect timeout in milliseconds to use
             *        for sockets opened after this call
             */
            public void setConnectTimeoutMs(int connectTimeoutMs) {
                this.connectTimeoutMs = connectTimeoutMs;
            }

            /**
             * Creates a new, unconnected socket.
             *
             * @return the unconnected socket
             */
            @Override
            public Socket createSocket() {
                return new Socket();
            }

            /**
             * Opens a socket to the given host and port using the connect timeout.
             *
             * @param host  remote host name
             * @param port  remote port
             * @return the connected socket
             * @throws IOException  if the connection cannot be established
             */
            @Override
            public Socket createSocket(String host, int port) throws IOException,
                    UnknownHostException {
                return open(null, new InetSocketAddress(host, port));
            }

            /**
             * Opens a socket to the given address and port using the connect
             * timeout.
             *
             * @param host  remote address
             * @param port  remote port
             * @return the connected socket
             * @throws IOException  if the connection cannot be established
             */
            @Override
            public Socket createSocket(InetAddress host, int port)
                    throws IOException {
                return open(null, new InetSocketAddress(host, port));
            }

            /**
             * Opens a socket bound to the given local address and connected to
             * the given host and port, using the connect timeout.
             *
             * @param host       remote host name
             * @param port       remote port
             * @param localHost  local address to bind to
             * @param localPort  local port to bind to
             * @return the bound and connected socket
             * @throws IOException  if binding or connecting fails
             */
            @Override
            public Socket createSocket(String host, int port,
                    InetAddress localHost, int localPort) throws IOException,
                    UnknownHostException {
                return open(new InetSocketAddress(localHost, localPort),
                        new InetSocketAddress(host, port));
            }

            /**
             * Opens a socket bound to the given local address and connected to
             * the given remote address and port, using the connect timeout.
             *
             * @param address       remote address
             * @param port          remote port
             * @param localAddress  local address to bind to
             * @param localPort     local port to bind to
             * @return the bound and connected socket
             * @throws IOException  if binding or connecting fails
             */
            @Override
            public Socket createSocket(InetAddress address, int port,
                    InetAddress localAddress, int localPort) throws IOException {
                return open(new InetSocketAddress(localAddress, localPort),
                        new InetSocketAddress(address, port));
            }

            /**
             * Creates a socket via {@link #createSocket()}, optionally binds it,
             * and connects it with the configured timeout.
             *
             * @param local   local endpoint to bind to, or null to skip binding
             * @param remote  remote endpoint to connect to
             * @return the connected socket
             * @throws IOException  if binding or connecting fails; the socket
             *         has been closed by the time the exception propagates
             */
            private Socket open(InetSocketAddress local, InetSocketAddress remote)
                    throws IOException {
                Socket sock = createSocket();
                boolean connected = false;
                try {
                    if (local != null) {
                        sock.bind(local);
                    }
                    sock.connect(remote, connectTimeoutMs);
                    connected = true;
                    return sock;
                } finally {
                    // Covers checked and unchecked failures alike: the caller
                    // never receives the socket, so nobody else can close it.
                    if (!connected) {
                        try {
                            sock.close();
                        } catch (IOException ignored) {
                            // The bind/connect failure already propagating is
                            // the one worth reporting.
                        }
                    }
                }
            }

        }

    在封装的ClientFTP对象的Socket openDataConnection(int command, String path)方法里面打开SOCKET

    /**
         * Opens a data connection via {@code _openDataConnection_} and records
         * the outcome through {@code recordAdditionalInfo}.
         *
         * @param command  the FTP command to send
         * @param path     the argument for the command
         * @return the data socket, which may be null
         * @throws IOException  rethrown after the failure has been recorded
         */
        public Socket openDataConnection(int command, String path)
        throws IOException {
            try {
                dataSocket = _openDataConnection_(command, path);
                if (dataSocket == null) {
                    // Nothing was opened, so there is no endpoint to record.
                    return dataSocket;
                }
                String opened = "Opened data connection to "
                        + dataSocket.getInetAddress().getHostAddress() + ":"
                        + dataSocket.getPort();
                recordAdditionalInfo(opened);
                return dataSocket;
            } catch (IOException e) {
                // Name the passive endpoint in the record when one is known.
                String failure;
                if (getPassiveHost() == null) {
                    failure = "Failed to open data connection: "
                            + e.getMessage();
                } else {
                    failure = "Failed to open data connection to "
                            + getPassiveHost() + ":" + getPassivePort() + ": "
                            + e.getMessage();
                }
                recordAdditionalInfo(failure);
                throw e;
            }
        }

    调用父类FTPClient的_openDataConnection_(command, path)方法,根据模式返回SOCKET对象

    接下来分析FetchFTP类的源码,该类拥有protected SocketFactoryWithTimeout socketFactory成员,用于设置ClientFTP对象的SOCKET工厂

    FetchFTP处理器的执行方法void innerProcess(CrawlURI curi)如下

    /**
         * Fetches the given FTP URI and records the outcome on it.
         *
         * <p>An {@link IOException} from the fetch is added to the URI's
         * non-fatal failures and the fetch status is set to
         * {@code S_CONNECT_FAILED}. Whatever happens, the client is
         * disconnected, the completion time is set, and the control
         * conversation is stored on the URI.
         *
         * @param curi  the URI to fetch
         * @throws InterruptedException  if the fetch is interrupted
         */
        @Override
        protected void innerProcess(CrawlURI curi) throws InterruptedException {
            curi.setFetchBeginTime(System.currentTimeMillis());
            // One ClientFTP per fetch; it is torn down in the finally block.
            final ClientFTP ftp = new ClientFTP();
            final Recorder rec = curi.getRecorder();

            try {
                if (logger.isLoggable(Level.FINE)) {
                    logger.fine("attempting to fetch ftp uri: " + curi);
                }
                // Perform the FTP download.
                fetch(curi, ftp, rec);
            } catch (IOException ioe) {
                if (logger.isLoggable(Level.INFO)) {
                    logger.info(curi + ": " + ioe);
                }
                curi.getNonFatalFailures().add(ioe);
                curi.setFetchStatus(FetchStatusCodes.S_CONNECT_FAILED);
            } finally {
                // Log out and release the connection.
                disconnect(ftp);
                curi.setFetchCompletedTime(System.currentTimeMillis());
                curi.getData().put(A_FTP_CONTROL_CONVERSATION,
                        ftp.getControlConversation());
            }
        }

    继续调用void fetch(CrawlURI curi, ClientFTP client, Recorder recorder) 方法

    /**
         * Retrieves one FTP resource (a file, or a directory name listing) and
         * stores it in the recorder.
         *
         * <p>Steps: connect and log in, probe with a change-directory command
         * to tell directories from files, open a passive-mode data connection
         * (NLST for a directory, RETR for a file), then drain the data socket
         * into the recorder. For a directory the listing is also passed to
         * {@code extract}.
         *
         * @param curi      the URI of the document to fetch
         * @param client    the FTPClient to use for the fetch
         * @param recorder  the recorder to preserve the document in
         * @throws IOException  if a network or protocol error occurs
         * @throws InterruptedException  if the thread is interrupted
         */
        private void fetch(CrawlURI curi, ClientFTP client, Recorder recorder) 
        throws IOException, InterruptedException {
            // Resolve the control port; -1 from the URI falls back to 21.
            final UURI target = curi.getUURI();
            final int declaredPort = target.getPort();
            final int port = (declaredPort == -1) ? 21 : declaredPort;

            // Create the socket factory on first use.
            if (socketFactory == null) {
                socketFactory = new SocketFactoryWithTimeout();
            }
            socketFactory.setConnectTimeoutMs(getSoTimeoutMs());

            // Hand the factory and the timeouts to the client, then connect.
            client.setSocketFactory(socketFactory);
            client.setConnectTimeout(getSoTimeoutMs());
            client.setDefaultTimeout(getSoTimeoutMs());
            client.setDataTimeout(getSoTimeoutMs());
            client.connect(target.getHost(), port);

            client.setSoTimeout(getSoTimeoutMs());  // must be after connect()

            // Log in with the credentials resolved for this URI.
            final String[] credentials = getAuth(curi);
            client.login(credentials[0], credentials[1]);

            // The given resource may or may not be a directory.
            // If the change-directory command succeeds, treat it as one.
            final boolean isDirectory =
                    client.changeWorkingDirectory(target.getPath());

            // Pick the data command: RETR for a file, NLST for a directory.
            final int command;
            final String path;
            if (!isDirectory) {
                command = FTPCommand.RETR;
                client.setFileType(FTP.BINARY_FILE_TYPE);
                path = target.getPath();
            } else {
                curi.getAnnotations().add("ftpDirectoryList");
                command = FTPCommand.NLST;
                client.setFileType(FTP.ASCII_FILE_TYPE);
                path = ".";
            }

            // Use passive mode for the data connection.
            client.enterLocalPassiveMode();
            Socket data = null;

            try {
                data = client.openDataConnection(command, path);

                // if "227 Entering Passive Mode" these will get reset later
                curi.setFetchStatus(client.getReplyCode());
                curi.getData().put(A_FTP_FETCH_STATUS,
                        client.getReplyStrings()[0]);
            } catch (IOException e) {
                // try it again, see AbstractFrontier.needsRetrying()
                curi.setFetchStatus(FetchStatusCodes.S_CONNECT_LOST);
            }

            if (data == null) {
                // no data - without this, content size is -1
                curi.setContentSize(0);
                addParent(curi);
                return;
            }

            if (data.getSoTimeout() != getSoTimeoutMs()) {
                logger.warning("data socket timeout " + data.getSoTimeout() + "ms is not expected value " + getSoTimeoutMs() + "ms");
            }

            // Configure (or clear) the content digest before reading.
            final boolean wantDigest = getDigestContent();
            String digestAlgorithm = null;
            if (!wantDigest) {
                // clear
                recorder.getRecordedInput().setDigest((MessageDigest)null);
            } else {
                digestAlgorithm = getDigestAlgorithm();
                recorder.getRecordedInput().setDigest(digestAlgorithm);
                recorder.getRecordedInput().startDigest();
            }

            try {
                // Wrap the socket streams in the recorder and read the content.
                saveToRecorder(curi, data, recorder);
            } finally {
                recorder.close();
                client.closeDataConnection(); // does socket.close()
                curi.setContentSize(recorder.getRecordedInput().getSize());

                // "226 Transfer complete."
                client.getReply();
                curi.setFetchStatus(client.getReplyCode());
                curi.getData().put(A_FTP_FETCH_STATUS,
                        client.getReplyStrings()[0]);

                curi.setContentType(isDirectory
                        ? "text/plain"
                        : "application/octet-stream");

                if (logger.isLoggable(Level.FINE)) {
                    logger.fine("read " + recorder.getRecordedInput().getSize()
                            + " bytes from ftp data socket");
                }

                if (wantDigest) {
                    curi.setContentDigest(digestAlgorithm,
                        recorder.getRecordedInput().getDigestValue());
                }
            }

            if (isDirectory) {
                // Directory listing: hand it to extract() to process the entries.
                extract(curi, recorder);
            }

            addParent(curi);
        }

    void saveToRecorder(CrawlURI curi, Socket socket, Recorder recorder)方法将SOCKET的InputStream和OutputStream封装到Recorder recorder对象(CrawlURI curi对象的属性)中,并通过RecordingInputStream完整读取远程文件或目录列表

    /**
         * Wraps the data socket's streams in the recorder and reads the
         * socket's input through the recorder's input stream.
         *
         * @param curi      the URI being fetched (not used by this method)
         * @param socket    the open data socket
         * @param recorder  the recorder that wraps and stores the streams
         * @throws IOException  if reading from the socket fails
         * @throws InterruptedException  if the read is interrupted
         */
        private void saveToRecorder(CrawlURI curi,
                Socket socket, Recorder recorder) 
        throws IOException, InterruptedException {
            recorder.inputWrap(socket.getInputStream());
            recorder.outputWrap(socket.getOutputStream());
            recorder.markContentBegin();

            // Limits for the read: byte cap, overall time budget, rate cap.
            final long byteLimit = getMaxLengthBytes();
            final long timeLimitMs = 1000L * (long) getTimeoutSeconds();
            final int rateLimitKBSec = getMaxFetchKBSec();

            // Read the remote file/dir listing in its entirety (soft max 0).
            final RecordingInputStream recorded = recorder.getRecordedInput();
            recorded.setLimits(byteLimit, timeLimitMs, rateLimitKBSec);
            recorded.readFullyOrUntil(0L);
        }

    void disconnect(ClientFTP client)方法退出登录并且释放连接

    /**
         * Quietly disconnects from the given FTP client.
         * If an IOException is raised, this method logs it as a warning.
         * 
         * @param client  the client to disconnect
         */
        private static void disconnect(ClientFTP client) {
            if (client.isConnected()) try {
                client.logout();
            } catch (IOException e) {
            }
    
            if (client.isConnected()) try {
                client.disconnect();
            } catch (IOException e) {
                logger.warning("Could not disconnect from FTP client: " + e);
            }
        } 

    ---------------------------------------------------------------------------

    本系列Heritrix 3.1.0 源码解析系本人原创

    转载请注明出处 博客园 刺猬的温驯

    本文链接 http://www.cnblogs.com/chenying99/archive/2013/05/05/3060825.html

  • 相关阅读:
    ant build打包
    在JAVA中如何获取当前源文件名以及代码的行号
    react以组件为中心的代码分割和懒加载
    java中针对 try和finally一些总结
    JS强制关闭浏览器页签并且不提示关闭信息
    由[].slice.call()引发的思考
    JS类型判断
    nginx的location配置
    DBCP连接池
    java/Servlet
  • 原文地址:https://www.cnblogs.com/chenying99/p/3060825.html
Copyright © 2011-2022 走看看