参考 http://www.cnblogs.com/wupeiqi/articles/8259356.html
概述
作用
1,提供简单的规则
2,自动将类转换成SQL语句并执行
两种设计模型
DBfirst:手动创建数据库以及表 --> ORM框架 --> 自动生成类
CODEfirst:手动创建类、数据库 --> ORM框架 --> 自动生成表 ---> SQLAlchemy
连接数据库
# Connection URL layout: <dialect>+<driver>://<user>:<password>@<host>:<port>/<database>?<options>
conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"
# Fixed typo: the original assigned to `ngine`, leaving `engine` undefined.
engine = create_engine(conn, max_overflow=5)  # connect to MySQL
表操作

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index, CHAR, VARCHAR
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

# Every model class inherits from Base; Base.metadata collects their tables.
Base = declarative_base()


class Type(Base):
    """Membership type, mapped to the `type` table."""

    __tablename__ = 'type'
    tid = Column(Integer, primary_key=True, autoincrement=True)
    tname = Column(String(32))


# NOTE: later snippets in these notes name this same model `Users`.
class User(Base):
    """User account, mapped to the `users` table; `type_id` points at `type.tid`."""

    __tablename__ = 'users'  # table name
    id = Column(Integer, primary_key=True)  # primary key
    name = Column(String(32))
    email = Column(String(32))
    type_id = Column(Integer, ForeignKey('type.tid'))  # foreign key

    # __table_args__ = (  # composite constraints
    #     UniqueConstraint('id', 'name', name='uix_id_name'),  # composite unique constraint
    #     # Index('ix_id_name', 'name', 'extra')  # composite index
    # )


conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"


def create_table(conn):
    """Create the tables of every class that inherits from Base.

    Args:
        conn: SQLAlchemy database URL.
    """
    engine = create_engine(conn, max_overflow=5)  # connect to MySQL
    Base.metadata.create_all(engine)  # creates ALL registered tables


def drop_table(conn):
    """Drop the tables of every class that inherits from Base.

    Args:
        conn: SQLAlchemy database URL.
    """
    engine = create_engine(conn, max_overflow=5)  # connect to MySQL
    Base.metadata.drop_all(engine)  # drops ALL registered tables


create_table(conn)

autoincrement = True  # 自增
nullable = False      # 不为空(nullable=True 表示允许为空)
default = ' '         # 默认值
index = True          # 索引
unique = True         # 唯一索引
数据行操作
增删改查

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index, CHAR, VARCHAR
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

Base = declarative_base()


class Type(Base):
    """Membership type, mapped to the `type` table."""

    __tablename__ = 'type'
    tid = Column(Integer, primary_key=True, autoincrement=True)
    tname = Column(String(32))


# NOTE: later snippets in these notes name this same model `Users`.
class User(Base):
    """User account, mapped to the `users` table; `type_id` points at `type.tid`."""

    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    email = Column(String(32))
    type_id = Column(Integer, ForeignKey('type.tid'))


conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"


def create_table(conn):
    """Create the tables of every class that inherits from Base."""
    engine = create_engine(conn, max_overflow=5)  # connect to MySQL
    Base.metadata.create_all(engine)


def drop_table(conn):
    """Drop the tables of every class that inherits from Base."""
    engine = create_engine(conn, max_overflow=5)  # connect to MySQL
    Base.metadata.drop_all(engine)


# create_table(conn)

engine = create_engine(conn, max_overflow=5)
Session = sessionmaker(bind=engine)
session = Session()  # open a session

# Roll back on failure and always release the session, so a failed
# statement or commit does not leave the session/connection dangling.
try:
    """增加一条用户"""  # insert a single row
    # obj1 = Type(tname='普通用户')
    # session.add(obj1)

    """增加多条用户"""  # insert several rows at once
    # objs = [
    #     Type(tname='超级用户'),
    #     Type(tname='白金用户'),
    #     Type(tname='黑金用户'),
    # ]
    # session.add_all(objs)

    """查全部"""  # fetch every row
    # type_list = session.query(Type).all()
    # for row in type_list:
    #     print(row.tid, row.tname)

    """查符合条件的"""  # fetch rows matching a condition
    # type_list = session.query(Type).filter(Type.tid > 2)
    # for row in type_list:
    #     print(row.tid, row.tname)

    """查某个字段[列]"""  # fetch a single column
    # type_list = session.query(Type.tname).all()
    # for row in type_list:
    #     print(row.tname)

    """删【先查到再删】"""  # delete: filter first, then delete the matches
    # session.query(Type).filter(Type.tid > 2).delete()

    """普通改【先查到再改】"""  # plain update: filter first, then set a constant
    # session.query(Type).filter(Type.tid > 0).update({'tname': '白金'})

    """在原有基础上改【字符串类型】"""  # update relative to the current value (string)
    session.query(Type).filter(Type.tid > 0).update(
        {Type.tname: Type.tname + 'X'}, synchronize_session=False)

    """在原有基础上改【数字类型类型】"""  # update relative to the current value (number)
    # Not runnable as-is: the Type model has no `num` column.
    # session.query(Type).filter(Type.tid > 0).update({'num': Type.num + 1}, synchronize_session='evaluate')

    session.commit()  # persist the changes
