- 保证一个类只有一个实例,并提供一个访问它的全局访问点
- 单例
- 当类只有一个实例而且客户可以从一个众所周知的访问点访问它时
- 比如:数据库链接、Socket创建链接
- 对唯一实例的受控访问
- 单利相当于全局变量,但防止了命名空间被污染
s1.py class Foo(object): def test(self): print("123") v = Foo() #v是Foo的实例 s2.py from s1 import v as v1 print(v1,id(v1)) #<s1.Foo object at 0x0000000002221710> 35788560 from s1 import v as v2 print(v1,id(v2)) #<s1.Foo object at 0x0000000002221710> 35788560 # 两个的内存地址是一样的 # 文件加载的时候,第一次导入后,再次导入时不会再重新加载
# ======================单例模式:无法支持多线程情况=============== class Singleton(object): def __init__(self): import time time.sleep(1) @classmethod def instance(cls, *args, **kwargs): if not hasattr(Singleton, "_instance"): Singleton._instance = Singleton(*args, **kwargs) return Singleton._instance import threading def task(arg): obj = Singleton.instance() print(obj) for i in range(10): t = threading.Thread(target=task,args=[i,]) t.start() # ====================单例模式:支持多线程情况================、 import time import threading class Singleton(object): _instance_lock = threading.Lock() def __init__(self): time.sleep(1) @classmethod def instance(cls, *args, **kwargs): if not hasattr(Singleton, "_instance"): with Singleton._instance_lock: #为了保证线程安全在内部加锁 if not hasattr(Singleton, "_instance"): Singleton._instance = Singleton(*args, **kwargs) return Singleton._instance def task(arg): obj = Singleton.instance() print(obj) for i in range(10): t = threading.Thread(target=task,args=[i,]) t.start() time.sleep(20) obj = Singleton.instance() print(obj) # 使用先说明,以后用单例模式,obj = Singleton.instance() # 示例: # obj1 = Singleton.instance() # obj2 = Singleton.instance() # print(obj1,obj2) # 错误示例 # obj1 = Singleton() # obj2 = Singleton() # print(obj1,obj2)
# =============单线程下执行=============== import threading class Singleton(object): _instance_lock = threading.Lock() def __init__(self): pass def __new__(cls, *args, **kwargs): if not hasattr(Singleton, "_instance"): with Singleton._instance_lock: if not hasattr(Singleton, "_instance"): # 类加括号就回去执行__new__方法,__new__方法会创建一个类实例:Singleton() Singleton._instance = object.__new__(cls) # 继承object类的__new__方法,类去调用方法,说明是函数,要手动传cls return Singleton._instance #obj1 #类加括号就会先去执行__new__方法,在执行__init__方法 # obj1 = Singleton() # obj2 = Singleton() # print(obj1,obj2) # ===========多线程执行单利============ def task(arg): obj = Singleton() print(obj) for i in range(10): t = threading.Thread(target=task,args=[i,]) t.start() # 使用先说明,以后用单例模式,obj = Singleton() # 示例 # obj1 = Singleton() # obj2 = Singleton() # print(obj1,obj2)
""" 1.对象是类创建,创建对象时候类的__init__方法自动执行,对象()执行类的 __call__ 方法 2.类是type创建,创建类时候type的__init__方法自动执行,类() 执行type的 __call__方法(类的__new__方法,类的__init__方法) # 第0步: 执行type的 __init__ 方法【类是type的对象】 class Foo: def __init__(self): pass def __call__(self, *args, **kwargs): pass # 第1步: 执行type的 __call__ 方法 # 1.1 调用 Foo类(是type的对象)的 __new__方法,用于创建对象。 # 1.2 调用 Foo类(是type的对象)的 __init__方法,用于对对象初始化。 obj = Foo() # 第2步:执行Foo的 __call__ 方法 obj() """ # ===========类的执行流程================ class SingletonType(type): def __init__(self,*args,**kwargs): print(self) #会不会打印? #<class '__main__.Foo'> super(SingletonType,self).__init__(*args,**kwargs) def __call__(cls, *args, **kwargs): #cls = Foo obj = cls.__new__(cls, *args, **kwargs) obj.__init__(*args, **kwargs) return obj class Foo(metaclass=SingletonType): def __init__(self,name): self.name = name def __new__(cls, *args, **kwargs): return object.__new__(cls, *args, **kwargs) ''' 1、对象是类创建的,创建对象时类的__init__方法会自动执行,对象()执行类的__call__方法 2、类是type创建的,创建类时候type类的__init__方法会自动执行,类()会先执行type的__call__方法(调用类的__new__,__init__方法) Foo 这个类是由SingletonType这个类创建的 ''' obj = Foo("hiayan") # ============第三种方式实现单例模式================= import threading class SingletonType(type): _instance_lock = threading.Lock() def __call__(cls, *args, **kwargs): if not hasattr(cls, "_instance"): with SingletonType._instance_lock: if not hasattr(cls, "_instance"): cls._instance = super(SingletonType,cls).__call__(*args, **kwargs) return cls._instance class Foo(metaclass=SingletonType): def __init__(self,name): self.name = name obj1 = Foo('name') obj2 = Foo('name') print(obj1,obj2)
import pymysql import threading from DBUtils.PooledDB import PooledDB class SingletonDBPool(object): _instance_lock = threading.Lock() def __init__(self): self.pool = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def __new__(cls, *args, **kwargs): if not hasattr(SingletonDBPool, "_instance"): with SingletonDBPool._instance_lock: if not hasattr(SingletonDBPool, "_instance"): SingletonDBPool._instance = object.__new__(cls, *args, **kwargs) return SingletonDBPool._instance def connect(self): return self.pool.connection()
from pool import SingletonDBPool def run(): pool = SingletonDBPool() conn = pool.connect() # xxxxxx cursor = conn.cursor() cursor.execute("select * from td where id=%s", [5, ]) result = cursor.fetchall() # 获取数据 cursor.close() conn.close() if __name__ == '__main__': run()
def wrapper(cls): instance = {} def inner(*args,**kwargs): if cls not in instance: instance[cls] = cls(*args,**kwargs) return instance[cls] return inner @wrapper class Singleton(object): def __init__(self,name,age): self.name = name self.age = age obj1 = Singleton('haiyan',22) obj2 = Singleton('xx',22) print(obj1) print(obj2)