  • Scrapy-redis

    Redis

    scrapy-redis is a redis-based Scrapy component that lets you build a simple distributed crawler quickly. At its core it provides three things:

    • scheduler - the scheduler
    • dupefilter - URL deduplication rules (used by the scheduler)
    • pipeline   - data persistence

    Installing and starting redis

    # Redis (Windows builds) download: https://github.com/ServiceStack/redis-windows
    <install path>/redis-server /etc/redis/6379.conf
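    A quick way to confirm the server is reachable before wiring it into Scrapy is a
    one-line ping from redis-py (this sketch assumes redis-py is installed and the
    server listens on the default port):

    import redis

    conn = redis.Redis(host='127.0.0.1', port=6379)
    print(conn.ping())   # True if the connection works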

    1. URL deduplication with redis

    Case 1: use redis only for deduplication.
    # ############ redis connection settings #################
    REDIS_HOST = '127.0.0.1'                            # redis host
    REDIS_PORT = 6379                                   # redis port
    # REDIS_URL = 'redis://user:pass@hostname:9001'       # connection URL (takes precedence over the settings above)
    REDIS_PARAMS  = {}                                  # extra connection kwargs; default: {'socket_timeout': 30, 'socket_connect_timeout': 30, 'retry_on_timeout': True, 'encoding': REDIS_ENCODING}
    # REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient' # redis client class to use; default: redis.StrictRedis
    REDIS_ENCODING = "utf-8"
    DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
    Settings
    class RFPDupeFilter(BaseDupeFilter):
        def __init__(self, server, key, debug=False):
            self.server = server
            self.key = key
            self.debug = debug
            self.logdupes = True
    
        @classmethod
        def from_settings(cls, settings):  
            server = get_redis_from_settings(settings)
            key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
            debug = settings.getbool('DUPEFILTER_DEBUG')
            return cls(server, key=key, debug=debug)
    
        @classmethod
        def from_crawler(cls, crawler):
    
            return cls.from_settings(crawler.settings)
    
        def request_seen(self, request):
    
            fp = self.request_fingerprint(request)
            # sadd returns the number of members added: 1 if the fingerprint is
            # new, 0 if it was already in the set (i.e. the request was seen before).
            added = self.server.sadd(self.key, fp)
            return added == 0
    
        def request_fingerprint(self, request):
    
            return request_fingerprint(request)
    
        def close(self, reason=''):
            self.clear()

        def clear(self):
            self.server.delete(self.key)

        def log(self, request, spider):
            if self.debug:
                msg = "Filtered duplicate request: %(request)s"
                self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            elif self.logdupes:
                msg = ("Filtered duplicate request %(request)s"
                       " - no more duplicates will be shown"
                       " (see DUPEFILTER_DEBUG to show all duplicates)")
                self.logger.debug(msg, {'request': request}, extra={'spider': spider})
                self.logdupes = False
    Source (scrapy_redis.dupefilter)
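    The whole dedup rule boils down to the semantics of SADD: the first add of a
    fingerprint returns 1, every later add returns 0. A minimal sketch outside of
    Scrapy (the key name 'demo:dupefilter' is just an example):

    import redis

    conn = redis.Redis(host='127.0.0.1', port=6379)
    conn.delete('demo:dupefilter')

    print(conn.sadd('demo:dupefilter', 'fingerprint-abc'))   # 1 -> request_seen() returns False
    print(conn.sadd('demo:dupefilter', 'fingerprint-abc'))   # 0 -> request_seen() returns True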

    Compare this with Scrapy's own dedup filter:

    1. How does Scrapy itself implement deduplication?
    class RFPDupeFilter(BaseDupeFilter):
    
        """Request Fingerprint duplicates filter"""
    
        def __init__(self, path=None, debug=False):
            self.file = None
            self.fingerprints = set()
            self.logdupes = True
            self.debug = debug
            if path:
                # with JOBDIR configured, fingerprints are also persisted to a file
                self.file = open(os.path.join(path, 'requests.seen'), 'a+')
                self.file.seek(0)
                self.fingerprints.update(x.rstrip() for x in self.file)
    
        @classmethod
        def from_settings(cls, settings):
            debug = settings.getbool('DUPEFILTER_DEBUG')
            return cls(job_dir(settings), debug)
    
        def request_seen(self, request):
            # Turn the request into a unique fingerprint.
            fp = self.request_fingerprint(request)

            # If the fingerprint is already in the in-memory set, the request has
            # been visited before: report it as a duplicate.
            if fp in self.fingerprints:
                return True
            # Not seen before: record the fingerprint (and persist it to the
            # JOBDIR file if one is configured).
            self.fingerprints.add(fp)
            if self.file:
                self.file.write(fp + os.linesep)

        def request_fingerprint(self, request):
            return request_fingerprint(request)
    
        def close(self, reason):
            if self.file:
                self.file.close()
    Source (scrapy.dupefilters)
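    Both filters hash the request through request_fingerprint(), which canonicalizes
    the URL first, so the same URL with query parameters in a different order maps to
    one fingerprint. A quick check (request_fingerprint lives in scrapy.utils.request
    in the Scrapy versions this post targets; newer releases moved to a
    RequestFingerprinter component):

    from scrapy import Request
    from scrapy.utils.request import request_fingerprint

    r1 = Request('https://dig.chouti.com/?a=1&b=2')
    r2 = Request('https://dig.chouti.com/?b=2&a=1')   # same URL, different parameter order

    print(request_fingerprint(r1) == request_fingerprint(r2))   # True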

    2. The redis scheduler

    How does the scrapy-redis scheduler work?
    Each request is serialized with pickle and then pushed into redis, either into a
    list or into a sorted set, depending on the configured queue class:
    SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"
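    Roughly, that serialization step amounts to the following (request_to_dict /
    request_from_dict are the helpers scrapy-redis uses; the key 'chouti:requests'
    and the plain dict here are only illustrative):

    import pickle
    import redis

    conn = redis.Redis(host='127.0.0.1', port=6379)

    request_as_dict = {'url': 'https://dig.chouti.com/', 'method': 'GET', 'priority': 0}
    conn.lpush('chouti:requests', pickle.dumps(request_as_dict))   # enqueue
    raw = conn.rpop('chouti:requests')                             # dequeue (FIFO order)
    print(pickle.loads(raw)['url'])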

    Dedup + scheduler
    # ############ redis connection settings #################
    REDIS_HOST = '127.0.0.1'                            # redis host
    REDIS_PORT = 6379                                   # redis port
    # REDIS_URL = 'redis://user:pass@hostname:9001'       # connection URL (takes precedence over the settings above)
    REDIS_PARAMS  = {}                                  # extra connection kwargs; default: {'socket_timeout': 30, 'socket_connect_timeout': 30, 'retry_on_timeout': True, 'encoding': REDIS_ENCODING}
    # REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient' # redis client class to use; default: redis.StrictRedis
    REDIS_ENCODING = "utf-8"
    
    DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
    
    # The engine drives this custom scheduler:
    SCHEDULER = "scrapy_redis.scheduler.Scheduler"
    SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue'  # default is PriorityQueue (sorted set); FifoQueue (list, breadth-first) and LifoQueue (list, depth-first) are the alternatives
    SCHEDULER_QUEUE_KEY = '%(spider)s:requests'  # redis key under which pending requests are stored
    SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"  # serializer for requests stored in redis; pickle by default
    SCHEDULER_PERSIST = True  # keep the queue and dupefilter in redis when the spider closes (True=keep, False=flush)
    SCHEDULER_FLUSH_ON_START = False  # flush the queue and dupefilter before the spider starts (True=flush)
    # SCHEDULER_IDLE_BEFORE_CLOSE = 10  # how long next_request() blocks waiting when the queue is empty
    SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'  # redis key for the dedup fingerprints, e.g. chouti:dupefilter
    SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'  # dupefilter class used by the scheduler
    DUPEFILTER_DEBUG = False
    Dedup + scheduler settings
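    With these settings and a spider named 'chouti' (an assumption for this example),
    the scheduler queue and the dupefilter show up in redis under the keys below and
    can be inspected directly:

    import redis

    conn = redis.Redis(host='127.0.0.1', port=6379)
    print(conn.keys('chouti:*'))            # e.g. [b'chouti:requests', b'chouti:dupefilter']
    print(conn.llen('chouti:requests'))     # pending requests (FifoQueue/LifoQueue store a list)
    print(conn.scard('chouti:dupefilter'))  # fingerprints recorded so far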

    How is the scheduler itself implemented? The two methods that matter most are
    enqueue_request() and next_request(). The source (scrapy_redis.scheduler):

    import importlib
    import six
    
    from scrapy.utils.misc import load_object
    
    from . import connection, defaults
    
    
    # TODO: add SCRAPY_JOB support.
    class Scheduler(object):
    
    
        def __init__(self, server,
                     persist=False,
                     flush_on_start=False,
                     queue_key=defaults.SCHEDULER_QUEUE_KEY,
                     queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                     dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                     dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                     idle_before_close=0,
                     serializer=None):
            if idle_before_close < 0:
                raise TypeError("idle_before_close cannot be negative")
    
            self.server = server
            self.persist = persist
            self.flush_on_start = flush_on_start
            self.queue_key = queue_key
            self.queue_cls = queue_cls
            self.dupefilter_cls = dupefilter_cls
            self.dupefilter_key = dupefilter_key
            self.idle_before_close = idle_before_close
            self.serializer = serializer
            self.stats = None
    
        def __len__(self):
            return len(self.queue)
    
        @classmethod
        def from_settings(cls, settings):
            kwargs = {
                'persist': settings.getbool('SCHEDULER_PERSIST'),
                'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
                'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
            }
    
            # If these values are missing, it means we want to use the defaults.
            optional = {
                # TODO: Use custom prefixes for this settings to note that are
                # specific to scrapy-redis.
                'queue_key': 'SCHEDULER_QUEUE_KEY',
                'queue_cls': 'SCHEDULER_QUEUE_CLASS',
                'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
                # We use the default setting name to keep compatibility.
                'dupefilter_cls': 'DUPEFILTER_CLASS',
                'serializer': 'SCHEDULER_SERIALIZER',
            }
            for name, setting_name in optional.items():
                val = settings.get(setting_name)
                if val:
                    kwargs[name] = val
    
            # Support serializer as a path to a module.
            if isinstance(kwargs.get('serializer'), six.string_types):
                kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
    
            server = connection.from_settings(settings)
            # Ensure the connection is working.
            server.ping()
    
            return cls(server=server, **kwargs)
    
        @classmethod
        def from_crawler(cls, crawler):
            instance = cls.from_settings(crawler.settings)
            # FIXME: for now, stats are only supported from this constructor
            instance.stats = crawler.stats
            return instance
    
        def open(self, spider):
            self.spider = spider
    
            try:
                self.queue = load_object(self.queue_cls)(
                    server=self.server,
                    spider=spider,
                    key=self.queue_key % {'spider': spider.name},
                    serializer=self.serializer,
                )
            except TypeError as e:
                raise ValueError("Failed to instantiate queue class '%s': %s",
                                 self.queue_cls, e)
    
            try:
                self.df = load_object(self.dupefilter_cls)(
                    server=self.server,
                    key=self.dupefilter_key % {'spider': spider.name},
                    debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
                )
            except TypeError as e:
                raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                                 self.dupefilter_cls, e)
    
            if self.flush_on_start:
                self.flush()
            # notice if there are requests already in the queue to resume the crawl
            if len(self.queue):
                spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
    
        def close(self, reason):
            if not self.persist:
                self.flush()
    
        def flush(self):
            self.df.clear()
            self.queue.clear()
        
        # *** enqueue_request(): this is where the dupefilter check happens ***
        def enqueue_request(self, request):
            if not request.dont_filter and self.df.request_seen(request):
                self.df.log(request, self.spider)
                return False
            if self.stats:
                self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
            self.queue.push(request)
            return True
        # *** next_request(): pop the next request (blocking up to idle_before_close) ***
        def next_request(self):
            block_pop_timeout = self.idle_before_close
            request = self.queue.pop(block_pop_timeout)
            if request and self.stats:
                self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
            return request
    
        def has_pending_requests(self):
            return len(self) > 0
            
            
    # ---- Related queue classes (scrapy_redis.queue) ----
    
    class Base(object):
        """Per-spider base queue class"""
    
        def __init__(self, server, spider, key, serializer=None):
            if serializer is None:
                # Backward compatibility.
                # TODO: deprecate pickle.
                serializer = picklecompat
            if not hasattr(serializer, 'loads'):
                raise TypeError("serializer does not implement 'loads' function: %r"
                                % serializer)
            if not hasattr(serializer, 'dumps'):
                raise TypeError("serializer '%s' does not implement 'dumps' function: %r"
                                % serializer)
    
            self.server = server
            self.spider = spider
            self.key = key % {'spider': spider.name}
            self.serializer = serializer
    
        def _encode_request(self, request):
            """Encode a request object"""
            obj = request_to_dict(request, self.spider)
            return self.serializer.dumps(obj)
    
        def _decode_request(self, encoded_request):
            """Decode an request previously encoded"""
            obj = self.serializer.loads(encoded_request)
            return request_from_dict(obj, self.spider)
    
        def __len__(self):
            """Return the length of the queue"""
            raise NotImplementedError
    
        def push(self, request):
            """Push a request"""
            raise NotImplementedError
    
        def pop(self, timeout=0):
            """Pop a request"""
            raise NotImplementedError
    
        def clear(self):
            """Clear queue/stack"""
            self.server.delete(self.key)
    
    class FifoQueue(Base):
        """Per-spider FIFO queue"""
    
        def __len__(self):
            """Return the length of the queue"""
            return self.server.llen(self.key)
    
        def push(self, request):
            """Push a request"""
            self.server.lpush(self.key, self._encode_request(request))
    
        def pop(self, timeout=0):
            """Pop a request"""
            if timeout > 0:
                data = self.server.brpop(self.key, timeout)
                if isinstance(data, tuple):
                    data = data[1]
            else:
                data = self.server.rpop(self.key)
            if data:
                return self._decode_request(data)
    
    class PriorityQueue(Base):
        """Per-spider priority queue abstraction using redis' sorted set"""
    
        def __len__(self):
            """Return the length of the queue"""
            return self.server.zcard(self.key)
    
        def push(self, request):
            """Push a request"""
            data = self._encode_request(request)
            score = -request.priority
            # We don't use zadd method as the order of arguments change depending on
            # whether the class is Redis or StrictRedis, and the option of using
            # kwargs only accepts strings, not bytes.
            self.server.execute_command('ZADD', self.key, score, data)
    
        def pop(self, timeout=0):
            """
            Pop a request
            timeout not support in this queue class
            """
            # use atomic range/remove using multi/exec
            pipe = self.server.pipeline()
            pipe.multi()
            pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
            results, count = pipe.execute()
            if results:
                return self._decode_request(results[0])
    
    class LifoQueue(Base):
        """Per-spider LIFO queue."""
    
        def __len__(self):
            """Return the length of the stack"""
            return self.server.llen(self.key)
    
        def push(self, request):
            """Push a request"""
            self.server.lpush(self.key, self._encode_request(request))
    
        def pop(self, timeout=0):
            """Pop a request"""
            if timeout > 0:
                data = self.server.blpop(self.key, timeout)
                if isinstance(data, tuple):
                    data = data[1]
            else:
                data = self.server.lpop(self.key)
    
            if data:
                return self._decode_request(data)
    
    
    # TODO: Deprecate the use of these names.
    SpiderQueue = FifoQueue
    SpiderStack = LifoQueue
    SpiderPriorityQueue = PriorityQueue
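    From a spider's point of view, the two knobs that interact with the scheduler and
    the dupefilter are dont_filter and priority: dont_filter=True skips the
    request_seen() check in enqueue_request(), and priority only matters with the
    PriorityQueue (score = -priority, so higher priorities pop first). A sketch (the
    URLs are placeholders):

    import scrapy

    class DemoSpider(scrapy.Spider):
        name = 'demo'
        start_urls = ['https://dig.chouti.com/']

        def parse(self, response):
            # re-enqueue an already-seen page; dont_filter=True bypasses the dupefilter
            yield scrapy.Request(response.url, callback=self.parse, dont_filter=True)
            # with the sorted-set queue this request jumps ahead of priority-0 requests
            yield scrapy.Request('https://dig.chouti.com/hot',
                                 callback=self.parse, priority=10)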

    3. Pipelines

    Use the built-in scrapy-redis pipeline for persistence: every item is serialized and saved into a redis list.
    In the settings file:

    Using the pipeline on its own

    # ############ redis connection settings #################
    REDIS_HOST = '127.0.0.1'                            # redis host
    REDIS_PORT = 6379                                   # redis port
    # REDIS_URL = 'redis://user:pass@hostname:9001'       # connection URL (takes precedence over the settings above)
    REDIS_PARAMS  = {}                                  # extra connection kwargs; default: {'socket_timeout': 30, 'socket_connect_timeout': 30, 'retry_on_timeout': True, 'encoding': REDIS_ENCODING}
    # REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient' # redis client class to use; default: redis.StrictRedis
    REDIS_ENCODING = "utf-8"
    
    # Use the built-in scrapy-redis pipeline: every item is pushed onto a redis list.
    ITEM_PIPELINES = {
       'scrapy_redis.pipelines.RedisPipeline': 300,
    }
    Pipeline settings
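    RedisPipeline serializes every item (JSON by default) and pushes it onto a redis
    list, by default under the key '<spider>:items'. A consumer on any machine can
    then drain that list; the spider name 'chouti' below is an assumption:

    import json
    import redis

    conn = redis.Redis(host='127.0.0.1', port=6379)
    while True:
        raw = conn.blpop('chouti:items', timeout=5)
        if raw is None:
            break                      # nothing new arrived within 5 seconds
        item = json.loads(raw[1])      # blpop returns a (key, value) tuple
        print(item)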

    Dedup + scheduler + pipeline together:

    # ############ redis connection settings #################
    REDIS_HOST = '127.0.0.1'                            # redis host
    REDIS_PORT = 6379                                   # redis port
    # REDIS_URL = 'redis://user:pass@hostname:9001'       # connection URL (takes precedence over the settings above)
    REDIS_PARAMS  = {}                                  # extra connection kwargs; default: {'socket_timeout': 30, 'socket_connect_timeout': 30, 'retry_on_timeout': True, 'encoding': REDIS_ENCODING}
    # REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient' # redis client class to use; default: redis.StrictRedis
    REDIS_ENCODING = "utf-8"
    
    DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
    
    # The engine drives this custom scheduler:
    SCHEDULER = "scrapy_redis.scheduler.Scheduler"
    SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue'  # default is PriorityQueue (sorted set); FifoQueue (list, breadth-first) and LifoQueue (list, depth-first) are the alternatives
    SCHEDULER_QUEUE_KEY = '%(spider)s:requests'  # redis key under which pending requests are stored
    SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"  # serializer for requests stored in redis; pickle by default
    SCHEDULER_PERSIST = True  # keep the queue and dupefilter in redis when the spider closes (True=keep, False=flush)
    SCHEDULER_FLUSH_ON_START = False  # flush the queue and dupefilter before the spider starts (True=flush)
    # SCHEDULER_IDLE_BEFORE_CLOSE = 10  # how long next_request() blocks waiting when the queue is empty
    SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'  # redis key for the dedup fingerprints, e.g. chouti:dupefilter
    SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'  # dupefilter class used by the scheduler
    DUPEFILTER_DEBUG = False
    
    
    # Use the built-in scrapy-redis pipeline: every item is pushed onto a redis list.
    ITEM_PIPELINES = {
       'scrapy_redis.pipelines.RedisPipeline': 300,
    }
    Settings

    4. Pulling tasks from redis with scrapy-redis

    This is what makes the crawl distributed: when there are no pending tasks, the
    crawling servers simply block and wait.

    As soon as another machine pushes new tasks into redis, crawling resumes.

    In the spiders module:

    from scrapy_redis.spiders import RedisSpider
    class ChoutiSpider(RedisSpider):
        name = 'chouti'
        allowed_domains = ['chouti.com']
        # RedisSpider pulls its start URLs from the redis key '<name>:start_urls'
        # (here: chouti:start_urls) instead of a hard-coded start_urls list.
    
        def parse(self, response):
            pass

    Create a separate .py script to seed the start URL (this plays the role of another machine):

    import redis
    
    conn=redis.Redis(host='127.0.0.1',port=6379)
    
    conn.lpush('chouti:start_urls','https://dig.chouti.com/')

    Finally, add the related settings:

    # settings.py
    # number of start URLs fetched from redis in one batch
    REDIS_START_URLS_BATCH_SIZE = 1
    
    # read start URLs from a redis list (lpop) ...
    REDIS_START_URLS_AS_SET = False
    
    # ... or from a redis set (spop) - enable one of the two
    # REDIS_START_URLS_AS_SET = True
    Settings
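    Whichever option is enabled, the start URLs have to be pushed with the matching
    redis command, otherwise the spider keeps waiting (key name as in the script
    above):

    import redis

    conn = redis.Redis(host='127.0.0.1', port=6379)

    # REDIS_START_URLS_AS_SET = False -> the spider pops from a list
    conn.lpush('chouti:start_urls', 'https://dig.chouti.com/')

    # REDIS_START_URLS_AS_SET = True -> the spider pops from a set
    # conn.sadd('chouti:start_urls', 'https://dig.chouti.com/')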

    5. Downloading large files

    from twisted.web.client import Agent, ResponseDone, PotentialDataLoss
    from twisted.internet import defer, reactor, protocol
    
    
    class _ResponseReader(protocol.Protocol):
    
        def __init__(self, finished, txresponse, file_name):
            self._finished = finished
            self._txresponse = txresponse
            self._bytes_received = 0
            self.f = open(file_name, mode='wb')
    
        def dataReceived(self, bodyBytes):
            self._bytes_received += len(bodyBytes)
    
            # stream the response body to disk chunk by chunk
            self.f.write(bodyBytes)
    
            self.f.flush()
    
        def connectionLost(self, reason):
            if self._finished.called:
                return
            if reason.check(ResponseDone):
                # the whole body was received
                self._finished.callback((self._txresponse, 'success'))
            elif reason.check(PotentialDataLoss):
                # only part of the body arrived (connection closed early)
                self._finished.callback((self._txresponse, 'partial'))
            else:
                # the download failed
                self._finished.errback(reason)
    
            self.f.close()
    
    
    class BigfilePipeline(object):
        def process_item(self, item, spider):
            # create a download task for 'file' items
    
            if item['type'] == 'file':
                agent = Agent(reactor)
                d = agent.request(
                    method=b'GET',
                    uri=bytes(item['url'], encoding='ascii')
                )
                # once the response headers arrive, self._cb_bodyready is called to stream the body
                d.addCallback(self._cb_bodyready, file_name=item['file_name'])
    
                return d
            else:
                return item
    
        def _cb_bodyready(self, txresponse, file_name):
            # create a Deferred that fires only after the whole body has been received
            d = defer.Deferred()
            d.addBoth(self.download_result)  # runs after success / partial download / failure
            txresponse.deliverBody(_ResponseReader(d, txresponse, file_name))
            return d
    
        def download_result(self, response):
            pass
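    To use it, register the pipeline and yield items that carry the three fields
    process_item() looks at. The module path 'myproject.pipelines.BigfilePipeline'
    and the URLs are assumptions that need to match your own project:

    import scrapy

    class FileSpider(scrapy.Spider):
        name = 'files'
        start_urls = ['https://example.com/']
        custom_settings = {
            'ITEM_PIPELINES': {'myproject.pipelines.BigfilePipeline': 300},
        }

        def parse(self, response):
            yield {
                'type': 'file',                         # tells the pipeline to download
                'url': 'https://example.com/big.zip',   # remote file to fetch
                'file_name': 'big.zip',                 # local file name to write
            }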