  • Heritrix 3.1.0 Source Code Analysis (15)

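    This article analyzes the scheduling mechanism of the WorkQueue (specifically BdbWorkQueue) in Heritrix 3.1.0. This is one of the more complex parts of the system, so what follows is only a tentative analysis (and this article may be revised later).

    In Heritrix 3.1.0 Source Code Analysis (6) I covered the initialization of the BdbFrontier object. Here is a brief recap.

    The initialization method void start() of the WorkQueueFrontier class calls void initInternalQueues().

    void initInternalQueues() in turn calls void initOtherQueues() and void initAllQueues(). Both are abstract in the parent class and implemented by the subclass BdbFrontier.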

        @Override
        protected void initOtherQueues() throws DatabaseException {
            boolean recycle = (recoveryCheckpoint != null);
            
            // tiny risk of OutOfMemoryError: if giant number of snoozed
            // queues all wake-to-ready at once
            readyClassQueues = new LinkedBlockingQueue<String>();
    
            inactiveQueuesByPrecedence = new ConcurrentSkipListMap<Integer,Queue<String>>();
            
            retiredQueues = bdb.getStoredQueue("retiredQueues", String.class, recycle);
    
            // primary snoozed queues
            snoozedClassQueues = new DelayQueue<DelayedWorkQueue>();
            // just in case: overflow for extreme situations
            snoozedOverflow = bdb.getStoredMap(
                    "snoozedOverflow", Long.class, DelayedWorkQueue.class, true, false);
                
            this.futureUris = bdb.getStoredMap(
                    "futureUris", Long.class, CrawlURI.class, true, recoveryCheckpoint!=null);
            
            // initialize master map in which other queues live
            this.pendingUris = createMultipleWorkQueues();
        }

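    The method above mainly initializes the frontier's queues. Each one is explained here:

    readyClassQueues holds the keys of the queues that are ready to be crawled; [Queue type]

    inactiveQueuesByPrecedence maps each precedence value to a queue of the keys of the inactive queues at that precedence; [Map type]

    retiredQueues holds the keys of the URI queues that will no longer be activated; [Queue type]

    snoozedClassQueues holds all snoozed URI queues (wrapped as DelayedWorkQueue objects that carry the queue's key), ordered by wake time; [Queue type]

    snoozedOverflow maps a wake time to a snoozed queue that overflowed from snoozedClassQueues (again referenced by key); [Map type]

    futureUris maps a scheduled time to a CrawlURI object; [Map type]

    Note the type of snoozedClassQueues, DelayQueue<DelayedWorkQueue>. A DelayQueue holds objects that implement the Delayed interface, and an element can only be taken from the queue once its delay has expired. The queue is ordered: the head is the element whose delay expired furthest in the past, that is, the one with the earliest wake time.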
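    The behavior of DelayQueue described above can be tried out in isolation. The following is a minimal sketch, not Heritrix code: SnoozedKey and SnoozeDemo are hypothetical names, and the key strings and delays are made up for illustration. It only uses the standard java.util.concurrent.DelayQueue API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for DelayedWorkQueue: a queue key plus a wake time.
class SnoozedKey implements Delayed {
    final String classKey;
    final long wakeTime; // absolute time in milliseconds

    SnoozedKey(String classKey, long wakeTime) {
        this.classKey = classKey;
        this.wakeTime = wakeTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // remaining delay; zero or negative means "expired"
        return unit.convert(wakeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (this == other) {
            return 0;
        }
        SnoozedKey o = (SnoozedKey) other;
        int byTime = Long.compare(wakeTime, o.wakeTime);
        // tie-break on the key so the ordering stays stable
        return byTime != 0 ? byTime : classKey.compareTo(o.classKey);
    }
}

class SnoozeDemo {
    // Returns the keys in the order the DelayQueue releases them.
    static List<String> wakeOrder() {
        long now = System.currentTimeMillis();
        DelayQueue<SnoozedKey> snoozed = new DelayQueue<>();
        snoozed.add(new SnoozedKey("slow.example", now + 400));
        snoozed.add(new SnoozedKey("fast.example", now + 200));

        List<String> order = new ArrayList<>();
        // poll() does not block; it returns null while no delay has expired
        if (snoozed.poll() == null) {
            order.add("(none ready)");
        }
        try {
            // take() blocks until the head element's delay has expired
            order.add(snoozed.take().classKey);
            order.add(snoozed.take().classKey);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(wakeOrder());
    }
}
```

    The element added second is released first because its wake time is earlier; insertion order plays no role.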

    The source code of the DelayedWorkQueue class is as follows:

    /**
     * A named WorkQueue wrapped with a wake time, perhaps referenced only
     * by name. 
     * 
     * @contributor gojomo
     */
    class DelayedWorkQueue implements Delayed, Serializable {
        private static final long serialVersionUID = 1L;
    
        public String classKey;
        public long wakeTime;
        
        /**
         * Reference to the WorkQueue, perhaps saving a deserialization
         * from allQueues.
         */
        protected transient WorkQueue workQueue;
        
        public DelayedWorkQueue(WorkQueue queue) {
            this.classKey = queue.getClassKey();
            this.wakeTime = queue.getWakeTime();
            this.workQueue = queue;
        }
        
        // TODO: consider if this should be method on WorkQueueFrontier
        public WorkQueue getWorkQueue(WorkQueueFrontier wqf) {
            if (workQueue == null) {
                // This is a recently deserialized DelayedWorkQueue instance
                WorkQueue result = wqf.getQueueFor(classKey);
                this.workQueue = result;
            }
            return workQueue;
        }
    
        public long getDelay(TimeUnit unit) {
            return unit.convert(
                    wakeTime - System.currentTimeMillis(),
                    TimeUnit.MILLISECONDS);
        }
        
        public String getClassKey() {
            return classKey;
        }
        
        public long getWakeTime() {
            return wakeTime;
        }
        
        public void setWakeTime(long time) {
            this.wakeTime = time;
        }
        
        public int compareTo(Delayed obj) {
            if (this == obj) {
                return 0; // for exact identity only
            }
            DelayedWorkQueue other = (DelayedWorkQueue) obj;
            if (wakeTime > other.getWakeTime()) {
                return 1;
            }
            if (wakeTime < other.getWakeTime()) {
                return -1;
            }
            // at this point, the ordering is arbitrary, but still
            // must be consistent/stable over time
            return this.classKey.compareTo(other.getClassKey());        
        }
        
    }

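    A class stored in a DelayQueue must implement long getDelay(TimeUnit unit) and int compareTo(Delayed obj); the queue uses them to decide ordering and expiry. As the code shows, a DelayedWorkQueue wraps a WorkQueue and is ordered by the wake time taken from that WorkQueue, with the classKey as a tie-breaker when two wake times are equal.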

        @Override
        protected void initAllQueues() throws DatabaseException {
            boolean isRecovery = (recoveryCheckpoint != null);
            this.allQueues = bdb.getObjectCache("allqueues", isRecovery, WorkQueue.class, BdbWorkQueue.class);
            // remainder of the method omitted
        }

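    The method above mainly initializes the ObjectIdentityCache<WorkQueue> allQueues variable, which can be thought of as a cache (and factory) for BdbWorkQueue instances keyed by classKey.

    Next we look at the methods behind BdbFrontier's void schedule(CrawlURI curi) method.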

        /**
         * Send a CrawlURI to the appropriate subqueue.
         * 
         * @param curi
         */
        protected void sendToQueue(CrawlURI curi) {
            // assert Thread.currentThread() == managerThread;
            
            WorkQueue wq = getQueueFor(curi.getClassKey());
            synchronized(wq) {
                int originalPrecedence = wq.getPrecedence();
                wq.enqueue(this, curi);
                // always take budgeting values from current curi
                // (whose overlay settings should be active here)
                wq.setSessionBudget(getBalanceReplenishAmount());
                wq.setTotalBudget(getQueueTotalBudget());
                
                if(!wq.isRetired()) {
                    incrementQueuedUriCount();
                    int currentPrecedence = wq.getPrecedence();
                    if(!wq.isManaged() || currentPrecedence < originalPrecedence) {
                        // queue newly filled or bumped up in precedence; ensure enqueuing
                        // at precedence level (perhaps duplicate; if so that's handled elsewhere)
                        deactivateQueue(wq);
                    }
                }
            }
            // Update recovery log.
            doJournalAdded(curi);
            wq.makeDirty();
            largestQueues.update(wq.getClassKey(), wq.getCount());
        }

    First, the WorkQueue (a BdbWorkQueue) for the URI's classKey is fetched from ObjectIdentityCache<WorkQueue> allQueues. The method WorkQueue getQueueFor(final String classKey) is in the BdbFrontier class:

        /**
         * Return the work queue for the given classKey, or null
         * if no such queue exists.
         * 
         * @param classKey key to look for
         * @return the found WorkQueue
         */
        protected WorkQueue getQueueFor(final String classKey) {      
            WorkQueue wq = allQueues.getOrUse(
                    classKey,
                    new Supplier<WorkQueue>() {
                        public BdbWorkQueue get() {
                            String qKey = new String(classKey); // ensure private minimal key
                            BdbWorkQueue q = new BdbWorkQueue(qKey, BdbFrontier.this);
                            q.setTotalBudget(getQueueTotalBudget()); 
                            getQueuePrecedencePolicy().queueCreated(q);
                            return q;
                        }});
            return wq;
        }

    When the BdbWorkQueue for a classKey is created, its long totalBudget member and its PrecedenceProvider precedenceProvider member are set at the same time.
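    The get-or-create pattern in getQueueFor can be sketched with plain JDK classes. This is an in-memory analogue only: SimpleQueue, QueueCache and MiniFrontier are hypothetical names, the budget value 1000 is made up, and the sketch assumes that getOrUse returns the cached instance when one exists and otherwise stores and returns whatever the supplier produces. The real ObjectIdentityCache is backed by BDB and is not reproduced here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for a per-classKey work queue.
class SimpleQueue {
    final String classKey;
    long totalBudget = -1;

    SimpleQueue(String classKey) {
        this.classKey = classKey;
    }
}

// Hypothetical in-memory analogue of the allQueues cache.
class QueueCache {
    private final ConcurrentMap<String, SimpleQueue> queues = new ConcurrentHashMap<>();

    // Assumed semantics: return the cached value, or create it via the supplier.
    SimpleQueue getOrUse(String key, Supplier<SimpleQueue> supplier) {
        return queues.computeIfAbsent(key, k -> supplier.get());
    }
}

class MiniFrontier {
    final QueueCache allQueues = new QueueCache();
    final AtomicInteger created = new AtomicInteger(); // counts supplier calls

    SimpleQueue getQueueFor(final String classKey) {
        return allQueues.getOrUse(classKey, () -> {
            SimpleQueue q = new SimpleQueue(classKey);
            q.totalBudget = 1000; // illustrative value, set once at creation
            created.incrementAndGet();
            return q;
        });
    }

    public static void main(String[] args) {
        MiniFrontier f = new MiniFrontier();
        SimpleQueue first = f.getQueueFor("example.com");
        SimpleQueue second = f.getQueueFor("example.com");
        System.out.println(first == second);  // same instance is reused
        System.out.println(f.created.get());  // supplier ran only once
    }
}
```

    The point of routing creation through a supplier is that the per-queue setup (budget, precedence policy callback) runs exactly once per classKey, no matter how many URIs are later scheduled into that queue.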

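    Returning to void sendToQueue(CrawlURI curi): it locks the WorkQueue wq (so that multiple threads cannot write to it at the same time), enqueues the CrawlURI (which writes it to the BDB database), and sets the queue's int sessionBudget and long totalBudget.

    If the queue is not retired, the queued-URI count is incremented and the method checks whether the queue is already managed by the frontier (isManaged()). If it is not yet managed (it has just been filled), or if enqueuing lowered its precedence value (currentPrecedence < originalPrecedence, which the code comment describes as being bumped up in precedence), the queue is placed on the inactive queue for its precedence level. This step also updates highestPrecedenceWaiting, the smallest precedence value among the waiting inactive queues.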

        /**
         * Put the given queue on the inactiveQueues queue
         * @param wq
         */
        protected void deactivateQueue(WorkQueue wq) {
            int precedence = wq.getPrecedence();
    
            synchronized(wq) {
                wq.noteDeactivated(); // active = false (no longer active); isManaged = true (now managed)
                inProcessQueues.remove(wq); // remove this queue from the in-process queues
                if(wq.getCount()==0) {
                    System.err.println("deactivate empty queue?");
                }
    
                synchronized (getInactiveQueuesByPrecedence()) {
                    getInactiveQueuesForPrecedence(precedence).add(wq.getClassKey());
                    if(wq.getPrecedence() < highestPrecedenceWaiting ) {
                        highestPrecedenceWaiting = wq.getPrecedence();
                    }
                }
    
                if(logger.isLoggable(Level.FINE)) {
                    logger.log(Level.FINE,
                            "queue deactivated to p" + precedence 
                            + ": " + wq.getClassKey());
                }
            }
        }

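    getInactiveQueuesByPrecedence() returns the map from precedence value to the queue of inactive queue keys at that precedence. [Map type]

    getInactiveQueuesForPrecedence(precedence) returns the queue of inactive queue keys for the given precedence (creating it if it does not exist yet). The classKey of the deactivated queue is then added to it.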

        /**
         * Get the queue of inactive uri-queue names at the given precedence. 
         * 
         * @param precedence
         * @return queue of inacti
         */
        protected Queue<String> getInactiveQueuesForPrecedence(int precedence) {
            // map from precedence to the queue of inactive queue keys
            Map<Integer,Queue<String>> inactiveQueuesByPrecedence = 
                getInactiveQueuesByPrecedence();
            Queue<String> candidate = inactiveQueuesByPrecedence.get(precedence);
            if(candidate==null) {
                candidate = createInactiveQueueForPrecedence(precedence);
                inactiveQueuesByPrecedence.put(precedence,candidate);
            }
            return candidate;
        }

    The related methods are in the subclass BdbFrontier:

        /* (non-Javadoc)
         * Create the queue of inactive queue keys for the given precedence
         * @see org.archive.crawler.frontier.WorkQueueFrontier#createInactiveQueueForPrecedence(int)
         */
        @Override
        Queue<String> createInactiveQueueForPrecedence(int precedence) {
            return createInactiveQueueForPrecedence(precedence, false);
        }
        
        /** 
         * inactiveQueues holds the keys of all inactive URI queues.
         * Optionally reuse prior data, for use when resuming from a checkpoint
         */
        Queue<String> createInactiveQueueForPrecedence(int precedence, boolean usePriorData) {
            return bdb.getStoredQueue("inactiveQueues-"+precedence, String.class, usePriorData);
        }
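    To see how the per-precedence buckets and highestPrecedenceWaiting fit together, here is a minimal in-memory sketch. It is not the Heritrix implementation: InactiveQueues is a hypothetical class, ConcurrentLinkedQueue replaces the BDB-backed stored queue, pollMostUrgent() is a hypothetical helper added only to show the resulting order, and using Integer.MAX_VALUE to mean "nothing waiting" is an assumption.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical in-memory analogue of the inactive-queue bookkeeping.
class InactiveQueues {
    // precedence value -> FIFO of queue keys waiting at that precedence
    private final ConcurrentSkipListMap<Integer, Queue<String>> byPrecedence =
            new ConcurrentSkipListMap<>();

    // Assumption: "nothing waiting" is modelled as Integer.MAX_VALUE.
    private int highestPrecedenceWaiting = Integer.MAX_VALUE;

    // Mirrors the get-or-create step followed by add(classKey).
    synchronized void deactivate(String classKey, int precedence) {
        Queue<String> bucket = byPrecedence.get(precedence);
        if (bucket == null) {
            bucket = new ConcurrentLinkedQueue<>();
            byPrecedence.put(precedence, bucket);
        }
        bucket.add(classKey);
        // a smaller number means a more urgent precedence
        if (precedence < highestPrecedenceWaiting) {
            highestPrecedenceWaiting = precedence;
        }
    }

    synchronized int getHighestPrecedenceWaiting() {
        return highestPrecedenceWaiting;
    }

    // Hypothetical helper: take the next key from the most urgent
    // non-empty bucket, or null if all buckets are empty. For brevity
    // it does not recompute highestPrecedenceWaiting afterwards.
    synchronized String pollMostUrgent() {
        // values() iterates in ascending key (precedence) order
        for (Queue<String> bucket : byPrecedence.values()) {
            String key = bucket.poll();
            if (key != null) {
                return key;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        InactiveQueues inactive = new InactiveQueues();
        inactive.deactivate("a.example", 3);
        inactive.deactivate("b.example", 1);
        inactive.deactivate("c.example", 3);
        System.out.println(inactive.getHighestPrecedenceWaiting());
        System.out.println(inactive.pollMostUrgent());
    }
}
```

    A sorted map is a natural fit here because the frontier cares about the lowest precedence value that still has waiting queues, while each bucket stays first-in, first-out.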

    ---------------------------------------------------------------------------

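    This Heritrix 3.1.0 Source Code Analysis series is my own original work.

    If you repost it, please credit the source: 博客园 (cnblogs), 刺猬的温驯

    Link to this article: http://www.cnblogs.com/chenying99/archive/2013/04/21/3033437.html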

  • Original article: https://www.cnblogs.com/chenying99/p/3033437.html