初识flask
介绍
Flask诞生于2010年,是Armin ronacher(人名)用 Python 语言基于 Werkzeug(wo ke zou ge) 工具箱编写的轻量级Web开发框架。
Flask 本身相当于一个内核,其他几乎所有的功能都要用到扩展(邮件扩展Flask-Mail,用户认证Flask-Login,数据库Flask-SQLAlchemy),都需要用第三方的扩展来实现。比如可以用 Flask 扩展加入ORM、窗体验证工具,文件上传、身份验证等。Flask 没有默认使用的数据库,你可以选择 MySQL,也可以用 NoSQL。
其 WSGI 工具箱采用 Werkzeug(路由模块),模板引擎则使用 Jinja2。这两个也是 Flask 框架的核心。
官网: http://flask.pocoo.org/
官方文档: http://docs.jinkan.org/docs/flask/
Flask常用的扩展包:
Flask-SQLalchemy:操作数据库,ORM;
Flask-script:终端脚本工具,脚手架;
Flask-migrate:管理迁移数据库;
Flask-Session:Session存储方式指定;
Flask-WTF:表单;
Flask-Mail:邮件;
Flask-Bable:提供国际化和本地化支持,翻译;
Flask-Login:认证用户状态;
Flask-OpenID:认证, OAuth;
Flask-RESTful:开发REST API的工具;
Flask JSON-RPC: 开发rpc远程服务[过程]调用
Flask-Bootstrap:集成前端Twitter Bootstrap框架
Flask-Moment:本地化日期和时间
Flask-Admin:简单而可扩展的管理接口的框架
创建虚拟环境
#安装依赖
pip3 install virtualenv
pip3 install virtualenvwrapper
#建立软连接(相当于windows快捷方式)
ln -s /usr/local/python3/bin/virtualenv /usr/bin/virtualenv
#配置虚拟环境,填入下面内容,填入内容后按esc,:wq保存并退出,
vim ~/.bash_profile
填入内容,看图1
VIRTUALENVWRAPPER_PYTHON=/usr/local/python3/bin/python3
source /usr/local/python3/bin/virtualenvwrapper.sh
#查看下修改好的neir
cat ~/.bash_profile
#更新配置文件内容
source ~/.bash_profile
#虚拟环境默认的根目录
cd ~/.virtualenvs
#创建虚拟环境
1 创建虚拟环境到配置的WORKON_HOME路径下
mkvirtualenv -p python3 自定义的虚拟环境名称(比如:luffy) 使用的是python3
mkvirtualenv -p python2 自定义的虚拟环境名称(比如:luffy) 使用的是python2
2 查看已有的虚拟环境
workon
3 使用虚拟环境
workon 已有的虚拟环境名称(比如:luffy)
4 进入、退出该虚拟环境的Python环境
python、exit()
5 为虚拟环境安装模块
pip install 模块名称(比如:django==2.2)
6 退出当前虚拟环境
deactivate
7 删除虚拟环境(删除当前虚拟环境要先退出)
rmvirtualenv 自定义的虚拟环境名称(比如:luffy)
快速入门
#安装flask
workon flask
pip install flask
#pycharm创建flask项目,flask_first,看下图步骤
#app.py
from flask import Flask, request
app = Flask(__name__)
@app.route('/')
def hello_world():
print(request.path) # /
return 'Hello World!'
@app.route('/hello')
def hello():
print(request.path) # /hello
return 'hello'
if __name__ == '__main__':
app.run()
# 请求来了,会执行 app(request),会触发谁?触发__call__方法
图1
图2
登录,显示用户信息
#登录成功之后可以访问index页面
#App2.py
from flask import Flask, render_template, request, redirect, session, url_for
app = Flask(__name__)
app.debug = True
app.secret_key = 'sdfsdfsdfsdf'
USERS = {
1: {'name': '张三', 'age': 18, 'gender': '男', 'text': "道路千万条"},
2: {'name': '李四', 'age': 28, 'gender': '男', 'text': "安全第一条"},
3: {'name': '王五', 'age': 18, 'gender': '女', 'text': "行车不规范"},
}
@app.route('/detail/<int:nid>', methods=['GET'])
def detail(nid):
user = session.get('user_info')
if not user:
return redirect('/login')
info = USERS.get(nid)
return render_template('detail.html', info=info)
@app.route('/index', methods=['GET'])
def index():
user = session.get('user_info')
if not user:
# return redirect('/login')
url = url_for('l1')
return redirect(url)
return render_template('index.html', user_dict=USERS)
@app.route('/login', methods=['GET', 'POST'], endpoint='l1')
def login():
if request.method == "GET":
return render_template('login.html')
else:
# request.query_string
user = request.form.get('user')
pwd = request.form.get('pwd')
if user == 'joab' and pwd == '123':
session['user_info'] = user
return redirect('http://www.baidu.com')
return render_template('login.html', error='用户名或密码错误')
if __name__ == '__main__':
app.run()
#login.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>用户登录</h1>
<form method="post">
<input type="text" name="user">
<input type="text" name="pwd">
<input type="submit" value="登录">{{error}}
</form>
</body>
</html>
#index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>用户列表</h1>
<table>
{% for k,v in user_dict.items() %}
<tr>
<td>{{k}}</td>
<td>{{v.name}}</td>
<td>{{v['name']}}</td>
<td>{{v.get('name')}}</td>
<td><a href="/detail/{{k}}">查看详细</a></td>
</tr>
{% endfor %}
</table>
</body>
</html>
#templatesdetail.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>详细信息 {{info.name}}</h1>
<div>
{{info.text}}
</div>
</body>
</html>
总结
1 三板斧
return 字符串 #直接返回字符串
return render_template('index.html') #返回html页面
return redirect('/login') #重定向
2 路由写法,(路径,支持的请求方式,别名)别名不写默认是函数名 #@app.route('/login',methods=['GET','POST'],endpoint='l1')
3 模板语言渲染
-同django的dtl,但是比dtl强大,支持加括号执行,字典支持中括号取值和get取值
如:
{% for k,v in user_dict.items() %}
<tr>
<td>{{k}}</td>
<td>{{v.name}}</td>
<td>{{v['name']}}</td>
<td>{{v.get('name')}}</td>
<td><a href="/detail/{{k}}">查看详细</a></td>
</tr>
{% endfor %}
4 分组(就是django中的有名分组)
@app.route('/detail/<int:nid>',methods=['GET'])
def detail(nid):
5 反向解析
-url_for('别名'),设置别名,重定向到该url
列子:
@app.route('/index', methods=['GET'])
def index():
user = session.get('user_info')
if not user:
# return redirect('/login')
url = url_for('l1')
return redirect(url)
return render_template('index.html', user_dict=USERS)
6 获取前端传递过来的数据
# get 请求
request.query_string
# post请求
user = request.form.get('user')
pwd = request.form.get('pwd')
配置文件
#方式1
app.config['DEBUG'] = True
PS: 由于Config对象本质上是字典,所以还可以使用 app.config.update(...)
#方式2
#通过py文件配置
app.config.from_pyfile("python文件名称")
如:
settings.py
DEBUG = True
#方式3
app.config.from_object('settings.TestingConfig')
class Config(object):
DEBUG = False
TESTING = False
DATABASE_URI = 'sqlite://:memory:'
class ProductionConfig(Config):
DATABASE_URI = 'mysql://user@localhost/foo'
class DevelopmentConfig(Config):
DEBUG = True
class TestingConfig(Config):
TESTING = True
路由系统
#基本使用
@app.route('/detail/<int:nid>',methods=['GET'],endpoint='detail')
#转换器
DEFAULT_CONVERTERS = {
'default': UnicodeConverter,
'string': UnicodeConverter,
'any': AnyConverter,
'path': PathConverter,
'int': IntegerConverter,
'float': FloatConverter,
'uuid': UUIDConverter,
}
#路由本质
1 本质就是:app.add_url_rule()
2 endpoint:如果不写默认是函数名,endpoint不能重名
#@app.route和app.add_url_rule参数:
rule, URL规则
view_func, 视图函数名称
defaults = None, 默认值, 当URL中无参数,函数需要参数时,使用defaults = {'k': 'v'},为函数提供参数
endpoint = None, 名称,用于反向生成URL,即: url_for('名称')
methods = None, 允许的请求方式,如:["GET", "POST"]
#对URL最后的 / 符号是否严格要求,默认严格,False,就是不严格
@app.route('/index', strict_slashes=False)
#访问http://www.xx.com/index/ 或http://www.xx.com/index均可
@app.route('/index', strict_slashes=True)
#仅访问http://www.xx.com/index
#重定向到指定地址
@app.route('/index/<int:nid>', redirect_to='/home/<nid>')
CBV(前后端分离开发)
from flask import Flask,request,render_template,redirect
from flask import views
app=Flask(__name__)
class IndexView(views.View):
methods = ['GET']
decorators = [auth, ]
def dispatch_request(self):
print('Index')
return 'Index!'
def auth(func):
def inner(*args, **kwargs):
print('before')
result = func(*args, **kwargs)
print('after')
return result
return inner
class IndexView(views.MethodView):
methods = ['GET'] # 指定请求方式
# 登录认证装饰器加在哪?
decorators = [auth, ] #加多个就是从上往下的效果
def get(self):
return "我是get请求"
def post(self):
return '我是post请求'
# 注册路由
app.add_url_rule('/index',view_func=IndexView.as_view('index')) # IndexView.as_view('index'),必须传name
if __name__ == '__main__':
app.run()
# views.View用的比较少
# 继承views.MethodView,只需要写get,post,delete方法
# 如果加装饰器decorators = [auth, ],加多个就是从上往下的效果
# 允许的请求方法methods = ['GET']
HTTP请求
配置文件
# SECRET_KEY:如果使用session,必须配置
# SESSION_COOKIE_NAME:cookie名字
# 数据库地址,端口号,也要放到配置文件中,但是不是内置的参数
# flask内置session如何实现?
-通过SECRET_KEY加密以后,当做cookie返回给浏览器
-下次发送请求,携带cookie过来,反解,再放到session中
flask-resful跟drf挺像的,可以看看
路由支持正则
#写类,继承BaseConverter
#注册:app.url_map.converters['regex'] = RegexConverter
#使用:@app.route('/index/<regex("d+"):nid>') 正则表达式会当作第二个参数传递到类中
from flask import Flask, views, url_for
from werkzeug.routing import BaseConverter
app = Flask(import_name=__name__)
class RegexConverter(BaseConverter):
"""
自定义URL匹配正则表达式
"""
def __init__(self, map, regex):
super(RegexConverter, self).__init__(map)
self.regex = regex
def to_python(self, value):
"""
路由匹配时,匹配成功后传递给视图函数中参数的值
"""
return int(value)
def to_url(self, value):
"""
使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
"""
val = super(RegexConverter, self).to_url(value)
return val
# 添加到flask中
app.url_map.converters['regex'] = RegexConverter
@app.route('/index/<regex("d+"):nid>')
def index(nid):
print(url_for('index', nid='888'))
return 'Index'
if __name__ == '__main__':
app.run()
模板
# 同django的dtl,但是比dtl强大,支持加括号执行,字典支持中括号取值和get取值
如:
{% for k,v in user_dict.items() %}
<tr>
<td>{{k}}</td>
<td>{{v.name}}</td>
<td>{{v['name']}}</td>
<td>{{v.get('name')}}</td>
<td><a href="/detail/{{k}}">查看详细</a></td>
</tr>
{% endfor %}
# 模板有没有处理xss攻击,在页面显示标签,内部怎么实现的?
-1 模板层 要渲染的字符串|safe # 前端使用 要渲染的字符串|safe 渲染出input框
-2 后端:Markup('<input type="text">') #使用Markup()前端可以渲染出input框
# Markup等价django的mark_safe ,
# extends,include一模一样
视图函数和视图类
request对象的属性和方法
request.method #提交的方法
request.args #get请求提及的数据
request.form #post请求提交的数据
request.values #post和get提交的数据总和
request.cookies #客户端所带的cookie
request.headers #请求头
request.path #不带域名,请求路径
request.full_path #不带域名,带参数的请求路径
request.url #带域名带参数的请求路径
request.base_url #带域名请求路径
request.url_root #域名
request.host_url #域名
request.host #127.0.0.1:5000
request.files #文件
响应对象的方法
return "字符串"
return render_template('html模板路径',**{})
return redirect('/index.html')
return jsonify({'k1':'v1'}) #对着django,JsonResponse
#添加/更新cookie
aa='hello world'
res=make_response(aa)
res.set_cookie('xxx','lqz')
# 往响应头中放东西
res.headers['X-Something'] = 'A value'
print(type(res))
from flask.wrappers import Response
return res
response = make_response(render_template('index.html')) #response是flask.wrappers.Response类型
response.delete_cookie('key') #删除cookie
response.set_cookie('key', 'value') #添加/更新cookie
response.headers['X-Something'] = 'A value' # 往响应头中放东西
return response #记得返回响应对象
return 'hello' #返回字符串
补充知识
varchr :65535个字节的数据
utf8:中文占2个字节,varchar(300)
utf8mb4:中文占3个字节,varchar(300)
# 快速导出requestment.txt
pip3 install pipreqs
# pipreqs ./ --encoding=utf-8
闪现
#应用场景,把前端上个页面报错信息,下个页面get_flashed_message()展示出来
-设置:flash('aaa')
-取值:get_flashed_message()
-设置:flash('lqz',category='error1')
-取值:res=get_flashed_messages(category_filter=['error1'])
-假设在a页面操作出错,跳转到b页面,在b页面显示a页面的错误信息
session的使用
# 全局导入
# 视图函数中 session['key']=value
# 删除:session.pop('key')
# 取:session['key']
# open_session
# save_session
请求扩展(像django中间件)
#类似于django的中间件,请求来了,请求走了,什么操作
#请求来了就会触发,类似于django的process_request,如果有多个,顺序是从上往下
@app.before_request
def before(*args,**kwargs):
if request.path=='/login':
return None
else:
name=session.get('user')
if not name:
return redirect('/login')
else:
return None
#请求走了就会触发,类似于django的process_response,如果有多个,顺序是从下往上执行
@app.after_request
def after(response):
print('我走了')
return response
#3 before_first_request 项目启动起来第一次会走,以后都不会走了,也可以配多个(项目启动初始化的一些操作)
@app.before_first_request
def first():
print('我的第一次')
# 4 每次视图函数执行完了都会走它,# 用来记录出错日志
@app.teardown_request # 用来记录出错日志
def ter(e):
print(e)
print('我是teardown_request ')
# 5 errorhandler绑定错误的状态码,只要码匹配,就走它
@app.errorhandler(404)
def error_404(arg):
return render_template('error.html',message='404错误')
# 6 全局标签
@app.template_global()
def sb(a1, a2):
return a1 + a2
在模板中使用:{{ sb(3,4) }}
# 7 全局过滤器
@app.template_filter()
def db(a1, a2, a3):
return a1 + a2 + a3
在模板中使用:{{ 1|db(2,3)}}
1 重点掌握before_request和after_request
2 注意有多个的情况,执行顺序,请求来的时候,顺序是从上往下,走的时候是顺序是从下往上
3 before_request请求拦截后(也就是有return值),response所有都执行
中间件(跟django中间件完全一样)
from flask import Flask
app = Flask(__name__)
@app.route('/')
def index():
return 'Hello World!'
# 模拟中间件
class Md(object):
def __init__(self,old_wsgi_app):
self.old_wsgi_app = old_wsgi_app
def __call__(self, environ, start_response):
print('开始之前')
ret = self.old_wsgi_app(environ, start_response)
print('结束之后')
return ret
if __name__ == '__main__':
#1我们发现当执行app.run方法的时候,最终执行run_simple,最后执行app(),也就是在执行app.__call__方法
#2 在__call__里面,执行的是self.wsgi_app().那我们希望在执行他本身的wsgi之前做点事情。
#3 所以我们先用Md类中__init__,保存之前的wsgi,然后我们用将app.wsgi转化成Md的对象。
#4 那执行新的的app.wsgi_app,就是执行Md的__call__方法。
#把原来的wsgi_app替换为自定义的,
app.wsgi_app = Md(app.wsgi_app)
app.run()
猴子补丁
'''什么是猴子补丁?
只是一个概念,不属于任何包和模块
利用了python一切皆对象的理念,在程序运行过程中,动态修改方法
概念'''
class Monkey():
def play(self):
print('猴子在玩')
class Dog():
def play(self):
print('狗子在玩')
m=Monkey()
# m.play()
m.play=Dog().play
m.play()
'''有什么用?
这里有一个比较实用的例子,
很多用到import json,
后来发现ujson性能更高,
如果觉得把每个文件的import json改成import ujson as json成本较高,
或者说想测试一下ujson替换是否符合预期, 只需要在入口加上:
只需要在程序入口'''
import json
import ujson
def monkey_patch_json():
json.__name__ = 'ujson'
json.dumps = ujson.dumps
json.loads = ujson.loads
monkey_patch_json()
aa=json.dumps({'name':'lqz','age':19})
print(aa)
#协程:单线程下实现并发
from gevent import monkey;monkey.patch_all()
import gevent
import time
def eat():
print('eat food 1')
time.sleep(2)
print('eat food 2')
def play():
print('play 1')
time.sleep(1)
print('play 2')
g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])
print('主')
蓝图
1 没有蓝图之前前,都是单文件
2 有了蓝图可以分文件,分app,之前的请求扩展还是一样用,只是在当前蓝图对象管理下的有效
3 蓝图使用
#第一步在app中注册蓝图,括号里是一个蓝图对象
app.register_blueprint(user.us)
# 第二步,在不同文件中注册路由时,直接使用蓝图对象注册,不用使用app了,避免了循环导入的问题
@account.route('/login.html', methods=['GET', "POST"])
4 中小型项目目录划分
项目名字
-pro_flask文件夹
-__init__.py
-templates
-login.html
-statics
-code.png
-views
-blog.py
-account.py
-user.py
-run.py
5 大型项目
项目名
-pro_flask文件夹
-__init__.py
-web
-__init__.py
-static
-views.py
-templates
-admin
-templates
-static
-views.py
-__init__.py
-run.py
threading.local
###############1 不用local,多线程写同一个数据,会导致错乱
from threading import Thread
import time
lqz = -1
def task(arg):
global lqz
lqz = arg
time.sleep(2)
print(lqz)
for i in range(10):
t = Thread(target=task,args=(i,))
t.start()
################2 使用local对象,多线程写同一数据不会错乱,因为每个线程操作自己的数据
from threading import Thread
from threading import local
import time
from threading import get_ident
# 特殊的对象
lqz = local()
# {'线程id':{value:1},'线程id':{value:2}....}
def task(arg):
lqz.value = arg
time.sleep(2)
print(lqz.value)
for i in range(10):
t = Thread(target=task,args=(i,))
t.start()
#######3自己写一个类似local的东西,函数版本
from threading import get_ident,Thread
import time
storage = {}
#{'线程id':{value:1},'线程id':{value:2}....}
def set(k,v):
ident = get_ident()
if ident in storage:
storage[ident][k] = v
else:
storage[ident] = {k:v}
def get(k):
ident = get_ident()
return storage[ident][k]
def task(arg):
set('val',arg)
v = get('val')
print(v)
for i in range(10):
t = Thread(target=task,args=(i,))
t.start()
#######4自己写一个类似local的东西,面向对象版本
from threading import get_ident,Thread
import time
class Local(object):
storage = {}
def set(self, k, v):
ident = get_ident()
if ident in Local.storage:
Local.storage[ident][k] = v
else:
Local.storage[ident] = {k: v}
def get(self, k):
ident = get_ident()
return Local.storage[ident][k]
obj = Local()
def task(arg):
obj.set('val',arg)
time.sleep(1)
v = obj.get('val')
print(v)
for i in range(10):
t = Thread(target=task,args=(i,))
t.start()
# #######5自己写一个类似local的东西,面向对象支持 . 取值赋值
from threading import get_ident,Thread
import time
class Local(object):
storage = {}
def __setattr__(self, k, v):
ident = get_ident()
if ident in Local.storage:
Local.storage[ident][k] = v
else:
Local.storage[ident] = {k: v}
def __getattr__(self, k):
ident = get_ident()
return Local.storage[ident][k]
obj = Local()
def task(arg):
obj.val = arg
time.sleep(1)
print(obj.val)
for i in range(10):
t = Thread(target=task,args=(i,))
t.start()
#######6 每次实例化得到一个local对象,用自己的字典存储
from threading import get_ident,Thread
import time
class Local(object):
def __init__(self):
object.__setattr__(self,'storage',{})
# self.storage={} # 这种方式不能用
def __setattr__(self, k, v):
ident = get_ident()
if ident in self.storage:
self.storage[ident][k] = v
else:
self.storage[ident] = {k: v}
def __getattr__(self, k):
ident = get_ident()
return self.storage[ident][k]
obj = Local()
def task(arg):
obj.val = arg
time.sleep(1)
print(obj.val)
for i in range(10):
t = Thread(target=task,args=(i,))
t.start()
######7支持线程和协程
try:
from greenlet import getcurrent as get_ident
except Exception as e:
from threading import get_ident
from threading import Thread
import time
class Local(object):
def __init__(self):
object.__setattr__(self,'storage',{})
def __setattr__(self, k, v):
ident = get_ident()
if ident in self.storage:
# self.storage[ident][k] = v
# else:
# self.storage[ident] = {k: v}
def __getattr__(self, k):
ident = get_ident()
return self.storage[ident][k]
obj = Local()
def task(arg):
obj.val = arg
# obj.xxx = arg
print(obj.val)
for i in range(10):
t = Thread(target=task,args=(i,))
t.start()
请求上下文执行流程
请求上下文执行流程(ctx):
-0 flask项目一启动,有6个全局变量
-_request_ctx_stack:LocalStack对象
-_app_ctx_stack :LocalStack对象
-request : LocalProxy对象
-session : LocalProxy对象
-1 请求来了 app.__call__()---->内部执行:self.wsgi_app(environ, start_response)
-2 wsgi_app()
-2.1 执行:ctx = self.request_context(environ):返回一个RequestContext对象,并且封装了request(当次请求的request对象),session
-2.2 执行: ctx.push():RequestContext对象的push方法
-2.2.1 push方法中中间位置有:_request_ctx_stack.push(self),self是ctx对象
-2.2.2 去_request_ctx_stack对象的类中找push方法(LocalStack中找push方法)
-2.2.3 push方法源码:
def push(self, obj):
#通过反射找self._local,在init实例化的时候生成的:self._local = Local()
#Local()flask封装的支持线程和协程的local对象
# 一开始取不到stack,返回None
rv = getattr(self._local, "stack", None)
if rv is None:
#走到这,self._local.stack=[],rv=self._local.stack
self._local.stack = rv = []
# 把ctx放到了列表中
#self._local={'线程id1':{'stack':[ctx,]},'线程id2':{'stack':[ctx,]},'线程id3':{'stack':[ctx,]}}
rv.append(obj)
return rv
-3 如果在视图函数中使用request对象,比如:print(request)
-3.1 会调用request对象的__str__方法,request类是:LocalProxy
-3.2 LocalProxy中的__str__方法:lambda x: str(x._get_current_object())
-3.2.1 内部执行self._get_current_object()
-3.2.2 _get_current_object()方法的源码如下:
def _get_current_object(self):
if not hasattr(self.__local, "__release_local__"):
#self.__local() 在init的时候,实例化的,在init中:object.__setattr__(self, "_LocalProxy__local", local)
# 用了隐藏属性
#self.__local 实例化该类的时候传入的local(偏函数的内存地址:partial(_lookup_req_object, "request"))
#加括号返回,就会执行偏函数,也就是执行_lookup_req_object,不需要传参数了
#这个地方的返回值就是request对象(当此请求的request,没有乱)
return self.__local()
try:
return getattr(self.__local, self.__name__)
except AttributeError:
raise RuntimeError("no object bound to %s" % self.__name__)
-3.2.3 _lookup_req_object函数源码如下:
def _lookup_req_object(name):
#name是'request'字符串
#top方法是把第二步中放入的ctx取出来,因为都在一个线程内,当前取到的就是当次请求的ctx对象
top = _request_ctx_stack.top
if top is None:
raise RuntimeError(_request_ctx_err_msg)
#通过反射,去ctx中把request对象返回
return getattr(top, name)
-3.2.4 所以:print(request) 实质上是在打印当此请求的request对象的__str__
-4 如果在视图函数中使用request对象,比如:print(request.method):实质上是取到当次请求的reuquest对象的method属性
-5 最终,请求结束执行: ctx.auto_pop(error),把ctx移除掉
其他的东西:
-session:
-请求来了opensession
-ctx.push()---->也就是RequestContext类的push方法的最后的地方:
if self.session is None:
#self是ctx,ctx中有个app就是flask对象, self.app.session_interface也就是它:SecureCookieSessionInterface()
session_interface = self.app.session_interface
self.session = session_interface.open_session(self.app, self.request)
if self.session is None:
#经过上面还是None的话,生成了个空session
self.session = session_interface.make_null_session(self.app)
-请求走了savesession
-response = self.full_dispatch_request() 方法内部:执行了before_first_request,before_request,视图函数,after_request,savesession
-self.full_dispatch_request()---->执行:self.finalize_request(rv)-----》self.process_response(response)----》最后:self.session_interface.save_session(self, ctx.session, response)
-请求扩展相关
before_first_request,before_request,after_request依次执行
-flask有一个请求上下文,一个应用上下文
-ctx:
-是:RequestContext对象:封装了request和session
-调用了:_request_ctx_stack.push(self)就是把:ctx放到了那个位置
-app_ctx:
-是:AppContext(self) 对象:封装了当前的app和g
-调用 _app_ctx_stack.push(self) 就是把:app_ctx放到了那个位置
-g是个什么鬼?
专门用来存储用户信息的g对象,g的全称的为global
g对象在一次请求中的所有的代码的地方,都是可以使用的
-代理模式
-request和session就是代理对象,用的就是代理模式
G对象是什么
#1 g是一个全局变量,在当前请求中可以放值,取值
#2 session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session,但是g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次
flask-session
# 1 替换flask内置的session,支持存到redis,存到数据库
#2 flask-session如何使用
方式一:
conn=redis.Redis(host='127.0.0.1',port=6379)
app.session_interface=RedisSessionInterface(conn,'lqz',permanent=False)
方式二:
# from redis import Redis
# from flask.ext.session import Session
# app.config['SESSION_TYPE'] = 'redis'
#
# app.config['SESSION_KEY_PREFIX'] = 'lqz'
#
# app.config['SESSION_REDIS'] = Redis(host='127.0.0.1',port='6379')
# # 本质跟上面一样
# # 类似的用法在flask中很常见 函数(app)
# Session(app)
# 问题1
-关闭浏览器cookie失效
app.session_interface=RedisSessionInterface(conn,key_prefix='lqz',permanent=False)
# 问题2
-cookie默认超时时间是多少?如何设置超时时间
'PERMANENT_SESSION_LIFETIME': timedelta(days=31),#这个配置文件控制
数据库连接池
#pip install DBUtils
import pymysql
from DBUtils.PooledDB import PooledDB
import time
from threading import Thread
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0,
# ping MySQL服务端,检查是否服务可用。
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='flask',
charset='utf8'
)
def func():
# 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
# 否则
# 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
# 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
# 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
# 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
conn = POOL.connection()
# print(th, '链接被拿走了', conn1._con)
# print(th, '池子里目前有', pool._idle_cache, '
')
cursor = conn.cursor()
cursor.execute('select * from boy')
result = cursor.fetchall()
time.sleep(2)
print(result)
conn.close()
if __name__ == '__main__':
for i in range(10):
t=Thread(target=func)
t.start()
import pymysql
from settings import Config
class SQLHelper(object):
@staticmethod
def open(cursor):
POOL = Config.PYMYSQL_POOL
conn = POOL.connection()
cursor = conn.cursor(cursor=cursor)
return conn,cursor
@staticmethod
def close(conn,cursor):
conn.commit()
cursor.close()
conn.close()
@classmethod
def fetch_one(cls,sql,args,cursor =pymysql.cursors.DictCursor):
conn,cursor = cls.open(cursor)
cursor.execute(sql, args)
obj = cursor.fetchone()
cls.close(conn,cursor)
return obj
@classmethod
def fetch_all(cls,sql, args,cursor =pymysql.cursors.DictCursor):
conn, cursor = cls.open(cursor)
cursor.execute(sql, args)
obj = cursor.fetchall()
cls.close(conn, cursor)
return obj
@classmethod
def execute(cls,sql, args,cursor =pymysql.cursors.DictCursor):
conn, cursor = cls.open(cursor)
cursor.execute(sql, args)
cls.close(conn, cursor)
wtforms(forms组件)
1 校验数据
2 渲染标签
详见代码
#pip3 install wtforms
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
app = Flask(__name__, template_folder='templates')
app.debug = True
class LoginForm(Form):
# 字段(内部包含正则表达式)
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired(message='用户名不能为空.'),
validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
],
widget=widgets.TextInput(), # 页面上显示的插件
render_kw={'class': 'form-control'}
)
# 字段(内部包含正则表达式)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.'),
validators.Length(min=8, message='用户名长度必须大于%(min)d'),
validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}",
message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'GET':
form = LoginForm()
return render_template('login.html', form=form)
else:
form = LoginForm(formdata=request.form)
if form.validate():
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('login.html', form=form)
if __name__ == '__main__':
app.run()
#login.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>登录</h1>
<form method="post">
<p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>
<p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
<input type="submit" value="提交">
</form>
</body>
</html>
#使用2
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
app = Flask(__name__, template_folder='templates')
app.debug = True
class RegisterForm(Form):
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired()
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'},
default='alex'
)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
pwd_confirm = simple.PasswordField(
label='重复密码',
validators=[
validators.DataRequired(message='重复密码不能为空.'),
validators.EqualTo('pwd', message="两次密码输入不一致")
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
email = html5.EmailField(
label='邮箱',
validators=[
validators.DataRequired(message='邮箱不能为空.'),
validators.Email(message='邮箱格式错误')
],
widget=widgets.TextInput(input_type='email'),
render_kw={'class': 'form-control'}
)
gender = core.RadioField(
label='性别',
choices=(
(1, '男'),
(2, '女'),
),
coerce=int # “1” “2”
)
city = core.SelectField(
label='城市',
choices=(
('bj', '北京'),
('sh', '上海'),
)
)
hobby = core.SelectMultipleField(
label='爱好',
choices=(
(1, '篮球'),
(2, '足球'),
),
coerce=int
)
favor = core.SelectMultipleField(
label='喜好',
choices=(
(1, '篮球'),
(2, '足球'),
),
widget=widgets.ListWidget(prefix_label=False),
option_widget=widgets.CheckboxInput(),
coerce=int,
default=[1, 2]
)
def __init__(self, *args, **kwargs):
super(RegisterForm, self).__init__(*args, **kwargs)
self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
def validate_pwd_confirm(self, field):
"""
自定义pwd_confirm字段规则,例:与pwd字段是否一致
:param field:
:return:
"""
# 最开始初始化时,self.data中已经有所有的值
if field.data != self.data['pwd']:
# raise validators.ValidationError("密码不一致") # 继续后续验证
raise validators.StopValidation("密码不一致") # 不再继续后续验证
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
return render_template('register.html', form=form)
else:
form = RegisterForm(formdata=request.form)
if form.validate():
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('register.html', form=form)
if __name__ == '__main__':
app.run()
#login.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>用户注册</h1>
<form method="post" novalidate style="padding:0 50px">
{% for field in form %}
<p>{{field.label}}: {{field}} {{field.errors[0] }}</p>
{% endfor %}
<input type="submit" value="提交">
</form>
</body>
</html>
信号
1 semaphore跟线程的它没有半毛钱关系
2 signal翻译过来的,flask的signal
3 信号是同步操作
4 如何使用(内置的)
######### 内置信号的使用
##第一步写一个函数(触发某些动作)
# 往信号中注册函数
def func(*args,**kwargs):
print(args[0]) # 当前app对象
print('触发信号',args,kwargs)
# 第二步:函数跟内置信号绑定
signals.request_started.connect(func)
5 自定义信号的使用
# 自定义信号
# #第一步:定义一个信号
# xxxxx = _signals.signal('xxxxx')
# # 第二步:定义一个函数
# def func3(*args,**kwargs):
# import time
# time.sleep(1)
# print('触发信号',args,kwargs)
# #第三步:信号跟函数绑定
# xxxxx.connect(func3)
#第四步:触发信号
xxxxx.send(1,k='2')
多app应用(了解)
### 多个app实例(启用)
from werkzeug.wsgi import DispatcherMiddleware
from werkzeug.serving import run_simple
from flask import Flask, current_app
app1 = Flask('app01')
app2 = Flask('app02')
@app1.route('/index')
def index():
return "app01"
@app2.route('/index2')
def index2():
return "app2"
# http://www.oldboyedu.com/index
# http://www.oldboyedu.com/sec/index2
dm = DispatcherMiddleware(app1, {
'/sec': app2,
})
if __name__ == "__main__":
run_simple('localhost', 5000, dm)
# 请求来了,会执行dm()--->__call__
flask-script(制定命令)
1 模拟出类似django的启动方式:python manage.py runserver
2 pip install flask-script
3 把excel的数据导入数据库,定制个命令,去执行(openpyxl)
python manage.py insertdb -f xxx.excl -t aa
4 使用
-方式一:python manage.py runserver
from flask import Flask
from flask_script import Manager
app = Flask(__name__)
manager=Manager(app)
if __name__ == '__main__':
manager.run()
-方式二:自定制命令
@manager.command
def custom(arg):
print(arg)
@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name, url):
print(name, url)
5 创建超级用户
6 现在有一万条excel用户,批量导入到数据库中
-navicate直接支持
-脚本
-flask-script
sqlalchemy
概念
1 sqlachemy:第三方orm框架(对象关系映射)
-go 中gorm,xorm
-python中:django orm,sqlachemy,peewee
-老刘带你手写的:https://www.cnblogs.com/liuqingzheng/articles/9006025.html
2 django orm,只能在django中用,不能单独用
3 使用 pip install sqlachemy
4 SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件
5 补充:django orm反向生成models
-python manage.py inspectdb > app/models.py
基本使用(原生sql)
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
# 第一步生成一个engine对象
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/flask?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 第二步:创建连接(执行原生sql)
conn = engine.raw_connection()
# 第三步:获取游标对象
cursor = conn.cursor()
# 第四步:具体操作
cursor.execute('select * from boy')
res=cursor.fetchall()
print(res)
# 比pymysql优势在,有数据库连接池
orm使用
# 创建一个个类(继承谁?字段怎么写)
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
# 字段和字段属性
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
# 制造了一个类,作为所有模型类的基类
Base = declarative_base()
class User(Base):
__tablename__ = 'users' # 数据库表名称(固定写法),如果不写,默认以类名小写作为表的名字
id = Column(Integer, primary_key=True) # id 主键
# mysql中主键自动建索引:聚簇索引
# 其他建建的索引叫:辅助索引
name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空
# email = Column(String(32), unique=True) # 唯一
# #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
# ctime = Column(DateTime, default=datetime.datetime.now) # default默认值
# extra = Column(Text, nullable=True)
#类似于djagno的 Meta
# __table_args__ = (
# UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
# Index('ix_id_name', 'name', 'email'), #索引
# )
# 创建表
def create_table():
# 创建engine对象
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 通过engine对象创建表
Base.metadata.create_all(engine)
# 删除表
def drop_table():
# 创建engine对象
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 通过engine对象删除所有表
Base.metadata.drop_all(engine)
if __name__ == '__main__':
# create_table()
drop_table()
# 创建库?手动创建库
# 问题,sqlachemy支持修改字段吗?不支持
线程安全
#基于scoped_session实现线程安全
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import User # pycharm报错,不会影响我们
from sqlalchemy.orm import scoped_session
# 1 制作engine
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
# 2 制造一个 session 类(会话)
Session = sessionmaker(bind=engine) # 得到一个类
# 3 得到一个session对象(线程安全的session)
#现在的session已经不是session对象了
#为什么线程安全,还是用的local
session = scoped_session(Session)
# session=Session()
# 4 创建一个对象
obj1 = User(name="2008")
# 5 把对象通过add放入
session.add(obj1)
# session.aaa()
# 6 提交
session.commit()
session.close()
# 类不继承Session类,但是有该类的所有方法(通过反射,一个个放进去)
# scoped_session.add------->instrument(name)--->do函数内存地址---》现在假设我要这么用:session.add()--->do()
# scoped_session.close----->instrument(name)--->do函数内存地址
基本的增删查改
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import User,Person,Hobby
from sqlalchemy.orm import scoped_session
from sqlalchemy.sql import text
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
# session = scoped_session(Session)
session=Session()
####1 新增多个对象
# obj=User(name='xxx')
# obj2=User(name='yyyy')
# obj3=User(name='zzz')
#新增同样对象
# session.add_all([obj,obj2,obj3])
#新增不同对象
# session.add_all([Person(name='lqz'),Hobby()])
####2 简单删除(查到删除)
# res=session.query(User).filter_by(name='2008').delete()
# res=session.query(User).filter(User.id>=2).delete()
# # 影响1行
# print(res)
#### 3 修改
# res=session.query(User).filter_by(id=1).update({User.name:'ccc'})
# res=session.query(User).filter_by(id=1).update({'name':'ccc'})
# session.query(User).filter(User.id > 0).update({User.name: User.name + "099"}, synchronize_session=False) # 如果要把它转成字符串相加
# session.query(User).filter(User.id > 0).update({"age": User.age + 1}, synchronize_session="evaluate") ## 如果要把它转成数字相加
####4 基本查询操作
# res=session.query(User).all()
# print(type(res))
# res=session.query(User).first()
# print(res)
#filter传的是表达式,filter_by传的是参数
# res=session.query(User).filter(User.id==1).all()
# res=session.query(User).filter(User.id>=1).all()
# res=session.query(User).filter(User.id<1).all()
# res=session.query(User).filter_by(name='ccc099').all()
#了解
# res = session.query(User).from_statement(text("SELECT * FROM users where name=:name")).params(name='ccc099').all()
# print(res)
session.commit()
# 并没有真正关闭连接,而是放回池中
session.close()
高级操作
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import User,Person,Hobby
from sqlalchemy.sql import text
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session=Session()
# 1 查询名字为lqz的所有user对象
# ret = session.query(User).filter_by(name='ccc099').all()
# 2 表达式,and条件连接
# ret = session.query(User).filter(User.id > 1, User.name == 'egon').all()
# 查找id在1和10之间,并且name=egon的对象
# ret = session.query(User).filter(User.id.between(1, 10), User.name == 'egon').all()
# in条件(class_,因为这是关键字,不能直接用)
# ret = session.query(User).filter(User.id.in_([1,3,4])).all()
# 取反 ~
ret = session.query(User).filter(~User.id.in_([1,3,4])).all()
#二次筛选
# select *
# ret = session.query(User).filter(User.id.in_(session.query(User.id).filter_by(name='egon'))).all()
# # select name,id 。。。。
# ret = session.query(User.id,User.name).filter(User.id.in_(session.query(User.id).filter_by(name='egon'))).all()
'''
SELECT users.id AS users_id, users.name AS users_name
FROM users
WHERE users.id IN (SELECT users.id AS users_id
FROM users
WHERE users.name = %(name_1)s)
'''
#
from sqlalchemy import and_, or_
#or_包裹的都是or条件,and_包裹的都是and条件
#查询id>3并且name=egon的人
# ret = session.query(User).filter(and_(User.id > 3, User.name == 'egon')).all()
# 查询id大于2或者name=ccc099的数据
# ret = session.query(User).filter(or_(User.id > 2, User.name == 'ccc099')).all()
# ret = session.query(User).filter(
# or_(
# User.id < 2,
# and_(User.name == 'egon', User.id > 3),
# User.extra != ""
# )).all()
# print(ret)
'''
select *from user where id<2 or (name=egon and id >3) or extra !=''
'''
# 通配符,以e开头,不以e开头
# ret = session.query(User).filter(User.name.like('e%')).all()
# ret = session.query(User).filter(~User.name.like('e%')).all()
# 限制,用于分页,区间 limit
# 前闭后开区间,1能取到,3取不到
ret = session.query(User)[1:3]
'''
select * from users limit 1,2;
'''
# 排序,根据name降序排列(从大到小)
# ret = session.query(User).order_by(User.name.desc()).all()
# ret = session.query(User).order_by(User.name.asc()).all()
#第一个条件降序排序后,再按第二个条件升序排
# ret = session.query(User).order_by(User.id.asc(),User.name.desc()).all()
# ret = session.query(User).order_by(User.name.desc(),User.id.asc()).all()
# 分组
from sqlalchemy.sql import func
# ret = session.query(User).group_by(User.name).all()
#分组之后取最大id,id之和,最小id
# sql 分组之后,要查询的字段只能有分组字段和聚合函数
# ret = session.query(
# func.max(User.id),
# func.sum(User.id),
# func.min(User.id),
# User.name).group_by(User.name).all()
# '''
# select max(id),sum(id),min(id) from user group by name;
#
# '''
# for obj in ret:
# print(obj[0],'----',obj[1],'-----',obj[2],'-----',obj[3])
# print(ret)
#haviing筛选
# ret = session.query(
# func.max(User.id),
# func.sum(User.id),
# func.min(User.id)).group_by(User.name).having(func.min(User.id) >2).all()
'''
select max(id),sum(id),min(id) from user group by name having min(id)>2;
'''
print(ret)
session.commit()
session.close()
多表操作
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import User,Person,Hobby,Boy,Girl,Boy2Girl
from sqlalchemy.sql import text
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session=Session()
### 1 一对多插入数据
# obj=Hobby(caption='足球')
# session.add(obj)
# p=Person(name='张三',hobby_id=2)
# session.add(p)
### 2 方式二(默认情况传对象有问题)
###### Person表中要加 hobby = relationship('Hobby', backref='pers')
# p=Person(name='李四',hobby=Hobby(caption='美女'))
# 等同于
# p=Person(name='李四2')
# p.hobby=Hobby(caption='美女2')
# session.add(p)
## 3 方式三,通过反向操作
# hb = Hobby(caption='人妖')
# hb.pers = [Person(name='文飞'), Person(name='博雅')]
# session.add(hb)
#### 4 查询(查询:基于连表的查询,基于对象的跨表查询)
### 4.1 基于对象的跨表查询(子查询,两次查询)
# 正查
# p=session.query(Person).filter_by(name='张三').first()
# print(p)
# print(p.hobby.caption)
# 反查
# h=session.query(Hobby).filter_by(caption='人妖').first()
# print(h.pers)
### 4.2 基于连表的跨表查(查一次)
# 默认根据外键连表
# isouter=True 左外连,表示Person left join Hobby,没有右连接,反过来即可
# 不写 inner join
# person_list=session.query(Person,Hobby).join(Hobby,isouter=True).all()
# print(person_list)
# print(person_list)
# for row in person_list:
# print(row[0].name,row[1].caption)
# '''
# select * from person left join hobby on person.hobby_id=hobby.id
# '''
#
# ret = session.query(Person, Hobby).filter(Person.hobby_id == Hobby.id)
# print(ret)
# '''
# select * from user,hobby where user.id=favor.nid;
#
# '''
#join表,默认是inner join
# ret = session.query(Person).join(Hobby)
# # ret = session.query(Hobby).join(Person,isouter=True)
# '''
# SELECT *
# FROM person INNER JOIN hobby ON hobby.id = person.hobby_id
# '''
# print(ret)
# 指定连表字段(从来没用过)
# ret = session.query(Person).join(Hobby,Person.nid==Hobby.id, isouter=True)
# # ret = session.query(Person).join(Hobby,Person.hobby_id==Hobby.id, isouter=True).all()
# print(ret)
'''
SELECT *
FROM person LEFT OUTER JOIN hobby ON person.nid = hobby.id
'''
# print(ret)
# 组合(了解)UNION 操作符用于合并两个或多个 SELECT 语句的结果集
# union和union all的区别?
# q1 = session.query(User.name).filter(User.id > 2) # 6条数据
# q2 = session.query(User.name).filter(User.id < 8) # 2条数据
# q1 = session.query(User.id,User.name).filter(User.id > 2) # 6条数据
# q2 = session.query(User.id,User.name).filter(User.id < 8) # 2条数据
# ret = q1.union_all(q2).all()
# ret1 = q1.union(q2).all()
# print(ret)
# print(ret1)
#
# q1 = session.query(User.name).filter(User.id > 2)
# q2 = session.query(Hobby.caption).filter(Hobby.nid < 2)
# ret = q1.union_all(q2).all()
#### 多对多
# session.add_all([
# Boy(hostname='霍建华'),
# Boy(hostname='胡歌'),
# Girl(name='刘亦菲'),
# Girl(name='林心如'),
# ])
# session.add_all([
# Boy2Girl(girl_id=1, boy_id=1),
# Boy2Girl(girl_id=2, boy_id=1)
# ])
##### 要有girls = relationship('Girl', secondary='boy2girl', backref='boys')
# girl = Girl(name='张娜拉')
# girl.boys = [Boy(hostname='张铁林'),Boy(hostname='费玉清')]
# session.add(girl)
# boy=Boy(hostname='蔡徐坤')
# boy.girls=[Girl(name='谢娜'),Girl(name='巧碧螺')]
# session.add(boy)
# session.commit()
# 基于对象的跨表查
# girl=session.query(Girl).filter_by(id=3).first()
# print(girl.boys)
#### 基于连表的跨表查询
# 查询蔡徐坤约过的所有妹子
'''
select girl.name from girl,boy,Boy2Girl where boy.id=Boy2Girl.boy_id and girl.id=Boy2Girl.girl_id where boy.name='蔡徐坤'
'''
# ret=session.query(Girl.name).filter(Boy.id==Boy2Girl.boy_id,Girl.id==Boy2Girl.girl_id,Boy.hostname=='蔡徐坤').all()
'''
select girl.name from girl inner join Boy2Girl on girl.id=Boy2Girl.girl_id inner join boy on boy.id=Boy2Girl.boy_id where boy.hostname='蔡徐坤'
'''
# ret=session.query(Girl.name).join(Boy2Girl).join(Boy).filter(Boy.hostname=='蔡徐坤').all()
ret=session.query(Girl.name).join(Boy2Girl).join(Boy).filter_by(hostname='蔡徐坤').all()
print(ret)
### 执行原生sql(用的最多的)
### django中orm如何执行原生sql
#
# cursor = session.execute('insert into users(name) values(:value)',params={"value":'xxx'})
# print(cursor.lastrowid)
# session.commit()
session.close()
models.py
# 创建一个个类(继承谁?字段怎么写)
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
# 字段和字段属性
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship
# 制造了一个类,作为所有模型类的基类
Base = declarative_base()
class User(Base):
__tablename__ = 'users' # 数据库表名称(固定写法),如果不写,默认以类名小写作为表的名字
id = Column(Integer, primary_key=True) # id 主键
# mysql中主键自动建索引:聚簇索引
# 其他建建的索引叫:辅助索引
name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空
# email = Column(String(32), unique=True) # 唯一
# #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
# ctime = Column(DateTime, default=datetime.datetime.now) # default默认值
# extra = Column(Text, nullable=True)
#类似于djagno的 Meta
# __table_args__ = (
# UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
# Index('ix_id_name', 'name', 'email'), #索引
# )
def __str__(self):
return self.name
def __repr__(self):
# python是强类型语言
return self.name+str(self.id)
# 一对多关系
# 一个Hobby可以有很多人喜欢
# 一个人只能由一个Hobby
class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
caption = Column(String(50), default='篮球')
class Person(Base):
__tablename__ = 'person'
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
# hobby指的是tablename而不是类名,uselist=False
# 一对多的关系,关联字段写在多的一方
hobby_id = Column(Integer, ForeignKey("hobby.id")) # 默认可以为空
# 跟数据库无关,不会新增字段,只用于快速链表操作
# 类名,backref用于反向查询
hobby = relationship('Hobby', backref='pers')
# 多对多关系
# 实实在在存在的表
class Boy2Girl(Base):
__tablename__ = 'boy2girl'
id = Column(Integer, primary_key=True, autoincrement=True) # autoincrement自增,默认是True
girl_id = Column(Integer, ForeignKey('girl.id'))
boy_id = Column(Integer, ForeignKey('boy.id'))
class Girl(Base):
__tablename__ = 'girl'
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)
class Boy(Base):
__tablename__ = 'boy'
id = Column(Integer, primary_key=True, autoincrement=True)
hostname = Column(String(64), unique=True, nullable=False)
# 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
# secondary 通过哪个表建关联,跟django中的through一模一样
girls = relationship('Girl', secondary='boy2girl', backref='boys')
# 创建表
def create_table():
# 创建engine对象
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 通过engine对象创建表
Base.metadata.create_all(engine)
# 删除表
def drop_table():
# 创建engine对象
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 通过engine对象删除所有表
Base.metadata.drop_all(engine)
if __name__ == '__main__':
create_table() # 原来已经存在user表,再执行一次不会有问题
# drop_table()
# 创建库?手动创建库
# 问题,sqlachemy支持修改字段吗?不支持
Flask-SQLAlchemy
1 Flask-SQLAlchemy
2 flask-migrate
-python3 manage.py db init 初始化:只执行一次
-python3 manage.py db migrate 等同于 makemigartions
-python3 manage.py db upgrade 等同于migrate
3 看代码
4 Flask-SQLAlchemy如何使用
1 from flask_sqlalchemy import SQLAlchemy
2 db = SQLAlchemy()
3 db.init_app(app)
4 以后在视图函数中使用
-db.session 就是咱们讲的session
5 flask-migrate的使用(表创建,字段修改)
1 from flask_migrate import Migrate,MigrateCommand
2 Migrate(app,db)
3 manager.add_command('db', MigrateCommand)
6 直接使用
-python3 manage.py db init 初始化:只执行一次,创建migrations文件夹
-python3 manage.py db migrate 等同于 makemigartions
-python3 manage.py db upgrade 等同于migrate