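This article analyzes the relevant state and methods of the BdbFrontier object.
The BdbFrontier class extends WorkQueueFrontier, which in turn extends AbstractFrontier.
The void start() method of BdbFrontier is shown below (it is defined in the parent class WorkQueueFrontier):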
org.archive.crawler.frontier.BdbFrontier
org.archive.crawler.frontier.WorkQueueFrontier
public void start() {
    if(isRunning()) {
        return;
    }
    uriUniqFilter.setDestination(this);
    super.start();
    try {
        initInternalQueues();
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
}
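A minimal sketch of the shape of this start() method follows. It uses hypothetical ParentSketch/ChildSketch classes, not the real Heritrix types, and shows three things:

- an isRunning() guard makes repeated calls no-ops;
- the subclass does its own preparation before super.start(), then initializes its queues;
- a checked exception from queue initialization is rethrown as an IllegalStateException.

The boolean running flag is an assumption standing in for whatever isRunning() actually checks.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified parent standing in for AbstractFrontier.start().
class ParentSketch {
    final List<String> calls = new ArrayList<>();
    protected boolean running; // assumption: stands in for the real isRunning() check

    boolean isRunning() {
        return running;
    }

    public void start() {
        if (isRunning()) {
            return;
        }
        calls.add("parent.start");
        running = true;
    }
}

// Hypothetical subclass mirroring the shape of WorkQueueFrontier.start().
class ChildSketch extends ParentSketch {
    boolean failInit; // test hook: simulate a failing queue initialization

    @Override
    public void start() {
        if (isRunning()) {
            return; // already started: a second call does nothing
        }
        calls.add("child.setDestination");
        super.start();
        try {
            initInternalQueues();
        } catch (Exception e) {
            // checked exceptions are rethrown unchecked, as in the original
            throw new IllegalStateException(e);
        }
    }

    protected void initInternalQueues() throws IOException {
        if (failInit) {
            throw new IOException("simulated init failure");
        }
        calls.add("child.initInternalQueues");
    }
}
```

Calling start() twice on a ChildSketch records each step only once, in the order child preparation, parent start, queue initialization.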
super.start() calls the void start() method of the parent class AbstractFrontier:
public void start() {
    if(isRunning()) {
        return;
    }
    if (getRecoveryLogEnabled()) try {
        initJournal(loggerModule.getPath().getFile().getAbsolutePath());
    } catch (IOException e) {
        throw new IllegalStateException(e);
    }
    pause();
    startManagerThread();
}
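If the recovery log is enabled, this method first initializes the recovery journal under the logger module's path. It then calls pause() to put the current object (the BdbFrontier) into State.PAUSE, and finally calls void startManagerThread():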
/**
 * Start the dedicated thread with an independent view of the frontier's
 * state.
 */
protected void startManagerThread() {
    managerThread = new Thread(this+".managerThread") {
        public void run() {
            AbstractFrontier.this.managementTasks();
        }
    };
    managerThread.setPriority(Thread.NORM_PRIORITY+1);
    managerThread.start();
}
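The startManagerThread() idiom can be sketched in isolation. ManagerThreadSketch and its counting managementTasks() are hypothetical.

- The constructor argument `this + ".managerThread"` is evaluated in the enclosing object, so the thread is named after the frontier instance.
- Inside the anonymous Thread subclass, `ManagerThreadSketch.this` names the enclosing instance explicitly, just as the original uses AbstractFrontier.this.
- The priority is set to one above normal before the thread starts.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the startManagerThread() idiom: an anonymous Thread
// subclass whose run() delegates back to a method of the enclosing object.
class ManagerThreadSketch {
    final AtomicInteger tasksRun = new AtomicInteger();
    Thread managerThread;

    protected void managementTasks() {
        // Placeholder for the real frontier loop; only counts invocations.
        tasksRun.incrementAndGet();
    }

    protected void startManagerThread() {
        // "this" here is the ManagerThreadSketch, so the name embeds its toString()
        managerThread = new Thread(this + ".managerThread") {
            @Override
            public void run() {
                // Outer.this explicitly selects the enclosing instance
                ManagerThreadSketch.this.managementTasks();
            }
        };
        managerThread.setPriority(Thread.NORM_PRIORITY + 1);
        managerThread.start();
    }
}
```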
The run() method of the managerThread thread calls void managementTasks():
/**
 * Main loop of frontier's managerThread. Only exits when State.FINISH
 * is requested (perhaps automatically at URI exhaustion) and reached.
 *
 * General strategy is to try to fill outbound queue, then process an
 * item from inbound queue, and repeat. A HOLD (to be implemented) or
 * PAUSE puts frontier into a stable state that won't be changed
 * asynchronously by worker thread activity.
 */
protected void managementTasks() {
    assert Thread.currentThread() == managerThread;
    try {
        loop: while (true) {
            try {
                State reachedState = null;
                switch (targetState) {
                case EMPTY:
                    reachedState = State.EMPTY;
                case RUN:
                    // enable outbound takes if previously locked
                    while(outboundLock.isWriteLockedByCurrentThread()) {
                        outboundLock.writeLock().unlock();
                    }
                    if(reachedState==null) {
                        reachedState = State.RUN;
                    }
                    reachedState(reachedState);
                    Thread.sleep(1000);
                    if(isEmpty()&&targetState==State.RUN) {
                        requestState(State.EMPTY);
                    } else if (!isEmpty()&&targetState==State.EMPTY) {
                        requestState(State.RUN);
                    }
                    break;
                case HOLD:
                    // TODO; for now treat same as PAUSE
                case PAUSE:
                    // pausing
                    // prevent all outbound takes
                    outboundLock.writeLock().lock();
                    // process all inbound
                    while (targetState == State.PAUSE) {
                        if (getInProcessCount()==0) {
                            reachedState(State.PAUSE);
                        }
                        Thread.sleep(1000);
                    }
                    break;
                case FINISH:
                    // prevent all outbound takes
                    outboundLock.writeLock().lock();
                    // process all inbound
                    while (getInProcessCount()>0) {
                        Thread.sleep(1000);
                    }
                    finalTasks();
                    // TODO: more cleanup?
                    reachedState(State.FINISH);
                    break loop;
                }
            } catch (RuntimeException e) {
                // log, try to pause, continue
                logger.log(Level.SEVERE,"",e);
                if(targetState!=State.PAUSE && targetState!=State.FINISH) {
                    requestState(State.PAUSE);
                }
            }
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    // try to leave in safely restartable state:
    targetState = State.PAUSE;
    while(outboundLock.isWriteLockedByCurrentThread()) {
        outboundLock.writeLock().unlock();
    }
    //TODO: ensure all other structures are cleanly reset on restart
    logger.log(Level.FINE,"ending frontier mgr thread");
}
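Two cases in this switch have no break:

- case EMPTY falls through into case RUN. reachedState stays EMPTY, because the RUN branch only sets it while it is still null.
- case HOLD falls through into case PAUSE. A HOLD target therefore acquires the write lock but never reports PAUSE, because the inner loop only runs while targetState == State.PAUSE.

The hypothetical sketch below replays a single pass of the switch, without sleeping, looping or real locking, and records what each target state would do. FrontierState is a stand-in for the real State enum.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the frontier's State enum.
enum FrontierState { RUN, EMPTY, HOLD, PAUSE, FINISH }

class SwitchSketch {
    // Records, for one pass and without sleeping, the actions the
    // managementTasks() switch would take for a given target state.
    static List<String> actionsFor(FrontierState target, int inProcessCount) {
        List<String> actions = new ArrayList<>();
        FrontierState reached = null;
        switch (target) {
            case EMPTY:
                reached = FrontierState.EMPTY;
                // no break: falls through into RUN, keeping reached == EMPTY
            case RUN:
                actions.add("releaseWriteLock");
                if (reached == null) {
                    reached = FrontierState.RUN;
                }
                actions.add("reached " + reached);
                break;
            case HOLD:
                // no break: HOLD shares PAUSE's lock acquisition
            case PAUSE:
                actions.add("acquireWriteLock");
                // the original only reports PAUSE while the target is PAUSE
                if (target == FrontierState.PAUSE && inProcessCount == 0) {
                    actions.add("reached PAUSE");
                }
                break;
            case FINISH:
                actions.add("acquireWriteLock");
                // the original first sleeps until inProcessCount drops to 0
                actions.add("finalTasks");
                actions.add("reached FINISH");
                break;
        }
        return actions;
    }
}
```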
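managementTasks() loops continuously. Based on the BdbFrontier object's target state (targetState), it sets the lock state of the member variable protected ReentrantReadWriteLock outboundLock = new ReentrantReadWriteLock(true):

- In RUN and EMPTY, the manager thread releases any write-lock holds it has, which enables outbound takes.
- In PAUSE, HOLD and FINISH, it acquires the write lock to prevent all outbound takes.
- In FINISH, it additionally waits until no URIs are in process, then calls finalTasks(), reports State.FINISH and leaves the loop.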
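The effect of outboundLock can be demonstrated with a plain ReentrantReadWriteLock. This sketch assumes, as the code comments suggest, that worker-side takes acquire the read lock. OutboundLockSketch and workerCouldTake() are hypothetical.

- While the manager thread holds the write lock, another thread's readLock().tryLock() fails.
- The `while (isWriteLockedByCurrentThread()) unlock()` loop releases every reentrant hold at once.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class OutboundLockSketch {
    // Fair read/write lock, constructed like the frontier's outboundLock field.
    final ReentrantReadWriteLock outboundLock = new ReentrantReadWriteLock(true);

    // Manager side for PAUSE/HOLD/FINISH: hold the write lock to block takes.
    void blockTakes() {
        outboundLock.writeLock().lock();
    }

    // Manager side for RUN/EMPTY: drop every reentrant hold this thread has.
    void enableTakes() {
        while (outboundLock.isWriteLockedByCurrentThread()) {
            outboundLock.writeLock().unlock();
        }
    }

    // Hypothetical worker-side probe, run on a separate thread: could a take
    // acquire the read lock right now?
    boolean workerCouldTake() {
        return CompletableFuture.supplyAsync(() -> {
            boolean acquired = outboundLock.readLock().tryLock();
            if (acquired) {
                outboundLock.readLock().unlock();
            }
            return acquired;
        }, task -> new Thread(task).start()).join();
    }
}
```

Releasing holds in a loop, rather than with a single unlock(), matters because a reentrant write lock that was locked twice stays locked after only one unlock().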
Back in WorkQueueFrontier.start(), the subsequent call to void initInternalQueues() initializes the queues used by the crawl job:
/**
 * Initializes internal queues. May decide to keep all queues in memory based on
 * {@link QueueAssignmentPolicy#maximumNumberOfKeys}. Otherwise invokes
 * {@link #initAllQueues()} to actually set up the queues.
 *
 * Subclasses should invoke this method with recycle set to "true" in
 * a private readObject method, to restore queues after a checkpoint.
 *
 * @param recycle
 * @throws IOException
 * @throws DatabaseException
 */
protected void initInternalQueues()
throws IOException, DatabaseException {
    this.initOtherQueues();
    if (workQueueDataOnDisk()
            && preparer.getQueueAssignmentPolicy().maximumNumberOfKeys() >= 0
            && preparer.getQueueAssignmentPolicy().maximumNumberOfKeys() <=
                MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY) {
        this.allQueues =
            new ObjectIdentityMemCache<WorkQueue>(701, .9f, 100);
    } else {
        this.initAllQueues();
    }
}
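The branch in initInternalQueues() reduces to a small predicate. The sketch makes these assumptions:

- MAX_QUEUES_IN_MEMORY is a placeholder chosen for illustration; the real value of MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY is not shown in this article.
- maximumNumberOfKeys is treated as an int.
- The returned strings only name which branch would run.

```java
class AllQueuesChoiceSketch {
    // Hypothetical placeholder; the real MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY
    // value is not given in this article.
    static final int MAX_QUEUES_IN_MEMORY = 1000;

    // Mirrors the condition in initInternalQueues(): "memory" stands for the
    // ObjectIdentityMemCache branch, "initAllQueues" for the subclass hook.
    static String chooseAllQueuesStore(boolean workQueueDataOnDisk, int maximumNumberOfKeys) {
        if (workQueueDataOnDisk
                && maximumNumberOfKeys >= 0
                && maximumNumberOfKeys <= MAX_QUEUES_IN_MEMORY) {
            return "memory";
        }
        return "initAllQueues";
    }
}
```

Because of the `>= 0` test, a negative key count never selects the in-memory cache. Reading a negative value as "no known bound" is an interpretation, not something the source states.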
initInternalQueues() first calls the BdbFrontier object's void initOtherQueues() method, which is defined in the BdbFrontier class:
@Override
protected void initOtherQueues() throws DatabaseException {
    boolean recycle = (recoveryCheckpoint != null);
    // tiny risk of OutOfMemoryError: if giant number of snoozed
    // queues all wake-to-ready at once
    readyClassQueues = new LinkedBlockingQueue<String>();
    inactiveQueuesByPrecedence = new ConcurrentSkipListMap<Integer,Queue<String>>();
    retiredQueues = bdb.getStoredQueue("retiredQueues", String.class, recycle);
    // primary snoozed queues
    snoozedClassQueues = new DelayQueue<DelayedWorkQueue>();
    // just in case: overflow for extreme situations
    snoozedOverflow = bdb.getStoredMap(
            "snoozedOverflow", Long.class, DelayedWorkQueue.class, true, false);
    this.futureUris = bdb.getStoredMap(
            "futureUris", Long.class, CrawlURI.class, true, recoveryCheckpoint!=null);
    // initialize master map in which other queues live
    this.pendingUris = createMultipleWorkQueues();
}
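initOtherQueues() initializes a series of queues:

- readyClassQueues, inactiveQueuesByPrecedence and snoozedClassQueues are in-memory structures.
- retiredQueues, snoozedOverflow and futureUris are obtained from the bdb module (via getStoredQueue / getStoredMap).
- pendingUris is created by createMultipleWorkQueues().

The role of each queue will be analyzed in a later article.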
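The comment about snoozed queues "wake-to-ready" can be illustrated with java.util.concurrent.DelayQueue, the type used for snoozedClassQueues. SnoozedQueue below is a hypothetical stand-in for DelayedWorkQueue. wakeExpired() is an illustrative helper, not a Heritrix method: it drains every queue whose delay has expired into a ready queue of class keys, in the spirit of readyClassQueues.

```java
import java.util.Queue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for DelayedWorkQueue: a queue's class key plus the
// wall-clock time (in ms) at which its snooze ends.
class SnoozedQueue implements Delayed {
    final String classKey;
    final long wakeTimeMillis;

    SnoozedQueue(String classKey, long wakeTimeMillis) {
        this.classKey = classKey;
        this.wakeTimeMillis = wakeTimeMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(wakeTimeMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

class SnoozeSketch {
    // Moves every snoozed queue whose delay has expired onto the ready queue;
    // DelayQueue.poll() returns null while the head is still sleeping.
    static int wakeExpired(DelayQueue<SnoozedQueue> snoozed, Queue<String> ready) {
        int woken = 0;
        SnoozedQueue next;
        while ((next = snoozed.poll()) != null) {
            ready.add(next.classKey);
            woken++;
        }
        return woken;
    }
}
```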
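void initAllQueues(), also defined in the BdbFrontier class, initializes the member variable ObjectIdentityCache<WorkQueue> allQueues, which is declared with an initial value of null. It runs when the in-memory branch of initInternalQueues() does not apply: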
@Override
protected void initAllQueues() throws DatabaseException {
    boolean isRecovery = (recoveryCheckpoint != null);
    this.allQueues = bdb.getObjectCache("allqueues", isRecovery, WorkQueue.class, BdbWorkQueue.class);
    if(isRecovery) {
        // restore simple instance fields
        JSONObject json = recoveryCheckpoint.loadJson(beanName);
        try {
            nextOrdinal.set(json.getLong("nextOrdinal"));
            queuedUriCount.set(json.getLong("queuedUriCount"));
            futureUriCount.set(json.getLong("futureUriCount"));
            succeededFetchCount.set(json.getLong("succeededFetchCount"));
            failedFetchCount.set(json.getLong("failedFetchCount"));
            disregardedUriCount.set(json.getLong("disregardedUriCount"));
            totalProcessedBytes.set(json.getLong("totalProcessedBytes"));
            JSONArray inactivePrecedences = json.getJSONArray("inactivePrecedences");
            // restore all intended inactiveQueues
            for(int i = 0; i < inactivePrecedences.length(); i++) {
                int precedence = inactivePrecedences.getInt(i);
                inactiveQueuesByPrecedence.put(precedence,createInactiveQueueForPrecedence(precedence,true));
            }
        } catch (JSONException e) {
            throw new RuntimeException(e);
        }
        // retired queues already restored with prior data in initOtherQueues
        // restore ready queues (those not already on inactive, retired)
        BufferedReader activeQueuesReader = null;
        try {
            activeQueuesReader = recoveryCheckpoint.loadReader(beanName,"active");
            String line;
            while((line = activeQueuesReader.readLine())!=null) {
                readyClassQueues.add(line);
            }
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        } finally {
            IOUtils.closeQuietly(activeQueuesReader);
        }
        // TODO: restore largestQueues topNset?
    }
}
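The checkpoint-restore loop at the end of initAllQueues() reads one queue name per line into readyClassQueues. A minimal sketch of the same loop in modern Java follows; restoreReady() is hypothetical. It closes the reader with try-with-resources instead of IOUtils.closeQuietly, with one behavioral difference: a failure in close() surfaces as an UncheckedIOException here, whereas closeQuietly swallows it.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

class ActiveQueuesRestoreSketch {
    // Reads one queue name per line into a fresh ready queue. The
    // try-with-resources block closes the reader even if reading fails.
    static Queue<String> restoreReady(Reader source) {
        Queue<String> ready = new LinkedBlockingQueue<>();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                ready.add(line);
            }
        } catch (IOException ioe) {
            // the original wraps in RuntimeException; UncheckedIOException is a subclass
            throw new UncheckedIOException(ioe);
        }
        return ready;
    }
}
```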
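The allQueues member (ObjectIdentityCache<WorkQueue>) is the cache that manages the BdbWorkQueue work queues; here it is obtained from bdb.getObjectCache("allqueues", ...). When recovering from a checkpoint (recoveryCheckpoint != null), initAllQueues() additionally does the following:

- restores counters such as nextOrdinal and queuedUriCount from the checkpoint JSON;
- recreates the inactive queue for each saved precedence;
- re-adds each line of the "active" checkpoint file to readyClassQueues.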
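An identity cache guarantees one live instance per key, created on first use. The minimal in-memory sketch below illustrates that guarantee using ConcurrentHashMap.computeIfAbsent. IdentityCacheSketch and getOrCreate() are hypothetical names. The sketch does not reproduce the actual ObjectIdentityCache interface, nor the BDB-backed cache returned by bdb.getObjectCache().

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical in-memory identity cache: at most one instance per key.
class IdentityCacheSketch<V> {
    private final ConcurrentMap<String, V> map = new ConcurrentHashMap<>();

    // Returns the cached instance for key, creating it with factory only on
    // first use; computeIfAbsent runs the factory at most once per absent key.
    V getOrCreate(String key, Supplier<V> factory) {
        return map.computeIfAbsent(key, k -> factory.get());
    }

    int size() {
        return map.size();
    }
}
```

Every caller asking for the same key, for example a queue's class key, receives the same object, so state changes made through one reference are visible through all others.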
---------------------------------------------------------------------------
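This Heritrix 3.1.0 source code analysis series is the author's original work.
Please credit the source when reposting: 博客园 (cnblogs), 刺猬的温驯
Link to this article: http://www.cnblogs.com/chenying99/archive/2013/04/18/3027677.html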