  • Heritrix 3.1.0 Source Code Analysis (Part 4)

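    A method examined in isolation is often hard to understand. Once we trace how objects interact with one another, the intent of each method becomes much clearer.

    When objects communicate, the first thing to understand is each object's state. The most basic way in is to look at its constructor or initialization method and at how its state changes after the relevant methods run; next come the input parameters of those methods (the messages being sent).

    When we create a crawl job through the admin interface, Heritrix 3.1.0 represents it with a crawl job class that encapsulates all of the job's properties and behavior.

    That class is CrawlJob (in org.archive.crawler.framework). Let's first get familiar with its members and methods.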

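    CrawlJob implements two interfaces, Comparable<CrawlJob> and ApplicationListener<ApplicationEvent>. The former is evidently for sorting; the latter is Spring's event listener interface (the event listener pattern).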
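
    The two roles can be sketched with the JDK alone. Everything below is a hypothetical stand-in: SketchJob, JobEvent and JobEventListener are not Heritrix or Spring types, and ordering by job name is an assumption made for illustration, not CrawlJob's actual compareTo.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for CrawlJob: sortable, and able to receive events. */
class SketchJob implements Comparable<SketchJob>, JobEventListener {
    private final String name;
    private final List<String> received = new ArrayList<>();

    SketchJob(String name) { this.name = name; }

    String getName() { return name; }
    List<String> getReceived() { return received; }

    // Assumption: order jobs alphabetically by name.
    @Override
    public int compareTo(SketchJob other) {
        return name.compareTo(other.name);
    }

    // The container would call this whenever it publishes an event.
    @Override
    public void onEvent(JobEvent event) {
        received.add(event.description);
    }

    /** Returns the job names in their natural (compareTo) order. */
    static List<String> sortedNames(List<SketchJob> jobs) {
        List<SketchJob> copy = new ArrayList<>(jobs);
        Collections.sort(copy);
        List<String> names = new ArrayList<>();
        for (SketchJob j : copy) {
            names.add(j.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        List<SketchJob> jobs = new ArrayList<>();
        jobs.add(new SketchJob("news"));
        jobs.add(new SketchJob("blogs"));
        System.out.println(sortedNames(jobs));
    }
}

/** Hypothetical stand-in for Spring's ApplicationEvent. */
class JobEvent {
    final String description;
    JobEvent(String description) { this.description = description; }
}

/** Hypothetical stand-in for Spring's ApplicationListener. */
interface JobEventListener {
    void onEvent(JobEvent event);
}
```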

    CrawlJob has the following fields:

    File primaryConfig; 
    PathSharingContext ac; 
    int launchCount; 
    boolean isLaunchInfoPartial;
    DateTime lastLaunch;
    AlertThreadGroup alertThreadGroup;
        
    DateTime xmlOkAt = new DateTime(0L);
    Logger jobLogger;

    We cannot yet tell what each of these fields is for, so let's go on to the constructor.

    public CrawlJob(File cxml) {
        primaryConfig = cxml;
        isLaunchInfoPartial = false;
        scanJobLog(); // XXX look at launch directories instead/first?
        alertThreadGroup = new AlertThreadGroup(getShortName());
    }

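    The constructor initializes the fields: File primaryConfig is the job configuration file crawler-beans.cxml; boolean isLaunchInfoPartial records whether the launch information is only partial; scanJobLog() scans the job log; and AlertThreadGroup alertThreadGroup is a thread group (itself used to publish log records).

    When we run the job's build action, what actually executes is CrawlJob's void validateConfiguration() method.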

    /**
         * Does the assembled ApplicationContext self-validate? Any failures
         * are reported as WARNING log events in the job log. 
         * 
         * TODO: make these severe? 
         */
        public synchronized void validateConfiguration() {
            instantiateContainer();
            if(ac==null) {
                // fatal errors already encountered and reported
                return; 
            }
            ac.validate();
            HashMap<String,Errors> allErrors = ac.getAllErrors();
            for(String name : allErrors.keySet()) {
                for(Object err : allErrors.get(name).getAllErrors()) {
                   LOGGER.log(Level.WARNING,err.toString());
                }
            }
        }

    validateConfiguration() first calls void instantiateContainer(), which instantiates PathSharingContext ac (a wrapped Spring container) and registers the current CrawlJob object as a Spring listener.

     /**
         * Can the configuration yield an assembled ApplicationContext? 
         */
        public synchronized void instantiateContainer() {
            checkXML(); 
            if(ac==null) {
                try {
                    ac = new PathSharingContext(new String[] {"file:"+primaryConfig.getAbsolutePath()},false,null);
                    ac.addApplicationListener(this);
                    ac.refresh();
                    getCrawlController(); // trigger NoSuchBeanDefinitionException if no CC
                    getJobLogger().log(Level.INFO,"Job instantiated");
                } catch (BeansException be) {
                    // Calling doTeardown() and therefore ac.close() here sometimes
                    // triggers an IllegalStateException and logs stack trace from
                    // within spring, even if ac.isActive(). So, just null it.
                    ac = null;
                    beansException(be);
                }
            }
        }
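
    The shape of instantiateContainer() is "build once, and reset to null on failure". Here is a minimal sketch of that pattern, assuming a generic factory in place of the PathSharingContext constructor; LazyContainerSketch and its members are hypothetical names. It catches RuntimeException because Spring's BeansException is an unchecked exception.

```java
import java.util.function.Supplier;

/** Hypothetical helper, not part of Heritrix: build once, drop on failure. */
class LazyContainerSketch<T> {
    private final Supplier<T> factory;
    private T container;                  // null until successfully built
    private RuntimeException lastFailure; // remembered so it can be reported
    private int buildAttempts;

    LazyContainerSketch(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Builds the container if there is none yet; returns null on failure. */
    synchronized T instantiate() {
        if (container == null) {
            buildAttempts++;
            try {
                container = factory.get();
                lastFailure = null;
            } catch (RuntimeException e) {
                // Do not try to close a half-built container; just discard it.
                container = null;
                lastFailure = e;
            }
        }
        return container;
    }

    synchronized boolean isValid() { return container != null; }
    synchronized int getBuildAttempts() { return buildAttempts; }
    synchronized RuntimeException getLastFailure() { return lastFailure; }

    public static void main(String[] args) {
        LazyContainerSketch<String> ok = new LazyContainerSketch<>(() -> "context");
        ok.instantiate();
        ok.instantiate(); // second call reuses the existing container
        System.out.println("attempts: " + ok.getBuildAttempts());
    }
}
```

    Because a failed build leaves the field null, a later call simply tries again, which matches how the job can be rebuilt after the configuration file is fixed.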

    Next, ac.validate() checks the validity of the PathSharingContext (this is a method of the PathSharingContext class):

    //
        // Cascading self-validation
        //
        HashMap<String,Errors> allErrors; // bean name -> Errors
        public void validate() {
            allErrors = new HashMap<String,Errors>();
                
            for(Entry<String, HasValidator> entry : getBeansOfType(HasValidator.class).entrySet()) {
                String name = entry.getKey();
                HasValidator hv = entry.getValue();
                Validator v = hv.getValidator();
                Errors errors = new BeanPropertyBindingResult(hv,name);
                v.validate(hv, errors);
                if(errors.hasErrors()) {
                    allErrors.put(name,errors);
                }
            }
            for(String name : allErrors.keySet()) {
                for(Object obj : allErrors.get(name).getAllErrors()) {
                    LOGGER.fine("validation error for '"+name+"': "+obj);
                }
            }
        }
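
    The idea behind validate() is to ask every self-validating bean for its errors and keep only the non-empty results, keyed by bean name. A minimal sketch without Spring follows; SelfValidating and ValidationSketch are hypothetical names, and plain strings stand in for Spring's Errors objects.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of cascading self-validation, without Spring. */
class ValidationSketch {
    /** Returns bean name -> error messages, only for beans that reported errors. */
    static Map<String, List<String>> validateAll(Map<String, SelfValidating> beans) {
        Map<String, List<String>> allErrors = new LinkedHashMap<>();
        for (Map.Entry<String, SelfValidating> entry : beans.entrySet()) {
            List<String> errors = entry.getValue().validate();
            if (!errors.isEmpty()) {
                allErrors.put(entry.getKey(), errors);
            }
        }
        return allErrors;
    }

    public static void main(String[] args) {
        Map<String, SelfValidating> beans = new LinkedHashMap<>();
        beans.put("frontier", () -> Collections.<String>emptyList());
        beans.put("seeds", () -> Arrays.asList("no seed source configured"));
        System.out.println(validateAll(beans));
    }
}

/** Hypothetical stand-in for a bean that can validate itself. */
interface SelfValidating {
    List<String> validate();
}
```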

    If there are no exceptions, the CrawlJob's getJobStatusDescription now returns Ready.

    Next we run the job's launch action, which actually executes CrawlJob's void launch() method.

    /**
         * Launch a crawl into 'running' status, assembling if necessary. 
         * 
         * (Note the crawl may have been configured to start in a 'paused'
         * state.) 
         */
        public synchronized void launch() {
            if (isProfile()) {
                throw new IllegalArgumentException("Can't launch profile" + this);
            }
            
            if(isRunning()) {
                getJobLogger().log(Level.SEVERE,"Can't relaunch running job");
                return;
            } else {
                CrawlController cc = getCrawlController();
                if(cc!=null && cc.hasStarted()) {
                    getJobLogger().log(Level.SEVERE,"Can't relaunch previously-launched assembled job");
                    return;
                }
            }
            
            validateConfiguration();
            if(!hasValidApplicationContext()) {
                getJobLogger().log(Level.SEVERE,"Can't launch problem configuration");
                return;
            }
    
            //final String job = changeState(j, ACTIVE);
            
            // this temporary thread ensures all crawl-created threads
            // land in the AlertThreadGroup, to assist crawl-wide 
            // logging/alerting
            alertThreadGroup = new AlertThreadGroup(getShortName());
            alertThreadGroup.addLogger(getJobLogger());
            Thread launcher = new Thread(alertThreadGroup, getShortName()+" launchthread") {
                public void run() {
                    CrawlController cc = getCrawlController();
                    startContext();
                    if(cc!=null) {
                        cc.requestCrawlStart();
                    }
                }
            };
            getJobLogger().log(Level.INFO,"Job launched");
            scanJobLog();
            launcher.start();
            // look busy (and give startContext/crawlStart a chance)
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                // do nothing
            }
        }
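
    The comment about the temporary thread relies on a JDK rule: a thread created without an explicit group joins the group of the thread that creates it (when no security manager overrides this). A minimal sketch using a plain ThreadGroup in place of AlertThreadGroup; ThreadGroupSketch and its method names are hypothetical.

```java
/** Hypothetical sketch: threads started by the launcher inherit its group. */
class ThreadGroupSketch {
    /** Returns the name of the group seen by a worker the launcher creates. */
    static String groupSeenByWorker(String groupName) {
        ThreadGroup group = new ThreadGroup(groupName);
        final String[] observed = new String[1];
        Thread launcher = new Thread(group, groupName + " launchthread") {
            @Override
            public void run() {
                // No group is passed here, so the worker joins the launcher's group.
                Thread worker = new Thread(() ->
                        observed[0] = Thread.currentThread().getThreadGroup().getName());
                worker.start();
                joinQuietly(worker);
            }
        };
        launcher.start();
        joinQuietly(launcher);
        return observed[0];
    }

    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(groupSeenByWorker("demo-job"));
    }
}
```

    This is why launch() does its work in a short-lived launcher thread instead of calling startContext() directly: every thread the crawl spawns from there ends up in the job's own group.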

    The key calls are void startContext() and CrawlController's void requestCrawlStart(), both made inside the Thread launcher.

    void startContext() starts the beans in the Spring container that implement the Lifecycle interface, by calling each bean's start method.

    /**
         * Start the context, catching and reporting any BeansExceptions.
         */
        protected synchronized void startContext() {
            try {
                ac.start(); 
                
                // job log file covering just this launch
                getJobLogger().removeHandler(currentLaunchJobLogHandler);
                File f = new File(ac.getCurrentLaunchDir(), "job.log");
                currentLaunchJobLogHandler = new FileHandler(f.getAbsolutePath(), true);
                currentLaunchJobLogHandler.setFormatter(new JobLogFormatter());
                getJobLogger().addHandler(currentLaunchJobLogHandler);
                
            } catch (BeansException be) {
                doTeardown();
                beansException(be);
            } catch (Exception e) {
                LOGGER.log(Level.SEVERE,e.getClass().getSimpleName()+": "+e.getMessage(),e);
                try {
                    doTeardown();
                } catch (Exception e2) {
                    e2.printStackTrace(System.err);
                }        
            }
        }
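
    Besides starting the context, startContext() swaps the job logger's file handler so that each launch writes to a job.log in its own launch directory. A minimal sketch of that swap with java.util.logging follows. LaunchLogSketch is a hypothetical name, temporary directories stand in for ac.getCurrentLaunchDir(), SimpleFormatter stands in for JobLogFormatter, and closing the old handler is an addition of this sketch.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.logging.FileHandler;
import java.util.logging.Handler;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

/** Hypothetical sketch of one log file per launch. */
class LaunchLogSketch {
    private final Logger jobLogger;
    private Handler currentHandler; // null until the first launch

    LaunchLogSketch(String loggerName) {
        jobLogger = Logger.getLogger(loggerName);
        jobLogger.setUseParentHandlers(false); // keep the demo off the console
    }

    /** Points the job logger at job.log inside the given launch directory. */
    File switchTo(File launchDir) {
        // removeHandler(null) returns silently, so the first call is safe.
        jobLogger.removeHandler(currentHandler);
        if (currentHandler != null) {
            currentHandler.close();
            currentHandler = null;
        }
        File f = new File(launchDir, "job.log");
        try {
            currentHandler = new FileHandler(f.getAbsolutePath(), true); // append
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        currentHandler.setFormatter(new SimpleFormatter());
        jobLogger.addHandler(currentHandler);
        return f;
    }

    int handlerCount() { return jobLogger.getHandlers().length; }

    void info(String message) { jobLogger.info(message); }

    /** Stand-in for a fresh launch directory. */
    static File newLaunchDir() {
        try {
            return Files.createTempDirectory("launch").toFile();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        LaunchLogSketch log = new LaunchLogSketch("sketch.job.main");
        log.switchTo(newLaunchDir());
        log.info("first launch");
        File second = log.switchTo(newLaunchDir());
        log.info("second launch");
        System.out.println(second + " handlers=" + log.handlerCount());
    }
}
```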

    CrawlController's void requestCrawlStart() method:

    /** 
         * Operator requested crawl begin
         */
        public void requestCrawlStart() {
            hasStarted = true; 
            sendCrawlStateChangeEvent(State.PREPARING, CrawlStatus.PREPARING);
            
            if(recoveryCheckpoint==null) {
                // only announce (trigger scheduling of) seeds
                // when doing a cold (non-recovery) start
                getSeeds().announceSeeds();
            }
            
            setupToePool();
    
            // A proper exit will change this value.
            this.sExit = CrawlStatus.FINISHED_ABNORMAL;
            
            if (getPauseAtStart()) {
                // frontier is already paused unless started, so just 
                // 'complete'/ack pause
                completePause();
            } else {
                getFrontier().run();
            }
        }

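    This method announces the seeds (only on a cold, non-recovery start) and then sets up the worker (toe) threads through setupToePool():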

    protected void setupToePool() {
        toePool = new ToePool(alertThreadGroup,this);
        // TODO: make # of toes self-optimizing
        toePool.setSize(getMaxToeThreads());
        toePool.waitForAll();
    }

    When we run the job's unpause action, what actually executes is CrawlController's void requestCrawlResume() method.

    /**
         * Resume crawl from paused state
         */
        public void requestCrawlResume() {
            if (state != State.PAUSING && state != State.PAUSED) {
                // Can't resume if not been told to pause
                return;
            }
            
            assert toePool != null;
            
            Frontier f = getFrontier();
            f.unpause();
            sendCrawlStateChangeEvent(State.RUNNING, CrawlStatus.RUNNING);
        }

    The pause command maps to CrawlController's void requestCrawlPause():

    /**
         * Stop the crawl temporarily.
         */
        public synchronized void requestCrawlPause() {
            if (state == State.PAUSING || state == State.PAUSED) {
                // Already about to pause
                return;
            }
            sExit = CrawlStatus.WAITING_FOR_PAUSE;
            getFrontier().pause();
            sendCrawlStateChangeEvent(State.PAUSING, this.sExit);
            // wait for pause to come via frontier changes
        }
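
    The guards in requestCrawlResume() and requestCrawlPause() amount to a small state machine: pausing is ignored when a pause is already under way, and resuming is ignored unless one is. A reduced sketch with only three states follows. PauseResumeSketch is a hypothetical name, starting in RUNNING is an assumption, and pauseCompleted() stands in for the frontier reporting that the pause has taken effect.

```java
/** Hypothetical three-state reduction of the pause/resume guards. */
class PauseResumeSketch {
    enum State { RUNNING, PAUSING, PAUSED }

    private State state = State.RUNNING; // assumption: the crawl is already running

    /** Ignored if a pause is already requested or complete. */
    synchronized void requestPause() {
        if (state == State.PAUSING || state == State.PAUSED) {
            return;
        }
        state = State.PAUSING;
    }

    /** Stand-in for the frontier signalling that the pause has taken effect. */
    synchronized void pauseCompleted() {
        if (state == State.PAUSING) {
            state = State.PAUSED;
        }
    }

    /** Ignored unless a pause was requested or is complete. */
    synchronized void requestResume() {
        if (state != State.PAUSING && state != State.PAUSED) {
            return;
        }
        state = State.RUNNING;
    }

    synchronized State getState() { return state; }

    public static void main(String[] args) {
        PauseResumeSketch crawl = new PauseResumeSketch();
        crawl.requestPause();
        System.out.println(crawl.getState());
        crawl.pauseCompleted();
        System.out.println(crawl.getState());
        crawl.requestResume();
        System.out.println(crawl.getState());
    }
}
```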

    The terminate command maps to CrawlJob's void terminate():

    public void terminate() {
        getCrawlController().requestCrawlStop();
    }

    This in turn calls CrawlController's void requestCrawlStop() method.

    /**
         * Operator requested for crawl to stop.
         */
        public synchronized void requestCrawlStop() {
            if(state == State.STOPPING) {
                // second stop request; nudge the threads with interrupts
                getToePool().cleanup();
            }
            requestCrawlStop(CrawlStatus.ABORTED);
        }

    The teardown command maps to CrawlJob's boolean teardown():

    /**
         * Ensure a fresh start for any configuration changes or relaunches,
         * by stopping and discarding an existing ApplicationContext.
         * 
         * @return true if teardown is complete when method returns, false if still in progress
         */
        public synchronized boolean teardown() {
            CrawlController cc = getCrawlController();
            if (cc != null) {
                cc.requestCrawlStop();
                needTeardown = true;
                
                // wait up to 3 seconds for stop
                for(int i = 0; i < 11; i++) {
                    if(cc.isStopComplete()) {
                        break;
                    }
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        // do nothing
                    }
                }
                
                if (cc.isStopComplete()) {
                    doTeardown();
                }
            }
            
            assert needTeardown == (ac != null);
            return !needTeardown; 
        }
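
    The loop in teardown() is a bounded poll: check a condition, sleep, and give up after a fixed number of attempts. With 11 attempts of 300 ms each, the worst case is about 3.3 seconds, slightly more than the "3 seconds" in the comment. A minimal sketch of the pattern follows; BoundedWaitSketch and waitFor are hypothetical names, and restoring the interrupt flag is a choice of this sketch, where the original ignores the interruption.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of the bounded polling used while waiting for a stop. */
class BoundedWaitSketch {
    /**
     * Polls the condition up to the given number of attempts, sleeping between
     * checks, then reports its final value.
     */
    static boolean waitFor(BooleanSupplier done, int attempts, long sleepMillis) {
        for (int i = 0; i < attempts; i++) {
            if (done.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the flag, stop waiting
                break;
            }
        }
        return done.getAsBoolean();
    }

    public static void main(String[] args) {
        long deadline = System.currentTimeMillis() + 50;
        boolean finished = waitFor(() -> System.currentTimeMillis() >= deadline, 11, 10);
        System.out.println("finished: " + finished);
    }
}
```

    A caller in the position of teardown() would run its cleanup only when waitFor returns true, and otherwise report that the stop is still in progress.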

    ---------------------------------------------------------------------------

    This Heritrix 3.1.0 Source Code Analysis series is my original work.

    If you repost it, please credit the source: cnblogs (博客园), 刺猬的温驯

    Article link: http://www.cnblogs.com/chenying99/archive/2013/04/17/3025413.html

  • Original article: https://www.cnblogs.com/chenying99/p/3025413.html