zoukankan      html  css  js  c++  java
  • scrapy 基础组件专题(九):scrapy-redis 源码分析

    下面我们来看看,scrapy-redis的每一个源代码文件都实现了什么功能,最后如何实现分布式的爬虫系统:

    connection.py 连接得配置文件
    defaults.py 默认得配置文件
    dupefilter.py   去重规则
    picklecompat.py 格式化
    pipelines.py    序列化变成字符串
    queue.py    队列
    scheduler.py    调度器
    spiders.py  爬虫
    utils.py    把字节转换成字符串

    connect.py

    import six
     
    from scrapy.utils.misc import load_object
     
    from . import defaults
     
     
    # Shortcut maps 'setting name' -> 'parmater name'.
    SETTINGS_PARAMS_MAP = {
        'REDIS_URL': 'url',
        'REDIS_HOST': 'host',
        'REDIS_PORT': 'port',
        'REDIS_ENCODING': 'encoding',
    }
     
     
    def get_redis_from_settings(settings):
        """Returns a redis client instance from given Scrapy settings object.
     
        This function uses ``get_client`` to instantiate the client and uses
        ``defaults.REDIS_PARAMS`` global as defaults values for the parameters. You
        can override them using the ``REDIS_PARAMS`` setting.
     
        Parameters
        ----------
        settings : Settings
            A scrapy settings object. See the supported settings below.
     
        Returns
        -------
        server
            Redis client instance.
     
        Other Parameters
        ----------------
        REDIS_URL : str, optional
            Server connection URL.
        REDIS_HOST : str, optional
            Server host.
        REDIS_PORT : str, optional
            Server port.
        REDIS_ENCODING : str, optional
            Data encoding.
        REDIS_PARAMS : dict, optional
            Additional client parameters.
     
        """
        params = defaults.REDIS_PARAMS.copy()
        params.update(settings.getdict('REDIS_PARAMS'))
        # XXX: Deprecate REDIS_* settings.
        for source, dest in SETTINGS_PARAMS_MAP.items():
            val = settings.get(source)
            if val:
                params[dest] = val
     
        # Allow ``redis_cls`` to be a path to a class.
        if isinstance(params.get('redis_cls'), six.string_types):
            params['redis_cls'] = load_object(params['redis_cls'])
     
        return get_redis(**params)
     
     
    # Backwards compatible alias.
    from_settings = get_redis_from_settings
     
     
    def get_redis(**kwargs):
        """Returns a redis client instance.
     
        Parameters
        ----------
        redis_cls : class, optional
            Defaults to ``redis.StrictRedis``.
        url : str, optional
            If given, ``redis_cls.from_url`` is used to instantiate the class.
        **kwargs
            Extra parameters to be passed to the ``redis_cls`` class.
     
        Returns
        -------
        server
            Redis client instance.
     
        """
        redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
        url = kwargs.pop('url', None)
        if url:
            return redis_cls.from_url(url, **kwargs)
        else:
            return redis_cls(**kwargs)

    connect文件引入了redis模块,这个是redis-python库的接口,用于通过python访问redis数据库,主要是实现连接redis数据库的功能(返回的是reids库的Redis对象或者StrictRedis对象,这俩都是可以直接用来进行数据操作的对象)。这些连接接口在其他文件中经常被用到。其中,我们可以看到,要想连接到redis数据库,和其他数据库差不多,需要一个ip地址、端口号、用户名密码(可选)和一个整型的数据库编号,同时我们还可以再scrapy的settings文件中配置套接字的超时时间、等待时间等。

    picklecompat.py

    """A pickle wrapper module with protocol=-1 by default."""
     
    try:
        import cPickle as pickle  # PY2
    except ImportError:
        import pickle
     
     
    def loads(s):
        return pickle.loads(s)
     
     
    def dumps(obj):
        return pickle.dumps(obj, protocol=-1)

    这里实现了loads和dumps两个函数,其实就是实现了一个serializer,因为redis数据库不能存储复杂对象(value部分只能是字符串,字符串列表,字符串集合和hash,key部分只能是字符串),所以我们存啥都要先串行化成文本才行。这里使用的就是python的pickle模块,一个兼容py2和py3的串行化工具。这个serializer主要用于一会的scheduler存reuqest对象,至于为什么不实用json格式,我也不是很懂,item pipeline的串行化默认用的就是json。

    pipeline.py

    from scrapy.utils.misc import load_object
    from scrapy.utils.serialize import ScrapyJSONEncoder
    from twisted.internet.threads import deferToThread
     
    from . import connection, defaults
     
     
    default_serialize = ScrapyJSONEncoder().encode
     
     
    class RedisPipeline(object):
        """Pushes serialized item into a redis list/queue
     
        Settings
        --------
        REDIS_ITEMS_KEY : str
            Redis key where to store items.
        REDIS_ITEMS_SERIALIZER : str
            Object path to serializer function.
     
        """
     
        def __init__(self, server,
                     key=defaults.PIPELINE_KEY,
                     serialize_func=default_serialize):
            """Initialize pipeline.
     
            Parameters
            ----------
            server : StrictRedis
                Redis client instance.
            key : str
                Redis key where to store items.
            serialize_func : callable
                Items serializer function.
     
            """
            self.server = server
            self.key = key
            self.serialize = serialize_func
     
        @classmethod
        def from_settings(cls, settings):
            params = {
                'server': connection.from_settings(settings),
            }
            if settings.get('REDIS_ITEMS_KEY'):
                params['key'] = settings['REDIS_ITEMS_KEY']
            if settings.get('REDIS_ITEMS_SERIALIZER'):
                params['serialize_func'] = load_object(
                    settings['REDIS_ITEMS_SERIALIZER']
                )
     
            return cls(**params)
     
        @classmethod
        def from_crawler(cls, crawler):
            return cls.from_settings(crawler.settings)
     
        def process_item(self, item, spider):
            return deferToThread(self._process_item, item, spider)
     
        def _process_item(self, item, spider):
            key = self.item_key(item, spider)
            data = self.serialize(item)
            self.server.rpush(key, data)
            return item
     
        def item_key(self, item, spider):
            """Returns redis key based on given spider.
     
            Override this function to use a different key depending on the item
            and/or spider.
     
            """
            return self.key % {'spider': spider.name}

    pipeline文件实现了一个item pipieline类,和scrapy的item pipeline是同一个对象,通过从settings中拿到我们配置的REDIS_ITEMS_KEY作为key,把item串行化之后存入redis数据库对应的value中(这个value可以看出出是个list,我们的每个item是这个list中的一个结点),这个pipeline把提取出的item存起来,主要是为了方便我们延后处理数据。

    queue.py

    支持三种队列, 都继承自Base

    1. FIFO Queue

    使用了redis的list结构

    class FifoQueue(Base):
        def __len__(self):
            """返回队列长度大小"""
            return self.server.llen(self.key)
    
        def push(self, request):
            """发送请求到队列左边"""
            self.server.lpush(self.key, self._encode_request(request))
    
        def pop(self, timeout=0):
            """从队列右边抛出请求"""
            if timeout > 0:
                data = self.server.brpop(self.key, timeout)
                if isinstance(data, tuple):
                    data = data[1]
            else:
                data = self.server.rpop(self.key)
            if data:
                return self._decode_request(data)

    2. PriorityQueue

    使用了redis的有序集合结构

    class PriorityQueue(Base):
    
        def __len__(self):
            """返回队列内长度大小"""
            return self.server.zcard(self.key)
    
        def push(self, request):
            """放入请求到zset中"""
            data = self._encode_request(request)
            score = -request.priority
            self.server.execute_command('ZADD', self.key, score, data)
    
        def pop(self, timeout=0):
            """从zset中抛出请求. 此处不支持timeout参数"""
            pipe = self.server.pipeline()
            pipe.multi()
            pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
            results, count = pipe.execute()
            if results:
                return self._decode_request(results[0])

    使用redis的sorted set实现, 如果在spider脚本中需要指定priority的话, 一定要在settings中来声明使用的是PriorityQueue.

    3. LIFO Queue

    后入先出, 使用list结构实现

    class LifoQueue(Base):
        """Per-spider LIFO queue."""
    
        def __len__(self):
            """Return the length of the stack"""
            return self.server.llen(self.key)
    
        def push(self, request):
            """Push a request"""
            self.server.lpush(self.key, self._encode_request(request))
    
        def pop(self, timeout=0):
            """Pop a request"""
            if timeout > 0:
                data = self.server.blpop(self.key, timeout)
                if isinstance(data, tuple):
                    data = data[1]
            else:
                data = self.server.lpop(self.key)
    
            if data:
                return self._decode_request(data)

    和先进先出队列基本一样, 实现了栈结构

    该文件实现了几个容器类,可以看这些容器和redis交互频繁,同时使用了我们上边picklecompat中定义的serializer。这个文件实现的几个容器大体相同,只不过一个是队列,一个是栈,一个是优先级队列,这三个容器到时候会被scheduler对象实例化,来实现request的调度。比如我们使用SpiderQueue最为调度队列的类型,到时候request的调度方法就是先进先出,而实用SpiderStack就是先进后出了。 
    我们可以仔细看看SpiderQueue的实现,他的push函数就和其他容器的一样,只不过push进去的request请求先被scrapy的接口request_to_dict变成了一个dict对象(因为request对象实在是比较复杂,有方法有属性不好串行化),之后使用picklecompat中的serializer串行化为字符串,然后使用一个特定的key存入redis中(该key在同一种spider中是相同的)。而调用pop时,其实就是从redis用那个特定的key去读其值(一个list),从list中读取最早进去的那个,于是就先进先出了。 
    这些容器类都会作为scheduler调度request的容器,scheduler在每个主机上都会实例化一个,并且和spider一一对应,所以分布式运行时会有一个spider的多个实例和一个scheduler的多个实例存在于不同的主机上,但是,因为scheduler都是用相同的容器,而这些容器都连接同一个redis服务器,又都使用spider名加queue来作为key读写数据,所以不同主机上的不同爬虫实例公用一个request调度池,实现了分布式爬虫之间的统一调度。

    dupefilter.py

    scrapy默认使用了集合结构来进行去重, 在scrapy-redis中使用redis的集合(set)进行了替换, 请求指纹的计算方法还是用的内置的.

    def request_seen(self, request):
        """获取请求指纹并添加到redis的去重集合中去"""
        fp = self.request_fingerprint(request)    # 得到请求的指纹
        added = self.server.sadd(self.key, fp)    # 把指纹添加到redis的集合中
        return added == 0
    
    def request_fingerprint(self, request):
        return request_fingerprint(request)    # 得到请求指纹

    去重指纹计算使用的是sha1算法, 计算值包括请求方法, url, body等信息

    这个文件看起来比较复杂,重写了scrapy本身已经实现的request判重功能。因为本身scrapy单机跑的话,只需要读取内存中的request队列或者持久化的request队列(scrapy默认的持久化似乎是json格式的文件,不是数据库)就能判断这次要发出的request url是否已经请求过或者正在调度(本地读就行了)。而分布式跑的话,就需要各个主机上的scheduler都连接同一个数据库的同一个request池来判断这次的请求是否是重复的了。 
    在这个文件中,通过继承BaseDupeFilter重写他的方法,实现了基于redis的判重。根据源代码来看,scrapy-redis使用了scrapy本身的一个fingerprint接request_fingerprint,这个接口很有趣,根据scrapy文档所说,他通过hash来判断两个url是否相同(相同的url会生成相同的hash结果),但是当两个url的地址相同,get型参数相同但是顺序不同时,也会生成相同的hash结果(这个真的比较神奇。。。)所以scrapy-redis依旧使用url的fingerprint来判断request请求是否已经出现过。这个类通过连接redis,使用一个key来向redis的一个set中插入fingerprint(这个key对于同一种spider是相同的,redis是一个key-value的数据库,如果key是相同的,访问到的值就是相同的,这里使用spider名字+DupeFilter的key就是为了在不同主机上的不同爬虫实例,只要属于同一种spider,就会访问到同一个set,而这个set就是他们的url判重池),如果返回值为0,说明该set中该fingerprint已经存在(因为集合是没有重复值的),则返回False,如果返回值为1,说明添加了一个fingerprint到set中,则说明这个request没有重复,于是返回True,还顺便把新fingerprint加入到数据库中了。 
    DupeFilter判重会在scheduler类中用到,每一个request在进入调度之前都要进行判重,如果重复就不需要参加调度,直接舍弃就好了,不然就是白白浪费资源。

    spider.py

    spider空闲的时候会从start_urls队列中读取url, 默认一次读取CONCURRENT_REQUESTS个url, 可以在settings中设置REDIS_START_URLS_BATCH_SIZE来改变每次的读取数量, 一般我会在使用的时候增大这个值, 可以降低spide进入idle的次数, 从而适当提升抓取性能

     def setup_redis(self, crawler=None):
            """初始化了redis参数, 包括使用的种子url的key, 批量读取url的数量等信息"""
            ......
            # 当spider空闲的时候会触发该信号, 调用spider_idle函数
            crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
          
        def spider_idle(self):
          """空闲的时候触发该函数, 尝试请求下一批url. 有url的时候会直接请求, 最后都会抛出异常, 防止spider被关闭, 然后等待新的url过来"""
          self.schedule_next_requests()
          raise DontCloseSpider

    spider的改动也不是很大,主要是通过connect接口,给spider绑定了spider_idle信号,spider初始化时,通过setup_redis函数初始化好和redis的连接,之后通过next_requests函数从redis中取出strat url,使用的key是settings中REDIS_START_URLS_AS_SET定义的(注意了这里的初始化url池和我们上边的queue的url池不是一个东西,queue的池是用于调度的,初始化url池是存放入口url的,他们都存在redis中,但是使用不同的key来区分,就当成是不同的表吧),spider使用少量的start url,可以发展出很多新的url,这些url会进入scheduler进行判重和调度。直到spider跑到调度池内没有url的时候,会触发spider_idle信号,从而触发spider的next_requests函数,再次从redis的start url池中读取一些url。

    总结:

             crapy-redis的总体思路:这个工程通过重写scheduler和spider类,实现了调度、spider启动和redis的交互。

             实现新的dupefilter和queue类,达到了判重和调度容器和redis的交互,因为每个主机上的爬虫进程都访问同一个redis数据库,所以调度和判重都统一进行统一管理,达到了分布式爬虫的目的。 
             当spider被初始化时,同时会初始化一个对应的scheduler对象,这个调度器对象通过读取settings,配置好自己的调度容器queue和判重工具dupefilter。

             每当一个spider产出一个request的时候,scrapy内核会把这个reuqest递交给这个spider对应的scheduler对象进行调度,scheduler对象通过访问redis对request进行判重,如果不重复就把他添加进redis中的调度池。当调度条件满足时,scheduler对象就从redis的调度池中取出一个request发送给spider,让他爬取。当spider爬取的所有暂时可用url之后,scheduler发现这个spider对应的redis的调度池空了,于是触发信号spider_idle,spider收到这个信号之后,直接连接redis读取strart url池,拿去新的一批url入口,然后再次重复上边的工作

  • 相关阅读:
    Oozie — What Why and How
    git 用户手册
    整理笔记 C语言
    第一次理解通用链表
    C++ 通用队列类
    懂了这些,你才真正懂了C
    简单键盘驱动
    简述进程间通信方式
    几何原本
    GSP几何画板简介
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/12638997.html
Copyright © 2011-2022 走看看