zoukankan      html  css  js  c++  java
  • python Mysql数据库连接池组件封装(转载)

    以前一直在用Java来开发,数据库连接池等都是有组件封装好的,直接使用即可,最近在尝试Python的学习,碰到了和数据库打交道的问题,和数据库打交道我们都知道,数据库连接池必不可少,不然要么就是程序异常不稳定,要么就是数据库莫名其妙挂了,

    本篇博客主要是对数据库操作的简单封装,相当于一个DBHelper操作类

    组件
    Python中的数据库连接操作所需组件

    pymysql:mysql的Python连接包
    DBUtils:连接池组件
    configparser:配置文件模块
    mysql-connector-python:驱动包
    以上模块都是必装组件,使用pip很轻松就安装

    开始
    组件的封装主要考虑到多种数据库、可配置的情况,因此我们的数据库相关信息不能使用硬编码的方式,需要将数据库放到配置文件中,这样和代码会有一个解耦,也不用频繁更改代码

    代码
    封装的类主要包含的基础方法:

    提供查询的方法
    分页查询的方法
    插入数据
    删除数据
    更新数据
    批量插入、删除、更新数据
    配置文件
    将数据库的相关信息写入db.conf配置文件中,配置文件如下:

    [master]
    # 数据库连接主机
    host=192.168.0.212
    # 数据库端口号
    port=3306
    # 用户名
    user=root
    # 密码
    password=123456
    # 数据库名称
    database=test
    # 数据库连接池最大连接数
    maxconnections=20
    # 数据库连接池最小缓存数
    mincached=5
    # 数据库连接池最大缓存数
    maxcached=10
    
    [local111]
    # 数据库连接主机
    host=192.168.0.111
    # 数据库端口号
    port=3306
    # 用户名
    user=root
    # 密码
    password=123456
    # 数据库名称
    database=test
    # 数据库连接池最大连接数
    maxconnections=20
    # 数据库连接池最小缓存数
    mincached=5
    # 数据库连接池最大缓存数
    maxcached=10

    数据库的配置可以是多个,我们可以配置多个数据源,然后在代码中方便使用

    MySQLConnection

    数据库封装类MySQLConnection.py,代码如下:

    #!/usr/bin/env python
    # -*- encoding: utf-8 -*-
    #!@File    :   database.py
    #!@Time    :   2019/9/23 21:57
    #!@Author  :   xiaoyumin
    #!@Version :   1.0
    #!@Contact :   xiaoymin@foxmail.com
    #!@License :   Copyright (C) 2018 Zhejiang xiaominfo Technology CO.,LTD.
    #!@Desc    :   数据库连接池相关
    import pymysql
    from DBUtils.PooledDB import PooledDB
    import logging
    import configparser
    
    # 读取数据库配置信息
    config=configparser.ConfigParser()
    config.read('../db.conf',encoding='UTF-8')
    sections=config.sections()
    # 数据库工厂
    dbFactory={}
    for dbName in sections:
        # 读取相关属性
        maxconnections=config.get(dbName,"maxconnections")
        mincached=config.get(dbName,"mincached")
        maxcached=config.get(dbName,"maxcached")
        host=config.get(dbName,"host")
        port=config.get(dbName,"port")
        user=config.get(dbName,"user")
        password=config.get(dbName,"password")
        database=config.get(dbName,"database")
        databasePooled=PooledDB(creator=pymysql,
                            maxconnections=int(maxconnections),
                            mincached=int(mincached),
                            maxcached=int(maxcached),
                            blocking=True,
                            cursorclass = pymysql.cursors.DictCursor,
                            host=host,
                            port=int(port),
                            user=user,
                            password=password,
                            database=database)
        dbFactory[dbName]=databasePooled
    
    class MySQLConnection(object):
        """
        数据库连接池代理对象
        查询参数主要有两种类型
        第一种:传入元祖类型,例如(12,13),这种方式主要是替代SQL语句中的%s展位符号
        第二种: 传入字典类型,例如{"id":13},此时我们的SQL语句需要使用键来代替展位符,例如:%(name)s
        """
        def __init__(self,dbName="master"):
            self.connect = dbFactory[dbName].connection()
            self.cursor = self.connect.cursor()
            logging.debug("获取数据库连接对象成功,连接池对象:{}".format(str(self.connect)))
    
        def execute(self,sql,param=None):
            """
            基础更新、插入、删除操作
            :param sql:
            :param param:
            :return: 受影响的行数
            """
            ret=None
            try:
                if param==None:
                    ret=self.cursor.execute(sql)
                else:
                    ret=self.cursor.execute(sql,param)
            except TypeError as te:
                logging.debug("类型错误")
                logging.exception(te)
            return ret
        def query(self,sql,param=None):
            """
            查询数据库
            :param sql: 查询SQL语句
            :param param: 参数
            :return: 返回集合
            """
            self.cursor.execute(sql,param)
            result=self.cursor.fetchall()
            return result
        def queryOne(self,sql,param=None):
            """
            查询数据返回第一条
            :param sql: 查询SQL语句
            :param param: 参数
            :return: 返回第一条数据的字典
            """
            result=self.query(sql,param)
            if result:
                return result[0]
            else:
                return None
        def listByPage(self,sql,current_page,page_size,param=None):
            """
            分页查询当前表格数据
            :param sql: 查询SQL语句
            :param current_page: 当前页码
            :param page_size: 页码大小
            :param param:参数
            :return:
            """
            countSQL="select count(*) ct from ("+sql+") tmp "
            logging.debug("统计SQL:{}".format(sql))
            countNum=self.count(countSQL,param)
            offset=(current_page-1)*page_size
            totalPage=int(countNum/page_size)
            if countNum % page_size>0:
                totalPage = totalPage + 1
            pagination={"current_page":current_page,"page_size":page_size,"count":countNum,"total_page":totalPage}
            querySql="select * from ("+sql+") tmp limit %s,%s"
            logging.debug("查询SQL:{}".format(querySql))
            # 判断是否有参数
            if param==None:
                # 无参数
                pagination["data"]=self.query(querySql,(offset,page_size))
            else:
                # 有参数的情况,此时需要判断参数是元祖还是字典
                if isinstance(param,dict):
                    # 字典的情况,因此需要添加字典
                    querySql="select * from ("+sql+") tmp limit %(tmp_offset)s,%(tmp_pageSize)s"
                    param["tmp_offset"]=offset
                    param["tmp_pageSize"]=page_size
                    pagination["data"]=self.query(querySql,param)
                elif isinstance(param,tuple):
                    # 元祖的方式
                    listtp=list(param)
                    listtp.append(offset)
                    listtp.append(page_size)
                    pagination["data"]=self.query(querySql,tuple(listtp))
                else:
                    # 基础类型
                    listtp=[]
                    listtp.append(param)
                    listtp.append(offset)
                    listtp.append(page_size)
                    pagination["data"]=self.query(querySql,tuple(listtp))
            return pagination
        def count(self,sql,param=None):
            """
            统计当前表记录行数
            :param sql: 统计SQL语句
            :param param: 参数
            :return: 当前记录行
            """
            ret=self.queryOne(sql,param)
            count=None
            if ret:
                for k,v in ret.items():
                    count=v
            return count
    
        def insert(self,sql,param=None):
            """
            数据库插入
            :param sql: SQL语句
            :param param: 参数
            :return: 受影响的行数
            """
            return self.execute(sql,param)
        def update(self,sql,param=None):
            """
            更新操作
            :param sql: SQL语句
            :param param: 参数
            :return: 受影响的行数
            """
            return self.execute(sql,param)
        def delete(self,sql,param=None):
            """
            删除操作
            :param sql: 删除SQL语句
            :param param: 参数
            :return: 受影响的行数
            """
            return self.execute(sql,param)
        def batch(self,sql,param=None):
            """
            批量插入
            :param sql: 插入SQL语句
            :param param: 参数
            :return: 受影响的行数
            """
            return self.cursor.executemany(sql,param)
        def commit(self,param=None):
            """
            提交数据库
            :param param:
            :return:
            """
            if param==None:
                self.connect.commit()
            else:
                self.connect.rollback()
    
        def close(self):
            """
            关闭数据库连接
            :return:
            """
            if self.cursor:
                self.cursor.close()
            if self.connect:
                self.connect.close()
            logging.debug("释放数据库连接")
            return None

    首先我们从配置文件中读取数据库的配置信息,放入dbFactory字典中,一个数据源key对应一个数据库连接池对象

    使用
    封装好我们的数据库组件后,接下来我们对于数据库的操作就很方便了

    注意事项
    我们在写SQL语句的时候需要注意我们的参数占位符的使用,注意有两种方式

    %s:该方式你可以理解为任意字符,但是我们在参数传递的时候顺序必须正确,否则就会出现结果不一致的情况
    %(fied)s:这种方式相对友好一些,我们传入的参数可以是字典对象,而不用关心我们的占位参数是否一致
    获取不同的数据源连接
    我们上面提到过,我们需要的是一个多数据源的情况,那么我们代码中应该如何使用呢?

    # 第一种方式,获取默认数据库的连接,在配置文件中配置名称为master的数据库
    connect=MySQLConnect()
    
    
    # 第二种方式,指定key名称获取数据库连接,如下:获取local111的数据库连接
    connect=MySQLConnect("local111")

    新增数据

    新增数据

    def insertByIndex():
        """
        使用下标索引的方式插入数据
        """
        sql="insert into user(id,name) values(%s,%s)"
        # 获取数据库连接
        connect=MySQLConnection()
        try:
            # 执行插入动作
            connect.insert(sql,("1","张三"))
            # 提交
            connect.commit()
        except Exception as e:
            logging.exception(e)
        finally:
            # 关闭数据库连接
            connect.close()
            
    def insertBatchByIndex():
        """
        使用下标索引的方式批量插入数据
        """
        sql="insert into user(id,name) values(%s,%s)"
        # 获取数据库连接
        connect=MySQLConnection()
        try:
            # 执行批量插入动作
            data=[]
            # 放入的是元祖
            data.append(("1","张三"))
            data.append(("2","张三2"))
            connect.batch(sql,data)
            # 提交
            connect.commit()
        except Exception as e:
            logging.exception(e)
        finally:
            # 关闭数据库连接
            connect.close()

    看了以上的方式或许你会觉得麻烦,为什么呢,因为我们在开发的时候一般都是使用字典居多,都是key-value的方式,因此使用索引的方式需要我们再申明一个有顺序的元祖对象,很麻烦

    使用字典的方式如下:

    def insertByDict():
        """
        使用下标索引的方式插入数据
        """
        sql="insert into user(id,name) values(%(id)s,%(name)s)"
        # 获取数据库连接
        connect=MySQLConnection()
        try:
            # 执行插入动作
            # 此时我们使用的是字典
            connect.insert(sql,{"name":"张三","id":"1"})
            # 提交,必须
            connect.commit()
        except Exception as e:
            logging.exception(e)
        finally:
            # 关闭数据库连接,必须
            connect.close()
            
    def insertBatchByDict():
        """
        使用下标索引的方式批量插入数据
        """
        sql="insert into user(id,name) values(%(id)s,%(name)s)"
        # 获取数据库连接
        connect=MySQLConnection()
        try:
            # 执行批量插入动作
            data=[]
            # 放入的是自字典
            data.append({"name":"张三","id":"1"})
            data.append({"name":"张三1","id":"2"})
            connect.batch(sql,data)
            # 提交
            connect.commit()
        except Exception as e:
            logging.exception(e)
        finally:
            # 关闭数据库连接
            connect.close()

    以上就是新增的方式,删除和修改同理也差不多.

    分页查询数据

    比如分页查询的示例:

    sql="select id,name,address from restaurant where province=%s "
    connect=MySQLConnection()
    try:
          page=1
           size=500
           pagination=connect.listByPage(sql,page,size,'浙江省')
    except Exception as e:
        logging.exception(e)
    finally:
        # 关闭数据库连接
        connect.close()

    分页查询返回分页对象,是一个字典对象,主要包含属性:

    current_page:当前页码
    page_size:当前页码大小
    count:当前统计SQL语句的表记录总数
    total_page:当前总页码数
    data:当前的数据集合,是一个字典集合对象
    不分页的查询返回的是data集合字典对象,和分页使用方法类似

    总结
    以上就是一个简单的Python-MySQL数据库连接池组件,该组件同数据库交互过百万次,非常稳定,值得信赖~~!!!

    附录
    附上相关组件的说明文档:

    https://cito.github.io/DBUtils/
    https://pymysql.readthedocs.io/en/latest/modules/cursors.html

    转载地址:https://blog.csdn.net/u010192145/article/details/102487255

  • 相关阅读:
    POJ3714+最近点对
    HDU1632+半平面交
    POJ2402+模拟
    ASP.NET MVC几种找不到资源的问题解决办法
    ASP.NET MVC中的错误-友好的处理方法
    ASP.NET MVC 程序 报错“CS0012: 类型“System.Data.Objects.DataClasses.EntityObject”在未被引用的程序集中定义”的解决办法
    【Reporting Services 报表开发】— 表达式
    【Reporting Services 报表开发】— 级联式参数设置
    【Reporting Services 报表开发】— 数据表的使用
    【Reporting Services 报表开发】— 矩阵的使用
  • 原文地址:https://www.cnblogs.com/yinliang/p/11784911.html
Copyright © 2011-2022 走看看