下面我们来看看,scrapy-redis的每一个源代码文件都实现了什么功能,最后如何实现分布式的爬虫系统:
connection.py 连接得配置文件
defaults.py 默认得配置文件
dupefilter.py 去重规则
picklecompat.py 格式化
pipelines.py 序列化变成字符串
queue.py 队列
scheduler.py 调度器
spiders.py 爬虫
utils.py 把字节转换成字符串
connect.py
import six from scrapy.utils.misc import load_object from . import defaults # Shortcut maps 'setting name' -> 'parmater name'. SETTINGS_PARAMS_MAP = { 'REDIS_URL': 'url', 'REDIS_HOST': 'host', 'REDIS_PORT': 'port', 'REDIS_ENCODING': 'encoding', } def get_redis_from_settings(settings): """Returns a redis client instance from given Scrapy settings object. This function uses ``get_client`` to instantiate the client and uses ``defaults.REDIS_PARAMS`` global as defaults values for the parameters. You can override them using the ``REDIS_PARAMS`` setting. Parameters ---------- settings : Settings A scrapy settings object. See the supported settings below. Returns ------- server Redis client instance. Other Parameters ---------------- REDIS_URL : str, optional Server connection URL. REDIS_HOST : str, optional Server host. REDIS_PORT : str, optional Server port. REDIS_ENCODING : str, optional Data encoding. REDIS_PARAMS : dict, optional Additional client parameters. """ params = defaults.REDIS_PARAMS.copy() params.update(settings.getdict('REDIS_PARAMS')) # XXX: Deprecate REDIS_* settings. for source, dest in SETTINGS_PARAMS_MAP.items(): val = settings.get(source) if val: params[dest] = val # Allow ``redis_cls`` to be a path to a class. if isinstance(params.get('redis_cls'), six.string_types): params['redis_cls'] = load_object(params['redis_cls']) return get_redis(**params) # Backwards compatible alias. from_settings = get_redis_from_settings def get_redis(**kwargs): """Returns a redis client instance. Parameters ---------- redis_cls : class, optional Defaults to ``redis.StrictRedis``. url : str, optional If given, ``redis_cls.from_url`` is used to instantiate the class. **kwargs Extra parameters to be passed to the ``redis_cls`` class. Returns ------- server Redis client instance. """ redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS) url = kwargs.pop('url', None) if url: return redis_cls.from_url(url, **kwargs) else: return redis_cls(**kwargs)
connect文件引入了redis模块,这个是redis-python库的接口,用于通过python访问redis数据库,主要是实现连接redis数据库的功能(返回的是reids库的Redis对象或者StrictRedis对象,这俩都是可以直接用来进行数据操作的对象)。这些连接接口在其他文件中经常被用到。其中,我们可以看到,要想连接到redis数据库,和其他数据库差不多,需要一个ip地址、端口号、用户名密码(可选)和一个整型的数据库编号,同时我们还可以再scrapy的settings文件中配置套接字的超时时间、等待时间等。
picklecompat.py
"""A pickle wrapper module with protocol=-1 by default.""" try: import cPickle as pickle # PY2 except ImportError: import pickle def loads(s): return pickle.loads(s) def dumps(obj): return pickle.dumps(obj, protocol=-1)
这里实现了loads和dumps两个函数,其实就是实现了一个serializer,因为redis数据库不能存储复杂对象(value部分只能是字符串,字符串列表,字符串集合和hash,key部分只能是字符串),所以我们存啥都要先串行化成文本才行。这里使用的就是python的pickle模块,一个兼容py2和py3的串行化工具。这个serializer主要用于一会的scheduler存reuqest对象,至于为什么不实用json格式,我也不是很懂,item pipeline的串行化默认用的就是json。
pipeline.py
from scrapy.utils.misc import load_object from scrapy.utils.serialize import ScrapyJSONEncoder from twisted.internet.threads import deferToThread from . import connection, defaults default_serialize = ScrapyJSONEncoder().encode class RedisPipeline(object): """Pushes serialized item into a redis list/queue Settings -------- REDIS_ITEMS_KEY : str Redis key where to store items. REDIS_ITEMS_SERIALIZER : str Object path to serializer function. """ def __init__(self, server, key=defaults.PIPELINE_KEY, serialize_func=default_serialize): """Initialize pipeline. Parameters ---------- server : StrictRedis Redis client instance. key : str Redis key where to store items. serialize_func : callable Items serializer function. """ self.server = server self.key = key self.serialize = serialize_func @classmethod def from_settings(cls, settings): params = { 'server': connection.from_settings(settings), } if settings.get('REDIS_ITEMS_KEY'): params['key'] = settings['REDIS_ITEMS_KEY'] if settings.get('REDIS_ITEMS_SERIALIZER'): params['serialize_func'] = load_object( settings['REDIS_ITEMS_SERIALIZER'] ) return cls(**params) @classmethod def from_crawler(cls, crawler): return cls.from_settings(crawler.settings) def process_item(self, item, spider): return deferToThread(self._process_item, item, spider) def _process_item(self, item, spider): key = self.item_key(item, spider) data = self.serialize(item) self.server.rpush(key, data) return item def item_key(self, item, spider): """Returns redis key based on given spider. Override this function to use a different key depending on the item and/or spider. """ return self.key % {'spider': spider.name}
pipeline文件实现了一个item pipieline类,和scrapy的item pipeline是同一个对象,通过从settings中拿到我们配置的REDIS_ITEMS_KEY作为key,把item串行化之后存入redis数据库对应的value中(这个value可以看出出是个list,我们的每个item是这个list中的一个结点),这个pipeline把提取出的item存起来,主要是为了方便我们延后处理数据。
queue.py
支持三种队列, 都继承自Base
类
1. FIFO Queue
使用了redis的list结构
class FifoQueue(Base): def __len__(self): """返回队列长度大小""" return self.server.llen(self.key) def push(self, request): """发送请求到队列左边""" self.server.lpush(self.key, self._encode_request(request)) def pop(self, timeout=0): """从队列右边抛出请求""" if timeout > 0: data = self.server.brpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.rpop(self.key) if data: return self._decode_request(data)
2. PriorityQueue
使用了redis的有序集合结构
class PriorityQueue(Base): def __len__(self): """返回队列内长度大小""" return self.server.zcard(self.key) def push(self, request): """放入请求到zset中""" data = self._encode_request(request) score = -request.priority self.server.execute_command('ZADD', self.key, score, data) def pop(self, timeout=0): """从zset中抛出请求. 此处不支持timeout参数""" pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count = pipe.execute() if results: return self._decode_request(results[0])
使用redis的sorted set
实现, 如果在spider脚本中需要指定priority
的话, 一定要在settings
中来声明使用的是PriorityQueue
.
3. LIFO Queue
后入先出, 使用list结构实现
class LifoQueue(Base): """Per-spider LIFO queue.""" def __len__(self): """Return the length of the stack""" return self.server.llen(self.key) def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request)) def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.blpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.lpop(self.key) if data: return self._decode_request(data)
和先进先出队列基本一样, 实现了栈结构
该文件实现了几个容器类,可以看这些容器和redis交互频繁,同时使用了我们上边picklecompat中定义的serializer。这个文件实现的几个容器大体相同,只不过一个是队列,一个是栈,一个是优先级队列,这三个容器到时候会被scheduler对象实例化,来实现request的调度。比如我们使用SpiderQueue最为调度队列的类型,到时候request的调度方法就是先进先出,而实用SpiderStack就是先进后出了。
我们可以仔细看看SpiderQueue的实现,他的push函数就和其他容器的一样,只不过push进去的request请求先被scrapy的接口request_to_dict变成了一个dict对象(因为request对象实在是比较复杂,有方法有属性不好串行化),之后使用picklecompat中的serializer串行化为字符串,然后使用一个特定的key存入redis中(该key在同一种spider中是相同的)。而调用pop时,其实就是从redis用那个特定的key去读其值(一个list),从list中读取最早进去的那个,于是就先进先出了。
这些容器类都会作为scheduler调度request的容器,scheduler在每个主机上都会实例化一个,并且和spider一一对应,所以分布式运行时会有一个spider的多个实例和一个scheduler的多个实例存在于不同的主机上,但是,因为scheduler都是用相同的容器,而这些容器都连接同一个redis服务器,又都使用spider名加queue来作为key读写数据,所以不同主机上的不同爬虫实例公用一个request调度池,实现了分布式爬虫之间的统一调度。
dupefilter.py
scrapy默认使用了集合结构来进行去重, 在scrapy-redis中使用redis的集合(set)
进行了替换, 请求指纹的计算方法还是用的内置的.
def request_seen(self, request): """获取请求指纹并添加到redis的去重集合中去""" fp = self.request_fingerprint(request) # 得到请求的指纹 added = self.server.sadd(self.key, fp) # 把指纹添加到redis的集合中 return added == 0 def request_fingerprint(self, request): return request_fingerprint(request) # 得到请求指纹
去重指纹计算使用的是sha1算法, 计算值包括请求方法, url, body等信息
这个文件看起来比较复杂,重写了scrapy本身已经实现的request判重功能。因为本身scrapy单机跑的话,只需要读取内存中的request队列或者持久化的request队列(scrapy默认的持久化似乎是json格式的文件,不是数据库)就能判断这次要发出的request url是否已经请求过或者正在调度(本地读就行了)。而分布式跑的话,就需要各个主机上的scheduler都连接同一个数据库的同一个request池来判断这次的请求是否是重复的了。
在这个文件中,通过继承BaseDupeFilter重写他的方法,实现了基于redis的判重。根据源代码来看,scrapy-redis使用了scrapy本身的一个fingerprint接request_fingerprint,这个接口很有趣,根据scrapy文档所说,他通过hash来判断两个url是否相同(相同的url会生成相同的hash结果),但是当两个url的地址相同,get型参数相同但是顺序不同时,也会生成相同的hash结果(这个真的比较神奇。。。)所以scrapy-redis依旧使用url的fingerprint来判断request请求是否已经出现过。这个类通过连接redis,使用一个key来向redis的一个set中插入fingerprint(这个key对于同一种spider是相同的,redis是一个key-value的数据库,如果key是相同的,访问到的值就是相同的,这里使用spider名字+DupeFilter的key就是为了在不同主机上的不同爬虫实例,只要属于同一种spider,就会访问到同一个set,而这个set就是他们的url判重池),如果返回值为0,说明该set中该fingerprint已经存在(因为集合是没有重复值的),则返回False,如果返回值为1,说明添加了一个fingerprint到set中,则说明这个request没有重复,于是返回True,还顺便把新fingerprint加入到数据库中了。
DupeFilter判重会在scheduler类中用到,每一个request在进入调度之前都要进行判重,如果重复就不需要参加调度,直接舍弃就好了,不然就是白白浪费资源。
spider.py
spider空闲的时候会从start_urls
队列中读取url, 默认一次读取CONCURRENT_REQUESTS
个url, 可以在settings
中设置REDIS_START_URLS_BATCH_SIZE
来改变每次的读取数量, 一般我会在使用的时候增大这个值, 可以降低spide进入idle的次数, 从而适当提升抓取性能
def setup_redis(self, crawler=None): """初始化了redis参数, 包括使用的种子url的key, 批量读取url的数量等信息""" ...... # 当spider空闲的时候会触发该信号, 调用spider_idle函数 crawler.signals.connect(self.spider_idle, signal=signals.spider_idle) def spider_idle(self): """空闲的时候触发该函数, 尝试请求下一批url. 有url的时候会直接请求, 最后都会抛出异常, 防止spider被关闭, 然后等待新的url过来""" self.schedule_next_requests() raise DontCloseSpider
spider的改动也不是很大,主要是通过connect接口,给spider绑定了spider_idle信号,spider初始化时,通过setup_redis函数初始化好和redis的连接,之后通过next_requests函数从redis中取出strat url,使用的key是settings中REDIS_START_URLS_AS_SET定义的(注意了这里的初始化url池和我们上边的queue的url池不是一个东西,queue的池是用于调度的,初始化url池是存放入口url的,他们都存在redis中,但是使用不同的key来区分,就当成是不同的表吧),spider使用少量的start url,可以发展出很多新的url,这些url会进入scheduler进行判重和调度。直到spider跑到调度池内没有url的时候,会触发spider_idle信号,从而触发spider的next_requests函数,再次从redis的start url池中读取一些url。
总结:
crapy-redis的总体思路:这个工程通过重写scheduler和spider类,实现了调度、spider启动和redis的交互。
实现新的dupefilter和queue类,达到了判重和调度容器和redis的交互,因为每个主机上的爬虫进程都访问同一个redis数据库,所以调度和判重都统一进行统一管理,达到了分布式爬虫的目的。
当spider被初始化时,同时会初始化一个对应的scheduler对象,这个调度器对象通过读取settings,配置好自己的调度容器queue和判重工具dupefilter。
每当一个spider产出一个request的时候,scrapy内核会把这个reuqest递交给这个spider对应的scheduler对象进行调度,scheduler对象通过访问redis对request进行判重,如果不重复就把他添加进redis中的调度池。当调度条件满足时,scheduler对象就从redis的调度池中取出一个request发送给spider,让他爬取。当spider爬取的所有暂时可用url之后,scheduler发现这个spider对应的redis的调度池空了,于是触发信号spider_idle,spider收到这个信号之后,直接连接redis读取strart url池,拿去新的一批url入口,然后再次重复上边的工作