zoukankan      html  css  js  c++  java
  • 我的第一个python web开发框架(25)——定制ORM(一)

      在开始编写ORM模块之前,我们需要先对db_helper进行重构,因为ORM最终生成的sql是需要转给db_helper来执行的,所以拥有一个功能完善、健壮的数据库操作类是非常必要的。

      这是项目原db_helper.py代码

    #!/usr/bin/env python
    # coding=utf-8
    
    import psycopg2
    from common import log_helper
    from config import const
    
    # 初始化数据库参数
    db_name = const.DB_NAME
    db_host = const.DB_HOST
    db_port = const.DB_PORT
    db_user = const.DB_USER
    db_pass = const.DB_PASS
    
    
    def read(sql):
        """
        连接pg数据库并进行数据查询
        如果连接失败,会把错误写入日志中,并返回false,如果sql执行失败,也会把错误写入日志中,并返回false
        如果所有执行正常,则返回查询到的数据,这个数据是经过转换的,转成字典格式,方便模板调用,其中字典的key是数据表里的字段名
        """
        try:
            # 连接数据库
            conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass, host=db_host, port=db_port)
            # 获取游标
            cursor = conn.cursor()
        except Exception as e:
            print(e.args)
            log_helper.error('连接数据库失败:' + str(e.args))
            return False
        try:
            # 执行查询操作
            cursor.execute(sql)
            # 将返回的结果转换成字典格式
            data = [dict((cursor.description[i][0], value) for i, value in enumerate(row)) for row in cursor.fetchall()]
        except Exception as e:
            print(e.args)
            log_helper.error('sql执行失败:' + str(e.args) + ' sql:' + str(sql))
            return False
        finally:
            # 关闭游标和数据库链接
            cursor.close()
            conn.close()
        # 返回结果(字典格式)
        return data
    
    
    def write(sql, vars):
        """
        连接pg数据库并进行写的操作
        如果连接失败,会把错误写入日志中,并返回false,如果sql执行失败,也会把错误写入日志中,并返回false,如果所有执行正常,则返回true
        """
        try:
            # 连接数据库
            conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass, host=db_host, port=db_port)
            # 获取游标
            cursor = conn.cursor()
        except Exception as e:
            print(e.args)
            log_helper.error('连接数据库失败:' + str(e.args))
            return False
        try:
            # 执行sql语句
            cursor.execute(sql, vars)
            # 提交事务
            conn.commit()
        except Exception as e:
            print(e.args)
            # 如果出错,则事务回滚
            conn.rollback()
            log_helper.error('sql执行失败:' + str(e.args) + ' sql:' + str(sql))
            return False
        else:
            # 获取数据
            try:
                data = [dict((cursor.description[i][0], value) for i, value in enumerate(row))
                             for row in cursor.fetchall()]
            except Exception as e:
                # 没有设置returning或执行修改或删除语句时,记录不存在
                data = None
        finally:
            # 关闭游标和数据库链接
            cursor.close()
            conn.close()
    
        # 如果写入数据后,将数据库返回的数据返回给调用者
        return data
    View Code

      通过对代码的简单分析,可以看到整个模块在初化时,载入数据库链接配置,对数据库的操作也只有简单读与写操作。这样的功能对于一般的数据库增删改查操作已经足够了,但如果业务复杂,有多个库、需要用到事务或者需要访问不同类型数据库时,它就不够用了。所以首先要做的就是对它进行重构,功能进行完善。

      首先我们需要将配置独立出来,当有需要链接多个数据库时,可以读取不同的配置文件,让程序更加方便灵活。

      在config目录下创建db_config.py文件(有多个库时,可以配置多个不同的参数来引用)

    #!/usr/bin/env python
    # coding=utf-8
    
    
    ### 数据库链接参数 ###
    DB = {
        'db_host': '127.0.0.1',
        'db_port': 5432,
        'db_name': 'simple_db',
        'db_user': 'postgres',
        'db_pass': '123456'
    }
    # 是否将所有要执行的Sql语句输出到日志里
    IS_OUTPUT_SQL = False

      在配置中,我们同样定义了数据库连接地址、端口、数据库名称、用户名与密码。

      另外,为了方便我们进行排错,检查sql的生成情况,添加了IS_OUTPUT_SQL是否输出执行的sql语句到日志中这一个开关项,设置为True时,所有被执行的sql语句都会被写到日志中,方便下载日志下来进行分析。

     

      对于数据库操作模块,我们需要封装成一个类,在有需要调用时,就可以通过with语句进行初始化操作,设置对应的数据库链接配置,灵活的连接不同的数据库。

      在设计操作类时,我们需要思考几个问题:

      1.它可以支持多数据库操作,即读取不同的配置能连接操作不同的数据库(可以通过类初始化时进行注入配置信息)

      2.它需要支持with语句,当我们忘记关闭数据库游标和连接时,自动帮我们关闭(需要实现__enter__()与__exit__()方法)

      3.它需要支持数据库事务,当执行失败时,可以回滚数据,当所有sql执行都成功时,可以统一提交事务(需要创建rollback()与commit()方法)

      4.它需要支持查询、添加、修改、删除等操作,方便我们操作关系型数据库记录(需要创建sql执行方法)

      5.它需要支持sql执行优化,将超出指定执行时间的sql语句记录到日志中,方便开发人员进行分析(需要记录sql执行起始时间与结束时间,并进行计算,当这个时间大于指定值时执行日志写入程序)

      根据这些要求,我们初步设计出数据库操作类的基本模型

    class PgHelper(object):
        """postgresql数据库操作类"""
    
        def __init__(self, db, is_output_sql):
            """初始化数据库操作类配置信息"""
    
        def open_conn(self):
            """连接数据库,并建立游标"""
    
        def close_conn(self):
            """关闭postgresql数据库链接"""
    
        def __enter__(self):
            """初始化数据库链接"""
    
        def __exit__(self, type, value, trace):
            """关闭postgresql数据库链接"""
    
        def rollback(self):
            """回滚操作"""
    
        def commit(self):
            """提交事务"""
    
        def execute(self, query, vars=None):
            """执行sql语句查询,返回结果集或影响行数"""
    
        def write_log(self, start_time, end_time, sql):
            """记录Sql执行超时日志"""

      接下来,我们来一一实现上面的各个方法

      首先是完成初始化方法,我们可以通过注入的方法,将db_config配置信息里的参数注入进来初始化。连接不同的数据库时,可以注入不同的配置信息。

    class PgHelper(object):
        """postgresql数据库操作类"""
    
        def __init__(self, db, is_output_sql):
            self.connect = None
            self.cursor = None
            # 初始化数据库参数
            self.db_name = db.get('db_name')
            self.db_user = db.get('db_user')
            self.db_pass = db.get('db_pass')
            self.db_host = db.get('db_host')
            self.db_port = db.get('db_port')
            # 是否将所有要执行的Sql语句输出到日志里
            self.is_output_sql = is_output_sql

      然后我们来创建数据库打开连接方法与关闭连接的方法,当数据库连接失败时会抛出异常,程序会自动调用log_helper.error()方法,将异常写入日志当中,并第一时间发送邮件通知开发人员,方便开发人员即时排错。

        def open_conn(self):
            """连接数据库,并建立游标"""
            try:
                if not self.connect:
                    self.connect = psycopg2.connect(database=self.db_name, user=self.db_user, password=self.db_pass, host=self.db_host, port=self.db_port)
                return self.connect
            except Exception as e:
                log_helper.error('连接数据库失败:' + str(e.args))
                return False
    
        def close_conn(self):
            """关闭postgresql数据库链接"""
            # 关闭游标
            try:
                if self.cursor:
                    self.cursor.close()
            except Exception:
                pass
            # 关闭数据库链接
            try:
                if self.connect:
                    self.connect.close()
            except Exception:
                pass

      通过重写内置__enter__()与__exit__()方法,来实现with语句调用本类时,会自动对类进行初始化操作,自动创建数据库连接。当代码执行完毕后(程序退出with语句时),程序会自动调用对应的方法,将游标与数据库连接的关闭,避免手动操作时,忘记关闭连接出现异常。

        def __enter__(self):
            """初始化数据库链接"""
            self.open_conn()
            return self
    
        def __exit__(self, type, value, trace):
            """关闭postgresql数据库链接"""
            self.close_conn()

      为了方便事务处理,增加回滚方法。用于事务中执行操作失败时,调用回滚方法执行回滚操作。

        def rollback(self):
            """回滚操作"""
            try:
                # 异常时,进行回滚操作
                if self.connect:
                    self.connect.rollback()
                    self.close_conn()
            except Exception as e:
                log_helper.error('回滚操作失败:' + str(e.args))

      还需要增加事务提交方法,方便sql执行增删改成功以后,提交事务更新数据。在开发中很多朋友经常会忘记执行提交事务操作,一直以为代码有问题没有执行成功。

        def commit(self):
            """提交事务"""
            try:
                if self.connect:
                    self.connect.commit()
                    self.close_conn()
            except Exception as e:
                log_helper.error('提交事务失败:' + str(e.args))

      为了方便查看sql语句转换效果,我们还可以增加获取sql语句生成方法,当然这个方法并没有太大的用途。

        def get_sql(self, query, vars=None):
            """获取编译后的sql语句"""
            # 记录程序执行开始时间
            start_time = time.clock()
            try:
                # 判断是否记录sql执行语句
                if self.is_output_sql:
                    log_helper.info('sql:' + str(query))
                # 建立游标
                self.cursor = self.connect.cursor()
                # 执行SQL
                self.data = self.cursor.mogrify(query, vars)
            except Exception as e:
                # 将异常写入到日志中
                log_helper.error('sql生成失败:' + str(e.args) + ' query:' + str(query))
                self.data = '获取编译sql失败'
            finally:
                # 关闭游标
                self.cursor.close()
            # 记录程序执行结束时间
            end_time = time.clock()
            # 写入日志
            self.write_log(start_time, end_time, query)
    
            return self.data

      因为,当你直接使用完整的sql语句执行时,并不需要这个方法。但是,你使用的是下面方式,执行后就会生成组合好的sql语句,帮助我们分析sql语句生成情况

    # 使用with方法,初始化数据库连接
    with db_helper.PgHelper(db_config.DB, db_config.IS_OUTPUT_SQL) as db:
        # 设置sql执行语句
        sql = """insert into product (name, code) values (%s, %s) returning id"""
        # 设置提交参数
        vars = ('zhangsan', '201807251234568')
        # 生成sql语句,并打印到控制台
        print(db.get_sql(sql, vars))

      输出结果:

    b"insert into product (name, code) values ('zhangsan', '201807251234568') returning id"

      数据库最常见的操作就是增删改查操作,由于postgresql有个非常好用的特殊参数:returning,它可以在sql执行增改删结束后,返回我们想要的字段值,方便我们进行相应的判断与操作,所以增改删操作我们不需要将它与查询操作分离成两个方法,统一使用这个方法来获取数据库中返回的记录值。

      在实现这个方法之前,我们设计时要思考这几个问题:

      1.需要记录程序执行的起始与结束时间,计算sql语句执行时长,用来判断是否记录到日志中,方便开发人员进行分析优化sql语句

      2.需要根据参数判断,是否需要将所有执行的sql语句记录到日志中,方便开发人员有需要时,查看执行了哪些sql语句,进行数据与功能分析

      3.由于类在加载时就已经自动连接数据库了,所以在方法中不需要进行打开数据库链接操作

      5.在执行sql语句时,需要创建游标,然后执行sql语句

      6.为了让用户更好的体验,减少异常的直接抛出,需要进行异常捕捉,并将捕捉到的异常进行处理,记录到日志中方便开发人员分析错误,同时同步发送推送给相关人员,即时提醒错误

      7.sql执行成功以后,需要对返回的数据进行处理,组合成字典类型,方便前端使用

      8.完成数据处理后,需要及时关闭游标

      9.对返回的数据需要进行处理后,返回给上一级程序

     1     def execute(self, query, vars=None):
     2         """执行sql语句查询,返回结果集或影响行数"""
     3         if not query:
     4             return None
     5         # 记录程序执行开始时间
     6         start_time = time.clock()
     7         try:
     8             # 判断是否记录sql执行语句
     9             if self.is_output_sql:
    10                 log_helper.info('sql:' + str(query))
    11             # 建立游标
    12             self.cursor = self.connect.cursor()
    13             # 执行SQL
    14             result = self.cursor.execute(query, vars)
    15             print(str(result))
    16         except Exception as e:
    17             # 将异常写入到日志中
    18             log_helper.error('sql执行失败:' + str(e.args) + ' query:' + str(query))
    19             self.data = None
    20         else:
    21             # 获取数据
    22             try:
    23                 if self.cursor.description:
    24                     # 在执行insert/update/delete等更新操作时,如果添加了returning,则读取返回数据组合成字典返回
    25                     self.data = [dict((self.cursor.description[i][0], value) for i, value in enumerate(row)) for row in self.cursor.fetchall()]
    26                 else:
    27                     # 如果执行insert/update/delete等更新操作时没有添加returning,则返回影响行数,值为0时表时没有数据被更新
    28                     self.data = self.cursor.rowcount
    29             except Exception as e:
    30                 # 将异常写入到日志中
    31                 log_helper.error('数据获取失败:' + str(e.args) + ' query:' + str(query))
    32                 self.data = None
    33         finally:
    34             # 关闭游标
    35             self.cursor.close()
    36         # 记录程序执行结束时间
    37         end_time = time.clock()
    38         # 写入日志
    39         self.write_log(start_time, end_time, query)
    40 
    41         # 如果有返回数据,则把该数据返回给调用者
    42         return self.data

       最后一个是记录超时sql语句到日志方法,这里我将大于0.1秒的sql语句都记录下来

        def write_log(self, start_time, end_time, sql):
            """记录Sql执行超时日志"""
            t = end_time - start_time
            if (t) > 0.1:
                content = ' '.join(('run time:', str(t), 's sql:', sql))
                log_helper.info(content)

      完成的db_helper.py代码

      1 #!/usr/bin/env python
      2 # coding=utf-8
      3 
      4 import psycopg2
      5 import time
      6 from io import StringIO
      7 from common import log_helper, file_helper
      8 
      9 
     10 class PgHelper(object):
     11     """postgresql数据库操作类"""
     12 
     13     def __init__(self, db, is_output_sql):
     14         self.connect = None
     15         self.cursor = None
     16         # 初始化数据库参数
     17         self.db_name = db.get('db_name', '')
     18         self.db_user = db.get('db_user', '')
     19         self.db_pass = db.get('db_pass', '')
     20         self.db_host = db.get('db_host', '')
     21         self.db_port = db.get('db_port', '')
     22         # 是否将所有要执行的Sql语句输出到日志里
     23         self.is_output_sql = is_output_sql
     24 
     25     def open_conn(self):
     26         """连接数据库,并建立游标"""
     27         try:
     28             if not self.connect:
     29                 self.connect = psycopg2.connect(database=self.db_name, user=self.db_user, password=self.db_pass,
     30                                                 host=self.db_host, port=self.db_port)
     31             return self.connect
     32         except Exception as e:
     33             log_helper.error('连接数据库失败:' + str(e.args))
     34             return False
     35 
     36     def close_conn(self):
     37         """关闭postgresql数据库链接"""
     38         # 关闭游标
     39         try:
     40             if self.cursor:
     41                 self.cursor.close()
     42         except Exception:
     43             pass
     44         # 关闭数据库链接
     45         try:
     46             if self.connect:
     47                 self.connect.close()
     48         except Exception:
     49             pass
     50 
     51     def __enter__(self):
     52         """初始化数据库链接"""
     53         self.open_conn()
     54         return self
     55 
     56     def __exit__(self, type, value, trace):
     57         """关闭postgresql数据库链接"""
     58         self.close_conn()
     59 
     60     def rollback(self):
     61         """回滚操作"""
     62         try:
     63             # 异常时,进行回滚操作
     64             if self.connect:
     65                 self.connect.rollback()
     66         except Exception as e:
     67             log_helper.error('回滚操作失败:' + str(e.args))
     68 
     69     def commit(self):
     70         """提交事务"""
     71         try:
     72             if self.connect:
     73                 self.connect.commit()
     74                 self.close_conn()
     75         except Exception as e:
     76             log_helper.error('提交事务失败:' + str(e.args))
     77 
     78     def get_sql(self, query, vars=None):
     79         """获取编译后的sql语句"""
     80         # 记录程序执行开始时间
     81         start_time = time.clock()
     82         try:
     83             # 判断是否记录sql执行语句
     84             if self.is_output_sql:
     85                 log_helper.info('sql:' + str(query))
     86             # 建立游标
     87             self.cursor = self.connect.cursor()
     88             # 执行SQL
     89             self.data = self.cursor.mogrify(query, vars)
     90         except Exception as e:
     91             # 将异常写入到日志中
     92             log_helper.error('sql生成失败:' + str(e.args) + ' query:' + str(query))
     93             self.data = '获取编译sql失败'
     94         finally:
     95             # 关闭游标
     96             self.cursor.close()
     97         # 记录程序执行结束时间
     98         end_time = time.clock()
     99         # 写入日志
    100         self.write_log(start_time, end_time, query)
    101 
    102         return self.data
    103 
    104     def copy(self, values, table_name, columns):
    105         """
    106         百万级数据更新函数
    107         :param values: 更新内容,字段之间用	分隔,记录之间用
    分隔 "1	aaa	abc
    2bbabc
    "
    108         :param table_name: 要更新的表名称
    109         :param columns: 需要更新的字段名称:例:('id','userame','passwd')
    110         :return:
    111         """
    112         try:
    113             # 建立游标
    114             self.cursor = self.connect.cursor()
    115             self.cursor.copy_from(StringIO(values), table_name, columns=columns)
    116             self.connect.commit()
    117             return True
    118         except Exception as e:
    119             # 将异常写入到日志中
    120             log_helper.error('批量更新失败:' + str(e.args) + ' table:' + table_name)
    121         finally:
    122             # 关闭游标
    123             self.cursor.close()
    124 
    125     def execute(self, query, vars=None):
    126         """执行sql语句查询,返回结果集或影响行数"""
    127         if not query:
    128             return None
    129         # 记录程序执行开始时间
    130         start_time = time.clock()
    131         try:
    132             # 判断是否记录sql执行语句
    133             if self.is_output_sql:
    134                 log_helper.info('sql:' + str(query))
    135             # 建立游标
    136             self.cursor = self.connect.cursor()
    137             # 执行SQL
    138             result = self.cursor.execute(query, vars)
    139             print(str(result))
    140         except Exception as e:
    141             # 将异常写入到日志中
    142             log_helper.error('sql执行失败:' + str(e.args) + ' query:' + str(query))
    143             self.data = None
    144         else:
    145             # 获取数据
    146             try:
    147                 if self.cursor.description:
    148                     # 在执行insert/update/delete等更新操作时,如果添加了returning,则读取返回数据组合成字典返回
    149                     self.data = [dict((self.cursor.description[i][0], value) for i, value in enumerate(row)) for row in self.cursor.fetchall()]
    150                 else:
    151                     # 如果执行insert/update/delete等更新操作时没有添加returning,则返回影响行数,值为0时表时没有数据被更新
    152                     self.data = self.cursor.rowcount
    153             except Exception as e:
    154                 # 将异常写入到日志中
    155                 log_helper.error('数据获取失败:' + str(e.args) + ' query:' + str(query))
    156                 self.data = None
    157         finally:
    158             # 关闭游标
    159             self.cursor.close()
    160         # 记录程序执行结束时间
    161         end_time = time.clock()
    162         # 写入日志
    163         self.write_log(start_time, end_time, query)
    164 
    165         # 如果有返回数据,则把该数据返回给调用者
    166         return self.data
    167 
    168 
    169     def write_log(self, start_time, end_time, sql):
    170         """记录Sql执行超时日志"""
    171         t = end_time - start_time
    172         if (t) > 0.1:
    173             content = ' '.join(('run time:', str(t), 's sql:', sql))
    174             log_helper.info(content)
    View Code

      测试代码

     1 #!/usr/bin/evn python
     2 # coding=utf-8
     3 
     4 import unittest
     5 from common import db_helper
     6 from config import db_config
     7 
     8 class DbHelperTest(unittest.TestCase):
     9     """数据库操作包测试类"""
    10 
    11     def setUp(self):
    12         """初始化测试环境"""
    13         print('------ini------')
    14 
    15     def tearDown(self):
    16         """清理测试环境"""
    17         print('------clear------')
    18 
    19     def test(self):
    20         # 使用with方法,初始化数据库连接
    21         with db_helper.PgHelper(db_config.DB, db_config.IS_OUTPUT_SQL) as db:
    22             # 设置sql执行语句
    23             sql = """insert into product (name, code) values (%s, %s) returning id"""
    24             # 设置提交参数
    25             vars = ('张三', '201807251234568')
    26             # 生成sql语句,并打印到控制台
    27             print(db.get_sql(sql, vars))
    28 
    29             db.execute('select * from product where id=1000')
    30             db.execute('insert into product (name, code) values (%s, %s) returning id', ('张三', '201807251234568'))
    31             db.commit()
    32 
    33 if __name__ == '__main__':
    34     unittest.main()
    View Code

    版权声明:本文原创发表于 博客园,作者为 AllEmpty 本文欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则视为侵权。

    python开发QQ群:669058475(本群已满)、733466321(可以加2群)    作者博客:http://www.cnblogs.com/EmptyFS/

  • 相关阅读:
    ES6深入浅出-5 新版对象-1.如何创建对象
    ES6深入浅出-4 迭代器与生成器-5.科班 V.S. 培训
    ES6深入浅出-4 迭代器与生成器-4.总结
    ES6深入浅出-4 迭代器与生成器-3.生成器 & for...of
    ES6深入浅出-4 迭代器与生成器-2.Symbol 和迭代器
    Spring cloud微服务安全实战-3-2 第一个API及注入攻击防护
    Spring cloud微服务安全实战-3-1 API安全 常见的安全机制
    Spring Cloud微服务安全实战- 2-1 环境安装
    Spring cloud微服务安全实战_汇总
    ES6深入浅出-4 迭代器与生成器-1.字面量增强
  • 原文地址:https://www.cnblogs.com/EmptyFS/p/9368979.html
Copyright © 2011-2022 走看看