zoukankan      html  css  js  c++  java
  • python之数据库连接池DBUtils

    DBUtils 是Python 的一个用于实现数据库连接池的模块。

    此连接池有两种连接模式:

        DBUtils :提供两种外部接口:

        PersistentDB :提供线程专用的数据库连接,并自动管理连接。

        PooledDB :提供线程间可共享的数据库连接,并自动管理连接。

    介绍

    PersistentDB模式

    为每个线程创建一个连接,线程即使调用了 close 方法,也不会关闭,只是把链接重新放到链接池,供自己线程再次使用,当线程终止时,链接自动关闭。

    from DBUtils.PersistentDB import PersistentDB
    import pymysql
    POOL = PersistentDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。
        ping=0,  # ping MySQL服务端,检查是否服务可用。
        closeable=False,  # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
        threadlocal=None,  # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123456',
        database='test',
        charset='utf8'
    )
    
    def func():
        conn = POOL.connection(shareable=False)
        cursor = conn.cursor()
        cursor.execute('select * from user')
        result = cursor.fetchall()
        print(result)
        cursor.close()
        conn.close()
    if __name__ == '__main__':
    
        func()

    PooledDB模式

    创建一批连接到连接池,供所有线程共享使用。

    import pymysql
    
    from DBUtils.PooledDB import PooledDB
    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。
        ping=0,  # ping MySQL服务端,检查是否服务可用。
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123456',
        database='test',
        charset='utf8'
    )
    
    
    def func():
        # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则等待或报raise TooManyConnections异常
        # 否则则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
        # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
        # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
        # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
        conn = POOL.connection()
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        cursor.execute('select * from user')
        result = cursor.fetchall()
        print(result)
        conn.close()
    
    if __name__ == '__main__':
    
        func()

    工具类

    # TEST数据库信息
    DB_TEST_HOST = "127.0.0.1"
    DB_TEST_PORT = 3306
    DB_TEST_DBNAME = "1019"
    DB_TEST_USER = "root"
    DB_TEST_PASSWORD = "root"
    
    # 数据库连接编码
    DB_CHARSET = "utf8"
    
    # mincached : 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接)
    DB_MIN_CACHED = 10
    
    # maxcached : 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小)
    DB_MAX_CACHED = 10
    
    # maxshared : 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用
    DB_MAX_SHARED = 20
    
    # maxconnecyions : 创建连接池的最大数量(缺省值 0 代表不限制)
    DB_MAX_CONNECYIONS = 100
    
    # blocking : 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表返回一个错误<toMany......>; 其他代表阻塞直到连接数减少,连接被分配)
    DB_BLOCKING = True
    
    # maxusage : 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
    DB_MAX_USAGE = 0
    
    # setsession : 一个可选的SQL命令列表用于准备每个会话,如["set datestyle to german", ...]
    DB_SET_SESSION = None
    db_config.py
    import pymysql
    from DBUtils.PooledDB import PooledDB
    import db_config as Config
    
    
    class ConnectionPool(object):
        __pool = None
    
        def __enter__(self):
            self.conn = self.__getConn()
            self.cursor = self.conn.cursor()
            print("数据库创建conn和cursor")
            return self
    
        def __getConn(self):
            if self.__pool is None:
                self.__pool = PooledDB(creator=pymysql, mincached=Config.DB_MIN_CACHED, maxcached=Config.DB_MAX_CACHED,
                                       maxshared=Config.DB_MAX_SHARED, maxconnections=Config.DB_MAX_CONNECYIONS,
                                       blocking=Config.DB_BLOCKING, maxusage=Config.DB_MAX_USAGE,
                                       setsession=Config.DB_SET_SESSION,
                                       host=Config.DB_TEST_HOST, port=Config.DB_TEST_PORT,
                                       user=Config.DB_TEST_USER, passwd=Config.DB_TEST_PASSWORD,
                                       db=Config.DB_TEST_DBNAME, use_unicode=False, charset=Config.DB_CHARSET)
            return self.__pool.connection()
    
        def __exit__(self, type, value, trace):
            """
             @summary: 释放连接池资源
             """
            self.cursor.close()
            self.conn.close()
            print("连接池释放conn和cursor")
    
        def getconn(self):
            '''
            从连接池中取出一个连接
            '''
            conn = self.__getConn()
            cursor = conn.cursor(pymysql.cursors.DictCursor)
            return cursor, conn
    
        def close(self):
            '''
            关闭连接归还给连接池
            '''
            self.cursor.close()
            self.conn.close()
            print("连接池释放conn和cursor")
    
    
    POOL = ConnectionPool()
    
    
    class MysqlHelper(object):
        mysql = None
    
        def __init__(self):
            self.db = POOL
    
        def __new__(cls, *args, **kwargs):
            if not hasattr(cls, 'inst'):
                cls.inst = super(MysqlHelper, cls).__new__(cls, *args, **kwargs)
            return cls.inst
    
        def selectall(self, sql='', param=()):
            '''
            查询所有
            '''
    
            try:
                cursor, conn = self.execute(sql, param)
                res = cursor.fetchall()
                self.close(cursor, conn)
                return res
            except Exception as e:
                print('selectall except   ', e.args)
                self.close(cursor, conn)
                return None
    
        def selectone(self, sql='', param=()):
            '''
            查询一条
            '''
            try:
                cursor, conn = self.execute(sql, param)
                res = cursor.fetchone()
                self.close(cursor, conn)
                return res
            except Exception as e:
                print('selectone except   ', e.args)
                self.close(cursor, conn)
                return None
    
        def insert(self, sql='', param=()):
            '''
            增加
            '''
            try:
                cursor, conn = self.execute(sql, param)
                print('============')
                _id = cursor.lastrowid
                print('_id   ', _id)
                conn.commit()
                self.close(cursor, conn)
                # 防止表中没有id返回0
                if _id == 0:
                    return True
                return _id
            except Exception as e:
                print('insert except   ', e.args)
                conn.rollback()
                self.close(cursor, conn)
                # self.conn.rollback()
                return 0
    
        def insertmany(self, sql='', param=()):
            '''
            增加多行
            '''
            cursor, conn = self.db.getconn()
            try:
                cursor.executemany(sql, param)
                conn.commit()
                self.close(cursor, conn)
                return True
            except Exception as e:
                print('insert many except   ', e.args)
                conn.rollback()
                self.close(cursor, conn)
                return False
    
        def delete(self, sql='', param=()):
            '''
            删除
            '''
            try:
                cursor, conn = self.execute(sql, param)
                self.close(cursor, conn)
                return True
            except Exception as e:
                print('delete except   ', e.args)
                conn.rollback()
                self.close(cursor, conn)
                return False
    
        def update(self, sql='', param=()):
            '''
            更新
            '''
            try:
                cursor, conn = self.execute(sql, param)
                self.close(cursor, conn)
                return True
            except Exception as e:
                print('update except   ', e.args)
                conn.rollback()
                self.close(cursor, conn)
                return False
    
        @classmethod
        def getInstance(self):
            if MysqlHelper.mysql == None:
                MysqlHelper.mysql = MysqlHelper()
            return MysqlHelper.mysql
    
        # 执行命令
        def execute(self, sql='', param=(), autoclose=False):
            cursor, conn = self.db.getconn()
            try:
                if param:
                    cursor.execute(sql, param)
                else:
                    cursor.execute(sql)
                conn.commit()
                if autoclose:
                    self.close(cursor, conn)
            except Exception as e:
                pass
            return cursor, conn
    
        def executemany(self, list=[]):
            '''
            # 执行多条命令
             '[{"sql":"xxx","param":"xx"}....]'
            '''
            cursor, conn = self.db.getconn()
            try:
                for order in list:
                    sql = order['sql']
                    param = order['param']
                    if param:
                        cursor.execute(sql, param)
                    else:
                        cursor.execute(sql)
                conn.commit()
                self.close(cursor, conn)
                return True
            except Exception as e:
                print('execute failed========', e.args)
                conn.rollback()
                self.close(cursor, conn)
                return False
    
        def close(self, cursor, conn):
            cursor.close()
            conn.close()
            print("PT连接池释放con和cursor")
    mysqlhelper
  • 相关阅读:
    Win RT Webview获取cookie
    c#代码片段新建(sinppet)
    wp8.1启动协议
    移动开源框架
    Web开发工具箱
    比较2个字符串相似度
    js的继承
    mvc4开篇之BundleConfig(1)
    职业规划历程
    Redis Cluster管理
  • 原文地址:https://www.cnblogs.com/zze46/p/10132026.html
Copyright © 2011-2022 走看看