zoukankan      html  css  js  c++  java
  • Scrapy框架的执行流程解析

    这里主要介绍七个大类
    Command->CrawlerProcess->Crawler->ExecutionEngine->sceduler
    另外还有两个类:Request和HttpRessponse

    执行流程

    1.首先通过Command类中的run方法
    (1).创建开始运行的命令
    (2).将每一个spider对象的路径传入到crawl_process.crawl方法中去
    (3).crawl_process.crawl方法创建一个Crawler对象,通过调用Crawler.crawl方法创建一个引擎和spider对象
    (4).通过引擎的open_spider方法创建一个Scheduler对象,将每一个spider对象加入到schedler队列中去,并且通过自身的_next_request方法对下一次请求进行调度
    (5).调用CrawlerProcess对象的start方法开始事件循环
    2.CrawlerProcess类:
    初始值:创建一个集合_active,用于存放每一个请求的socket对象
    crawl方法:创建一个Crawler对象,调用Crawler.crawl方法,传入spider对象的路径(Crawler.crawl方法的作用是将传入的spider对象的路径通过调用_create_spider方法,
    创建一个spider对象,通过_create_engine方法创建一个引擎,通过调用引擎中的open_spider方法将spider对象的start_reuqests请求加入到scheduler调度器中的
    队列当中,然后调用引擎的中的_next_request方法对下一个请求进行调度。然后yield一个引擎的start方法,start方法返回的是一个defer.Defered对象,
    不发送任何请求,需要手动停止,目的是为了夯住事件循环)并将返回的scoket的对象加入到集合中
    start方法:将该socket对象的集合加到到defer.DeferedList中去,并添加回调函数进行手动停止。开始事件循环

    3.Crawler类:用户封装引擎及spider对象
    _create_engine方法:创建一个引擎
    _create_spider:通过传入的spider对象的路径创建spider对象
    crawl:调用_create_engine方法创建engine,调用_create_spider方法去打开spider对象,对spider中的start_requests请求转化为迭代器,通过调用engine.open_spider方法
    调用next函数将每一个请求加入到调度器中的队列中去,并调用engine._next_request方法对下一次请求进行调度
    4.ExecutionEngine类:
    引擎:所有的调度
    初始值:self._close = None:用于存放一个defer。defered对象, self.scheduler = None用于创建一个队列
    self.max = 5设置最大并发数 self.crawlling = []创建一个正在执行请求的列表
    open_spider方法:
    1.创建一个调度器对象,将传入的start_requests请求通过next方法将每一个请求加入到队列当中去
    2.然后开始事件循环执行_next_request方法对下一次请求的调度
    注:每一个@defer.inlineCallbacks装饰的函数都必须yield一个对象,即使为None
    _next_request方法:
    1.对spider对象的请求进行调度
    2.设置事件循环终止条件:调度器队列中请求的个数为0,正在执行的请求数为0
    3.设置最大并发数,根据正在执行的请求数量满足最大并发数条件对sceduler队列中的请求进行调度执行,并将返回的请求加入到正在执行的列表中
    4.包括对请求进行下载,以及对返回的数据执行callback函数
    5.开始执行事件循环的下一次请求的调度
    get_response_callback方法:
    1.将返回数据的请求从正在执行的列表中移出
    2.将返回的数据调用HttpResponse封装成一个response对象,执行请求的callback函数,并判断如果callback函数返回的结果是一个生成器对象,则将该生成器
    中的每一个对 象加入到调度器中的队列中去
    start方法:不发送任何请求,需要手动停止,目的是为了夯住循环
    5.Scheduler类:任务调度器
    1.初始化一个队列
    2.next_request方法:读取队列中的下一个请求
    3.enqueue_request方法:将请求加入到队列
    4.size方法:返回当前队列请求的数量
    5.open方法:无任何操作,返回一个空值,用于引擎中用装饰器装饰的open_spider方法返回一个yield对象

    6.Resquet类:用于封装用户请求相关信息,供用户编写spider时发送请求所用,主要的两个参数:url和callback
    7.HttpResponse类:
    通过响应请求返回的数据和穿入的request对象封装成一个response对象
    目的是为了将请求返回的数据不仅包括返回的content数据,使其拥有更多的属性,比如请求头,请求url,请求的cookies等等
    更方便的被回调函数所解析有用的数据

    下面是根据自己的理解写的一个小型的scrapy框架:

    from twisted.internet import reactor   # 事件循环(终止条件,所有的socket都已经移除)
    from twisted.web.client import getPage # socket对象(如果下载完成,自动从时间循环中移除...)
    from twisted.internet import defer     # defer.Deferred 特殊的socket对象 (不会发请求,手动移除)
    from queue import Queue
    
    
    class Request(object):
        """
        用于封装用户请求相关信息,供用户编写spider时发送请求所用
        """
        def __init__(self,url,callback):
            self.url = url
            self.callback = callback
    
    
    class HttpResponse(object):
        """
        通过响应请求返回的数据和穿入的request对象封装成一个response对象
        目的是为了将请求返回的数据不仅包括返回的content数据,使其拥有更多的属性,比如请求头,请求url,请求的cookies等等
        更方便的被回调函数所解析有用的数据
        """
        def __init__(self,content,request):
            self.content = content
            self.request = request
            self.url = request.url
            self.text = str(content,encoding='utf-8')
    
    
    class Scheduler(object):
        """
        任务调度器:
        1.初始化一个队列
        2.next_request方法:读取队列中的下一个请求
        3.enqueue_request方法:将请求加入到队列
        4.size方法:返回当前队列请求的数量
        5.open方法:无任何操作,返回一个空值,用于引擎中用装饰器装饰的open_spider方法返回一个yield对象
        """
        def __init__(self):
            self.q = Queue()
    
        def open(self):
            pass
    
        def next_request(self):
            try:
                req = self.q.get(block=False)
            except Exception as e:
                req = None
            return req
    
        def enqueue_request(self,req):
            self.q.put(req)
    
        def size(self):
            return self.q.qsize()
    
    
    class ExecutionEngine(object):
        """
        引擎:所有的调度
        1.通过open_spider方法将start_requests中的每一个请求加入到scdeuler中的队列当中,
        2.处理每一个请求响应之后的回调函数(get_response_callback)和执行下一次请求的调度(_next_request)
        """
        def __init__(self):
            self._close = None
            self.scheduler = None
            self.max = 5
            self.crawlling = []
    
        def get_response_callback(self,content,request):
            self.crawlling.remove(request)
            response = HttpResponse(content,request)
            result = request.callback(response)
            import types
            if isinstance(result,types.GeneratorType):
                for req in result:
                    self.scheduler.enqueue_request(req)
    
        def _next_request(self):
            """
            1.对spider对象的请求进行调度
            2.设置事件循环终止条件:调度器队列中请求的个数为0,正在执行的请求数为0
            3.设置最大并发数,根据正在执行的请求数量满足最大并发数条件对sceduler队列中的请求进行调度执行
            4.包括对请求进行下载,以及对返回的数据执行callback函数
            5.开始执行事件循环的下一次请求的调度
            """
            if self.scheduler.size() == 0 and len(self.crawlling) == 0:
                self._close.callback(None)
                return
            """设置最大并发数为5"""
            while len(self.crawlling) < self.max:
                req = self.scheduler.next_request()
                if not req:
                    return
                self.crawlling.append(req)
                d = getPage(req.url.encode('utf-8'))
                d.addCallback(self.get_response_callback,req)
                d.addCallback(lambda _:reactor.callLater(0,self._next_request))
    
        @defer.inlineCallbacks
        def open_spider(self,start_requests):
            """
            1.创建一个调度器对象
            2.将start_requests中的每一个请求加入到scheduler队列中去
            3.然后开始事件循环执行下一次请求的调度
            注:每一个@defer.inlineCallbacks装饰的函数都必须yield一个对象,即使为None
            """
            self.scheduler = Scheduler()
            yield self.scheduler.open()
            while True:
                try:
                    req = next(start_requests)
                except StopIteration as e:
                    break
                self.scheduler.enqueue_request(req)
            reactor.callLater(0,self._next_request)
    
        @defer.inlineCallbacks
        def start(self):
            """不发送任何请求,需要手动停止,目的是为了夯住循环"""
            self._close = defer.Deferred()
            yield self._close
    
    
    class Crawler(object):
        """
        1.用户封装调度器以及引擎
        2.通过传入的spider对象的路径创建spider对象
        3.创建引擎去打开spider对象,对spider中的每一个请求加入到调度器中的队列中去,通过引擎对请求去进行调度
        """
        def _create_engine(self):
            return ExecutionEngine()
    
        def _create_spider(self,spider_cls_path):
            """
    
            :param spider_cls_path:  spider.chouti.ChoutiSpider
            :return:
            """
            module_path,cls_name = spider_cls_path.rsplit('.',maxsplit=1)
            import importlib
            m = importlib.import_module(module_path)
            cls = getattr(m,cls_name)
            return cls()
    
        @defer.inlineCallbacks
        def crawl(self,spider_cls_path):
            engine = self._create_engine()
            spider = self._create_spider(spider_cls_path)
            start_requests = iter(spider.start_requests())
            yield engine.open_spider(start_requests) #将start_requests中的每一个请求加入到调度器的队列中去,并有引擎调度请求的执行
            yield engine.start() #创建一个defer对象,目的是为了夯住事件循环,手动停止
    
    
    class CrawlerProcess(object):
        """
        1.创建一个Crawler对象
        2.将传入的每一个spider对象的路径传入Crawler.crawl方法
        3.并将返回的socket对象加入到集合中
        4.开始事件循环
        """
        def __init__(self):
            self._active = set()
    
        def crawl(self,spider_cls_path):
            """
            :param spider_cls_path:
            :return:
            """
            crawler = Crawler()
            d = crawler.crawl(spider_cls_path)
            self._active.add(d)
    
        def start(self):
            dd = defer.DeferredList(self._active)
            dd.addBoth(lambda _:reactor.stop())
    
            reactor.run()
    
    
    class Command(object):
        """
        1.创建开始运行的命令
        2.将每一个spider对象的路径传入到crawl_process.crawl方法中去
        3.crawl_process.crawl方法创建一个Crawler对象,通过调用Crawler.crawl方法创建一个引擎和spider对象
        4.通过引擎的open_spider方法创建一个scheduler对象,将每一个spider对象加入到schedler队列中去,并且通过自身的_next_request方法对下一次请求进行调度
        5.
        """
        def run(self):
            crawl_process = CrawlerProcess()
            spider_cls_path_list = ['spider.chouti.ChoutiSpider','spider.cnblogs.CnblogsSpider',]
            for spider_cls_path in spider_cls_path_list:
                crawl_process.crawl(spider_cls_path)
            crawl_process.start()
    
    
    if __name__ == '__main__':
        cmd = Command()
        cmd.run()
    

      

    作者:张亚飞
    出处:https://www.cnblogs.com/zhangyafei
    gitee:https://gitee.com/zhangyafeii
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接。
  • 相关阅读:
    [Java并发编程(三)] Java volatile 关键字介绍
    [Java并发编程(二)] 线程池 FixedThreadPool、CachedThreadPool、ForkJoinPool?为后台任务选择合适的 Java executors
    [Java并发编程(一)] 线程池 FixedThreadPool vs CachedThreadPool ...
    Raft 实现日志复制同步
    Paxos 实现日志复制同步(Multi-Paxos)
    Paxos 实现日志复制同步(Basic Paxos)
    解剖 Elasticsearch 集群
    解剖 Elasticsearch 集群
    小程序-textarea,input内文本浮在定位元素和弹框之上的解决
    将一个多重对象的所有值赋值给另一个对象
  • 原文地址:https://www.cnblogs.com/zhangyafei/p/9575768.html
Copyright © 2011-2022 走看看