  • Scrapy extension middleware: submitting batched items to MySQL synchronously/asynchronously

    0. References

    https://doc.scrapy.org/en/latest/topics/item-pipeline.html?highlight=mongo#write-items-to-mongodb

    Added 2018-07-21: asynchronous version

    https://twistedmatrix.com/documents/15.3.0/core/howto/rdbms.html

    https://twistedmatrix.com/documents/18.7.0/api/twisted.python.failure.Failure.html

    https://twistedmatrix.com/documents/12.1.0/core/howto/time.html

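    1. Main features

    (1) Automatically reconnect to the MySQL server after a connection timeout

    (2) Collect items in item_list and submit them to the database as one batch once a threshold is reached

    (3) Parse the exception message to remove the offending rows automatically, then resubmit item_list

    (4) Submit the remaining item_list in close_spider() before shutdown

    (5) Added 2018-07-21: asynchronous version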
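    The batching behavior in (2) and (4) can be sketched without Scrapy or MySQL. This is a minimal illustration, not the pipeline itself: BatchBuffer and flush_fn are hypothetical names, and flush_fn stands in for the executemany + commit step.

```python
from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")


class BatchBuffer(Generic[T]):
    """Collect items and pass them to flush_fn in batches of `limit` (hypothetical helper)."""

    def __init__(self, limit: int, flush_fn: Callable[[List[T]], None]) -> None:
        self.limit = limit
        self.flush_fn = flush_fn  # stands in for executemany + commit
        self.items: List[T] = []

    def add(self, item: T) -> None:
        # cf. process_item: append, then flush once the threshold is reached
        self.items.append(item)
        if len(self.items) >= self.limit:
            self.flush()

    def flush(self) -> None:
        # cf. close_spider: also called once at shutdown for the leftovers
        if self.items:
            batch, self.items = self.items, []
            self.flush_fn(batch)


batches = []
buf = BatchBuffer(limit=3, flush_fn=batches.append)
for i in range(5):
    buf.add(i)
buf.flush()
print(batches)  # [[0, 1, 2], [3, 4]]
```

    Swapping in a fresh list before calling flush_fn means a slow or failing flush never sees items added afterwards.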

    2. Synchronous version

    Save the following as /site-packages/my_pipelines.py (any directory on the Python import path works):

    from socket import gethostname
    import time
    import re
    from html import escape
    
    import pymysql
    pymysql.install_as_MySQLdb()
    from pymysql import OperationalError, InterfaceError, DataError, IntegrityError
    
    
    class MyMySQLPipeline(object):
    
        hostname = gethostname()
    
        def __init__(self, settings):
            self.mysql_host = settings.get('MYSQL_HOST', '127.0.0.1')
            self.mysql_port = settings.get('MYSQL_PORT', 3306)
            self.mysql_user = settings.get('MYSQL_USER', 'username')
            self.mysql_passwd = settings.get('MYSQL_PASSWD', 'password')
            self.mysql_reconnect_wait = settings.get('MYSQL_RECONNECT_WAIT', 60)
    
            self.mysql_db = settings.get('MYSQL_DB')
            self.mysql_charset = settings.get('MYSQL_CHARSET', 'utf8')  #utf8mb4
            self.mysql_item_list_limit = settings.get('MYSQL_ITEM_LIST_LIMIT', 30)
    
            self.item_list = []
    
    
        @classmethod
        def from_crawler(cls, crawler):
            return cls(
                settings = crawler.settings
            )
    
    
        def open_spider(self, spider):
            try:
                self.conn = pymysql.connect(
                    host = self.mysql_host,
                    port = self.mysql_port,
                    user = self.mysql_user,
                    passwd = self.mysql_passwd,
                    db = self.mysql_db,
                    charset = self.mysql_charset,
                )
            except Exception as err:
                spider.logger.warning('MySQL: FAIL to connect {} {}'.format(err.__class__, err))
                time.sleep(self.mysql_reconnect_wait)
                self.open_spider(spider)
            else:
                spider.logger.info('MySQL: connected')
    
                self.curs = self.conn.cursor(pymysql.cursors.DictCursor)
                spider.curs = self.curs
    
    
        def close_spider(self, spider):
            self.insert_item_list(spider)
            self.conn.close()
            spider.logger.info('MySQL: closed')
    
    
        def process_item(self, item, spider):
            self.item_list.append(item)
            if len(self.item_list) >= self.mysql_item_list_limit:
                self.insert_item_list(spider)
    
            return item
    
    
        def sql(self):
            raise NotImplementedError('Subclass of MyMySQLPipeline must implement the sql() method')
    
    
        def insert_item_list(self, spider):
            spider.logger.info('insert_item_list: {}'.format(len(self.item_list)))
            try:
                self.sql()
            except (OperationalError, InterfaceError) as err:
                # <class 'pymysql.err.OperationalError'> 
                    # (2013, 'Lost connection to MySQL server during query ([Errno 110] Connection timed out)')
                spider.logger.info('MySQL: exception {} err {}'.format(err.__class__, err))
                self.open_spider(spider)
                self.insert_item_list(spider)
            except Exception as err:
                if len(err.args) == 2 and isinstance(err.args[1], str):
                    # <class 'pymysql.err.DataError'> 
                        # (1264, "Out of range value for column 'position_id' at row 2")
                    # <class 'pymysql.err.InternalError'> 
                        # (1292, "Incorrect date value: '1977-06-31' for column 'release_day' at row 26")
                    m_row = re.search(r'at\s+row\s+(\d+)$', err.args[1])
                    # <class 'pymysql.err.IntegrityError'> 
                        # (1048, "Column 'name' cannot be null") films 43894
                    m_column = re.search(r"Column\s'(.+)'", err.args[1])
    
                    if m_row:
                        row = m_row.group(1)
                        item = self.item_list.pop(int(row) - 1)
                        spider.logger.warning('MySQL: {} {} exception from item {}'.format(err.__class__, err, item))
                        self.insert_item_list(spider)
                    elif m_column:
                        column = m_column.group(1)
                        item_list = []
                        for item in self.item_list:
                            if item[column] is None:
                                item_list.append(item)
                        for item in item_list:
                            self.item_list.remove(item)
                            spider.logger.warning('MySQL: {} {} exception from item {}'.format(err.__class__, err, item))
                        self.insert_item_list(spider)
                    else:
                        spider.logger.error('MySQL: {} {} unhandled exception from item_list:\n{}'.format(
                                            err.__class__, err, self.item_list))
                else:
                    spider.logger.error('MySQL: {} {} unhandled exception from item_list:\n{}'.format(
                                        err.__class__, err, self.item_list))
            finally:
                self.item_list.clear()
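    The recovery step of insert_item_list can be isolated as a pure function and tried without a database. drop_bad_rows is a hypothetical helper that assumes the two message shapes quoted in the comments above ("... at row N" and "Column 'x' cannot be null"); unlike the pipeline, it captures the column with [^']+ so a later quote in the message cannot widen the match.

```python
import re
from typing import Dict, List, Optional

ROW_RE = re.compile(r"at\s+row\s+(\d+)$")       # "... at row 26"
COLUMN_RE = re.compile(r"Column\s'([^']+)'")    # "Column 'name' cannot be null"


def drop_bad_rows(item_list: List[Dict], message: str) -> Optional[List[Dict]]:
    """Remove the rows blamed by a MySQL error message from item_list (in place).

    Returns the removed items, or None if the message has neither shape.
    Hypothetical helper mirroring the branches of insert_item_list.
    """
    m_row = ROW_RE.search(message)
    if m_row:
        # Like the pipeline, treat the reported row as a 1-based position in the batch
        return [item_list.pop(int(m_row.group(1)) - 1)]
    m_column = COLUMN_RE.search(message)
    if m_column:
        column = m_column.group(1)
        bad = [item for item in item_list if item.get(column) is None]
        item_list[:] = [item for item in item_list if item.get(column) is not None]
        return bad
    return None


items = [{"position_id": 103}, {"position_id": None}, {"position_id": 104}]
print(drop_bad_rows(items, "Column 'position_id' cannot be null"))  # [{'position_id': None}]
print(drop_bad_rows(items, "Out of range value for column 'position_id' at row 2"))  # [{'position_id': 104}]
print(items)  # [{'position_id': 103}]
```

    Returning None for an unrecognised message lets the caller log it as unhandled instead of guessing which row to drop.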

    3. Usage

    Scrapy project: project_name

    MySQL database: database_name, table: table_name

    (1) Add the following to the project's pipelines.py:

    from my_pipelines import MyMySQLPipeline
    
    
    class MySQLPipeline(MyMySQLPipeline):
    
        def sql(self):
            self.curs.executemany("""
                INSERT INTO table_name (
                    position_id, crawl_time)
                VALUES (
                    %(position_id)s, %(crawl_time)s)
                ON DUPLICATE KEY UPDATE
                    crawl_time=values(crawl_time)
            """, self.item_list)
    
            self.conn.commit()
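    To see what the batched upsert does without a MySQL server, here is a sketch that swaps in Python's built-in sqlite3 as a stand-in. The syntax differs from MySQL: SQLite uses :name placeholders instead of %(name)s, and ON CONFLICT(...) DO UPDATE (SQLite 3.24+) instead of ON DUPLICATE KEY UPDATE. The position_id 105 row is made-up sample data.

```python
import sqlite3

# In-memory SQLite as a stand-in for MySQL (assumption: SQLite >= 3.24 for upsert support)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE table_name (position_id INTEGER PRIMARY KEY, crawl_time TEXT)")

# MySQL:  ON DUPLICATE KEY UPDATE crawl_time=values(crawl_time)
# SQLite: ON CONFLICT(position_id) DO UPDATE SET crawl_time = excluded.crawl_time
UPSERT = """
    INSERT INTO table_name (position_id, crawl_time)
    VALUES (:position_id, :crawl_time)
    ON CONFLICT(position_id) DO UPDATE SET crawl_time = excluded.crawl_time
"""

item_list = [
    {"position_id": 103, "crawl_time": "2018-07-18 12:35:52"},
    {"position_id": 105, "crawl_time": "2018-07-18 12:35:52"},
]
conn.executemany(UPSERT, item_list)  # one call for the whole batch of dicts
# A later batch with an existing key only refreshes crawl_time
conn.executemany(UPSERT, [{"position_id": 103, "crawl_time": "2018-07-19 08:00:00"}])
conn.commit()

rows = conn.execute("SELECT position_id, crawl_time FROM table_name ORDER BY position_id").fetchall()
print(rows)  # [(103, '2018-07-19 08:00:00'), (105, '2018-07-18 12:35:52')]
```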

    (2) Add the following to the project's settings.py:

    # Configure item pipelines
    # See https://doc.scrapy.org/en/latest/topics/item-pipeline.html
    ITEM_PIPELINES = {
       # 'project_name.pipelines.ProxyPipeline': 300,
       'project_name.pipelines.MySQLPipeline': 301,   
    }
    
    MYSQL_HOST = '127.0.0.1'
    MYSQL_PORT = 3306
    MYSQL_USER = 'username'
    MYSQL_PASSWD ='password'
    MYSQL_RECONNECT_WAIT = 60
    
    MYSQL_DB = 'database_name'
    MYSQL_CHARSET = 'utf8'  #utf8mb4
    MYSQL_ITEM_LIST_LIMIT = 3  #100

    4. Results

    Rows that raise errors are removed automatically and item_list is resubmitted:

    2018-07-18 12:35:52 [scrapy.core.scraper] DEBUG: Scraped from <200 http://httpbin.org/>
    {'position_id': 103, 'crawl_time': '2018-07-18 12:35:52'}
    2018-07-18 12:35:52 [scrapy.core.scraper] DEBUG: Scraped from <200 http://httpbin.org/>
    {'position_id': None, 'crawl_time': '2018-07-18 12:35:52'}
    2018-07-18 12:35:52 [scrapy.core.scraper] DEBUG: Scraped from <200 http://httpbin.org/>
    {'position_id': 104, 'crawl_time': '2018-02-31 17:51:47'}
    2018-07-18 12:35:55 [scrapy.core.engine] DEBUG: Crawled (200) <GET http://httpbin.org/> (referer: http://httpbin.org/)
    2018-07-18 12:35:55 [test] INFO: insert_item_list: 3
    2018-07-18 12:35:55 [test] WARNING: MySQL: <class 'pymysql.err.IntegrityError'> (1048, "Column 'position_id' cannot be null") exception from item {'position_id': None, 'crawl_time': '2018-07-18 12:35:52'}
    2018-07-18 12:35:55 [test] INFO: insert_item_list: 2
    2018-07-18 12:35:55 [test] WARNING: MySQL: <class 'pymysql.err.InternalError'> (1292, "Incorrect datetime value: '2018-02-31 17:51:47' for column 'crawl_time' at row 1") exception from item {'position_id': 104, 'crawl_time': '2018-02-31 17:51:47'}
    2018-07-18 12:35:55 [test] INFO: insert_item_list: 1
    2018-07-18 12:35:55 [scrapy.core.scraper] DEBUG: Scraped from <200 http://httpbin.org/>
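    The 3 → 2 → 1 sequence of "insert_item_list" counts in the log above can be reproduced offline. This is a sketch under stated assumptions: FakeDBError and fake_executemany are hypothetical stand-ins for pymysql and a table with NOT NULL position_id and DATETIME crawl_time, and the retry is written as a loop instead of the pipeline's recursion.

```python
import re
from datetime import datetime
from typing import Dict, List


class FakeDBError(Exception):
    """Hypothetical stand-in for pymysql errors: args = (code, message)."""


def fake_executemany(rows: List[Dict]) -> None:
    # Checks rows in order, the way a NOT NULL position_id / DATETIME crawl_time table would
    for n, row in enumerate(rows, start=1):
        if row["position_id"] is None:
            raise FakeDBError(1048, "Column 'position_id' cannot be null")
        try:
            datetime.strptime(row["crawl_time"], "%Y-%m-%d %H:%M:%S")
        except ValueError:
            raise FakeDBError(1292, "Incorrect datetime value: '{}' for column "
                              "'crawl_time' at row {}".format(row["crawl_time"], n)) from None


def insert_with_recovery(item_list: List[Dict]) -> List[int]:
    """Loop instead of recursion; returns the batch size of every attempt."""
    attempts = []
    while True:
        attempts.append(len(item_list))  # cf. "insert_item_list: N" in the log
        try:
            fake_executemany(item_list)
            return attempts
        except FakeDBError as err:
            message = err.args[1]
            before = len(item_list)
            m_row = re.search(r"at\s+row\s+(\d+)$", message)
            m_column = re.search(r"Column\s'([^']+)'", message)
            if m_row:
                item_list.pop(int(m_row.group(1)) - 1)
            elif m_column:
                column = m_column.group(1)
                item_list[:] = [i for i in item_list if i[column] is not None]
            if len(item_list) == before:
                raise  # unrecognised message or nothing to drop: stop instead of looping forever


items = [
    {"position_id": 103, "crawl_time": "2018-07-18 12:35:52"},
    {"position_id": None, "crawl_time": "2018-07-18 12:35:52"},
    {"position_id": 104, "crawl_time": "2018-02-31 17:51:47"},
]
print(insert_with_recovery(items))  # [3, 2, 1]
print(items)  # [{'position_id': 103, 'crawl_time': '2018-07-18 12:35:52'}]
```

    The progress check matters: if an error names a column that no item actually has as None, the recursive version in the pipeline would retry the same batch indefinitely.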

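    Submission results:

    To print the size of item_list after each item is added, add spider.logger.info('process_item: {}'.format(len(self.item_list))) right after self.item_list.append(item).

    Yielding 5 items in a row, with an insert triggered every 3 items: in the screenshot (not preserved in this copy), the insert (red box) blocks the subsequent yields (green box).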

    5. Asynchronous version

    (1) Save the following as /site-packages/my_pipelines.py

    # -*- coding: utf-8 -*-
    from socket import gethostname
    import time
    import re
    
    # https://twistedmatrix.com/documents/15.3.0/core/howto/rdbms.html
    #   twisted.enterprise.adbapi: Twisted RDBMS support
    from twisted.enterprise import adbapi
    import pymysql
    from pymysql import OperationalError, InterfaceError, DataError, InternalError, IntegrityError
    
    
    class MyMySQLPipeline(object):
    
        hostname = gethostname()
    
        def __init__(self, spider, settings):
            self.spider = spider
            
            self.dbpool = adbapi.ConnectionPool('pymysql',
                host = settings.get('MYSQL_HOST', '127.0.0.1'),
                port = settings.get('MYSQL_PORT', 3306),
                user = settings.get('MYSQL_USER', 'username'),
                passwd = settings.get('MYSQL_PASSWD', 'password'),
                db = settings.get('MYSQL_DB', 'test'),
                charset = settings.get('MYSQL_CHARSET', 'utf8'), #utf8mb4
                
                cursorclass = pymysql.cursors.DictCursor
            )
            self.mysql_reconnect_wait = settings.get('MYSQL_RECONNECT_WAIT', 60)
            self.mysql_item_list_limit = settings.get('MYSQL_ITEM_LIST_LIMIT', 30)
            self.item_list = []
    
    
        @classmethod
        def from_crawler(cls, crawler):
            return cls(
                spider = crawler.spider,
                settings = crawler.settings
            )
    
    
        def close_spider(self, spider):
            # Return the Deferred so that Scrapy waits for the last batch to be written before shutting down
            d = self._sql(list(self.item_list))
            self.item_list.clear()
            return d
    
    
        def process_item(self, item, spider):
            self.item_list.append(item)
            
            if len(self.item_list) >= self.mysql_item_list_limit:
                spider.log('item_list: %s'%len(self.item_list))
                self._sql(list(self.item_list))
                self.item_list.clear()
    
            return item
    
    
        def sql(self, txn, item_list):
            raise NotImplementedError('Subclass of MyMySQLPipeline must implement the sql() method')
    
    
        def _sql(self, item_list, retrying=False):
            d = self.dbpool.runInteraction(self.sql, item_list)
            d.addCallback(self.handle_result, item_list)
            d.addErrback(self.handle_error, item_list, retrying)
            return d
            
    
        def handle_result(self, result, item_list):
            self.spider.logger.info('{} items inserted with retcode {}'.format(len(item_list), result))
    
    
        def handle_error(self, failure, item_list, retrying):
            # https://twistedmatrix.com/documents/18.7.0/api/twisted.python.failure.Failure.html
            # r = failure.trap(pymysql.err.InternalError)
    
            args = failure.value.args
            # The message text is the last element of args, e.g. (1048, "Column 'name' cannot be null")
            msg = str(args[-1]) if args else ''
            
            # <class 'pymysql.err.OperationalError'> (1045, "Access denied for user 'username'@'localhost' (using password: YES)")
            # <class 'pymysql.err.OperationalError'> (2013, 'Lost connection to MySQL server during query ([Errno 110] Connection timed out)')
            # <class 'pymysql.err.OperationalError'> (2003, "Can't connect to MySQL server on '127.0.0.1' ([WinError 10061] No connection could be made because the target machine actively refused it.)")
            # <class 'pymysql.err.InterfaceError'> (0, '')    # after crawl started: sudo service mysqld stop
            if failure.type in [OperationalError, InterfaceError]:
                if not retrying:
                    self.spider.logger.info('MySQL: exception {} {}\n{}'.format(
                                            failure.type, args, item_list))
                    self.spider.logger.debug('MySQL: Trying to recommit in %s sec'%self.mysql_reconnect_wait)
                    
                    # self._sql(item_list)
                    # https://twistedmatrix.com/documents/12.1.0/core/howto/time.html
                    from twisted.internet import task
                    from twisted.internet import reactor
                    # Retry once after the wait; returning the Deferred chains the retry onto this one
                    return task.deferLater(reactor, self.mysql_reconnect_wait, self._sql, item_list, True)
                else:
                    self.spider.logger.warning('MySQL: exception {} {}\n{}'.format(
                                            failure.type, args, item_list))
    
                return
    
            # <class 'pymysql.err.DataError'> (1264, "Out of range value for column 'position_id' at row 2")
            # <class 'pymysql.err.InternalError'> (1292, "Incorrect date value: '1977-06-31' for column 'release_day' at row 26")
            elif failure.type in [DataError, InternalError]:
                m_row = re.search(r'at\s+row\s+(\d+)$', msg)
                if m_row:
                    item = item_list.pop(int(m_row.group(1)) - 1)
                    self.spider.logger.warning('MySQL: {} {} exception from item {}'.format(failure.type, args, item))
                    return self._sql(item_list)
                
            # <class 'pymysql.err.IntegrityError'> (1048, "Column 'name' cannot be null") films 43894
            elif failure.type in [IntegrityError]:
                m_column = re.search(r"Column\s'(.+)'", msg)
                if m_column:
                    column = m_column.group(1)
                    some_items = [item for item in item_list if item[column] is None]
                    self.spider.logger.warning('MySQL: {} {} exception from some items:\n{}'.format(
                                        failure.type, args, some_items))
                    return self._sql([item for item in item_list if item[column] is not None])
            
            # Anything not handled above, including messages the regexes do not match
            self.spider.logger.error('MySQL: {} {} unhandled exception from item_list:\n{}'.format(
                                failure.type, args, item_list))

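    (2) Add the following to the project's pipelines.py. Note that dbpool.runInteraction runs sql() inside a transaction that is committed automatically when sql() returns (and rolled back if it raises), so no explicit commit() is needed: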

    from my_pipelines import MyMySQLPipeline
    
    
    class MySQLPipeline(MyMySQLPipeline):
    
        def sql(self, txn, item_list):
            return txn.executemany("""
                INSERT INTO table_name (
                    position_id, crawl_time)
                VALUES (
                    %(position_id)s, %(crawl_time)s)
                ON DUPLICATE KEY UPDATE
                    crawl_time=values(crawl_time)
            """, item_list)

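    (3) Project settings.py

    Same as in section 3 (2) above.

    (4) Results

    As in the synchronous version, add spider.logger.info('process_item: {}'.format(len(self.item_list))) right after self.item_list.append(item) to print the size of item_list after each item is added.

    Yielding 5 items in a row, with an insert triggered every 3 items: in the screenshot (not preserved in this copy), the insert (red box) does not block the subsequent yields (green box).

    The screenshot also shows that a connection pool is in use.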
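    The non-blocking behavior comes from running the interaction off the reactor thread: adbapi.ConnectionPool executes runInteraction in worker threads, each holding its own connection, and hands back a Deferred. The sketch below swaps in the standard library's ThreadPoolExecutor and Future for Twisted's thread pool and Deferred, purely to show why process_item returns before the insert finishes; NonBlockingBatcher and slow_insert are hypothetical names.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict, List

release = threading.Event()                   # lets the demo decide when the "insert" finishes
executor = ThreadPoolExecutor(max_workers=2)  # stand-in for the pool's worker threads


def slow_insert(batch: List[Dict]) -> int:
    release.wait(timeout=5)  # pretend executemany + commit is slow
    return len(batch)


class NonBlockingBatcher:
    """Hypothetical class mirroring process_item/_sql of the async pipeline."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.item_list: List[Dict] = []
        self.pending: List[Future] = []

    def process_item(self, item: Dict) -> Dict:
        self.item_list.append(item)
        if len(self.item_list) >= self.limit:
            # Submit a copy and return at once, like _sql(list(...)) + clear()
            self.pending.append(executor.submit(slow_insert, list(self.item_list)))
            self.item_list.clear()
        return item


b = NonBlockingBatcher(limit=3)
for i in range(5):
    b.process_item({"position_id": i})
print(len(b.pending), b.pending[0].done())  # 1 False: all 5 items went through while the insert is pending
release.set()
print(b.pending[0].result())  # 3
executor.shutdown()
```

    Passing a copy of item_list matters here for the same reason as in the pipeline: the buffer is cleared and refilled while the worker thread may still be reading the batch.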

  • Original article: https://www.cnblogs.com/my8100/p/scrapy_middleware_mysql.html