高并发下数据库连接使用方案:
1.使用已有数据库连接池(下文有介绍)
2.每个执行单元建立一次连接独享;(效率超低,难以维护,不建议使用)
3.手动维护线程池(本人在MAC测试目前效率最高);
实测代码案例:
import time import pymysql from dbutils import persistent_db, pooled_db, simple_pooled_db, steady_db from multiprocessing.pool import ThreadPool from abc import abstractmethod,ABCMeta from queue import Queue # print(help(pooled_db.PooledDB)) # pooled_db 线程安全 # print(pymysql.threadsafety) db_conf = {"host": "127.0.0.1", "port": 3306, "user": "root", "password": "123456", "database": "userinfo" } class TaskConsumer(metaclass=ABCMeta): @abstractmethod def _task_worker(self, i): pass def consumer(self): task_pool = ThreadPool(self.task_pool_num) start_time = time.time() for i in range(self.task_num): task_pool.apply_async(func=self._task_worker, args=(i,)) task_pool.close() task_pool.join() return time.time() - start_time def db_read(self, cur, sql, i): # time.sleep(2) cur.execute(sql) print(i, cur.fetchall()) class DbPoolTest(TaskConsumer): def __init__(self, task_num, task_pool_num, max_conn, blocking=True, **db_conf): self.task_num = task_num self.task_pool_num = task_pool_num self.max_conn=max_conn self.blocking = blocking self.kwargs = db_conf self._get_db_pool() def _get_db_pool(self): self.db_pool = pooled_db.PooledDB( creator=pymysql, # 连接对象或符合DB-API 2的数据库模块(使用链接数据库的模块) mincached=self.max_conn, # 池中空闲连接的初始数量(默认值为0表示启动时未建立连接) maxcached=0, # 池中最大空闲连接数(默认值0或None表示池大小不受限制) maxshared=0, # 允许的最大共享连接数(默认值为0或None表示所有连接都是专用的);达到此最大数目后,如果已将连接请求为可共享的,则将共享它们。因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 maxconnections=self.max_conn, # 通常允许的最大连接数(默认值为0或无表示任何数量的连接) blocking=self.blocking, # 确定超出最大值时的行为(如果将其设置为true,阻塞并等待直到连接减少,但默认情况下将报告错误) maxusage=None, # 单个连接的最大重用次数(默认值为0或无表示无限重用);当达到连接的最大使用次数时,该连接将自动重置(关闭并重新打开) setsession=None, # 可用于准备会话的SQL命令的可选列表,例如 [“将日期样式设置为德语”,...] reset=True, # 将连接返回池时应如何重置(False或None为以begin()开始的回滚事务,出于安全考虑,默认值True始终触发回滚) failures=None, # 如果默认(OperationalError,InternalError)不足,则应为其应用连接故障转移机制的可选异常类或异常类的元组 ping=1, # 可选标志,用于控制何时使用ping()方法检查连接(如果该方法可用)(0 = None = never, 1 = default = whenever it is requested,2 = when a cursor is created, 4 = when a query is executed, 7 = always) **self.kwargs) def _task_worker(self, i): conn = self.db_pool.connection() cur = conn.cursor() sql = "select * from username;" self.db_read(cur, sql, i) cur.close() conn.close() def close(self): self.db_pool.close() class PymysqlTest(TaskConsumer): def __init__(self,task_num, task_pool_num, **db_conf): self.task_num = task_num self.task_pool_num = task_pool_num self.kwargs = db_conf def _task_worker(self, i): conn = pymysql.Connect(**self.kwargs) cur = conn.cursor() sql = "select * from username;" self.db_read(cur, sql, i) cur.close() conn.close() class PymysqlPoolTest(TaskConsumer): def __init__(self, task_num, task_pool_num, max_conn, blocking=True, **db_conf): self.task_num = task_num self.task_pool_num = task_pool_num self.max_conn = max_conn self.blocking = blocking self.kwargs = db_conf self._get_pymysql_pool() def _get_pymysql_pool(self): self.queue_pool = Queue(maxsize=self.max_conn) for i in range(self.max_conn): self.queue_pool.put(pymysql.Connect(**self.kwargs)) def _task_worker(self, i): conn = self.queue_pool.get() cur = conn.cursor() sql = "select * from username;" self.db_read(cur, sql, i) cur.close() self.queue_pool.put(conn) def close(self): while not self.queue_pool.empty(): self.queue_pool.get().close() if __name__ == '__main__': TASK_NUM = 10000 TASK_POOL_NUM = 100 # 方案1:数据库连接池方案(建议设置blocking=True,maxconnections最大连接数量) start_time_dbpool=time.time() DB_POOL_NUM = 100 #DB_POOL_NUM>=TASK_POOL_NUM任务执行时间基本保持不变 db_pool_test = DbPoolTest(TASK_NUM, TASK_POOL_NUM, DB_POOL_NUM,**db_conf) # start_time_dbpool = time.time() time_dbpool_task = db_pool_test.consumer() db_pool_test.close() time_dbpool = (time.time()-start_time_dbpool, time_dbpool_task) # 方案2:每个线程池中手动建立一个连接 start_time_pymysql = time.time() pymysql_test = PymysqlTest(TASK_NUM, TASK_POOL_NUM, **db_conf) time_pymysql_task = pymysql_test.consumer() time_pymysql = (time.time()-start_time_pymysql, time_pymysql_task) # 方案3:手动维护数据库连接池 start_time_pymysqlpool = time.time() DB_POOL_NUM = 100 # DB_POOL_NUM>=TASK_POOL_NUM任务执行时间基本保持不变 pymysql_pool_test = PymysqlPoolTest(TASK_NUM, TASK_POOL_NUM, DB_POOL_NUM, **db_conf) # start_time_dbpool = time.time() time_pymysqlpool_task = pymysql_pool_test.consumer() pymysql_pool_test.close() time_pymysqlpool = (time.time() - start_time_pymysqlpool, time_pymysqlpool_task) print("数据库连接池方案:", time_dbpool) print("线程内手动连接方案:", time_pymysql) print("手动维护数据库连接池方案:", time_pymysqlpool)
DBUtils实际上是一个包含两个子模块的Python包,一个用于连接DB-API 2模块,另一个用于连接典型的PyGreSQL模块。
全局的DB-API 2变量 | |
---|---|
SteadyDB.py | 用于稳定数据库连接 |
PooledDB.py | 连接池 |
PersistentDB.py | 维持持续的数据库连接 |
SimplePooledDB.py | 简单连接池 |
典型的 PyGreSQL 变量 | |
---|---|
SteadyPg.py | 稳定PyGreSQL连接 |
PooledPg.py | PyGreSQL连接池 |
PersistentPg.py | 维持持续的PyGreSQL连接 |
SimplePooledPg.py | 简单的PyGreSQL连接池 |
SteadyDB
DBUtils.SteadyDB 是一个模块实现了"强硬"的数据库连接,基于DB-API 2建立的原始连接。一个"强硬"的连接意味着在连接关闭之后,或者使用次数操作限制时会重新连接。
一个典型的例子是数据库重启时,而你的程序仍然在运行并需要访问数据库,或者当你的程序连接了一个防火墙后面的远程数据库,而防火墙重启时丢失了状态时。
一般来说你不需要直接使用 SteadyDB 它只是给接下来的两个模块提供基本服务, PersistentDB 和 PooledDB 。
SimplePooledDB
DBUtils.SimplePooledDB 是一个非常简单的数据库连接池实现。他比完善的 PooledDB 模块缺少很多功能
PersistentDB
DBUtils.PersistentDB 实现了强硬的、线程安全的、顽固的数据库连接,使用DB-API 2模块。
当一个线程首次打开一个数据库连接时,一个连接会打开并仅供这个线程使用。当线程关闭连接时,连接仍然持续打开供这个线程下次请求时使用这个已经打开的连接。连接在线程死亡时自动关闭。
简单的来说 PersistentDB 尝试重用数据库连接来提高线程化程序的数据库访问性能,并且他确保连接不会被线程之间共享。
因此, PersistentDB 可以在底层DB-API模块并非线程安全的时候同样工作的很好,并且他会在其他线程改变数据库会话或者使用多语句事务时同样避免问题的发生。
PooledDB
DBUtils.PooledDB 实现了一个强硬的、线程安全的、有缓存的、可复用的数据库连接,使用任何DB-API 2模块。
PooledDB 可以在不同线程之间共享打开的数据库连接。这在你连接并指定 maxshared 参数,并且底层的DB-API 2接口是线程安全才可以,但是你仍然可以使用专用数据库连接而不在线程之间共享连接。除了共享连接以外,还可以设立一个至少 mincached 的连接池,并且最多允许使用 maxcached 个连接,这可以同时用于专用和共享连接池。当一个线程关闭了一个非共享连接,则会返还到空闲连接池中等待下次使用。
如果底层DB-API模块是非线程安全的,线程锁会确保使用 PooledDB 是线程安全的。所以你并不需要为此担心,但是你在使用专用连接来改变数据库会话或执行多命令事务时必须小心。