  • Heritrix 3.1.0 Source Code Analysis (Part 5)

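    As the CrawlController object in the previous post showed, a crawl job uses the ToePool class to build a pool of ToeThread threads.

    Before looking at the classes behind the crawl thread pool, we need to understand the CrawlController class, because every crawler command is ultimately carried out by calling a method on the CrawlController object.

    The fields and methods of CrawlController all relate directly to the crawl job; it acts as the control center.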

    // ApplicationContextAware implementation, for eventing
        AbstractApplicationContext appCtx;
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.appCtx = (AbstractApplicationContext)applicationContext;
        }
        
        CrawlMetadata metadata;
        public CrawlMetadata getMetadata() {
            return metadata;
        }
        @Autowired
        public void setMetadata(CrawlMetadata provider) {
            this.metadata = provider;
        }
        
        protected ServerCache serverCache;
        public ServerCache getServerCache() {
            return this.serverCache;
        }
        @Autowired
        public void setServerCache(ServerCache serverCache) {
            this.serverCache = serverCache;
        }
    
        /**
         * The frontier to use for the crawl.
         */
        protected Frontier frontier;
        public Frontier getFrontier() {
            return this.frontier;
        }
        @Autowired
        public void setFrontier(Frontier frontier) {
            this.frontier = frontier;
        }
    
        /**
         * Scratch directory for temporary overflow-to-disk
         */
        protected ConfigPath scratchDir = 
            new ConfigPath("scratch subdirectory","scratch");
        public ConfigPath getScratchDir() {
            return scratchDir;
        }
        public void setScratchDir(ConfigPath scratchDir) {
            this.scratchDir = scratchDir;
        }
    
        /**
         * Statistics tracking modules.  Any number of specialized statistics 
         * trackers that monitor a crawl and write logs, reports and/or provide 
         * information to the user interface.
         */
        protected StatisticsTracker statisticsTracker;
        public StatisticsTracker getStatisticsTracker() {
            return this.statisticsTracker;
        }
        @Autowired
        public void setStatisticsTracker(StatisticsTracker statisticsTracker) {
            this.statisticsTracker = statisticsTracker;
        }
    
        protected SeedModule seeds;
        public SeedModule getSeeds() {
            return this.seeds;
        }
        @Autowired
        public void setSeeds(SeedModule seeds) {
            this.seeds = seeds;
        }
        
        /**
         * Fetch chain
         */
        protected FetchChain fetchChain;
        public FetchChain getFetchChain() {
            return this.fetchChain;
        }
        @Autowired
        public void setFetchChain(FetchChain fetchChain) {
            this.fetchChain = fetchChain;
        }
        
        /**
         * Disposition chain
         */
        protected DispositionChain dispositionChain;
        public DispositionChain getDispositionChain() {
            return this.dispositionChain;
        }
        @Autowired
        public void setDispositionChain(DispositionChain dispositionChain) {
            this.dispositionChain = dispositionChain;
        }
        
        /**
         * Candidate chain
         */
        protected CandidateChain candidateChain;
        public CandidateChain getCandidateChain() {
            return this.candidateChain;
        }
        @Autowired
        public void setCandidateChain(CandidateChain candidateChain) {
            this.candidateChain = candidateChain;
        }

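    All of these fields are important: the Spring application context, the CrawlMetadata metadata, the ServerCache server cache, the Frontier object, the SeedModule seed module, the StatisticsTracker, and the processor chains FetchChain, DispositionChain, and CandidateChain. The fields not shown here are mostly crawl configuration parameters, such as the number of threads.

    Its start() method sets up the initial state: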

    public void start() {
            // cache AlertThreadGroup for later ToePool launch
            AlertThreadGroup atg = AlertThreadGroup.current();
            if(atg!=null) {
                alertThreadGroup = atg;
            }
            
            if(isRunning) {
                return; 
            }
           
            sExit = CrawlStatus.FINISHED_ABNORMAL;
    
            // force creation of DNS Cache now -- avoids CacheCleaner in toe-threads group
            // also cap size at 1 (we never wanta cached value; 0 is non-operative)
            Lookup.getDefaultCache(DClass.IN).setMaxEntries(1);
            
            reserveMemory = new LinkedList<char[]>();
            for(int i = 0; i < RESERVE_BLOCKS; i++) {
                reserveMemory.add(new char[RESERVE_BLOCK_SIZE]);
            }
            isRunning = true; 
        }

    The ToePool class extends ThreadGroup. Its fields are as follows:

    public static int DEFAULT_TOE_PRIORITY = Thread.NORM_PRIORITY - 1;
        
        protected CrawlController controller;
        protected int nextSerialNumber = 1;
        protected int targetSize = 0; 

    Now look again at the method in CrawlController, mentioned in the previous post, that initializes the ToePool:

     protected void setupToePool() {
            toePool = new ToePool(alertThreadGroup,this);
            // TODO: make # of toes self-optimizing
            toePool.setSize(getMaxToeThreads());
            toePool.waitForAll();
        }

    It passes in a thread group (used here as the parent thread group) and the CrawlController object itself.

    The ToePool constructor is as follows:

     /**
         * Constructor. Creates a pool of ToeThreads. 
         *
         * @param c A reference to the CrawlController for the current crawl.
         */
        public ToePool(AlertThreadGroup atg, CrawlController c) {
            super(atg, "ToeThreads");        
            this.controller = c;
            setDaemon(true);
        }

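    It sets the parent thread group and stores the CrawlController reference.

    The void setSize(int newsize) method sets the pool size, starting new threads when the pool has to grow and retiring surplus threads when it has to shrink: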
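
    To make the parent/child relationship concrete, here is a minimal sketch of a ThreadGroup subclass nested under a parent group, in the same way ToePool is nested under alertThreadGroup. The names ThreadGroupNestingSketch and WorkerPool, and the placeholder controller object, are hypothetical and not part of Heritrix.

```java
public class ThreadGroupNestingSketch {

    /** Hypothetical stand-in for ToePool: a thread group that keeps a reference to its owner. */
    static class WorkerPool extends ThreadGroup {
        final Object controller;

        WorkerPool(ThreadGroup parent, Object controller) {
            super(parent, "Workers"); // registers this group as a child of parent
            this.controller = controller;
        }
    }

    public static void main(String[] args) {
        ThreadGroup alerts = new ThreadGroup("alerts");
        WorkerPool pool = new WorkerPool(alerts, "controller placeholder");

        // A thread created with the pool as its group belongs to that group.
        Thread worker = new Thread(pool, () -> { }, "Worker #1");

        System.out.println(pool.getParent() == alerts);
        System.out.println(worker.getThreadGroup() == pool);
        // parentOf is true when the receiver is the argument or one of its ancestors.
        System.out.println(alerts.parentOf(worker.getThreadGroup()));
    }
}
```

    Because every worker thread lives in a group below the parent, anything attached to the parent group can reach the workers through the group hierarchy.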

    /**
         * Change the number of ToeThreads.
         *
         * @param newsize The new number of ToeThreads.
         */
        public void setSize(int newsize)
        {
            targetSize = newsize;
            int difference = newsize - getToeCount(); 
            if (difference > 0) {
                // must create threads
                for(int i = 1; i <= difference; i++) {
                    startNewThread();
                }
            } else {
                // must retire extra threads
                int retainedToes = targetSize; 
                Thread[] toes = this.getToes();
                for (int i = 0; i < toes.length ; i++) {
                    if(!(toes[i] instanceof ToeThread)) {
                        continue;
                    }
                    retainedToes--;
                    if (retainedToes>=0) {
                        continue; // this toe is spared
                    }
                    // otherwise:
                    ToeThread tt = (ToeThread)toes[i];
                    tt.retire();
                }
            }
        }

    The key is the startNewThread() method:

    private synchronized void startNewThread() {
            ToeThread newThread = new ToeThread(this, nextSerialNumber++);
            newThread.setPriority(DEFAULT_TOE_PRIORITY);
            newThread.start();
        }

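    It creates a new thread, passing the current ToePool thread group as an argument, and calls the thread's start() method.

    The ToeThread class extends Thread. Its fields are as follows: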
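
    The grow-or-retire logic of setSize and startNewThread can be tried out in isolation with a simplified sketch. It is not the Heritrix implementation: the class names ResizablePoolSketch and Worker are hypothetical, workers are tracked in a plain list instead of being enumerated from the thread group, and each worker only sleeps until asked to retire.

```java
import java.util.ArrayList;
import java.util.List;

public class ResizablePoolSketch {

    /** Hypothetical worker: idles until it is asked to retire. */
    static class Worker extends Thread {
        private volatile boolean shouldRetire = false;

        Worker(int serial) {
            super("Worker #" + serial);
            setDaemon(true); // do not keep the JVM alive
        }

        void retire() { shouldRetire = true; }

        boolean isRetiring() { return shouldRetire; }

        @Override
        public void run() {
            while (!shouldRetire) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    private final List<Worker> workers = new ArrayList<>();
    private int nextSerialNumber = 1;

    /** Grows the pool by starting workers, or shrinks it by retiring the surplus ones. */
    synchronized void setSize(int newSize) {
        int difference = newSize - activeCount();
        if (difference > 0) {
            for (int i = 0; i < difference; i++) {
                Worker w = new Worker(nextSerialNumber++);
                workers.add(w);
                w.start();
            }
        } else {
            int retained = newSize;
            for (Worker w : workers) {
                if (w.isRetiring()) {
                    continue; // already on its way out
                }
                if (retained-- > 0) {
                    continue; // this worker is spared
                }
                w.retire();
            }
        }
    }

    /** Number of workers that have not been asked to retire. */
    synchronized int activeCount() {
        int n = 0;
        for (Worker w : workers) {
            if (!w.isRetiring()) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        ResizablePoolSketch pool = new ResizablePoolSketch();
        pool.setSize(4);
        pool.setSize(1);
        System.out.println(pool.activeCount());
    }
}
```

    Retirement here is cooperative: setSize only sets a flag, and each surplus worker ends by itself the next time it checks that flag.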

    public enum Step {
            NASCENT, ABOUT_TO_GET_URI, FINISHED, 
            ABOUT_TO_BEGIN_PROCESSOR, HANDLING_RUNTIME_EXCEPTION, 
            ABOUT_TO_RETURN_URI, FINISHING_PROCESS
        }
    
        private static Logger logger =
            Logger.getLogger("org.archive.crawler.framework.ToeThread");
    
        private CrawlController controller;
        private int serialNumber;
        
        /**
         * Each ToeThead has an instance of HttpRecord that gets used
         * over and over by each request.
         * 
         * @see org.archive.util.RecorderMarker
         */
        private Recorder httpRecorder = null;
    
        // activity monitoring, debugging, and problem detection
        private Step step = Step.NASCENT;
        private long atStepSince;
        private String currentProcessorName = "";
        
        private String coreName;
        private CrawlURI currentCuri;
        private long lastStartTime;
        private long lastFinishTime;
    
        
        // default priority; may not be meaningful in recent JVMs
        private static final int DEFAULT_PRIORITY = Thread.NORM_PRIORITY-2;
        
        // indicator that a thread is now surplus based on current desired
        // count; it should wrap up cleanly
        private volatile boolean shouldRetire = false;

    Next, the ToeThread constructor:

    /**
         * Create a ToeThread
         * 
         * @param g ToeThreadGroup
         * @param sn serial number
         */
        public ToeThread(ToePool g, int sn) {
            // TODO: add crawl name?
            super(g,"ToeThread #" + sn);
            coreName="ToeThread #" + sn + ": ";
            controller = g.getController();
            serialNumber = sn;
            setPriority(DEFAULT_PRIORITY);
            int outBufferSize = controller.getRecorderOutBufferBytes();
            int inBufferSize = controller.getRecorderInBufferBytes();
            httpRecorder = new Recorder(controller.getScratchDir().getFile(),
                "tt" + sn + "http", outBufferSize, inBufferSize);
            lastFinishTime = System.currentTimeMillis();
        }

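    It sets the thread group and the serial number, obtains the CrawlController from the pool, and creates the thread's Recorder.

    Once the thread is started, the ToeThread's void run() method executes: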

    /** (non-Javadoc)
         * @see java.lang.Thread#run()
         */
        public void run() {
            String name = controller.getMetadata().getJobName();
            logger.fine(getName()+" started for order '"+name+"'");
            Recorder.setHttpRecorder(httpRecorder); 
            
            try {
                while ( true ) {
                    ArchiveUtils.continueCheck();
                    
                    setStep(Step.ABOUT_TO_GET_URI, null);
    
                    CrawlURI curi = controller.getFrontier().next();
                    
                    
                    synchronized(this) {
                        ArchiveUtils.continueCheck();
                        setCurrentCuri(curi);
                        currentCuri.setThreadNumber(this.serialNumber);
                        lastStartTime = System.currentTimeMillis();
                        currentCuri.setRecorder(httpRecorder);
                    }
                    
                    try {
                        KeyedProperties.loadOverridesFrom(curi);
                        
                        //System.out.println("FetchChain:"+controller.getFetchChain().getClass().getName());
                        
                        
                        controller.getFetchChain().process(curi,this);
                        //System.out.println("Frontier:"+controller.getFrontier().getClass().getName());
                        controller.getFrontier().beginDisposition(curi);
                        
                        //System.out.println("DispositionChain:"+controller.getDispositionChain().getClass().getName());
                        controller.getDispositionChain().process(curi,this);
      
                    } catch (RuntimeExceptionWrapper e) {
                        // Workaround to get cause from BDB
                        if(e.getCause() == null) {
                            e.initCause(e.getCause());
                        }
                        recoverableProblem(e);
                    } catch (AssertionError ae) {
                        // This risks leaving crawl in fatally inconsistent state, 
                        // but is often reasonable for per-Processor assertion problems 
                        recoverableProblem(ae);
                    } catch (RuntimeException e) {
                        recoverableProblem(e);
                    } catch (InterruptedException e) {
                        if(currentCuri!=null) {
                            recoverableProblem(e);
                            Thread.interrupted(); // clear interrupt status
                        } else {
                            throw e;
                        }
                    } catch (StackOverflowError err) {
                        recoverableProblem(err);
                    } catch (Error err) {
                        // OutOfMemory and any others
                        seriousError(err); 
                    } finally {
                        httpRecorder.endReplays();
                        KeyedProperties.clearOverridesFrom(curi); 
                    }
                    
                    setStep(Step.ABOUT_TO_RETURN_URI, null);
                    ArchiveUtils.continueCheck();
    
                    synchronized(this) {
                        controller.getFrontier().finished(currentCuri);
                        controller.getFrontier().endDisposition();
                        setCurrentCuri(null);
                    }
                    curi = null;
                    
                    setStep(Step.FINISHING_PROCESS, null);
                    lastFinishTime = System.currentTimeMillis();
                    if(shouldRetire) {
                        break; // from while(true)
                    }
                }
            } catch (InterruptedException e) {
                if(currentCuri!=null){
                    logger.log(Level.SEVERE,"Interrupt leaving unfinished CrawlURI "+getName()+" - job may hang",e);
                }
                // thread interrupted, ok to end
                logger.log(Level.FINE,this.getName()+ " ended with Interruption");
            } catch (Exception e) {
                // everything else (including interruption)
                logger.log(Level.SEVERE,"Fatal exception in "+getName(),e);
            } catch (OutOfMemoryError err) {
                seriousError(err);
            } finally {
                controller.getFrontier().endDisposition();
    
            }
    
            setCurrentCuri(null);
            // Do cleanup so that objects can be GC.
            this.httpRecorder.closeRecorders();
            this.httpRecorder = null;
    
            logger.fine(getName()+" finished for order '"+name+"'");
            setStep(Step.FINISHED, null);
            controller = null;
        }
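
    Stripped of the crawler-specific calls, the run() method above is a worker loop: take the next item, process it, recover from per-item failures, and leave the loop only between items once shouldRetire is set. The sketch below shows that shape under stated assumptions: a BlockingQueue of Runnable items stands in for the Frontier, and the names WorkerLoopSketch, Worker, finished and recovered are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkerLoopSketch {

    static class Worker extends Thread {
        private final BlockingQueue<Runnable> queue; // stand-in for the Frontier
        final AtomicInteger finished = new AtomicInteger();
        final AtomicInteger recovered = new AtomicInteger();
        private volatile boolean shouldRetire = false;

        Worker(BlockingQueue<Runnable> queue) {
            super("Worker");
            this.queue = queue;
            setDaemon(true);
        }

        void retire() { shouldRetire = true; }

        @Override
        public void run() {
            try {
                while (true) {
                    Runnable item = queue.take(); // blocks until an item is available
                    try {
                        item.run();
                    } catch (RuntimeException e) {
                        recovered.incrementAndGet(); // per-item problem: keep the thread going
                    } finally {
                        finished.incrementAndGet(); // always hand the item back
                    }
                    if (shouldRetire) {
                        break; // only checked between items
                    }
                }
            } catch (InterruptedException e) {
                // interrupted while waiting for work: fine to end
            }
        }
    }

    /** Runs three items (one fails, one requests retirement); returns {finished, recovered}. */
    static int[] demo() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        Worker worker = new Worker(queue);
        queue.add(() -> { });
        queue.add(() -> { throw new IllegalStateException("simulated processor failure"); });
        queue.add(worker::retire);
        worker.start();
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new int[] { worker.finished.get(), worker.recovered.get() };
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("finished=" + counts[0] + " recovered=" + counts[1]);
    }
}
```

    In this sketch a retiring worker that is blocked waiting for work only notices the flag after its next item, which is why the flag check sits at the end of the loop body.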

    The void waitForAll() method of ToePool is as follows:

    public void waitForAll() {
            while (true) try {
                if (isAllAlive(getToes())) {
                    return;
                }
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }

    The helper isAllAlive checks whether every non-null thread in the array is alive:

    private static boolean isAllAlive(Thread[] threads) {
            for (Thread t: threads) {
                if ((t != null) && (!t.isAlive())) {
                    return false;
                }
            }
            return true;
        }
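
    The behavior of this check is easy to observe with a few threads parked on a latch. The sketch below copies the isAllAlive logic into a hypothetical class AliveCheckSketch; the latch-based demo method is an illustration only and does not appear in Heritrix.

```java
import java.util.concurrent.CountDownLatch;

public class AliveCheckSketch {

    /** Same logic as the method above: null slots are ignored, any dead thread fails the check. */
    static boolean isAllAlive(Thread[] threads) {
        for (Thread t : threads) {
            if (t != null && !t.isAlive()) {
                return false;
            }
        }
        return true;
    }

    /** Returns {result while the threads are parked, result after they have exited}. */
    static boolean[] demo() {
        CountDownLatch release = new CountDownLatch(1);
        Thread[] threads = new Thread[3]; // the last slot stays null on purpose
        for (int i = 0; i < 2; i++) {
            threads[i] = new Thread(() -> {
                try {
                    release.await(); // park until released
                } catch (InterruptedException ignored) {
                    // end quietly
                }
            });
            threads[i].setDaemon(true);
            threads[i].start();
        }
        boolean whileRunning = isAllAlive(threads);
        release.countDown();
        try {
            for (Thread t : threads) {
                if (t != null) {
                    t.join();
                }
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        boolean afterExit = isAllAlive(threads);
        return new boolean[] { whileRunning, afterExit };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]);
    }
}
```

    A thread counts as alive from the moment start() returns until it terminates, so the first check succeeds while the threads are parked and the second fails once they have exited.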

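    When the CrawlController object starts the threads, it calls the void unpause() method of the BdbFrontier object. That method is defined in AbstractFrontier, the superclass of BdbFrontier's superclass: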

    org.archive.crawler.frontier.BdbFrontier

             org.archive.crawler.frontier.AbstractFrontier

    public void unpause() {
            requestState(State.RUN);
        }

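    This sets the target state of the BdbFrontier object, which is held in the field volatile State targetState = State.PAUSE;

    The next post analyzes the states and methods of the BdbFrontier object.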
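
    The reason for declaring the field volatile is that one thread requests the state while other threads read it. A minimal sketch of that pattern follows. The class TargetStateSketch, its reduced State enum, and the body of requestState are assumptions made for illustration; the real AbstractFrontier is not reproduced here.

```java
public class TargetStateSketch {

    enum State { RUN, PAUSE, FINISH } // reduced, hypothetical set of states

    // volatile: a write by the controlling thread is visible to the threads that poll it
    private volatile State targetState = State.PAUSE;

    void requestState(State target) {
        targetState = target;
    }

    void unpause() {
        requestState(State.RUN);
    }

    State getTargetState() {
        return targetState;
    }

    /** Starts a thread that waits for RUN, then unpauses; returns true if the waiter finished. */
    static boolean demo() {
        TargetStateSketch frontier = new TargetStateSketch();
        Thread waiter = new Thread(() -> {
            while (frontier.getTargetState() != State.RUN) {
                Thread.yield(); // poll until the requested state changes
            }
        });
        waiter.setDaemon(true);
        waiter.start();
        frontier.unpause();
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    The field starts out as PAUSE, so nothing proceeds until unpause() requests RUN.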

    --------------------------------------------------------------------------- 

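    This Heritrix 3.1.0 source code analysis series is my original work.

    When reposting, please credit the source: 博客园 (cnblogs), 刺猬的温驯

    Link to this post: http://www.cnblogs.com/chenying99/archive/2013/04/18/3027672.html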

  • Original article: https://www.cnblogs.com/chenying99/p/3027672.html