zoukankan      html  css  js  c++  java
  • DBUtils的使用

    一、问题引入

    如果是简单的数据库操作,使用pymysql这样就可以解决问题了:

    import pymysql
    
    conn = pymysql.connect(
        host='localhost',
        port=3306,
        user='xxx',
        password='xxx',
        database='test',
        charset='utf8'
    )
    cursor = conn.cursor()
    cursor.execute("select * from teacher")
    result = cursor.fetchall()
    cursor.close()
    conn.close()

      这样可以简单的完成一次查询,但是往往为了效率,会使用多进程或者多线程的方式,比如同时向数据库插入几百条或者查询多个结果,这样就会造成同时创建过多的连接,而数据库的最大连接数十有限制的,默认为200:

    mysql> show global variables like 'max_connections';
    +-----------------+-------+
    | Variable_name   | Value |
    +-----------------+-------+
    | max_connections | 200   |
    +-----------------+-------+
    1 row in set (0.00 sec)

    为了解决这个问题,可以引入数据库连接池。DBUtils可以很好的解决这个问题。

    二、DBUtils

    DBUtils是一套Python数据库连接池包,可为高并发访问数据库提供更好的性能服务,并允许对非线程安全的数据库接口进行线程安全包装。

    它有两种模式,分别为:

    • PersistentDB 
    • PooledDB 

    (一)PersistentDB 

    为每一个线程创建独有的连接,即使调用close方法也不会关闭,只是把连接放入到连接池中供后续线程继续使用,当线程结束后,连接也就会自动关闭。

    from DBUtils.PersistentDB import PersistentDB
    import pymysql
    
    pool = PersistentDB(
        pymysql,  #数据库连接模块
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表
        ping=0, # ping MySQL服务端,检查是否服务可用。如:0 = None = never, 1 = default = whenever it is requested, 
            2 = when a cursor is created, 4 = when a query is executed, 7 = always
    closeable=False, # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。
                  如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,
                  因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
    threadlocal=None, host='127.0.0.1', port=3306, user='root', password='123456', database='test', charset='utf8' )

    使用时直接调用连接池对象的connection方法获取连接即可。

    def index():
        conn = pool.connection(shareable=False)
        cursor = conn.cursor()
        cursor.execute('select * from table')
        result = cursor.fetchall()
        print(result)
        cursor.close()
        conn.close()

    (二)PooledDB 

    为所有的线程创建共享的连接,创建一批连接到连接池中,供所有的线程使用。

    import pymysql
    from DBUtils.PooledDB import PooledDB
    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
        ping=0,        # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 
                2 = when a cursor is created,
         4 = when a query is executed, 7 = always
    host='127.0.0.1', port=3306, user='root', password='123', database='test', charset='utf8' )

    使用时直接调用连接池对象的connection方法获取连接即可。

    def index():
        conn = pool.connection()
        cursor = conn.cursor()
        cursor.execute('select * from table')
        result = cursor.fetchall()
        conn.close()
    pip install DBUtils==1.3
    安装方式

    三、使用实例

    (一)方法一

    1、创建连接池实例

    import pymysql
    from DBUtils.PooledDB import PooledDB
    pool = PooledDB(
        creator=pymysql, 
        maxconnections=6,  
        mincached=2,  
        maxcached=5,  
        maxshared=3,  
        blocking=True, 
        maxusage=None, 
        setsession=[], 
        ping=0,
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123',
        database='test',
        charset='utf8'
    )

    2、使用实例

     上面已经创建好了连接池的实例,现在只需要使用这个实例即可。

    class DBManger:
    
        @staticmethod
        def fetch_one(sql,args=None):
            conn = pool.connection()
            cursor = conn.cursor()
            if not args:
                cursor.execute(sql)
            cursor.execute(sql,args)
            result = cursor.fetchone()
            cursor.close()
            conn.close()
            return result

    如果调用它,只需要进行如下操作即可:

    def index():
        sql = 'select * from teacher where tid>%s'
        parames = 1
        result = DBManger.fetch_one(sql,(parames,))  #注意此处传参,可能传入的是字符串,也有可能是列表等(sql语句中含有in关键字)
        print(result)

    (二)方法二

    使用封装的方式进行调用:

    import pymysql
    from DBUtils.PooledDB import PooledDB
    
    
    class DBManger:
        __pool = None
    
        def __init__(self, creator=pymysql, maxconnections=200,mincached=10,maxcached=20,maxshared=3,
                     blocking=True, maxusage=None,setsession=None,host='127.0.0.1',port=3306,
                     user='root',password='123',database='test',charset='utf8'
                     ):
            """
            :param creator: 数据库连接模块
            :param maxconnections: 连接池允许的最大连接数,0和None表示不限制连接数
            :param mincached:连接池中空闲连接的初始数量
            :param maxcached:连接池中空闲连接的最大数量
            :param maxshared:共享连接的最大数量
            :param blocking:超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理
            :param maxusage:单个连接的最大重复使用次数
            :param setsession:开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
            :param ping: ping MySQL服务端,检查是否服务可用。# 如:0 = None = never
            :param host:主机
            :param port:端口号
            :param user:用户名
            :param password:密码
            :param database:数据库
            :param charset:编码
            """
            if not self.__pool:
                self.__pool = PooledDB(
                                        creator=creator,
                                        maxconnections=maxconnections,
                                        mincached=mincached,
                                        maxcached=maxcached,
                                        maxshared=maxshared,
                                        blocking=blocking,
                                        maxusage=maxusage,
                                        setsession=setsession,
                                        host=host,
                                        port=port,
                                        user=user,
                                        password=password,
                                        database=database,
                                        charset=charset,
                                        cursorclass=pymysql.cursors.DictCursor  #返回字典形式的列表
                                    )
            self._conn = None
            self._cursor = None
            self.__get_conn()
    
        def __get_conn(self):
            self._conn = self.__pool.connection()
            self._cursor = self._conn.cursor()
    
        def close(self):
            """释放连接给连接池"""
            self._cursor.close()
            self._conn.close()
    
        def get_one(self,sql,args=None):
            """
            获取一条数据
            :param sql:
            :param args:
            :return:
            """
            try:
                if not args:
                    self._cursor.excute(sql)
                print(sql,args)
                self._cursor.execute(sql,args)
                result = self._cursor.fetchone()
                self.close()
                return result
            except Exception as e:
                self.close()
    
        def get_many(self,sql,args=None):
            """
            获取多条数据
            :param sql:
            :param args:
            :return:
            """
            try:
                if not args:
                    self._cursor.excute(sql)
                self._cursor.execute(sql,args)
                result = self._cursor.fetchall()
                self.close()
                return result
            except Exception as e:
                self.close()
    
        def insert_one(self,sql,args=None):
            """
            插入单条数据
            :param sql:
            :param args:
            :return:
            """
            try:
                if not args:
                    self._cursor.excute(sql)
                count = self._cursor.execute(sql, args)
                self._conn.commit()
                self.close()
                return count
            except Exception as e:
                self._conn.rollback()
                self.close()
    
        def insert_many(self,sql,args=None):
            """
            插入多条数据
            :param sql:
            :param args:
            :return:
            """
            try:
                if not args:
                    self._cursor.executemany(sql)
                print(args)
                count = self._cursor.executemany(sql, args)
                self._conn.commit()
                self.close()
                return count
            except Exception as e:
                self._conn.rollback()
                self.close()
    
        def delete(self,sql,args=None):
            """
            删除数据
            :param sql:
            :param args:
            :return:
            """
            try:
                if not args:
                    self._cursor.execute(sql)
                count = self._cursor.execute(sql, args)
                self._conn.commit()
                self.close()
                return count
            except Exception as e:
                self._conn.rollback()
                self.close()
    
        def update(self,sql,args=None):
            """
            更新数据
            :param sql:
            :param args:
            :return:
            """
            try:
                if not args:
                    self._cursor.execute(sql)
                count = self._cursor.execute(sql, args)
                self._conn.commit()
                self.close()
                return count
            except Exception as e:
                self._conn.rollback()
                self.close()

    然后就可以进行调用了。

    def index():
        db = DBManger()
        #查询一条数据
        sql = 'select * from teacher where tid>%s'
        param = 1
        result1 = db.get_one(sql,(param,))
        print(result1) #{'tid': 2, 'tname': '王五'}
    
        #查询多条语句
        sql = 'select * from teacher where tid>%s'
        param = 1
        result2 = db.get_many(sql,(param,))
        print(result2) #[{'tid': 2, 'tname': '王五'}, {'tid': 3, 'tname': '赵六'}, {'tid': 4, 'tname': '张三'}]
    
        #插入一条数据
        sql = 'insert into teacher(tid,tname) values (%s,%s) '
        param = ((12,'李田')) #1
        result1 = db.insert_one(sql,param)
        print(result1) #{'tid': 2, 'tname': '王五'}
    
        #插入多条数据s
        sql = 'insert into teacher(tid,tname) values (%s,%s) '
        param = ((9,'刘大'),(10,'刘二'))
        result1 = db.insert_many(sql,param)
        print(result1) #2
    
        #删除数据
        sql = 'delete from teacher where tid>%s'
        param = (4,)
        result1 = db.delete(sql,param) #4 删除4条符合条件的数据
        print(result1) #2
    
        #更新数据
        sql = 'update teacher set tname=%s where tid>%s'
        param = ('kk',2,)
        result1 = db.update(sql,param) #2 将符合条件的两条语句进行了修改
        print(result1)
    
    if __name__ == '__main__':
        index()
  • 相关阅读:
    Android Xmpp协议讲解
    IOS 教程以及基础知识
    android的快速开发框架集合
    Android项目快速开发框架探索(Mysql + OrmLite + Hessian + Sqlite)
    afinal logoAndroid的快速开发框架 afinal
    Android 快速开发框架:ThinkAndroid
    2020.12.19,函数式接口,函数式编程,常用函数式接口,Stream流
    2020.12.18 网络编程基础,网络编程三要素,TCP通信,Socket类,ServerSocket
    2020.12.16,Properties,Buffer,InputStreamReader
    2020.12.15 IO流,字节流,字符流,流异常处理
  • 原文地址:https://www.cnblogs.com/shenjianping/p/13296452.html
Copyright © 2011-2022 走看看