zoukankan      html  css  js  c++  java
  • flask扩展

    一、严格模式

    二、重定向

    三、自定义URL正则表达式

     添加到flask中
     我们要用自定义的路由,用正则的话
    
    1.导入from werkzeug.routing import BaseConverter
    2.我先要写一个类,然后继承BaseConverter,然后实现__inti__, def to_python(self, value):to_url(self, value)
    3.app.url_map.converters['谁便'] = RegexConverter
    4.我们在路由里面@app.route('/index/<regex1("d+"):nid>'),regex1='谁便,regex1("正则表达式")
    5.regex1("正则表达式")匹配出来的结果,返回to_python,一定要return
    6.当我们做反向解析的解析的时候,我们的参数,会传递给to_url,return的结果才是我们拼接到我们路由上
    from flask import Flask, views, url_for
    from werkzeug.routing import BaseConverter
    
    app = Flask(import_name=__name__)
    
    class RegexConverter(BaseConverter):
        """
        自定义URL匹配正则表达式
        """
        def __init__(self, map, regex):
            super(RegexConverter, self).__init__(map)
            self.regex = regex
    
        def to_python(self, value):
            """
            路由匹配时,匹配成功后传递给视图函数中参数的值
            """
            #value就正则匹配出来的结果
            print('value',value,type(value))
            return "asdasdasd"
    
        def to_url(self, value):
            """
            使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
            """
            val = super(RegexConverter, self).to_url(value)
            print(val)
            return val
    
    app.url_map.converters['regex1'] = RegexConverter
    @app.route('/index/<regex1("d+"):nid>',endpoint="sb")
    def index(nid):
        print("nid",nid,type(nid))
        print(url_for('sb', nid='888'))
        # /index/666
        return 'Index'
    
    if __name__ == '__main__':
        app.run()

    四、请求与响应

    from flask import Flask
        from flask import request
        from flask import render_template
        from flask import redirect
        from flask import make_response
    
        app = Flask(__name__)
    
    
        @app.route('/login.html', methods=['GET', "POST"])
        def login():
    
            # 请求相关信息
            # request.method  提交的方法
            # request.args  get请求提及的数据
            # request.form   post请求提交的数据
            # request.values  post和get提交的数据总和
            # request.cookies  客户端所带的cookie
            # request.headers  请求头
            # request.path     不带域名,请求路径
            # request.full_path  不带域名,带参数的请求路径
            # request.script_root  
            # request.url           带域名带参数的请求路径
            # request.base_url      带域名请求路径
            # request.url_root      域名
            # request.host_url      域名
            # request.host          127.0.0.1:500
            # request.files
            # obj = request.files['the_file_name']
            # obj.save('/var/www/uploads/' + secure_filename(f.filename))
    
            # 响应相关信息
            # return "字符串"
            # return render_template('html模板路径',**{})
            # return redirect('/index.html')
            #return jsonify({'k1':'v1'})
    
            # response = make_response(render_template('index.html'))
            # response是flask.wrappers.Response类型
            # response.delete_cookie('key')
            # response.set_cookie('key', 'value')
            # response.headers['X-Something'] = 'A value'
            # return response
            return "内容"
    
        if __name__ == '__main__':
            app.run()

    五、session

    cookie:存放在客户端的键值对
    session:存放在服务端的键值对
    token:存放在客户端,通过算法来校验
    '''
    app.session_interface这里面看
    存session,
    1 调用save_session,将我们的session加密的val,读取配置文件['SESSION_COOKIE_NAME']得到key
    2 将1种的key,val存储到cookies
    
    取session
    1 获取request里面的cookies,获取里面key,这个key就是['SESSION_COOKIE_NAME'],值就是加密的值
    2 对该值进行解密
    
    '''

    六、闪现

    什么是闪现

    a 产生信息,传给 c 页面
    但是用户访问a 页面以后,不是直接跳转到c,而是到b,或则是其他页面,但是用户访问c页面的时候,我希望把a给我的信息拿到

    -设置:flash('aaa')
    -取值:get_flashed_message()
    -
    -假设在a页面操作出错,跳转到b页面,在b页面显示a页面的错误信息
    from flask import Flask,flash,get_flashed_messages,request
    
    app = Flask(__name__)
    app.secret_key = 'asdfasdf'
    #1 如果要用flash就必须设置app.secret_key = 'asdfasdf'
    #2 只能取一次,在取就没有了
    #3 可以通过  flash('普通信息',category="info"),对信息做分类
    #4 get_flashed_messages(with_categories=True,category_filter=("error",)),with_categories以键值对的形式获取
    #  设置闪现,category_filter=("error",)进行分类信息的过滤
    @app.route('/index1')
    def index():
        #(category="message", message))
        flash('超时错误',category="error")
        flash('普通信息',category="info")
        return "ssdsdsdfsd"
        # return redirect('/error')
    
    @app.route('/error1')
    def error1():
        return "ok"
    
    @app.route('/error')
    def error():
        data = get_flashed_messages(with_categories=True,category_filter=("error","info"))
        data1 = get_flashed_messages(with_categories=True, category_filter=("error", "info"))
        print("data1",data1)
        print("data",data)
        return "错误信息"
    
    if __name__ == '__main__':
        app.run()
    代码

     

    #1 如果要用flash就必须设置app.secret_key = 'asdfasdf'
    #2 只能取一次,在取就没有了
    #3 可以通过  flash('普通信息',category="info"),对信息做分类
    #4 get_flashed_messages(with_categories=True,category_filter=("error",)),with_categories以键值对的形式获取
    #  设置闪现,category_filter=("error",)进行分类信息的过滤

    七、请求扩展

    1 before_request

    类比django中间件中的process_request,在请求收到之前绑定一个函数做一些事情

    #基于它做用户登录认证
    @app.before_request
    def process_request(*args,**kwargs):
        if request.path == '/login':
            return None
        user = session.get('user_info')
        if user:
            return None
        return redirect('/login')

    2 after_request

    类比django中间件中的process_response,每一个请求之后绑定一个函数,如果请求没有异常

    @app.after_request
    def process_response1(response):
        print('process_response1 走了')
        return response

    3 before_first_request

    第一次请求时,跟浏览器无关(接收的第一个请求)

    @app.before_first_request
    def first():
        pass

    4 teardown_request

    如论有没有异常都会执行,如果没有异常这个参数就是None,有就记录这个异常

    @app.teardown_request 
    def ter(e):
        pass

    5 errorhandler

    捕获异常,如果出现异常,而且状态就是@app.errorhandler(404),服务器内部错误500

    @app.errorhandler(404)
    def error_404(arg):
        return "404错误了"

    6 template_global

    标签

    @app.template_global()
    def sb(a1, a2):
        return a1 + a2
    #{{sb(1,2)}}

    7 template_filter

    过滤器

    @app.template_filter()
    def db(a1, a2, a3):
        return a1 + a2 + a3
    #{{ 1|db(2,3)}}

    总结:

    1 重点掌握before_request和after_request,

    2 注意有多个的情况,执行顺序

    3 before_request请求拦截后(也就是有return值),response所有都执行

    八、中间件

    from flask import Flask
    
    app = Flask(__name__)
    
    @app.route('/')
    def index():
        return 'Hello World!'
    # 模拟中间件
    class Md(object):
        def __init__(self,old_wsgi_app):
            self.old_wsgi_app = old_wsgi_app
    
        def __call__(self,  environ, start_response):
            print('开始之前')
            ret = self.old_wsgi_app(environ, start_response)
            print('结束之后')
            return ret
    
    if __name__ == '__main__':
        #1我们发现当执行app.run方法的时候,最终执行run_simple,最后执行app(),也就是在执行app.__call__方法 
        #2 在__call__里面,执行的是self.wsgi_app().那我们希望在执行他本身的wsgi之前做点事情。
        #3 所以我们先用Md类中__init__,保存之前的wsgi,然后我们用将app.wsgi转化成Md的对象。
        #4 那执行新的的app.wsgi_app,就是执行Md的__call__方法。
        #把原来的wsgi_app替换为自定义的,
        
        app.wsgi_app = Md(app.wsgi_app)
        app.run()

    请求所有的流程

    ctx = self.request_context(environ)
            error = None
            try:
                try:
                    ctx.push()
                    #根据路径去执行视图函数,视图类
                    response = self.full_dispatch_request()
                except Exception as e:
                    error = e
                    response = self.handle_exception(e)
                except:  # noqa: B001
                    error = sys.exc_info()[1]
                    raise
                return response(environ, start_response)
            finally:
                #不管出不出异常,都会走这里
                if self.should_ignore_error(error):
                    error = None
                ctx.auto_pop(error)

    九、请求上下文源码分析

     

    '''
    1     app.__call__
    2     wsgi_app(environ, start_response) 
    2.1 ctx = self.request_context(environ)
        2.1.1 return RequestContext(self, environ)
            这里的self是app,environ请求相关
        2.1.2 return RequestContext(self, environ)
        得到了RequestContext的对象,而且有request属性
    2.2  2.1中的ctx就是RequestContext的对象  
    
    2.3  ctx.push()执行这个,就是RequestContext的对象的push方法
         2.3.1  #执行这个,self-->ctx
            _request_ctx_stack.push(self) 
            2.3.1.1 我们发现_request_ctx_stack = LocalStack()
            他的push方法的源码:
                    def push(self, obj):
                        rv = getattr(self._local, "stack", None)
                        if rv is None:     
                            # self._local=>stack-->storage['线程id']['stack']=[ctx,]
                            self._local.stack = rv = []
                        rv.append(obj)
                        return rv


    3在请求中获取request.form 3.1 request是LocalProxy的对象,当获取属性的时候会走__getattr__ def __getattr__(self, name): if name == "__members__": return dir(self._get_current_object()) #name-->form, #self._get_current_object()===>ctx.request,form #_get_current_object()---》self.__local() return getattr(self._get_current_object(), name) 3.1.1 self._get_current_object():源码:最终:partial(_lookup_req_object, "request") def _get_current_object(self): if not hasattr(self.__local, "__release_local__"): #local==>partial(_lookup_req_object, "request") #def __init__(self, local, name=None): # object.__setattr__(self, "_LocalProxy__local", local) #self.__local()===>local() return self.__local() try: return getattr(self.__local, self.__name__) except AttributeError: raise RuntimeError("no object bound to %s" % self.__name__)


    4 partial(_lookup_req_object, "request")偏函数的源码 def _lookup_req_object(name): #name是request #ctx top = _request_ctx_stack.top if top is None: raise RuntimeError(_request_ctx_err_msg) #ctx-->request return getattr(top, name) 4.1中_request_ctx_stack.top @property def top(self): try: return self._local.stack[-1] except (AttributeError, IndexError): return None

    十、蓝图

    对程序进行目录结构划分

    不使用蓝图,自己分文件

    目录结构:

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
    <h1>随便写点东西</h1>
    </body>
    </html>
    index.html
    from flask import Flask
    app = Flask(__name__)
    from pro import views
    
    app.register_blueprint(views.us)  # 注册
    __init__.py
    from flask import Blueprint,render_template
    us = Blueprint("user",__name__)
    @us.route('/')
    def index():
    
        return render_template("index.html")
    views.py
    from pro import app
    
    if __name__ == '__main__':
        app.run()
    manage.py

    十一、g对象

    专门用来存储用户信息的g对象,g的全称的为global

    g对象在一次请求中的所有的代码的地方,都是可以使用的

    g对象和session的区别

    session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session,但是g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次

    g对象的特性

    当前请求内你设置就可以取,必须先设置,后取,当前请求可以取无限次
    就算你当前请求,设置了,如果不取,其他请求过来,也取不到

    十二、信号

    Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为

    安装:

    pip3 install blinker

    内置信号:

    request_started = _signals.signal('request-started')                # 请求到来前执行
    request_finished = _signals.signal('request-finished')              # 请求结束后执行
     
    before_render_template = _signals.signal('before-render-template')  # 模板渲染前执行
    template_rendered = _signals.signal('template-rendered')            # 模板渲染后执行
     
    got_request_exception = _signals.signal('got-request-exception')    # 请求执行出现异常时执行
     
    request_tearing_down = _signals.signal('request-tearing-down')      # 请求执行完毕后自动执行(无论成功与否)
    appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
     
    appcontext_pushed = _signals.signal('appcontext-pushed')            # 应用上下文push时执行
    appcontext_popped = _signals.signal('appcontext-popped')            # 应用上下文pop时执行
    message_flashed = _signals.signal('message-flashed')                # 调用flask在其中添加数据时,自动触发

    使用信号:

     

    from flask import Flask,signals
    app = Flask(__name__)
    def func(*args,**kwargs):
        print('触发信号',args,kwargs)
    # 与该信号进行绑定
    signals.request_started.connect(func)
    # 触发信号:signals.request_started.send()
    @app.before_first_request
    def before_first1(*args,**kwargs):
        print('before_first_request')
    @app.before_request
    def before_first2(*args,**kwargs):
        print('before_request')
    
    @app.route('/',methods=['GET','POST'])
    def index():
        print('视图')
        return '视图'
    if __name__ == '__main__':
        app.run()
    代码

    自定义信号:

    十三、flask_session

    作用:将默认保存的签名cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy

    安装:pip3 install flask-session

    第一种使用方法:

    from flask import Flask,session
    from flask_session import RedisSessionInterface
    import redis
    app = Flask(__name__)
    app.secret_key="ajksda"
    conn=redis.Redis(host='127.0.0.1',port=6379)
    app.session_interface=RedisSessionInterface(conn,key_prefix='jason',use_signer=True,permanent=False)
    @app.route('/')
    def hello_world():
        session['nb']='jason'
        return 'Hello World!'
    
    @app.route("/index")
    def index():
        print(session['nb'])
        return "ok"
    
    if __name__ == '__main__':
        app.run()

    第二种使用方法:

    from flask import Flask,session
    import  redis
    from flask_session import Session
    app = Flask(__name__)
    app.config['SESSION_TYPE'] = 'redis'
    app.config['SESSION_REDIS'] =redis.Redis(host='127.0.0.1',port='6379')
    app.config['SESSION_KEY_PREFIX']="jason"
    Session(app)
    
    @app.route('/')
    def hello_world():
        session['sb']='jason'
        return 'Hello World!'
    
    @app.route("/index")
    def index():
        print(session['sb'])
        return "ok"
    
    if __name__ == '__main__':
        app.run()

    十四、多APP应用

    from werkzeug.wsgi import DispatcherMiddleware
    from werkzeug.serving import run_simple
    from flask import Flask
    app1 = Flask('app01')
    app2 = Flask('app02')
    
    @app1.route('/index')
    def index():
        return "app01"
    
    @app2.route('/index')
    def index2():
        return "app2"
    dm = DispatcherMiddleware(app1, {
        '/sec12': app2,
    })
    if __name__ == "__main__":
    
        run_simple('localhost', 5000, dm)

    十五、flask-script

    用于实现类似于django中 python3 manage.py runserver ...类似的命令

    安装:pip3 install flask-script

    from flask import Flask
    from flask_script import Manager
    app = Flask(__name__)
    manager = Manager(app)
    
    @app.route('/')
    def index():
        return 'ok'
    
    if __name__ == '__main__':
        manager.run()

     

     自定义命令

    #第一步安装:pip3 install flask-script
    from flask import Flask
    from flask_script import Manager
    app = Flask(__name__)
    manager=Manager(app)
    
    @app.route("/")
    def index():
        return "ok"
    
    @manager.command
    def custom1(arg,a):
        """
        自定义命令
        python manage.py custom 123
        :param arg:
        :return:
        """
        print(arg,a)
    
    
    @manager.option('-n', '--name', dest='name')
    @manager.option('-u', '--url', dest='url')
    def cmd(name, url):
        """
        自定义命令(-n也可以写成--name)
        执行: python manage.py  cmd -n lqz -u http://www.oldboyedu.com
        执行: python manage.py  cmd --name lqz --url http://www.oldboyedu.com
        :param name:
        :param url:
        :return:
        """
        print(name, url)
    
    if __name__ == '__main__':
        manager.run()

     

    十六、wtforms

    安装:pip3 install wtforms

    使用1:表单验证

    from flask import Flask, render_template, request, redirect
    from wtforms import Form
    from wtforms.fields import simple
    from wtforms import validators
    from wtforms import widgets
    
    app = Flask(__name__)
    
    app.debug = True
    
    
    class LoginForm(Form):
        # 字段(内部包含正则表达式)
        name = simple.StringField(
            label='用户名',
            validators=[
                validators.DataRequired(message='用户名不能为空.'),
                validators.Length(min=2, max=6, message='用户名长度必须大于%(min)d且小于%(max)d')
            ],
            widget=widgets.TextInput(), # 页面上显示的插件
            render_kw={'class': 'form-control'}
        )
        # 字段(内部包含正则表达式)
        pwd = simple.PasswordField(
            label='密码',
            validators=[
                validators.DataRequired(message='密码不能为空.'),
                validators.Length(min=8, message='密码长度必须大于%(min)d'),
                # validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}",
                #                   message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
    
            ],
            widget=widgets.PasswordInput(),
            render_kw={'class': 'form-control'}
        )
    
    
    
    @app.route('/login', methods=['GET', 'POST'])
    def login():
        if request.method == 'GET':
            form = LoginForm()
            return render_template('login.html', form=form)
        else:
            form = LoginForm(formdata=request.form)
            if form.validate():
                print('用户提交数据通过格式验证,提交的值为:', form.data)
            else:
                print(form.errors)
            return render_template('login.html', form=form)
    
    if __name__ == '__main__':
        app.run()

    login.html

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
    <h1>登录</h1>
    <form method="post">
        <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>
    
        <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
        <input type="submit" value="提交">
    </form>
    </body>
    </html>

    使用2:all表单验证

    from flask import Flask, render_template, request, redirect
    from wtforms import Form
    from wtforms.fields import core
    from wtforms.fields import html5
    from wtforms.fields import simple
    from wtforms import validators
    from wtforms import widgets
    
    app = Flask(__name__, template_folder='templates')
    app.debug = True
    
    
    
    class RegisterForm(Form):
    
        def validate_pwd_confirm(self, field):
            """
            自定义pwd_confirm字段规则,例:与pwd字段是否一致
            :param field:
            :return:
            """
            # 最开始初始化时,self.data中已经有所有的值
            print(field.data)
            if field.data !="sb":
                #raise validators.ValidationError("sb")  # 继续后续验证
                raise validators.StopValidation("SB")  # 不再继续后续验证
    
            # if field.data != self.data['pwd']:
            #     raise validators.ValidationError("密码不一致") # 继续后续验证
                #raise validators.StopValidation("密码不一致")  # 不再继续后续验证
    
        name = simple.StringField(
            label='用户名',
            validators=[
                validators.DataRequired()
            ],
            widget=widgets.TextInput(),
            render_kw={'class': 'form-control'},
            default='tank'
        )
    
        pwd = simple.PasswordField(
            label='密码',
            validators=[
                validators.DataRequired(message='密码不能为空.')
            ],
            widget=widgets.PasswordInput(),
            render_kw={'class': 'form-control'}
        )
    
        pwd_confirm = simple.PasswordField(
            label='重复密码',
            validators=[
                validate_pwd_confirm,
                validators.DataRequired(message='重复密码不能为空.'),
                #validators.EqualTo('pwd', message="两次密码输入不一致")
            ],
            widget=widgets.PasswordInput(),
            render_kw={'class': 'form-control'}
        )
    
        email = html5.EmailField(
            label='邮箱',
            validators=[
                validators.DataRequired(message='邮箱不能为空.'),
                validators.Email(message='邮箱格式错误')
            ],
            widget=widgets.TextInput(input_type='email'),
            render_kw={'class': 'form-control'}
        )
    
        gender = core.RadioField(
            label='性别',
            choices=(
                (1, ''),
                (2, ''),
            ),
            coerce=int # “1” “2”
         )
        city = core.SelectField(
            label='城市',
            choices=(
                ('bj', '北京'),
                ('sh', '上海'),
            )
        )
    
        hobby = core.SelectMultipleField(
            label='爱好',
            choices=(
                (1, '篮球'),
                (2, '足球'),
            ),
            coerce=int
        )
    
        favor = core.SelectMultipleField(
            label='喜好',
            choices=(
                (1, '篮球'),
                (2, '足球'),
            ),
            widget=widgets.ListWidget(prefix_label=False),
            option_widget=widgets.CheckboxInput(),
            coerce=int,
            default=[1, 2]
        )
    
        def __init__(self, *args, **kwargs):
            super(RegisterForm, self).__init__(*args, **kwargs)
            self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
            self.favor.data=[1,]
    
    
    
    
    
    @app.route('/register', methods=['GET', 'POST'])
    def register():
        if request.method == 'GET':
            form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
            return render_template('register.html', form=form)
        else:
            form = RegisterForm(formdata=request.form)
            if form.validate():
                print('用户提交数据通过格式验证,提交的值为:', form.data)
            else:
                print(form.errors)
            return render_template('register.html', form=form)
    
    
    
    if __name__ == '__main__':
        app.run()

    login.html

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
    <h1>用户注册</h1>
    <form method="post" novalidate style="padding:0  50px">
        {% for field in form %}
        <p>{{field.label}}: {{field}} {{field.errors[0] }}</p>
        {% endfor %}
        <input type="submit" value="提交">
    </form>
    </body>
    </html>
  • 相关阅读:
    Timer 函数 C#
    【学习笔记】 golang的插件 创建share object
    mongodb数组操作
    mongo数据库的表设计原则:二
    mongodb + nginx 存储照片方案
    mongodb聚合操作$使用例子
    mongo配置复制集
    尼采全集
    java日志框架
    常用包收集
  • 原文地址:https://www.cnblogs.com/xiongying4/p/11844737.html
Copyright © 2011-2022 走看看