1. 介绍
SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
2. orm单表操作
2.1 orm操作数据表
from coreschema import Integer from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Users(Base): __tablename__ = "users" id = Column(Integer, primary_key=True) name = Column(String(32), nullable=False, index=True) depart_id = Column(Integer) # 可以将 engine 提取出来共用 engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=10, # 池中没有线程时最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) def create_tb(): """ 创建表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=10, # 池中没有线程时最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置),-1表示不重置 ) Base.metadata.create_all(engine) # create_all() 必须是这个函数名, 表示创建表 def drop_tb(): """ 删除表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=10, # 池中没有线程时最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.drop_all(engine) # 执行操作 create_tb() drop_tb()
2.2 orm操作数据行
from sqlalchemy import create_engine, and_, or_ from sqlalchemy.orm import sessionmaker from models import Users engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=10, # 池中没有线程时最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) )
# 创造一个连接池 SessionFactory = sessionmaker(bind=engine) # 每次进行数据库操作时都需要创建一个session, 表示从连接池中获取到一个连接 session = SessionFactory() ###################### 单表的 crud 操作 ''' # 1 增加 user = Users(name="张三") session.add(user) session.commit() # 批量增加 session.add_all([ Users(name="李四"), Users(name="王五"), Users(name="赵六"), ]) session.commit() # 2 查找 result = session.query(Users).all() print(result) # [Users object, Users object, Users object, .....] for item in result: print(item.id, "----",item.name) result = session.query(Users).filter(Users.id >= 3) print(result) # sql 语句 for i in result: print(i.name) result = session.query(Users).filter(Users.id >= 3).first() print(result.name) # 3 删除 ret = session.query(Users).filter(Users.id > 3).delete() print(ret) # 1 返回删除的数据量 session.commit() # 4 修改 ret = session.query(Users).filter(Users.id == 1).update({Users.name:"张三三"}) print(ret) # 1 返回修改的数据量 session.commit() session.query(Users).filter(Users.id == 2).update({"name":"李思思"}) session.commit() session.query(Users).filter(Users.id == 3).update({"name":Users.name+"老五"}, synchronize_session=False) # 当要拼接字符串时候, 需要加参数 synchronize_session=False # 数字相加时, 不需要这个参数 session.commit() ###################### 单表的其他操作 # 1 获取指定字段信息 ret = session.query(Users.id,Users.name,Users.name.label("nick_name")).all() print(ret) for i in ret: print(i[0],i.name,i.nick_name) #2 and, 两种方式一样 ret = session.query(Users).filter(Users.id==2,Users.name=="李思思").all() print(ret) ret = session.query(Users).filter(and_(Users.id==2,Users.name=="李思思")).all() print(ret) # 3 or ret = session.query(Users).filter(or_(Users.id==1,Users.id==2)).all() print(ret) # and 和 or 结合使用 ret = session.query(Users).filter(or_( Users.id == 1, and_(Users.name=="李思思", Users.id>=1), )).all() print(ret) # 4 between, 闭区间, 包括前后范围(1,3) ret = session.query(Users).filter(Users.id.between(1,3)).all() print(ret) # 5 in 和 ~in(就是not in) ret = session.query(Users).filter(Users.id.in_([1,2,3,4])).all() print(ret) ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() print(ret) # 6 子查询 ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(or_(Users.id>2, Users.name=="张三三")))).all() print(ret) # 7 filter_by, 里面是参数, 不是语句(比如 name == "李思思") ret = session.query(Users).filter_by(name="李思思").all() print(ret) # 8 通配符 ( % :表示0个或多个, _ :表示一个) ret = session.query(Users).filter(Users.name.like("李%")).all() print(ret) ret = session.query(Users).filter(~Users.name.like("李%")).all() print(ret) ret = session.query(Users).filter(Users.name.like("李__")).all() print(ret) # 9 切片 ret = session.query(Users).filter(Users.id>=1)[1:3] print(ret) # 10 order_by 排序 ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 11 group_by 分组 from sqlalchemy.sql import func ret = session.query( Users.depart_id, func.count(Users.id) ).group_by(Users.depart_id).all() for i in ret: print(i) # group_by 和 having 结合使用 from sqlalchemy.sql import func ret = session.query( Users.id, func.count(Users.id) ).group_by(Users.depart_id).having(func.count(Users.id)>=2).all() print(ret) for i in ret: print(i) # 12 union 和 union_all # select id,name from users UNION select id,name from users; q1 = session.query(Users.name).filter(Users.id > 3) q2 = session.query(Users.name).filter(Users.id >= 1) ret = q1.union(q2).all() # union() 去重 print(ret) # [('赵六',), ('张三三',), ('李思思',), ('王五老五',)] q1 = session.query(Users.name).filter(Users.id > 3) q2 = session.query(Users.name).filter(Users.id >= 1) ret = q1.union_all(q2).all() # union_all() 不去重 print(ret) # [('赵六',), ('张三三',), ('李思思',), ('王五老五',), ('赵六',)] '''
3. orm多表操作
3.1 一对多操作
操作数据表
from sqlalchemy import Column, create_engine, UniqueConstraint from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Integer, String, ForeignKey from sqlalchemy.orm import relationship Base = declarative_base() class Depart(Base): __tablename__ = "depart" id = Column(Integer, primary_key=True) title = Column(String(32), nullable=True, index=True) class Users(Base): __tablename__ = "users" id = Column(Integer, primary_key=True) name = Column(String(32), nullable=True, index=True) dep_id = Column(Integer, ForeignKey("depart.id")) dp = relationship("Depart", backref="uuss") def create_tb(): engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=10, # 池中没有线程时最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.create_all(engine) create_tb()
注意: 为什么有了 foreignkey 外键, 还要 relationship ? 这两者什么关系?
relationship 是为了简化联合查询 join 等, 创建的两个表之间的虚拟关系, 这种关系与标的结构时无关的. 他与外键十分相似, 确实, relationship 必须在外键的基础上才允许使用, 不然会报错.
如果上面 User 模型类中只存在 foreignkey, 需要使用手写 join 才能取出数据; 而如果有 relationship 则可以使两张表之间产生关联, 类似于合成一张表, 可以直接取出数据而不需要手写 join 操作, 具体使用如下.
操作数据行
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from models import Users, Depart engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=10, # 池中没有线程时最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) session = SessionFactory() """ # 1 查询所有用户 ret = session.query(Users).all() print(ret) # 2 查询所有用户及对应的部门(不用relationship的操作) ret = session.query(Users,Depart).join(Depart).all() for i in ret: print(i[0].id, i[0].name, i[1].title) ret = session.query(Users.id, Users.name, Depart.title).join(Depart, Users.dep_id == Depart.id).all() # 默认有 Users.dep_id == Depart.id, 并且是 inner join for i in ret: print(i.id, i.name, i.title) ret = session.query(Users.id, Users.name, Depart.title).join(Depart, Users.dep_id == Depart.id, isouter=True).all() # isouter=True , 表示 左外连接 for i in ret: print(i.id, i.name, i.title) # 3 relationship 使用(有了relationship后就可以不用手写join()连表) # 正查: 查询每个人及其所在部门名称 ret = session.query(Users).all() for i in ret: print(i.id, i.name, i.dp.title) # 反查: 查询it部门的人员名称 ret = session.query(Depart).filter(Depart.title=="it").first() for i in ret.uuss: print(ret.title, i.id, i.name) # 4 创建一个研发部门, 并在该部门中添加一个员工--霸天虎 user = Users(name="霸天虎", dp=Depart(title="研发")) session.add(user) session.commit() """ # 5 创建一个汽车人的部门, 并添加员工--大黄蜂, 威震天, 小蜜蜂 depart = Depart(title="汽车人") depart.uuss = [Users(name="威震天"), Users(name="大黄蜂"), Users(name="小蜜蜂")] session.add(depart) session.commit()
3.2 多对多操作
操作数据表
from sqlalchemy import Column, create_engine, UniqueConstraint from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Integer, String, ForeignKey from sqlalchemy.orm import relationship Base = declarative_base() class Student(Base): __tablename__ = "student" id = Column(Integer, primary_key=True) name = Column(String(32), nullable=True, index=True) course_lst = relationship("Course", secondary="student2course", backref="student_lst") # 添加了多对多信息 class Course(Base): __tablename__ = "course" id = Column(Integer, primary_key=True) title = Column(String(32), nullable=True, index=True) class Student2Course(Base): __tablename__ = "student2course" id = Column(Integer, primary_key=True) sid = Column(Integer, ForeignKey("student.id")) cid = Column(Integer, ForeignKey("course.id")) __table_args__ = ( UniqueConstraint('sid', 'cid', name='unix_stu_cou'), # 联合唯一索引 # Index('ix_id_name', 'name', 'extra'), # 联合索引 )
操作数据行
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from models import Users, Depart, Student, Course, Student2Course engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=10, # 池中没有线程时最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) session = SessionFactory() """ # 1 录入数据 session.add_all([ Student(name="猫女"), Student(name="薇恩"), Course(title="ad"), Course(title="ap") ]) session.commit() session.add_all([ Student2Course(sid=1,cid=1), Student2Course(sid=1,cid=2), Student2Course(sid=2,cid=1), ]) session.commit() # 2 每个学生选的所有课程名 (不用多对多字段) ret = session.query(Student2Course.id, Student.id, Student.name, Course.title).join(Student,Student2Course.sid==Student.id,isouter=True).join(Course,Student2Course.cid==Course.id,isouter=True).all() print(ret) for i in ret: print(i) # 3. 猫女 选的所有课 (不用多对多字段) ret = session.query(Student2Course.id, Student.id, Student.name, Course.title).join(Student,Student2Course.sid==Student.id,isouter=True).join(Course,Student2Course.cid==Course.id,isouter=True).filter(Student.name=="猫女").all() for i in ret: print(i) # 3. 猫女 选的所有课 (用了多对多字段) student = session.query(Student).filter(Student.name=="猫女").first() for i in student.course_lst: print(student.name, i.title) # 4. 选了 ap课 的所有人 course = session.query(Course).filter(Course.title=="ad").first() for i in course.student_lst: print(course.title, i.name) """ # 5. 创建一个课程,创建2学生并选择该课程 course = Course(title="ye") course.student_lst = [Student(name="小强"), Student(name="狮子狗"), Student(name="斯巴达")] session.add(course) session.commit()
4. 多线程连接数据库连接池
4.1 方式1: scoped_session ( 推荐使用 )
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=10, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) # scoped_session里面维护着一个 threading.local(), 为每一个线程独立开辟一个空间,在这里就可以放每一个线程的连接 session = scoped_session(SessionFactory) def task(): # 进行数据库操作 ret = session.query(Student).all() # 将连接交还给连接池 session.remove() from threading import Thread for i in range(20): t = Thread(target=task) t.start()
4.2 方式2: 在执行函数中获取连接
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Student, Course, Student2Course engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=10, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) def task(): # 去连接池中获取一个连接 session = SessionFactory() # 数据库操作 ret = session.query(Student).all() # 将连接交还给连接池 session.close() from threading import Thread for i in range(20): t = Thread(target=task) t.start()