zoukankan      html  css  js  c++  java
  • Python Day 45 手撸ORM框架

      ##ORM介绍

    #对象关系映射
      在python中出名的是对象关系映射框架:sqlalchemy
    #
    优点: 能够让不会mysql的程序员也能够顺顺利利的操作数据库 #缺点: 可扩展性比较差 #对象关系映射 类 >>> 数据库的表 对象 >>> 数据库的表的一条记录 对象点属性 >>> 记录的某个字段对应的值

    #步骤分析
      1、前言:User(dict)----->User类继承了字典dict------>实例化user=User(name='icon')------>user['name']=owen
                    User类中使用__getattr__和__setattr__方法------->可以实现对象点属性:user.name=icon
      2、__getattr__和__setattr__方法区别:
        __getattr__:当对未定义的属性名称和实例进行点号时,就会用属性名作为字符串调用这个方法。如果继承树可以找到该属性,则不调用此方法
        __setattr__:会拦截所有属性的的赋值语句
      3、定义Field:数据库中每一列数据,都有:列名,列的数据类型,是否是主键,默认值
      4、定义Model基类:
      5、定义ModelMetaclass元类
      6、单例模式(四五种类型,后续再把该知识点补上)

      ##ORM单例模式

    from orm_singleton.mysql_singleton import Mysql
    
    
    # 定义字段类
    class Field(object):
        def __init__(self, name, column_type, primary_key, default):
            self.name = name
            self.column_type = column_type
            self.primary_key = primary_key
            self.default = default
    
    
    # 定义具体的字段
    class StringField(Field):
        def __init__(self, name, column_type='varchar(255)', primary_key=False, default=None):
            super().__init__(name, column_type, primary_key, default)
    
    
    class IntegerField(Field):
        def __init__(self, name, column_type='int', primary_key=False, default=None):
            super().__init__(name, column_type, primary_key, default)
    
    
    class ModelMetaClass(type):
        def __new__(cls, class_name, class_bases, class_attrs):
            # 我仅仅只想拦截模型表的类的创建过程
            if class_name == 'Models':
                return type.__new__(cls, class_name, class_bases, class_attrs)
            # 给类放表名,主键字段,所有字段
            table_name = class_attrs.get('table_name', class_name)
            # 定义一个存储主键的变量
            primary_key = None
            # 定义一个字典用来存储用户自定义的表示表的所有字段信息
            mappings = {}
            # for循环当前类的名称空间
            for k, v in class_attrs.items():
                if isinstance(v, Field):
                    mappings[k] = v
                    if v.primary_key:
                        if primary_key:
                            raise TypeError("主键只能有一个")
                        primary_key = v.name
            # 将重复的键值对删除
            for k in mappings.keys():
                class_attrs.pop(k)
            if not primary_key:
                raise TypeError('必须要有一个主键')
            # 将处理好的数据放入class_attrs中
            class_attrs['table_name'] = table_name
            class_attrs['primary_key'] = primary_key
            class_attrs['mappings'] = mappings
            return type.__new__(cls, class_name, class_bases, class_attrs)
    
    
    class Models(dict, metaclass=ModelMetaClass):
        def __init__(self, **kwargs):
            super().__init__(**kwargs)
        “”“
        # __getattr__:对象获取自己没有的属性或方法的时候自动触发
        # __setattr__:当对象obj.name='jason' 固定句式
        ”“”
    
        def __getattr__(self, item):
            return self.get(item, '没有该键值对')
    
        def __setattr__(self, key, value):
            self[key] = value
    
        # 查询方法
        @classmethod
        def select(cls, **kwargs):
            ms = Mysql.singleton()
            # select * from userinfo
            if not kwargs:
                sql = 'select * from %s' % cls.table_name
                res = ms.select(sql)
            else:
                # select * from userinfo where id = 1
                k = list(kwargs.keys())[0]
                v = kwargs.get(k)
                sql = 'select * from %s where %s=?' % (cls.table_name, k)
                # select * from userinfo where id = ?
                sql = sql.replace('?', '%s')  # select * from userinfo where id = %s
                res = ms.select(sql,v)
            if res:
                return [ cls(**r) for r in res]  # 将数据库的一条数据映射成类的对象
    
        # 新增方法
        def save(self):
            ms = Mysql.singleton()
            # insert into userinfo(name,password) values('jason','123')
            # insert into %s(%s) values(?)
            fields = []  # [name,password]
            values = []
            args = []
            for k,v in self.mappings.items():
                if not v.primary_key:  # 将id字段去除   因为新增一条数据 id是自动递增的不需要你传
                    fields.append(v.name)
                    args.append('?')
                    values.append(getattr(self,v.name))
            # insert into userinfo(name,password) values(?,?)
            sql = "insert into %s(%s) values(%s)"%(self.table_name,','.join(fields),','.join(args))
            # insert into userinfo(name,password) values(?,?)
            sql = sql.replace('?','%s')
            ms.execute(sql,values)
    
        # 修改方法:基于已经存在了的数据进行一个修改操作
        def update(self):
            ms = Mysql.singleton()
            # update userinfo set name='jason',password='123' where id = 1
            fields = []  # [name,password]
            values = []
            pr = None
            for k,v in self.mappings.items():
                if v.primary_key:
                    pr = getattr(self,v.name,v.default)
                else:
                    fields.append(v.name+'=?')
                    values.append(getattr(self,v.name,v.default))
            sql = 'update %s set %s where %s = %s'%(self.table_name,','.join(fields),self.primary_key,pr)
            # update userinfo set name='?',password='?' where id = 1
            sql = sql.replace('?','%s')
            ms.execute(sql,values)
    
    #测试的话放在main下面,这样当作模块执行时main下面语句不执行
    if __name__ == '__main__':
        class Teacher(Models):
            table_name = 'teacher'
            tid = IntegerField(name='tid',primary_key=True)
            tname = StringField(name='tname')
        # obj = Teacher(tname='jason老师')
        # obj.save()
        # res = Teacher.select()
        # for r in res:
        #     print(r.tname)
        # print(res)
        res = Teacher.select(tid=1)
        teacher_obj = res[0]
        teacher_obj.tname = 'jerry老师'
        teacher_obj.update()
    
    
    
        # class User(Models):
        #     table_name = 'User'
        #     id = IntegerField(name='id', primary_key=True)
        #     name = StringField(name='name')
        #     password = StringField(name='password')
        # print(User.primary_key)
        # print(User.mappings)
        # obj = User(name='jason')
        # print(obj.table_name)
        # print(obj.primary_key)
        # print(obj.mappings)
    orm
    import pymysql
    
    
    class Mysql(object):
        _instance = None
        def __init__(self):
            self.conn = pymysql.connect(
                host = '127.0.0.1',
                port = 3306,
                user = 'root',
                password = '123',
                database = 'db2',
                charset = 'utf8',
                autocommit = True # 自动提交
            )
            self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
    
        def close(self):
            self.cursor.close()
            self.conn.close()
    
        def select(self,sql,args=None):
            self.cursor.execute(sql,args)
            res = self.cursor.fetchall()  # 列表套字典
            return res
    
        def execute(self,sql,args):
            try:
                self.cursor.execute(sql,args)
            except BaseException as e :
                print(e)
    
        @classmethod
        def singleton(cls):
            if not cls._instance:
                cls._instance = cls()
            return cls._instance
    msql_singleton

      ##ORM数据库连接池

    from orm_singleton.mysql_singleton import Mysql
    
    
    # 定义字段类
    class Field(object):
        def __init__(self, name, column_type, primary_key, default):
            self.name = name
            self.column_type = column_type
            self.primary_key = primary_key
            self.default = default
    
    
    # 定义具体的字段
    class StringField(Field):
        def __init__(self, name, column_type='varchar(255)', primary_key=False, default=None):
            super().__init__(name, column_type, primary_key, default)
    
    
    class IntegerField(Field):
        def __init__(self, name, column_type='int', primary_key=False, default=None):
            super().__init__(name, column_type, primary_key, default)
    
    
    class ModelMetaClass(type):
        def __new__(cls, class_name, class_bases, class_attrs):
            # 我仅仅只想拦截模型表的类的创建过程
            if class_name == 'Models':
                return type.__new__(cls, class_name, class_bases, class_attrs)
            # 给类放表名,主键字段,所有字段
            table_name = class_attrs.get('table_name', class_name)
            # 定义一个存储主键的变量
            primary_key = None
            # 定义一个字典用来存储用户自定义的表示表的所有字段信息
            mappings = {}
            # for循环当前类的名称空间
            for k, v in class_attrs.items():
                if isinstance(v, Field):
                    mappings[k] = v
                    if v.primary_key:
                        if primary_key:
                            raise TypeError("主键只能有一个")
                        primary_key = v.name
            # 将重复的键值对删除
            for k in mappings.keys():
                class_attrs.pop(k)
            if not primary_key:
                raise TypeError('必须要有一个主键')
            # 将处理好的数据放入class_attrs中
            class_attrs['table_name'] = table_name
            class_attrs['primary_key'] = primary_key
            class_attrs['mappings'] = mappings
            return type.__new__(cls, class_name, class_bases, class_attrs)
    
    
    class Models(dict, metaclass=ModelMetaClass):
        def __init__(self, **kwargs):
            super().__init__(**kwargs)
    
        def __getattr__(self, item):
            return self.get(item, '没有该键值对')
    
        def __setattr__(self, key, value):
            self[key] = value
    
        # 查询方法
        @classmethod
        def select(cls, **kwargs):
            ms = Mysql()
            # select * from userinfo
            if not kwargs:
                sql = 'select * from %s' % cls.table_name
                res = ms.select(sql)
            else:
                # select * from userinfo where id = 1
                k = list(kwargs.keys())[0]
                v = kwargs.get(k)
                sql = 'select * from %s where %s=?' % (cls.table_name, k)
                # select * from userinfo where id = ?
                sql = sql.replace('?', '%s')  # select * from userinfo where id = %s
                res = ms.select(sql,v)
            if res:
                return [ cls(**r) for r in res]  # 将数据库的一条数据映射成类的对象
    
        # 新增方法
        def save(self):
            ms = Mysql()
            # insert into userinfo(name,password) values('jason','123')
            # insert into %s(%s) values(?)
            fields = []  # [name,password]
            values = []
            args = []
            for k,v in self.mappings.items():
                if not v.primary_key:  # 将id字段去除   因为新增一条数据 id是自动递增的不需要你传
                    fields.append(v.name)
                    args.append('?')
                    values.append(getattr(self,v.name))
            # insert into userinfo(name,password) values(?,?)
            sql = "insert into %s(%s) values(%s)"%(self.table_name,','.join(fields),','.join(args))
            # insert into userinfo(name,password) values(?,?)
            sql = sql.replace('?','%s')
            ms.execute(sql,values)
    
        # 修改方法:基于已经存在了的数据进行一个修改操作
        def update(self):
            ms = Mysql()
            # update userinfo set name='jason',password='123' where id = 1
            fields = []  # [name,password]
            values = []
            pr = None
            for k,v in self.mappings.items():
                if v.primary_key:
                    pr = getattr(self,v.name,v.default)
                else:
                    fields.append(v.name+'=?')
                    values.append(getattr(self,v.name,v.default))
            sql = 'update %s set %s where %s = %s'%(self.table_name,','.join(fields),self.primary_key,pr)
            # update userinfo set name='?',password='?' where id = 1
            sql = sql.replace('?','%s')
            ms.execute(sql,values)
    
    
    if __name__ == '__main__':
        class Teacher(Models):
            table_name = 'teacher'
            tid = IntegerField(name='tid',primary_key=True)
            tname = StringField(name='tname')
        # obj = Teacher(tname='jason老师')
        # obj.save()
        # res = Teacher.select()
        # for r in res:
        #     print(r.tname)
        # print(res)
        res = Teacher.select(tid=1)
        teacher_obj = res[0]
        teacher_obj.tname = 'jason老师'
        teacher_obj.update()
        res1 = Teacher.select()
        print(res1)
        # class User(Models):
        #     table_name = 'User'
        #     id = IntegerField(name='id', primary_key=True)
        #     name = StringField(name='name')
        #     password = StringField(name='password')
        # print(User.primary_key)
        # print(User.mappings)
        # obj = User(name='jason')
        # print(obj.table_name)
        # print(obj.primary_key)
        # print(obj.mappings)
    orm
    import pymysql
    from orm_pool import db_pool
    
    class Mysql(object):
        def __init__(self):
            self.conn = db_pool.POOL.connection()
            self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
    
        def close(self):
            self.cursor.close()
            self.conn.close()
    
        def select(self,sql,args=None):
            self.cursor.execute(sql,args)
            res = self.cursor.fetchall()  # 列表套字典
            return res
    
        def execute(self,sql,args):
            try:
                self.cursor.execute(sql,args)
            except BaseException as e :
                print(e)
    mysql_singleton
    from DBUtils.PooledDB import PooledDB
    import pymysql
    
    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,
        # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
        ping=0,
        # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123',
        database='db2',
        charset='utf8',
        autocommit='True'
    )
    db_pool
  • 相关阅读:
    Flask_自定义参数类型(自定义转换器)
    数据结构与算法(排序)
    数据结构与算法(查找)
    Vue_fetch和axios数据请求
    Vue_修饰符
    Vue_列表过滤应用
    Vue_生命周期函数
    Vue_watch()方法,检测数据的改变
    Django_redis_缓存
    防火墙相关
  • 原文地址:https://www.cnblogs.com/liangzhenghong/p/11075292.html
Copyright © 2011-2022 走看看