zoukankan      html  css  js  c++  java
  • scrapy自定义扩展(extensions)实现实时监控scrapy爬虫的运行状态

    效果图:
    在这里插入图片描述

    废话

    如何知道你写的爬虫有没有正常运行,运行了多长时间,请求了多少个网页,抓到了多少条数据呢?官方其实就提供了一个字典就包含一些抓取的相关信息:crawler.stats.get_stats(),crawler是scrapy中的一个组件。你可以在很多组件中访问他,比如包含from_crawler(cls, crawler)方法的所有组件。

    既然能得到scrapy的运行状态,想要实时显示出来应该也很简单吧。同样是使用上一篇博客用到的influxdb+grafana来展示数据,我们只需要将scrapy的一些运行信息实时同步到influxdb这个数据库,就能通过grafana以图的形式来展示出数据库里的内容了。

    写数据库

    如何实时将字典同步到数据库呢?这里肯定要设定一个同步的时间间隔,假设是5秒。那么我们的需求就是让scrapy每5秒写一次爬虫运行状态的信息到数据库,上面提到能访问到crawler.stats.get_stats()这个的组件有很多,比如中间件、管道、爬虫。我们应该在哪个组件中同步信息?

    这个我们可以先看一些内置的组件分别实现了什么功能,然后看和需求最相似的功能。很明显,功能最为合适的是extensions这个组件了,有很多人可能都没有用过这个组件,我在看很多博客都很少有提到这个组件的,因为这个组件能做的事,其他也能做,用它只是为了让分工更明确而已。所以一些额外的功能一般写入extensions,我们先看看内置的几个都实现了什么样的功能

    • 日志统计信息扩展(scrapy.extensions.logstats.LogStats):记录基本统计信息,例如抓取的页面和已抓取的项目
    • 核心统计信息扩展(scrapy.extensions.corestats.CoreStats): 如果启用了统计信息收集,请启用核心统计信息的收集
    • Telnet控制台扩展(scrapy.extensions.telnet.TelnetConsole): 提供一个telnet控制台,以进入当前正在运行的Scrapy进程内的Python解释器,这对于调试非常有用
    • 内存使用扩展(scrapy.extensions.memusage.MemoryUsage): 此扩展名在Windows中不起作用

    其中日志统计信息扩展就是把crawler.stats.get_stats()这个字典信息写入到日志,这和我要实现的功能基本类似。所以代码可以参考参考。直接看我的代码:

    import logging
    from scrapy import signals
    import datetime
    from threading import Timer
    from influxdb import InfluxDBClient
    
    
    logger = logging.getLogger(__name__)
    
    class SpiderStatLogging:
    
        def __init__(self, crawler, dbparams, interval):
            self.exit_code = False
            self.interval = interval
            self.crawler = crawler
            self.client = InfluxDBClient(**dbparams)
            self.stats_keys = set()
            self.cur_d = {
                'log_info': 0, 
                'log_warning': 0,
                'requested': 0,
                'request_bytes': 0,
                'response': 0,
                'response_bytes': 0,
                'response_200': 0,
                'response_301': 0,
                'response_404': 0,
                'responsed': 0,
                'item': 0,
                'filtered': 0,
            }
    
        @classmethod
        def from_crawler(cls, crawler):
            dbparams = crawler.settings.get('INFLUXDB_PARAMS')
            interval = crawler.settings.get('INTERVAL', 60)
            ext = cls(crawler, dbparams, interval)
            crawler.signals.connect(ext.engine_started, signal=signals.engine_started)
            crawler.signals.connect(ext.engine_stopped, signal=signals.engine_stopped)
            crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
            crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
            return ext
    
        def spider_closed(self, spider, reason):
            logger.info(self.stats_keys)
            influxdb_d = {
                "measurement": "spider_closed",
                "time": datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),
                "tags": {
                    'spider_name': spider.name
                },
                "fields": {
                            'end_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 
                            'reason': reason,
                            'spider_name':spider.name
                        }
            }
            if not self.client.write_points([influxdb_d]):
                raise Exception('写入influxdb失败!')
            
        def spider_opened(self, spider):
            influxdb_d = {
                "measurement": "spider_opened",
                "time": datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),
                "tags": {
                    'spider_name': spider.name
                },
                "fields": {
                            'start_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                            'spider_name':spider.name
                        }
            }
            if not self.client.write_points([influxdb_d]):
                raise Exception('写入influxdb失败!')
    
        def engine_started(self):
            Timer(self.interval, self.handle_stat).start()
        
        def engine_stopped(self):
            self.exit_code = True
    
        def handle_stat(self):
            stats = self.crawler.stats.get_stats()
            d = {
                'log_info': stats.get('log_count/INFO', 0), 
                'dequeued': stats.get('scheduler/dequeued/redis', 0),
                'log_warning': stats.get('log_count/WARNING', 0),
                'requested': stats.get('downloader/request_count', 0),
                'request_bytes': stats.get('downloader/request_bytes', 0),
                'response': stats.get('downloader/response_count', 0),
                'response_bytes': stats.get('downloader/response_bytes', 0),
                'response_200': stats.get('downloader/response_status_count/200', 0),
                'response_301': stats.get('downloader/response_status_count/301', 0),
                'response_404': stats.get('downloader/response_status_count/404', 0),
                'responsed': stats.get('response_received_count', 0),
                'item': stats.get('item_scraped_count', 0),
                'depth': stats.get('request_depth_max', 0),
                'filtered': stats.get('bloomfilter/filtered', 0),
                'enqueued': stats.get('scheduler/enqueued/redis', 0),
                'spider_name': self.crawler.spider.name
            }
            for key in self.cur_d:
                d[key], self.cur_d[key] = d[key] - self.cur_d[key], d[key]
            influxdb_d = {
                "measurement": "newspider",
                "time": datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),
                "tags": {
                    'spider_name': self.crawler.spider.name
                },
                "fields": d
            }
            if not self.client.write_points([influxdb_d]):
                raise Exception('写入influxdb失败!')
            self.stats_keys.update(stats.keys())
            if not self.exit_code:
                Timer(self.interval, self.handle_stat).start()
    

    代码应该不难理解,从settings.py中读取两个变量'INFLUXDB_PARAMS'、'INTERVAL',然后在引擎开始的时候开启一个定时器,每隔INTERVAL秒执行一次handle_stat函数,handle_stat函数的功能就是把crawler.stats.get_stats()这个字典写入到influxdb数据库里。接着只需要在配置文件中启用这个扩展即可,

    EXTENSIONS = {
        '项目名称.文件名称.SpiderStatLogging': 1,
        # 假设上面的代码都保存在extensions.py中,放在和settings.py同级目录,
        # 则可以写成:项目名称.extensions..SpiderStatLogging
    }
    

    展示数据库

    granfana我就不多说了,不懂的请百度,或者看一下我的上一篇博客再百度。

    图表json:https://lanzous.com/icrr5kb(太长就直接放网盘了,复制到grafana导入即可)

  • 相关阅读:
    日记1
    JDK、JRE、JVM三者间的关系
    线性表之二,SLINKLIST(单链表)类,模板类及C链表(增删改查,广义表
    线性表之一,SEQLIST(顺序表)类及其父类LIST,模板类及C结构体,包装顺序表
    PTA(中国人民解放军陆军工程大学数据结构,C语言)
    冒泡排序
    选择排序、堆排序
    冒泡排序,快速排序
    springMVC定时器
    MD5加密
  • 原文地址:https://www.cnblogs.com/kanadeblisst/p/12918723.html
Copyright © 2011-2022 走看看