zoukankan      html  css  js  c++  java
  • ORM之SQLAlchemy

    SQLAlchemy,ORM框架

    • 类 --> 表
    • 对象 --> 一行数据
    • 类的字段 --> 数据库表的一列

    作用:帮助我们使用类和对象快速实现数据库操作

    数据库连接

    from sqlalchemy import create_engine
    engine=create_engine('mysql+pymysql://username:password@hostname:port/dbname', echo=True) #echo=True 打印sql语句信息
    
    # '数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名'
    # 常用的
    engine = create_engine('sqlite:///:memory:', echo=True)   # sqlite内存
    engine = create_engine('sqlite:///./cnblogblog.db',echo=True) # sqlite文件
    engine = create_engine("mysql+pymysql://username:password@hostname:port/dbname",echo=True) # mysql+pymysql
    engine = create_engine('mssql+pymssql://username:password@hostname:port/dbname',echo=True) # mssql+pymssql
    engine = create_engine('postgresql://scott:tiger@hostname:5432/dbname') # postgresql示例
    engine = create_engine('oracle://scott:tiger@hostname:1521/sidname') # oracle
    engine = create_engine('oracle+cx_oracle://scott:tiger@tnsname') #pdb就可以用tns连接
    
    from sqlalchemy import create_engine, Column, Integer, String
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.ext.declarative import declarative_base 
    engine = create_engine('oracle://spark:a@orclpdb',echo=True) #echo要求打印sql语句等调试信息
    session_maker = sessionmaker(bind=engine)
    session = session_maker()
    Base = declarative_base()
    #对应一张表
    class Student(Base): 
      __tablename__ = 'STUDENT'
      id = Column('STUID', Integer, primary_key=True)
      name = Column('STUNAME', String(32), nullable=False)
      age = Column('STUAGE', Integer)
      def __repr__(self):
        return '<Student(id:%s, name:%s, age:%s)>' % (self.id, self.name, self.age)
    Base.metadata.create_all(engine) #若存在STUDENT表则不做,不存在则创建。
    queryObject = session.query(Student).order_by(Student.id.desc())
    for ins in queryObject:
      print(ins.id, ins.name, ins.age)
    '''
    4 hey 24
    3 lwtxxs 27
    2 gyb 89
    1 ns 23
    '''
    demo

    单表操作

    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column
    from sqlalchemy import Integer,String,Text,Date,DateTime
    from sqlalchemy import create_engine
    
    
    Base = declarative_base()
    
    
    class Users(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)      # 主键,默认自增
        name = Column(String(32), index=True, nullable=False)   # 设置索引, 非空
        depart_id = Column(Integer)
    
    
    def create_all():
        engine = create_engine(
            "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.create_all(engine)
    
    
    def drop_all():
        engine = create_engine(
            "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
        Base.metadata.drop_all(engine)
    
    
    if __name__ == '__main__':
        # drop_all()
        create_all()
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Users
    
    engine = create_engine(
            "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    SessionFactory = sessionmaker(bind=engine)
    
    # 根据Users类对users表进行增删改查
    session = SessionFactory()
    
    # 1. 增加
    # obj = Users(name='zhangyafeo')
    # session.add(obj)
    # session.commit()
    
    # session.add_all([
    #         Users(name='姚明'),
    #         Users(name='科比')
    # ])
    # session.commit()
    
    # 2. 查
    # result = session.query(Users).all()
    # for row in result:
    #         print(row.id,row.name)
    
    # result = session.query(Users).filter(Users.id >= 2)
    # for row in result:
    #         print(row.id,row.name)
    
    # result = session.query(Users).filter(Users.id >= 2).first()
    # print(result)
    
    # 3.删
    # session.query(Users).filter(Users.id >= 2).delete()
    # session.commit()
    
    # 4.改
    # session.query(Users).filter(Users.id == 1).update({Users.depart_id: 1})
    # session.query(Users).filter(Users.id == 2).update({'name':'姚明'})
    # session.query(Users).filter(Users.id == 3).update({'name':Users.name+"NB"},synchronize_session=False)  # 字符串的增加
    # session.commit()
    
    
    
    session.close()
    行操作示例
    # 添加
    session.add(对象)
    session.add_all([
        对象1,
        对象2
    ])
    session.commit()
    
    # 查询
    session.query(Users).all()
    session.query(Users).filter(Users.id>4)
    
    # 删除
    session.query(Users).filter(Users.id>4).delete()
    
    # 修改
    session.query(Users).filter(Users.id>4).update({Users.age:19})
    基本的增删改查
    # 1. 指定列
    # select id,name as cname from users;
    # result = session.query(Users.id,Users.name.label('cname')).all()
    # for item in result:
    #         print(item[0],item.id,item.cname)
    # 查看sql语句
    # query = session.query(Users.id,Users.name.label('cname'))
    # print(query)
    # 2. 默认条件and
    # session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
    # 3. between
    # session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
    # 4. in
    # session.query(Users).filter(Users.id.in_([1,3,4])).all()
    # session.query(Users).filter(~Users.id.in_([1,3,4])).all()
    # 5. 子查询
    # session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='eric'))).all()
    # 6. and 和 or
    # from sqlalchemy import and_, or_
    # session.query(Users).filter(Users.id > 3, Users.name == 'eric').all()
    # session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
    # session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
    # session.query(Users).filter(
    #     or_(
    #         Users.id < 2,
    #         and_(Users.name == 'eric', Users.id > 3),
    #         Users.extra != ""
    #     )).all()
    
    # 7. filter_by
    # session.query(Users).filter_by(name='alex').all()
    
    # 8. 通配符
    # ret = session.query(Users).filter(Users.name.like('e%')).all()
    # ret = session.query(Users).filter(~Users.name.like('e%')).all()
    
    # 9. 切片
    # result = session.query(Users)[1:2]
    
    # 10.排序
    # ret = session.query(Users).order_by(Users.name.desc()).all()
    # ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
    
    # 11. group by
    from sqlalchemy.sql import func
    
    # ret = session.query(
    #         Users.depart_id,
    #         func.count(Users.id),
    # ).group_by(Users.depart_id).all()
    # for item in ret:
    #         print(item)
    #
    # from sqlalchemy.sql import func
    #
    # ret = session.query(
    #         Users.depart_id,
    #         func.count(Users.id),
    # ).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all()
    # for item in ret:
    #         print(item)
    
    # 12.union 和 union all
    """
    select id,name from users
    UNION
    select id,name from users;
    """
    # q1 = session.query(Users.name).filter(Users.id > 2)
    # q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    # ret = q1.union(q2).all()
    #
    # q1 = session.query(Users.name).filter(Users.id > 2)
    # q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    # ret = q1.union_all(q2).all()
    常用操作

    limit分页

    1.用offset()设置索引偏移量,limit()限制取出量
    db.session.query(User.name).filter(User.email.like('%'+email+'%')).limit(page_size).offset((page_index-1)*page_size)
    #filter语句后面可以跟order_by语句
    
    2.用slice(偏移量,取出量)函数
    db.session.query(User.name).filter(User.email.like('%'+email+'%')).slice((page_index - 1) * page_size, page_index * page_size)
    #filter语句后面可以跟order_by语句
    注释:此方法和第一种相同的效果。
    
    因为:由一下内部方法可知,slice()函数第一个属性就是offset()函数值,第二个属性就是limit()函数值
    
    3.用paginate(偏移量,取出量)函数,用于BaseQuery
    
    user_obj=User.query.filter(User.email.like('%'+email+'%')).paginate(int(page_index), int(page_size),False)
    #遍历时要加上items 
    object_list =user_obj.items
     
    
    4.filter中使用limit
    
    db.session.query(User.name).filter(User.email.like('%'+email+'%') and limit (page_index - 1) * page_size, page_size)
    #此处不能再跟order_by语句,否则报错
    4种方式

    一对多操作

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Users, Depart
    
    engine = create_engine(
            "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    SessionFactory = sessionmaker(bind=engine)
    
    session = SessionFactory()
    
    ############################一对多操作####################################
    
    # 1.查询所有用户
    ret = session.query(Users).all()
    for row in ret:
        print(row.id, row.name, row.depart_id)
    
    # 2.查询所有用户及所属部门名称
    # ret1 = session.query(Users, Depart).join(Depart).all()
    ret1 = session.query(Users.id, Users.name, Depart.title).join(Depart,Users.depart_id == Depart.id).all()
    for row in ret1:
        # print(row)
        # print(row[0].id, row[0].name, row[0].depart_id, row[1].title)
        print(row.id, row.name, row.title)
    
    # 3.relation字段:查询所有用户+所属部门名称
    ret2 = session.query(Users).all()
    for row in ret2:
        print(row.id, row.name, row.depart_id, row.dp.title)
    
    # 4.relation:查询所有技术部的成员
    ret3 = session.query(Depart).filter(Depart.title == '技术部').first()
    for row in ret3.pers:
        print(row.id, row.name, ret3.title)
    
    # 5.创建一个名称叫 IT部门,并在该部门中添加一个员工:浩南
    # 方式一
    # d1 = Depart(title='IT')
    # session.add(d1)
    # session.commit()
    # print(d1.title, d1.id)
    # u = Users(name='浩南',depart_id=d1.id)
    # session.add(u)
    # session.commit()
    
    # 方式二
    # u = Users(name='山鸡',dp=Depart(title='IT'))
    # session.add(u)
    # session.commit()
    #
    
    
    # 6.创建一个名称叫王者荣耀的部门,并在该部门中加入詹姆斯,科比,梅西
    # d2 = Depart(title='王者荣耀')
    # d2.pers = [Users(name='詹姆斯'), Users(name='科比'), Users(name='梅西')]
    # session.add(d2)
    # session.commit()
    
    for row in session.query(Users).all():
        print(row.id, row.name, row.dp.title)
    
    for row in session.query(Depart).all():
        print(row.title)
        for ret2 in row.pers:
            print('	',ret2.name)
    
    session.close()
    FK操作

      多对多操作

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Users, Depart, Student, Course, Student2Course
    
    engine = create_engine(
            "mysql+pymysql://root:0000@127.0.0.1:3306/sqlalchemy?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    SessionFactory = sessionmaker(bind=engine)
    
    session = SessionFactory()
    
    ############################多对多操作####################################
    
    # 1.录入数据, 添加两个学生和班级
    # session.add_all([
    #     Student(name='大宝贝'),
    #     Student(name='小哥哥'),
    #     Course(title='生物'),
    #     Course(title='体育'),
    # ])
    #
    # session.commit()
    
    # 2.为学生选课
    # session.add_all([
    #     Student2Course(student_id=1, course_id=1),
    #     Student2Course(student_id=1, course_id=2),
    #     Student2Course(student_id=2, course_id=1),
    # ])
    # session.commit()
    
    # 3.三张表关联
    ret = session.query(Student2Course.id, Student.name, Course.title).join(Student, Student2Course.student_id==Student.id).join(Course, Student2Course.course_id==Course.id, isouter=True).order_by(Student2Course.id.asc())
    for row in ret:
        print(row)
    
    # 4.大宝贝选的课
    # ret = session.query(Student2Course.id, Student.name, Course.title).join(Student, Student2Course.student_id==Student.id).join(Course, Student2Course.course_id==Course.id).filter(Student.name=='大宝贝').all()
    # print(ret)
    #
    # obj = session.query(Student).filter(Student.name=='大宝贝').first()
    # for row in obj.course_list:
    #     print(row.title)
    
    # 5.选了生物的所有人
    # objs = session.query(Course).filter(Course.title=='生物')
    # print(objs)
    # obj = session.query(Course).filter(Course.title=='生物').first()
    # for row in obj.student_list:
    #     print(row.name)
    
    # 6.创建一个课程,创建2学生,两个学生选新创建的课程。
    # obj = Course(title='英语')
    # obj.student_list = [Student(name='浩哥'),Student(name='伙计')]
    # session.add(obj)
    # session.commit()
    
    
    # students = session.query(Student).all()
    # for row in students:
    #     print(row.id, row.name)
    #
    # courses = session.query(Course).all()
    # for row in courses:
    #     print(row.id, row.title)
    #
    # stu_cous = session.query(Student2Course).all()
    # for row in stu_cous:
    #     print(row.id, row.student_id, row.course_id)
    
    session.close()
    多对多操作

      多线程连接

      from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Student,Course,Student2Course
    
    engine = create_engine(
            "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    SessionFactory = sessionmaker(bind=engine)
    
    def task():
        # 去连接池中获取一个连接
        session = SessionFactory()
    
        ret = session.query(Student).all()
    
        # 将连接交还给连接池
        session.close()
    
    
    from threading import Thread
    
    for i in range(20):
        t = Thread(target=task)
        t.start()
    方式一
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session
    from models import Student,Course,Student2Course
    
    engine = create_engine(
            "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    SessionFactory = sessionmaker(bind=engine)
    session = scoped_session(SessionFactory)
    
    
    def task():
        ret = session.query(Student).all()
        # 将连接交还给连接池
        session.remove()
    
    
    from threading import Thread
    
    for i in range(20):
        t = Thread(target=task)
        t.start()
    方式二(推荐,基于Threading.local实现)

      执行原生sql

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session
    from models import Student,Course,Student2Course
    
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    SessionFactory = sessionmaker(bind=engine)
    session = scoped_session(SessionFactory)
    
    
    def task():
    """"""
    # 方式一:
    """
    # 查询
    # cursor = session.execute('select * from users')
    # result = cursor.fetchall()
    
    # 添加
    cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'wupeiqi'})
    session.commit()
    print(cursor.lastrowid)
    """
    # 方式二:
    """
    # conn = engine.raw_connection()
    # cursor = conn.cursor()
    # cursor.execute(
    #     "select * from t1"
    # )
    # result = cursor.fetchall()
    # cursor.close()
    # conn.close()
    """
    
    # 将连接交还给连接池
    session.remove()
    
    
    from threading import Thread
    
    for i in range(20):
        t = Thread(target=task)
        t.start()
    执行原生sql
  • 相关阅读:
    c++学习--面向对象一实验
    c++学习--面向对象一
    c#学习
    Linux安全之SSH 密钥创建及密钥登录,禁止密码登陆
    laravel 5.5 跨域问题 并且laravel的跨域 Access-Control-Allow-Origin 报错的坑
    安装 lnmp
    微信小程序-聊天功能下拉加载更多数据(历史聊天内容出现在顶部)
    简单实现小程序view拖拽功能
    mysql 常用命令
    有感而发——写给曼曼的信
  • 原文地址:https://www.cnblogs.com/zhangyafei/p/10204609.html
Copyright © 2011-2022 走看看