zoukankan      html  css  js  c++  java
  • 【模块】:PyMySQL

    PyMySQL

    1、安装

    [root@localhost ~]# pip install PyMySQL

    2、初识

    创建数据表结构:

    mysql> CREATE TABLE `users` (
        ->     `id` int(11) NOT NULL AUTO_INCREMENT,
        ->     `email` varchar(255) COLLATE utf8_bin NOT NULL,
        ->     `password` varchar(255) COLLATE utf8_bin NOT NULL,
        ->     PRIMARY KEY (`id`)
        -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin
        -> AUTO_INCREMENT=1 ;
    Query OK, 0 rows affected (0.07 sec)

    使用示例:

    import pymysql.cursors
    
    # Connect to the database
    connection = pymysql.connect(host='192.168.1.134',
                                 port=3306,
                                 user='remote',
                                 password='tx_1234abc',
                                 db='Jefrey',
                                 charset='utf8mb4',
                                 cursorclass=pymysql.cursors.DictCursor)
    
    try:
        with connection.cursor() as cursor:   #增加
            # Create a new record
            sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
            cursor.execute(sql, ('webmaster@python.org', 'very-secret'))
    
        # connection is not autocommit by default. So you must commit to save your changes.
        connection.commit()   # 提交,不然数据库不生效
    
        with connection.cursor() as cursor:   #查询
            # Read a single record
            sql = "SELECT `id`, `password` FROM `users` WHERE `email`=%s"
            cursor.execute(sql, ('webmaster@python.org',))
            result = cursor.fetchone()
            print(result)
    finally:
        connection.close()
    
    # {'id':1, 'password': 'very-secret'}
    

    3、connection对象

    Representation of a socket with a mysql server.        与mysql建立socket

    The proper way to get an instance of this class is to call connect().

    Establish a connection to the MySQL database. Accepts several arguments:

    参数:

    class pymysql.connections.Connection(host=None, user=None, password='', database=None, port=0, unix_socket=None, charset='', sql_mode=None, read_default_file=None, conv=None, use_unicode=None, client_flag=0, cursorclass=<class 'pymysql.cursors.Cursor'>, init_command=None, connect_timeout=10, ssl=None, read_default_group=None, compress=None, named_pipe=None, no_delay=None, autocommit=False, db=None, passwd=None, local_infile=False, max_allowed_packet=16777216, defer_connect=False, auth_plugin_map={}, read_timeout=None, write_timeout=None, bind_address=None) 
    
    host – Host where the database server is located
    •user – Username to log in as
    •password – Password to use.
    •database – Database to use, None to not use a particular one.
    •port – MySQL port to use, default is usually OK. (default: 3306)
    •bind_address – When the client has multiple network interfaces, specify the interface from which to connect to the host. Argument can be a hostname or an IP address.
    •unix_socket – Optionally, you can use a unix socket rather than TCP/IP.
    •charset – Charset you want to use.
    •sql_mode – Default SQL_MODE to use.
    •read_default_file – Specifies my.cnf file to read these parameters from under the [client] section.
    •conv – Conversion dictionary to use instead of the default one. This is used to provide custom marshalling and unmarshaling of types. See converters.
    •use_unicode – Whether or not to default to unicode strings. This option defaults to true for Py3k.
    •client_flag – Custom flags to send to MySQL. Find potential values in constants.CLIENT.
    •cursorclass – Custom cursor class to use.
    •init_command – Initial SQL statement to run when connection is established.
    •connect_timeout – Timeout before throwing an exception when connecting. (default: 10, min: 1, max: 31536000)
    •ssl – A dict of arguments similar to mysql_ssl_set()’s parameters. For now the capath and cipher arguments are not supported.
    •read_default_group – Group to read from in the configuration file.
    •compress – Not supported
    •named_pipe – Not supported
    •autocommit – Autocommit mode. None means use server default. (default: False)
    •local_infile – Boolean to enable the use of LOAD DATA LOCAL command. (default: False)
    •max_allowed_packet – Max size of packet sent to server in bytes. (default: 16MB) Only used to limit size of “LOAD LOCAL INFILE” data packet smaller than default (16KB).
    •defer_connect – Don’t explicitly connect on contruction - wait for connect call. (default: False)
    •auth_plugin_map – A dict of plugin names to a class that processes that plugin. The class will take the Connection object as the argument to the constructor. The class needs an authenticate method taking an authentication packet as an argument. For the dialog plugin, a prompt(echo, prompt) method can be used (if no authenticate method) for returning a string from the user. (experimental)
    •db – Alias for database. (for compatibility to MySQLdb)
    •passwd – Alias for password. (for compatibility to MySQLdb)
    Parameters

     方法:

    autocommit_mode= None 
      specified autocommit mode. None means use server default.
    begin() 
      Begin transaction.
    close() 
      Send the quit message and close the socket
    commit() 
      Commit changes to stable storage
    cursor(cursor=None) 
      Create a new cursor to execute queries with
    ping(reconnect=True) 
      Check if the server is alive
    rollback() 
      Roll back the current transaction
    select_db(db) 
      Set current db
    show_warnings() 
      SHOW WARNINGS
    

     

    4、cursor对象

    class pymysql.cursors.Cursor(connection)

    This is the object you use to interact with the database.   cursor对象用于数据库交互

    Do not create an instance of a Cursor yourself. Call connections.Connection.cursor().  不要自己用Cursor生成实例,使用Connection.cursor()

    方法:

    callproc(procname, args=()) 
    Execute stored procedure procname with args
    procname – string, name of procedure to execute on server
    args – Sequence of parameters to use with procedure
    Returns the original args.
    
    close()     关闭
        Closing a cursor just exhausts all remaining data.
    
    execute(query, args=None)  #执行sql语句
        Execute a query  
    
        Parameters:
            •query (str) – Query to execute.  
            •args (tuple, list or dict) – parameters used with query. (optional)
     
        Returns:    Number of affected rows   #返回查询到的个数
        Return type:    int
        If args is a list or tuple, %s can be used as a placeholder in the query. If args is a dict, %(name)s can be used as a placeholder in the query.
    
    executemany(query, args) 
        Run several data against one query
    
        Parameters:
            •query – query to execute on server
            •args – Sequence of sequences or mappings. It is used as parameter.
     
        Returns:    Number of rows affected, if any.
        This method improves performance on multiple-row INSERT and REPLACE. Otherwise it is equivalent to looping over args with execute().
    
    fetchall() 
        Fetch all the rows
    
    fetchmany(size=None) 
        Fetch several rows
    
    fetchone() 
        Fetch the next row
    
    max_stmt_length= 1024000 
        Max statement size which executemany() generates.
        Max size of allowed statement is max_allowed_packet -       packet_header_size. Default value of max_allowed_packet is 1048576.
    
    mogrify(query, args=None) 
        Returns the exact string that is sent to the database by calling the     execute() method.
        This method follows the extension to the DB API 2.0 followed by Psycopg.
    
    setinputsizes(*args) 
        Does nothing, required by DB API.
    setoutputsizes(*args) Does nothing, required by DB API.

    其他

    class pymysql.cursors.SSCursor(connection) 
    Unbuffered Cursor, mainly useful for queries that return a lot of data, or for connections to remote servers over a slow network.
    
    Instead of copying every row of data into a buffer, this will fetch rows as needed. The upside of this is the client uses much less memory, and rows are returned much faster when traveling over a slow network or if the result set is very big.
    
    There are limitations, though. The MySQL protocol doesn’t support returning the total number of rows, so the only way to tell how many rows there are is to iterate over every row returned. Also, it currently isn’t possible to scroll backwards, as only the current row is held in memory.
    fetchall() 
    Fetch all, as per MySQLdb. Pretty useless for large queries, as it is buffered. See fetchall_unbuffered(), if you want an unbuffered generator version of this method.
    fetchall_unbuffered() 
    Fetch all, implemented as a generator, which isn’t to standard, however, it doesn’t make sense to return everything in a list, as that would use ridiculous memory for large result sets.
    fetchmany(size=None) 
    Fetch many
    fetchone() 
    Fetch next row
    read_next() 
    Read next row
    
    class pymysql.cursors.DictCursor(connection) 
    A cursor which returns results as a dictionary
    
    class pymysql.cursors.SSDictCursor(connection) 
    An unbuffered cursor, which returns results as a dictionary
    cursor其他对象

    5、execute与executemany使用及性能对比

     execute 执行单条sql语句

    源码:

    def execute(self, query, args=None):
        """Execute a query
        """
        while self.nextset():
            pass
    
        query = self.mogrify(query,
                             args)  # sql语句拼接,加上引号 select * from users WHERE email='webmaster@python.org' and password='very-secret' 
    
        result = self._query(query)  # 最终执行connection的query方法,统计匹配的rows返回
        self._executed = query  # 设置执行过的语句为query,查询的方法会用到
        return result

    示例:

    # cursor
    try:
        with connection.cursor() as cursor:
            # Create a new record
            sql = " select * from users WHERE email=%s and password=%s "
            rows_count = cursor.execute(sql, ('webmaster@python.org', 'very-secret'))
            print(rows_count)
            result = cursor.fetchone()
            print(result)
    finally:
       connection.close()
    
    # 12   查不到时为 0
    # {'id': 4, 'email': 'webmaster@python.org', 'password': 'very-secret'}
    

    ②  executemany 执行多条语句

    源码:

    def executemany(self, query, args):
        # type: (str, list) -> int
        """Run several data against one query """
        if not args:
            return
    
        m = RE_INSERT_VALUES.match(query)  # 正则匹配,暂忽略
        if m:
            q_prefix = m.group(1) % ()
            q_values = m.group(2).rstrip()
            q_postfix = m.group(3) or ''
            assert q_values[0] == '(' and q_values[-1] == ')'
            return self._do_execute_many(q_prefix, q_values, q_postfix, args,
                                         self.max_stmt_length,
                                         self._get_db().encoding)
    
        self.rowcount = sum(self.execute(query, arg) for arg in args)  # 重点!还是循环执行execute方法
        return self.rowcount= 1
    

    示例:

    # executemany
    try:
        with connection.cursor() as cursor:
            # Create a new record
            sql = " select * from users WHERE email=%s and password=%s "
            rows_count = cursor.executemany(sql, [('webmaster@python.org', 'very-secret'),('webmaster@python.org', 'test')])
            print(rows_count)
            result = cursor.fetchone()   # 查到的是后一条语句的第一条,一会儿剖析fitchone方法 fitchall会打印两次查询的所有
            print(result)
    
    finally:
       connection.close()
    
    # 26
    # {'id': 16, 'email': 'webmaster@python.org', 'password': 'test'}
    

    数据库各插入10000条数据,打印下执行时间 

    两者都共用了一个方法,感觉应该不会差很多(打脸)

    # 性能对比 环境python3.6
    try:
        # execute方法
        with connection.cursor() as cursor:
            # Create a new record
            start = time.time()
            sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
            for i in range(10000):
                rows_count = cursor.execute(sql,('webmaster@python.org','test'+str(i)))
            connection.commit()
            print('execut insert 10000 rows cost %s',time.time()-start)
    
        # executemany方法
        with connection.cursor() as cursor:
            # Create a new record
            start = time.time()
            sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
            # for i in range(10000):
            rows_count = cursor.executemany(sql, (('webmaster@python.org', 'test' + str(i)) for i in range(10000,20000)))
            connection.commit()
            print('executmany insert 10000 rows cost %s', time.time() - start)
    
        # executemany换成列表
        with connection.cursor() as cursor:
            # Create a new record
            start = time.time()
            sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
            li = []
            for i in range(20000,30000):
                li.append(('webmaster@python.org', 'test' + str(i)))
            rows_count = cursor.executemany(sql, li)
            connection.commit()
            print('executmany insert 10000 rows cost %s', time.time() - start)
    
    finally:
       connection.close()
    
    # execut insert 10000 rows cost %s 5.356306791305542
    # executmany insert 10000 rows cost %s 0.09076881408691406
    # executmany insert 10000 rows cost %s 0.08979249000549316

    注:什么情况!!速度差了50多倍,executmany速度感人呀

    6、fetchone、fetchmany、fetchall剖析

    fetchone 查询一条数据,为空时返回None

    源码:

    def fetchone(self):
        """Fetch the next row"""
        self._check_executed()       # 检查是否有sql语句执行过
        if self._rows is None or self.rownumber >= len(self._rows):
            return None      # _rows所有匹配项的集合,列表形式
        result = self._rows[self.rownumber]   # 返回第rownumber项数据
        self.rownumber += 1
        return result
    

    示例:

    # fetchone方法
    try:
        with connection.cursor() as cursor:   #查询
            # Read a single record
            sql = "SELECT `id`, `password` FROM `users` WHERE `email`=%s AND `password`=%s"
            cursor.execute(sql, ('webmaster@python.org','test1'))
            result = cursor.fetchone()
            print(result)
            result = cursor.fetchone()
            print(result)
    finally:
        connection.close()
    
    # {'id': 31, 'password': 'test1'}
    # {'id': 10031, 'password': 'test1'}
    

     fetchmany 查询指定条数数据,为空时返回()

    源码:

    def fetchmany(self, size=None):
        """Fetch several rows"""
        self._check_executed()
        if self._rows is None:
            return ()
        end = self.rownumber + (size or self.arraysize)
        result = self._rows[self.rownumber:end]
        self.rownumber = min(end, len(self._rows))
        return result
    

    示例:

    # fetchmany方法
    try:
        with connection.cursor() as cursor:   #查询
            # Read a single record
            sql = "SELECT `id`, `password` FROM `users` WHERE `email`=%s AND `password`=%s"
            cursor.execute(sql, ('webmaster@python.org','test1'))
            result = cursor.fetchmany(2)
            print(result)
            result = cursor.fetchmany(2)
            print(result)
    finally:
        connection.close()
    
    # [{'id': 31, 'password': 'test1'}, {'id': 10031, 'password': 'test1'}]
    # [{'id': 20031, 'password': 'test1'}, {'id': 30031, 'password': 'test1'}]
    

    fetchall 查询所有匹配数据,为空时返回()

    源码:

    def fetchall(self):
        """Fetch all the rows"""
        self._check_executed()
        if self._rows is None:
            return ()
        if self.rownumber:
            result = self._rows[self.rownumber:]
        else:
            result = self._rows
        self.rownumber = len(self._rows)
        return result
    

    示例:

    # fetchall方法
    try:
        with connection.cursor() as cursor:   #查询
            # Read a single record
            sql = "SELECT `id`, `password` FROM `users` WHERE `email`=%s AND `password`=%s"
            cursor.execute(sql, ('webmaster@python.org','test10000'))
            result = cursor.fetchall()
            print(result)
            result = cursor.fetchall()
            print(result)
    finally:
        connection.close()
    
    # [{'id': 70030, 'password': 'test10000'}, {'id': 120030, 'password': 'test10000'}, {'id': 150030, 'password': 'test10000'}]
    # []
    

    分析:三种方式,都是先获取到执行sql语句的所有结果,最后进行切片获取,所以三种方式的性能是类似的

    总结:增、删、改、查的使用不再赘述,传入相应的sql语句即可得到想要的结果

    完整的模块

    @util.singleton
    class PbxDB(object):
        '''
        数据库操作
        '''
        def __init__(self):
            self.mutex = threading.Lock()
            self.conn = None
            self.DB_CONFIG = PBX_DB_CONFIG
    
        def connect(self):
            '''
            连接数据库
            :return:
            '''
            try:
                self.conn = pymysql.connect(host=self.DB_CONFIG['host'],
                                            port=self.DB_CONFIG['port'],
                                            db=self.DB_CONFIG['db'],
                                            user=self.DB_CONFIG['user'],
                                            password=self.DB_CONFIG['passwd'],
                                            cursorclass=pymysql.cursors.DictCursor,             # 数据字典格式
                                            charset='utf8')
            except Exception as e:
                exe = traceback.format_exc()
                logging.error(exe)
    
        def reconnect(self):
            '''
            重新连接
            :return:
            '''
            try:
                self.conn.close()
            except Exception as e:
                pass
            finally:
                self.conn = None
                self.connect()
    
        def find(self, sql, *args):
            '''
            查询所有匹配项
            :param sql:
            :param args:
            :return:    list[{dict},{dict}] or tuple(空)
            '''
            database = []
            count = 0
            while count < 3:
                try:
                    self.mutex.acquire()
                    if not self.conn:
                        self.connect()
                    with self.conn.cursor() as cursor:
                        cursor.execute(sql, args)
                        database = cursor.fetchall()
                        self.conn.commit()
                    break
                except OperationalError as e:
                    self.reconnect()
                    count += 1
                except InterfaceError as  e:
                    self.reconnect()
                    count += 1
                except Exception as e:
                    logging.error('error[%s],sql[%s],*args[%s]', e, sql, args)
                    break
                finally:
                    self.mutex.release()
            return database
    
        def query(self, sql, *args):
            '''
            查询第一个匹配项
            :param sql:
            :param args:
            :return:  dict{} or None
            '''
            database = None
            count = 0
            while count < 3:
                try:
                    self.mutex.acquire()
                    if not self.conn:
                        self.connect()
                    with self.conn.cursor() as cursor:
                        cursor.execute(sql, args)
                        database = cursor.fetchone()
                        self.conn.commit()
                    break
                except OperationalError as e:
                    self.reconnect()
                    count += 1
                except InterfaceError as  e:
                    self.reconnect()
                    count += 1
                except Exception as e:
                    logging.error('error[%s],sql[%s],*args[%s]', e, sql, args)
                    break
                finally:
                    self.mutex.release()
            return database
    
        def __getattr__(self, item):
            if item in ('update','insert','delete'):
                setattr(self,item,self.__execute)
                return getattr(self,item)
    
        def __execute(self, sql, *args):
            '''
            执行update、insert、delete操作
            :param sql:
            :param args:
            :return:
            '''
            count = 0
            while count < 3:
                try:
                    self.mutex.acquire()
                    if not self.conn:
                        self.connect()
                    with self.conn.cursor() as cursor:
                        cursor.execute(sql, args)
                        self.conn.commit()
                    break
                except OperationalError as e:
                    self.reconnect()
                    count += 1
                except InterfaceError as  e:
                    self.reconnect()
                    count += 1
                except Exception as e:
                    logging.error('error[%s],sql[%s],*args[%s]', e, sql, args)
                    break
                finally:
                    self.mutex.release()
            return
    

      

  • 相关阅读:
    JVM笔记3-java内存区域之运行时常量池
    JVM笔记2-Java虚拟机内存管理简介
    JVM笔记1-内存溢出分析问题与解决
    ActiveMq笔记1-消息可靠性理论
    python基础学习16----模块
    python基础学习15----异常处理
    在windows下搭建汇编编程环境
    python基础学习14----正则表达式
    python基础学习13----生成器&迭代器
    python基础学习12----装饰器
  • 原文地址:https://www.cnblogs.com/lianzhilei/p/7267648.html
Copyright © 2011-2022 走看看