orm的两种方法
方法一:数据库连接池
orm_pool 文件夹
"""Module-level MySQL connection pool built with DBUtils' ``PooledDB``."""
try:
    # DBUtils >= 2.0 renamed the package and its modules to lowercase.
    from dbutils.pooled_db import PooledDB
except ImportError:
    # DBUtils 1.x layout, as used by the original code.
    from DBUtils.PooledDB import PooledDB
import pymysql

POOL = PooledDB(
    creator=pymysql,  # DB-API 2 module used to open the connections
    maxconnections=6,  # max connections the pool allows; 0 or None = unlimited
    mincached=2,  # idle connections opened when the pool is created; 0 = none
    maxcached=5,  # max idle connections kept in the pool; 0 or None = unlimited
    # Max number of shared connections; 0 or None means all connections are
    # dedicated. NOTE: this setting has no effect here, because pymysql (like
    # MySQLdb) reports a DB-API threadsafety of 1, so DBUtils forces its
    # internal _maxshared to 0 and always hands out dedicated connections.
    maxshared=3,
    blocking=True,  # pool exhausted: True = wait for a connection, False = raise
    maxusage=None,  # max number of reuses of one connection; None = unlimited
    # SQL commands run at the start of every session,
    # e.g. ["set datestyle to ...", "set time zone ..."]
    setsession=[],
    # When to ping the MySQL server to check that the connection is alive:
    # 0 = None = never, 1 = default = whenever it is requested,
    # 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    ping=0,
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='youku2',
    charset='utf8',
    # Must be a real boolean: the original string 'True' only worked because
    # any non-empty string is truthy ('False' would have enabled it as well).
    autocommit=True,
)
"""Pool-backed MySQL helper: one pooled connection plus a dict cursor."""
import pymysql
from orm_pool import db_pool


class Mysql(object):
    """Thin wrapper around a connection taken from ``db_pool.POOL``.

    Each instance takes its own connection from the pool and opens a
    ``DictCursor`` on it, so result rows come back as dictionaries.
    Call :meth:`close` (or use the instance in a ``with`` statement) when
    done so the connection is handed back to the pool.
    """

    def __init__(self):
        self.conn = db_pool.POOL.connection()
        self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)

    def __enter__(self):
        """Allow ``with Mysql() as ms:`` usage."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the cursor and connection; exceptions are not suppressed."""
        self.close()
        return False

    def close(self):
        """Close the cursor, then the connection.

        The connection is closed even if closing the cursor fails, so a
        broken cursor cannot keep a pool slot occupied.
        """
        try:
            self.cursor.close()
        finally:
            self.conn.close()

    def select(self, sql, args=None):
        """Run a query and return every row.

        :param sql: SQL text using ``%s`` placeholders.
        :param args: value(s) bound to the placeholders, or ``None``.
        :return: all fetched rows, each row being a dict.
        """
        self.cursor.execute(sql, args)
        return self.cursor.fetchall()

    def execute(self, sql, args=None):
        """Run a statement that returns no rows (insert/update/delete).

        Errors are reported with ``print`` rather than raised, as in the
        original best-effort behaviour. Only ``Exception`` is caught, so
        ``KeyboardInterrupt`` and ``SystemExit`` are no longer swallowed.

        :param sql: SQL text using ``%s`` placeholders.
        :param args: value(s) bound to the placeholders, or ``None``.
        """
        try:
            self.cursor.execute(sql, args)
        except Exception as e:
            print(e)
"""Minimal metaclass-based ORM (variant that creates one Mysql helper per call)."""
from orm_singleton.mysql_singleton import Mysql
# NOTE(review): this is the connection-pool variant of the ORM, yet Mysql is
# imported from the orm_singleton package -- confirm whether the pool-backed
# helper from the orm_pool package was intended here.


class Field(object):
    """Description of one table column.

    :param name: column name used when building SQL.
    :param column_type: SQL type string, e.g. ``'varchar(255)'`` (only
        stored; the code in this module never reads it).
    :param primary_key: ``True`` if this column is the table's primary key.
    :param default: value used when a model instance has no value for it.
    """

    def __init__(self, name, column_type, primary_key, default):
        self.name = name
        self.column_type = column_type
        self.primary_key = primary_key
        self.default = default


class StringField(Field):
    """String column; ``column_type`` defaults to ``varchar(255)``."""

    def __init__(self, name, column_type='varchar(255)', primary_key=False, default=None):
        super().__init__(name, column_type, primary_key, default)


class IntegerField(Field):
    """Integer column; ``column_type`` defaults to ``int``."""

    def __init__(self, name, column_type='int', primary_key=False, default=None):
        super().__init__(name, column_type, primary_key, default)


