zoukankan      html  css  js  c++  java
  • Flask框架

    前言: 
        1.flask 是web框架,他和django一样是同步的框架
        2.Flask 基于Werkzeug WSGI工具箱和Jinja2 模板引擎
        安装:flask:  pip install  flask
    app.run()
        本质:当执行app.run方法的时候,最终执行run_simplerun_simple(host,port,app,**aptions)
             最后 app(),对象加(),执行__call__方法
    flask的4剑客:
        1.直接返回字符串
            return "ok"
         2.返回 html页面
            首先导入render_template
            return render_template("index.html",name="xxx ")
         3.跳转页面
            return redirect('/lobin')
         4.返回json数据
            return jsonify(name_dict)
    flask配置文件的方法:
        1.app对象.属性=值 这种方式只能配置这两种 配置
            app.debug=True
            app.secret_key="4324353"
        2.已字典的形式配置
            app.config['DEGUG']=True
            
        3.以文件形式
            新建一个xxx.py文件,内写配置
                eg:DEGUG=True
            app.config.from_pyfile('xxx.py')
    
        4.以类的形式配置
            新建一个xxx.py文件,内写各种场景下的类
            
            class Config(object):
                DEBUG = False
                TESTING = False
                DATABASE_URI = 'sqlite://:memory:'
    
    
            class ProductionConfig(Config):
                DATABASE_URI = 'mysql://user@localhost/foo'
    
    
            class DevelopmentConfig(Config):
                DEBUG = True
    
    
            class TestingConfig(Config):
                TESTING = True
            
            app.config.from_project('xxx.xxx类 ')
    配置方法
    flask路由:
        @app.route("/index")本质是 app.add_url_rule("/index",view_func='index')
        route的参数:
            endpoint: 起别名 ,反向解析 即: url_for('名称')   注:不能重复
            methods: 不指定默认GET,允许的请求方式 如:["GET", "POST"]
            view_func:是endpoint 指向的函数:也就是请求该路由要响应的函数
            strict_slashes:该参数是用来设置我们的路由是否为严格模式,默认严格,True严格
            
            redirect_to:重定向的指定的地址
        
        route使用正则:
             我们要用自定义的路由,用正则的话
            1.导入from werkzeug.routing import BaseConverter
            2.要写一个类,然后继承BaseConverter,然后实现__init__;to_python(self, value);to_url(self, value)
            3.app.url_map.converters['谁便'] = RegexConverter
            4.我们在路由里面@app.route('/index/<regex1("d+"):nid>'),regex1='谁便,regex1("正则表达式")
            5.regex1("正则表达式")匹配出来的结果,返回to_python,一定要return
            6.当我们做反向解析的解析的时候,我们的参数,会传递给to_url,return的结果才是我们拼接到我们路由上
    
            
            from flask import Flask, views, url_for
            from werkzeug.routing import BaseConverter
    
            app = Flask(import_name=__name__)
    
            class RegexConverter(BaseConverter):
                """
                自定义URL匹配正则表达式
                """
                def __init__(self, map, regex):
                    super(RegexConverter, self).__init__(map)
                    self.regex = regex
    
                def to_python(self, value):
                    """
                    路由匹配时,匹配成功后传递给视图函数中参数的值
                    """
                    #value就正则匹配出来的结果
                    return "asdasdasd"
    
                def to_url(self, value):
                    """
                    使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
                    """
                    val = super(RegexConverter, self).to_url(value)
                    print(val)
                    return val
    
            app.url_map.converters['regex1'] = RegexConverter
            @app.route('/index/<regex1("d+"):nid>',endpoint="sb")
            def index(nid):
                print(url_for('sb', nid='888'))
                return 'Index'
    
            if __name__ == '__main__':
                app.run()    
    flask路由
    cbv:
        原始:
            from flask import Flask,views,request
    
            app = Flask(__name__)
            app.config.from_pyfile('settings.py')
    
            class IndexView(views.View):
                def dispatch_request(self):
                    print(request.args)
                    return 'index'
    
            app.add_url_rule('/index',view_func=IndexView.as_view(name='index'))
            if __name__ == '__main__':
                app.run()    
    
        注:为什么as_view(name='index')?
            如果不指定,每次都是view,会报错,所有必须指定
        
        要想仿照django,类得继承Method.view
            from flask import Flask, views, request, url_for
            from functools import wraps
    
            def login_verify(func):
                @wraps(func)
                def wrapper(*args, **kwargs):
                    user_name = request.args.get('user')
                    password = request.args.get('password')
                    if user_name == 'mark' and password == '123':
                        return func(*args,**kwargs)
                    else:
                        return '请登录'
                return wrapper
    
    
            class CBVTest(views.MethodView):
    
                methods = ['GET','POST']  # 指定可以接收的方法有什么
                decorators = [login_verify,]  # 指定自定义的装饰器
    
                def get(self):
                    print(url_for('cbvtest'))
                    return 'cbv_get'
                def post(self):
                    return 'cbv_post'
            app.add_url_rule('/cbvtest',view_func=CBVTest.as_view(name='cbvtest'),endpoint='end_demo')
    请求与响应:
       请求相关信息
        request.method            提交的方法
        request.args               get请求提及的数据
        request.form               post请求提交的数据
        equest.values              post和get提交的数据总和
        request.cookies            客户端所带的cookie
        request.headers            请求头
        request.path               不带域名,请求路径
        request.full_path          不带域名,带参数的请求路径
        request.script_root
        request.url               带域名带参数的请求路径
        request.base_url          带域名请求路径
        request.url_root          域名
        request.host_url          域名
        request.host              127.0.0.1:500
        request.files           获取传过来的文件
    
       响应相关信息
         自定义一些响应
         1.导入make_response
         2.response=make_response(4剑客)
         3.操作response
         4.return response
          eg:
            response=make_response(render_template('index.html'))
            #设置cookie
            response.set_cookie('jason', 'sb')
            #删除cookie
            response.delete_cookie('key')
            #设置header 
            response.headers['X-Something'] = 'A value sbwewewe'
            
            obj = request.files['the_file_name']
            obj.save('/var/www/uploads/' + secure_filename(f.filename))
            return response
    session:
        1导入session
        2必须设置app.secret_key="askjdaksd"
        3存session['name']="xxx"
        4取session['name']
        app.session_interface
        原理:
            存: 把session中的值加密序列化放到cookie中,返回到浏览器中
                1.调用 save_session,将我们的session 通过加密得到val,读取配置文件中的['SESSION_COOKIE_NAME']得到key
                2.将key与val 存进cookie
                
            取:从cookie中取出值,反解,生成session对象,在视图函数中直接用sessoin就可以
                1.获取request里面的cookies,获取里面的key,这个key就是['SESSION_COOKIE_NAME'],值就是加密的值
                2.对值进行解密
                
    闪现:
        what?
            a页面产生信息,传给c页面;
        但是用户访问a页面以后,不是直接跳到c页面, 而是途经其他页面后,才去访问c页面,
        这时拿到a页面给的信息
        
        flash:存信息 
        get_flashed_messages:取信息
        
        from flask import Flask,render_template
        from flask import session,flash,get_flashed_messages
    
        app = Flask(__name__)
        app.config.from_pyfile('settings.py')
    
        @app.route('/')
        def home():
            session['name'] = 'wangyanfei'
            flash('普通信息', category='info')
            return render_template('home.html')
    
        @app.route('/index')
        def index():
            A = get_flashed_messages(with_categories=True)
            B = get_flashed_messages()
            C = get_flashed_messages(with_categories=True)
            print(A)
            print(B,C)
            return "dslkfjsdkjfl"
    
        if __name__ == '__main__':
            app.run()
        
        注:1.要用闪现 必须设置app.secret_key='dfk';
        
           2.我们可以通过 flash的参数 category 对信息做分类
                eg: flash('xxxx',category="info")
           3.我们可以通过get_flashed_messages的参数
                >:with_categories:以键值对的形式获取我们设置的闪现
                >:category_filter:进行分类信息的过滤
                eg: get_flashed_messages(with_categories=True,category_filter=('error',))
                  
           4.同一次请求内,不管取多少次,都可以取到;第二次请求时就取不到了,因为上下文管理会把
                _request_ctx_stack.top.flashes 清空掉
    请求扩展:
        1.before_request  类比django中间件中的process_request,在请求收到之前绑定一个函数做一些事情
            #基于它做用户登录认证
            @app.before_request
            def process_request(*args,**kwargs):
                if request.path == '/login':
                    return None
                user = session.get('user_info')
                if user:
                    return None
                return redirect('/login')
        2.after_request  类比django中间件中的process_response,每一个请求之后绑定一个函数,如果请求没有异常
            
            @app.after_request
            def process_response1(response):
                print('process_response1 走了')
                return response
                
        3.before_first_request 第一次请求时 执行,跟浏览器无关
        
        4.teardown_request  如论有没有异常都会执行,如果没有异常这个参数就是None,有就记录这个异常
        
        5.errorhandler 捕获异常并处理让用户无法感知 路径不存在时404,服务器内部错误500
            @app.errorhandler(404)
            def error_404(arg):
                return "404错误了"
                
            @app.errorhandler(500)
            def error_500(arg):
                print(arg)
                return "500错误了"
    
        6.template_global  标签 被装饰的函数在不传给模板的情况下,模板可以直接调用    
            @app.template_global()
            def sb(a1, a2):
                return a1 + a2
            #{{sb(1,2)}}
        7.template_filter   过滤器
            @app.template_filter()
            def db(a1, a2, a3):
                return a1 + a2 + a3
            #{{ 1|db(2,3)}}
    
                
        小结:
            1. 请求来时 多个before_request的话 ,自上而下执行
            2. 如果前面的有返回值,后面就不会执行,包括正真的视图函数,但是after_request依然会执行
            3. response走时 自下而上 的执行(源码中是先反转,再遍历);必须接受response,也必须返回response
            
    中间件(了解):
    
        from flask import Flask
    
        app = Flask(__name__)
    
        @app.route('/')
        def index():
            return 'Hello World!'
        # 模拟中间件
        class Md(object):
            def __init__(self,old_wsgi_app):
                self.old_wsgi_app = old_wsgi_app
    
            def __call__(self,  environ, start_response):
                print('开始之前')
                ret = self.old_wsgi_app(environ, start_response)
                print('结束之后')
                return ret
    
        if __name__ == '__main__':
            #1我们发现当执行app.run方法的时候,最终执行run_simple,最后执行app(),也就是在执行app.__call__方法    
            #2 在__call__里面,执行的是self.wsgi_app().那我们希望在执行他本身的wsgi之前做点事情。
            #3 所以我们先用Md类中__init__,保存之前的wsgi,然后我们用将app.wsgi转化成Md的对象。
            #4 那执行新的的app.wsgi_app,就是执行Md的__call__方法。
            #把原来的wsgi_app替换为自定义的,
            
            app.wsgi_app = Md(app.wsgi_app)
            app.run()
    线程与偏函数:
        1.因为线程是不安全的,所以我们要解决这种线程不安全的问题,有如下两种解决方案
    
            方案一:加锁,多个线程要真正实现共用一个数据,并且该线程修改了数据之后会影响到其他线程
    
            方案二:使用threading.local对象把要修改的数据复制一份,使得每个数据互不影响
    
            思考:
                为什么不用加锁的思路?加锁的思路是多个线程要真正实现共用一个数据,并且该线程修改了数据之后会影响到其他线程,
            更适合类似于12306抢票的应用场景,而我们是要做请求对象的并发,想要实现的是该线程对于请求对象这部分内容有任何修改
            并不影响其他线程。所以使用方案二
    
        2.请求并发设计
            函数与方法:
                from types import MethodType,FunctionType
    
                class Foo(object):
                    def fetch(self):
                        pass
    
                print(isinstance(Foo.fetch,MethodType))
                print(isinstance(Foo.fetch,FunctionType)) # True
    
                obj = Foo()
                print(isinstance(obj.fetch,MethodType)) # True
                print(isinstance(obj.fetch,FunctionType))
            threading.local:
                不用local:
                from threading import Thread
                import time
                cxw = -1
                def task(arg):
                    global cxw
                    cxw = arg
                    # time.sleep(2)
                    print(cxw)
    
                for i in range(10):
                    t = Thread(target=task,args=(i,))
                    t.start()
                    
                使用local:
                    from threading import Thread
                    from threading import local
                    import time
                    from threading import get_ident
                    # 特殊的对象
                    cxw = local()
                    def task(arg):
                        # 对象.val = 1/2/3/4/5
                        cxw.value = arg
                        time.sleep(2)
                        print(cxw.value)
                    for i in range(10):
                        t = Thread(target=task,args=(i,))
                        t.start()
            通过字典自定义threading.local(函数):
                from threading import get_ident,Thread
                import time
                    storage = {}
                    def set(k,v):
                        ident = get_ident()
                        if ident in storage:
                            storage[ident][k] = v
                        else:
                            storage[ident] = {k:v}
                    def get(k):
                        ident = get_ident()
                        return storage[ident][k]
                    def task(arg):
                        set('val',arg)
                        v = get('val')
                        print(v)
    
                    for i in range(10):
                        t = Thread(target=task,args=(i,))
                        t.start()
            面向对象:
                from threading import get_ident,Thread
                import time
                class Local(object):
                    storage = {}
                    def set(self, k, v):
                        ident = get_ident()
                        if ident in Local.storage:
                            Local.storage[ident][k] = v
                        else:
                            Local.storage[ident] = {k: v}
                    def get(self, k):
                        ident = get_ident()
                        return Local.storage[ident][k]
                obj = Local()
                def task(arg):
                    obj.set('val',arg) 
                    v = obj.get('val')
                    print(v)
                for i in range(10):
                    t = Thread(target=task,args=(i,))
                    t.start()
            通过setattr和getattr实现:
                from threading import get_ident,Thread
                import time
                class Local(object):
                    storage = {}
                    def __setattr__(self, k, v):
                        ident = get_ident()
                        if ident in Local.storage:
                            Local.storage[ident][k] = v
                        else:
                            Local.storage[ident] = {k: v}
                    def __getattr__(self, k):
                        ident = get_ident()
                        return Local.storage[ident][k]
                obj = Local()
                def task(arg):
                    obj.val = arg
                    print(obj.val)
                for i in range(10):
                    t = Thread(target=task,args=(i,))
                    t.start()
            每个对象有自己的存储空间(字典):
                from threading import get_ident,Thread
                import time
                class Local(object):
                    def __init__(self):
                        object.__setattr__(self,'storage',{})
                        self.storage={}
                    def __setattr__(self, k, v):
                        ident = get_ident()
                        if ident in self.storage:
                            self.storage[ident][k] = v
                        else:
                            self.storage[ident] = {k: v}
                    def __getattr__(self, k):
                        ident = get_ident()
                        return self.storage[ident][k]
                obj = Local()
                def task(arg):
                    obj.val = arg
                    obj.xxx = arg
                    print(obj.val)
                for i in range(10):
                    t = Thread(target=task,args=(i,))
                    t.start()
            兼容线程和协程(源码到request中去看,看local的__getattr__,setattr):
                try:
                    from greenlet import getcurrent as get_ident
                except Exception as e:
                    from threading import get_ident
                from threading import Thread
                import time
                class Local(object):
                    def __init__(self):
                        object.__setattr__(self,'storage',{})
                    def __setattr__(self, k, v):
                        ident = get_ident()
                        if ident in self.storage:
                            self.storage[ident][k] = v
                        else:
                            self.storage[ident] = {k: v}
                    def __getattr__(self, k):
                        ident = get_ident()
                        return self.storage[ident][k]
                obj = Local()
                def task(arg):
                    obj.val = arg
                    obj.xxx = arg
                    print(obj.val)
                for i in range(10):
                    t = Thread(target=task,args=(i,))
                    t.start()
                           
            情况一:单进程单线程,基于全局变量就可以做
            情况二:单进程多线程,基于threading.local对象做
            情况三:单进程多线程多协程,如何做?
            方法:依赖于底层的werkzeug外部包,werkzeug实现了保证多线程和多协程的安全
            原理:
                就是在最开始导入线程和协程的唯一标识的时候统一命名为get_ident,并且先导入协程模块的时候如果报错说明不支持协程,
            就会去导入线程的get_ident,这样无论是只有线程运行还是协程运行都可以获取唯一标识,并且把这个标识的线程或协程需要设
            置的内容都分类存放于__storage__字典中.
                    
            
        3.偏函数
            定义:
                当函数的参数太多,需要简化时,用functools.partial可以创建一个新的函数,这个新函数可以固定住原函数的部分参数,
            从而在调用时更简单.
            #偏函数的第二个部分(可变参数),按原有函数的参数顺序进行补充,参数将作用在原函数上,最后偏函数返回一个新函数
            from functools import partial
            def test(a,b,c,d):
                return a+b+c+d
    
            tes=partial(test,1,2)
            print(tes(3,4))
    线程与偏函数
    上下文管理
       1 app.__call__
        2 wsgi_app(environ, start_response) 
            2.1 ctx = self.request_context(environ)        
                2.1.1 return RequestContext(self, environ)
                    这里的self是app,environ请求相关
                2.1.2 return RequestContext(self, environ)
                    得到了RequestContext的对象,而且有request属性
            2.2  2.1中的ctx就是RequestContext的对象  
    
            2.3  ctx.push()执行这个,就是RequestContext的对象的push方法
                2.3.1  #执行这个,self-->ctx
                    _request_ctx_stack.push(self) 
                2.3.1.1 我们发现_request_ctx_stack = LocalStack()
                    他的push方法的源码:
                        def push(self, obj):
                            rv = getattr(self._local, "stack", None)
                            if rv is None:     
                                # self._local=>stack-->storage['线程id']['stack']=[ctx,]
                                self._local.stack = rv = []
                            rv.append(obj)
                            return rv                    
        3在请求中获取request.form
            3.1 request是LocalProxy的对象,当获取属性的时候会走__getattr__
                def __getattr__(self, name):
                    if name == "__members__":
                        return dir(self._get_current_object())
                    #name-->form,
                    #self._get_current_object()===>ctx.request,form
                    #_get_current_object()---》self.__local()
                    
                    return getattr(self._get_current_object(), name)
                    
                3.1.1 self._get_current_object():源码:最终:partial(_lookup_req_object, "request")
                 def _get_current_object(self):
                 
                    if not hasattr(self.__local, "__release_local__"):
                            #local==>partial(_lookup_req_object, "request")
                            #def __init__(self, local, name=None):
                                # object.__setattr__(self, "_LocalProxy__local", local)
                        #self.__local()===>local()
                        return self.__local()
                    try:
                        return getattr(self.__local, self.__name__)
                    except AttributeError:
                        raise RuntimeError("no object bound to %s" % self.__name__)
        4 partial(_lookup_req_object, "request")偏函数的源码
            def _lookup_req_object(name):
                #name是request
                #ctx
                top = _request_ctx_stack.top
                if top is None:
                    raise RuntimeError(_request_ctx_err_msg)
                #ctx-->request
                return getattr(top, name)
            4.1中_request_ctx_stack.top
               @property
                def top(self):
                
                try:
                    return self._local.stack[-1]
                except (AttributeError, IndexError):
                    return None
    
        总结:其实操作flask的请求上下文就是操作Local中的字典__storage__
            
            1.通过REquestContext类首先实例化ctx请求上下文对象,其内部包含请求对象
    
            2.入栈,通过请求上下文对象的类的push()方法触发了LocalStack类的push() 方法,从而添加到Local类中的字典里。
    
            3.观察导入的request源码 ,通过观察LocalProxy的源码,最后触发了LocalStack的top()方得到上下文对象,再得到请求对象
              从而实现reuqest的功能。到请求对象,从而实现reuqest的功能。
    
            4.出栈通过请求上下文对象的类的方法,触发了LocalStack的的pop()方法从而从字典中删除掉当前线程或当前携程的请求信息。
    蓝图:
        基本使用: 创建蓝图-->利用蓝图创建路由关系-->注册蓝图
            -templates
            -static
            -views
                -user.py
                -order.py
            -app.py
                
            user.py/order.py
            
                from flask import Blueprint
    
                # 1 创建蓝图
                user_bp = Blueprint('user',__name__)
    
                # 2 利用蓝图创建路由关系
                @user_bp.route('/login/')
                def login():
                    return "login"
    
                @user_bp.route('/logout/')
                def logout():
                    return "logout"
                    
            app.py    
            
                from flask import Flask
                from views.user import user_bp
                from views.order import order_bp
    
                app = Flask(__name__)
                # 3 注册蓝图
                app.register_blueprint(user_bp)
                app.register_blueprint(order_bp)
    
    
                if __name__ == '__main__':
                    app.run()
    
            注:
                1.user_bp :是用于指向创建出的蓝图对象,可以自由命名。
                2.Blueprint的第一个参数自定义命名的‘user’用于url_for翻转url时使用。
                3.__name__用于寻找蓝图自定义的模板和静态文件使用。
        高级使用:
            1.蓝图中实现path部分的url前缀
                1.创建蓝图的时候填写url_prefix可以为增加url的path部分的前缀,可以更方便的去管理访问视图函数
                    from flask import Blueprint
                    # 1 创建蓝图
                    user_bp = Blueprint('user',__name__,url_prefix='/user')
                    # 注意斜杠跟视图函数的url连起来时候不要重复了。
                    
                2.url加前缀的时候也可以再注册蓝图的时候加上,更推荐这么做,因为代码的可读性更强。
                    app.register_blueprint(user_bp,url_prefix='/order')
            2.蓝图中自定义模板路径    
                创建蓝图的时候填写template_folder可以指定自定义模板路径
                    # 1 创建蓝图                                           #所对应的参数路径是相对于蓝图文件的
                    user_bp = Blueprint('user',__name__,url_prefix='/user',template_folder='views_templates')
                注:
                    1.蓝图虽然指定了自定义的模板查找路径,但是查找顺序还是会先找主app规定的模板路径(templates),
                      找不到再找蓝图自定义的模板路径
                    2.Blueprint的template_folder参数指定的自定义模板路径是相对于蓝图文件的路径。
    g对象:
        作用:
            专门用来存储用户信息的,g的全称的为global
        使用范围:    
            g对象在一次请求中的所有的代码的地方,都是可以使用的
        特性:
            1.当前请求内你设置了就可以取,必须先设置,后取,当前请求可以取无数次
            2.当前请求内你设置了,没取,其他请求来了也取不到
        
            from  flask import Flask,request,g
            
            app=Flask(__name__)
            
            def set_g():
                g.name='sb'
    
            @app.route("/")
            def index():
                set_g()
    
                return redirect("/index")
    
            @app.route("/index")
            def login():
                print(g.name)
                return "2"
    
            if __name__ == '__main__':
                app.run()
                
            
    
        g对象和session的区别:
            session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session;
            g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次
    信号:
        定义:基于blinker,可以让开发者在flask请求过程中定制一些用户行为    
        安装: pip install blinker
        步骤: 
            1.往信号中注册函数signals.request_started.connect(func) 
            2.触发信号signals.request_started.send()
        自定义信号:
            from flask import Flask
            from flask.signals import _signals
    
            app = Flask(import_name=__name__)
    
            # 自定义信号
            xxxxx = _signals.signal('xxxxx')
    
            def func(sender,a):
                print(sender,a)
                print("我是自定义信号")
    
            # 自定义信号中注册函数
            xxxxx.connect(func)
    
    
            @app.route("/x")
            def index():
                # 触发信号
                xxxxx.send("sb",a="1")
                return 'Index'
    
            if __name__ == '__main__':
                app.run()
                
    flask-session:
    
        作用:将cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy中
        安装:pip3 install flask-session
        
        方式1:
            from flask import Flask,session,jsonify
            from flask_session import RedisSessionInterface
            import redis
            app = Flask(__name__)
            app.config.from_pyfile('settings.py')
            conn = redis.Redis(host='127.0.0.1', port=6379)
            app.session_interface=RedisSessionInterface(conn,key_prefix='wyf')
            @app.route('/')
            def home():
                session['token'] = 'wyf2012'
                return 'ojbk'
    
            def login():
                a = session['token']
                print(a)
                dic = {'a': a}
                return jsonify(dic)
    
            app.add_url_rule('/login',view_func=login,endpoint='mylog')
    
            if __name__ == '__main__':
                app.run()
                
        方式2:
            from redis import Redis
            from flask.ext.session import Session
            app.config['SESSION_TYPE'] = 'redis'
            app.config['SESSION_REDIS'] = Redis(host='127.0.0.1',port='6379')
            Session(app)
            
        注:RedisSessionInterface的参数
            use_signer:是否对key签名
            permanent: 关闭浏览器,cookie是否失效
    flask-script:自定义一些类似django的命令
        安装: pip install flask-script
        使用:
            from flask import Flask
            from flask_script import Manager
            app = Flask(__name__)
            manager=Manager(app)
    
            @app.route("/")
            def index():
                return "ok"
    
            @manager.command
            def custom1(arg,a):
                """
                自定义命令
                python manage.py custom 123
                :param arg:
                :return:
                """
                print(arg,a)
    
    
            @manager.option('-n', '--name', dest='name')
            @manager.option('-u', '--url', dest='url')
            def cmd1(name, url):
                """
                自定义命令(-n也可以写成--name)
                执行: python manage.py  cmd -n lqz -u http://www.oldboyedu.com
                执行: python manage.py  cmd --name lqz --url http://www.oldboyedu.com
                :param name:
                :param url:
                :return:
                """
                print(name, url)
    
            if __name__ == '__main__':
                manager.run()
                
    多app应用:(了解)
        使用:
            from werkzeug.wsgi import DispatcherMiddleware
            from werkzeug.serving import run_simple
            from flask import Flask
            app1 = Flask('app01')
            app2 = Flask('app02')
    
            @app1.route('/index')
            def index():
                return "app01"
    
            @app2.route('/index')
            def index2():
                return "app2"
            dm = DispatcherMiddleware(app1, {
                '/sec12': app2,
            })
            if __name__ == "__main__":
    
                run_simple('localhost', 5000, dm)
    wtforms:
        安装: pip install wtforms
        使用:
            app.py
                from flask import Flask, render_template, request, redirect
                from wtforms import Form
                from wtforms.fields import simple
                from wtforms import validators
                from wtforms import widgets
    
                app = Flask(__name__)
    
                app.debug = True
    
    
                class LoginForm(Form):
                    # 字段(内部包含正则表达式)
                    name = simple.StringField(
                        label='用户名',
                        validators=[
                            validators.DataRequired(message='用户名不能为空.'),
                            validators.Length(min=2, max=6, message='用户名长度必须大于%(min)d且小于%(max)d')
                        ],
                        widget=widgets.TextInput(), # 页面上显示的插件
                        render_kw={'class': 'form-control'}
                    )
                    # 字段(内部包含正则表达式)
                    pwd = simple.PasswordField(
                        label='密码',
                        validators=[
                            validators.DataRequired(message='密码不能为空.'),
                            validators.Length(min=8, message='密码长度必须大于%(min)d'),
                            # validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}",
                            #                   message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
    
                        ],
                        widget=widgets.PasswordInput(),
                        render_kw={'class': 'form-control'}
                    )
    
    
    
                @app.route('/login', methods=['GET', 'POST'])
                def login():
                    if request.method == 'GET':
                        form = LoginForm()
                        return render_template('login.html', form=form)
                    else:
                        form = LoginForm(formdata=request.form)
                        if form.validate():
                            print('用户提交数据通过格式验证,提交的值为:', form.data)
                        else:
                            print(form.errors)
                        return render_template('login.html', form=form)
    
                if __name__ == '__main__':
                    app.run()
            
            html:
                <!DOCTYPE html>
                <html lang="en">
                <head>
                    <meta charset="UTF-8">
                    <title>Title</title>
                </head>
                <body>
                <h1>登录</h1>
                <form method="post">
                    <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>
                    <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
                    <input type="submit" value="提交">
                </form>
                </body>
                </html>
        
        所有表单验证:
            app.py
                from flask import Flask, render_template, request, redirect
                from wtforms import Form
                from wtforms.fields import core
                from wtforms.fields import html5
                from wtforms.fields import simple
                from wtforms import validators
                from wtforms import widgets
    
                app = Flask(__name__, template_folder='templates')
                app.debug = True
    
    
    
                class RegisterForm(Form):
    
                    def validate_pwd_confirm(self, field):
                        """
                        自定义pwd_confirm字段规则,例:与pwd字段是否一致
                        :param field:
                        :return:
                        """
                        # 最开始初始化时,self.data中已经有所有的值
                        print(field.data)
                        if field.data !="sb":
                            #raise validators.ValidationError("sb")  # 继续后续验证
                            raise validators.StopValidation("SB")  # 不再继续后续验证
    
                        # if field.data != self.data['pwd']:
                        #     raise validators.ValidationError("密码不一致") # 继续后续验证
                            #raise validators.StopValidation("密码不一致")  # 不再继续后续验证
    
                    name = simple.StringField(
                        label='用户名',
                        validators=[
                            validators.DataRequired()
                        ],
                        widget=widgets.TextInput(),
                        render_kw={'class': 'form-control'},
                        default='tank'
                    )
    
                    pwd = simple.PasswordField(
                        label='密码',
                        validators=[
                            validators.DataRequired(message='密码不能为空.')
                        ],
                        widget=widgets.PasswordInput(),
                        render_kw={'class': 'form-control'}
                    )
    
                    pwd_confirm = simple.PasswordField(
                        label='重复密码',
                        validators=[
                            validate_pwd_confirm,
                            validators.DataRequired(message='重复密码不能为空.'),
                            #validators.EqualTo('pwd', message="两次密码输入不一致")
                        ],
                        widget=widgets.PasswordInput(),
                        render_kw={'class': 'form-control'}
                    )
    
                    email = html5.EmailField(
                        label='邮箱',
                        validators=[
                            validators.DataRequired(message='邮箱不能为空.'),
                            validators.Email(message='邮箱格式错误')
                        ],
                        widget=widgets.TextInput(input_type='email'),
                        render_kw={'class': 'form-control'}
                    )
    
                    gender = core.RadioField(
                        label='性别',
                        choices=(
                            (1, ''),
                            (2, ''),
                        ),
                        coerce=int # “1” “2”
                     )
                    city = core.SelectField(
                        label='城市',
                        choices=(
                            ('bj', '北京'),
                            ('sh', '上海'),
                        )
                    )
    
                    hobby = core.SelectMultipleField(
                        label='爱好',
                        choices=(
                            (1, '篮球'),
                            (2, '足球'),
                        ),
                        coerce=int
                    )
    
                    favor = core.SelectMultipleField(
                        label='喜好',
                        choices=(
                            (1, '篮球'),
                            (2, '足球'),
                        ),
                        widget=widgets.ListWidget(prefix_label=False),
                        option_widget=widgets.CheckboxInput(),
                        coerce=int,
                        default=[1, 2]
                    )
    
                    def __init__(self, *args, **kwargs):
                        super(RegisterForm, self).__init__(*args, **kwargs)
                        self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
                        self.favor.data=[1,]
    
    
    
    
    
                @app.route('/register', methods=['GET', 'POST'])
                def register():
                    if request.method == 'GET':
                        form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
                        return render_template('register.html', form=form)
                    else:
                        form = RegisterForm(formdata=request.form)
                        if form.validate():
                            print('用户提交数据通过格式验证,提交的值为:', form.data)
                        else:
                            print(form.errors)
                        return render_template('register.html', form=form)
    
    
    
                if __name__ == '__main__':
                    app.run()
                        
            html:
                <!DOCTYPE html>
                <html lang="en">
                <head>
                    <meta charset="UTF-8">
                    <title>Title</title>
                </head>
                <body>
                <h1>用户注册</h1>
                <form method="post" novalidate style="padding:0  50px">
                    {% for field in form %}
                    <p>{{field.label}}: {{field}} {{field.errors[0] }}</p>
                    {% endfor %}
                    <input type="submit" value="提交">
                </form>
                </body>
    wtforms
    Flask-SQLAlchemy:
        定义:flask和SQLAchemy的管理者,通过他把他们做连接
        下载: pip install flask-sqlalchemy
        下载: pip install flask-migrate
        
        案例:
            acccount.py
                #!/usr/bin/env python
                # -*- coding:utf-8 -*-
    
                from flask import Blueprint
                from .. import db
                from .. import models
    
                account = Blueprint('account', __name__)
    
    
                @account.route('/login')
                def login():
                    # db.session.add(models.Users(username='lqz', email='123'))
                    # db.session.query(models.Users).all()
                    # db.session.commit()
                    # 添加示例
                    """
                    db.session.add(models.Users(username='lqz', pwd='123', gender=1))
                    db.session.commit()
    
                    obj = db.session.query(models.Users).filter(models.Users.id == 1).first()
                    print(obj)
    
                    PS: db.session和db.create_session
                    """
                    # db.session.add(models.Users(username='wupeiqi1', email='wupeiqi1@xx.com'))
                    # db.session.commit()
                    # db.session.close()
                    #
                    # db.session.add(models.Users(username='wupeiqi2', email='wupeiqi2@xx.com'))
                    # db.session.commit()
                    # db.session.close()
                    # db.session.add(models.Users(username='alex1',email='alex1@live.com'))
                    # db.session.commit()
                    # db.session.close()
    
    
    
                    user_list = db.session.query(models.Users).all()
                    db.session.close()
                    for item in user_list:
                        print(item.username)
    
    
                    return 'login'                
                    
            __init__.py    
                #!/usr/bin/env python
                # -*- coding:utf-8 -*-
                from flask import Flask
                from flask_sqlalchemy import SQLAlchemy
                db = SQLAlchemy()
    
                from .models import *
                from .views import account
    
                def create_app():
                    app = Flask(__name__)
                    app.config.from_object('settings.DevelopmentConfig')
                    # 将db注册到app中
                    db.init_app(app)
                    # 注册蓝图
                    app.register_blueprint(account.account)
    
                    return app
    
    
            models.py
    
                from . import db
    
                class Users(db.Model):
                    """
                    用户表
                    """
                    __tablename__ = 'users'
                    id = db.Column(db.Integer, primary_key=True)
                    username = db.Column(db.String(80), unique=True, nullable=False)
                    email = db.Column(db.String(120), unique=True, nullable=False)
                    # ids = db.Column(db.Integer)
    
                    def __repr__(self):
                        return '<User %r>' % self.username
    
    
            manage.py
                
                from sansa import create_app
                from flask_script import Manager
                from flask_migrate import Migrate,MigrateCommand
                from sansa import db
                app = create_app()
                manager=Manager(app)
                #为了实现迁移
                Migrate(app,db)
                #现在把命令注册进来
                manager.add_command('db1', MigrateCommand)
    
                if __name__ == '__main__':
                    # app.run()
                    manager.run()
    
    
            settings.py
                
                class BaseConfig(object):
                    # SESSION_TYPE = 'redis'  # session类型为redis
                    # SESSION_KEY_PREFIX = 'session:'  # 保存到session中的值的前缀
                    # SESSION_PERMANENT = True  # 如果设置为False,则关闭浏览器session就失效。
                    # SESSION_USE_SIGNER = False  # 是否对发送到浏览器上 session:cookie值进行加密
    
                    SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:123@127.0.0.1:3306/flask01?charset=utf8"
                    SQLALCHEMY_POOL_SIZE = 5
                    SQLALCHEMY_POOL_TIMEOUT = 30
                    SQLALCHEMY_POOL_RECYCLE = -1
    
                    # 追踪对象的修改并且发送信号
                    SQLALCHEMY_TRACK_MODIFICATIONS = False
    
    
                class ProductionConfig(BaseConfig):
                    pass
    
    
                class DevelopmentConfig(BaseConfig):
                    pass
    
    
                class TestingConfig(BaseConfig):
                    pass
    flask-sqlalchemy
     
  • 相关阅读:
    http修改443端口,http 强制跳转https
    线程event事件函数实现红绿灯
    信号量设置
    多线程简单实例
    paramiko 实现ssh登录和sftp登录
    在同一台电脑安装python 2 和3,并且怎样安装各自的pip和模块
    ThreadingTCPServer 如何设置端口重用
    Python 变量比较
    python 多线程 并发socket实例
    python 中变量引用问题
  • 原文地址:https://www.cnblogs.com/wyf20190411-/p/13772188.html
Copyright © 2011-2022 走看看