一个Item Pipeline 不需要继承特定基类,只需要实现某些特定方法,面向接口。
class MyPipeline(object): def __init__(self): """ 可选实现,做参数初始化等 """ def process_item(self, item, spider): """ 该方法必须实现,每个item pipeline组件都需要调用该方法, 该方法必须返回一个 Item 对象,被丢弃的item将不会被之后的pipeline组件所处理。 :param item: 被爬取的item :param spider: 爬取该item的spider debug查看类属性 :return: """ return item def open_spider(self, spider): """ 可选实现,当spider被开启时,这个方法被调用。 :param spider: 被开启的spider :return: """ def close_spider(self, spider): """ 可选实现,当spider被关闭时,这个方法被调用 :param spider: 被关闭的spider :return: """
采用同步的机制写入数据:
class MysqlPipeline(object): def __init__(self): pass def process_item(self, item, spider): if isinstance(item,InstanceItem): save(item) if spider.name == "spider_name": save(item)
采用异步的机制写入代码
class MysqlTwistedPipeline(object): # 采用异步的机制写入mysql def __init__(self, dbpool): self.dbpool = dbpool @classmethod def from_settings(cls, settings): """ from_settings 激活pipeline之后,会自动调用该函数加载settings中的配置 :param settings: :return: """ dbparms = dict( host="127.0.0.1", # settings["MYSQL_HOST"], db="spider", # settings["MYSQL_DBNAME"], user="root", # settings["MYSQL_USER"] password="root", # settings["MYSQL_PASSWORD"], charset="utf8", use_unicode=True, # 不然没办法保存中文 cursorclass=cursors.DictCursor ) db_pool = adbapi.ConnectionPool('pymysql', **dbparms) return cls(db_pool) def process_item(self, item, spider): ##使用twisted将mysql插入变成异步执行 query = self.dbpool.runInteraction(self.do_insert, item) query.addErrback(self.handle_error, item, spider) # 处理异常 def handle_error(self, failure, item, spider): # 处理异步插入的异常 print(failure) def do_insert(self, cursor, item): # 执行具体的插入 # 根据不同的的item构建不同的sql语句插入到mysql中 insert_sql, params = item.get_insert_sql() cursor.execute(insert_sql, params) # 自动commit
数据库连接异常
pymysql.err.InterfaceError: (0, '')
原因:数据库操作对象实例未注销,但持有的数据库连接已失效,导致后续数据库操作无法进行。
解决:在每次插入数据之前检测连接是否可用Connection.ping()。
其实sqlalchemy就有这个处理,原生pymysql则需要自行处理。
下面是代码
from twisted.enterprise import adbapi from pymysql import cursors def get_db_pool(): dbparms = dict( host=MYSQL_HOST, port=MYSQL_PORT, db=MYSQL_DBNAME, user=MYSQL_USER, password=MYSQL_PASSWORD, charset="utf8", use_unicode=True, # 不然没办法保存中文 cursorclass=cursors.DictCursor ) db_pool = adbapi.ConnectionPool('pymysql', **dbparms) return db_pool class MysqlTwistedPipeline(object): # 采用异步的机制写入mysql def __init__(self, dbpool): self.dbpool = dbpool @classmethod def from_settings(cls, settings): """ from_settings 激活pipeline之后,会自动调用该函数加载settings中的配置 :param settings: :return: """ db_pool = get_db_pool() return cls(db_pool) def process_item(self, item, spider): ##使用twisted将mysql插入变成异步执行 query = self.dbpool.runInteraction(self.do_insert, item) query.addErrback(self.handle_error, item, spider) # 处理异常 def handle_error(self, failure, item, spider): # 处理异步插入的异常 print(failure) def do_insert(self, cursor, item): # 执行具体的插入 # 根据不同的的item构建不同的sql语句插入到mysql中 conn = cursor.connection try: conn.ping() except: self.dbpool.close() self.dbpool = get_db_pool() insert_sql, params = item.get_insert_sql() cursor.execute(insert_sql, params)