zoukankan      html  css  js  c++  java
  • 参悟python元类(又称metaclass)系列实战(五)

    写在前面

    在上一章节参悟python元类(又称metaclass)系列实战(四)完成了Mysql类, 用来连接数据库以及执行sql语句;
    继续丰富系列实战(三)的users类, 忘记的小伙伴请戳此处;
    本章内容为该系列的终结篇, 感谢大家从而终;
    有误的地方恳请大神指正下。

    热身预备

    • Mysql中, 严谨的sql语句都长的类似下面的格式

      SELECT `uid`, `name` FROM `users`;     -- 关键字都带"反引号"
      
    • Python中如何把一个list中的元素都加上反引号呢?

      fields = ['uid', 'name']
      new_fields = list(map(lambda a: f'`{a}`', fields))   # ['`uid`', '`name`']
      
    • 如何把fields带入到sql中呢?

      f"SELECT {', '.join(new_fields)} FROM `users`"
      
    • 我们上一章定义Mysql.execute时, 对传入的sql字符串做了特殊要求

      await cur.execute(sql.replace('?', '%s'), args)
      # 即在写sql时希望用 ? 代替占位符 %s, 执行时再通过 replace 替换回来
      
      # 该要求用于`INSERT`时, 就得构造类似如下的sql
      "INSERT INTO `users`(`uid`, 'name') VALUES('?', '?')"
      
      # 定义createArgsStr方法, 传入长度, 生成 ? 占位符
      def createArgsStr(num):
          return ', '.join(['?' for _ in range(num)])
      
      # 那 INSERT语句就可以这样构造
      f"INSERT INTO `users`({', '.join(new_fields)}) VALUES({createArgsStr(len(new_fields))})"
      
    • 热身完毕, 接下来我们把insert, select, delete, update的功能加到ORM中

    更新元类ModelMetaClass

    class ModelMetaClass(type):
        def __new__(cls, name, bases, attrs):
            if name == 'Model':
                # 当出现与'Model'同名的类时, 直接创建这类
                return type.__new__(cls, name, bases, attrs)
    
            # 定义表名: 要么在类中定义__table__属性(目的是"类名可以与表名不相同"), 否则与类名相同
            tableName = attrs.get('__table__') or name
            print(f'建立映射关系: {name}类 --> {tableName}表')
    
            mappings = Dict()   # 存储column与Field 子类的对应关系, Field在上一章中定义的, 忘了回去翻
            fields = []         # 用来存储除主键以外的所有字段名
            primaryKey = None   # 用来记录主键字段的名字, 初始没有
    
            for k, v in attrs.items():
                # 遍历所有属性, 即映射表的字段, 读不懂请回看第二章 Users 的定义
                if isinstance(v, Field):     # Field类, 所有字段类型的父类
                    print(f'建立映射... column: {k} ==> class: {v}')
                    mappings[k] = v
    
                    if v.primaryKey:         # 判断字段是否被设置成了主键
                        if primaryKey:       # 因为一张表只能有一个主键
                            raise Exception(f'Duplicate primary key for field {k}')
                        primaryKey = k
                    else:
                        fields.append(k)
    
            if not primaryKey:               # 这里做了一步强制要求设置主键, 你也可以去掉
                raise Exception(f'请给表{tableName}设置主键')
    
            for k in mappings.keys():
                # 删除原属性, 避免实例的属性遮盖类的同名属性, 况且我们已经保存到 mappings 中了, 不怕丢
                attrs.pop(k)
    
            # 接下来给本元类(ModelMetaClass)创建的class(如 Model)设置私有属性
            attrs['__mappings__'] = mappings
            attrs['__table__'] = tableName
            attrs['__primaryKey__'] = primaryKey
            attrs['__fields__'] = fields
    
            # 以下 8 行代码是新增的内容(注意提前定义好 createArgsStr), update 单拎出来, 这里不做处理
            escapedFields = list(map(lambda a: f'`{a}`', fields))
            attrs['__select__'] = f"SELECT `{primaryKey}`, {', '.join(escapedFields)} FROM `{tableName}`"
            attrs['__delete__'] = f"DELETE FROM `{tableName}` WHERE `{primaryKey}`=?"
    
            # 由于 update_at & created_at 可以由mysql自动写入, 我们写入时可以不传
            fields.remove('updated_at')
            fields.remove('created_at')
            attrs['__fields_2__'] = fields
            escapedFields_2 = list(map(lambda a: f'`{a}`', fields))
            attrs['__insert__'] = f"INSERT INTO `{tableName}` ({', '.join(escapedFields_2)}) VALUES ({createArgsStr(len(escapedFields_2))})"
    
            return type.__new__(cls, name, bases, attrs)
    

    更新Model类, 新增findAll方法

    class Model(Dict, metaclass=ModelMetaClass):
        """指定metaclass, 以实现动态定制"""
    
        def __init__(self, **kw):
            super().__init__(**kw)
    
        def getValue(self, key):
            return getattr(self, key, None)
    
        def getValueOrDefault(self, key):
            value = getattr(self, key, None)
            if value is None:
                field = self.__mappings__[key]   # 从所有column中获取value
                if field.default is not None:
                    # 如果default指向是方法(如time.time), 则调用方法获取其值; 否则直接赋值
                    value = field.default() if callable(field.default) else field.default
                    print(f'using defalut value for {key}: {value}')
                    setattr(self, key, value)    # 其实是调 Dict.__setattr__, 以支持用"."访问
            return value
    
        # 以下内容为更新(新增)
        @classmethod
        def appendCondition(cls, sql, where, args, **kw):
            if where:
                sql.append('WHERE')
                sql.append(where)
            if not args:
                args = []
            orderBy = kw.get('orderBy')
            if orderBy:
                sql.append('ORDER BY')
                sql.append(orderBy)
            limit = kw.get('limit')
            if limit:
                sql.append('LIMIT')
                if isinstance(limit, int):
                    sql.append('?')
                    args.append(limit)
                elif isinstance(limit, tuple) and len(limit) == 2:
                    sql.append('?, ?')
                    args.extend(limit)  # 在列表末尾一次性追加另一个序列中的多个值
                else:
                    raise Exception(f'Invalid limit value: {limit}')
            return sql
    
        @classmethod
        async def findAll(cls, where=None, args=None, **kw):
            '''返回结果集'''
            sql = [cls.__select__]
            sql = cls.appendCondition(sql, where, args, **kw)
            rs = await Mysql.select(' '.join(sql), args)
            # cls(**r)是调用Model.__init__方法, 将dict转为Dict, 就可以 . 的方式获取value
            return [cls(**r) for r in rs]
    
    • 挨个特性测试下findAll
    if __name__ == '__main__':
        import asyncio
        loop = asyncio.get_event_loop()
        loop.run_until_complete(Mysql.createPool())
        u = users()
        # 测试查询整张表
        rs = loop.run_until_complete(u.findAll())
        for i in rs:
            print(i.uid)
        # 测试where条件
        rs = loop.run_until_complete(u.findAll(where='is_deleted=?', args=[1]))
        print(rs[0].name)
        # 测试排序
        rs = loop.run_until_complete(u.findAll(where='is_deleted=?', args=[0], orderBy='uid DESC'))
        print(rs)
        # 测试分页查询
        rs = loop.run_until_complete(u.findAll(where='is_deleted=?', args=[1], orderBy='uid DESC', limit=1))
        print(rs[0].email)
    

    再给Model类新增一个findFieldValue方法, 可以查询指定字段

    @classmethod
    async def findFieldValue(cls, selectField, where=None, args=None, **kw):
        sql = [f"SELECT {selectField} FROM `{cls.__table__}`"]
        sql = cls.appendCondition(sql, where, args, **kw)
        rs = await Mysql.select(' '.join(sql), args)
        return [cls(**r) for r in rs]
    
    
    # 测试代码
    if __name__ == '__main__':
        import asyncio
        loop = asyncio.get_event_loop()
        loop.run_until_complete(Mysql.createPool())
        u = users()
        rs = loop.run_until_complete(u.findFieldValue(
            'uid, name, email', 'is_deleted=?', [1], orderBy='uid DESC', limit=(0, 2)))
        print(rs)
    

    再新增findByPrimaryKey, 根据主键id查找

    @classmethod
    async def findByPrimaryKey(cls, pk):
        '''find object by primary key.'''
        rs = await Mysql.select(f'{cls.__select__} where `{cls.__primaryKey__}`=?', [pk], 1)
        if len(rs) == 0:
            return {}
        return cls(**rs[0])
    

    还有insert, update, delete, logicDelete

    async def insert(self):
        # 因为insert都是具体的一行数据, 即单个实例, 所以没有定义成类方法
        args = list(map(self.getValueOrDefault, self.__fields_2__))  # __fields_2__不含update_at & created_at
        rows = await Mysql.execute(self.__insert__, args)
        if rows != 1:
            print(f'插入数据失败, 影响行数: {rows}')
    
    @classmethod
    async def update(cls, setField, where, args):
        """这样可以批量update"""
        sql = f"UPDATE `{cls.__table__}` SET {', '.join(map(lambda a: f'`{a}`=?', setField))} WHERE {where}"
        rows = await Mysql.execute(sql, args)
        if not rows:
            print(f'failed to update by {where}, affected rows: {rows}')
    
    @classmethod
    async def delete(cls, pk):
        """delete by primaryKey"""
        rows = await Mysql.execute(cls.__delete__, [pk])
        if rows != 1:
            print(
                f'failed to delete by {cls.__primaryKey__}, affected rows: {rows}')
    
    @classmethod
    async def logicDelete(cls, pk):
        await cls.update(setField=['is_deleted'],
                        where=f'{cls.__primaryKey__}=?', args=[1, pk])
    
    • 测试代码
    if __name__ == '__main__':
        import asyncio
        loop = asyncio.get_event_loop()
        loop.run_until_complete(Mysql.createPool())
        u = users()
        
        import hashlib
        passwd = hashlib.md5('123456'.encode(encoding='utf-8')).hexdigest()
        test = users(email='test@ztest.top', passwd=passwd, name='test',
                     created_by=100, updated_by=100)
        loop.run_until_complete(test.insert())
        loop.run_until_complete(u.update(setField=['name'], where='uid=106', args=['newTest']))
        loop.run_until_complete(u.logicDelete(106))
        print(loop.run_until_complete(u.findByPrimaryKey(106)))
    

    总结

    1. 到这里小伙伴可能有看懵的, 不要紧, 源码和sql我都打包好了戳这里下载, 请修改数据库密码啥的等配置项

    2. 其实此ORM缺点很多, 普适性差, 对异步也没能应用的合适; 只作为理解metaclass的小练习

    3. 不要重复造轮子, 推荐小伙伴还是用成熟的ORM框架, 如SQLAlchemy, aiomysql对它做了支持

  • 相关阅读:
    栈和堆的区别【转】
    C++虚函数表解析(转)
    C++编码规范(转)
    全局变量的声明和定义 以及dll中全局变量的导出
    Sizeof与Strlen的区别与联系.
    利用事件对象实现线程同步
    创建互斥对象同步线程
    MFC GDI笔记 转
    ClientToScreen( )和ScreenToClient( )
    Visual C++线程同步技术剖析
  • 原文地址:https://www.cnblogs.com/z417/p/13931545.html
Copyright © 2011-2022 走看看