zoukankan      html  css  js  c++  java
  • scrapy数据处理

    Date: 2019-07-17

    Author: Sun

    本节要讲解的内容有:

    (1)scrapy item数据封装

    (2)scrapy管道处理

    (3) scrapy 数据持久化

    (4) scrapy下载项目文件和图片

    一. Scrapy Item数据封装

    ​ 爬取的主要目标就是从非结构性的数据源提取结构性数据,例如网页。 Scrapy spider可以以python的dict来返回提取的数据.虽然dict很方便,并且用起来也熟悉,但是其缺少结构性,容易打错字段的名字或者返回不一致的数据,尤其在具有多个spider的大项目中。

    ​ 为了定义常用的输出数据,Scrapy提供了 Item 类。 Item对象是种简单的容器,保存了爬取到得数据。 其提供了 类似于词典(dictionary-like)的API以及用于声明可用字段的简单语法。

    ​ items.py:数据结构模板文件。定义数据属性。

    声明Item

    Item使用简单的class定义语法以及 Field对象来声明。例如:

    import scrapy
    
    class Product(scrapy.Item):
        name = scrapy.Field()
        price = scrapy.Field()
        stock = scrapy.Field()
        last_updated = scrapy.Field(serializer=str)
    

    说明:

    ​ 熟悉Django的人一定会注意到Scrapy Item定义方式与Django Models很类似, 不过没有那么多不同的字段类型(Field type),更为简单。

    Item字段(Item Fields)

    ​ Field对象指明了每个字段的元数据(metadata)。例如下面例子中 last_updated 中指明了该字段的序列化函数。

    ​ 您可以为每个字段指明任何类型的元数据。 Field对象对接受的值没有任何限制。也正是因为这个原因,文档也无法提供所有可用的元数据的键(key)参考列表。 Field对象中保存的每个键可以由多个组件使用,并且只有这些组件知道这个键的存在。您可以根据自己的需求,定义使用其他的 Field键。 设置 Field对象的主要目的就是在一个地方定义好所有的元数据。 一般来说,那些依赖某个字段的组件肯定使用了特定的键(key)。您必须查看组件相关的文档,查看其用了哪些元数据键(metadata key)。

    ​ 需要注意的是,用来声明item的Field对象并没有被赋值为class的属性。 不过您可以通过 Item.fields 属性进行访问。

    与Item配合

    接下来以 下边声明的 Product item来演示一些item的操作。您会发现API和 dict API 非常相似。

    创建item

    >>> product = Product(name='Desktop PC', price=1000)
    >>> print product
    Product(name='Desktop PC', price=1000)
    

    获取字段的值

    >>> product['name']
    Desktop PC
    >>> product.get('name')
    Desktop PC
    
    >>> product['price']
    1000
    
    >>> product['last_updated']
    Traceback (most recent call last):
        ...
    KeyError: 'last_updated'
    
    >>> product.get('last_updated', 'not set')
    not set
    
    >>> product['lala'] # getting unknown field
    Traceback (most recent call last):
        ...
    KeyError: 'lala'
    
    >>> product.get('lala', 'unknown field')
    'unknown field'
    
    >>> 'name' in product  # is name field populated?
    True
    
    >>> 'last_updated' in product  # is last_updated populated?
    False
    
    >>> 'last_updated' in product.fields  # is last_updated a declared field?
    True
    
    >>> 'lala' in product.fields  # is lala a declared field?
    False
    
    

    设置字段的值

    >>> product['last_updated'] = 'today'
    >>> product['last_updated']
    today
    
    >>> product['lala'] = 'test' # setting unknown field
    Traceback (most recent call last):
        ...
    KeyError: 'Product does not support field: lala'
    

    获取所有获取到的值

    您可以使用 dict API 来获取所有的值:

    >>> product.keys()
    ['price', 'name']
    
    >>> product.items()
    [('price', 1000), ('name', 'Desktop PC')]
    

    扩展Item

    ​ 您可以通过继承原始的Item来扩展item(添加更多的字段或者修改某些字段的元数据)。

    例如:

    class DiscountedProduct(Product):
        discount_percent = scrapy.Field(serializer=str)
        discount_expiration_date = scrapy.Field()
    
    

    ​ 您也可以通过使用原字段的元数据,添加新的值或修改原来的值来扩展字段的元数据:

    class SpecificProduct(Product):
        name = scrapy.Field(Product.fields['name'], serializer=my_serializer)
    
    

    ​ 这段代码在保留所有原来的元数据值的情况下添加(或者覆盖)了 name 字段的 serializer

    Item对象

    class scrapy.item.Item([arg])

    ​ 返回一个根据给定的参数可选初始化的item。

    ​ Item复制了标准的 dict API。包括初始化函数也相同。Item唯一额外添加的属性是:fields一个包含了item所有声明的字段的字典,而不仅仅是获取到的字段。该字典的key是字段(field)的名字,值是Item声明中使用到的Field对象。

    字段(Field)对象

    • class scrapy.item.Field([arg])

      Field仅仅是内置的dict类的一个别名,并没有提供额外的方法或者属性。

      换句话说, Field对象完完全全就是Python字典(dict)。被用来基于类属性(class attribute)的方法来支持item声明语法。

    项目中应用

    新建一个文件bookitem.py

    from scrapy import Item,Field
    
    class BookItems(Item):
    
        name = Field()
        price = Field()
    

    定义了上述item后,就可以在spiders爬虫类中使用了

    import  scrapy
    from books.bookitem import BookItems
    
    class BooksSpider(scrapy.Spider):
        name = "start"
        start_urls = ["http://books.toscrape.com/"]
    
        def parse(self, response):
            for book in response.css('article.product_pod'):
                name = book.xpath('./h3/a/@title').extract_first()
                price = book.css('p.price_color::text').extract_first()
                item = BookItems()
                item['name'] = name
                item['price'] = price
                yield item
    
    

    ​ 由上述案例可以分析出,BookItems就是为了存储页面中待爬取页面元素类,方便存储,spiders层往pipeline层进行数据传输,并且起到了数据压缩目的。yield关键字是发射此item实例对象给pipeliine

    二、scrapy管道处理

    每个项目管道组件(有时称为“Item Pipeline”)是一个实现简单方法的Python类。他们接收一个项目并对其执行操作,还决定该项目是否应该继续通过流水线或被丢弃并且不再被处理。

    1 管道的典型用途:

    • 清理HTML数据
    • 验证抓取的数据(检查项目是否包含特定字段)
    • 检查重复(并删除)
    • 将抓取的数据存储在数据库中

    2. scrapy管道

    ​ pipelines.py*文件中定义我们的管道,其实一个管道实际上就是一个类,而这个类定义了一些方法(属性),用来处理我们传进类(管道)中的数据,在处理完以后,再返回被处理以后的数据。那么,多个管道合用,当然就是讲一个数据先后传进多个管道中处理,最后输出数据了。

    2.1 Item管道(Item Pipeline)

    • 主要负责处理有爬虫从网页中抽取的Item,主要任务是清洗、验证和存储数据。

    • 当页面被蜘蛛解析后,将被发送到Item管道,并经过几个特定的次序处理数据。

    • 每个Item管道的组件都是有一个简单的方法组成的Python类。

    • 它们获取了Item并执行它们的方法,同时还需要确定是否需要在Item管道中继续执行下一步或是直接丢弃掉不处理。

    2.2 Item管道主要函数

    (1). process_item(self, item, spider) —— 必须实现(也是用的最多的方法);

    每个 Item Pipeline 组件都需要调用该方法,这个方法必须返回一个 Item (或任何继承类)对象, 或是抛出 DropItem 异常,被丢弃的 item 将不会被之后的 pipeline 组件所处理

    需要传入的参数为:

    • item (Item 对象) : 被爬取的 item
    • spider (Spider 对象) : 爬取该 item 的 spider

    该方法会被每一个 item pipeline 组件所调用,process_item 必须返回以下其中的任意一个对象:

    • 一个 dict
    • 一个 Item 对象或者它的子类对象
    • 一个 Twisted Deferred 对象
    • 一个 DropItem exception;如果返回此异常,则该 item 将不会被后续的 item pipeline 所继续访问

    注意:该方法是Item Pipeline必须实现的方法,其它三个方法(open_spider/close_spider/from_crawler)是可选的方法

    (2).open_spider(self, spider) —— 非必需,为爬虫启动的时候调用;

    当 spider 被开启时,这个方法被调用。可以实现在爬虫开启时需要进行的操作,比如说打开一个待写入的文件,或者连接数据库等

    需要传入的参数:

    • spider (Spider 对象) : 被开启的 spider

    (3). close_spider(self, spider) —— 非必需, 为爬虫关闭的时候调用;

    当 spider 被关闭时,这个方法被调用。可以实现在爬虫关闭时需要进行的操作,比如说关闭已经写好的文件,或者关闭与数据库的连接

    需要传入的参数:

    • spider (Spider 对象) : 被关闭的 spider

    (4). from_crawler(cls, crawler) —— 非必需,也是在启动的时候调用,比 open_spider早。

    该类方法用来从 Crawler 中初始化得到一个 pipeline 实例;它必须返回一个新的 pipeline 实例;Crawler 对象提供了访问所有 Scrapy 核心组件的接口,包括 settings 和 signals

    需要传入的参数:

    • crawler (Crawler 对象) : 使用该管道的crawler

    3 编写自己的项目管道

    ​ 每个项目管道组件是一个Python类,必须实现以下方法:
    ​ process_item(self, item, spider)

    ​ 对于每个项目管道组件调用此方法。

    ​ process_item() 必须:返回一个带数据的dict,返回一个Item (或任何后代类)对象,返回一个Twisted Deferred或者raise DropItemexception。丢弃的项目不再由其他管道组件处理。

    参数:

    • item(Itemobject或dict) - 剪切的项目
    • Spider(Spider对象) - 抓取物品的爬虫

    另外,它们还可以实现以下方法:

    open_spider(self, spider)

    当爬虫打开时调用此方法。

    参数:

    • 爬虫(Spider对象) - 打开的爬虫

    close_spider(self, spider)
    当爬虫关闭时调用此方法。

    参数:

    • 爬虫(Spider对象) - 被关闭的爬虫

    from_crawler(cls, crawler)
    如果存在,则调用此类方法以从a创建流水线实例Crawler。它必须返回管道的新实例。Crawler对象提供对所有Scrapy核心组件(如设置和信号)的访问; 它是管道访问它们并将其功能挂钩到Scrapy中的一种方式。

    参数:

    • crawler(Crawlerobject) - 使用此管道的crawler

    实例:

    ​ 让我们来看看以下假设的管道,它调整 price那些不包括增值税(price_excludes_vat属性)的项目的属性,并删除那些不包含价格的项目:

    from scrapy.exceptions import DropItem
    
    class PricePipeline(object):
    
        vat_factor = 1.15
    
        def process_item(self, item, spider):
            if item['price']:
                if item['price_excludes_vat']:
                    item['price'] = item['price'] * self.vat_factor
                return item
            else:
                raise DropItem("Missing price in %s" % item)
    
    
    
    将项目写入JSON文件

    以下管道将所有抓取的项目(来自所有蜘蛛)存储到单个items.jl文件中,每行包含一个项目,以JSON格式序列化:

    import json
    
    class JsonWriterPipeline(object):
    
        def open_spider(self, spider):
            self.file = open('items.jl', 'wb')
    
        def close_spider(self, spider):
            self.file.close()
    
        def process_item(self, item, spider):
            line = json.dumps(dict(item)) + "\n"
            self.file.write(line)
            return item
    
    

    注意

    ​ JsonWriterPipeline的目的只是介绍如何编写项目管道。如果您真的想要将所有抓取的项目存储到JSON文件中,则应使用Feed导出。

    复制过滤器

    ​ 用于查找重复项目并删除已处理的项目的过滤器。假设我们的项目具有唯一的ID,但是我们的蜘蛛会返回具有相同id的多个项目

    from scrapy.exceptions import DropItem
    
    class DuplicatesPipeline(object):
    
        def __init__(self):
            self.ids_seen = set()
    
        def process_item(self, item, spider):
            if item['id'] in self.ids_seen:
                raise DropItem("Duplicate item found: %s" % item)
            else:
                self.ids_seen.add(item['id'])
                return item
    
    
    激活项目管道组件

    要激活项目管道组件,必须将其类添加到 ITEM_PIPELINES设置,类似于以下示例:

    ITEM_PIPELINES = {
        'myproject.pipelines.PricePipeline': 300,
        'myproject.pipelines.JsonWriterPipeline': 800,
    }
    
    

    优先级说明:

    ​ 您在此设置中分配给类的整数值确定它们运行的顺序:项目从较低值到较高值类。通常将这些数字定义在0-1000范围内。

    三、scrapy 数据持久化

    Scrapy的数据持久化,主要包括存储到数据库、json文件以及内置数据存储

    持久化流程:

    ​ 1.爬虫文件爬取到数据后,需要将数据封装到items对象中。
    ​ 2.使用yield关键字将items对象提交给pipelines管道进行持久化操作。
    ​ 3.在管道文件中的process_item方法中接收爬虫文件提交过来的item对象,然后编写持久化存储的代码将item对象中存储的数据进行持久化存储
    ​ 4.settings.py配置文件中开启管道

    1 写入不同格式的文件

    执行输出指定格式进行存储:将爬取到的数据写入不同格式的文件中进行存储
    ​ scrapy crawl 爬虫名称 -o xxx.json
    ​ scrapy crawl 爬虫名称 -o xxx.xml
    ​ scrapy crawl 爬虫名称 -o xxx.csv

    2 基于管道的持久化存储

    scrapy框架中已经为我们专门集成好了高效、便捷的持久化操作功能,我们直接使用即可。要想使用scrapy的持久化操作功能:

    ​ pipelines.py:管道文件。接收数据(items),进行持久化操作。

    (1)存储到JSON文件

    import json
    from scrapy.exceptions import DropItem
    
    
    class myPipeline(object):
        def __init__(self):
            self.file = open('test.json', 'wb')
    
        def process_item(self, item, spider):
            if item['title']:
                line = json.dumps(dict(item)) + '\n'
                self.file.write(line)
                return item
            else:
                raise DropItem("Missing title in %s" % item)
    
    

    (2) 存储到mongodb数据库

    import pymongo
    class myPipeline(object):
     def __init__(self):
        self.client = pymongo.MongoClient(host=settings['MONGO_HOST'], port=settings['MONGO_PORT'])
        self.db = self.client[settings['MONGO_DB']]
        # self.coll = self.db[settings['MONGO_COLL2']]
        self.chinacwa = self.db['chinacwa']
        self.iot = self.db['iot']
        self.ny135 = self.db['ny135']
        self.productprice = self.db['productprice']
        self.allproductprice = self.db['allproductprice']
    
     def process_item(self, item, spider):
        if isinstance(item, ChinacwaItem):
            try:
                if item['article_title']:
                    item = dict(item)
                    self.chinacwa.insert(item)
                    print("插入成功")
                    return item
            except Exception as e:
                spider.logger.exceptionn("")
    
    

    (3)基于MySQL数据库

    import pymysql
    class QiubaiproPipeline(object):
    
        conn = None
        cursor = None
        def open_spider(self,spider):
            print('开始爬虫')
            #链接数据库
            self.conn = pymysql.Connect(host='127.0.0.1',port=3306,user='root',password='123456',db='qiubai')
        #编写向数据库中存储数据的相关代码
        def process_item(self, item, spider):
            #1.链接数据库
            #2.执行sql语句
            sql = 'insert into qiubai values("%s","%s")'%(item['author'],item['content'])
            self.cursor = self.conn.cursor()
            try:
                self.cursor.execute(sql)
                self.conn.commit()
            except Exception as e:
                print(e)
                self.conn.rollback()
            #3.提交事务
    
            return item
        def close_spider(self,spider):
            print('爬虫结束')
            self.cursor.close()
            self.conn.close()
    
    

    (4)基于redis存储

    import redis
    
    class QiubaiproPipeline(object):
        conn = None
        def open_spider(self,spider):
            print('开始爬虫')
            self.conn = redis.Redis(host='127.0.0.1',port=6379)
    
        def process_item(self, item, spider):
            dict = {
                'author':item['author'],
                'content':item['content']
            }
            self.conn.lpush('data', dict)
            return item
    
    

    总结:

    ​ 有时候,为了更好的处理同一个pipeline来自于不同的spider情形,需要通过spider携带的参数和item携带的数据进行逻辑的分离和处理。

    四、下载项目文件和图片

    ​ Scrapy提供了一个 item pipeline,来下载属于某个特定项目的图片,比如,当你抓取产品时,也想把它们的图片下载到本地。

    ​ 这条管道,被称作图片管道,在 ImagesPipeline类中实现,提供了一个方便并具有额外特性的方法,来下载并本地存储图片:

    • 将所有下载的图片转换成通用的格式(JPG)和模式(RGB)
    • 避免重新下载最近已经下载过的图片 (md5sum)
    • 缩略图生成
    • 检测图像的宽/高,确保它们满足最小限制

    ​ 这个管道也会为那些当前安排好要下载的图片保留一个内部队列,并将那些到达的包含相同图片的项目连接到那个队列中。 这可以避免多次下载几个项目共享的同一个图片。

    ​ Pillow 是用来生成缩略图,并将图片归一化为JPEG/RGB格式,因此为了使用图片管道,你需要安装这个库。 Python Imaging Library(PIL) 在大多数情况下是有效的,但众所周知,在一些设置里会出现问题,因此我们推荐使用 Pillow 而不是PIL。

    ​ 依赖pillow库下载:

    ​ pip install pillow

    文件下载:

    ​ FilesPipeline

    图片下载:

    ImagesPipeline

    使用图片管道

    当使用ImagesPipeline ,典型的工作流程如下所示:

    1. 在一个爬虫里,你抓取一个项目,把其中图片的URL放入item类中的 image_url 字段内。

    2. 项目从爬虫内返回,进入项目管道。

    3. 当项目进入 ImagesPipeline,image_url内的URL将被Scrapy的调度器和下载器(这意味着调度器和下载器的中间件可以复用)安排下载,当优先级更高,会在其他页面被抓取前处理。项目会在这个特定的管道阶段保持“locker”的状态,直到完成图片的下载(或者由于某些原因未完成下载)。

    4. 当图片下载完,另一个组(images)将被更新到结构中。这个组将包含一个字典列表,其中包括下载图片的信息,比如下载路径、源抓取地址(从 image_url获得)和图片的校验码。images 列表中的图片顺序将和源 image_url保持一致。如果某个图片下载失败,将会记录下错误信息,图片也不会出现在 images 组中。

    图片存储

    ​ 图片存储在文件中(一个图片一个文件),并使用它们URL的 SHA1 hash 作为文件名。

    比如,对下面的图片URL:

    http://www.example.com/image.jpg
    
    

    它的 SHA1 hash 值为:

    3afec3b4765f8f0a07b78f98c07b83f013567a0a
    
    

    将被下载并存为下面的文件:

    <IMAGES_STORE>/full/3afec3b4765f8f0a07b78f98c07b83f013567a0a.jpg
    
    
    

    其中:

    • <IMAGES_STORE> 是定义在 IMAGES_STORE 设置里的文件夹

    • full 是用来区分图片和缩略图(如果使用的话)的一个子文件夹。

    图片失效

    ​ 图像管道避免下载最近已经下载的图片。使用 IMAGES_EXPIRES 设置可以调整失效期限,可以用天数来指定:

    # 90天的图片失效期限
    IMAGES_EXPIRES = 90
    
    
    
    缩略图片生成

    ​ 图片管道可以自动创建下载图片的缩略图。

    ​ 如下信息都在settings.py中设置

    ​ 为了使用这个特性,你需要设置 IMAGES_THUMBS字典,其关键字为缩略图名字,值为它们的大小尺寸。

    比如:

    IMAGES_THUMBS = {
        'small': (50, 50),
        'big': (270, 270),
    }
    
    
    

    当你使用这个特性时,图片管道将使用下面的格式来创建各个特定尺寸的缩略图:

    <IMAGES_STORE>/thumbs/<size_name>/<image_id>.jpg
    
    
    

    其中:

    • <size_name> 是 IMAGES_THUMBS字典关键字(smallbig ,等)
    • <image_id> 是图像url的 [SHA1 hash]

    例如使用 smallbig 缩略图名字的图片文件:

    <IMAGES_STORE>/full/63bbfea82b8880ed33cdb762aa11fab722a90a24.jpg
    <IMAGES_STORE>/thumbs/small/63bbfea82b8880ed33cdb762aa11fab722a90a24.jpg
    <IMAGES_STORE>/thumbs/big/63bbfea82b8880ed33cdb762aa11fab722a90a24.jpg
    
    
    

    第一个是从网站下载的完整图片。

    图片管道Pipeline:

    方案1:直接采用继承ImagesPipeline,发送下载图片url请求下载图片:

    import scrapy
    from scrapy.contrib.pipeline.images import ImagesPipeline
    from scrapy.exceptions import DropItem
    
    class MyImagesPipeline(ImagesPipeline):
    
        def get_media_requests(self, item, info):
            #for image_url in item['image_url']:
            yield scrapy.Request(item['image_url'])
    
    
    

    方案2:采用继承普通类方式进行下载图片

    定义普通类方式

    import scrapy
    import hashlib
    from urllib.parse import quote
    
    class ScreenshotPipeline(object):
        """Pipeline that uses Splash to render screenshot of
        every Scrapy item."""
    
        SPLASH_URL = "http://localhost:8050/render.png?url={}"
    
        def process_item(self, item, spider):
            encoded_item_url = quote(item["url"])
            screenshot_url = self.SPLASH_URL.format(encoded_item_url)
            request = scrapy.Request(screenshot_url)
            dfd = spider.crawler.engine.download(request, spider)
            dfd.addBoth(self.return_item, item)
            return dfd
    
        def return_item(self, response, item):
            if response.status != 200:
                # Error happened, return item.
                return item
    
            # Save screenshot to file, filename will be hash of url.
            url = item["url"]
            url_hash = hashlib.md5(url.encode("utf8")).hexdigest()
            filename = "{}.png".format(url_hash)
            with open(filename, "wb") as f:
                f.write(response.body)
    
            # Store filename in item.
            item["screenshot_filename"] = filename
            return item
    
    
  • 相关阅读:
    Notes about "Exploring Expect"
    Reuse Sonar Checkstyle Violation Report for Custom Data Analysis
    Eclipse带参数调试的方法
    MIT Scheme Development on Ubuntu
    Manage Historical Snapshots in Sonarqube
    U盘自动弹出脚本
    hg的常用配置
    Java程序员的推荐阅读书籍
    使用shared memory 计算矩阵乘法 (其实并没有加速多少)
    CUDA 笔记
  • 原文地址:https://www.cnblogs.com/sunBinary/p/11186588.html
Copyright © 2011-2022 走看看