zoukankan      html  css  js  c++  java
  • SQLAlchemy的使用

    1. 介绍

      SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

    2. orm单表操作

      2.1 orm操作数据表

    from coreschema import Integer
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
    
    
    
    Base = declarative_base()
    
    class Users(Base):
        __tablename__ = "users"
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), nullable=False, index=True)
        depart_id = Column(Integer)
    
    
    
    # 可以将 engine 提取出来共用
    engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8",
            max_overflow=0,     # 超过连接池大小外最多创建的连接
            pool_size=5,        # 连接池大小
            pool_timeout=10,    # 池中没有线程时最多等待的时间,否则报错
            pool_recycle=-1     # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
    
    def create_tb():
        """
        创建表
        :return: 
        """
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8",
            max_overflow=0,     # 超过连接池大小外最多创建的连接
            pool_size=5,        # 连接池大小
            pool_timeout=10,    # 池中没有线程时最多等待的时间,否则报错
            pool_recycle=-1     # 多久之后对线程池中的线程进行一次连接的回收(重置),-1表示不重置
        )
        Base.metadata.create_all(engine)    # create_all() 必须是这个函数名, 表示创建表
    
    
    def drop_tb():
        """
        删除表
        :return: 
        """
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8",
            max_overflow=0,     # 超过连接池大小外最多创建的连接
            pool_size=5,        # 连接池大小
            pool_timeout=10,    # 池中没有线程时最多等待的时间,否则报错
            pool_recycle=-1     # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
        Base.metadata.drop_all(engine)
    
    # 执行操作
    create_tb()
    drop_tb()

      2.2 orm操作数据行

    from sqlalchemy import create_engine, and_, or_
    from sqlalchemy.orm import sessionmaker
    from models import Users
    
    
    
    engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8",
            max_overflow=0,     # 超过连接池大小外最多创建的连接
            pool_size=5,        # 连接池大小
            pool_timeout=10,    # 池中没有线程时最多等待的时间,否则报错
            pool_recycle=-1     # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    
    # 创造一个连接池 SessionFactory
    = sessionmaker(bind=engine) # 每次进行数据库操作时都需要创建一个session, 表示从连接池中获取到一个连接 session = SessionFactory() ###################### 单表的 crud 操作 ''' # 1 增加 user = Users(name="张三") session.add(user) session.commit() # 批量增加 session.add_all([ Users(name="李四"), Users(name="王五"), Users(name="赵六"), ]) session.commit() # 2 查找 result = session.query(Users).all() print(result) # [Users object, Users object, Users object, .....] for item in result: print(item.id, "----",item.name) result = session.query(Users).filter(Users.id >= 3) print(result) # sql 语句 for i in result: print(i.name) result = session.query(Users).filter(Users.id >= 3).first() print(result.name) # 3 删除 ret = session.query(Users).filter(Users.id > 3).delete() print(ret) # 1 返回删除的数据量 session.commit() # 4 修改 ret = session.query(Users).filter(Users.id == 1).update({Users.name:"张三三"}) print(ret) # 1 返回修改的数据量 session.commit() session.query(Users).filter(Users.id == 2).update({"name":"李思思"}) session.commit() session.query(Users).filter(Users.id == 3).update({"name":Users.name+"老五"}, synchronize_session=False) # 当要拼接字符串时候, 需要加参数 synchronize_session=False # 数字相加时, 不需要这个参数 session.commit() ###################### 单表的其他操作 # 1 获取指定字段信息 ret = session.query(Users.id,Users.name,Users.name.label("nick_name")).all() print(ret) for i in ret: print(i[0],i.name,i.nick_name) #2 and, 两种方式一样 ret = session.query(Users).filter(Users.id==2,Users.name=="李思思").all() print(ret) ret = session.query(Users).filter(and_(Users.id==2,Users.name=="李思思")).all() print(ret) # 3 or ret = session.query(Users).filter(or_(Users.id==1,Users.id==2)).all() print(ret) # and 和 or 结合使用 ret = session.query(Users).filter(or_( Users.id == 1, and_(Users.name=="李思思", Users.id>=1), )).all() print(ret) # 4 between, 闭区间, 包括前后范围(1,3) ret = session.query(Users).filter(Users.id.between(1,3)).all() print(ret) # 5 in 和 ~in(就是not in) ret = session.query(Users).filter(Users.id.in_([1,2,3,4])).all() print(ret) ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() print(ret) # 6 子查询 ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(or_(Users.id>2, Users.name=="张三三")))).all() print(ret) # 7 filter_by, 里面是参数, 不是语句(比如 name == "李思思") ret = session.query(Users).filter_by(name="李思思").all() print(ret) # 8 通配符 ( % :表示0个或多个, _ :表示一个) ret = session.query(Users).filter(Users.name.like("李%")).all() print(ret) ret = session.query(Users).filter(~Users.name.like("李%")).all() print(ret) ret = session.query(Users).filter(Users.name.like("李__")).all() print(ret) # 9 切片 ret = session.query(Users).filter(Users.id>=1)[1:3] print(ret) # 10 order_by 排序 ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 11 group_by 分组 from sqlalchemy.sql import func ret = session.query( Users.depart_id, func.count(Users.id) ).group_by(Users.depart_id).all() for i in ret: print(i) # group_by 和 having 结合使用 from sqlalchemy.sql import func ret = session.query( Users.id, func.count(Users.id) ).group_by(Users.depart_id).having(func.count(Users.id)>=2).all() print(ret) for i in ret: print(i) # 12 union 和 union_all # select id,name from users UNION select id,name from users; q1 = session.query(Users.name).filter(Users.id > 3) q2 = session.query(Users.name).filter(Users.id >= 1) ret = q1.union(q2).all() # union() 去重 print(ret) # [('赵六',), ('张三三',), ('李思思',), ('王五老五',)] q1 = session.query(Users.name).filter(Users.id > 3) q2 = session.query(Users.name).filter(Users.id >= 1) ret = q1.union_all(q2).all() # union_all() 不去重 print(ret) # [('赵六',), ('张三三',), ('李思思',), ('王五老五',), ('赵六',)] '''

     3. orm多表操作

      3.1 一对多操作

       操作数据表

    from sqlalchemy import Column, create_engine, UniqueConstraint
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Integer, String, ForeignKey
    from sqlalchemy.orm import relationship
    
    Base = declarative_base()
    
    class Depart(Base):
        __tablename__ = "depart"
    
        id = Column(Integer, primary_key=True)
        title = Column(String(32), nullable=True, index=True)
    
    
    class Users(Base):
        __tablename__ = "users"
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), nullable=True, index=True)
        dep_id = Column(Integer, ForeignKey("depart.id"))
        dp = relationship("Depart", backref="uuss")
    
    
    def create_tb():
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=10,  # 池中没有线程时最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
        Base.metadata.create_all(engine)
    
    
    create_tb()

    注意: 为什么有了 foreignkey 外键, 还要 relationship ? 这两者什么关系?

      relationship 是为了简化联合查询 join 等, 创建的两个表之间的虚拟关系, 这种关系与标的结构时无关的. 他与外键十分相似, 确实, relationship 必须在外键的基础上才允许使用, 不然会报错.  

      如果上面 User 模型类中只存在 foreignkey, 需要使用手写 join 才能取出数据; 而如果有 relationship 则可以使两张表之间产生关联, 类似于合成一张表, 可以直接取出数据而不需要手写 join 操作, 具体使用如下. 

      操作数据行

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    
    from models import Users, Depart
    
    
    
    engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8",
            max_overflow=0,     # 超过连接池大小外最多创建的连接
            pool_size=5,        # 连接池大小
            pool_timeout=10,    # 池中没有线程时最多等待的时间,否则报错
            pool_recycle=-1     # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    
    SessionFactory = sessionmaker(bind=engine)
    session = SessionFactory()
    
    """
    # 1 查询所有用户
    ret = session.query(Users).all()
    print(ret)
    
    
    # 2 查询所有用户及对应的部门(不用relationship的操作)
    ret = session.query(Users,Depart).join(Depart).all()
    for i in ret:
        print(i[0].id, i[0].name, i[1].title)
    
    
    ret = session.query(Users.id, Users.name, Depart.title).join(Depart, Users.dep_id == Depart.id).all()
                                                                # 默认有 Users.dep_id == Depart.id, 并且是 inner join
    for i in ret:
        print(i.id, i.name, i.title)
    
    
    ret = session.query(Users.id, Users.name, Depart.title).join(Depart, Users.dep_id == Depart.id, isouter=True).all()
                                                                # isouter=True , 表示 左外连接
    for i in ret:
        print(i.id, i.name, i.title)
    
    
    # 3 relationship 使用(有了relationship后就可以不用手写join()连表)
    # 正查: 查询每个人及其所在部门名称
    ret = session.query(Users).all()
    for i in ret:
        print(i.id, i.name, i.dp.title)
    
    # 反查: 查询it部门的人员名称
    ret = session.query(Depart).filter(Depart.title=="it").first()
    for i in ret.uuss:
        print(ret.title, i.id, i.name)
    
    
    # 4 创建一个研发部门, 并在该部门中添加一个员工--霸天虎
    user = Users(name="霸天虎", dp=Depart(title="研发"))
    session.add(user)
    session.commit()
    """
    
    # 5 创建一个汽车人的部门, 并添加员工--大黄蜂, 威震天, 小蜜蜂
    depart = Depart(title="汽车人")
    depart.uuss = [Users(name="威震天"), Users(name="大黄蜂"), Users(name="小蜜蜂")]
    session.add(depart)
    session.commit()

      3.2 多对多操作

       操作数据表

    from sqlalchemy import Column, create_engine, UniqueConstraint
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Integer, String, ForeignKey
    from sqlalchemy.orm import relationship
    
    Base = declarative_base()
    
    
    class Student(Base):
        __tablename__ = "student"
        id = Column(Integer, primary_key=True)
        name = Column(String(32), nullable=True, index=True)
        course_lst = relationship("Course", secondary="student2course", backref="student_lst")   # 添加了多对多信息
    
    
    
    class Course(Base):
        __tablename__ = "course"
        id = Column(Integer, primary_key=True)
        title = Column(String(32), nullable=True, index=True)
    
    
    
    class Student2Course(Base):
        __tablename__ = "student2course"
        id = Column(Integer, primary_key=True)
        sid = Column(Integer, ForeignKey("student.id"))
        cid = Column(Integer, ForeignKey("course.id"))
    
        __table_args__ = (
            UniqueConstraint('sid', 'cid', name='unix_stu_cou'),        # 联合唯一索引
            # Index('ix_id_name', 'name', 'extra'),                     # 联合索引
        )

       操作数据行

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    
    from models import Users, Depart, Student, Course, Student2Course
    
    
    engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8",
            max_overflow=0,     # 超过连接池大小外最多创建的连接
            pool_size=5,        # 连接池大小
            pool_timeout=10,    # 池中没有线程时最多等待的时间,否则报错
            pool_recycle=-1     # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    
    SessionFactory = sessionmaker(bind=engine)
    session = SessionFactory()
    
    """
    # 1 录入数据
    session.add_all([
        Student(name="猫女"),
        Student(name="薇恩"),
        Course(title="ad"),
        Course(title="ap")
    ])
    session.commit()
    
    
    session.add_all([
        Student2Course(sid=1,cid=1),
        Student2Course(sid=1,cid=2),
        Student2Course(sid=2,cid=1),
    ])
    session.commit()
    
    # 2 每个学生选的所有课程名 (不用多对多字段)
    ret = session.query(Student2Course.id, Student.id, Student.name, Course.title).join(Student,Student2Course.sid==Student.id,isouter=True).join(Course,Student2Course.cid==Course.id,isouter=True).all()
    print(ret)
    for i in ret:
        print(i)
    
    # 3. 猫女 选的所有课 (不用多对多字段)
    ret = session.query(Student2Course.id, Student.id, Student.name, Course.title).join(Student,Student2Course.sid==Student.id,isouter=True).join(Course,Student2Course.cid==Course.id,isouter=True).filter(Student.name=="猫女").all()
    for i in ret:
        print(i)
    
    # 3. 猫女 选的所有课 (用了多对多字段)
    student = session.query(Student).filter(Student.name=="猫女").first()
    for i in student.course_lst:
        print(student.name, i.title)
    
    # 4. 选了 ap课 的所有人
    course = session.query(Course).filter(Course.title=="ad").first()
    for i in course.student_lst:
        print(course.title, i.name)
    """
    
    # 5. 创建一个课程,创建2学生并选择该课程
    course = Course(title="ye")
    course.student_lst = [Student(name="小强"), Student(name="狮子狗"), Student(name="斯巴达")]
    session.add(course)
    session.commit()

     4. 多线程连接数据库连接池

      4.1 方式1: scoped_session ( 推荐使用 )

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session
    from models import Student,Course,Student2Course
    
    engine = create_engine(
            "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,     # 连接池大小
            pool_timeout=10, # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    SessionFactory = sessionmaker(bind=engine)
    # scoped_session里面维护着一个 threading.local(), 为每一个线程独立开辟一个空间,在这里就可以放每一个线程的连接
    session = scoped_session(SessionFactory)
    
    
    def task():
        # 进行数据库操作
        ret = session.query(Student).all()
    
        # 将连接交还给连接池
        session.remove()
    
    
    
    
    from threading import Thread
    
    for i in range(20):
        t = Thread(target=task)
        t.start()

      4.2 方式2: 在执行函数中获取连接

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Student, Course, Student2Course
    
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/peiqi_120?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,     # 连接池大小
        pool_timeout=10, # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    SessionFactory = sessionmaker(bind=engine)
    
    
    def task():
        # 去连接池中获取一个连接
        session = SessionFactory()
    
        # 数据库操作
        ret = session.query(Student).all()
    
        # 将连接交还给连接池
        session.close()
    
    
    
    
    
    from threading import Thread
    
    for i in range(20):
        t = Thread(target=task)
        t.start()
  • 相关阅读:
    multition pattern
    singleton pattern
    strategy pattern
    设置Activity的属性
    iphone自动旋转与调整大小
    游戏编程从哪里开始呢
    TTF字体文件使用
    TextMate介绍
    ios程序崩溃处理
    ios笔试题
  • 原文地址:https://www.cnblogs.com/bk9527/p/10994509.html
Copyright © 2011-2022 走看看