zoukankan      html  css  js  c++  java
  • Heritrix 3.1.0 源码解析(六)

    本文分析BdbFrontier对象的相关状态和方法

    BdbFrontier类继承自WorkQueueFrontier类,而WorkQueueFrontier类又继承自AbstractFrontier类。

    BdbFrontier类的void start()方法如下(在其父类WorkQueueFrontier里面):

    org.archive.crawler.frontier.BdbFrontier

            org.archive.crawler.frontier.WorkQueueFrontier

    /**
         * Starts the frontier unless it is already running: directs the URI
         * uniqueness filter's output to this frontier, runs the superclass
         * start sequence, then builds the internal queues. Any exception
         * raised while building the queues is rethrown wrapped in an
         * IllegalStateException.
         */
        public void start() {
            if (!isRunning()) {
                uriUniqFilter.setDestination(this);
                super.start();
                try {
                    initInternalQueues();
                } catch (Exception failure) {
                    throw new IllegalStateException(failure);
                }
            }
        }

    其中super.start()调用的是父类AbstractFrontier的void start()方法:

     /**
         * Base start sequence, skipped entirely if the frontier is already
         * running. When the recovery log is enabled, opens the journal at the
         * logger module's path (an IOException there becomes an
         * IllegalStateException); then calls pause() and launches the
         * manager thread.
         */
        public void start() {
            if (isRunning()) {
                return;
            }
            if (getRecoveryLogEnabled()) {
                try {
                    String journalDir = loggerModule.getPath().getFile().getAbsolutePath();
                    initJournal(journalDir);
                } catch (IOException ioe) {
                    throw new IllegalStateException(ioe);
                }
            }
            pause();
            startManagerThread();
        }

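    该方法仅在对象尚未运行时执行:如果启用了恢复日志(getRecoveryLogEnabled()),先调用initJournal()在loggerModule的路径下初始化恢复日志;然后调用pause()使当前对象(BdbFrontier)进入State.PAUSE状态;最后调用void startManagerThread()方法启动管理线程。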

    /**
         * Launches the dedicated manager thread, which runs
         * {@link #managementTasks()} with an independent view of the
         * frontier's state. The thread is named after this frontier plus
         * ".managerThread" and runs one step above normal priority.
         */
        protected void startManagerThread() {
            String threadName = this + ".managerThread";
            Runnable mgrLoop = new Runnable() {
                public void run() {
                    AbstractFrontier.this.managementTasks();
                }
            };
            managerThread = new Thread(mgrLoop, threadName);
            managerThread.setPriority(Thread.NORM_PRIORITY + 1);
            managerThread.start();
        }

    管理线程managerThread的run()方法中调用了void managementTasks()方法:

    /**
         * Main loop of frontier's managerThread. Only exits when State.FINISH
         * is requested (perhaps automatically at URI exhaustion) and reached.
         *
         * General strategy is to try to fill outbound queue, then process an
         * item from inbound queue, and repeat. A HOLD (to be implemented) or
         * PAUSE puts frontier into a stable state that won't be changed
         * asynchronously by worker thread activity.
         *
         * Each pass switches on targetState:
         * EMPTY/RUN release every write hold on outboundLock (re-enabling
         * outbound takes), report the reached state, sleep one second, and
         * request EMPTY or RUN depending on isEmpty();
         * HOLD/PAUSE take the write lock and, while PAUSE is targeted, report
         * PAUSE whenever getInProcessCount() is zero;
         * FINISH takes the write lock, waits for in-process URIs to drain,
         * runs finalTasks(), reports FINISH and leaves the loop.
         * A RuntimeException in a pass is logged and, unless PAUSE or FINISH
         * is already targeted, turned into a PAUSE request.
         *
         * However the loop ends (including by interruption), targetState is
         * reset to PAUSE and all write holds this thread has on outboundLock
         * are released, so the frontier is left in a restartable state.
         */
        protected void managementTasks() {
            assert Thread.currentThread() == managerThread;
            try {
                loop: while (true) {
                    try {
                        State reachedState = null; 
                        switch (targetState) {
                        case EMPTY:
                            reachedState = State.EMPTY; 
                            // fall through: EMPTY is handled like RUN, but reports EMPTY
                        case RUN:
                            // enable outbound takes if previously locked
                            while(outboundLock.isWriteLockedByCurrentThread()) {
                                outboundLock.writeLock().unlock();
                            }
                            if(reachedState==null) {
                                reachedState = State.RUN; 
                            }
                            reachedState(reachedState);
                            
                            Thread.sleep(1000);
                            
                            if(isEmpty()&&targetState==State.RUN) {
                                requestState(State.EMPTY); 
                            } else if (!isEmpty()&&targetState==State.EMPTY) {
                                requestState(State.RUN); 
                            }
                            break;
                        case HOLD:
                            // TODO; for now treat same as PAUSE
                            // NOTE(review): the wait loop below only runs while
                            // targetState == PAUSE, so a HOLD target re-enters
                            // this case immediately without sleeping (busy spin).
                        case PAUSE:
                            // pausing
                            // prevent all outbound takes; acquire only if not
                            // already held, so repeated passes (e.g. HOLD) don't
                            // pile up reentrant holds until the lock's hold-count
                            // limit is exceeded
                            if (!outboundLock.isWriteLockedByCurrentThread()) {
                                outboundLock.writeLock().lock();
                            }
                            // process all inbound
                            while (targetState == State.PAUSE) {
                                if (getInProcessCount()==0) {
                                    reachedState(State.PAUSE);
                                }
                                
                                Thread.sleep(1000);
                            }
                            break;
                        case FINISH:
                            // prevent all outbound takes
                            if (!outboundLock.isWriteLockedByCurrentThread()) {
                                outboundLock.writeLock().lock();
                            }
                            // process all inbound
                            while (getInProcessCount()>0) {
                                Thread.sleep(1000);
                            }
    
                            finalTasks(); 
                            // TODO: more cleanup?
                            reachedState(State.FINISH);
                            break loop;
                        }
                    } catch (RuntimeException e) {
                        // log, try to pause, continue
                        logger.log(Level.SEVERE,"",e);
                        if(targetState!=State.PAUSE && targetState!=State.FINISH) {
                            requestState(State.PAUSE);
                        }
                    }
                }
            } catch (InterruptedException e) {
                // preserve the interrupt status for anything that inspects it
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                // try to leave in safely restartable state, even when
                // interrupted: otherwise outboundLock would stay write-locked
                // by this (dying) thread and block all outbound takes
                targetState = State.PAUSE;
                while(outboundLock.isWriteLockedByCurrentThread()) {
                    outboundLock.writeLock().unlock();
                }
                //TODO: ensure all other structures are cleanly reset on restart
                
                logger.log(Level.FINE,"ending frontier mgr thread");
            }
        }

    上面的方法在循环中不断地根据BdbFrontier对象的目标状态(targetState)设置成员变量protected ReentrantReadWriteLock outboundLock = new ReentrantReadWriteLock(true)的锁定状态:处于PAUSE、HOLD或FINISH时获取写锁,以阻止所有出站取URI的操作;处于RUN或EMPTY时释放写锁,重新允许出站操作。

    WorkQueueFrontier.start()中随后调用的void initInternalQueues()方法负责初始化爬虫任务的相关队列:

    /**
         * Initializes internal queues. First calls {@link #initOtherQueues()}.
         * Then, when work-queue data is on disk and the queue-assignment
         * policy reports a maximum number of keys between 0 and
         * MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY (inclusive), keeps all queues
         * in an in-memory ObjectIdentityMemCache. Otherwise invokes
         * {@link #initAllQueues()} to actually set up the queues.
         *
         * This method takes no parameters. Whether queues are restored from a
         * checkpoint is decided by the initOtherQueues()/initAllQueues()
         * implementations (in BdbFrontier, by whether recoveryCheckpoint is set).
         *
         * @throws IOException
         * @throws DatabaseException
         */
        protected void initInternalQueues() 
        throws IOException, DatabaseException {
            this.initOtherQueues();
            boolean holdAllInMemory = false;
            if (workQueueDataOnDisk()) {
                // consult the policy once instead of twice; still only when
                // data is on disk, as the original short-circuit did
                long maxKeys = preparer.getQueueAssignmentPolicy().maximumNumberOfKeys();
                holdAllInMemory = maxKeys >= 0
                        && maxKeys <= MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY;
            }
            if (holdAllInMemory) {
                // key count is known and bounded: hold all queues in memory
                this.allQueues = 
                    new ObjectIdentityMemCache<WorkQueue>(701, .9f, 100);
            } else {
                this.initAllQueues();
            }
        }

    首先调用BdbFrontier对象的void initOtherQueues()方法,在BdbFrontier类里面

    /**
         * Creates the frontier's auxiliary queues.
         *
         * In memory: readyClassQueues (LinkedBlockingQueue),
         * inactiveQueuesByPrecedence (ConcurrentSkipListMap) and
         * snoozedClassQueues (DelayQueue).
         * Obtained from bdb: retiredQueues, snoozedOverflow and futureUris.
         * Finally, pendingUris is created via createMultipleWorkQueues().
         *
         * retiredQueues and futureUris receive the recycle flag, which is true
         * when recovering from a checkpoint (recoveryCheckpoint != null).
         *
         * @throws DatabaseException
         */
        @Override
        protected void initOtherQueues() throws DatabaseException {
            boolean recycle = (recoveryCheckpoint != null);
            
            // tiny risk of OutOfMemoryError: if giant number of snoozed
            // queues all wake-to-ready at once
            readyClassQueues = new LinkedBlockingQueue<String>();
    
            inactiveQueuesByPrecedence = new ConcurrentSkipListMap<Integer,Queue<String>>();
            
            retiredQueues = bdb.getStoredQueue("retiredQueues", String.class, recycle);
    
            // primary snoozed queues
            snoozedClassQueues = new DelayQueue<DelayedWorkQueue>();
            // just in case: overflow for extreme situations
            // NOTE(review): passes false where futureUris passes recycle;
            // presumably overflow is intentionally not recovered - confirm
            snoozedOverflow = bdb.getStoredMap(
                    "snoozedOverflow", Long.class, DelayedWorkQueue.class, true, false);
                
            // same recovery flag as retiredQueues (was a duplicated
            // recoveryCheckpoint!=null expression)
            this.futureUris = bdb.getStoredMap(
                    "futureUris", Long.class, CrawlURI.class, true, recycle);
            
            // initialize master map in which other queues live
            this.pendingUris = createMultipleWorkQueues();
        }

    上述方法初始化了一系列的队列,这些队列各自的作用待后文再分析

    void initAllQueues()方法用于初始化成员变量ObjectIdentityCache<WorkQueue> allQueues(其声明时的初始值为null)。该方法同样位于BdbFrontier类中,代码如下:

    /**
         * Sets allQueues to the "allqueues" object cache obtained from bdb
         * (WorkQueue entries backed by BdbWorkQueue instances).
         *
         * When recovering from a checkpoint (recoveryCheckpoint != null) it
         * also restores:
         * the counters saved in the checkpoint JSON for beanName;
         * an inactive queue for every saved precedence;
         * readyClassQueues from the checkpoint's "active" reader, one entry per line.
         * Retired queues are not handled here; initOtherQueues() already
         * opened them with prior data.
         * JSON or I/O failures are rethrown as RuntimeException.
         *
         * @throws DatabaseException
         */
        @Override
        protected void initAllQueues() throws DatabaseException {
            boolean isRecovery = (recoveryCheckpoint != null);
            this.allQueues = bdb.getObjectCache("allqueues", isRecovery, WorkQueue.class, BdbWorkQueue.class);
            if (!isRecovery) {
                return;
            }

            // restore simple instance fields
            JSONObject saved = recoveryCheckpoint.loadJson(beanName);
            try {
                nextOrdinal.set(saved.getLong("nextOrdinal"));
                queuedUriCount.set(saved.getLong("queuedUriCount"));
                futureUriCount.set(saved.getLong("futureUriCount"));
                succeededFetchCount.set(saved.getLong("succeededFetchCount"));
                failedFetchCount.set(saved.getLong("failedFetchCount"));
                disregardedUriCount.set(saved.getLong("disregardedUriCount"));
                totalProcessedBytes.set(saved.getLong("totalProcessedBytes"));
                JSONArray precedences = saved.getJSONArray("inactivePrecedences");
                // restore all intended inactiveQueues
                for (int idx = 0, count = precedences.length(); idx < count; idx++) {
                    int p = precedences.getInt(idx);
                    inactiveQueuesByPrecedence.put(p, createInactiveQueueForPrecedence(p, true));
                }
            } catch (JSONException je) {
                throw new RuntimeException(je);
            }

            // restore ready queues (those not already on inactive, retired)
            BufferedReader reader = null;
            try {
                reader = recoveryCheckpoint.loadReader(beanName, "active");
                for (String entry = reader.readLine(); entry != null; entry = reader.readLine()) {
                    readyClassQueues.add(entry);
                }
            } catch (IOException ioe) {
                throw new RuntimeException(ioe);
            } finally {
                IOUtils.closeQuietly(reader);
            }

            // TODO: restore largestQueues topNset?
        }

 ObjectIdentityCache<WorkQueue> allQueues成员用于缓存和管理BdbWorkQueue工作队列。从检查点恢复时,initAllQueues()还会恢复各项计数器、非活动队列以及就绪队列。

    ---------------------------------------------------------------------------

    本系列Heritrix 3.1.0 源码解析系本人原创

    转载请注明出处 博客园 刺猬的温驯

    本文链接 http://www.cnblogs.com/chenying99/archive/2013/04/18/3027677.html

  • 相关阅读:
    Socket listen 简要分析
    Socket connect 等简要分析
    HIVE函数大全
    元数据管理
    flume
    shell编程
    数据仓库集锦
    数据库知识
    hive sql 转化mapreduce原理
    Hadoop 学习笔记
  • 原文地址:https://www.cnblogs.com/chenying99/p/3027677.html
Copyright © 2011-2022 走看看