Flask
flask中还需要掌握的知识
jinja2 Werkzeug 路由源码 CBV源码及使用 请求上下文 路由中正则表达式
1.Flask初始
Flask是一个同步web框架,跟django一样,属于轻量级的,小而精,使用灵活
Flask基于Werkzeug,
Flask只保留了web开发的核心功能
flask有很多组件都没有,models使用的是SQLAlchemy
nginx不能直接调用代码,是调用的服务来调用代码,来将数据返回,nginx只能调用静态资源,不能直接调用动态资源。
2.Flask四剑客
1.返回字符串:return
from flask import Flask,render_template,redirect,jsonify
return '直接返回字符串' # 相当于HttpResponse
2.返回html页面:render_template
将html页面放在 templates 中,会自动找到
from flask import Flask,render_template,redirect,jsonify
return render_template(".html",在html中使用的名字="需要携带的参数")
在heml页面中使用{{}} 接收,如果是字典的话,可以使用字典的取值方法 get或者直接点的方式取出
3.重定向:redirect
from flask import Flask,render_template,redirect,jsonify
return redirect("/login")
4.返回json字符串
from flask import Flask,render_template,redirect,jsonify
name_dict = [{'name': "jason-gdx"},{'name': "tank-sb"}]
return jsonify(name_dict)
3.Flask的配置文件
方式一
直接进行配置
app.debug=True
方式二
以字典的形式
app.config['DEBUG']=True
方式三
使用配置文件的形式
settings.py
DEBUG = True
app.py
app.config.from_pyfile("settings.py")
方式四
使用类的形式,在settings配置文件创建类,在不同的环境下使用不同的类
settings.py
class Config(object):
DEBUG = False
TESTING = False
DATABASE_URI = 'sqlite://:memory:'
class ProductionConfig(Config):
DATABASE_URI = 'mysql://user@localhost/foo'
class DevelopmentConfig(Config):
DEBUG = True
class TestingConfig(Config):
TESTING = True
app.py
app.config.from_object('settings.DevelopmentConfig') # from_object 是一个固定的方法
4.路由详解
路由源码分析(重要)
路由route中()的参数
rule 就是路由
endpoint 取别名,反向解析,如果没有就用当前函数名,不能重复
methods 方法
view_func 就我们endpoint,指向的函数,有人就是请求的时候
strict_slashes 该参数是用来设置,我们的路由是否为严格模式,Flask是非严格模式,True是严格模式,默认是严格模式
redirect_to 重定向到指定地址,直接跳转,不再执行下面视图函数中的代码
路由常见使用
1.有名传参
@app.route('/login/<string:nid>/')
def index(url_path,nid):
print(nid)
return '返回值'
传入的参数的数据类型有:int,string,float,path,uuid
2.无名传参
# http://127.0.0.1:5000/login/ja/?name=wang&password=123
@app.route('/login/')
def index():
name = request.args.get('name') # 获取使用?拼接传过来的值
return '返回值'
3.多路由选择使用
# http://127.0.0.1:5000/login/ja/?name=wang&password=123
@app.route('/<any(login,reginstr):url_path>/<string:nid>/')
def index(url_path, nid):
return '返回值'
# 多路由选择使用就是在括号内定义多个路由关键字,在前端访问的时候只要访问到了路由中的路径,就可以匹配到此视图函数
4.反向解析
使用url_for,可以通过函数名来找到函数对应的路由,再通过重定向来跳转到指定的页面,url也可以传参,使用对应url中的参数名来进行传参
@app.route('/')
def register():
return url_for('index',) #/login/sgvc 在使用反向解析的时候,如果有参数,需要把参数也带上
处理动态的视图函数
@app.route("/login/<string:nid>",endpoint="login")
def login(nid):
return "登录页面"
@app.route("/<string:nid>",endpoint="home")
def home(nid):
print("主页面",nid)
# nid是login函数中需要携带的参数,如果需要在后边使用?拼接参数,使用**=**的形式来进行拼接
return redirect(url_for('login',nid="111",name="wang"))
@app.route('/student/<int:id>/')
def student(id):
return 'student {}'.format(id)
5.反向解析拼接参数
拼接参数是在url的最后使用?来对参数进行拼接,可以使用任意键值对来进行拼接
@app.route('/')
def register():
return url_for('index',name='wang',) #/login/sgvc?name=wang 有参数的也可以这样写
直接在括号内传入关键字参数,以无名路由的形式返回
6.自定义动态路由:正则匹配
# 我们要用自定义的路由,用正则的话
#1导入from werkzeug.routing import BaseConverter
# 2我先要写一个类,然后继承BaseConverter,然后实现__inti__, def to_python(self, value):to_url(self, value)
# 3 app.url_map.converters['谁便'] = RegexConverter
#4 我们在路由里面@app.route('/index/<regex1("d+"):nid>'),regex1='随便,regex1("正则表达式")
#5 regex1("正则表达式")匹配出来的结果,返回to_python,一定要return
#6 当我们做反向解析的解析的时候,我们的参数,会传递给to_url,return的结果才是我们拼接到我们路由上
from flask import Flask, views, url_for
from werkzeug.routing import BaseConverter
app = Flask(import_name=__name__)
class RegexConverter(BaseConverter):
"""
自定义URL匹配正则表达式
"""
def __init__(self, map, regex):
super(RegexConverter, self).__init__(map)
self.regex = regex
def to_python(self, value):
"""
路由匹配时,匹配成功后传递给视图函数中参数的值
"""
#value就正则匹配出来的结果
print('value',value,type(value))
return "asdasdasd"
def to_url(self, value):
"""
使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
"""
val = super(RegexConverter, self).to_url(value)
print(val)
return val
app.url_map.converters['regex1'] = RegexConverter # 对自定义的正则进行注册
@app.route('/index/<regex1("d+"):nid>',endpoint="sb")
def index(nid):
print("nid",nid,type(nid))
print(url_for('sb', nid='888'))
# /index/666
return 'Index'
if __name__ == '__main__':
app.run()
7.视图函数装饰器
可以对视图函数加上装饰器,这样在执行视图函数的时候会先去做校验,然后再执行视图函数中的内容
from flask import Flask, request
from functools import wraps
app = Flask(__name__)
def login_verify(func):
@wraps(func) ############ 必须
def wrapper(*args, **kwargs):
user_name = request.args.get('user')
password = request.args.get('password')
if user_name == 'mark' and password == '123':
return func(*args,**kwargs)
else:
return '请登录'
return wrapper
@app.route('/my_info/')
@login_verify # 使用装饰器
def my_info():
return '个人信息页面'
注意点
1.装饰器一定要写在注册路由的下面,写在试图函数的上面
2.装饰器内部一定要使用@wraps(func) 方法,用于保护被装饰函数的属性
5.CBV详解(源码)
flask中有视图类,当前有两种视图,from flask.views import MethodView,View
,使用的时候需要注意重写dispatch_request
, 写完视图需要通过 app.add_url_rule(url_rule, view_func)
来进行注册
6.模板
一些常见的模板语法,使用的时候和html中基本相同都是使用 {{ }} 进行传参,但是需要加括号运行的,需要加括号才能运行,使用模板的语法的时候有时候需要自己输入,不会进行提示,
在视图中返回模板的时候,使用方法如下
# 1.新建一个templates文件夹,把html文件放在这个文件夹下
# 2.定义一个视图函数,将对应的html页面返回,在输入的时候要将后边的.html也加上。
from flask import render_template
@app.route("/login/html",endpoint="login/html")
def login_html():
info = "我是info信息"
return render_template("html_test.html",info=info,html="<h3>我需要在前端进行展示</h3>") # 记得在前端加上safe取消转义
# 第二种方法
# 视图函数中使用
from flask import Markup
def func(st,st1):
return Markup(f"<h1>使用func返回的数据{st}{st1}</h1>")
@app.route("/login/html",endpoint="login/html")
def login_html():
info = "我是info信息"
return render_template("html_test.html",info=info,func=func)
# 前端使用
{{func("aaaa","bbbb")}}
# 总结
# 1.如果返回了html标签,需要在前端中使用{{html|safe}},如果没有加上safe,会按照字符串的形式显示出来
# 2.如果使用了Markup方法,就需要导入,这种方法可以传入多个参数
逻辑判断
<table>
{% if 10 %} # 开始
<h1>hahahahh</h1>
{% else %} # 否则,必须有
<h2>aaaaaaaa</h2>
{% endif %} # 结束,必须有
</table>
for循环
<table>
{% for i in "ajfkdhksadh" %}
<p>{{ i }}</p> # 需要加括号取值
{% endfor %}
</table>
在模板中使用反向解析
<a href="{{url_for('login',nid=1)}}">登录</a> # 如果有参数需要加上参数
7.请求与响应
flask中需要的请求都需要从全局导入,不能在函数中导入
使用request和response进行操作,操作方式与django一致
自定义响应要使用 make_response()
1.导入make_response
2.response =make_response(4剑客),四剑客正常使用,只是对response做定制化响应,添加一点想添加的参数
3.操作response
4.return response
例如:(for example)
response = make_response(render_templage('index.html'))
response.set_cookie('json','sb')
return response
常见的请求响应
from flask import Flask
from flask import request
from flask import render_template
from flask import redirect
from flask import make_response
app = Flask(__name__)
@app.route('/login.html', methods=['GET', "POST"])
def login():
# 请求相关信息
# request.method 提交的方法
# request.args get请求提交的数据
# request.form post请求提交的数据
# request.values post和get提交的数据总和
# request.cookies 客户端所带的cookie
# request.headers 请求头
# request.path 不带域名,请求路径
# request.full_path 不带域名,带参数的请求路径
# request.script_root
# request.url 带域名带参数的请求路径
# request.base_url 带域名请求路径
# request.url_root 域名
# request.host_url 域名
# request.host 127.0.0.1:500
# request.files 接受文件
# obj = request.files['the_file_name'] 获取文件对象
# obj.save('/var/www/uploads/' + secure_filename(f.filename)) 将文件保存在某个地方
# 响应相关信息
# return "字符串"
# return render_template('html模板路径',**{})
# return redirect('/index.html')
#return jsonify({'k1':'v1'})
# response = make_response(render_template('index.html'))
# response是flask.wrappers.Response类型
# response.delete_cookie('key') # 删除指定的cookie的值
# response.set_cookie('key', 'value') # 添加指定的cookie值
# response.headers['X-Something'] = 'A value' # 设置头中的信息
# return response
return "内容"
if __name__ == '__main__':
app.run()
8.session
app.session_interface里边发生的事情:
存session:save_session
1.存session的时候,调用save_session,将我们的session加密的val,读取配置文件得到key,
2.将1中的key,val存储到cookies
取session
1.获取request里边的cookies,获取里面的key,这个key就是['SESSION_COOKIE_NAME'],值就是加密的值
2.对该值进行解密
关于session的一些设置
app.secret_key="askjdaksd" # 使用cookie的时候必须添加这个参数
app.config['SESSION_COOKIE_NAME']="dsb" # 设置在cookie中的key键
使用
from flask import Flask,render_template,make_response,request
from flask import session
app = Flask(__name__)
app.secret_key = 'jlaksjdfl' # 设置session使用的密钥
@app.route('/',methods=['get'])
def index():
session['name'] = 'wang'
response = make_response('hahahahah')
return response
@app.route('/login')
def login():
response = make_response(session['name'])
return response
if __name__ == '__main__':
app.run()
9.闪现
什么是闪现
闪现就是闪一下就没有了,a产生信息,传给c页面,但是用户访问a页面以后,不是直接跳转到c,而是到b,或者是到其他页面,但是用户访问c页面的时候,可以把a给我的信息拿到
闪现用到的两个方法
flash,get_flashed_messages # 存的时候使用flash来存,取的时候使用get_flash_messages来取
from flask import Flask,flash,get_flashed_messages,request
1.如果要用闪现,必须先设置 app.secret_key = 'aaaaaa'
2.只能取一次,再取就没有了
3.我们可以通过flash('普通信息',category="info"),对信息做分类
4.get_flashed_messages(with_categories=True,category_filter=("error",)),with_categories以键值对的形式获取我们设置的闪现,category_filter=("error",)对设置的闪现 进行信息分类的过滤,只会取出元组中的值
5.在同一个视图函数中可以使用多次get_flashed_messages,
from flask import Flask,flash,get_flashed_messages,request
app = Flask(__name__)
app.secret_key = 'asdfasdf'
@app.route('/index1')
def index():
flash('超时错误',category="error")
flash('普通信息',category="info")
return "ssdsdsdfsd"
@app.route('/error')
def error():
data = get_flashed_messages(with_categories=True,category_filter=("error","info"))
data1 = get_flashed_messages(with_categories=True, category_filter=("error", "info")) # 在一个视图函数中可以使用多次
print("data1",data1)
print("data",data)
return "错误信息"
if __name__ == '__main__':
app.run()
# 注:没有with_categories=True,返回['超时错误']
# 加上with_categories=rue,返回[('error', '超时错误')]
demon
# 使用闪现做登录之后的提示信息
# 前端代码
<form action="" method="post">
<p>用户名:<input type="text" name="username">{{error}}</p>
<p>密码:<input type="text" name="password">{{error}}</p>
<p>{{get_flashed_messages()[0]}}</p>
<button type="submit">提交</button>
</form>
# 后端代码
from flask import flash,get_flashed_messages
@app.route('/login',endpoint='login',methods=['POST'])
def login():
error = ''
if request.method == 'POST':
if request.form['username'] == 'wang'
and request.form['password'] == '123':
print(request.form)
flash("登录成功")
else:
error = '用户名或密码错误'
return render_template('html_test.html',error=error)
10.请求扩展
与django中的中间件相似,使用对应的请求扩展绑定对应的方法,这样在执行视图函数之前都会先执行已经做了请求扩展的方法,之后再执行视图函数
1.before_request
类比django中间件中的process_request,在请求收到之前绑定一个函数做一些事情
#基于它做用户登录认证
@app.before_request
def process_request(*args,**kwargs):
if request.path == '/login':
return None
user = session.get('user_info')
if user:
return None
return redirect('/login')
2.after_request
类比django中间件中的process_response,每一个请求之后绑定一个函数,如果请求没有异常
@app.after_request
def process_response1(response):
print('process_response1 走了')
return response
3.before_first_request
第一次请求时,跟浏览器无关
@app.before_first_request
def first():
pass
4.teardown_request
每一个请求之后绑定一个函数,即使遇到了异常,无论有没有异常都会执行,如果没有异常这个参数就是None,有就记录异常,我们可以在这里设置一个异常日志,当有异常的时候就记录到日志中
@app.teardown_request
def ter(e):
pass
5.errorhandler
路径不存在时404,服务器内部错误500,也可以使用其他的状态码,当出现这种状态码的时候触发。
@app.errorhandler(404) # 可以拦截到指定的错误状态码,当出现这种错误的时候会执行这个请求中来,并且代码中不会报错。
def error_404(arg):
return "404错误了"
6.template_global
标签,把一个方法变成一个全局的方法,在使用的时候不需要再传了,直接使用就行了,也可以在前端直接使用
@app.template_global()
def sb(a1, a2):
return a1 + a2
#{{sb(1,2)}}
7.template_filter
过滤器
@app.template_filter()
def db(a1, a2, a3):
return a1 + a2 + a3
#{{ 1|db(2,3)}} # 1是默认传的第一个参数,(2,3)是自己传入的
8.context_processor
@app.context_processor
def myprocess():
return {}
# 上下文管理器应该返回一个字典,字典中的 key 会被模板中当成变量来渲染
# 上下文管理器中返回的字典,在所有页面中都可以用
# 被这个装饰器修改的钩子函数,必须要返回一个字典,即使为空也要返回
总结:
1.重点掌握befor_request和after_request
2.注意有多个情况下的执行顺序
3.befor_request请求拦截后(也就是有return值),response所有都执行
flask中的钩子函数
# 钩子函数可以分为两层说明,第一层是 app 层,第二层则是 blueprint 层
app 层的钩子函数有 before_request,before_first_request,after_request,teardown_request,
blueprint 层关于request 的钩子函数其实和app层基本一致,有两点区别:
1.与 app 层相比,blueprint 层少了一个 before_first_request 的钩子函数多了一些,可以在蓝图调用 2.app 层的钩子函数before_app_first_request,before_app_request,after_app_request,after_app_request,teardown_app_request
11.中间件
from flask import Flask
app = Flask(__name__)
@app.route('/')
def index():
return 'Hello World!'
# 模拟中间件class Md(object):
def __init__(self,old_wsgi_app):
self.old_wsgi_app = old_wsgi_app
def __call__(self, environ, start_response):
print('开始之前')
ret = self.old_wsgi_app(environ, start_response)
print('结束之后')
return ret
if __name__ == '__main__':
#1我们发现当执行app.run方法的时候,最终执行run_simple,最后执行app(),也就是在执行app.__call__方法
#2 在__call__里面,执行的是self.wsgi_app().那我们希望在执行他本身的wsgi之前做点事情。
#3 所以我们先用Md类中__init__,保存之前的wsgi,然后我们用将app.wsgi转化成Md的对象。
#4 那执行新的的app.wsgi_app,就是执行Md的__call__方法。
#把原来的wsgi_app替换为自定义的,
app.wsgi_app = Md(app.wsgi_app)
app.run()
12.请求流程
flask的请求流程是从请求来到响应的时候这一段使时间内走过的流程
1.执行app.__call__
2.执行wsgi_app()
3.
请求所有的流程
ctx = self.request_context(environ)
error = None
try:
try:
ctx.push()
#根据路径去执行视图函数,视图类
response = self.full_dispatch_request()
except Exception as e:
error = e
response = self.handle_exception(e)
except: # noqa: B001
error = sys.exc_info()[1]
raise
return response(environ, start_response)
finally:
#不管出不出异常,都会走这里
if self.should_ignore_error(error):
error = None
ctx.auto_pop(error)
13.local
local就是一把锁
兼容线程和协程(源码到request中去看,看local的__getattr__,setattr)
try:
from greenlet import getcurrent as get_ident
except Exception as e:
from threading import get_ident
from threading import Thread
import time
class Local(object):
def __init__(self):
object.__setattr__(self,'storage',{})
def __setattr__(self, k, v):
ident = get_ident()
if ident in self.storage:
self.storage[ident][k] = v
else:
self.storage[ident] = {k: v}
def __getattr__(self, k):
ident = get_ident()
return self.storage[ident][k]
obj = Local()
def task(arg):
obj.val = arg
obj.xxx = arg
print(obj.val)
for i in range(10):
t = Thread(target=task,args=(i,))
t.start()
14.偏函数
参数在使用的时候可以先传入一部分参数,之后再调用的时候将剩余的值传入就可以执行了
from functools import partial
def test(a,b,c,d):
return a + b + c + d
tes = partial(test,2,2) # 括号内的参数为 需要调用的函数名,剩余的参数
print(tes(3,4)) # 传入的2,3是c,d的值
15.请求上下文
'''
1 app.__call__
2 wsgi_app(environ, start_response)
2.1 ctx = self.request_context(environ)
2.1.1 return RequestContext(self, environ)
这里的self是app,environ请求相关
2.1.2 return RequestContext(self, environ)
得到了RequestContext的对象,而且有request属性
2.2 2.1中的ctx就是RequestContext的对象
2.3 ctx.push()执行这个,就是RequestContext的对象的push方法
2.3.1 #执行这个,self-->ctx
_request_ctx_stack.push(self)
2.3.1.1 我们发现_request_ctx_stack = LocalStack()
他的push方法的源码:
def push(self, obj):
rv = getattr(self._local, "stack", None)
if rv is None:
# self._local=>stack-->storage['线程id']['stack']=[ctx,]
self._local.stack = rv = []
rv.append(obj)
return rv
3在请求中获取request.form
3.1 request是LocalProxy的对象,当获取属性的时候会走__getattr__
def __getattr__(self, name):
if name == "__members__":
return dir(self._get_current_object())
#name-->form,
#self._get_current_object()===>ctx.request,form
#_get_current_object()---》self.__local()
return getattr(self._get_current_object(), name)
3.1.1 self._get_current_object():源码:最终:partial(_lookup_req_object, "request")
def _get_current_object(self):
if not hasattr(self.__local, "__release_local__"):
#local==>partial(_lookup_req_object, "request")
#def __init__(self, local, name=None):
# object.__setattr__(self, "_LocalProxy__local", local)
#self.__local()===>local()
return self.__local()
try:
return getattr(self.__local, self.__name__)
except AttributeError:
raise RuntimeError("no object bound to %s" % self.__name__)
4 partial(_lookup_req_object, "request")偏函数的源码
def _lookup_req_object(name):
#name是request
#ctx
top = _request_ctx_stack.top
if top is None:
raise RuntimeError(_request_ctx_err_msg)
#ctx-->request
return getattr(top, name)
4.1中_request_ctx_stack.top
@property
def top(self):
try:
return self._local.stack[-1]
except (AttributeError, IndexError):
return None
'''
程序运行,两个LocalStack()对象,一个里面放request和session,另一个方g和current_app
16.蓝图
构建项目的目录,可以直接使用模板,直接使用项目文件夹的模板
做一个项目的框架,划分项目目录
views视图类中
from flask import Blueprint
user = Blurprint('文件包的名',__name__) # 此时的user相当于app,文件包的名是做反向解析的时候使用的名,例如:url_for(acc.login)
每一个包中都有一个包名,可以在不同的视图中使用请求扩展,如果需要对所有的视图都要进行操作,就把指定的请求扩展写在__init__中
# 路由中的使用
@user.route()
__init.py中
from flask import Flask
app = Flask(__name__,template_folder='templates',static_folder='statics',static_url_path='/static')
# 对所有的视图做请求扩展
@app.before_request
def a():
print("我是app里面的befor_request")
from .views.account import account
from .views.blog import blog
from .views.user import user
app.register_blueprint(admin, url_prefix='/admin') # url_prefix 在请求指定的函数中的方法的时候需要加上对应的url别名,不然没有办法直接访问,类似于前缀
# 蓝图总结
1.先导入蓝图,在需要使用蓝图的视图函数中定义蓝图
2.对定义好的蓝图进行注册
3.在对应的视图函数中写视图函数
4.做反向解析的时候需要把蓝图的前缀加上,防止不同视图中的视图函数重复出现
17.g对象
g对象:global
1.g对象是专门用来保存用户的数据的
2.g对象是在一次请求中的所有的代码的地方,都是可以使用的
g对象的特性:
当前请求内你设置就可以取,必须先设置后取,当前请求类可以取且无限制
就算你当前请求设置了,如果不去,其他请求过来,也取不到
from flask import g
def set_g(): # 设置g对象的使用
g.name = 'wang'
@app.route('/aa')
def a():
g.name = 'zhang' # 这里改变后在这个视图函数中使用的g对象都是改变后的对象
set_g() # 调用g对象,在使用的时候也可以不写
print(g.name)
return '1111'
18.信号
Flask框架中的信号基于blinker,其主要作用就是让开发者在flask请求过程中定制一些用户行为,
信号和请求扩展的区别
请求扩展:只能使用内置的一些请求扩展,在指定的位置执行
信号:可以自定义信号,在需要使用的地方调用就可以执行对应的方法,
安装
pip install blinker
往信号中注册函数
from flask import Flask,signals,render_template
app = Flask(__name__)
# 往信号中注册函数
#1给信号绑定要执行的函数
#无需管调用,因为flask,已经给我们设置调用点
def func(*args,**kwargs):
print('触发信号',args,kwargs)
#与该信号进行绑定
signals.request_started.connect(func)
signals.request_started.send()
# 触发信号: signals.request_started.send()
一个流程中的信号出发点
a. before_first_request
b. 触发 request_started 信号
c. before_request
d. 模板渲染
渲染前的信号 before_render_template.send(app, template=template, context=context)
rv = template.render(context) # 模板渲染
渲染后的信号 template_rendered.send(app, template=template, context=context)
e. after_request
f. session.save_session()
g. 触发 request_finished信号
如果上述过程出错:
触发错误处理信号 got_request_exception.send(self, exception=e)
h. 触发信号 request_tearing_down
flask内置的一些信号
request_started = _signals.signal('request-started') # 请求到来前执行
request_finished = _signals.signal('request-finished') # 请求结束后执行
before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
template_rendered = _signals.signal('template-rendered') # 模板渲染后执行
got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行
request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发
自定义信号
from flask import Flask, current_app, flash, render_template
from flask.signals import _signals
app = Flask(import_name=__name__)
# 1.自定义信号,xxxxxx就是这个信号的名字,之后调用的时候就是用xxx来进行调用
xxxxx = _signals.signal('xxxxx')
def func(sender,a):
print(sender,a)
print("我是自定义信号")
# 2.自定义信号中注册函数
xxxxx.connect(func)
@app.route("/x")
def index():
# 触发信号
xxxxx.send("sb",a="1") # 3.调用自定义的信号,必须要有一个参数,第一个是位置参数,第二个是关键字参数
return 'Index'
if __name__ == '__main__':
app.run()
19.flask-session
作用:将默认保存的签名cookie中的值,保存到redis/memcached/file/Mongodb/SQLAlchemy,简单的说就是把session保存到数据库中
安装:pip install flask-session
使用1:(直接使用RedisSessionInterface方法)
from flask import Flask,session
from flask_session import RedisSessionInterface
import redis
app = Flask(__name__)
app.secret_key="ajksda"
conn=redis.Redis(host='127.0.0.1',port=6379)
#use_signer是否对key签名
app.session_interface=RedisSessionInterface(conn,key_prefix='jason',use_signer=True, permanent=False)
"""
参数解析
permanent=False 关闭浏览器是否清除cookie,默认为True,不清除
key_prefix='jason' 设置前缀,保存在数据库中的时候的前缀
use_signer=True 设置密钥
"""
@app.route('/')
def hello_world():
session['sb']='jason'
return 'Hello World!'
@app.route("/index")
def index():
print(session['sb'])
return "ok"
if __name__ == '__main__':
app.run()
使用2:(推荐使用)
from flask import Flask,session
import redis
from flask_session import Session # 直接导入Session就行了
app = Flask(__name__)
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] =redis.Redis(host='127.0.0.1',port='6379')
app.config['SESSION_KEY_PREFIX']="jason"
Session(app)
@app.route('/')
def hello_world():
session['sb']='jason'
return 'Hello World!'
@app.route("/index")
def index():
print(session['sb'])
return "ok"
if __name__ == '__main__':
app.run()
问题:设置cookie时,如何设定关闭浏览器则cookie失效
response.set_cookie('k','v',exipre=None)#这样设置即可:exipre=None 最长过期时间
#在session中设置
app.session_interface=RedisSessionInterface(conn,key_prefix='lqz',permanent=False)
#一般不用,我们一般都设置超时时间,多长时间后失效
问题:cookie默认超时时间是多少?如何设置超时时间
#源码expires = self.get_expiration_time(app, session)
'PERMANENT_SESSION_LIFETIME': timedelta(days=31),#这个配置文件控制
20.自定义命令:flask-script
可以自己定义使用一些命令,在不用启动项目的情况下操作项目中的一些方法
#第一步安装:pip3 install flask-script
from flask import Flask
from flask_script import Manager # 使用命令行启动的时候要导入Manage
app = Flask(__name__)
manager=Manager(app)
@app.route("/")
def index():
return "ok"
# 位置传参
@manager.command
def custom1(arg,a):
"""
自定义命令
python manage.py custom 123
:param arg:
:return:
"""
print(arg,a)
# 关键字传参
@manager.option('-n', '--name', dest='name') # -n 是简写,--name是全称,name必须与参数中的name一致
@manager.option('-u', '--url', dest='url')
def cmd1(name, url):
"""
自定义命令(-n也可以写成--name)
执行: python manage.py cmd -n lqz -u http://www.oldboyedu.com
执行: python manage.py cmd --name lqz --url http://www.oldboyedu.com
:param name:
:param url:
:return:
"""
print(name, url)
if __name__ == '__main__':
manager.run()
21.多app应用
from werkzeug.wsgi import DispatcherMiddleware
from werkzeug.serving import run_simple
from flask import Flask
app1 = Flask('app01')
app2 = Flask('app02')
@app1.route('/index')
def index():
return "app01"
@app2.route('/index')
def index2():
return "app2"
dm = DispatcherMiddleware(app1, {
'/sec12': app2,
})
if __name__ == "__main__":
run_simple('localhost', 5000, dm)
# 访问app2应用的路由:localhose:5000/sec/index
22.wtforms
安装:pip install wtforms
作用:与django中的forms组件作用相似
使用1:
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
app = Flask(__name__, template_folder='templates')
app.debug = True
class LoginForm(Form):
# 字段(内部包含正则表达式)
name = simple.StringField(
label='用户名',
# 对字段的判断及错误信息展示
validators=[
validators.DataRequired(message='用户名不能为空.'),
validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
],
widget=widgets.TextInput(), # 页面上显示的插件
render_kw={'class': 'form-control'}, # 添加类和属性
default='axal' # 设置默认值
)
# 字段(内部包含正则表达式)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.'),
validators.Length(min=8, message='用户名长度必须大于%(min)d'),
validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}",
message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'GET':
form = LoginForm()
return render_template('login.html', form=form)
else:
form = LoginForm(formdata=request.form)
if form.validate(): # 验证信息是否正确
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('login.html', form=form)
if __name__ == '__main__':
app.run()
login.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>登录</h1>
<form method="post">
<p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> # 获取某个字段的错误
<p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
<input type="submit" value="提交">
</form>
</body>
</html>
使用2:
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
app = Flask(__name__, template_folder='templates')
app.debug = True
class RegisterForm(Form):
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired()
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'},
default='alex'
)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
pwd_confirm = simple.PasswordField(
label='重复密码',
validators=[
validators.DataRequired(message='重复密码不能为空.'),
validators.EqualTo('pwd', message="两次密码输入不一致") # 直接做判断,相当于钩子
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
# 邮箱
email = html5.EmailField(
label='邮箱',
validators=[
validators.DataRequired(message='邮箱不能为空.'),
validators.Email(message='邮箱格式错误') # 自动校验邮箱格式
],
widget=widgets.TextInput(input_type='email'),
render_kw={'class': 'form-control'}
)
# 单选
gender = core.RadioField(
label='性别',
choices=(
(1, '男'),
(2, '女'),
),
coerce=int # “1” “2”,指定传入的是那种类型,默认是字符串
)
city = core.SelectField(
label='城市',
choices=(
('bj', '北京'),
('sh', '上海'),
)
)
# 多选
hobby = core.SelectMultipleField(
label='爱好',
choices=(
(1, '篮球'),
(2, '足球'),
),
coerce=int
)
favor = core.SelectMultipleField(
label='喜好',
choices=(
(1, '篮球'),
(2, '足球'),
),
widget=widgets.ListWidget(prefix_label=False),
option_widget=widgets.CheckboxInput(),
coerce=int,
default=[1, 2]
)
# 可以改变已经定义好的字段的值
def __init__(self, *args, **kwargs):
super(RegisterForm, self).__init__(*args, **kwargs)
self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
# 局部校验
def validate_pwd_confirm(self, field):
"""
自定义pwd_confirm字段规则,例:与pwd字段是否一致
:param field:
:return:
"""
# 最开始初始化时,self.data中已经有所有的值
if field.data != self.data['pwd']:
# raise validators.ValidationError("密码不一致") # 继续后续验证
raise validators.StopValidation("密码不一致") # 不再继续后续验证
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
return render_template('register.html', form=form)
else:
form = RegisterForm(formdata=request.form)
if form.validate():
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('register.html', form=form)
if __name__ == '__main__':
app.run()
login.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>用户注册</h1>
<form method="post" novalidate style="padding:0 50px">
{% for field in form %}
<p>{{field.label}}: {{field}} {{field.errors[0] }}</p>
{% endfor %}
<input type="submit" value="提交">
</form>
</body>
</html>
23.SQLAlchemy
# https://www.cnblogs.com/xiaoyuanqujing/protected/articles/11715497.html, 学习网址
SQLAlchemy是一个基于python实现的ORM框架,该框架建立在DB API之上,使用关系对象映射进行数据库操作,简而言之就是:将类和对象转换成SQL,然后使用数据API执行SQL并回去执行结果
pip install sqlalchemy
组成部分
Engine,框架的引擎
Connection Pooling ,数据库连接池
Dialect,选择连接数据库的DB API种类
Schema/Types,架构和类型
SQL Exprression Language,SQL表达式语言
SQLAlchemy本身无法操作数据库,其必须依赖pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作
MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
django中如何反向生成models
python manage.py inspectdb > app/models.py
简单使用(能创建表,删除表,不能修改表)
1.执行原生sql(不常用)
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
def task(arg):
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
"select * from app01_book"
)
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()
for i in range(20):
t = threading.Thread(target=task, args=(i,))
t.start()
2.orm使用
models.py
import datetime
from sqlalchemy import create_engine # 必须导入
from sqlalchemy.ext.declarative import declarative_base # 必须导入
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime,
UniqueConstraint, Index
from werkzeug.security import generate_password_hash, check_password_hash # 密码加密反解
Base = declarative_base() # 要用表就要产生这个对象
class Users(Base):
__tablename__ = 'users' # 数据库表名称,必须
id = Column(Integer, primary_key=True) # id 主键
name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空
# email = Column(String(32), unique=True)
#datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
# ctime = Column(DateTime, default=datetime.datetime.now)
# extra = Column(Text, nullable=True)
__table_args__ = (
# UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
# Index('ix_id_name', 'name', 'email'), #索引
)
def init_db():
"""
根据类创建数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Base.metadata.create_all(engine) # 创建表的语句
def drop_db():
"""
根据类删除数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Base.metadata.drop_all(engine) # 删除表的语句
if __name__ == '__main__':
# drop_db()
init_db()
# 1.创建表的第一步:创建引擎去链接
# 2.按照指定的字段去设置字段
# 3.执行指定的添加表和删除表的命令
# 4.表创建好之后不能再更改
app.py
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users
#"mysql+pymysql://root@127.0.0.1:3306/aaa"
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
Connection = sessionmaker(bind=engine)
# 每次执行数据库操作时,都需要创建一个Connection
con = Connection()
# ############# 执行ORM操作 #############
obj1 = Users(name="lqz")
con.add(obj1)
# 提交事务
con.commit()
# 关闭session,其实是将连接放回连接池
con.close()
3.一对多关系
class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
caption = Column(String(50), default='篮球')
class Person(Base):
__tablename__ = 'person'
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
# hobby指的是tablename而不是类名,uselist=False
hobby_id = Column(Integer, ForeignKey("hobby.id")) # 和hobby表的id建立外键关系
# 跟数据库无关,不会新增字段,只用于快速链表操作
# 类名,backref 用于反向查询
hobby=relationship('Hobby',backref='pers') # backref 是设置的反向字段
4.多对多关系
class Boy2Girl(Base):
__tablename__ = 'boy2girl'
id = Column(Integer, primary_key=True, autoincrement=True)
girl_id = Column(Integer, ForeignKey('girl.id'))
boy_id = Column(Integer, ForeignKey('boy.id'))
class Girl(Base):
__tablename__ = 'girl'
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)
class Boy(Base):
__tablename__ = 'boy'
id = Column(Integer, primary_key=True, autoincrement=True)
hostname = Column(String(64), unique=True, nullable=False)
# 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
servers = relationship('Girl', secondary='boy2girl', backref='boys')
5.操作数据库
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
# 每次执行数据库操作时,都需要创建一个session
session = Session()
# ############# 执行ORM操作 #############
obj1 = Users(name="lqz")
session.add(obj1)
# 提交事务
session.commit()
# 关闭session
session.close()
6.基于scoped_session实现线程安全
保证线程的安全性,在使用原生SQLAchemy的时候需要注意,使用flask_sqlalchemy的时候不需要
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Users
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
"""
# 线程安全,基于本地线程实现每个线程用同一个session
# 特殊的:scoped_session中有原来方法的Session中的一下方法:
public_methods = (
'__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
'close', 'commit', 'connection', 'delete', 'execute', 'expire',
'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',
'bulk_update_mappings',
'merge', 'query', 'refresh', 'rollback',
'scalar'
)
"""
#scoped_session类并没有继承Session,但是却又它的所有方法
session = scoped_session(Session)
# ############# 执行ORM操作 #############
obj1 = Users(name="alex1")
session.add(obj1)
# 提交事务
session.commit()
# 关闭session
session.close()
7.基本的增删改查
import time
import threading
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from db import Users, Hosts
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# ################ 添加 ################
# 单增
obj1 = Users(name="wupeiqi")
session.add(obj1)
session.commit() # 提交一定要写commie()
# 群增
session.add_all([
Users(name="lqz"),
Users(name="egon"),
Hosts(name="c1.com"),
])
session.commit()
# ################ 删除 ################
session.query(Users).filter(Users.id > 2).delete() # 不写条件的时候全部删除
session.commit()
# ################ 修改 ################
#传字典
session.query(Users).filter(Users.id > 0).update({"name" : "lqz"})
#类似于django的F查询
session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False) # 添加字符串必须加上 synchronize_session=False
session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
session.commit()
# ################ 查询 ################
r1 = session.query(Users,Auther).all() # 可以查多个表,不加 .all()就是原生SQL
#只取age列,把name重命名为xx,不改变数据库中name的值,改变返回到前端的name的值
r2 = session.query(Users.name.label('xx'), Users.age).all()
#filter传的是表达式,filter_by传的是参数
r3 = session.query(Users).filter(Users.name == "lqz").all()
r4 = session.query(Users).filter_by(name='lqz').all()
r5 = session.query(Users).filter_by(name='lqz').first()
#:value 和:name 相当于占位符,用params传参数
r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()
#自定义查询sql
r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()
*****************************总结*****************************
# 增,删,改都要 session.commit()
# 最后使用close进行关闭 session.close()
# 如果需要打印的对象是按名的形式展示,使用的是__rper___
8.常用操作
# 条件
ret = session.query(Users).filter_by(name='lqz').all()
#表达式,and条件连接
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
#注意下划线 in_ 包裹的是属于的条件
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
#~非,除。。外
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
#二次筛选
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
#or_包裹的都是or条件,and_包裹的都是and条件
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
or_(
Users.id < 2,
and_(Users.name == 'eric', Users.id > 3),
Users.extra != ""
)).all()
# 通配符,以e开头,不以e开头 like
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()
# 限制,用于分页,区间
ret = session.query(Users)[1:2]
# 排序,根据name降序排列(从大到小) order_by
ret = session.query(Users).order_by(Users.name.desc()).all()
#第一个条件重复后,再按第二个条件升序排 order_by
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
# 分组 group_by
from sqlalchemy.sql import func
ret = session.query(Users).group_by(Users.extra).all()
#分组之后取最大id,id之和,最小id group_by
from sqlalchemy.sql import func
ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).all()
#haviing 筛选
from sqlalchemy.sql import func
ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
# 连表(默认用forinkey关联)
ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
#join表,默认是inner join
ret = session.query(Person).join(Favor, Person.name == Favor.name).all()
#isouter=True 外连,表示Person left join Favor,没有right join,反过来可实现 right join
ret = session.query(Person).join(Favor, isouter=True).all()
#打印原生sql 没有加 .all() 的时候打印的是原生sql
aa=session.query(Person).join(Favor, isouter=True)
# 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上
ret = session.query(Person).join(Favor,Person.id==Favor.id, isouter=True).all()
# 组合(了解)UNION 操作符用于合并两个或多个 SELECT 语句的结果集
#union和union all的区别?
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()
9.执行原生sql
import time
import threading
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 查询
# cursor = session.execute('select * from users')
# result = cursor.fetchall()
# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'lqz'})
session.commit()
print(cursor.lastrowid)
session.close()
10.一对多
import time
import threading
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
*************************** 添加 ***************************
# 两个表一起添加
session.add_all([
Hobby(caption='乒乓球'),
Hobby(caption='羽毛球'),
Person(name='张三', hobby_id=3),
Person(name='李四', hobby_id=4),
])
# 添加一 正向插入数据
person = Person(name='张九', hobby=Hobby(caption='姑娘'))
session.add(person)
session.commit()
# 添加二 反向插入数据,使用设置的反向字段
hb = Hobby(caption='人妖')
hb.pers = [Person(name='文飞'), Person(name='博雅')]
session.add(hb)
session.commit()
*************************** 查询 ***************************
# 使用relationship正向查询
v = session.query(Person).first()
print(v.name)
print(v.hobby.caption)
# 使用relationship反向查询,使用关联的反向字段操作
v = session.query(Hobby).first()
print(v.caption)
print(v.pers)
#方式一,自己链表 isouter=True 不加是inner join查询,加上是left join查询,没有right查询
# person_list=session.query(models.Person.name,models.Hobby.caption).join(models.Hobby,isouter=True).all()
person_list=session.query(models.Person,models.Hobby).join(models.Hobby, isouter=True).all()
for row in person_list:
# print(row.name,row.caption)
print(row[0].name,row[1].caption)
#方式二:通过relationship
person_list=session.query(models.Person).all()
for row in person_list:
print(row.name,row.hobby.caption)
#查询喜欢姑娘的所有人
obj=session.query(models.Hobby).filter(models.Hobby.id==1).first()
persons=obj.pers
print(persons)
session.close()
# 如果没有外键
ret = session.query(Person).join(Hobby, Person.nid=Hobby.id, isouter=True).all()
11.多对多
import time
import threading
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
************************ 添加 ************************
# 方式一 麻烦
session.add_all([
Server(hostname='c1.com'),
Server(hostname='c2.com'),
Group(name='A组'),
Group(name='B组'),
])
session.commit()
s2g = Server2Group(server_id=1, group_id=1)
session.add(s2g)
session.commit()
# 方法二 正向添加
gp = Group(name='C组')
gp.servers = [Server(hostname='c3.com'),Server(hostname='c4.com')]
session.add(gp)
session.commit()
# 方法三 反向添加
ser = Server(hostname='c6.com')
ser.groups = [Group(name='F组'),Group(name='G组')]
session.add(ser)
session.commit()
************************ 查询 ************************
# 使用relationship正向查询
v = session.query(Group).first()
print(v.name)
print(v.servers)
# 使用relationship反向查询 直接使用反向字段操作
v = session.query(Server).first()
print(v.hostname)
print(v.groups)
session.close()
12.其他
import time
import threading
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text, func
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 关联子查询:correlate(Group)表示跟Group表做关联,as_scalar相当于对该sql加括号,用于放在后面当子查询
subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
result = session.query(Group.name, subqry)
"""
SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid
FROM server
WHERE server.id = `group`.id) AS anon_1
FROM `group`
"""
'''
select * from tb where id in [select id from xxx];
select id,
name,
#必须保证此次查询只有一个值
(select max(id) from xxx) as mid
from tb
例如,第三个字段只能有一个值
id name mid
1 lqz 1,2 不合理
2 egon 2
'''
'''
成绩表:
id sid cid score
1 1 物理 99
2 1 化学 88
3 2 物理 95
学生表:
id name 每个学生总分数
1 xx 88
2 yy 77
select id,name,
(select avr(score) from 成绩表 where 成绩表.sid=学生表.id) as x
from 学生表
subqry = session.query(func.count(成绩表.scort).label("sc")).filter(学生表.id == 成绩表.sid).correlate(学生表).as_scalar()
result = session.query(学生表.name, subqry)
'''
# 原生SQL
"""
# 查询
cursor = session.execute('select * from users')
result = cursor.fetchall()
# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
session.commit()
print(cursor.lastrowid)
"""
session.close()
13.Flask-SQLAlchemy
flask和SQLAlchemy的管理者,通过他把他们做连接
db = SQLAlchemy()
- 包含配置
- 包含ORM基类
- 包含create_all
- engine
- 创建连接
离线脚本,创建表
# 存放models模型
from flask import Flask
from flask_script import Manager
from flask_migrate import MigrateCommand,Migrate
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
db = SQLAlchemy() # 必须
db.init_app(app) # 必须
manage = Manager(app) # 必须
Migrate(app,db) # 必须
manage.add_command('db',MigrateCommand) # 必须
# 添加迁移脚本的命令到manage中
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql+pymysql://root:123@127.0.0.1:3306/flask?charset=utf8"
# 支持改表
class User(db.Model):
__tablename__ = 'user'
id = db.Column(db.Integer,primary_key=True,autoincrement=True)
user = db.Column(db.String(32),unique=True,nullable=True)
def __repr__(self):
return '<User%r>'%self.username
if __name__ == '__main__':
manage.run()
详见代码
flask-migrate
python3 manage.py db init 初始化:只执行一次
python3 manage.py db migrate 等同于makemigrations
python3 manage.py db init upgrade 等同于migrate
# 初始化之后再需要添加表的时候,只需要执行后两部即可
单表操作
******************* 增 ***********************
db.session.add(User(username='zhang'))
db.session.commit()
******************* 查 ***********************
obj = db.session.query(User).filter(User.username=='wang').first()
# 使用方法和SQLAlchemy方法是一样的,只需要将Session换成db.session就可以了
项目中使用案例
from flask_sqlalchemy import BaseQuery
class ProductNetValueQuery(BaseQuery):
def get_all_products(self):
return self.with_entities(ProductNetValue.id, ProductNetValue.name).group_by(
ProductNetValue.name).all()
def get__(self, trading_at, p):
return self.filter(ProductNetValue.trading_at == trading_at).filter(
ProductNetValue.name == p).first()
class ProductNetValue(CRUDModel):
__tablename__ = "product_net_values"
query_class = ProductNetValueQuery
使用的时候可以直接使用 res = ProductNetValue.query.get_all_products() 操作
flask-sqlalchemy 字段
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(64), index=True)
acc_net_value = db.Column(db.Float)
cnt = db.Column(db.Integer)
purchase = db.Column(db.DECIMAL(19, 2), name="Purchase")
comment = db.Column(db.Text, name="Comment")
is_last_update = db.Column(db.Boolean, name="IsLastUpdate", default=Fund, onupdate=True)
update_time = db.Column(db.DateTime, name="UpdateTime", default=datetime.datetime.now, onupdate=datetime.datetime.now) # onupdate 记录最后一次更新的时间
外键约束字段
选项名 说明
backref 在关系的另一个模型中添加反向引用
primaryjoin 明确指定两个模型之间使用的联结条件。只在模棱两可的关系中需要指定.
lazy 指定如何加载相关记录。可选值如下 :
select(首次访问时按需加载)
immediate(源对象加载后就加载)
joined(加载记录,但使用联结)
subquery(立即加载,但使用子查询)
noload(永不加载)
dynamic(不加载记录,但提供加载记录的查询)
uselist 如果设为 Fales,不使用列表,而使用标量值
order_by 指定关系中记录的排序方式
secondary 指定多对多关系中关系表的名字
secondaryjoin SQLAlchemy 无法自行决定时,指定多对多关系中的二级联结条件
14.项目依赖包处理
#### 导出项目环境
```
1)进入本地项目根目录
>: cd 项目根目录
2)本地导出项目环境
pip3 freeze > packages.txt
>:
3)如果环境中有特殊的安装包,需要处理一下xadmin
packages.txt中的
xadmin==2.0.1
要被替换为
https://codeload.github.com/sshwsfc/xadmin/zip/django2
```
#### 项目虚拟环境
```
1)创建线上luffy项目虚拟环境
>: mkvirtualenv luffy
>: workon luffy
2)安装所需环境,在packages.txt所在目录下安装执行packages.txt文件
>: pip install uwsgi
>: pip install -r /home/project/luffyapi/packages.txt
```
#### 项目提交到远程git仓库
```
1)去向本地项目仓库
>: cd 项目根目录
2)本地版本库操作
>: git status
>: git add .
>: git commit -m '项目2.0上线'
3)提交到远程版本库
>: git pull origin master
>: git push origin master
```
15.虚拟环境
### 安装虚拟环境
```
1)安装依赖
>: pip3 install virtualenv
>: pip3 install virtualenvwrapper
2)建立虚拟环境软连接
>: ln -s /usr/local/python3/bin/virtualenv /usr/bin/virtualenv
3)配置虚拟环境:填入下方内容
>: vim ~/.bash_profile
VIRTUALENVWRAPPER_PYTHON=/usr/bin/python3
source /usr/local/python3/bin/virtualenvwrapper.sh
4)退出编辑状态
>: esc
5)保存修改并退出
>: :wq
6)更新配置文件内容
>: source ~/.bash_profile
7)虚拟环境默认根目录:~/.virtualenvs
```
24.flask前后端分离项目总结
前端使用Vue,后端使用flask
第一种情况:前端发送get或post请求
如果前端直接发送get或者使用form表单提交post请求,
get请求在flask中使用request.args 获取发送过来的请求
post请求在flask中直接使用request.form.get()来获取发送过来的信息
第二种情况:前端使用axios发送post请求
当前端使用axios发送请求的时候,后端接收到的请求是json格式的,而且数据是在request.data中,不在request.form中,这点要注意
后端接收的时候使用:request.data接收,再使用json做一个转化
data = json.loads(request.data)
username = data.get('username')
password = data.get('password')
re_password = data.get('re_password')
25.pipenv 虚拟环境
pipenv --where 列出本地工程路径
pipenv --venv 列出虚拟环境路径
pipenv --py 列出虚拟环境的Python可执行文件
pipenv install 创建虚拟环境
pipenv isntall [moduel] 安装包
pipenv install [moduel] --dev 安装包到开发环境
pipenv uninstall[module] 卸载包
pipenv uninstall --all 卸载所有包
pipenv graph 查看包依赖
pipenv lock 生成lockfile
pipenv run python [pyfile] 运行py文件
pipenv --rm 删除虚拟环境
pipenv run python xxx.py 运行python代码
pipenv lock -r --dev > requirements.txt pipenv生成txt命令
pipenv install -r requirements.txt 安装txt包
pipenv shell 启动虚拟环境
pipenv run flask run --port=5080
yarn run serve