class ModelMetaClass(type):
    """Collect the ``Field`` attributes of a model class at creation time.

    Every class created through this metaclass, except the base class named
    ``Models``, receives three class attributes:

    * ``table_name``  -- the declared ``table_name`` or else the class name
    * ``primary_key`` -- column name of the single primary-key field
    * ``mappings``    -- dict of attribute name -> ``Field`` instance

    :raises TypeError: if the class declares no primary key or more than one.
    """

    def __new__(cls, class_name, class_bases, class_attrs):
        # Only concrete model classes are processed; the base class passes
        # through untouched.
        if class_name == 'Models':
            return type.__new__(cls, class_name, class_bases, class_attrs)
        table_name = class_attrs.get('table_name', class_name)
        primary_key = None
        mappings = {}
        for k, v in class_attrs.items():
            if isinstance(v, Field):
                mappings[k] = v
                if v.primary_key:
                    if primary_key:
                        raise TypeError("主键只能有一个")
                    primary_key = v.name
        # Remove the Field objects from the class namespace so attribute
        # access on instances falls through to Models.__getattr__ (the row
        # data) instead of returning the Field descriptors themselves.
        for k in mappings.keys():
            class_attrs.pop(k)
        if not primary_key:
            raise TypeError('必须要有一个主键')
        class_attrs['table_name'] = table_name
        class_attrs['primary_key'] = primary_key
        class_attrs['mappings'] = mappings
        return type.__new__(cls, class_name, class_bases, class_attrs)


