zoukankan      html  css  js  c++  java
  • 小白进阶之Scrapy第六篇Scrapy-Redis详解(转)

    Scrapy-Redis 详解

    通常我们在一个站站点进行采集的时候,如果是小站的话 我们使用scrapy本身就可以满足。

    但是如果在面对一些比较大型的站点的时候,单个scrapy就显得力不从心了。

    要是我们能够多个Scrapy一起采集该多好啊 人多力量大。

    很遗憾Scrapy官方并不支持多个同时采集一个站点,虽然官方给出一个方法:

    **将一个站点的分割成几部分 交给不同的scrapy去采集**

    似乎是个解决办法,但是很麻烦诶!毕竟分割很麻烦的哇

    下面就改轮到我们的额主角Scrapy-Redis登场了!

    什么??你这么就登场了?还没说为什么呢?

    好吧 为了简单起见 就用官方图来简单说明一下:

    这张图大家相信大家都很熟悉了。重点看一下SCHEDULER

    1. 先来看看官方对于SCHEDULER的定义:

    **SCHEDULER接受来自Engine的Requests,并将它们放入队列(可以按顺序优先级),以便在之后将其提供给Engine**

    点我看文档

    2. 现在我们来看看SCHEDULER都提供了些什么功能:

    根据官方文档说明 在我们没有没有指定 SCHEDULER 参数时,默认使用:’scrapy.core.scheduler.Scheduler’ 作为SCHEDULER(调度器)

    scrapy.core.scheduler.py:

    只挑了一些重点的写了一些注释剩下大家自己领会(才不是我懒哦 )

    从上面的代码 我们可以很清楚的知道 SCHEDULER的主要是完成了 push Request pop Request 和 去重的操作。

    而且queue 操作是在内存队列中完成的。

    大家看queuelib.queue就会发现基于内存的(deque)

    那么去重呢?

    按照正常流程就是大家都会进行重复的采集;我们都知道进程之间内存中的数据不可共享的,那么你在开启多个Scrapy的时候,它们相互之间并不知道对方采集了些什么那些没有没采集。那就大家伙儿自己玩自己的了。完全没没有效率的提升啊!

    怎么解决呢?

    这就是我们Scrapy-Redis解决的问题了,不能协作不就是因为Request 和 去重这两个 不能共享吗?

    那我把这两个独立出来好了。

    将Scrapy中的SCHEDULER组件独立放到大家都能访问的地方不就OK啦!加上scrapy-redis后流程图就应该变成这样了?

    So············· 这样是不是看起来就清楚多了???

    下面我们来看看Scrapy-Redis是怎么处理的?

    scrapy_redis.scheduler.py:

    class Scheduler(object):
        """Redis-based scheduler
     
        Settings
        --------
        SCHEDULER_PERSIST : bool (default: False)
            Whether to persist or clear redis queue.
        SCHEDULER_FLUSH_ON_START : bool (default: False)
            Whether to flush redis queue on start.
        SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
            How many seconds to wait before closing if no message is received.
        SCHEDULER_QUEUE_KEY : str
            Scheduler redis key.
        SCHEDULER_QUEUE_CLASS : str
            Scheduler queue class.
        SCHEDULER_DUPEFILTER_KEY : str
            Scheduler dupefilter redis key.
        SCHEDULER_DUPEFILTER_CLASS : str
            Scheduler dupefilter class.
        SCHEDULER_SERIALIZER : str
            Scheduler serializer.
     
        """
     
        def __init__(self, server,
                     persist=False,
                     flush_on_start=False,
                     queue_key=defaults.SCHEDULER_QUEUE_KEY,
                     queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                     dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                     dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                     idle_before_close=0,
                     serializer=None):
            """Initialize scheduler.
     
            Parameters
            ----------
            server : Redis
                这是Redis实例
            persist : bool
                是否在关闭时清空Requests.默认值是False。
            flush_on_start : bool
                是否在启动时清空Requests。 默认值是False。
            queue_key : str
                Request队列的Key名字
            queue_cls : str
                队列的可导入路径(就是使用什么队列)
            dupefilter_key : str
                去重队列的Key
            dupefilter_cls : str
                去重类的可导入路径。
            idle_before_close : int
                等待多久关闭
     
            """
            if idle_before_close < 0:
                raise TypeError("idle_before_close cannot be negative")
     
            self.server = server
            self.persist = persist
            self.flush_on_start = flush_on_start
            self.queue_key = queue_key
            self.queue_cls = queue_cls
            self.dupefilter_cls = dupefilter_cls
            self.dupefilter_key = dupefilter_key
            self.idle_before_close = idle_before_close
            self.serializer = serializer
            self.stats = None
     
        def __len__(self):
            return len(self.queue)
     
        @classmethod
        def from_settings(cls, settings):
            kwargs = {
                'persist': settings.getbool('SCHEDULER_PERSIST'),
                'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
                'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
            }
     
            # If these values are missing, it means we want to use the defaults.
            optional = {
                # TODO: Use custom prefixes for this settings to note that are
                # specific to scrapy-redis.
                'queue_key': 'SCHEDULER_QUEUE_KEY',
                'queue_cls': 'SCHEDULER_QUEUE_CLASS',
                'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
                # We use the default setting name to keep compatibility.
                'dupefilter_cls': 'DUPEFILTER_CLASS',
                'serializer': 'SCHEDULER_SERIALIZER',
            }
            # 从setting中获取配置组装成dict(具体获取那些配置是optional字典中key)
            for name, setting_name in optional.items():
                val = settings.get(setting_name)
                if val:
                    kwargs[name] = val
     
            # Support serializer as a path to a module.
            if isinstance(kwargs.get('serializer'), six.string_types):
                kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
                    # 或得一个Redis连接
            server = connection.from_settings(settings)
            # Ensure the connection is working.
            server.ping()
     
            return cls(server=server, **kwargs)
     
        @classmethod
        def from_crawler(cls, crawler):
            instance = cls.from_settings(crawler.settings)
            # FIXME: for now, stats are only supported from this constructor
            instance.stats = crawler.stats
            return instance
     
        def open(self, spider):
            self.spider = spider
     
            try:
                  # 根据self.queue_cls这个可以导入的类 实例化一个队列
                self.queue = load_object(self.queue_cls)(
                    server=self.server,
                    spider=spider,
                    key=self.queue_key % {'spider': spider.name},
                    serializer=self.serializer,
                )
            except TypeError as e:
                raise ValueError("Failed to instantiate queue class '%s': %s",
                                 self.queue_cls, e)
     
            try:
                  # 根据self.dupefilter_cls这个可以导入的类 实例一个去重集合
                # 默认是集合 可以实现自己的去重方式 比如 bool 去重
                self.df = load_object(self.dupefilter_cls)(
                    server=self.server,
                    key=self.dupefilter_key % {'spider': spider.name},
                    debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
                )
            except TypeError as e:
                raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                                 self.dupefilter_cls, e)
     
            if self.flush_on_start:
                self.flush()
            # notice if there are requests already in the queue to resume the crawl
            if len(self.queue):
                spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
     
        def close(self, reason):
            if not self.persist:
                self.flush()
     
        def flush(self):
            self.df.clear()
            self.queue.clear()
     
        def enqueue_request(self, request):
          """这个和Scrapy本身的一样"""
            if not request.dont_filter and self.df.request_seen(request):
                self.df.log(request, self.spider)
                return False
            if self.stats:
                self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
            # 向队列里面添加一个Request
            self.queue.push(request)
            return True
     
        def next_request(self):
          """获取一个Request"""
            block_pop_timeout = self.idle_before_close
            # block_pop_timeout 是一个等待参数 队列没有东西会等待这个时间  超时就会关闭
            request = self.queue.pop(block_pop_timeout)
            if request and self.stats:
                self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
            return request
     
        def has_pending_requests(self):
            return len(self) > 0

    来先来看看

    以上就是Scrapy-Redis中的SCHEDULER模块。下面我们来看看queue和本身的什么不同:

    scrapy_redis.queue.py

    以最常用的优先级队列 PriorityQueue 举例:

    以上就是SCHEDULER在处理Request的时候做的操作了。

    是时候来看看SCHEDULER是怎么处理去重的了!

    只需要注意这个?方法即可:

    这样大家就都可以访问同一个Redis 获取同一个spider的Request 在同一个位置去重,就不用担心重复啦

    大概就像这样:

    1. spider1:检查一下这个Request是否在Redis去重,如果在就证明其它的spider采集过啦!如果不在就添加进调度队列,等待别 人获取。自己继续干活抓取网页 产生新的Request了 重复之前步骤。
    2. spider2:以相同的逻辑执行

    可能有些小伙儿会产生疑问了~~!spider2拿到了别人的Request了 怎么能正确的执行呢?逻辑不会错吗?

    这个不用担心啦 因为整Request当中包含了,所有的逻辑,回去看看上面那个序列化的字典。

    总结一下:

    1. 1. Scrapy-Reids 就是将Scrapy原本在内存中处理的 调度(就是一个队列Queue)、去重、这两个操作通过Redis来实现
    2. 多个Scrapy在采集同一个站点时会使用相同的redis key(可以理解为队列)添加Request 获取Request 去重Request,这样所有的spider不会进行重复采集。效率自然就嗖嗖的上去了。
    3. 3. Redis是原子性的,好处不言而喻(一个Request要么被处理 要么没被处理,不存在第三可能)

    另外Scrapy-Redis本身不支持Redis-Cluster,大量网站去重的话会给单机很大的压力(就算使用boolfilter 内存也不够整啊!)

    改造方式很简单:

    1.  使用 **rediscluster** 这个包替换掉本身的Redis连接
    2. Redis-Cluster 不支持事务,可以使用lua脚本进行代替(lua脚本是原子性的哦)
    3. **注意使用lua脚本 不能写占用时间很长的操作**(毕竟一大群人等着操作Redis 你总不能让人家等着吧)

    以上!完毕

    对于懒人小伙伴儿 看看这个我改好的: 集群版Scrapy-Redis **PS: 支持Python3.6+ 哦 ! 其余的版本没测试过**

    转载请注明:静觅 » 小白进阶之Scrapy第六篇Scrapy-Redis详解

  • 相关阅读:
    PyCharm的常用方法
    python的基本语法
    Python 环境搭建----windows
    Python 特点
    DML语句
    DDL语句
    程序员的职业素养 读书笔记
    程序员的职业素养 读书笔记
    程序员的职业素养 读书笔记
    程序员的职业素养 读书笔记
  • 原文地址:https://www.cnblogs.com/jayxuan/p/10876800.html
Copyright © 2011-2022 走看看