1 flask源码分析
-ctx:request,session,
-app_ctx:current_app,g
-ctx放到了某个位置,先执行了beforefirstrerquest---》send--》before_request--》路由匹配--》执行视图函数---》after_request--》返回response,把ctx剔除
2 flask-session:放到redis中,两种方式
-第二种方式:
-在配置中配置
-在app创建后 Session(app)
-以后直接使用session对象即可
3 flask中使用mysql---》pymsql
-数据库连接池
-dbutils:生成一个池对象,每次从池对象中拿一个连接
4 wtforms
5 信号
-内置信号,当执行到某个位置会自动触发,只需要绑定一个函数即可
-自定义信号,定义个信号,绑定函数,触发
2 三层模式和DDD(了解)
1 多app应用(了解)
from werkzeug.wsgi import DispatcherMiddleware
from werkzeug.serving import run_simple
from flask import Flask, current_app
app1 = Flask('app01')
app2 = Flask('app02')
2 flask-script
2.1 基本使用
# pip3 install flask-script 自定制命令
from flask import Flask, current_app
from flask_script import Manager
app1 = Flask('app01')
manager=Manager(app1)
2.2 自定制命令
# pip3 install flask-script 自定制命令
from flask import Flask, current_app
from flask_script import Manager
app1 = Flask('app01')
manager=Manager(app1)
'''
自定制命令有什么用?
# 数据初始化
-python3 manage.py initdb xx.sql
# 从excel中直接导入数据库
-python3 manage.py excet_to_db -t=user -e=xx.xls
# 项目中使用celery
-python3 manage.py startcelerywork
-python3 manage.py startcelerywork
'''
3 SQLAlchemy介绍
1 python中的orm框架
Django's ORM
优点:易用,学习曲线短
和Django紧密集合,用Django时使用约定俗成的方法去操作数据库
缺点:不好处理复杂的查询,强制开发者回到原生SQL
紧密和Django集成,使得在Django环境外很难使用
peewee
优点:
Django式的API,使其易用
轻量实现,很容易和任意web框架集成
缺点:
多对多查询写起来不直观
SQLAlchemy
优点:
企业级 API,使得代码有健壮性和适应性
灵活的设计,使得能轻松写复杂查询
缺点:
重量级 API,导致长学习曲线
2 SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果
# 安装
pip3 install sqlalchemy
# SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件
4 简单使用
#orm的简单使用(创建表和删除表)
-第一步创建一个models.py
-在models中写类,写字段
-把表同步到数据库:Base.metadata.create_all(engine)
-把表从数据库删除:Base.metadata.drop_all(engine)
# 详情见models.py
5 一对多关系,多对多关系
# 一对多关系
class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
caption = Column(String(50), default='篮球')
class Person(Base):
__tablename__ = 'person'
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
# hobby指的是tablename而不是类名,uselist=False
hobby_id = Column(Integer, ForeignKey("hobby.id"))
# 跟数据库无关,不会新增字段,只用于快速链表操作
# 类名,backref用于反向查询
# hobby = relationship('Hobby', backref='pers')
class Boy2Girl(Base):
__tablename__ = 'boy2girl'
id = Column(Integer, primary_key=True, autoincrement=True)
girl_id = Column(Integer, ForeignKey('girl.id'))
boy_id = Column(Integer, ForeignKey('boy.id'))
class Girl(Base):
__tablename__ = 'girl'
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)
class Boy(Base):
__tablename__ = 'boy'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(64), unique=True, nullable=False)
# 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
# servers = relationship('Girl', secondary='boy2girl', backref='boys')
6 基本操作,一对多,多对多操作
6.1 基本操作
######## 单表操作
# 在users表中插入一条数据
# user=Users(name='lqz',extra='介绍',email='33@qq.com')
# session.add(user)
# 查询一条记录
# res=session.query(Users).filter(Users.id >= 4).all() # filter中是使用条件
# res=session.query(Users).filter(Users.id >= 4) # filter中是使用条件,直接打印结果看到的是原生sql
# res=session.query(Users).filter(Users.id > 0).all() # filter中是使用条件
# res=session.query(Users).filter_by(name='lqz').all() # filter_by中使用关键字
# res=session.query(Users).filter_by(name='lqz').first() # filter_by中使用关键字、
# 删除(不要all或者first后再调用delete)
# res=session.query(Users).filter_by(name='lqz').delete()
# 更新
# session.query(Users).filter(Users.id > 0).update({"name" : "lqz"})
# 类似于django的F查询
# 字符串相加
# session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
# 数字相加
# session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
6.2 一对多
####一对多
# 增加(最基本的方式,麻烦一些)
# hobby=Hobby(caption='篮球')
# person=Person(name='张三',hobby_id=1)
# session.add_all([hobby,person])
# 使用
# 增加
# hobby=session.query(Hobby).filter_by(id=1).first()
# person=Person(name='张三',hobby=hobby)
# person=Person(name='李四',hobby_id=hobby.id)
# session.add(person)
# 查询(基于对象的跨表查询,需要借助于relationship)
# 正向查询
# person=session.query(Person).filter_by(nid=1).first()
# print(person.name)
# print(person.hobby_id)
# # hobby=session.query(Hobby).filter_by(id=person.hobby_id).first()
# print(person.hobby.caption)
# 反向查询(有relationship,指定了backref,有了hobby查询person)
# hobby = session.query(Hobby).filter_by(id=2).first()
# 如果没有relationship的反向查询
# persons = session.query(Person).filter(Person.hobby_id == hobby.id).all()
# print(persons)
# 反向查询(有relationship,指定了backref,有了hobby查询person)
# print(hobby.persons)
#
# print(hobby)
#
#
# # 把egon的hobby改成篮求
# # egon = session.query(Person).filter_by(name='egon').update({Person.hobby:hobby}) #不能使用对象直接修改
# egon = session.query(Person).filter_by(name='egon').update({Person.hobby_id:hobby.id}) #这个key
#
# print(egon)
6.3 多对多
####多对多
# girl = Girl(name='刘亦菲')
# boy = Boy(name='吴亦凡')
# session.add_all([girl, boy])
# b2g = Boy2Girl(boy_id=2, girl_id=1)
# session.add(b2g)
# 基于对象的跨表查(正向和反向)
# wyz=session.query(Boy).filter_by(name='吴彦祖').first()
# print(wyz.girls)
lyf=session.query(Girl).filter_by(name='刘亦菲').first()
print(lyf.boys)
7 基于scoped_session实现线程安全
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from models import Users, Hobby, Person, Girl, Boy, Boy2Girl
from sqlalchemy.orm import scoped_session
# 1 创建engine对象
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/bbb?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 2 生成一个session对象
Session = sessionmaker(bind=engine)
# session = Session() # 就是一个连接对象,就是一个conn
# 3 多线程并发情况下,不要使用session对象,
session = scoped_session(Session)
#全局使用这个session
# scoped_session为什么会有session对象的所有方法(for循环一个个的把session的方法写到到了scoped_session没,点的时候,没有提示)
# session.add(Users(name='ass',extra='介绍',email='33eeee@qq.com',age=20))
res=session.query(Users).filter_by(name='ass').all()
print(res)
# print(session.__dict__)
# session.commit()
# session.close()
8 基本增删查改,常用操作
# 常用api,查询
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from models import Users, Hobby, Person, Girl, Boy, Boy2Girl
from sqlalchemy.orm import scoped_session
# 1 创建engine对象
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/bbb?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 2 生成一个session对象
Session = sessionmaker(bind=engine)
session = Session() # 就是一个连接对象,就是一个conn
# 条件
# ret = session.query(Users).filter_by(name='lqz').all()
#表达式,and条件连接
# ret = session.query(Users).filter(Users.id > 1, Users.name == 'lqz').all()
# ret = session.query(Users).filter(Users.id.between(7, 9), Users.name == 'lqz').all()
# #注意下划线
# ret = session.query(Users).filter(Users.id.in_([1,3,4,6])).all()
# #~非,除。。外
# ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
# #二次筛选
# ret=session.query(Users.id,Users.name).filter_by(name='lqz').all()
# ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='lqz')))
from sqlalchemy import and_, or_
# #or_包裹的都是or条件,and_包裹的都是and条件
# ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'lqz')).all()
# ret = session.query(Users).filter(or_(Users.id >15, Users.name == 'lqz')).all()
# ret = session.query(Users).filter(
# or_(
# Users.id < 2,
# and_(Users.name == 'lqz', Users.id > 3),
# Users.extra != ""
# ))
#
#
# # 通配符,以e开头,不以e开头
# ret = session.query(Users).filter(Users.name.like('a%'))
# ret = session.query(Users).filter(~Users.name.like('l%')).all()
#
# # 限制,用于分页,区间
# ret = session.query(Users)[1:2]
#
# # 排序,根据name降序排列(从大到小)
# ret = session.query(Users).order_by(Users.name.desc()).all()
# #第一个条件重复后,再按第二个条件升序排
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
#
# # 分组
from sqlalchemy.sql import func
#
# ret = session.query(Users.name).group_by(Users.name).all()
# #分组之后取最大id,id之和,最小id
# ret = session.query(
# Users.name,
# func.max(Users.id),
# func.sum(Users.id),
# func.min(Users.id)).group_by(Users.name).all()
# select name,max(id),sum(id),min(id) from users group by name;
# #haviing筛选
# ret = session.query(
# func.max(Users.id),
# func.sum(Users.id),
# func.min(Users.id)).filter(Users.id>1).group_by(Users.name).having(func.min(Users.id) >2).all()
# select name,max(id),sum(id),min(id) as min_id from users where id >1 group by name having min_id>2;
# # 连表(默认用forinkey关联)
# ret = session.query(Hobby.id,Hobby.caption, Person.name).filter(Hobby.id == Person.hobby_id).all()
# ret = session.query(Hobby, Person).filter(Hobby.id == Person.hobby_id).all()
# select * from hobby,person where hobby.id=person.hobby_id;
# #join表,默认是inner join
# ret = session.query(Person).join(Hobby)
# select * from Person inner join Hobby on Person.hobby_id=Hobby.id;
# #isouter=True 外连,表示Person left join Favor,没有右连接,反过来即可
# ret = session.query(Person).join(Hobby, isouter=True)
###如果不指定isouter表示inner join,默认使用外键字段关联
###如果isouter=True,就是left join
### 如果想实现right join 反过来写ret = session.query(Hobby).join(Person, isouter=True)
# # 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上
# ret = session.query(Person).join(Hobby,Person.nid==Hobby.id, isouter=True)
print(ret)
10 执行原生sql
import time
import threading
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 查询
# cursor = session.execute('select * from users')
# result = cursor.fetchall()
# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'lqz'})
session.commit()
print(cursor.lastrowid)
session.close()
11 flask-SQLAlchemy
把sqlalchemy集成到flask中
使用步骤:(可能会因为版本出问题)
# 第一步,导入:
from flask_sqlalchemy import SQLAlchemy
# 第二部:实例化得到对象
db = SQLAlchemy()
# 第三步:注册到app中
db.init_app(app)
# 使用session
from models import User
db.session.add(User(name='lqz'))
# 建表,基类db.Model
12 flask-migrate
模拟django可以自动处理表和字段
from flask_migrate import Migrate,MigrateCommand # 2.5.3
Migrate(app,db)
manager.add_command('db', MigrateCommand)
# 在命令行下执行
'''
python3 manage.py db init 初始化:只执行一次,会在项目路径生成一个migrations文件夹
python3 manage.py db migrate 等同于 makemigartions 还没有同步到数据库
python3 manage.py db upgrade 等同于migrate 同步到数据库
'''
补充
1 导出项目的依赖
-pip3 freeze >req.txt # 把所有模块都会导出
-pip3 install pipreqs # 只导出当前项目依赖的模块
-在当前路径下生成requirements.txt
-pipreqs ./ --encoding=utf-8 #如果是window需要指定编码
-py2,和py3 兼容报错(自行百度一下:使用0.4版本以下)
2 运行给你的项目
-创建一个movie库
-在models.py中注释掉顶部两行,解开下面的导入和main,右键执行
-把表创建到数据库中,并且插入一个超级用户
-安装依赖
-启动项目
-python3 manage.py runserver