zoukankan      html  css  js  c++  java
  • Flask-SQLAlchemy

    wusir

    一、介绍

    SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

    pip3 install sqlalchemy

    SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:

    MySQL-Python
        mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
        
    pymysql
        mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
        
    MySQL-Connector
        mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
        
    cx_Oracle
        oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
        

    二、基本使用

    1、执行原生sql语句

      见wusir

    2、ORM

    (1)创建数据库表

    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column
    from sqlalchemy import Integer,String
    from sqlalchemy import create_engine
    
    
    Base = declarative_base()
    
    
    class Users(Base):
        __tablename__ = 'users'  #为数据库中生成的表命名
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
        depart_id = Column(Integer)
    
    
    #创建数据库连接及连接池
    def create_all():
        engine = create_engine(
            "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置),-1表示不回收
        )
    
        Base.metadata.create_all(engine) #找到当前py下的继承了Base的所有的类,把它在数据库中生成一张表
    
    #
    # def drop_all():
    #     engine = create_engine(
    #         "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8",
    #         max_overflow=0,  # 超过连接池大小外最多创建的连接
    #         pool_size=5,  # 连接池大小
    #         pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    #         pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    #     )
    #     Base.metadata.drop_all(engine)
    #
    if __name__ == '__main__':
    
        create_all()
        # drop_all()
    创建单表 
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column
    from sqlalchemy import Integer,String,ForeignKey,UniqueConstraint
    from sqlalchemy import create_engine
    from sqlalchemy.orm import relationship
    
    
    Base = declarative_base()
    
    # ##################### 一对多示例 #########################
    
    class Depart(Base):
        __tablename__ = 'depart'
        id = Column(Integer, primary_key=True)
        title = Column(String(32), index=True, nullable=False)
    
    class Users(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
        depart_id = Column(Integer,ForeignKey("depart.id"))    #创建ForeignKey
    
        # 与生成表结构无关,仅用于查询方便
        dp = relationship("Depart", backref='pers')
        #通过dp可直接进行跨表查询,backref='pers'用于反向查询
    
    
    # ##################### 多对多示例 #########################
    
    class Student(Base):
        __tablename__ = 'student'
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
    
        # 与生成表结构无关,仅用于查询方便:根据student2course表与Course表做关联
        course_list = relationship('Course', secondary='student2course', backref='student_list')
    
    class Course(Base):
        __tablename__ = 'course'
        id = Column(Integer, primary_key=True)
        title = Column(String(32), index=True, nullable=False)
    
    class Student2Course(Base):
        __tablename__ = 'student2course'
        id = Column(Integer, primary_key=True, autoincrement=True)
        student_id = Column(Integer, ForeignKey('student.id'))
        course_id = Column(Integer, ForeignKey('course.id'))
    
        __table_args__ = (
            UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 联合唯一索引
            # Index('ix_id_name', 'name', 'extra'),                          # 联合索引
        )
    
    def create_all():
        engine = create_engine(
            "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.create_all(engine)
    
    def drop_all():
        engine = create_engine(
            "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
        Base.metadata.drop_all(engine)
    
    if __name__ == '__main__':
        # drop_all()
        create_all()
    创建多表包含FK,M2M关系

    (2)操作数据库表

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Users
    
    # 连接数据库
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    SessionFactory = sessionmaker(bind=engine)
    
    # 每次执行数据库操作时,都需要创建一个session
    session = SessionFactory()
    
    # ############# 执行ORM操作 #############
    obj1 = Users(name="alex1")
    session.add(obj1)
    
    # 提交事务
    session.commit()
    # 关闭session
    session.close()

    基本增删改查示例:

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Users
    
    #连接数据库
    engine = create_engine(
            "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    SessionFactory = sessionmaker(bind=engine)
    
    # 根据Users类对users表进行增删改查
    session = SessionFactory()
    
    # ############################## 基本增删改查 ###############################
    # 1. 增加
    #增加单条数据
    #创建对象
    # obj = Users(name='alex')
    
    # session.add(obj)
    # session.commit()
    
    #增加多条数据
    # session.add_all([
    #         Users(name='小东北'),
    #         Users(name='龙泰')
    # ])
    # session.commit()
    
    # 2. 查
    # result = session.query(Users).all()
    # print(result)
    '''
    [<models.Users object at 0x038C1710>, 
    <models.Users object at 0x038C1750>, 
    <models.Users object at 0x038C1790>]
    '''
    # for row in result:
    #         print(row.id,row.name)
    '''
    1 alex
    2 小东北
    3 龙泰
    '''
    
    #加条件进行查找
    # result = session.query(Users).filter(Users.id >= 2)
    # for row in result:
    #         print(row.id,row.name)
    '''
    2 小东北
    3 龙泰
    '''
    # result = session.query(Users).filter(Users.id >= 2).first()
    # print(result)  #<models.Users object at 0x03E25890>
    
    # 3.删
    # session.query(Users).filter(Users.id >= 2).delete()
    # session.commit()
    
    # 4.改
    # session.query(Users).filter(Users.id == 4).update({Users.name:'东北'})
    # session.query(Users).filter(Users.id == 4).update({'name':'小东北'})
    session.query(Users).filter(Users.id == 4).update({'name':Users.name+"DSB"},synchronize_session=False)
    #默认 synchronize_session = "evaluate" 进行数值的计算
    session.commit()
    
    session.close()
    # ############################## 常用操作 ###############################
    # 1. 指定列
    # select id,name as cname from users;
    result = session.query(Users.id,Users.name.label('cname')).all()
    for item in result:
            print(item)
            print(item[0],item.id,item.cname)
    '''
    (1, 'alex')
    1 1 alex
    (4, '小东北DSB')
    4 4 小东北DSB
    '''
    # 2. 默认条件and
    session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
    # 3. between
    session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
    # 4. in
    session.query(Users).filter(Users.id.in_([1,3,4])).all()
    session.query(Users).filter(~Users.id.in_([1,3,4])).all()  # 相当于 not in
    # 5. 子查询
    session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='eric'))).all()
    # 6. and 和 or
    from sqlalchemy import and_, or_
    session.query(Users).filter(Users.id > 3, Users.name == 'eric').all() #默认是and关系
    session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
    session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
    session.query(Users).filter(
        or_(
            Users.id < 2,
            and_(Users.name == 'eric', Users.id > 3),
            Users.extra != ""
        )).all()
    
    # 7. filter_by
    session.query(Users).filter_by(name='alex').all()
    
    # 8. 通配符
    ret = session.query(Users).filter(Users.name.like('e%')).all()
    # ret = session.query(Users).filter(~Users.name.like('e%')).all()
    
    # 9. 切片
    result = session.query(Users)[1:2]
    
    # 10.排序
    ret = session.query(Users).order_by(Users.name.desc()).all()
    ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
    #先按name从大到小排,如果name重名按照id从小到大排
    
    # 11. group by
    from sqlalchemy.sql import func
    
    ret = session.query(
            Users.depart_id,
            func.count(Users.id),
    ).group_by(Users.depart_id).all()
    print(ret) #[(1, 2), (2, 1), (3, 1)]
    for item in ret:
            print(item)
    '''
    (1, 2)
    (2, 1)
    (3, 1)
    '''
    
    #having 根据聚合条件进行二次筛选
    from sqlalchemy.sql import func
    
    ret = session.query(
            Users.depart_id,
            func.count(Users.id),
    ).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all()
    for item in ret:
            print(item)
    (1, 2)
    #
    # 12.组合
    # union 去重
    # union all:上下拼接不去重
    # """
    # select id,name from users
    # UNION
    # select id,name from users;
    # """
    q1 = session.query(Users.name).filter(Users.id > 2)
    q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    ret = q1.union(q2).all()
    
    q1 = session.query(Users.name).filter(Users.id > 2)
    q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    ret = q1.union_all(q2).all()
    常用操作 
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Users,Depart
    
    engine = create_engine(
            "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    SessionFactory = sessionmaker(bind=engine)
    
    # 根据Users类对users表进行增删改查
    session = SessionFactory()
    
    # ##################### 一对多示例 #########################
    
    
    # 1. 查询所有用户
    ret = session.query(Users).all()
    for row in ret:
        print(row.id,row.name,row.depart_id)
    # 2. 查询所有用户+所属部门名称
    ret = session.query(Users.id,Users.name,Depart.title).join(Depart,Users.depart_id == Depart.id).all()
    for row in ret:
        print(row.id,row.name,row.title)
    
    # query = session.query(Users.id,Users.name,Depart.title).join(Depart,Users.depart_id == Depart.id,isouter=True)
    ## 默认是INNER JOIN ,加上isouter=True 变为LEFT JOIN
    # print(query)
    '''
    # SELECT users.id AS users_id, users.name AS users_name, depart.title AS depart_title 
    # FROM users LEFT OUTER JOIN depart ON users.depart_id = depart.id
    '''
    
    # 3. relation字段:查询所有用户+所属部门名称  --跨表操作
    dp = relationship("Depart", backref='pers')
    ret = session.query(Users).all()
    for row in ret:
        print(row.id,row.name,row.depart_id,row.dp.title)
    
    # 4. relation字段:查询销售部所有的人员   --跨表操作  --反向查询
    obj = session.query(Depart).filter(Depart.title == '销售').first()
    for row in obj.pers:
        print(row.id,row.name,obj.title)
    
    # 5. 创建一个名称叫:IT部门,再在该部门中添加一个员工:田硕
    # 方式一:
    d1 = Depart(title='IT')   #创建部门
    session.add(d1)
    session.commit()
    
    u1 = Users(name='田硕',depart_id=d1.id) #创建员工
    session.add(u1)
    session.commit()
    
    # 方式二:
    u1 = Users(name='田硕',dp=Depart(title='IT')) #一次性创建关联的数据
    session.add(u1)
    session.commit()
    
    # 6. 创建一个名称叫:王者荣耀,再在该部门中添加一个员工:龚林峰/长好梦/王爷们
    d1 = Depart(title='王者荣耀') #创建部门
    d1.pers = [Users(name='龚林峰'),Users(name='长好梦'),Users(name='王爷们'),]  #部门中再创建三个人
    session.add(d1)
    session.commit()
    
    session.close()
    基与relationship操作ForeignKey
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Student,Course,Student2Course,Users
    
    
    engine = create_engine(
            "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    SessionFactory = sessionmaker(bind=engine)
    
    # 根据Users类对users表进行增删改查
    session = SessionFactory()
    
    # ##################### 多对多示例 #########################
    
    # 1. 录入数据
    # session.add_all([
    #     Student(name='先用'),
    #     Student(name='佳俊'),
    #     Course(title='生物'),
    #     Course(title='体育'),
    # ])
    # session.commit()
    
    # session.add_all([
    #     Student2Course(student_id=2,course_id=1)
    # ])
    # session.commit()
    
    # 2. 三张表关联
    # ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).order_by(Student2Course.id.asc())
    # for row in ret:
    #     print(row)
    # 3. 查找“先用”选的所有课
    #方式一 麻烦
    # ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).filter(Student.name=='先用').order_by(Student2Course.id.asc()).all()
    # print(ret)
    
    # ##################### relationship的使用 #########################
    # course_list = relationship('Course', secondary='student2course', backref='student_list')
    
    
    # 方式二
    # obj = session.query(Student).filter(Student.name=='先用').first() #先找到先用这个人
    # for item in obj.course_list:  #再拿到和他相关联的课程
    #     print(item.title)
    
    # 4. 选了“生物”的所有人
    # obj = session.query(Course).filter(Course.title=='生物').first()
    # for item in obj.student_list:
    #     print(item.name)
    
    # 5. 创建一个课程,创建2学生,两个学生选新创建的课程。
    obj = Course(title='英语')
    obj.student_list = [Student(name='为名'),Student(name='广宗')]
    session.add(obj)
    #上面会帮我们在表的内部创建关系,帮我们对三张表做操作:
          #在Course表增加一条数据
          #在Student表增加两条数据
          #在关系表中增加两条数据
    
    session.commit()
    
    
    session.close()
    基于relationship操作M2M
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session
    
    engine = create_engine(
            "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    SessionFactory = sessionmaker(bind=engine)
    session = scoped_session(SessionFactory)
    
    
    def task():
        """"""
        # 方式一:
        """
        # 查询
        # cursor = session.execute('select * from users')
        # result = cursor.fetchall()
    
        # 添加
        cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'wupeiqi'})
        session.commit()
        print(cursor.lastrowid)
        """
        # 方式二:
        """
        # conn = engine.raw_connection()
        # cursor = conn.cursor()
        # cursor.execute(
        #     "select * from t1"
        # )
        # result = cursor.fetchall()
        # cursor.close()
        # conn.close()
        """
    
        # 将连接交还给连接池
        session.remove()
    
    
    from threading import Thread
    
    for i in range(20):
        t = Thread(target=task)
        t.start()
    原生SQL语句

    三、SQLALchemy连接的两种方式

    方式1:把session = SessionFactory()定义在函数里面

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Student,Course,Student2Course
    
    engine = create_engine(
            "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    SessionFactory = sessionmaker(bind=engine)
    
    def task():
        # 去连接池中获取一个连接
        session = SessionFactory()
    
        ret = session.query(Student).all()
    
        # 将连接交还给连接池
        session.close()
    
    
    from threading import Thread
    
    for i in range(20):
        t = Thread(target=task)
        t.start()
    方式1

    方式2:推荐,基于Threading.local实现

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session
    from models import Student,Course,Student2Course
    
    engine = create_engine(
            "mysql+pymysql://root:2180736@127.0.0.1:3306/s9day120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    SessionFactory = sessionmaker(bind=engine)
    
    session = scoped_session(SessionFactory)
    '''
    {
        1211:连接1,
        1212:连接2,
        1213:连接3,
        1214:连接4,
        1215:连接5,
        1216:连接6,
    }
    '''
    
    def task():
        ret = session.query(Student).all()
        #执行session.query的时候自动会去连接池中拿一个连接过来,放到当前线程的threading.local中去
    
        # 将连接交还给连接池
        session.remove()
    
    
    from threading import Thread
    
    for i in range(20):
        t = Thread(target=task)
        t.start()
    View Code
  • 相关阅读:
    Windows 7安装 OneDrive
    MySQL8.0降级为MySQL5.7
    Windows和Linux下安装Rsync
    Jenkins持续集成工具安装
    Pure-Ftpd安装配置
    redis安装配置
    Tcp粘包处理
    .Net Core Socket 压力测试
    使用RpcLite构建SOA/Web服务(Full .Net Framework)
    使用RpcLite构建SOA/Web服务
  • 原文地址:https://www.cnblogs.com/zh-xiaoyuan/p/13261952.html
Copyright © 2011-2022 走看看