Flask上下文管理
铺垫
# by gaoxin from functools import partial def add(x, y, z): print(x + y + z) # 这是最简单的一个函数 # 如果我要实现一个功能,三个数相加,其中一个数必须是6 # 我们就可以使用偏函数来帮着我们传参 new_func = partial(add, 6) new_func(1,1) new_func(1,2) new_func(1,3)
# by gaoxin class A(object): def __init__(self): # self.x1 = {} object.__setattr__(self, "x1", {}) def __setattr__(self, key, value): print(key, value, self.x1) if key in self.x1: print(key, value) a = A() a.xx = 123
Flask上下文管理一
Flask的上下文管理我们可以简单的理解为一个生命周期~
也就是请求进来到请求走一共做了哪些事情~~我们从源码开始走~~
首先我们知道项目启动执行了app.run()方法~~调用了werkzeug的run_simple()方法~
run_simple(host, port, self, **options) 这时候的self就是我们的app~
run_simple会执行self(),也就是app(), 那么app = Flask() 所以会走Flask的__call__方法~
那么__call__做了什么呢~~
environ是我们请求来的原始数据~当成参数传递给了request_context方法~~
进入这个RequestContext对象~
在这里重新封装了request, 以及给session 赋值了 None~~
这是初始化这个类做的一些事情~~也就说~
ctx = RequestContext(app, environ)
ctx.request 是重新封装的request
ctx.session = None
然后我们继续往下走~~~
ctx.push() 那么这个RequestContext这个类里就一定有push方法~
我们是执行ctx.push()~~所以self应该就是我们这个ctx对象~~
执行了_request_ctx_stack.push(ctx)~也就是说~~_request_ctx_stack它把我们的ctx对象push到了一个地方~~
我们的ctx这个对象~里面有request以及session~~
self._local就是Local()对象~~
我们看这个初始化方法~跟我们上面回忆的知识点是不是很像~~
就是给我们这个Local类初始化了两个属性~~__storage__ = {} __ident_func__ = get_ident
那我们继续看LocalStark中push方法做了什么~~
回头看我们的wsgi_app, 里的ctx.push()就走完了~接下来就走我们的视图了~~~
那到这里~我们可以通过什么样的方法在我们视图中拿到这个request对象呢~~
request在ctx对象里~能通过ctx.request得到,那我们怎么得到ctx呢~
ctx被LocalStack对象放入到Local中了~~
# by gaoxin from flask import Flask from flask import globals app = Flask(__name__) @app.route("/") def index(): ctx = globals._request_ctx_stack.top print(ctx.request.method) return "index" if __name__ == '__main__': app.run()
在这里我们的上下文管理的第一部分就走完了~~
Flask上下文管理 二
我们之前走到ctx.push(), 接下来那个方法就是去走我们的视图函数了~~
在我们的视图函数中上面那种拿request的方法太费劲了~我们需要简单一点的拿到request~~
那么我们导入的request跟我们上面拿到的request一定是一样的~~那导入的这个request做了什么呢~~
reqeust是LocalProxy这个类的实例化对象~参数是一个偏函数~~
那当我们调用request.method 等方法的时候走的是LocalProxy这个类的__getattr__方法~~
这里的_get_current_object()相当于我们偏函数的执行~~
到这~就跟我们上面的取值方式一样了~~也就是说通过LocalStack方法去Local中去ctx对象~~
然后通过getattr 找到ctx.request~~~
也就是说这个LocalProxy就是一个帮助我们取值的代理~让我们的取值变的更加简单~~~
这个代理通过偏函数来绑定参数~~
ctx中封装了request,以及session~只不过到这里我们的session依然是空的~~
session的实现原理
首先请求进来的时候在Local中存放了ctx对象~这个对象里的session是None~~
当我们走完这个_reqeust_cts_stack.push(ctx)后,我们看它走了什么~~
也就是说~请求进来把ctx放入Local中后~~从前端解密了cookie~然后把解密数据好的数据给了self.session~~
然后继续走~
我们可以看到~session的原理是~~
从cookie中获取数据~解密存入session~~请求走的时候~~ 把session中数据取出来,加密, 给响应设置cookie~~
那么我们平时在视图中设置删除session的值~原来跟request是一样的~通过代理去修改Local中的数据~~~
应用上下文管理
应用上下文和请求上下文的原理和源码是一样的~~~
也就是说~我们请求上下文和应用上下文~分别建立了两个Local对象~~
两个Local对象数据结构都是一样的~~那么请求上下文和应用上下文为什么要分开存放呢~~~
以后我们写离线脚本的时候大家就知道了~~~总之~~有用!!!!!
全局对象g
我们说应用上下文里封装了g对象~~那么这个g对象是什么呢~~~
Flask中g的生命周期?
g和session有什么区别?
g和全局对象有什么区别~~
1--我们讲这么多上下文管理~~我们知道请求进来会为每个请求在Local中建立一个独立空间~~
也就是在应用上下文的Local对象中建立了一个g对象~~当请求走的时候~~~就会删除~~
所以g的生命周期是请求进来到走~~~
2--session不一样的是保存在cookie中~所以下次请求来的时候cookie带来了~~~
3--全局变量~~是在项目启动创建的~~
无论多少请求进来都可以访问全局变量~~
我们的g对象一般情况用于before_request中设置值~只为这一次请求建立全局变量~~~
!!注意在我们重定向的时候还可以取g的值么~~~
SQLAlchemy介绍
SQLAlchemy是一个基于Python的ORM框架。该框架是建立在DB-API之上,使用关系对象映射进行数据库操作。
简而言之就是,将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
补充:什么是DB-API ? 是Python的数据库接口规范。
在没有DB-API之前,各数据库之间的应用接口非常混乱,实现各不相同,
项目需要更换数据库的时候,需要做大量的修改,非常不方便,DB-API就是为了解决这样的问题。
1
|
pip install sqlalchemy |
组成部分:
-- engine,框架的引擎
-- connection pooling 数据库连接池
-- Dialect 选择链接数据库的DB-API种类(实际选择哪个模块链接数据库)
-- Schema/Types 架构和类型
-- SQL Expression Language SQL表达式语言
连接数据库
SQLAlchemy 本身无法操作数据库,其必须依赖遵循DB-API规范的三方模块,
Dialect 用于和数据API进行交互,根据配置的不同调用不同数据库API,从而实现数据库的操作。
# MySQL-PYthon mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> #pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] # MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> # cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] # 更多 # http://docs.sqlalchemy.org/en/latest/dialects/index.html
from sqlalchemy import create_engine engine = create_engine( "mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接数 pool_size=5, # 连接池大小 pool_timeout=30, # 连接池中没有线程最多等待时间,否则报错 pool_recycle=-1, # 多久之后对连接池中的连接进行回收(重置)-1不回收 )
执行原生SQL
# by gaoxin from sqlalchemy import create_engine engine = create_engine( "mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8", max_overflow=0, pool_size=5, ) def test(): cur = engine.execute("select * from Course") result = cur.fetchall() print(result) cur.close() if __name__ == '__main__': test() # [(1, '生物', 1), (2, '体育', 2), (3, '物理', 1)]
ORM
一、创建表
# by gaoxin from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy import Index, UniqueConstraint import datetime ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",) Base = declarative_base() class UserInfo(Base): __tablename__ = "user_info" id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) email = Column(String(32), unique=True) create_time = Column(DateTime, default=datetime.datetime.now) __table_args__ = ( UniqueConstraint("id", "name", name="uni_id_name"), Index("name", "email") ) def create_db(): Base.metadata.create_all(ENGINE) def drop_db(): Base.metadata.drop_all(ENGINE) if __name__ == '__main__': create_db()
# by gaoxin from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy import Index, UniqueConstraint, ForeignKey from sqlalchemy.orm import relationship import datetime ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",) Base = declarative_base() # ======一对多示例======= class UserInfo(Base): __tablename__ = "user_info" id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) email = Column(String(32), unique=True) create_time = Column(DateTime, default=datetime.datetime.now) # FK字段的建立 hobby_id = Column(Integer, ForeignKey("hobby.id")) # 不生成表结构 方便查询使用 hobby = relationship("Hobby", backref="user") __table_args__ = ( UniqueConstraint("id", "name", name="uni_id_name"), Index("name", "email") ) class Hobby(Base): __tablename__ = "hobby" id = Column(Integer, primary_key=True) title = Column(String(32), default="码代码") def create_db(): Base.metadata.create_all(ENGINE) def drop_db(): Base.metadata.drop_all(ENGINE) if __name__ == '__main__': create_db() # drop_db()
# by gaoxin from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy import Index, UniqueConstraint, ForeignKey from sqlalchemy.orm import relationship import datetime ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",) Base = declarative_base() # ======多对多示例======= class Book(Base): __tablename__ = "book" id = Column(Integer, primary_key=True) title = Column(String(32)) # 不生成表字段 仅用于查询方便 tags = relationship("Tag", secondary="book2tag", backref="books") class Tag(Base): __tablename__ = "tag" id = Column(Integer, primary_key=True) title = Column(String(32)) class Book2Tag(Base): __tablename__ = "book2tag" id = Column(Integer, primary_key=True) book_id = Column(Integer, ForeignKey("book.id")) tag_id = Column(Integer, ForeignKey("tag.id")) def create_db(): Base.metadata.create_all(ENGINE) def drop_db(): Base.metadata.drop_all(ENGINE) if __name__ == '__main__': create_db() # drop_db()
二、对数据库表的操作(增删改查)
# by gaoxin from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session from models_demo import Tag ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",) Session = sessionmaker(bind=ENGINE) # 每次执行数据库操作的时候,都需要创建一个session # 线程安全,基于本地线程实现每个线程用同一个session session = scoped_session(Session) # =======执行ORM操作========== tag_obj = Tag(title="SQLAlchemy") # 添加 session.add(tag_obj) # 提交 session.commit() # 关闭session session.close()
# by gaoxin from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session from models_demo import Tag, UserInfo import threading ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",) Session = sessionmaker(bind=ENGINE) # 每次执行数据库操作的时候,都需要创建一个session session = Session() session = scoped_session(Session) # ============添加============ # tag_obj = Tag(title="SQLAlchemy") # # 添加 # session.add(tag_obj) # session.add_all([ # Tag(title="Python"), # Tag(title="Django"), # ]) # # 提交 # session.commit() # # 关闭session # session.close() # ============基础查询============ # ret1 = session.query(Tag).all() # ret2 = session.query(Tag).filter(Tag.title == "Python").all() # ret3 = session.query(Tag).filter_by(title="Python").all() # ret4 = session.query(Tag).filter_by(title="Python").first() # print(ret1, ret2, ret3, ret4) # ============删除=========== # session.query(Tag).filter_by(id=1).delete() # session.commit() # ===========修改=========== session.query(Tag).filter_by(id=22).update({Tag.title: "LOL"}) session.query(Tag).filter_by(id=23).update({"title": "王者毒药"}) session.query(Tag).filter_by(id=24).update({"title": Tag.title + "~"}, synchronize_session=False) # synchronize_session="evaluate" 默认值进行数字加减 session.commit()
# 条件查询 ret1 = session.query(Tag).filter_by(id=22).first() ret2 = session.query(Tag).filter(Tag.id > 1, Tag.title == "LOL").all() ret3 = session.query(Tag).filter(Tag.id.between(22, 24)).all() ret4 = session.query(Tag).filter(~Tag.id.in_([22, 24])).first() from sqlalchemy import and_, or_ ret5 = session.query(Tag).filter(and_(Tag.id > 1, Tag.title == "LOL")).first() ret6 = session.query(Tag).filter(or_(Tag.id > 1, Tag.title == "LOL")).first() ret7 = session.query(Tag).filter(or_( Tag.id>1, and_(Tag.id>3, Tag.title=="LOL") )).all() # 通配符 ret8 = session.query(Tag).filter(Tag.title.like("L%")).all() ret9 = session.query(Tag).filter(~Tag.title.like("L%")).all() # 限制 ret10 = session.query(Tag).filter(~Tag.title.like("L%")).all()[1:2] # 排序 ret11 = session.query(Tag).order_by(Tag.id.desc()).all() # 倒序 ret12 = session.query(Tag).order_by(Tag.id.asc()).all() # 正序 # 分组 ret13 = session.query(Tag.test).group_by(Tag.test).all() # 聚合函数 from sqlalchemy.sql import func ret14 = session.query( func.max(Tag.id), func.sum(Tag.test), func.min(Tag.id) ).group_by(Tag.title).having(func.max(Tag.id > 22)).all() # 连表 ret15 = session.query(UserInfo, Hobby).filter(UserInfo.hobby_id == Hobby.id).all() # print(ret15) 得到一个列表套元组 元组里是两个对象 ret16 = session.query(UserInfo).join(Hobby).all() # print(ret16) 得到列表里面是前一个对象 # 相当于inner join # for i in ret16: # # print(i[0].name, i[1].title) # print(i.hobby.title) ret17 = session.query(Hobby).join(UserInfo, isouter=True).all() ret17_1 = session.query(UserInfo).join(Hobby, isouter=True).all() ret18 = session.query(Hobby).outerjoin(UserInfo).all() ret18_1 = session.query(UserInfo).outerjoin(Hobby).all() # 相当于left join print(ret17) print(ret17_1) print(ret18) print(ret18_1)
# 基于relationship的FK # 添加 user_obj = UserInfo(name="提莫", hobby=Hobby(title="种蘑菇")) session.add(user_obj) hobby = Hobby(title="弹奏一曲") hobby.user = [UserInfo(name="琴女"), UserInfo(name="妹纸")] session.add(hobby) session.commit() # 基于relationship的正向查询 user_obj_1 = session.query(UserInfo).first() print(user_obj_1.name) print(user_obj_1.hobby.title) # 基于relationship的反向查询 hb = session.query(Hobby).first() print(hb.title) for i in hb.user: print(i.name) session.close()
# 添加 book_obj = Book(title="Python源码剖析") tag_obj = Tag(title="Python") b2t = Book2Tag(book_id=book_obj.id, tag_id=tag_obj.id) session.add_all([ book_obj, tag_obj, b2t, ]) session.commit() # 上面有坑哦~~~~ book = Book(title="测试") book.tags = [Tag(title="测试标签1"), Tag(title="测试标签2")] session.add(book) session.commit() tag = Tag(title="LOL") tag.books = [Book(title="大龙刷新时间"), Book(title="小龙刷新时间")] session.add(tag) session.commit() # 基于relationship的正向查询 book_obj = session.query(Book).filter_by(id=4).first() print(book_obj.title) print(book_obj.tags) # 基于relationship的反向查询 tag_obj = session.query(Tag).first() print(tag_obj.title) print(tag_obj.books)
Flask-session
Flask-session跟框架自带的session有什么区别呢~
框架自带的session是通过请求上下文~放入到Local中的~那如果我们想把session放入别的地方怎么办呢~~
比如redis~或者数据库~等等~~Flask-session就提供了这些功能~~我们看下Flask-session怎么用~~
一、下载安装
pip install flask-session
二、导入并实例化
def create_app(): app = Flask(__name__) app.config.from_object("settings.BaseConfig") app.register_blueprint(us) # Flask-Session 第二步实例化session Session(app) return app
三、配置文件
class BaseConfig(object): # Flask-Session 第三步 # SESSION_TYPE = 'redis' # SESSION_REDIS = Redis(host='192.168.0.94', port='6379')
实现原理
回顾一下session的实现原理~请求进来先把request以及session封装到RequestContext对象中~~
然后调用push方法通过LocalStark放入到Local中~这时候放入到Local中的session还是空的~
然后调用了session_interface中的open_session 以及save_session方法~~
那我们再看下~~Flask-session都做了什么~~
修改了app.session_interface这个类~所以在我们调用open_session以及save_session的时候~调用的是我们配置的类里的方法~
从而实现了session存储地方的不同~
Flask SQLAlchemy
学习Flask-SQLAlchemy之前~大家要先学习一下SQLAlchemy~一个Python的ORM框架~~
接下来是Flask-SQLAlchemy的应用~~
一、下载安装
pip3 install flask-sqlalchemy
二、导入并实例化SQLAlchemy
# 在跟项目同名的文件夹下的 init.py中 from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy() from .views.user import us # !!! 注意事项 # 必须在导入蓝图之前
三、初始化
def create_app(): app = Flask(__name__) app.config.from_object("settings.BaseConfig") app.register_blueprint(us) # Flask-Session 第二步实例化session Session(app) # 初始化db db.init_app(app) return app
四、在配置文件写入配置信息
class BaseConfig(object): # Flask-Session 第三步 # SESSION_TYPE = 'redis' # SESSION_REDIS = Redis(host='192.168.0.94', port='6379') SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8" SQLALCHEMY_POOL_SIZE = 10 SQLALCHEMY_MAX_OVERFLOW = 5 # SQLALCHEMY_TRACK_MODIFICATIONS = False pass
五、创建model
# by gaoxin from sqlalchemy import Column, Integer, String from flask_demo import db class Users(db.Model): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False)
六、生成表(需要使用app上下文)
# by gaoxin from flask_demo import db, create_app from flask_demo.models import *
# 一定要导入models 否则找不到表创建不出来 app = create_app() app_ctx = app.app_context() with app_ctx: db.create_all() # db.drop_all()
七、基于ORM对数据库操作
# by gaoxin from flask import Blueprint from flask_demo import db from flask_demo.models import Users us = Blueprint("us", __name__) @us.route("/index") def index(): # db.session.add(Users(name="gaoxin")) # db.session.commit() # db.session.remove() ret = db.session.query(Users).all() print(ret) db.session.remove() return "Index"
Flask-Script
一、下载安装
pip3 install flask-script
二、增加的功能 runserver
# by gaoxin from flask_demo import create_app from flask_script import Manager app = create_app() manager = Manager(app) if __name__ == '__main__': # app.run() manager.run() # 启动命令变成 # python3 manager.py runserver -h 127.0.0.1 -p 8000 # Running on http://127.0.0.1:8000/ (Press CTRL+C to quit)
三、自定义命令
# by gaoxin from flask_demo import create_app from flask_script import Manager app = create_app() manager = Manager(app) # 位置传参 @manager.command def custom(arg): """ 自定义命令 python manage.py custom 123 :param arg: :return: """ print(arg) # 关键字传参 @manager.option('-n', '--name', dest='name') @manager.option('-u', '--url', dest='url') def cmd(name, url): """ 自定义命令 执行: python manage.py cmd -n gaoxin -u http://www.oldboyedu.com :param name: :param url: :return: """ print(name, url) if __name__ == '__main__': # app.run() manager.run()
Flask-migrate
一、下载安装
pip3 install flask-migrate
二、增加的命令
!!!! 依赖flask-script !!!!
# by gaoxin from flask_demo import create_app, db from flask_demo.models import * from flask_script import Manager from flask_migrate import Migrate, MigrateCommand app = create_app() manager = Manager(app) Migrate(app, db) """ # 数据库迁移命名 # 依赖 flask-script python manage.py db init # 初始化 python manage.py db migrate # makemigrations python manage.py db upgrade # migrate """ manager.add_command("db", MigrateCommand) if __name__ == '__main__': # app.run() manager.run()
wtforms
类比我们django的Form组件~
Form组件的主要应用是~帮助我们自动生成HTML,以及做表单数据的验证~~
用法跟Form组件大同小异~~
一、下载安装
pip3 install wtforms
二、自动生成HTML
第一步 生成一个Form类
第二步 实例化这个Form类,把这个实例化对象当成参数传递给前端
# 视图页面
from wtforms import Form, widgets, validators
from wtforms.fields import simple
class MyForm(Form): name = simple.StringField( label="用户名", render_kw={"placeholder": "请输入用户名"}, widget=widgets.TextArea(), default="gaoxin" ) pwd = simple.PasswordField() @ac.route("/login", methods=["GET", "POST"]) def login(): if request.method == "GET": form = MyForm(data={"name": "gao"}) return render_template("login.html", form=form)
<!--html页面--> <form action="" novalidate> 用户名: {{form.name}} 密码: {{form.pwd}} <button type="submit">提交</button> </form> <!--循环出来的页面--> <form action=""> {% for field in form %} {{field.label}}: {{field}} {% endfor %} <button type="submit">提交</button> </form>
三、验证
第一步 在Form类中增加验证信息
第二步 在视图中做数据的校验 并且页面展示错误信息
# 视图页面 class MyForm(Form): name = simple.StringField( label="用户名", render_kw={"placeholder": "请输入用户名"}, # widget=widgets.TextArea(), validators=[ validators.DataRequired(message="用户名不能为空"), validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d') ], # default="gaoxin" ) pwd = simple.PasswordField( label="密码", widget=widgets.PasswordInput(), validators=[ validators.DataRequired(message='密码不能为空.'), validators.Length(min=8, message='用户名长度必须大于%(min)d'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ] ) @ac.route("/login", methods=["GET", "POST"]) def login(): if request.method == "GET": form = MyForm(data={"name": "gao"}) return render_template("login.html", form=form) form = MyForm(formdata=request.form) if form.validate(): print(form.data) else: return render_template("login.html", form=form) return "lakdsjlga"
<!--循环出来的页面--> <form action="" method="post" novalidate> {% for field in form %} {{field.label}}: {{field}} {{field.errors[0]}} {% endfor %} <button type="submit">提交</button> </form>
四、拓展字段
以用户注册为例,输入用户名,密码,重复密码,性别和爱好。
class RegisterForm(Form): name = simple.StringField( label='用户名', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), # render_kw={'class': 'form-control'}, default='gaoxin' ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.') ], widget=widgets.PasswordInput(), # render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重复密码', validators=[ validators.DataRequired(message='重复密码不能为空.'), validators.EqualTo('pwd', message="两次密码输入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='邮箱', validators=[ validators.DataRequired(message='邮箱不能为空.'), validators.Email(message='邮箱格式错误') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性别', choices=( (1, '男'), (2, '女'), ), coerce=int ) city = core.SelectField( label='城市', choices=( ('bj', '北京'), ('sh', '上海'), ) ) hobby = core.SelectMultipleField( label='爱好', choices=( (1, '篮球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜好', choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) def __init__(self, *args, **kwargs): super(RegisterForm, self).__init__(*args, **kwargs) # 从数据库获取数据 做到实时更新 self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球')) @ac.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'GET': form = RegisterForm(data={'gender': 1}) return render_template('register.html', form=form) else: form = RegisterForm(formdata=request.form) if form.validate(): return "注册成功" else: print(form.errors) return render_template('register.html', form=form)
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用户注册</h1> <form method="post" novalidate style="padding:0 50px"> {% for item in form %} <p>{{item.label}}: {{item}} {{item.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html>
!!! 注意选项字段需要去数据库取数据 还有就是从数据库取数据的实时更新
Flask-Session
from flask import Flask,views,request,session from flask_session import Session from redis import Redis app=Flask(__name__) app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = Redis(host="127.0.0.1",port=6379) Session(app) @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": session["user"] = "123" return "post" return "get" if __name__ == '__main__': app.run(debug=True)
WTForms
from flask import Flask, request, render_template from wtforms import Form, validators, widgets from wtforms.fields import simple, core app = Flask(__name__) class RegForm(Form): username = simple.StringField( label="用户名", validators=[ validators.DataRequired(message="用户名不能为空"), ], render_kw={"class": "my_class"} ) pwd = simple.PasswordField( label="密码", validators=[ validators.DataRequired(message="密码不能为空"), validators.length(min=6, max=16, message="长度必须大于等于6,小于等于16") ], render_kw={"class": "form-control"}, widget = widgets.PasswordInput(), ) pwd_confim = simple.PasswordField( label="确认密码", validators=[ validators.EqualTo("pwd", message="两次密码不一致"), validators.length(min=6, max=16, message="长度必须大于等于6,小于等于16") ], ) gender = core.SelectField( label="性别", choices=( (1, "女"), (2, "男") ), default=1, coerce=int # 限制是int类型的 ) city = core.RadioField( label="城市", choices=( ("1", "北京"), ("2", "上海") ), default=2, coerce=str ) email = simple.StringField( label="邮箱", validators=[ validators.DataRequired(message="邮箱不能为空"), validators.Email(message="格式不对") ] ) hobby = core.SelectMultipleField( label="爱好", choices=( (1, "电影"), (2, "音乐"), (3, "画画"), (4, "看书") ), coerce=int, default=(1, 4) ) favor = core.SelectMultipleField( label="喜好", choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) def __init__(self,*args,**kwargs): #这里的self是一个RegisterForm对象 '''重写__init__方法''' super().__init__(*args, **kwargs) #继承父类的init方法 self.favor.choices =((1, '篮球'), (2, '足球'), (3, '羽毛球')) #吧RegisterForm这个类里面的favor重新赋值 def validate_pwd_confim(self,field,): ''' 自定义pwd_config字段规则,例:与pwd字段是否一致 :param field: :return: ''' # 最开始初始化时,self.data中已经有所有的值 if field.data != self.data['pwd']: # raise validators.ValidationError("密码不一致") # 继续后续验证 raise validators.StopValidation("密码不一致") # 不再继续后续验证 class LoginForm(Form): username = simple.StringField( label="用户名", validators=[ validators.DataRequired(message="用户名不能为空"), validators.length(min=4, max=8, message="长度必须大于等于4,小于等于8") ] ) password = simple.PasswordField( label="密码", validators=[ validators.DataRequired(message="密码不能为空"), validators.length(min=4, max=8, message="长度必须大于等于4,小于等于8") ] ) @app.route('/register',methods=["GET","POST"]) def register(): if request.method=="GET": form = RegForm(data={'gender': 1}) #默认是1, return render_template("register.html",form=form) else: form = RegForm(formdata=request.form) if form.validate(): #判断是否验证成功 print('用户提交数据通过格式验证,提交的值为:', form.data) #所有的正确信息 else: print(form.errors) #所有的错误信息 return render_template('register.html', form=form) @app.route("/reg", methods=["GET", "POST"]) def reg(): if request.method == "GET": rf = RegForm() return render_template("reg.html", rf=rf) else: rf_data = RegForm(request.form) if rf_data.validate(): print(rf_data.data) return "OK" else: return render_template("reg.html", rf=rf_data) @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "GET": login_form = LoginForm() return render_template("login.html", lf=login_form) else: login_form_data = LoginForm(request.form) if login_form_data.validate(): user = login_form_data.data.get("username") return str(user) else: return render_template("login.html", lf=login_form_data) if __name__ == '__main__': app.run(debug=True)
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <form action="" method="post" novalidate> {% for field in rf %} <p>{{ field.label }}{{ field }}{{ field.errors[0] }}</p> {% endfor %} <input type="submit" value="submit"> </form> <form action="" method="post" novalidate> <p>{{ lf.username.label }}{{ lf.username }}{{ lf.username.errors.0 }}</p> <p>{{ lf.password.label }}{{ lf.password }}{{ lf.password.errors[0] }}</p> <input type="submit" value="submit"> </form> </body> </body> </html>
Flask-请求上下文
# 偏函数partial from functools import partial def my_sum(a, b): print(a) print(b) return a + b # new_my_sum = partial(my_sum, 10) # new_my_sum此时就是下面这样的 # def my_sum(b): # a=10 new_my_sum = partial(my_sum, b=10) # 位置参数一定要在关键字参数前面 不能写成new_my_sum = partial(my_sum, a=10),如果非要这样写,下面就改成res = new_my_sum(b=20) # new_my_sum此时就是下面这样的 # def my_sum(b): # b=10 res = new_my_sum(20) print(res) class My: def __call__(self, *args, **kwargs): print(666, '__call__') def __setattr__(self, key, value): print(key, value, '__setattr__') def __getattr__(self, item): print(item, '__getattr__') def __setitem__(self, key, value): print(key, value, '__setitem__') def __getitem__(self, item): print(item, '__getitem__') a = My() a() # 执行__call__ a.name = 'tom' # 执行__setattr__ a['age'] = 18 # __setitem__ a['name'] # __getitem__ a.qweqwewqe # __getattr__ # 空间换时间 # 线程安全 Flask上下文机制就是使用的Threading.local # 线程进来,开辟一个空间给这个线程,线程操作的所有任务,都会复制一份到空间中 from threading import local class Foo(local): pass print('##############################') # 栈Stack class Mystack(object): data = [] def __setattr__(self, key, value): self.push(key, value) def push(self, key, value): self.data.append({key:value}) def top(self): return self.data.pop() my_stack = Mystack() my_stack.name = 'tom' my_stack.name = 'rose' print(my_stack.data) #[{'name': 'tom'}, {'name': 'rose'}] #LocalStack # import threading # threading.current_thread().ident #当前线程的ID import threading from threading import get_ident #和上面是一样的 class MyLocalStack(object): data=[] storage={} def push(self, item): try: self.storage[get_ident()].append(item) except: self.storage[get_ident()]=[item] def top(self): return self.storage[get_ident()].pop() my_local_stack=MyLocalStack() import time def go(i): my_local_stack.push(i) time.sleep(1) # my_local_stack.top() for i in range(5): t = threading.Thread(target=go,args=(i,)) t.start() print(my_local_stack.storage) ########### from werkzeug.wrappers import Request, Response from werkzeug.serving import run_simple @Request.application def app(req): print(req) return Response("200ok") run_simple("0.0.0.0", 9527, app) """ run_simple("0.0.0.0",9527,app) 会执行app函数 from flask import Flask app=Flask(__name__) if __name__ == '__main__': app.run() """
#!/usr/bin/env python # -*- coding: utf-8 -*- from flask import Flask,request app=Flask(__name__) if __name__ == '__main__': app.run() #run_simple(host, port, self, **options) #app()=obj()=obj.__call__ #app.wsgi_app """ 上文 self._local=={"__storage__":{},"__ident_func__":get_ident} ctx = requestcontext对象 此对象中有 request/session 此时 self._local=={"__storage__":{3721:{"stack":[ctx] }},"__ident_func__":get_ident} _request_ctx_stack == LocalStack()==self._local=={"__storage__":{3721:{"stack":[ctx] }},"__ident_func__":get_ident} """ """ 下文源码 request = LocalProxy(partial(_lookup_req_object, "request")#返回的是request对象) class LocalProxy(object): __slots__ = ("__local", "__dict__", "__name__", "__wrapped__") def __init__(self, local, name=None): ####local==request偏函数==partial(_lookup_req_object, "request")#返回的是request对象 object.__setattr__(self, "_LocalProxy__local", local) #__local=local=request偏函数 object.__setattr__(self, "__name__", name) if callable(local) and not hasattr(local, "__release_local__"): object.__setattr__(self, "__wrapped__", local) def __getattr__(self, name): ##request.method执行的就是这 if name == "__members__": return dir(self._get_current_object()) return getattr(self._get_current_object(), name) #从request对象中取出 name ,name=method def _get_current_object(self): if not hasattr(self.__local, "__release_local__"): return self.__local() ##request偏函数的执行,返回的是request对象 try: return getattr(self.__local, self.__name__) except AttributeError: raise RuntimeError("no object bound to %s" % self.__name__) def _lookup_req_object(name): #name==request top = _request_ctx_stack.top #top==ctx ctx = requestcontext对象 此对象中有 request/session if top is None: raise RuntimeError(_request_ctx_err_msg) return getattr(top, name) #返回的是request对象 ,top是ctx name是request @property def top(self): try: return self._local.stack[-1] ## except (AttributeError, IndexError): return None """ """ 上文源码 def run(self, host=None, port=None, debug=None, load_dotenv=True, **options): # self==app==Flask from werkzeug.serving import run_simple try: run_simple(host, port, self, **options) #相当于执行self() 也就是 Flask.__call__() # self==app==Flask Flask.__call__() def __call__(self, environ, start_response): #environ==request return self.wsgi_app(environ, start_response) def wsgi_app(self, environ, start_response): ctx = self.request_context(environ) # self==app==Flask ## ctx = requestcontext对象 此对象中有 request/session ## error = None try: try: ctx.push()### ctx执行的push response = self.full_dispatch_request() except Exception as e: error = e response = self.handle_exception(e) except: # noqa: B001 error = sys.exc_info()[1] raise return response(environ, start_response) finally: if self.should_ignore_error(error): error = None ctx.auto_pop(error) def push(self): ##self==ctx top = _request_ctx_stack.top ###_request_ctx_stack == LocalStack()=={"__storage__":{},"__ident_func__":get_ident} if top is not None and top.preserved: top.pop(top._preserved_exc) app_ctx = _app_ctx_stack.top if app_ctx is None or app_ctx.app != self.app: app_ctx = self.app.app_context() app_ctx.push() self._implicit_app_ctx_stack.append(app_ctx) else: self._implicit_app_ctx_stack.append(None) if hasattr(sys, "exc_clear"): sys.exc_clear() _request_ctx_stack.push(self)######### self==ctx ctx = requestcontext对象 此对象中有 request/session ###{"__storage__":{3721:{"stack":[ctx] }},"__ident_func__":get_ident}### 上文 def push(self, obj): ######### obj就是ctx self==_request_ctx_stack == LocalStack()=={"__storage__":{},"__ident_func__":get_ident} rv = getattr(self._local, "stack", None) ##此时 self._local=={"__storage__":{},"__ident_func__":get_ident} if rv is None: self._local.stack = rv = [] ##此时 self.local=={"__storage__":{3721:{"stack":rv=[] }},"__ident_func__":get_ident} rv.append(obj) ##obj==ctx ctx = requestcontext对象 此对象中有 request/session 此时 {"__storage__":{3721:{"stack":[ctx] }},"__ident_func__":get_ident} return rv ##rv==ctx class Local(object): def __setattr__(self, name, value): ident = self.__ident_func__() storage = self.__storage__ try: storage[ident][name] = value except KeyError: storage[ident] = {name: value} class LocalStack(object): def __init__(self): self._local = Local() class Local(object): __slots__ = ("__storage__", "__ident_func__") def __init__(self): object.__setattr__(self, "__storage__", {}) object.__setattr__(self, "__ident_func__", get_ident) def request_context(self, environ): return RequestContext(self, environ) # self==app==Flask class RequestContext(object): def __init__(self, app, environ, request=None, session=None): #self==requestcontext对象 app==app==flask self.app = app if request is None: request = app.request_class(environ) #request_class = Request self.request = request ####### self.url_adapter = None try: self.url_adapter = app.create_url_adapter(self.request) except HTTPException as e: self.request.routing_exception = e self.flashes = None self.session = session class Request(RequestBase, JSONMixin): pass class BaseRequest(object): def __init__(self, environ, populate_request=True, shallow=False): self.environ = environ if populate_request and not shallow: self.environ["werkzeug.request"] = self self.shallow = shallow """
Flask-SQLAlchemy
首先要先安装一下Flask-SQLAlchemy这个模块
pip install Flask-SQLAlchemy
然后你要下载一个干净的Flask项目 点击下载
接下来基于这个Flask项目,我们要加入Flask-SQLAlchemy让项目变得生动起来
1.加入Flask-SQLAlchemy第三方组件
1 from flask import Flask 2 3 # 导入Flask-SQLAlchemy中的SQLAlchemy 4 from flask_sqlalchemy import SQLAlchemy 5 6 # 实例化SQLAlchemy 7 db = SQLAlchemy() 8 # PS : 实例化SQLAlchemy的代码必须要在引入蓝图之前 9 10 from .views.users import user 11 12 13 def create_app(): 14 app = Flask(__name__) 15 16 # 初始化App配置 这个app配置就厉害了,专门针对 SQLAlchemy 进行配置 17 # SQLALCHEMY_DATABASE_URI 配置 SQLAlchemy 的链接字符串儿 18 app.config["SQLALCHEMY_DATABASE_URI"] = "mysql+pymysql://root:DragonFire@127.0.0.1:3306/dragon?charset=utf8" 19 # SQLALCHEMY_POOL_SIZE 配置 SQLAlchemy 的连接池大小 20 app.config["SQLALCHEMY_POOL_SIZE"] = 5 21 # SQLALCHEMY_POOL_TIMEOUT 配置 SQLAlchemy 的连接超时时间 22 app.config["SQLALCHEMY_POOL_TIMEOUT"] = 15 23 app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False 24 25 # 初始化SQLAlchemy , 本质就是将以上的配置读取出来 26 db.init_app(app) 27 28 app.register_blueprint(user) 29 30 return app
2.建立models.py ORM模型文件
1 from MyApp import db 2 3 Base = db.Model # 这句话你是否还记的? 4 # from sqlalchemy.ext.declarative import declarative_base 5 # Base = declarative_base() 6 # 每一次我们在创建数据表的时候都要做这样一件事 7 # 然而Flask-SQLAlchemy已经为我们把 Base 封装好了 8 9 # 建立User数据表 10 class Users(Base): # Base实际上就是 db.Model 11 __tablename__ = "users" 12 __table_args__ = {"useexisting": True} 13 # 在SQLAlchemy 中我们是导入了Column和数据类型 Integer 在这里 14 # 就和db.Model一样,已经封装好了 15 id = db.Column(db.Integer,primary_key=True) 16 username = db.Column(db.String(32)) 17 password = db.Column(db.String(32)) 18 19 20 if __name__ == '__main__': 21 from MyApp import create_app 22 app = create_app() 23 # 这里你要回顾一下Flask应该上下文管理了 24 # 离线脚本: 25 with app.app_context(): 26 db.drop_all() 27 db.create_all()
3.登录视图函数的应用
1 from flask import Blueprint, request, render_template 2 3 user = Blueprint("user", __name__) 4 5 from MyApp.models import Users 6 from MyApp import db 7 8 @user.route("/login",methods=["POST","GET"]) 9 def user_login(): 10 if request.method == "POST": 11 username = request.form.get("username") 12 password = request.form.get("password") 13 14 # 还记不记得我们的 15 # from sqlalchemy.orm import sessionmaker 16 # Session = sessionmaker(engine) 17 # db_sesson = Session() 18 # 现在不用了,因为 Flask-SQLAlchemy 也已经为我们做好会话打开的工作 19 # 我们在这里做个弊: 20 db.session.add(Users(username=username,password=password)) 21 db.session.commit() 22 23 # 然后再查询,捏哈哈哈哈哈 24 user_info = Users.query.filter(Users.username == username and User.password == password).first() 25 print(user_info.username) 26 if user_info: 27 return f"登录成功{user_info.username}" 28 29 return render_template("login.html")
Flask-Script
Flask-Script 从字面意思上来看就是 Flask 的脚本
是的,熟悉Django的同学是否还记得Django的启动命令呢? python manager.py runserver 大概是这样对吧
其实Flask也可以做到,基于 Flask-Script 就可以了 - 但是你还是得有一个项目 点击下载
1.安装 Flask-Script
pip install Flask-Script
2.将 Flask-Script 加入到 Flask 项目中
1 import MyApp 2 # 导入 Flask-Script 中的 Manager 3 from flask_script import Manager 4 5 app = MyApp.create_app() 6 # 让app支持 Manager 7 manager = Manager(app) 8 9 if __name__ == '__main__': 10 #app.run() 11 # 替换原有的app.run(),然后大功告成了 12 manager.run()
3.使用命令启动 Flask 项目
python manager.py runserver
4.启动项目并更改配置参数(监听IP地址,监听端口)
python manager.py runserver -h 0.0.0.0 -p 9527
5.高级操作 - 自定制脚本命令
5.1.方式一 : @manager.command
1 import MyApp 2 # 导入 Flask-Script 中的 Manager 3 from flask_script import Manager 4 5 app = MyApp.create_app() 6 # 让app支持 Manager 7 manager = Manager(app) # type:Manager 8 9 @manager.command 10 def DragonFire(arg): 11 print(arg) 12 13 if __name__ == '__main__': 14 #app.run() 15 # 替换原有的app.run(),然后大功告成了 16 manager.run()
python manager.py DragonFire 666
5.2.方式二 : @manager.opation("-短指令","--长指令",dest="变量名")
1 import MyApp 2 # 导入 Flask-Script 中的 Manager 3 from flask_script import Manager 4 5 app = MyApp.create_app() 6 # 让app支持 Manager 7 manager = Manager(app) # type:Manager 8 9 @manager.command 10 def DragonFire(arg): 11 print(arg) 12 13 @manager.option("-n","--name",dest="name") 14 @manager.option("-s","--say",dest="say") 15 def talk(name,say): 16 print(f"{name}你可真{say}") 17 18 if __name__ == '__main__': 19 #app.run() 20 # 替换原有的app.run(),然后大功告成了 21 manager.run()
python manager.py talk -n 赵丽颖 -s 漂亮
python manager.py talk --name DragonFire --say NB-Class
Flask-Migrate
终于到了Flask-Migrate,之前在学习Flask-SQLAlchemy的时候,有的同学就提过类似的问题,Flask支持 makemigration / migrate 吗?
答案在这里该诉你,如果你同时拥有两个三方组件 Flask-Script 和 Flask-Migrate 那么就支持这样的动作
首先你要有几个准备工作
1.安装 Flask-Migrate
pip install Flask-Migrate
2.将 Flask-Migrate 加入到 Flask 项目中 - PS: 注意了 Flask-Migrate 是要依赖 Flask-Script 组件的
1 import MyApp 2 # 导入 Flask-Script 中的 Manager 3 from flask_script import Manager 4 5 # 导入 Flask-Migrate 中的 Migrate 和 MigrateCommand 6 # 这两个东西说白了就是想在 Flask-Script 中添加几个命令和指令而已 7 from flask_migrate import Migrate,MigrateCommand 8 9 app = MyApp.create_app() 10 # 让app支持 Manager 11 manager = Manager(app) # type:Manager 12 13 # Migrate 既然是数据库迁移,那么就得告诉他数据库在哪里 14 # 并且告诉他要支持那个app 15 Migrate(app,MyApp.db) 16 # 现在就要告诉manager 有新的指令了,这个新指令在MigrateCommand 中存着呢 17 manager.add_command("db",MigrateCommand) # 当你的命令中出现 db 指令,则去MigrateCommand中寻找对应关系 18 """ 19 数据库迁移指令: 20 python manager.py db init 21 python manager.py db migrate # Django中的 makemigration 22 python manager.py db upgrade # Django中的 migrate 23 """ 24 25 26 @manager.command 27 def DragonFire(arg): 28 print(arg) 29 30 @manager.option("-n","--name",dest="name") 31 @manager.option("-s","--say",dest="say") 32 def talk(name,say): 33 print(f"{name}你可真{say}") 34 35 if __name__ == '__main__': 36 #app.run() 37 # 替换原有的app.run(),然后大功告成了 38 manager.run()
3.执行数据库初始化指令
python manager.py db init
此时你会发现你的项目目录中出现了一个好玩儿的东西