SQLAlchemy is the Python SQL toolkit and Object Relational Mapper that gives application developers the full power and flexibility of SQL. SQLAlchemy provides a full suite of well known enterprise-level persistence patterns, designed for efficient and high-performing database access, adapted into a simple and Pythonic domain language.
SQLAlchemy 比Django的orm更接近原生sql的语法书写
1 from sqlalchemy.ext.declarative import declarative_base 2 from sqlalchemy import Column 3 from sqlalchemy import Integer,String,Text,Date,DateTime 4 from sqlalchemy import create_engine 5 7 Base = declarative_base() 8 9 class Users(Base): 10 __tablename__ = 'users' 11 12 id = Column(Integer, primary_key=True) 13 name = Column(String(32), index=True, nullable=False) 14 depart_id = Column(Integer) 15 16 def create_all(): 17 engine = create_engine( 18 "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", 19 max_overflow=0, # 超过连接池大小外最多创建的连接 20 pool_size=5, # 连接池大小 21 pool_timeout=10, # 池中没有连接最多等待的时间,否则报错 22 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) 23 ) 25 Base.metadata.create_all(engine) 26 27 def drop_all(): 28 engine = create_engine( 29 "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", 30 max_overflow=0, # 超过连接池大小外最多创建的连接 31 pool_size=5, # 连接池大小 32 pool_timeout=10, # 池中没有连接最多等待的时间,否则报错 33 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) 34 ) 35 Base.metadata.drop_all(engine) 36 37 if __name__ == '__main__': 38 drop_all() 39 create_all() # 已经有表的话,不会重复创建
注意:SQLAlchemy本身创建表之后,不支持删除表中的字段,再次运行修改表结构的,需要借助第三方模块(靠生成migration文件)

1 from sqlalchemy.orm import sessionmaker 2 from sqlalchemy import create_engine 3 from models import Users 4 5 engine = create_engine( 6 "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", 7 max_overflow=0, # 超过连接池大小外最多创建的连接 8 pool_size=5, # 连接池大小 9 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 10 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) 11 ) 12 SessionFactory = sessionmaker(bind=engine) 13 14 # 根据Users类对users表进行增删改查 15 session = SessionFactory() 16 17 # ############################## 基本增删改查 ############################### 18 # 1. 增加 19 obj = Users(name='alex') 20 session.add(obj) 21 session.commit() 22 23 session.add_all([ 24 Users(name='小东北'), 25 Users(name='龙泰') 26 ]) 27 session.commit() 28 29 # 2. 查 30 result = session.query(Users).all() 31 for row in result: 32 print(row.id,row.name) 33 34 result = session.query(Users).filter(Users.id >= 2) 35 for row in result: 36 print(row.id,row.name) 37 38 result = session.query(Users).filter(Users.id >= 2).first() 39 print(result) 40 41 # 3.删 42 session.query(Users).filter(Users.id >= 2).delete() 43 session.commit() 44 45 # 4.改 46 session.query(Users).filter(Users.id == 4).update({Users.name:'东北'}) 47 session.query(Users).filter(Users.id == 4).update({'name':'小东北'}) 48 session.query(Users).filter(Users.id == 4).update({'name':Users.name+"DSB"},synchronize_session=False) 49 # synchronize_session 让Users.name+"DSB"变成字符串连接操作 50 51 session.commit() 52 session.close()

1 from sqlalchemy.orm import sessionmaker 2 from sqlalchemy import create_engine 3 from models import Users 4 5 engine = create_engine( 6 "mysql + pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", 7 max_overflow=0, # 超过连接池大小外最多创建的连接 8 pool_size=5, # 连接池大小 9 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 10 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) 11 ) 12 SessionFactory = sessionmaker(bind=engine) 13 14 # 根据Users类对users表进行增删改查 15 session = SessionFactory() 16 17 18 # 1. 指定列 19 # select id,name as cname from users; 20 result = session.query(Users.id,Users.name.label('cname')).all() 21 for item in result: 22 print(item[0],item.id,item.cname) 23 24 sql_query = session.query(Users.id,Users.name.label('cname')) # 查看sql语句 25 26 # 2. 默认条件and 27 session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() 28 # 3. between 29 session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() 30 # 4. in 31 session.query(Users).filter(Users.id.in_([1,3,4])).all() 32 session.query(Users).filter(~Users.id.in_([1,3,4])).all() 33 # 5. 子查询 34 session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='eric'))).all() 35 # 6. and 和 or 36 from sqlalchemy import and_, or_ 37 session.query(Users).filter(Users.id > 3, Users.name == 'eric').all() 38 session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() 39 session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() 40 session.query(Users).filter( 41 or_( 42 Users.id < 2, 43 and_(Users.name == 'eric', Users.id > 3), 44 Users.extra != "" 45 )).all() 46 47 # 7. filter_by 48 session.query(Users).filter_by(name='ppp').all() # 内部还是转换成表达式,调用filter方法 49 50 # 8. 通配符 51 ret = session.query(Users).filter(Users.name.like('e%')).all() 52 ret = session.query(Users).filter(~Users.name.like('e%')).all() 53 54 # 9. 切片 55 result = session.query(Users)[1:2] 56 57 # 10.排序 58 ret = session.query(Users).order_by(Users.name.desc()).all() 59 ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() 60 61 # 11. group by 62 from sqlalchemy.sql import func 63 64 ret = session.query( 65 Users.depart_id, 66 func.count(Users.id), # 指定按照什么来聚合,如果不指定则会按照默认底层配置来取,这样可能就不是我们想要的 67 ).group_by(Users.depart_id).all() 68 for item in ret: 69 print(item) 70 71 72 ret = session.query( 73 Users.depart_id, 74 func.count(Users.id), 75 ).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all() # having 76 for item in ret: 77 print(item) 78 79 # 12.union 和 union all 80 """ 81 select id,name from users 82 UNION 83 select id,name from users; 84 """ 85 q1 = session.query(Users.name).filter(Users.id > 2) 86 q2 = session.query(Favor.caption).filter(Favor.nid < 2) 87 ret = q1.union(q2).all() 88 89 q1 = session.query(Users.name).filter(Users.id > 2) 90 q2 = session.query(Favor.caption).filter(Favor.nid < 2) 91 ret = q1.union_all(q2).all() 92 93 94 session.close()
多表操作

1 from sqlalchemy.ext.declarative import declarative_base 2 from sqlalchemy import Column 3 from sqlalchemy import Integer,String,Text,Date,DateTime,ForeignKey,UniqueConstraint, Index 4 from sqlalchemy import create_engine 5 from sqlalchemy.orm import relationship 6 7 8 Base = declarative_base() 9 10 class Depart(Base): 11 __tablename__ = 'depart' 12 id = Column(Integer, primary_key=True) 13 title = Column(String(32), index=True, nullable=False) 14 15 class Users(Base): 16 __tablename__ = 'users' 17 18 id = Column(Integer, primary_key=True) 19 name = Column(String(32), index=True, nullable=False) 20 depart_id = Column(Integer,ForeignKey("depart.id")) 21 22 dp = relationship("Depart", backref='pers') 23 24 class Student(Base): 25 __tablename__ = 'student' 26 id = Column(Integer, primary_key=True) 27 name = Column(String(32), index=True, nullable=False) 28 29 course_list = relationship('Course', secondary='student2course', backref='student_list') 30 31 class Course(Base): 32 __tablename__ = 'course' 33 id = Column(Integer, primary_key=True) 34 title = Column(String(32), index=True, nullable=False) 35 36 class Student2Course(Base): 37 __tablename__ = 'student2course' 38 id = Column(Integer, primary_key=True, autoincrement=True) 39 student_id = Column(Integer, ForeignKey('student.id')) 40 course_id = Column(Integer, ForeignKey('course.id')) 41 42 __table_args__ = ( 43 UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 联合唯一索引 44 # Index('ix_id_name', 'name', 'extra'), # 联合索引 45 ) 46 47 def create_all(): 48 engine = create_engine( 49 "mysql+pymysql://root:@127.0.0.1:3306/s9day120?charset=utf8", 50 max_overflow=0, # 超过连接池大小外最多创建的连接 51 pool_size=5, # 连接池大小 52 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 53 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) 54 ) 55 56 Base.metadata.create_all(engine) 57 58 def drop_all(): 59 engine = create_engine( 60 "mysql+pymysql://root:@127.0.0.1:3306/s9day120?charset=utf8", 61 max_overflow=0, # 超过连接池大小外最多创建的连接 62 pool_size=5, # 连接池大小 63 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 64 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) 65 ) 66 Base.metadata.drop_all(engine) 67 68 if __name__ == '__main__': 69 create_all()

1 # 查询 2 # 查询所有用户+所属部门名称 3 ret = session.query(Users.id,Users.name,Depart.title).join(Depart,Users.depart_id == Depart.id).all() 4 for row in ret: 5 print(row.id,row.name,row.title) 6 7 # isouter默认是False,表示inner join 8 query = session.query(Users.id,Users.name,Depart.title).join(Depart,Users.depart_id == Depart.id,isouter=True) 9 print(query) 10 11 # relation字段:查询所有用户+所属部门名称 12 ret = session.query(Users).all() 13 for row in ret: 14 print(row.id,row.name,row.depart_id,row.dp.title) 15 16 # relation字段:查询销售部所有的人员 17 obj = session.query(Depart).filter(Depart.title == '销售').first() 18 for row in obj.pers: 19 print(row.id,row.name,obj.title) 20 21 22 # 增加 23 # 创建一个名称叫:IT部门,再在该部门中添加一个员工:田硕 24 # 方式一: 25 d1 = Depart(title='IT') 26 session.add(d1) 27 session.commit() 28 29 u1 = Users(name='田硕',depart_id=d1.id) 30 session.add(u1) 31 session.commit() 32 33 # 方式二: 34 u1 = Users(name='田硕',dp=Depart(title='IT')) 35 session.add(u1) 36 session.commit() 37 38 # 创建一个名称叫:王者荣耀,再在该部门中添加一个员工:龚林峰/长好梦/王爷们 39 d1 = Depart(title='王者荣耀') 40 d1.pers = [Users(name='龚林峰'),Users(name='长好梦'),Users(name='王爷们'),] # 批量添加 41 session.add(d1) 42 session.commit()

1 # 1. 录入数据 2 session.add_all([ 3 Student(name='先用'), 4 Student(name='佳俊'), 5 Course(title='生物'), 6 Course(title='体育'), 7 ]) 8 session.commit() 9 10 session.add_all([ 11 Student2Course(student_id=2, course_id=1) 12 ]) 13 session.commit() 14 15 16 # 2. 三张表关联 17 ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).order_by(Student2Course.id.asc()) 18 for row in ret: 19 print(row) 20 21 # 3. “先用”选的所有课 22 # 方式一 23 ret = session.query(Student2Course.id,Student.name,Course.title).outerjoin(Student,Student2Course.student_id==Student.id).outerjoin(Course,Student2Course.course_id==Course.id).filter(Student.name=='先用').order_by(Student2Course.id.asc()).all() 24 print(ret) 25 26 # 方式二 27 obj = session.query(Student).filter(Student.name=='先用').first() 28 for item in obj.course_list: 29 print(item.title) 30 31 32 # 4. 选了“生物”的所有人 33 ret = session.query(Course).filter(Course.title=='生物').first() 34 for row in ret.student_list: 35 print(row.name) 36 37 38 # 5. 创建一个课程,创建2学生,两个学生选新创建的课程。 39 obj = Course(title='英语') 40 obj.student_list = [Student(name='为名'),Student(name='广宗')] 41 42 session.add(obj) 43 session.commit()

1 from sqlalchemy.orm import sessionmaker 2 from sqlalchemy import create_engine 3 from models import Student,Course,Student2Course 4 5 engine = create_engine( 6 "mysql+pymysql://root:@127.0.0.1:3306/s9day120?charset=utf8", 7 max_overflow=0, # 超过连接池大小外最多创建的连接 8 pool_size=5, # 连接池大小 9 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 10 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) 11 ) 12 SessionFactory = sessionmaker(bind=engine) 13 14 def task(): 15 # 去连接池中获取一个连接,不能用全局变量中的连接 16 session = SessionFactory() 17 18 ret = session.query(Student).all() 19 print(ret) 20 # 将连接交还给连接池 21 session.close() 22 23 24 from threading import Thread 25 26 for i in range(20): 27 t = Thread(target=task) 28 t.start()

1 from sqlalchemy.orm import sessionmaker 2 from sqlalchemy import create_engine 3 from sqlalchemy.orm import scoped_session 4 from models import Student 5 6 engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, pool_size=5) 7 Session = sessionmaker(bind=engine) # 基于threading.local实现 8 9 """ 10 # 线程安全,基于本地线程实现每个线程用同一个session 11 # 特殊的:scoped_session中有原来方法的Session中的一下方法: 12 13 public_methods = ( 14 '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested', 15 'close', 'commit', 'connection', 'delete', 'execute', 'expire', 16 'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind', 17 'is_modified', 'bulk_save_objects', 'bulk_insert_mappings', 18 'bulk_update_mappings', 19 'merge', 'query', 'refresh', 'rollback', 20 'scalar' 21 ) 22 """ 23 session = scoped_session(Session) 24 25 26 def task(): 27 ret = session.query(Student).all() 28 print(ret) 29 # 将连接交还给连接池 30 session.remove() 31 32 33 from threading import Thread 34 35 for i in range(10): 36 t = Thread(target=task) 37 t.start()

1 import time 2 import threading 3 4 from sqlalchemy.ext.declarative import declarative_base 5 from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index 6 from sqlalchemy.orm import sessionmaker, relationship 7 from sqlalchemy import create_engine 8 from sqlalchemy.sql import text 9 from sqlalchemy.engine.result import ResultProxy 10 from models import Users 11 12 engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, pool_size=5) 13 Session = sessionmaker(bind=engine) 14 15 session = Session() 16 17 # 查询 18 cursor = session.execute('select * from users') 19 result = cursor.fetchall() 20 print(result) 21 22 # 添加 23 cursor = session.execute('insert into users(name) values(:value)',params={"value":'cpp'}) # 注意有: 24 session.commit() 25 print(cursor.lastrowid) 26 27 # 方式二: 28 conn = engine.raw_connection() 29 cursor = conn.cursor() 30 cursor.execute( 31 "select * from users" 32 ) 33 result = cursor.fetchall() 34 print(result) 35 36 cursor.close() 37 conn.close() 38 39 40 session.close()

1 class scoped_session(object): 2 3 session_factory = None 4 5 def __init__(self, session_factory, scopefunc=None): 6 self.session_factory = session_factory 7 8 if scopefunc: 9 self.registry = ScopedRegistry(session_factory, scopefunc) 10 else: 11 self.registry = ThreadLocalRegistry(session_factory) 12 13 14 def instrument(name): # 闭包 15 def do(self, *args, **kwargs): 16 return getattr(self.registry(), name)(*args, **kwargs) 17 return do 18 19 class Session(_SessionClassMethods): 20 public_methods = ( 21 '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested', 22 'close', 'commit', 'connection', 'delete', 'execute', 'expire', 23 'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind', 24 'is_modified', 'bulk_save_objects', 'bulk_insert_mappings', 25 'bulk_update_mappings', 26 'merge', 'query', 'refresh', 'rollback', 27 'scalar') 28 29 for meth in Session.public_methods: # 动态给scoped_session(Session) 设置方法 30 setattr(scoped_session, meth, instrument(meth)) 31 32 33 class ThreadLocalRegistry(ScopedRegistry): 34 def __init__(self, createfunc): 35 self.createfunc = createfunc 36 self.registry = threading.local() 37 38 def __call__(self): 39 try: 40 return self.registry.value 41 except AttributeError: 42 val = self.registry.value = self.createfunc() # self.createfunc就是 sessionmaker(bind=engine) 加()生成socket连接 43 return val
参考:
https://www.cnblogs.com/wupeiqi/articles/8259356.html