zoukankan      html  css  js  c++  java
  • ORM + 单例

    mysql_singleton.py

    import pymysql
    
    
    class Mysql(object):
        """Thin wrapper around a single PyMySQL connection and cursor.

        Use ``Mysql.singleton()`` to obtain the shared instance instead of
        opening a new connection for every query.
        """

        # Shared instance, created lazily by ``singleton()``.
        _instance = None

        def __init__(self):
            """Open the connection and create a cursor that returns rows as dicts."""
            # NOTE(review): connection settings are hard-coded here; consider
            # moving them to configuration before using this outside a demo.
            self.conn = pymysql.connect(
                host='127.0.0.1',
                port=3306,
                user='root',
                password='123',
                database='db2',
                charset='utf8',
                autocommit=True,
            )
            self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)

        def close(self):
            """Close the cursor and the connection.

            If this object is the cached singleton, the cache is cleared so the
            next ``singleton()`` call opens a fresh connection instead of handing
            back a closed one.
            """
            self.cursor.close()
            self.conn.close()
            if type(self)._instance is self:
                type(self)._instance = None

        def select(self, sql, args=None):
            """Run a query and return every fetched row (a list of dicts).

            :param sql: SQL text, using ``%s`` placeholders for parameters.
            :param args: parameters passed to the cursor, or ``None``.
            """
            self.cursor.execute(sql, args)
            return self.cursor.fetchall()

        def execute(self, sql, args=None):
            """Run a statement, printing (not raising) any ordinary error.

            :param sql: SQL text, using ``%s`` placeholders for parameters.
            :param args: parameters passed to the cursor, or ``None``.
            """
            try:
                self.cursor.execute(sql, args)
            except Exception as e:
                # Best-effort by design: report the failure and carry on.
                # ``Exception`` (not ``BaseException``) so that Ctrl-C and
                # ``SystemExit`` still propagate.
                print(e)

        @classmethod
        def singleton(cls):
            """Return the shared instance, creating it on first use."""
            if cls._instance is None:
                cls._instance = cls()
            return cls._instance
    View Code

    orm.py

    from orm_singleton.mysql_singleton import Mysql
    
    
    # 定义字段类
    class Field(object):
        """Description of a single table column.

        Stores the column name, its SQL type string, whether it is the primary
        key, and a default value.
        """

        def __init__(self, name, column_type, primary_key, default):
            self.name, self.column_type = name, column_type
            self.primary_key, self.default = primary_key, default
    
    
    # 定义具体的字段
    class StringField(Field):
        """Text column; the SQL type defaults to ``varchar(255)``."""

        def __init__(self, name, column_type='varchar(255)', primary_key=False, default=None):
            super().__init__(
                name=name,
                column_type=column_type,
                primary_key=primary_key,
                default=default,
            )
    
    
    class IntegerField(Field):
        """Integer column; the SQL type defaults to ``int``."""

        def __init__(self, name, column_type='int', primary_key=False, default=None):
            super().__init__(
                name=name,
                column_type=column_type,
                primary_key=primary_key,
                default=default,
            )
    
    
    class ModelMetaClass(type):
        """Metaclass that turns declared ``Field`` attributes into table metadata.

        For every class except the one named ``Models`` it removes the ``Field``
        attributes from the class namespace and stores ``table_name``,
        ``primary_key`` and ``mappings`` there instead.
        """

        def __new__(cls, class_name, class_bases, class_attrs):
            # The base class itself declares no table, so it passes through as-is.
            if class_name == 'Models':
                return super().__new__(cls, class_name, class_bases, class_attrs)

            # Every Field declared in the class body, keyed by attribute name.
            mappings = {
                attr: value
                for attr, value in class_attrs.items()
                if isinstance(value, Field)
            }

            # Find the primary-key column; more than one is an error.
            primary_key = None
            for field in mappings.values():
                if not field.primary_key:
                    continue
                if primary_key:
                    raise TypeError("主键只能有一个")
                primary_key = field.name

            # The fields now live in ``mappings``; drop them from the namespace.
            for attr in mappings:
                del class_attrs[attr]

            if not primary_key:
                raise TypeError('必须要有一个主键')

            # Table name falls back to the class name when not declared.
            class_attrs.update(
                table_name=class_attrs.get('table_name', class_name),
                primary_key=primary_key,
                mappings=mappings,
            )
            return super().__new__(cls, class_name, class_bases, class_attrs)
    
    
    class Models(dict, metaclass=ModelMetaClass):
        """Base class for table-backed records: a dict with attribute access.

        Subclasses declare ``Field`` attributes; ``ModelMetaClass`` turns them
        into the class attributes ``table_name``, ``primary_key`` and
        ``mappings`` that the methods below rely on.

        Table and column names are formatted into the SQL text, so they must
        come from trusted code. Values are always passed as query parameters.
        """

        def __init__(self, **kwargs):
            super().__init__(**kwargs)

        def __getattr__(self, item):
            # Only reached when normal attribute lookup fails: fall back to the
            # dict contents, or a placeholder string when the key is absent.
            return self.get(item, '没有该键值对')

        def __setattr__(self, key, value):
            # Attribute assignment stores into the dict itself.
            self[key] = value

        @classmethod
        def _where(cls, conditions):
            """Build ``(clause, params)`` for an AND-ed equality filter.

            ``conditions`` maps column names to values; the clause uses ``%s``
            placeholders and ``params`` lists the values in the same order.
            """
            columns = list(conditions)
            clause = ' and '.join('%s=%%s' % column for column in columns)
            return clause, [conditions[column] for column in columns]

        # Query
        @classmethod
        def select(cls, **kwargs):
            """Return the matching rows as a list of ``cls`` objects.

            With no keyword arguments every row of the table is selected;
            otherwise each ``column=value`` pair becomes an equality condition
            and all conditions are AND-ed together.

            :return: a non-empty list of instances, or ``None`` when nothing
                matched.
            """
            ms = Mysql.singleton()
            sql = 'select * from %s' % cls.table_name
            if kwargs:
                where, params = cls._where(kwargs)
                rows = ms.select('%s where %s' % (sql, where), params)
            else:
                rows = ms.select(sql)
            if rows:
                # Map every database row onto an instance of the model class.
                return [cls(**row) for row in rows]
            return None

        # Query a single row
        @classmethod
        def select_one(cls, **kwargs):
            """Return the first matching row as a ``cls`` object, or ``None``.

            Keyword arguments are AND-ed equality conditions, as in ``select``;
            with none given, the first row of the table is returned.
            """
            ms = Mysql.singleton()
            sql = 'select * from %s' % cls.table_name
            params = None
            if kwargs:
                where, params = cls._where(kwargs)
                sql = '%s where %s' % (sql, where)
            # Only one row is used, so do not fetch the rest.
            rows = ms.select(sql + ' limit 1', params)
            if rows:
                return cls(**rows[0])
            return None

        # Insert
        def save(self):
            """Insert this object as a new row.

            The primary-key column is left out of the statement. Fields that
            were never set on the object are written with the field's declared
            default.
            """
            ms = Mysql.singleton()
            columns = []
            values = []
            for field in self.mappings.values():
                if field.primary_key:
                    continue
                columns.append(field.name)
                # Read the dict directly: attribute access would go through
                # __getattr__ and yield its placeholder string for unset fields.
                values.append(self.get(field.name, field.default))
            sql = 'insert into %s(%s) values(%s)' % (
                self.table_name,
                ','.join(columns),
                ','.join(['%s'] * len(columns)),
            )
            ms.execute(sql, values)

        # Update: modifies a row that already exists
        def update(self):
            """Write every non-key field back to the row with this primary key.

            Does nothing when the model has no non-key fields. If the object
            carries no primary-key value, the key is sent as the field default.
            """
            ms = Mysql.singleton()
            assignments = []
            values = []
            pk_value = None
            for field in self.mappings.values():
                value = self.get(field.name, field.default)
                if field.primary_key:
                    pk_value = value
                else:
                    assignments.append(field.name + '=%s')
                    values.append(value)
            if not assignments:
                return
            sql = 'update %s set %s where %s=%%s' % (
                self.table_name,
                ','.join(assignments),
                self.primary_key,
            )
            # The key value is a parameter too, never formatted into the SQL.
            values.append(pk_value)
            ms.execute(sql, values)
    
    #测试
    if __name__ == '__main__':
        # Manual demo; needs the MySQL database configured in Mysql.__init__.
        class Teacher(Models):
            table_name = 'teacher'
            tid = IntegerField(name='tid', primary_key=True)
            tname = StringField(name='tname')

        # Example: insert a row, then list every row.
        # obj = Teacher(tname='jason老师')
        # obj.save()
        # res = Teacher.select()
        # for r in res:
        #     print(r.tname)
        # print(res)

        res = Teacher.select(tid=1)
        # select() returns None when no row matches, so check before indexing.
        if res:
            teacher_obj = res[0]
            teacher_obj.tname = 'jerry老师'
            teacher_obj.update()

        # -----------------------------------------------------

        class User(Models):
            table_name = 'User'
            id = IntegerField(name='id', primary_key=True)
            name = StringField(name='name')
            password = StringField(name='password')

        # Show the metadata the metaclass attached, on the class and an instance.
        print(User.primary_key)
        print(User.mappings)
        obj = User(name='jason')
        print(obj.table_name)
        print(obj.primary_key)
        print(obj.mappings)
    View Code
  • 相关阅读:
    Northwind数据库下载地址
    MSSQL跨服务访问数据库
    MSSQL基于一致性的I/O错误,解决方法之一
    DataGridView单元格ComboBox控件添加事件
    线程安全类 跨线程修改窗体UI
    数据库字段名
    SELECT INTO 和 INSERT INTO SELECT
    链表
    因为数据库正在使用,所以无法获得对数据库的独占访问权
    代替游标的循环
  • 原文地址:https://www.cnblogs.com/HZLS/p/11073513.html
Copyright © 2011-2022 走看看