except Exception:
    session.rollback()
    raise
finally:
    session.close()  # release the session

# - Create
"""增加一条用户"""  # insert a single row
obj1 = Type(tname='普通用户')
session.add(obj1)

"""增加多条用户"""  # insert several rows at once
objs = [
    Type(tname='超级用户'),
    Type(tname='白金用户'),
    Type(tname='黑金用户'),
]
session.add_all(objs)

# - Delete
"""删【先查到再删】"""  # filter first, then delete the matches
session.query(Type).filter(Type.tid > 2).delete()

# - Update
"""普通改【先查到再改】"""  # plain update: filter first, then set a constant
session.query(Type).filter(Type.tid > 0).update({'tname': '白金'})

"""在原有基础上改【字符串类型】"""  # update relative to the current value (string)
session.query(Type).filter(Type.tid > 0).update({Type.tname: Type.tname + 'X'}, synchronize_session=False)

"""在原有基础上改【数字类型类型】"""  # update relative to the current value (number)
# Kept commented out: the Type model has no `num` column, so evaluating
# `Type.num` raises AttributeError before any SQL is sent.
# session.query(Type).filter(Type.tid > 0).update({'num': Type.num + 1}, synchronize_session='evaluate')

# - Read
"""查全部"""  # fetch every row
type_list = session.query(Type)
for row in type_list:
    print(row.tid, row.tname)

"""查符合条件的"""  # fetch rows matching a condition
type_list = session.query(Type).filter(Type.tid > 0)
for row in type_list:
    print(row.tid, row.tname)

"""查某个字段[列]"""  # fetch a single column
# This assignment was accidentally commented out, which made the loop below
# silently iterate the previous full-row query instead.
type_list = session.query(Type.tname)
for row in type_list:
    print(row.tname)
查的扩展

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index, CHAR, VARCHAR
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy import and_, or_
from sqlalchemy.sql import func

Base = declarative_base()


class Type(Base):
    """Membership type, mapped to the `type` table."""

    __tablename__ = 'type'
    tid = Column(Integer, primary_key=True, autoincrement=True)
    tname = Column(String(32))


class Users(Base):
    """User account, mapped to the `users` table; `type_id` points at `type.tid`."""

    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    email = Column(String(32))
    type_id = Column(Integer, ForeignKey('type.tid'))


conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"


def create_table(conn):
    """Create the tables of every class that inherits from Base."""
    engine = create_engine(conn, max_overflow=5)  # connect to MySQL
    Base.metadata.create_all(engine)


def drop_table(conn):
    """Drop the tables of every class that inherits from Base."""
    engine = create_engine(conn, max_overflow=5)  # connect to MySQL
    Base.metadata.drop_all(engine)


engine = create_engine(conn, max_overflow=5)
Session = sessionmaker(bind=engine)
session = Session()  # open a session

