zoukankan      html  css  js  c++  java
  • ORM之SQLAlchemy

     在python中操作数据库,最常见的方式是使用SQLAlchemy,我们来了解一下它的具体使用

      安装:

    pip3 install sqlalchemy

      基础使用:

    # 导入:
    from sqlalchemy import Column, String, create_engine
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.ext.declarative import declarative_base
    
    # 创建对象的基类:
    Base = declarative_base()
    
    # 定义User对象:
    class User(Base):
        # 表的名字:
        __tablename__ = 'user'
    
        # 表的结构:
        id = Column(String(20), primary_key=True)
        name = Column(String(20))
    
    # 初始化数据库连接:
    engine = create_engine('mysql+pymysql://root:password@localhost:3306/test', echo=True)
    # 创建DBSession类型:
    DBSession = sessionmaker(bind=engine)
    
    
    
    初始化数据库连接:
    '数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名?charset=utf8'
      这里的数据库驱动名称可省略 '
    数据库类型://用户名:口令@机器地址:端口号/数据库名?charset=utf8'

    echo参数为True时,会显示每条执行的SQL语句,可以关闭,
     
      

    2020-02-29 14:58:26,843 INFO sqlalchemy.engine.base.Engine SHOW VARIABLES LIKE 'sql_mode'
    2020-02-29 14:58:26,843 INFO sqlalchemy.engine.base.Engine {}
    2020-02-29 14:58:26,845 INFO sqlalchemy.engine.base.Engine SHOW VARIABLES LIKE 'lower_case_table_names'
    2020-02-29 14:58:26,845 INFO sqlalchemy.engine.base.Engine {}
    2020-02-29 14:58:26,846 INFO sqlalchemy.engine.base.Engine SELECT DATABASE()
    2020-02-29 14:58:26,846 INFO sqlalchemy.engine.base.Engine {}
    2020-02-29 14:58:26,847 INFO sqlalchemy.engine.base.Engine show collation where `Charset` = 'utf8mb4' and `Collation` = 'utf8mb4_bin'
    2020-02-29 14:58:26,847 INFO sqlalchemy.engine.base.Engine {}
    2020-02-29 14:58:26,849 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS CHAR(60)) AS anon_1
    2020-02-29 14:58:26,849 INFO sqlalchemy.engine.base.Engine {}
    2020-02-29 14:58:26,849 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS CHAR(60)) AS anon_1
    2020-02-29 14:58:26,849 INFO sqlalchemy.engine.base.Engine {}
    2020-02-29 14:58:26,850 INFO sqlalchemy.engine.base.Engine SELECT CAST('test collated returns' AS CHAR CHARACTER SET utf8mb4) COLLATE utf8mb4_bin AS anon_1
    2020-02-29 14:58:26,850 INFO sqlalchemy.engine.base.Engine {}
    2020-02-29 14:58:26,851 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
    2020-02-29 14:58:26,852 INFO sqlalchemy.engine.base.Engine SELECT user.id AS user_id, user.name AS user_name, user.password AS user_password, user.create_time AS user_create_time
    FROM user
    WHERE user.id = %(id_1)s
    2020-02-29 14:58:26,852 INFO sqlalchemy.engine.base.Engine {'id_1': 1}



       


      添加:

    # 创建session对象:
    session = DBSession()
    # 创建新User对象:
    new_user = User(id='5', name='Bob')
    # 添加到session:
    session.add(new_user)
    # 提交即保存到数据库:
    session.commit()
    # 关闭session:
    session.close()

      添加多条:

    # 添加多条记录
    user1 = User(name='xing1', password='111111')
    user2 = User(name='xing2', password='222222')
    user3 = User(name='xing3', password='333333')
    
    
    session.add_all([user1, user2, user3])
    session.commit()
    session.close()

      查询:

    # 创建Session:
    session = DBSession()
    # 创建Query查询,filter是where条件,最后调用one()返回唯一行,如果调用all()则返回所有行:
    user = session.query(User).filter(User.id=='5').one()
    # 打印类型和对象的name属性:
    print('type:', type(user))
    print('name:', user.name)
    # 关闭Session:
    session.close()


    说明:
    all() 返回一个列表 可以通过遍历列表来获取每个对象。

    one() 返回且仅返回一个查询结果。当结果的数量不足一个或者多于一个时会报错

    first() 返回至多一个结果,而且以单项形式,而不是只有一个元素的tuple形式返回这个结果
    get(1) 返回一个结果 通过主键查询

      过滤条件:

    
    
    from sqlalchemy import func


    使用filter 或者filter_by filter 需要使用User.name 方式指定筛选条件,filter_by只通过字段名称即可 filter_by最后的结果就是一个sql语句,我们排错的时候就可以通过这个来排查我们sql是否正确 常用筛选条件: equals: query(Student).filter(Student.
    id == 10001) not equals: query(Student).filter(Student.id != 100) LIKE: query(Student).filter(Student.name.like(“%feng%”)) IN: query(Student).filter(Student.name.in_(['feng', 'xiao', 'qing'])) not in query(Student).filter(~Student.name.in_(['feng', 'xiao', 'qing']))
    is null
    query(
    Student).filter(Student.count==none).all()
    is not null
    query(Student).filter(Student.count!=none).all()

    AND: from sqlalchemy import and_ query(Student).filter(and_(Student.name
    == 'fengxiaoqing', Student.id ==10001)) 或者 query(Student).filter(Student.name == 'fengxiaoqing').filter(Student.address == 'chengde') OR: from sqlalchemy import or_ query.filter(or_(Student.name == 'fengxiaoqing', Student.age ==18))

    包含:
    session.query(Staff).filter(Staff.name.contains("a")).all()

    区间:
    session.query(Staff).filter(Staff.id.between(1,2)).all()

    字段筛选:
    users = session.query(User.id, User.name).all()
    for user in users:
    print(user.id, user.name)

    去重:
    users = session.query(User.password).distinct().all()
    for user in users:
    print(user.password)



    filter_by 不支持组合查询,只能连续调用filter来变相实现。
    filter_by的参数是**kwargs,直接支持组合查询。

    user = session.query(User).filter_by(name = 'xing1').filter_by(password = '111111').one()

      关联查询:

    1)查询 gameuid 1000 账号下绑定的所有帐号
    
     print(db.session.query(Bind.bindid, Bind.fromid, Bind.toid, Account.gameuid, Account.nickname). 
        filter(Bind.toid == 1000). 
        filter(Bind.fromid == Account.gameuid))
    SELECT bind.bindid AS bind_bindid, bind.fromid AS bind_fromid, bind.toid AS bind_toid, account.gameuid AS account_gameuid, account.nickname AS account_nickname
    FROM bind, account
    WHERE bind.toid = %(toid_1)s AND bind.fromid = account.gameuid
    
    这里的联表查询使用的是 WHERE 语句。如果希望使用 JOIN 语句,可以这样写:
    print(db.session.query(Bind.bindid, Account.gameuid, Account.nickname). 
        join(Account, Account.gameuid==Bind.fromid). 
        filter(Bind.toid == 1000))
    SELECT bind.bindid AS bind_bindid, bind.fromid AS bind_fromid, account.gameuid AS account_gameuid, account.nickname AS account_nickname
    FROM bind INNER JOIN account ON account.gameuid = bind.fromid
    WHERE bind.toid = %(toid_1)s
    query 中参数的顺序很重要,第一个参数所代表的 table 就是 JOIN 时放在前面的那个 table。因此,此处 JOIN 的目标应该是 Account, 而不应该是 Bind 自身。
    

      

    另外 一种关联查询
    
    在 Flask-SQLAlchemy 提供的 Model 对象中,可以使用 Model.query 这样的语法来直接得到一个查询对象,这是由于 Flask-SQLAlchemy 中存在一个 _QueryProperty 类,每次调用 Model.__get__ 时,会自动生成一个基于当前 session 的 query 对象
    
    >>> Account.query.join(Bind, Bind.fromid == Account.gameuid).filter(Bind.toid == 1000).all()
    [<Account 10001>, <Account 10002>, <Account 10003>, <Account 10004>, <Account 10005>, <Account 10006>, <Account 10007>, <Account 10008>, <Account 10009>, <Account 10000>, <Account 11000>]
    
    SELECT account.gameuid AS account_gameuid, account.nickname AS account_nickname
    FROM account INNER JOIN bind ON bind.fromid = account.gameuid
    WHERE bind.toid = %(toid_1)s
    
    使用 Model.query 得到的这个 query 对象可以直接进行 JOIN 操作,得到的结果是 Model 对象。这样就方便多了
    可以看出,这样的查询结果和使用 db.session.query 并没有什么不同。由于返回的是 Model 对象,使用上可能还更加方便了
    

      

    条件筛选:
    >>> Account.query.join(Bind, Bind.fromid == Account.gameuid). 
        filter(Bind.toid == 1000). 
        with_entities(Bind.bindid, Account.nickname).all()
    [(2, '玩家10001'), (3, '玩家10002'), (4, '玩家10003'), (5, '玩家10004'), (6, '玩家10005'), (7, '玩家10006'), (8, '玩家10007'), (9, '玩家10008'), (10, '玩家10009'), (53, '玩家10000'), (54, '玩家11000')]
    >>>
    注意,列表中的项 (2, '玩家10001') 并不是标准的 Python tuple。你如果查看它的类型,会发现一个奇怪的名称: <class 'sqlalchemy.util._collections.result'> 。它是一个 AbstractKeyedTuple 对象,拥有一个 keys() 方法,这样可以很容易将其转换成 dict :
    >>> results = Account.query.join(Bind, Bind.fromid == Account.gameuid). 
        filter(Bind.toid == 1000). 
        with_entities(Bind.bindid, Account.nickname).all()
    >>> [dict(zip(result.keys(), result)) for result in results]
    [{'bindid': 2, 'nickname': '玩家10001'}, {'bindid': 3, 'nickname': '玩家10002'}, {'bindid': 4, 'nickname': '玩家10003'}, {'bindid': 5, 'nickname': '玩家10004'}, {'bindid': 6, 'nickname': '玩家10005'}, {'bindid': 7, 'nickname': '玩家10006'}, {'bindid': 8, 'nickname': '玩家10007'}, {'bindid': 9, 'nickname': '玩家10008'}, {'bindid': 10, 'nickname': '玩家10009'}, {'bindid': 53, 'nickname': '玩家10000'}, {'bindid': 54, 'nickname': '玩家11000'}]
    
    
    
    除了筛选字段外,还可以用另一个方法获取多个 Model 的记录。那就是,返回两个 Model 的所有字段
    >>> db.session.query(Account, Bind).join(Bind, Account.gameuid==Bind.fromid).filter(Bind.toid==1000).all()
    [(<Account 10001>, <Bind 10001, 1000>), (<Account 10002>, <Bind 10002, 1000>), (<Account 10004>, <Bind 10004, 1000>), (<Account 10005>, <Bind 10005, 1000>), (<Account 10006>, <Bind 10006, 1000>), (<Account 10007>, <Bind 10007, 1000>), (<Account 10008>, <Bind 10008, 1000>), (<Account 10009>, <Bind 10009, 1000>), (<Account 10000>, <Bind 10000, 1000>), (<Account 11000>, <Bind 11000, 1000>)]
    
    使用上面的语法直接返回 Account 和 Bind 对象,可以进行更加灵活的操作
    

     

      join默认是一种内连接 , 也就是inner join, 还有外连接

    member = db.session.query(
                AppMember.member_nickname, WxGuestLogin.headimgurl, AppMember.member_phone,
                AppMemberExt.vip_id, AppMemberExt.vip_end_time, AppMemberVip.name) 
                .outerjoin(WxGuestLogin, AppMember.wxmp_openid == WxGuestLogin.wxmp_openid) 
                .outerjoin(AppMemberExt, AppMember.id == AppMemberExt.uid) 
                .outerjoin(AppMemberVip, AppMemberExt.vip_id == AppMemberVip.id) 
                .filter(AppMember.id == g.uid).first()
    
    # outerjoin 外部链接   左连接

      更新:

    my_stdent = session.query(Student).filter(Student.id == 1002).first()
    my_stdent.name = "lanlang"
    session.commit()
    session.close()

      批量更新: 

    AppOrder.query.filter(AppOrder.id.in_(order_ids)).update({'is_deducted': 1}, synchronize_session='fetch')
    db.session.commit()

      删除:

    user = session.query(User).filter_by(name='xing3').delete()
    session.commit()
    session.close()

      取数: count

    user_total = session.query(User).count()

      分组: group_by

    user_states = session.query(User).group_by(User.password).all()
    for state in user_states:
        print(state.id, state.name, state.password)

      排序:order_by

    users = session.query(User).order_by(User.id.desc()).all()
    for user in users:
        print(user.name)

      统计: sum

    total = session.query(User).with_entities(func.sum(User.id)).scalar()
    
    或者
    
    total = session.query(func.sum(User.id)).scalar()

      平均值:avg

    avg = session.query(func.avg(User.id)).scalar()

    除了直接操作sqlalchemy,在框架中是如果操作的呢? 例如:flask中 是使用的Flask_sqlalchemy.操作上稍有不同支持

      查询操作:

    模糊匹配
    Staff.query.filter(Staff.name.like("%a%")).all()
    
    不等于
    Staff.query.filter(Staff.id!=1).all()
    
    大于,小于
    Staff.query.filter(Staff.id>1,Staff.score>1).all()
    
    或
    Staff.query.filter(or_(Staff.id>1, Staff.score<4)).all()
    
    包含
    Staff.query.filter(Staff.name.contains("a")).all()
    
    区间
    Staff.query.filter(Staff.id.between(1,2)).all()
    
    与
    Staff.query.filter(and_(Staff.id>1, Staff.score>2)).all()
    
    字段筛选
    Staff.query.with_entities(Staff.name, Staff.id).all()
    [(1, 'aa'), (2, 'bb')]
    
    去重
    Staff.query.with_entities(Staff.name).distinct().all()
    [('aa',), ('bb',)]

      事务处理:

    示例:

    from functools import wraps
    from contextlib import ContextDecorator
    
    '''
    示例程序:
    创建一个新用户,同时将新用户关联到一家公司下,
    这需要两步数据库操作,但是这应该是一个事务,
    要么都完成,要么都未完成
    注意:即使只有一步,也需要如下操作
    flush和commit区别:
        > flush: 写数据库,但不提交,也就是事务未结束
        > commit: 是先调用flush写数据库,然后提交,结束事务,并开始新的事务
    
    '''
    def create_user(name, phone): data
    = { 'name': name, 'phone': phone, } user = User(name) db.session.add(user) db.session.flush() return user def create_user_and_company_mapping(user_id, commpany_id): data = { 'user_id': user_id, 'company_id': company_id, } mapping = UserCompanyMapping(**data) db.session.add(mapping) db.session.flush() return mapping class CreateUser(Api): def post(self, params): name = params['name'] phone = params['phone'] company_id = params['company_id'] # 这里用with语句将两个操作封闭成一个原子操作 with atomic(db): user = create_user(name, phone) create_user_and_company_mapping(user.id, company_id) # 注意:即使只有一步创建用户的操作也需要这样写 with atomic(db) user = create_user(name, phone) def atomic(db): if callable(db): return Atomic(db)(db) else: return Atomic(db) class Atomic(ContextDecorator): def __init__(self, db): self.db = db def __enter__(self): pass def __exit__(self, exc_typ, exc_val, tb): if exc_typ: self.db.session.rollback() else: self.db.session.commit()

    其他配置项:

    1, 想要查看具体的SQL语句

    SQLAlchemy打开SQL语句方法如下,echo=true将开启该功能:
    engine = create_engine(“”, echo=True)
    
    Flask-SQLAlchemy打开SQL方法如下:
    app.config[“SQLALCHEMY_ECHO”] = True
  • 相关阅读:
    T-SQL查询语句
    数据库和表的管理
    数据库概念
    IRF2实验
    IFR2笔记
    校园网双网出口实验案例
    双机热备实验
    华为H3C(NAT)实验
    BGP(边界网关协议)实验
    Hybrid实验
  • 原文地址:https://www.cnblogs.com/xingxia/p/orm_sqlalchemy.html
Copyright © 2011-2022 走看看