用 SQLAlchemy 写原生 SQL
# Declare a single ``users`` table and create it through SQLAlchemy.
from sqlalchemy import Column, Integer, String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class Users(Base):
    """ORM mapping for the ``users`` table (the only table in this script)."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32))
    extra = Column(String(16))


# Database connection shared by init_db() and drop_db().
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/day128?charset=utf8")


def init_db():
    """Create every table registered on ``Base.metadata``."""
    Base.metadata.create_all(engine)


def drop_db():
    """Do nothing; the statement that would drop the tables is commented out."""
    # Base.metadata.drop_all(engine)
    pass


if __name__ == '__main__':
    # drop_db()
    init_db()
# Run a raw SQL statement through a SQLAlchemy engine and print every row.
from sqlalchemy import create_engine, text

engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/day128?charset=utf8")

# Engine.execute() with a plain string is legacy API (removed in
# SQLAlchemy 2.0) and never hands its connection back explicitly.
# Check out a connection in a ``with`` block so it is always released,
# and wrap the statement in text().
with engine.connect() as conn:
    cur = conn.execute(text('SELECT * FROM users'))
    result = cur.fetchall()

print(result)
用 SQLAlchemy 写 ORM 增删改查
# ORM models: classes, students, hobbies and the student<->hobby link table.
import datetime

from sqlalchemy import Column, Integer, String, UniqueConstraint, Index, DateTime, ForeignKey
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship

Base = declarative_base()


class Classes(Base):
    """A class (course run); ``name`` is required and unique."""

    __tablename__ = 'classes'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32), nullable=False, unique=True)


class Student(Base):
    """A student, optionally attached to one ``Classes`` row via ``class_id``."""

    __tablename__ = 'student'

    id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32), nullable=False, index=True)
    password = Column(String(64), nullable=False)
    # The callable is passed (not called), so it is evaluated per insert.
    ctime = Column(DateTime, default=datetime.datetime.now)
    class_id = Column(Integer, ForeignKey("classes.id"))

    # Relationship attribute; ``backref`` adds the reverse attribute
    # ``stus`` on Classes.
    cls = relationship("Classes", backref='stus')


class Hobby(Base):
    """A hobby; ``caption`` defaults to '篮球'."""

    __tablename__ = 'hobby'

    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')


class Student2Hobby(Base):
    """Link table between students and hobbies; each pair may appear once."""

    __tablename__ = 'student2hobby'

    id = Column(Integer, primary_key=True, autoincrement=True)
    student_id = Column(Integer, ForeignKey('student.id'))
    hobby_id = Column(Integer, ForeignKey('hobby.id'))

    __table_args__ = (
        UniqueConstraint('student_id', 'hobby_id', name='uix_student_id_hobby_id'),
        # Index('ix_id_name', 'name', 'extra'),
    )


def _new_engine():
    """Build a fresh engine for the tutorial database."""
    return create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/day128?charset=utf8")


def init_db():
    """Create every table registered on ``Base.metadata``."""
    Base.metadata.create_all(_new_engine())


def drop_db():
    """Drop every table registered on ``Base.metadata``."""
    Base.metadata.drop_all(_new_engine())


if __name__ == '__main__':
    # drop_db()
    init_db()
# Insert rows into ``classes`` through an ORM session.
# Both insert examples below are commented out, so running this script only
# opens a session and closes it again.
import models
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/day128?charset=utf8")
SessionFactory = sessionmaker(bind=engine)
session = SessionFactory()

# Insert a single row:
# obj = models.Classes(name='第一期')
# session.add(obj)
# session.commit()

# Insert several rows at once:
# objs = [
#     models.Classes(name='第二期'),
#     models.Classes(name='第三期'),
#     models.Classes(name='第四期'),
#     models.Classes(name='第五期'),
# ]
#
# session.add_all(objs)
# session.commit()

session.close()
# Query every row of ``classes`` and print its id and name.
import models
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/day128?charset=utf8")
session = sessionmaker(bind=engine)()

try:
    result = session.query(models.Classes).all()
    for item in result:
        print(item.id, item.name)
finally:
    # Close the session even when the query or the printing fails.
    session.close()
# Delete every ``classes`` row whose id is greater than 4.
import models
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/day128?charset=utf8")
session = sessionmaker(bind=engine)()

try:
    session.query(models.Classes).filter(models.Classes.id > 4).delete()
    session.commit()
finally:
    # Close the session even when the delete or the commit fails.
    session.close()
# Append '666' to the name of every ``classes`` row whose id is positive.
import models
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/day128?charset=utf8")
session = sessionmaker(bind=engine)()

try:
    # The new value is a column expression, so the concatenation is done by
    # the database. synchronize_session=False skips refreshing objects that
    # are already loaded in this session.
    session.query(models.Classes).filter(models.Classes.id > 0).update(
        {models.Classes.name: models.Classes.name + '666'},
        synchronize_session=False,
    )
    session.commit()
finally:
    # Close the session even when the update or the commit fails.
    session.close()
# Query variants: column labels, filter vs. filter_by, raw SQL text,
# subqueries and a correlated scalar subquery.
import models
from sqlalchemy import create_engine, func, text
from sqlalchemy.orm import sessionmaker

engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/day128?charset=utf8")
session = sessionmaker(bind=engine)()

try:
    # 1. Column alias: label() renames the column in the result rows.
    result = session.query(models.Classes.id, models.Classes.name.label('xx')).all()
    for item in result:
        print(item.id, item.xx)

    # 2. filter() takes column expressions; filter_by() takes keyword arguments.
    r3 = session.query(models.Classes).filter(models.Classes.name == "anne").all()
    r4 = session.query(models.Classes).filter_by(name='anne').all()

    # 3. Raw SQL with a bound parameter, mapped back onto the model.
    result = session.query(models.Classes).from_statement(
        text("SELECT * FROM classes where name=:name")
    ).params(name='ed').all()
    # Without .all() this only builds the Query object; no rows are fetched.
    result = session.query(models.Classes).from_statement(
        text("SELECT * FROM classes where name=:name")
    ).params(name='ed')

    # Subquery used inside IN (...).
    ret = session.query(models.Classes).filter(
        models.Classes.id.in_(session.query(models.Classes.id).filter_by(name='eric'))
    ).all()

    # Correlated scalar subquery: number of students per class.
    # The original example referred to undefined ``func``, ``Server`` and
    # ``Group``; it is expressed here with this project's own models.
    # NOTE: on SQLAlchemy 1.4+ as_scalar() is deprecated in favour of
    # scalar_subquery().
    subqry = session.query(
        func.count(models.Student.id).label("sid")
    ).filter(
        models.Student.class_id == models.Classes.id
    ).correlate(models.Classes).as_scalar()
    result = session.query(models.Classes.name, subqry)
finally:
    # Close the session even when one of the queries fails.
    session.close()
# Student examples: insert rows, look one up, list students with their class
# and follow the reverse relationship from a class to its students.
import models
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/day128?charset=utf8")
session = sessionmaker(bind=engine)()

try:
    # 1. Insert rows into the student table.
    # All three objects are added together; rebinding one variable three
    # times and adding it once would persist only the last student.
    # ``password`` is a String column, so a string is passed.
    students = [
        models.Student(username='anne', password='123', class_id=3),
        models.Student(username='bob', password='123', class_id=3),
        models.Student(username='fry', password='123', class_id=3),
    ]
    session.add_all(students)
    session.commit()

    # 2. Find anne in the student table.
    anne = session.query(models.Student).filter(models.Student.username == 'anne').first()
    if anne is not None:  # first() returns None when nothing matches
        print(anne.id)

    # 3. Find every student and print their details.
    # Method 1: look the class up with one extra query per student.
    objs = session.query(models.Student).all()
    for stu in objs:
        cls_obj = session.query(models.Classes).filter(models.Classes.id == stu.class_id).first()
        # A student without a matching class prints None for the class name.
        cls_name = cls_obj.name if cls_obj is not None else None
        print(stu.id, stu.username, stu.class_id, cls_name)

    # Method 2: a single LEFT OUTER JOIN query.
    objs = session.query(
        models.Student.id, models.Student.username, models.Classes.name
    ).join(models.Classes, isouter=True).all()
    print(objs)

    # Method 3: follow the ``cls`` relationship declared on Student.
    objs = session.query(models.Student).all()
    for item in objs:
        cls_name = item.cls.name if item.cls is not None else None
        print(item.id, item.username, item.class_id, cls_name)

    # 4. Every student of the class whose id is 3.
    obj = session.query(models.Classes).filter(models.Classes.id == 3).first()
    # Reverse lookup through the ``stus`` backref; empty when the class is missing.
    student_list = obj.stus if obj is not None else []
    for item in student_list:
        print(item.id, item.username)
finally:
    # Close the session even when one of the steps fails.
    session.close()
.