zoukankan      html  css  js  c++  java
  • 浅析Scrapy框架运行的基本流程

    本篇博客将从Twisted的下载任务基本流程开始介绍,然后再一步步过渡到Scrapy框架的基本运行流程,其中还会需要我们自定义一个Low版的Scrapy框架。但内容不会涉及太多具体细节,而且需要注意的是示例代码的运行过程不会Scrapy一模一样,但不影响你对整体的把握。希望可以帮助那些刚入门爬虫或者刚学习Scrapy的同学理清思路,做到对Scrapy的运行流程有个大概把握,这样以后在继续深入Scrapy框架或者扩展其应用时更加得心应手。(PS:大佬可忽略:))

    一、Twisted的下载任务基本过程

    首先需要我们导入三个基本的模块

    1 from twisted.internet import reactor   
    2 from twisted.web.client import getPage 
    3 from twisted.internet import defer    
    • reactor的作用是开启事件循环,可以简单理解为select或者epoll,循环监听socket对象,一旦连接成功或者响应完成,就移除掉对应的socket,不再监听。(终止条件,所有的socket都已移除)

    • getPage的作用是自动帮我们创建socket对象,模拟浏览器给服务器发送请求,放到reactor里面,一旦下载到需要的数据,socket将发生变化,就可以被reactor监听到。(如果下载完成,自动从事件循环中移除)

    • defer里面的defer.Deferred()的作用是创建一个特殊的socket对象,它不会发送请求,如果放在事件循环里面,将永远不会被移除掉,需要我们自己手动移除,作用就是使事件循环一直监听,不会停止。(不会发请求,手动移除)

    1、利用getPage创建socket

    def response(content):
          print(content)
    
    
    def task():
          url = "http://www.baidu.com"
          d = getPage(url)  # 创建socket对象,获取返回的对象d,它其实也是一个Deferred对象
          d.addCallback(response)  # 当页面下载完成,自动执行函数response,来处理返回数据

    上面代码的内部执行过程就是自动创建socket对象,并且发送请求,如果请求成功拿到返回的值,自动调用函数response来处理返回的内容。

    但以上内容你可以理解只是创建了socket,并且给了运行结束后应该执行的函数。但还没有加到事件循环。

    2.将socket添加到事件循环中

    def response(content):
          print(content)
    
    @defer.inlineCallbacks  # 添加事件循环
    def task():
          url = "http://www.baidu.com"
          d = getPage(url)
          d.addCallback(response)
          yield d

    3、开始事件循环

    def response(content):
          print(content)
    
    @defer.inlineCallbacks
    def task():
          url = "http://www.baidu.com"
          d = getPage(url.encode('utf-8'))
          d.addCallback(response)
          yield d
    
    
      task()  # 执行函数
      reactor.run()  # 开启事件循环。不先执行函数的话,在内部循环中该函数是不会执行的。

    但是上面的代码运行后,无法自动终止,需要有一个东西来监听着,当我们发的所有url回来时终止事件循环

    4、开始事件循环(自动结束)

    def response(content):
          print(content)
            
            
    @defer.inlineCallbacks
    def task():
        url = "http://www.baidu.com"
        d = getPage(url.encode('utf-8'))
        d.addCallback(response)
        yield d
    
    
      d = task()
      dd = defer.DeferredList([d,])
      dd.addBoth(lambda _:reactor.stop())  # 监听d是否完成或者失败,如果是,调用回调函数,终止事件循环
    
      reactor.run()

    上面的代码此时已经实现了基本的功能,但我们还需要再加最后一个功能,就是并发。让函数task可以一次性发送多个请求,并且在所有请求都回来后再终止事件循环。这时就要用到defer.Deferred

    5、并发操作

    _closewait = None
    
    def response(content):
    
          print(content)
          
    @defer.inlineCallbacks
    def task():
          """
          每个爬虫的开始:stats_request
          :return:
          """
          url = "http://www.baidu.com"
          d1 = getPage(url.encode('utf-8'))
          d1.addCallback(response)
    
          url = "http://www.cnblogs.com"
          d2 = getPage(url.encode('utf-8'))
          d2.addCallback(response)
    
          url = "http://www.bing.com"
          d3 = getPage(url.encode('utf-8'))
          d3.addCallback(response)
    
          global _closewait
          _close = defer.Deferred()
          yield _closewait
    
    
    spider1 = task()
    dd = defer.DeferredList([spider1,])
    dd.addBoth(lambda _:reactor.stop())
    
    reactor.run()

    调用defer.Deferred,使事件循环一直运行,直到发出去的每个请求都返回。但是上面的代码,在请求都回来之后,事件循环还是不会停止,需要我们手动关闭defer.Deferred

    6、手动关闭defer.Deferred

    _closewait = None
    count = 0
    def response(content):
    
          print(content)
          global count
          count += 1
          if count == 3:  # 当等于3时,说明发出去的url的响应都拿到了
              _closewait.callback(None)
    
    @defer.inlineCallbacks
    def task():
          """
          每个爬虫的开始:stats_request
          :return:
          """
          url = "http://www.baidu.com"
          d1 = getPage(url.encode('utf-8'))
          d1.addCallback(response)
    
          url = "http://www.cnblogs.com"
          d2 = getPage(url.encode('utf-8'))
          d2.addCallback(response)
    
          url = "http://www.bing.com"
          d3 = getPage(url.encode('utf-8'))
          d3.addCallback(response)
    
          global _closewait
          _close = defer.Deferred()
          yield _closewait
    
    
    spider = task()
    dd = defer.DeferredList([spider, ])
    dd.addBoth(lambda _:reactor.stop())
    
    reactor.run()

    总结上面的流程:

    (1)利用getPage创建socket

    (2)将socket添加到事件循环中

    (3)开始事件循环 (内部发送请求,并接受响应;当所有的socekt请求完成后,终止事件循环)

    二、low版Scrapy框架

    在理解了上面的内容之后,那么接下来的部分将会变得轻松,基本的思想和上面差不多。

    1、首先我们创建一个爬虫ChoutiSpider,如下代码所示

    有些人可能不知道start_requests是什么,其实这个跟我们用Scrapy创建的一样,这个函数在我们继承的scrapy.Spider里面,作用就是取start_url列表里面的url,并把url和规定好的回调函数parse传给Request类处理。需要注意的是这里的Request我们只是简单的接收来自start_requests传来的值,不做真正的处理,例如发送url请求什么的

    from twisted.internet import defer
    from twisted.web.client import getPage
    from twisted.internet import reactor
    
    
            
    class Request(object):
    
        def __init__(self,url,callback):
            self.url = url
            self.callback = callback
            
    
    class ChoutiSpider(object):
        name = 'chouti'
    
        def start_requests(self):
            start_url = ['http://www.baidu.com','http://www.bing.com',]
            for url in start_url:
                yield Request(url,self.parse)
    
        def parse(self,response):
            print(response) #response是下载的页面
          
            yield Request('http://www.cnblogs.com',callback=self.parse)

    2、创建引擎Engine,用来接收需要执行的spider对象

    from twisted.internet import defer
    from twisted.web.client import getPage
    from twisted.internet import reactor
    
    
    class Request(object):
    
        def __init__(self,url,callback):
            self.url = url
            self.callback = callback
            
    
    class ChoutiSpider(object):
        name = 'chouti'
    
        def start_requests(self):
            start_url = ['http://www.baidu.com','http://www.bing.com',]
            for url in start_url:
                yield Request(url,self.parse)
    
        def parse(self,response):
            print(response) #response是下载的页面
          
            yield Request('http://www.cnblogs.com',callback=self.parse)
            
            
    import queue
    Q = queue.Queue()  # 调度器
    
    class Engine(object):
    
        def __init__(self):
            self._closewait = None
        
        @defer.inlineCallbacks
        def crawl(self,spider):
           
            start_requests = iter(spider.start_requests())  # 把生成器Request对象转换为迭代器
            while True:
                try:
                    request = next(start_requests)  # 把Request对象放到调度器里面
                    Q.put(request)
                except StopIteration as e:
                    break
                    
    
            self._closewait = defer.Deferred()
            yield self._closewait
           

    3、更加具体的实现过程

    让引擎实现把Request对象加到调度器Q(队列)里面,然后再从调度器取值进行发送、接收和数据处理。

    from twisted.internet import defer
    from twisted.web.client import getPage
    from twisted.internet import reactor
    
            
    class Request(object):
    
        def __init__(self,url,callback):
            self.url = url
            self.callback = callback
    
    
    class ChoutiSpider(object):
        name = 'chouti'
    
        def start_requests(self):
            start_url = ['http://www.baidu.com','http://www.bing.com',]
            for url in start_url:
                yield Request(url,self.parse)
    
        def parse(self,response):
            print(response) #response是下载的页面
            
            yield Request('http://www.cnblogs.com',callback=self.parse)
            
            
    import queue
    Q = queue.Queue()
    
    
    class Engine(object):
    
        def __init__(self):
            self._closewait = None
            self.max = 5  # 最大并发数
            self.crawlling = []  # 正在爬取的爬虫个数
        
        def get_response_callback(self,content,request):
            # content就是返回的response, request就是一开始返回的迭代对象
            self.crawlling.remove(request)
            result = request.callback(content)  # 执行parse方法
            import types
            if isinstance(result,types.GeneratorType):  # 判断返回值是否是yield,如
                for req in result:                      # 果是则加入到队列中
                    Q.put(req)
        
        def _next_request(self):
            """
            去调度器中取request对象,并发送请求
            最大并发数限制
            :return:
            """
            # 什么时候终止
            if Q.qsize() == 0 and len(self.crawlling) == 0:
                self._closewait.callback(None)  # 结束defer.Deferred 
                return
    
            if len(self.crawlling) >= self.max:
                return
            while len(self.crawlling) < self.max:
                try:
                    req = Q.get(block=False)  # 设置为非堵塞
                    self.crawlling.append(req)
                    d = getPage(req.url.encode('utf-8'))  # 发送请求
                    # 页面下载完成,执行get_response_callback,并且将一开始的迭代对象req作为参数传过去
                    d.addCallback(self.get_response_callback,req)
                    # 未达到最大并发数,可以再去调度器中获取Request
                    d.addCallback(lambda _:reactor.callLater(0, self._next_request))  # callLater表示多久后调用
                    
                except Exception as e:
                    print(e)
                    return
        
        @defer.inlineCallbacks
        def crawl(self,spider):
            # 将初始Request对象添加到调度器
            start_requests = iter(spider.start_requests())
            while True:
                try:
                    request = next(start_requests)
                    Q.put(request)
                except StopIteration as e:
                    break
                    
            # 去调度器中取request,并发送请求
            # self._next_request()
            reactor.callLater(0, self._next_request)
    
            self._close = defer.Deferred()
            yield self._closewait
         
        
    spider = ChoutiSpider()
    _active = set()
    engine = Engine()
    
    d = engine.crawl(spider)  
    _active.add(d)
    dd = defer.DeferredList(_active)
    dd.addBoth(lambda _:reactor.stop())
    
    reactor.run()

    上面其实就是Scrapy框架的基本实现过程,不过要想更进一步的了解,就需要模仿Scrapy的源码,把各个函数的功能独立封装起来,如下。

    三、封装

    from twisted.internet import reactor   
    from twisted.web.client import getPage 
    from twisted.internet import defer     
    from queue import Queue
    
    class Request(object):
        """
        用于封装用户请求相关信息
        """
        def __init__(self,url,callback):
            self.url = url
            self.callback = callback
    
    class HttpResponse(object):  # 对返回的content和request做进一步封装,集合到response里面。然后通过回调函数parse调用response.xxx可以调用不同的功能。具体过程略
    
        def __init__(self,content,request):
            self.content = content
            self.request = request
    
    class Scheduler(object): 
        """
        任务调度器
        """
           def __init__(self):
            self.q = Queue()
    
        def open(self):
            pass
    
        def next_request(self):  # 取调度器里面的值并发送
            try:
                req = self.q.get(block=False)
            except Exception as e:
                req = None
            return req
    
        def enqueue_request(self,req):
            self.q.put(req)
    
        def size(self):
            return self.q.qsize()
    
    class ExecutionEngine(object):
        """
        引擎:所有调度
        """
        def __init__(self):
            self._closewait = None
            self.scheduler = None
            self.max = 5
            self.crawlling = []
        
        def get_response_callback(self,content,request):
            self.crawlling.remove(request) 
            response = HttpResponse(content,request)
            result = request.callback(response)  # 执行request里面的回调函数(parse)
            import types
            if isinstance(result,types.GeneratorType):
                for req in result:
                    self.scheduler.enqueue_request(req)
        
        def _next_request(self):
            if self.scheduler.size() == 0 and len(self.crawlling) == 0: 
                self._closewait.callback(None)
                return
            
            while len(self.crawlling) < self.max:
                req = self.scheduler.next_request()
                if not req:  
                    return
                self.crawlling.append(req)
                d = getPage(req.url.encode('utf-8'))  
                d.addCallback(self.get_response_callback,req)
                d.addCallback(lambda _:reactor.callLater(0,self._next_request))
        
        @defer.inlineCallbacks
        def open_spider(self,start_requests):
            self.scheduler = Scheduler()  # 创建调度器对象
            
            while True:
                try:
                    req = next(start_requests)
                except StopIteration as e:
                    break
                self.scheduler.enqueue_request(req)  # 放进调度器里面
            yield self.scheduler.open()  # 使用那个装饰器后一定要有yield,因此传个   None 相当于yield None
            reactor.callLater(0,self._next_request)   
    
        @defer.inlineCallbacks
        def start(self):  
            self._closewait = defer.Deferred()
            yield self._closewait
    
    class Crawler(object):
        """
        用户封装调度器以及引擎的...
        """
        def _create_engine(self):  # 创建引擎对象 
            return ExecutionEngine()
    
        def _create_spider(self,spider_cls_path):  
            """
    
            :param spider_cls_path:  spider.chouti.ChoutiSpider
            :return:
            """
            module_path,cls_name = spider_cls_path.rsplit('.',maxsplit=1)
            import importlib
            m = importlib.import_module(module_path)
            cls = getattr(m,cls_name)
            return cls()
    
        @defer.inlineCallbacks
        def crawl(self,spider_cls_path): 
            engine = self._create_engine()  # 需要把任务交给引擎去做
            spider = self._create_spider(spider_cls_path)  # 通过路径获取spider,创建对象
            start_requests = iter(spider.start_requests()) 
            yield engine.open_spider(start_requests)  # 把迭代器往调度器里面放
            yield engine.start()  # 相当于self._closewait方法  
    
    class CrawlerProcess(object):
        """
        开启事件循环
        """
        def __init__(self):
            self._active = set()
            
        def crawl(self,spider_cls_path):  # 每个爬虫过来只负责创建一个crawler对象
            """                          
            :param spider_cls_path:
            :return:
            """
            crawler = Crawler()
            d = crawler.crawl(spider_cls_path)
            self._active.add(d)
        
        def start(self):  # 开启事件循环
            dd = defer.DeferredList(self._active)
            dd.addBoth(lambda _:reactor.stop())
    
            reactor.run()  # 想run()起来,则要有装饰器和yield
    
    class Commond(object):
    
            def run(self):
                crawl_process = CrawlerProcess() 
                spider_cls_path_list =  ['spider.chouti.ChoutiSpider','spider.cnblogs.CnblogsSpider',]  
                for spider_cls_path in spider_cls_path_list:
                    crawl_process.crawl(spider_cls_path)  # 创建多个defer对象,也就是把d添加到_active里面
                crawl_process.start()  # 统一调用一次,相当于reactor.run()
    
    
    if __name__ == '__main__':
        cmd = Commond()
        cmd.run()

    其实上面的代码就是从下往上依次执行,每个类之间各个完成不同的功能。

    配合下面的思维导图,希望可以帮助理解。

  • 相关阅读:
    Deep Learning(深度学习)学习笔记整理系列之(一)
    Deep Learning(深度学习)学习笔记整理系列之(八)
    Deep Learning(深度学习)学习笔记整理系列之(七)
    Deep Learning(深度学习)学习笔记整理系列之(三)
    机器学习——深度学习(Deep Learning)
    Deep Learning(深度学习)学习笔记整理系列之(六)
    Deep Learning(深度学习)学习笔记整理系列之(四)
    浅谈深度学习
    Deep Learning(深度学习)学习笔记整理系列之(五)
    Deep Learning(深度学习)学习笔记整理系列之(二)
  • 原文地址:https://www.cnblogs.com/linyuhong/p/10164752.html
Copyright © 2011-2022 走看看