内容概要:
1.flask
- 蓝图
- 中间件
- 闪现
2.扩展
- session
- wtfrom
3.上下文管理
- local-threading
4.websocket
- 轮训
- 长轮训
- websocket
一.谈谈你对面向对象的理解
1.三大特性,继承,封装,多态(初级水平)
封装分两种:
(1).函数的封装
class db1():
def func():
pass
def func2():
pass
(2).数据的封装(一个类的对象)
2.__str__...方法
__str__,
__new__,
__init__,
__getattr__,
__setattr__,
__delattr__,
__getitem__,
__setitem__,
__delitem__,
__enter__,
__exit__,
__del__,
__call__,
__dict__,
这些方法有特有的执行场景,类加()执行__init__方法,对象加(),__call__方法....
****一个对象和一个对象能不能相加、减、乘、除
可以 加用__add__方法
class Foo(object): def __add__(self, other): pass obj1 = Foo() obj2 = Foo() res = obj1 + obj2
3.metaclass
1.创建类的两种方法
#创建类的方式一 class Foo(object): pass #创建类的方式二 #第一个参数是类名,第二个元祖是继承谁,第三个字典是构造字段 Bar = type("MyFoo",(object,),{}) g = Bar() print(g)
2.验证类是由type创建
class Mytype(type): def __init__(self,*args,**kwargs): print("from my type") super(Mytype,self).__init__(*args,**kwargs) class Foo(metaclass=Mytype): pass g = Foo() """result from my type """
二、flask-蓝图
1.目录结构:
2.__init__.py
from flask import Flask app = Flask(__name__) from .views import account from .views import user app.register_blueprint(account.ac) #注册之后可以进行路由分发 app.register_blueprint(user.us) """ 这里特殊的装饰器,是全局app,每个function都需要经过 @app.before_request def check_login(): print('.....') """
3.user.py
from flask import Blueprint us = Blueprint('us',__name__,url_prefix='/xx') #生成蓝图对象,url_prefix是前缀,加上之后请求index就变为/xx/index #这里是局部的装饰器,用于特定视图需要权限等 # @us.before_request # def check_login(): # print('.....') @us.route('/index') def index(): return 'index'
4.manage.py
from pro_flask import app if __name__ == '__main__': app.run()
三、flask-闪现
1.用于1次请求之后删除,基于session,取的时候用pop
from flask import Blueprint,redirect,request,flash,get_flashed_messages ac = Blueprint('ac',__name__) @ac.route('/login') def login(): flash("登录成功1",category="x1") #category多了一层分组 flash("登录成功2",category="x2") return redirect("/logout") @ac.route('/logout') def logout(): res = get_flashed_messages(category_filter="x1") #默认不加category_filter取所有 print(res) return 'logout'
2.get_flask_message源码
def get_flashed_messages(with_categories=False, category_filter=[]): flashes = _request_ctx_stack.top.flashes if flashes is None: #存入local里的stack里对象的flashes _request_ctx_stack.top.flashes = flashes = session.pop('_flashes') if '_flashes' in session else [] if category_filter: flashes = list(filter(lambda f: f[0] in category_filter, flashes)) if not with_categories: return [x[1] for x in flashes] return flashes
三、flask-middleware
from flask import Flask app = Flask(__name__) @app.route('/user',methods=['GET','POST'],endpoint='xxx') def user(): return "login" class MiddleWare(object): #主要理解对象加括号执行__call__方法 def __init__(self,old_wsgi_app): self.old_wsgi_app = old_wsgi_app def __call__(self, *args, **kwargs): #这是个时候还没有request print("我做一些数据库连接check,或者清理缓存操作") return self.old_wsgi_app(*args,**kwargs) if __name__ == '__main__': app.wsgi_app = MiddleWare(app.wsgi_app) app.run("0.0.0.0",9999)
四、flask-session
from flask import Flask,session app = Flask(__name__) #如下几行操作就成功将session写入redis中了 from flask.ext.session import Session from redis import Redis app.config["SESSION_TYPE"] = 'redis' app.config["SESSION_REDIS"] = Redis(host='192.16.1.1',port="6379",) Session(app) @app.route('/user',methods=['GET','POST'],endpoint='xxx') def user(): return "login" if __name__ == '__main__': app.run("0.0.0.0",9999)
源码剖析:
#session_interface = RedisSessionInterface() #程序刚开始加载时候执行 def _get_interface(self, app): config = app.config.copy() config.setdefault('SESSION_TYPE', 'null') config.setdefault('SESSION_PERMANENT', True) config.setdefault('SESSION_USE_SIGNER', False) config.setdefault('SESSION_KEY_PREFIX', 'session:') config.setdefault('SESSION_REDIS', None) config.setdefault('SESSION_MEMCACHED', None) config.setdefault('SESSION_FILE_DIR', os.path.join(os.getcwd(), 'flask_session')) config.setdefault('SESSION_FILE_THRESHOLD', 500) config.setdefault('SESSION_FILE_MODE', 384) config.setdefault('SESSION_MONGODB', None) config.setdefault('SESSION_MONGODB_DB', 'flask_session') config.setdefault('SESSION_MONGODB_COLLECT', 'sessions') config.setdefault('SESSION_SQLALCHEMY', None) config.setdefault('SESSION_SQLALCHEMY_TABLE', 'sessions') if config['SESSION_TYPE'] == 'redis': session_interface = RedisSessionInterface( config['SESSION_REDIS'], config['SESSION_KEY_PREFIX'], config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT']) elif config['SESSION_TYPE'] == 'memcached': session_interface = MemcachedSessionInterface( config['SESSION_MEMCACHED'], config['SESSION_KEY_PREFIX'], config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT']) elif config['SESSION_TYPE'] == 'filesystem': session_interface = FileSystemSessionInterface( config['SESSION_FILE_DIR'], config['SESSION_FILE_THRESHOLD'], config['SESSION_FILE_MODE'], config['SESSION_KEY_PREFIX'], config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT']) elif config['SESSION_TYPE'] == 'mongodb': session_interface = MongoDBSessionInterface( config['SESSION_MONGODB'], config['SESSION_MONGODB_DB'], config['SESSION_MONGODB_COLLECT'], config['SESSION_KEY_PREFIX'], config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT']) elif config['SESSION_TYPE'] == 'sqlalchemy': session_interface = SqlAlchemySessionInterface( app, config['SESSION_SQLALCHEMY'], config['SESSION_SQLALCHEMY_TABLE'], config['SESSION_KEY_PREFIX'], config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT']) else: session_interface = NullSessionInterface() return session_interface
程序执行session["xx"]=123时候先执行session_interface的open_session方法:
def open_session(self, app, request): sid = request.cookies.get(app.session_cookie_name) if not sid: sid = self._generate_sid() #根据uuid生成一个随机字符串 #第一次登陆进,返回{"sid":"asdasdada","xx":123} return self.session_class(sid=sid, permanent=self.permanent) if self.use_signer: signer = self._get_signer(app) if signer is None: return None try: sid_as_bytes = signer.unsign(sid) sid = sid_as_bytes.decode() except BadSignature: sid = self._generate_sid() return self.session_class(sid=sid, permanent=self.permanent) if not PY2 and not isinstance(sid, text_type): sid = sid.decode('utf-8', 'strict') val = self.redis.get(self.key_prefix + sid) if val is not None: try: data = self.serializer.loads(val) return self.session_class(data, sid=sid) except: return self.session_class(sid=sid, permanent=self.permanent) return self.session_class(sid=sid, permanent=self.permanent)
程序返回之前会执行save_session动作:
def save_session(self, app, session, response): domain = self.get_cookie_domain(app) path = self.get_cookie_path(app) if not session: if session.modified: self.redis.delete(self.key_prefix + session.sid) response.delete_cookie(app.session_cookie_name, domain=domain, path=path) return httponly = self.get_cookie_httponly(app) secure = self.get_cookie_secure(app) expires = self.get_expiration_time(app, session) val = self.serializer.dumps(dict(session)) self.redis.setex(name=self.key_prefix + session.sid, value=val, time=total_seconds(app.permanent_session_lifetime)) #往redis里写数据ex为过期时间 if self.use_signer: session_id = self._get_signer(app).sign(want_bytes(session.sid)) else: session_id = session.sid #最后将随机字符串写到cookie里 response.set_cookie(app.session_cookie_name, session_id, expires=expires, httponly=httponly, domain=domain, path=path, secure=secure)
五、上下文管理
1.threading.local
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading #本地线程,保证即使是多个线程,自己的值也是互相隔离。 local_values = threading.local() def func(num): #创建数据库连接 local_values.name = num import time time.sleep(1) print(local_values.name, threading.current_thread().name) for i in range(20): th = threading.Thread(target=func, args=(i,), name='线程%s' % i) th.start()
2.自定义local类实现本地线程,也可以用setitem
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading from threading import get_ident #通过自定义模拟本地线程原理 class Local(object): def __init__(self): object.__setattr__(self, 'storage', {}) #self.storage = {} #这样设置就会产生递归,触发settattr def __setattr__(self, key, value): ident = get_ident() if ident in self.storage: self.storage[ident][key] = value else: self.storage[ident] = {key: value} def __getattr__(self, item): ident = get_ident() return self.storage[ident][item] obj = Local() def func(num): #创建数据库连接 obj.value = num import time time.sleep(1) print(obj.value, threading.current_thread().name) for i in range(20): th = threading.Thread(target=func, args=(i,), name='线程%s' % i) th.start()
3.带协程判断加入
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading from threading import get_ident try: #协程获取唯一id方法 from greenlet import getcurrent as get_ident except ImportError: try: from thread import get_ident except ImportError: from _thread import get_ident #通过自定义模拟本地线程原理 class Local(object): def __init__(self): object.__setattr__(self, 'storage', {}) #self.storage = {} #这样设置就会产生递归,触发settattr def __setattr__(self, key, value): ident = get_ident() if ident in self.storage: self.storage[ident][key] = value else: self.storage[ident] = {key: value} def __getattr__(self, item): ident = get_ident() return self.storage[ident][item] obj = Local() def func(num): #创建数据库连接 obj.value = num import time time.sleep(1) print(obj.value, threading.current_thread().name) for i in range(20): th = threading.Thread(target=func, args=(i,), name='线程%s' % i) th.start()