zoukankan      html  css  js  c++  java
  • pymysql 连接池

    pymysql连接池

    import pymysql
    from DBUtils.PooledDB import PooledDB, SharedDBConnection
    '''
    连接池
    '''
    class MysqlPool(object):
    
        def __init__(self):
            self.POOL = PooledDB(
                creator=pymysql,  # 使用链接数据库的模块
                maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
                mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
                maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
                maxshared=3,
                # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
                blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
                maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
                setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
                ping=0,
                # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
                host='127.0.0.1',
                port=3306,
                user='root',
                password='root',
                database='test',
                charset='utf8'
            )
        def __new__(cls, *args, **kw):
            '''
            启用单例模式
            :param args:
            :param kw:
            :return:
            '''
            if not hasattr(cls, '_instance'):
                cls._instance = object.__new__(cls)
            return cls._instance
    
        def connect(self):
            '''
            启动连接
            :return:
            '''
            conn = self.POOL.connection()
            cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
            return conn, cursor
    
        def connect_close(self,conn, cursor):
            '''
            关闭连接
            :param conn:
            :param cursor:
            :return:
            '''
            cursor.close()
            conn.close()
    
        def fetch_all(self,sql, args):
            '''
            批量查询
            :param sql:
            :param args:
            :return:
            '''
            conn, cursor = self.connect()
    
            cursor.execute(sql, args)
            record_list = cursor.fetchall()
            self.connect_close(conn, cursor)
    
            return record_list
    
        def fetch_one(self,sql, args):
            '''
            查询单条数据
            :param sql:
            :param args:
            :return:
            '''
            conn, cursor = self.connect()
            cursor.execute(sql, args)
            result = cursor.fetchone()
            self.connect_close(conn, cursor)
    
            return result
    
        def insert(self,sql, args):
            '''
            插入数据
            :param sql:
            :param args:
            :return:
            '''
            conn, cursor = self.connect()
            row = cursor.execute(sql, args)
            conn.commit()
            self.connect_close(conn, cursor)
            return row

    操作

    #实例化
    mp=MysqlPool()
    '''
    查询单条
    '''
    username='admin'
    data = mp.fetch_one("select id,nickname from userinfo where user=%s",(username,))
    print(data)
    '''
    批量读取
    
    '''
    data = mp.fetch_all("select * from record where user_id=%s",(1,))
    print(data)
    '''
    插入数据
    '''
    
    data = mp.insert("insert into record(line,ctime,user_id)values(%s,%s,%s)",(22,'2019-11-11',1))
    print(data)
  • 相关阅读:
    React.render和reactDom.render的区别
    CSS中position的4种定位详解
    React.js入门必须知道的那些事
    JS处理事件小技巧
    React.js深入学习详细解析
    React.js实现原生js拖拽效果及思考
    Linux ./configure && make && make install 编译安装和卸载
    Redis set集合结构及命令详解
    Redis数据过期策略
    Redis TTL命令
  • 原文地址:https://www.cnblogs.com/huay/p/11562094.html
Copyright © 2011-2022 走看看