zoukankan      html  css  js  c++  java
  • Scrapy框架中的Pipeline组件

    简介

    在下图中可以看到items.py与pipeline.py,其中items是用来定义抓取内容的实体;pipeline则是用来处理抓取的item的管道
    2018-05-20_21-21-40.png
    Item管道的主要责任是负责处理有蜘蛛从网页中抽取的Item,他的主要任务是清晰、验证和存储数据。当页面被蜘蛛解析后,将被发送到Item管道,并经过几个特定的次序处理数据。每个Item管道的组件都是有一个简单的方法组成的Python类。获取了Item并执行方法,同时还需要确定是否需要在Item管道中继续执行下一步或是直接丢弃掉不处理。简而言之,就是通过spider爬取的数据都会通过这个pipeline处理,可以在pipeline中不进行操作或者执行相关对数据的操作。

    管道的功能

    1.清理HTML数据
    2.验证解析到的数据(检查Item是否包含必要的字段)
    3.检查是否是重复数据(如果重复就删除)
    4.将解析到的数据存储到数据库中

    Pipeline中的操作

    process_item(item, spider)
    每一个item管道组件都会调用该方法,并且必须返回一个item对象实例或raise DropItem异常。被丢掉的item将不会在管道组件进行执行。此方法有两个参数,一个是item,即要处理的Item对象,另一个参数是spider,即爬虫。
    此外,我们也可以在类中实现以下方法
    open_spider(spider)当spider执行的时候将调用该方法
    close_spider(spider)当spider关闭的时候将调用该方法

    定制自己的Pipeline组件:

    1.生成json数据

    class JsonWithEncodingPipeline(object):
        def __init__(self):
            self.file=codecs.open('article.json', 'w', encoding="utf-8")
        def process_item(self, item, spider):
            lines=json.dumps(dict(item), ensure_ascii=False) + '
    '
            self.file.write(lines)
            return item
        def spider_closed(self, spider):
            self.file.close()
    

    2.操作mysql关系数据库

    class MysqlPipeline(object):
        def __init__(self):
            self.conn=MySQLdb.connect('localhost', 'root', '*****', 'article_spider', charset="utf8", use_unicode=True)
            self.cursor=self.conn.cursor()
    
        def process_item(self, item, spider):
            insert_sql="""
                insert into article_items(title, url, url_object_id , create_date)
                VALUES(%s, %s, %s, %s)
            """
            self.cursor.execute(insert_sql, (item["title"], item["url"], item['url_object_id'], item["create_date"]))
            self.conn.commit()
    

    3.异步操作mysql关系数据库

    # 异步处理关系数据库
    class MysqlTwistedPipline(object):
        def __init__(self, dbpool):
            self.dbpool=dbpool
    
        @classmethod
        def from_settings(cls, settings):
            dbparms=dict(
                host=settings["MYSQL_HOST"],    #这里要在settings中事先定义好
                db=settings["MYSQL_DBNAME"],
                user=settings["MYSQL_USER"],
                passwd=settings["MYSQL_PASSWORD"],
                charset="utf8",
                cursorclass=MySQLdb.cursors.DictCursor,
                use_unicode=True,
            )
            dbpool=adbapi.ConnectPool("MySQLdb", **dbparms)
    
            return cls(dbpool)
    
        def process_item(self, item, spider):
        # 使用twisted将mysql插入变成异步执行
            query = self.dbpool.runInteraction(self.do_insert, item)
            query.addErrback(self.handle_error)
    
    
        def handle_error(self, failure, item, spider):
           #处理异步插入的异常
            print(failure)
    
        def do_insert(self, cursor, item):
            #执行具体的插入
            insert_sql = """
                        insert into article_items(title, url, url_object_id , create_date)
                        VALUES(%s, %s, %s, %s)
                    """
            self.cursor.execute(insert_sql, (item["title"], item["url"], item['url_object_id'], item["create_date"]))
    
    

    4.数据去重

    from scrapy.exceptions import DropItem
    
    class DuplicatesPipeline(object):
    
        def __init__(self):
            self.ids_seen = set()
    
        def process_item(self, item, spider):
            if item['id'] in self.ids_seen:
                raise DropItem("Duplicate item found: %s" % item)
            else:
                self.ids_seen.add(item['id'])
                return item
    

    使用组件

    # Configure item pipelines
    # See https://doc.scrapy.org/en/latest/topics/item-pipeline.html
    ITEM_PIPELINES = {
       # 'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
      # 'scrapy.pipelines.images.ImagesPipeline': 1,
       'ArticleSpider.pipelines.MysqlPipeline': 1,
       # 'ArticleSpider.pipelines.JsonExporterPipeline': 2,
       # 'ArticleSpider.pipelines.ArticleImagePipeline': 1
    }
    

    每个pipeline后面有一个数值,这个数组的范围是0-1000,这个数值是这些在pipeline中定义的类的优先级,越小越优先。
    在异步处理数据库的时候会传递一个参数为后面的操作进行初始化,process_item()函数实际上是将处理的操作传回给这个_init__。

  • 相关阅读:
    英语中的一个月几天的表示法
    深圳梧桐山游记
    linux中创建文件和文件夹
    linux中~和/的区别
    linux中的--和-的区别
    linux中vi和vim的区别
    基本数据类型大小和范围
    洛谷 [AHOI2001]质数和分解
    codevs 1115 开心的金明--01背包
    codevs 1080 线段树练习--用树状数组做的
  • 原文地址:https://www.cnblogs.com/yunlambert/p/9064824.html
Copyright © 2011-2022 走看看