  • Python Scrapy Item Pipeline Summary

    Introduction

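    An Item Pipeline is the stage that receives each item after the spider has extracted it. Item types are defined in item.py; once scraped, each item is handed to the pipeline, which does the final work such as writing it to a file or saving it to a database. The pipeline code is written in pipelines.py, and each Item Pipeline must then be enabled in settings.py.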
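
    Enabling a pipeline in settings.py means adding it to the ITEM_PIPELINES dict. A minimal sketch, assuming a hypothetical project package named myproject; the integer values set the order, and pipelines with lower values run first:

```python
# settings.py
# "myproject" is a hypothetical package name; use your own project's module path.
# Values are conventionally in the 0-1000 range; lower numbers run earlier.
ITEM_PIPELINES = {
    'myproject.pipelines.CsvWriterPipeline': 300,
    'myproject.pipelines.MongoPipeline': 400,
}
```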

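    The core method of a Pipeline class is process_item(self, item, spider). It performs the write, then either returns the item so that the next Pipeline in line can continue processing it, or raises a DropItem exception to discard the item.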

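    The other methods are:

    • open_spider(self, spider)
      • Runs when the spider is opened, for example to open a file or connect to a database
    • close_spider(self, spider)
      • Runs when the spider is closed, for example to close a file or a database connection
    • from_crawler(cls, crawler)
      • A class method marked with @classmethod; the crawler argument gives access to all of Scrapy's components, including the global variables defined in settings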
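
    The order in which these hooks are called can be sketched without running a crawl. In the example below, LifecyclePipeline, the LINE_PREFIX setting, and run_lifecycle are all hypothetical names, and fake_crawler is a stand-in whose settings attribute is a plain dict; in a real project Scrapy builds the crawler and calls the hooks itself:

```python
import io
from types import SimpleNamespace


class LifecyclePipeline:
    """Collects items in memory to show when each hook is called."""

    def __init__(self, prefix):
        self.prefix = prefix
        self.buffer = None
        self.output = ''

    @classmethod
    def from_crawler(cls, crawler):
        # crawler.settings.get() reads values defined in settings.py.
        # LINE_PREFIX is a hypothetical setting name used for illustration.
        return cls(prefix=crawler.settings.get('LINE_PREFIX', ''))

    def open_spider(self, spider):
        # Acquire resources here (a file, a database connection, ...).
        self.buffer = io.StringIO()

    def process_item(self, item, spider):
        self.buffer.write(f'{self.prefix}{item}\n')
        return item  # hand the item to the next pipeline

    def close_spider(self, spider):
        # Release resources here.
        self.output = self.buffer.getvalue()
        self.buffer.close()


def run_lifecycle(items):
    """Call the hooks in the order Scrapy would, using stand-in objects."""
    fake_crawler = SimpleNamespace(settings={'LINE_PREFIX': '> '})
    pipeline = LifecyclePipeline.from_crawler(fake_crawler)
    pipeline.open_spider(spider=None)
    for item in items:
        pipeline.process_item(item, spider=None)
    pipeline.close_spider(spider=None)
    return pipeline.output
```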

    The sections below summarize the most commonly used Pipelines in Scrapy.

    CsvWriterPipeline

    Saves items to a CSV file.

    import csv
    
    
    class CsvWriterPipeline:
        
        def __init__(self):
            self.file = open('storage/national_bus_stations.csv', 'w', encoding='utf-8', newline='')
            self.my_writer = csv.writer(self.file)
    
        def open_spider(self, spider):
            pass
    
        def process_item(self, item, spider):
            self.my_writer.writerow(list(dict(item).values()))
            return item
    
        def close_spider(self, spider):
            self.file.close()
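
    CsvWriterPipeline writes values only, so the file has no header row and the column order follows the order in which fields were set on each item. A possible variant, sketched with the standard library's csv.DictWriter, fixes the column order and writes a header. The class name, the default path, and the fieldnames argument are assumptions for illustration:

```python
import csv


class CsvDictWriterPipeline:
    """Hypothetical variant: fixed column order plus a header row."""

    def __init__(self, path='items.csv',
                 fieldnames=('province_name', 'city_name', 'station')):
        self.path = path
        self.fieldnames = list(fieldnames)
        self.file = None
        self.writer = None

    def open_spider(self, spider):
        # newline='' lets the csv module control line endings itself.
        self.file = open(self.path, 'w', encoding='utf-8', newline='')
        # extrasaction='ignore' skips keys that are not listed in fieldnames.
        self.writer = csv.DictWriter(
            self.file, fieldnames=self.fieldnames, extrasaction='ignore')
        self.writer.writeheader()

    def process_item(self, item, spider):
        self.writer.writerow(dict(item))
        return item

    def close_spider(self, spider):
        self.file.close()
```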
    

    CsvItemPipeline

    Uses Scrapy's CsvItemExporter to save items to a CSV file.

    from scrapy.exporters import CsvItemExporter
    
    
    class CsvItemPipeline:
    
        def __init__(self):
            self.file = open('data.csv', 'wb')  # Scrapy exporters write bytes, so open in binary mode
            self.exporter = CsvItemExporter(self.file)
    
        def open_spider(self, spider):
            pass
    
        def process_item(self, item, spider):
            self.exporter.export_item(item)
            return item
    
        def close_spider(self, spider):
            self.file.close()
    

    JsonWriterPipeline

    Saves items to a JSON Lines file, with one JSON object per line.

    import json
    
    from itemadapter import ItemAdapter
    
    
    class JsonWriterPipeline:
    
        def __init__(self):
            self.file = open('items.jl', 'w', encoding='utf-8')
    
        def open_spider(self, spider):
            pass
    
        def close_spider(self, spider):
            self.file.close()
    
        def process_item(self, item, spider):
            line = json.dumps(ItemAdapter(item).asdict()) + "\n"
            self.file.write(line)
            return item
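
    By default json.dumps escapes non-ASCII characters, so Chinese station names are written as \uXXXX sequences even though the file is opened as UTF-8. Passing ensure_ascii=False keeps them readable. A small sketch with a made-up item dict:

```python
import json

# Made-up sample item for illustration.
item = {'city_name': '北京', 'station': '天安门东'}

escaped = json.dumps(item)                       # non-ASCII characters become \uXXXX escapes
readable = json.dumps(item, ensure_ascii=False)  # characters are kept as they are

print(escaped)
print(readable)
```

    Both strings decode to the same data; only the on-disk representation differs.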
    

    JsonItemLinePipeline

    Uses Scrapy's JsonLinesItemExporter to write the extracted items to a JSON Lines file one at a time.

    from scrapy.exporters import JsonLinesItemExporter
    
    
    class JsonItemLinePipeline:
    
        def __init__(self):
            self.file = open('storage/national_bus_stations.jl', 'wb')
            self.exporter = JsonLinesItemExporter(self.file, encoding='utf-8')
    
        def open_spider(self, spider):
            pass
    
        def process_item(self, item, spider):
            self.exporter.export_item(item)
            return item
    
        def close_spider(self, spider):
            self.file.close()
    

    JsonItemPipeline

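    Uses Scrapy's JsonItemExporter to write all extracted items to a JSON file as a single array; start_exporting() and finish_exporting() write the opening and closing brackets.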

    from scrapy.exporters import JsonItemExporter
    
    
    class JsonItemPipeline:
    
        def __init__(self):
            self.file = open('data.json', 'wb')  # Scrapy exporters write bytes, so open in binary mode
            self.exporter = JsonItemExporter(self.file, encoding='utf-8')
    
        def open_spider(self, spider):
            self.exporter.start_exporting()
    
        def process_item(self, item, spider):
            self.exporter.export_item(item)
            return item
    
        def close_spider(self, spider):
            self.exporter.finish_exporting()
            self.file.close()
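
    The two JSON exporters produce different file layouts: JsonLinesItemExporter writes one independent JSON object per line, while JsonItemExporter writes a single JSON array. The sketch below reads both layouts back with the standard library. The sample strings are made up to stand in for file contents, and the function names are hypothetical:

```python
import json

# Stand-ins for the contents of a .jl file and a .json file.
json_lines_text = '{"station": "A"}\n{"station": "B"}\n'
json_array_text = '[{"station": "A"},\n{"station": "B"}]'


def read_json_lines(text):
    # Each non-empty line is a complete JSON document and can be parsed alone.
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def read_json_array(text):
    # The whole array must be present before it can be parsed.
    return json.loads(text)


print(read_json_lines(json_lines_text))
print(read_json_array(json_array_text))
```

    This is also why a JSON Lines file stays usable if a crawl is interrupted, whereas an array that never received its closing bracket does not parse.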
    

    MongoPipeline

    Writes items to MongoDB.

    import pymongo
    from itemadapter import ItemAdapter
    
    
    class MongoPipeline:
    
        def __init__(self, mongo_uri, mongo_db):
            self.mongo_uri = mongo_uri
            self.mongo_db = mongo_db
            self.client = None
            self.db = None
    
        @classmethod
        def from_crawler(cls, crawler):
            return cls(
                mongo_uri=crawler.settings.get('MONGO_URI'),
                mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
            )
    
        def open_spider(self, spider):
            self.client = pymongo.MongoClient(self.mongo_uri)
            self.db = self.client[self.mongo_db]
    
        def process_item(self, item, spider):
            self.db[item.collection].insert_one(ItemAdapter(item).asdict())
            return item
    
        def close_spider(self, spider):
            self.client.close()
    

    The collection attribute can be defined on the item class in item.py, for example:

    from scrapy import Item, Field
    
    
    class BusStationItem(Item):
        
        table = collection = 'national_bus_stations'
        province_name = Field()
        city_name = Field()
        area_name = Field()
        line_name = Field()
        line = Field()
        station = Field()
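
    MongoPipeline reads MONGO_URI and MONGO_DATABASE from settings.py in from_crawler. The post does not show these settings; a sketch with assumed values for a local MongoDB instance:

```python
# MongoDB settings
# Both values are assumptions for a local instance without authentication.
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'station'
```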
    

    MysqlPipeline

    Writes items to MySQL.

    import pymysql
    
    
    class MysqlPipeline:
    
        def __init__(self, host, user, port, database, password):
            self.host = host
            self.user = user
            self.port = port
            self.database = database
            self.password = password
            self.db = None
            self.cursor = None
    
        @classmethod
        def from_crawler(cls, crawler):
            return cls(
                host=crawler.settings.get('MYSQL_HOST'),
                user=crawler.settings.get('MYSQL_USER'),
                port=crawler.settings.get('MYSQL_PORT'),
                database=crawler.settings.get('MYSQL_DATABASE'),
                password=crawler.settings.get('MYSQL_PASSWORD')
            )
    
        def open_spider(self, spider):
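            # Keyword arguments: PyMySQL 1.0+ no longer accepts these positionally
            self.db = pymysql.connect(host=self.host, user=self.user, password=self.password,
                                      database=self.database, port=self.port, charset='utf8')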
            self.cursor = self.db.cursor()
    
        def process_item(self, item, spider):
            data = dict(item)
            keys = ','.join(data.keys())
            values = ','.join(['%s'] * len(data))
            sql = f'INSERT INTO {item.table} ({keys}) VALUES ({values})'
            self.cursor.execute(sql, tuple(data.values()))
            self.db.commit()
            return item
    
        def close_spider(self, spider):
            self.cursor.close()
            self.db.close()
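
    The SQL string built in process_item can be examined on its own. The helper below, build_insert, is a hypothetical function that repeats the same steps without a database connection. Values are passed separately as parameters, but the table and column names are interpolated into the string directly, so they should only come from trusted item definitions:

```python
def build_insert(table, data):
    """Return (sql, params) for a parameterized INSERT, built as in MysqlPipeline."""
    keys = ','.join(data.keys())
    placeholders = ','.join(['%s'] * len(data))
    sql = f'INSERT INTO {table} ({keys}) VALUES ({placeholders})'
    return sql, tuple(data.values())


# Made-up sample data for illustration.
sql, params = build_insert(
    'national_bus_stations',
    {'province_name': 'Hebei', 'city_name': 'Tangshan'},
)
print(sql)     # INSERT INTO national_bus_stations (province_name,city_name) VALUES (%s,%s)
print(params)  # ('Hebei', 'Tangshan')
```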
    
    

    As with the MongoDB collection, the table name is specified on the item class in item.py.

    AsyncMysqlPipeline

    Writes items to MySQL asynchronously.

    import logging
    
    import pymysql
    from twisted.enterprise import adbapi
    
    
    class AsyncMysqlPipeline:
    
        def __init__(self, db_params):
            self.db_params = db_params
            self.db_pool = None
    
        @classmethod
        def from_crawler(cls, crawler):
            return cls(
                db_params=dict(
                    host=crawler.settings.get('MYSQL_HOST'),
                    user=crawler.settings.get('MYSQL_USER'),
                    port=crawler.settings.get('MYSQL_PORT'),
                    database=crawler.settings.get('MYSQL_DATABASE'),
                    password=crawler.settings.get('MYSQL_PASSWORD'),
                    charset='utf8',
                    cursorclass=pymysql.cursors.DictCursor,
                    use_unicode=True
                )
            )
    
        def open_spider(self, spider):
            self.db_pool = adbapi.ConnectionPool('pymysql', **self.db_params)
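            # The original called self.do_truncate here, but no such method is defined
            # in this class; add one if the table should be emptied before each crawl.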
    
        def process_item(self, item, spider):
            """
            Insert into MySQL asynchronously using Twisted
            """
            query = self.db_pool.runInteraction(self.do_insert, item)
            query.addErrback(self.handle_error)
            return item
    
        @staticmethod
        def handle_error(failure):
            """
            Log the failure
            """
            logging.debug(failure)
    
        @staticmethod
        def do_insert(cursor, item):
            """
            Perform the actual insert, the same way as the synchronous MysqlPipeline
            """
            data = dict(item)
            keys = ','.join(data.keys())
            values = ','.join(['%s'] * len(data))
            sql = f'INSERT INTO {item.table} ({keys}) VALUES ({values})'
            cursor.execute(sql, tuple(data.values()))
    
        def close_spider(self, spider):
            self.db_pool.close()
    

    The MySQL variables are configured centrally in settings.py:

    # Mysql settings
    MYSQL_HOST = 'localhost'
    MYSQL_USER = 'root'
    MYSQL_PORT = 3306
    MYSQL_DATABASE = 'station'
    MYSQL_PASSWORD = 'Password123$'
    

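    To integrate with a big data cluster, I also wrote two custom Pipelines: RemoteJsonPipeline, which writes data to a remote server, and RemoteFlumePipeline, which sends data to a remote Flume agent.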

    RemoteJsonPipeline

    Uses the paramiko library to open an SSH connection to the remote server, then writes the data to a remote JSON file over SFTP.

    import paramiko
    from scrapy.exporters import JsonLinesItemExporter
    
    
    class RemoteJsonPipeline:
    
        def __init__(self, host, port, user, password, file_path):
            self.host = host
            self.port = port
            self.user = user
            self.password = password
            self.file_path = file_path
            self.client = paramiko.SSHClient()
            self.sftp = None
            self.file = None
            self.exporter = None
    
        @classmethod
        def from_crawler(cls, crawler):
            return cls(
                host=crawler.settings.get('SSH_HOST'),
                port=crawler.settings.get('SSH_PORT'),
                user=crawler.settings.get('SSH_USER'),
                password=crawler.settings.get('SSH_PASSWORD'),
                file_path=crawler.settings.get('FILE_PATH')
            )
    
        def open_spider(self, spider):
            self.client.load_system_host_keys()
            self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.client.connect(self.host, self.port, self.user, self.password)
            self.sftp = self.client.open_sftp()
            self.file = self.sftp.open(self.file_path + 'data.json', 'wb')
            self.exporter = JsonLinesItemExporter(self.file, encoding='utf-8')
    
        def process_item(self, item, spider):
            self.exporter.export_item(item)
            return item
    
        def close_spider(self, spider):
            self.file.close()
            self.client.close()
    

    The SSH variables are configured centrally in settings.py:

    # SSH settings
    SSH_HOST = '172.16.1.2'
    SSH_PORT = 22
    SSH_USER = 'root'
    SSH_PASSWORD = 'passwd'
    FILE_PATH = '/opt/'
    

    RemoteFlumePipeline

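    Uses the telnetlib library to connect to the remote server over Telnet and send data to the port that Flume listens on; Flume writes the data to HDFS once it receives it.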

    from telnetlib import Telnet
    
    
    class RemoteFlumePipeline:
        def __init__(self, host, port):
            self.host = host
            self.port = port
            self.tn = None
    
        @classmethod
        def from_crawler(cls, crawler):
            return cls(
                host=crawler.settings.get('FLUME_HOST'),
                port=crawler.settings.get('FLUME_PORT')
            )
    
        def open_spider(self, spider):
            self.tn = Telnet(self.host, self.port)
    
        def process_item(self, item, spider):
            text = str(item).replace('\n', '').encode('utf-8') + b'\n'
            self.tn.write(text)
            return item
    
        def close_spider(self, spider):
            self.tn.close()
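
    telnetlib was deprecated in Python 3.11 and removed in Python 3.13, so the pipeline above cannot be imported on newer interpreters. Since it only sends newline-terminated text over TCP, a plain socket can do the same job. A sketch under that assumption: the class name SocketFlumePipeline and the helper format_event are hypothetical, and it assumes the Flume source accepts raw newline-delimited lines.

```python
import socket


def format_event(item):
    """Flatten an item to one newline-terminated UTF-8 line."""
    return str(item).replace('\n', '').encode('utf-8') + b'\n'


class SocketFlumePipeline:

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.sock = None

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            host=crawler.settings.get('FLUME_HOST'),
            port=crawler.settings.get('FLUME_PORT'),
        )

    def open_spider(self, spider):
        # Open one TCP connection for the whole crawl.
        self.sock = socket.create_connection((self.host, self.port), timeout=10)

    def process_item(self, item, spider):
        # sendall() keeps sending until the whole line has been written.
        self.sock.sendall(format_event(item))
        return item

    def close_spider(self, spider):
        self.sock.close()
```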
    

    The Flume variables are configured centrally in settings.py:

    # Flume settings
    FLUME_HOST = '172.16.1.2'
    FLUME_PORT = 10000
    
  • Original article: https://www.cnblogs.com/pineapple-py/p/14252463.html