SQLAlchemy是一个基于Python的ORM框架。该框架是建立在DB-API之上,使用关系对象映射进行数据库操作。
简而言之就是,将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
补充:什么是DB-API ? 是Python的数据库接口规范。
在没有DB-API之前,各数据库之间的应用接口非常混乱,实现各不相同,
项目需要更换数据库的时候,需要做大量的修改,非常不方便,DB-API就是为了解决这样的问题。
1
|
pip install sqlalchemy |
组成部分: -- engine,框架的引擎 -- connection pooling 数据库连接池 -- Dialect 选择链接数据库的DB-API种类(实际选择哪个模块链接数据库) -- Schema/Types 架构和类型 -- SQL Expression Language SQL表达式语言
连接数据库
SQLAlchemy 本身无法操作数据库,其必须依赖遵循DB-API规范的三方模块,
Dialect 用于和数据API进行交互,根据配置的不同调用不同数据库API,从而实现数据库的操作。

# MySQL-PYthon mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> #pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] # MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> # cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] # 更多 # http://docs.sqlalchemy.org/en/latest/dialects/index.html 不同的数据库API

from sqlalchemy import create_engine engine = create_engine( "mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接数 pool_size=5, # 连接池大小 pool_timeout=30, # 连接池中没有线程最多等待时间,否则报错 pool_recycle=-1, # 多久之后对连接池中的连接进行回收(重置)-1不回收 )
执行原生SQL
from sqlalchemy import create_engine engine = create_engine( "mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接数 pool_size=5, # 连接池大小 pool_timeout=30, # 连接池中没有线程最多等待时间,否则报错 pool_recycle=-1, # 多久之后对连接池中的连接进行回收(重置)-1不回收 ) def test(): conn = engine.raw_connection() cursor = conn.cursor() cursor.execute("select * from Course") result = cursor.fetchall() print(result) cursor.close() conn.close() if __name__ == '__main__': test() # ((1, '生物', 1), (2, '体育', 2), (3, '物理', 1)) raw_connection raw_connection
from sqlalchemy import create_engine engine = create_engine( "mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8", max_overflow=0, pool_size=5, ) def test(): conn = engine.contextual_connect() with conn: cur = conn.execute( "select * from Course" ) result = cur.fetchall() print(result) if __name__ == '__main__': test() # [(1, '生物', 1), (2, '体育', 2), (3, '物理', 1)] contextual_connect contextual_connect
from sqlalchemy import create_engine engine = create_engine( "mysql+pymysql://root:root1234@127.0.0.1:3306/code_record?charset=utf8", max_overflow=0, pool_size=5, ) def test(): cur = engine.execute("select * from Course") result = cur.fetchall() print(result) cur.close() if __name__ == '__main__': test() # [(1, '生物', 1), (2, '体育', 2), (3, '物理', 1)] engine.execute engine.execute
ORM操作
一、创建表
# 单表的创建 app.py from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy import Index, UniqueConstraint import datetime # sqlalchemy要依赖pysysql,用户名,密码,ip,端口号,数据库名字,编码方式 ENGINE = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/database111?charset=utf8",) Base = declarative_base() class UserInfo(Base): __tablename__ = "user_info" # 表的名字就叫user_info id = Column(Integer, primary_key=True, autoincrement=True) # 整数,默认主键,自增 name = Column(String(32), index=True, nullable=False) # 字符串 extra = Column(String(32), unique=True) # 字符串def create_db(): # 创建表 Base.metadata.create_all(ENGINE) # 就是将继承的Base的类的所有的表都创建,创建到ENGINE数据库,就上上面那个mysql+pymysql数据库 def drop_db(): # 删除表 Base.metadata.drop_all(ENGINE) if __name__ == '__main__': create_db()
单表的增加数据
# ad.py
import app from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session # 创建连接 ENGINE = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/db111?charset=utf8",) Session = sessionmaker(bind=ENGINE) # 每次执行数据库操作的时候,都需要创建一个session,就会开辟内存空间放这个session session = Session() # 单条数据增加 obj1 = app.UserInfo(name="xiaoming", extra="bangbangbang") # 先实例化一个对象,这样就拿到一个对象 obj2 = app.UserInfo(name="xiaojun", extra="bangbang") # 先实例化一个对象,这样就拿到一个对象 session.add(obj1) # 把这两个对象传给session了 session.add(obj2) # 把这俩对象放内存了 session.commit() # 就把这个数据提交到数据库了
session.close() # 关闭连接
基于SQLAlchemy操作原生SQL
from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/db111?charset=utf8") cur = engine.execute('select * from user_info') # 执行原生sql result = cur.fetchall() # 拿到所有的数据 print(result)
# 打印结果 [(5, 'xiaoming', 'bangbangbang'), (6, 'xiaojun', 'bangbang')]
一对多和多对多表的创建
# 一对多和对对多表的创建
# app1.py
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, DateTime, ForeignKey from sqlalchemy import Index, UniqueConstraint import datetime # sqlalchemy要依赖pysysql,用户名,密码,ip,端口号,数据库名字,编码方式 ENGINE = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/db111?charset=utf8",) Base = declarative_base() # 创建班级表 class Classes(Base): __tablename__ = "classes" id = Column(Integer, primary_key=True, autoincrement=True) # 整数,默认主键,自增 name = Column(String(32), nullable=False, unique=True) # 创建学生表 class Student(Base): __tablename__ = "student" id = Column(Integer, primary_key=True, autoincrement=True) # 整数,默认主键,自增 username = Column(String(32), nullable=False, unique=True) # 字符串,不能为空,唯一 password = Column(String(64), nullable=False) # 字符串,不能为空,可以不唯一 ctime = Column(DateTime, default=datetime.datetime.now) # 注意这里的new不加括号,如果加了括号,时间一直都是这个程序的启动时间 class_id = Column(Integer, ForeignKey("classes.id")) # 外键关联,要关联它的别名,关联id # 创建爱好表 class Hobby(Base): __tablename__ = 'hobby' id = Column(Integer, primary_key=True) # 整数,默认主键 caption = Column(String(50), default='篮球') # 字符串,默认值是篮球 # 创建学生表和爱好表的多对多关系表,第三张表 class Student2Hobby(Base): # 要创建多对多关系,需要自己创建第三张表 __tablename__ = 'student2hobby' id = Column(Integer, primary_key=True, autoincrement=True) student_id = Column(Integer, ForeignKey('student.id')) # 外键关联关系 hobby_id = Column(Integer, ForeignKey('hobby.id')) # 外键关联关系 __table_args__ = ( UniqueConstraint('student_id', 'hobby_id', name='uix_student_id_hobby_id'), # 创建联合唯一索引 # Index('ix_student_id_hobby_id', 'student_id', 'hobby_id') # 普通的联合索引,不约束唯一 ) def create_db(): Base.metadata.create_all(ENGINE) # 就是将继承的Base的类的所有的表都创建,创建到ENGINE数据库,就上上面那个mysql+pymysql数据库 if __name__ == '__main__': create_db()
多条数据增加
# ad1.py
import app1 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session # 创建连接 ENGINE = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/db111?charset=utf8",) Session = sessionmaker(bind=ENGINE) # 每次执行数据库操作的时候,都需要创建一个session,就会开辟内存空间放这个session session = Session() # 多条数据增加 objs = [ app1.Classes(name="1班"), app1.Classes(name="2班"), app1.Classes(name="3班"), app1.Classes(name="4班"), app1.Classes(name="5班") ] session.add_all(objs) # 把这两个对象传给session了 session.commit() # 就把这个数据提交到数据库了 session.close() # 关闭连接
查询表数据
# index1.py
import app1 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session # 创建连接 ENGINE = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/db111?charset=utf8",) Session = sessionmaker(bind=ENGINE) # 每次执行数据库操作的时候,都需要创建一个session,就会开辟内存空间放这个session session = Session() # 查询表数据 result = session.query(app1.Classes).all() print(result) # 打印出的是一个对象列表
for item in result:
print(item.id, item.name)
session.close() # 关闭连接
删除表数据
# del.py
import app1 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session # 创建连接 ENGINE = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/db111?charset=utf8",) Session = sessionmaker(bind=ENGINE) # 每次执行数据库操作的时候,都需要创建一个session,就会开辟内存空间放这个session session = Session() # 将操作提交到数据库 # 查询表数据 session.query(app1.Classes).filter(app1.Classes.id > 2).delete() # 删除id>2的班级 session.commit() session.close() # 关闭连接
修改表数据
import app1 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session # 创建连接 ENGINE = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/db111?charset=utf8",) Session = sessionmaker(bind=ENGINE) # 每次执行数据库操作的时候,都需要创建一个session,就会开辟内存空间放这个session session = Session() # 修改表数据# session.query(app1.Classes).filter_by(id=1).update({app1.Classes.name: "Ming"}) # 修改成Ming # session.query(app1.Classes).filter_by(id=2).update({"name": "jun"}) # 修改成jun # session.query(app1.Classes).filter_by(id=3).update({"name": app1.Classes.name + "~"}, synchronize_session=False) # 后面加上 # synchronize_session="evaluate" 默认值进行数字加减 session.commit() session.close() # 关闭连接
常用的条件查询
import app1 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session # 创建连接 ENGINE = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/db111?charset=utf8",) Session = sessionmaker(bind=ENGINE) # 每次执行数据库操作的时候,都需要创建一个session,就会开辟内存空间放这个session session = Session() # 条件查询 # ret1 = session.query(app1.Classes).filter_by(id=1).first() # print(ret1.id, ret1.name) # ret2 = session.query(app1.Classes).filter(app1.Classes.id > 4, app1.Classes.name == "7班").first() # print(ret2.id, ret2.name) # ret3 = session.query(app1.Classes).filter(app1.Classes.id.between(1, 10)).all() # 范围内 # for i in ret3: # print(i.id) # ret4 = session.query(app1.Classes).filter(app1.Classes.id.in_([1, 2, 6, 10])).all() # id在这个列表里面 # for k in ret4: # print(k.name) # from sqlalchemy import and_, or_ # ret5 = session.query(app1.Classes).filter(and_(app1.Classes.id > 3, app1.Classes.name == "6班")).first() # 两个条件都要满足 # print(ret5.name) # ret6 = session.query(app1.Classes).filter(or_(app1.Classes.id > 3, app1.Classes.name == "没有这个名字")).first() # 只需要满足一个条件 # print(ret6.name) # ret7 = session.query(app1.Classes).filter(or_( # app.Classes.id > 1, # and_(app1.Classes.id > 3, app1.Classes.name == "8班") # )).all() # for g in ret7: # print(g.name) # 通配符 # ret8 = session.query(app1.Classes).filter(app1.Classes.name.like("%班")).all() # print(ret8) # ret9 = session.query(app1.Classes).filter(app1.Classes.name.like("班%")).all() # 限制 # ret10 = session.query(app1.Classes).filter(app1.Classes.name.like("%班")).all()[1:10] # for l in ret10: # print(l.name) # # 排序 # ret11 = session.query(app1.Classes).order_by(app1.Classes.id.desc()).all() # 倒序 # for r in ret11: # print(r.name) # ret12 = session.query(app1.Classes).order_by(app1.Classes.id.asc()).all() # 正序 # for a in ret12: # print(a.name) # 分组 # ret13 = session.query(app1.Classes.name).group_by(app1.Classes.name).all() # print(ret13) # for x in ret13: # print(x.name) # 聚合函数 # from sqlalchemy.sql import func # ret14 = session.query( # func.max(app1.Classes.id), # func.sum(app1.Classes.id), # func.min(app1.Classes.id) # ).group_by(app1.Classes.name).having(func.max(app1.Classes.id > 1)).all() # print(ret14) # 连表 # ret15 = session.query(app1.Student, app1.Classes).filter(app1.Student.class_id == app1.Classes.id).all() # for m in ret15: # print(m[0].username) # print(ret15) 得到一个列表套元组 元组里是两个对象 # ret16 = session.query(app1.Student).join(app1.Classes).all() # print(ret16[2].username) # 得到列表里面是前一个表的对象 # 相当于inner join # for i in ret16: # print(i[0].username, i[1].username) # ret17 = session.query(Hobby).join(UserInfo, isouter=True).all() # ret17_1 = session.query(UserInfo).join(Hobby, isouter=True).all() # ret18 = session.query(Hobby).outerjoin(UserInfo).all() # ret18_1 = session.query(UserInfo).outerjoin(Hobby).all() # 相当于left join # print(ret17) # print(ret17_1) # print(ret18) # print(ret18_1) session.commit() session.close() # 关闭连接
# 基于relationship的FK外键 # 添加 user_obj = UserInfo(name="提莫", hobby=Hobby(title="种蘑菇")) session.add(user_obj) hobby = Hobby(title="弹奏一曲") hobby.user = [UserInfo(name="琴女"), UserInfo(name="妹纸")] session.add(hobby) session.commit() # 基于relationship的正向查询 user_obj_1 = session.query(UserInfo).first() print(user_obj_1.name) print(user_obj_1.hobby.title) # 基于relationship的反向查询 hb = session.query(Hobby).first() print(hb.title) for i in hb.user: print(i.name) session.close() 基于relationship的FK
# 添加 book_obj = Book(title="Python源码剖析") tag_obj = Tag(title="Python") b2t = Book2Tag(book_id=book_obj.id, tag_id=tag_obj.id) session.add_all([ book_obj, tag_obj, b2t, ]) session.commit() # 上面有坑哦~~~~ book = Book(title="测试") book.tags = [Tag(title="测试标签1"), Tag(title="测试标签2")] session.add(book) session.commit() tag = Tag(title="LOL") tag.books = [Book(title="大龙刷新时间"), Book(title="小龙刷新时间")] session.add(tag) session.commit() # 基于relationship的正向查询 book_obj = session.query(Book).filter_by(id=4).first() print(book_obj.title) print(book_obj.tags) # 基于relationship的反向查询 tag_obj = session.query(Tag).first() print(tag_obj.title) print(tag_obj.books) 基于relationship的M2M多对多