Flask生命周期
1.app = Flask(),实例化时,执行app的__init__方法
app = Flask(__name__,static_url_path='/xx') # 实例化对象
flask源码:
url_rule_class = Rule
url_map_class = Map
static_folder="static",
template_folder="templates",
def __init__(
    self,
    import_name,
    static_url_path=None,
    static_folder="static",
    static_host=None,
    host_matching=False,
    subdomain_matching=False,
    template_folder="templates",
    instance_path=None,
    instance_relative_config=False,
    root_path=None,
):
    """Abridged ``Flask.__init__``: set up the state the later steps use.

    Stores the static-file settings, creates the empty registries for view
    functions and request hooks, builds the config object and the URL map,
    and, when ``self.has_static_folder`` is truthy, registers the route that
    serves static files.

    Only part of the real constructor is reproduced here, so several
    parameters are accepted but not used in this excerpt.
    """
    self.static_url_path = static_url_path
    self.static_folder = static_folder
    self.view_functions = {}
    self.before_first_request_funcs = []
    self.before_request_funcs = {}
    self.after_request_funcs = {}

    # Load the configuration. make_config is, in outline:
    #     def make_config(self, instance_relative=False):
    #         return self.config_class(root_path, defaults)
    # Flask sets config_class = Config, so self.config is a Config object.
    self.config = self.make_config(instance_relative_config)

    # Flask sets url_map_class = Map, so self.url_map is a Map object.
    self.url_map = self.url_map_class()

    # Register the route that serves static files.
    if self.has_static_folder:
        assert (
            bool(static_host) == host_matching
        ), "Invalid static_host/host_matching combination"
        self.add_url_rule(
            # static_url_path is read through a property getter, in outline:
            #     def static_url_path(self):
            #         if self.static_folder is not None:
            #             basename = os.path.basename(self.static_folder)
            #             return ("/" + basename).rstrip("/")
            # and static_folder defaults to "static".
            self.static_url_path + "/<path:filename>",
            endpoint="static",
            host=static_host,
            view_func=self.send_static_file,
        )
@setupmethod
def add_url_rule(
    self,
    rule,
    endpoint=None,
    view_func=None,
    provide_automatic_options=None,
    **options
):
    """Abridged ``add_url_rule``: build a rule object and add it to the map.

    This excerpt only shows how the rule ends up in ``self.url_map``; it
    omits the endpoint fallback and the registration of ``view_func``.
    """
    # The endpoint alias travels inside options; for the static route
    # registered in __init__ it is "static".
    options["endpoint"] = endpoint
    # methods is the collection of allowed request methods (None if not given).
    methods = options.pop("methods", None)
    # url_rule_class defaults to Rule.
    rule = self.url_rule_class(rule, methods=methods, **options)
    # url_map is a Map object; the rule object is stored inside it.
    self.url_map.add(rule)
2.app.config.from_object('xx.xx'),导入配置文件
# 在后端写上 Flask对象.config.from_object('导入配置文件')
# from_object中:
# dir(obj)取到当前配置文件中所有的配置名
def from_object(self, obj):
    """Copy every upper-case attribute of *obj* into this mapping.

    *obj* is either an object carrying the settings (module, class or
    instance) or a string import name such as ``"pkg.settings"`` or
    ``"pkg.settings.DevConfig"``. A string is first resolved to the object
    it names, which is what a call like ``from_object('xx.xx')`` relies on.

    Raises:
        ImportError: if a string import name cannot be resolved.
    """
    if isinstance(obj, str):
        # Local import keeps this excerpt self-contained.
        import importlib

        import_name = obj
        try:
            obj = importlib.import_module(import_name)
        except ModuleNotFoundError as exc:
            module_name, dot, attr_name = import_name.rpartition(".")
            # Only fall back to "attribute of the parent module" when the
            # missing module is the requested name itself, not one of its
            # own broken imports.
            if exc.name != import_name or not dot:
                raise
            parent = importlib.import_module(module_name)
            try:
                obj = getattr(parent, attr_name)
            except AttributeError as attr_exc:
                raise ImportError(
                    "cannot import %r" % import_name
                ) from attr_exc

    # dir() lists every attribute name; only upper-case names are settings.
    for key in dir(obj):
        if key.isupper():
            self[key] = getattr(obj, key)
# 读取配置对象中所有大写的配置名及其对应的值,并将这些键值对全部放入Config对象。(Config是一个字典)
# app.config就是这个保存了所有配置项的Config对象
3.执行before_first_request,before_request,after_request
实例化时,默认的
self.before_first_request_funcs = []
self.before_request_funcs = {}
self.after_request_funcs = {}
# 在执行视图函数之前,先用before_first_request,before_request,after_request装饰器把被装饰的函数注册到上面对应的列表/字典中,此时只注册,不调用。
# f 代表被装饰的函数
@setupmethod
def before_first_request(self, f):
    """Register *f* in ``before_first_request_funcs`` without calling it.

    Returns *f* unchanged so the method can be used as a decorator.
    """
    self.before_first_request_funcs.append(f)
    return f
