zoukankan      html  css  js  c++  java
  • doc

       之前写过一篇博客介绍过sqlalchemy的基本用法,本篇博客主要介绍除增删改查以外SQLAlchemy对数据库表的操作,主要内容有单表操作、一对多操作、多对多操作。

    一、单表操作

        单表操作的增删改查在上篇博客中已经详细介绍过,这里不再详细介绍,今天主要对数据库查询再详细介绍一下,下面我们先创建表并插入数据。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
      
    from sqlalchemy import and_, or_
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/mydata", max_overflow=5)
      
    Base = declarative_base()
      
    class Group(Base):
        """ORM model mapped to the ``group`` table."""

        __tablename__ = 'group'

        # Auto-incrementing integer primary key.
        nid = Column(Integer, autoincrement=True, primary_key=True)
        # Group name, stored as a string of up to 32 characters.
        caption = Column(String(32))
      
    class User(Base):
        """ORM model mapped to the ``user`` table; each user points at one Group."""

        __tablename__ = 'user'

        # Auto-incrementing integer primary key.
        nid = Column(Integer, primary_key=True, autoincrement=True)
        username = Column(String(32))
        # Foreign key referencing the primary key of the ``group`` table.
        group_id = Column(Integer, ForeignKey('group.nid'))
        # Object-level link to Group, normally declared together with the
        # ForeignKey above; the backref exposes the users of a group as
        # ``Group.uuu``.
        group = relationship("Group", backref='uuu')
      
    def init_table():
        """Create the tables of every model declared on ``Base``.

        :return: None
        """
        metadata = Base.metadata
        metadata.create_all(engine)
      
    def drop_table():
        """Drop the tables of every model declared on ``Base``.

        :return: None
        """
        metadata = Base.metadata
        metadata.drop_all(engine)
      
    init_table()
    Session = sessionmaker(bind=engine)
    session = Session()

    # Single-table operations: insert the groups first so the users below can
    # reference them (on freshly created tables they get nid 1 and nid 2).
    session.add(Group(caption='dba'))
    session.add(Group(caption='dddd'))
    session.commit()

    # The fifth user belongs to group 2: the join and relationship outputs
    # printed later in this post (three 'dba' members, user 5 in 'dddd')
    # only hold with this assignment.
    session.add_all([
        User(username='jack1', group_id=1),
        User(username='jack2', group_id=1),
        User(username='jack1', group_id=2),
        User(username='jack1', group_id=1),
        User(username='jack2', group_id=2),
    ])
    session.commit()

    1,条件查询

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # Fetch the nid of every user named jack1, written two ways:
    # filter() takes a column expression, filter_by() takes keyword arguments.
    ret1 = session.query(User.nid).filter(User.username=='jack1').all()
    print(ret1)
    ret2 = session.query(User.nid).filter_by(username='jack1').all()
    print(ret2)

    # Result:
    [(1,), (3,), (4,)]
    [(1,), (3,), (4,)]
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    # nid of the users whose nid is greater than 1 AND whose username is jack2
    # (several criteria passed to one filter() are combined with AND).
    ret1 = session.query(User.nid).filter(User.nid >1,User.username=='jack2').all()
    print(ret1)

    # Result:
    [(2,), (5,)]

    # nid and username of the users named jack1 whose nid lies between 1 and 3
    # (the result shows both bounds are included).
    ret2=session.query(User.nid,User.username).filter(User.nid.between(1,3),User.username=='jack1').all()
    print(ret2)

    # Result:
    [(1, 'jack1'), (3, 'jack1')]
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    # Users whose nid is one of 1, 3, 4.
    ret=session.query(User.nid,User.username).filter(User.nid.in_([1,3,4])).all()
    print(ret)

    # Result:
    [(1, 'jack1'), (3, 'jack1'), (4, 'jack1')]

    # Negation with ~: users whose nid is NOT one of 1, 3, 4.
    ret1=session.query(User.nid,User.username).filter(~User.nid.in_([1,3,4,])).all()
    print(ret1)

    # Result:
    [(2, 'jack2'), (5, 'jack2')]

    # Users named jack1, selected through an IN over a nested query that
    # yields the matching nids.
    ret2 = session.query(User.nid,User.username).filter(
    User.nid.in_(session.query(User.nid).filter_by(username='jack1'))).all()
    print(ret2)

    # Result:
    [(1, 'jack1'), (3, 'jack1'), (4, 'jack1')]
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    # Users with nid greater than 3 AND username jack1, using an explicit and_().
    ret = session.query(User.nid,User.username).filter(and_(User.nid >3,User.username=='jack1')).all()
    print(ret)

    # Result:
    [(4, 'jack1')]

    # Users with nid less than 2 OR username jack1.
    # (No output is shown for this query in the original post; with the five
    # rows inserted earlier it should be users 1, 3 and 4 - TODO confirm.)
    ret = session.query(User.nid,User.username).filter(
    or_(User.nid < 2, User.username == 'jack1')).all()
    print(ret)

    # Users with nid less than 2, OR (username jack1 AND nid greater than 3):
    # and_() nested inside or_().
    ret = session.query(User.nid,User.username).filter(
        or_(User.nid < 2,and_(User.username == 'jack1', User.nid > 3))).all()
    print(ret)

    # Result:
    [(1, 'jack1'), (4, 'jack1')]

    2,通配符

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    # Wildcard match: every user whose username starts with the letter j.
    ret = session.query(User.nid, User.username).filter(User.username.like('j%')).all()
    print(ret)

    # Result:
    [(1, 'jack1'), (2, 'jack2'), (3, 'jack1'), (4, 'jack1'), (5, 'jack2')]

    # Negation with ~: every user whose username does NOT start with j.
    ret1 = session.query(User.nid, User.username).filter(~User.username.like('j%')).all()
    print(ret1)

    # Result:
    []

    3,限制

    1
    2
    3
    4
    5
    # Slicing a query limits the rows returned: [1:2] skips the first row and
    # returns only the second one, as the result below shows.
    ret=session.query(User.nid,User.username)[1:2]
    print(ret)

    # Result:
    [(2, 'jack2')]

    4,排序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    # Descending order by nid.
    ret=session.query(User.nid,User.username).order_by(User.nid.desc()).all()
    print(ret)  

    # Result:
    [(5, 'jack2'), (4, 'jack1'), (3, 'jack1'), (2, 'jack2'), (1, 'jack1')]

    # Ascending order by nid.
    ret1=session.query(User.nid,User.username).order_by(User.nid.asc()).all()
    print(ret1)

    # Result:
    [(1, 'jack1'), (2, 'jack2'), (3, 'jack1'), (4, 'jack1'), (5, 'jack2')]

     5,分组

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    # func gives access to SQL aggregate functions (max, sum, min, ...).
    from sqlalchemy.sql import func

    # Group by the primary key: every user ends up in its own group.
    ret = session.query(User.nid,User.username).group_by(User.nid).all()
    print(ret)

    # Result:
    [(1, 'jack1'), (2, 'jack2'), (3, 'jack1'), (4, 'jack1'), (5, 'jack2')]

    # Largest, summed and smallest nid per username.
    ret1=session.query(
        func.max(User.nid),
        func.sum(User.nid),
        func.min(User.nid),).group_by(User.username).all()
    print(ret1)

    # Result:
    [(4, Decimal('8'), 1), (5, Decimal('7'), 2)]

    # Same aggregation, keeping only the groups whose smallest nid is above 1.
    ret2=session.query(
        func.max(User.nid),
        func.sum(User.nid),
        func.min(User.nid), ).group_by(User.username).having(func.min(User.nid)>1).all()
    print(ret2)

    # Result:
    [(5, Decimal('7'), 2)]

    # Print the SQL statement: without .all() the query is not executed here,
    # and printing the query object shows the SQL text listed below.
    from sqlalchemy.sql import func
    ret2=session.query(
        func.max(User.nid),
        func.sum(User.nid),
        func.min(User.nid), ).group_by(User.username).having(func.min(User.nid)>1)
    print(ret2)
     
    #结果:
    SELECT max("user".nid) AS max_1, sum("user".nid) AS sum_1, min("user".nid) AS min_1
    FROM "user" GROUP BY "user".username
    HAVING min("user".nid) > :min_2
    [('jack1', 'dba'), ('jack2', 'dddd')]
    SELECT "user".nid AS user_nid, "user".username AS user_username, "user".group_id AS user_group_id
    FROM "user" LEFT OUTER JOIN "group" ON "group".nid = "user".group_id

    6,组合

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    # union(): combine usernames (nid > 2) with group captions (nid < 2);
    # duplicate rows are collapsed, as the result below shows.
    q1=session.query(User.username).filter(User.nid >2)
    q2=session.query(Group.caption).filter(Group.nid <2)
    ret = q1.union(q2).all()
    print(ret)

    # Result:
    [('jack1',), ('jack2',), ('dba',)]

    # union_all(): same combination, but duplicate rows are kept.
    q1=session.query(User.username).filter(User.nid >2)
    q2=session.query(Group.caption).filter(Group.nid <2)
    ret = q1.union_all(q2).all()
    print(ret)

    # Result:
    [('jack1',), ('jack1',), ('jack2',), ('dba',)]

    二、一对多操作

        一对多的关系需要通过外键来进行约束,下面我们举例说明如何对一对多的表进行连表操作。

    1,原始方式:通过join方法来进行连表操作。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    # Join the two tables by hand: match each user's foreign key (group_id)
    # against the primary key of the group it belongs to. Comparing
    # User.nid with Group.nid would pair rows that merely share an id value.
    ret = session.query(User.username, Group.caption).filter(User.group_id == Group.nid).all()
    print(ret)

    # Result (one row per user with the caption of that user's group; the
    # memberships match the per-user listing printed in the relationship
    # section of this post):
    [('jack1', 'dba'), ('jack2', 'dba'), ('jack1', 'dddd'), ('jack1', 'dba'), ('jack2', 'dddd')]
       
    # Join through Query.join() and compare the SQL produced with and without
    # isouter=True. The query is not executed here; printing it shows its SQL.
    sql1 = session.query(User).join(Group,isouter=True)
    print(sql1)
       
    #结果:
    SELECT "user".nid AS user_nid, "user".username AS user_username, "user".group_id AS user_group_id
    FROM "user" LEFT OUTER JOIN "group" ON "group".nid = "user".group_id
       
    # Without isouter the generated SQL uses a plain JOIN instead of
    # LEFT OUTER JOIN (see the printed statement below).
    sql2 = session.query(User).join(Group)
    print(sql2)
       
    #结果:
    SELECT "user".nid AS user_nid, "user".username AS user_username, "user".group_id AS user_group_id
    FROM "user" JOIN "group" ON "group".nid = "user".group_id
     
    # Joined query: username and group caption of every user whose group
    # caption is 'dba'.
    ret = session.query(User.username,Group.caption).join(Group,isouter=True).filter(Group.caption == 'dba').all()
    print(ret)

    # Result:
    [('jack1', 'dba'), ('jack2', 'dba'), ('jack1', 'dba')]

    2,新方式:通过建立relationship的方式

    1
    2
    3
    4
    5
    6
    7
    # First add a relationship attribute to the model class.
    class User(Base):
        """ORM model mapped to the ``user`` table, linked to Group."""

        __tablename__ = 'user'

        nid = Column(Integer, autoincrement=True, primary_key=True)
        username = Column(String(32))
        # Foreign key referencing the primary key of the ``group`` table.
        group_id = Column(Integer, ForeignKey('group.nid'))
        # Link to Group for convenient lookups; the backref ``uuu`` is a
        # virtual attribute added to Group, not a real column.
        group = relationship("Group", backref='uuu')

    正向查询:通过User表查询Group表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    # Forward lookup: walk from each user to its group through the
    # relationship attribute.
    ret = session.query(User).all()
    for obj in ret:
        # obj is one row of the user table
        # obj.group is the related Group object
        print(obj.nid,obj.username,obj.group_id,obj.group_id,obj.group,
              obj.group.nid,obj.group.caption)
     
    #结果:
    1 jack1 1 1 <__main__.Group object at 0x0000015D762F4630> 1 dba
    2 jack2 1 1 <__main__.Group object at 0x0000015D762F4630> 1 dba
    3 jack1 2 2 <__main__.Group object at 0x0000015D762F47F0> 2 dddd
    4 jack1 1 1 <__main__.Group object at 0x0000015D762F4630> 1 dba
    5 jack2 2 2 <__main__.Group object at 0x0000015D762F47F0> 2 dddd

    反向查询:通过Group表查询User表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # Reverse lookup: fetch the 'dba' group, then reach its users through the
    # ``uuu`` backref declared on User.group.
    obj = session.query(Group).filter(Group.caption == 'dba').first()
    print(obj.nid)
    print(obj.caption)
    print(obj.uuu)
     
    #结果:
    1
    dba
    [<__main__.User object at 0x000002606096C5C0>, <__main__.User object at 0x000002606096C630>, <__main__.User object at 0x000002606096C6A0>]

      我们可以看到上面的例子输出的为对象的列表,输出不太友好,为了达到自己想要的结果,我们可以进行自定义返回结果,请看下面代码,加入__repr__函数:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    class User(Base):
        """ORM model mapped to the ``user`` table, with a readable repr."""

        __tablename__ = 'user'

        nid = Column(Integer, primary_key=True, autoincrement=True)
        username = Column(String(32))
        # Foreign key referencing the primary key of the ``group`` table.
        group_id = Column(Integer, ForeignKey('group.nid'))
        # Link to Group, normally declared together with the ForeignKey
        # above; the backref exposes the users of a group as ``Group.uuu``.
        group = relationship("Group", backref='uuu')

        def __repr__(self):
            """Return ``nid:username:group_id`` so printed users are readable.

            :return: the formatted string
            """
            temp = '%s:%s:%s' % (self.nid, self.username, self.group_id)
            return temp

    三、多对多操作

    1,创建表结构并插入信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
     
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/mydata", max_overflow=5)
     
    Base = declarative_base()
     
    class Host(Base):
        """ORM model mapped to the ``host`` table."""

        __tablename__ = 'host'

        # Auto-incrementing integer primary key.
        nid = Column(Integer, autoincrement=True, primary_key=True)
        # hostname, port and ip are all stored as strings of up to 32 characters.
        hostname = Column(String(32))
        port = Column(String(32))
        ip = Column(String(32))
     
    class HostUser(Base):
        """ORM model mapped to the ``host_user`` table."""

        __tablename__ = 'host_user'

        # Auto-incrementing integer primary key.
        nid = Column(Integer, autoincrement=True, primary_key=True)
        username = Column(String(32))
     
    # Forward and reverse lookups go through the relationships below
    # (iterate over them with a for loop).
    class HostToHostUser(Base):
        """Link table pairing one ``host`` row with one ``host_user`` row."""

        __tablename__ = 'host_to_host_user'

        nid = Column(Integer, autoincrement=True, primary_key=True)

        # Foreign keys to the two tables being linked.
        host_id = Column(Integer, ForeignKey('host.nid'))
        host_user_id = Column(Integer, ForeignKey('host_user.nid'))

        # Object-level links; the backrefs add ``Host.h`` and ``HostUser.u``,
        # which lead back to the link rows.
        host = relationship("Host", backref='h')
        host_user = relationship("HostUser", backref='u')
     
    def init_table():
        """Create the tables of every model declared on ``Base``.

        :return: None
        """
        metadata = Base.metadata
        metadata.create_all(engine)
     
    def drop_table():
        """Drop the tables of every model declared on ``Base``.

        :return: None
        """
        metadata = Base.metadata
        metadata.drop_all(engine)
     
    init_table()
    Session = sessionmaker(bind=engine)
    session = Session()

    # Five hosts c1..c5, all on port 22, with addresses 1.1.1.1 .. 1.1.1.5.
    session.add_all([
        Host(hostname='c%d' % idx, port='22', ip='1.1.1.%d' % idx)
        for idx in range(1, 6)
    ])
    session.commit()

    # Four host users.
    session.add_all([
        HostUser(username=name) for name in ('root', 'db', 'nb', 'sb')
    ])
    session.commit()

    # Link rows as (host_id, host_user_id) pairs, inserted in this order.
    session.add_all([
        HostToHostUser(host_id=host_nid, host_user_id=user_nid)
        for host_nid, user_nid in ((1, 1), (1, 2), (1, 3), (2, 2), (2, 4), (2, 3))
    ])
    session.commit()

    2,需求:获取主机1中所有的用户

     方法一:通过一步一步取

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # Step 1: find the host named c1.
    host_obj = session.query(Host).filter(Host.hostname == 'c1').first()

    # Step 2: fetch the ids of the users linked to that host (host_obj.nid).
    host_to_host_user = session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id== host_obj.nid).all()

    # The rows come back as one-element tuples, e.g. [(1,), (2,), (3,)], so
    # flatten them into a plain list of ids. Unlike indexing the output of
    # zip(*rows), this also works when the host has no users at all.
    r = [row[0] for row in host_to_host_user]

    # Step 3: look up the usernames for those ids.
    users =session.query(HostUser.username).filter(HostUser.nid.in_(r)).all()
    print(users)
     
    #结果:
    [('root',), ('db',), ('nb',)]

    方法二:通过子查询嵌套的方式(把方法一的几步合并成一条语句)

    1
    2
    # The separate steps folded into one statement of nested queries, which is
    # rather dense to read. The statement only builds the query; append .all()
    # to execute it. The username column is HostUser.username (the model has
    # no ``name`` attribute).
    session.query(HostUser.username).filter(
        HostUser.nid.in_(
            session.query(HostToHostUser.host_user_id).filter(
                HostToHostUser.host_id == session.query(Host.nid).filter(Host.hostname == 'c1'))))

    方法三:通过建立relationship的方式

     1,对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
     
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/mydata", max_overflow=5)
     
    Base = declarative_base()
     
    class HostToHostUser(Base):
        """Link table pairing one ``host`` row with one ``host_user`` row."""

        __tablename__ = 'host_to_host_user'

        nid = Column(Integer, autoincrement=True, primary_key=True)

        # Foreign keys to the two tables being linked.
        host_id = Column(Integer, ForeignKey('host.nid'))
        host_user_id = Column(Integer, ForeignKey('host_user.nid'))
     
    class Host(Base):
        """ORM model mapped to the ``host`` table, linked to HostUser."""

        __tablename__ = 'host'

        nid = Column(Integer, autoincrement=True, primary_key=True)
        hostname = Column(String(32))
        port = Column(String(32))
        ip = Column(String(32))
        # Link to HostUser routed through the host_to_host_user table; the
        # backref adds a matching ``h`` attribute on HostUser.
        host_user = relationship('HostUser', secondary=HostToHostUser.__table__, backref='h')

    # host_user = relationship('HostUser', secondary=lambda: HostToHostUser.__table__, backref='h')
    # The lambda form is needed when the link class is declared below Host;
    # without the lambda the link class has to be declared above, as it is here.
     
     
    class HostUser(Base):
        """ORM model mapped to the ``host_user`` table."""

        __tablename__ = 'host_user'

        # Auto-incrementing integer primary key.
        nid = Column(Integer, autoincrement=True, primary_key=True)
        username = Column(String(32))
     
    def init_table():
        """Create the tables of every model declared on ``Base``.

        :return: None
        """
        metadata = Base.metadata
        metadata.create_all(engine)
     
    def drop_table():
        """Drop the tables of every model declared on ``Base``.

        :return: None
        """
        metadata = Base.metadata
        metadata.drop_all(engine)
     
    # init_table() is not called in this script; presumably the tables and
    # rows created in the previous section are reused - TODO confirm.
    Session = sessionmaker(bind=engine)
    session = Session()

    # Fetch host c1 and read its users directly through the host_user
    # relationship declared on Host.
    host_obj= session.query(Host).filter(Host.hostname=='c1').first()
    print(host_obj.host_user)

    2,类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
     
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/mydata", max_overflow=5)
     
    Base = declarative_base()
     
    class HostToHostUser(Base):
        """Link table pairing one ``host`` row with one ``host_user`` row."""

        __tablename__ = 'host_to_host_user'

        nid = Column(Integer, autoincrement=True, primary_key=True)

        # Foreign keys to the two tables being linked.
        host_id = Column(Integer, ForeignKey('host.nid'))
        host_user_id = Column(Integer, ForeignKey('host_user.nid'))
     
    class Host(Base):
        """ORM model mapped to the ``host`` table, linked to HostUser."""

        __tablename__ = 'host'

        nid = Column(Integer, autoincrement=True, primary_key=True)
        hostname = Column(String(32))
        port = Column(String(32))
        ip = Column(String(32))
        # Link to HostUser routed through the host_to_host_user table; the
        # backref adds a matching ``h`` attribute on HostUser.
        host_user = relationship('HostUser', secondary=HostToHostUser.__table__, backref='h')

    # host_user = relationship('HostUser', secondary=lambda: HostToHostUser.__table__, backref='h')
    # The lambda form is needed when the link class is declared below Host;
    # without the lambda the link class has to be declared above, as it is here.
     
     
    class HostUser(Base):
        """ORM model mapped to the ``host_user`` table."""

        __tablename__ = 'host_user'

        # Auto-incrementing integer primary key.
        nid = Column(Integer, autoincrement=True, primary_key=True)
        username = Column(String(32))
     
    def init_table():
        """Create the tables of every model declared on ``Base``.

        :return: None
        """
        metadata = Base.metadata
        metadata.create_all(engine)
     
    def drop_table():
        """Drop the tables of every model declared on ``Base``.

        :return: None
        """
        metadata = Base.metadata
        metadata.drop_all(engine)
     
    # init_table() is not called in this script; presumably the tables and
    # rows created in an earlier section are reused - TODO confirm.
    Session = sessionmaker(bind=engine)
    session = Session()

    # Fetch host c1 and read its users directly through the host_user
    # relationship declared on Host.
    host_obj= session.query(Host).filter(Host.hostname=='c1').first()
    print(host_obj.host_user)

    今天SQLAlchemy就介绍到这里,更多参考信息请参考:

  • 相关阅读:
    mysql同步故障解决
    linux环境变量
    shell脚本中的交互式输入自动化
    ERROR oslo_service.service PlacementNotConfigured 解决办法
    openstack源
    rabbitmq——用户管理
    linux lvm扩容
    UWB DWM1000 开源项目框架 之 信号强度指示RSSI
    UWB DWM1000 开源项目框架 之 温度采集
    UWB DWM1000 开源项目框架
  • 原文地址:https://www.cnblogs.com/phennry/p/5731299.html
Copyright © 2011-2022 走看看