一、单例模式
单例模式分为四种:基于文件的单例模式,基于类方法的单例模式,基于__new__的单例模式,基于metaclass的单例模式
1. 基于类方法的单例模式
- 不支持多线程模式
import threading class Singleton(object): def __init__(self): pass import time time.sleep(1) @classmethod def instance(cls,*args,**kwargs): if not hasattr(Singleton,'_instance'): Singleton._instance = Singleton(*args,**kwargs) return Singleton._instance def task(arg): obj = Singleton.instance() print(obj) for i in range(10): t = threading.Thread(target=task,args=[i,]) t.start()
- 支持多线程模式,加锁(线程锁),保证线程安全
import time import threading class Singleton(object): _instance_lock = threading.Lock() # 线程锁 def __init__(self): time.sleep(1) @classmethod def instance(cls, *args, **kwargs): if not hasattr(Singleton, "_instance"): # 当有了_instance时,则直接返回,不用加锁。 with Singleton._instance_lock: # 当一个线程进来就会加锁,只有该线程执行完毕,其他线程等待 if not hasattr(Singleton, "_instance"): Singleton._instance = Singleton(*args, **kwargs) return Singleton._instance def task(arg): obj = Singleton.instance() print(obj) for i in range(10): t = threading.Thread(target=task,args=[i,]) t.start() # 之后的使用不用加锁,都直接使用_instance time.sleep(20) obj = Singleton.instance() print('#',obj)
2. 基于__new__的单例模式
类单例模式需要,调用类方法,实例化时会先调用__new__
import time import threading class Singleton(object): _instance_lock = threading.Lock() def __init__(self): # print('init',self) pass def __new__(cls, *args, **kwargs): if not hasattr(Singleton,'_instance'): with Singleton._instance_lock: if not hasattr(Singleton,'_instance'): Singleton._instance = object.__new__(cls,*args,**kwargs) return Singleton._instance obj1 = Singleton() obj2 = Singleton() print(obj1) print(obj2)
3. 基于metaclass方法实现的单例模式
对象是类创建,创建对象的时候类的__init__方法自动执行,对象()执行类的 __call__ 方法
类是type创建,创建类时候type的__init__方法自动执行,类() 执行type的 __call__方法(类的__new__方法,类的__init__方法)
- metaclass原理
# 继承type,模仿type的功能 class SingletonType(type): def __init__(self,*args,**kwargs): super(SingletonType,self).__init__(*args,**kwargs) def __call__(cls, *args, **kwargs): obj = cls.__new__(cls,*args,**kwargs) # 1.1创建对象 cls.__init__(obj,*args,**kwargs) # 1.2初始化对象 return obj # 第0步:将代码加载到内存中,编译类,执行type的__init__方法(类是type的对象) class Foo(metaclass=SingletonType): # 指定metaclass def __init__(self,name): self.name = name def __new__(cls, *args, **kwargs): return object.__new__(cls,*args,**kwargs) # 第一步:执行type的__call__方法(类=type的对象) # 1.1 调用Foo类(type的对象)的__new__方法,用于创建对象。 # 1.2 调用Foo类(type的对象)的__init__方法,用于对象初始化。 obj = Foo() # 第二步:执行Foo的__call__方法 obj()
- metaclass实现单例模式
import threading class SingletonType(type): _instance_lock = threading.Lock() def __call__(cls, *args, **kwargs): if not hasattr(cls,'_instance'): with SingletonType._instance_lock: if not hasattr(cls,'_instance'): cls._instance = super(SingletonType,cls).__call__(*args,**kwargs) return cls._instance # 如需单例模式加上metaclass=SingletonType即可 class Foo(metaclass=SingletonType): def __init__(self,name): self.name = name obj1 = Foo('name') obj2 = Foo('name') print(obj1) print(obj2)
4 .单例模式数据库连接池应用
# pool.py import pymysql import threading from DBUtils.PooledDB import PooledDB, SharedDBConnection class SingletonDBPool(object): _instance_lock = threading.Lock() def __init__(self): # print('init',self) self.pool = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123456', database='flask_test', charset='utf8' ) def __new__(cls, *args, **kwargs): if not hasattr(SingletonDBPool,'_instance'): with SingletonDBPool._instance_lock: if not hasattr(SingletonDBPool,'_instance'): SingletonDBPool._instance = object.__new__(cls,*args,**kwargs) return SingletonDBPool._instance def connect(self): return self.pool.connection() # app.py from pool import SingletonDBPool def run(): pool = SingletonDBPool conn = pool.connect() # 数据库连接 con.close() # 并没有真正断开连接 if __name__ == '__main__': run()
二、session
1. 源码解析
# 1. 执行Flask类的__call__ class RequestContext(object): def __init__(self,environ): self.environ = environ def push(self): # 3 # 请求相关数据,加到local中: stack.push... _request_ctx_stack.push(self) # 获取cookie中的随机字符串,检验是否有,没有就生成 # 根据随机字符串,获取服务端保存的session的 # { # 'xxxxxxx': {...} # 'xxxxxxx': {...} # } # 新用户: {} # 老用户:{user:'xxx'} self.session = self.app.open_session(self.request) if self.session is None: self.session = self.app.make_null_session() class Flask: def process_response(self, response): # 8 # 执行 after_request装饰器 for handler in funcs: response = handler(response) # 将内存中的session持久化到:数据库、.... if not self.session_interface.is_null_session(ctx.session): self.save_session(ctx.session, response) return response def finalize_request(self, rv, from_error_handler=False): # 7 response = self.make_response(rv) try: response = self.process_response(response) request_finished.send(self, response=response) except Exception: if not from_error_handler: raise self.logger.exception('Request finalizing failed with an ' 'error while handling an error') return response def full_dispatch_request(self): # 5 # 触发只执行一次的装饰器函数,@before_first_request self.try_trigger_before_first_request_functions() try: # 触发Flask的信号,没用: pip3 install blinker request_started.send(self) # 执行特殊装饰器:before_request # 如果没有返回值,rv=None;有返回值 “嘻嘻嘻” rv = self.preprocess_request() if rv is None: # 触发执行视图函数 rv = self.dispatch_request() except Exception as e: rv = self.handle_user_exception(e) # 6 对返回值进行封装 return self.finalize_request(rv) def wsgi_app(self, environ, start_response): # 处理request,将请求添加到local中 ctx = self.request_context(environ) # 2.处理request和session ctx.push() error = None try: try: # 4 执行视图函数 response = self.full_dispatch_request() except Exception as e: error = e response = self.handle_exception(e) except: error = sys.exc_info()[1] raise return response(environ, start_response) finally: if self.should_ignore_error(error): error = None # 9 ctx.auto_pop(error) def __call__(self, environ, start_response): """Shortcut for :attr:`wsgi_app`.""" # 1.xxx return self.wsgi_app(environ, start_response)
2. 自定义session
from flask import Flask,request,session app = Flask(__name__) app.secret_key = 'sdfsdfsd' from flask.sessions import SessionInterface,SessionMixin import uuid import json from flask.sessions import SessionInterface from flask.sessions import SessionMixin from itsdangerous import Signer, BadSignature, want_bytes class MySession(dict, SessionMixin): def __init__(self, initial=None, sid=None): self.sid = sid self.initial = initial super(MySession, self).__init__(initial or ()) def __setitem__(self, key, value): super(MySession, self).__setitem__(key, value) def __getitem__(self, item): return super(MySession, self).__getitem__(item) def __delitem__(self, key): super(MySession, self).__delitem__(key) class MySessionInterface(SessionInterface): session_class = MySession container = { # 'asdfasdfasdfas':{'k1':'v1','k2':'v2'} # 'asdfasdfasdfas':"{'k1':'v1','k2':'v2'}" } def __init__(self): pass # import redis # self.redis = redis.Redis() def _generate_sid(self): return str(uuid.uuid4()) def _get_signer(self, app): if not app.secret_key: return None return Signer(app.secret_key, salt='flask-session', key_derivation='hmac') def open_session(self, app, request): """ 程序刚启动时执行,需要返回一个session对象 """ sid = request.cookies.get(app.session_cookie_name) if not sid: # 生成随机字符串,并将随机字符串添加到 session对象中 sid = self._generate_sid() return self.session_class(sid=sid) signer = self._get_signer(app) try: sid_as_bytes = signer.unsign(sid) sid = sid_as_bytes.decode() except BadSignature: sid = self._generate_sid() return self.session_class(sid=sid) # session保存在redis中 # val = self.redis.get(sid) # session保存在内存中 val = self.container.get(sid) if val is not None: try: data = json.loads(val) return self.session_class(data, sid=sid) except: return self.session_class(sid=sid) return self.session_class(sid=sid) def save_session(self, app, session, response): """ 程序结束前执行,可以保存session中所有的值 如: 保存到resit 写入到用户cookie """ domain = self.get_cookie_domain(app) path = self.get_cookie_path(app) httponly = self.get_cookie_httponly(app) secure = self.get_cookie_secure(app) expires = self.get_expiration_time(app, session) val = json.dumps(dict(session)) # session保存在redis中 # self.redis.setex(name=session.sid, value=val, time=app.permanent_session_lifetime) # session保存在内存中 self.container.setdefault(session.sid, val) session_id = self._get_signer(app).sign(want_bytes(session.sid)) response.set_cookie(app.session_cookie_name, session_id, expires=expires, httponly=httponly, domain=domain, path=path, secure=secure) app.session_interface = MySessionInterface() # app.session_interface = Foo() # app.session_interface # app.make_null_session() @app.route('/index') def index(): print('网站的所有session',MySessionInterface.container) print(session) session['k1'] = 'v1' session['k2'] = 'v2' del session['k1'] # 在内存中操作字典.... # session['k1'] = 'v1' # session['k2'] = 'v2' # del session['k1'] return "xx" if __name__ == '__main__': app.__call__ app.run()