一,Flask-session
Flask-session跟框架自带的session的区别
框架自带的session是通过请求上下文放入Local中的,如果我们想把session放入别的地方怎么办,比如redis或者数据库等,Flask-session就提供了这些功能
下载安装
pip install flask-session
导入并实例化
from flask-session import Session # 导入 def create_app(): app = Flask(__name__) app.config.from_object("settings.BaseConfig") app.register_blueprint(xBule) # 实例化 Session(app) return app
配置文件
class BaseConfig(object): # ... SESSION_TYPE = "redis" SESSION_REDIS = Redis(host="127.0.0.1",port="6379")
实现原理
# 原生session的实现原理 # 请求进来先把request以及session封装到RequestContext对象中 # 调用push方法通过LocalStark放入到Local中 # 这时候放入到Local中的ctx.session还是None session_interface = self.app.session_interface self.session = session_interface.open_session(self.app,self.request) # 然后调用了session_interface中的open_session和save_session方法 # Flask-session的实现原理 # Session(app) def __init__(self,app=None): self.app = app if app is not None: self.init_app(app) # 执行了init_app方法 def init_app(self,app): # 把Session类的_get_interface(app)赋值给了app.session_interface # 相当于self.session = Session()._get_interface(app).open_session(self.app,self.request) app.session_interface = self._get_interface(app) # _get_interface方法找配置文件中的配置的类里的方法,从而实现了session存储地方的不同
二,SQLALchemy
SQLALchemy是一个基于Python的ORM框架。该框架是建立在DB-API之上,使用关系对象映射进行数据库的操作,简而言之就是将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
什么是DB-API:
Python的数据库接口规范,在没有DB-APi之前,各数据库之间的应用接口非常混乱,实现各不相同,项目需要更换数据库的时候,需要做大量的修改,非常不方便,DB-API就是为了解决这样的问题。
下载安装
pip install sqlalchemy
组成部分:
engin:框架的引擎
connectiong pooling:数据库连接池
Dialect:选择链路数据库的DB-API种类(实际选择哪个模块链接数据库)
Schema/Types:架构和类型
SQL Expression Language:SQL表达式语言
连接数据库
SQLALchemy本身无法操作数据库,其必须依赖遵循DB-API规范的三方模块
Dialect用于和数据API进行交互,根据配置的不同调用不同数据库API,从而实现数据库的操作
# 不同的数据库API # MySQL-PYthon mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> #pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] # MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> # cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] # 更多 # http://docs.sqlalchemy.org/en/latest/dialects/index.html
# 连接数据库 from sqlalchemy import create_engine engine = create_engine( "mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接数 pool_size=5, # 连接池大小 pool_timeout=30, # 连接池中没有线程最多等待时间,否则报错 pool_recycle=-1, # 多久之后对连接池中的连接进行回收(重置)-1不回收 )
执行原生SQL
from sqlalchemy import create_engine engine = create_engine( "mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8", max_overflow=0, pool_size=5, ) def test(): cur = engine.execute("select * from Course") result = cur.fetchall() print(result) cur.close() if __name__ == '__main__': test() # [(1, '生物', 1), (2, '体育', 2), (3, '物理', 1)]
ORM操作
创建表
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy import Index, UniqueConstraint import datetime ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",) Base = declarative_base() class UserInfo(Base): __tablename__ = "user_info" id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) email = Column(String(32), unique=True) create_time = Column(DateTime, default=datetime.datetime.now) __table_args__ = ( UniqueConstraint("id", "name", name="uni_id_name"), Index("name", "email") ) def create_db(): Base.metadata.create_all(ENGINE) def drop_db(): Base.metadata.drop_all(ENGINE) if __name__ == '__main__': create_db()
一对多的创建
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy import Index, UniqueConstraint, ForeignKey from sqlalchemy.orm import relationship import datetime ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",) Base = declarative_base() # ======一对多示例======= class UserInfo(Base): __tablename__ = "user_info" id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) email = Column(String(32), unique=True) create_time = Column(DateTime, default=datetime.datetime.now) # FK字段的建立 hobby_id = Column(Integer, ForeignKey("hobby.id")) # 不生成表结构 方便查询使用 hobby = relationship("Hobby", backref="user") __table_args__ = ( UniqueConstraint("id", "name", name="uni_id_name"), Index("name", "email") ) class Hobby(Base): __tablename__ = "hobby" id = Column(Integer, primary_key=True) title = Column(String(32), default="码代码") def create_db(): Base.metadata.create_all(ENGINE) def drop_db(): Base.metadata.drop_all(ENGINE) if __name__ == '__main__': create_db() # drop_db()
多对多的创建
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy import Index, UniqueConstraint, ForeignKey from sqlalchemy.orm import relationship import datetime ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",) Base = declarative_base() # ======多对多示例======= class Book(Base): __tablename__ = "book" id = Column(Integer, primary_key=True) title = Column(String(32)) # 不生成表字段 仅用于查询方便 tags = relationship("Tag", secondary="book2tag", backref="books") class Tag(Base): __tablename__ = "tag" id = Column(Integer, primary_key=True) title = Column(String(32)) class Book2Tag(Base): __tablename__ = "book2tag" id = Column(Integer, primary_key=True) book_id = Column(Integer, ForeignKey("book.id")) tag_id = Column(Integer, ForeignKey("tag.id")) def create_db(): Base.metadata.create_all(ENGINE) def drop_db(): Base.metadata.drop_all(ENGINE) if __name__ == '__main__': create_db() # drop_db()
对数据库表的操作(增删改查)
scoped_session
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session from models_demo import Tag ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",) Session = sessionmaker(bind=ENGINE) # 每次执行数据库操作的时候,都需要创建一个session # 线程安全,基于本地线程实现每个线程用同一个session session = scoped_session(Session) # =======执行ORM操作========== tag_obj = Tag(title="SQLAlchemy") # 添加 session.add(tag_obj) # 提交 session.commit() # 关闭session session.close()
基本的增删改查
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session from models_demo import Tag, UserInfo import threading ENGINE = create_engine("mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8",) Session = sessionmaker(bind=ENGINE) # 每次执行数据库操作的时候,都需要创建一个session session = Session() session = scoped_session(Session) # ============添加============ # tag_obj = Tag(title="SQLAlchemy") # # 添加 # session.add(tag_obj) # session.add_all([ # Tag(title="Python"), # Tag(title="Django"), # ]) # # 提交 # session.commit() # # 关闭session # session.close() # ============基础查询============ # ret1 = session.query(Tag).all() # ret2 = session.query(Tag).filter(Tag.title == "Python").all() # ret3 = session.query(Tag).filter_by(title="Python").all() # ret4 = session.query(Tag).filter_by(title="Python").first() # print(ret1, ret2, ret3, ret4) # ============删除=========== # session.query(Tag).filter_by(id=1).delete() # session.commit() # ===========修改=========== session.query(Tag).filter_by(id=22).update({Tag.title: "LOL"}) session.query(Tag).filter_by(id=23).update({"title": "王者毒药"}) session.query(Tag).filter_by(id=24).update({"title": Tag.title + "~"}, synchronize_session=False) # synchronize_session="evaluate" 默认值进行数字加减 session.commit()
常用操作
# 条件查询 ret1 = session.query(Tag).filter_by(id=22).first() ret2 = session.query(Tag).filter(Tag.id > 1, Tag.title == "LOL").all() ret3 = session.query(Tag).filter(Tag.id.between(22, 24)).all() ret4 = session.query(Tag).filter(~Tag.id.in_([22, 24])).first() from sqlalchemy import and_, or_ ret5 = session.query(Tag).filter(and_(Tag.id > 1, Tag.title == "LOL")).first() ret6 = session.query(Tag).filter(or_(Tag.id > 1, Tag.title == "LOL")).first() ret7 = session.query(Tag).filter(or_( Tag.id>1, and_(Tag.id>3, Tag.title=="LOL") )).all() # 通配符 ret8 = session.query(Tag).filter(Tag.title.like("L%")).all() ret9 = session.query(Tag).filter(~Tag.title.like("L%")).all() # 限制 ret10 = session.query(Tag).filter(~Tag.title.like("L%")).all()[1:2] # 排序 ret11 = session.query(Tag).order_by(Tag.id.desc()).all() # 倒序 ret12 = session.query(Tag).order_by(Tag.id.asc()).all() # 正序 # 分组 ret13 = session.query(Tag.test).group_by(Tag.test).all() # 聚合函数 from sqlalchemy.sql import func ret14 = session.query( func.max(Tag.id), func.sum(Tag.test), func.min(Tag.id) ).group_by(Tag.title).having(func.max(Tag.id > 22)).all() # 连表 ret15 = session.query(UserInfo, Hobby).filter(UserInfo.hobby_id == Hobby.id).all() # print(ret15) 得到一个列表套元组 元组里是两个对象 ret16 = session.query(UserInfo).join(Hobby).all() # print(ret16) 得到列表里面是前一个对象 # 相当于inner join # for i in ret16: # # print(i[0].name, i[1].title) # print(i.hobby.title) ret17 = session.query(Hobby).join(UserInfo, isouter=True).all() ret17_1 = session.query(UserInfo).join(Hobby, isouter=True).all() ret18 = session.query(Hobby).outerjoin(UserInfo).all() ret18_1 = session.query(UserInfo).outerjoin(Hobby).all() # 相当于left join print(ret17) print(ret17_1) print(ret18) print(ret18_1)
基于relationship的FK
# 基于relationship的FK # 添加 user_obj = UserInfo(name="提莫", hobby=Hobby(title="种蘑菇")) session.add(user_obj) hobby = Hobby(title="弹奏一曲") hobby.user = [UserInfo(name="琴女"), UserInfo(name="妹纸")] session.add(hobby) session.commit() # 基于relationship的正向查询 user_obj_1 = session.query(UserInfo).first() print(user_obj_1.name) print(user_obj_1.hobby.title) # 基于relationship的反向查询 hb = session.query(Hobby).first() print(hb.title) for i in hb.user: print(i.name) session.close()
基于relationship的M2M
# 添加 book_obj = Book(title="Python源码剖析") tag_obj = Tag(title="Python") b2t = Book2Tag(book_id=book_obj.id, tag_id=tag_obj.id) session.add_all([ book_obj, tag_obj, b2t, ]) session.commit() # 上面有坑哦~~~~ book = Book(title="测试") book.tags = [Tag(title="测试标签1"), Tag(title="测试标签2")] session.add(book) session.commit() tag = Tag(title="LOL") tag.books = [Book(title="大龙刷新时间"), Book(title="小龙刷新时间")] session.add(tag) session.commit() # 基于relationship的正向查询 book_obj = session.query(Book).filter_by(id=4).first() print(book_obj.title) print(book_obj.tags) # 基于relationship的反向查询 tag_obj = session.query(Tag).first() print(tag_obj.title) print(tag_obj.books)
三,Flask-SQLALchemy
下载安装
pip3 install flask-sqlalchemy
导入并实例化SQLALchemy
from flask_sqlalchemy import SQLALchemy db = SQLALchemy() # 必须在蓝图之前导入,否则会引发循环引用 from .views.user import Blue
初始化
def create_app(): app = Flask(__name__) app.config.from_object("settings.BaseConfig") app.register_blueprint(Blue) # 实例化session Session(app) # 初始化db db.init_app(app) return app
在配置文件写入配置信息
class BaseConfig(object): # SESSION_TYPE = "redis" # SESSION_REDIS = Redis(host="192.168.0.94",port="6379") SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:root1234@127.0.0.1:3305/database1?charset=utf8" SQLALCHEMY_POOL_SIZE = 10 SQLALCHEMY_MAX_OVERFLOW = 5 # SQLALCHEMY_TRACK_MODIFICATIONS = False
创建model
from sqlalchemy import Column,Integer,String from flask_demo import db class User(db.Model): __tablename__ = "users" id = Column(Integer,primary_key=True) name = Column(String(32),index=True,nullable=False)
生成表(需要使用app上下文)
from flask_demo import db,create_app from flask_demo.models import * # 一定要导入models否则找不到表创建不出来 app = create_app() app_ctx = app.app_context() with app_ctx: db.create_all()
基于ORM对数据库操作
from flask import Blueprint from flask_demo import db from flask_demo.models import Users Blue = Blueprint("Blue",__name__) @Blue.route("/index") def index(): # db.session.add(Users(name="gaoxin")) # db.session.commit() # db.session.remove() ret = db.session.query(Users).all() print(ret) # db.session.remove()
db.session.close() return "Index"
四,Flask-Script
下载安装
pip install flask-script
增加的功能runserver
from flask_demo import create_app from flask_script import Manager app = create_app() manager = Manager(app) if __name__ == "__main__" # app.run() manager.run() # 项目启动命令变成 python manager.py runserver -h 127.0.0.1 -p 8000
自定义命令
from flask_demo import create_app from flask_script import Manager app = create_app() manager = Manager(app) # 位置传参 @manager.command def custom(arg): # 自定义命令 python manage.py custom 123 print(arg) # 123 # 关键字传参 @manager.option("-n","--name",dest="name") @manager.option("-u","--url".dest="url") def cmd(name,url): # 自定义命令:python manage.py cmd -n alex -u dsb print(name,url) if __name__ == "__main__": # app.run() manager.run()
五,Flask-migrate
下载安装
pip3 install flask-migrate
增加的命令(依赖flask-script)
from flask_demo import create_app,db from flask_demo.models import * from flask_script import Manager from flask_migrate import Migrate,MigrateCommand app = create_app() manager = Manager(app) Migrate(app,db) """ 数据库迁移命令 python manage.py db init # 初始化 python manage.py db migrate # makemigrations python manage.py db upgrade # migrate """ manager.add_command("db",MigrateCommand) if __name__ == "__main__" # app.run() manager.run()
六,wtforms
Form组件主要应用是,帮助我们自动生成HTML,以及做表单数据的验证,类比我们django的Form组件
下载安装
pip install wtforms
自动生成HTML,生成一个Form类,实例化这个Form类,把这个实例化对象当成参数传递给前端
from wtforms import Form,widgets,validators from wtforms.fields import simple,simple,core,html5
# simple 字段类型
# core 核心字段类型
# html5 H5新增的字段类型 # 生成一个Form类 class MyForm(Form): name = simple.StringFied( label="用户名", render_kw = {"placeholder":"请输入用户名"}, widget = widgets.TextArea(), default = "sb" ) pwd = simple.PasswordField() @Blue.route("/login",methods=["GET","POST"]) def login(): if request.method == "GET": form = MyForm(data={"name":"alex"}) # 实例化这个Form类,把这个实例化对象当成参数传递给前端 return render_template("login.html",form=form)
<!--html页面--> <form action="" novalidate> 用户名: {{form.name}} 密码: {{form.pwd}} <button type="submit">提交</button> </form> <!--循环出来的页面--> <form action=""> {% for field in form %} {{field.label}}: {{field}} {% endfor %} <button type="submit">提交</button> </form>
表单数据校验
# 视图页面 class MyForm(Form): # 在Form类中增加验证信息 name = simple.StringField( label="用户名", render_kw={"placeholder":"请输入用户名"}, validators=[ validators.DataRequired(message="用户名不能为空"), validators.Length(min=6,max=18,message="用户名长度必须大于%(min)d且小于%(max)d") ], ) pwd = simple.PasswordField( label = "密码" widget = widgets.PasswordInput(), validators=[ validators.DataRequired(message="密码不能为空"), validators.Length(min=8,message="用户名长度必须大于%(min)d"), validators.Regexp(regex"^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}",messsage="密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符") ] ) pwd2 = simple.PasswordField(
label="确认密码"
validators=[
validators.DataRequired(message="不能为空")
validators.EqualTo("pwd",message="两次密码不一致")
]
) @Blue.route("/login",methods=["GET","POST"]) def login(): if request.method == "GET": form = MyForm(data={"name":"alex"}) return render_template("login.html",form=form) form = MyForm(formdata=request.form) if form.validate(): # 在视图中做数据的校验 并且页面展示错误信息 print(form.data) else: return render_template("login.html",form=form) return "login"
<!--循环出来的页面--> <form action="" method="post" novalidate> {% for field in form %} {{field.label}}: {{field}} {{field.errors[0]}} {% endfor %} <button type="submit">提交</button> </form>
拓展字段
注意选项字段需要去数据库取数据,还有就是从数据库取数据的实时更新
# 以用户注册为例,输入用户名,密码,重复密码,性别和爱好 class RegisterForm(Form): name = simple.StringField( label='用户名', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), # render_kw={'class': 'form-control'}, default='gaoxin' ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.') ], widget=widgets.PasswordInput(), # render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重复密码', validators=[ validators.DataRequired(message='重复密码不能为空.'), validators.EqualTo('pwd', message="两次密码输入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='邮箱', validators=[ validators.DataRequired(message='邮箱不能为空.'), validators.Email(message='邮箱格式错误') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性别', choices=( (1, '男'), (2, '女'), ), coerce=int ) city = core.SelectField( label='城市', choices=( ('bj', '北京'), ('sh', '上海'), ) ) hobby = core.SelectMultipleField( label='爱好', choices=( (1, '篮球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜好', choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) def __init__(self, *args, **kwargs): super(RegisterForm, self).__init__(*args, **kwargs) # 从数据库获取数据 做到实时更新 self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球')) @ac.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'GET': form = RegisterForm(data={'gender': 1}) return render_template('register.html', form=form) else: form = RegisterForm(formdata=request.form) if form.validate(): return "注册成功" else: print(form.errors) return render_template('register.html', form=form)
<!--register.html--> <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用户注册</h1> <form method="post" novalidate style="padding:0 50px"> {% for item in form %} <p>{{item.label}}: {{item}} {{item.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html>
def __init__(self,*args,**kwargs): # 有bug待修 # 类的所有静态属性只加载异常 super(RegisterForm,self).__init__(*args,**kwargs) # 从数据库获取数据,做到实时更新 self.favor.choices = 动态操作:ORM获取数据库的数据 # sqlalchemy 没有 chioces字段 # pip install sqlalchemy_utils # from sqlalchemy_utils import ChioceType category = Column(ChioceType(chioces),default=1) # 不能用flask-migrate命令,不识别sqlalchemy_utlis