zoukankan      html  css  js  c++  java
  • Item pipeline

    一个Item Pipeline 不需要继承特定基类,只需要实现某些特定方法,面向接口。

    class MyPipeline(object):
        def __init__(self):
            """
            可选实现,做参数初始化等
            """
    
        def process_item(self, item, spider):
            """
            该方法必须实现,每个item pipeline组件都需要调用该方法,
            该方法必须返回一个 Item 对象,被丢弃的item将不会被之后的pipeline组件所处理。
            :param item: 被爬取的item
            :param spider: 爬取该item的spider debug查看类属性
            :return:
            """
            return item
    
        def open_spider(self, spider):
            """
            可选实现,当spider被开启时,这个方法被调用。
            :param spider: 被开启的spider
            :return:
            """
    
        def close_spider(self, spider):
            """
            可选实现,当spider被关闭时,这个方法被调用
            :param spider: 被关闭的spider
            :return:
            """
    

      

    采用同步的机制写入数据:

    class MysqlPipeline(object):
        def __init__(self):
            pass
    
        def process_item(self, item, spider):
            if isinstance(item,InstanceItem):
                save(item)
            if spider.name == "spider_name":
                save(item)
    

      

    采用异步的机制写入代码

    class MysqlTwistedPipeline(object):
        # 采用异步的机制写入mysql
        def __init__(self, dbpool):
            self.dbpool = dbpool
    
        @classmethod
        def from_settings(cls, settings):
            """
            from_settings 激活pipeline之后,会自动调用该函数加载settings中的配置
            :param settings:
            :return:
            """
            dbparms = dict(
                host="127.0.0.1",  # settings["MYSQL_HOST"],
                db="spider",  # settings["MYSQL_DBNAME"],
                user="root",  # settings["MYSQL_USER"]
                password="root",  # settings["MYSQL_PASSWORD"],
                charset="utf8",
                use_unicode=True,  # 不然没办法保存中文
                cursorclass=cursors.DictCursor
            )
            db_pool = adbapi.ConnectionPool('pymysql', **dbparms)
            return cls(db_pool)
    
        def process_item(self, item, spider):
            ##使用twisted将mysql插入变成异步执行
            query = self.dbpool.runInteraction(self.do_insert, item)
            query.addErrback(self.handle_error, item, spider)  # 处理异常
    
        def handle_error(self, failure, item, spider):
            # 处理异步插入的异常
            print(failure)
    
        def do_insert(self, cursor, item):
            # 执行具体的插入
            # 根据不同的的item构建不同的sql语句插入到mysql中
            insert_sql, params = item.get_insert_sql()
    
            cursor.execute(insert_sql, params)
            # 自动commit
    

    数据库连接异常

    pymysql.err.InterfaceError: (0, '')
    

    原因:数据库操作对象实例未注销,但持有的数据库连接已失效,导致后续数据库操作无法进行。

    解决:在每次插入数据之前检测连接是否可用Connection.ping()。

    其实sqlalchemy就有这个处理,原生pymysql则需要自行处理。

    下面是代码

    from twisted.enterprise import adbapi
    from pymysql import cursors
    
    
    def get_db_pool():
        dbparms = dict(
            host=MYSQL_HOST,
            port=MYSQL_PORT,
            db=MYSQL_DBNAME,
            user=MYSQL_USER,
            password=MYSQL_PASSWORD,
            charset="utf8",
            use_unicode=True,  # 不然没办法保存中文
            cursorclass=cursors.DictCursor
        )
        db_pool = adbapi.ConnectionPool('pymysql', **dbparms)
        return db_pool
    
    
    class MysqlTwistedPipeline(object):
        # 采用异步的机制写入mysql
        def __init__(self, dbpool):
            self.dbpool = dbpool
    
        @classmethod
        def from_settings(cls, settings):
            """
            from_settings 激活pipeline之后,会自动调用该函数加载settings中的配置
            :param settings:
            :return:
            """
            db_pool = get_db_pool()
            return cls(db_pool)
    
        def process_item(self, item, spider):
            ##使用twisted将mysql插入变成异步执行
            query = self.dbpool.runInteraction(self.do_insert, item)
            query.addErrback(self.handle_error, item, spider)  # 处理异常
    
        def handle_error(self, failure, item, spider):
            # 处理异步插入的异常
            print(failure)
    
        def do_insert(self, cursor, item):
            # 执行具体的插入
            # 根据不同的的item构建不同的sql语句插入到mysql中
            conn = cursor.connection
            try:
                conn.ping()
            except:
                self.dbpool.close()
                self.dbpool = get_db_pool()
    
            insert_sql, params = item.get_insert_sql()
            cursor.execute(insert_sql, params)
    
  • 相关阅读:
    悲观锁乐观锁实战
    悲观锁
    乐观锁
    mysql数据库怎么设置乐观锁
    猴子吃桃问题
    算法题
    面试总结
    分布式系统理论(二):一致性协议Paxos
    职工工资管理
    79-WordSearch
  • 原文地址:https://www.cnblogs.com/zenan/p/8288485.html
Copyright © 2011-2022 走看看