  • scrapy distributed Spider: source code analysis and implementation

    The distributed framework scrapy_redis implements a complete set of components, including its own spiders. RedisSpider builds on the original scrapy Spider with one small change: initial URLs are no longer read from the start_urls list but from a start queue in Redis.

    The source lives in scrapy_redis.spiders, which implements not only RedisSpider (the distributed spider) but also RedisCrawlSpider (the distributed crawl spider for rule-based deep crawling); the two share most of their methods.
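
    Before reading the source, a minimal usage sketch may help. The spider name myspider, its redis_key, and the example URL below are all hypothetical:

    # myspider.py -- a minimal RedisSpider, assuming scrapy_redis is installed
    from scrapy_redis.spiders import RedisSpider

    class MySpider(RedisSpider):
        name = 'myspider'
        # Start requests are popped from this Redis key instead of start_urls.
        redis_key = 'myspider:start_urls'

        def parse(self, response):
            yield {'url': response.url}

    The crawl only begins once the queue is seeded, e.g. with redis-cli: lpush myspider:start_urls http://example.com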

    The source code is as follows:

    from scrapy import signals
    from scrapy.exceptions import DontCloseSpider
    from scrapy.spiders import Spider, CrawlSpider

    from . import connection


    # Default batch size matches default concurrent requests setting.
    DEFAULT_START_URLS_BATCH_SIZE = 16

    DEFAULT_START_URLS_KEY = '%(name)s:start_urls'


    class RedisMixin(object):
        """Mixin class to implement reading urls from a redis queue."""

        # Per spider redis key, default to DEFAULT_START_URLS_KEY.
        redis_key = None
        # Fetch this amount of start urls when idle. Default to DEFAULT_START_URLS_BATCH_SIZE.
        redis_batch_size = None
        # Redis client instance.
        server = None

        def start_requests(self):
            """Returns a batch of start requests from redis."""
            return self.next_requests()

        def setup_redis(self, crawler=None):
            """Setup redis connection and idle signal.

            This should be called after the spider has set its crawler object.
            """
            if self.server is not None:
                return

            if crawler is None:
                # We allow optional crawler argument to keep backwards
                # compatibility.
                # XXX: Raise a deprecation warning.
                crawler = getattr(self, 'crawler', None)

            if crawler is None:
                raise ValueError("crawler is required")

            settings = crawler.settings

            if self.redis_key is None:
                self.redis_key = settings.get(
                    'REDIS_START_URLS_KEY', DEFAULT_START_URLS_KEY,
                )

            self.redis_key = self.redis_key % {'name': self.name}

            if not self.redis_key.strip():
                raise ValueError("redis_key must not be empty")

            if self.redis_batch_size is None:
                self.redis_batch_size = settings.getint(
                    'REDIS_START_URLS_BATCH_SIZE', DEFAULT_START_URLS_BATCH_SIZE,
                )

            try:
                self.redis_batch_size = int(self.redis_batch_size)
            except (TypeError, ValueError):
                raise ValueError("redis_batch_size must be an integer")

            self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                             "(batch size: %(redis_batch_size)s)", self.__dict__)

            self.server = connection.from_settings(crawler.settings)
            # The idle signal is called when the spider has no requests left,
            # that's when we will schedule new requests from redis queue.
            crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

        def next_requests(self):
            """Returns a request to be scheduled or none."""
            use_set = self.settings.getbool('REDIS_START_URLS_AS_SET')
            fetch_one = self.server.spop if use_set else self.server.lpop
            # XXX: Do we need to use a timeout here?
            found = 0
            while found < self.redis_batch_size:
                data = fetch_one(self.redis_key)
                if not data:
                    # Queue empty.
                    break
                req = self.make_request_from_data(data)
                if req:
                    yield req
                    found += 1
                else:
                    self.logger.debug("Request not made from data: %r", data)

            if found:
                self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

        def make_request_from_data(self, data):
            # By default, data is an URL.
            if '://' in data:
                return self.make_requests_from_url(data)
            else:
                self.logger.error("Unexpected URL from '%s': %r", self.redis_key, data)

        def schedule_next_requests(self):
            """Schedules a request if available"""
            for req in self.next_requests():
                self.crawler.engine.crawl(req, spider=self)

        def spider_idle(self):
            """Schedules a request if available, otherwise waits."""
            # XXX: Handle a sentinel to close the spider.
            self.schedule_next_requests()
            raise DontCloseSpider


    class RedisSpider(RedisMixin, Spider):
        """Spider that reads urls from redis queue when idle."""

        @classmethod
        def from_crawler(cls, crawler, *args, **kwargs):
            obj = super(RedisSpider, cls).from_crawler(crawler, *args, **kwargs)
            obj.setup_redis(crawler)
            return obj


    class RedisCrawlSpider(RedisMixin, CrawlSpider):
        """Spider that reads urls from redis queue when idle."""

        @classmethod
        def from_crawler(cls, crawler, *args, **kwargs):
            obj = super(RedisCrawlSpider, cls).from_crawler(crawler, *args, **kwargs)
            obj.setup_redis(crawler)
            return obj

    At startup, the start_requests method fetches a batch of start URLs (up to redis_batch_size) from Redis via next_requests; each one is turned into a Request by make_requests_from_url(url) and handed to the engine and then the scheduler, which deduplicates it against the shared fingerprint set before queueing it. From there the flow is the same as an ordinary spider's.
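
    The key and batch size above come from the project settings, alongside the Redis-backed scheduler and dupefilter that scrapy_redis provides. A typical settings.py sketch (the Redis address is an assumption):

    # settings.py -- typical scrapy_redis configuration (sketch)
    # Requests are queued and deduplicated in Redis, shared by all spider instances.
    SCHEDULER = "scrapy_redis.scheduler.Scheduler"
    DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

    # Start-URL settings read by setup_redis (shown with their defaults).
    REDIS_START_URLS_KEY = '%(name)s:start_urls'
    REDIS_START_URLS_BATCH_SIZE = 16
    # Use a Redis set instead of a list for the start queue.
    REDIS_START_URLS_AS_SET = False

    # Redis connection; assumed to be a local instance.
    REDIS_URL = 'redis://localhost:6379'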

    When the engine finds the task queue empty, it fires the spider_idle signal, which invokes the spider_idle method. That in turn calls schedule_next_requests, which iterates over the next batch of start URLs from Redis and submits them to the engine, and the crawl resumes; once those tasks finish and the spider goes idle again, another batch is pulled from the start queue.

    def schedule_next_requests(self):
        """Schedules a request if available"""
        for req in self.next_requests():
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        # XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider
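
    Because spider_idle unconditionally raises DontCloseSpider, the spider stays alive even when the start queue is empty and resumes as soon as URLs arrive. A sketch of feeding it from another process with redis-py (the key name matches the hypothetical spider above):

    # feed_urls.py -- push start URLs while the spider is running (sketch)
    import redis

    r = redis.StrictRedis(host='localhost', port=6379)
    # The idle spider picks these up on its next spider_idle signal.
    r.lpush('myspider:start_urls',
            'http://example.com/page1',
            'http://example.com/page2')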

    RedisSpider and RedisCrawlSpider themselves are simple: they inherit from RedisMixin together with Spider and CrawlSpider respectively, and call setup_redis from from_crawler. Depending on the crawler passed in, that method initializes the settings-derived redis_key and redis_batch_size, creates the Redis connection, and binds the spider's spider_idle handler to the spider_idle signal via the crawler.signals.connect interface.

    class RedisSpider(RedisMixin, Spider):
        """Spider that reads urls from redis queue when idle."""

        @classmethod
        def from_crawler(cls, crawler, *args, **kwargs):
            obj = super(RedisSpider, cls).from_crawler(crawler, *args, **kwargs)
            obj.setup_redis(crawler)
            return obj


    class RedisCrawlSpider(RedisMixin, CrawlSpider):
        """Spider that reads urls from redis queue when idle."""

        @classmethod
        def from_crawler(cls, crawler, *args, **kwargs):
            obj = super(RedisCrawlSpider, cls).from_crawler(crawler, *args, **kwargs)
            obj.setup_redis(crawler)
            return obj
    
    
    def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.

        This should be called after the spider has set its crawler object.
        """
        if self.server is not None:
            return

        if crawler is None:
            # We allow optional crawler argument to keep backwards
            # compatibility.
            # XXX: Raise a deprecation warning.
            crawler = getattr(self, 'crawler', None)

        if crawler is None:
            raise ValueError("crawler is required")

        settings = crawler.settings

        if self.redis_key is None:
            self.redis_key = settings.get(
                'REDIS_START_URLS_KEY', DEFAULT_START_URLS_KEY,
            )

        self.redis_key = self.redis_key % {'name': self.name}

        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")

        if self.redis_batch_size is None:
            self.redis_batch_size = settings.getint(
                'REDIS_START_URLS_BATCH_SIZE', DEFAULT_START_URLS_BATCH_SIZE,
            )

        try:
            self.redis_batch_size = int(self.redis_batch_size)
        except (TypeError, ValueError):
            raise ValueError("redis_batch_size must be an integer")

        self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                         "(batch size: %(redis_batch_size)s)", self.__dict__)

        self.server = connection.from_settings(crawler.settings)
        # The idle signal is called when the spider has no requests left,
        # that's when we will schedule new requests from redis queue.
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
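
    Since make_request_from_data accepts only bare URLs by default, a common extension (not part of the library, just a sketch) is to override it so the start queue can carry JSON payloads with extra request data:

    # Sketch: let the start queue carry JSON such as {"url": "...", "meta": {...}}.
    import json

    from scrapy import Request
    from scrapy_redis.spiders import RedisSpider

    class JsonRedisSpider(RedisSpider):
        name = 'jsonspider'  # hypothetical spider name

        def make_request_from_data(self, data):
            # Redis returns bytes; decode before parsing.
            payload = json.loads(data.decode('utf-8'))
            return Request(payload['url'], meta=payload.get('meta') or {})

        def parse(self, response):
            yield {'url': response.url}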

    To summarize: RedisSpider takes start_url entries from the initial Redis queue, builds Requests, and submits them to the scrapy engine, which hands them to the Redis-backed scheduler for deduplication and enqueueing. When scheduling conditions are met, queued Requests are passed by the engine to the downloader; the resulting Response objects go back to the Spider for parsing, producing new Requests that keep the crawl going. Once no new Requests are produced, the Spider returns to the start queue for more URLs and begins the next cycle. Note that because spider_idle raises DontCloseSpider, the spider does not shut down when the start queue is also empty; it simply waits until new URLs are pushed to Redis.
