PART 1 Introduction to Redis
Redis is a key-value store. It is similar to Memcached, but supports a richer set of value types: string, list, set, zset (sorted set), and hash. These types support push/pop, add/remove, set intersection, union and difference, and many other operations, all of which are atomic. On top of this, Redis can sort data in various ways. As with Memcached, data is kept in memory for performance. The difference is that Redis periodically writes updated data to disk, or appends modification operations to a log file, and on top of this it implements master-slave replication.
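As a quick illustration of these value types and operations, here is a minimal redis-py sketch (the key names are made up for the example; the zadd call uses the redis-py 3.x signature):

import redis

conn = redis.Redis(host='127.0.0.1', port=6379)

conn.set('page:title', 'hello')                      # string
conn.lpush('recent:urls', 'https://dig.chouti.com')  # list: push/pop
conn.sadd('tags:a', 'python', 'redis')               # set: add/remove
conn.sadd('tags:b', 'redis', 'scrapy')
print(conn.sinter('tags:a', 'tags:b'))               # set intersection -> {b'redis'}
conn.zadd('rank', {'item1': 1, 'item2': 2})          # zset (sorted set)
conn.hset('user:1', 'name', 'alice')                 # hash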
Reference: https://www.cnblogs.com/wupeiqi/articles/5132791.html
PART 2 Starting a crawler based on Redis
settings.py configuration
# ############ Redis connection info #################
REDIS_HOST = '127.0.0.1'                            # host
REDIS_PORT = 6379                                   # port
# REDIS_URL = 'redis://user:pass@hostname:9001'     # connection URL (takes precedence over the settings above)
REDIS_PARAMS = {}                                   # Redis connection parameters
# default: REDIS_PARAMS = {'socket_timeout': 30, 'socket_connect_timeout': 30, 'retry_on_timeout': True, 'encoding': REDIS_ENCODING}
# REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient'  # Python class used for the Redis connection; default: redis.StrictRedis
REDIS_ENCODING = "utf-8"

DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"  # Redis-based request deduplication

# Custom scheduler, driven by the engine
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'  # default: PriorityQueue (sorted set, breadth-first by default); alternatives: FifoQueue (list), LifoQueue (list)
SCHEDULER_QUEUE_KEY = '%(spider)s:requests'                 # Redis key under which the scheduler stores pending requests
SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"          # serializer for data stored in Redis; pickle by default
SCHEDULER_PERSIST = False                                   # keep the scheduler queue and dedup records on close? True = keep, False = flush
SCHEDULER_FLUSH_ON_START = True                             # flush the scheduler queue and dedup records on start? True = flush, False = keep
SCHEDULER_IDLE_BEFORE_CLOSE = 10                            # max time to wait when the scheduler queue is empty (give up if still no data)
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'          # Redis key for the dedup records, e.g. chouti:dupefilter
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'  # class implementing the dedup rules
DUPEFILTER_DEBUG = False

# depth and priority
DEPTH_PRIORITY = 1

# REDIS_START_URLS_BATCH_SIZE = 1
# REDIS_START_URLS_AS_SET = True   # store start URLs in a Redis set
REDIS_START_URLS_AS_SET = False    # store start URLs in a Redis list
Spider code, inheriting from RedisSpider:

# -*- coding: utf-8 -*-
from scrapy_redis.spiders import RedisSpider
from scrapy.http.request import Request

from ..items import BigfileItem


class Spider1Spider(RedisSpider):
    name = 'spider1'
    redis_key = 'chouti:start_urls'
    allowed_domains = ['chouti.com']

    def parse(self, response):
        print(response)
        req = Request(
            url='https://dig.chouti.com/login',
            method='POST',
            headers={
                'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
                'referer': 'https://dig.chouti.com/',
            },
            body='phone=************&password=************&oneMonth=1',
            meta={"cookiejar": True},
            callback=self.check_login,
        )
        yield req

    def check_login(self, response):
        # handle the login response here
        print(response.text)
Write a start.py script that connects to the Redis server and pushes the initial URL:
import redis

conn = redis.Redis(host='127.0.0.1', port=6379)

# key holding the start URLs: chouti:start_urls
conn.lpush("chouti:start_urls", 'https://dig.chouti.com')
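The lpush call matches REDIS_START_URLS_AS_SET = False above. If the start URLs were kept in a set instead (REDIS_START_URLS_AS_SET = True), they would have to be pushed with sadd; a minimal sketch of that alternative:

import redis

conn = redis.Redis(host='127.0.0.1', port=6379)

# With REDIS_START_URLS_AS_SET = True, scrapy-redis pops start URLs from a set,
# so they must be added with sadd instead of lpush.
conn.sadd("chouti:start_urls", 'https://dig.chouti.com')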
PART 3 Downloading large files
items.py
import scrapy


class BigfileItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    url = scrapy.Field()
    type = scrapy.Field()
    file_name = scrapy.Field()
pipelines.py: download the file chunk by chunk, based on the approach used in the Scrapy source code
# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html
from twisted.web.client import Agent, ResponseDone, PotentialDataLoss
from twisted.python import failure
from twisted.internet import defer, reactor, protocol, error

connectionDone = failure.Failure(error.ConnectionDone())
connectionDone.cleanFailure()


class _ResponseReader(protocol.Protocol):

    def __init__(self, finished, txresponse, file_name):
        self._finished = finished
        self._txresponse = txresponse
        self._bytes_received = 0
        self.f = open(file_name, mode='wb')

    def dataReceived(self, bodyBytes):
        self._bytes_received += len(bodyBytes)
        # write each chunk to disk as it arrives
        self.f.write(bodyBytes)
        self.f.flush()

    def connectionLost(self, reason=connectionDone):
        self.f.close()
        if self._finished.called:
            return
        if reason.check(ResponseDone):
            # download completed
            self._finished.callback((self._txresponse, 'success'))
        elif reason.check(PotentialDataLoss):
            # partial download
            self._finished.callback((self._txresponse, 'partial'))
        else:
            # download failed
            self._finished.errback(reason)


class BigfilePipeline(object):

    def process_item(self, item, spider):
        """
        Create a download task for the file.
        The URL must include an http or https scheme, otherwise the request fails.
        """
        if item['type'] == 'file':
            agent = Agent(reactor)
            print("Download started....")
            d = agent.request(
                method=b'GET',
                uri=bytes(item['url'], encoding='ascii'),
            )
            # once the response headers arrive, self._cb_bodyready is called automatically
            d.addCallback(self._cb_bodyready, file_name=item['file_name'])
            return d
        else:
            return item

    def _cb_bodyready(self, txresponse, file_name):
        # create a Deferred that keeps the connection open until the body is fully downloaded
        d = defer.Deferred()
        d.addBoth(self.download_result)  # callback fired on success/partial/error
        txresponse.deliverBody(_ResponseReader(d, txresponse, file_name))
        return d

    def download_result(self, response):
        pass
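For the pipeline to receive anything, the spider has to yield a BigfileItem and the pipeline must be registered in ITEM_PIPELINES. A rough sketch, assuming a project module named bigfile and an illustrative spider and URL:

# settings.py (module path and priority are assumptions)
# ITEM_PIPELINES = {
#     'bigfile.pipelines.BigfilePipeline': 300,
# }
import scrapy

from ..items import BigfileItem


class FileSpider(scrapy.Spider):
    name = 'file_demo'                              # illustrative spider name
    start_urls = ['https://example.com/downloads']  # illustrative URL

    def parse(self, response):
        yield BigfileItem(
            url='https://example.com/some/big.file',  # must include http/https
            type='file',                              # tells BigfilePipeline to stream the body to disk
            file_name='big.file',
        )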
PART 4 Adding a proxy in the downloader middleware
Method 1:
Set the proxy as an environment variable inside start_requests:
os.environ['HTTP_PROXY'] = "http://192.168.11.11"
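A minimal sketch of how this could look inside a spider (the spider name and URL are illustrative; note that the built-in HttpProxyMiddleware reads the *_PROXY environment variables when it is initialized, so depending on the Scrapy version the variable may need to be set before the crawler starts):

import os

import scrapy


class ProxyDemoSpider(scrapy.Spider):
    name = 'proxy_demo'                      # illustrative name
    start_urls = ['https://dig.chouti.com']  # illustrative URL

    def start_requests(self):
        # HttpProxyMiddleware picks proxies up from the environment
        os.environ['HTTP_PROXY'] = "http://192.168.11.11"
        for url in self.start_urls:
            yield scrapy.Request(url=url, callback=self.parse)

    def parse(self, response):
        print(response.status)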
Method 2:
Write a custom downloader middleware:

import random
import base64

import six


def to_bytes(text, encoding=None, errors='strict'):
    """Return the binary representation of `text`.

    If `text` is already a bytes object, return it as-is.
    """
    if isinstance(text, bytes):
        return text
    if not isinstance(text, six.string_types):
        raise TypeError('to_bytes must receive a unicode, str or bytes '
                        'object, got %s' % type(text).__name__)
    if encoding is None:
        encoding = 'utf-8'
    return text.encode(encoding, errors)


class MyProxyDownloaderMiddleware(object):

    def process_request(self, request, spider):
        proxy_list = [
            {'ip_port': '111.11.228.75:80', 'user_pass': ''},
            {'ip_port': '120.198.243.22:80', 'user_pass': ''},
            {'ip_port': '111.8.60.9:8123', 'user_pass': ''},
            {'ip_port': '101.71.27.120:80', 'user_pass': ''},
            {'ip_port': '122.96.59.104:80', 'user_pass': ''},
        ]
        proxy = random.choice(proxy_list)
        request.meta['proxy'] = to_bytes("http://%s" % proxy['ip_port'])
        if proxy['user_pass']:
            # Only send Proxy-Authorization when credentials are present.
            # base64.encodestring was removed in Python 3.9; b64encode is the replacement.
            encoded_user_pass = base64.b64encode(to_bytes(proxy['user_pass'])).decode('ascii')
            request.headers['Proxy-Authorization'] = to_bytes('Basic ' + encoded_user_pass)
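The middleware still has to be registered in settings.py before it takes effect; a sketch, with the module path and priority as assumptions:

# settings.py
DOWNLOADER_MIDDLEWARES = {
    # adjust the module path to your project layout
    'myproject.middlewares.MyProxyDownloaderMiddleware': 543,
}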