zoukankan      html  css  js  c++  java
  • SQLALchemy之ORM操作

    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column
    from sqlalchemy import Integer,String,Text,Date,DateTime,ForeignKey,UniqueConstraint, Index
    from sqlalchemy import create_engine
    from sqlalchemy.orm import relationship
    
    
    Base = declarative_base()
    
    class Depart(Base):
        __tablename__ = 'depart'
        id = Column(Integer, primary_key=True)
        title = Column(String(32), index=True, nullable=False)
    
    class Users(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
        depart_id = Column(Integer,ForeignKey("depart.id"))
    
        dp = relationship("Depart", backref='pers')
    
    class Student(Base):
        __tablename__ = 'student'
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
    
        course_list = relationship('Course', secondary='Student2Course', backref='student_list')
    
    class Course(Base):
        __tablename__ = 'course'
        id = Column(Integer, primary_key=True)
        title = Column(String(32), index=True, nullable=False)
    
    class Student2Course(Base):
        __tablename__ = 'student2course'
        id = Column(Integer, primary_key=True, autoincrement=True)
        student_id = Column(Integer, ForeignKey('student.id'))
        course_id = Column(Integer, ForeignKey('course.id'))
    
        __table_args__ = (
            UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 联合唯一索引
            # Index('ix_id_name', 'name', 'extra'),                          # 联合索引
        )
    
    def create_all():
        engine = create_engine(
            "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.create_all(engine)
    
    def drop_all():
        engine = create_engine(
            "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
        Base.metadata.drop_all(engine)
    
    if __name__ == '__main__':
        # drop_all()
        create_all()
    models.py
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Users, config, Department, Student, Course
    
    Session = sessionmaker(bind=config())
    
    # 每次执行数据库操作时,都需要创建一个session
    session = Session()
    
    # ############# 执行ORM操作 #############
    # 添加一条记录,也就是实例化一个对象
    # obj1 = Users(name="alex1")
    # session.add(obj1)
    
    # 添加多条数据
    # session.add_all([Users(name="aike"),
    #                  Users(name="yoyo")])
    
    # 删除一条数据
    # session.query(Users).filter(Users.id == 1).delete()
    
    # 更新
    # session.query(Users).filter(Users.id == 2).update({"name": "jack"})
    # session.query(Users).filter(Users.id == 2).update({"name": Users.name + "kkkkk",}, synchronize_session=False)
    
    # 查询
    # ret = session.query(Users).all()
    # print(ret)
    # for items in ret:
    #     print(items.id, items.name)
    
    # 加条件查询
    # 单个条件
    # ret = session.query(Users.id).filter(Users.name == "yoyo").all()
    # print(ret)
    
    # 多个条件
    # ret = session.query(Users.id).filter(Users.name == "yoyo", Users.id == 3).all()
    # print(ret)
    
    # 取区间
    # ret = session.query(Users.id).filter(Users.id.between(1,5)).all()
    # print(ret)
    
    # 在这里面
    # ret = session.query(Users.id).filter(Users.id.in_([1,2])).all()
    # print(ret)
    
    # 除了符合条件的  “~”
    # ret = session.query(Users.id).filter(~Users.id.in_([1,2])).all()
    # print(ret)
    
    # 子查询
    # ret = session.query(Users).filter(Users.id == (session.query(Users.id).filter_by(name="yoyo"))).all()
    # # ret = session.query(Users.id).filter_by(name="yoyo").all()
    # print(ret[0].name)
    
    # 条件与 条件或
    from sqlalchemy import and_, or_
    
    # ret = session.query(Users).filter(and_(Users.id < 3, Users.name == 'yoyo')).all()
    # ret = session.query(Users).filter(or_(Users.id < 3, Users.name == 'yoyo')).all()
    # ret = session.query(Users).filter(
    #     or_(
    #         Users.id >= 2,
    #         and_(Users.name == 'yoyo', Users.id < 3),
    #     )).all()
    # print(ret)
    
    
    # 通配符,%代表所有
    # ret = session.query(Users).filter(Users.name.like("y%")).all()
    # print(ret)
    
    # 切片
    # ret = session.query(Users).filter(Users.name.like("y%"))[2:5]
    # print(ret)
    
    
    # 查询后返回记录对象
    # ret = session.query(Users).filter_by(name='yoyo').first()
    # print(ret.name)
    
    # 分组
    # ret = session.query(Users.id).group_by(Users.extra).all()
    # for items in ret:
    #     print(items)
    
    # 分组和聚合函数
    # from sqlalchemy.sql import func
    # ret = session.query(func.max(Users.id)).group_by(Users.extra).all()
    # for items in ret:
    #     print(items)
    
    # 分组和聚合函数,过滤条件having
    # from sqlalchemy.sql import func
    # ret = session.query(func.max(Users.id)).group_by(Users.extra).having(func.min(Users.id) > 2).all()
    # for items in ret:
    #     print(items)
    
    # 一对多查询
    # 正向查询
    # user = session.query(Users).filter(Users.department_id == 2).all()
    # for items in user:
    #     print(items.department.name)
    
    # 反向查询
    # departments = session.query(Department).filter(Department.id == 2).first()
    # for items in departments.to_users:
    #     print(items.name)
    # for items in user:
    #     print(items.name)
    
    # 一对多添加
    # 正向添加
    # user = Users(name='yoyo', department=Department(name='人事部'))
    # session.add(user)
    
    # 反向添加
    # department = Department(name="销售部")
    # department.to_users = [Users(name='aike'), Users(name='jack'), Users(name='dddd')]
    # session.add(department)
    
    
    # 多对多正向查找
    # student = session.query(Student).filter(Student.id == 2).first()
    # for i in student.course_list:
    #     print(i.name)
    
    # 多对多反向查找
    # course = session.query(Course).filter(Course.id == 4).first()
    # print(course.student_list)
    # for i in course.student_list:
    #     print(i.name)
    
    # 多对多正向添加
    # obj = Student(name='aike', course_list=[Course(name='语文'),Course(name='物理')])
    # session.add(obj)
    
    # 多对多反向添加
    # obj = Course(name='化学')
    # obj.student_list = [Student(name='yoyo'),Student(name='asdada')]
    # session.add(obj)
    
    
    # 连表查询  默认为inner_join,,,加上isouter=True为left join
    ret = session.query(Department.name, Users.name).join(Users, isouter=True).all()
    print(ret)
    
    
    # 提交事务
    session.commit()
    # 关闭session
    session.close()
    View Code

    1、仍然要创建引擎

    2、创建session会话

    (1)方式一

    engine =create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    # 每次执行数据库操作时,都需要创建一个session
    session = Session()

    (2)方式二

    基于threading.local的session

    from sqlalchemy.orm import scoped_session
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    engine =create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    session = scoped_session(Session)

    3、插入记录

    (1)添加一条

    obj1 = Users(name="wupeiqi")
    session.add(obj1)
    
    session.commit()

    (2)添加多条

    session.add_all([
        Users(name="wupeiqi"),
        Users(name="alex"),
        Hosts(name="c1.com"),
    ])
    session.commit()

    4、删除记录

    delete方法

    session.query(Users).filter(Users.id > 2).delete()
    session.commit()

    5、更新记录

    update方法

    (1)字典形式传参

    # 字典中key就是定义类中字段名,value就是要更新的值
    session.query(Users).filter(Users.id > 0).update({"name" : "099"})
    
    session.commit()

    (2)实现字段的相加

    # 相加时类名.字段 = 类名.字段 + xxx
    
    # 字符串类型的相加,注意必须要加参数synchronize_session=False
    session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
    
    # 数字类型的相加,注意必须加参数synchronize_session="evaluate"
    session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
    session.commit()

    6、查询记录

    (1)查询所有数据

    》》》query中要写类名,不是__tablename__参数值

    》》》返回列表,元素就是一个个记录对象

    》》》循环列表,记录对象.字段名就可以取出字段值

    r1 = session.query(Users).all()

    (2)查询指定数据

    》》》query中写类名.字段名,就可以取出我们指定的字段了

    》》》类名.字段名.label("ssss")   是给这个字段取一个别名,类似mysql中的as

    》》》返回列表,元素是一个个记录对象,记录对象.字段名或者.ssss就可以取出指定的字段

    r2 = session.query(Users.name.label('xx'), Users.age).all()

    (3)加条件的查询

    1)单个条件

    》》》filter中写表示式,就类似python中的判断条件,比如类名.字段名 == "xxxx";类名.id  > 2

    》》》filter_by中写参数,比如类名.字段名 = "sss"

    》》》仍然返回一个个记录对象的列表

    r3 = session.query(Users).filter(Users.name == "alex").all()
    r4 = session.query(Users).filter_by(name='alex').all()

    2)多个条件,条件之间默认是and关系

    ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()

    3)between,在1,2,3中的某个值

    ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()

    4).in_的用法

    ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()

    5)~是 非 的意思,相反

    ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()

    6)查询语句的返回值也可以作为查询条件

    ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()

    7)表示与或非的查询条件

    from sqlalchemy import and_, or_
    ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
    ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
    ret = session.query(Users).filter(
        or_(
            Users.id < 2,
            and_(Users.name == 'eric', Users.id > 3),
            Users.extra != ""
        )).all()

    8)通配符,%代表所有

    ret = session.query(Users).filter(Users.name.like('e%')).all()
    ret = session.query(Users).filter(~Users.name.like('e%')).all()

    9)限制,切片

    ret = session.query(Users)[1:2]

    (4)first()

    查询后返回记录对象

    r5 = session.query(Users).filter_by(name='alex').first()

    (5)带占位符的过滤条件查询

    》》》text中的:name和:value代表占位符,params括号中传参

    》》》仍然返回一个个记录对象的列表

    r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred')all()

    (6)排序

    》》》order_by(类名.字段名)  默认按照字段值的升序排序

    r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()

    》》》类名.字段名.desc() 是降序   asc()是升序

    》》》逗号隔开,代表第一个相同的话,就按第二个排序

    ret = session.query(Users).order_by(Users.name.desc()).all()
    ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

    (7)分组

    1)首先引入聚合函数

    from sqlalchemy.sql import func

    2)把某个字段作为分组依据

    ret = session.query(Users).group_by(Users.extra).all()

    3)使用聚合函数

    ret = session.query(func.max(Users.id),func.sum(Users.id),func.min(Users.id)).group_by(Users.name).all()

    4)加过滤条件having

    ret = session.query(
        func.max(Users.id),
        func.sum(Users.id),
        func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

    (8)组合,就是inner join的基础上,保留左表和右表的全部记录

     
    q1 = session.query(Users.name).filter(Users.id > 2)
    q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    ret = q1.union(q2).all()
    
    q1 = session.query(Users.name).filter(Users.id > 2)
    q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    ret = q1.union_all(q2).all()

    (9)连表查询

    1)filter中是连表条件,query中写的是显示哪些字段,写类名代表显示这个表的所有字段

    ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()

    2)inner join

    》》》query括号中只有一个类,只显示这个表的所有字段

    ret = session.query(Person).join(Favor).all()

    3)left join

    》》》加参数isouter=true

    ret = session.query(Person).join(Favor, isouter=True).all()

    注意:

    》》》默认inner join

    》》》query括号中写的是显示哪些字段

    》》》可以不断的join,进行多张表的连表操作,query括号中就是要显示的字段,就写成类名.字段名这样的形式就可以,取别名用label

    》》》默认是没有right join的

    》》》默认连表条件是按foreignkey字段

    》》》session.query().join()   这样打印出来的就是连表操作的原生sql语句

    》》》若没有外键关联的字段,就要自己写连表条件

    在join中写连表条件,也可以用and_(表达式1,表达式2) 或者or_(表达式3,表达式4)来表示连表条件

    session.query(Person).join(Favor, Person.id == Favor.id, isouter=True).all()

    7、执行原生sql语句

    (1)利用text,使用占位符

    r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()

    (2)利用execute

    # 查询
    # cursor = session.execute('select * from users')
    # result = cursor.fetchall()
    
    # 添加
    cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
    session.commit()
    print(cursor.lastrowid)
    
    session.close()

    8、基于relationship的查询和添加

    from sqlalchemy.orm import relationship

    1)一对多

    首先在定义类中(表)添加relationship字段

    backref用于反向查询

        # 与生成表结构无关,仅用于查询方便
        hobby = relationship("Hobby", backref='pers')

    》》》》查询

    正向查询:对象.hobby就是关联的对象,再.字段就获取到关联对象的字段值了

    # 使用relationship正向查询
    
    v = session.query(Person).first()
    print(v.name)
    print(v.hobby.caption)

    反向查询:对象.pers(就是backref参数的值)就得到了和当前对象关联的所有记录对象的列表

    # 使用relationship反向查询
    
    v = session.query(Hobby).first()
    print(v.caption)
    print(v.pers)

    》》》》添加

    正向添加

    实例对象时,relationship字段=另一个类的对象,这样就会在关联表中也自动添加一条记录

    person = Person(name='张九', hobby=Hobby(caption='姑娘'))
    session.add(person)
    
    session.commit()

    反向添加

    实例对象.relationship字段 = [另一个类的对象1,另一个类的对象2 ],这样就会在自动被关联的表中一次创建多条记录

    hb = Hobby(caption='人妖')
    hb.pers = [Person(name='文飞'), Person(name='博雅')]
    session.add(hb)
    
    session.commit()

    2)多对多

    首先在定义类中(表)添加relationship字段

    backref用于反向查询

    secondary是自己创的关系表

        # 与生成表结构无关,仅用于查询方便
        servers = relationship('Server', secondary='server2group', backref='groups')

    》》》》查询

    正向查询

    # 使用relationship正向查询
    
    v = session.query(Group).first()
    print(v.name)
    print(v.servers)

    反向查询

    # 使用relationship反向查询
    """
    v = session.query(Server).first()
    print(v.hostname)
    print(v.groups)
    """

    》》》》添加

    正向添加

    gp = Group(name='C组')
    gp.servers = [Server(hostname='c3.com'),Server(hostname='c4.com')]
    session.add(gp)
    session.commit()

    反向添加

    ser = Server(hostname='c6.com')
    ser.groups = [Group(name='F组'),Group(name='G组')]
    session.add(ser)
    session.commit()

    9、其它操作

     
    import time
    import threading
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    from sqlalchemy.sql import text, func
    from sqlalchemy.engine.result import ResultProxy
    from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    session = Session()
    
    # 关联子查询
    subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
    result = session.query(Group.name, subqry)
    """
    SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid 
    FROM server 
    WHERE server.id = `group`.id) AS anon_1 
    FROM `group`
    """
    
    
    # 原生SQL
    """
    # 查询
    cursor = session.execute('select * from users')
    result = cursor.fetchall()
    
    # 添加
    cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
    session.commit()
    print(cursor.lastrowid)
    """
    
    session.close()
  • 相关阅读:
    【纯水题】POJ 1852 Ants
    【树形DP】BZOJ 1131 Sta
    【不知道怎么分类】HDU
    【树形DP】CF 1293E Xenon's Attack on the Gangs
    【贪心算法】CF Emergency Evacuation
    【思维】UVA 11300 Spreading the Wealth
    【树形DP】NOI2003 逃学的小孩
    【树形DP】BZOJ 3829 Farmcraft
    【树形DP】JSOI BZOJ4472 salesman
    【迷宫问题】CodeForces 1292A A NEKO's Maze Game
  • 原文地址:https://www.cnblogs.com/aizhinong/p/11746804.html
Copyright © 2011-2022 走看看