wtforms
from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class LoginForm(Form): # 字段(内部包含正则表达式) name = simple.StringField( label='用户名', validators=[ # 检验规则 validators.DataRequired(message='用户名不能为空.'), # 错误信息 validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d') ], widget=widgets.TextInput(), # 页面上显示的插件类型 render_kw={'class': 'form-control'} # 设定类属性,form-control 填满 ) # 字段(内部包含正则表达式) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.'), validators.Length(min=8, message='用户名长度必须大于%(min)d'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}", message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html', form=form) else: form = LoginForm(formdata=request.form) # request.form接收到的表单数据 if form.validate(): # 判断数据是否合法 print('用户提交数据通过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('login.html', form=form) if __name__ == '__main__': app.run()
login.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>登录</h1> <form method="post"> <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p> <input type="submit" value="提交"> </form> </body> </html>
from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class RegisterForm(Form): def validate_pwd_confirm (self, field): # 钩子 """ 自定义pwd_confirm字段规则,例:与pwd字段是否一致 :param field: :return: """ # 最开始初始化时,self.data中已经有所有的值 if field.data != self.data['pwd']: # field指代钩子函数中的pwd_confirm,所有数据都在self.data中 raise validators.ValidationError("密码不一致") # 继续后续验证 #raise validators.StopValidation("密码不一致123123123") # 不再继续后续验证,不会执行下面的pwd_confirm equalto方法 name = simple.StringField( label='用户名', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, default='alex' # 默认值 ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重复密码', validators=[ #validators.DataRequired(message='重复密码不能为空.'), validate_pwd_confirm, validators.EqualTo('pwd', message="两次密码输入不一致") # 判断是否和pwd相等 ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='邮箱', validators=[ validators.DataRequired(message='邮箱不能为空.'), validators.Email(message='邮箱格式错误') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性别', choices=( (1, '男'), (2, '女'), ), coerce=int # “1” “2” # 强制为int类型 ) city = core.SelectField( label='城市', choices=( ('bj', '北京'), ('sh', '上海'), ) ) hobby = core.SelectMultipleField( label='爱好', choices=( (1, '篮球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜好', choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) # def __init__(self, *args, **kwargs): # super(RegisterForm, self).__init__(*args, **kwargs) # self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球')) @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'GET': form = RegisterForm() # initial return render_template('register.html', form=form) else: form = RegisterForm(formdata=request.form) # request.form接收到的表单数据 if form.validate(): print('用户提交数据通过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('register.html', form=form) if __name__ == '__main__': app.run()
register.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用户注册</h1> <form method="post" novalidate style="padding:0 50px"> {% for field in form %} <p>{{field.label}}: {{field}} {{field.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html>
flask的ORM sqlalchemy
SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
注意:sqlalchemy不支持改表,只能删除表后再创表
import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index from sqlalchemy.orm import relationship Base = declarative_base() class Users(Base): __tablename__ = 'users' # 数据库表名称 id = Column(Integer, primary_key=True) # id 主键 name = Column(String(32), index=True, nullable=False) # name列,index索引,不可为空 age = Column(Integer) # email = Column(String(32), unique=True) #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间 # ctime = Column(DateTime, default=datetime.datetime.now) # extra = Column(Text, nullable=True) __table_args__ = ( # UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一 # Index('ix_id_name', 'name', 'email'), #索引 ) def __repr__(self): # 更为底层,如果为__str__不能清晰显示 return self.name class Hobby(Base): __tablename__ = 'hobby' id = Column(Integer,primary_key=True) caption = Column(String(50),default='双色球') class Person(Base): __tablename__ = 'person' nid = Column(Integer,primary_key=True) name = Column(String(32)) # hobby是tablename而不是Hobby类名 hobby_id = Column(Integer,ForeignKey('hobby.id')) # 更新数据库没有关系,不会信增加字段,只能用于快速的链表查询操作 # relationship的第一个参数,是类名,第二个参数backref,用于反向查询 hobby = relationship('Hobby',backref='pres') def __repr__(self): return self.name class Boy2Girl(Base): __tablename__ = 'boy2girl' id = Column(Integer,primary_key=True) girl_id = Column(Integer,ForeignKey('girl.id')) boy_id = Column(Integer,ForeignKey('boy.id')) class Girl(Base): __tablename__ = 'girl' id = Column(Integer,primary_key=True) name = Column(String(100),nullable=False) def __repr__(self): return self.name class Boy(Base): __tablename__ = 'boy' id = Column(Integer,primary_key=True) name = Column(String(100),nullable=False) # secondary=boy2girl 为中间表的表名 girl = relationship('Girl',secondary='boy2girl',backref='boys') def __repr__(self): return self.name def init_db(): """ 根据类创建数据库表 :return: """ engine = create_engine( "mysql+pymysql://root:@127.0.0.1:3306/day95?charset=utf8", # "什么数据库(mysql,orcal)+用什么取链接数据库(pymysql)://数据库用户名:密码@mysqlip:端口/数据库名?charset=字符集" max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.create_all(engine) # 创建数据库 def drop_db(): """ 根据类删除数据库表 :return: """ engine = create_engine( "mysql+pymysql://root:@127.0.0.1:3306/day95?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.drop_all(engine) # 删除数据库 if __name__ == '__main__': # sqlalchemy不支持该表,只能删除表后再创表 drop_db() init_db()
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users #"mysql+pymysql://root@127.0.0.1:3306/aaa" engine = create_engine("mysql+pymysql://root:@127.0.0.1:3307/python13", max_overflow=0, pool_size=5) Connection = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个Connection con = Connection() # 1 单增 # obj1 = Users(name="lsb1",age=12) # con.add(obj1) # 2 多个增加 # con.add_all([ # Users(name="lsb1",age=12), # Users(name="esb",age=40), # Users(name="jsb",age=30), # Users(name="tsb",age=12), # #Host(name = "tsb",time=123213) # ]) # 3 删除 # con.query(Users).delete() #4 改 # con.query(Users).update({"name":"sb","age":14}) # 要传字典 # 如果拼接是字符串,此处要设置synchronize_session=False # con.query(Users).update({Users.name:Users.name +" is true","age":1},synchronize_session=False) # con.query(Users).update({Users.age:Users.age + 10}) # 5查(查是不需要commit,也能拿到结果) #打印sql # r1 = con.query(Users) #查询所有 # r1 = con.query(Users).all() # #查单条记录 # r1 = con.query(Users).first() #查哪些字段 # r1 = con.query(Users.age,Users.name.label("sb")).first() # 起别名sb 打印要用print(r1.sb) #过滤用filter_by(传参数)或者filter(传表达式) # r1 = con.query(Users).filter(Users.name == "tsb").first() # con.query(Users).filter(Users.name == "tsb").update({"name": "sb", "age": 14}) r1 = con.query(Users).filter_by(name = "esb").first() print(r1) #必须提交才能生效 con.commit() #关闭链接 con.close()
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users #"mysql+pymysql://root@127.0.0.1:3306/aaa" engine = create_engine("mysql+pymysql://root:@127.0.0.1:3307/python13", max_overflow=0, pool_size=5) Connection = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个Connection session = Connection() # 条件 # ret = session.query(Users).filter_by(name = "esb").all() #表达式,and 条件链接 # ret = session.query(Users).filter(Users.name == "sb",Users.age ==14 ).first() # print(ret.age,ret.name) # 表示的between,条件,30<=age<=40 # ret = session.query(Users).filter(Users.age.between(30,40)).all() # print(ret) # sql查询的in_操作,相当于django中的__in # ret =session.query(Users).filter(Users.id.in_([9,11,13])).all() # print(ret) # # sql查询取反 # ret1 = session.query(Users).filter(~Users.id.in_([9,11,13])).all() # print(ret1) #or查询 ,or和and ,做整合 from sqlalchemy import or_,and_ # ret = session.query(Users).filter(or_(Users.id == 9,Users.name=="jsb")).all() # ret = session.query(Users).filter(and_(Users.id == 9,Users.name=="lsb1")).all() # ret = session.query(Users).filter(or_( # Users.id == 9, # and_(Users.name=="jsb",Users.id==13), # # ) # ).all() # like查询, # _代表一个字符,%代表任意字符 #必须以b开头 # ret = session.query(Users).filter(Users.name.like("b%")).all() # #第二字母是b # ret = session.query(Users).filter(Users.name.like("_b%")).all() #不以b开头 # ret = session.query(Users).filter(~Users.name.like("b%")).all() #排序 #desc重大到小排序 # ret = session.query(Users).filter(Users.id>1).order_by(Users.id.desc()).all() #desc重小到大排序 #ret = session.query(Users).filter(Users.id>1).order_by(Users.id.asc()).all() #多条件排序,先以年纪从大到小排,如果年龄相同,再以id从小到大排 # ret = session.query(Users).filter(Users.id>1).order_by(Users.age.desc(),Users.id.asc()).all() # print(ret) #分组查询 # ret = session.query(Users).group_by(Users.name).all() # 再分组的时候如果要用聚合操作,就要导入func from sqlalchemy.sql import func #选出组内最小年龄要大于等于30的组 # ret = session.query(Users).group_by(Users.name).having(func.min(Users.age)>=30).all() #选出组内最小年龄要大于等于30的组,查询组内的最小年龄,最大年纪,年纪之和, ret = session.query( func.min(Users.age), # 显示组中最小值 func.max(Users.age), func.sum(Users.age), Users.name ).group_by(Users.name).having(func.min(Users.age)>=30).all() print(ret)
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Hobby,Person #"mysql+pymysql://root@127.0.0.1:3306/aaa" engine = create_engine("mysql+pymysql://root:@127.0.0.1:3307/python13", max_overflow=0, pool_size=5) Connection = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个Connection session = Connection() #1添加,没有用关联关系 # session.add_all([ # Hobby(catption="淫诗"), # Hobby(catption="推背"), # Person(name="tank",hobby_id=1), # Person(name="jason",hobby_id=2) # ]) # 2添加 用关联关系 # preson = Person(name="egon",hobby=Hobby(catption="相亲")) #session.add(preson) # # hobb = Hobby(catption="人妖") # hobb.pres = [Person(name="owen"),Person(name="sean")] # session.add(hobb) #session.commit() #正向查询 # pr = session.query(Person).filter( Person.name == "tank").first() # print(pr.name) # print(pr.hobby.catption) #反向查 # v = session.query(Hobby).filter(Hobby.catption=="人妖").first() # print(v.catption) # print(v.pres) # 如果没用foreign key,为断关联。自己连表,isouter=True表示是left join,不填默认为inner join person_list = session.query( Hobby).join(Person,Person.hobby_id==Hobby.id,isouter=True) # print(person_list) session.close()
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Boy,Boy2Girl,Girl #"mysql+pymysql://root@127.0.0.1:3306/aaa" engine = create_engine("mysql+pymysql://root:@127.0.0.1:3307/python13", max_overflow=0, pool_size=5) Connection = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个Connection session = Connection() #添加 # session.add_all([ # Boy(name="tank"), # Boy(name="sean"), # Girl(name="仓老师"), # Girl(name="小泽老师") # ]) # b2g = Boy2Girl(boy_id=1,girl_id=2) # session.add(b2g ) # b2g = Boy2Girl(boy_id=2,girl_id=1) # session.add(b2g ) # session.commit() # session.close() # 关联添加 # boy = Boy(name="亚峰") # boy.girl=[Girl(name="迪丽热巴"),Girl(name="三上")] # session.add(boy) # session.commit() # # girl = Girl(name="丹丹") # girl.boys=[Boy(name="吴彦祖"),Boy(name="鹿晗")] # session.add(girl) # session.commit() # 使用relationship的关系,正向查 # b = session.query(Boy).filter(Boy.name == "亚峰").first() # print(b.name) # print(b.girl) #反向查询 # g = session.query(Girl).filter(Girl.name=="丹丹").first() # print(g.name) # print(g.boys)
要用就必须先安装。 pip install flask-sqlalchemy 所有的到导入都找 下面的db from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy()
安装 pip install flask-migrate 命令:manager.add_command('db1', MigrateCommand) 1 当项目第一次执行迁移的时候。 python3 manage.py db1 init 只需要初始化一次 2 python3 manage.py db1 migrate # 等同于django的makemigrations 3 python3 manage.py db1 upgrade # 等同于django的migrate 启动项目 python manage.py runserver
示例:
settings.py
class BaseConfig(object): # SESSION_TYPE = 'redis' # session类型为redis # SESSION_KEY_PREFIX = 'session:' # 保存到session中的值的前缀 # SESSION_PERMANENT = True # 如果设置为False,则关闭浏览器session就失效。 # SESSION_USE_SIGNER = False # 是否对发送到浏览器上 session:cookie值进行加密 SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:@127.0.0.1:3306/day95?charset=utf8" SQLALCHEMY_POOL_SIZE = 5 SQLALCHEMY_POOL_TIMEOUT = 30 SQLALCHEMY_POOL_RECYCLE = -1 # 追踪对象的修改并且发送信号 SQLALCHEMY_TRACK_MODIFICATIONS = False
manage.py
""" 必须 pip install flask-migrate,,为管理数据库的版本 flask-sqlalchemy,这个orm """ from sansa import create_app from flask_script import Manager # flask_migrate管理数据迁移的 from flask_migrate import Migrate,MigrateCommand from sansa import db app = create_app() manager=Manager(app) # 将当前app,与db注册到Migrate Migrate(app,db) # 添加管理数据的命令 manager.add_command('db1', MigrateCommand) if __name__ == '__main__': # app.run() manager.run()
sansa/models.py
from . import db class Users(db.Model): """ 用户表 """ __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) email = db.Column(db.String(120), unique=True, nullable=False) ids = db.Column(db.Integer) def __repr__(self): return '<User %r>' % self.username
sansa/__init__.py
from flask import Flask from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy() # from .models import * from .views import account def create_app(): app = Flask(__name__) app.config.from_object('settings.BaseConfig') # 将db注册到app中 db.init_app(app) # 注册蓝图 app.register_blueprint(account.account) return app
sansa/views/account.py
from flask import Blueprint from .. import db from .. import models account = Blueprint('account', __name__) @account.route('/login') def login(): # db.session.add(models.Users(username='lqz', email='123')) #db.session.query(models.Users).all() # db.session.commit() # 添加示例 user_list = db.session.query(models.Users).all() for item in user_list: print(item.username,"is",item.email) return 'login'