class Models(dict, metaclass=ModelMetaClass):
    """Base class for table models; one instance is one row stored as a dict.

    Row values can be read and written as attributes (``obj.tname``) as well
    as dictionary items (``obj['tname']``).
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def __getattr__(self, item):
        # Called only when normal attribute lookup fails. A missing key
        # yields the sentinel string instead of raising AttributeError.
        return self.get(item, '没有该键值对')

    def __setattr__(self, key, value):
        self[key] = value

    @classmethod
    def select(cls, **kwargs):
        """Query rows of this table.

        With no keyword arguments every row is fetched. Otherwise each
        ``column=value`` pair becomes an equality test and all tests are
        combined with ``AND``; the values are bound as query parameters.
        (The original used only the first pair and silently ignored the
        rest.) Column names are taken from the keyword names as-is, so they
        must not come from untrusted input.

        :return: list of model instances; an empty list when nothing
            matches (the original implicitly returned ``None``).
        """
        ms = Mysql()
        try:
            if not kwargs:
                rows = ms.select('select * from {}'.format(cls.table_name))
            else:
                conditions = ' and '.join('{}=%s'.format(col) for col in kwargs)
                sql = 'select * from {} where {}'.format(cls.table_name, conditions)
                rows = ms.select(sql, list(kwargs.values()))
        finally:
            # This variant creates a helper per call, so release it here.
            ms.close()
        return [cls(**row) for row in (rows or ())]

    def save(self):
        """Insert this object as a new row.

        The primary-key column is left out of the statement so the database
        can assign it. A field that has no value on this object uses the
        field's ``default``; the original read it through ``getattr`` and
        therefore stored the ``__getattr__`` sentinel string instead.
        """
        fields = []
        values = []
        for field in self.mappings.values():
            if field.primary_key:
                continue
            fields.append(field.name)
            values.append(self.get(field.name, field.default))
        sql = 'insert into {}({}) values({})'.format(
            self.table_name, ','.join(fields), ','.join(['%s'] * len(fields)))
        ms = Mysql()
        try:
            ms.execute(sql, values)
        finally:
            ms.close()

    def update(self):
        """Write this object's non-key fields to the row with its primary key.

        The primary-key value is bound as a query parameter; the original
        formatted it straight into the SQL text, which broke for non-numeric
        keys and allowed SQL injection. Missing values fall back to the
        field's ``default``. Nothing is executed when the model has no
        non-key fields.
        """
        assignments = []
        values = []
        pk_value = None
        for field in self.mappings.values():
            value = self.get(field.name, field.default)
            if field.primary_key:
                pk_value = value
            else:
                assignments.append('{}=%s'.format(field.name))
                values.append(value)
        if not assignments:
            return
        values.append(pk_value)
        sql = 'update {} set {} where {}=%s'.format(
            self.table_name, ','.join(assignments), self.primary_key)
        ms = Mysql()
        try:
            ms.execute(sql, values)
        finally:
            ms.close()


if __name__ == '__main__':
    class Teacher(Models):
        table_name = 'teacher'
        tid = IntegerField(name='tid', primary_key=True)
        tname = StringField(name='tname')

    # Other usage examples:
    # obj = Teacher(tname='jason老师')
    # obj.save()
    # res = Teacher.select()
    # for r in res:
    #     print(r.tname)
    # print(res)
    res = Teacher.select(tid=1)
    if res:  # guard: there may be no row with tid=1
        teacher_obj = res[0]
        teacher_obj.tname = 'jason老师'
        teacher_obj.update()
    res1 = Teacher.select()
    print(res1)
    # class User(Models):
    #     table_name = 'User'
    #     id = IntegerField(name='id', primary_key=True)
    #     name = StringField(name='name')
    #     password = StringField(name='password')
    # print(User.primary_key)
    # print(User.mappings)
    # obj = User(name='jason')
    # print(obj.table_name)
    # print(obj.primary_key)
    # print(obj.mappings)
方法二:单例
orm_singleton 文件夹
"""Minimal metaclass-based ORM (variant that shares one Mysql singleton)."""
from orm_singleton.mysql_singleton import Mysql


class Field(object):
    """Description of one table column.

    :param name: column name used when building SQL.
    :param column_type: SQL type string, e.g. ``'varchar(255)'`` (only
        stored; the code in this module never reads it).
    :param primary_key: ``True`` if this column is the table's primary key.
    :param default: value used when a model instance has no value for it.
    """

    def __init__(self, name, column_type, primary_key, default):
        self.name = name
        self.column_type = column_type
        self.primary_key = primary_key
        self.default = default


class StringField(Field):
    """String column; ``column_type`` defaults to ``varchar(255)``."""

    def __init__(self, name, column_type='varchar(255)', primary_key=False, default=None):
        super().__init__(name, column_type, primary_key, default)


class IntegerField(Field):
    """Integer column; ``column_type`` defaults to ``int``."""

    def __init__(self, name, column_type='int', primary_key=False, default=None):
        super().__init__(name, column_type, primary_key, default)


class ModelMetaClass(type):
    """Collect the ``Field`` attributes of a model class at creation time.

    Every class created through this metaclass, except the base class named
    ``Models``, receives three class attributes:

    * ``table_name``  -- the declared ``table_name`` or else the class name
    * ``primary_key`` -- column name of the single primary-key field
    * ``mappings``    -- dict of attribute name -> ``Field`` instance

    :raises TypeError: if the class declares no primary key or more than one.
    """

    def __new__(cls, class_name, class_bases, class_attrs):
        # Only concrete model classes are processed; the base class passes
        # through untouched.
        if class_name == 'Models':
            return type.__new__(cls, class_name, class_bases, class_attrs)
        table_name = class_attrs.get('table_name', class_name)
        primary_key = None
        mappings = {}
        for k, v in class_attrs.items():
            if isinstance(v, Field):
                mappings[k] = v
                if v.primary_key:
                    if primary_key:
                        raise TypeError("主键只能有一个")
                    primary_key = v.name
        # Remove the Field objects from the class namespace so attribute
        # access on instances falls through to Models.__getattr__ (the row
        # data) instead of returning the Field descriptors themselves.
        for k in mappings.keys():
            class_attrs.pop(k)
        if not primary_key:
            raise TypeError('必须要有一个主键')
        class_attrs['table_name'] = table_name
        class_attrs['primary_key'] = primary_key
        class_attrs['mappings'] = mappings
        return type.__new__(cls, class_name, class_bases, class_attrs)


class Models(dict, metaclass=ModelMetaClass):
    """Base class for table models; one instance is one row stored as a dict.

    Row values can be read and written as attributes (``obj.tname``) as well
    as dictionary items (``obj['tname']``). All database access goes through
    the shared ``Mysql.singleton()`` helper, which is deliberately left open.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def __getattr__(self, item):
        # Called only when normal attribute lookup fails. A missing key
        # yields the sentinel string instead of raising AttributeError.
        return self.get(item, '没有该键值对')

    def __setattr__(self, key, value):
        self[key] = value

    @classmethod
    def select(cls, **kwargs):
        """Query rows of this table.

        With no keyword arguments every row is fetched. Otherwise each
        ``column=value`` pair becomes an equality test and all tests are
        combined with ``AND``; the values are bound as query parameters.
        (The original used only the first pair and silently ignored the
        rest.) Column names are taken from the keyword names as-is, so they
        must not come from untrusted input.

        :return: list of model instances; an empty list when nothing
            matches (the original implicitly returned ``None``).
        """
        ms = Mysql.singleton()
        if not kwargs:
            rows = ms.select('select * from {}'.format(cls.table_name))
        else:
            conditions = ' and '.join('{}=%s'.format(col) for col in kwargs)
            sql = 'select * from {} where {}'.format(cls.table_name, conditions)
            rows = ms.select(sql, list(kwargs.values()))
        return [cls(**row) for row in (rows or ())]

    def save(self):
        """Insert this object as a new row.

        The primary-key column is left out of the statement so the database
        can assign it. A field that has no value on this object uses the
        field's ``default``; the original read it through ``getattr`` and
        therefore stored the ``__getattr__`` sentinel string instead.
        """
        fields = []
        values = []
        for field in self.mappings.values():
            if field.primary_key:
                continue
            fields.append(field.name)
            values.append(self.get(field.name, field.default))
        sql = 'insert into {}({}) values({})'.format(
            self.table_name, ','.join(fields), ','.join(['%s'] * len(fields)))
        Mysql.singleton().execute(sql, values)

    def update(self):
        """Write this object's non-key fields to the row with its primary key.

        The primary-key value is bound as a query parameter; the original
        formatted it straight into the SQL text, which broke for non-numeric
        keys and allowed SQL injection. Missing values fall back to the
        field's ``default``. Nothing is executed when the model has no
        non-key fields.
        """
        assignments = []
        values = []
        pk_value = None
        for field in self.mappings.values():
            value = self.get(field.name, field.default)
            if field.primary_key:
                pk_value = value
            else:
                assignments.append('{}=%s'.format(field.name))
                values.append(value)
        if not assignments:
            return
        values.append(pk_value)
        sql = 'update {} set {} where {}=%s'.format(
            self.table_name, ','.join(assignments), self.primary_key)
        Mysql.singleton().execute(sql, values)


