zoukankan      html  css  js  c++  java
  • Heritrix 3.1.0 源码解析(三十三)

    本文要分析的是FetchHTTP处理器。该处理器负责向CrawlURI curi对象对应的URL发起HTTP请求(通过OutputStream写入请求数据),并读取返回的InputStream响应内容

    FetchHTTP处理器有很多成员变量,大多用于设置远程请求的相关参数,这里只解释其中重要的成员变量

    /**
     * Whether to compute an on-the-fly digest hash of retrieved content
     * bodies. Defaults to {@code true}; the flag is stored in {@code kp}
     * under the key {@code "digestContent"}.
     */
    {
        setDigestContent(true);
    }

    /**
     * Reports whether retrieved content bodies are digested while fetched.
     *
     * @return the current {@code "digestContent"} setting
     */
    public boolean getDigestContent() {
        Boolean enabled = (Boolean) kp.get("digestContent");
        return enabled.booleanValue();
    }

    /**
     * Enables or disables on-the-fly digesting of content bodies.
     *
     * @param digest {@code true} to digest retrieved content
     */
    public void setDigestContent(boolean digest) {
        kp.put("digestContent", digest);
    }
     
        /**
         * Name of the algorithm (for example MD5 or SHA-1) used for the
         * on-the-fly digest of retrieved content bodies. Defaults to "sha1".
         */
        String digestAlgorithm = "sha1";

        /**
         * @return the configured content-digest algorithm name
         */
        public String getDigestAlgorithm() {
            return this.digestAlgorithm;
        }

        /**
         * @param algorithm the content-digest algorithm name to use
         */
        public void setDigestAlgorithm(String algorithm) {
            digestAlgorithm = algorithm;
        }
    /** Executes HTTP requests; created in configureHttp(...) and nulled by cleanupHttp(). */
    private transient HttpClient httpclient = null;

    /**
     * Cookie store, defaulting to a BdbCookieStorage. It is an optional
     * injected dependency; the lifecycle code null-checks it before use.
     */
    CookieStorage cookieStorage = new BdbCookieStorage();

    /**
     * Injects the cookie storage to use.
     *
     * @param storage cookie storage implementation
     */
    @Autowired(required=false)
    public void setCookieStorage(CookieStorage storage) {
        cookieStorage = storage;
    }

    /**
     * @return the cookie storage currently in use
     */
    public CookieStorage getCookieStorage() {
        return cookieStorage;
    }
    /**
     * Store of credentials, kept in {@code kp} under the key
     * {@code "credentialStore"}.
     */
    {
        // Start with an empty store so configuring one is optional.
        setCredentialStore(new CredentialStore());
    }

    /**
     * @return the credential store currently in use
     */
    public CredentialStore getCredentialStore() {
        Object store = kp.get("credentialStore");
        return (CredentialStore) store;
    }

    /**
     * Injects the credential store (optional dependency).
     *
     * @param credentials credential store to use
     */
    @Autowired(required=false)
    public void setCredentialStore(CredentialStore credentials) {
        kp.put("credentialStore", credentials);
    }
        
        /**
         * Server cache used to do DNS lookups and to look up CrawlServer
         * instances by server key.
         */
        protected ServerCache serverCache;

        /**
         * @return the injected server cache
         */
        public ServerCache getServerCache() {
            return serverCache;
        }

        /**
         * Injects the (required) server cache.
         *
         * @param cache server cache to use
         */
        @Autowired
        public void setServerCache(ServerCache cache) {
            this.serverCache = cache;
        }

    静态代码块实现注册http和https协议(创建SOCKET工厂,重用IP缓存)

    /**
     * Registers the "http" and "https" protocols in HttpClient's static
     * Protocol registry, backed by Heritrix's own socket factories.
     * If the SSL socket factory cannot be constructed, the exception's stack
     * trace is printed and this block does not register "https".
     */
    static {
        Protocol.registerProtocol("http",
                new Protocol("http", new HeritrixProtocolSocketFactory(), 80));
        try {
            ProtocolSocketFactory sslSocketFactory =
                    new HeritrixSSLProtocolSocketFactory();
            Protocol.registerProtocol("https",
                    new Protocol("https", sslSocketFactory, 443));
        } catch (KeyManagementException kme) {
            kme.printStackTrace();
        } catch (KeyStoreException kse) {
            kse.printStackTrace();
        } catch (NoSuchAlgorithmException nsae) {
            nsae.printStackTrace();
        }
    }

    void start()初始化方法实例化HttpClient httpclient对象,设置Cookies,设置SSL工厂

    /**
     * Lifecycle start: builds the SSL socket factory and the HttpClient, then
     * seeds the client's cookie state from the cookie storage (if any).
     * Does nothing if the processor is already running.
     */
    public void start() {
        if (isRunning()) {
            return;
        }
        super.start();

        // Build the SSL factory BEFORE configuring HttpClient. The
        // parameterized configureHttp(...) stores the *current* value of
        // this.sslfactory under SSL_FACTORY_KEY in the connection-manager
        // params (assumes the no-arg overload delegates to it -- confirm).
        // Creating the factory afterwards left a null factory registered on
        // first start, so the configured trust level never reached it.
        setSSLFactory();

        configureHttp();

        if (cookieStorage != null) {
            cookieStorage.start();
            // Load previously stored cookies into the client state.
            httpclient.getState().setCookiesMap(cookieStorage.getCookiesMap());
        }
    }

    void configureHttp(int soTimeout, String addressStr, String proxy, int port, String user, String password)方法初始化httpclient对象(start()中调用的是无参的configureHttp(),此处展示的是带参数的版本)

    /**
     * Creates {@link #httpclient} with a single-connection manager and
     * configures timeouts, protocol version, parsing leniency, proxy and
     * local bind address.
     *
     * @param soTimeout  socket and connection timeout in ms; negative values
     *                   are clamped to 0
     * @param addressStr local address to bind outgoing connections to;
     *                   null/empty leaves the default
     * @param proxy      proxy host; null or empty means no proxy
     * @param port       proxy port
     * @param user       proxy user name; null/empty means no proxy credentials
     * @param password   proxy password
     * @throws RuntimeException if {@code addressStr} cannot be resolved
     */
    protected void configureHttp(int soTimeout, String addressStr,
                                 String proxy, int port, String user, String password) {
        // One timeout value is used for both the socket and the connection.
        int timeout = Math.max(soTimeout, 0);

        HttpConnectionManager cm = new SingleHttpConnectionManager();

        // TODO: The following settings should be made in the corresponding
        // HttpConnectionManager, not here.
        HttpConnectionManagerParams hcmp = cm.getParams();
        hcmp.setConnectionTimeout(timeout);
        hcmp.setStaleCheckingEnabled(true);
        // false keeps Nagle's algorithm enabled, minimizing bandwidth usage.
        // (Original note: IBM JVMs < 1.4.2 NPE setting this on ssl sockets.)
        hcmp.setTcpNoDelay(false);

        this.httpclient = new HttpClient(cm);
        HttpClientParams hcp = this.httpclient.getParams();
        hcp.setSoTimeout(timeout);
        hcp.setVersion(HttpVersion.HTTP_1_0);

        // Configure how methods should act. Boolean.TRUE/FALSE replace the
        // deprecated Boolean(boolean) constructor; the stored values are equal.
        hcp.setParameter(HttpMethodParams.SINGLE_COOKIE_HEADER, Boolean.TRUE);
        hcp.setParameter(HttpMethodParams.UNAMBIGUOUS_STATUS_LINE, Boolean.FALSE);
        hcp.setParameter(HttpMethodParams.STRICT_TRANSFER_ENCODING, Boolean.FALSE);
        hcp.setIntParameter(HttpMethodParams.STATUS_LINE_GARBAGE_LIMIT, 10);

        // Normalize an empty proxy host to "no proxy".
        if ((proxy != null) && (proxy.length() == 0)) {
            proxy = null;
        }
        HostConfiguration config = httpclient.getHostConfiguration();
        configureProxy(proxy, port, user, password, config);
        configureBindAddress(addressStr, config);

        // Stores the factory reference as it is right now; it must already
        // have been built by the caller for it to take effect.
        hcmp.setParameter(SSL_FACTORY_KEY, this.sslfactory);
    }

    void configureProxy(String proxy, int port, String user, String password,HostConfiguration config)方法配置代理 

    /**
     * Points the host configuration at a proxy and, when a user name is
     * given, registers NT proxy credentials scoped to that proxy host/port.
     *
     * @param proxy    proxy host; null/empty means no proxy is configured
     * @param port     proxy port
     * @param user     proxy user; null/empty means no credentials are set
     * @param password proxy password
     * @param config   host configuration to update
     */
    private void configureProxy(String proxy, int port, String user, String password,
                                HostConfiguration config) {
        if (StringUtils.isEmpty(proxy)) {
            return;
        }
        config.setProxy(proxy, port);

        if (StringUtils.isEmpty(user)) {
            return;
        }
        Credentials proxyCredentials = new NTCredentials(user, password, "", "");
        AuthScope proxyScope = new AuthScope(proxy, port);
        this.httpclient.getState().setProxyCredentials(proxyScope, proxyCredentials);
    }

     void configureBindAddress(String address, HostConfiguration config)绑定地址 

    /**
     * Binds outgoing connections to the given local address, if one is set.
     *
     * @param address local host name or IP literal; null/empty leaves
     *                {@code config} untouched
     * @param config  host configuration to update
     * @throws RuntimeException if {@code address} cannot be resolved; the
     *                          UnknownHostException is kept as the cause
     */
    private void configureBindAddress(String address, HostConfiguration config) {
        if (StringUtils.isEmpty(address)) {
            return;
        }
        try {
            InetAddress localAddress = InetAddress.getByName(address);
            config.setLocalAddress(localAddress);
        } catch (UnknownHostException e) {
            // Convert to RuntimeException so initialization fails loudly, but
            // keep the original exception as the cause for diagnosis.
            throw new RuntimeException("Unknown host " + address
                    + " in local-address", e);
        }
    }

    void setSSLFactory() 方法初始化SSLSocketFactory sslfactory工厂

    /**
     * Builds {@code sslfactory} from an "SSL" SSLContext whose only trust
     * manager is a ConfigurableX509TrustManager at the configured trust level.
     * Any failure is logged at WARNING and leaves {@code sslfactory} unchanged.
     */
    private void setSSLFactory() {
        try {
            SSLContext sslContext = SSLContext.getInstance("SSL");
            TrustManager[] trustManagers = {
                new ConfigurableX509TrustManager(getSslTrustLevel())
            };
            // Null KeyManagers: per the original author, obtaining the default
            // KeyManagers required a physical keystore, while null works.
            sslContext.init(null, trustManagers, null);
            this.sslfactory = sslContext.getSocketFactory();
        } catch (Exception e) {
            logger.log(Level.WARNING,
                    "Failed configure of ssl context " + e.getMessage(), e);
        }
    }

    下面是FetchHTTP处理器生命周期相关的方法:stop()时持久化Cookies,并将HttpClient httpclient对象的引用置为null

    /**
     * Lifecycle stop: saves the client's cookies to the cookie storage (if
     * any), stops the storage, then releases the HttpClient.
     * Does nothing if the processor is not running.
     */
    public void stop() {
        if (!isRunning()) {
            return;
        }
        super.stop();

        if (cookieStorage != null) {
            // Persist the cookies gathered during the crawl.
            @SuppressWarnings("unchecked")
            Map<String, Cookie> currentCookies = httpclient.getState().getCookiesMap();
            cookieStorage.saveCookiesMap(currentCookies);
            cookieStorage.stop();
        }
        // XXX happens at finish; move to teardown?
        cleanupHttp();
    }
    
        /**
         * Final cleanup for the HttpClient instance: only drops this
         * processor's reference to it.
         */
        protected void cleanupHttp() {
            httpclient = null;
        }

     下面我们来看一下FetchHTTP处理器的void innerProcess(final CrawlURI curi)方法

    /**
     * Fetches {@code curi} over HTTP and records the exchange through the
     * CrawlURI's {@link Recorder}.
     *
     * <p>In order, this method:
     * <ol>
     * <li>notes the fetch begin time;</li>
     * <li>enables (or clears) an on-the-fly content digest;</li>
     * <li>builds a POST method (fetch type {@code HTTP_POST}) or a GET method
     * whose {@code readResponseBody} consults midfetch filters and may abort
     * before the body is read;</li>
     * <li>configures the method and populates credentials;</li>
     * <li>applies byte, time and rate limits to the recorded input;</li>
     * <li>executes the request and reads the body fully, or until the
     * response content length.</li>
     * </ol>
     *
     * <p>The {@code finally} block always closes the recorders, stamps the
     * completion time and records charset, sizes and other codings. It runs
     * even on the early-return paths taken after an IOException or
     * ArrayIndexOutOfBoundsException.
     *
     * <p>On success with credentials, those credentials are promoted to the
     * server. A 401 response is passed to {@code handle401}.
     *
     * @param curi the URI to fetch; receives fetch times, the HTTP method,
     *             sizes, the content digest (when enabled) and annotations
     * @throws InterruptedException as declared by the signature
     */
    protected void innerProcess(final CrawlURI curi)
                throws InterruptedException {
            // Note begin time
            curi.setFetchBeginTime(System.currentTimeMillis());
            // Get a reference to the Recorder attached to this CrawlURI.
            Recorder rec = curi.getRecorder();
    
            // Shall we get a digest on the content downloaded?
            boolean digestContent = getDigestContent();
            String algorithm = null;
            if (digestContent) {
                algorithm = getDigestAlgorithm();
                // Tell the recorded input stream which digest algorithm to compute.
                rec.getRecordedInput().setDigest(algorithm);
            } else {
                // clear the digest so none is computed
                rec.getRecordedInput().setDigest((MessageDigest)null);
            }
    
            // Below we do two inner classes that add check of midfetch
            // filters just as we're about to receive the response body.
            String curiString = curi.getUURI().toString();
            HttpMethodBase method = null;
            if (curi.getFetchType() == HTTP_POST) {
                method = new HttpRecorderPostMethod(curiString, rec) {
                    protected void readResponseBody(HttpState state,
                            HttpConnection conn) throws IOException, HttpException {
                        addResponseContent(this, curi);
                        if (checkMidfetchAbort(curi, this.httpRecorderMethod, conn)) {
                            doAbort(curi, this, MIDFETCH_ABORT_LOG);
                        } else {
                            super.readResponseBody(state, conn);
                        }
                    }
                };
            } else {
                method = new HttpRecorderGetMethod(curiString, rec) {
                    protected void readResponseBody(HttpState state,
                            HttpConnection conn) throws IOException, HttpException {
                        // Copy response metadata onto curi (per the article,
                        // the status code and content type -- see
                        // addResponseContent). 'this' is this anonymous
                        // HttpRecorderGetMethod.
                        addResponseContent(this, curi);
                        if (checkMidfetchAbort(curi, this.httpRecorderMethod, conn)) {
                            doAbort(curi, this, MIDFETCH_ABORT_LOG);
                        } else {
                            super.readResponseBody(state, conn);
                        }
                    }
                };
            }
            
            // Save method into curi too. Midfetch filters may want to leverage
            // info in here.
            curi.setHttpMethod(method);
            // Apply per-request options and headers to the method. The
            // returned HostConfiguration is passed to executeMethod; judging
            // by its name it may be null -- confirm in configureMethod.
            HostConfiguration customConfigOrNull = configureMethod(curi, method);
            // Populate credentials (the article notes these can include things
            // such as form-submission parameters -- TODO confirm).
            // Set config so auth. is not automatic.
            boolean addedCredentials = populateCredentials(curi, method);
            // Configured proxy credentials also enable authentication handling.
            if (httpclient.getState().getProxyCredentials(new AuthScope(getProxyHost(), getProxyPort())) != null) {
                addedCredentials = true;
            }
            method.setDoAuthentication(addedCredentials);
    
            // set hardMax on bytes (if set by operator)
            long hardMax = getMaxLengthBytes();
            // set overall timeout (if set by operator)
            long timeoutMs = 1000 * getTimeoutSeconds();
            // Get max fetch rate (bytes/ms). It comes in in KB/sec
            long maxRateKBps = getMaxFetchKBSec();
            rec.getRecordedInput().setLimits(hardMax, timeoutMs, maxRateKBps);
    
            try {
                // Execute the HTTP request.
                this.httpclient.executeMethod(customConfigOrNull, method);
            } catch (RecorderTooMuchHeaderException ex) {
                // when too much header material, abort like other truncations
                doAbort(curi, method, HEADER_TRUNC);
            } catch (IOException e) {
                failedExecuteCleanup(method, curi, e);
                return;
            } catch (ArrayIndexOutOfBoundsException e) {
                // For weird windows-only ArrayIndex exceptions in native
                // code... see
                // http://forum.java.sun.com/thread.jsp?forum=11&thread=378356
                // treating as if it were an IOException
                failedExecuteCleanup(method, curi, e);
                return;
            }
    
            // set softMax on bytes to get (if implied by content-length)
            long softMax = method.getResponseContentLength();
    
            try {
                if (!method.isAborted()) {
                    // Force read-to-end, so that any socket hangs occur here,
                    // not in later modules.
                    rec.getRecordedInput().readFullyOrUntil(softMax);
                }
            } catch (RecorderTimeoutException ex) {
                doAbort(curi, method, TIMER_TRUNC);
            } catch (RecorderLengthExceededException ex) {
                doAbort(curi, method, LENGTH_TRUNC);
            } catch (IOException e) {
                cleanup(curi, e, "readFully", S_CONNECT_LOST);
                return;
            } catch (ArrayIndexOutOfBoundsException e) {
                // For weird windows-only ArrayIndex exceptions from native code
                // see http://forum.java.sun.com/thread.jsp?forum=11&thread=378356
                // treating as if it were an IOException
                cleanup(curi, e, "readFully", S_CONNECT_LOST);
                return;
            } finally {
                // ensure recording has stopped
                rec.closeRecorders();
                if (!method.isAborted()) {
                    method.releaseConnection();
                }
                // Note completion time
                curi.setFetchCompletedTime(System.currentTimeMillis());
                // Set the response charset into the HttpRecord if available.
                setCharacterEncoding(curi, rec, method);
                setSizes(curi, rec);
                setOtherCodings(curi, rec, method); 
            }
    
            if (digestContent) {
                // Record the computed content digest on curi.
                curi.setContentDigest(algorithm, 
                    rec.getRecordedInput().getDigestValue());
            }
            if (logger.isLoggable(Level.FINE)) {
                logger.fine(((curi.getFetchType() == HTTP_POST) ? "POST" : "GET")
                        + " " + curi.getUURI().toString() + " "
                        + method.getStatusCode() + " "
                        + rec.getRecordedInput().getSize() + " "
                        + curi.getContentType());
            }
    
            if (isSuccess(curi) && addedCredentials) {
                // Promote the credentials from the CrawlURI to the CrawlServer
                // so they are available for all subsequent CrawlURIs on this
                // server.
                promoteCredentials(curi);
                if (logger.isLoggable(Level.FINE)) {
                    // Print out the cookie. Might help with the debugging.
                    Header setCookie = method.getResponseHeader("set-cookie");
                    if (setCookie != null) {
                        logger.fine(setCookie.toString().trim());
                    }
                }
            } else if (method.getStatusCode() == HttpStatus.SC_UNAUTHORIZED) {
                // 401 is not 'success'.
                // Likely an HTTP Basic/Digest (RFC2617) challenge; handle401
                // looks for a matching realm credential.
                handle401(method, curi);
            }
    
            // Second-chance close: the recorded input stream should already
            // have been closed when the method was released.
            if (rec.getRecordedInput().isOpen()) {
                logger.severe(curi.toString() + " RIS still open. Should have"
                        + " been closed by method release: "
                        + Thread.currentThread().getName());
                try {
                    rec.getRecordedInput().close();
                } catch (IOException e) {
                    logger.log(Level.SEVERE, "second-chance RIS close failed", e);
                }
            }
        }

    其中boolean populateCredentials(CrawlURI curi, HttpMethod method)方法用于添加认证凭证(Credential)对象

    /**
     * Populates the HTTP method with credentials for {@code curi}.
     *
     * <p>First, every credential on the target CrawlServer that reports
     * {@code isEveryTime()} (for example RFC2617 credentials) is populated.
     * Their populate results do not affect the return value. Then every
     * credential attached to the CrawlURI itself is populated. According to
     * the original comments, these are attached by handle401 or by the
     * precondition enforcer.
     *
     * @param curi   the URI being fetched
     * @param method the method about to be executed
     * @return true if at least one CrawlURI-attached credential reported a
     *         successful populate; false otherwise, including when no server
     *         key can be derived from the URI
     */
    private boolean populateCredentials(CrawlURI curi, HttpMethod method) {
        final String serverKey;
        try {
            serverKey = CrawlServer.getServerKey(curi.getUURI());
        } catch (URIException e) {
            return false;
        }

        CrawlServer server = serverCache.getServerFor(serverKey);
        if (server.hasCredentials()) {
            for (Credential serverCredential : server.getCredentials()) {
                if (!serverCredential.isEveryTime()) {
                    continue;
                }
                serverCredential.populate(curi, this.httpclient, method);
            }
        }

        // Non-short-circuit OR: every credential is populated even after
        // one has already succeeded.
        boolean populatedAny = false;
        for (Credential curiCredential : curi.getCredentials()) {
            populatedAny |= curiCredential.populate(curi, this.httpclient, method);
        }
        return populatedAny;
    }

    void promoteCredentials(final CrawlURI curi)方法将CrawlURI上的凭证(Credential)移交给其所属域对应的服务器(CrawlServer)

    /**
     * Moves every credential attached to {@code curi} onto the CrawlServer
     * named by that credential's domain.
     *
     * <p>Each credential is removed from the CrawlURI. It is added to the
     * server whose name equals the credential's domain, which may differ
     * from the server hosting {@code curi} (e.g. another subdomain).
     * Credentials without a domain, or whose server cannot be found, are
     * simply dropped.
     *
     * @param curi the URI whose credentials are promoted
     */
    private void promoteCredentials(final CrawlURI curi) {
        Iterator<Credential> it = curi.getCredentials().iterator();
        while (it.hasNext()) {
            Credential credential = it.next();
            it.remove();

            String domain = credential.getDomain();
            if (domain == null) {
                continue;
            }
            CrawlServer target = serverCache.getServerFor(domain);
            if (target != null) {
                target.addCredential(credential);
            }
        }
    }

    void handle401(final HttpMethod method, final CrawlURI curi)方法为处理basic/digest认证

    /**
     * Reacts to an HTTP 401 by looking for an RFC2617 (Basic/Digest)
     * credential for the challenged realm.
     *
     * <p>If {@code curi} already carried an RFC2617 credential for that realm,
     * the credential was rejected. All such credentials are detached and a
     * warning is logged, so the URI dies a natural 401 death. Otherwise the
     * credential store is searched for a matching credential for this
     * server. If one is found, it is attached to {@code curi} so that the URI
     * can be retried with it.
     *
     * @param method the executed method that received the 401
     * @param curi   the URI that received the 401
     */
    protected void handle401(final HttpMethod method, final CrawlURI curi) {
        AuthScheme scheme = getAuthScheme(method, curi);
        if (scheme == null) {
            return;
        }
        String realm = scheme.getRealm();

        // Was a credential for this realm already tried on this URI?
        Set<Credential> attached = getCredentials(curi,
                HttpAuthenticationCredential.class);
        HttpAuthenticationCredential alreadyTried =
                HttpAuthenticationCredential.getByRealm(attached, realm, curi);
        if (alreadyTried != null) {
            alreadyTried.detachAll(curi);
            logger.warning("Auth failed (401) though supplied realm " + realm
                    + " to " + curi.toString());
            return;
        }

        // Look in the credential store, filtered by type and server name.
        CrawlServer server = serverCache.getServerFor(getServerKey(curi));
        Set<Credential> candidates = getCredentialStore().subset(curi,
                HttpAuthenticationCredential.class, server.getName());
        if (candidates == null || candidates.isEmpty()) {
            logger.fine("No rfc2617 credentials for " + curi);
            return;
        }

        HttpAuthenticationCredential match =
                HttpAuthenticationCredential.getByRealm(candidates, realm, curi);
        if (match == null) {
            logger.fine("No rfc2617 credentials for realm " + realm
                    + " in " + curi);
            return;
        }
        match.attach(curi);
        logger.fine("Found credential for realm " + realm
                + " in store for " + curi.toString());
    }

    下面的方法为Recorder设置字符编码:使用响应头中的字符集;若无法识别,则改用默认字符集,并在CrawlURI curi对象上添加注解

    /**
     * Sets the recorder's charset from the response's declared charset.
     *
     * <p>If that name cannot be turned into a Charset, an
     * {@code unsatisfiableCharsetInHeader:} annotation is added to
     * {@code curi} and the default charset is used instead.
     *
     * @param curi   URI that receives the annotation on failure
     * @param rec    recorder whose charset is set
     * @param method executed method; cast to HttpMethodBase to read the charset
     */
    private void setCharacterEncoding(CrawlURI curi, final Recorder rec,
            final HttpMethod method) {
        HttpMethodBase base = (HttpMethodBase) method;
        String headerCharset = base.getResponseCharSet();
        try {
            Charset parsed = Charset.forName(headerCharset);
            rec.setCharset(parsed);
        } catch (IllegalArgumentException badCharset) {
            String annotation = "unsatisfiableCharsetInHeader:"
                    + StringUtils.stripToEmpty(headerCharset);
            curi.getAnnotations().add(annotation);
            rec.setCharset(getDefaultCharset());
        }
    }

    这种方式获取到的网页编码不一定完全准确,在必要的时候需要我们改写

    ---------------------------------------------------------------------------

    本系列Heritrix 3.1.0 源码解析系本人原创

    转载请注明出处 博客园 刺猬的温驯

    本文链接 http://www.cnblogs.com/chenying99/archive/2013/05/01/3052930.html

  • 相关阅读:
    服务器最大TCP连接数及调优汇总
    提升linux下TCP服务器并发连接数(limit)
    Linux systemd limits
    CENTOS/RHEL 7 系统中设置SYSTEMD SERVICE的ULIMIT资源限制
    查看CPU/CACHE的拓扑结构
    nmon 加权平均法
    关于游戏的开发流程
    关于Unity中的旋涡特效的制作(捕鱼达人3技术)(专题八)
    关于Unity中鼠标选取物体的解决方案
    关于Unity中的声音管理模块(专题七)
  • 原文地址:https://www.cnblogs.com/chenying99/p/3052930.html
Copyright © 2011-2022 走看看