一、介绍
SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
pip3 install sqlalchemy
SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
二、基本使用
1、执行原生sql语句
见wusir
2、ORM
(1)创建数据库表
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column from sqlalchemy import Integer,String from sqlalchemy import create_engine Base = declarative_base() class Users(Base): __tablename__ = 'users' #为数据库中生成的表命名 id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) depart_id = Column(Integer) #创建数据库连接及连接池 def create_all(): engine = create_engine( "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置),-1表示不回收 ) Base.metadata.create_all(engine) #找到当前py下的继承了Base的所有的类,把它在数据库中生成一张表 # # def drop_all(): # engine = create_engine( # "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8", # max_overflow=0, # 超过连接池大小外最多创建的连接 # pool_size=5, # 连接池大小 # pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 # pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) # ) # Base.metadata.drop_all(engine) # if __name__ == '__main__': create_all() # drop_all()
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column from sqlalchemy import Integer,String,ForeignKey,UniqueConstraint from sqlalchemy import create_engine from sqlalchemy.orm import relationship Base = declarative_base() # ##################### 一对多示例 ######################### class Depart(Base): __tablename__ = 'depart' id = Column(Integer, primary_key=True) title = Column(String(32), index=True, nullable=False) class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) depart_id = Column(Integer,ForeignKey("depart.id")) #创建ForeignKey # 与生成表结构无关,仅用于查询方便 dp = relationship("Depart", backref='pers') #通过dp可直接进行跨表查询,backref='pers'用于反向查询 # ##################### 多对多示例 ######################### class Student(Base): __tablename__ = 'student' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) # 与生成表结构无关,仅用于查询方便:根据student2course表与Course表做关联 course_list = relationship('Course', secondary='student2course', backref='student_list') class Course(Base): __tablename__ = 'course' id = Column(Integer, primary_key=True) title = Column(String(32), index=True, nullable=False) class Student2Course(Base): __tablename__ = 'student2course' id = Column(Integer, primary_key=True, autoincrement=True) student_id = Column(Integer, ForeignKey('student.id')) course_id = Column(Integer, ForeignKey('course.id')) __table_args__ = ( UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 联合唯一索引 # Index('ix_id_name', 'name', 'extra'), # 联合索引 ) def create_all(): engine = create_engine( "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.create_all(engine) def drop_all(): engine = create_engine( "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': # drop_all() create_all()
(2)操作数据库表
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users # 连接数据库 engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) SessionFactory = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个session session = SessionFactory() # ############# 执行ORM操作 ############# obj1 = Users(name="alex1") session.add(obj1) # 提交事务 session.commit() # 关闭session session.close()
基本增删改查示例:
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users #连接数据库 engine = create_engine( "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) # 根据Users类对users表进行增删改查 session = SessionFactory() # ############################## 基本增删改查 ############################### # 1. 增加 #增加单条数据 #创建对象 # obj = Users(name='alex') # session.add(obj) # session.commit() #增加多条数据 # session.add_all([ # Users(name='小东北'), # Users(name='龙泰') # ]) # session.commit() # 2. 查 # result = session.query(Users).all() # print(result) ''' [<models.Users object at 0x038C1710>, <models.Users object at 0x038C1750>, <models.Users object at 0x038C1790>] ''' # for row in result: # print(row.id,row.name) ''' 1 alex 2 小东北 3 龙泰 ''' #加条件进行查找 # result = session.query(Users).filter(Users.id >= 2) # for row in result: # print(row.id,row.name) ''' 2 小东北 3 龙泰 ''' # result = session.query(Users).filter(Users.id >= 2).first() # print(result) #<models.Users object at 0x03E25890> # 3.删 # session.query(Users).filter(Users.id >= 2).delete() # session.commit() # 4.改 # session.query(Users).filter(Users.id == 4).update({Users.name:'东北'}) # session.query(Users).filter(Users.id == 4).update({'name':'小东北'}) session.query(Users).filter(Users.id == 4).update({'name':Users.name+"DSB"},synchronize_session=False) #默认 synchronize_session = "evaluate" 进行数值的计算 session.commit() session.close()
# ############################## 常用操作 ############################### # 1. 指定列 # select id,name as cname from users; result = session.query(Users.id,Users.name.label('cname')).all() for item in result: print(item) print(item[0],item.id,item.cname) ''' (1, 'alex') 1 1 alex (4, '小东北DSB') 4 4 小东北DSB ''' # 2. 默认条件and session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() # 3. between session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() # 4. in session.query(Users).filter(Users.id.in_([1,3,4])).all() session.query(Users).filter(~Users.id.in_([1,3,4])).all() # 相当于 not in # 5. 子查询 session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='eric'))).all() # 6. and 和 or from sqlalchemy import and_, or_ session.query(Users).filter(Users.id > 3, Users.name == 'eric').all() #默认是and关系 session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.extra != "" )).all() # 7. filter_by session.query(Users).filter_by(name='alex').all() # 8. 通配符 ret = session.query(Users).filter(Users.name.like('e%')).all() # ret = session.query(Users).filter(~Users.name.like('e%')).all() # 9. 切片 result = session.query(Users)[1:2] # 10.排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() #先按name从大到小排,如果name重名按照id从小到大排 # 11. group by from sqlalchemy.sql import func ret = session.query( Users.depart_id, func.count(Users.id), ).group_by(Users.depart_id).all() print(ret) #[(1, 2), (2, 1), (3, 1)] for item in ret: print(item) ''' (1, 2) (2, 1) (3, 1) ''' #having 根据聚合条件进行二次筛选 from sqlalchemy.sql import func ret = session.query( Users.depart_id, func.count(Users.id), ).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all() for item in ret: print(item) (1, 2) # # 12.组合 # union 去重 # union all:上下拼接不去重 # """ # select id,name from users # UNION # select id,name from users; # """ q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all()
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users,Depart engine = create_engine( "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) # 根据Users类对users表进行增删改查 session = SessionFactory() # ##################### 一对多示例 ######################### # 1. 查询所有用户 ret = session.query(Users).all() for row in ret: print(row.id,row.name,row.depart_id) # 2. 查询所有用户+所属部门名称 ret = session.query(Users.id,Users.name,Depart.title).join(Depart,Users.depart_id == Depart.id).all() for row in ret: print(row.id,row.name,row.title) # query = session.query(Users.id,Users.name,Depart.title).join(Depart,Users.depart_id == Depart.id,isouter=True) ## 默认是INNER JOIN ,加上isouter=True 变为LEFT JOIN # print(query) ''' # SELECT users.id AS users_id, users.name AS users_name, depart.title AS depart_title # FROM users LEFT OUTER JOIN depart ON users.depart_id = depart.id ''' # 3. relation字段:查询所有用户+所属部门名称 --跨表操作 dp = relationship("Depart", backref='pers') ret = session.query(Users).all() for row in ret: print(row.id,row.name,row.depart_id,row.dp.title) # 4. relation字段:查询销售部所有的人员 --跨表操作 --反向查询 obj = session.query(Depart).filter(Depart.title == '销售').first() for row in obj.pers: print(row.id,row.name,obj.title) # 5. 创建一个名称叫:IT部门,再在该部门中添加一个员工:田硕 # 方式一: d1 = Depart(title='IT') #创建部门 session.add(d1) session.commit() u1 = Users(name='田硕',depart_id=d1.id) #创建员工 session.add(u1) session.commit() # 方式二: u1 = Users(name='田硕',dp=Depart(title='IT')) #一次性创建关联的数据 session.add(u1) session.commit() # 6. 创建一个名称叫:王者荣耀,再在该部门中添加一个员工:龚林峰/长好梦/王爷们 d1 = Depart(title='王者荣耀') #创建部门 d1.pers = [Users(name='龚林峰'),Users(name='长好梦'),Users(name='王爷们'),] #部门中再创建三个人 session.add(d1) session.commit() session.close()
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Student,Course,Student2Course,Users engine = create_engine( "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) # 根据Users类对users表进行增删改查 session = SessionFactory() # ##################### 多对多示例 ######################### # 1. 录入数据 # session.add_all([ # Student(name='先用'), # Student(name='佳俊'), # Course(title='生物'), # Course(title='体育'), # ]) # session.commit() # session.add_all([ # Student2Course(student_id=2,course_id=1) # ]) # session.commit() # 2. 三张表关联 # ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).order_by(Student2Course.id.asc()) # for row in ret: # print(row) # 3. 查找“先用”选的所有课 #方式一 麻烦 # ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).filter(Student.name=='先用').order_by(Student2Course.id.asc()).all() # print(ret) # ##################### relationship的使用 ######################### # course_list = relationship('Course', secondary='student2course', backref='student_list') # 方式二 # obj = session.query(Student).filter(Student.name=='先用').first() #先找到先用这个人 # for item in obj.course_list: #再拿到和他相关联的课程 # print(item.title) # 4. 选了“生物”的所有人 # obj = session.query(Course).filter(Course.title=='生物').first() # for item in obj.student_list: # print(item.name) # 5. 创建一个课程,创建2学生,两个学生选新创建的课程。 obj = Course(title='英语') obj.student_list = [Student(name='为名'),Student(name='广宗')] session.add(obj) #上面会帮我们在表的内部创建关系,帮我们对三张表做操作: #在Course表增加一条数据 #在Student表增加两条数据 #在关系表中增加两条数据 session.commit() session.close()
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session engine = create_engine( "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) session = scoped_session(SessionFactory) def task(): """""" # 方式一: """ # 查询 # cursor = session.execute('select * from users') # result = cursor.fetchall() # 添加 cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'wupeiqi'}) session.commit() print(cursor.lastrowid) """ # 方式二: """ # conn = engine.raw_connection() # cursor = conn.cursor() # cursor.execute( # "select * from t1" # ) # result = cursor.fetchall() # cursor.close() # conn.close() """ # 将连接交还给连接池 session.remove() from threading import Thread for i in range(20): t = Thread(target=task) t.start()
三、SQLALchemy连接的两种方式
方式1:把session = SessionFactory()定义在函数里面
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) def task(): # 去连接池中获取一个连接 session = SessionFactory() ret = session.query(Student).all() # 将连接交还给连接池 session.close() from threading import Thread for i in range(20): t = Thread(target=task) t.start()
方式2:推荐,基于Threading.local实现
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) session = scoped_session(SessionFactory) ''' { 1211:连接1, 1212:连接2, 1213:连接3, 1214:连接4, 1215:连接5, 1216:连接6, } ''' def task(): ret = session.query(Student).all() #执行session.query的时候自动会去连接池中拿一个连接过来,放到当前线程的threading.local中去 # 将连接交还给连接池 session.remove() from threading import Thread for i in range(20): t = Thread(target=task) t.start()