zoukankan      html  css  js  c++  java
  • scrapy 基础组件专题(九):scrapy-redis 源码分析

    下面我们来看看,scrapy-redis的每一个源代码文件都实现了什么功能,最后如何实现分布式的爬虫系统:

    connection.py 连接得配置文件
    defaults.py 默认得配置文件
    dupefilter.py   去重规则
    picklecompat.py 格式化
    pipelines.py    序列化变成字符串
    queue.py    队列
    scheduler.py    调度器
    spiders.py  爬虫
    utils.py    把字节转换成字符串

    connect.py

    import six
     
    from scrapy.utils.misc import load_object
     
    from . import defaults
     
     
    # Shortcut maps 'setting name' -> 'parmater name'.
    SETTINGS_PARAMS_MAP = {
        'REDIS_URL': 'url',
        'REDIS_HOST': 'host',
        'REDIS_PORT': 'port',
        'REDIS_ENCODING': 'encoding',
    }
     
     
    def get_redis_from_settings(settings):
        """Returns a redis client instance from given Scrapy settings object.
     
        This function uses ``get_client`` to instantiate the client and uses
        ``defaults.REDIS_PARAMS`` global as defaults values for the parameters. You
        can override them using the ``REDIS_PARAMS`` setting.
     
        Parameters
        ----------
        settings : Settings
            A scrapy settings object. See the supported settings below.
     
        Returns
        -------
        server
            Redis client instance.
     
        Other Parameters
        ----------------
        REDIS_URL : str, optional
            Server connection URL.
        REDIS_HOST : str, optional
            Server host.
        REDIS_PORT : str, optional
            Server port.
        REDIS_ENCODING : str, optional
            Data encoding.
        REDIS_PARAMS : dict, optional
            Additional client parameters.
     
        """
        params = defaults.REDIS_PARAMS.copy()
        params.update(settings.getdict('REDIS_PARAMS'))
        # XXX: Deprecate REDIS_* settings.
        for source, dest in SETTINGS_PARAMS_MAP.items():
            val = settings.get(source)
            if val:
                params[dest] = val
     
        # Allow ``redis_cls`` to be a path to a class.
        if isinstance(params.get('redis_cls'), six.string_types):
            params['redis_cls'] = load_object(params['redis_cls'])
     
        return get_redis(**params)
     
     
    # Backwards compatible alias.
    from_settings = get_redis_from_settings
     
     
    def get_redis(**kwargs):
        """Returns a redis client instance.
     
        Parameters
        ----------
        redis_cls : class, optional
            Defaults to ``redis.StrictRedis``.
        url : str, optional
            If given, ``redis_cls.from_url`` is used to instantiate the class.
        **kwargs
            Extra parameters to be passed to the ``redis_cls`` class.
     
        Returns
        -------
        server
            Redis client instance.
     
        """
        redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
        url = kwargs.pop('url', None)
        if url:
            return redis_cls.from_url(url, **kwargs)
        else:
            return redis_cls(**kwargs)

    connect文件引入了redis模块,这个是redis-python库的接口,用于通过python访问redis数据库,主要是实现连接redis数据库的功能(返回的是reids库的Redis对象或者StrictRedis对象,这俩都是可以直接用来进行数据操作的对象)。这些连接接口在其他文件中经常被用到。其中,我们可以看到,要想连接到redis数据库,和其他数据库差不多,需要一个ip地址、端口号、用户名密码(可选)和一个整型的数据库编号,同时我们还可以再scrapy的settings文件中配置套接字的超时时间、等待时间等。

    picklecompat.py

    """A pickle wrapper module with protocol=-1 by default."""
     
    try:
        import cPickle as pickle  # PY2
    except ImportError:
        import pickle
     
     
    def loads(s):
        return pickle.loads(s)
     
     
    def dumps(obj):
        return pickle.dumps(obj, protocol=-1)

    这里实现了loads和dumps两个函数,其实就是实现了一个serializer,因为redis数据库不能存储复杂对象(value部分只能是字符串,字符串列表,字符串集合和hash,key部分只能是字符串),所以我们存啥都要先串行化成文本才行。这里使用的就是python的pickle模块,一个兼容py2和py3的串行化工具。这个serializer主要用于一会的scheduler存reuqest对象,至于为什么不实用json格式,我也不是很懂,item pipeline的串行化默认用的就是json。

    pipeline.py

    from scrapy.utils.misc import load_object
    from scrapy.utils.serialize import ScrapyJSONEncoder
    from twisted.internet.threads import deferToThread
     
    from . import connection, defaults
     
     
    default_serialize = ScrapyJSONEncoder().encode
     
     
    class RedisPipeline(object):
        """Pushes serialized item into a redis list/queue
     
        Settings
        --------
        REDIS_ITEMS_KEY : str
            Redis key where to store items.
        REDIS_ITEMS_SERIALIZER : str
            Object path to serializer function.
     
        """
     
        def __init__(self, server,
                     key=defaults.PIPELINE_KEY,
                     serialize_func=default_serialize):
            """Initialize pipeline.
     
            Parameters
            ----------
            server : StrictRedis
                Redis client instance.
            key : str
                Redis key where to store items.
            serialize_func : callable
                Items serializer function.
     
            """
            self.server = server
            self.key = key
            self.serialize = serialize_func
     
        @classmethod
        def from_settings(cls, settings):
            params = {
                'server': connection.from_settings(settings),
            }
            if settings.get('REDIS_ITEMS_KEY'):
                params['key'] = settings['REDIS_ITEMS_KEY']
            if settings.get('REDIS_ITEMS_SERIALIZER'):
                params['serialize_func'] = load_object(
                    settings['REDIS_ITEMS_SERIALIZER']
                )
     
            return cls(**params)
     
        @classmethod
        def from_crawler(cls, crawler):
            return cls.from_settings(crawler.settings)
     
        def process_item(self, item, spider):
            return deferToThread(self._process_item, item, spider)
     
        def _process_item(self, item, spider):
            key = self.item_key(item, spider)
            data = self.serialize(item)
            self.server.rpush(key, data)
            return item
     
        def item_key(self, item, spider):
            """Returns redis key based on given spider.
     
            Override this function to use a different key depending on the item
            and/or spider.
     
            """
            return self.key % {'spider': spider.name}

    pipeline文件实现了一个item pipieline类,和scrapy的item pipeline是同一个对象,通过从settings中拿到我们配置的REDIS_ITEMS_KEY作为key,把item串行化之后存入redis数据库对应的value中(这个value可以看出出是个list,我们的每个item是这个list中的一个结点),这个pipeline把提取出的item存起来,主要是为了方便我们延后处理数据。

    queue.py

    支持三种队列, 都继承自Base

    1. FIFO Queue

    使用了redis的list结构

    class FifoQueue(Base):
        def __len__(self):
            """返回队列长度大小"""
            return self.server.llen(self.key)
    
        def push(self, request):
            """发送请求到队列左边"""
            self.server.lpush(self.key, self._encode_request(request))
    
        def pop(self, timeout=0):
            """从队列右边抛出请求"""
            if timeout > 0:
                data = self.server.brpop(self.key, timeout)
                if isinstance(data, tuple):
                    data = data[1]
            else:
                data = self.server.rpop(self.key)
            if data:
                return self._decode_request(data)

    2. PriorityQueue

    使用了redis的有序集合结构

    class PriorityQueue(Base):
    
        def __len__(self):
            """返回队列内长度大小"""
            return self.server.zcard(self.key)
    
        def push(self, request):
            """放入请求到zset中"""
            data = self._encode_request(request)
            score = -request.priority
            self.server.execute_command('ZADD', self.key, score, data)
    
        def pop(self, timeout=0):
            """从zset中抛出请求. 此处不支持timeout参数"""
            pipe = self.server.pipeline()
            pipe.multi()
            pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
            results, count = pipe.execute()
            if results:
                return self._decode_request(results[0])

    使用redis的sorted set实现, 如果在spider脚本中需要指定priority的话, 一定要在settings中来声明使用的是PriorityQueue.

    3. LIFO Queue

    后入先出, 使用list结构实现

    class LifoQueue(Base):
        """Per-spider LIFO queue."""
    
        def __len__(self):
            """Return the length of the stack"""
            return self.server.llen(self.key)
    
        def push(self, request):
            """Push a request"""
            self.server.lpush(self.key, self._encode_request(request))
    
        def pop(self, timeout=0):
            """Pop a request"""
            if timeout > 0:
                data = self.server.blpop(self.key, timeout)
                if isinstance(data, tuple):
                    data = data[1]
            else:
                data = self.server.lpop(self.key)
    
            if data:
                return self._decode_request(data)

    和先进先出队列基本一样, 实现了栈结构

    该文件实现了几个容器类,可以看这些容器和redis交互频繁,同时使用了我们上边picklecompat中定义的serializer。这个文件实现的几个容器大体相同,只不过一个是队列,一个是栈,一个是优先级队列,这三个容器到时候会被scheduler对象实例化,来实现request的调度。比如我们使用SpiderQueue最为调度队列的类型,到时候request的调度方法就是先进先出,而实用SpiderStack就是先进后出了。 
    我们可以仔细看看SpiderQueue的实现,他的push函数就和其他容器的一样,只不过push进去的request请求先被scrapy的接口request_to_dict变成了一个dict对象(因为request对象实在是比较复杂,有方法有属性不好串行化),之后使用picklecompat中的serializer串行化为字符串,然后使用一个特定的key存入redis中(该key在同一种spider中是相同的)。而调用pop时,其实就是从redis用那个特定的key去读其值(一个list),从list中读取最早进去的那个,于是就先进先出了。 
    这些容器类都会作为scheduler调度request的容器,scheduler在每个主机上都会实例化一个,并且和spider一一对应,所以分布式运行时会有一个spider的多个实例和一个scheduler的多个实例存在于不同的主机上,但是,因为scheduler都是用相同的容器,而这些容器都连接同一个redis服务器,又都使用spider名加queue来作为key读写数据,所以不同主机上的不同爬虫实例公用一个request调度池,实现了分布式爬虫之间的统一调度。

    dupefilter.py

    scrapy默认使用了集合结构来进行去重, 在scrapy-redis中使用redis的集合(set)进行了替换, 请求指纹的计算方法还是用的内置的.

    def request_seen(self, request):
        """获取请求指纹并添加到redis的去重集合中去"""
        fp = self.request_fingerprint(request)    # 得到请求的指纹
        added = self.server.sadd(self.key, fp)    # 把指纹添加到redis的集合中
        return added == 0
    
    def request_fingerprint(self, request):
        return request_fingerprint(request)    # 得到请求指纹

    去重指纹计算使用的是sha1算法, 计算值包括请求方法, url, body等信息

    这个文件看起来比较复杂,重写了scrapy本身已经实现的request判重功能。因为本身scrapy单机跑的话,只需要读取内存中的request队列或者持久化的request队列(scrapy默认的持久化似乎是json格式的文件,不是数据库)就能判断这次要发出的request url是否已经请求过或者正在调度(本地读就行了)。而分布式跑的话,就需要各个主机上的scheduler都连接同一个数据库的同一个request池来判断这次的请求是否是重复的了。 
    在这个文件中,通过继承BaseDupeFilter重写他的方法,实现了基于redis的判重。根据源代码来看,scrapy-redis使用了scrapy本身的一个fingerprint接request_fingerprint,这个接口很有趣,根据scrapy文档所说,他通过hash来判断两个url是否相同(相同的url会生成相同的hash结果),但是当两个url的地址相同,get型参数相同但是顺序不同时,也会生成相同的hash结果(这个真的比较神奇。。。)所以scrapy-redis依旧使用url的fingerprint来判断request请求是否已经出现过。这个类通过连接redis,使用一个key来向redis的一个set中插入fingerprint(这个key对于同一种spider是相同的,redis是一个key-value的数据库,如果key是相同的,访问到的值就是相同的,这里使用spider名字+DupeFilter的key就是为了在不同主机上的不同爬虫实例,只要属于同一种spider,就会访问到同一个set,而这个set就是他们的url判重池),如果返回值为0,说明该set中该fingerprint已经存在(因为集合是没有重复值的),则返回False,如果返回值为1,说明添加了一个fingerprint到set中,则说明这个request没有重复,于是返回True,还顺便把新fingerprint加入到数据库中了。 
    DupeFilter判重会在scheduler类中用到,每一个request在进入调度之前都要进行判重,如果重复就不需要参加调度,直接舍弃就好了,不然就是白白浪费资源。

    spider.py

    spider空闲的时候会从start_urls队列中读取url, 默认一次读取CONCURRENT_REQUESTS个url, 可以在settings中设置REDIS_START_URLS_BATCH_SIZE来改变每次的读取数量, 一般我会在使用的时候增大这个值, 可以降低spide进入idle的次数, 从而适当提升抓取性能

     def setup_redis(self, crawler=None):
            """初始化了redis参数, 包括使用的种子url的key, 批量读取url的数量等信息"""
            ......
            # 当spider空闲的时候会触发该信号, 调用spider_idle函数
            crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
          
        def spider_idle(self):
          """空闲的时候触发该函数, 尝试请求下一批url. 有url的时候会直接请求, 最后都会抛出异常, 防止spider被关闭, 然后等待新的url过来"""
          self.schedule_next_requests()
          raise DontCloseSpider

    spider的改动也不是很大,主要是通过connect接口,给spider绑定了spider_idle信号,spider初始化时,通过setup_redis函数初始化好和redis的连接,之后通过next_requests函数从redis中取出strat url,使用的key是settings中REDIS_START_URLS_AS_SET定义的(注意了这里的初始化url池和我们上边的queue的url池不是一个东西,queue的池是用于调度的,初始化url池是存放入口url的,他们都存在redis中,但是使用不同的key来区分,就当成是不同的表吧),spider使用少量的start url,可以发展出很多新的url,这些url会进入scheduler进行判重和调度。直到spider跑到调度池内没有url的时候,会触发spider_idle信号,从而触发spider的next_requests函数,再次从redis的start url池中读取一些url。

    总结:

             crapy-redis的总体思路:这个工程通过重写scheduler和spider类,实现了调度、spider启动和redis的交互。

             实现新的dupefilter和queue类,达到了判重和调度容器和redis的交互,因为每个主机上的爬虫进程都访问同一个redis数据库,所以调度和判重都统一进行统一管理,达到了分布式爬虫的目的。 
             当spider被初始化时,同时会初始化一个对应的scheduler对象,这个调度器对象通过读取settings,配置好自己的调度容器queue和判重工具dupefilter。

             每当一个spider产出一个request的时候,scrapy内核会把这个reuqest递交给这个spider对应的scheduler对象进行调度,scheduler对象通过访问redis对request进行判重,如果不重复就把他添加进redis中的调度池。当调度条件满足时,scheduler对象就从redis的调度池中取出一个request发送给spider,让他爬取。当spider爬取的所有暂时可用url之后,scheduler发现这个spider对应的redis的调度池空了,于是触发信号spider_idle,spider收到这个信号之后,直接连接redis读取strart url池,拿去新的一批url入口,然后再次重复上边的工作

  • 相关阅读:
    Java实现 蓝桥杯 算法提高 小X的购物计划
    Java实现 蓝桥杯 算法提高 小X的购物计划
    Java实现 第十一届 蓝桥杯 (高职专科组)省内模拟赛
    Java实现 第十一届 蓝桥杯 (高职专科组)省内模拟赛
    Java实现 第十一届 蓝桥杯 (高职专科组)省内模拟赛
    Java 第十一届 蓝桥杯 省模拟赛 小明的城堡
    Java 第十一届 蓝桥杯 省模拟赛 小明的城堡
    Java 第十一届 蓝桥杯 省模拟赛 小明的城堡
    129. Sum Root to Leaf Numbers
    117. Populating Next Right Pointers in Each Node II
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/12638997.html
Copyright © 2011-2022 走看看