Heritrix 3.1.0 Source Code Analysis (6)

    This article analyzes the state and methods of the BdbFrontier object.

    The BdbFrontier class extends WorkQueueFrontier, which in turn extends AbstractFrontier.

    BdbFrontier's void start() method is defined in its parent class WorkQueueFrontier:

    org.archive.crawler.frontier.BdbFrontier

            org.archive.crawler.frontier.WorkQueueFrontier

        public void start() {
            if(isRunning()) {
                return; 
            }
            uriUniqFilter.setDestination(this);
            super.start();
            try {
                initInternalQueues();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
    
        }
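    The start() above follows an idempotent lifecycle pattern: return early if already running, delegate to the superclass, then wrap any checked exception from queue setup in an unchecked IllegalStateException. Below is a minimal standalone sketch of that pattern, not Heritrix code; the classes LifecycleBase and QueueingComponent and the method setupQueues() are hypothetical stand-ins for AbstractFrontier, WorkQueueFrontier and initInternalQueues().

```java
import java.io.IOException;

// Hypothetical base class standing in for AbstractFrontier's lifecycle.
class LifecycleBase {
    protected volatile boolean running = false;
    int baseStarts = 0; // how many times the base start logic actually ran

    public boolean isRunning() { return running; }

    public void start() {
        if (isRunning()) {
            return; // already started: nothing to do
        }
        baseStarts++;
        running = true;
    }
}

// Hypothetical subclass standing in for WorkQueueFrontier.
class QueueingComponent extends LifecycleBase {
    int queueSetups = 0;
    boolean failSetup = false;

    @Override
    public void start() {
        if (isRunning()) {
            return; // the guard makes repeated start() calls harmless
        }
        super.start();
        try {
            setupQueues();
        } catch (IOException e) {
            // the overridden start() declares no checked exceptions, so wrap it
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical stand-in for initInternalQueues(), which may throw checked exceptions.
    protected void setupQueues() throws IOException {
        if (failSetup) {
            throw new IOException("queue setup failed");
        }
        queueSetups++;
    }
}
```

    Calling start() twice on a QueueingComponent runs the setup only once, and a failing setup surfaces as an IllegalStateException whose cause is the original IOException.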

    super.start() invokes the void start() method of the parent class AbstractFrontier:

        public void start() {
            if(isRunning()) {
                return; 
            }
            
            if (getRecoveryLogEnabled()) try {
                initJournal(loggerModule.getPath().getFile().getAbsolutePath());
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
            pause();
            startManagerThread();
        }

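    If the recovery log is enabled, this method first initializes the recovery journal; it then calls pause() so that the current object (the BdbFrontier) is put into State.PAUSE, and finally calls void startManagerThread():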

        /**
         * Start the dedicated thread with an independent view of the frontier's
         * state. 
         */
        protected void startManagerThread() {
            managerThread = new Thread(this+".managerThread") {
                public void run() {
                    AbstractFrontier.this.managementTasks();
                }
            };
            managerThread.setPriority(Thread.NORM_PRIORITY+1); 
            managerThread.start();
        }
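    startManagerThread() builds an anonymous Thread subclass whose run() calls back into the enclosing frontier through the qualified AbstractFrontier.this reference, names the thread after the frontier, and raises its priority one step above normal. A minimal sketch of the same construction follows; the Manager class and its trivial managementTasks() body are hypothetical.

```java
// Hypothetical class mirroring AbstractFrontier.startManagerThread().
class Manager {
    Thread managerThread;
    // Records which thread executed managementTasks() (illustration only).
    volatile Thread observedThread;

    protected void startManagerThread() {
        // `this` here is the Manager, so the thread is named after it.
        managerThread = new Thread(this + ".managerThread") {
            @Override
            public void run() {
                // The qualified `Manager.this` makes explicit that the
                // enclosing Manager's method is called.
                Manager.this.managementTasks();
            }
        };
        // One step above normal priority, as in the original.
        managerThread.setPriority(Thread.NORM_PRIORITY + 1);
        managerThread.start();
    }

    protected void managementTasks() {
        // Stand-in for the real management loop.
        observedThread = Thread.currentThread();
    }
}
```

    Giving the manager thread a slightly higher priority than the default presumably helps it keep up with state changes while worker threads are busy; the sketch only reproduces the construction, not that scheduling effect.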

    The managerThread thread runs the void managementTasks() method:

        /**
         * Main loop of frontier's managerThread. Only exits when State.FINISH 
         * is requested (perhaps automatically at URI exhaustion) and reached. 
         * 
         * General strategy is to try to fill outbound queue, then process an
         * item from inbound queue, and repeat. A HOLD (to be implemented) or 
         * PAUSE puts frontier into a stable state that won't be changed
         * asynchronously by worker thread activity. 
         */
        protected void managementTasks() {
            assert Thread.currentThread() == managerThread;
            try {
                loop: while (true) {
                    try {
                        State reachedState = null; 
                        switch (targetState) {
                        case EMPTY:
                            reachedState = State.EMPTY; 
                        case RUN:
                            // enable outbound takes if previously locked
                            while(outboundLock.isWriteLockedByCurrentThread()) {
                                outboundLock.writeLock().unlock();
                            }
                            if(reachedState==null) {
                                reachedState = State.RUN; 
                            }
                            reachedState(reachedState);
                            
                            Thread.sleep(1000);
                            
                            if(isEmpty()&&targetState==State.RUN) {
                                requestState(State.EMPTY); 
                            } else if (!isEmpty()&&targetState==State.EMPTY) {
                                requestState(State.RUN); 
                            }
                            break;
                        case HOLD:
                            // TODO; for now treat same as PAUSE
                        case PAUSE:
                            // pausing
                            // prevent all outbound takes
                            outboundLock.writeLock().lock();
                            // process all inbound
                            while (targetState == State.PAUSE) {
                                if (getInProcessCount()==0) {
                                    reachedState(State.PAUSE);
                                }
                                
                                Thread.sleep(1000);
                            }
                            break;
                        case FINISH:
                            // prevent all outbound takes
                            outboundLock.writeLock().lock();
                            // process all inbound
                            while (getInProcessCount()>0) {
                                Thread.sleep(1000);
                            }
    
                            finalTasks(); 
                            // TODO: more cleanup?
                            reachedState(State.FINISH);
                            break loop;
                        }
                    } catch (RuntimeException e) {
                        // log, try to pause, continue
                        logger.log(Level.SEVERE,"",e);
                        if(targetState!=State.PAUSE && targetState!=State.FINISH) {
                            requestState(State.PAUSE);
                        }
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } 
            
            // try to leave in safely restartable state: 
            targetState = State.PAUSE;
            while(outboundLock.isWriteLockedByCurrentThread()) {
                outboundLock.writeLock().unlock();
            }
            //TODO: ensure all other structures are cleanly reset on restart
            
            logger.log(Level.FINE,"ending frontier mgr thread");
        }

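    The managementTasks() method above loops continuously and, based on the state requested for the BdbFrontier (targetState), sets the lock state of the member variable protected ReentrantReadWriteLock outboundLock = new ReentrantReadWriteLock(true). In RUN and EMPTY it releases every write-lock hold so that outbound takes can proceed; in PAUSE (and HOLD, currently treated the same way) and FINISH it acquires the write lock to block all outbound takes. PAUSE counts as reached only once no URIs are in process, while FINISH waits for in-process URIs to drain, runs finalTasks() and then leaves the loop.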
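    The interplay between targetState and outboundLock can be reduced to a single-threaded sketch: one pass of the switch, without the sleeps and without the HOLD and FINISH branches. It assumes, as the comment "prevent all outbound takes" suggests, that worker threads take URIs under the read lock, so the manager holding the write lock of the fair lock blocks them. The class MiniFrontier, its step() and workerCanTake() methods, and the inProcess, empty and lastReached fields (lastReached stands in for reachedState(...)) are hypothetical simplifications.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, single-threaded simplification of AbstractFrontier.managementTasks().
class MiniFrontier {
    enum State { RUN, EMPTY, PAUSE, FINISH }

    // Fair read/write lock: workers are assumed to take URIs under the read lock.
    final ReentrantReadWriteLock outboundLock = new ReentrantReadWriteLock(true);
    volatile State targetState = State.PAUSE;
    State lastReached = null;   // last state reported as reached
    int inProcess = 0;          // URIs currently held by workers
    boolean empty = false;      // true when no URIs remain queued

    void requestState(State s) { targetState = s; }

    // One pass of the switch, without Thread.sleep, HOLD or FINISH handling.
    void step() {
        State reached = null;
        switch (targetState) {
            case EMPTY:
                reached = State.EMPTY;
                // fall through: EMPTY is handled like RUN
            case RUN:
                // drop every write-lock hold so outbound takes can resume
                while (outboundLock.isWriteLockedByCurrentThread()) {
                    outboundLock.writeLock().unlock();
                }
                lastReached = (reached == null) ? State.RUN : reached;
                if (empty && targetState == State.RUN) {
                    requestState(State.EMPTY);
                } else if (!empty && targetState == State.EMPTY) {
                    requestState(State.RUN);
                }
                break;
            case PAUSE:
                // block outbound takes (reentrant: repeated steps add holds)
                outboundLock.writeLock().lock();
                if (inProcess == 0) {
                    lastReached = State.PAUSE;
                }
                break;
            default:
                break; // FINISH is omitted in this sketch
        }
    }

    // Simulates a worker thread trying to take a URI right now.
    boolean workerCanTake() {
        AtomicBoolean got = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            if (outboundLock.readLock().tryLock()) {
                got.set(true);
                outboundLock.readLock().unlock();
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return got.get();
    }
}
```

    Because the write lock is reentrant, each PAUSE step in this sketch adds another hold; the while loop in the RUN branch, like the one in the original, releases all of them at once.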

    The void initInternalQueues() method, called at the end of WorkQueueFrontier.start(), then initializes the queues used by the crawl:

        /**
         * Initializes internal queues.  May decide to keep all queues in memory based on
         * {@link QueueAssignmentPolicy#maximumNumberOfKeys}.  Otherwise invokes
         * {@link #initAllQueues()} to actually set up the queues.
         * 
         * Subclasses should invoke this method with recycle set to "true" in 
         * a private readObject method, to restore queues after a checkpoint.
         * 
         * @param recycle
         * @throws IOException
         * @throws DatabaseException
         */
        protected void initInternalQueues() 
        throws IOException, DatabaseException {
            this.initOtherQueues();
            if (workQueueDataOnDisk()
                    && preparer.getQueueAssignmentPolicy().maximumNumberOfKeys() >= 0
                    && preparer.getQueueAssignmentPolicy().maximumNumberOfKeys() <= 
                        MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY) {
                this.allQueues = 
                    new ObjectIdentityMemCache<WorkQueue>(701, .9f, 100);
            } else {
                this.initAllQueues();
            }
        }
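    The branch in initInternalQueues() is a threshold test: only when work-queue data lives on disk and the queue-assignment policy reports a key count between 0 and MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY (a negative value presumably meaning no known bound) does it keep every WorkQueue in an in-memory ObjectIdentityMemCache; otherwise it calls initAllQueues(). The sketch below isolates just that decision. QueueBackendChooser and Backend are hypothetical names, the parameter is assumed to be an int, and the threshold value 3000 is made up for illustration, since the real constant's value is not shown here.

```java
// Hypothetical helper mirroring the condition in initInternalQueues().
class QueueBackendChooser {
    enum Backend { IN_MEMORY, INIT_ALL_QUEUES }

    // Illustrative value only; not taken from Heritrix.
    static final int MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY = 3000;

    static Backend choose(boolean workQueueDataOnDisk, int maximumNumberOfKeys) {
        if (workQueueDataOnDisk
                && maximumNumberOfKeys >= 0
                && maximumNumberOfKeys <= MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY) {
            // Few enough queues: keep every WorkQueue object in memory.
            return Backend.IN_MEMORY;
        }
        // Otherwise let the subclass (e.g. BdbFrontier) set up allQueues.
        return Backend.INIT_ALL_QUEUES;
    }
}
```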

    initInternalQueues() first calls void initOtherQueues(), which BdbFrontier overrides:

        @Override
        protected void initOtherQueues() throws DatabaseException {
            boolean recycle = (recoveryCheckpoint != null);
            
            // tiny risk of OutOfMemoryError: if giant number of snoozed
            // queues all wake-to-ready at once
            readyClassQueues = new LinkedBlockingQueue<String>();
    
            inactiveQueuesByPrecedence = new ConcurrentSkipListMap<Integer,Queue<String>>();
            
            retiredQueues = bdb.getStoredQueue("retiredQueues", String.class, recycle);
    
            // primary snoozed queues
            snoozedClassQueues = new DelayQueue<DelayedWorkQueue>();
            // just in case: overflow for extreme situations
            snoozedOverflow = bdb.getStoredMap(
                    "snoozedOverflow", Long.class, DelayedWorkQueue.class, true, false);
                
            this.futureUris = bdb.getStoredMap(
                    "futureUris", Long.class, CrawlURI.class, true, recoveryCheckpoint!=null);
            
            // initialize master map in which other queues live
            this.pendingUris = createMultipleWorkQueues();
        }

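    The method above initializes a series of queues: readyClassQueues, inactiveQueuesByPrecedence and snoozedClassQueues are in-memory java.util.concurrent structures, retiredQueues, snoozedOverflow and futureUris are backed by BDB, and pendingUris is created by createMultipleWorkQueues(). The role of each queue will be analyzed in a later article.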
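    As a rough illustration of the JDK types involved, the sketch below shows how a DelayQueue (the type of snoozedClassQueues), a LinkedBlockingQueue of queue names (the type of readyClassQueues) and a ConcurrentSkipListMap keyed by precedence (the type of inactiveQueuesByPrecedence) behave. It demonstrates only the data-structure semantics, not Heritrix's own logic: SnoozedQueue is a hypothetical stand-in for DelayedWorkQueue, and QueueStructuresDemo with its methods is likewise hypothetical.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for DelayedWorkQueue: a queue name plus a wake-up time.
class SnoozedQueue implements Delayed {
    final String classKey;
    final long wakeNanos;

    SnoozedQueue(String classKey, long delayMillis) {
        this.classKey = classKey;
        this.wakeNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(wakeNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class QueueStructuresDemo {
    // Ready queue names in FIFO order.
    final LinkedBlockingQueue<String> ready = new LinkedBlockingQueue<>();
    // Snoozed queues become available only after their delay expires.
    final DelayQueue<SnoozedQueue> snoozed = new DelayQueue<>();
    // Inactive queue names grouped by precedence; iteration goes lowest key first.
    final ConcurrentSkipListMap<Integer, Queue<String>> inactiveByPrecedence =
            new ConcurrentSkipListMap<>();

    void deactivate(String classKey, int precedence) {
        inactiveByPrecedence
            .computeIfAbsent(precedence, p -> new ConcurrentLinkedQueue<>())
            .add(classKey);
    }

    // Move every snoozed queue whose delay has expired onto the ready queue.
    int wakeExpired() {
        int woken = 0;
        SnoozedQueue q;
        while ((q = snoozed.poll()) != null) { // poll() returns only expired entries
            ready.add(q.classKey);
            woken++;
        }
        return woken;
    }

    // Name of an inactive queue from the lowest-keyed non-empty bucket, or null.
    String nextInactive() {
        for (Queue<String> bucket : inactiveByPrecedence.values()) {
            String key = bucket.poll();
            if (key != null) {
                return key;
            }
        }
        return null;
    }
}
```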

    The void initAllQueues() method, also in BdbFrontier, initializes the member variable ObjectIdentityCache<WorkQueue> allQueues (declared with an initial value of null):

        @Override
        protected void initAllQueues() throws DatabaseException {
            boolean isRecovery = (recoveryCheckpoint != null);
            this.allQueues = bdb.getObjectCache("allqueues", isRecovery, WorkQueue.class, BdbWorkQueue.class);
            if(isRecovery) {
                // restore simple instance fields 
                JSONObject json = recoveryCheckpoint.loadJson(beanName);
                try {
                    nextOrdinal.set(json.getLong("nextOrdinal"));
                    queuedUriCount.set(json.getLong("queuedUriCount"));
                    futureUriCount.set(json.getLong("futureUriCount"));
                    succeededFetchCount.set(json.getLong("succeededFetchCount"));
                    failedFetchCount.set(json.getLong("failedFetchCount"));
                    disregardedUriCount.set(json.getLong("disregardedUriCount"));
                    totalProcessedBytes.set(json.getLong("totalProcessedBytes"));
                    JSONArray inactivePrecedences = json.getJSONArray("inactivePrecedences"); 
                    // restore all intended inactiveQueues
                    for(int i = 0; i < inactivePrecedences.length(); i++) {
                        int precedence = inactivePrecedences.getInt(i);
                        inactiveQueuesByPrecedence.put(precedence,createInactiveQueueForPrecedence(precedence,true));
                    }
                } catch (JSONException e) {
                    throw new RuntimeException(e);
                }           
                
                // retired queues already restored with prior data in initOtherQueues
                
                // restore ready queues (those not already on inactive, retired)
                BufferedReader activeQueuesReader = null;
                try {
                    activeQueuesReader = recoveryCheckpoint.loadReader(beanName,"active");
                    String line; 
                    while((line = activeQueuesReader.readLine())!=null) {
                        readyClassQueues.add(line); 
                    }
                } catch (IOException ioe) {
                    throw new RuntimeException(ioe); 
                } finally {
                    IOUtils.closeQuietly(activeQueuesReader); 
                }
    
                // TODO: restore largestQueues topNset?
            }
        }

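    The allQueues member (ObjectIdentityCache<WorkQueue>), obtained from bdb.getObjectCache("allqueues", ...), manages the cache of BdbWorkQueue work queues. When resuming from a checkpoint (recoveryCheckpoint != null), initAllQueues() also restores the frontier's counters and inactive queues from the checkpoint's JSON and re-adds the names of the previously active queues to readyClassQueues.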
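    The recovery branch of initAllQueues() has two parts: copy saved counters back into atomic fields, then replay the active queue names, one per line, into readyClassQueues. The sketch below shows that shape using only the JDK. Assumptions: a plain Map<String, Long> stands in for the checkpoint's JSONObject, any Reader stands in for recoveryCheckpoint.loadReader(beanName, "active"), and try-with-resources replaces IOUtils.closeQuietly. The RecoverySketch class and its methods are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the checkpoint-recovery steps in initAllQueues().
class RecoverySketch {
    final AtomicLong queuedUriCount = new AtomicLong();
    final AtomicLong succeededFetchCount = new AtomicLong();
    final LinkedBlockingQueue<String> readyClassQueues = new LinkedBlockingQueue<>();

    // Restore simple counters; a missing key is an error, as with JSONObject.getLong.
    void restoreCounters(Map<String, Long> saved) {
        queuedUriCount.set(require(saved, "queuedUriCount"));
        succeededFetchCount.set(require(saved, "succeededFetchCount"));
    }

    private static long require(Map<String, Long> saved, String key) {
        Long v = saved.get(key);
        if (v == null) {
            throw new IllegalStateException("missing checkpoint field: " + key);
        }
        return v;
    }

    // Re-add every active queue name, one per line, to the ready queue.
    void restoreActiveQueues(Reader source) {
        try (BufferedReader in = new BufferedReader(source)) {
            String line;
            while ((line = in.readLine()) != null) {
                readyClassQueues.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    As in the original, failures are rethrown as unchecked exceptions, so a corrupt checkpoint aborts startup rather than leaving the frontier half-restored.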

    ---------------------------------------------------------------------------

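    This Heritrix 3.1.0 source code analysis series is my original work.

    When reposting, please credit the source: cnblogs (博客园), 刺猬的温驯

    Link to this article: http://www.cnblogs.com/chenying99/archive/2013/04/18/3027677.html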

Original article: https://www.cnblogs.com/chenying99/p/3027677.html