# The original never closed the session; release it even if a query raises.
try:
    """查全部"""  # fetch every row
    # type_list = session.query(Type)
    # for row in type_list:
    #     print(row.tid, row.tname)

    """查符合条件的"""  # fetch rows matching a condition
    # type_list = session.query(Type).filter(Type.tid > 0)
    # for row in type_list:
    #     print(row.tid, row.tname)

    """查某个字段[列]"""  # fetch a single column
    # type_list = session.query(Type.tname)
    # for row in type_list:
    #     print(row.tname)

    """查询符合条件的第一个"""  # first row matching a condition
    # ret = session.query(Users).filter_by(name='alex').first()
    # print(ret.name, ret.email, ret.type_id)

    """and"""  # multiple filter() arguments are AND-ed together
    # ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric')
    # for row in ret:
    #     print(row.name)

    """between"""
    # ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric')
    # for row in ret:
    #     print(row.id, row.name)

    """in"""
    # ret = session.query(Users).filter(Users.id.in_([1, 3, 4]))
    # for row in ret:
    #     print(row.id, row.name)

    """not in"""  # `~` negates the condition
    # ret = session.query(Users).filter(~Users.id.in_([1, 3, 4]))
    # for row in ret:
    #     print(row.id, row.name)

    """子查询"""  # IN (subquery)
    # ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric')))
    # for row in ret:
    #     print(row.id, row.name)

    """ and or """  # explicit and_ / or_, which can be nested
    # ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric'))
    # for row in ret:
    #     print(row.id, row.name)
    # ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric'))
    # for row in ret:
    #     print(row.id, row.name)
    # ret = session.query(Users).filter(
    #     or_(
    #         Users.id < 2,
    #         and_(Users.name == 'eric', Users.id > 3),
    #         Users.email != ""
    #     ))
    # for row in ret:
    #     print(row.id, row.name)

    """通配符"""  # LIKE wildcards
    # ret = session.query(Users).filter(Users.name.like('e%'))
    # for row in ret:
    #     print(row.id, row.name)
    # ret = session.query(Users).filter(~Users.name.like('e%'))  # NOT LIKE
    # for row in ret:
    #     print(row.id, row.name)

    """类似limit,但是通过切片取值"""  # LIMIT/OFFSET expressed as a slice
    # ret = session.query(Users)[1:3]
    # for row in ret:
    #     print(row.id, row.name)

    """排序"""  # ORDER BY
    # ret = session.query(Users).order_by(Users.id.desc())
    # for row in ret:
    #     print(row.id, row.name)
    # ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc())
    # for row in ret:
    #     print(row.id, row.name)

    """分组"""  # GROUP BY (uses `func`, imported at the top)
    # ret = session.query(Users.name).group_by(Users.name)
    # for row in ret:
    #     # print(row.name)
    #     print(row[0])
    # ret = session.query(
    #     Users.name,
    #     func.max(Users.id),
    #     func.sum(Users.id),
    #     func.min(Users.id)).group_by(Users.name)
    # print(ret)
    # for row in ret:  # each row is a tuple
    #     print(row[0], row[1], row[2], row[3])
    # ret = session.query(
    #     func.max(Users.id),
    #     func.sum(Users.id),
    #     func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) > 2)  # HAVING filters on the aggregate
    # for row in ret:
    #     print(row[0], row[1], row[2])

    """连表"""  # joins
    # ret = session.query(Users).join(Type)  # inner join
    # print(ret)
    # for row in ret:
    #     print(row)
    # result = session.query(Users).join(Type, isouter=True)  # left join
    # print(result)
    # for row in result:
    #     print(row.name)

    """子查询"""  # three flavours of subquery
    # [1] select * from user where user.id in (subquery)
    # q1 = session.query(Users.id).filter_by(name='eric')  # ids of users named eric
    # ret = session.query(Users).filter(Users.id.in_(q1))
    # for row in ret:
    #     print(row.id, row.name)

    # [2] select * from (subquery)
    # q2 = session.query(Type).filter(Type.tid > 1).subquery()  # all type rows with tid > 1
    # ret = session.query(q2)
    # for row in ret:
    #     print(row.tid, row.tname)

    # [3, the important one] select users.id, users.name, (subquery) from users
    # Correlated scalar subquery: the tname whose tid equals the user's type_id.
    # NOTE: SQLAlchemy 1.4+ deprecates as_scalar() in favour of scalar_subquery().
    q3 = session.query(Type.tname).filter(Users.type_id == Type.tid).as_scalar()
    ret = session.query(Users.id, Users.name, q3)
    for row in ret:
        print(row)
finally:
    session.close()

"""查全部"""  # fetch every row
type_list = session.query(Type)
for row in type_list:
    print(row.tid, row.tname)

"""查符合条件的"""  # fetch rows matching a condition
type_list = session.query(Type).filter(Type.tid > 0)
for row in type_list:
    print(row.tid, row.tname)

"""查某个字段[列]"""  # fetch a single column
# This assignment was accidentally commented out, which made the loop below
# silently iterate the previous full-row query instead.
type_list = session.query(Type.tname)
for row in type_list:
    print(row.tname)

"""查询符合条件的第一个"""  # first row matching a condition
ret = session.query(Users).filter_by(name='alex').first()
if ret is not None:  # first() returns None when nothing matches
    print(ret.name, ret.email, ret.type_id)

"""and"""  # multiple filter() arguments are AND-ed together
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric')
for row in ret:
    print(row.name)

"""between"""
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric')
for row in ret:
    print(row.id, row.name)