@setupmethod
def before_request(self, f):
    """Register *f* in ``before_request_funcs`` without calling it.

    The function is appended to the list stored under the key ``None``,
    which is created on first use. Returns *f* unchanged so the method can
    be used as a decorator.
    """
    self.before_request_funcs.setdefault(None, []).append(f)
    return f
@setupmethod
def after_request(self, f):
    """Register *f* in ``after_request_funcs`` without calling it.

    The function is appended to the list stored under the key ``None``,
    which is created on first use. Returns *f* unchanged so the method can
    be used as a decorator.
    """
    self.after_request_funcs.setdefault(None, []).append(f)
    return f
4.执行路由
# 将url,methods,endpoint封装进rule对象中,再将rule对象放入Map对象中。
# 将视图函数封装到view_functions中,{endpoint: view_func}
def route(self, rule, **options):
    """Return a decorator that registers a view function for *rule*.

    The decorator takes ``endpoint`` out of *options* (``None`` when not
    given), forwards everything to ``self.add_url_rule`` and returns the
    view function unchanged.
    """

    def decorator(f):
        endpoint = options.pop("endpoint", None)
        self.add_url_rule(rule, endpoint, f, **options)
        return f

    return decorator
@setupmethod
def add_url_rule(
    self,
    rule,
    endpoint=None,
    view_func=None,
    provide_automatic_options=None,
    **options
):
    """Register a URL rule and, if given, the view function behind it.

    Builds a rule object from *rule*, the allowed ``methods`` and the
    remaining *options* (including the endpoint), adds it to
    ``self.url_map``, and stores *view_func* in ``self.view_functions``
    under the endpoint name.

    Raises:
        AssertionError: if the endpoint is already mapped to a different
            view function.
    """
    # With no explicit endpoint, fall back to the view function's name:
    #     def _endpoint_from_view_func(view_func):
    #         return view_func.__name__
    if endpoint is None:
        endpoint = _endpoint_from_view_func(view_func)
    options["endpoint"] = endpoint
    methods = options.pop("methods", None)

    rule = self.url_rule_class(rule, methods=methods, **options)
    # Store the rule object inside the Map.
    self.url_map.add(rule)

    if view_func is not None:
        old_func = self.view_functions.get(endpoint)
        if old_func is not None and old_func != view_func:
            raise AssertionError(
                "View function mapping is overwriting an "
                "existing endpoint function: %s" % endpoint
            )
        # view_functions maps endpoint -> view function.
        self.view_functions[endpoint] = view_func
5.运行flask
from flask import Flask
app = Flask(__name__, static_url_path='/xx')


@app.route('/index')
def index():
    """Minimal view used to walk through the request lifecycle."""
    return 'hello world'


if __name__ == '__main__':
    # Start the development server only when run as a script.
    app.run()
# 1. app.run()内部调用werkzeug的run_simple()函数,run_simple()内部创建socket,监听IP和端口,等待用户请求到来。
# 2. 一旦有用户请求,执行app.__call__方法。
class Flask(object):
    """Minimal sketch of how the app object is handed to the WSGI server."""

    def __call__(self, environ, start_response):
        """WSGI entry point: delegate each incoming request to ``wsgi_app``."""
        return self.wsgi_app(environ, start_response)

    def run(self, host="127.0.0.1", port=5000, **options):
        """Serve this app with ``run_simple``.

        The app object itself is passed as the WSGI application, so every
        request ends up in ``__call__``. Extra keyword arguments are
        forwarded to ``run_simple`` unchanged.
        """
        # NOTE(review): the defaults mirror Flask's documented development
        # defaults (127.0.0.1:5000) - confirm against the Flask version used.
        run_simple(host, port, self, **options)
