zoukankan      html  css  js  c++  java
  • python 数据库连接池

    数据库连接池:
    from DBUtils.PooledDB import PooledDB
    import pymysql

    Release_Write_database_setting = {
        "host": '192.168.32.6',
        "port": 31306,
        "user": 'root',
        "password": 'xxx',
        "database": 'tornado',
        "charset": 'utf8',
        "cursorclass": pymysql.cursors.DictCursor
    }

    Release_Query_database_setting = {
        "host": '192.168.32.8',
        "port": 31306,
        "user": 'root',
        "password": 'xxx',
        "database": 'tornado',
        "charset": 'utf8',
        "cursorclass": pymysql.cursors.DictCursor
    }

    Development_Write_database_setting = {
        "host" : '192.168.185.4',
        "port" : 31306,
        "user" : 'root',
        "password" : 'xxx',
        "database" :  'tornado20200708quan',
        # "database" :  'tornado',
        "charset" : 'utf8',
        "cursorclass" : pymysql.cursors.DictCursor
    }

    Development_Query_database_setting = {
        "host" : '192.168.185.5',
        "port" : 31306,
        "user" : 'root',
        "password" : 'xxx',
        "database" :  'tornado20200708quan',
        "charset" : 'utf8',
        "cursorclass" : pymysql.cursors.DictCursor
    }
    # 发布时更改
    Query_database_setting = Development_Query_database_setting
    Write_database_setting = Development_Write_database_setting


    class Singleton(type):
        _instances = {}

        def __call__(cls, *args, **kwargs):
            if cls not in cls._instances:
                cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
                return cls._instances[cls]

    class DBpool():
        __metaclass__ = Singleton
        _pool = None
        def __init__(self):
            # 密码、 用户名相关 等
            self._pool = PooledDB(
                creator=pymysql,
                mincached=1,
                maxcached=20,
                **Query_database_setting
            )
            self._conn = None
            self._cursor = None
            self.getCoon()

        def getCoon(self):
            return self._pool.connection()


        # def getCoon(self):
        #     self._conn = self._pool.connection()
        #     self._cursor = self._conn.cursor()
        #     # return self._pool.connection()

        # def execut(self, sql, param=()):
        #     count = self._cursor.execute(sql, param)
        #     return count

    参考 1:普通数据库连接 对比 数据库连接池
     

    不用数据库连接池的写法:

    import MySQLdb
    conn= MySQLdb.connect(host='localhost',user='root',passwd='pwd',db='myDB',port=3306)
    cur=conn.cursor()
    SQL="select * from table1"
    r=cur.execute(SQL)
    r=cur.fetchall()
    cur.close()
    conn.close()
     

    使用 PooledDB

    from DBUtils.PooledDB import PooledDB
    import pymysql
    import xlwt
    import os
    import threading
    import zipfile


    class CreateExcel(object):
    """
    查询数据库,并生成excel 文档
    """
    def __init__(self, mysql_info):
    self.mysql_info = mysql_info
    self.pool = PooledDB(pymysql, 5, host=self.mysql_info['host'], user=self.mysql_info['user'],
    password=self.mysql_info['password'], db=self.mysql_info['db'], port=self.mysql_info['port'],
    charset='utf-8')
    连接参数定义:

    1. mincached,最少的空闲连接数,如果空闲连接数小于这个数,pool会创建一个新的连接
    2. maxcached,最大的空闲连接数,如果空闲连接数大于这个数,pool会关闭空闲连接
    3. maxconnections,最大的连接数,
    4. blocking,当连接数达到最大的连接数时,在请求连接的时候,如果这个值是True,请求连接的程序会一直等待,直到当前连接数小于最大连接数,如果这个值是False,会报错,
    5. maxshared 当连接数达到这个数,新请求的连接会分享已经分配出去的连接
     

    这里的 5  应该是默认的连接数吧

    在uwsgi中,每个http请求都会分发给一个进程,连接池中配置的连接数都是一个进程为单位的(即上面的最大连接数,都是在一个进程中的连接数),而如果业务中,一个http请求中需要的sql连接数不是很多的话(其实大多数都只需要创建一个连接),配置的连接数配置都不需要太大。
    连接池对性能的提升表现在:
    1.在程序创建连接的时候,可以从一个空闲的连接中获取,不需要重新初始化连接,提升获取连接的速度
    2.关闭连接的时候,把连接放回连接池,而不是真正的关闭,所以可以减少频繁地打开和关闭连接
    参考 2 : PooledDB参数解释
     

    版本环境 python 3.7 DBUtils 1.3 mysqlclient 1.4.6 连接池初始化

    pool = PooledDB(creator=MySQLdb, mincached=0, maxcached=0,
    maxshared=0, maxconnections=0,
    blocking=False,maxusage=None,
    setsession=None, reset=True,
    failures=None, ping=1,
    *args, **kwargs)
    参数说明 creator

    #creator => 任何符合DB-API 2.0规范的函数或者兼容的数据库模块
    mincached

    #mincached => 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached

    #maxcached => 链接池中最大闲置的链接数(0和None不限制)
    maxshared

    #maxshared => maximum number of shared connections
    (0 or None means all connections are dedicated)
    When this maximum number is reached, connections are
    shared if they have been requested as shareable
    maxconnections

    #maxconnections => 允许的最大链接数(0或None表示不限制)
    blocking

    #blocking => 链接池没有可用链接后,是否阻塞等待。
    True表示阻塞等待,直到获取到链接;
    False不等待,抛异常退出
    maxusage

    #maxusage => 同一个链接最多被重复使用的次数(0和None表示无限制)
    setsession

    #setsession => 可选的会话命令:开始会话前执行的命令列表。
    例如["set datestyle to…","set time zone…"]
    reset

    #reset => 当连接放回池中时,重置连接的方式,默认为True。
    False或者None表示使用begin()开启了事务的链接,会执行回滚;
    安全起见,建议使用True,当为True时表示所有链接都执行回滚操作
    failures

    #failures => 当默认的(OperationalError,InternalError)异常不能满足要求时,
    可以自定义抛出异常:默认为None;
    自定义为传入的为tuple或者issubclass(failures, Exception)
    ping

    #ping => 检查连接是否仍然处于活动状态的方式
    0 = None = never,
    1 = default = whenever fetched from the pool,
    2 = when a cursor is created,
    4 = when a query is executed,
    7 = always, and all other bit combinations of these values
    args, kwargs

    #args, kwargs => 传递给creator的参数
    使用

    # -*- coding: utf-8 -*-
    # @Time : 2020/1/26 0026 20:28
    # @Email : lofish@foxmail.com(撸小鱼)

    import MySQLdb
    import MySQLdb.cursors
    from DBUtils.PooledDB import PooledDB
    import datetime


    class DbManager(object):

    def __init__(self, host, port, db_name, user_name, password):
    cmds = ["set names utf8mb4;"]
    conn_args = {'host': host,
    'port': port,
    'db': db_name,
    'user': user_name,
    'passwd': password,
    'charset': 'utf8',
    'cursorclass': MySQLdb.cursors.DictCursor
    }
    # 初始化时,链接池中至少创建的空闲的链接,0表示不创建,mincached: 5
    # 链接池中最大闲置的链接数(0和None不限制): 20
    self._pool = PooledDB(MySQLdb, mincached=5, maxcached=20, setsession=cmds, **conn_args)

    def connection(self):
    return self._pool.connection()


    _db_manager = None


    def create_db_manager(host, port, dbname, username, password):
    global _db_manager
    if _db_manager is None:
    _db_manager = DbManager(host, port, dbname, username, password)
    return _db_manager
     

    参考 3 : 样例代码(采纳)
     

    python使用dbutils的PooledDB连接池,操作数据库
    1、使用dbutils的PooledDB连接池,操作数据库。

    这样就不需要每次执行sql后都关闭数据库连接,频繁的创建连接,消耗时间

    2、如果是使用一个连接一直不关闭,多线程下,插入超长字符串到数据库,运行一段时间后很容易出现OperationalError: (2006, ‘MySQL server has gone away’)这个错误。

    使用PooledDB解决。

    # coding=utf-8
    """
    使用DBUtils数据库连接池中的连接,操作数据库
    OperationalError: (2006, ‘MySQL server has gone away’)
    """
    import json
    import pymysql
    import datetime
    from DBUtils.PooledDB import PooledDB
    import pymysql


    class MysqlClient(object):
    __pool = None;

    def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
    maxusage=100, setsession=None, reset=True,
    host='127.0.0.1', port=3306, db='test',
    user='root', passwd='123456', charset='utf8mb4'):
    """

    :param mincached:连接池中空闲连接的初始数量
    :param maxcached:连接池中空闲连接的最大数量
    :param maxshared:共享连接的最大数量
    :param maxconnections:创建连接池的最大数量
    :param blocking:超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理
    :param maxusage:单个连接的最大重复使用次数
    :param setsession:optional list of SQL commands that may serve to prepare
    the session, e.g. ["set datestyle to ...", "set time zone ..."]
    :param reset:how connections should be reset when returned to the pool
    (False or None to rollback transcations started with begin(),
    True to always issue a rollback for safety's sake)
    :param host:数据库ip地址
    :param port:数据库端口
    :param db:库名
    :param user:用户名
    :param passwd:密码
    :param charset:字符编码
    """

    if not self.__pool:
    self.__class__.__pool = PooledDB(pymysql,
    mincached, maxcached,
    maxshared, maxconnections, blocking,
    maxusage, setsession, reset,
    host=host, port=port, db=db,
    user=user, passwd=passwd,
    charset=charset,
    cursorclass=pymysql.cursors.DictCursor
    )
    self._conn = None
    self._cursor = None
    self.__get_conn()

    def __get_conn(self):
    self._conn = self.__pool.connection();
    self._cursor = self._conn.cursor();

    def close(self):
    try:
    self._cursor.close()
    self._conn.close()
    except Exception as e:
    print e

    def __execute(self, sql, param=()):
    count = self._cursor.execute(sql, param)
    print count
    return count

    @staticmethod
    def __dict_datetime_obj_to_str(result_dict):
    """把字典里面的datatime对象转成字符串,使json转换不出错"""
    if result_dict:
    result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime.datetime)}
    result_dict.update(result_replace)
    return result_dict

    def select_one(self, sql, param=()):
    """查询单个结果"""
    count = self.__execute(sql, param)
    result = self._cursor.fetchone()
    """:type result:dict"""
    result = self.__dict_datetime_obj_to_str(result)
    return count, result

    def select_many(self, sql, param=()):
    """
    查询多个结果
    :param sql: qsl语句
    :param param: sql参数
    :return: 结果数量和查询结果集
    """
    count = self.__execute(sql, param)
    result = self._cursor.fetchall()
    """:type result:list"""
    [self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
    return count, result

    def execute(self, sql, param=()):
    count = self.__execute(sql, param)
    return count

    def begin(self):
    """开启事务"""
    self._conn.autocommit(0)

    def end(self, option='commit'):
    """结束事务"""
    if option == 'commit':
    self._conn.autocommit()
    else:
    self._conn.rollback()


    if __name__ == "__main__":
    mc = MysqlClient()
    sql1 = 'SELECT * FROM shiji WHERE id = 1'
    result1 = mc.select_one(sql1)
    print json.dumps(result1[1], ensure_ascii=False)

    sql2 = 'SELECT * FROM shiji WHERE id IN (%s,%s,%s)'
    param = (2, 3, 4)
    print json.dumps(mc.select_many(sql2, param)[1], ensure_ascii=False)
     

    参考 4 : 样例代码
     

    #!/usr/bin/env python
    #_*_ coding:utf-8_*_

    import tornado.ioloop
    import tornado.web
    import tornado.escape
    import pymssql,pymysql
    from DBUtils.PooledDB import PooledDB


    class Database:
    def __init__(self,*db):
    if len(db) == 5:
    #mysql
    self.host = db[0]
    self.port = db[1]
    self.user = db[2]
    self.pwd = db[3]
    self.db = db[4]
    else:
    #mssql
    self.host = db[0]
    self.port = None
    self.user = db[1]
    self.pwd = db[2]
    self.db = db[3]
    self._CreatePool()

    def _CreatePool(self):
    if not self.db:
    raise NameError + '没有设置数据库信息'
    if (self.port == None):
    self.Pool = PooledDB(creator=pymssql, mincached=2, maxcached=5, maxshared=3, maxconnections=6,
    blocking=True, host=self.host, user=self.user,
    password=self.pwd, database=self.db, charset="utf8")
    else:
    self.Pool = PooledDB(creator=pymysql, mincached=2, maxcached=5, maxshared=3, maxconnections=6,
    blocking=True, host=self.host, port=self.port,
    user=self.user, password=self.pwd, database=self.db, charset="utf8")

    def _Getconnect(self):
    self.conn = self.Pool.connection()
    cur = self.conn.cursor()
    if not cur:
    raise "数据库连接不上"
    else:
    return cur
    # 查询sql

    def ExecQuery(self, sql):
    cur = self._Getconnect()
    cur.execute(sql)
    relist = cur.fetchall()
    cur.close()
    self.conn.close()
    return relist
    # 非查询的sql

    def ExecNoQuery(self, sql):
    cur = self._Getconnect()
    cur.execute(sql)
    self.conn.commit()
    cur.close()
    self.conn.close()

    gdbp = Database
    class MainHadle(tornado.web.RequestHandler):
    def get(self,*args):
    filename = self.get_argument('filename')
    print(filename)
    self.set_header('Content-Type', 'application/octet-stream')
    self.set_header('Content-Disposition', 'attachment; filename=%s'%filename.encode('utf-8'))
    with open(filename,'rb') as f:
    while True:
    data = f.read(1024)
    if not data:
    break
    self.write(data)

    class MainIndex(tornado.web.RequestHandler):
    def get(self):
    self.write('Hello')

    class CheckUser(tornado.web.RequestHandler):
    def get(self):
    user = self.get_argument('user')
    pwd = self.get_argument('passwd')
    #print(user)

    if user != '' and pwd != '':
    lssql = "select usr_code,password from sb_user where usr_code= '%s' " % user
    #print(lssql)
    rds = gdbp.ExecQuery(lssql)
    if rds[0][1] == pwd :
    js_str = tornado.escape.json_encode('{"result":"true","msg":""}')
    self.write(js_str)
    else:
    js_str = tornado.escape.json_encode('{"result":"false","msg":"用户或密码错误"}')
    self.write(js_str)
    #print(rds[0][0])
    else:
    js_str = tornado.escape.json_encode('{"result":"false","msg":"参数错误"}')
    self.write(js_str)




    def make_app():
    return tornado.web.Application([(r"/download",MainHadle),(r"/",MainIndex),(r"/checkuser",CheckUser)])

    def make_dbpool():
    global gdbp
    gdbp = Database('172.20.1.2','sa','xxx','MPL')


    if __name__ == '__main__':
    app = make_app()
    app.listen(8888)
    make_dbpool()
    tornado.ioloop.IOLoop.current().start()

    ————————————————
    版权声明:本文为CSDN博主「Lucky@Dong」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/zzddada/article/details/113771780

  • 相关阅读:
    转:修改虚拟机参数
    NhiberNate 和linq学习博客园网址
    如何配置sqlserver 以允许远程连接
    Mongodb安装配置文档
    IIS安装和配置
    Mvc篇
    在Castle中使用nhibernate
    多线程
    WCF REST系列文章汇总(共9篇)
    测试Api工具Fiddler
  • 原文地址:https://www.cnblogs.com/liang715200/p/14787825.html
Copyright © 2011-2022 走看看