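From the CrawlController object discussed in the previous post, we can see that a crawl job uses the ToePool class to build a thread pool of ToeThread threads.
Before looking at the classes behind the crawl thread pool, we first need to understand the CrawlController class, because every crawl command is ultimately carried out by calling a method on the CrawlController object.
The members and methods of CrawlController relate directly to the crawl job; it acts as the control center.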
    // ApplicationContextAware implementation, for eventing
    AbstractApplicationContext appCtx;
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.appCtx = (AbstractApplicationContext)applicationContext;
    }

    CrawlMetadata metadata;
    public CrawlMetadata getMetadata() {
        return metadata;
    }
    @Autowired
    public void setMetadata(CrawlMetadata provider) {
        this.metadata = provider;
    }

    protected ServerCache serverCache;
    public ServerCache getServerCache() {
        return this.serverCache;
    }
    @Autowired
    public void setServerCache(ServerCache serverCache) {
        this.serverCache = serverCache;
    }

    /**
     * The frontier to use for the crawl.
     */
    protected Frontier frontier;
    public Frontier getFrontier() {
        return this.frontier;
    }
    @Autowired
    public void setFrontier(Frontier frontier) {
        this.frontier = frontier;
    }

    /**
     * Scratch directory for temporary overflow-to-disk
     */
    protected ConfigPath scratchDir =
        new ConfigPath("scratch subdirectory","scratch");
    public ConfigPath getScratchDir() {
        return scratchDir;
    }
    public void setScratchDir(ConfigPath scratchDir) {
        this.scratchDir = scratchDir;
    }

    /**
     * Statistics tracking modules.  Any number of specialized statistics
     * trackers that monitor a crawl and write logs, reports and/or provide
     * information to the user interface.
     */
    protected StatisticsTracker statisticsTracker;
    public StatisticsTracker getStatisticsTracker() {
        return this.statisticsTracker;
    }
    @Autowired
    public void setStatisticsTracker(StatisticsTracker statisticsTracker) {
        this.statisticsTracker = statisticsTracker;
    }

    protected SeedModule seeds;
    public SeedModule getSeeds() {
        return this.seeds;
    }
    @Autowired
    public void setSeeds(SeedModule seeds) {
        this.seeds = seeds;
    }

    /**
     * Fetch chain
     */
    protected FetchChain fetchChain;
    public FetchChain getFetchChain() {
        return this.fetchChain;
    }
    @Autowired
    public void setFetchChain(FetchChain fetchChain) {
        this.fetchChain = fetchChain;
    }

    /**
     * Disposition chain
     */
    protected DispositionChain dispositionChain;
    public DispositionChain getDispositionChain() {
        return this.dispositionChain;
    }
    @Autowired
    public void setDispositionChain(DispositionChain dispositionChain) {
        this.dispositionChain = dispositionChain;
    }

    /**
     * Candidate chain
     */
    protected CandidateChain candidateChain;
    public CandidateChain getCandidateChain() {
        return this.candidateChain;
    }
    @Autowired
    public void setCandidateChain(CandidateChain candidateChain) {
        this.candidateChain = candidateChain;
    }
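All of the member variables above are important: the Spring container object (appCtx), the CrawlMetadata metadata, the ServerCache server cache, the Frontier object, the SeedModule seed module, the StatisticsTracker, and the processor chains FetchChain, DispositionChain and CandidateChain. The members not shown here are mostly crawl-job configuration parameters, such as the number of threads.
Its start() method sets up the initial state: it caches the current AlertThreadGroup, returns early if the controller is already running, caps the DNS cache size, and preallocates the reserve memory blocks.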
    public void start() {
        // cache AlertThreadGroup for later ToePool launch
        AlertThreadGroup atg = AlertThreadGroup.current();
        if(atg!=null) {
            alertThreadGroup = atg;
        }

        if(isRunning) {
            return;
        }

        sExit = CrawlStatus.FINISHED_ABNORMAL;

        // force creation of DNS Cache now -- avoids CacheCleaner in toe-threads group
        // also cap size at 1 (we never want a cached value; 0 is non-operative)
        Lookup.getDefaultCache(DClass.IN).setMaxEntries(1);

        reserveMemory = new LinkedList<char[]>();
        for(int i = 0; i < RESERVE_BLOCKS; i++) {
            reserveMemory.add(new char[RESERVE_BLOCK_SIZE]);
        }
        isRunning = true;
    }
The ToePool class extends the ThreadGroup class. Its member variables are as follows:
    public static int DEFAULT_TOE_PRIORITY = Thread.NORM_PRIORITY - 1;

    protected CrawlController controller;
    protected int nextSerialNumber = 1;
    protected int targetSize = 0;
Now let's look back at the method in CrawlController that initializes the ToePool, mentioned in the previous post:
    protected void setupToePool() {
        toePool = new ToePool(alertThreadGroup,this);
        // TODO: make # of toes self-optimizing
        toePool.setSize(getMaxToeThreads());
        toePool.waitForAll();
    }
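It passes in the thread group (used here as the parent thread group) and the CrawlController object itself, sets the pool size to the configured maximum number of toe threads, and then waits until all of them are alive.
The ToePool constructor is as follows: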
    /**
     * Constructor. Creates a pool of ToeThreads.
     *
     * @param c A reference to the CrawlController for the current crawl.
     */
    public ToePool(AlertThreadGroup atg, CrawlController c) {
        super(atg, "ToeThreads");
        this.controller = c;
        setDaemon(true);
    }
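It hands the parent thread group to the superclass constructor, stores the CrawlController reference, and marks the group as a daemon thread group.
The void setSize(int newsize) method sets the target size of the pool: if there are too few threads it starts new ones, and if there are too many it asks the surplus ToeThreads to retire.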
    /**
     * Change the number of ToeThreads.
     *
     * @param newsize The new number of ToeThreads.
     */
    public void setSize(int newsize) {
        targetSize = newsize;
        int difference = newsize - getToeCount();
        if (difference > 0) {
            // must create threads
            for(int i = 1; i <= difference; i++) {
                startNewThread();
            }
        } else {
            // must retire extra threads
            int retainedToes = targetSize;
            Thread[] toes = this.getToes();
            for (int i = 0; i < toes.length ; i++) {
                if(!(toes[i] instanceof ToeThread)) {
                    continue;
                }
                retainedToes--;
                if (retainedToes>=0) {
                    continue; // this toe is spared
                }
                // otherwise:
                ToeThread tt = (ToeThread)toes[i];
                tt.retire();
            }
        }
    }
The key part is the startNewThread() method:
    private synchronized void startNewThread() {
        ToeThread newThread = new ToeThread(this, nextSerialNumber++);
        newThread.setPriority(DEFAULT_TOE_PRIORITY);
        newThread.start();
    }
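This creates a new thread, passing the current ToePool (a thread group) and the next serial number to the constructor, sets its priority, and calls its start() method.
The ToeThread class extends the Thread class. Its member variables are as follows: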
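The grow-or-retire behaviour of setSize can be tried in isolation. The following is a minimal sketch, not Heritrix code: MiniPool, Worker and demo are hypothetical names, the worker body is a placeholder sleep, and the worker lookup is written on the assumption that ToePool finds its threads with ThreadGroup.enumerate (the getToes() source is not shown in this post).

```java
import java.util.ArrayList;
import java.util.List;

final class MiniPoolSketch {

    /** Hypothetical stand-in for ToeThread: idles until asked to retire. */
    static final class Worker extends Thread {
        private volatile boolean shouldRetire = false;

        Worker(ThreadGroup group, int serialNumber) {
            super(group, "Worker #" + serialNumber);
        }

        void retire() { shouldRetire = true; }

        boolean isRetiring() { return shouldRetire; }

        @Override
        public void run() {
            while (!shouldRetire) {
                try {
                    Thread.sleep(5); // placeholder for processing one URI
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    /** Hypothetical stand-in for ToePool: a ThreadGroup that tracks a target size. */
    static final class MiniPool extends ThreadGroup {
        private int nextSerialNumber = 1;

        MiniPool(String name) { super(name); }

        /** Snapshot of the live Worker threads in this group. */
        List<Worker> workers() {
            Thread[] slots = new Thread[activeCount() + 10];
            int found = enumerate(slots);
            List<Worker> result = new ArrayList<>();
            for (int i = 0; i < found; i++) {
                if (slots[i] instanceof Worker) {
                    result.add((Worker) slots[i]);
                }
            }
            return result;
        }

        /** Starts missing workers, or flags the surplus ones to retire. */
        synchronized void setSize(int newSize) {
            List<Worker> current = workers();
            for (int i = current.size(); i < newSize; i++) {
                new Worker(this, nextSerialNumber++).start();
            }
            for (int i = newSize; i < current.size(); i++) {
                current.get(i).retire(); // the worker exits on its next loop check
            }
        }
    }

    /** Grows to {@code grow}, shrinks to {@code shrink}, returns the workers left alive. */
    static int demo(int grow, int shrink) {
        MiniPool pool = new MiniPool("MiniPool");
        pool.setSize(grow);
        List<Worker> started = pool.workers();
        pool.setSize(shrink);
        for (Worker w : started) {
            if (w.isRetiring()) {
                joinQuietly(w); // wait for retired workers to finish
            }
        }
        int remaining = pool.workers().size();
        pool.setSize(0); // let the remaining workers exit too
        return remaining;
    }

    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("workers left after shrinking 3 -> 1: " + demo(3, 1));
    }
}
```

As in ToePool, retiring is cooperative: the pool only sets a flag, and each worker notices it at the end of its current iteration rather than being interrupted.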
    public enum Step {
        NASCENT, ABOUT_TO_GET_URI, FINISHED,
        ABOUT_TO_BEGIN_PROCESSOR, HANDLING_RUNTIME_EXCEPTION,
        ABOUT_TO_RETURN_URI, FINISHING_PROCESS
    }

    private static Logger logger =
        Logger.getLogger("org.archive.crawler.framework.ToeThread");

    private CrawlController controller;
    private int serialNumber;

    /**
     * Each ToeThread has an instance of HttpRecord that gets used
     * over and over by each request.
     *
     * @see org.archive.util.RecorderMarker
     */
    private Recorder httpRecorder = null;

    // activity monitoring, debugging, and problem detection
    private Step step = Step.NASCENT;
    private long atStepSince;
    private String currentProcessorName = "";

    private String coreName;
    private CrawlURI currentCuri;
    private long lastStartTime;
    private long lastFinishTime;

    // default priority; may not be meaningful in recent JVMs
    private static final int DEFAULT_PRIORITY = Thread.NORM_PRIORITY-2;

    // indicator that a thread is now surplus based on current desired
    // count; it should wrap up cleanly
    private volatile boolean shouldRetire = false;
Next, let's look at the ToeThread constructor:
    /**
     * Create a ToeThread
     *
     * @param g ToeThreadGroup
     * @param sn serial number
     */
    public ToeThread(ToePool g, int sn) {
        // TODO: add crawl name?
        super(g,"ToeThread #" + sn);
        coreName="ToeThread #" + sn + ": ";
        controller = g.getController();
        serialNumber = sn;
        setPriority(DEFAULT_PRIORITY);
        int outBufferSize = controller.getRecorderOutBufferBytes();
        int inBufferSize = controller.getRecorderInBufferBytes();
        httpRecorder = new Recorder(controller.getScratchDir().getFile(),
            "tt" + sn + "http", outBufferSize, inBufferSize);
        lastFinishTime = System.currentTimeMillis();
    }
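It sets the thread group, the thread name and the serial number, stores the CrawlController reference obtained from the pool, and creates a per-thread Recorder in the scratch directory.
When the thread is started, the ToeThread object's void run() method executes. It is as follows: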
    /** (non-Javadoc)
     * @see java.lang.Thread#run()
     */
    public void run() {
        String name = controller.getMetadata().getJobName();
        logger.fine(getName()+" started for order '"+name+"'");
        Recorder.setHttpRecorder(httpRecorder);

        try {
            while ( true ) {
                ArchiveUtils.continueCheck();

                setStep(Step.ABOUT_TO_GET_URI, null);

                CrawlURI curi = controller.getFrontier().next();

                synchronized(this) {
                    ArchiveUtils.continueCheck();
                    setCurrentCuri(curi);
                    currentCuri.setThreadNumber(this.serialNumber);
                    lastStartTime = System.currentTimeMillis();
                    currentCuri.setRecorder(httpRecorder);
                }

                try {
                    KeyedProperties.loadOverridesFrom(curi);
                    //System.out.println("FetchChain:"+controller.getFetchChain().getClass().getName());
                    controller.getFetchChain().process(curi,this);
                    //System.out.println("Frontier:"+controller.getFrontier().getClass().getName());
                    controller.getFrontier().beginDisposition(curi);
                    //System.out.println("DispositionChain:"+controller.getDispositionChain().getClass().getName());
                    controller.getDispositionChain().process(curi,this);
                } catch (RuntimeExceptionWrapper e) {
                    // Workaround to get cause from BDB
                    if(e.getCause() == null) {
                        e.initCause(e.getCause());
                    }
                    recoverableProblem(e);
                } catch (AssertionError ae) {
                    // This risks leaving crawl in fatally inconsistent state,
                    // but is often reasonable for per-Processor assertion problems
                    recoverableProblem(ae);
                } catch (RuntimeException e) {
                    recoverableProblem(e);
                } catch (InterruptedException e) {
                    if(currentCuri!=null) {
                        recoverableProblem(e);
                        Thread.interrupted(); // clear interrupt status
                    } else {
                        throw e;
                    }
                } catch (StackOverflowError err) {
                    recoverableProblem(err);
                } catch (Error err) {
                    // OutOfMemory and any others
                    seriousError(err);
                } finally {
                    httpRecorder.endReplays();
                    KeyedProperties.clearOverridesFrom(curi);
                }

                setStep(Step.ABOUT_TO_RETURN_URI, null);
                ArchiveUtils.continueCheck();

                synchronized(this) {
                    controller.getFrontier().finished(currentCuri);
                    controller.getFrontier().endDisposition();
                    setCurrentCuri(null);
                }
                curi = null;

                setStep(Step.FINISHING_PROCESS, null);
                lastFinishTime = System.currentTimeMillis();
                if(shouldRetire) {
                    break; // from while(true)
                }
            }
        } catch (InterruptedException e) {
            if(currentCuri!=null){
                logger.log(Level.SEVERE,"Interrupt leaving unfinished CrawlURI "+getName()+" - job may hang",e);
            }
            // thread interrupted, ok to end
            logger.log(Level.FINE,this.getName()+ " ended with Interruption");
        } catch (Exception e) {
            // everything else (including interruption)
            logger.log(Level.SEVERE,"Fatal exception in "+getName(),e);
        } catch (OutOfMemoryError err) {
            seriousError(err);
        } finally {
            controller.getFrontier().endDisposition();
        }

        setCurrentCuri(null);
        // Do cleanup so that objects can be GC.
        this.httpRecorder.closeRecorders();
        this.httpRecorder = null;

        logger.fine(getName()+" finished for order '"+name+"'");
        setStep(Step.FINISHED, null);
        controller = null;
    }
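Stripped of logging and bookkeeping, run() is a take-process-finish loop: fetch the next URI from the frontier, push it through the processor chains, treat most failures as recoverable so the thread survives, and hand the URI back before taking the next one. The sketch below illustrates only that shape. It is not Heritrix code: a BlockingQueue of strings stands in for the Frontier, the process method and the "bad:" prefix are invented for the example, and a sentinel value replaces the shouldRetire flag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class WorkerLoopSketch {

    // Hypothetical sentinel that ends the loop (the real thread checks shouldRetire).
    private static final String STOP = "<stop>";

    /** Hypothetical stand-in for the fetch and disposition chains. */
    static void process(String uri) {
        if (uri.startsWith("bad:")) {
            throw new IllegalArgumentException("cannot process " + uri);
        }
    }

    /** Runs one worker thread over the given URIs and returns what it logged. */
    static List<String> crawl(List<String> uris) {
        BlockingQueue<String> frontier = new LinkedBlockingQueue<>(uris);
        frontier.add(STOP);
        List<String> log = new ArrayList<>(); // written by the worker, read after join()

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String uri = frontier.take();        // blocks, like Frontier.next()
                    if (STOP.equals(uri)) {
                        break;
                    }
                    try {
                        process(uri);
                        log.add("processed " + uri);
                    } catch (RuntimeException e) {
                        log.add("recovered " + uri);     // like recoverableProblem(e)
                    }
                    log.add("finished " + uri);          // like Frontier.finished(curi)
                }
            } catch (InterruptedException e) {
                log.add("interrupted");                  // interruption ends the thread
            }
        }, "worker #1");

        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : crawl(Arrays.asList("a", "bad:x", "b"))) {
            System.out.println(line);
        }
    }
}
```

The point to notice is that a failing item is still marked finished and the loop moves on to the next one; only interruption ends the thread early.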
The ToePool class's void waitForAll() method is as follows:
    public void waitForAll() {
        while (true) try {
            if (isAllAlive(getToes())) {
                return;
            }
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
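It polls once per second until isAllAlive reports that every thread in the array is alive (null slots are skipped):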
    private static boolean isAllAlive(Thread[] threads) {
        for (Thread t: threads) {
            if ((t != null) && (!t.isAlive())) {
                return false;
            }
        }
        return true;
    }
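A small worked example may make the semantics of the check concrete. This is a sketch: the isAllAlive body is copied from the listing above, while AliveCheckSketch, demo and the two test threads are hypothetical. It shows that a null slot is ignored, whereas a thread that has been created but not yet started counts as not alive.

```java
final class AliveCheckSketch {

    /** Same logic as the isAllAlive method shown above. */
    static boolean isAllAlive(Thread[] threads) {
        for (Thread t : threads) {
            if ((t != null) && (!t.isAlive())) {
                return false;
            }
        }
        return true;
    }

    /** Returns { result with a null slot, result with an unstarted thread }. */
    static boolean[] demo() {
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(60_000); // stays alive until interrupted below
            } catch (InterruptedException e) {
                // exit quietly
            }
        });
        sleeper.setDaemon(true);
        Thread notStarted = new Thread(() -> { });

        sleeper.start();
        boolean withNullSlot = isAllAlive(new Thread[] { sleeper, null });
        boolean withUnstarted = isAllAlive(new Thread[] { sleeper, notStarted });
        sleeper.interrupt();
        return new boolean[] { withNullSlot, withUnstarted };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("null slot ignored: " + r[0]);
        System.out.println("unstarted thread counts as alive: " + r[1]);
    }
}
```

The null check matters whenever the array is larger than the number of threads found, since ThreadGroup.enumerate leaves unused slots untouched.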
When the CrawlController object starts the threads, it calls the BdbFrontier object's void unpause() method, which is defined in AbstractFrontier, the superclass of BdbFrontier's superclass:
org.archive.crawler.frontier.BdbFrontier
org.archive.crawler.frontier.AbstractFrontier
    public void unpause() {
        requestState(State.RUN);
    }
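This sets the BdbFrontier object's target state. The field is declared as volatile State targetState = State.PAUSE; so the target state is PAUSE until RUN is requested.
In the next post we will analyze the states and methods of the BdbFrontier object.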
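The volatile target-state field is a common way for one thread to request a state change that another thread observes later. Below is a minimal sketch of that pattern only: MiniFrontier, its two-value State enum and the polling worker are hypothetical, and the real requestState in AbstractFrontier may do more than assign the field.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class TargetStateSketch {

    enum State { RUN, PAUSE }

    /** Hypothetical stand-in for the frontier's state handling. */
    static final class MiniFrontier {
        // volatile so a write by the controlling thread is visible to workers
        private volatile State targetState = State.PAUSE;

        void requestState(State target) { targetState = target; }

        void unpause() { requestState(State.RUN); }

        State targetState() { return targetState; }
    }

    /** Returns true if the worker did its work only after unpause() was called. */
    static boolean demo() {
        MiniFrontier frontier = new MiniFrontier();
        AtomicBoolean ran = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            while (frontier.targetState() != State.RUN) {
                try {
                    Thread.sleep(5); // still paused, check again shortly
                } catch (InterruptedException e) {
                    return;
                }
            }
            ran.set(true); // placeholder for handing out URIs
        });

        worker.start();
        boolean ranWhilePaused = ran.get(); // state is still PAUSE here
        frontier.unpause();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !ranWhilePaused && ran.get();
    }

    public static void main(String[] args) {
        System.out.println("worker waited for unpause: " + demo());
    }
}
```

Polling with a short sleep keeps the sketch simple; it is not a claim about how the frontier itself waits for a state change.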
---------------------------------------------------------------------------
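This Heritrix 3.1.0 source code analysis series is my own original work.
If you repost it, please credit the source: 博客园 (cnblogs), 刺猬的温驯
Link to this post: http://www.cnblogs.com/chenying99/archive/2013/04/18/3027672.html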