zoukankan      html  css  js  c++  java
  • webmagic源码学习(一)

      最近工作主要是一些爬虫相关的东西,由于公司需要构建自己的爬虫框架,在调研过程中参考了许多优秀的开源作品,包括webmagic,webcollector,Spiderman等,通过学习这些优秀的源码获益良多。

         webmagic是一个简单灵活的爬虫框架。基于WebMagic,你可以快速开发出一个高效、易维护的爬虫。(官网地址:http://webmagic.io/)

         本篇是webmagic源码阅读第一篇,主要探讨webmagic的核心机制,即一个BFS的爬虫是如何构建出来的。

        webmagic分为以下四大组件,Downloader(页面下载器),Scheduler(下载调度器),PageProcessor(页面解析器),Pipeline(管道组件,通常做将抓取结果入库写文件等操作)

                              (图片来自官网)

     以上四个组件由Spider组件组装起来,爬取数据时协同工作。我们先研究webmagic的核心类Spider。

    在Spider中的run()方法中可以清晰的看到典型的BFS代码,通过一个循环不断地从scheduler中的内存队列中取一个抓取任务(Request)并进行相应处理(processRequest),如果抓取成功则回调监听器中的onSuccess()方法,失败则调用onError()方法,最后将已抓取页面的数量自增。如果队列中没有任何抓取任务了,爬虫会在这里停一会防止有新的任务

    加入(waitNewURL()),当然,这里的暂停时间是由你自己决定的。

        最后,如果等待一段时间后队列中仍没有请求,退出循环,将爬虫的状态改为停止并释放资源。

         /**
         * 爬虫的核心方法,广度优先遍历
         */
        @Override
        public void run() {
    //检查爬虫状态:初始化,抓取中,停止 checkRunningStat();
    //初始化爬虫组件 initComponent(); logger.info("Spider " + getUUID() + " started!"); //注意,这里的stat状态是一个CAS变量,保证了多线程访问的安全性 //这里是一个BFS算法 while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) { final Request request = scheduler.poll(this); if (request == null) { if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { break; } // wait until new url added //队列为空时等待一会以防有新URL加入 waitNewUrl(); } else { threadPool.execute(new Runnable() { @Override public void run() { try { //处理遍历到的request processRequest(request); //成功时回调我们注册的所有SpiderListener中的onSuccess()方法 onSuccess(request); } catch (Exception e) { //失败时回调我们注册的所有SpiderListener中的onError()方法 onError(request); logger.error("process request " + request + " error", e); } finally { //抓取总数自增,这里同样是一个CAS操作 pageCount.incrementAndGet(); signalNewUrl(); } } }); } } stat.set(STAT_STOPPED); // release some resources if (destroyWhenExit) { close(); } }

      需要注意的是,这里无论是爬虫的状态变量检查还是最后的自增变量(pageCount)都是CAS操作,因为我们的大多数情况下都会为爬虫开多个线程(当然,你要确保你的

    爬虫不会被网站封禁,而且最好也不要开过多线程,避免给对方服务器造成太大压力)。

        这里的另一个核心方法是processRequest(见下图),对于从scheduler中取到的每个抓取请求,都会做如下操作:

               1.页面下载:首先使用Downloader进行网页下载,获取网页对象Page,如果抓取内容为空,说明抓取出现错误,回调Listener中的onError方法并退出。

               2.页面解析:接下来Spider会回调我们自己写的pageProcessor中的process方法,由于每个网页都有自己的特点,所以需要我们自己进行处理。

               3.新URL抽取:如果事先定义了爬虫需要循环抓取(needCycleRetry)则从当前页面中抽取新的链接并放入调度队列中

               4.数据入库/写文件:Spider回调我们注册的所有pipline,在pipline中我们通常会将结果诸如入库,写文件或简单输出到控制台(webmagic默认支持)。

        /**
         * 处理队列中的某个请求
         * @param request
         */
        protected void processRequest(Request request) {
            Page page = downloader.download(request, this);
            if (page == null) {
                sleep(site.getSleepTime());
                onError(request);
                return;
            }
            // for cycle retry
            if (page.isNeedCycleRetry()) {
                extractAndAddRequests(page, true);
                sleep(site.getRetrySleepTime());
                return;
            }
            //注意,在这里回调了我们自己写的process方法
            pageProcessor.process(page);
            //提取链接并放入调度队列中
            extractAndAddRequests(page, spawnUrl);
            //顺序调用我们注册的pipline,在pipline通常将结果入库,写文件
            if (!page.getResultItems().isSkip()) {
                for (Pipeline pipeline : pipelines) {
                    pipeline.process(page.getResultItems(), this);
                }
            }
            sleep(site.getSleepTime());
        }

            接下来,我们探讨一下爬虫的另一个核心组件Scheduler(任务调度器),以下代码是webmagic中调度器的接口,我们可以看到,它仅仅需要支持两个操作,插入待抓取

    链接(push)和取链接(poll)

     1 public interface Scheduler {
     2 
     3     /**
     4      * add a url to fetch
     5      *
     6      * @param request request
     7      * @param task task
     8      */
     9     public void push(Request request, Task task);
    10 
    11     /**
    12      * get an url to crawl
    13      *
    14      * @param task the task of spider
    15      * @return the url to crawl
    16      */
    17     public Request poll(Task task);
    18 
    19 }

           下面的代码是webmagic默认提供的任务调度器,由于内存中的任务需要进行性排重,我们可以看到webmagic默认使用了HashSet排重,有可能你会说使用单机内存进

    行排重会OOM,事实上在webmagic-extension(webmagic的扩展包)里支持其他几种排重方式,包括Redis排重,布隆过滤器排重(如果不了解的话可以维基一下)。当然,

    如果使用布隆过滤器的话会有一定的误差。

    public abstract class DuplicateRemovedScheduler implements Scheduler {
    
        protected Logger logger = LoggerFactory.getLogger(getClass());
        //可以看到,webmagic默认使用HashSet进行链接去重
        private DuplicateRemover duplicatedRemover = new HashSetDuplicateRemover();
    
        public DuplicateRemover getDuplicateRemover() {
            return duplicatedRemover;
        }
    
        public DuplicateRemovedScheduler setDuplicateRemover(DuplicateRemover duplicatedRemover) {
            this.duplicatedRemover = duplicatedRemover;
            return this;
        }
    
        @Override
        public void push(Request request, Task task) {
            logger.trace("get a candidate url {}", request.getUrl());
            if (shouldReserved(request) || noNeedToRemoveDuplicate(request) || !duplicatedRemover.isDuplicate(request, task)) {
                logger.debug("push to queue {}", request.getUrl());
                pushWhenNoDuplicate(request, task);
            }
        }
    
        protected boolean shouldReserved(Request request) {
            return request.getExtra(Request.CYCLE_TRIED_TIMES) != null;
        }
        /**
         * 判断是否需要去重,如果是一个POST请求则不进行去重
         */
        protected boolean noNeedToRemoveDuplicate(Request request) {
            return HttpConstant.Method.POST.equalsIgnoreCase(request.getMethod());
        }
    
        protected void pushWhenNoDuplicate(Request request, Task task) {
    
        }
    }

      在上图中,我们可以看到,在webmagic中默认不对POST请求进行排重(或许是POST参数的原因),在实际工作中,你也可以对这里进行修改,比如对POST请求的URL+Request Body做一个MD5操作,再将其放入队列中,这样会浪费一些计算时间,但可以对POST请求进行排重,也可以节省一些内存开销。

  • 相关阅读:
    Echars折线配置详解
    Echarts中太阳图(Sunburst)的实例
    MongoDB shell 介绍
    js深度克隆对象
    js将有父子关系的数据转换成树形结构数据
    使用Mongoose类库实现简单的增删改查
    MongoDB可视化工具--Robo 3T 使用教程
    go语言之进阶篇并行和并发的区别与go语言并发优势
    go语言之进阶篇拷贝文件案例
    go语言之进阶篇借助bufio实现按行读取内容
  • 原文地址:https://www.cnblogs.com/showing/p/6752611.html
Copyright © 2011-2022 走看看