zoukankan      html  css  js  c++  java
  • 小白进阶之Scrapy第六篇Scrapy-Redis详解(转)

    Scrapy-Redis 详解

    通常我们在一个站站点进行采集的时候,如果是小站的话 我们使用scrapy本身就可以满足。

    但是如果在面对一些比较大型的站点的时候,单个scrapy就显得力不从心了。

    要是我们能够多个Scrapy一起采集该多好啊 人多力量大。

    很遗憾Scrapy官方并不支持多个同时采集一个站点,虽然官方给出一个方法:

    **将一个站点的分割成几部分 交给不同的scrapy去采集**

    似乎是个解决办法,但是很麻烦诶!毕竟分割很麻烦的哇

    下面就改轮到我们的额主角Scrapy-Redis登场了!

    什么??你这么就登场了?还没说为什么呢?

    好吧 为了简单起见 就用官方图来简单说明一下:

    这张图大家相信大家都很熟悉了。重点看一下SCHEDULER

    1. 先来看看官方对于SCHEDULER的定义:

    **SCHEDULER接受来自Engine的Requests,并将它们放入队列(可以按顺序优先级),以便在之后将其提供给Engine**

    点我看文档

    2. 现在我们来看看SCHEDULER都提供了些什么功能:

    根据官方文档说明 在我们没有没有指定 SCHEDULER 参数时,默认使用:’scrapy.core.scheduler.Scheduler’ 作为SCHEDULER(调度器)

    scrapy.core.scheduler.py:

    只挑了一些重点的写了一些注释剩下大家自己领会(才不是我懒哦 )

    从上面的代码 我们可以很清楚的知道 SCHEDULER的主要是完成了 push Request pop Request 和 去重的操作。

    而且queue 操作是在内存队列中完成的。

    大家看queuelib.queue就会发现基于内存的(deque)

    那么去重呢?

    按照正常流程就是大家都会进行重复的采集;我们都知道进程之间内存中的数据不可共享的,那么你在开启多个Scrapy的时候,它们相互之间并不知道对方采集了些什么那些没有没采集。那就大家伙儿自己玩自己的了。完全没没有效率的提升啊!

    怎么解决呢?

    这就是我们Scrapy-Redis解决的问题了,不能协作不就是因为Request 和 去重这两个 不能共享吗?

    那我把这两个独立出来好了。

    将Scrapy中的SCHEDULER组件独立放到大家都能访问的地方不就OK啦!加上scrapy-redis后流程图就应该变成这样了?

    So············· 这样是不是看起来就清楚多了???

    下面我们来看看Scrapy-Redis是怎么处理的?

    scrapy_redis.scheduler.py:

    class Scheduler(object):
        """Redis-based scheduler
     
        Settings
        --------
        SCHEDULER_PERSIST : bool (default: False)
            Whether to persist or clear redis queue.
        SCHEDULER_FLUSH_ON_START : bool (default: False)
            Whether to flush redis queue on start.
        SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
            How many seconds to wait before closing if no message is received.
        SCHEDULER_QUEUE_KEY : str
            Scheduler redis key.
        SCHEDULER_QUEUE_CLASS : str
            Scheduler queue class.
        SCHEDULER_DUPEFILTER_KEY : str
            Scheduler dupefilter redis key.
        SCHEDULER_DUPEFILTER_CLASS : str
            Scheduler dupefilter class.
        SCHEDULER_SERIALIZER : str
            Scheduler serializer.
     
        """
     
        def __init__(self, server,
                     persist=False,
                     flush_on_start=False,
                     queue_key=defaults.SCHEDULER_QUEUE_KEY,
                     queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                     dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                     dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                     idle_before_close=0,
                     serializer=None):
            """Initialize scheduler.
     
            Parameters
            ----------
            server : Redis
                这是Redis实例
            persist : bool
                是否在关闭时清空Requests.默认值是False。
            flush_on_start : bool
                是否在启动时清空Requests。 默认值是False。
            queue_key : str
                Request队列的Key名字
            queue_cls : str
                队列的可导入路径(就是使用什么队列)
            dupefilter_key : str
                去重队列的Key
            dupefilter_cls : str
                去重类的可导入路径。
            idle_before_close : int
                等待多久关闭
     
            """
            if idle_before_close < 0:
                raise TypeError("idle_before_close cannot be negative")
     
            self.server = server
            self.persist = persist
            self.flush_on_start = flush_on_start
            self.queue_key = queue_key
            self.queue_cls = queue_cls
            self.dupefilter_cls = dupefilter_cls
            self.dupefilter_key = dupefilter_key
            self.idle_before_close = idle_before_close
            self.serializer = serializer
            self.stats = None
     
        def __len__(self):
            return len(self.queue)
     
        @classmethod
        def from_settings(cls, settings):
            kwargs = {
                'persist': settings.getbool('SCHEDULER_PERSIST'),
                'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
                'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
            }
     
            # If these values are missing, it means we want to use the defaults.
            optional = {
                # TODO: Use custom prefixes for this settings to note that are
                # specific to scrapy-redis.
                'queue_key': 'SCHEDULER_QUEUE_KEY',
                'queue_cls': 'SCHEDULER_QUEUE_CLASS',
                'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
                # We use the default setting name to keep compatibility.
                'dupefilter_cls': 'DUPEFILTER_CLASS',
                'serializer': 'SCHEDULER_SERIALIZER',
            }
            # 从setting中获取配置组装成dict(具体获取那些配置是optional字典中key)
            for name, setting_name in optional.items():
                val = settings.get(setting_name)
                if val:
                    kwargs[name] = val
     
            # Support serializer as a path to a module.
            if isinstance(kwargs.get('serializer'), six.string_types):
                kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
                    # 或得一个Redis连接
            server = connection.from_settings(settings)
            # Ensure the connection is working.
            server.ping()
     
            return cls(server=server, **kwargs)
     
        @classmethod
        def from_crawler(cls, crawler):
            instance = cls.from_settings(crawler.settings)
            # FIXME: for now, stats are only supported from this constructor
            instance.stats = crawler.stats
            return instance
     
        def open(self, spider):
            self.spider = spider
     
            try:
                  # 根据self.queue_cls这个可以导入的类 实例化一个队列
                self.queue = load_object(self.queue_cls)(
                    server=self.server,
                    spider=spider,
                    key=self.queue_key % {'spider': spider.name},
                    serializer=self.serializer,
                )
            except TypeError as e:
                raise ValueError("Failed to instantiate queue class '%s': %s",
                                 self.queue_cls, e)
     
            try:
                  # 根据self.dupefilter_cls这个可以导入的类 实例一个去重集合
                # 默认是集合 可以实现自己的去重方式 比如 bool 去重
                self.df = load_object(self.dupefilter_cls)(
                    server=self.server,
                    key=self.dupefilter_key % {'spider': spider.name},
                    debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
                )
            except TypeError as e:
                raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                                 self.dupefilter_cls, e)
     
            if self.flush_on_start:
                self.flush()
            # notice if there are requests already in the queue to resume the crawl
            if len(self.queue):
                spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
     
        def close(self, reason):
            if not self.persist:
                self.flush()
     
        def flush(self):
            self.df.clear()
            self.queue.clear()
     
        def enqueue_request(self, request):
          """这个和Scrapy本身的一样"""
            if not request.dont_filter and self.df.request_seen(request):
                self.df.log(request, self.spider)
                return False
            if self.stats:
                self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
            # 向队列里面添加一个Request
            self.queue.push(request)
            return True
     
        def next_request(self):
          """获取一个Request"""
            block_pop_timeout = self.idle_before_close
            # block_pop_timeout 是一个等待参数 队列没有东西会等待这个时间  超时就会关闭
            request = self.queue.pop(block_pop_timeout)
            if request and self.stats:
                self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
            return request
     
        def has_pending_requests(self):
            return len(self) > 0

    来先来看看

    以上就是Scrapy-Redis中的SCHEDULER模块。下面我们来看看queue和本身的什么不同:

    scrapy_redis.queue.py

    以最常用的优先级队列 PriorityQueue 举例:

    以上就是SCHEDULER在处理Request的时候做的操作了。

    是时候来看看SCHEDULER是怎么处理去重的了!

    只需要注意这个?方法即可:

    这样大家就都可以访问同一个Redis 获取同一个spider的Request 在同一个位置去重,就不用担心重复啦

    大概就像这样:

    1. spider1:检查一下这个Request是否在Redis去重,如果在就证明其它的spider采集过啦!如果不在就添加进调度队列,等待别 人获取。自己继续干活抓取网页 产生新的Request了 重复之前步骤。
    2. spider2:以相同的逻辑执行

    可能有些小伙儿会产生疑问了~~!spider2拿到了别人的Request了 怎么能正确的执行呢?逻辑不会错吗?

    这个不用担心啦 因为整Request当中包含了,所有的逻辑,回去看看上面那个序列化的字典。

    总结一下:

    1. 1. Scrapy-Reids 就是将Scrapy原本在内存中处理的 调度(就是一个队列Queue)、去重、这两个操作通过Redis来实现
    2. 多个Scrapy在采集同一个站点时会使用相同的redis key(可以理解为队列)添加Request 获取Request 去重Request,这样所有的spider不会进行重复采集。效率自然就嗖嗖的上去了。
    3. 3. Redis是原子性的,好处不言而喻(一个Request要么被处理 要么没被处理,不存在第三可能)

    另外Scrapy-Redis本身不支持Redis-Cluster,大量网站去重的话会给单机很大的压力(就算使用boolfilter 内存也不够整啊!)

    改造方式很简单:

    1.  使用 **rediscluster** 这个包替换掉本身的Redis连接
    2. Redis-Cluster 不支持事务,可以使用lua脚本进行代替(lua脚本是原子性的哦)
    3. **注意使用lua脚本 不能写占用时间很长的操作**(毕竟一大群人等着操作Redis 你总不能让人家等着吧)

    以上!完毕

    对于懒人小伙伴儿 看看这个我改好的: 集群版Scrapy-Redis **PS: 支持Python3.6+ 哦 ! 其余的版本没测试过**

    转载请注明:静觅 » 小白进阶之Scrapy第六篇Scrapy-Redis详解

  • 相关阅读:
    idea设置全局ignore
    win 2012 安装mysql 5.7.20 及报错 This application requires Visual Studio 2013 Redistributable. Please ins
    win 2012 安装mysql 5.7.20 及报错 This application requires Visual Studio 2013 Redistr
    kafka 删除 topic
    java编译中出现了Exception in thread “main" java.lang.UnsupportedClassVersionError
    Centos中使用yum安装java时,没有jps的问题的解决
    Spring 整合Junit
    Spring纯注解配置
    Spring 基于注解的 IOC 配置
    打印java系统的信息
  • 原文地址:https://www.cnblogs.com/jayxuan/p/10876800.html
Copyright © 2011-2022 走看看