ORM.py
'''
ORM: 对象关系映射 ---> 映射到数据库MySQL中的数据表
类名 ---> 表名
对象 ---> 一条记录
对象.属性 ---> 字段
模拟Django的ORM,为了,将数据库的 增、删、改、查,全部封装成
一个个的方式,比如: save, delete, update, select。
优点:
使用者无需 关心具体的SQL命令 如何编写。
直接通过调用方法 来执行相对应的SQL命令。
缺点:
1.更高级的封装导致“执行效率变低”。
2.会逐渐遗忘SQL原生命令。
'''
from mysql_client import MySQLClient
# 1.创建字段的类型, 对应数据表中的一个个字段的创建规范
class Field:
def __init__(self, name, column_type, primary_key, default):
self.name = name
self.column_type = column_type
self.primary_key = primary_key
self.default = default
# Integer
class IntegerField(Field):
def __init__(self, name, column_type='int', primary_key=False, default=0):
super().__init__(name, column_type, primary_key, default)
# String
class StringField(Field):
def __init__(self, name, column_type='varchar(64)', primary_key=False, default=None):
super().__init__(name, column_type, primary_key, default)
class OrmMetaClass(type):
def __new__(cls, class_name, class_base, class_dict):
# 过滤Models类
if class_name == 'Models':
# models类中,什么都不做,将类原路返回。
return type.__new__(cls, class_name, class_base, class_dict)
# 1.一张表必须要有表名
# 假如table_name没有值,则将类名当做表名
table_name = class_dict.get('table_name', class_name) # get--> self.table_name
# 2.主键名
primary_key = None
# 3.定义一个空字典, 专门用来存放字段对象
mappings = {}
# 遍历名称空间中所有的属性
for key, value in class_dict.items():
# print(key, value)
# 除了有字段,还有其他字段以外的属性
# 过滤字段对象以外的内容
if isinstance(value, Field):
mappings[key] = value
# 判断字段对象primary_key是否为True
if value.primary_key:
# 先判断初识的primary_key是否有值
# 判断主键是否已存在
if primary_key:
raise TypeError('只能有一个主键!')
# 若主键不存在,则给primary_key赋值
primary_key = value.name
# 节省资源: 因为mappings与原类中名称空间中的属性重复,为了节省内存,剔除重复的属性。
for key in mappings.keys():
class_dict.pop(key)
# 判断是否有主键
if not primary_key:
raise TypeError('必须有一个主键')
# 给类的名称空间添加表名
class_dict['table_name'] = table_name
# 给类的名称空间添加主键名
class_dict['primary_key'] = primary_key
# 给类的名称空间添加一个mappings字典,字典中拥有所有字段属性
class_dict['mappings'] = mappings
return type.__new__(cls, class_name, class_base, class_dict)
class Models(dict, metaclass=OrmMetaClass): # OrmMetaClass(Models, Models_name, base, class_dict)
def __getattr__(self, item):
# print(item, '调用没有的属性时会触发...')
# 将字典的值,返回
return self.get(item)
def __setattr__(self, key, value):
# print(key, value)
self[key] = value
# 查询方法
# User.orm_select(id=1) ----> user_obj ---》 修改对象的属性值 user_obj.user_name = '李小花' ---》 user_obj.orm_update()
@classmethod
def orm_select(cls, **kwargs):
# kwargs --> {'id': 1}
# 1.调用MySQLClient拿到mysql对象
mysql = MySQLClient()
if not kwargs:
# 查询所有 sql: select * from User;
sql = 'select * from %s' % cls.table_name
res = mysql.my_select(sql)
else:
# dict.keys() ---> 返回的是一个对象,需要转成list类型
key = list(kwargs.keys())[0]
value = kwargs.get(key)
# 条件查询 sql: select * from User where id=1;
sql = 'select * from %s where %s=?' % (cls.table_name, key)
sql = sql.replace('?', '%s')
# 需要拿到mysql的游标,提交sql语句
# res = mysql.cursor.execute(sql, value)
res = mysql.my_select(sql, value)
# d = {'user_id': 1, 'user_name': 'tank', 'pwd': '123'}
# return [cls(**{'user_id': 1, 'user_name': 'tank', 'pwd': '123'}) for d in res]
# MySQL ---> [dict, ] --> [{}, {}] ---> [obj, obj] obj.属性取值
return [cls(**d) for d in res]
# 插入方法 User(传关键字参数) ---》 user_obj.orm_insert(user_obj)
def orm_insert(self):
mysql = MySQLClient()
# sql: insert into table(f1, f2, f3) values(?, ?, ?);
# 存储字段名
keys = []
# 存字段对应的值
values = []
# 存放?号的,有几个字段,就存几个?号
args = []
for k, v in self.mappings.items():
# 过滤掉主键,因为主键是自增的
if not v.primary_key:
# keys.append(k)
# 存表中除了主键以外的字段名
keys.append(v.name)
# 存表中除了主键以外的字段值,若值没有,则使用默认值
values.append(
getattr(self, v.name, v.default)
)
# 存放?号的,有几个字段,就存几个?号
args.append('?')
# sql: insert into table_name(v1, v2, v3) values(?, ?, ?)
sql = 'insert into %s(%s) values(%s)' % (
self.table_name,
','.join(keys),
','.join(args)
)
# sql: insert into table_name(v1, v2, v3) values(%s, %s, %s)
sql = sql.replace('?', '%s')
mysql.my_execute(sql, values)
# mysql.cursor.execute(sql, values)
# 更新方法 update User(传关键字参数) ---》 user_obj.orm_insert(user_obj)
def orm_update(self):
mysql = MySQLClient()
# 字段名
keys = []
# 字段值
values = []
# 主键: id=pk
primary_key = None
for k, v in self.mappings.items():
if v.primary_key:
# 只要思想不滑坡,方法总比问题多。
primary_key = v.name + '= %s' % getattr(self, v.name)
else:
keys.append(v.name + '=?')
values.append(
getattr(self, v.name) # 小贱贱 ---> 大贱贱
)
# sql: update table set k1=v1, k2=v2 where id=pk; #
# sql: update table set k1=?, k2=? where id=pk; #
# 注意: tank的更新方法,更新条件固定使用主键。
sql = 'update %s set %s where %s' % (
self.table_name,
','.join(keys),
primary_key
)
# sql: update table set k1=%s, k2=%s where id=pk; #
sql = sql.replace('?', '%s')
mysql.my_execute(sql, values)
# 用户表类
class User(Models): # ---> 表名
# table_name = 'user_info'
# 强调: 最好与字段类型的name属性同名
# user_id自增
user_id = IntegerField(name='user_id', primary_key=True)
user_name = StringField(name='user_name')
pwd = StringField(name='pwd')
# 用户表类
class Movie(Models): # ---> 表名
# table_name = 'user_info'
# 强调: 最好与字段类型的name属性同名
user_id = IntegerField(name='user_id', primary_key=True)
user_name = StringField(name='user_name')
pwd = StringField(name='pwd')
if __name__ == '__main__':
# ORM:类名 --> 表名 对象 ---> 一条记录 对象.属性 ---> 字段
# user = User(user_name='tank', pwd='123')
# 查 orm_select
# 查询所有 sql: select * from User;
# User.select()
# Movie.orm_select()
# 条件查询 sql: select * from User where id=1;
# User.orm_select(id=1)
# res = User.orm_select()[0]
# print(res)
# print(res.user_name)
# 增 orm_insert
# User表 ---> 添加字段对应的数据
# user_obj = User(user_name='小贱贱', pwd='123')
# user_obj.orm_insert()
# user_obj = User.orm_select(user_name='小贱贱')[0]
# print(user_obj.user_name)
# 改
# user_obj = User.orm_select(user_name='小贱贱')[0]
# print(user_obj.user_name)
# user_obj.user_name = '大贱贱'
# user_obj.orm_update()
user_obj = User.orm_select(user_name='大贱贱')[0]
print(user_obj)
mysql_client.py
import pymysql
class MySQLClient:
__instance = None
# 单例模式1
def __new__(cls, *args, **kwargs):
if not cls.__instance:
cls.__instance = object.__new__(cls)
return cls.__instance
# 单例模式2
# @classmethod
# def singleton(cls):
# if not cls.__instance:
# cls.__instance = cls()
# return cls.__instance
# 1.创建连接并获取游标
def __init__(self):
# 连接客户端
self.client = pymysql.connect(
host='127.0.0.1',
port=3306,
user='root',
password='123qwe',
database='orm_demo',
charset='utf8',
autocommit=True
)
self.cursor = self.client.cursor(
pymysql.cursors.DictCursor
)
# 2.提交查询sql命令
def my_select(self, sql, value=None):
# 提交查询的sql命令
self.cursor.execute(sql, value)
# 获取查询以后的结果
res = self.cursor.fetchall()
return res
# 3.封装 “提交sql命令,插入或者更新操作”的方法
def my_execute(self, sql, values):
try:
self.cursor.execute(sql, values)
except Exception as e:
print(e)
# 4.关闭数据库
def close(self):
self.cursor.close()
self.client.close()