if __name__ == '__main__':
    class Teacher(Models):
        table_name = 'teacher'
        tid = IntegerField(name='tid', primary_key=True)
        tname = StringField(name='tname')

    # Other usage examples:
    # obj = Teacher(tname='jason老师')
    # obj.save()
    # res = Teacher.select()
    # for r in res:
    #     print(r.tname)
    # print(res)
    res = Teacher.select(tid=1)
    if res:  # guard: there may be no row with tid=1
        teacher_obj = res[0]
        teacher_obj.tname = 'jerry老师'
        teacher_obj.update()

    # class User(Models):
    #     table_name = 'User'
    #     id = IntegerField(name='id', primary_key=True)
    #     name = StringField(name='name')
    #     password = StringField(name='password')
    # print(User.primary_key)
    # print(User.mappings)
    # obj = User(name='jason')
    # print(obj.table_name)
    # print(obj.primary_key)
    # print(obj.mappings)
"""MySQL helper exposing one lazily created, process-wide shared instance."""
import pymysql


class Mysql(object):
    """One pymysql connection plus a ``DictCursor`` on it.

    ``Mysql()`` always opens a new connection; :meth:`singleton` returns a
    single cached instance that is created on first use.
    """

    _instance = None  # cached instance handed out by singleton()

    def __init__(self, host='127.0.0.1', port=3306, user='root',
                 password='123', database='youku2', charset='utf8',
                 autocommit=True):
        """Open the connection.

        The defaults reproduce the settings that were previously hard-coded,
        so ``Mysql()`` behaves exactly as before; every argument is passed
        on to ``pymysql.connect`` under the same name.
        """
        self.conn = pymysql.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            database=database,
            charset=charset,
            autocommit=autocommit,
        )
        self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)

    def close(self):
        """Close the cursor, then the connection.

        The connection is closed even if closing the cursor fails. If this
        object is the cached singleton, the cache is cleared so the next
        :meth:`singleton` call opens a fresh connection instead of returning
        a closed one.
        """
        try:
            self.cursor.close()
        finally:
            self.conn.close()
            if type(self)._instance is self:
                type(self)._instance = None

    def select(self, sql, args=None):
        """Run a query and return every row.

        :param sql: SQL text using ``%s`` placeholders.
        :param args: value(s) bound to the placeholders, or ``None``.
        :return: all fetched rows, each row being a dict.
        """
        self.cursor.execute(sql, args)
        return self.cursor.fetchall()

    def execute(self, sql, args=None):
        """Run a statement that returns no rows (insert/update/delete).

        Errors are reported with ``print`` rather than raised, as in the
        original best-effort behaviour. Only ``Exception`` is caught, so
        ``KeyboardInterrupt`` and ``SystemExit`` are no longer swallowed.

        :param sql: SQL text using ``%s`` placeholders.
        :param args: value(s) bound to the placeholders, or ``None``.
        """
        try:
            self.cursor.execute(sql, args)
        except Exception as e:
            print(e)

    @classmethod
    def singleton(cls):
        """Return the shared instance, creating it on the first call."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance