Scrapy Source Code Analysis (5): Startup Flow Analysis (5) - The Scraper

    The Scraper

    This article expands on the Scraper introduced in the ExecutionEngine article. The Scraper's main job is to manage the spider middlewares and, through them, handle requests, responses and the parsing of scraped data.

    The Scraper object

    scrapy/core/scraper.py#Scraper:

    class Scraper(object):
        def __init__(self, crawler):
            self.slot = None
            self.spidermw = SpiderMiddlewareManager.from_crawler(crawler)
            itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR'])
            self.itemproc = itemproc_cls.from_crawler(crawler)
            self.concurrent_items = crawler.settings.getint('CONCURRENT_ITEMS')
            self.crawler = crawler
            self.signals = crawler.signals
            self.logformatter = crawler.logformatter

    Three attributes matter here: spidermw, itemproc and concurrent_items.

    spidermw: SpiderMiddlewareManager, the spider middleware manager

    self.spidermw = SpiderMiddlewareManager.from_crawler(crawler): the spider middleware manager is likewise built through a from_crawler method.
    This from_crawler method is defined on the base class MiddlewareManager:
    scrapy/middleware.py#MiddlewareManager:

        @classmethod
        def from_crawler(cls, crawler):
            return cls.from_settings(crawler.settings, crawler)
            
        @classmethod
        def from_settings(cls, settings, crawler=None):
            mwlist = cls._get_mwlist_from_settings(settings)
            middlewares = []
            enabled = []
            for clspath in mwlist:
                try:
                    mwcls = load_object(clspath)
                    if crawler and hasattr(mwcls, 'from_crawler'):
                        mw = mwcls.from_crawler(crawler)
                    elif hasattr(mwcls, 'from_settings'):
                        mw = mwcls.from_settings(settings)
                    else:
                        mw = mwcls()
                    middlewares.append(mw)
                    enabled.append(clspath)
                except NotConfigured as e:
                    if e.args:
                        clsname = clspath.split('.')[-1]
                        logger.warning("Disabled %(clsname)s: %(eargs)s",
                                       {'clsname': clsname, 'eargs': e.args[0]},
                                       extra={'crawler': crawler})
    
            logger.info("Enabled %(componentname)ss:
    %(enabledlist)s",
                        {'componentname': cls.component_name,
                         'enabledlist': pprint.pformat(enabled)},
                        extra={'crawler': crawler})
            return cls(*middlewares)

    1. First, _get_mwlist_from_settings builds the middleware list from the settings.
    2. Each middleware module is then loaded and instantiated, trying from_crawler first, then from_settings, and finally falling back to calling the class directly. This shows the loading logic: from_crawler takes precedence over __init__, and a middleware only has to be registered by its path in the settings; it does not need to inherit from any base class.
    3. Besides its class path, each middleware has a priority number that decides the call order later on: the smaller the number, the earlier (closer to the engine) it is called.

    The default spider middlewares:
    scrapy/settings/default_settings.py:

    SPIDER_MIDDLEWARES_BASE = {
        # Engine side
        'scrapy.spidermiddlewares.httperror.HttpErrorMiddleware': 50,
        'scrapy.spidermiddlewares.offsite.OffsiteMiddleware': 500,
        'scrapy.spidermiddlewares.referer.RefererMiddleware': 700,
        'scrapy.spidermiddlewares.urllength.UrlLengthMiddleware': 800,
        'scrapy.spidermiddlewares.depth.DepthMiddleware': 900,
        # Spider side
    }
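
    For illustration only, a project can enable its own spider middleware next to the defaults by adding it to SPIDER_MIDDLEWARES in settings.py; the two dicts are merged and then ordered by priority. The module path and the value 543 below are made-up placeholders:

    # settings.py (hypothetical example) -- merged with SPIDER_MIDDLEWARES_BASE,
    # then ordered by priority (smaller number = closer to the engine)
    SPIDER_MIDDLEWARES = {
        'myproject.middlewares.MyCustomSpiderMiddleware': 543,
    }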

    Now look at the constructor:
    scrapy/middleware.py#MiddlewareManager:

    class MiddlewareManager(object):
        def __init__(self, *middlewares):
            self.middlewares = middlewares
            self.methods = defaultdict(list)
            for mw in middlewares:
            self._add_middleware(mw)

        def _add_middleware(self, mw):
            if hasattr(mw, 'open_spider'):
                self.methods['open_spider'].append(mw.open_spider)
            if hasattr(mw, 'close_spider'):
                self.methods['close_spider'].insert(0, mw.close_spider)

    scrapy/core/spidermw.py#SpiderMiddlewareManager:

        def _add_middleware(self, mw):
            super(SpiderMiddlewareManager, self)._add_middleware(mw)
            if hasattr(mw, 'process_spider_input'):
                self.methods['process_spider_input'].append(mw.process_spider_input)
            if hasattr(mw, 'process_spider_output'):
                self.methods['process_spider_output'].insert(0, mw.process_spider_output)
            if hasattr(mw, 'process_spider_exception'):
                self.methods['process_spider_exception'].insert(0, mw.process_spider_exception)
            if hasattr(mw, 'process_start_requests'):
                self.methods['process_start_requests'].insert(0, mw.process_start_requests)

    So the hook methods a spider middleware can implement are: open_spider, close_spider, process_spider_input, process_spider_output, process_spider_exception and process_start_requests. Note the ordering: process_spider_input handlers are appended while the output/exception handlers are inserted at the front, so input hooks run in priority order and output hooks in reverse.
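
    A minimal skeleton of such a middleware might look like the sketch below (the class name is invented for illustration; the hook signatures follow the calls made in scrape_response, shown later):

    # Hypothetical spider middleware skeleton; the name is made up for illustration.
    class NoopSpiderMiddleware(object):

        @classmethod
        def from_crawler(cls, crawler):
            # preferred construction path, as seen in MiddlewareManager.from_settings
            return cls()

        def process_spider_input(self, response, spider):
            # must return None or raise an exception
            return None

        def process_spider_output(self, response, result, spider):
            # must return an iterable of Requests/items
            for r in result:
                yield r

        def process_spider_exception(self, response, exception, spider):
            # return None to keep propagating, or an iterable to recover
            return None

        def process_start_requests(self, start_requests, spider):
            for r in start_requests:
                yield r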

    itemproc: ItemPipelineManager, the item pipeline manager

    itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR'])
    self.itemproc = itemproc_cls.from_crawler(crawler)

    itemproc is loaded from the 'ITEM_PROCESSOR' setting, which defaults to:
    ITEM_PROCESSOR = 'scrapy.pipelines.ItemPipelineManager'

    The item pipeline manager is likewise created via from_crawler, and ItemPipelineManager also inherits from MiddlewareManager, so it is managed as a middleware too.
    scrapy/pipelines/__init__.py:

    class ItemPipelineManager(MiddlewareManager):
    
        component_name = 'item pipeline'
    
        @classmethod
        def _get_mwlist_from_settings(cls, settings):
            return build_component_list(settings.getwithbase('ITEM_PIPELINES'))
    
        def _add_middleware(self, pipe):
            super(ItemPipelineManager, self)._add_middleware(pipe)
            if hasattr(pipe, 'process_item'):
                self.methods['process_item'].append(pipe.process_item)
    
        def process_item(self, item, spider):
            return self._process_chain('process_item', item, spider)
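
    The _process_chain helper comes from the MiddlewareManager base class. Roughly (a simplified sketch based on scrapy.utils.defer.process_chain, not a verbatim copy), it chains every registered process_item onto a single Deferred, so an item flows through the pipelines one after another in priority order:

    # Rough sketch of the chaining helper (simplified, not a verbatim copy).
    from twisted.internet import defer

    def process_chain(callbacks, obj, *args, **kwargs):
        d = defer.Deferred()
        for cb in callbacks:
            d.addCallback(cb, *args, **kwargs)   # each pipeline's process_item in turn
        d.callback(obj)                          # start the chain with the item
        return d

    # MiddlewareManager._process_chain is essentially:
    #     return process_chain(self.methods[methodname], obj, *args)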

    The default item pipeline list is empty (ITEM_PIPELINES_BASE = {}), so pipelines must be added by the developer. For example, implement a MongodbPipeline class with a process_item method in the project's pipelines.py module, then register it in the project settings:

    ITEM_PIPELINES = {
       'myspider.pipelines.MongodbPipeline': 300,
    }

    The hook methods an item pipeline can implement are: open_spider, close_spider and process_item.
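
    A bare-bones version of the MongodbPipeline mentioned above could look like this sketch (the pymongo usage, the MONGO_URI/MONGO_DB setting names and the collection name are assumptions made for the example, not part of Scrapy):

    # myspider/pipelines.py -- illustrative sketch only.
    import pymongo

    class MongodbPipeline(object):

        @classmethod
        def from_crawler(cls, crawler):
            # assumed custom settings; not defined by Scrapy itself
            return cls(crawler.settings.get('MONGO_URI', 'mongodb://localhost:27017'),
                       crawler.settings.get('MONGO_DB', 'scraped'))

        def __init__(self, mongo_uri, mongo_db):
            self.mongo_uri = mongo_uri
            self.mongo_db = mongo_db

        def open_spider(self, spider):
            self.client = pymongo.MongoClient(self.mongo_uri)
            self.db = self.client[self.mongo_db]

        def close_spider(self, spider):
            self.client.close()

        def process_item(self, item, spider):
            self.db['items'].insert_one(dict(item))
            return item  # always return the item so later pipelines receive it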

    concurrent_items: item concurrency control

    self.concurrent_items = crawler.settings.getint('CONCURRENT_ITEMS'); the default value is 100.
    This limit controls how many scraped items are processed concurrently, implemented with twisted.internet's task.Cooperator via the parallel helper.
    scrapy/core/scraper.py#Scraper:

        def handle_spider_output(self, result, request, response, spider):
            if not result:
                return defer_succeed(None)
            it = iter_errback(result, self.handle_spider_error, request, response, spider)
            dfd = parallel(it, self.concurrent_items,
                self._process_spidermw_output, request, response, spider)
            return dfd
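
    The parallel helper lives in scrapy.utils.defer. Roughly (a simplified sketch, not a verbatim copy), it starts concurrent_items cooperative consumers over one shared generator, so at most that many items are being processed at any moment:

    # Rough sketch of scrapy.utils.defer.parallel (simplified).
    from twisted.internet import defer, task

    def parallel(iterable, count, callable, *args, **named):
        coop = task.Cooperator()
        work = (callable(elem, *args, **named) for elem in iterable)
        # 'count' coiterate tasks pull from the same generator -> bounded concurrency
        return defer.DeferredList([coop.coiterate(work) for _ in range(count)])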

    The Scraper workflow

    How do these three pieces work together?

    First, ExecutionEngine.open_spider calls the scraper's open_spider method to initialise it:
    scrapy/core/engine.py#ExecutionEngine:

    yield self.scraper.open_spider(spider)

    scrapy/core/scraper.py#Scraper:

    @defer.inlineCallbacks
    def open_spider(self, spider):
        self.slot = Slot()
        yield self.itemproc.open_spider(spider)

    A Slot is created, and open_spider is called on every item pipeline that defines it.
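
    The scraper Slot is the per-spider bookkeeping object used below (add_response_request, next_response_request_deferred, finish_response). The sketch that follows is a simplified reconstruction of scrapy/core/scraper.py#Slot; exact fields may differ between versions:

    # Simplified sketch of the scraper Slot (see scrapy/core/scraper.py#Slot).
    from collections import deque
    from twisted.internet import defer
    from scrapy.http import Response

    class Slot(object):

        MIN_RESPONSE_SIZE = 1024

        def __init__(self, max_active_size=5000000):
            self.max_active_size = max_active_size  # backpressure threshold
            self.queue = deque()     # (response, request, deferred) waiting to be scraped
            self.active = set()      # requests currently being scraped
            self.active_size = 0     # approximate bytes of responses in flight
            self.itemproc_size = 0   # items currently inside the item pipelines
            self.closing = None

        def add_response_request(self, response, request):
            deferred = defer.Deferred()
            self.queue.append((response, request, deferred))
            if isinstance(response, Response):
                self.active_size += max(len(response.body), self.MIN_RESPONSE_SIZE)
            else:
                self.active_size += self.MIN_RESPONSE_SIZE
            return deferred

        def next_response_request_deferred(self):
            response, request, deferred = self.queue.popleft()
            self.active.add(request)
            return response, request, deferred

        def finish_response(self, response, request):
            self.active.remove(request)
            if isinstance(response, Response):
                self.active_size -= max(len(response.body), self.MIN_RESPONSE_SIZE)
            else:
                self.active_size -= self.MIN_RESPONSE_SIZE

        def is_idle(self):
            return not (self.queue or self.active)

        def needs_backout(self):
            # the engine uses this to stop pulling new requests when too much
            # response data is waiting to be scraped
            return self.active_size > self.max_active_size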
    As covered in the engine article, the engine keeps handling new requests by repeatedly running _next_request, which calls _next_request_from_scheduler whenever no backout is needed; that method pulls the next request from the scheduler.
    scrapy/core/engine.py#ExecutionEngine:

        def _next_request_from_scheduler(self, spider):
            slot = self.slot
            request = slot.scheduler.next_request()
            if not request:
                return
            d = self._download(request, spider)
            d.addBoth(self._handle_downloader_output, request, spider)
            d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                               exc_info=failure_to_exc_info(f),
                                               extra={'spider': spider}))
            d.addBoth(lambda _: slot.remove_request(request))
            d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                               exc_info=failure_to_exc_info(f),
                                               extra={'spider': spider}))
            d.addBoth(lambda _: slot.nextcall.schedule())
            d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                               exc_info=failure_to_exc_info(f),
                                               extra={'spider': spider}))
            return d

    As you can see, after taking a request from the scheduler, _download is called to download it, and the callback _handle_downloader_output is attached to the resulting Deferred to handle the download result. Finally the request is removed from the slot and nextcall.schedule() is invoked again to process the next request; this is the 'active' trigger mentioned earlier, the 'passive' one being the 5-second heartbeat.
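
    As a tiny illustration (plain Twisted, not Scrapy code) of the addBoth/addErrback pairing used above: addBoth runs whether the previous step succeeded or failed, and the addErrback right after it only fires if that step itself failed, so one failing step gets logged without breaking the rest of the chain:

    from twisted.internet import defer

    d = defer.Deferred()
    d.addBoth(lambda _: 1 / 0)                                 # this step fails...
    d.addErrback(lambda f: print('step failed:', f.value))     # ...its error is logged here...
    d.addBoth(lambda _: print('the next step still runs'))     # ...and the chain continues
    d.callback(None)
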
    The Scraper mainly comes into play after the download completes, so let's analyse _handle_downloader_output:
    scrapy/core/engine.py#ExecutionEngine:

        def _handle_downloader_output(self, response, request, spider):
            assert isinstance(response, (Request, Response, Failure)), response
            # downloader middleware can return requests (for example, redirects)
            if isinstance(response, Request):
                self.crawl(response, spider)
                return
            # response is a Response or Failure
            d = self.scraper.enqueue_scrape(response, request, spider)
            d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
                                                exc_info=failure_to_exc_info(f),
                                                extra={'spider': spider}))
            return d
            
        def crawl(self, request, spider):
            assert spider in self.open_spiders, \
                "Spider %r not opened when crawling: %s" % (spider.name, request)
            self.schedule(request, spider)
            self.slot.nextcall.schedule()

    As you can see, if the result is a Request (for example a redirect produced by downloader middleware), crawl is called to put it back into the scheduler queue; otherwise the scraper's enqueue_scrape method is called.
    scrapy/core/scraper.py#Scraper:

        def enqueue_scrape(self, response, request, spider):
            slot = self.slot
            dfd = slot.add_response_request(response, request)
            def finish_scraping(_):
                slot.finish_response(response, request)
                self._check_if_closing(spider, slot)
                self._scrape_next(spider, slot)
                return _
            dfd.addBoth(finish_scraping)
            dfd.addErrback(
                lambda f: logger.error('Scraper bug processing %(request)s',
                                       {'request': request},
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
            self._scrape_next(spider, slot)
            return dfd

    This method first puts the response into the slot's own queue, then adds a finish_scraping callback to the deferred returned for that response to handle the cleanup once scraping is done, and finally calls _scrape_next to process the responses waiting in the queue.

    scrapy/core/scraper.py#Scraper:

        def _scrape_next(self, spider, slot):
            while slot.queue:
                response, request, deferred = slot.next_response_request_deferred()
                self._scrape(response, request, spider).chainDeferred(deferred)

    This method keeps pulling responses off the queue and running _scrape on them; when _scrape finishes, the previously installed finish_scraping callback fires through the chained deferred.

    scrapy/core/scraper.py#Scraper:

        def _scrape(self, response, request, spider):
            """Handle the downloaded response or failure through the spider
            callback/errback"""
            assert isinstance(response, (Response, Failure))
    
            dfd = self._scrape2(response, request, spider) # returns spiders processed output
            dfd.addErrback(self.handle_spider_error, request, response, spider)
            dfd.addCallback(self.handle_spider_output, request, response, spider)
            return dfd

    After calling _scrape2, _scrape attaches handle_spider_output to the deferred, so once _scrape2 completes, handle_spider_output (shown earlier, the method that actually processes the yielded items) is invoked.

    scrapy/core/scraper.py#Scraper:

        def _scrape2(self, request_result, request, spider):
            """Handle the different cases of request's result been a Response or a
            Failure"""
            if not isinstance(request_result, Failure):
                return self.spidermw.scrape_response(
                    self.call_spider, request_result, request, spider)
            else:
                # FIXME: don't ignore errors in spider middleware
                dfd = self.call_spider(request_result, request, spider)
                return dfd.addErrback(
                    self._log_download_errors, request_result, request, spider)

    _scrape2 checks whether request_result is a Failure; if not, it hands it to the SpiderMiddlewareManager's scrape_response method.

    scrapy/core/spidermw.py#SpiderMiddlewareManager:

        def scrape_response(self, scrape_func, response, request, spider):
            fname = lambda f:'%s.%s' % (
                    six.get_method_self(f).__class__.__name__,
                    six.get_method_function(f).__name__)
    
            def process_spider_input(response):
                for method in self.methods['process_spider_input']:
                    try:
                        result = method(response=response, spider=spider)
                        assert result is None, \
                                'Middleware %s must returns None or ' \
                                'raise an exception, got %s ' \
                                % (fname(method), type(result))
                    except:
                        return scrape_func(Failure(), request, spider)
                return scrape_func(response, request, spider)
    
            def process_spider_exception(_failure):
                exception = _failure.value
                for method in self.methods['process_spider_exception']:
                    result = method(response=response, exception=exception, spider=spider)
                    assert result is None or _isiterable(result), \
                        'Middleware %s must returns None, or an iterable object, got %s ' % \
                        (fname(method), type(result))
                    if result is not None:
                        return result
                return _failure
    
            def process_spider_output(result):
                for method in self.methods['process_spider_output']:
                    result = method(response=response, result=result, spider=spider)
                    assert _isiterable(result), \
                        'Middleware %s must returns an iterable object, got %s ' % \
                        (fname(method), type(result))
                return result
    
            dfd = mustbe_deferred(process_spider_input, response)
            dfd.addErrback(process_spider_exception)
            dfd.addCallback(process_spider_output)
            return dfd

    This method first runs every middleware's process_spider_input method in order and then calls the scrape_func passed in, which is call_spider; if any process_spider_input raises, call_spider is invoked with a Failure instead.
    If all middlewares succeed and call_spider succeeds as well, process_spider_output runs, which in turn calls each middleware's process_spider_output method on the results.
    scrapy/core/scraper.py#Scraper:

        def call_spider(self, result, request, spider):
            result.request = request
            dfd = defer_result(result)
            dfd.addCallbacks(request.callback or spider.parse, request.errback)
            return dfd.addCallback(iterate_spider_output)

    The response is handled by request.callback or spider.parse: if the Request defined a callback it takes precedence; otherwise the spider's parse method parses the response.
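
    To make the callback path concrete, here is a minimal hypothetical spider (names and URLs are placeholders) whose parse method yields both an item and a follow-up Request; the items continue into process_spider_output and the item pipelines, while the Requests go back to the engine via crawl():

    # Hypothetical minimal spider; names and URLs are placeholders.
    import scrapy

    class MySpider(scrapy.Spider):
        name = 'myspider'
        start_urls = ['http://example.com/']

        def parse(self, response):
            # items flow on to process_spider_output and then the item pipelines
            yield {'title': response.css('title::text').extract_first()}
            # requests are scheduled again through the engine
            for href in response.css('a::attr(href)').extract():
                yield scrapy.Request(response.urljoin(href), callback=self.parse)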

    Summary

    To summarise: the engine takes a URL from the scheduler queue and hands it to the downloader to fetch the page (the downloader's own flow is more involved and will be analysed in the next article). The downloaded response is first run through every spider middleware's process_spider_input; if they all succeed, request.callback or spider.parse is called to parse it; the parsed results are then passed through each middleware's process_spider_output, and finally handed to the ItemPipeline, whose process_item methods handle the scraped data.
    Original article by CSDN blogger csdn_yym, licensed under CC 4.0 BY-SA: https://blog.csdn.net/csdn_yym/java/article/details/85577613
