Flask默认并没有提供任何数据库操作的API。
Flask中可以自己的选择数据,用原生语句实现功能,也可以选择ORM(SQLAlchemy,MongoEngine)
原生SQL缺点
代码利用率低,条件复杂代码语句越长,有很多相似语句
一些SQL是在业务逻辑中拼出来的,修改需要了解业务逻辑
直接写SQL容易忽视SQL问题。
一、orm
将对对象的操作转换为原生SQL
1、优点
易用性,可以有效减少重复SQL,性能损耗少设计灵活,可以轻松实现复杂查询,移植性好
Python中的orm是SQLAlchemy
针对于Flask的支持
pip install flask-sqlalchemy
2、连接数据库
dialect+driver://username:password@host:port/database
dialect数据库实现
driver数据库的驱动
username
password
host
port
database
连接数据库需要指定配置
app.config[‘SQLALCHEMY_DATABASE_URI’] = DB_URI
app.config[‘SQLALCHEMY_TRAKE_MODIFICATIONS’]=False
3、创建模型
class User(db.Model): __tablename__ = "UserModel" # 指定表名,默认是类名 id = db.Column(db.Integer, primary_key=True, autoincrement=True) u_name = db.Column(db.String(16), unique=True) u_des = db.Column(db.String(128), nullable=True)
(1)、字段类型
Integer
SmallInteger
BigInteger
Float
Numeric
String
Text
Unicode
Unicode Text
Boolean
Date
Time
DateTime
Interval
LargeBinary
(2)、常见约束
primary_key
autoincrement
unique
index
nullable
default
ForeignKey()
(3)、数据操作
db.create_all() 创建数据库
db.drop_all() 删除数据库
①、数据插入
数据插入是在事务中处理
db.session.add(object)
db.session.add_all(list[object])
db.session.commit()
@api.route('/adduser/') def adduser(): users = [] for i in range(5): user = User() user.u_name = "小花%d" % random.randrange(10000) users.append(user) db.session.add_all(users) db.session.commit() return 'Add success'
②、数据删除
db.session.delete(object)
db.session.commit()
修改和删除基于查询。
(4)、模型继承
默认继承并不会报错,它会将多个模型的数据映射到一张表中,导致数据混乱,不能满足基本使用
抽象的模型是不会在数据库中产生映射的
class Animal(db.Model): __abstract__ = True id = db.Column(db.Integer, primary_key=True, autoincrement=True) a_name = db.Column(db.String(16)) class Dog(Animal): d_legs = db.Column(db.Integer, default=4) class Cat(Animal): c_eat = db.Column(db.String(32), default='fish')
(5)、模型迁移
python manager.py db init #初次迁移,生成migration包 python manager.py db migrate # 创建迁移 python manager.py db upgrade # 更新
(6)、数据查询
①、查询单个对象
first
get
get_or_404
@api.route('/getuser/<int:id>/') def get_user(id): user = User.query.get(id) print(user) return 'GET success'
②、查询结果集
all:比较特殊,返回列表
@api.route('/getusers/') def get_users(): users = User.query.all() for user in users: print(user.u_name) return 'get success'
filter:BaseQuery对象
运算符:
contains
startswith
endswith
in_
like
__gt__
__ge__
__lt__
__le__
条件:
- 类名.属性名.魔术方法(临界值)
@api.route('/getdog/') def getdog(): dogs = Dog.query.filter(Dog.id.__le__(5)) for dog in dogs: print(dog.id, dog.a_name) return 'GET SUCCESS'
- 类名.属性名 操作符运算符 临界值
@api.route('/getdog/') def getdog(): dogs = Dog.query.filter(Dog.id > 5) for dog in dogs: print(dog.id, dog.a_name) return 'GET SUCCESS'
@api.route('/getdog/') def getdog(): dogs = Dog.query.filter(Dog.a_name.contains("2")) for dog in dogs: print(dog.id, dog.a_name) return 'GET SUCCESS'
offset和limit不区分顺序,都是先执行offset
@api.route('/getdog/') def getdog(): dogs = Dog.query.offset(5).limit(4) for dog in dogs: print(dog.id, dog.a_name) return 'GET SUCCESS'
- order_by 调用必须在 offset和limit 之前
使用offset以及limit实现分页
@api.route('/getdogs/') def get_dogs(): page = request.args.get("page", 1, type=int) per_page = request.args.get('per_page', 4, type=int) dogs = Dog.query.offset(per_page * (page - 1)).limit(per_page) return render_template('Dogs.html', dogs=dogs)
paginate实现分页
@api.route('/getdogs/') def get_dogs_with_page(): # dogs = Dog.query.paginate().items pagination = Dog.query.paginate() per_page = request.args.get('per_page', 4, type=int) return render_template('Dogs.html', pagination=pagination, per_page=per_page)
<div class=pagination> {% for page in pagination.iter_pages(left_edge=5,left_current=5,right_current=5,right_edge=5) %} {% if page %} {% if page != pagination.page %} <a href="{{ url_for('api.get_dogs_with_page') }}?page={{ page }}&per_page={{ per_page }}">{{ page }}</a> {% else %} <strong>{{ page }}</strong> {% endif %} {% else %} <span class=ellipsis>…</span> {% endif %} {% endfor %} </div>
filter_by
用在级联数据上,条件语法精准,字段 = 值
@blue.route('/getcatsfilterby/') def get_cats_filter_by(): cats = Cat.query.filter_by(id = 5) return render_template('Cats.html', cats=cats)
(7)级联数据
class Customer(db.Model): id = db.Column(db.Integer, primary_key=True, autoincrement=True) c_name = db.Column(db.String(16)) addresses = db.relationship('Address', backref='customer', lazy=True) class Address(db.Model): id = db.Column(db.Integer, primary_key=True, autoincrement=True) a_position = db.Column(db.String(128)) a_customer_id = db.Column(db.Integer, db.ForeignKey(Customer.id))
①添加数据
@api.route('/getcustomer/') def get(): customer = Customer.query.order_by(desc('id')).first() return str(customer.id) @api.route('/addaddress/') def add_address(): address = Address() address.a_position = '秀水街 %s' % random.randrange(10000) address.a_customer_id = Customer.query.order_by(desc('id')).first().id # 注意此处使用id的倒叙,不能直接用‘-id’ db.session.add(address) db.session.commit() return 'Address Add Success %s' % address.a_position
②查询数据
根据地址找到对应的人
@api.route('/getcustomer/') def get(): a_id = request.args.get('a_id', type=int) address = Address.query.get(a_id) customer = Customer.query.get(address.a_customer_id) #维护关系表中的外键存的是不维护关系表中的主键, return customer.c_name
根据人找到对应的地址
@api.route('/getaddress/') def get_address(): c_id = request.args.get('c_id') customer = Customer.query.get(c_id) # addresses = Address.query.filter_by(a_customer_id=customer.id) addresses = customer.addresses return render_template('address.html', addresses=addresses)
<ul> {% for address in addresses %} <li>{{ address.a_position }}</li> {% endfor %} </ul>
③逻辑运算
filter多个条件
@api.route('/getaddress/')
def get_address():
addresses = Address.query.filter(Address.a_customer_id.__eq__(1)).filter(Address.a_position.endswith('4'))
return render_template('address.html', addresses=addresses)
与 and
filter(and_(条件),条件…)
@api.route('/getaddress/')
def get_address():
addresses = Address.query.filter(and_(Address.a_customer_id.__eq__(1),Address.a_position.endswith('4')))
return render_template('address.html', addresses=addresses)
Django中可以将字段直接写在filter中,无需使用and_
或
or_
filter(or_(条件),条件…)
非
not_
filter(not_(条件),条件…)
@api.route('/getaddress/')
def get_address():
addresses = Address.query.filter(not_(or_(Address.a_customer_id.__eq__(1), Address.a_position.endswith('4'))))
return render_template('address.html', addresses=addresses)
二、缓存
pip install Flask-Caching
在ext.py中进行配置
from flask_caching import Cache
from flask_migrate import Migrate
from flask_session import Session
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
migrate = Migrate()
cache = Cache(config={
"CACHE_TYPE": "redis" # 默认是连接本地,可以设置远程。
})
# cache=Cache() # 配置可以写在settings中的Config类中
def init_ext(app):
db.init_app(app)
migrate.init_app(app, db)
Session(app)
cache.init_app(app)
在视图中使用
@api.route('/getaddress/')
@cache.cached(timeout=60)
def get_address():
addresses = Address.query.filter(not_(or_(Address.a_customer_id.__eq__(1), Address.a_position.endswith('4'))))
print('数据库中获取')
return render_template('address.html', addresses=addresses)