  • scrapy-redis (Scheduler source code analysis)

    Configuration in settings.py:
    '''Once scrapy-redis is configured as below, the scheduler in use is the one that ships with scrapy-redis.'''
    ########## Connection settings ##########
    REDIS_HOST = '127.0.0.1'
    REDIS_PORT = 6379
    # REDIS_PARAMS  = {'password':'xxxx'}    # Redis connection parameters; defaults: REDIS_PARAMS = {'socket_timeout': 30, 'socket_connect_timeout': 30, 'retry_on_timeout': True, 'encoding': REDIS_ENCODING}
    REDIS_ENCODING = "utf-8"

    # REDIS_URL = 'redis://user:pass@hostname:6379'  # connection URL (takes priority over the settings above)
    ########## Scheduler ##########
    # from scrapy_pro1.scheduler_test import Self_Scheduler
    # SCHEDULER = 'scrapy_pro1.scheduler_test.Self_Scheduler'  ## a custom scheduler could be plugged in here

    SCHEDULER = 'scrapy_redis.scheduler.Scheduler'  # the scheduler that ships with scrapy-redis
    ## i.e. the scheduler used from here on is the one inside scrapy-redis
    SCHEDULER_QUEUE_KEY = '%(spider)s:requests'  # redis key under which the scheduler stores pending requests
    # every spider keeps its own queue and history
    '''
    Conceptually the data in redis looks like:
        chouti:requests -> the pending requests (each one wraps url='', callback='')
    Redis cannot store Request objects directly, so each request is pickled
    into a string first, e.g. pickle.dumps(request), before being stored;
    the dupefilter records live in a set, so they can be read back with
    conn.smembers('chouti:dupefilter').
    '''
    SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"  # serializer for data saved to redis; pickle by default
    ## Request objects are serialized before being stored
    SCHEDULER_PERSIST = False  # whether to keep the queue and dedup records on close; True = keep, False = clear
    SCHEDULER_FLUSH_ON_START = True  # whether to clear the queue and dedup records on start; True = clear, False = keep
    # SCHEDULER_IDLE_BEFORE_CLOSE = 10  # when fetching from the scheduler and it is empty, how long to wait at most before giving up
    SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'  # redis key for the dedup records, e.g. chouti:dupefilter
    ## the key under which each spider's dedup records are kept
    SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'  # class implementing the dedup rule
    START_URLS_KEY = '%(name)s:start_urls'  # redis key the spider reads its start URLs from
    REDIS_START_URLS_AS_SET = False  # store start URLs as a list (False) or a set (True); see the sketch below
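
    To seed a crawl, the start URLs are pushed into redis under START_URLS_KEY. A minimal sketch with redis-py (the spider name 'chouti' and the URL are assumptions, not part of the source):
    import redis

    conn = redis.Redis(host='127.0.0.1', port=6379)
    # REDIS_START_URLS_AS_SET = False means the key is a list, seeded with LPUSH
    conn.lpush('chouti:start_urls', 'https://dig.chouti.com/')
    # with REDIS_START_URLS_AS_SET = True it would be a set instead:
    # conn.sadd('chouti:start_urls', 'https://dig.chouti.com/')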


    Inside the scrapy-redis scheduler (scheduler.py):
    The scheduler object is instantiated when the crawl starts: scrapy crawl baidu --nolog
    from_crawler is executed first:
    @classmethod
    def from_crawler(cls, crawler):  ## when the scrapy-redis scheduler starts up, the settings come in via crawler.settings
        instance = cls.from_settings(crawler.settings)  ## crawler.settings is the Settings object, e.g. <scrapy.settings.Settings object at 0x00000265B2E41940>
        '''its methods can be called, e.g. crawler.settings.get("REDIS_HOST")'''
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance  ## from_settings was executed with the settings passed in
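
    A tiny illustration of the Settings object that from_settings receives (the values here are made up): it supports plain and typed accessors, which is all the scheduler relies on.
    from scrapy.settings import Settings

    settings = Settings({'REDIS_HOST': '127.0.0.1', 'SCHEDULER_PERSIST': False})
    settings.get('REDIS_HOST')             # '127.0.0.1'
    settings.getbool('SCHEDULER_PERSIST')  # False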

    from_settings is executed next (it receives the settings, i.e. the configuration):
    Its job: read the configuration.
    @classmethod
    def from_settings(cls, settings):  ## settings is the configuration passed in
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        ## walk the map above and look each setting name up in settings, taking the configured value
        for name, setting_name in optional.items():
            val = settings.get(setting_name)  ## the matching value from settings (whatever you configured)
            if val:
                kwargs[name] = val
    '''
    val = settings.get(setting_name) fetches the corresponding value out of settings;
    the keys it looks up come from the loop over optional, i.e. the values of the
    optional dict are the setting names used against settings.
    kwargs[name] = val  # store it
    '''
        # Support serializer as a path to a module.
    ## serialization step: a serializer given as a dotted path is imported as a module
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
    ## then the connection settings are read and a redis connection is made; in settings they are:
    '''
    REDIS_HOST = '127.0.0.1'
    REDIS_PORT = 6379
    # REDIS_PARAMS  = {'password':'xxxx'}    # Redis connection parameters; defaults as shown earlier
    REDIS_ENCODING = "utf-8"

    # REDIS_URL = 'redis://user:pass@hostname:6379'  # connection URL (takes priority over the settings above)
    '''
    server = connection.from_settings(settings)  ## read the connection settings you configured and connect to redis

    # Ensure the connection is working.
    server.ping()  ## verifies the connection actually succeeded
    return cls(server=server, **kwargs)
    ## here the Scheduler object is instantiated and the crawl proceeds; cls is the current class
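
    For reference, the serializer module this resolves to only needs a loads/dumps pair; scrapy_redis.picklecompat amounts to the following thin pickle wrapper (a sketch from memory, not a verbatim copy):
    import pickle

    def loads(s):
        # bytes read back from redis -> original object
        return pickle.loads(s)

    def dumps(obj):
        # Request (or any object) -> bytes that redis can store
        return pickle.dumps(obj, protocol=-1)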
    
    
    Connecting to redis: connection.from_settings
    from_settings = get_redis_from_settings
    def get_redis_from_settings(settings):


        params = defaults.REDIS_PARAMS.copy()
    ## start from the package's default parameters:
    '''
    REDIS_PARAMS = {
        'socket_timeout': 30,
        'socket_connect_timeout': 30,
        'retry_on_timeout': True,
        'encoding': REDIS_ENCODING,
    }
    '''
        params.update(settings.getdict('REDIS_PARAMS'))  ## merge in the REDIS_PARAMS dict you configured; your values extend/override the defaults
        # XXX: Deprecate REDIS_* settings.
        for source, dest in SETTINGS_PARAMS_MAP.items():
            val = settings.get(source)  ## source is a setting name such as REDIS_HOST; if it is not configured in settings, it is skipped
            '''
            this walks the keys of SETTINGS_PARAMS_MAP and fetches the matching
            values out of settings, e.g. REDIS_HOST -> params['host']
            '''
            if val:
                params[dest] = val

        # Allow ``redis_cls`` to be a path to a class.
        if isinstance(params.get('redis_cls'), six.string_types):
            params['redis_cls'] = load_object(params['redis_cls'])

        return get_redis(**params)
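
    To make the merge order concrete, a hedged sketch of what ends up in params (SETTINGS_PARAMS_MAP lives in scrapy_redis.connection and, as far as I recall, maps e.g. REDIS_HOST -> 'host', REDIS_PORT -> 'port', REDIS_URL -> 'url'):
    # sketch only: package defaults first, then REDIS_PARAMS, then REDIS_* keys
    params = {'socket_timeout': 30, 'socket_connect_timeout': 30,
              'retry_on_timeout': True, 'encoding': 'utf-8'}  # defaults.REDIS_PARAMS
    params.update({'password': 'xxxx'})  # settings.getdict('REDIS_PARAMS')
    params['host'] = '127.0.0.1'         # REDIS_HOST via SETTINGS_PARAMS_MAP
    params['port'] = 6379                # REDIS_PORT via SETTINGS_PARAMS_MAP
    # get_redis(**params) finally builds the redis client from these kwargs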

    The getdict method:
    def getdict(self, name, default=None):
       
        value = self.get(name, default or {})
        if isinstance(value, six.string_types):
            value = json.loads(value)
        return dict(value)
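
    A quick illustration (values are hypothetical): because of the json.loads branch, REDIS_PARAMS can also be supplied as a JSON string and still comes back as a dict.
    from scrapy.settings import Settings

    s = Settings({'REDIS_PARAMS': '{"password": "xxxx", "db": 1}'})
    s.getdict('REDIS_PARAMS')  # -> {'password': 'xxxx', 'db': 1}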

    Once the Scheduler object is instantiated, the crawl itself starts:
    ## the real crawling happens below; everything above only read configuration
       def enqueue_request(self, request):
            if not request.dont_filter and self.df.request_seen(request):
                # dont_filter on the Request decides whether the dupefilter is consulted at all
                ## request_seen checks whether this request has been recorded before
                self.df.log(request, self.spider)
                return False
            ## already visited, no need to fetch again: return False
            if self.stats:
                ## count this request as enqueued in the crawl stats
                self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
            ## not seen before: push the Request into the queue so the downloader can pick it up
            self.queue.push(request)  ## into the queue; FIFO, LIFO or priority, depending on your settings
            ## i.e. into the scheduler's request queue
            return True  ## an unseen url: the request has been added to the scheduler
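
    A minimal sketch of the flag this checks (the URL and callback are placeholders): dont_filter=True on a Request bypasses self.df.request_seen, so the request is always enqueued.
    from scrapy import Request

    def parse(response):
        pass  # placeholder callback

    # this request skips the dupefilter check and is enqueued unconditionally
    req = Request('https://dig.chouti.com/', callback=parse, dont_filter=True)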
    
    
    The downloader fetches requests from the queue via next_request:
    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)  ## each pop takes the next Request object out of redis
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request
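
    Under the hood, the default FIFO queue's pop boils down to roughly the following redis calls (a sketch from memory of scrapy_redis.queue.FifoQueue; the key name is assumed):
    import redis

    conn = redis.Redis(host='127.0.0.1', port=6379)
    timeout = 10  # plays the role of idle_before_close
    if timeout > 0:
        data = conn.brpop('chouti:requests', timeout=timeout)  # block up to `timeout` seconds
        if data:
            data = data[1]  # brpop returns a (key, value) tuple
    else:
        data = conn.rpop('chouti:requests')  # non-blocking pop
    # data, if not None, is then deserialized back into a Request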

    Full scrapy-redis scheduler source (importable as: from scrapy_redis.scheduler import Scheduler):
    import importlib
    import six  ## used for type checks, e.g. six.string_types
    
    from scrapy.utils.misc import load_object
    
    from . import connection, defaults
    
    
    # TODO: add SCRAPY_JOB support.
    class Scheduler(object):
        """Redis-based scheduler
    
        Settings
        --------
        SCHEDULER_PERSIST : bool (default: False)
            Whether to persist or clear redis queue.
        SCHEDULER_FLUSH_ON_START : bool (default: False)
            Whether to flush redis queue on start.
        SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
            How many seconds to wait before closing if no message is received.
        SCHEDULER_QUEUE_KEY : str
            Scheduler redis key.
        SCHEDULER_QUEUE_CLASS : str
            Scheduler queue class.
        SCHEDULER_DUPEFILTER_KEY : str
            Scheduler dupefilter redis key.
        SCHEDULER_DUPEFILTER_CLASS : str
            Scheduler dupefilter class.
        SCHEDULER_SERIALIZER : str
            Scheduler serializer.
    
        """
    
        def __init__(self, server,
                     persist=False,
                     flush_on_start=False,
                     queue_key=defaults.SCHEDULER_QUEUE_KEY,
                     queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                     dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                     dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                     idle_before_close=0,
                     serializer=None):
            """Initialize scheduler.
    
            Parameters
            ----------
            server : Redis
                The redis server instance.
            persist : bool
                Whether to flush requests when closing. Default is False.
            flush_on_start : bool
                Whether to flush requests on start. Default is False.
            queue_key : str
                Requests queue key.
            queue_cls : str
                Importable path to the queue class.
            dupefilter_key : str
                Duplicates filter key.
            dupefilter_cls : str
                Importable path to the dupefilter class.
            idle_before_close : int
                Timeout before giving up.
    
            """
            if idle_before_close < 0:
                raise TypeError("idle_before_close cannot be negative")
    
            self.server = server
            self.persist = persist
            self.flush_on_start = flush_on_start
            self.queue_key = queue_key
            self.queue_cls = queue_cls
            self.dupefilter_cls = dupefilter_cls
            self.dupefilter_key = dupefilter_key
            self.idle_before_close = idle_before_close
            self.serializer = serializer
            self.stats = None
    
        def __len__(self):
            return len(self.queue)
    
        @classmethod
        def from_settings(cls, settings):  ## settings is the configuration passed in
            kwargs = {
                'persist': settings.getbool('SCHEDULER_PERSIST'),
                'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
                'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
            }
    
            # If these values are missing, it means we want to use the defaults.
            optional = {
                # TODO: Use custom prefixes for this settings to note that are
                # specific to scrapy-redis.
                'queue_key': 'SCHEDULER_QUEUE_KEY',
                'queue_cls': 'SCHEDULER_QUEUE_CLASS',
                'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
                # We use the default setting name to keep compatibility.
                'dupefilter_cls': 'DUPEFILTER_CLASS',
                'serializer': 'SCHEDULER_SERIALIZER',
            }
            ## walk the map above and look each setting name up in settings, taking the configured value
            for name, setting_name in optional.items():
                val = settings.get(setting_name)  ## the matching value from settings (whatever you configured)
                if val:
                    kwargs[name] = val
    
            # Support serializer as a path to a module.
            if isinstance(kwargs.get('serializer'), six.string_types):
                kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
    
            server = connection.from_settings(settings)  ## read the connection settings you configured and connect to redis
            # Ensure the connection is working.
            server.ping()
    
            return cls(server=server, **kwargs)  ## the Scheduler object is instantiated here and the crawl proceeds; cls is the current class
    
        @classmethod
        def from_crawler(cls, crawler):  ## when the scrapy-redis scheduler starts up, the settings come in via crawler.settings
            instance = cls.from_settings(crawler.settings)  ## crawler.settings is the Settings object, e.g. <scrapy.settings.Settings object at 0x00000265B2E41940>
            '''its methods can be called, e.g. crawler.settings.get("REDIS_HOST")'''
            # FIXME: for now, stats are only supported from this constructor
            instance.stats = crawler.stats
            return instance
    
        def open(self, spider):
            self.spider = spider
    
            try:
                self.queue = load_object(self.queue_cls)(
                    server=self.server,
                    spider=spider,
                    key=self.queue_key % {'spider': spider.name},
                    serializer=self.serializer,
                )
            except TypeError as e:
                raise ValueError("Failed to instantiate queue class '%s': %s",
                                 self.queue_cls, e)
    
            try:
                self.df = load_object(self.dupefilter_cls)(
                    server=self.server,
                    key=self.dupefilter_key % {'spider': spider.name},
                    debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
                )
            except TypeError as e:
                raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                                 self.dupefilter_cls, e)
    
            if self.flush_on_start:
                self.flush()
            # notice if there are requests already in the queue to resume the crawl
            if len(self.queue):
                spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
    
        def close(self, reason):
            if not self.persist:
                self.flush()
    
        def flush(self):
            self.df.clear()
            self.queue.clear()
    
    
    ## the real crawling happens below; everything above only read configuration
        def enqueue_request(self, request):
            if not request.dont_filter and self.df.request_seen(request):
                # dont_filter on the Request decides whether the dupefilter is consulted at all
                ## request_seen checks whether this request has been recorded before
                self.df.log(request, self.spider)
                return False
            ## already visited, no need to fetch again: return False
            if self.stats:
                ## count this request as enqueued in the crawl stats
                self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
            ## not seen before: push the Request into the queue so the downloader can pick it up
            self.queue.push(request)
            ## into the scheduler's request queue
            return True  ## an unseen url: the request has been added to the scheduler
    
        def next_request(self):
            block_pop_timeout = self.idle_before_close
            request = self.queue.pop(block_pop_timeout)
            if request and self.stats:
                self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
            return request
    
        def has_pending_requests(self):
            return len(self) > 0
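
    Putting it together, a minimal sketch of a spider run against this scheduler (the name and URL are illustrative; nothing beyond the settings shown at the top is required):
    import scrapy

    class ChoutiSpider(scrapy.Spider):
        name = 'chouti'  # keys become chouti:requests and chouti:dupefilter
        start_urls = ['https://dig.chouti.com/']

        def parse(self, response):
            # every Request yielded here passes through Scheduler.enqueue_request
            yield {'title': response.css('title::text').get()}

    Run it with scrapy crawl chouti; with SCHEDULER_PERSIST = True, any requests left in chouti:requests survive a restart, and open() logs "Resuming crawl".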