zoukankan      html  css  js  c++  java
  • Flask中使用数据库连接池 DBUtils ——(4)

    DBUtils是Python的一个用于实现数据库连接池的模块。

    此连接池有两种连接模式:

    模式一:为每个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭。

    import pymysql
    from DBUtils.PersistentDB import PersistentDB
    
    POOL = PersistentDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
        ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
        closeable=False,
        # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
        threadlocal=None,  # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
        host='127.0.0.1',
        port=3306,
        user='root',
        password='mysql',
        database='python',
        charset='utf8'
    )
    
    def func():
        conn = POOL.connection(shareable=False)
        cursor = conn.cursor()
        cursor.execute('select * from students')
        result = cursor.fetchall()
        print(result)
        cursor.close()
        conn.close()
    
    func()

     运行程序打印数据如下

    模式二:创建一批连接到连接池,供所有线程共享使用。
    PS:由于pymysql、MySQLdb等threadsafety值为1,所以该模式连接池中的线程会被所有线程共享。

    import time
    import pymysql
    import threading
    from DBUtils.PooledDB import PooledDB, SharedDBConnection
    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
        ping=0,
        # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
        host='127.0.0.1',
        port=3306,
        user='root',
        password='mysql',
        database='python',
        charset='utf8'
    )
    
    
    def func():
        # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
        # 否则
        # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
        # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
        # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
        # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
        conn = POOL.connection()
    
        # print(th, '链接被拿走了', conn1._con)
        # print(th, '池子里目前有', pool._idle_cache, '
    ')
    
        cursor = conn.cursor()
        cursor.execute('select * from students')
        result = cursor.fetchall()
        print(result)
        conn.close()
    
    
    func()

    打印的数据如下

    Flask中使用DBUtils数据库连接池

    不用实例化对象

    在根目录下创建db_helper.py 

    import pymysql
    from DBUtils.PooledDB import PooledDB
    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
        ping=0,
        # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
        host='127.0.0.1',
        port=3306,
        user='root',
        password='mysql',
        database='python',
        charset='utf8'
    )
    
    
    class SQLHelper(object):
        
        @staticmethod
        def fetch_one(sql,args):
            conn = POOL.connection()
            cursor = conn.cursor()
            cursor.execute(sql, args)
            result = cursor.fetchone()
            conn.close()
            return result
    
        @staticmethod
        def fetch_all(self,sql,args):
            conn = POOL.connection()
            cursor = conn.cursor()
            cursor.execute(sql, args)
            result = cursor.fetchone()
            conn.close()
            return result

     创建主程序run.py

    from flask import Flask
    from db_helper import SQLHelper
    
    
    app = Flask(__name__)
    
    @app.route("/")
    def hello():
        result = SQLHelper.fetch_one('select * from students',[])
        print(result)
        return "Hello World"
    
    if __name__ == '__main__':
        app.run()

    输入以下网址

    http://127.0.0.1:5000/

    后台测试结果如下

    需要实例化对象

     在根目录下创建db_helper.py 

    import pymysql
    from DBUtils.PooledDB import PooledDB
    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
        ping=0,
        # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
        host='127.0.0.1',
        port=3306,
        user='root',
        password='mysql',
        database='python',
        charset='utf8'
    )
    
    
    class SQLHelper(object):
    
        def __init__(self):
            self.conn = POOL.connection()
            self.cursor = self.conn.cursor()
    
        def close(self):
            self.cursor.close()
            self.conn.close()
    
        def fetch_one(self,sql, args):
            self.cursor.execute(sql, args)
            result = self.cursor.fetchone()
            self.close()
            return result
    
    
        def fetch_all(self, sql, args):
            self.cursor.execute(sql, args)
            result = self.cursor.fetchall()
            self.close()
            return result

      创建主程序run.py

    from flask import Flask
    from db_helper import SQLHelper
    
    
    app = Flask(__name__)
    
    @app.route("/")
    def hello():
        Con = SQLHelper()
        result = Con.fetch_one('select * from students',[])
        print(result)
        return "Hello World"
    
    if __name__ == '__main__':
        app.run()

     输入以下网址

    http://127.0.0.1:5000/

    测试结果如下

    http://127.0.0.1:5000/
    

      

  • 相关阅读:
    MySql 获取当前节点及递归所有上级节点
    MySql创建树结构递归查询存储过程
    F2工作流引擎Web层全新扁平化UI上线
    F2工作流引擎参与者类型成员的交、并、互拆计算规则
    F2工作流引擎之组织用户模型(四)
    F2工作流引擎之 工作流运转模型(三)
    F2工作流引擎之 概述(一)
    离线安装docker,并导入docker镜像
    sudo: /usr/bin/sudo must be owned by uid 0 and have the setuid bit set 的解决办法
    yml 文件中使用环境变量
  • 原文地址:https://www.cnblogs.com/crazymagic/p/9595872.html
Copyright © 2011-2022 走看看