zoukankan      html  css  js  c++  java
  • Heritrix 3.1.0 源码解析(三十七)

    今天有兴趣重新看了一下heritrix3.1.0系统里面的线程池源码。heritrix系统没有采用java的concurrent包(java.util.concurrent)里面的并发框架,而是用线程组ThreadGroup类来实现线程池的。线程组类似于树结构,一个线程组可以包含多个子线程组或多个子线程,数据结构类似于composite模式,不过枝节点与叶子节点没有像composite模式那样实现共同的接口。

    关键类是org.archive.crawler.framework包里面的ToePool类与ToeThread类,前者继承自ThreadGroup类,后者继承自Thread类

    ToeThread显然是工作线程,用于执行采集任务,构造函数初始化成员变量CrawlController controller,用于获取Frontier对象及相关处理器链

        // NOTE(review): excerpt of ToeThread. Members used below such as
        // serialNumber, httpRecorder, lastStartTime, lastFinishTime, shouldRetire,
        // logger and DEFAULT_PRIORITY are presumably declared elsewhere in the
        // class (they are not shown in this excerpt).

        /** Controller of the current crawl; set in the constructor, nulled at the end of run(). */
        private CrawlController controller;
        /** Name prefix built in the constructor: {@code "ToeThread #" + sn + ": "}. */
        private String coreName;
        /** The URI currently being processed, or null between URIs. */
        private CrawlURI currentCuri;

        /**
         * Create a ToeThread.
         *
         * @param g ToeThreadGroup (the pool this thread belongs to)
         * @param sn serial number
         */
        public ToeThread(ToePool g, int sn) {
            // TODO: add crawl name?
            super(g, "ToeThread #" + sn);
            coreName = "ToeThread #" + sn + ": ";
            controller = g.getController();
            serialNumber = sn;
            setPriority(DEFAULT_PRIORITY);
            int outBufferSize = controller.getRecorderOutBufferBytes();
            int inBufferSize = controller.getRecorderInBufferBytes();
            // Each thread gets its own recorder, backed by files in the scratch dir.
            httpRecorder = new Recorder(controller.getScratchDir().getFile(),
                    "tt" + sn + "http", outBufferSize, inBufferSize);
            lastFinishTime = System.currentTimeMillis();
        }

        /**
         * Main loop: repeatedly takes a URI from the frontier, runs it through the
         * fetch chain and the disposition chain, and reports it finished. The
         * loop ends on interruption, on a fatal exception, or after the current
         * URI once {@code shouldRetire} is set.
         *
         * @see java.lang.Thread#run()
         */
        public void run() {
            String name = controller.getMetadata().getJobName();
            logger.fine(getName() + " started for order '" + name + "'");
            Recorder.setHttpRecorder(httpRecorder);

            try {
                while (true) {
                    ArchiveUtils.continueCheck();

                    setStep(Step.ABOUT_TO_GET_URI, null);

                    CrawlURI curi = controller.getFrontier().next();

                    synchronized (this) {
                        ArchiveUtils.continueCheck();
                        setCurrentCuri(curi);
                        currentCuri.setThreadNumber(this.serialNumber);
                        lastStartTime = System.currentTimeMillis();
                        currentCuri.setRecorder(httpRecorder);
                    }

                    try {
                        KeyedProperties.loadOverridesFrom(curi);

                        controller.getFetchChain().process(curi, this);

                        controller.getFrontier().beginDisposition(curi);

                        controller.getDispositionChain().process(curi, this);

                    } catch (RuntimeExceptionWrapper e) {
                        // No initCause "workaround" here: when getCause() is null
                        // the only value available to pass is null, which adds no
                        // information, and Throwable.initCause throws
                        // IllegalStateException if a constructor already fixed the
                        // cause -- which would escape this catch and end the thread.
                        recoverableProblem(e);
                    } catch (AssertionError ae) {
                        // This risks leaving crawl in fatally inconsistent state,
                        // but is often reasonable for per-Processor assertion problems
                        recoverableProblem(ae);
                    } catch (RuntimeException e) {
                        recoverableProblem(e);
                    } catch (InterruptedException e) {
                        if (currentCuri != null) {
                            recoverableProblem(e);
                            Thread.interrupted(); // clear interrupt status
                        } else {
                            throw e;
                        }
                    } catch (StackOverflowError err) {
                        recoverableProblem(err);
                    } catch (Error err) {
                        // OutOfMemory and any others
                        seriousError(err);
                    } finally {
                        httpRecorder.endReplays();
                        KeyedProperties.clearOverridesFrom(curi);
                    }

                    setStep(Step.ABOUT_TO_RETURN_URI, null);
                    ArchiveUtils.continueCheck();

                    synchronized (this) {
                        controller.getFrontier().finished(currentCuri);
                        controller.getFrontier().endDisposition();
                        setCurrentCuri(null);
                    }
                    curi = null;

                    setStep(Step.FINISHING_PROCESS, null);
                    lastFinishTime = System.currentTimeMillis();
                    if (shouldRetire) {
                        break; // from while(true)
                    }
                }
            } catch (InterruptedException e) {
                if (currentCuri != null) {
                    logger.log(Level.SEVERE, "Interrupt leaving unfinished CrawlURI " + getName() + " - job may hang", e);
                }
                // thread interrupted, ok to end
                logger.log(Level.FINE, this.getName() + " ended with Interruption");
            } catch (Exception e) {
                // everything else (interruption is handled by the catch above)
                logger.log(Level.SEVERE, "Fatal exception in " + getName(), e);
            } catch (OutOfMemoryError err) {
                seriousError(err);
            } finally {
                controller.getFrontier().endDisposition();
            }

            setCurrentCuri(null);
            // Do cleanup so that objects can be GC.
            this.httpRecorder.closeRecorders();
            this.httpRecorder = null;

            logger.fine(getName() + " finished for order '" + name + "'");
            setStep(Step.FINISHED, null);
            controller = null;
        }

    ToePool是线程组,用于管理上面的工作线程,初始化、查看活动线程、中断或终止工作线程等

    /** The CrawlController of the current crawl, supplied to the constructor. */
    protected CrawlController controller;
        /** Serial number given to the next ToeThread; post-incremented in startNewThread(). */
        protected int nextSerialNumber = 1;
        /** The pool size most recently requested through setSize(int). */
        protected int targetSize = 0; 
    
        /**
         * Creates the (initially empty) thread group that will hold the crawl's
         * ToeThreads; threads are added later through {@link #setSize(int)}.
         *
         * @param atg the parent thread group of this pool
         * @param c A reference to the CrawlController for the current crawl.
         */
        public ToePool(AlertThreadGroup atg, CrawlController c) {
            // Attach this group beneath the supplied parent group.
            super(atg, "ToeThreads");
            // Mark the group as a daemon group (see ThreadGroup#setDaemon for
            // the JDK-specific meaning).
            setDaemon(true);
            controller = c;
        }
        
        /**
         * Interrupts every thread currently in this pool, forcing ToeThreads
         * that are waiting on queues (or elsewhere) to proceed.
         */
        public void cleanup() {
            for (Thread toe : getToes()) {
                if (toe == null) {
                    continue;
                }
                toe.interrupt();
            }
        }
    
        /**
         * Counts the ToeThreads in this pool whose {@code isActive()} is true.
         *
         * @return The number of ToeThreads that are not available (approximation).
         */
        public int getActiveToeCount() {
            int active = 0;
            for (Thread t : getToes()) {
                boolean busy = (t instanceof ToeThread) && ((ToeThread) t).isActive();
                if (busy) {
                    active++;
                }
            }
            return active;
        }
    
        /**
         * Counts the ToeThreads currently in this pool.
         *
         * @return The number of ToeThreads. This may include killed ToeThreads
         *         that were not replaced.
         */
        public int getToeCount() {
            int total = 0;
            for (Thread t : getToes()) {
                total += (t instanceof ToeThread) ? 1 : 0;
            }
            return total;
        }
        /**
         * Snapshots the live threads of this group, including those of its
         * subgroups, since {@link ThreadGroup#enumerate(Thread[])} recurses.
         *
         * <p>{@code enumerate} silently ignores threads that do not fit in the
         * supplied array, so a completely filled buffer is treated as possibly
         * truncated. In that case the call is retried with a larger buffer.
         *
         * @return the live threads, with no {@code null} slots
         */
        private Thread[] getToes() {
            Thread[] buffer = new Thread[activeCount() + 10];
            int found = this.enumerate(buffer);
            while (found >= buffer.length) {
                // Buffer full: threads may have been dropped, so retry larger.
                buffer = new Thread[buffer.length * 2];
                found = this.enumerate(buffer);
            }
            Thread[] toes = new Thread[found];
            System.arraycopy(buffer, 0, toes, 0, found);
            return toes;
        }
    
        /**
         * Change the number of ToeThreads. Growing starts the missing threads
         * immediately; shrinking asks every ToeThread beyond the first
         * {@code newsize} found to retire.
         *
         * @param newsize The new number of ToeThreads.
         */
        public void setSize(int newsize)
        {
            targetSize = newsize;
            int missing = newsize - getToeCount();
            if (missing > 0) {
                // Too few threads: start the shortfall.
                for (int started = 0; started < missing; started++) {
                    startNewThread();
                }
                return;
            }
            // Too many (or exactly enough) threads: spare the first `keep`
            // ToeThreads found and ask each later one to retire.
            // NOTE(review): getToeCount() also counts ToeThreads that were told to
            // retire but are still finishing their current URI, so growing the pool
            // soon after shrinking it may start fewer threads than intended -- confirm.
            final int keep = targetSize;
            int spared = 0;
            for (Thread t : this.getToes()) {
                if (!(t instanceof ToeThread)) {
                    continue;
                }
                if (spared < keep) {
                    spared++;
                } else {
                    ((ToeThread) t).retire();
                }
            }
        }
    
        /**
         * Kills specified thread. Killed thread can be optionally replaced with a
         * new thread.
         *
         * <p><b>WARNING:</b> This operation should be used with great care. It may
         * destabilize the crawler.
         *
         * @param threadNumber serial number of the ToeThread to kill
         * @param replace If true and a thread with that serial number was found,
         *           a new thread is created to take the killed thread's place.
         *           Otherwise the total number of threads decreases by one, or
         *           stays unchanged when no thread has that serial number.
         */
        public void killThread(int threadNumber, boolean replace){
            boolean killed = false;
            for (Thread t : getToes()) {
                if (!(t instanceof ToeThread)) {
                    continue;
                }
                ToeThread toe = (ToeThread) t;
                if (toe.getSerialNumber() == threadNumber) {
                    toe.kill();
                    killed = true;
                }
            }

            // Only replace a thread that actually existed. Otherwise an unknown
            // serial number would silently add an extra thread to the pool.
            if (replace && killed) {
                startNewThread();
            }
        }
        /**
         * Creates, prioritizes and starts one more ToeThread under the next serial
         * number. Synchronized so that concurrent callers of this method never
         * initialize threads with the same serial number.
         */
        private synchronized void startNewThread() {
            int serial = nextSerialNumber;
            nextSerialNumber = serial + 1;
            ToeThread toe = new ToeThread(this, serial);
            toe.setPriority(DEFAULT_TOE_PRIORITY);
            toe.start();
        }
    
    
    /**
     * Blocks until every thread currently enumerated in this pool reports
     * {@link Thread#isAlive()}, re-checking once per second.
     *
     * @throws IllegalStateException if the calling thread is interrupted while
     *         waiting; its interrupt status is restored before throwing
     */
    public void waitForAll() {
        while (!isAllAlive(getToes())) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }
        
        
        /**
         * @param threads threads to inspect; {@code null} entries are skipped
         * @return {@code true} unless some non-null entry is no longer alive
         */
        private static boolean isAllAlive(Thread[] threads) {
            boolean allAlive = true;
            for (int i = 0; allAlive && i < threads.length; i++) {
                Thread candidate = threads[i];
                allAlive = (candidate == null) || candidate.isAlive();
            }
            return allAlive;
        }

    最后,线程组的初始化及工作线程的管理由CrawlController对象的相关方法完成:

    /**
         * Maximum number of threads processing URIs at the same time.
         */
        int maxToeThreads; 

        /**
         * @return the maximum number of ToeThreads
         */
        public int getMaxToeThreads() {
            return maxToeThreads;
        }

        /**
         * Sets the maximum number of ToeThreads. If a ToePool already exists it
         * is resized immediately. The {@code @Value("25")} annotation presumably
         * supplies the default value (25) -- confirm against the bean configuration.
         *
         * @param maxToeThreads the new maximum number of ToeThreads
         */
        @Value("25")
        public void setMaxToeThreads(int maxToeThreads) {
            this.maxToeThreads = maxToeThreads;
            // Read the field once: completeStop() sets toePool to null, and a
            // separate check and use could observe two different values.
            ToePool pool = toePool;
            if (pool != null) {
                pool.setSize(this.maxToeThreads);
            }
        }
    
    
    /** The crawl's pool of ToeThreads; assigned in setupToePool() and set back to null in completeStop(). */
    private transient ToePool toePool;
    
    /**
         * Called when the last toethread exits.
         *
         * <p>Tears down crawl-wide state in this order: interrupts and releases the
         * ToePool, stops the application context, sends the {@code State.FINISHED}
         * state-change event, then sets {@code isStopComplete} and publishes a
         * {@code StopCompleteEvent}.
         */
        protected void completeStop() {
            LOGGER.fine("Entered complete stop.");
    
            // NOTE(review): the returned snapshot is discarded; presumably this is
            // called for its side effect of recording final statistics -- confirm.
            statisticsTracker.getSnapshot();
            
            // Drop the reserved memory. Presumably this is an emergency reserve
            // used elsewhere for out-of-memory handling -- confirm.
            this.reserveMemory = null;
            // Interrupt any remaining pool threads, then release the pool.
            if (this.toePool != null) {
                this.toePool.cleanup();
            }
            this.toePool = null;
    
            LOGGER.fine("Finished crawl.");
    
            // A failure while stopping the context is logged, not propagated, so the
            // FINISHED notification below still happens.
            try {
                appCtx.stop(); 
            } catch (RuntimeException re) {
                LOGGER.log(Level.SEVERE,re.getMessage(),re);
            }
            
            sendCrawlStateChangeEvent(State.FINISHED, this.sExit);
    
            // CrawlJob needs to be sure all beans have received FINISHED signal before teardown
            this.isStopComplete = true;
            appCtx.publishEvent(new StopCompleteEvent(this)); 
        }
    
    
    
    /**
         * Operator requested for crawl to stop.
         *
         * <p>If the crawl is already {@code STOPPING}, this second request also
         * interrupts the ToeThreads, when a pool still exists, to nudge them along.
         */
        public synchronized void requestCrawlStop() {
            if (state == State.STOPPING) {
                // second stop request; nudge the threads with interrupts.
                // completeStop() sets the pool to null, so guard against that
                // instead of failing before the stop request below is recorded.
                ToePool pool = getToePool();
                if (pool != null) {
                    pool.cleanup();
                }
            }
            requestCrawlStop(CrawlStatus.ABORTED);
        }
    
    
    
    /**
         * @return Active toe thread count, or 0 when no ToePool exists.
         */
        public int getActiveToeCount() {
            // Single read: completeStop() may set toePool to null between a
            // separate null check and the call.
            ToePool pool = toePool;
            return (pool == null) ? 0 : pool.getActiveToeCount();
        }
    
        /**
         * Creates the ToePool under the alert thread group, assigns it to the
         * {@code toePool} field, sizes it to {@link #getMaxToeThreads()} and then
         * waits for the started threads.
         */
        protected void setupToePool() {
            this.toePool = new ToePool(alertThreadGroup, this);
            // TODO: make # of toes self-optimizing
            this.toePool.setSize(this.getMaxToeThreads());
            this.toePool.waitForAll();
        }
    
        /**
         * @return The number of ToeThreads, or 0 when no ToePool exists.
         *
         * @see ToePool#getToeCount()
         */
        public int getToeCount() {
            // Single read: completeStop() may set toePool to null between a
            // separate null check and the call.
            ToePool pool = this.toePool;
            return (pool == null) ? 0 : pool.getToeCount();
        }
    
        /**
         * Gives access to the crawl's thread pool.
         *
         * @return The ToePool; {@code null} when it has not been assigned by
         *         setupToePool() or has been cleared by completeStop()
         */
        public ToePool getToePool() {
            return this.toePool;
        }
    
        /**
         * Kills a thread. For details see
         * {@link org.archive.crawler.framework.ToePool#killThread(int, boolean)
         * ToePool.killThread(int, boolean)}.
         * Does nothing when no ToePool exists.
         * @param threadNumber Thread to kill.
         * @param replace Should thread be replaced.
         * @see org.archive.crawler.framework.ToePool#killThread(int, boolean)
         */
        public void killThread(int threadNumber, boolean replace){
            ToePool pool = toePool;
            if (pool == null) {
                // No pool (not yet set up, or already released): nothing to kill.
                return;
            }
            pool.killThread(threadNumber, replace);
        }

     说得够清楚吧

    --------------------------------------------------------------------------- 

    本系列Heritrix 3.1.0 源码解析系本人原创 

    本人邮箱:chenying998179@163#com(#改为.)

    转载请注明出处 博客园 刺猬的温驯 

    本文链接 http://www.cnblogs.com/chenying99/p/3213556.html

  • 相关阅读:
    Python3之网络编程
    Python3之内置函数
    Python3之面向对象
    Python3之函数
    Python3基础数据类型-字符串
    else配合while或者for循环只用注意点
    字符串
    元组
    48964
    1651
  • 原文地址:https://www.cnblogs.com/chenying99/p/3213556.html
Copyright © 2011-2022 走看看