zoukankan      html  css  js  c++  java
  • SQLAlchemy(1):单表操作

    SQLAlchemy 是一个 ORM框架:类对应表,类中的字段对应表中的列,类的对象对应表的一条记录;作用:帮助我们使用类和对象快速实现数据库操作
    操作数据库的方式:

    1. 原生SQL
        - pymysql :支持 python2 和  python3
        - MySQLdb :只支持 python2
    2. ORM框架 :框架本身要是有ORM应用自己的(如 Django),要是没有就用 SQLAlchemy

    安装 SQLAlchemy:

    pip install sqlalchemy

    注: SQLAlchemy 默认不能修改表结构;想修改表结构需要引入第三方组件

    SQLAlchemy 单表

    创建数据库的表 --- 单表:models.py

    import datetime
    from sqlalchemy import create_engine  # 引入 创建引擎
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index  # 引入列和数据类型
    
    Base = declarative_base()  # Base 要自己实例化
    
    
    class Users(Base):  # Users是类名;类 继承 Base
        __tablename__ = 'users'  # 数据库的表名叫 "users"
    
        id = Column(Integer, primary_key=True)  # Colume表示列; id 是表中的列,Interger 表示整形,primary_key=True 表示 是主键
        name = Column(String(32), index=True, nullable=False)  # name 为表中的列, String(32) 表示 MySQL中的 Varchar(32), index=True 表示索引,nullable=False 表示不允许为空
        # email = Column(String(32), unique=True)
        # ctime = Column(DateTime, default=datetime.datetime.now)
        # extra = Column(Text, nullable=True)
    
    
    def init_db():
        """
        根据类创建数据库的表
        :return:
        """
        engine = create_engine(  # 创建数据库连接
            "mysql+pymysql://root:tj037778@127.0.0.1:3306/dbtest?charset=utf8",  # mysql 表示要连接的数据库;pymysql 表示用 pymysql 来连接;用户名是 root,密码是123,连接本地的 dbtest 这个数据库
            max_overflow=0,  # 超过连接池大小外最多创建的连接;即超过 pool_size 后最多能溢出多少个连接
            pool_size=5,  # 连接池大小
            pool_timeout=10,  # 池中没有线程(连接)最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置);-1表示不重建
        )
    
        Base.metadata.create_all(engine)  # Base.metadata.create_all() : 找到当前 py 文件下面 继承了Base的所有的类,在数据库中生成一张表
    
    
    def drop_db():
        """
        根据类删除数据库中的表
        :return:
        """
        engine = create_engine(
            "mysql+pymysql://root:tj037778@127.0.0.1:3306/dbtest?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.drop_all(engine)  # Base.metadata.drop_all() : 找到当前 py 文件下面 继承了Base的所有的类,在数据库中删除相应的表
    
    
    if __name__ == "__main__":
        init_db()
        # drop_db()

    单表基本增删改查示例:crud.py

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Users  # 导入 Users 类
    
    # 创建数据库连接(crud 这个py文件的数据库连接)
    engine = create_engine("mysql+pymysql://root:tj037778@127.0.0.1:3306/dbtest", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)  # Session可理解成 连接池
    
    # 通过 SQLAlchemy 操作数据库:通过 Users 类对 users 表进行增删改查
    
    # 每次执行数据库操作时,都需要创建一个session (可理解成一个连接)
    session = Session()
    
    # ############# 执行ORM操作:增 #############
    # 1. 单条增加
    """
    obj1 = Users(name="neo") # 实例化一个 Users 的对象;该对象是在内存中创建
    session.add(obj1)  # session 根据 obj1 在表中增加一条数据
    session.commit()  # session.add() 之后要 commit() 一下
    session.close()  # 关闭 连接
    """
    
    # 2. 多条增加
    """
    obj2 = Users(name="egon")
    obj3 = Users(name="alex")
    
    session.add_all(  # 增加多个;也可以 session.add() 多次
        [obj2,obj3]  # 添加的记录(对象)放入 列表 中
    )
    
    session.commit()
    session.close()
    """
    
    # ############# 执行ORM操作:查 #############
    # 1. 查询所有
    res = session.query(Users).all()  # 查询 Users 表中的所有数据
    print(res)
    # 打印结果(按id 倒序排列): [<models.Users object at 0x000002132AEFE400>, <models.Users object at 0x000002132AEFE470>, <models.Users object at 0x000002132AEFE4E0>]  # 列表套对象的形式
    for row in res:
        print(row.id, row.name)
    
    # 2. 条件查询
    res_filter = session.query(Users).filter(Users.id >= 2)  # filter() 中直接加 表达式
    for row in res_filter:
        print(row.id, row.name)
    
    # 3. 只取第一个
    res_first = session.query(Users).filter(Users.id >= 2).first()  # first() 取到的结果不再是一个列表,而是一个对象
    print(res_first.id, res_first.name)
    
    # ############# 执行ORM操作:删 #############
    session.query(Users).filter(Users.id >= 3).delete()  # 查询结果 .delete()
    session.commit()  # 删除之后也要 commit()
    
    # ############# 执行ORM操作:改 #############
    session.query(Users).filter(Users.id == 1).update({Users.name: "NEO"})  # 查询结果.update({ }) ; 返回 更新的条数
    # session.query(Users).filter(Users.id==1).update({"name":"neo"})  # 也可以用这种写法
    session.commit()
    
    # update() 补充:
    # session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False) # 在字段原有值的基础上更新时,synchronize_session=False 表示是 字符串 的拼接
    # session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")  # 在字段原有值的基础上更新时,synchronize_session="evaluate" 表示是 数值 的相加减 (默认是这种)
    # session.commit()
    
    session.close()

    SQLAlchemy 单表的其它常用操作:

    # ##################### 其它常用操作 ########################
    # 1. 查询某些字段
    res = session.query(Users.id,Users.name).all()  # 查询 id字段 和 name字段;结果为列表
    for item in res:
        print(item,type(item),item[0],item.name)  # item 不是元组,只是形式像元组,并且可以利用 item[0],item[1] 这种元组的用法去获取item中的值,也可以利用 item.id 和 item.name 获取item中的值
    # 打印结果:
    # (2, 'egon') <class 'sqlalchemy.util._collections.result'> 2 egon
    # (1, 'NEO') <class 'sqlalchemy.util._collections.result'> 1 NEOsult'>
    
    # 2. 给某字段起个别名(sql 中的 as): .label("别名")
    res = session.query(Users.id,Users.name.label("cname")).all()
    for item in res:
        print(item.cname)
    # 打印结果:
    # egon
    # NEO
    
    # 3. 查看sql语句:去掉 .all()
    query = session.query(Users.id,Users.name.label("cname"))
    print(query)
    # 打印结果:
    # SELECT users.id AS users_id, users.name AS cname FROM users
    
    # 4. 其它
    ret1 = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()  # filter() 中的 “,” 表示的是 sql 条件中的 and (默认条件为 and )
    ret2 = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()  # .between : sql 条件中的 between...and
    ret3 = session.query(Users).filter(Users.id.in_([1,3,4])).all()  # Users.id.in_([1,3,4]) :sql 条件中的  in
    ret4 = session.query(Users).filter(~Users.id.in_([1,3,4])).all() # ~Users.id.in_([1,3,4]) :not in
    
    #
    5. 子查询 ret5 = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all() # 一个查询结果作为另一个的查询条件 # 6. and 和 or 的嵌套 from sqlalchemy import and_,or_ # 先导入 and 和 or session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() # and_() 中的条件是 and 关系 (默认也是 and) session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() # or_() 中的条件是 or 关系 session.query(Users).filter( or_( # 下面的三个条件是 or 的关系 Users.id < 2, and_(Users.name == 'eric', Users.id > 3), # 这两个是 and 的关系 # Users.extra != "" # 这个是 not (非) )).all() ret6 = session.query(Users).filter_by(name='alex').all() # filter_by() 过滤 """ filter() 和 filter_by() 的区别: filter:column == expression 1. 传入参数的写法,要用:类名.列名 两个等号 去判断 2. 更复杂的查询的语法,比如and_(),or_()等多个条件的查询,只支持filter filter_by:keyword = expression 1. 传入参数的写法,只需要用:(不带类名的)列名 单个等号 就可以判断 """ # 7. 通配符 ret7 = session.query(Users).filter(Users.name.like('n%')).all() # like ret8 = session.query(Users).filter(~Users.name.like('n%')).all() # not like ret9 = session.query(Users).filter(Users.name.like('n_')).all() # like # 8. 切片 (限制/分页) ret10 = session.query(Users)[1:2] # 9. 排序 ret11 = session.query(Users).order_by(Users.name.desc()).all() ret12 = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 10. 分组:group_by() from sqlalchemy.sql import func # 导入 func,func 中有聚合函数 # ret13 = session.query(Users).group_by(Users.depart_id).all() # group_by(字段)之后不要 query() 所有字段 ret14 = session.query( Users.name, func.max(Users.id), # 自己指定取哪个字段 func.sum(Users.id), # 非分组字段获取时,使用聚合函数 func.min(Users.id) ).group_by(Users.name).all() ret15 = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id) ).group_by(Users.name).having(func.count(Users.id) >2).all() # 根据 name 分组,func.count(Users.id) > 2 ;根据聚合函数进行二次筛选:having # 11. 组合(垂直/上下连表):union 和 union all --- union all 去重,union 不去重 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret16 = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret17 = q1.union_all(q2).all()
  • 相关阅读:
    【作业】Python面向对象
    Python使用使用第三方源(国内源:豆瓣)下载包文件 超快!!!
    【案例】Python
    【个人笔记】Python
    定义函数相关内容
    列表,for循环相关.
    while应用和函数学习
    斗地主发牌器
    字符串索引切片.
    随机生成20以内加减法,5次答题并统计正确和错误题数
  • 原文地址:https://www.cnblogs.com/neozheng/p/10301692.html
Copyright © 2011-2022 走看看