Heritrix 3.1.0 Source Code Analysis (36)

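    In this article I continue analyzing the CrawlController and BdbFrontier classes of Heritrix 3.1.0. I feel the earlier articles did not fully untangle the related logic. More importantly, each article has a different focus, so I cannot cover every aspect of these classes in a single article.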

    This article analyzes how Heritrix 3.1.0 controls the start, pause and stop states of a crawl job. Some background in state machines may make the logic easier to follow.

    The CrawlController's state is an enum type with the following values:

        public static enum State {
            NASCENT, RUNNING, EMPTY, PAUSED, PAUSING, 
            STOPPING, FINISHED, PREPARING 
        }
    
        transient private State state = State.NASCENT;

    Its initial state is State.NASCENT.

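    (Attentive readers will notice that CrawlController also has a member of another enum type, CrawlStatus:

        /**
         * Crawl exit status.
         */
        private transient CrawlStatus sExit = CrawlStatus.CREATED;

    This enum only describes the status and takes no part in the execution logic, so this article does not examine it further.)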

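    When we issue commands to a crawl job, the CrawlController's state changes accordingly. The method that changes the state and publishes the event is as follows: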

        /**
         * Change the state and publish an event.
         * Send crawl change event to all listeners.
         * @param newState State change we're to tell listeners about.
         * @param status Crawl status whose description accompanies the event.
         */
        protected void sendCrawlStateChangeEvent(State newState, 
                CrawlStatus status) {
            if(this.state == newState) {
                // suppress duplicate state-reports
                return;
            }
            this.state = newState; 
            CrawlStateEvent event = new CrawlStateEvent(this,newState,status.getDescription());
            appCtx.publishEvent(event); 
        }

    Besides setting the CrawlController's state, the method above publishes an event of type CrawlStateEvent. Here the CrawlController plays the subject role of the observer pattern.
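    The following is a minimal plain-Java sketch of the observer relationship, assuming no Spring container. The hypothetical class StateSubject plays the role of CrawlController, and a java.util.function.Consumer stands in for an ApplicationListener such as CrawlJob. Like sendCrawlStateChangeEvent(), it changes the state and notifies listeners only when the new state differs from the current one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of CrawlController.sendCrawlStateChangeEvent():
// no Spring, no CrawlStatus; Consumers stand in for ApplicationListener beans.
class StateSubject {
    enum State { NASCENT, RUNNING, EMPTY, PAUSED, PAUSING, STOPPING, FINISHED, PREPARING }

    private State state = State.NASCENT;
    private final List<Consumer<State>> listeners = new ArrayList<>();

    void addListener(Consumer<State> listener) {
        listeners.add(listener);
    }

    State getState() {
        return state;
    }

    // Change the state and notify every listener, suppressing duplicate reports
    void sendStateChange(State newState) {
        if (state == newState) {
            return; // same state as before: no event
        }
        state = newState;
        for (Consumer<State> listener : listeners) {
            listener.accept(newState);
        }
    }
}

class StateSubjectDemo {
    public static void main(String[] args) {
        StateSubject subject = new StateSubject();
        List<StateSubject.State> log = new ArrayList<>();
        subject.addListener(log::add);                        // plays the role of CrawlJob's logging
        subject.sendStateChange(StateSubject.State.PREPARING);
        subject.sendStateChange(StateSubject.State.PREPARING); // duplicate, suppressed
        subject.sendStateChange(StateSubject.State.PAUSED);
        System.out.println(log);                              // [PREPARING, PAUSED]
    }
}
```

    Duplicates are suppressed at the source, so a listener never sees the same state twice in a row.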

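    The CrawlJob class implements the ApplicationListener interface and is one of the listeners for this event, playing the observer role of the observer pattern. Its listener method is as follows: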

        /** 
         * Event listener.
         * Log note of all ApplicationEvents.
         * 
         * @see org.springframework.context.ApplicationListener#onApplicationEvent(org.springframework.context.ApplicationEvent)
         */
        public void onApplicationEvent(ApplicationEvent event) {
            if(event instanceof CrawlStateEvent) {
                getJobLogger().log(Level.INFO, ((CrawlStateEvent)event).getState() + 
                        (ac.getCurrentLaunchId() != null ? " " + ac.getCurrentLaunchId() : ""));
            }
            //event.getSource();
            synchronized (this) {
                // needTeardown is true once synchronized boolean teardown() has started
                // (it calls CrawlController's void requestCrawlStop()) but teardown has
                // not yet completed; doTeardown() resets needTeardown to false,
                // which ensures it runs only once
                if (needTeardown && event instanceof StopCompleteEvent) {
                    doTeardown();
                }
            }
            if(event instanceof CheckpointSuccessEvent) {
                getJobLogger().log(Level.INFO, "CHECKPOINTED "+((CheckpointSuccessEvent)event).getCheckpoint().getName());
            }
        }

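    Heritrix 3.1.0 has many event listeners. Each listens for different events and runs its own handler when a relevant event occurs. In Eclipse we can list the listener classes, that is, the classes implementing the ApplicationListener interface.

    So which event types can be published in Heritrix 3.1.0?

    And which classes in Heritrix 3.1.0 publish events?

    A question pursued relentlessly only keeps growing, so I will stop here. Interested readers can dig into where these events come from and where they go (I expect most of the listeners simply write logs).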

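    Back to the main topic. When we issue the launch command for a crawl job (CrawlJob's void launch() method), the CrawlController's state first becomes State.PREPARING and then State.PAUSED. This assumes pauseAtStart is true; otherwise the frontier is told to run, and the RUNNING state is reported back by the frontier later.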

    State.NASCENT -> State.PREPARING -> State.PAUSED

        /** 
         * Invoked via CrawlJob's void launch() method (launch command).
         * Operator requested crawl begin
         */
        public void requestCrawlStart() {
            hasStarted = true; 
            // change state to State.PREPARING
            sendCrawlStateChangeEvent(State.PREPARING, CrawlStatus.PREPARING);
            
            if(recoveryCheckpoint==null) {
                // only announce (trigger scheduling of) seeds
                // when doing a cold (non-recovery) start
                getSeeds().announceSeeds();
            }
            
            setupToePool();

            // A proper exit will change this value.
            this.sExit = CrawlStatus.FINISHED_ABNORMAL;
            
            if (getPauseAtStart()) {
                // frontier is already paused unless started, so just 
                // 'complete'/ack pause
                // change state to State.PAUSED
                completePause();
            } else {
                getFrontier().run();
            }
        }

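    The final check in the method above decides what happens at launch. The crawl either calls BdbFrontier's void run() method right away (starting the crawl) or simply acknowledges the pause.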
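    The branching above can be modeled with a small hypothetical class, LaunchSketch. It omits seeds, the ToePool and checkpoint recovery, and reduces the frontier to a single target field. It only illustrates which states a launch passes through with and without pauseAtStart; it is not Heritrix code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the launch path in CrawlController.requestCrawlStart().
class LaunchSketch {
    enum State { NASCENT, PREPARING, PAUSED, RUNNING }
    enum FrontierTarget { PAUSE, RUN }

    private final boolean pauseAtStart;
    State state = State.NASCENT;
    FrontierTarget frontierTarget = FrontierTarget.PAUSE; // frontier starts out paused
    final List<State> history = new ArrayList<>();

    LaunchSketch(boolean pauseAtStart) {
        this.pauseAtStart = pauseAtStart;
        history.add(state);
    }

    // Record a state change, ignoring duplicates like sendCrawlStateChangeEvent()
    private void change(State newState) {
        if (state != newState) {
            state = newState;
            history.add(newState);
        }
    }

    void requestCrawlStart() {
        change(State.PREPARING);
        if (pauseAtStart) {
            change(State.PAUSED);                 // stands in for completePause()
        } else {
            frontierTarget = FrontierTarget.RUN;  // stands in for getFrontier().run();
            // RUNNING would be reported later by the frontier's manager thread
        }
    }
}

class LaunchSketchDemo {
    public static void main(String[] args) {
        LaunchSketch paused = new LaunchSketch(true);
        paused.requestCrawlStart();
        System.out.println(paused.history);       // [NASCENT, PREPARING, PAUSED]

        LaunchSketch running = new LaunchSketch(false);
        running.requestCrawlStart();
        System.out.println(running.history + " " + running.frontierTarget); // [NASCENT, PREPARING] RUN
    }
}
```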

    When we issue the unpause command, the CrawlController's state becomes State.RUNNING. The precondition is state == State.PAUSING || state == State.PAUSED.

        /**
         * unpause command
         * Resume crawl from paused state
         */
        public void requestCrawlResume() {
            if (state != State.PAUSING && state != State.PAUSED) {
                // Can't resume if not been told to pause
                return;
            }
            
            assert toePool != null;
            
            Frontier f = getFrontier();
            f.unpause();
            // change state to State.RUNNING
            sendCrawlStateChangeEvent(State.RUNNING, CrawlStatus.RUNNING);
        }

    The method above first calls BdbFrontier's void unpause() method and then changes the CrawlController's state.

    When we issue the pause command, the CrawlController's state becomes State.PAUSING. The precondition is the opposite of the one above: state != State.PAUSING && state != State.PAUSED.

        /**
         * pause command
         * Stop the crawl temporarily.
         */
        public synchronized void requestCrawlPause() {
            if (state == State.PAUSING || state == State.PAUSED) {
                // Already about to pause
                return;
            }
            sExit = CrawlStatus.WAITING_FOR_PAUSE;
            getFrontier().pause();
            // change state to State.PAUSING
            sendCrawlStateChangeEvent(State.PAUSING, this.sExit);
            // wait for pause to come via frontier changes
        }

    The method above first calls BdbFrontier's void pause() method and then changes the CrawlController's state.
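    The pause and resume guards mirror each other. The hypothetical PauseResumeSketch below models both, plus the frontier's later PAUSE notification that turns PAUSING into PAUSED. Frontier calls are reduced to assignments of a target field.

```java
// Hypothetical model of the guards in requestCrawlPause() and requestCrawlResume().
class PauseResumeSketch {
    enum State { RUNNING, PAUSING, PAUSED }
    enum FrontierTarget { RUN, PAUSE }

    State state = State.RUNNING;
    FrontierTarget frontierTarget = FrontierTarget.RUN;

    synchronized void requestCrawlPause() {
        if (state == State.PAUSING || state == State.PAUSED) {
            return;                               // already about to pause
        }
        frontierTarget = FrontierTarget.PAUSE;    // like getFrontier().pause()
        state = State.PAUSING;                    // PAUSED is confirmed later
    }

    // Called when the frontier reports PAUSE (no URIs in progress)
    void frontierReachedPause() {
        if (state == State.PAUSING) {
            state = State.PAUSED;                 // like completePause()
        }
    }

    void requestCrawlResume() {
        if (state != State.PAUSING && state != State.PAUSED) {
            return;                               // can't resume if not told to pause
        }
        frontierTarget = FrontierTarget.RUN;      // like getFrontier().unpause()
        state = State.RUNNING;
    }
}

class PauseResumeDemo {
    public static void main(String[] args) {
        PauseResumeSketch c = new PauseResumeSketch();
        c.requestCrawlResume();                   // ignored: not paused
        c.requestCrawlPause();
        System.out.println(c.state);              // PAUSING
        c.frontierReachedPause();
        System.out.println(c.state);              // PAUSED
        c.requestCrawlResume();
        System.out.println(c.state + " " + c.frontierTarget); // RUNNING RUN
    }
}
```

    Resuming is also accepted while the crawl is still PAUSING, that is, before all in-progress URIs have finished.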

    When we issue the terminate command, the CrawlController's state becomes State.STOPPING. The precondition is state != State.STOPPING && state != State.FINISHED.

        /**
         * terminate command / teardown command
         * Operator requested for crawl to stop.
         */
        public synchronized void requestCrawlStop() {
            if(state == State.STOPPING) {
                // second stop request; nudge the threads with interrupts
                getToePool().cleanup();
            }
            requestCrawlStop(CrawlStatus.ABORTED);
        }

    This in turn calls the synchronized void requestCrawlStop(CrawlStatus message) method:

        /**
         * Operator requested for crawl to stop.
         * @param message 
         */
        public synchronized void requestCrawlStop(CrawlStatus message) {
            if (state == State.NASCENT) {
                this.sExit = message;
                this.state = State.FINISHED;
                this.isStopComplete = true;
            }
            if (state == State.STOPPING || state == State.FINISHED ) {
                return;
            }
            if (message == null) {
                throw new IllegalArgumentException("Message cannot be null.");
            }
            if(this.sExit != CrawlStatus.FINISHED) {
                // don't clobber an already-FINISHED with alternate status
                this.sExit = message;
            }
            beginCrawlStop();
        }

    That method then calls void beginCrawlStop():

        /**
         * Start the process of stopping the crawl. 
         */
        public void beginCrawlStop() {
            LOGGER.fine("Started.");
            // change state to State.STOPPING
            sendCrawlStateChangeEvent(State.STOPPING, this.sExit);
            Frontier frontier = getFrontier();
            if (frontier != null) {
                frontier.terminate();
            }
            LOGGER.fine("Finished."); 
        }

    The method above first changes the CrawlController's state and then calls BdbFrontier's void terminate() method.
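    The stop path combines three guards:

    1. A crawl that was never launched (NASCENT) finishes on the spot.
    2. Repeated requests while STOPPING or FINISHED return early, after the no-argument variant has nudged the worker threads.
    3. An exit status of FINISHED is never overwritten.

    The hypothetical StopSketch below reproduces just those guards. The ToePool and the frontier are replaced by a counter and a flag.

```java
// Hypothetical model of requestCrawlStop() / beginCrawlStop().
class StopSketch {
    enum State { NASCENT, RUNNING, STOPPING, FINISHED }
    enum Status { CREATED, RUNNING, FINISHED, ABORTED }

    State state;
    Status sExit;
    boolean isStopComplete = false;
    boolean frontierTerminated = false;  // stands in for frontier.terminate()
    int threadNudges = 0;                // stands in for getToePool().cleanup()

    StopSketch(State state, Status sExit) {
        this.state = state;
        this.sExit = sExit;
    }

    // Operator's terminate/teardown request
    synchronized void requestCrawlStop() {
        if (state == State.STOPPING) {
            threadNudges++;              // second request: interrupt worker threads
        }
        requestCrawlStop(Status.ABORTED);
    }

    synchronized void requestCrawlStop(Status message) {
        if (state == State.NASCENT) {    // never launched: finish on the spot
            sExit = message;
            state = State.FINISHED;
            isStopComplete = true;
        }
        if (state == State.STOPPING || state == State.FINISHED) {
            return;
        }
        if (message == null) {
            throw new IllegalArgumentException("Message cannot be null.");
        }
        if (sExit != Status.FINISHED) {  // don't clobber an already-FINISHED status
            sExit = message;
        }
        beginCrawlStop();
    }

    void beginCrawlStop() {
        state = State.STOPPING;          // event publishing omitted
        frontierTerminated = true;
    }

    public static void main(String[] args) {
        StopSketch never = new StopSketch(State.NASCENT, Status.CREATED);
        never.requestCrawlStop();
        System.out.println(never.state + " " + never.frontierTerminated); // FINISHED false

        StopSketch live = new StopSketch(State.RUNNING, Status.FINISHED);
        live.requestCrawlStop();
        live.requestCrawlStop();         // second request only nudges threads
        System.out.println(live.state + " " + live.sExit + " " + live.threadNudges); // STOPPING FINISHED 1
    }
}
```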

    For the teardown command, we start the analysis from CrawlJob's synchronized boolean teardown() method:

        /**
         * Ensure a fresh start for any configuration changes or relaunches,
         * by stopping and discarding an existing ApplicationContext.
         * 
         * @return true if teardown is complete when method returns, false if still in progress
         */
        public synchronized boolean teardown() {
            CrawlController cc = getCrawlController();
            if (cc != null) {
                cc.requestCrawlStop();
                needTeardown = true;
                
                // wait up to 3 seconds for stop
                for(int i = 0; i < 11; i++) {
                    if(cc.isStopComplete()) {
                        break;
                    }
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        // do nothing
                    }
                }
                // if the CrawlController's isStopComplete flag is true, call doTeardown(),
                // which resets needTeardown to false
                // and closes the PathSharingContext ac
                if (cc.isStopComplete()) {
                    doTeardown();
                }
            }
            
            assert needTeardown == (ac != null);
            return !needTeardown; 
        }
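    The waiting logic in teardown() is a bounded poll. It makes 11 checks with a 300 ms sleep after each failed one, so at most 3.3 seconds of sleeping (slightly more than the "3 seconds" in the comment), followed by one final isStopComplete() check. A generic version might look like the hypothetical helper BoundedWait.waitFor() below. There is one deliberate difference: the original ignores InterruptedException, whereas this sketch restores the interrupt flag and stops waiting early.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical helper mirroring the polling loop in CrawlJob.teardown().
class BoundedWait {
    static boolean waitFor(BooleanSupplier done, int attempts, long sleepMillis) {
        for (int i = 0; i < attempts; i++) {
            if (done.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // unlike the original, keep the interrupt
                break;
            }
        }
        return done.getAsBoolean();                 // final check, like the last cc.isStopComplete()
    }

    public static void main(String[] args) {
        AtomicBoolean stopComplete = new AtomicBoolean(false);
        Thread stopper = new Thread(() -> {
            try {
                Thread.sleep(500);                  // simulated shutdown work
            } catch (InterruptedException ignored) {
                // demo only
            }
            stopComplete.set(true);
        });
        stopper.start();
        // 11 checks, 300 ms apart, as in teardown()
        System.out.println(waitFor(stopComplete::get, 11, 300));
    }
}
```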

    As with terminate, it first calls CrawlController's synchronized void requestCrawlStop() method. A second call interrupts the threads in ToePool toePool.

        /**
         * terminate command / teardown command
         * Operator requested for crawl to stop.
         */
        public synchronized void requestCrawlStop() {
            if(state == State.STOPPING) {
                // second stop request; nudge the threads with interrupts
                getToePool().cleanup();
            }
            requestCrawlStop(CrawlStatus.ABORTED);
        }

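    We cannot analyze the CrawlController's state in isolation. Each method invoked by a command also calls a corresponding BdbFrontier method, and the BdbFrontier in turn calls back into the CrawlController. These callbacks are what change the CrawlController's state.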

    The BdbFrontier also maintains its own state. It is likewise an enum, but a different one from the CrawlController's.

    The BdbFrontier's state fields are shown below. State targetState = State.PAUSE is the frontier's own target state; State lastReachedState = null records the last state reported to the CrawlController.

        /** last Frontier.State reached; used to suppress duplicate notifications */
        State lastReachedState = null;
        /** Frontier.state that manager thread should seek to reach */
        volatile State targetState = State.PAUSE;

    The State type used here is defined in the Frontier interface:

        /**
         * Enumeration of possible target states. 
         */
        public enum State { 
            RUN,  // juggle/prioritize/emit; usual state
            EMPTY, // running/ready but no URIs queued/scheduled
            HOLD, // NOT YET USED enter a consistent, stable, checkpointable state ASAP
            PAUSE, // enter a stable state where no URIs are in-progress; unlike
                   // HOLD requires all in-process URIs to complete
            FINISH  // end and cleanup; may not return to any other state after
                      // this state is requested/reached
        }

    The methods that the CrawlController calls are listed below. Each of them changes the BdbFrontier's state (State targetState):

        public void run() {
            requestState(State.RUN);
        }
        
        /* (non-Javadoc)
         * @see org.archive.crawler.framework.Frontier#requestState(org.archive.crawler.framework.Frontier.State)
         */
        public void requestState(State target) {
            targetState = target;
        }
        
        public void pause() {
            requestState(State.PAUSE);
        }
    
        public void unpause() {
            requestState(State.RUN);
        }
    
        public void terminate() {
            requestState(State.FINISH);
        }
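    These methods only write targetState. BdbFrontier declares that field volatile, so the manager thread reliably sees values written by other threads. The hypothetical TargetStateSketch below models that hand-off. Its loop is a stripped-down stand-in for the real managementTasks() and simply records the latest target until FINISH is seen.

```java
// Hypothetical model of the Frontier target-state hand-off between threads.
class TargetStateSketch {
    enum State { RUN, EMPTY, HOLD, PAUSE, FINISH }

    private volatile State targetState = State.PAUSE;   // same initial value as BdbFrontier
    private volatile State lastSeen = null;

    void requestState(State target) { targetState = target; }
    void run()       { requestState(State.RUN); }
    void pause()     { requestState(State.PAUSE); }
    void unpause()   { requestState(State.RUN); }
    void terminate() { requestState(State.FINISH); }

    State lastSeen() { return lastSeen; }

    // Simplified manager loop: observe the latest target; exit once FINISH is seen
    void managementLoop() throws InterruptedException {
        while (true) {
            State target = targetState;                 // one volatile read per pass
            lastSeen = target;
            if (target == State.FINISH) {
                return;
            }
            Thread.sleep(5);
        }
    }

    // Start a manager thread, request FINISH from this thread, and report whether it exited
    static boolean terminateAndJoin(TargetStateSketch frontier, long timeoutMillis) {
        Thread manager = new Thread(() -> {
            try {
                frontier.managementLoop();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "managerThread");
        manager.start();
        frontier.unpause();
        frontier.terminate();
        try {
            manager.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !manager.isAlive();
    }

    public static void main(String[] args) {
        TargetStateSketch frontier = new TargetStateSketch();
        boolean exited = terminateAndJoin(frontier, 2000);
        System.out.println(exited + " " + frontier.lastSeen());
    }
}
```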

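    When the BdbFrontier is started, it runs its own void managementTasks() method in a dedicated manager thread (managerThread). This method repeatedly checks the frontier's own State targetState field and locks or unlocks the ReentrantReadWriteLock outboundLock = new ReentrantReadWriteLock(true) member accordingly. It also calls void reachedState(State justReached), which calls back into the CrawlController so that the CrawlController's state changes to match.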

        /**
         * Main loop of frontier's managerThread. Only exits when State.FINISH 
         * is requested (perhaps automatically at URI exhaustion) and reached. 
         * 
         * General strategy is to try to fill outbound queue, then process an
         * item from inbound queue, and repeat. A HOLD (to be implemented) or 
         * PAUSE puts frontier into a stable state that won't be changed
         * asynchronously by worker thread activity. 
         */
        protected void managementTasks() {
            assert Thread.currentThread() == managerThread;
            try {
                loop: while (true) {
                    try {
                        State reachedState = null; 
                        switch (targetState) {
                        case EMPTY:
                            reachedState = State.EMPTY; 
                        case RUN:
                            // enable outbound takes if previously locked
                            while(outboundLock.isWriteLockedByCurrentThread()) {
                                outboundLock.writeLock().unlock();
                            }
                            if(reachedState==null) {
                                reachedState = State.RUN; 
                            }
                            reachedState(reachedState);
                            
                            Thread.sleep(1000);
                            
                            if(isEmpty()&&targetState==State.RUN) {
                                requestState(State.EMPTY); 
                            } else if (!isEmpty()&&targetState==State.EMPTY) {
                                requestState(State.RUN); 
                            }
                            break;
                        case HOLD:
                            // TODO; for now treat same as PAUSE
                        case PAUSE:
                            // pausing
                            // prevent all outbound takes
                            outboundLock.writeLock().lock();
                            // process all inbound
                            while (targetState == State.PAUSE) {
                                if (getInProcessCount()==0) {
                                    reachedState(State.PAUSE);
                                }
                                
                                Thread.sleep(1000);
                            }
                            break;
                        case FINISH:
                            // prevent all outbound takes
                            outboundLock.writeLock().lock();
                            // process all inbound
                            while (getInProcessCount()>0) {
                                Thread.sleep(1000);
                            }
    
                            finalTasks(); 
                            // TODO: more cleanup?
                            reachedState(State.FINISH);
                            break loop;
                        }
                    } catch (RuntimeException e) {
                        // log, try to pause, continue
                        logger.log(Level.SEVERE,"",e);
                        if(targetState!=State.PAUSE && targetState!=State.FINISH) {
                            requestState(State.PAUSE);
                        }
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } 
            
            // try to leave in safely restartable state: 
            targetState = State.PAUSE;
            while(outboundLock.isWriteLockedByCurrentThread()) {
                outboundLock.writeLock().unlock();
            }
            //TODO: ensure all other structures are cleanly reset on restart
            
            logger.log(Level.FINE,"ending frontier mgr thread");
        }
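    In effect, managementTasks() uses outboundLock as a gate. Holding the write lock blocks every reader, and the RUN branch releases all reentrant write holds of the manager thread. The sketch below models that gate with java.util.concurrent.locks.ReentrantReadWriteLock. The worker-side tryTake() is an assumption made for illustration, since this article does not show how worker threads actually take URIs. It must run on a different thread, because a thread holding the write lock may itself acquire the read lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical model of outboundLock used as a gate by the manager thread.
class OutboundGateSketch {
    // fair lock, constructed as in BdbFrontier
    final ReentrantReadWriteLock outboundLock = new ReentrantReadWriteLock(true);

    // PAUSE/FINISH branch: hold the write lock so no outbound take can proceed
    void closeGate() {
        outboundLock.writeLock().lock();
    }

    // RUN/EMPTY branch: release every (reentrant) write hold of this thread
    void openGate() {
        while (outboundLock.isWriteLockedByCurrentThread()) {
            outboundLock.writeLock().unlock();
        }
    }

    // Assumed worker side: a take proceeds only if the read lock can be acquired
    boolean tryTake() {
        if (!outboundLock.readLock().tryLock()) {
            return false;
        }
        try {
            return true;                          // a real take would happen here
        } finally {
            outboundLock.readLock().unlock();
        }
    }

    // Run tryTake() on a separate worker thread and return its result
    static boolean takeFromWorker(OutboundGateSketch gate) {
        AtomicBoolean result = new AtomicBoolean(false);
        Thread worker = new Thread(() -> result.set(gate.tryTake()), "worker");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result.get();
    }

    public static void main(String[] args) {
        OutboundGateSketch gate = new OutboundGateSketch();
        gate.closeGate();
        gate.closeGate();                         // reentrant second hold
        System.out.println(takeFromWorker(gate)); // false: gate closed
        gate.openGate();
        System.out.println(takeFromWorker(gate)); // true: both holds released
    }
}
```

    The while loop around unlock() matters because a single unlock() only drops one reentrant hold. This is why managementTasks() uses the same loop in the RUN branch and again before the manager thread exits.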

    The void reachedState(State justReached) method, which calls back into the CrawlController, is as follows:

        /**
         * The given state has been reached; if it is a new state, generate
         * a notification to the CrawlController. 
         * 
         * TODO: evaluate making this a generic notification others can sign up for
         */
        protected void reachedState(State justReached) {
            if(justReached != lastReachedState) {
                controller.noteFrontierState(justReached);
                lastReachedState = justReached;
            }
        }

    Here is the CrawlController's void noteFrontierState(Frontier.State reachedState) method. Note that its parameter is of type Frontier.State:

        /**
         * Receive notification from the frontier, in the frontier's own 
         * manager thread, that the frontier has reached a new state. 
         * 
         * @param reachedState the state the frontier has reached
         */
        public void noteFrontierState(Frontier.State reachedState) {
            switch (reachedState) {
            case RUN: 
                LOGGER.info("Crawl running.");
                sendCrawlStateChangeEvent(State.RUNNING, CrawlStatus.RUNNING);
                break;
            case EMPTY: 
                LOGGER.info("Crawl empty.");
                if(!getRunWhileEmpty()) {
                    this.sExit = CrawlStatus.FINISHED;
                    beginCrawlStop();
                }
                sendCrawlStateChangeEvent(State.EMPTY, CrawlStatus.RUNNING);
                break; 
            case PAUSE:
                if (state == State.PAUSING) {
                    completePause();
                }
                break;
            case FINISH:
                completeStop();
                break;
            default:
                // do nothing
            }
        }
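    The hypothetical FrontierNotifySketch below puts reachedState() and noteFrontierState() together. It shows how frontier states are first deduplicated and then mapped onto controller states. beginCrawlStop(), completePause() and completeStop() are reduced to plain state changes, and events are collected in a list instead of being published through Spring.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of BdbFrontier.reachedState() feeding CrawlController.noteFrontierState().
class FrontierNotifySketch {
    enum FrontierState { RUN, EMPTY, HOLD, PAUSE, FINISH }
    enum ControllerState { NASCENT, RUNNING, EMPTY, PAUSED, PAUSING, STOPPING, FINISHED, PREPARING }

    ControllerState state;
    final boolean runWhileEmpty;
    final List<ControllerState> events = new ArrayList<>();
    private FrontierState lastReachedState = null;

    FrontierNotifySketch(ControllerState initial, boolean runWhileEmpty) {
        this.state = initial;
        this.runWhileEmpty = runWhileEmpty;
    }

    // Frontier side: forward only changes, like reachedState()
    void reachedState(FrontierState justReached) {
        if (justReached != lastReachedState) {
            noteFrontierState(justReached);
            lastReachedState = justReached;
        }
    }

    // Like sendCrawlStateChangeEvent(): record only real changes
    private void send(ControllerState newState) {
        if (state != newState) {
            state = newState;
            events.add(newState);
        }
    }

    // Controller side: map a frontier state onto a controller state
    void noteFrontierState(FrontierState reached) {
        switch (reached) {
            case RUN:
                send(ControllerState.RUNNING);
                break;
            case EMPTY:
                if (!runWhileEmpty) {
                    send(ControllerState.STOPPING);  // stands in for beginCrawlStop()
                }
                send(ControllerState.EMPTY);
                break;
            case PAUSE:
                if (state == ControllerState.PAUSING) {
                    send(ControllerState.PAUSED);    // stands in for completePause()
                }
                break;
            case FINISH:
                send(ControllerState.FINISHED);      // stands in for completeStop()
                break;
            default:
                // HOLD: nothing to do
        }
    }

    public static void main(String[] args) {
        FrontierNotifySketch c = new FrontierNotifySketch(ControllerState.PAUSED, true);
        c.reachedState(FrontierState.RUN);
        c.reachedState(FrontierState.RUN);           // duplicate, not forwarded
        c.reachedState(FrontierState.EMPTY);
        c.reachedState(FrontierState.RUN);
        System.out.println(c.events);                // [RUNNING, EMPTY, RUNNING]

        FrontierNotifySketch d = new FrontierNotifySketch(ControllerState.RUNNING, false);
        d.reachedState(FrontierState.EMPTY);
        d.reachedState(FrontierState.FINISH);
        System.out.println(d.events);                // [STOPPING, EMPTY, FINISHED]
    }
}
```

    The sketch follows the order in the original switch. When runWhileEmpty is false, the controller reports STOPPING and then EMPTY, and FINISHED arrives only after the frontier reaches FINISH.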

    If the reported state is Frontier.State.FINISH, it goes on to call void completeStop():

        /**
         * Called when the last toethread exits.
         */
        protected void completeStop() {
            LOGGER.fine("Entered complete stop.");
    
            statisticsTracker.getSnapshot(); // ???
            
            this.reserveMemory = null;
            if (this.toePool != null) {
                this.toePool.cleanup();
            }
            this.toePool = null;
    
            LOGGER.fine("Finished crawl.");
    
            try {
                appCtx.stop(); 
            } catch (RuntimeException re) {
                LOGGER.log(Level.SEVERE,re.getMessage(),re);
            }
            
            sendCrawlStateChangeEvent(State.FINISHED, this.sExit);
    
            // CrawlJob needs to be sure all beans have received FINISHED signal before teardown
            this.isStopComplete = true;
            appCtx.publishEvent(new StopCompleteEvent(this)); 
        }

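    This method cleans up the threads in ToePool toePool, stops the application context, changes the CrawlController's state to State.FINISHED, and publishes a StopCompleteEvent.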

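    Note also that this.isStopComplete is set to true after the FINISHED state change and before the StopCompleteEvent is published. The CrawlJob relies on this flag so that it only runs doTeardown() once isStopComplete is true.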

    With this, the checks in the synchronized boolean teardown() method and the event handler void onApplicationEvent(ApplicationEvent event) shown earlier become easy to understand.

    doTeardown() is a synchronized method. Of the synchronized boolean teardown() method and the event handler void onApplicationEvent(ApplicationEvent event), only one can execute it at a time.

        // ac guaranteed to be null after this method is called
        protected synchronized void doTeardown() {
            needTeardown = false;
    
            try {
                if (ac != null) { 
                    ac.close();
                }
            } finally {
                // all this stuff should happen even in case ac.close() bugs out
                ac = null;
                
                xmlOkAt = new DateTime(0);
                
                if (currentLaunchJobLogHandler != null) {
                    getJobLogger().removeHandler(currentLaunchJobLogHandler);
                    currentLaunchJobLogHandler.close();
                    currentLaunchJobLogHandler = null;
                }
    
                getJobLogger().log(Level.INFO,"Job instance discarded");
            }
        }

    The method above also sets needTeardown = false, so when the event handler later receives a StopCompleteEvent it does not call this method again.
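    The hypothetical TeardownOnceSketch below isolates the once-only guarantee. requestCrawlStop() and the polling loop are omitted, and markStopComplete() stands in for completeStop() setting isStopComplete. The methods that touch needTeardown synchronize on the same object, as teardown(), doTeardown() and the synchronized (this) block in onApplicationEvent() do in CrawlJob. Whichever side runs doTeardown() first clears needTeardown, so the other becomes a no-op.

```java
// Hypothetical model of the CrawlJob teardown handshake.
class TeardownOnceSketch {
    private boolean needTeardown = false;
    private volatile boolean stopComplete = false;  // stands in for cc.isStopComplete()
    private int teardownCount = 0;

    // Controller side: completeStop() sets the flag before publishing the event
    void markStopComplete() {
        stopComplete = true;
    }

    // Like CrawlJob.teardown(), minus the stop request and polling
    synchronized boolean teardown() {
        needTeardown = true;
        if (stopComplete) {
            doTeardown();
        }
        return !needTeardown;
    }

    // Like the StopCompleteEvent branch of onApplicationEvent()
    void onStopCompleteEvent() {
        synchronized (this) {
            if (needTeardown) {
                doTeardown();
            }
        }
    }

    synchronized void doTeardown() {
        needTeardown = false;
        teardownCount++;
    }

    synchronized int teardownCount() {
        return teardownCount;
    }

    public static void main(String[] args) {
        // Order 1: the stop completes after teardown() gave up waiting
        TeardownOnceSketch late = new TeardownOnceSketch();
        System.out.println(late.teardown());        // false: still in progress
        late.markStopComplete();
        late.onStopCompleteEvent();
        System.out.println(late.teardownCount());   // 1

        // Order 2: teardown() already sees the flag; the later event is a no-op
        TeardownOnceSketch early = new TeardownOnceSketch();
        early.markStopComplete();
        System.out.println(early.teardown());       // true
        early.onStopCompleteEvent();
        System.out.println(early.teardownCount());  // 1
    }
}
```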

    (Food for thought: suppose the synchronized boolean teardown() method is blocked, and the event handler void onApplicationEvent(ApplicationEvent event) receives the StopCompleteEvent and runs doTeardown() first. Would synchronized boolean teardown() then run doTeardown() again?)

    --------------------------------------------------------------------------- 

    This Heritrix 3.1.0 source code analysis series is my original work.

    Please credit the source when reposting: 博客园 (cnblogs), 刺猬的温驯

    Article link: http://www.cnblogs.com/chenying99/archive/2013/05/12/3073654.html

    Original URL: https://www.cnblogs.com/chenying99/p/3073654.html