zoukankan      html  css  js  c++  java
  • 数据库连接池

    高并发下数据库连接使用方案:

      1.使用已有数据库连接池(下文有介绍)

      2.每个执行单元建立一次连接独享;(效率超低,难以维护,不建议使用)

      3.手动维护线程池(本人在MAC测试目前效率最高);

    实测代码案例:

    import time
    import pymysql
    from dbutils import persistent_db, pooled_db, simple_pooled_db, steady_db
    from multiprocessing.pool import ThreadPool
    from abc import abstractmethod,ABCMeta
    from queue import Queue
    # print(help(pooled_db.PooledDB))
    # pooled_db 线程安全
    # print(pymysql.threadsafety)
    
    db_conf = {"host": "127.0.0.1",
               "port": 3306,
               "user": "root",
               "password": "123456",
               "database": "userinfo"
               }
    
    
    class TaskConsumer(metaclass=ABCMeta):
        @abstractmethod
        def _task_worker(self, i):
            pass
    
        def consumer(self):
            task_pool = ThreadPool(self.task_pool_num)
            start_time = time.time()
            for i in range(self.task_num):
                task_pool.apply_async(func=self._task_worker, args=(i,))
            task_pool.close()
            task_pool.join()
            return time.time() - start_time
    
        def db_read(self, cur, sql, i):
            # time.sleep(2)
            cur.execute(sql)
            print(i, cur.fetchall())
    
    class DbPoolTest(TaskConsumer):
        def __init__(self, task_num, task_pool_num, max_conn, blocking=True, **db_conf):
            self.task_num = task_num
            self.task_pool_num = task_pool_num
            self.max_conn=max_conn
            self.blocking = blocking
            self.kwargs = db_conf
            self._get_db_pool()
    
        def _get_db_pool(self):
            self.db_pool = pooled_db.PooledDB(
                creator=pymysql,  # 连接对象或符合DB-API 2的数据库模块(使用链接数据库的模块)
                mincached=self.max_conn,  # 池中空闲连接的初始数量(默认值为0表示启动时未建立连接)
                maxcached=0,  # 池中最大空闲连接数(默认值0或None表示池大小不受限制)
                maxshared=0,
                # 允许的最大共享连接数(默认值为0或None表示所有连接都是专用的);达到此最大数目后,如果已将连接请求为可共享的,则将共享它们。因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
                maxconnections=self.max_conn,  # 通常允许的最大连接数(默认值为0或无表示任何数量的连接)
                blocking=self.blocking,  # 确定超出最大值时的行为(如果将其设置为true,阻塞并等待直到连接减少,但默认情况下将报告错误)
                maxusage=None,  # 单个连接的最大重用次数(默认值为0或无表示无限重用);当达到连接的最大使用次数时,该连接将自动重置(关闭并重新打开)
                setsession=None,  # 可用于准备会话的SQL命令的可选列表,例如 [“将日期样式设置为德语”,...]
                reset=True,  # 将连接返回池时应如何重置(False或None为以begin()开始的回滚事务,出于安全考虑,默认值True始终触发回滚)
                failures=None,  # 如果默认(OperationalError,InternalError)不足,则应为其应用连接故障转移机制的可选异常类或异常类的元组
                ping=1,
                # 可选标志,用于控制何时使用ping()方法检查连接(如果该方法可用)(0 = None = never, 1 = default = whenever it is requested,2 = when a cursor is created, 4 = when a query is executed, 7 = always)
                **self.kwargs)
    
        def _task_worker(self, i):
            conn = self.db_pool.connection()
            cur = conn.cursor()
            sql = "select * from username;"
            self.db_read(cur, sql, i)
            cur.close()
            conn.close()
    
        def close(self):
            self.db_pool.close()
    
    class PymysqlTest(TaskConsumer):
        def __init__(self,task_num, task_pool_num, **db_conf):
            self.task_num = task_num
            self.task_pool_num = task_pool_num
            self.kwargs = db_conf
    
        def _task_worker(self, i):
            conn = pymysql.Connect(**self.kwargs)
            cur = conn.cursor()
            sql = "select * from username;"
            self.db_read(cur, sql, i)
            cur.close()
            conn.close()
    
    
    class PymysqlPoolTest(TaskConsumer):
        def __init__(self, task_num, task_pool_num, max_conn, blocking=True, **db_conf):
            self.task_num = task_num
            self.task_pool_num = task_pool_num
            self.max_conn = max_conn
            self.blocking = blocking
            self.kwargs = db_conf
            self._get_pymysql_pool()
    
        def _get_pymysql_pool(self):
            self.queue_pool = Queue(maxsize=self.max_conn)
            for i in range(self.max_conn):
                self.queue_pool.put(pymysql.Connect(**self.kwargs))
    
        def _task_worker(self, i):
            conn = self.queue_pool.get()
            cur = conn.cursor()
            sql = "select * from username;"
            self.db_read(cur, sql, i)
            cur.close()
            self.queue_pool.put(conn)
    
        def close(self):
            while not self.queue_pool.empty():
                self.queue_pool.get().close()
    
    if __name__ == '__main__':
        TASK_NUM = 10000
        TASK_POOL_NUM = 100
    
    
        # 方案1:数据库连接池方案(建议设置blocking=True,maxconnections最大连接数量)
        start_time_dbpool=time.time()
        DB_POOL_NUM = 100 #DB_POOL_NUM>=TASK_POOL_NUM任务执行时间基本保持不变
        db_pool_test = DbPoolTest(TASK_NUM, TASK_POOL_NUM, DB_POOL_NUM,**db_conf)
        # start_time_dbpool = time.time()
        time_dbpool_task = db_pool_test.consumer()
        db_pool_test.close()
        time_dbpool = (time.time()-start_time_dbpool, time_dbpool_task)
    
    
        # 方案2:每个线程池中手动建立一个连接
        start_time_pymysql = time.time()
        pymysql_test = PymysqlTest(TASK_NUM, TASK_POOL_NUM, **db_conf)
        time_pymysql_task = pymysql_test.consumer()
        time_pymysql = (time.time()-start_time_pymysql, time_pymysql_task)
    
        # 方案3:手动维护数据库连接池
        start_time_pymysqlpool = time.time()
        DB_POOL_NUM = 100  # DB_POOL_NUM>=TASK_POOL_NUM任务执行时间基本保持不变
        pymysql_pool_test = PymysqlPoolTest(TASK_NUM, TASK_POOL_NUM, DB_POOL_NUM, **db_conf)
        # start_time_dbpool = time.time()
        time_pymysqlpool_task = pymysql_pool_test.consumer()
        pymysql_pool_test.close()
        time_pymysqlpool = (time.time() - start_time_pymysqlpool, time_pymysqlpool_task)
    
    
        print("数据库连接池方案:", time_dbpool)
        print("线程内手动连接方案:", time_pymysql)
        print("手动维护数据库连接池方案:", time_pymysqlpool)
    数据库连接三种方案

    DBUtils实际上是一个包含两个子模块的Python包,一个用于连接DB-API 2模块,另一个用于连接典型的PyGreSQL模块。

    全局的DB-API 2变量
    SteadyDB.py 用于稳定数据库连接
    PooledDB.py 连接池
    PersistentDB.py 维持持续的数据库连接
    SimplePooledDB.py 简单连接池
    典型的 PyGreSQL 变量
    SteadyPg.py 稳定PyGreSQL连接
    PooledPg.py PyGreSQL连接池
    PersistentPg.py 维持持续的PyGreSQL连接
    SimplePooledPg.py 简单的PyGreSQL连接池

    SteadyDB

    DBUtils.SteadyDB 是一个模块实现了"强硬"的数据库连接,基于DB-API 2建立的原始连接。一个"强硬"的连接意味着在连接关闭之后,或者使用次数操作限制时会重新连接。

    一个典型的例子是数据库重启时,而你的程序仍然在运行并需要访问数据库,或者当你的程序连接了一个防火墙后面的远程数据库,而防火墙重启时丢失了状态时。

    一般来说你不需要直接使用 SteadyDB 它只是给接下来的两个模块提供基本服务, PersistentDB 和 PooledDB 。

    SimplePooledDB

    DBUtils.SimplePooledDB 是一个非常简单的数据库连接池实现。他比完善的 PooledDB 模块缺少很多功能

    PersistentDB

    DBUtils.PersistentDB 实现了强硬的、线程安全的、顽固的数据库连接,使用DB-API 2模块。

    当一个线程首次打开一个数据库连接时,一个连接会打开并仅供这个线程使用。当线程关闭连接时,连接仍然持续打开供这个线程下次请求时使用这个已经打开的连接。连接在线程死亡时自动关闭。

    简单的来说 PersistentDB 尝试重用数据库连接来提高线程化程序的数据库访问性能,并且他确保连接不会被线程之间共享。

    因此, PersistentDB 可以在底层DB-API模块并非线程安全的时候同样工作的很好,并且他会在其他线程改变数据库会话或者使用多语句事务时同样避免问题的发生。

    PooledDB

    DBUtils.PooledDB 实现了一个强硬的、线程安全的、有缓存的、可复用的数据库连接,使用任何DB-API 2模块。

    PooledDB 可以在不同线程之间共享打开的数据库连接。这在你连接并指定 maxshared 参数,并且底层的DB-API 2接口是线程安全才可以,但是你仍然可以使用专用数据库连接而不在线程之间共享连接。除了共享连接以外,还可以设立一个至少 mincached 的连接池,并且最多允许使用 maxcached 个连接,这可以同时用于专用和共享连接池。当一个线程关闭了一个非共享连接,则会返还到空闲连接池中等待下次使用。

    如果底层DB-API模块是非线程安全的,线程锁会确保使用 PooledDB 是线程安全的。所以你并不需要为此担心,但是你在使用专用连接来改变数据库会话或执行多命令事务时必须小心。

  • 相关阅读:
    新华字典有多少字
    lisp install
    OCaml Language Sucks
    Erlang, Haskell, OCaml: screw one, marry one, kill one. Which and why?
    Linux获取网页源码的几种方法
    什么是zhcon
    What is plowshare?
    neo4j简单学习
    neo4j 云端部署
    Clojure语言 vs Scala语言
  • 原文地址:https://www.cnblogs.com/open-yang/p/14317632.html
Copyright © 2011-2022 走看看