  • scrapy_redis related: transferring crawler progress saved under JOBDIR to Redis

    0. Reference

     Hidden Scrapy bug: after force-closing a crawler, the count of saved requests read back from requests.queue may be wrong

    1. Overview

    When Scrapy is run with the JOBDIR setting, stopping the crawler leaves the following directory structure on disk:

    crawl/apps/
    ├── requests.queue
    │   ├── active.json
    │   ├── p0
    │   └── p1
    ├── requests.seen
    └── spider.state

    The file requests.queue/p0 stores unscheduled requests with priority=0. A p-1 file corresponds to high-priority requests whose actual priority is 1; since elements with smaller scores sort first in a Redis sorted set, those requests are inserted with score -1. Likewise, p1 corresponds to low-priority requests with priority=-1. In general, file pN holds requests with priority=-N and is transferred with score N, as illustrated below.
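
    A quick illustration of this filename-to-score mapping (a minimal sketch, not part of the migration script):

    # Sketch: the p<N> filename -> Redis score mapping described above.
    for fname in ['p-1', 'p0', 'p1']:
        score = int(fname[1:])    # 'p-1' -> -1, 'p0' -> 0, 'p1' -> 1
        priority = -score         # file pN holds requests with priority=-N
        print(fname, 'priority =', priority, 'score =', score)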

    requests.seen stores the fingerprint hashes that the request dupefilter computed for each enqueued request, one hash per line.
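
    For reference, a sketch of how those lines are produced (assuming an older Scrapy that still exposes scrapy.utils.request.request_fingerprint; see scrapy/dupefilters.py):

    # Sketch: reproduce one requests.seen line (request_fingerprint is
    # deprecated/removed in recent Scrapy versions).
    from scrapy import Request
    from scrapy.utils.request import request_fingerprint

    fp = request_fingerprint(Request('https://example.com'))
    print(fp)  # a 40-character SHA1 hex digest, one per line in requests.seen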

    spider.state holds persisted custom spider attributes; migrating it is outside the scope of this article.
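
    For completeness only: spider.state is written by Scrapy's SpiderState extension as a pickled dict (see scrapy/extensions/spiderstate.py), so it can be inspected with a few lines (a sketch; the path is the example JOBDIR used in the script below):

    # Sketch: inspect the pickled spider.state dict.
    import pickle
    from os.path import join

    with open(join('/home/yourproject/crawl/apps', 'spider.state'), 'rb') as f:
        state = pickle.load(f)
    print(state)  # custom attributes persisted via spider.state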

    2. Implementation code

    import os
    from os.path import join
    import re
    import struct
    
    import redis
    
    
    def sadd_dupefilter(jobdir, redis_server, name):
        """See python/lib/site-packages/scrapy/dupefilters.py"""
        
        file = join(jobdir, 'requests.seen')
        with open(file) as f:
            print('Processing %s, it may take minutes...'%file)
            key = '%s:dupefilter'%name
            for x in f:
                # one fingerprint per line; strip the trailing newline
                redis_server.sadd(key, x.rstrip())
        print('Result: {} {}'.format(key, redis_server.scard(key)))
    
    
    def zadd_requests(jobdir, redis_server, name):
        """See python/lib/site-packages/queuelib/queue.py"""
        
        SIZE_FORMAT = ">L"
        SIZE_SIZE = struct.calcsize(SIZE_FORMAT)
        
        key = '%s:requests'%name
        queue_dir = join(jobdir, 'requests.queue')
        file_list = os.listdir(queue_dir)
        file_score_dict = dict([(f, int(f[1:])) for f in file_list
                                                    if re.match(r'^p-?\d+$', f)])
        for (file, score) in file_score_dict.items():
            print('Processing %s, it may take minutes...'%file)
            with open(join(queue_dir, file), 'rb') as f:
                # The first 4 bytes hold the record count written by queuelib.
                qsize = f.read(SIZE_SIZE)
                total_size, = struct.unpack(SIZE_FORMAT, qsize)
                f.seek(0, os.SEEK_END)

                # Walk the file backwards; each record is laid out as
                # [data][4-byte big-endian size of data].
                actual_size = 0
                while True:
                    if f.tell() == SIZE_SIZE:
                        break
                    f.seek(-SIZE_SIZE, os.SEEK_CUR)
                    size, = struct.unpack(SIZE_FORMAT, f.read(SIZE_SIZE))
                    f.seek(-size-SIZE_SIZE, os.SEEK_CUR)
                    data = f.read(size)
                    redis_server.execute_command('ZADD', key, score, data)
                    f.seek(-size, os.SEEK_CUR)
                    actual_size += 1
            print('total_size {}, actual_size {}, score {}'.format(
                    total_size, actual_size, score))
        print('Result: {} {}'.format(key, redis_server.zcard(key)))
    
    
    if __name__ == '__main__':
        name = 'test'
        jobdir = '/home/yourproject/crawl/apps'
        database_num = 0
        # apps/
        # ├── requests.queue
        # │   ├── active.json
        # │   ├── p0
        # │   └── p1
        # ├── requests.seen
        # └── spider.state
        
        password = 'password'
        host = '127.0.0.1'
        port = '6379'
        redis_server = redis.StrictRedis.from_url('redis://:{password}@{host}:{port}/{database_num}'.format(
                                                    password=password, host=host,
                                                    port=port, database_num=database_num))
        
        sadd_dupefilter(jobdir, redis_server, name)
        zadd_requests(jobdir, redis_server, name)
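
    Once both keys are populated, a scrapy_redis spider named 'test' can resume from Redis. A minimal settings sketch, assuming scrapy_redis defaults (dupefilter key '%(spider)s:dupefilter', scheduler queue key '%(spider)s:requests', and a sorted-set priority queue that pops the lowest score first, consistent with the score = -priority mapping above):

    # settings.py sketch for resuming with scrapy_redis (verify the defaults
    # against your scrapy_redis version).
    SCHEDULER = "scrapy_redis.scheduler.Scheduler"
    DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
    SCHEDULER_PERSIST = True  # keep <name>:requests / <name>:dupefilter across runs
    REDIS_URL = 'redis://:password@127.0.0.1:6379/0'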

    3. Run results

  • Original article: https://www.cnblogs.com/my8100/p/scrapy_jobdir_to_redis.html