if __name__ == '__main__':
app.run()
# 执行self.wsgi_app(environ, start_response)
def wsgi_app(self, environ, start_response):
ctx = self.request_context(environ)
'''
# 执行ctx = self.request_context(environ),并赋值给ctx
# ctx = RequestContext(self, environ)
def request_context(self, environ):
return RequestContext(self, environ)
class RequestContext(object):
def __init__(self, app, environ, request=None, session=None):
self.app = app
if request is None:
request = app.request_class(environ)
# 默认request_class = Request
self.request = request
self.session = session
# 封装了session
'''
try:
try:
ctx.push()
'''
def push(self):
app_ctx = _app_ctx_stack.top
if app_ctx is None or app_ctx.app != self.app:
———— app_ctx = self.app.app_context()
app_ctx.push() ————
| _request_ctx_stack.push(self) ————
| | |
|
# 找到ctx中的session给它赋值
| | |
if self.session is None:
| | |
| session_interface = self.app.session_interface
self.session =
| session_interface.open_session(self.app, self.request) | |
if self.session is None:
self.session =
| session_interface.make_null_session(self.app) | |
# 先略过
if self.url_adapter is not None:
| self.match_request()
_____________________________________________________ | |
|
# 执行app_ctx = self.app.app_context() | |
| —— 》 def app_context(self):
return AppContext(self)
class AppContext(object): | |
def __init__(self, app):
# 封装app
self.app = app
# 封装g | |
self.g = app.app_ctx_globals_class()
—————————————————————————————————————————————————— | |
# 执行app_ctx.push() = AppContext.push() 《———— |
# 将app_ctx封装进local()中
def push(self):
_app_ctx_stack.push(self) |
# _app_ctx_stack = LocalStack() |
def push(self, obj):
rv = getattr(self._local, "stack", None)
if rv is None:
self._local.stack = rv = [] |
rv.append(obj)
return rv
———————————————————————————————————————————————— |
# _request_ctx_stack.push(self) 《————
ctx调用push(),因此self=ctx对象
def push(self, obj):
rv = getattr(self._local, "stack", None)
if rv is None:
self._local.stack = rv = []
rv.append(obj)
return rv
'''
response = self.full_dispatch_request()
'''
def full_dispatch_request(self):
self.try_trigger_before_first_request_functions() ————
try:
# 信号 停留
request_started.send(self) |
# 视图之前,执行所有before_request
rv = self.preprocess_request() ————
| |
if rv is None:
# 执行视图函数 | |
———— rv = self.dispatch_request()
except Exception as e:
| rv = self.handle_user_exception(e) |
|
| # 执行所有的after_request |
# 加密session并保存到cookie中 |
———— return self.finalize_request(rv)
| | ———————————————————————————————————————————————————— | |
| | def try_trigger_before_first_request_functions(self): 《————
if self._got_first_request:
return |
| | with self._before_request_lock:
if self._got_first_request:
return |
| | # 执行所有的before_first_request
for func in self.before_first_request_funcs:
func() |
| | self._got_first_request = True
—————————————————————————————————————————————————— |
# 执行所有before_request的函数
| | def preprocess_request(self): 《————
funcs = self.before_request_funcs.get(None, ())
| | if bp is not None and bp in self.before_request_funcs:
funcs = chain(funcs, self.before_request_funcs[bp])
| | for func in funcs:
rv = func()
if rv is not None:
| | return rv
————————————————————————————————————————————————————
|
| ——》 def dispatch_request(self):
# req = ctx.request
| req = _request_ctx_stack.top.request
| # rule = ctx.request.url_rule
# url_rule =Rule,存着endpoint
| rule = req.url_rule
# 加括号执行视图函数
| return self.view_functions[rule.endpoint](**req.view_args)
————————————————————————————————————————————————————————————
|
——》
def finalize_request(self, rv, from_error_handler=False):
# 将视图函数返回值封装到Response中
response = self.make_response(rv)
try:
# 执行所有的after_request_funcs
response = self.process_response(response) ————
request_finished.send(self, response=response)
except Exception: |
if not from_error_handler:
# from_error_handler为False时,直接重新抛出异常
raise
# 否则只记录日志,不再抛出
self.logger.exception( |
"Request finalizing failed with an error while handling an error" |
)
return response |
————————————————————————————————————————————————————————
def process_response(self, response): 《————
# 获取_request_ctx_stack对象栈顶的ctx对象
ctx = _request_ctx_stack.top
# 获取after_request_funcs中的以None为key的列表
# 列表中是被装饰的函数名
if None in self.after_request_funcs:
funcs = chain(funcs, reversed(self.after_request_funcs[None]))
# 加括号执行函数,将response传入加工后,再赋值给response
for handler in funcs:
response = handler(response)
# 取到session的值并加密存放到,返回给用户浏览器的cookie中
if not self.session_interface.is_null_session(ctx.session):
self.session_interface.save_session(self, ctx.session, response)
# 返回给response
return response
'''
except Exception as e:
error = e
response = self.handle_exception(e)
except: # noqa: B001
error = sys.exc_info()[1]
raise
# 返回给用户
return response(environ, start_response)
finally:
if self.should_ignore_error(error):
error = None
# 返回给用户之后,需要把local中的数据销毁。
ctx.auto_pop(error)
'''
def auto_pop(self, exc):
self.pop(exc)
def pop(self, exc=_sentinel):
finally:
——— rv = _request_ctx_stack.pop()
| if app_ctx is not None:
|
# 与_request_ctx_stack.pop()相同流程
| app_ctx.pop(exc)
| _________________________________________________
——》 def pop(self):
# 获取local中的列表:local = {stack:[ctx,]}
stack = getattr(self._local, "stack", None)
if stack is None:
return None
elif len(stack) == 1:
——— release_local(self._local)
| return stack[-1]
| else:
| return stack.pop()
| ___________________________________________
——》 def release_local(local):
local.__release_local__()
def __release_local__(self):
self.__storage__.pop(self.__ident_func__(), None)
'''