使用方法
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
# Declarative ORM model for the ``users`` table, plus schema creation that
# runs as soon as this module is executed or imported.
from sqlalchemy import Column  # column definition
from sqlalchemy import Integer, String, Text, Date, DateTime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base

# MySQL connection URL (PyMySQL driver) shared by the engine below.
DATABASE_URL = "mysql+pymysql://root:12345678@127.0.0.1:3306/mypro?charset=utf8"

Base = declarative_base()


class User(Base):
    """ORM mapping for one row of the ``users`` table."""

    __tablename__ = 'users'

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # Required, indexed name of at most 24 characters.
    name = Column(String(24), nullable=False, index=True)


engine = create_engine(
    DATABASE_URL,
    pool_size=5,  # number of connections kept in the pool
    max_overflow=0,  # extra connections allowed beyond pool_size
    pool_timeout=30,  # max wait for a free pooled connection before an error is raised
    pool_recycle=-1,  # age after which a pooled connection is recycled (reset); -1 = never
)

# Create every table registered on Base.metadata (here: ``users``).
Base.metadata.create_all(engine)

# To drop the tables again, run instead:
# Base.metadata.drop_all(engine)
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
# Walkthrough of common SQLAlchemy ORM operations on the ``users`` table:
# insert, select, delete, update, compound filters, filter_by, wildcards,
# slicing, ordering, grouping and unions.
from models import User
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import and_, or_
from sqlalchemy.sql import func

engine = create_engine(
    "mysql+pymysql://root:12345678@127.0.0.1:3306/mypro?charset=utf8",
    max_overflow=0,  # extra connections allowed beyond pool_size
    pool_size=5,  # number of connections kept in the pool
    pool_timeout=30,  # max wait for a free pooled connection before an error is raised
    pool_recycle=-1,  # age after which a pooled connection is recycled (reset); -1 = never
)
Session = sessionmaker(bind=engine)

# All reads and writes of User rows below go through this one session.
session = Session()

# --- Insert ---
obj = User(name='alex')
session.add(obj)
session.commit()

session.add_all([
    User(name='小东北'),
    User(name='龙泰'),
])
session.commit()

# --- Select ---
result = session.query(User).all()
for row in result:
    print(row.id, row.name)

# A Query is iterable; the SELECT is issued when the loop starts.
result = session.query(User).filter(User.id > 2)
for row in result:
    print(row.id, row.name)

# .first() must be chained on the Query itself: .all() returns a plain list,
# which has no .filter() method.
result = session.query(User).filter(User.id > 2).first()
print(result)  # a User instance, or None when no row matches

# --- Delete ---
session.query(User).filter(User.id >= 2).delete()
session.commit()

# --- Update ---
# NOTE: the delete above already removed every row with id >= 2, so these
# two statements match no rows unless the table is repopulated first.
session.query(User).filter(User.id == 4).update({User.name: "东北"})
# The new value is a SQL expression built from the current column value;
# synchronize_session=False skips refreshing objects already in the session.
session.query(User).filter(User.id == 4).update(
    {"name": User.name + "DSB"}, synchronize_session=False
)
# Bulk updates are only persisted once the transaction is committed.
session.commit()

# --- and_ / or_ ---
session.query(User).filter(and_(User.id > 3, User.name == 'eric')).all()
session.query(User).filter(or_(User.id < 2, User.name == "eric")).all()
# Only columns declared on the User model (id, name) are used here.
session.query(User).filter(
    or_(
        User.id < 2,
        and_(User.name == 'eric', User.id > 3),
        User.name != "",
    )
).all()

# --- filter_by (keyword-argument equality filter) ---
session.query(User).filter_by(name='alex').all()

# --- Wildcards ---
session.query(User).filter(User.name.like('e%')).all()  # LIKE 'e%'
session.query(User).filter(~User.name.like('e%')).all()  # NOT LIKE 'e%'

# --- Slicing (rendered as LIMIT / OFFSET) ---
session.query(User)[1:2]

# --- Ordering ---
session.query(User).order_by(User.name.desc()).all()
session.query(User).order_by(User.name.desc(), User.id.asc()).all()

# --- group_by ---
# Largest and smallest id within each group of rows sharing the same name.
ret = session.query(
    func.max(User.id),
    func.min(User.id),
).group_by(User.name).all()
for item in ret:
    print(item)

# --- union / union_all: stack the rows of two queries vertically ---
q1 = session.query(User.name).filter(User.id > 2)
q2 = session.query(User.name).filter(User.id < 2)
ret = q1.union_all(q2).all()  # keeps duplicate rows
ret = q1.union(q2).all()  # removes duplicate rows

# Hand the connection back to the pool once the walkthrough is finished.
session.close()