zoukankan      html  css  js  c++  java
  • <scrapy>scrapy源码剖析

    • 前戏Twisted使用
      • 1.简单实现版本1.0
        • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
          from twisted.internet import reactor
          # 创建socket对象,如果下载完成,自动从事件循环中移除
          from twisted.web.client import getPage
          # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
          from twisted.internet import defer
          
          
          # 1.利用getPage创建socket
          # 2.将socket添加到事件循环
          # 3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环)
          
          # 1.利用getPage创建socket
          def response(content):
          	print(content)
          
          # 2.将socket添加到事件循环
          # 这个装饰器和yield d表示将socket已经添加到事件循环
          @defer.inlineCallbacks
          def task():
          	url = "http://www.baidu.com"
          	# 根据域名找到你的ip,并创建socket,返回值d和defer.Deferred类似,不会自动终止的特殊socket对象
          	d = getPage(url.encode('utf-8'))
          	# 利用socket发请求,请求完成拿到值,执行response函数
          	d.addCallback(response)
          
          	yield d
          
          # 执行task函数
          task()
          # 3.开始事件循环
          reactor.run()
      • 版本2.0
        • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
          from twisted.internet import reactor
          # 创建socket对象,如果下载完成,自动从事件循环中移除
          from twisted.web.client import getPage
          # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
          from twisted.internet import defer
          
          # 版本1.0
          # 1.1.利用getPage创建socket
          # 1.2.将socket添加到事件循环
          # 1.3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环)
          
          # 版本2.0
          # 2.1.解决不能自动终止的问题
          
          # 1.1. 利用getPage创建socket
          def response(content):
          	print(content)
          
          # 1.2.将socket添加到事件循环
          # 这个装饰器和yield d表示将socket已经添加到事件循环
          @defer.inlineCallbacks
          def task():
          	url = "http://www.baidu.com"
          	# 根据域名找到你的ip,并创建socket,返回值d和defer.Deferred类似,不会自动终止的特殊socket对象
          	d = getPage(url.encode('utf-8'))
          	# 利用socket发请求,请求完成拿到值,执行response函数
          	d.addCallback(response)
          
          	yield d
          
          def done(*args,**kwargs):
          	# 终止事件循环
          	reactor.stop()
          
          # 执行task函数
          d = task()
          # 监听d是否完成,需要用列表[d,]加入
          dd = defer.DeferredList([d,])
          # 监听d是否完成,如果完成就会调用addBoth的回调函数
          # 2.1:利用回调函数done终止事件循环
          dd.addBoth(done)
          
          
          # 1.3.开始事件循环
          reactor.run() 
      • 版本3.0
          • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
            from twisted.internet import reactor
            # 创建socket对象,如果下载完成,自动从事件循环中移除
            from twisted.web.client import getPage
            # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
            from twisted.internet import defer
            
            
            # 版本1.0
            # 1.1.利用getPage创建socket
            # 1.2.将socket添加到事件循环
            # 1.3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环)
            
            # 版本2.0
            # 2.1.解决不能自动终止的问题
            
            # 版本3.0
            # 3.1.1.解决并发,异步IO的问题--利用多个socket
            
            # 1.1. 利用getPage创建socket
            def response(content):
            	print(content)
            
            
            # 1.2.将socket添加到事件循环
            # 这个装饰器和yield d表示将socket已经添加到事件循环
            @defer.inlineCallbacks
            def task():
            	url = "http://www.baidu.com"
            	# 根据域名找到你的ip,并创建socket,返回值d和defer.Deferred类似,不会自动终止的特殊socket对象
            	d = getPage(url.encode('utf-8'))
            	# 利用socket发请求,请求完成拿到值,执行response函数
            	d.addCallback(response)
            
            	yield d
            
            
            def done(*args, **kwargs):
            	# 终止事件循环
            	reactor.stop()
            
            
            # 执行task函数
            # d = task()
            
            # 3.1.1.同时监听多个d,利用多个socket,解决并发的问题,异步IO的问题,全部发出去了,等请求回来
            li = []
            for i in range(10):
            	d = task()
            	li.append(d)
            
            dd = defer.DeferredList(li)
            
            # 监听d是否完成,需要用列表[d,]加入
            # dd = defer.DeferredList([d,])
            
            # 监听d是否完成,如果完成就会调用addBoth的回调函数
            # 2.1:利用回调函数done终止事件循环
            dd.addBoth(done)
            
            # 1.3.开始事件循环
            reactor.run()
          版本3.1-另一种方法解决并发问题-以及多爬虫同时爬取的并发问题
          • 有bug是因为_close只有一个,后面会进行封装,不用多关注
          • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
            from twisted.internet import reactor
            # 创建socket对象,如果下载完成,自动从事件循环中移除
            from twisted.web.client import getPage
            # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
            from twisted.internet import defer
            
            # 版本1.0
            # 1.1.利用getPage创建socket
            # 1.2.将socket添加到事件循环
            # 1.3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环)
            
            # 版本2.0
            # 2.1.解决不能自动终止的问题
            
            # 版本3.1
            # 3.1.解决并发,异步IO的问题--利用task中加入特殊socket对象
            # 3.2 加入多个爬虫同时运行的功能--类似scrapy crawl all
            
            
            _close = None
            count = 0
            
            
            # 1.1. 利用getPage创建socket
            def response(content):
            	print(content)
            	global count
            	count += 1
            	if count == 3:
            		# 使特殊socket对象终止
            		_close.callback(None)
            
            
            # 1.2.将socket添加到事件循环
            # 这个装饰器和yield d表示将socket已经添加到事件循环
            @defer.inlineCallbacks
            def task():
            	# 3.1:创建多个socket,因为defer.Deferred()特殊对象,不会自动停止
            	# 设定_close 全局变量,以便能请求全部返还能够手动终止
            	# 利用全局变量count的计数,去控制特殊对象的终止,只有全部终止才会结束
            	global _close
            
            	# 这个相当于scrapy中的start_url
            	url = "http://www.baidu.com"
            	d1 = getPage(url.encode('utf-8'))
            	d1.addCallback(response)
            
            	url = "http://www.cnblogs.com"
            	d2 = getPage(url.encode('utf-8'))
            	d2.addCallback(response)
            
            	url = "http://www.bing.com"
            	d3 = getPage(url.encode('utf-8'))
            	d3.addCallback(response)
            
            	_close = defer.Deferred()
            
            	yield _close
            
            
            def done(*args, **kwargs):
            	# 终止事件循环
            	reactor.stop()
            
            
            # 3.2:同时创建多个task即可实现,scrapy爬虫同时执行,2个爬虫有各自的start_url是并发的
            # 执行task函数
            spider1 = task()
            spider2 = task()
            
            # 监听d是否完成,需要用列表[d,]加入
            dd = defer.DeferredList([spider1,spider2])
            
            # 监听d是否完成,如果完成就会调用addBoth的回调函数
            # 2.1:利用回调函数done终止事件循环
            dd.addBoth(done)
            
            # 1.3.开始事件循环
            reactor.run()
                          
    • scrapy经验 + Twisted功能
      • Low
        • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
          from twisted.internet import reactor
          # 创建socket对象,如果下载完成,自动从事件循环中移除
          from twisted.web.client import getPage
          # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
          from twisted.internet import defer
          import queue
          
          Q = queue.Queue()
          
          
          class Request(object):
          	# 这里的callback = parse
          	def __init__(self, url, callback):
          		self.url = url
          		self.callback = callback
          
          
          class HttpResponse(object):
          
          	def __init__(self, content, request):
          		self.content = content
          		self.request = request
          		self.url = request.url
          		self.text = str(content, encoding='utf-8')
          
          
          class ChoutiSpider(object):
          	name = 'chouti'
          
          	def start_requests(self):
          		start_url = ['http://www.baidu.com', 'http://www.bing.com', ]
          		for url in start_url:
          			# 执行Request函数
          			yield Request(url, self.parse)
          
          	def parse(self, response):
          		# 1.crawling移除
          		# 2.获取parse yield返回值
          		# 3.再次去队列中获取
          		print(response.text)  # 执行HttpResponse()中的方法
          		yield Request('http://www.cnblogs.com', callback=self.parse)
          
          
          class Engine(object):
          	def __init__(self):
          		self._close = None
          		self.spider = None
          		self.max = 5  # 最大并发数
          		self.crawling = []  # 表示正在爬取的爬虫
          
          	def get_response_callback(self, content, request):
          		# getPage的返回值response,传入的req ---url 和 callback
          		self.crawling.remove(request)  # 删除已经下载完成的url callback
          		rep = HttpResponse(content, request)  # 将返回值传递过去
          		# 生成器或空
          		result = request.callback(rep)  # 调用spider中的parse方法 = parse(rep)
          		import types
          		# 判断返回值是不是生成器
          		if isinstance(result, types.GeneratorType):
          			for req in result:
          				Q.put(req)  # 将新请求加入队列
          
          	def _next_request(self):
          		# 判断终止条件
          		if Q.qsize() == 0 and len(self.crawling) == 0:
          			self._close.callback(None)  # 手动停止
          			return
          
          		# 发送过程中,会有最大并发数的限制,循环取url并下载
          		if len(self.crawling) >= self.max:  # 超过最大并发数,直接返回
          			return
          		while len(self.crawling) < self.max:  # 低于最大并发数
          			try:
          				req = Q.get(block=False)  # 取数据,如果为空会报错,加入block不会等队列中的数据
          				self.crawling.append(req)  # 将取到的url加入记录正在爬取数量的列表crawling
          				d = getPage(req.url.encode('utf-8'))  # getPage创建socket对象,发送请求进行下载
          
          				# 5.等页面下载完成执行用户自己定义的回调函数,处理response
          				d.addCallback(self.get_response_callback, req)  # d为请求的结果
          				# 未达到最大并发数,可以再去调度器中获取Request
          				# d.addCallback(self._next_request)  # 上一个方法执行玩,进行递归调用,继续取url
          				d.addCallback(lambda _: reactor.callLater(0, self._next_request))  # 多久后调用
          			except Exception as e:  # 如果队列为空,直接返回,不再循环取
          				return
          
          	# 这个装饰器和yield self._close表示将socket已经添加到事件循环
          	@defer.inlineCallbacks
          	def crawl(self, spider):
          		# 3.将初始Request对象添加到调度器---将初始urL加入队列
          		start_requests = iter(spider.start_requests())  # 迭代器---执行spider中start_request函数
          		while True:
          			try:
          				request = next(start_requests)  # 取迭代器中的下一个值 url和callback
          				Q.put(request)  # 将取到的值放入队列
          			except StopIteration as e:  # 如果队列取完,就跳出循环
          				break
          
          		# 4.反复去调度器中取request并发送请求进行下载,下载完成后执行回调函数
          		# self._next_request()
          		reactor.callLater(0, self._next_request)  # scrapy内部的写法
          
          		self._close = defer.Deferred()  # 特殊socket不会自动结束,只能手动结束
          		yield self._close
          
          
          # 爬虫对象
          spider = ChoutiSpider()
          
          _active = set()
          # 1.创建引擎
          engine = Engine()
          # 2.将爬虫放入引擎进行处理,执行引擎中crawl函数
          d = engine.crawl(spider)
          
          _active.add(d)
          # 监听爬虫d是否完成,如果完成执行addBoth终止socket
          dd = defer.DeferredList(_active)
          # 终止socket
          dd.addBoth(lambda _: reactor.stop())
          
          reactor.run()
            
      • High
        •   
        • engine.py---scrapy主要实现逻辑
          • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
            from twisted.internet import reactor
            # 创建socket对象,如果下载完成,自动从事件循环中移除
            from twisted.web.client import getPage
            # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
            from twisted.internet import defer
            from queue import Queue
            
            '''
            scrapy的分工:
            ExecutionEngine---引擎:作用是帮你做调度,体现是去调度器里拿,或放入调度器
            Crawler---创建引擎和spider对象
            '''
            
            
            class Request(object):
            	'''用于封装用户请求相关信息'''
            
            	def __init__(self, url, callback):
            		# rep的参数
            		self.url = url
            		self.callback = callback
            
            
            class HttpResponse(object):
            	"""封装返回的content和request"""
            	def __init__(self, content, request):
            		self.content = content
            		self.request = request
            		self.url = request.url
            		self.text = str(content, encoding='utf-8')
            
            
            class Scheduler(object):
            	"""任务调度器"""
            
            	def __init__(self):
            		self.q = Queue()  # 创建Q对象
            
            	def open(self):
            		pass
            
            	def next_request(self):
            		# 不断重复取队列,如果队列为空,返回
            		try:
            			req = self.q.get(block=False)
            		except Exception as e:
            			req = None
            		return req
            
            	def enqueue_request(self, req):
            		# url+callback放入队列
            		self.q.put(req)
            
            	def size(self):
            		# 队列大小
            		return self.q.qsize()
            
            
            class ExecutionEngine(object):
            	"""引擎:所有调度"""
            
            	def __init__(self):
            		self._close = None
            		self.scheduler = None
            		self.max = 5
            		self.crawlling = []
            
            	def get_response_callback(self, content, request):
            		# 24.删除已经下载完成的url callback
            		self.crawlling.remove(request)
            		# 25.将content和requset封装成response
            		response = HttpResponse(content, request)
            		# 26.执行解析函数,解析的结果url加入Q,content返回
            		result = request.callback(response)
            		import types
            		if isinstance(result, types.GeneratorType):  # 如果是url
            			for req in result:
            				self.scheduler.enqueue_request(req)  # f放入Q
            
            	def _next_request(self):
            		# 21.被最大并发数限制不停的取数据,并不断往外发
            		if self.scheduler.size() == 0 and len(self.crawlling) == 0:
            			self._close.callback(None)
            			return
            
            		while len(self.crawlling) < self.max:
            			req = self.scheduler.next_request()
            			if not req:
            				return
            			self.crawlling.append(req)  # 放入正在爬取列表
            			# 22. 发请求,此时的d就是下载下来的response
            			d = getPage(req.url.encode('utf-8'))
            			# 23.此时req包括url和callback,执行get_response_callback
            			d.addCallback(self.get_response_callback, req)
            			d.addCallback(lambda _: reactor.callLater(0, self._next_request))  # 在调用一次自己
            
            	@defer.inlineCallbacks
            	def open_spider(self, start_requests):
            		# 实例化调度器
            		self.scheduler = Scheduler()
            		# 17.执行调度器中的open方法
            		yield self.scheduler.open()
            		# 18.将初始Request对象添加到调度器
            		while True:
            			try:
            				req = next(start_requests)
            			except StopIteration as e:
            				break
            			# 19.执行调度器enqueue_request
            			self.scheduler.enqueue_request(req)
            		# yield None
            		# 20.相当于self._next_request()
            		reactor.callLater(0, self._next_request)
            
            	@defer.inlineCallbacks
            	def start(self):
            		self._close = defer.Deferred()
            		yield self._close
            
            
            class Crawler(object):
            	"""用于封装调度器以及引擎的..."""
            
            	def _create_engine(self):
            		# 15.创建一个引擎对象
            		return ExecutionEngine()
            
            	def _create_spider(self, spider_cls_path):
            		"""
            
            		:param spider_cls_path:  spider.chouti.ChoutiSpider
            		:return:
            		"""
            		# 分割:模块路径,爬虫名字
            		module_path, cls_name = spider_cls_path.rsplit('.', maxsplit=1)
            		import importlib
            		# 16.创建一个spider对象
            		m = importlib.import_module(module_path)
            		cls = getattr(m, cls_name)  # 返回m对象的cls_name值,即为爬虫名
            		return cls()  # 返回爬虫对象
            
            	@defer.inlineCallbacks  # 这个装饰器作用是将socket放入事件循环
            	def crawl(self, spider_cls_path):
            		# 10.实例化私有方法_create_engine创建引擎 ---15
            		engine = self._create_engine()
            		# 11.创建spider对象--16
            		spider = self._create_spider(spider_cls_path)
            		# 12.根据返回的spider对象,执行爬虫的start_requests方法,得到初始url的列表---
            		start_requests = iter(spider.start_requests())  # 迭代器
            		# 13.调用引擎的open_spider方法将初始Request对象添加到调度器---17
            		yield engine.open_spider(start_requests)
            		# 14.执行defer.Deferred等待,_close的结束信号
            		yield engine.start()
            
            
            class CrawlerProcess(object):
            	"""开启事件循环"""
            
            	# 初始化
            	def __init__(self):
            		self._active = set()  # 创建一个集合
            
            	def crawl(self, spider_cls_path):
            		"""
            		:param spider_cls_path:
            		:return:
            		"""
            		# 6.创建用于封装调度器和引擎的Crawler类的实例化对象crawler
            		crawler = Crawler()
            		# 7.执行Crawler类中的crawl方法,将爬虫名作为参数传入---10.返回值d
            		d = crawler.crawl(spider_cls_path)
            		# 8.将d加入正在活动的集合
            		self._active.add(d)
            
            	def start(self):
            		# 特殊socket,不会自动结束
            		# 9.等待_active中的d已经全部完成,如果完成就执行addBoth函数终止该特殊socket
            		dd = defer.DeferredList(self._active)
            		dd.addBoth(lambda _: reactor.stop())
            		reactor.run()
            
            
            class Commond(object):
            	"""自定制的命令"""
            
            	def run(self):
            		# 3.创建事件循环CrawlerProcess实例化对象crawl_process
            		crawl_process = CrawlerProcess()
            		# 将需要执行的爬虫名加入列表spider_cls_path_list
            		spider_cls_path_list = ['spider.chouti.ChoutiSpider', 'spider.cnblogs.CnblogsSpider', ]
            		for spider_cls_path in spider_cls_path_list:
            			# 4.通过实例化对象crawl_process调用CrawlerProcess类中的crawl方法,将该列表作为参数传入--6
            			crawl_process.crawl(spider_cls_path)
            		# 5.调用CrawlerProcess类中的start方法---主要通过defer.DeferredList为了限制事件循环的结束 --9
            		crawl_process.start()
            
            
            if __name__ == '__main__':
            	# 1.创建自定制命令Commond类的实例化对象cmd
            	cmd = Commond()
            	# 2.调用Commond类中的run方法---将需要执行的爬虫交给事件循环类CrawlerProcess--3
            	cmd.run()
        • 爬虫1:chouti.py
          • # 需要导入Request
            from TinySpider.engine import Request
            
            class ChoutiSpider(object):
            	name = 'chouti'
            
            	def start_requests(self):
            		start_url = ['http://www.baidu.com', 'http://www.bing.com', ]
            		for url in start_url:
            			# 执行Request函数
            			yield Request(url, self.parse)
            
            	def parse(self, response):
            		# 1.crawling移除
            		# 2.获取parse yield返回值
            		# 3.再次去队列中获取
            		print(response)  # 执行HttpResponse()中的方法
            		yield Request('http://www.cnblogs.com', callback=self.parse)
        • 爬虫2:cnblogs.py
          • # 需要导入Request
            from TinySpider.engine import Request
            
            class CnblogsSpider(object):
            	name = 'cnblogs'
            
            	def start_requests(self):
            		start_url = ['http://www.cnblogs.com',]
            		for url in start_url:
            			# 执行Request函数
            			yield Request(url, self.parse)
            
            	def parse(self, response):
            		# 1.crawling移除
            		# 2.获取parse yield返回值
            		# 3.再次去队列中获取
            		# print(response.text)  # 执行HttpResponse()中的方法
            		print(response)  # 执行HttpResponse()中的方法
            		yield Request('http://www.cnblogs.com', callback=self.parse)  
                      
    • Scrapy源码剖析
      • 大致框架跟High一样              
  • 相关阅读:
    js 安全
    js压缩 uglify(2)
    js压缩 uglify
    抢红包算法 java
    手机调试
    Java中的随机数生成器:Random,ThreadLocalRandom,SecureRandom
    字符集编码 定长与变长
    db2 sqlcode
    2015.7.14(大盘结束红色,中色连坐4T)
    2015.7.10(全部涨停!想逢高出货,但是担心周一创新高)
  • 原文地址:https://www.cnblogs.com/shuimohei/p/13363462.html
Copyright © 2011-2022 走看看