zoukankan      html  css  js  c++  java
  • python 之 sqlalchemy练习

    sqlalchemy

    single_table(单表)

    #!/usr/bin/env python
    # -*-coding:utf-8 -*-
    # Author : rain
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, String, Integer, ForeignKey, create_engine, UniqueConstraint, Index, or_, and_
    from sqlalchemy.orm import sessionmaker, relationship
    
    
    
    engine = create_engine("mysql+pymysql://root:123.com@10.10.36.101/s13", max_overflow=5)
    Base = declarative_base()
    
    # Single-table example.
    class Single_table(Base):
        """ORM mapping for the ``single_table`` demo table.

        Columns:
            id: integer primary key, auto-incremented.
            name: string of up to 24 characters.
            extra: string of up to 16 characters.
        """
        __tablename__ = 'single_table'

        # Pass real booleans. The original passed the strings 'True', which only
        # worked for ``primary_key`` by truthiness and is not one of the values
        # documented for ``autoincrement`` (True, False, "auto", "ignore_fk").
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String(24))
        extra = Column(String(16))

        __table_args__ = (
            # The (id, name) pair must be unique.
            UniqueConstraint('id', 'name', name='uix_id_name'),
            # Composite index over (name, extra); the index name is kept as-is
            # so an existing schema is not affected.
            Index('ix_id_name', 'name', 'extra'),
        )
    
    def init_db():
        """Create every table registered on ``Base.metadata`` through ``engine``."""
        metadata = Base.metadata
        metadata.create_all(engine)
    
    def drop_db():
        """Drop every table registered on ``Base.metadata`` through ``engine``."""
        metadata = Base.metadata
        metadata.drop_all(engine)
    
    # Build a session factory bound to the engine and open the single session
    # shared by every statement below.
    Session = sessionmaker(bind=engine)
    session = Session()

    # Insert one row.
    obj = Single_table(name='rain', extra='set')
    # obj = Single_table(name='wind', extra='put')
    session.add(obj)
    session.commit()

    # Fetch the first row whose id is 1 and print its id, name and extra.
    obj = session.query(Single_table).filter(Single_table.id == 1).first()
    print(obj.id, obj.name, obj.extra)   # 1 rain set

    # Fetch the (id, name, extra) columns of every row.
    obj = session.query(Single_table.id, Single_table.name, Single_table.extra).all()
    print(obj)   # [(1, 'rain', 'set')]

    # Fetch every row whose name is 'rain'.
    obj = session.query(Single_table).filter_by(name='rain').all()
    print(obj)          # [<__main__.Single_table object at 0x0000000003ADB390>]

    # Fetch the first row whose name is 'rain'.
    ret = session.query(Single_table).filter_by(name='rain').first()
    print(ret)          # <__main__.Single_table object at 0x0000000003ADB390>

    # Rows with id == 1 and name == 'rain' (multiple filter criteria are ANDed).
    ret = session.query(Single_table).filter(Single_table.id == 1, Single_table.name == 'rain').all()

    # Rows with id between 1 and 3 and name == 'rain'.
    ret = session.query(Single_table).filter(Single_table.id.between(1, 3), Single_table.name == 'rain').all()

    # IN: rows whose id is one of 1, 3, 4.
    ret = session.query(Single_table).filter(Single_table.id.in_([1, 3, 4])).all()
    # print(ret)
    # NOT IN: rows whose id is none of 1, 3, 4.
    ret = session.query(Single_table).filter(~Single_table.id.in_([1, 3, 4])).all()

    # IN with a subquery: rows whose id appears among the ids of rows named 'rain'.
    ret = session.query(Single_table).filter(Single_table.id.in_(session.query(Single_table.id).filter_by(name='rain'))).all()
    print(ret[0].id, ret[0].name, ret[0].extra)
    # 1 rain set

    # OR / AND combination: id < 2, OR (name == 'rain' AND id > 3), OR extra != "".
    ret = session.query(Single_table).filter(
        or_(
            Single_table.id < 2,
            and_(Single_table.name == 'rain', Single_table.id > 3),
            Single_table.extra != ""
        )).all()

    # NOTE: indexing ret[1] and ret[2] assumes the query matched at least three rows.
    print(ret[0].id, ret[1].id, ret[2].id)


    # LIKE with a wildcard: names starting with 'ra'.
    ret = session.query(Single_table).filter(Single_table.name.like('ra%')).all()
    print(ret[0].name, ret[1].name)      # rain ray

    # Negated LIKE: names that do not start with 'ra'.
    ret = session.query(Single_table).filter(~Single_table.name.like('ra%')).all()
    print(ret[0].name)
    # wind

    # Limit by slicing: only the second row (index 1) is returned.
    ret = session.query(Single_table)[1:2]
    print(ret)

    # Ordering: sort by name, descending.
    ret = session.query(Single_table).order_by(Single_table.name.desc()).all()
    print(ret[0].id, ret[1].id, ret[2].id)
    # 3 2 1

    # Sort by name descending, then by id ascending.
    ret = session.query(Single_table).order_by(Single_table.name.desc(), Single_table.id.asc()).all()
    print(ret[0].id, ret[1].id, ret[2].id)

    # Grouping.
    from sqlalchemy.sql import func
    # Group whole rows by the extra column.
    ret = session.query(Single_table).group_by(Single_table.extra).all()

    # Per-name aggregates: max, sum and min of id.
    ret = session.query(
        func.max(Single_table.id),
        func.sum(Single_table.id),
        func.min(Single_table.id),
    ).group_by(Single_table.name).all()
    print(ret)    # [(1, Decimal('1'), 1), (2, Decimal('2'), 2), (3, Decimal('3'), 3)]

    # Same aggregates, keeping only the groups whose minimum id is greater than 2.
    ret = session.query(
        func.max(Single_table.id),
        func.sum(Single_table.id),
        func.min(Single_table.id),
    ).group_by(Single_table.name).having(func.min(Single_table.id) > 2).all()
    print(ret)    # [(3, Decimal('3'), 3)]

    one_to_many(一对多)

    #!/usr/bin/env python
    # -*-coding:utf-8 -*-
    # Author : rain
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, String, Integer, ForeignKey, create_engine, UniqueConstraint, Index, or_, and_
    from sqlalchemy.orm import sessionmaker, relationship
    
    engine = create_engine("mysql+pymysql://root:123.com@10.10.36.101/s13", max_overflow=5)
    Base = declarative_base()
    
    # One-to-many: a user row references one group; a group has many users.
    class Groups(Base):
        """ORM mapping for the ``groups`` table (one side of the relation)."""
        __tablename__ = 'groups'
        # Real booleans instead of the strings 'True' used originally; a string
        # is not a documented value for ``autoincrement``.
        nid = Column(Integer, primary_key=True, autoincrement=True)
        caption = Column(String(24), unique=True)


    class Users(Base):
        """ORM mapping for the ``users`` table (many side of the relation)."""
        __tablename__ = 'users'
        nid = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String(24), nullable=True)
        group_id = Column(Integer, ForeignKey("groups.nid"))
        # Forward lookup: ``user.group``; reverse lookup via the backref:
        # ``group.user``.
        group = relationship('Groups', backref='user')


    def init_db():
        """Create every table registered on ``Base.metadata`` through ``engine``."""
        Base.metadata.create_all(engine)


    def drop_db():
        """Drop every table registered on ``Base.metadata`` through ``engine``."""
        Base.metadata.drop_all(engine)


    Session = sessionmaker(bind=engine)
    session = Session()

    # NOTE(review): the users below reference group ids 1 and 2, but this script
    # never inserts any Groups rows; the sample outputs suggest 'A_group' and
    # 'B_group' already exist in the database -- confirm before running.
    session.add_all([
        Users(name='tom', group_id=2),
        Users(name='rain', group_id=2),
        Users(name='jerry', group_id=2),
        Users(name='eric', group_id=1),
    ])
    session.commit()

    # Print the generated SQL (a query that is not executed prints as SQL).
    sql = session.query(Users.name, Groups.caption).join(Groups, isouter=True).filter(Users.name == 'rain')
    print(sql)
    # SELECT users.name AS users_name, groups.caption AS groups_caption
    # FROM users LEFT OUTER JOIN groups ON groups.nid = users.group_id
    # WHERE users.name = :name_1

    # Which group does rain belong to?
    ret = session.query(Users.name, Groups.caption).join(Groups, isouter=True).filter(Users.name == 'rain').all()
    print(ret)
    # [('rain', 'A_group'), ('rain', 'B_group')]

    # Print the generated SQL.
    sql = session.query(Users.name, Groups.caption).join(Groups, isouter=True)
    print(sql)
    # SELECT users.name AS users_name, groups.caption AS groups_caption
    # FROM users LEFT OUTER JOIN groups ON groups.nid = users.group_id

    # Which group does each user belong to?
    ret = session.query(Users.name, Groups.caption).join(Groups, isouter=True).all()
    print(ret)
    # [('rain', 'A_group'), ('tom', 'B_group'), ('rain', 'B_group'), ('jerry', 'B_group'), ('eric', 'A_group')]

    # Print the generated SQL.
    sql = session.query(Users.name, Groups.caption).join(Groups, isouter=True).filter(Groups.caption == 'B_group')
    print(sql)
    # SELECT users.name AS users_name, groups.caption AS groups_caption
    # FROM users LEFT OUTER JOIN groups ON groups.nid = users.group_id
    # WHERE groups.caption = :caption_1

    # Which users are in B_group?
    ret = session.query(Users.name, Groups.caption).join(Groups, isouter=True).filter(Groups.caption == 'B_group').all()
    print(ret)
    # [('tom', 'B_group'), ('rain', 'B_group'), ('jerry', 'B_group')]

    # Forward lookup. The line
    #     group = relationship('Groups', backref='user')
    # on Users establishes the relation, so the group can be reached directly
    # from a user object without writing the join by hand.
    # Which group does rain belong to?
    ret = session.query(Users).filter(Users.name == 'rain').first()
    print(ret.name, ret.group.caption)
    # rain A_group

    # Which group does each user belong to?
    ret = session.query(Users).all()
    for obj in ret:
        # obj is one row of the users table;
        # obj.group is the related Groups object.
        print(obj.name, obj.group.caption)

    # Reverse lookup.
    # Who is in A_group?
    obj = session.query(Groups).filter(Groups.caption == 'A_group').first()
    # obj is the groups row whose caption is 'A_group';
    # obj.user is the collection of Users objects that belong to it.
    for item in obj.user:
        print(item.name, end=' ')
    many_to_many(多对多)
    #!/usr/bin/env python
    # -*-coding:utf-8 -*-
    # Author : rain
    
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index, Table
    from sqlalchemy.orm import sessionmaker, relationship
    
    engine = create_engine("mysql+pymysql://root:123.com@10.10.36.101/s13", max_overflow=5)
    
    Base = declarative_base()
    
    class Host(Base):
        """ORM mapping for the ``host`` table.

        ``host_user`` is a many-to-many relation to ``HostUser`` routed through
        the ``host_to_host_user`` association table; its backref exposes the
        reverse direction as ``HostUser.host``.
        """
        __tablename__ = 'host'
        # Real booleans instead of the strings 'True' used originally; a string
        # is not a documented value for ``autoincrement``.
        nid = Column(Integer, primary_key=True, autoincrement=True)
        hostname = Column(String(32))
        port = Column(String(12))
        ip = Column(String(32))
        # The lambda defers resolving HostToHostUser, which is defined further
        # down in the file.
        host_user = relationship('HostUser', secondary=lambda: HostToHostUser.__table__, backref='host')
    
    
    class HostUser(Base):
        """ORM mapping for the ``hostuser`` table (a login account name)."""
        __tablename__ = 'hostuser'
        # Real booleans instead of the strings 'True' used originally; a string
        # is not a documented value for ``autoincrement``.
        nid = Column(Integer, primary_key=True, autoincrement=True)
        username = Column(String(32))
    
    # Many-to-many needs a third table that only stores the relation: one user
    # can log in to many servers, and one server can have many users.
    class HostToHostUser(Base):
        """Association table linking ``host`` rows to ``hostuser`` rows."""
        __tablename__ = 'host_to_host_user'
        # Real booleans instead of the strings 'True' used originally; a string
        # is not a documented value for ``autoincrement``.
        nid = Column(Integer, primary_key=True, autoincrement=True)
        host_id = Column(Integer, ForeignKey('host.nid'))
        host_user_id = Column(Integer, ForeignKey('hostuser.nid'))
    
    def init_db():
        """Create every table registered on ``Base.metadata`` through ``engine``."""
        metadata = Base.metadata
        metadata.create_all(engine)
    
    def drop_db():
        """Drop every table registered on ``Base.metadata`` through ``engine``."""
        metadata = Base.metadata
        metadata.drop_all(engine)
    
    # Build a session factory bound to the engine and open the single session
    # shared by every statement below.
    Session = sessionmaker(bind=engine)
    session = Session()

    # Seed the hosts.
    session.add_all([
        Host(hostname='web1', port='22', ip='192.168.1.65'),
        Host(hostname='web2', port='22', ip='192.168.1.66'),
        Host(hostname='web3', port='22', ip='192.168.1.67'),
        Host(hostname='web4', port='22', ip='192.168.1.68'),
        Host(hostname='web5', port='22', ip='192.168.1.69'),
    ])
    session.commit()

    # Seed the host users.
    session.add_all([
        HostUser(username='root'),
        HostUser(username='tom'),
        HostUser(username='jerry'),
        HostUser(username='jack'),
        HostUser(username='rose'),
    ])
    session.commit()

    # Seed the host <-> user links.
    session.add_all([
        HostToHostUser(host_id=1, host_user_id=1),
        HostToHostUser(host_id=1, host_user_id=2),
        HostToHostUser(host_id=1, host_user_id=3),
        HostToHostUser(host_id=2, host_user_id=2),
        HostToHostUser(host_id=2, host_user_id=4),
        HostToHostUser(host_id=2, host_user_id=3),
    ])
    session.commit()

    # Requirement 1: list every user on server web1.

    # The manual approach takes three steps.
    # Step 1: look up the id of web1.
    host_obj = session.query(Host.nid).filter(Host.hostname == 'web1').first()
    print(host_obj)
    # Step 2: query the association table for the user ids linked to that host
    # (host_id == host_obj.nid).
    ret1 = session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id == host_obj.nid).all()
    print(ret1)
    # Each result row holds a single column; collect it. Unlike
    # ``list(zip(*ret1))[0]`` this does not raise IndexError on an empty result.
    uid = [row[0] for row in ret1]
    # Step 3: fetch the usernames for those ids. This step was commented out in
    # the original, which left ``uid`` unused and the walkthrough incomplete.
    users = session.query(HostUser.username).filter(HostUser.nid.in_(uid)).all()
    users = [x[0] for x in users]
    print(users)

    # Forward lookup.
    # host_user = relationship('HostUser', secondary=lambda: HostToHostUser.__table__, backref='host')
    # The line above (declared on Host) relates Host to HostUser through the
    # association table HostToHostUser.
    ret = session.query(Host).filter(Host.hostname == 'web1').first()
    for item in ret.host_user:
        print(item.username, end=' ')

    # Requirement 2: list the servers that user tom can log in to.

    # The manual approach takes three steps.
    # Step 1: look up tom's id.
    user_id = session.query(HostUser.nid).filter(HostUser.username == 'tom').first()
    print(user_id)

    # Step 2: query the association table for the host ids linked to that user
    # (host_user_id == user_id.nid).
    group_user_id = session.query(HostToHostUser.host_id).filter(HostToHostUser.host_user_id == user_id.nid).all()
    print(group_user_id)
    # Comprehension instead of ``list(zip(*rows))[0]`` so an empty result does
    # not raise IndexError.
    g_u_id = [row[0] for row in group_user_id]

    # Step 3: fetch the hostnames for those host ids.
    host_list = session.query(Host).filter(Host.nid.in_(g_u_id)).all()
    host = [x.hostname for x in host_list]
    print(host)

    # Reverse lookup, through the backref declared on Host:
    # host_user = relationship('HostUser', secondary=lambda: HostToHostUser.__table__, backref='host')
    # (The original executed that line here at module level, creating a stray,
    # unattached relationship object; it belongs only inside the Host class.)
    host_user_obj = session.query(HostUser).filter(HostUser.username == 'tom').first()
    for item in host_user_obj.host:
        print(item.hostname)
  • 相关阅读:
    Ubuntu 安装.net core 设置源仓储地址
    ASP.NET Core开发-MVC 使用dotnet 命令创建Controller和View
    ubuntu安装postman
    redis相关操作
    C# StringValues 类型
    MySQL时间戳与日期格式的相互转换
    C# 生成时间戳以及时间戳转换为时间
    关于vscode 使用nuget插件相关说明
    ubuntu一些命令
    使用HBuilderX新建Uniapp项目并在不同平台上运行调试
  • 原文地址:https://www.cnblogs.com/yxy-linux/p/5733004.html
Copyright © 2011-2022 走看看