- 前戏Twisted使用
- 1.简单实现版本1.0
-
# 事件循环,监听socket变化(终止条件,所有socket都已经移除) from twisted.internet import reactor # 创建socket对象,如果下载完成,自动从事件循环中移除 from twisted.web.client import getPage # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除 from twisted.internet import defer # 1.利用getPage创建socket # 2.将socket添加到事件循环 # 3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环) # 1.利用getPage创建socket def response(content): print(content) # 2.将socket添加到事件循环 # 这个装饰器和yield d表示将socket已经添加到事件循环 @defer.inlineCallbacks def task(): url = "http://www.baidu.com" # 根据域名找到你的ip,并创建socket,返回值d和defer.Deferred类似,不会自动终止的特殊socket对象 d = getPage(url.encode('utf-8')) # 利用socket发请求,请求完成拿到值,执行response函数 d.addCallback(response) yield d # 执行task函数 task() # 3.开始事件循环 reactor.run()
-
- 版本2.0
-
# 事件循环,监听socket变化(终止条件,所有socket都已经移除) from twisted.internet import reactor # 创建socket对象,如果下载完成,自动从事件循环中移除 from twisted.web.client import getPage # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除 from twisted.internet import defer # 版本1.0 # 1.1.利用getPage创建socket # 1.2.将socket添加到事件循环 # 1.3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环) # 版本2.0 # 2.1.解决不能自动终止的问题 # 1.1. 利用getPage创建socket def response(content): print(content) # 1.2.将socket添加到事件循环 # 这个装饰器和yield d表示将socket已经添加到事件循环 @defer.inlineCallbacks def task(): url = "http://www.baidu.com" # 根据域名找到你的ip,并创建socket,返回值d和defer.Deferred类似,不会自动终止的特殊socket对象 d = getPage(url.encode('utf-8')) # 利用socket发请求,请求完成拿到值,执行response函数 d.addCallback(response) yield d def done(*args,**kwargs): # 终止事件循环 reactor.stop() # 执行task函数 d = task() # 监听d是否完成,需要用列表[d,]加入 dd = defer.DeferredList([d,]) # 监听d是否完成,如果完成就会调用addBoth的回调函数 # 2.1:利用回调函数done终止事件循环 dd.addBoth(done) # 1.3.开始事件循环 reactor.run()
-
- 版本3.0
-
# 事件循环,监听socket变化(终止条件,所有socket都已经移除) from twisted.internet import reactor # 创建socket对象,如果下载完成,自动从事件循环中移除 from twisted.web.client import getPage # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除 from twisted.internet import defer # 版本1.0 # 1.1.利用getPage创建socket # 1.2.将socket添加到事件循环 # 1.3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环) # 版本2.0 # 2.1.解决不能自动终止的问题 # 版本3.0 # 3.1.1.解决并发,异步IO的问题--利用多个socket # 1.1. 利用getPage创建socket def response(content): print(content) # 1.2.将socket添加到事件循环 # 这个装饰器和yield d表示将socket已经添加到事件循环 @defer.inlineCallbacks def task(): url = "http://www.baidu.com" # 根据域名找到你的ip,并创建socket,返回值d和defer.Deferred类似,不会自动终止的特殊socket对象 d = getPage(url.encode('utf-8')) # 利用socket发请求,请求完成拿到值,执行response函数 d.addCallback(response) yield d def done(*args, **kwargs): # 终止事件循环 reactor.stop() # 执行task函数 # d = task() # 3.1.1.同时监听多个d,利用多个socket,解决并发的问题,异步IO的问题,全部发出去了,等请求回来 li = [] for i in range(10): d = task() li.append(d) dd = defer.DeferredList(li) # 监听d是否完成,需要用列表[d,]加入 # dd = defer.DeferredList([d,]) # 监听d是否完成,如果完成就会调用addBoth的回调函数 # 2.1:利用回调函数done终止事件循环 dd.addBoth(done) # 1.3.开始事件循环 reactor.run()
- 版本3.1-另一种方法解决并发问题-以及多爬虫同时爬取的并发问题
- 有bug是因为_close只有一个,后面会进行封装,不用多关注
-
# 事件循环,监听socket变化(终止条件,所有socket都已经移除) from twisted.internet import reactor # 创建socket对象,如果下载完成,自动从事件循环中移除 from twisted.web.client import getPage # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除 from twisted.internet import defer # 版本1.0 # 1.1.利用getPage创建socket # 1.2.将socket添加到事件循环 # 1.3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环) # 版本2.0 # 2.1.解决不能自动终止的问题 # 版本3.1 # 3.1.解决并发,异步IO的问题--利用task中加入特殊socket对象 # 3.2 加入多个爬虫同时运行的功能--类似scrapy crawl all _close = None count = 0 # 1.1. 利用getPage创建socket def response(content): print(content) global count count += 1 if count == 3: # 使特殊socket对象终止 _close.callback(None) # 1.2.将socket添加到事件循环 # 这个装饰器和yield d表示将socket已经添加到事件循环 @defer.inlineCallbacks def task(): # 3.1:创建多个socket,因为defer.Deferred()特殊对象,不会自动停止 # 设定_close 全局变量,以便能请求全部返还能够手动终止 # 利用全局变量count的计数,去控制特殊对象的终止,只有全部终止才会结束 global _close # 这个相当于scrapy中的start_url url = "http://www.baidu.com" d1 = getPage(url.encode('utf-8')) d1.addCallback(response) url = "http://www.cnblogs.com" d2 = getPage(url.encode('utf-8')) d2.addCallback(response) url = "http://www.bing.com" d3 = getPage(url.encode('utf-8')) d3.addCallback(response) _close = defer.Deferred() yield _close def done(*args, **kwargs): # 终止事件循环 reactor.stop() # 3.2:同时创建多个task即可实现,scrapy爬虫同时执行,2个爬虫有各自的start_url是并发的 # 执行task函数 spider1 = task() spider2 = task() # 监听d是否完成,需要用列表[d,]加入 dd = defer.DeferredList([spider1,spider2]) # 监听d是否完成,如果完成就会调用addBoth的回调函数 # 2.1:利用回调函数done终止事件循环 dd.addBoth(done) # 1.3.开始事件循环 reactor.run()
-
- 1.简单实现版本1.0
- scrapy经验 + Twisted功能
- Low
-
# 事件循环,监听socket变化(终止条件,所有socket都已经移除) from twisted.internet import reactor # 创建socket对象,如果下载完成,自动从事件循环中移除 from twisted.web.client import getPage # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除 from twisted.internet import defer import queue Q = queue.Queue() class Request(object): # 这里的callback = parse def __init__(self, url, callback): self.url = url self.callback = callback class HttpResponse(object): def __init__(self, content, request): self.content = content self.request = request self.url = request.url self.text = str(content, encoding='utf-8') class ChoutiSpider(object): name = 'chouti' def start_requests(self): start_url = ['http://www.baidu.com', 'http://www.bing.com', ] for url in start_url: # 执行Request函数 yield Request(url, self.parse) def parse(self, response): # 1.crawling移除 # 2.获取parse yield返回值 # 3.再次去队列中获取 print(response.text) # 执行HttpResponse()中的方法 yield Request('http://www.cnblogs.com', callback=self.parse) class Engine(object): def __init__(self): self._close = None self.spider = None self.max = 5 # 最大并发数 self.crawling = [] # 表示正在爬取的爬虫 def get_response_callback(self, content, request): # getPage的返回值response,传入的req ---url 和 callback self.crawling.remove(request) # 删除已经下载完成的url callback rep = HttpResponse(content, request) # 将返回值传递过去 # 生成器或空 result = request.callback(rep) # 调用spider中的parse方法 = parse(rep) import types # 判断返回值是不是生成器 if isinstance(result, types.GeneratorType): for req in result: Q.put(req) # 将新请求加入队列 def _next_request(self): # 判断终止条件 if Q.qsize() == 0 and len(self.crawling) == 0: self._close.callback(None) # 手动停止 return # 发送过程中,会有最大并发数的限制,循环取url并下载 if len(self.crawling) >= self.max: # 超过最大并发数,直接返回 return while len(self.crawling) < self.max: # 低于最大并发数 try: req = Q.get(block=False) # 取数据,如果为空会报错,加入block不会等队列中的数据 self.crawling.append(req) # 将取到的url加入记录正在爬取数量的列表crawling d = getPage(req.url.encode('utf-8')) # getPage创建socket对象,发送请求进行下载 # 5.等页面下载完成执行用户自己定义的回调函数,处理response d.addCallback(self.get_response_callback, req) # d为请求的结果 # 未达到最大并发数,可以再去调度器中获取Request # d.addCallback(self._next_request) # 上一个方法执行玩,进行递归调用,继续取url d.addCallback(lambda _: reactor.callLater(0, self._next_request)) # 多久后调用 except Exception as e: # 如果队列为空,直接返回,不再循环取 return # 这个装饰器和yield self._close表示将socket已经添加到事件循环 @defer.inlineCallbacks def crawl(self, spider): # 3.将初始Request对象添加到调度器---将初始urL加入队列 start_requests = iter(spider.start_requests()) # 迭代器---执行spider中start_request函数 while True: try: request = next(start_requests) # 取迭代器中的下一个值 url和callback Q.put(request) # 将取到的值放入队列 except StopIteration as e: # 如果队列取完,就跳出循环 break # 4.反复去调度器中取request并发送请求进行下载,下载完成后执行回调函数 # self._next_request() reactor.callLater(0, self._next_request) # scrapy内部的写法 self._close = defer.Deferred() # 特殊socket不会自动结束,只能手动结束 yield self._close # 爬虫对象 spider = ChoutiSpider() _active = set() # 1.创建引擎 engine = Engine() # 2.将爬虫放入引擎进行处理,执行引擎中crawl函数 d = engine.crawl(spider) _active.add(d) # 监听爬虫d是否完成,如果完成执行addBoth终止socket dd = defer.DeferredList(_active) # 终止socket dd.addBoth(lambda _: reactor.stop()) reactor.run()
-
- High
- engine.py---scrapy主要实现逻辑
-
# 事件循环,监听socket变化(终止条件,所有socket都已经移除) from twisted.internet import reactor # 创建socket对象,如果下载完成,自动从事件循环中移除 from twisted.web.client import getPage # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除 from twisted.internet import defer from queue import Queue ''' scrapy的分工: ExecutionEngine---引擎:作用是帮你做调度,体现是去调度器里拿,或放入调度器 Crawler---创建引擎和spider对象 ''' class Request(object): '''用于封装用户请求相关信息''' def __init__(self, url, callback): # rep的参数 self.url = url self.callback = callback class HttpResponse(object): """封装返回的content和request""" def __init__(self, content, request): self.content = content self.request = request self.url = request.url self.text = str(content, encoding='utf-8') class Scheduler(object): """任务调度器""" def __init__(self): self.q = Queue() # 创建Q对象 def open(self): pass def next_request(self): # 不断重复取队列,如果队列为空,返回 try: req = self.q.get(block=False) except Exception as e: req = None return req def enqueue_request(self, req): # url+callback放入队列 self.q.put(req) def size(self): # 队列大小 return self.q.qsize() class ExecutionEngine(object): """引擎:所有调度""" def __init__(self): self._close = None self.scheduler = None self.max = 5 self.crawlling = [] def get_response_callback(self, content, request): # 24.删除已经下载完成的url callback self.crawlling.remove(request) # 25.将content和requset封装成response response = HttpResponse(content, request) # 26.执行解析函数,解析的结果url加入Q,content返回 result = request.callback(response) import types if isinstance(result, types.GeneratorType): # 如果是url for req in result: self.scheduler.enqueue_request(req) # f放入Q def _next_request(self): # 21.被最大并发数限制不停的取数据,并不断往外发 if self.scheduler.size() == 0 and len(self.crawlling) == 0: self._close.callback(None) return while len(self.crawlling) < self.max: req = self.scheduler.next_request() if not req: return self.crawlling.append(req) # 放入正在爬取列表 # 22. 发请求,此时的d就是下载下来的response d = getPage(req.url.encode('utf-8')) # 23.此时req包括url和callback,执行get_response_callback d.addCallback(self.get_response_callback, req) d.addCallback(lambda _: reactor.callLater(0, self._next_request)) # 在调用一次自己 @defer.inlineCallbacks def open_spider(self, start_requests): # 实例化调度器 self.scheduler = Scheduler() # 17.执行调度器中的open方法 yield self.scheduler.open() # 18.将初始Request对象添加到调度器 while True: try: req = next(start_requests) except StopIteration as e: break # 19.执行调度器enqueue_request self.scheduler.enqueue_request(req) # yield None # 20.相当于self._next_request() reactor.callLater(0, self._next_request) @defer.inlineCallbacks def start(self): self._close = defer.Deferred() yield self._close class Crawler(object): """用于封装调度器以及引擎的...""" def _create_engine(self): # 15.创建一个引擎对象 return ExecutionEngine() def _create_spider(self, spider_cls_path): """ :param spider_cls_path: spider.chouti.ChoutiSpider :return: """ # 分割:模块路径,爬虫名字 module_path, cls_name = spider_cls_path.rsplit('.', maxsplit=1) import importlib # 16.创建一个spider对象 m = importlib.import_module(module_path) cls = getattr(m, cls_name) # 返回m对象的cls_name值,即为爬虫名 return cls() # 返回爬虫对象 @defer.inlineCallbacks # 这个装饰器作用是将socket放入事件循环 def crawl(self, spider_cls_path): # 10.实例化私有方法_create_engine创建引擎 ---15 engine = self._create_engine() # 11.创建spider对象--16 spider = self._create_spider(spider_cls_path) # 12.根据返回的spider对象,执行爬虫的start_requests方法,得到初始url的列表--- start_requests = iter(spider.start_requests()) # 迭代器 # 13.调用引擎的open_spider方法将初始Request对象添加到调度器---17 yield engine.open_spider(start_requests) # 14.执行defer.Deferred等待,_close的结束信号 yield engine.start() class CrawlerProcess(object): """开启事件循环""" # 初始化 def __init__(self): self._active = set() # 创建一个集合 def crawl(self, spider_cls_path): """ :param spider_cls_path: :return: """ # 6.创建用于封装调度器和引擎的Crawler类的实例化对象crawler crawler = Crawler() # 7.执行Crawler类中的crawl方法,将爬虫名作为参数传入---10.返回值d d = crawler.crawl(spider_cls_path) # 8.将d加入正在活动的集合 self._active.add(d) def start(self): # 特殊socket,不会自动结束 # 9.等待_active中的d已经全部完成,如果完成就执行addBoth函数终止该特殊socket dd = defer.DeferredList(self._active) dd.addBoth(lambda _: reactor.stop()) reactor.run() class Commond(object): """自定制的命令""" def run(self): # 3.创建事件循环CrawlerProcess实例化对象crawl_process crawl_process = CrawlerProcess() # 将需要执行的爬虫名加入列表spider_cls_path_list spider_cls_path_list = ['spider.chouti.ChoutiSpider', 'spider.cnblogs.CnblogsSpider', ] for spider_cls_path in spider_cls_path_list: # 4.通过实例化对象crawl_process调用CrawlerProcess类中的crawl方法,将该列表作为参数传入--6 crawl_process.crawl(spider_cls_path) # 5.调用CrawlerProcess类中的start方法---主要通过defer.DeferredList为了限制事件循环的结束 --9 crawl_process.start() if __name__ == '__main__': # 1.创建自定制命令Commond类的实例化对象cmd cmd = Commond() # 2.调用Commond类中的run方法---将需要执行的爬虫交给事件循环类CrawlerProcess--3 cmd.run()
-
- 爬虫1:chouti.py
-
# 需要导入Request from TinySpider.engine import Request class ChoutiSpider(object): name = 'chouti' def start_requests(self): start_url = ['http://www.baidu.com', 'http://www.bing.com', ] for url in start_url: # 执行Request函数 yield Request(url, self.parse) def parse(self, response): # 1.crawling移除 # 2.获取parse yield返回值 # 3.再次去队列中获取 print(response) # 执行HttpResponse()中的方法 yield Request('http://www.cnblogs.com', callback=self.parse)
-
- 爬虫2:cnblogs.py
-
# 需要导入Request from TinySpider.engine import Request class CnblogsSpider(object): name = 'cnblogs' def start_requests(self): start_url = ['http://www.cnblogs.com',] for url in start_url: # 执行Request函数 yield Request(url, self.parse) def parse(self, response): # 1.crawling移除 # 2.获取parse yield返回值 # 3.再次去队列中获取 # print(response.text) # 执行HttpResponse()中的方法 print(response) # 执行HttpResponse()中的方法 yield Request('http://www.cnblogs.com', callback=self.parse)
-
- Low
- Scrapy源码剖析
- 大致框架跟High一样