zoukankan      html  css  js  c++  java
  • flask

    Flask

    flask中还需要掌握的知识

    jinja2   Werkzeug  路由源码  CBV源码及使用   请求上下文   路由中正则表达式
    

    1.Flask初始

    Flask是一个同步web框架,跟django一样,属于轻量级的,小而精,使用灵活

    Flask基于Werkzeug,

    Flask只保留了web开发的核心功能

    flask有很多组件都没有,models使用的是SQLAlchemy

    nginx不能直接调用代码,是调用的服务来调用代码,来将数据返回,nginx只能调用静态资源,不能直接调用动态资源。

    2.Flask四剑客

    1.返回字符串:return

    from flask import Flask,render_template,redirect,jsonify
    return '直接返回字符串'   # 相当于HttpResponse
    

    2.返回html页面:render_template

    将html页面放在 templates 中,会自动找到

    from flask import Flask,render_template,redirect,jsonify
    return render_template(".html",在html中使用的名字="需要携带的参数") 
    
    在heml页面中使用{{}} 接收,如果是字典的话,可以使用字典的取值方法 get或者直接点的方式取出
    

    3.重定向:redirect

    from flask import Flask,render_template,redirect,jsonify
    return redirect("/login")
    

    4.返回json字符串

    from flask import Flask,render_template,redirect,jsonify
    name_dict = [{'name': "jason-gdx"},{'name': "tank-sb"}]
    return  jsonify(name_dict)
    

    3.Flask的配置文件

    方式一

    直接进行配置

    app.debug=True
    

    方式二

    以字典的形式

    app.config['DEBUG']=True
    

    方式三

    使用配置文件的形式

    settings.py

    DEBUG = True
    

    app.py

    app.config.from_pyfile("settings.py")
    
    

    方式四

    使用类的形式,在settings配置文件创建类,在不同的环境下使用不同的类

    settings.py

    class Config(object):
        DEBUG = False
        TESTING = False
        DATABASE_URI = 'sqlite://:memory:'
    
    
    class ProductionConfig(Config):
        DATABASE_URI = 'mysql://user@localhost/foo'
    
    
    class DevelopmentConfig(Config):
        DEBUG = True
    
    
    class TestingConfig(Config):
        TESTING = True
    
    

    app.py

    app.config.from_object('settings.DevelopmentConfig')    # from_object 是一个固定的方法
    
    

    4.路由详解

    路由源码分析(重要)

    路由route中()的参数

    rule   就是路由
    endpoint   取别名,反向解析,如果没有就用当前函数名,不能重复
    methods   方法
    view_func   就我们endpoint,指向的函数,有人就是请求的时候
    strict_slashes  该参数是用来设置,我们的路由是否为严格模式,Flask是非严格模式,True是严格模式,默认是严格模式
    redirect_to   重定向到指定地址,直接跳转,不再执行下面视图函数中的代码
    
    

    路由常见使用

    1.有名传参

    @app.route('/login/<string:nid>/')
    def index(url_path,nid):
        print(nid)
        return '返回值'
    
    

    传入的参数的数据类型有:int,string,float,path,uuid

    2.无名传参

    # http://127.0.0.1:5000/login/ja/?name=wang&password=123
    @app.route('/login/')
    def index():
        name = request.args.get('name')  # 获取使用?拼接传过来的值 
        return '返回值'
    
    

    3.多路由选择使用

    # http://127.0.0.1:5000/login/ja/?name=wang&password=123
    @app.route('/<any(login,reginstr):url_path>/<string:nid>/')
    def index(url_path, nid):
        return '返回值'
    
    # 多路由选择使用就是在括号内定义多个路由关键字,在前端访问的时候只要访问到了路由中的路径,就可以匹配到此视图函数
    
    

    4.反向解析

    使用url_for,可以通过函数名来找到函数对应的路由,再通过重定向来跳转到指定的页面,url也可以传参,使用对应url中的参数名来进行传参

    @app.route('/')
    def register():
        return url_for('index',)   #/login/sgvc  在使用反向解析的时候,如果有参数,需要把参数也带上
    
    

    处理动态的视图函数

    @app.route("/login/<string:nid>",endpoint="login")
    def login(nid):
        return "登录页面"
    
    @app.route("/<string:nid>",endpoint="home")
    def home(nid):
        print("主页面",nid)
        # nid是login函数中需要携带的参数,如果需要在后边使用?拼接参数,使用**=**的形式来进行拼接
        return redirect(url_for('login',nid="111",name="wang"))
    
    
    @app.route('/student/<int:id>/')
    def student(id):
        return 'student {}'.format(id)
    
    

    5.反向解析拼接参数

    拼接参数是在url的最后使用?来对参数进行拼接,可以使用任意键值对来进行拼接

    @app.route('/')
    def register():
        return url_for('index',name='wang',)   #/login/sgvc?name=wang  有参数的也可以这样写
    
    

    直接在括号内传入关键字参数,以无名路由的形式返回

    6.自定义动态路由:正则匹配

    # 我们要用自定义的路由,用正则的话
    #1导入from werkzeug.routing import BaseConverter
    # 2我先要写一个类,然后继承BaseConverter,然后实现__inti__, def to_python(self, value):to_url(self, value)
    # 3 app.url_map.converters['谁便'] = RegexConverter
    #4 我们在路由里面@app.route('/index/<regex1("d+"):nid>'),regex1='随便,regex1("正则表达式")
    #5 regex1("正则表达式")匹配出来的结果,返回to_python,一定要return
    #6 当我们做反向解析的解析的时候,我们的参数,会传递给to_url,return的结果才是我们拼接到我们路由上
    from flask import Flask, views, url_for
    from werkzeug.routing import BaseConverter
    
    app = Flask(import_name=__name__)
    
    class RegexConverter(BaseConverter):
        """
        自定义URL匹配正则表达式
        """
        def __init__(self, map, regex):
            super(RegexConverter, self).__init__(map)
            self.regex = regex
    
        def to_python(self, value):
            """
            路由匹配时,匹配成功后传递给视图函数中参数的值
            """
            #value就正则匹配出来的结果
            print('value',value,type(value))
            return "asdasdasd"
    
        def to_url(self, value):
            """
            使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
            """
            val = super(RegexConverter, self).to_url(value)
            print(val)
            return val
    
    app.url_map.converters['regex1'] = RegexConverter   # 对自定义的正则进行注册
    @app.route('/index/<regex1("d+"):nid>',endpoint="sb")
    def index(nid):
        print("nid",nid,type(nid))
        print(url_for('sb', nid='888'))
        # /index/666
        return 'Index'
    
    if __name__ == '__main__':
        app.run()
    
    

    7.视图函数装饰器

    可以对视图函数加上装饰器,这样在执行视图函数的时候会先去做校验,然后再执行视图函数中的内容

    from flask import Flask, request
    from functools import  wraps
    
    app = Flask(__name__)
    
    def login_verify(func):
        @wraps(func)   ############ 必须
        def wrapper(*args, **kwargs):
            user_name = request.args.get('user')
            password = request.args.get('password')
            if user_name == 'mark' and password == '123':
                return func(*args,**kwargs)
            else:
                return '请登录'
        return wrapper
    
    @app.route('/my_info/')
    @login_verify    # 使用装饰器
    def my_info():
        return '个人信息页面'
    
    

    注意点

    1.装饰器一定要写在注册路由的下面,写在试图函数的上面

    2.装饰器内部一定要使用@wraps(func) 方法,用于保护被装饰函数的属性

    5.CBV详解(源码)

    flask中有视图类,当前有两种视图,from flask.views import MethodView,View ,使用的时候需要注意重写dispatch_request , 写完视图需要通过 app.add_url_rule(url_rule, view_func) 来进行注册

    6.模板

    一些常见的模板语法,使用的时候和html中基本相同都是使用 {{ }} 进行传参,但是需要加括号运行的,需要加括号才能运行,使用模板的语法的时候有时候需要自己输入,不会进行提示,

    在视图中返回模板的时候,使用方法如下

    # 1.新建一个templates文件夹,把html文件放在这个文件夹下
    
    # 2.定义一个视图函数,将对应的html页面返回,在输入的时候要将后边的.html也加上。
    from flask import render_template
    @app.route("/login/html",endpoint="login/html")
    def login_html():
        info = "我是info信息"
        return render_template("html_test.html",info=info,html="<h3>我需要在前端进行展示</h3>")    # 记得在前端加上safe取消转义
    
    # 第二种方法
    # 视图函数中使用
    from flask import Markup
    def func(st,st1):
        return Markup(f"<h1>使用func返回的数据{st}{st1}</h1>")
    
    @app.route("/login/html",endpoint="login/html")
    def login_html():
        info = "我是info信息"
        return render_template("html_test.html",info=info,func=func)
    
    # 前端使用
    {{func("aaaa","bbbb")}}
    
    # 总结
    # 1.如果返回了html标签,需要在前端中使用{{html|safe}},如果没有加上safe,会按照字符串的形式显示出来
    # 2.如果使用了Markup方法,就需要导入,这种方法可以传入多个参数
    
    

    逻辑判断

    <table>
        {% if 10 %}    # 开始
          <h1>hahahahh</h1>
        {% else %}    # 否则,必须有
          <h2>aaaaaaaa</h2>
        {% endif %}   # 结束,必须有
    </table>
    
    

    for循环

    <table>
        {% for i in "ajfkdhksadh" %}
            <p>{{ i }}</p>    # 需要加括号取值
        {% endfor %}
    </table>
    
    

    在模板中使用反向解析

    <a href="{{url_for('login',nid=1)}}">登录</a>    # 如果有参数需要加上参数
    
    

    7.请求与响应

    flask中需要的请求都需要从全局导入,不能在函数中导入

    使用request和response进行操作,操作方式与django一致

    自定义响应要使用 make_response()

    1.导入make_response

    2.response =make_response(4剑客),四剑客正常使用,只是对response做定制化响应,添加一点想添加的参数

    3.操作response

    4.return response

    例如:(for example)

    response = make_response(render_templage('index.html'))
    response.set_cookie('json','sb')
    return response
    
    

    常见的请求响应

       from flask import Flask
        from flask import request
        from flask import render_template
        from flask import redirect
        from flask import make_response
    
        app = Flask(__name__)
    
    
        @app.route('/login.html', methods=['GET', "POST"])
        def login():
    
            # 请求相关信息
            # request.method  提交的方法
            # request.args  get请求提交的数据
            # request.form   post请求提交的数据
            # request.values  post和get提交的数据总和
            # request.cookies  客户端所带的cookie
            # request.headers  请求头
            # request.path     不带域名,请求路径
            # request.full_path  不带域名,带参数的请求路径
            # request.script_root  
            # request.url           带域名带参数的请求路径
            # request.base_url      带域名请求路径
            # request.url_root      域名
            # request.host_url      域名
            # request.host          127.0.0.1:500
            # request.files     接受文件
            # obj = request.files['the_file_name']   获取文件对象
            # obj.save('/var/www/uploads/' + secure_filename(f.filename)) 将文件保存在某个地方
    
            # 响应相关信息
            # return "字符串"
            # return render_template('html模板路径',**{})
            # return redirect('/index.html')
            #return jsonify({'k1':'v1'})
    
            # response = make_response(render_template('index.html'))
            # response是flask.wrappers.Response类型
            # response.delete_cookie('key')   # 删除指定的cookie的值
            # response.set_cookie('key', 'value')   # 添加指定的cookie值
            # response.headers['X-Something'] = 'A value'   # 设置头中的信息
            # return response
            return "内容"
    
        if __name__ == '__main__':
            app.run()
    
    

    8.session

    app.session_interface里边发生的事情:

    存session:save_session

    1.存session的时候,调用save_session,将我们的session加密的val,读取配置文件得到key,

    2.将1中的key,val存储到cookies

    取session

    1.获取request里边的cookies,获取里面的key,这个key就是['SESSION_COOKIE_NAME'],值就是加密的值

    2.对该值进行解密

    关于session的一些设置

    app.secret_key="askjdaksd"     # 使用cookie的时候必须添加这个参数
    app.config['SESSION_COOKIE_NAME']="dsb"   # 设置在cookie中的key键
    
    

    使用

    from flask import Flask,render_template,make_response,request
    from flask import session
    
    app = Flask(__name__)
    app.secret_key = 'jlaksjdfl'   # 设置session使用的密钥
    
    @app.route('/',methods=['get'])
    def index():
        session['name'] = 'wang'
        response = make_response('hahahahah')
        return response
    
    @app.route('/login')
    def login():
        response = make_response(session['name'])
        return response
    
    if __name__ == '__main__':
        app.run()
    
    

    9.闪现

    什么是闪现

    闪现就是闪一下就没有了,a产生信息,传给c页面,但是用户访问a页面以后,不是直接跳转到c,而是到b,或者是到其他页面,但是用户访问c页面的时候,可以把a给我的信息拿到

    闪现用到的两个方法

    flash,get_flashed_messages   # 存的时候使用flash来存,取的时候使用get_flash_messages来取
    from flask import Flask,flash,get_flashed_messages,request
    
    

    1.如果要用闪现,必须先设置 app.secret_key = 'aaaaaa'

    2.只能取一次,再取就没有了

    3.我们可以通过flash('普通信息',category="info"),对信息做分类

    4.get_flashed_messages(with_categories=True,category_filter=("error",)),with_categories以键值对的形式获取我们设置的闪现,category_filter=("error",)对设置的闪现 进行信息分类的过滤,只会取出元组中的值

    5.在同一个视图函数中可以使用多次get_flashed_messages,

    from flask import Flask,flash,get_flashed_messages,request
    
    app = Flask(__name__)
    app.secret_key = 'asdfasdf'
    @app.route('/index1')
    def index():
        flash('超时错误',category="error")
        flash('普通信息',category="info")
        return "ssdsdsdfsd"
    
    @app.route('/error')
    def error():
        data = get_flashed_messages(with_categories=True,category_filter=("error","info"))
        data1 = get_flashed_messages(with_categories=True, category_filter=("error", "info"))    # 在一个视图函数中可以使用多次
        print("data1",data1)
        print("data",data)
        return "错误信息"
    
    if __name__ == '__main__':
        app.run()
        
    # 注:没有with_categories=True,返回['超时错误']
    # 加上with_categories=rue,返回[('error', '超时错误')]
    
    

    demon

    # 使用闪现做登录之后的提示信息
    # 前端代码
    <form action="" method="post">
        <p>用户名:<input type="text" name="username">{{error}}</p>
        <p>密码:<input type="text" name="password">{{error}}</p>
        <p>{{get_flashed_messages()[0]}}</p>
        <button type="submit">提交</button>
    </form>
    
    # 后端代码
    from flask import flash,get_flashed_messages
    @app.route('/login',endpoint='login',methods=['POST'])
    def login():
        error = ''
        if request.method == 'POST':
            if request.form['username'] == 'wang' 
                    and request.form['password'] == '123':
                print(request.form)
                flash("登录成功")
            else:
                error = '用户名或密码错误'
    
        return render_template('html_test.html',error=error)
    
    

    10.请求扩展

    与django中的中间件相似,使用对应的请求扩展绑定对应的方法,这样在执行视图函数之前都会先执行已经做了请求扩展的方法,之后再执行视图函数

    1.before_request

    类比django中间件中的process_request,在请求收到之前绑定一个函数做一些事情

    #基于它做用户登录认证
    @app.before_request
    def process_request(*args,**kwargs):
        if request.path == '/login':
            return None
        user = session.get('user_info')
        if user:
            return None
        return redirect('/login')
    
    

    2.after_request

    类比django中间件中的process_response,每一个请求之后绑定一个函数,如果请求没有异常

    @app.after_request
    def process_response1(response):
        print('process_response1 走了')
        return response
    
    

    3.before_first_request

    第一次请求时,跟浏览器无关

    @app.before_first_request
    def first():
        pass
    
    

    4.teardown_request

    每一个请求之后绑定一个函数,即使遇到了异常,无论有没有异常都会执行,如果没有异常这个参数就是None,有就记录异常,我们可以在这里设置一个异常日志,当有异常的时候就记录到日志中

    @app.teardown_request 
    def ter(e):
        pass
    
    

    5.errorhandler

    路径不存在时404,服务器内部错误500,也可以使用其他的状态码,当出现这种状态码的时候触发。

    @app.errorhandler(404)   # 可以拦截到指定的错误状态码,当出现这种错误的时候会执行这个请求中来,并且代码中不会报错。
    def error_404(arg):
        return "404错误了"
    
    

    6.template_global

    标签,把一个方法变成一个全局的方法,在使用的时候不需要再传了,直接使用就行了,也可以在前端直接使用

    @app.template_global()
    def sb(a1, a2):
        return a1 + a2
    #{{sb(1,2)}}
    
    

    7.template_filter

    过滤器

    @app.template_filter()
    def db(a1, a2, a3):
        return a1 + a2 + a3
    #{{ 1|db(2,3)}}   # 1是默认传的第一个参数,(2,3)是自己传入的
    
    

    8.context_processor

    @app.context_processor
    def myprocess():
        return {}
    
    # 上下文管理器应该返回一个字典,字典中的 key 会被模板中当成变量来渲染
    # 上下文管理器中返回的字典,在所有页面中都可以用
    # 被这个装饰器修改的钩子函数,必须要返回一个字典,即使为空也要返回
    
    

    总结:

    1.重点掌握befor_request和after_request

    2.注意有多个情况下的执行顺序

    3.befor_request请求拦截后(也就是有return值),response所有都执行

    flask中的钩子函数

    # 钩子函数可以分为两层说明,第一层是 app 层,第二层则是 blueprint 层
    
    app 层的钩子函数有 before_request,before_first_request,after_request,teardown_request,
    
    blueprint 层关于request 的钩子函数其实和app层基本一致,有两点区别:
    1.与 app 层相比,blueprint 层少了一个 before_first_request 的钩子函数多了一些,可以在蓝图调用 2.app 层的钩子函数before_app_first_request,before_app_request,after_app_request,after_app_request,teardown_app_request
    
    

    11.中间件

    from flask import Flask
    
    app = Flask(__name__)
    
    @app.route('/')
    def index():
        return 'Hello World!'
    # 模拟中间件class Md(object):
        def __init__(self,old_wsgi_app):
            self.old_wsgi_app = old_wsgi_app
    
        def __call__(self,  environ, start_response):
            print('开始之前')
            ret = self.old_wsgi_app(environ, start_response)
            print('结束之后')
            return ret
    
    if __name__ == '__main__':
        #1我们发现当执行app.run方法的时候,最终执行run_simple,最后执行app(),也就是在执行app.__call__方法 
        #2 在__call__里面,执行的是self.wsgi_app().那我们希望在执行他本身的wsgi之前做点事情。
        #3 所以我们先用Md类中__init__,保存之前的wsgi,然后我们用将app.wsgi转化成Md的对象。
        #4 那执行新的的app.wsgi_app,就是执行Md的__call__方法。
        #把原来的wsgi_app替换为自定义的,
        
        app.wsgi_app = Md(app.wsgi_app)
        app.run()
    
    

    12.请求流程

    flask的请求流程是从请求来到响应的时候这一段使时间内走过的流程

    1.执行app.__call__
    2.执行wsgi_app()
    3.
    
    

    请求所有的流程

    ctx = self.request_context(environ)
            error = None
            try:
                try:
                    ctx.push()
                    #根据路径去执行视图函数,视图类
                    response = self.full_dispatch_request()
                except Exception as e:
                    error = e
                    response = self.handle_exception(e)
                except:  # noqa: B001
                    error = sys.exc_info()[1]
                    raise
                return response(environ, start_response)
            finally:
                #不管出不出异常,都会走这里
                if self.should_ignore_error(error):
                    error = None
                ctx.auto_pop(error)
    
    

    13.local

    local就是一把锁

    兼容线程和协程(源码到request中去看,看local的__getattr__,setattr)
    try:
        from greenlet import getcurrent as get_ident
    except Exception as e:
        from threading import get_ident
    from threading import Thread
    import time
    class Local(object):
        def __init__(self):
            object.__setattr__(self,'storage',{})
        def __setattr__(self, k, v):
            ident = get_ident()
            if ident in self.storage:
                self.storage[ident][k] = v
            else:
                self.storage[ident] = {k: v}
        def __getattr__(self, k):
            ident = get_ident()
            return self.storage[ident][k]
    obj = Local()
    def task(arg):
        obj.val = arg
        obj.xxx = arg
        print(obj.val)
    for i in range(10):
        t = Thread(target=task,args=(i,))
        t.start()
    
    

    14.偏函数

    参数在使用的时候可以先传入一部分参数,之后再调用的时候将剩余的值传入就可以执行了

    from functools import partial
    def test(a,b,c,d):
        return a + b + c + d
    
    tes = partial(test,2,2)   # 括号内的参数为  需要调用的函数名,剩余的参数
    
    print(tes(3,4))     # 传入的2,3是c,d的值
    
    

    15.请求上下文

    '''
    1     app.__call__
    2     wsgi_app(environ, start_response) 
    2.1 ctx = self.request_context(environ)
        2.1.1 return RequestContext(self, environ)
            这里的self是app,environ请求相关
        2.1.2 return RequestContext(self, environ)
        得到了RequestContext的对象,而且有request属性
    2.2  2.1中的ctx就是RequestContext的对象  
    
    2.3  ctx.push()执行这个,就是RequestContext的对象的push方法
         2.3.1  #执行这个,self-->ctx
            _request_ctx_stack.push(self) 
            2.3.1.1 我们发现_request_ctx_stack = LocalStack()
            他的push方法的源码:
                    def push(self, obj):
                        rv = getattr(self._local, "stack", None)
                        if rv is None:     
                            # self._local=>stack-->storage['线程id']['stack']=[ctx,]
                            self._local.stack = rv = []
                        rv.append(obj)
                        return rv
                        
    3在请求中获取request.form
    3.1 request是LocalProxy的对象,当获取属性的时候会走__getattr__
        def __getattr__(self, name):
            if name == "__members__":
                return dir(self._get_current_object())
            #name-->form,
            #self._get_current_object()===>ctx.request,form
            #_get_current_object()---》self.__local()
            
            return getattr(self._get_current_object(), name)
            
        3.1.1 self._get_current_object():源码:最终:partial(_lookup_req_object, "request")
         def _get_current_object(self):
         
            if not hasattr(self.__local, "__release_local__"):
                    #local==>partial(_lookup_req_object, "request")
                    #def __init__(self, local, name=None):
                        # object.__setattr__(self, "_LocalProxy__local", local)
                #self.__local()===>local()
                return self.__local()
            try:
                return getattr(self.__local, self.__name__)
            except AttributeError:
                raise RuntimeError("no object bound to %s" % self.__name__)
    4 partial(_lookup_req_object, "request")偏函数的源码
        def _lookup_req_object(name):
            #name是request
            #ctx
            top = _request_ctx_stack.top
            if top is None:
                raise RuntimeError(_request_ctx_err_msg)
            #ctx-->request
            return getattr(top, name)
        4.1中_request_ctx_stack.top
           @property
            def top(self):
            
            try:
                return self._local.stack[-1]
            except (AttributeError, IndexError):
                return None
    '''
    
    

    程序运行,两个LocalStack()对象,一个里面放request和session,另一个方g和current_app

    16.蓝图

    构建项目的目录,可以直接使用模板,直接使用项目文件夹的模板

    做一个项目的框架,划分项目目录

    views视图类中

    from flask import Blueprint
    
    user = Blurprint('文件包的名',__name__)    # 此时的user相当于app,文件包的名是做反向解析的时候使用的名,例如:url_for(acc.login)
    每一个包中都有一个包名,可以在不同的视图中使用请求扩展,如果需要对所有的视图都要进行操作,就把指定的请求扩展写在__init__中
    
    # 路由中的使用
    @user.route()
    
    

    __init.py中

    from flask import Flask
    
    app = Flask(__name__,template_folder='templates',static_folder='statics',static_url_path='/static')
    
    # 对所有的视图做请求扩展
    @app.before_request
    def a():
        print("我是app里面的befor_request")
    
    from .views.account import account
    from .views.blog import blog
    from .views.user import user
    app.register_blueprint(admin, url_prefix='/admin')   # url_prefix 在请求指定的函数中的方法的时候需要加上对应的url别名,不然没有办法直接访问,类似于前缀
    
    # 蓝图总结
    1.先导入蓝图,在需要使用蓝图的视图函数中定义蓝图
    2.对定义好的蓝图进行注册
    3.在对应的视图函数中写视图函数
    4.做反向解析的时候需要把蓝图的前缀加上,防止不同视图中的视图函数重复出现
    
    

    17.g对象

    g对象:global

    1.g对象是专门用来保存用户的数据的

    2.g对象是在一次请求中的所有的代码的地方,都是可以使用的

    g对象的特性:

    当前请求内你设置就可以取,必须先设置后取,当前请求类可以取且无限制

    就算你当前请求设置了,如果不去,其他请求过来,也取不到

    from flask import g
    def set_g():   # 设置g对象的使用
        g.name = 'wang'
    
    @app.route('/aa')
    def a():
        g.name = 'zhang'   # 这里改变后在这个视图函数中使用的g对象都是改变后的对象
        set_g()   # 调用g对象,在使用的时候也可以不写
        print(g.name)
        return '1111'
    
    

    18.信号

    Flask框架中的信号基于blinker,其主要作用就是让开发者在flask请求过程中定制一些用户行为,

    信号和请求扩展的区别

    请求扩展:只能使用内置的一些请求扩展,在指定的位置执行

    信号:可以自定义信号,在需要使用的地方调用就可以执行对应的方法,

    安装

    pip install blinker
    
    

    往信号中注册函数

    from flask import Flask,signals,render_template
    
    app = Flask(__name__)
    
    # 往信号中注册函数
    #1给信号绑定要执行的函数
    #无需管调用,因为flask,已经给我们设置调用点
    def func(*args,**kwargs):
        print('触发信号',args,kwargs)
    #与该信号进行绑定
    signals.request_started.connect(func)
    signals.request_started.send()
    # 触发信号: signals.request_started.send()
    
    

    一个流程中的信号出发点

    a. before_first_request
    b. 触发 request_started 信号
    c. before_request
    d. 模板渲染
        渲染前的信号 before_render_template.send(app, template=template, context=context)
            rv = template.render(context) # 模板渲染
        渲染后的信号 template_rendered.send(app, template=template, context=context)
    e. after_request
    f. session.save_session()
    g. 触发 request_finished信号        
        如果上述过程出错:
            触发错误处理信号 got_request_exception.send(self, exception=e)
                
    h. 触发信号 request_tearing_down
    
    

    flask内置的一些信号

    request_started = _signals.signal('request-started')                # 请求到来前执行
    request_finished = _signals.signal('request-finished')              # 请求结束后执行
     
    before_render_template = _signals.signal('before-render-template')  # 模板渲染前执行
    template_rendered = _signals.signal('template-rendered')            # 模板渲染后执行
     
    got_request_exception = _signals.signal('got-request-exception')    # 请求执行出现异常时执行
     
    request_tearing_down = _signals.signal('request-tearing-down')      # 请求执行完毕后自动执行(无论成功与否)
    appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
     
    appcontext_pushed = _signals.signal('appcontext-pushed')            # 应用上下文push时执行
    appcontext_popped = _signals.signal('appcontext-popped')            # 应用上下文pop时执行
    message_flashed = _signals.signal('message-flashed')                # 调用flask在其中添加数据时,自动触发
    
    

    自定义信号

    from flask import Flask, current_app, flash, render_template
    from flask.signals import _signals
    
    app = Flask(import_name=__name__)
    
    # 1.自定义信号,xxxxxx就是这个信号的名字,之后调用的时候就是用xxx来进行调用
    xxxxx = _signals.signal('xxxxx')
    
    def func(sender,a):
        print(sender,a)
        print("我是自定义信号")
    
    # 2.自定义信号中注册函数
    xxxxx.connect(func)
    
    @app.route("/x")
    def index():
        # 触发信号 
        xxxxx.send("sb",a="1")   # 3.调用自定义的信号,必须要有一个参数,第一个是位置参数,第二个是关键字参数
        return 'Index'
    
    if __name__ == '__main__':
        app.run()
    
    

    19.flask-session

    作用:将默认保存的签名cookie中的值,保存到redis/memcached/file/Mongodb/SQLAlchemy,简单的说就是把session保存到数据库中

    安装:pip install flask-session

    使用1:(直接使用RedisSessionInterface方法)

    from flask import Flask,session
    from flask_session import RedisSessionInterface
    import redis
    app = Flask(__name__)
    app.secret_key="ajksda"
    conn=redis.Redis(host='127.0.0.1',port=6379)
    #use_signer是否对key签名
    app.session_interface=RedisSessionInterface(conn,key_prefix='jason',use_signer=True, permanent=False)
    """
    参数解析
    permanent=False   关闭浏览器是否清除cookie,默认为True,不清除
    key_prefix='jason'   设置前缀,保存在数据库中的时候的前缀
    use_signer=True   设置密钥
    """
    
    @app.route('/')
    def hello_world():
        session['sb']='jason'
        return 'Hello World!'
    
    @app.route("/index")
    def index():
        print(session['sb'])
        return "ok"
    
    if __name__ == '__main__':
        app.run()
    
    

    使用2:(推荐使用)

    from flask import Flask,session
    import  redis
    from flask_session import Session  # 直接导入Session就行了
    app = Flask(__name__)
    app.config['SESSION_TYPE'] = 'redis'
    app.config['SESSION_REDIS'] =redis.Redis(host='127.0.0.1',port='6379')
    app.config['SESSION_KEY_PREFIX']="jason"
    Session(app)
    
    @app.route('/')
    def hello_world():
        session['sb']='jason'
        return 'Hello World!'
    
    @app.route("/index")
    def index():
        print(session['sb'])
        return "ok"
    
    if __name__ == '__main__':
        app.run()
    
    

    问题:设置cookie时,如何设定关闭浏览器则cookie失效

    response.set_cookie('k','v',exipre=None)#这样设置即可:exipre=None  最长过期时间
    #在session中设置
    app.session_interface=RedisSessionInterface(conn,key_prefix='lqz',permanent=False)
    #一般不用,我们一般都设置超时时间,多长时间后失效
    
    

    问题:cookie默认超时时间是多少?如何设置超时时间

    #源码expires = self.get_expiration_time(app, session)
    'PERMANENT_SESSION_LIFETIME':           timedelta(days=31),#这个配置文件控制
    
    

    20.自定义命令:flask-script

    可以自己定义使用一些命令,在不用启动项目的情况下操作项目中的一些方法

    #第一步安装:pip3 install flask-script
    from flask import Flask
    from flask_script import Manager   # 使用命令行启动的时候要导入Manage
    app = Flask(__name__)
    manager=Manager(app)
    
    @app.route("/")
    def index():
        return "ok"
    
    # 位置传参
    @manager.command
    def custom1(arg,a):
        """
        自定义命令
        python manage.py custom 123
        :param arg:
        :return:
        """
        print(arg,a)
    
        
    # 关键字传参
    @manager.option('-n', '--name', dest='name')   # -n 是简写,--name是全称,name必须与参数中的name一致
    @manager.option('-u', '--url', dest='url')
    def cmd1(name, url):
        """
        自定义命令(-n也可以写成--name)
        执行: python manage.py  cmd -n lqz -u http://www.oldboyedu.com
        执行: python manage.py  cmd --name lqz --url http://www.oldboyedu.com
        :param name:
        :param url:
        :return:
        """
        print(name, url)
    
    if __name__ == '__main__':
        manager.run()
    
    

    21.多app应用

    from werkzeug.wsgi import DispatcherMiddleware
    from werkzeug.serving import run_simple
    from flask import Flask
    app1 = Flask('app01')
    app2 = Flask('app02')
    
    @app1.route('/index')
    def index():
        return "app01"
    
    @app2.route('/index')
    def index2():
        return "app2"
    dm = DispatcherMiddleware(app1, {
        '/sec12': app2,
    })
    if __name__ == "__main__":
    
        run_simple('localhost', 5000, dm)
        
    # 访问app2应用的路由:localhose:5000/sec/index
    
    

    22.wtforms

    安装:pip install wtforms

    作用:与django中的forms组件作用相似

    使用1:

    from flask import Flask, render_template, request, redirect
    from wtforms import Form
    from wtforms.fields import simple
    from wtforms import validators
    from wtforms import widgets
    
    app = Flask(__name__, template_folder='templates')
    
    app.debug = True
    
    
    class LoginForm(Form):
        # 字段(内部包含正则表达式)
        name = simple.StringField(
            label='用户名',
            # 对字段的判断及错误信息展示
            validators=[
                validators.DataRequired(message='用户名不能为空.'),
                validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
            ],
            widget=widgets.TextInput(), # 页面上显示的插件
            render_kw={'class': 'form-control'},   # 添加类和属性
            default='axal'   # 设置默认值
        )
        # 字段(内部包含正则表达式)
        pwd = simple.PasswordField(
            label='密码',
            validators=[
                validators.DataRequired(message='密码不能为空.'),
                validators.Length(min=8, message='用户名长度必须大于%(min)d'),
                validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}",
                                  message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
    
            ],
            widget=widgets.PasswordInput(),
            render_kw={'class': 'form-control'}
        )
    
    
    
    @app.route('/login', methods=['GET', 'POST'])
    def login():
        if request.method == 'GET':
            form = LoginForm()
            return render_template('login.html', form=form)
        else:
            form = LoginForm(formdata=request.form)
            if form.validate():    # 验证信息是否正确
                print('用户提交数据通过格式验证,提交的值为:', form.data)
            else:
                print(form.errors)
            return render_template('login.html', form=form)
    
    if __name__ == '__main__':
        app.run()
    
    

    login.html

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
    <h1>登录</h1>
    <form method="post">
        <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> # 获取某个字段的错误
    
        <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
        <input type="submit" value="提交">
    </form>
    </body>
    </html>
    
    

    使用2:

    from flask import Flask, render_template, request, redirect
    from wtforms import Form
    from wtforms.fields import core
    from wtforms.fields import html5
    from wtforms.fields import simple
    from wtforms import validators
    from wtforms import widgets
    
    app = Flask(__name__, template_folder='templates')
    app.debug = True
    
    
    class RegisterForm(Form):
        name = simple.StringField(
            label='用户名',
            validators=[
                validators.DataRequired()
            ],
            widget=widgets.TextInput(),
            render_kw={'class': 'form-control'},
            default='alex'
        )
    
        pwd = simple.PasswordField(
            label='密码',
            validators=[
                validators.DataRequired(message='密码不能为空.')
            ],
            widget=widgets.PasswordInput(),
            render_kw={'class': 'form-control'}
        )
    
        pwd_confirm = simple.PasswordField(
            label='重复密码',
            validators=[
                validators.DataRequired(message='重复密码不能为空.'),
                validators.EqualTo('pwd', message="两次密码输入不一致")  # 直接做判断,相当于钩子
            ],
            widget=widgets.PasswordInput(),
            render_kw={'class': 'form-control'}
        )
    	# 邮箱
        email = html5.EmailField(
            label='邮箱',
            validators=[
                validators.DataRequired(message='邮箱不能为空.'),
                validators.Email(message='邮箱格式错误')    # 自动校验邮箱格式
            ],
            widget=widgets.TextInput(input_type='email'),
            render_kw={'class': 'form-control'}
        )
    	# 单选
        gender = core.RadioField(
            label='性别',
            choices=(
                (1, '男'),
                (2, '女'),
            ),
            coerce=int   # “1” “2”,指定传入的是那种类型,默认是字符串
         )
        city = core.SelectField(
            label='城市',
            choices=(
                ('bj', '北京'),
                ('sh', '上海'),
            )
        )
    	# 多选
        hobby = core.SelectMultipleField(
            label='爱好',
            choices=(
                (1, '篮球'),
                (2, '足球'),
            ),
            coerce=int
        )
    
        favor = core.SelectMultipleField(
            label='喜好',
            choices=(
                (1, '篮球'),
                (2, '足球'),
            ),
            widget=widgets.ListWidget(prefix_label=False),
            option_widget=widgets.CheckboxInput(),
            coerce=int,
            default=[1, 2]
        )
    	
        # 可以改变已经定义好的字段的值
        def __init__(self, *args, **kwargs):
            super(RegisterForm, self).__init__(*args, **kwargs)
            self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
            
    	# 局部校验
        def validate_pwd_confirm(self, field):
            """
            自定义pwd_confirm字段规则,例:与pwd字段是否一致
            :param field:
            :return:
            """
            # 最开始初始化时,self.data中已经有所有的值
    
            if field.data != self.data['pwd']:
                # raise validators.ValidationError("密码不一致") # 继续后续验证
                raise validators.StopValidation("密码不一致")  # 不再继续后续验证
    
    
    @app.route('/register', methods=['GET', 'POST'])
    def register():
        if request.method == 'GET':
            form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
            return render_template('register.html', form=form)
        else:
            form = RegisterForm(formdata=request.form)
            if form.validate():
                print('用户提交数据通过格式验证,提交的值为:', form.data)
            else:
                print(form.errors)
            return render_template('register.html', form=form)
    
    
    
    if __name__ == '__main__':
        app.run()
    
    

    login.html

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
    <h1>用户注册</h1>
    <form method="post" novalidate style="padding:0  50px">
        {% for field in form %}
        <p>{{field.label}}: {{field}} {{field.errors[0] }}</p>
        {% endfor %}
        <input type="submit" value="提交">
    </form>
    </body>
    </html>
    
    

    23.SQLAlchemy

    # https://www.cnblogs.com/xiaoyuanqujing/protected/articles/11715497.html,    学习网址
    
    

    SQLAlchemy是一个基于python实现的ORM框架,该框架建立在DB API之上,使用关系对象映射进行数据库操作,简而言之就是:将类和对象转换成SQL,然后使用数据API执行SQL并回去执行结果

    pip install sqlalchemy
    
    

    组成部分

    Engine,框架的引擎
    Connection Pooling ,数据库连接池
    Dialect,选择连接数据库的DB API种类
    Schema/Types,架构和类型
    SQL Exprression Language,SQL表达式语言
    
    

    SQLAlchemy本身无法操作数据库,其必须依赖pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作

    MySQL-Python
        mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
        
    pymysql
        mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
        
    MySQL-Connector
        mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
        
    cx_Oracle
        oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
        
    更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
    
    

    django中如何反向生成models

    python manage.py inspectdb > app/models.py
    
    

    简单使用(能创建表,删除表,不能修改表)

    1.执行原生sql(不常用)

    import time
    import threading
    import sqlalchemy
    from sqlalchemy import create_engine
    from sqlalchemy.engine.base import Engine
    
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    def task(arg):
        conn = engine.raw_connection()
        cursor = conn.cursor()
        cursor.execute(
            "select * from app01_book"
        )
        result = cursor.fetchall()
        print(result)
        cursor.close()
        conn.close()
    
    for i in range(20):
        t = threading.Thread(target=task, args=(i,))
        t.start()
    
    

    2.orm使用

    models.py

    import datetime
    from sqlalchemy import create_engine   # 必须导入
    from sqlalchemy.ext.declarative import declarative_base    # 必须导入
    from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, 
    UniqueConstraint, Index
    from werkzeug.security import generate_password_hash, check_password_hash   # 密码加密反解
    Base = declarative_base()   # 要用表就要产生这个对象
    
    class Users(Base):
        __tablename__ = 'users'  # 数据库表名称,必须
        id = Column(Integer, primary_key=True)  # id 主键
        name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空
        # email = Column(String(32), unique=True)
        #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
        # ctime = Column(DateTime, default=datetime.datetime.now)
        # extra = Column(Text, nullable=True)
    
        __table_args__ = (
            # UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
            # Index('ix_id_name', 'name', 'email'), #索引
        )
    
    def init_db():
        """
        根据类创建数据库表
        :return:
        """
        engine = create_engine(
            "mysql+pymysql://root:123456@127.0.0.1:3306/aaa?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.create_all(engine)    # 创建表的语句
    
    def drop_db():
        """
        根据类删除数据库表
        :return:
        """
        engine = create_engine(
            "mysql+pymysql://root:123456@127.0.0.1:3306/aaa?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
     
        Base.metadata.drop_all(engine)   # 删除表的语句
    
    if __name__ == '__main__':
        # drop_db()
        init_db()
        
    # 1.创建表的第一步:创建引擎去链接
    # 2.按照指定的字段去设置字段
    # 3.执行指定的添加表和删除表的命令
    # 4.表创建好之后不能再更改
    
    

    app.py

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Users
    #"mysql+pymysql://root@127.0.0.1:3306/aaa"
    engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
    Connection = sessionmaker(bind=engine)
    
    # 每次执行数据库操作时,都需要创建一个Connection
    con = Connection()
    
    # ############# 执行ORM操作 #############
    obj1 = Users(name="lqz")
    con.add(obj1)
    # 提交事务
    con.commit()
    
    # 关闭session,其实是将连接放回连接池
    con.close()
    
    

    3.一对多关系

    class Hobby(Base):
        __tablename__ = 'hobby'
        id = Column(Integer, primary_key=True)
        caption = Column(String(50), default='篮球')
    
    
    class Person(Base):
        __tablename__ = 'person'
        nid = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=True)
        # hobby指的是tablename而不是类名,uselist=False
        hobby_id = Column(Integer, ForeignKey("hobby.id"))   # 和hobby表的id建立外键关系
        
        # 跟数据库无关,不会新增字段,只用于快速链表操作	
        # 类名,backref 用于反向查询
        hobby=relationship('Hobby',backref='pers')    # backref 是设置的反向字段
    
    

    4.多对多关系

    class Boy2Girl(Base):
        __tablename__ = 'boy2girl'
        id = Column(Integer, primary_key=True, autoincrement=True)
        girl_id = Column(Integer, ForeignKey('girl.id'))
        boy_id = Column(Integer, ForeignKey('boy.id'))
    
    
    class Girl(Base):
        __tablename__ = 'girl'
        id = Column(Integer, primary_key=True)
        name = Column(String(64), unique=True, nullable=False)
    
    
    class Boy(Base):
        __tablename__ = 'boy'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        hostname = Column(String(64), unique=True, nullable=False)
        
        # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
        servers = relationship('Girl', secondary='boy2girl', backref='boys')
    
    

    5.操作数据库

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Users
      
    engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
      
    # 每次执行数据库操作时,都需要创建一个session
    session = Session()
      
    # ############# 执行ORM操作 #############
    obj1 = Users(name="lqz")
    session.add(obj1)
      
    # 提交事务
    session.commit()
    # 关闭session
    session.close()
    
    

    6.基于scoped_session实现线程安全

    保证线程的安全性,在使用原生SQLAchemy的时候需要注意,使用flask_sqlalchemy的时候不需要

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session
    from models import Users
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    """
    # 线程安全,基于本地线程实现每个线程用同一个session
    # 特殊的:scoped_session中有原来方法的Session中的一下方法:
    
    public_methods = (
        '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
        'close', 'commit', 'connection', 'delete', 'execute', 'expire',
        'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
        'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',
        'bulk_update_mappings',
        'merge', 'query', 'refresh', 'rollback',
        'scalar'
    )
    """
    #scoped_session类并没有继承Session,但是却又它的所有方法
    session = scoped_session(Session)
    # ############# 执行ORM操作 #############
    obj1 = Users(name="alex1")
    session.add(obj1)
    
    # 提交事务
    session.commit()
    # 关闭session
    session.close()
    
    

    7.基本的增删改查

    import time
    import threading
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    from sqlalchemy.sql import text
    
    from db import Users, Hosts
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    session = Session()
    
    # ################ 添加 ################
    # 单增
    obj1 = Users(name="wupeiqi")
    session.add(obj1)
    session.commit()    # 提交一定要写commie()
    
    # 群增
    session.add_all([
        Users(name="lqz"),
        Users(name="egon"),
        Hosts(name="c1.com"),
    ])
    session.commit()
    
    # ################ 删除 ################
    session.query(Users).filter(Users.id > 2).delete()   # 不写条件的时候全部删除
    session.commit()
    
    # ################ 修改 ################
    #传字典
    session.query(Users).filter(Users.id > 0).update({"name" : "lqz"})
    #类似于django的F查询
    session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)    # 添加字符串必须加上 synchronize_session=False
    session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
    session.commit()
    
    # ################ 查询 ################
    r1 = session.query(Users,Auther).all()    # 可以查多个表,不加 .all()就是原生SQL
    
    #只取age列,把name重命名为xx,不改变数据库中name的值,改变返回到前端的name的值
    r2 = session.query(Users.name.label('xx'), Users.age).all()
    
    #filter传的是表达式,filter_by传的是参数
    r3 = session.query(Users).filter(Users.name == "lqz").all()
    r4 = session.query(Users).filter_by(name='lqz').all()
    r5 = session.query(Users).filter_by(name='lqz').first()
    
    #:value 和:name 相当于占位符,用params传参数
    r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()
    
    #自定义查询sql
    r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()
    
    
    *****************************总结*****************************
    # 增,删,改都要  session.commit()
    # 最后使用close进行关闭   session.close()
    # 如果需要打印的对象是按名的形式展示,使用的是__rper___   
    
    

    8.常用操作

    # 条件
    ret = session.query(Users).filter_by(name='lqz').all()
    
    #表达式,and条件连接
    ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
    ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
    
    #注意下划线  in_ 包裹的是属于的条件
    ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
    
    #~非,除。。外
    ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
    
    #二次筛选
    ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
    
    #or_包裹的都是or条件,and_包裹的都是and条件
    from sqlalchemy import and_, or_
    ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
    ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
    ret = session.query(Users).filter(
        or_(
            Users.id < 2,
            and_(Users.name == 'eric', Users.id > 3),
            Users.extra != ""
        )).all()
    
    
    # 通配符,以e开头,不以e开头  like
    ret = session.query(Users).filter(Users.name.like('e%')).all()
    ret = session.query(Users).filter(~Users.name.like('e%')).all()
    
    # 限制,用于分页,区间
    ret = session.query(Users)[1:2]
    
    # 排序,根据name降序排列(从大到小)  order_by
    ret = session.query(Users).order_by(Users.name.desc()).all()
    
    #第一个条件重复后,再按第二个条件升序排  order_by
    ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
    
    # 分组 group_by
    from sqlalchemy.sql import func
    ret = session.query(Users).group_by(Users.extra).all()
    
    #分组之后取最大id,id之和,最小id  group_by
    from sqlalchemy.sql import func
    ret = session.query(
        func.max(Users.id),
        func.sum(Users.id),
        func.min(Users.id)).group_by(Users.name).all()
    
    #haviing 筛选
    from sqlalchemy.sql import func
    ret = session.query(
        func.max(Users.id),
        func.sum(Users.id),
        func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
    
    # 连表(默认用forinkey关联)
    ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
    
    #join表,默认是inner join
    ret = session.query(Person).join(Favor, Person.name == Favor.name).all()
    
    #isouter=True 外连,表示Person left join Favor,没有right join,反过来可实现 right join
    ret = session.query(Person).join(Favor, isouter=True).all()
    
    #打印原生sql  没有加 .all() 的时候打印的是原生sql
    aa=session.query(Person).join(Favor, isouter=True)
    
    # 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上
    ret = session.query(Person).join(Favor,Person.id==Favor.id, isouter=True).all()
    
    # 组合(了解)UNION 操作符用于合并两个或多个 SELECT 语句的结果集
    #union和union all的区别?
    q1 = session.query(Users.name).filter(Users.id > 2)
    q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    ret = q1.union(q2).all()
    
    q1 = session.query(Users.name).filter(Users.id > 2)
    q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    ret = q1.union_all(q2).all()
    
    

    9.执行原生sql

    import time
    import threading
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    from sqlalchemy.sql import text
    from sqlalchemy.engine.result import ResultProxy
    from db import Users, Hosts
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    session = Session()
    
    # 查询
    # cursor = session.execute('select * from users')
    # result = cursor.fetchall()
    
    # 添加
    cursor = session.execute('insert into users(name) values(:value)',params={"value":'lqz'})
    session.commit()
    print(cursor.lastrowid)
    
    session.close()
    
    

    10.一对多

    import time
    import threading
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    from sqlalchemy.sql import text
    from sqlalchemy.engine.result import ResultProxy
    from db import Users, Hosts, Hobby, Person
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    session = Session()
    *************************** 添加 ***************************
    # 两个表一起添加
    session.add_all([
        Hobby(caption='乒乓球'),
        Hobby(caption='羽毛球'),
        Person(name='张三', hobby_id=3),
        Person(name='李四', hobby_id=4),
    ])
    
    # 添加一 正向插入数据
    person = Person(name='张九', hobby=Hobby(caption='姑娘'))
    session.add(person)
    session.commit()
    
    # 添加二 反向插入数据,使用设置的反向字段
    hb = Hobby(caption='人妖')
    hb.pers = [Person(name='文飞'), Person(name='博雅')]
    session.add(hb)
    session.commit()
    
    
    *************************** 查询 ***************************
    # 使用relationship正向查询
    v = session.query(Person).first()
    print(v.name)
    print(v.hobby.caption)
    
    # 使用relationship反向查询,使用关联的反向字段操作
    v = session.query(Hobby).first()
    print(v.caption)
    print(v.pers)   
    
    
    #方式一,自己链表    isouter=True 不加是inner join查询,加上是left join查询,没有right查询
    # person_list=session.query(models.Person.name,models.Hobby.caption).join(models.Hobby,isouter=True).all()
    person_list=session.query(models.Person,models.Hobby).join(models.Hobby, isouter=True).all()
    for row in person_list:
        # print(row.name,row.caption)
        print(row[0].name,row[1].caption)
    
    #方式二:通过relationship
    person_list=session.query(models.Person).all()
    for row in person_list:
        print(row.name,row.hobby.caption)
        
    #查询喜欢姑娘的所有人
    obj=session.query(models.Hobby).filter(models.Hobby.id==1).first()
    persons=obj.pers
    print(persons)
    session.close()
    
    # 如果没有外键
    ret = session.query(Person).join(Hobby, Person.nid=Hobby.id, isouter=True).all()
    
    

    11.多对多

    import time
    import threading
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    from sqlalchemy.sql import text
    from sqlalchemy.engine.result import ResultProxy
    from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    session = Session()
    ************************ 添加 ************************
    # 方式一   麻烦
    session.add_all([
        Server(hostname='c1.com'),
        Server(hostname='c2.com'),
        Group(name='A组'),
        Group(name='B组'),
    ])
    session.commit()
    
    s2g = Server2Group(server_id=1, group_id=1)
    session.add(s2g)
    session.commit()
    
    # 方法二   正向添加
    gp = Group(name='C组')
    gp.servers = [Server(hostname='c3.com'),Server(hostname='c4.com')]
    session.add(gp)
    session.commit()
    
    # 方法三   反向添加
    ser = Server(hostname='c6.com')
    ser.groups = [Group(name='F组'),Group(name='G组')]
    session.add(ser)
    session.commit()
    
    ************************ 查询 ************************
    # 使用relationship正向查询
    v = session.query(Group).first()
    print(v.name)
    print(v.servers)
    
    # 使用relationship反向查询    直接使用反向字段操作
    v = session.query(Server).first()
    print(v.hostname)
    print(v.groups)
    
    session.close()
    
    

    12.其他

    import time
    import threading
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    from sqlalchemy.sql import text, func
    from sqlalchemy.engine.result import ResultProxy
    from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    session = Session()
    
    # 关联子查询:correlate(Group)表示跟Group表做关联,as_scalar相当于对该sql加括号,用于放在后面当子查询
    subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
    result = session.query(Group.name, subqry)
    """
    SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid 
    FROM server 
    WHERE server.id = `group`.id) AS anon_1 
    FROM `group`
    """
    '''
    
    select * from tb where id in [select id from xxx];
    
    select id,
            name,
            #必须保证此次查询只有一个值
            (select max(id) from xxx) as mid
    from tb
    
    例如,第三个字段只能有一个值
    id name  mid
    1  lqz   1,2  不合理
    2  egon   2
    
    
    '''
    '''
    成绩表:
    id sid    cid    score
    1  1      物理      99 
    2  1      化学      88
    3  2      物理      95
    
    学生表:
    id   name  每个学生总分数
    1     xx      88
    2     yy       77
    
    select id,name,
    (select avr(score) from 成绩表 where 成绩表.sid=学生表.id) as x
    from 学生表
    subqry = session.query(func.count(成绩表.scort).label("sc")).filter(学生表.id == 成绩表.sid).correlate(学生表).as_scalar()
    result = session.query(学生表.name, subqry)
    
    '''
    
    # 原生SQL
    """
    # 查询
    cursor = session.execute('select * from users')
    result = cursor.fetchall()
    
    # 添加
    cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
    session.commit()
    print(cursor.lastrowid)
    """
    
    session.close()
    
    

    13.Flask-SQLAlchemy

    flask和SQLAlchemy的管理者,通过他把他们做连接

    db = SQLAlchemy()
        - 包含配置
        - 包含ORM基类
        - 包含create_all
        - engine
        - 创建连接
    
    

    离线脚本,创建表

    # 存放models模型
    from flask import Flask
    from flask_script import Manager
    from flask_migrate import MigrateCommand,Migrate
    from flask_sqlalchemy import SQLAlchemy
    
    app = Flask(__name__)
    db = SQLAlchemy()   # 必须
    
    db.init_app(app)   # 必须
    
    manage = Manager(app)   # 必须
    Migrate(app,db)   # 必须
    
    manage.add_command('db',MigrateCommand)   # 必须
    
    
    # 添加迁移脚本的命令到manage中
    app.config['SQLALCHEMY_DATABASE_URI'] = "mysql+pymysql://root:123@127.0.0.1:3306/flask?charset=utf8"
    
    # 支持改表
    class User(db.Model):
        __tablename__ = 'user'
        id = db.Column(db.Integer,primary_key=True,autoincrement=True)
        user = db.Column(db.String(32),unique=True,nullable=True)
    
        def __repr__(self):
            return '<User%r>'%self.username
    
    if __name__ == '__main__':
        manage.run()
    
    
    

    详见代码

    flask-migrate
    python3 manage.py db init 初始化:只执行一次
    python3 manage.py db migrate 等同于makemigrations
    python3 manage.py db init upgrade 等同于migrate
    
    # 初始化之后再需要添加表的时候,只需要执行后两部即可
    
    

    单表操作

    ******************* 增 ***********************
    db.session.add(User(username='zhang'))
    db.session.commit()
    
    ******************* 查 ***********************
    obj = db.session.query(User).filter(User.username=='wang').first()
    
    # 使用方法和SQLAlchemy方法是一样的,只需要将Session换成db.session就可以了
    
    
    项目中使用案例
    from flask_sqlalchemy import BaseQuery
    
    class ProductNetValueQuery(BaseQuery):
    
        def get_all_products(self):
            return self.with_entities(ProductNetValue.id, ProductNetValue.name).group_by(
                ProductNetValue.name).all()
        
        def get__(self, trading_at, p):
            return self.filter(ProductNetValue.trading_at == trading_at).filter(
                ProductNetValue.name == p).first()
    
    class ProductNetValue(CRUDModel):
        __tablename__ = "product_net_values"
        query_class = ProductNetValueQuery
        
    使用的时候可以直接使用 res = ProductNetValue.query.get_all_products()  操作
    
    
    flask-sqlalchemy 字段
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    
    name = db.Column(db.String(64), index=True) 
    
    acc_net_value = db.Column(db.Float)
    
    cnt = db.Column(db.Integer)
    
    purchase = db.Column(db.DECIMAL(19, 2), name="Purchase")
    
    comment = db.Column(db.Text, name="Comment")
    
    is_last_update = db.Column(db.Boolean, name="IsLastUpdate", default=Fund, onupdate=True)
    
    update_time = db.Column(db.DateTime, name="UpdateTime", default=datetime.datetime.now, onupdate=datetime.datetime.now)    # onupdate 记录最后一次更新的时间
    
    
    外键约束字段
    选项名				说明
    backref			在关系的另一个模型中添加反向引用
    primaryjoin		明确指定两个模型之间使用的联结条件。只在模棱两可的关系中需要指定.
    lazy			指定如何加载相关记录。可选值如下 :
     				select(首次访问时按需加载)
     				immediate(源对象加载后就加载)
     				joined(加载记录,但使用联结)
     				subquery(立即加载,但使用子查询)
     				noload(永不加载)
     				dynamic(不加载记录,但提供加载记录的查询)
                    
    uselist			如果设为 Fales,不使用列表,而使用标量值
    order_by		指定关系中记录的排序方式
    secondary		指定多对多关系中关系表的名字
    secondaryjoin	SQLAlchemy 无法自行决定时,指定多对多关系中的二级联结条件
    
    

    14.项目依赖包处理

    #### 导出项目环境
    
    ​```
    1)进入本地项目根目录
    >: cd 项目根目录
    
    2)本地导出项目环境
    pip3 freeze > packages.txt
    >: 
    
    3)如果环境中有特殊的安装包,需要处理一下xadmin
    packages.txt中的
    	xadmin==2.0.1
    要被替换为
    	https://codeload.github.com/sshwsfc/xadmin/zip/django2
    ​```
    #### 项目虚拟环境
    
    ​```
    1)创建线上luffy项目虚拟环境
    >: mkvirtualenv luffy
    >: workon luffy
    
    2)安装所需环境,在packages.txt所在目录下安装执行packages.txt文件
    >: pip install uwsgi
    >: pip install -r /home/project/luffyapi/packages.txt
    ​```
    #### 项目提交到远程git仓库
    
    ​```
    1)去向本地项目仓库
    >: cd 项目根目录
    
    2)本地版本库操作
    >: git status
    >: git add .
    >: git commit -m '项目2.0上线'
    
    3)提交到远程版本库
    >: git pull origin master
    >: git push origin master
    ​```
    
    

    15.虚拟环境

    ### 安装虚拟环境
    
    ​```
    1)安装依赖
    >: pip3 install virtualenv
    >: pip3 install virtualenvwrapper
    
    2)建立虚拟环境软连接
    >: ln -s /usr/local/python3/bin/virtualenv /usr/bin/virtualenv
    
    3)配置虚拟环境:填入下方内容
    >: vim ~/.bash_profile
    
    VIRTUALENVWRAPPER_PYTHON=/usr/bin/python3
    source /usr/local/python3/bin/virtualenvwrapper.sh
    
    4)退出编辑状态
    >: esc
    
    5)保存修改并退出
    >: :wq
    
    6)更新配置文件内容
    >: source ~/.bash_profile
    
    7)虚拟环境默认根目录:~/.virtualenvs
    ​```
    
    

    24.flask前后端分离项目总结

    前端使用Vue,后端使用flask

    第一种情况:前端发送get或post请求

    如果前端直接发送get或者使用form表单提交post请求,
    get请求在flask中使用request.args 获取发送过来的请求
    post请求在flask中直接使用request.form.get()来获取发送过来的信息
    
    

    第二种情况:前端使用axios发送post请求

    当前端使用axios发送请求的时候,后端接收到的请求是json格式的,而且数据是在request.data中,不在request.form中,这点要注意
    后端接收的时候使用:request.data接收,再使用json做一个转化
    data = json.loads(request.data)
    username = data.get('username')
    password = data.get('password')
    re_password = data.get('re_password')
    
    

    25.pipenv 虚拟环境

    pipenv --where                 列出本地工程路径
    pipenv --venv                  列出虚拟环境路径
    pipenv --py                    列出虚拟环境的Python可执行文件
    pipenv install                 创建虚拟环境
    pipenv isntall [moduel]        安装包
    pipenv install [moduel] --dev  安装包到开发环境
    pipenv uninstall[module]       卸载包
    pipenv uninstall --all         卸载所有包
    pipenv graph                   查看包依赖
    pipenv lock                    生成lockfile
    pipenv run python [pyfile]     运行py文件
    pipenv --rm                    删除虚拟环境
    pipenv run python xxx.py       运行python代码
    pipenv lock -r --dev > requirements.txt    pipenv生成txt命令
    pipenv install -r requirements.txt    安装txt包
    pipenv shell    启动虚拟环境
    
    pipenv run flask run --port=5080
    yarn run serve
    
    
  • 相关阅读:
    HDU4685 Prince and Princess 完美搭配+良好的沟通
    坚持 本身是一种策略
    PowerDesigner中SQL文件、数据库表反向生成PDM
    Filter技术+职责链模式
    [ACM] poj 1258 Agri-Net (最小生成树)
    android 屏幕适配 课程笔记
    HDU 5071 Chat
    【玩转微信公众平台之中的一个】序章(纯粹扯淡)
    HTML表格标签的使用-&lt;table&gt;
    hdu 1251 统计难题 (map水过)
  • 原文地址:https://www.cnblogs.com/whkzm/p/13783119.html
Copyright © 2011-2022 走看看