zoukankan      html  css  js  c++  java
  • 数据采集: scrapy-redis源码分析(一)

    一般我们使用python来写爬虫会优先选择scrapy框架, 框架本身基于异步网络请求性能比较高, 另外对并发控制, 延迟请求支持的比较好, 可以使我们专注于爬虫的逻辑. 但是scrapy仅仅支持单机的爬虫, 如果要支持分布式的话还需要借助scrapy-redis来实现. 接下来我们主要关注scrapy-redis的实现方式, 了解实现原理使用起来会更加顺手.

    scrapy-redis的目录结构如下, 各个模块功能见注释

    ├── __init__.py
    ├── connection.py   # 负责redis的连接
    ├── defaults.py    # 一些默认参数配置
    ├── dupefilter.py    # 用于请求队列的去重, 继承了scrapy本身的去重器
    ├── picklecompat.py    # 使用pickle进行序列化
    ├── pipelines.py    # 会把item丢进redis中去
    ├── queue.py    # 调度队列, 调度器会使用该队列
    ├── scheduler.py    # 调度器, 负责任务的调度工作
    ├── spiders.py    # spider基类, 加入了信号等
    └── utils.py
    

    queue.py

    支持三种队列, 都继承自Base

    1. FIFO Queue

    使用了redis的list结构

    class FifoQueue(Base):
        def __len__(self):
            """返回队列长度大小"""
            return self.server.llen(self.key)
    
        def push(self, request):
            """发送请求到队列左边"""
            self.server.lpush(self.key, self._encode_request(request))
    
        def pop(self, timeout=0):
            """从队列右边抛出请求"""
            if timeout > 0:
                data = self.server.brpop(self.key, timeout)
                if isinstance(data, tuple):
                    data = data[1]
            else:
                data = self.server.rpop(self.key)
            if data:
                return self._decode_request(data)
    

    2. PriorityQueue

    使用了redis的有序集合结构

    class PriorityQueue(Base):
    
        def __len__(self):
            """返回队列内长度大小"""
            return self.server.zcard(self.key)
    
        def push(self, request):
            """放入请求到zset中"""
            data = self._encode_request(request)
            score = -request.priority
            self.server.execute_command('ZADD', self.key, score, data)
    
        def pop(self, timeout=0):
            """从zset中抛出请求. 此处不支持timeout参数"""
            pipe = self.server.pipeline()
            pipe.multi()
            pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
            results, count = pipe.execute()
            if results:
                return self._decode_request(results[0])
    

    使用redis的sorted set实现, 如果在spider脚本中需要指定priority的话, 一定要在settings中来声明使用的是PriorityQueue.

    3. LIFO Queue

    后入先出, 使用list结构实现

    
    class LifoQueue(Base):
        """Per-spider LIFO queue."""
    
        def __len__(self):
            """Return the length of the stack"""
            return self.server.llen(self.key)
    
        def push(self, request):
            """Push a request"""
            self.server.lpush(self.key, self._encode_request(request))
    
        def pop(self, timeout=0):
            """Pop a request"""
            if timeout > 0:
                data = self.server.blpop(self.key, timeout)
                if isinstance(data, tuple):
                    data = data[1]
            else:
                data = self.server.lpop(self.key)
    
            if data:
                return self._decode_request(data)
    

    和先进先出队列基本一样, 实现了栈结构

    dupefilter.py

    scrapy默认使用了集合结构来进行去重, 在scrapy-redis中使用redis的集合(set)进行了替换, 请求指纹的计算方法还是用的内置的.

    def request_seen(self, request):
        """获取请求指纹并添加到redis的去重集合中去"""
        fp = self.request_fingerprint(request)    # 得到请求的指纹
        added = self.server.sadd(self.key, fp)    # 把指纹添加到redis的集合中
        return added == 0
    
    def request_fingerprint(self, request):
        return request_fingerprint(request)    # 得到请求指纹
    

    去重指纹计算使用的是sha1算法, 计算值包括请求方法, url, body等信息

    scheduler.py

    替换了scrapy原生的scheduler, 所有方法名称和原生scheduler保持一致, 在爬虫开启后会连接待抓取队列和去重集合, 然后就是不断把新的请求去重后放入待抓取队列, 然后从待抓取队列拿出请求给下载器

        def open(self, spider):
          """爬虫启动时触发, 主要是连接待抓取和去重模块"""
          pass
          
        def enqueue_request(self, request):
            """
            把请求去重后放入 待抓取队列中
            Parameters
            ----------
            request
    
            Returns
            -------
    
            """
            if not request.dont_filter and self.df.request_seen(request):
                self.df.log(request, self.spider)
                return False
            if self.stats:
                self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
            self.queue.push(request)
            return True
    
        def next_request(self):
            """
            从请求队列拿出下一个请求并返回
            Returns
            -------
    
            """
            block_pop_timeout = self.idle_before_close
            request = self.queue.pop(block_pop_timeout)
            if request and self.stats:
                self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
            return request
    

    调度器肯定要和请求队列去重队列进行交互, 所以初始化要获取使用的queuedupfilter的类, 并在open方法中完成实例化

    open方法

    
        def open(self, spider):
            self.spider = spider
    
            try:
                # 得到队列queue的实例化对象
                self.queue = load_object(self.queue_cls)(
                    server=self.server,
                    spider=spider,
                    key=self.queue_key % {'spider': spider.name},
                    serializer=self.serializer,
                )
            except TypeError as e:
                raise ValueError("Failed to instantiate queue class '%s': %s",
                                 self.queue_cls, e)
    
            try:
                # 得到去重的实例化对象
                self.df = load_object(self.dupefilter_cls)(
                    server=self.server,
                    key=self.dupefilter_key % {'spider': spider.name},
                    debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
                )
            except TypeError as e:
                raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                                 self.dupefilter_cls, e)
    
            if self.flush_on_start:    # 如果为True, 要在爬虫开启前删除对应爬虫request队列和dupfilter队列
                self.flush()
            # notice if there are requests already in the queue to resume the crawl
            if len(self.queue):
                spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
    

    spider.py

    spider空闲的时候会从start_urls队列中读取url, 默认一次读取CONCURRENT_REQUESTS个url, 可以在settings中设置REDIS_START_URLS_BATCH_SIZE来改变每次的读取数量, 一般我会在使用的时候增大这个值, 可以降低spide进入idle的次数, 从而适当提升抓取性能

        def setup_redis(self, crawler=None):
            """初始化了redis参数, 包括使用的种子url的key, 批量读取url的数量等信息"""
            ......
            # 当spider空闲的时候会触发该信号, 调用spider_idle函数
            crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
          
        def spider_idle(self):
          """空闲的时候触发该函数, 尝试请求下一批url. 有url的时候会直接请求, 最后都会抛出异常, 防止spider被关闭, 然后等待新的url过来"""
          self.schedule_next_requests()
          raise DontCloseSpider
    
        
    

    以上是scrapy-redis的基本分析, 可以发现源码中大量使用了基础的数据结构和算法的知识, 不太熟悉的同学可以参看之前的文章, 我会在接下来的时间继续分享两篇使用技巧和一些实用的特性.

    文章会首发在公众号, 欢迎大家关注及时查看.

  • 相关阅读:
    Oracle笔记(十五) 数据库备份
    Oracle笔记(十四) 用户管理
    Oracle笔记(十三) 视图、同义词、索引
    Oracle笔记(十二) 集合、序列
    Oracle笔记(十一) 建表、更新、查询综合练习
    Oracle笔记(十) 约束
    Oracle笔记(八) 复杂查询及总结
    Oracle笔记(九) 表的创建及管理
    06-流程控制
    05-数据类型转换
  • 原文地址:https://www.cnblogs.com/zlone/p/11079481.html
Copyright © 2011-2022 走看看