"""in"""
ret = session.query(Users).filter(Users.id.in_([1, 3, 4]))
for row in ret:
    print(row.id, row.name)

"""not in"""  # `~` negates the condition
ret = session.query(Users).filter(~Users.id.in_([1, 3, 4]))
for row in ret:
    print(row.id, row.name)

"""子查询"""  # IN (subquery)
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric')))
for row in ret:
    print(row.id, row.name)

""" and or """  # explicit and_ / or_, which can be nested
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric'))
for row in ret:
    print(row.id, row.name)
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric'))
for row in ret:
    print(row.id, row.name)
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.email != ""
    ))
for row in ret:
    print(row.id, row.name)

"""通配符"""  # LIKE wildcards
ret = session.query(Users).filter(Users.name.like('e%'))
for row in ret:
    print(row.id, row.name)
# The trailing "not like" had lost its `#` and was a syntax error.
ret = session.query(Users).filter(~Users.name.like('e%'))  # NOT LIKE
for row in ret:
    print(row.id, row.name)

"""类似limit,但是通过切片取值"""  # LIMIT/OFFSET expressed as a slice
ret = session.query(Users)[1:3]
for row in ret:
    print(row.id, row.name)

"""排序"""  # ORDER BY
ret = session.query(Users).order_by(Users.id.desc())
for row in ret:
    print(row.id, row.name)
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc())
for row in ret:
    print(row.id, row.name)

"""分组"""  # GROUP BY
from sqlalchemy.sql import func
ret = session.query(Users.name).group_by(Users.name)
for row in ret:
    print(row.name)
    print(row[0])
ret = session.query(
    Users.name,
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name)
print(ret)
for row in ret:  # each row is a tuple
    print(row[0], row[1], row[2], row[3])
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) > 2)  # HAVING filters on the aggregate
for row in ret:
    print(row[0], row[1], row[2])

"""连表"""  # joins
ret = session.query(Users).join(Type)  # inner join
print(ret)
for row in ret:
    print(row)
result = session.query(Users).join(Type, isouter=True)  # left join
print(result)
for row in result:
    print(row.name)

"""子查询"""  # three flavours of subquery
# [1] select * from user where user.id in (subquery)
q1 = session.query(Users.id).filter_by(name='eric')  # ids of users named eric
ret = session.query(Users).filter(Users.id.in_(q1))
for row in ret:
    print(row.id, row.name)

# [2] select * from (subquery)
q2 = session.query(Type).filter(Type.tid > 1).subquery()  # all type rows with tid > 1
ret = session.query(q2)
for row in ret:
    print(row.tid, row.tname)

# [3, the important one] select users.id, users.name, (subquery) from users
# Correlated scalar subquery: the tname whose tid equals the user's type_id.
# NOTE: SQLAlchemy 1.4+ deprecates as_scalar() in favour of scalar_subquery().
q3 = session.query(Type.tname).filter(Users.type_id == Type.tid).as_scalar()
ret = session.query(Users.id, Users.name, q3)
for row in ret:
    print(row)
常用操作

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy import and_, or_
from models import Users

"""
# Model结构
Base = declarative_base()

class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    depart_id = Column(Integer)
"""

engine = create_engine(
    "mysql+pymysql://root:@127.0.0.1:3306/s9day120?charset=utf8",
    max_overflow=0,  # extra connections allowed beyond pool_size
    pool_size=5,  # connection pool size
    pool_timeout=30,  # seconds to wait for a free pooled connection before raising
    pool_recycle=-1  # seconds after which a pooled connection is recycled; -1 disables recycling
)
SessionFactory = sessionmaker(bind=engine)
session = SessionFactory()

# 1. Select specific columns (label() renames a column in the result)
# result = session.query(Users.id, Users.name.label('cname')).all()
# print(result)
# for i in result:
#     print(i.id, i.cname)

# 2. Multiple filter() conditions are AND-ed by default
# ret = session.query(Users).filter(Users.id >= 1, Users.name == 'alexX').all()
# print(ret)

# 3. between
# ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'helloX').all()
# for i in ret:
#     print(i.id)

# 4. in / not in
# ret = session.query(Users).filter(Users.id.in_([1, 3, 4])).all()
# ret = session.query(Users).filter(~Users.id.in_([1, 3, 4])).all()
# for i in ret:
#     print(i.id)

# 5. Subquery
# ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name == 'helloX'))).all()
# for i in ret:
#     print(i.id)

