zoukankan      html  css  js  c++  java
  • Heritrix 3.1.0 源码解析(十六)

     我们接下来分析与BdbFrontier对象的CrawlURI next()方法相关的方法

    /**
         * Return the next CrawlURI eligible to be processed (and presumably
         * visited/fetched) by a worker thread.
         *
         * Relies on the readyClassQueues having been loaded with
         * any work queues that are eligible to provide a URI.
         *
         * Outline: first wakes snoozed queues and re-submits due future URIs.
         * Then it repeatedly takes a key from readyClassQueues, activating an
         * inactive queue when none is ready. It skips queues that are missing,
         * empty, already in process, or over their session/total budget, and
         * peeks the head URI of the chosen queue. A URI whose class key has
         * changed since it was queued is moved to its new queue, and the search
         * continues. If nothing is found and there are no eligible inactive
         * queues, it sleeps for one second before returning null.
         *
         * @return next CrawlURI eligible to be processed, or null if none available
         *
         * @see org.archive.crawler.framework.Frontier#next()
         */
        protected CrawlURI findEligibleURI() {
            // wake any snoozed queues
            wakeQueues();
            // consider rescheduled URIs
            checkFutures();

            // find a non-empty ready queue, if any
            // TODO: refactor to untangle these loops, early-exits, etc!
            WorkQueue readyQ = null;
            findauri: while(true) {
                findaqueue: do {
                    String key = readyClassQueues.poll();
                    if(key==null) {
                        // no ready queues; try to activate one, but only when an
                        // inactive queue is waiting whose precedence is better
                        // (numerically lower) than the precedence floor
                        if(!getInactiveQueuesByPrecedence().isEmpty()
                            && highestPrecedenceWaiting < getPrecedenceFloor()) {
                            activateInactiveQueue();
                            continue findaqueue;
                        } else {
                            // nothing ready or readyable
                            break findaqueue;
                        }
                    }
                    readyQ = getQueueFor(key);
                    if(readyQ==null) {
                        // readyQ key wasn't in all queues: unexpected
                        logger.severe("Key "+ key +
                            " in readyClassQueues but not allQueues");
                        break findaqueue;
                    }
                    if(readyQ.getCount()==0) {
                        // readyQ is empty and ready: it's exhausted
                        readyQ.noteExhausted();
                        readyQ.makeDirty();
                        readyQ = null;
                        continue;
                    }
                    if(!inProcessQueues.add(readyQ)) {
                        // double activation; discard this and move on
                        // (this guard allows other enqueuings to ready or
                        // the various inactive-by-precedence queues to
                        // sometimes redundantly enqueue a queue key)
                        readyQ = null;
                        continue;
                    }
                    // queue has gone 'in process'
                    readyQ.considerActive();
                    readyQ.setWakeTime(0); // clear obsolete wake time, if any

                    // refresh budgets, then bounce queues that exceed them
                    readyQ.setSessionBudget(getBalanceReplenishAmount());
                    readyQ.setTotalBudget(getQueueTotalBudget());
                    if (readyQ.isOverSessionBudget()) {
                        deactivateQueue(readyQ);
                        readyQ.makeDirty();
                        readyQ = null;
                        continue;
                    }
                    if (readyQ.isOverTotalBudget()) {
                        retireQueue(readyQ);
                        readyQ.makeDirty();
                        readyQ = null;
                        continue;
                    }
                } while (readyQ == null);

                if (readyQ == null) {
                    // no queues left in ready or readiable
                    break findauri;
                }

                returnauri: while(true) { // loop left by explicit return or break on empty
                    CrawlURI curi = null;
                    curi = readyQ.peek(this);
                    if(curi == null) {
                        // should not reach
                        logger.severe("No CrawlURI from ready non-empty queue "
                                + readyQ.classKey + "\n"
                                + readyQ.shortReportLegend() + "\n"
                                + readyQ.shortReportLine() + "\n");
                        break returnauri;
                    }

                    // from queues, override names persist but not map source
                    curi.setOverlayMapsSource(sheetOverlaysManager);
                    // TODO: consider optimizations avoiding this recalc of
                    // overrides when not necessary
                    sheetOverlaysManager.applyOverlaysTo(curi);
                    // check if curi belongs in different queue
                    String currentQueueKey;
                    try {
                        KeyedProperties.loadOverridesFrom(curi);
                        currentQueueKey = getClassKey(curi);
                    } finally {
                        KeyedProperties.clearOverridesFrom(curi);
                    }
                    if (currentQueueKey.equals(curi.getClassKey())) {
                        // curi was in right queue, emit
                        noteAboutToEmit(curi, readyQ);
                        return curi;
                    }
                    // URI's assigned queue has changed since it
                    // was queued (eg because its IP has become
                    // known). Requeue to new queue.
                    // TODO: consider synchronization on readyQ
                    readyQ.dequeue(this,curi);
                    doJournalRelocated(curi);
                    curi.setClassKey(currentQueueKey);
                    decrementQueuedCount(1);
                    curi.setHolderKey(null);
                    sendToQueue(curi);
                    if(readyQ.getCount()==0) {
                        // readyQ is empty and ready: it's exhausted
                        // release held status, allowing any subsequent
                        // enqueues to again put queue in ready
                        // FIXME: tiny window here where queue could
                        // receive new URI, be readied, fail not-in-process?
                        inProcessQueues.remove(readyQ);
                        readyQ.noteExhausted();
                        readyQ.makeDirty();
                        readyQ = null;
                        continue findauri;
                    }
                }
            }

            if(inProcessQueues.size()==0) {
                // Nothing was ready or in progress or imminent to wake; ensure
                // any piled-up pending-scheduled URIs are considered
                uriUniqFilter.requestFlush();
            }

            // if truly nothing ready, wait a moment before returning null
            // so that loop in surrounding next() has a chance of getting something
            // next time
            if(getTotalEligibleInactiveQueues()==0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // restore the interrupt status so callers can observe the
                    // interruption instead of it being silently lost here
                    Thread.currentThread().interrupt();
                }
            }

            // nothing eligible
            return null;
        }

    这个方法有点长,我们先看一下它最先调用的void wakeQueues()方法

         /** 唤醒snoozed queue中到时的队列
         * Wake any queues sitting in the snoozed queue whose time has come.
         */
        protected void wakeQueues() {
            DelayedWorkQueue waked; 
            while((waked = snoozedClassQueues.poll())!=null) {
                WorkQueue queue = waked.getWorkQueue(this);
                queue.setWakeTime(0);
                queue.makeDirty();
                reenqueueQueue(queue);
            }
            // also consider overflow (usually empty)
            if(!snoozedOverflow.isEmpty()) {
                synchronized(snoozedOverflow) {
                    Iterator<DelayedWorkQueue> iter = 
                        snoozedOverflow.headMap(System.currentTimeMillis()).values().iterator();
                    while(iter.hasNext()) {
                        DelayedWorkQueue dq = iter.next();
                        iter.remove();
                        snoozedOverflowCount.decrementAndGet();
                        WorkQueue queue = dq.getWorkQueue(this);
                        queue.setWakeTime(0);
                        queue.makeDirty();
                        reenqueueQueue(queue);
                    }
                }
            }
        }

    snoozedClassQueues.poll()方法从休眠队列中取出休眠时间已到期的元素;对每个取出的队列,先将其唤醒时间(wake time)重置为0并标记为脏(makeDirty),然后调用reenqueueQueue(queue)重新决定WorkQueue wq的归属(非活动状态队列或已经准备好被爬取的队列)

    /**
         * Re-evaluates the given queue's precedence and then places it either
         * on readyClassQueues or among the inactive queues.
         *
         * Lower precedence numbers rank higher. The queue is readied only when
         * its precedence is no worse than highestPrecedenceWaiting and is
         * strictly better than the precedence floor. Otherwise it is
         * deactivated.
         *
         * @param wq the work queue to place
         */
        protected void reenqueueQueue(WorkQueue wq) {
            //TODO:SPRINGY set overrides by queue?
            getQueuePrecedencePolicy().queueReevaluate(wq);
            if (logger.isLoggable(Level.FINE)) {
                logger.fine("queue reenqueued: " + wq.getClassKey());
            }
            boolean canBeReady = wq.getPrecedence() <= highestPrecedenceWaiting
                    && wq.getPrecedence() < getPrecedenceFloor();
            if (canBeReady) {
                readyQueue(wq);
            } else {
                // outranked by a waiting inactive queue, or not above the floor
                deactivateQueue(wq);
            }
        }

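     首先通过队列优先级策略重新评估队列的优先级(precedence,数值越小优先级越高)。然后将其与highestPrecedenceWaiting以及优先级下限(precedence floor)进行比较:如果有优先级更高的非活动队列正在等待,或者该队列的优先级数值不低于下限,就将WorkQueue wq归入非活动状态队列;否则归入已经准备好被爬取的队列readyClassQueues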

    deactivateQueue(wq)方法我们上文已经分析过(将WorkQueue wq加入非活动状态队列),这里看一下readyQueue(wq)方法

    /**
         * Puts the given queue's class key on readyClassQueues.
         *
         * @param wq the work queue to mark as ready
         * @throws RuntimeException wrapping the InterruptedException if the
         *         thread is interrupted while waiting to put the key. The
         *         thread's interrupt status is restored before throwing.
         */
        protected void readyQueue(WorkQueue wq) {
            try {
                readyClassQueues.put(wq.getClassKey());
                if (logger.isLoggable(Level.FINE)) {
                    logger.log(Level.FINE,
                            "queue readied: " + wq.getClassKey());
                }
            } catch (InterruptedException e) {
                // keep the interrupt visible to code further up the stack
                Thread.currentThread().interrupt();
                // report through the class logger rather than stderr
                logger.log(Level.SEVERE, "unable to ready queue " + wq, e);
                // propagate interrupt up
                throw new RuntimeException(e);
            }
        }

    该方法是将WorkQueue wq加入已经准备好被爬取的队列readyClassQueues

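    重新回到void wakeQueues()方法,它的后半部分处理snoozedOverflow容器。snoozedOverflow是一个以唤醒时间为键的有序Map,存放着溢出的休眠队列(DelayedWorkQueue)。headMap(System.currentTimeMillis())取出唤醒时间早于当前时间的条目,逐个移除并递减snoozedOverflowCount,然后同样重置唤醒时间,再通过reenqueueQueue()决定WorkQueue wq的归属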

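    回到CrawlURI findEligibleURI()方法:其中调用的void checkFutures()方法检查futureUris中预定时间已到的CrawlURI对象。它将这些对象的重新调度时间(reschedule time)重置为-1并从futureUris中移除,再通过receive(curi)重新交给Frontier,使其重新进入(BDB存储的)工作队列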

    /**
         * Re-submits every future-scheduled URI whose scheduled time has passed.
         *
         * While holding the futureUris lock, each entry keyed strictly before
         * the current time is processed in turn: its reschedule time is reset
         * to -1, it is removed from futureUris, futureUriCount is decremented,
         * and it is passed to receive(). The unlocked isEmpty() check only
         * avoids taking the lock when nothing is scheduled.
         */
        protected void checkFutures() {
            // TODO: consider only checking this every set interval
            if (futureUris.isEmpty()) {
                return;
            }
            synchronized (futureUris) {
                long now = System.currentTimeMillis();
                for (Iterator<CrawlURI> due = futureUris.headMap(now).values().iterator();
                        due.hasNext(); ) {
                    CrawlURI dueUri = due.next();
                    // -1 = no pending reschedule, unless set again elsewhere
                    dueUri.setRescheduleTime(-1);
                    due.remove();
                    futureUriCount.decrementAndGet();
                    receive(dueUri);
                }
            }
        }

    继续往下面看,String key = readyClassQueues.poll()方法为从已经准备好被爬取的队列readyClassQueues中取出队头元素(WorkQueue wq的classkey)

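    如果readyClassQueues中没有元素,并且非活动状态队列不为空、其中等待的最高优先级highestPrecedenceWaiting优于优先级下限,就调用activateInactiveQueue()激活非活动状态队列inactiveQueues中合适的WorkQueue wq,并将其key放入已经准备好被爬取的队列readyClassQueues中;否则退出查找队列的循环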

    activateInactiveQueue()

    /**
         * Activates an inactive queue, if any are available.
         *
         * Walks the inactive-queue map entry by entry. For each precedence
         * bucket it polls work-queue keys while holding the map's lock:
         * - Keys with no matching queue in allQueues are logged and dropped.
         * - Queues whose precedence has become worse than the bucket they sat
         *   in are re-deactivated.
         * - The first acceptable key updates highestPrecedenceWaiting and is
         *   put on readyClassQueues.
         *
         * @return true if a queue key was put on readyClassQueues, false if no
         *         inactive queue could be activated
         * @throws RuntimeException wrapping an InterruptedException raised while
         *         putting the key. The thread's interrupt status is restored
         *         first.
         */
        protected boolean activateInactiveQueue() {
            for (Entry<Integer, Queue<String>> entry : getInactiveQueuesByPrecedence().entrySet()) {
                int expectedPrecedence = entry.getKey();
                Queue<String> queueOfWorkQueueKeys = entry.getValue();

                while (true) {
                    synchronized (getInactiveQueuesByPrecedence()) {
                        String workQueueKey = queueOfWorkQueueKeys.poll();
                        if (workQueueKey == null) {
                            // this precedence bucket is drained; try the next one
                            break;
                        }

                        WorkQueue candidateQ = (WorkQueue) this.allQueues.get(workQueueKey);
                        if (candidateQ == null) {
                            // key wasn't in allQueues: unexpected; drop the stale
                            // key instead of failing with a NullPointerException
                            logger.severe("Key " + workQueueKey
                                    + " in inactiveQueuesByPrecedence but not allQueues");
                            continue;
                        }
                        if (candidateQ.getPrecedence() > expectedPrecedence) {
                            // queue demoted since placed; re-deactivate
                            deactivateQueue(candidateQ);
                            candidateQ.makeDirty();
                            continue;
                        }

                        updateHighestWaiting(expectedPrecedence);
                        try {
                            // readyClassQueues holds the keys of queues ready to be crawled
                            readyClassQueues.put(workQueueKey);
                        } catch (InterruptedException e) {
                            // keep the interrupt visible to callers before propagating
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        }

                        return true;
                    }
                }
            }

            return false;
        }

    更新非活动状态队列inactiveQueues中最高优先级的值(最小值) 

    /**
         * Recalculates the precedence of the highest-precedence queue still
         * waiting among the inactive queues.
         *
         * Scans the inactive-queue map from {@code startFrom} upward, since
         * tailMap includes keys greater than or equal to it. It records the
         * first precedence whose queue of keys is non-empty. If every scanned
         * queue is empty, highestPrecedenceWaiting becomes Integer.MAX_VALUE.
         *
         * @param startFrom the lowest precedence value to examine
         */
        protected void updateHighestWaiting(int startFrom) {
            int firstWaiting = Integer.MAX_VALUE;
            for (Entry<Integer, Queue<String>> bucket
                    : getInactiveQueuesByPrecedence().tailMap(startFrom).entrySet()) {
                if (!bucket.getValue().isEmpty()) {
                    firstWaiting = bucket.getKey();
                    break;
                }
            }
            highestPrecedenceWaiting = firstWaiting;
        }

     上面的方法通过tailMap(startFrom)从非活动状态队列inactiveQueues中取出precedence大于等于指定值的队列集合,按precedence从小到大遍历。它将highestPrecedenceWaiting设置为第一个非空队列对应的precedence值,即其中最小的值(inactiveQueues是按precedence有序的)。如果这些队列都为空,则将其设置为Integer.MAX_VALUE

    ---------------------------------------------------------------------------

    本系列Heritrix 3.1.0 源码解析系本人原创

    转载请注明出处 博客园 刺猬的温驯

    本文链接 http://www.cnblogs.com/chenying99/archive/2013/04/21/3033510.html

  • 相关阅读:
    nsight system
    unity 拿到管线权限的解决方案
    Erlang cowboy 入门参考
    [Erlang]Mnesia分布式应用
    erlang浅谈
    erlang 中带下划线变量的使用
    erlang 符号相关基本语法
    Erlang与ActionScript3采用JSON格式进行Socket通讯
    Erlang游戏开发-协议
    Erlang之IO编程
  • 原文地址:https://www.cnblogs.com/chenying99/p/3033510.html
Copyright © 2011-2022 走看看