面试的时候聊: 1. Flask中令你影响深刻的是什么? - 路由系统 - 装饰器,带参数的装饰器 - 额外装饰器 - 特殊的装饰器 2. 有没有遇到印象深刻: - 本地线程 - 最大共享数(文档中写的是最大共享数,但是看源码实现时发现pymysql.threadsafety=1有关),无用。
1. flask知识点: - flask依赖wsgi,实现wsgi协议的模块:wsgiref(django),werkzeug,uwsgi
- 创建Flask对象 app = Flask(__name__)
def __init__(self, import_name, static_url_path=None,
static_folder='static', template_folder='templates',
instance_path=None, instance_relative_config=False,
root_path=None):
- 当前模块名 - 静态文件文件前缀 - 静态文件文件夹位置 - 模板路径 - 配置文件寻找位置: from flask import Flask app = Flask(__name__,instance_path=None, instance_relative_config=True) #只有当 引用文件的方式是 配置文件时,instance这两个参数才有用 #instance_path是指定 从哪个位置开始找settings文件
#__name__ 是当前模块名,当模块被直接运行时模块名为 __main__ 。这句话的意思就是,当模块被直接运行时,以下代码块将被运行,当模块是被导入时,代码块不被运行。
app.config.from_pyfile('settings.py') # C:UsersAdministratorPycharmProjectss6day1201.实例化补充 if __name__ == '__main__': app.run() - 配置文件 - 推荐使用对象方式 方式一: app.config['SESSION_COOKIE_NAME'] = 'session_lvning' # 方式二: app.config.from_pyfile('settings.py') settings.py : XXX=123 方式三: import os os.environ['FLAKS-SETTINGS'] = 'settings.py' app.config.from_envvar('FLAKS-SETTINGS') 方式四: app.config.from_object('settings.DevConfig') settings.py : class BaseConfig(object): NNN = 123 class TestConfig(BaseConfig): DB = '127.0.0.1' class DevConfig(BaseConfig): DB = '192.168.1.1' class ProConfig(BaseConfig): DB = '47.18.1.1' - 路由系统 - 通过带参数的装饰器实现的路由关系 注意:其他的装饰器要写在路由装饰器下面 - 两种形式: 第一种: @app.route('/xxxx') def index(): return "Index" 第二种: def index(): return "Index" app.add_url_rule('/xxx', "n1", index) #n1 是别名 - 将函数和url封装到一个 Rule对象 - 将Role对象添加到 app.url_map(Map对象) - 参数: (url路径,endpoint,视图函数名,method=[],default,redirect_to,strict_slashes,subdomain) endpoint 是路由的别名,不写时默认是被装饰的函数名; 反向生成用url_for 来实现 url_for("aaa",**dic)或者 url_for("aaa",x=1) method 允许的请求方式 /index/<int:nid> 不写类型的时候默认是 字符串 def index(nid): print(nid) defaults=None, 默认值,当URL中无参数,函数需要参数时,使用defaults={'k':'v'}为函数提供参数 strict_slashes=None, 对URL最后的 / 符号是否严格要求, redirect_to 直接重定向,跳转的url需要参数时,也得传参,注意:不用加类型 redirect_to("/index/<nid>") subdomain=None 二级域名 :详见day118-06 首先你要有一个主域名 动态的二级域名 - 在本地hosts文件中找IP C:WindowsSystem32driversetc ios/lenux系统是在/etc/hosts from flask import Flask,render_template,request,redirect,session,url_for app = Flask(__name__) app.config['SERVER_NAME'] = 'bjg.com:5000' @app.route("/index",subdomain='<xxxxx>') def index(xxxxx): return "%s.bjg.com" %(xxxxx,) if __name__ == '__main__': app.run() - 扩展Flask的路由系统,让他支持正则 from flask import Flask,url_for app = Flask(__name__) # 定义转换的类 from werkzeug.routing import BaseConverter class RegexConverter(BaseConverter): """ 自定义URL匹配正则表达式 """ def __init__(self, map, regex): super(RegexConverter, self).__init__(map) self.regex = regex def to_python(self, value): """ 路由匹配时,匹配成功后传递给视图函数中参数的值 :param value: :return: """ return int(value) def to_url(self, value): """ 使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数 :param value: :return: """ val = super(RegexConverter, self).to_url(value) return val # 添加到converts中 app.url_map.converters['xxx'] = RegexConverter # 进行使用 @app.route('/index/<xxx("d+"):nid>',endpoint='xx') def index(nid): url_for('xx',nid=123) return "Index" if __name__ == '__main__': app.run() - 视图函数 请求: request.files 文件信息 request.values 所有的信息 request.form post请求 request.args get请求 响应: reutrn render_template() reutrn redirect() return "" return jsonify(name='alex',age='18') 返回json格式数据 make_response 每一个return的时候flask都做处理 → make_response(返回值), 可以设置cookie 及session 或者更多的其他内容 eg: response = make_response('xxxxx') response.headers['xxx'] = '123123' return response - CBV、FBV 回顾django的cbv: url(r'^login_cbv/',views.Login.as_view) from django.views import View class Login(View): def get(self,request): return render(request,"login.html") def post(self,request): return HttpResponse("post...") 如果是get请求就走 get方法,如果是post请求就走post方法。 flask的cbv def auth(func): def inner(*args, **kwargs): result = func(*args, **kwargs) return result return inner class IndexView(views.MethodView): methods = ['POST'] #只写POST时,只有请求时post时才生效,也就是get函数不执行 decorators = [auth,] 为每一个函数都加上auth装饰器 def get(self): v = url_for('index') print(v) return "GET" def post(self): return "GET" app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) if __name__ == '__main__': app.run() 零碎知识点: root_path 根路径 to_dict() 把url变成字典 urllib.parse 引入 urlencode 把字典变成url形式 quote 和 unquote 把汉字变成乱码/把乱码变成汉字 self.__class__ 找到对象的类 - 模板 1.方法不会自动执行,要加括号,字典也可以用get方法 2.Makeup相当于django的 mark_safe , 或者在前端页面上 |safe Markup("<input type='text' />") 引出 xss 攻击 3.自定义的 标签和 过滤器 @ ,在页面上写的区别, 3.1 需要把test传给前端 def test(a1,a2): return a1+a2 return render_template('index.html',test=test) 使用:{{test(1,19)}} 3.2 所有的模板都可以用 @app.template_global() #加上这个装饰器以后就不需要传了,所有的页面直接就可以使用 def sb(a1, a2): return a1 + a2 + 100 使用:{{sb(1,2)}} 3.3 所有的模板都可以用 @app.template_filter() def db(a1, a2, a3): return a1 + a2 + a3 使用:{{ 1|db(2,3)}} 4.模板的继承和django一样的,include也是一样的 5.hong: {% macro xxxx(name, type='text', value='') %} <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> {% endmacro %} {{ xxxx('n1') }} - 蓝图 - 项目目录规则化 (把一个py文件分成多个py文件 ) 蓝图:小中型项目:结构 大中型项目:结构 - 特殊装饰器 app.before_first_request app.before_request app.after_request 必须要有返回值,并且得有一个形参 - session 设置值 session['xx']=1 取值 session.get('xx') 超时时间的设置 session超时时间如何设置? PERMANENT_SESSION_LIFETIME app.config['SESSION_COOKIE_NAME'] = 'session_lvning' """ 'SESSION_COOKIE_NAME': 'session', 'SESSION_COOKIE_DOMAIN': None, 'SESSION_COOKIE_PATH': None, 'SESSION_COOKIE_HTTPONLY': True, 'SESSION_COOKIE_SECURE': False, 'SESSION_REFRESH_EACH_REQUEST': True, #是否实时更新 'PERMANENT_SESSION_LIFETIME': timedelta(days=31) """ session本质是操作的一个字典,在修改session的时候是在内存中操作的,等在return之前 把内存的session字典再返回给你open_session / save_session - flash 同session的原理,区别在于flash取一次就没了 设置值,flash('xxxx') 取值,get_flashed_messages()
2.上下文管理
什么是上写文?
上下文个人理解是:程序运行时相关的周围环境,flask里的上下文是指:在请求刚进来的时候把某个数据或者变量放到一个栈里,等你后面什么时候用就去栈里取就行了。在java 和php中上下文被叫做 HttpRequestContext
a. 创建Local类: { 线程或协程唯一标识: { 'stack':[request],'xxx':[session,] }, 线程或协程唯一标识: { 'stack':[] }, 线程或协程唯一标识: { 'stack':[] }, 线程或协程唯一标识: { 'stack':[] }, } b. 本质 当请求进来之后,将请求相关数据添加到 [request,] 以后使用时:去读取 请求完成之后,将request从列表中移除。 c. 关系 local= 宋康 = { 线程或协程唯一标识: { 'stack':[] }, 线程或协程唯一标识: { 'stack':[] }, 线程或协程唯一标识: { 'stack':[] }, 线程或协程唯一标识: { 'stack':[] }, } stack=强哥={ push top pop } 存取数据时,要基于stack来做。 d. Flask和Django区别? - 请求相关数据传递方式 - Django: 参数 - Flask: 基于 Local,LocalStack对象 - 问题:多个请求到来会不会混淆 - 单线程 - 多线程 - 协程 解决: from greenlet import getcurrent as get_ident
3. 数据库连接池
3.1本地线程 -每一个线程来的时候,都分配一个标示,也就是说每个线程都有自己的数据信息,当取值的时候,只取自己线程的数据,这样实现了线程之间的数据隔离 import threading import time # 本地线程对象 local_values = threading.local() def func(num): """ # 第一个线程进来,本地线程对象会为他创建一个唯一标识 # 第二个线程进来,本地线程对象会为他创建一个唯一标识 { 线程1的唯一标识:{name:1}, 线程2的唯一标识:{name:2}, } """ local_values.name = num # 线程停下来了 time.sleep(2) # local_values.name,去local_values中根据自己的唯一标识作为key,获取value中name对应的值 print(local_values.name, threading.current_thread().name) for i in range(5): th = threading.Thread(target=func, args=(i,), name='线程%s' % i) th.start() 3.2原来连接数据库的方式: 1 #缺点:每次请求反复创建数据库连接,连接数太多 conn = pymysql.connect() cursor = conn.cursor() cursor.execute('select * from tb where id > %s',[5,]) result = cursor.fetchall() cursor.close() conn.close() print(result) 2 # 公用一个连接,多线程有问题,加锁→缺点,不能支持并发 with LOCK: cursor = CONN.cursor() cursor.execute('select * from tb where id > %s', [5, ]) result = cursor.fetchall() cursor.close() print(result) 3.3数据库连接池: 原理:设置连接池中最大连接数、默认启动时连接池中创建的连接数 3.3.1.为每个线程创建一个连接,该线程关闭时,不是真正关闭;本线程再次调用时,还是使用的最开始创建的连接。直到线程终止,数据库连接才关闭。(本质是用本地线程实现的) ;当很多线程进来时还是会创建很多的连接,所以这种方法也不好。 from DBUtils.PersistentDB import PersistentDB import pymysql import threading POOL = PersistentDB( creator=pymysql, # 使用链接数据库的模块 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always closeable=False, # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接) threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置 host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): conn = POOL.connection() cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() conn.close() # 不是真的关闭,而是假的关闭。 for i in range(10): t = threading.Thread(target=func) t.start() 3.3.2.创建一个连接池(10个连接),为所有线程提供连接,使用时来进行获取,使用完毕后,再次放回到连接池。 注意: 连接池中的所有连接都可以被重新使用,原因是因为pymsql.threadsafety的 个数是1 数据库连接池样板: xxx.py--------------- from flask import Flask from db import POOL app = Flask(__name__) @app.route('/index') def index(): conn = POOL.connection() cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() conn.close() return '执行成功' if __name__ == '__main__': app.run() db.py--------------- import pymysql from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always #ping : 0 或 4最常用 host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) 如果有三个线程来连接池(最大连接数:9个,初始化创建了5个)拿连接,有三种情况: 1. 他们三个先后顺序来, 同一个连接就够三个线程用了 2. 他们来的顺序不定, 有可能需要两个连接 3. 他们同时来的, 需要三个链接
4. 单例模式 : - 推荐:__new__
- 文件 - 基于类方法 # 单例模式:无法支持多线程情况 """ class Singleton(object): def __init__(self): import time time.sleep(1) @classmethod def instance(cls, *args, **kwargs): if not hasattr(Singleton, "_instance"): Singleton._instance = Singleton(*args, **kwargs) return Singleton._instance import threading def task(arg): obj = Singleton.instance() print(obj) for i in range(10): t = threading.Thread(target=task,args=[i,]) t.start() """ # # 单例模式:支持多线程情况 """ import time import threading class Singleton(object): _instance_lock = threading.Lock() def __init__(self): time.sleep(1) @classmethod def instance(cls, *args, **kwargs): if not hasattr(Singleton, "_instance"): with Singleton._instance_lock: if not hasattr(Singleton, "_instance"): Singleton._instance = Singleton(*args, **kwargs) return Singleton._instance def task(arg): obj = Singleton.instance() print(obj) for i in range(10): t = threading.Thread(target=task,args=[i,]) t.start() time.sleep(20) obj = Singleton.instance() print(obj) """ - 基于__new__方法 引出:new,call,init 方法及区别 """ 1.对象是类创建,创建对象时候类的__init__方法自动执行,对象()执行类的 __call__ 方法 2.类是type创建,创建类时候type的__init__方法自动执行,类() 执行type的 __call__方法(类的__new__方法,类的__init__方法) # 第0步: 执行type的 __init__ 方法【类是type的对象】 class Foo: def __init__(self): pass def __call__(self, *args, **kwargs): pass # 第1步: 执行type的 __call__ 方法 # 1.1 调用 Foo类(是type的对象)的 __new__方法,用于创建对象。 # 1.2 调用 Foo类(是type的对象)的 __init__方法,用于对对象初始化。 obj = Foo() # 第2步:执行Foodef __call__ 方法 obj() """ """ class SingletonType(type): def __init__(self,*args,**kwargs): super(SingletonType,self).__init__(*args,**kwargs) def __call__(cls, *args, **kwargs): obj = cls.__new__(cls,*args, **kwargs) cls.__init__(obj,*args, **kwargs) # Foo.__init__(obj) return obj class Foo(metaclass=SingletonType): def __init__(self,name): self.name = name def __new__(cls, *args, **kwargs): return object.__new__(cls, *args, **kwargs) obj = Foo('name') """ import threading class Singleton(object): _instance_lock = threading.Lock() def __init__(self): pass def __new__(cls, *args, **kwargs): if not hasattr(Singleton, "_instance"): with Singleton._instance_lock: if not hasattr(Singleton, "_instance"): Singleton._instance = object.__new__(cls, *args, **kwargs) return Singleton._instance obj1 = Singleton() obj2 = Singleton() print(obj1,obj2) # <__main__.Singleton object at 0x0000029B7FE127F0> <__main__.Singleton object at 0x0000029B7FE127F0> - 基于metaclass方法 import threading class SingletonType(type): _instance_lock = threading.Lock() def __call__(cls, *args, **kwargs): if not hasattr(cls, "_instance"): with SingletonType._instance_lock: if not hasattr(cls, "_instance"): cls._instance = super(SingletonType,cls).__call__(*args, **kwargs) return cls._instance class Foo(metaclass=SingletonType): def __init__(self,name): self.name = name obj1 = Foo('name') obj2 = Foo('name') print(obj1,obj2) # <__main__.Foo object at 0x0000017B6A612898> <__main__.Foo object at 0x0000017B6A612898> 在哪应用了单例模式: a. stark组件 b. 数据库连接池
5.Session
5.1- 读Flask session源码 5.2- 流程: 当请求第一次进来,生成随机字符串:sidfnsdfisdfs - 发给用户cookie - 保存到session字典中 PS: 调用 stack(强哥)将随机字符串和对应的值放到 local(宋康) 视图函数使用 session = LocalProxy(partial(_lookup_req_object, 'session')) 请求处理完毕 将session做持久化: - 存到数据 - 存到redis - 存到加密cookie中 5.3- 自定义session: from flask import Flask,request,session app = Flask(__name__) app.secret_key = 'sdfsdfsd' from flask.sessions import SessionInterface,SessionMixin import uuid import json from flask.sessions import SessionInterface from flask.sessions import SessionMixin from itsdangerous import Signer, BadSignature, want_bytes class MySession(dict, SessionMixin): def __init__(self, initial=None, sid=None): self.sid = sid self.initial = initial super(MySession, self).__init__(initial or ()) def __setitem__(self, key, value): super(MySession, self).__setitem__(key, value) def __getitem__(self, item): return super(MySession, self).__getitem__(item) def __delitem__(self, key): super(MySession, self).__delitem__(key) class MySessionInterface(SessionInterface): session_class = MySession container = { # 'asdfasdfasdfas':{'k1':'v1','k2':'v2'} # 'asdfasdfasdfas':"{'k1':'v1','k2':'v2'}" } def __init__(self): pass # import redis # self.redis = redis.Redis() def _generate_sid(self): return str(uuid.uuid4()) def _get_signer(self, app): if not app.secret_key: return None return Signer(app.secret_key, salt='flask-session', key_derivation='hmac') def open_session(self, app, request): """ 程序刚启动时执行,需要返回一个session对象 """ sid = request.cookies.get(app.session_cookie_name) if not sid: # 生成随机字符串,并将随机字符串添加到 session对象中 sid = self._generate_sid() return self.session_class(sid=sid) signer = self._get_signer(app) try: sid_as_bytes = signer.unsign(sid) sid = sid_as_bytes.decode() except BadSignature: sid = self._generate_sid() return self.session_class(sid=sid) # session保存在redis中 # val = self.redis.get(sid) # session保存在内存中 val = self.container.get(sid) if val is not None: try: data = json.loads(val) return self.session_class(data, sid=sid) except: return self.session_class(sid=sid) return self.session_class(sid=sid) def save_session(self, app, session, response): """ 程序结束前执行,可以保存session中所有的值 如: 保存到resit 写入到用户cookie """ domain = self.get_cookie_domain(app) path = self.get_cookie_path(app) httponly = self.get_cookie_httponly(app) secure = self.get_cookie_secure(app) expires = self.get_expiration_time(app, session) val = json.dumps(dict(session)) # session保存在redis中 # self.redis.setex(name=session.sid, value=val, time=app.permanent_session_lifetime) # session保存在内存中 self.container.setdefault(session.sid, val) session_id = self._get_signer(app).sign(want_bytes(session.sid)) response.set_cookie(app.session_cookie_name, session_id, expires=expires, httponly=httponly, domain=domain, path=path, secure=secure) app.session_interface = MySessionInterface() # app.session_interface = Foo() # app.session_interface # app.make_null_session() @app.route('/index') def index(): print('网站的所有session',MySessionInterface.container) print(session) session['k1'] = 'v1' session['k2'] = 'v2' del session['k1'] # 在内存中操作字典.... # session['k1'] = 'v1' # session['k2'] = 'v2' # del session['k1'] return "xx" if __name__ == '__main__': app.__call__ app.run() 5.4 最常用 pip3 install flask-session #!/usr/bin/env python # -*- coding:utf-8 - from flask import Flask,current_app,session from flask_session import Session app = Flask(__name__) app.debug = True app.secret_key = 'xxxx' # 为Flask-session组件提供的配置 # import redis # app.config['SESSION_TYPE'] = 'redis' # session类型为redis # app.config['SESSION_REDIS'] = redis.Redis(host='127.0.0.1', port='6379', password='123123') # 用于连接redis的配置 # app.config['SESSION_KEY_PREFIX'] = 'session:' # 保存到session中的值的前缀 # app.config['SESSION_PERMANENT'] = False # 如果设置为True,则关闭浏览器session就失效。 # app.config['SESSION_USE_SIGNER'] = False # 是否对发送到浏览器上 session:cookie值进行加密 # Session(app) # # import memcache # app.config['SESSION_TYPE'] = 'memcached' # session类型为memcached # app.config['SESSION_PERMANENT'] = True # 如果设置为True,则关闭浏览器session就失效。 # app.config['SESSION_USE_SIGNER'] = False # 是否对发送到浏览器上session的cookie值进行加密 # app.config['SESSION_KEY_PREFIX'] = 'session:' # 保存到session中的值的前缀 # app.config['SESSION_MEMCACHED'] = memcache.Client(['10.211.55.4:12000']) # Session(app) app.config['SESSION_TYPE'] = 'filesystem' # session类型为filesystem app.config['SESSION_FILE_DIR'] = r'C:UsersAdministratorPycharmProjectsday1212.flask-session组件' # session类型为redis app.config['SESSION_FILE_THRESHOLD'] = 500 # 存储session的个数如果大于这个值时,就要开始进行删除了 app.config['SESSION_FILE_MODE'] = 384 # 文件权限类型 app.config['SESSION_PERMANENT'] = True # 如果设置为True,则关闭浏览器session就失效。 app.config['SESSION_USE_SIGNER'] = False # 是否对发送到浏览器上session的cookie值进行加密 app.config['SESSION_KEY_PREFIX'] = 'session:' # 保存到session中的值的前缀 Session(app) @app.route('/index') def index(): session['k1'] = 'v1' return 'xx' if __name__ == '__main__': app.run()
6.blinker 信号
pip3 install blinker 6.1 内置信号 10个信号: 2. request_started = _signals.signal('request-started') # 请求到来前执行 5. request_finished = _signals.signal('request-finished') # 请求结束后执行 3. before_render_template = _signals.signal('before-render-template') # 模板渲染前执行 4. template_rendered = _signals.signal('template-rendered') # 模板渲染后执行 2/3/4/5或不执行 got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行 6. request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否) 7. appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 请求上下文执行完毕后自动执行(无论成功与否) 1. appcontext_pushed = _signals.signal('appcontext-pushed') # 请求app上下文push时执行 8. appcontext_popped = _signals.signal('appcontext-popped') # 请求上下文pop时执行 message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发 问题: 特殊的装饰器和信号有什么区别? - 装饰器返回值有意义 - 信号用于做什么呢? - 降低代码之间的耦合 6.2 自定义信号 :创建信号→注册→触发 from flask import Flask,flash from flask.signals import _signals app = Flask(__name__) wh = _signals.signal('wh') # 定义函数 def luominwen(*args,**kwargs): print('罗姑娘',args,kwargs) # 定义函数 def shaowei(*args,**kwargs): print('少姑娘',args,kwargs) # 将函数注册到request_started信号中: 添加到这个列表 wh.connect(luominwen) wh.connect(shaowei) @app.route('/index') def index(): # 触发这个信号:执行注册到列表中的所有函数 # 发送短信,邮件,微信 wh.send(sender='xxx',a1=123,a2=456) return "xx" if __name__ == '__main__': app.__call__ app.run()
6.3Django内置
Request/response signals
request_started # 请求到来前,自动触发
request_finished # 请求结束后,自动触发
got_request_exception # 请求异常后,自动触发
Model signals
pre_init # django的modal执行其构造方法前,自动触发
post_init # django的modal执行其构造方法后,自动触发
pre_save # django的modal对象保存前,自动触发
post_save # django的modal对象保存后,自动触发, 它可以判断是增加的还是修改的
pre_delete # django的modal对象删除前,自动触发
post_delete # django的modal对象删除后,自动触发
m2m_changed # django的modal中使用m2m字段操作第三张表(add,remove,clear)前后,自动触发
class_prepared # 程序启动时,检测已注册的app中modal类,对于每一个类,自动触发
Management signals
pre_migrate # 执行migrate命令前,自动触发
post_migrate # 执行migrate命令后,自动触发
Test signals
setting_changed # 使用test测试修改配置文件时,自动触发
template_rendered # 使用test测试渲染模板时,自动触发
Database Wrappers
connection_created # 创建数据库连接时,自动触发
需求:新浪面试题,数据库12张表,每张表创建一条数据时,记录一条日志。
答案: 可以重写 django里面orm操作的save 方法,或者 使用信号
7. wtforms
- wtforms的源码
- 使用
metaclass的另外一种方式: class MyType(type): def __init__(self,*args,**kwargs): print('xxxx') super(MyType,self).__init__(*args,**kwargs) def __call__(cls, *args, **kwargs): obj = cls.__new__(cls,*args, **kwargs) cls.__init__(obj,*args, **kwargs) # Foo.__init__(obj) return obj def with_metaclass(base): return MyType("MyType",(base,),{}) class Foo(with_metaclass(object)): def __init__(self,name): self.name = name #打印结果: xxxx xxxx
8. flask的简单 使用:
- 蓝图 - 配置 from_objects - flask-session - 记录:请求到来写日志 - wtforms - DBUtils - 单例模式 PS: 连接数据库,对用户表进行:登录和注册
9. 导出程序内应用的所有模块
# 获取环境中所有安装的模块 pip3 freeze #在终端查看 pip3 freeze > requirements.txt #写入requirements.txt文件中 # pip3 install pipreqs # 获取当前所在程序目录中涉及到的所有模块,并自动生成 requirements.txt 且写入内容。 pipreqs ./ 以后别人给你程序: requirements.txt 进入程序目录: pip3 install -r requirements.txt