from my_signleton import MySignleton
# 字段类型的属性
class Field(object):
def __init__(self,name,column_type,primary_key,default):
self.name = name
self.column_type = column_type
self.primary_key = primary_key
self.default = default
# 数字类型字段
class IntegerField(Field):
def __init__(self,name,column_type="int",primary_key=False,default=0):
super().__init__(name,column_type,primary_key,default)
# 字符串类型字段
class StringField(Field):
def __init__(self,name,column_type="varchar(255)",primary_key=False,default=""):
super().__init__(name,column_type,primary_key,default)
class Field(object):
def __init__(self,name,column_type,primary_key,default):
self.name = name
self.column_type = column_type
self.primary_key = primary_key
self.default = default
# 数字类型字段
class IntegerField(Field):
def __init__(self,name,column_type="int",primary_key=False,default=0):
super().__init__(name,column_type,primary_key,default)
# 字符串类型字段
class StringField(Field):
def __init__(self,name,column_type="varchar(255)",primary_key=False,default=""):
super().__init__(name,column_type,primary_key,default)
class MyMetaClass(type):
def __new__(cls, class_name, class_bases,class_attrs):
# 我们要拦截是模型表的创建过程,而下面的Model并不需要拦截,所以先做一层判断
if class_name == "Model":
return type.__new__(class_name,class_bases,class_attrs)
# 首先我们要获取的是表名,主键字段名,其他字段名
table_name = class_attrs.get("table_name")
primary_key = None
mappings = {}
for k, v in class_attrs.items():
# 判断出来的都是上面的字段对象
if isinstance(v,Field):
# 判断出主键对象
if v.primary_key:
# 如果已经存在主键那就报个错
if primary_key:
raise TypeError("主键只能由一个")
primary_key = v
# 剩下的就是其他字段,全都添加到mappings中
mappings[k] = v
# 取出所有字段对象后,class_attrs中就没必要继续存在了,要删掉
for k in mappings.keys():
class_attrs.pop(k)
# 如果一通操作下来还是没有主键,就抛个异常
if not primary_key:
raise TypeError("必须要有一个主键")
# 接下来把我们获得到的几个属性(table_name,primary_key,mappings)添加到名称空间(class_attrs)中
class_attrs["table_name"] = table_name
class_attrs["primary_key"] = primary_key
class_attrs["mappings"] = mappings
# 最后调用type中的new方法
return type.__new__(class_name,class_bases,class_attrs)
def __new__(cls, class_name, class_bases,class_attrs):
# 我们要拦截是模型表的创建过程,而下面的Model并不需要拦截,所以先做一层判断
if class_name == "Model":
return type.__new__(class_name,class_bases,class_attrs)
# 首先我们要获取的是表名,主键字段名,其他字段名
table_name = class_attrs.get("table_name")
primary_key = None
mappings = {}
for k, v in class_attrs.items():
# 判断出来的都是上面的字段对象
if isinstance(v,Field):
# 判断出主键对象
if v.primary_key:
# 如果已经存在主键那就报个错
if primary_key:
raise TypeError("主键只能由一个")
primary_key = v
# 剩下的就是其他字段,全都添加到mappings中
mappings[k] = v
# 取出所有字段对象后,class_attrs中就没必要继续存在了,要删掉
for k in mappings.keys():
class_attrs.pop(k)
# 如果一通操作下来还是没有主键,就抛个异常
if not primary_key:
raise TypeError("必须要有一个主键")
# 接下来把我们获得到的几个属性(table_name,primary_key,mappings)添加到名称空间(class_attrs)中
class_attrs["table_name"] = table_name
class_attrs["primary_key"] = primary_key
class_attrs["mappings"] = mappings
# 最后调用type中的new方法
return type.__new__(class_name,class_bases,class_attrs)
# 设计一个模型类,可以替代表
# (可以点出属性,也可以点设置属性,还可以实例化括号里传可变长生成对象)
# 不管传多少参数都能实例化出对象,参考字典类,我们想用这方法只需要继承字典类
class Model(dict,metaclass=MyMetaClass):
#字典类打印出来的虽然是一个字典里面有键值对
#但这
# (可以点出属性,也可以点设置属性,还可以实例化括号里传可变长生成对象)
# 不管传多少参数都能实例化出对象,参考字典类,我们想用这方法只需要继承字典类
class Model(dict,metaclass=MyMetaClass):
#字典类打印出来的虽然是一个字典里面有键值对
#但这
单例
import pymysql
# 我在这需要定义一个类,让orm调用我产生对象
# 然后通过我里面的方法把他的和数据库操作对应上
# 先连接上数据库并且生成操作数据库对象cursor
conn = pymysql.connect(host="localhost", user="root", password="hsjqqq", charset="utf8",
database="youku",autocommit=True)
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 然后通过我里面的方法把他的和数据库操作对应上
# 先连接上数据库并且生成操作数据库对象cursor
conn = pymysql.connect(host="localhost", user="root", password="hsjqqq", charset="utf8",
database="youku",autocommit=True)
cursor = conn.cursor(pymysql.cursors.DictCursor)
class MySingleton(object):
_instence = None
# 把连接直接写在init方法中,这样只要生成对象就会连接到数据库
# 而不是一导入模块就会连接数据库
def __init__(self):
self.conn = pymysql.connect(
host="localhost", user="root", password="hsjqqq", charset="utf8",
database="youku", autocommit=True
)
self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
# 传过来sql语句和需要传入的参数(防止sql注入)
def select(self,sql,arg=None):
# execute(self, query, args=None):这个函数中本来就有一个arg默认None
# 所以不管arg有没有传值,我们都可以往execute中取丢
self.cursor.execute(sql,arg)
back_list = self.cursor.fetchall()
return back_list # 返回查询到的所有的字典列表
_instence = None
# 把连接直接写在init方法中,这样只要生成对象就会连接到数据库
# 而不是一导入模块就会连接数据库
def __init__(self):
self.conn = pymysql.connect(
host="localhost", user="root", password="hsjqqq", charset="utf8",
database="youku", autocommit=True
)
self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
# 传过来sql语句和需要传入的参数(防止sql注入)
def select(self,sql,arg=None):
# execute(self, query, args=None):这个函数中本来就有一个arg默认None
# 所以不管arg有没有传值,我们都可以往execute中取丢
self.cursor.execute(sql,arg)
back_list = self.cursor.fetchall()
return back_list # 返回查询到的所有的字典列表
def save(self,sql,args):
try:
self.cursor.execute(sql,args)
except Exception as e:
print(e)
# 为了不让每生成一个对象取内存地址开辟一个空间,这里我们用单例模式
@classmethod
def single(cls):
# 先给类一个变量用来存在生成的对象,
# 如果找不到这个对象,那就表名这是第一次产生对象
if not cls._instence:
cls._instence = cls()
# 然后他们每次想生成对象,只需要从我这个变量中拿取即可
return cls._instence
try:
self.cursor.execute(sql,args)
except Exception as e:
print(e)
# 为了不让每生成一个对象取内存地址开辟一个空间,这里我们用单例模式
@classmethod
def single(cls):
# 先给类一个变量用来存在生成的对象,
# 如果找不到这个对象,那就表名这是第一次产生对象
if not cls._instence:
cls._instence = cls()
# 然后他们每次想生成对象,只需要从我这个变量中拿取即可
return cls._instence
singletion
线程池版本
from DBUtils.PooledDB import PooledDB
import pymysql
import pymysql
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3,
# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='hsjqqq',
database='youku',
charset='utf8',
autocommit='True'
)
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3,
# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='hsjqqq',
database='youku',
charset='utf8',
autocommit='True'
)
class MySingleton(object):
_instence = None
# 把连接直接写在init方法中,这样只要生成对象就会连接到数据库
# 而不是一导入模块就会连接数据库
def __init__(self):
self.conn = POOL.connection()
self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
_instence = None
# 把连接直接写在init方法中,这样只要生成对象就会连接到数据库
# 而不是一导入模块就会连接数据库
def __init__(self):
self.conn = POOL.connection()
self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
# 传过来sql语句和需要传入的参数(防止sql注入)
def select(self,sql,arg=None):
# execute(self, query, args=None):这个函数中本来就有一个arg默认None
# 所以不管arg有没有传值,我们都可以往execute中取丢
self.cursor.execute(sql,arg)
back_list = self.cursor.fetchall()
return back_list # 返回查询到的所有的字典列表
def select(self,sql,arg=None):
# execute(self, query, args=None):这个函数中本来就有一个arg默认None
# 所以不管arg有没有传值,我们都可以往execute中取丢
self.cursor.execute(sql,arg)
back_list = self.cursor.fetchall()
return back_list # 返回查询到的所有的字典列表
def save(self,sql,args):
try:
self.cursor.execute(sql,args)
except Exception as e:
print(e)
try:
self.cursor.execute(sql,args)
except Exception as e:
print(e)
single_pool