Heritrix 3.1.0 Source Code Analysis (Part 35)

    This article first analyzes the CandidatesProcessor, which we may call the candidates processor. Its job is to filter the outlinks extracted from a page; links that pass the filter are scheduled into the BdbWorkQueue work queues of the Frontier. The CandidatesProcessor filters each CrawlURI candidate by running it through the CandidateChain processor chain, which consists of two processors: org.archive.crawler.prefetch.CandidateScoper and org.archive.crawler.prefetch.FrontierPreparer.
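
    In the stock crawler-beans.cxml profile these two processors are wired into the candidate chain roughly as follows (a sketch based on the default profile; the bean ids in your own configuration may differ):

        <bean id="candidateScoper" class="org.archive.crawler.prefetch.CandidateScoper">
        </bean>
        <bean id="preparer" class="org.archive.crawler.prefetch.FrontierPreparer">
        </bean>
        <bean id="candidateProcessors" class="org.archive.modules.CandidateChain">
          <property name="processors">
            <list>
              <!-- apply scoping rules to each candidate URI... -->
              <ref bean="candidateScoper"/>
              <!-- ...then prepare the ACCEPTed ones for the frontier -->
              <ref bean="preparer"/>
            </list>
          </property>
        </bean>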

    The innerProcess method of the CandidatesProcessor is shown below:

     /* (non-Javadoc)
         * @see org.archive.modules.Processor#innerProcess(org.archive.modules.CrawlURI)
         */
        @Override
        protected void innerProcess(final CrawlURI curi) throws InterruptedException {
            // Handle any prerequisites when S_DEFERRED for prereqs
            //handle the prerequisite URI first
            if (curi.hasPrerequisiteUri() && curi.getFetchStatus() == S_DEFERRED) {
                CrawlURI prereq = curi.getPrerequisiteUri();
                prereq.setFullVia(curi); 
                sheetOverlaysManager.applyOverlaysTo(prereq);
                try {
                    KeyedProperties.clearOverridesFrom(curi); 
                    KeyedProperties.loadOverridesFrom(prereq);
                //run the prerequisite through the CandidateChain
                    getCandidateChain().process(prereq, null);
                    
                    if(prereq.getFetchStatus()>=0) {
                        //System.out.println("prereq:"+prereq.toString());
                        frontier.schedule(prereq);
                    } else {
                        curi.setFetchStatus(S_PREREQUISITE_UNSCHEDULABLE_FAILURE);
                    }
                } finally {
                    KeyedProperties.clearOverridesFrom(prereq); 
                    KeyedProperties.loadOverridesFrom(curi);
                }
                return;
            }
    
            // Don't consider candidate links of error pages
            //status < 200 usually means a prerequisite; >= 400 an error page
            if (curi.getFetchStatus() < 200 || curi.getFetchStatus() >= 400) {
                curi.getOutLinks().clear();
                return;
            }
            //iterate over the outlinks
            for (Link wref: curi.getOutLinks()) {
                
                CrawlURI candidate;
                try {
                    //build a CrawlURI object from the outlink
                    candidate = curi.createCrawlURI(curi.getBaseURI(),wref);
                    // at least for duration of candidatechain, offer
                    // access to full CrawlURI of via
                    candidate.setFullVia(curi); 
                } catch (URIException e) {
                    loggerModule.logUriError(e, curi.getUURI(), 
                            wref.getDestination().toString());
                    continue;
                }
    
                sheetOverlaysManager.applyOverlaysTo(candidate);
                try {
                    KeyedProperties.clearOverridesFrom(curi); 
                    KeyedProperties.loadOverridesFrom(candidate);
                    //a candidate redirected from a seed CrawlURI curi is itself marked as a seed
                    if(getSeedsRedirectNewSeeds() && curi.isSeed() 
                            && wref.getHopType() == Hop.REFER
                            && candidate.getHopCount() < SEEDS_REDIRECT_NEW_SEEDS_MAX_HOPS) {
                        candidate.setSeed(true);                     
                    }
                    getCandidateChain().process(candidate, null); 
                    if(candidate.getFetchStatus()>=0) {
                        //seed
                        if(checkForSeedPromotion(candidate)) {
                            /*
                             * We want to guarantee crawling of seed version of
                             * CrawlURI even if same url has already been enqueued,
                             * see https://webarchive.jira.com/browse/HER-1891
                             */
                            candidate.setForceFetch(true);
                            //System.out.println("candidate addSeed:"+candidate.toString());
                            getSeeds().addSeed(candidate);
                        } else {
                            //System.out.println("candidate:"+candidate.toString());
                            frontier.schedule(candidate);
                        }
                        //record the accepted candidate
                        curi.getOutCandidates().add(candidate);
                    }
                    
                } finally {
                    KeyedProperties.clearOverridesFrom(candidate); 
                    KeyedProperties.loadOverridesFrom(curi);
                }
            }
            curi.getOutLinks().clear();
        }

    I have added comments to the code above. The method first checks whether the current CrawlURI curi has a prerequisite. If it does, the prerequisite is run through the CandidateChain; if it passes the chain (prereq.getFetchStatus() >= 0), it is scheduled into the BdbWorkQueue work queue of the Frontier, otherwise curi is marked with S_PREREQUISITE_UNSCHEDULABLE_FAILURE.

    The second half iterates over the outlinks of the current CrawlURI curi, builds a CrawlURI candidate from curi and each outlink, and runs it through the same CandidateChain. A candidate that passes the filter is likewise scheduled into the Frontier's BdbWorkQueue (or, if promoted to a seed, re-added through the seeds module); finally the outlink set is cleared.

    How is the CrawlURI candidate object created? By calling the CrawlURI createCrawlURI(UURI baseUURI, Link link) method of the current CrawlURI curi:

     /**
         * Utility method for creation of CandidateURIs found extracting
         * links from this CrawlURI.
         * @param baseUURI BaseUURI for <code>link</code>.
         * @param link Link to wrap CandidateURI in.
         * @return New candidateURI wrapper around <code>link</code>.
         * @throws URIException
         */
        public CrawlURI createCrawlURI(UURI baseUURI, Link link)
        throws URIException {
            UURI u = (link.getDestination() instanceof UURI)?
                (UURI)link.getDestination():
                UURIFactory.getInstance(baseUURI,
                    link.getDestination().toString());
            CrawlURI newCaURI = new CrawlURI(u, 
                    extendHopsPath(getPathFromSeed(),link.getHopType().getHopChar()),
                    getUURI(), link.getContext());
            newCaURI.inheritFrom(this);
            return newCaURI;
        }

    The String pathFromSeed property of the new CrawlURI candidate is built from the pathFromSeed of its parent CrawlURI curi and the Hop type of the current link (a small demo follows the method below):

    /**
         * Extend a 'hopsPath' (pathFromSeed string of single-character hop-type symbols),
         * keeping the number of displayed hop-types under MAX_HOPS_DISPLAYED. For longer
         * hops paths, precede the string with a integer and '+', then the displayed 
         * hops. 
         * 
         * @param pathFromSeed
         * @param hopChar
         * @return
         */
        public static String extendHopsPath(String pathFromSeed, char hopChar) {
            if(pathFromSeed.length()<MAX_HOPS_DISPLAYED) {
                return pathFromSeed + hopChar;
            }
            int plusIndex = pathFromSeed.indexOf('+');
            int prevOverflow = (plusIndex<0) ? 0 : Integer.parseInt(pathFromSeed.substring(0,plusIndex));
            return (prevOverflow+1)+"+"+pathFromSeed.substring(plusIndex+2)+hopChar; 
        }
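
    To make the overflow behavior concrete, here is a small throwaway demo (HopsPathDemo is a hypothetical class of mine; it assumes MAX_HOPS_DISPLAYED is 50, which I believe is its value in CrawlURI):

        import org.archive.modules.CrawlURI;

        public class HopsPathDemo {
            public static void main(String[] args) {
                String path = "";
                for (int i = 0; i < 53; i++) {
                    //extendHopsPath is the public static method quoted above
                    path = CrawlURI.extendHopsPath(path, 'L');
                }
                //after 53 hops the three oldest hop characters are folded into
                //a "3+" prefix and only the most recent 50 are displayed
                System.out.println(path); //prints 3+LLLL...L (50 'L's)
            }
        }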

    Back in innerProcess, the boolean checkForSeedPromotion(CrawlURI curi) method checks whether the CrawlURI deserves seed treatment (i.e. it was reached by a redirect from a seed URL):

    /**
         * Check if the URI needs special 'discovered seed' treatment.
         * 
         * @param curi
         */
        protected boolean checkForSeedPromotion(CrawlURI curi) {
            if (curi.isSeed() && curi.getVia() != null
                    && curi.flattenVia().length() > 0) {
                // The only way a seed can have a non-empty via is if it is the
                // result of a seed redirect. Returning true here schedules it 
                // via the seeds module, so it may affect scope and be logged 
                // as 'discovered' seed.
                //
                // This is a feature. This is handling for case where a seed
                // gets immediately redirected to another page. What we're doing is
                // treating the immediate redirect target as a seed.
                
                // And it needs rapid scheduling.
                //raise the scheduling priority
                if (curi.getSchedulingDirective() == SchedulingConstants.NORMAL) {
                    curi.setSchedulingDirective(SchedulingConstants.MEDIUM);
                }
                return true; 
            }
            return false;
        }
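
    For example, if the seed http://example.com/ answers with a 301 redirect, the redirect target is marked as a seed in innerProcess above and carries the original seed as its via, so checkForSeedPromotion returns true: the URI is bumped to MEDIUM scheduling priority and added through the seeds module as a 'discovered' seed.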

    The first processor in the CandidateChain is CandidateScoper, which extends the Scoper class. Whether the current CrawlURI caUri is in scope is decided by the result of calling the DecideResult decisionFor(CrawlURI uri) method of its DecideRule scope member; the code is fairly simple. (I analyzed the DecideRule class in an earlier article; it iterates over a collection of DecideRules and invokes each member in turn, so I will not repeat that here, but see the sketch after the method below.)

    /**
         * Schedule the given {@link CrawlURI CrawlURI} with the Frontier.
         * @param caUri The CrawlURI to be scheduled.
         * @return true if CrawlURI was accepted by crawl scope, false
         * otherwise.
         */
        protected boolean isInScope(CrawlURI caUri) {
            boolean result = false;
            //System.out.println(this.getClass().getName()+":"+"scope name:"+scope.getClass().getName());
            DecideResult dr = scope.decisionFor(caUri);
            if (dr == DecideResult.ACCEPT) {
                result = true;
                if (fileLogger != null) {
                    fileLogger.info("ACCEPT " + caUri); 
                }
            } else {
                outOfScope(caUri);
            }
            return result;
        }
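
    As a quick refresher on that earlier analysis, the evaluation performed by a DecideRuleSequence-style scope boils down to something like the following (a simplified sketch from memory, not the verbatim Heritrix source; sequenceDecisionFor is a hypothetical helper name):

        //each rule may ACCEPT, REJECT, or abstain (NONE);
        //the last non-NONE decision wins
        protected DecideResult sequenceDecisionFor(CrawlURI uri, List<DecideRule> rules) {
            DecideResult result = DecideResult.NONE;
            for (DecideRule rule : rules) {
                DecideResult d = rule.decisionFor(uri);
                if (d != DecideResult.NONE) {
                    result = d;
                }
            }
            return result;
        }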

    The second processor in the CandidateChain is FrontierPreparer; its job is to apply the configured policies to the current CrawlURI uri before it enters the Frontier (this processor was analyzed in an earlier article, so I will not repeat it here):

    /**
         * Apply all configured policies to CrawlURI
         * 
         * @param curi CrawlURI
         */
        public void prepare(CrawlURI curi) {
            
            // set schedulingDirective
            curi.setSchedulingDirective(getSchedulingDirective(curi));
                
            // set canonicalized version
            curi.setCanonicalString(canonicalize(curi));
            
            // set queue key
            curi.setClassKey(getClassKey(curi));
            
            // set cost
            curi.setHolderCost(getCost(curi));
            
            // set URI precedence
            getUriPrecedencePolicy().uriScheduled(curi);
    
    
        }
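
    To give a feel for these steps: under the stock canonicalization rules a URI such as http://www.Example.com/page?jsessionid=123 is typically reduced to http://example.com/page (lowercasing plus 'www' and session-id stripping are among the default rules, though your profile may differ), and the class key is derived from the URI's authority by the configured queue-assignment policy, which is what routes all URIs of the same host into the same BdbWorkQueue.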

    The next processor to analyze is DispositionProcessor, which we may call the post-processor. Its main functions are updating per-server information and setting the politeness delay for the queue:

     @Override
        protected void innerProcess(CrawlURI puri) {
            CrawlURI curi = (CrawlURI)puri;
            
            // Tally per-server, per-host, per-frontier-class running totals
            CrawlServer server = serverCache.getServerFor(curi.getUURI());
    
            String scheme = curi.getUURI().getScheme().toLowerCase();
        //NOTE: '&&' binds tighter than '||', so the server != null test only
        //guards the "https" case; for "http" the body runs regardless
        if (scheme.equals("http") || scheme.equals("https") &&
                server != null) {
                // Update connection problems counter
                if(curi.getFetchStatus() == S_CONNECT_FAILED || curi.getFetchStatus() == S_CONNECT_LOST ) {
                    server.incrementConsecutiveConnectionErrors();
                } else if (curi.getFetchStatus() > 0){
                    server.resetConsecutiveConnectionErrors();
                }
    
                // Update robots info
                try {
                    if ("/robots.txt".equals(curi.getUURI().getPath()) && curi.getFetchStatus() != S_DEFERRED) {
                        // shortcut retries  w/ DEEMED when ignore-all
                        if (metadata.getRobotsPolicy() instanceof IgnoreRobotsPolicy) {
                            if(curi.getFetchStatus() < 0 && curi.getFetchStatus()!=S_DEFERRED) {
                                // prevent the rest of the usual retries
                                curi.setFetchStatus(S_DEEMED_NOT_FOUND);
                            }
                        }
                        
                        // Update server with robots info
                        // NOTE: in some cases the curi's status can be changed here
                        server.updateRobots(curi);
                    }
                }
                catch (URIException e) {
                    logger.severe("Failed get path on " + curi.getUURI());
                }
            }
            
            // set politeness delay
            curi.setPolitenessDelay(politenessDelayFor(curi));
            
            // consider operator-set force-retire
            if (getForceRetire()) {
                curi.setForceRetire(true);
            }
            
            // TODO: set other disposition decisions
            // success, failure, retry(retry-delay)
        }

    The method that computes the politeness delay is shown below:

    /**
         * Update any scheduling structures with the new information in this
         * CrawlURI. Chiefly means make necessary arrangements for no other URIs at
         * the same host to be visited within the appropriate politeness window.
         * 
         * @param curi
         *            The CrawlURI
         * @return millisecond politeness delay
         */
        protected long politenessDelayFor(CrawlURI curi) {
            long durationToWait = 0;
            Map<String,Object> cdata = curi.getData();
            if (cdata.containsKey(A_FETCH_BEGAN_TIME)
                    && cdata.containsKey(A_FETCH_COMPLETED_TIME)) {
    
                long completeTime = curi.getFetchCompletedTime();
                long durationTaken = (completeTime - curi.getFetchBeginTime());
                durationToWait = (long)(getDelayFactor() * durationTaken);
    
                long minDelay = getMinDelayMs();
                if (minDelay > durationToWait) {
                    // wait at least the minimum
                    durationToWait = minDelay;
                }
    
                long maxDelay = getMaxDelayMs();
                if (durationToWait > maxDelay) {
                    // wait no more than the maximum
                    durationToWait = maxDelay;
                }
                
                long respectThreshold = getRespectCrawlDelayUpToSeconds() * 1000;
                if (durationToWait<respectThreshold) {
                    // may need to extend wait
                    CrawlServer s = getServerCache().getServerFor(curi.getUURI());
                    String ua = curi.getUserAgent();
                    if (ua == null) {
                        ua = metadata.getUserAgent();
                    }
                    Robotstxt rep = s.getRobotstxt();
                    if (rep != null) {
                        long crawlDelay = (long)(1000 * rep.getDirectivesFor(ua).getCrawlDelay());
                        crawlDelay = 
                            (crawlDelay > respectThreshold) 
                                ? respectThreshold 
                                : crawlDelay;
                        if (crawlDelay > durationToWait) {
                            // wait at least the directive crawl-delay
                            durationToWait = crawlDelay;
                        }
                    }
                }
                
                long now = System.currentTimeMillis();
                int maxBandwidthKB = getMaxPerHostBandwidthUsageKbSec();
                if (maxBandwidthKB > 0) {
                    // Enforce bandwidth limit
                    ServerCache cache = this.getServerCache();
                    CrawlHost host = cache.getHostFor(curi.getUURI());
                    long minDurationToWait = host.getEarliestNextURIEmitTime()
                            - now;
                    float maxBandwidth = maxBandwidthKB * 1.024F; // kilo factor
                    long processedBytes = curi.getContentSize();
                    host
                            .setEarliestNextURIEmitTime((long)(processedBytes / maxBandwidth)
                                    + now);
    
                    if (minDurationToWait > durationToWait) {
                        durationToWait = minDurationToWait;
                    }
                }
            }
            return durationToWait;
        }
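
    As a worked example, assume the usual defaults of delayFactor 5.0, minDelayMs 3000 and maxDelayMs 30000 (check your profile; these are the values I recall): a fetch that took 400 ms yields 5.0 × 400 = 2000 ms, which is raised to the 3000 ms minimum; a fetch that took 10 s yields 50 s, which is clamped to the 30 s maximum; and if robots.txt declares Crawl-delay: 60 for our user agent, the wait is stretched to 60000 ms, since that is still below the respectCrawlDelayUpToSeconds threshold (300 s by default, as far as I recall).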

    If we need to change the queue delay, we can set the corresponding parameters on the DispositionProcessor bean in the crawler-beans.cxml configuration file.
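
    For instance (a sketch only; the property names follow the getters in the code above, and the values shown are the usual defaults):

        <bean id="disposition" class="org.archive.crawler.postprocessor.DispositionProcessor">
          <property name="delayFactor" value="5.0"/>
          <property name="minDelayMs" value="3000"/>
          <property name="maxDelayMs" value="30000"/>
          <property name="respectCrawlDelayUpToSeconds" value="300"/>
          <property name="maxPerHostBandwidthUsageKbSec" value="0"/>
        </bean>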

    ---------------------------------------------------------------------------

    This Heritrix 3.1.0 source-code analysis series is my own original work.

    When reposting, please credit the source: 博客园 刺猬的温驯.

    This article: http://www.cnblogs.com/chenying99/archive/2013/05/07/3065205.html