# 6. and_ / or_ (imported at the top)
# ret = session.query(Users).filter(Users.id < 3, Users.name == 'helloX').all()
# ret = session.query(Users).filter(and_(Users.id < 3, Users.name == 'helloX')).all()
# ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'helloX')).all()
# ret = session.query(Users).filter(
#     or_(
#         Users.id > 2,
#         and_(Users.name == 'alexX', Users.id < 3),
#     )).all()
#
# for i in ret:
#     print(i.id)

# 7. filter_by (keyword-argument equality filters)
# ret = session.query(Users).filter_by(name='alexX').all()
# for i in ret:
#     print(i.id)

# 8. LIKE wildcards
# Author's note: matching was case-insensitive here (presumably due to the
# MySQL column collation -- verify against your schema).
# ret = session.query(Users).filter(Users.name.like('A%')).all()
# ret = session.query(Users).filter(Users.name.like('%x')).all()
# ret = session.query(Users).filter(~Users.name.like('A%')).all()
# for i in ret:
#     print(i.id)

# 9. Slicing
# result = session.query(Users)[1:4]  # 2nd through 4th rows (the 5th is excluded)
# for i in result:
#     print(i.id)

# 10. Ordering
# ret = session.query(Users).order_by(Users.name.desc()).all()  # descending
# ret = session.query(Users).order_by(Users.name.asc(), Users.id.desc()).all()  # name ascending, then id descending
# for i in ret:
#     print(i.id, i.name)

# 11. group by
# from sqlalchemy.sql import func
# ret = session.query(
#     Users.depart_id,
#     func.count(Users.id),
# ).group_by(Users.depart_id).all()
#
# for i in ret:
#     print(i)

# from sqlalchemy.sql import func
# ret = session.query(
#     Users.depart_id,
#     func.count(Users.id),
# ).group_by(Users.depart_id).having(func.count(Users.id) > 1).all()
#
# for i in ret:
#     print(i)

# 12. union and union all
"""
select id,name from users
UNION
select id,name from users;
"""
# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Users.depart_id).filter(Users.id <= 2)
# ret = q1.union(q2).all()
# for i in ret:
#     print(i)

# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Users.depart_id).filter(Users.id <= 2)
# ret = q1.union_all(q2).all()
# for i in ret:
#     print(i)

# The original never released the session.
session.close()
便利的连表

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index, CHAR, VARCHAR
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

Base = declarative_base()


class Type(Base):
    """Membership type, mapped to the `type` table.

    The `backref` declared on Users.users_type adds a `type_users`
    attribute here listing the users of this type.
    """

    __tablename__ = 'type'
    tid = Column(Integer, primary_key=True, autoincrement=True)
    tname = Column(String(32))


class Users(Base):
    """User account, mapped to the `users` table; `type_id` points at `type.tid`."""

    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    email = Column(String(32))
    type_id = Column(Integer, ForeignKey('type.tid'))

    # Relationship: lets code hop between the two tables without writing joins.
    users_type = relationship('Type', backref='type_users')


conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"


def create_table(conn):
    """Create the tables of every class that inherits from Base."""
    engine = create_engine(conn, max_overflow=5)  # connect to MySQL
    Base.metadata.create_all(engine)


def drop_table(conn):
    """Drop the tables of every class that inherits from Base."""
    engine = create_engine(conn, max_overflow=5)  # connect to MySQL
    Base.metadata.drop_all(engine)


engine = create_engine(conn, max_overflow=5)
Session = sessionmaker(bind=engine)
session = Session()  # open a session

# The original never closed the session; release it even if a query raises.
try:
    """正向操作"""  # forward direction: user -> its type
    print('----正向操作----')
    result_list = session.query(Users)
    for row in result_list:
        # type_id is nullable, so users_type may be None for a user without a type.
        type_name = row.users_type.tname if row.users_type is not None else None
        print(row.name, row.type_id, type_name)

    """反向操作"""  # reverse direction: type -> its users (via the backref)
    print('----反向操作----')
    result_list = session.query(Type)
    for row in result_list:
        # print(row.tid, row.tname)
        print('会员等级:%s' % row.tname)
        for i in row.type_users:
            print(i.id, i.name, i.type_id)
finally:
    session.close()

# - Forward direction: user -> its type
result_list = session.query(Users)
for row in result_list:
    # type_id is nullable, so users_type may be None for a user without a type.
    type_name = row.users_type.tname if row.users_type is not None else None
    print(row.name, row.type_id, type_name)

# - Reverse direction: type -> its users (via the backref)
result_list = session.query(Type)
for row in result_list:
    # print(row.tid, row.tname)
    print('会员等级:%s' % row.tname)
    for i in row.type_users:
        print(i.id, i.name, i.type_id)