添加到flask中 我们要用自定义的路由,用正则的话 1.导入from werkzeug.routing import BaseConverter 2.我先要写一个类,然后继承BaseConverter,然后实现__inti__, def to_python(self, value):to_url(self, value) 3.app.url_map.converters['谁便'] = RegexConverter 4.我们在路由里面@app.route('/index/<regex1("d+"):nid>'),regex1='谁便,regex1("正则表达式") 5.regex1("正则表达式")匹配出来的结果,返回to_python,一定要return 6.当我们做反向解析的解析的时候,我们的参数,会传递给to_url,return的结果才是我们拼接到我们路由上
from flask import Flask, views, url_for from werkzeug.routing import BaseConverter app = Flask(import_name=__name__) class RegexConverter(BaseConverter): """ 自定义URL匹配正则表达式 """ def __init__(self, map, regex): super(RegexConverter, self).__init__(map) self.regex = regex def to_python(self, value): """ 路由匹配时,匹配成功后传递给视图函数中参数的值 """ #value就正则匹配出来的结果 print('value',value,type(value)) return "asdasdasd" def to_url(self, value): """ 使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数 """ val = super(RegexConverter, self).to_url(value) print(val) return val app.url_map.converters['regex1'] = RegexConverter @app.route('/index/<regex1("d+"):nid>',endpoint="sb") def index(nid): print("nid",nid,type(nid)) print(url_for('sb', nid='888')) # /index/666 return 'Index' if __name__ == '__main__': app.run()
from flask import Flask from flask import request from flask import render_template from flask import redirect from flask import make_response app = Flask(__name__) @app.route('/login.html', methods=['GET', "POST"]) def login(): # 请求相关信息 # request.method 提交的方法 # request.args get请求提及的数据 # request.form post请求提交的数据 # request.values post和get提交的数据总和 # request.cookies 客户端所带的cookie # request.headers 请求头 # request.path 不带域名,请求路径 # request.full_path 不带域名,带参数的请求路径 # request.script_root # request.url 带域名带参数的请求路径 # request.base_url 带域名请求路径 # request.url_root 域名 # request.host_url 域名 # request.host 127.0.0.1:500 # request.files # obj = request.files['the_file_name'] # obj.save('/var/www/uploads/' + secure_filename(f.filename)) # 响应相关信息 # return "字符串" # return render_template('html模板路径',**{}) # return redirect('/index.html') #return jsonify({'k1':'v1'}) # response = make_response(render_template('index.html')) # response是flask.wrappers.Response类型 # response.delete_cookie('key') # response.set_cookie('key', 'value') # response.headers['X-Something'] = 'A value' # return response return "内容" if __name__ == '__main__': app.run()
cookie:存放在客户端的键值对
session:存放在服务端的键值对
token:存放在客户端,通过算法来校验
''' app.session_interface这里面看 存session, 1 调用save_session,将我们的session加密的val,读取配置文件['SESSION_COOKIE_NAME']得到key 2 将1种的key,val存储到cookies 取session 1 获取request里面的cookies,获取里面key,这个key就是['SESSION_COOKIE_NAME'],值就是加密的值 2 对该值进行解密 '''
a 产生信息,传给 c 页面
但是用户访问a 页面以后,不是直接跳转到c,而是到b,或则是其他页面,但是用户访问c页面的时候,我希望把a给我的信息拿到
-设置:flash('aaa') -取值:get_flashed_message() - -假设在a页面操作出错,跳转到b页面,在b页面显示a页面的错误信息
from flask import Flask,flash,get_flashed_messages,request app = Flask(__name__) app.secret_key = 'asdfasdf' #1 如果要用flash就必须设置app.secret_key = 'asdfasdf' #2 只能取一次,在取就没有了 #3 可以通过 flash('普通信息',category="info"),对信息做分类 #4 get_flashed_messages(with_categories=True,category_filter=("error",)),with_categories以键值对的形式获取 # 设置闪现,category_filter=("error",)进行分类信息的过滤 @app.route('/index1') def index(): #(category="message", message)) flash('超时错误',category="error") flash('普通信息',category="info") return "ssdsdsdfsd" # return redirect('/error') @app.route('/error1') def error1(): return "ok" @app.route('/error') def error(): data = get_flashed_messages(with_categories=True,category_filter=("error","info")) data1 = get_flashed_messages(with_categories=True, category_filter=("error", "info")) print("data1",data1) print("data",data) return "错误信息" if __name__ == '__main__': app.run()
#1 如果要用flash就必须设置app.secret_key = 'asdfasdf' #2 只能取一次,在取就没有了 #3 可以通过 flash('普通信息',category="info"),对信息做分类 #4 get_flashed_messages(with_categories=True,category_filter=("error",)),with_categories以键值对的形式获取 # 设置闪现,category_filter=("error",)进行分类信息的过滤
1 before_request
类比django中间件中的process_request,在请求收到之前绑定一个函数做一些事情
#基于它做用户登录认证 @app.before_request def process_request(*args,**kwargs): if request.path == '/login': return None user = session.get('user_info') if user: return None return redirect('/login')
2 after_request
类比django中间件中的process_response,每一个请求之后绑定一个函数,如果请求没有异常
@app.after_request def process_response1(response): print('process_response1 走了') return response
3 before_first_request
第一次请求时,跟浏览器无关(接收的第一个请求)
@app.before_first_request def first(): pass
4 teardown_request
如论有没有异常都会执行,如果没有异常这个参数就是None,有就记录这个异常
@app.teardown_request def ter(e): pass
5 errorhandler
捕获异常,如果出现异常,而且状态就是@app.errorhandler(404),服务器内部错误500
@app.errorhandler(404) def error_404(arg): return "404错误了"
6 template_global
标签
@app.template_global() def sb(a1, a2): return a1 + a2 #{{sb(1,2)}}
7 template_filter
过滤器
@app.template_filter() def db(a1, a2, a3): return a1 + a2 + a3 #{{ 1|db(2,3)}}
总结:
1 重点掌握before_request和after_request,
2 注意有多个的情况,执行顺序
3 before_request请求拦截后(也就是有return值),response所有都执行
from flask import Flask app = Flask(__name__) @app.route('/') def index(): return 'Hello World!' # 模拟中间件 class Md(object): def __init__(self,old_wsgi_app): self.old_wsgi_app = old_wsgi_app def __call__(self, environ, start_response): print('开始之前') ret = self.old_wsgi_app(environ, start_response) print('结束之后') return ret if __name__ == '__main__': #1我们发现当执行app.run方法的时候,最终执行run_simple,最后执行app(),也就是在执行app.__call__方法 #2 在__call__里面,执行的是self.wsgi_app().那我们希望在执行他本身的wsgi之前做点事情。 #3 所以我们先用Md类中__init__,保存之前的wsgi,然后我们用将app.wsgi转化成Md的对象。 #4 那执行新的的app.wsgi_app,就是执行Md的__call__方法。 #把原来的wsgi_app替换为自定义的, app.wsgi_app = Md(app.wsgi_app) app.run()
请求所有的流程
ctx = self.request_context(environ) error = None try: try: ctx.push() #根据路径去执行视图函数,视图类 response = self.full_dispatch_request() except Exception as e: error = e response = self.handle_exception(e) except: # noqa: B001 error = sys.exc_info()[1] raise return response(environ, start_response) finally: #不管出不出异常,都会走这里 if self.should_ignore_error(error): error = None ctx.auto_pop(error)
''' 1 app.__call__ 2 wsgi_app(environ, start_response) 2.1 ctx = self.request_context(environ) 2.1.1 return RequestContext(self, environ) 这里的self是app,environ请求相关 2.1.2 return RequestContext(self, environ) 得到了RequestContext的对象,而且有request属性 2.2 2.1中的ctx就是RequestContext的对象 2.3 ctx.push()执行这个,就是RequestContext的对象的push方法 2.3.1 #执行这个,self-->ctx _request_ctx_stack.push(self) 2.3.1.1 我们发现_request_ctx_stack = LocalStack() 他的push方法的源码: def push(self, obj): rv = getattr(self._local, "stack", None) if rv is None: # self._local=>stack-->storage['线程id']['stack']=[ctx,] self._local.stack = rv = [] rv.append(obj) return rv
3在请求中获取request.form 3.1 request是LocalProxy的对象,当获取属性的时候会走__getattr__ def __getattr__(self, name): if name == "__members__": return dir(self._get_current_object()) #name-->form, #self._get_current_object()===>ctx.request,form #_get_current_object()---》self.__local() return getattr(self._get_current_object(), name) 3.1.1 self._get_current_object():源码:最终:partial(_lookup_req_object, "request") def _get_current_object(self): if not hasattr(self.__local, "__release_local__"): #local==>partial(_lookup_req_object, "request") #def __init__(self, local, name=None): # object.__setattr__(self, "_LocalProxy__local", local) #self.__local()===>local() return self.__local() try: return getattr(self.__local, self.__name__) except AttributeError: raise RuntimeError("no object bound to %s" % self.__name__)
4 partial(_lookup_req_object, "request")偏函数的源码 def _lookup_req_object(name): #name是request #ctx top = _request_ctx_stack.top if top is None: raise RuntimeError(_request_ctx_err_msg) #ctx-->request return getattr(top, name) 4.1中_request_ctx_stack.top @property def top(self): try: return self._local.stack[-1] except (AttributeError, IndexError): return None
对程序进行目录结构划分
不使用蓝图,自己分文件
目录结构:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>随便写点东西</h1> </body> </html>
from flask import Flask app = Flask(__name__) from pro import views app.register_blueprint(views.us) # 注册
from flask import Blueprint,render_template us = Blueprint("user",__name__) @us.route('/') def index(): return render_template("index.html")
from pro import app if __name__ == '__main__': app.run()
专门用来存储用户信息的g对象,g的全称的为global
g对象在一次请求中的所有的代码的地方,都是可以使用的
g对象和session的区别
session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session,但是g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次
当前请求内你设置就可以取,必须先设置,后取,当前请求可以取无限次
就算你当前请求,设置了,如果不取,其他请求过来,也取不到
Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为
安装:
pip3 install blinker
内置信号:
request_started = _signals.signal('request-started') # 请求到来前执行 request_finished = _signals.signal('request-finished') # 请求结束后执行 before_render_template = _signals.signal('before-render-template') # 模板渲染前执行 template_rendered = _signals.signal('template-rendered') # 模板渲染后执行 got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行 request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否) appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否) appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行 appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行 message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发
使用信号:
from flask import Flask,signals app = Flask(__name__) def func(*args,**kwargs): print('触发信号',args,kwargs) # 与该信号进行绑定 signals.request_started.connect(func) # 触发信号:signals.request_started.send() @app.before_first_request def before_first1(*args,**kwargs): print('before_first_request') @app.before_request def before_first2(*args,**kwargs): print('before_request') @app.route('/',methods=['GET','POST']) def index(): print('视图') return '视图' if __name__ == '__main__': app.run()
自定义信号:
作用:将默认保存的签名cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy
安装:pip3 install flask-session
第一种使用方法:
from flask import Flask,session from flask_session import RedisSessionInterface import redis app = Flask(__name__) app.secret_key="ajksda" conn=redis.Redis(host='127.0.0.1',port=6379) app.session_interface=RedisSessionInterface(conn,key_prefix='jason',use_signer=True,permanent=False) @app.route('/') def hello_world(): session['nb']='jason' return 'Hello World!' @app.route("/index") def index(): print(session['nb']) return "ok" if __name__ == '__main__': app.run()
第二种使用方法:
from flask import Flask,session import redis from flask_session import Session app = Flask(__name__) app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] =redis.Redis(host='127.0.0.1',port='6379') app.config['SESSION_KEY_PREFIX']="jason" Session(app) @app.route('/') def hello_world(): session['sb']='jason' return 'Hello World!' @app.route("/index") def index(): print(session['sb']) return "ok" if __name__ == '__main__': app.run()
from werkzeug.wsgi import DispatcherMiddleware from werkzeug.serving import run_simple from flask import Flask app1 = Flask('app01') app2 = Flask('app02') @app1.route('/index') def index(): return "app01" @app2.route('/index') def index2(): return "app2" dm = DispatcherMiddleware(app1, { '/sec12': app2, }) if __name__ == "__main__": run_simple('localhost', 5000, dm)
用于实现类似于django中 python3 manage.py runserver ...类似的命令
安装:pip3 install flask-script
from flask import Flask from flask_script import Manager app = Flask(__name__) manager = Manager(app) @app.route('/') def index(): return 'ok' if __name__ == '__main__': manager.run()
自定义命令
#第一步安装:pip3 install flask-script from flask import Flask from flask_script import Manager app = Flask(__name__) manager=Manager(app) @app.route("/") def index(): return "ok" @manager.command def custom1(arg,a): """ 自定义命令 python manage.py custom 123 :param arg: :return: """ print(arg,a) @manager.option('-n', '--name', dest='name') @manager.option('-u', '--url', dest='url') def cmd(name, url): """ 自定义命令(-n也可以写成--name) 执行: python manage.py cmd -n lqz -u http://www.oldboyedu.com 执行: python manage.py cmd --name lqz --url http://www.oldboyedu.com :param name: :param url: :return: """ print(name, url) if __name__ == '__main__': manager.run()
安装:pip3 install wtforms
使用1:表单验证
from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__) app.debug = True class LoginForm(Form): # 字段(内部包含正则表达式) name = simple.StringField( label='用户名', validators=[ validators.DataRequired(message='用户名不能为空.'), validators.Length(min=2, max=6, message='用户名长度必须大于%(min)d且小于%(max)d') ], widget=widgets.TextInput(), # 页面上显示的插件 render_kw={'class': 'form-control'} ) # 字段(内部包含正则表达式) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.'), validators.Length(min=8, message='密码长度必须大于%(min)d'), # validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}", # message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html', form=form) else: form = LoginForm(formdata=request.form) if form.validate(): print('用户提交数据通过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('login.html', form=form) if __name__ == '__main__': app.run()
login.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>登录</h1> <form method="post"> <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p> <input type="submit" value="提交"> </form> </body> </html>
使用2:all表单验证
from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class RegisterForm(Form): def validate_pwd_confirm(self, field): """ 自定义pwd_confirm字段规则,例:与pwd字段是否一致 :param field: :return: """ # 最开始初始化时,self.data中已经有所有的值 print(field.data) if field.data !="sb": #raise validators.ValidationError("sb") # 继续后续验证 raise validators.StopValidation("SB") # 不再继续后续验证 # if field.data != self.data['pwd']: # raise validators.ValidationError("密码不一致") # 继续后续验证 #raise validators.StopValidation("密码不一致") # 不再继续后续验证 name = simple.StringField( label='用户名', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, default='tank' ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重复密码', validators=[ validate_pwd_confirm, validators.DataRequired(message='重复密码不能为空.'), #validators.EqualTo('pwd', message="两次密码输入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='邮箱', validators=[ validators.DataRequired(message='邮箱不能为空.'), validators.Email(message='邮箱格式错误') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性别', choices=( (1, '男'), (2, '女'), ), coerce=int # “1” “2” ) city = core.SelectField( label='城市', choices=( ('bj', '北京'), ('sh', '上海'), ) ) hobby = core.SelectMultipleField( label='爱好', choices=( (1, '篮球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜好', choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) def __init__(self, *args, **kwargs): super(RegisterForm, self).__init__(*args, **kwargs) self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球')) self.favor.data=[1,] @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'GET': form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial return render_template('register.html', form=form) else: form = RegisterForm(formdata=request.form) if form.validate(): print('用户提交数据通过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('register.html', form=form) if __name__ == '__main__': app.run()
login.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用户注册</h1> <form method="post" novalidate style="padding:0 50px"> {% for field in form %} <p>{{field.label}}: {{field}} {{field.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html>