zoukankan      html  css  js  c++  java
  • sqlachelmy的使用

    一、增删改查的使用

    数据库表的初始化

    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column
    from sqlalchemy import Integer,String,Text,Date,DateTime
    from sqlalchemy import create_engine
    
    
    Base = declarative_base()
    
    class Users(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
        depart_id = Column(Integer)
    
    def create_all():
        engine = create_engine(
            "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.create_all(engine)
    
    def drop_all():
        engine = create_engine(
            "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
        Base.metadata.drop_all(engine)
    
    if __name__ == '__main__':
        drop_all()
        create_all()
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine

    engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/mytest?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) # 根据Users类对users表进行增删改查 session = SessionFactory() # 创建表结构,只有继承了Base的类才会被初始化 Base.metadata.create_all(engine)

    增删改查

    1. 增加
    obj = Users(name='alex')
    session.add(obj)
    session.commit()
    
    session.add_all([
            Users(name='小东北'),
            Users(name='龙泰')
    ])
    session.commit()
    session.close()
    增加
    session.query(Users).filter(Users.id >= 2).delete()
    session.commit()
    删除
    session.query(Users).filter(Users.id == 4).update({Users.name:'二郎神'})
    session.query(Users).filter(Users.id == 4).update({'name':'孙悟空'})
    session.query(Users).filter(Users.id == 4).update({'name':Users.name+"DSB"},synchronize_session=False)
    session.commit()
    
    # synchronize_session代表以字符串的形式更新,如果不添加此参数,默认以数字的形式进行更新。
    更改
    result = session.query(Users).all()
    for row in result:
            print(row.id,row.name)
    
    result = session.query(Users).filter(Users.id >= 2)
    for row in result:
            print(row.id,row.name)
    
    result = session.query(Users).filter(Users.id >= 2).first()
    print(result)
    查看

    指定查询字段(列)

    # 原生sql
    select id,name as cname from users;
    
    # sqlachelmy语句
    result = session.query(Users.id,Users.name.label('cname')).all()
    for item in result:
            print(item[0],item.id,item.cname)

    and查询(默认)

    session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()

    between查询

    session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()

    in查询

    session.query(Users).filter(Users.id.in_([1,3,4])).all()
    session.query(Users).filter(~Users.id.in_([1,3,4])).all()
    
    # in查询要使用in_
    # ~代表反向查询

    子查询

    session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='eric'))).all()

    and 和 or查询

    from sqlalchemy import and_, or_
    session.query(Users).filter(Users.id > 3, Users.name == 'eric').all()
    # and_为默认的使用方法
    session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
    session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
    session.query(Users).filter(
        or_(
            Users.id < 2,
            and_(Users.name == 'eric', Users.id > 3),
            Users.extra != ""
        )).all()

    filter_by

    session.query(Users).filter_by(name='alex').all()
    
    # filter_by与filter功能相同,只是filter传入的是一个值

    通配符查询

    ret = session.query(Users).filter(Users.name.like('e%')).all()
    ret = session.query(Users).filter(~Users.name.like('e%')).all()
    
    # 相当于原生sql中的 like 'e%'

    切片

    result = session.query(Users)[1:2]

    排序

    ret = session.query(Users).order_by(Users.name.desc()).all()
    ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

    group_by

    from sqlalchemy.sql import func
    
    ret = session.query(
            Users.depart_id,
            func.count(Users.id),
    ).group_by(Users.depart_id).all()
    for item in ret:
            print(item)
    
    from sqlalchemy.sql import func
    
    ret = session.query(
            Users.depart_id,
            func.count(Users.id),
    ).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all()
    for item in ret:
            print(item)
    
    # func中含有sql计算方法
    # 一旦使用了func方法,再想过滤查询只能使用having方法

    union 和 union_all

    """
    select id,name from users
    UNION
    select id,name from users;
    """
    q1 = session.query(Users.name).filter(Users.id > 2)
    q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    ret = q1.union(q2).all()
    
    q1 = session.query(Users.name).filter(Users.id > 2)
    q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    ret = q1.union_all(q2).all()
    
    # 二者的区别在于union会合并重复的查询结果,而union_all不会,它让然会将重复的结果累加到结果下面。
    
    # union查询类似于left join或者right join查询

     二、外键查询

    1.ForeignKey

    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column
    from sqlalchemy import Integer,String,Text,Date,DateTime,ForeignKey,UniqueConstraint, Index
    from sqlalchemy import create_engine
    from sqlalchemy.orm import relationship
    
    
    Base = declarative_base()
    
    class Depart(Base):
        __tablename__ = 'depart'
        id = Column(Integer, primary_key=True)
        title = Column(String(32), index=True, nullable=False)
    
    class Users(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
        depart_id = Column(Integer,ForeignKey("depart.id"))
    
        dp = relationship("Depart", backref='pers')
    数据库模型

    使用

    # 1.直接查询
    ret = session.query(Users).all()
    for row in ret:
        print(row.id, row.name, row.depart_id)
        
    session.close()
    
    # 2.联表查询
    ret = session.query(Users.id,Users.name,Depart.title).join(Depart).all()
    # 二者结果相同,外键自动匹配主键,默认为inner查询,添加isouter=True后变为left join
    ret = session.query(Users.id,Users.name,Depart.title).join(Depart, Users.depart_id==Depart.id,isouter=True).all()
    for row in ret:
        # print(row)    (1, 'alex', '研发')
        print(row.id, row.name, row.title)  # 1 alex 研发
    
    session.close()
    
    # 3.外键关联查询
    ret = session.query(Users).all()
    for row in ret:
        print(row.id, row.name, row.dp.title)
    
    session.close()
    
    # 4.反向查询
    ret = session.query(Depart).filter(Depart.title=="运维").first()
    for row in ret.pers:
        print(row.id,row.name,ret.title)
    
    session.close()
    
    # 5.外键表中创建一个值,引用外键的表中创建多个数据
    # 方式一:各自设置自己的表
    d1=Depart(title="前端")
    session.add(d1)
    session.commit()
    
    u1=Users(name="小强",depart_id=d1.id)
    session.add(u1)
    session.commit()
    
    session.close()
    
    # 方式2:引用外键的表通过relationship正向设置外键表的值
    u1=Users(name="小红",dp=Depart(title="java开发"))
    session.add(u1)
    session.commit()
    
    session.close()
    
    # 方式3:外键表通过backref反向设置引用外键的表
    d1=Depart(title="大数据")
    d1.pers=[Users(name="AAA"),Users(name="BBB"),Users(name="CCC")]
    session.add(d1)
    session.commit()
    
    session.close()

    2.m2m查询

    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column
    from sqlalchemy import Integer,String,Text,Date,DateTime,ForeignKey,UniqueConstraint, Index
    from sqlalchemy import create_engine
    from sqlalchemy.orm import relationship
    
    
    Base = declarative_base()
    
    class Student(Base):
        __tablename__ = 'student'
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
    
        course_list = relationship('Course', secondary='student2course', backref='student_list')
    
    class Course(Base):
        __tablename__ = 'course'
        id = Column(Integer, primary_key=True)
        title = Column(String(32), index=True, nullable=False)
    
    class Student2Course(Base):
        __tablename__ = 'student2course'
        id = Column(Integer, primary_key=True, autoincrement=True)
        student_id = Column(Integer, ForeignKey('student.id'))
        course_id = Column(Integer, ForeignKey('course.id'))
    
        __table_args__ = (
            UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 联合唯一索引
            # Index('ix_id_name', 'name', 'extra'),                          # 联合索引
        )
    数据库模型
    # 1.录入数据
    session.add_all([
        Student(name='张三'),
        Student(name='李四'),
        Course(title='物理'),
        Course(title='化学'),
    ])
    
    session.commit()
    
    session.add_all([
        Student2Course(student_id=1,course_id=1),
        Student2Course(student_id=1,course_id=2),
        Student2Course(student_id=2,course_id=1),
    ])
    
    session.commit()
    session.close()
    
    # 2.三张表关联
    ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id).join(Course,Student2Course.course_id==Course.id).order_by(Student2Course.id.asc())
    for row in ret:
        print(row)
        
    session.close()
    
    # 3.三张表关联后,筛选数据
    ret = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id).join(Course,Student2Course.course_id==Course.id).filter(Student.name=="张三").order_by(Student2Course.id.asc())
    for row in ret:
        print(row)
    
    session.close()
    
    # 4.使用relationship后,跨表多对多查询
    ret = session.query(Student).filter(Student.name=="张三").first()
    # 在设置了relationship的表中,通过设置的字段名称来进行m2m查询
    for row in ret.course_list:
        print(row.title)
    
    ret2 = session.query(Course).filter(Course.title=="物理").first()
    # 在没有设置relationship的表中,通过设置了relationship表中的backref的值来进行m2m查询
    for row in ret2.student_list:
        print(row.name)
        
    session.close()
    
    # 5.多对多关系插入数据
    obj = Course(title="英语")
    obj.student_list = [Student(name="小红"), Student(name="小强")]
    
    session.add(obj)
    session.commit()
    
    session.close()

    三、sqlalchemy的两种连接方式

    方式一:每连接一次,需要指定生成一个线程

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Student,Course,Student2Course
    
    engine = create_engine(
            "mysql+pymysql://root:123456@127.0.0.1:3306/mywork?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    SessionFactory = sessionmaker(bind=engine)
    
    def task():
        # 去连接池中获取一个连接
        session = SessionFactory()
    
        ret = session.query(Student).all()
    
        # 将连接交还给连接池
        session.close()
    
    
    from threading import Thread
    
    for i in range(20):
        t = Thread(target=task)
        t.start()
    View Code

    方式二:每一次连接自动生成新的线程

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session
    from models import Student,Course,Student2Course
    
    engine = create_engine(
            "mysql+pymysql://root:123456@127.0.0.1:3306/mywork?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    SessionFactory = sessionmaker(bind=engine)
    session = scoped_session(SessionFactory)
    
    
    def task():
        ret = session.query(Student).all()
        # 将连接交还给连接池
        session.remove()
    
    
    from threading import Thread
    
    for i in range(20):
        t = Thread(target=task)
        t.start()
    View Code

    四、sqlalchemy执行原生sql

    方式一:

    # 查询
    cursor = session.execute('select * from users')
    result = cursor.fetchall()
    
    # 添加
    cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'alex'})
    session.commit()
    print(cursor.lastrowid)
    
    # 这里使用变量赋值需要主意冒号":value"

    方式二:

    conn = engine.raw_connection()
    cursor = conn.cursor()
    cursor.execute(
        "select * from t1"
    )
    result = cursor.fetchall()
    cursor.close()
    conn.close()
    
    # 不使用sessionmaker,直接使用连接池
  • 相关阅读:
    CentOS中JAVA_HOME的环境变量设置
    Macserver服务更新经常使用的几个shell命令
    一个技术派创业者的反思
    巴斯卡三角形
    iOS中基于 Socket 的 C/S 结构网络通信(中)
    poj 3267 The Cow Lexicon (动态规划)
    Android入门:短信和拨打电话
    HDUOJ--4888--Redraw Beautiful Drawings【isap】网络流+判环
    Dynamics CRM 2015 New Feature (9): Services Changes
    Class 找出一个整形数组中的元素的最大值
  • 原文地址:https://www.cnblogs.com/ttyypjt/p/11084132.html
Copyright © 2011-2022 走看看