zoukankan      html  css  js  c++  java
  • SQLAlchemy

    SQLAlchemy

    1.1.SQLAlchemy介绍

    SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

     安装

    pip install sqlalchemy

    组成部分

    Engine,框架的引擎
    Connection Pooling ,数据库连接池
    Dialect,选择连接数据库的DB API种类
    Schema/Types,架构和类型
    SQL Exprression Language,SQL表达式语言

    SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:

    MySQL-Python
        mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
        
    pymysql
        mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
        
    MySQL-Connector
        mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
        
    cx_Oracle
        oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
        
    更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

    1.2.SQLAlchemy表结构

     (1)创建单表

    import datetime
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
    
    Base = declarative_base()
    
    
    class Users(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
        email = Column(String(32), unique=True)
        ctime = Column(DateTime, default=datetime.datetime.now)
        extra = Column(Text, nullable=True)
    
        __table_args__ = (
            UniqueConstraint('id', 'name', name='uix_id_name'),    #id和name联合唯一
            Index('ix_id_name', 'name', 'email'),            #索引
        )
    
    
    def init_db():
        """
        根据类创建数据库表
        :return:
        """
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.create_all(engine)
    
    
    def drop_db():
        """
        根据类删除数据库表
        :return:
        """
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.drop_all(engine)
    
    
    if __name__ == '__main__':
        drop_db()
        init_db()

    (2)创建多个表(包含FK,M2M关系)

    import datetime
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
    from sqlalchemy.orm import relationship
    
    Base = declarative_base()
    
    
    # ##################### 单表示例 #########################
    class Users(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True)
        age = Column(Integer, default=18)
        email = Column(String(32), unique=True)
        ctime = Column(DateTime, default=datetime.datetime.now)
        extra = Column(Text, nullable=True)
    
        __table_args__ = (
            # UniqueConstraint('id', 'name', name='uix_id_name'),
            # Index('ix_id_name', 'name', 'extra'),
        )
    
    
    class Hosts(Base):
        __tablename__ = 'hosts'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True)
        ctime = Column(DateTime, default=datetime.datetime.now)
    
    
    # ##################### 一对多示例 #########################
    class Hobby(Base):
        '''爱好'''
        __tablename__ = 'hobby'
        id = Column(Integer, primary_key=True)
        caption = Column(String(50), default='篮球')
    
    
    class Person(Base):
        __tablename__ = 'person'
        nid = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=True)
        hobby_id = Column(Integer, ForeignKey("hobby.id"))    #hobby指的__tablename__ = 'hobby',而不是类名Hobby
    
        # 与生成表结构无关,仅用于查询方便
        hobby = relationship("Hobby", backref='pers')
    
    
    # ##################### 多对多示例 #########################
    
    #第三张表要自己生成
    class Server2Group(Base):
        __tablename__ = 'server2group'
        id = Column(Integer, primary_key=True, autoincrement=True)
        server_id = Column(Integer, ForeignKey('server.id'))
        group_id = Column(Integer, ForeignKey('group.id'))
    
    
    class Group(Base):
        __tablename__ = 'group'
        id = Column(Integer, primary_key=True)
        name = Column(String(64), unique=True, nullable=False)
    
        # 与生成表结构无关,仅用于查询方便
        servers = relationship('Server', secondary='server2group', backref='groups')
    
    
    class Server(Base):
        __tablename__ = 'server'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        hostname = Column(String(64), unique=True, nullable=False)
    
    
    def init_db():
        """
        根据类创建数据库表
        :return:
        """
        engine = create_engine(
            "mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.create_all(engine)
    
    
    def drop_db():
        """
        根据类删除数据库表
        :return:
        """
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.drop_all(engine)
    
    
    if __name__ == '__main__':
        drop_db()
        init_db()
    View Code

    实例:

    models.py

    import datetime
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
    from sqlalchemy.orm import relationship
    
    Base = declarative_base()
    
    
    # ##################### 单表示例 #########################
    class Users(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True)
        age = Column(Integer, default=18)
        email = Column(String(32), unique=True)
        ctime = Column(DateTime, default=datetime.datetime.now)
        extra = Column(Text, nullable=True)
    
        __table_args__ = (
            # UniqueConstraint('id', 'name', name='uix_id_name'),
            # Index('ix_id_name', 'name', 'extra'),
        )
    
    
    class Hosts(Base):
        __tablename__ = 'hosts'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True)
        ctime = Column(DateTime, default=datetime.datetime.now)
    
    
    # ##################### 一对多示例 #########################
    class Hobby(Base):
        '''爱好'''
        __tablename__ = 'hobby'
        id = Column(Integer, primary_key=True)
        caption = Column(String(50), default='篮球')
    
    
    class Person(Base):
        __tablename__ = 'person'
        nid = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=True)
        hobby_id = Column(Integer, ForeignKey("hobby.id"))    #hobby指的__tablename__ = 'hobby',而不是类名Hobby
    
        # 与生成表结构无关,仅用于查询方便
        hobby = relationship("Hobby", backref='pers')
    
    
    # ##################### 多对多示例 #########################
    
    #第三张表要自己生成
    class Server2Group(Base):
        __tablename__ = 'server2group'
        id = Column(Integer, primary_key=True, autoincrement=True)
        server_id = Column(Integer, ForeignKey('server.id'))
        group_id = Column(Integer, ForeignKey('group.id'))
    
    
    class Group(Base):
        __tablename__ = 'group'
        id = Column(Integer, primary_key=True)
        name = Column(String(64), unique=True, nullable=False)
    
        # 与生成表结构无关,仅用于查询方便
        servers = relationship('Server', secondary='server2group', backref='groups')
    
    
    class Server(Base):
        __tablename__ = 'server'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        hostname = Column(String(64), unique=True, nullable=False)
    
    if __name__ == '__main__':
        engine = create_engine(
            "mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.create_all(engine)   #创建
    
        # Base.metadata.drop_all(engine)   #删除

    1.3.SQLAlchemy两种连接方式

    第一种

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    import models
    
    #1.创建连接池
    engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    #2.从连接池中获取数据库连接
    session = Session()
    
    #3.执行ORM操作
    obj1 = models.Users(name="derek",email='derek@163.com')
    session.add(obj1)
    # 提交事务
    session.commit()
    
    # 4.关闭数据库连接(将连接放回连接池)
    session.close()

     

    第二种、基于scoped_session实现线程安全

    首先导入,然后只要修改session = scoped_session(Session)

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session
    import models
    
    #1.创建连接池
    engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    #2.从连接池中获取数据库连接
    # session = Session()
    session = scoped_session(Session)
    
    #3.执行ORM操作
    obj1 = models.Users(name="jack",email='jack@163.com')
    session.add(obj1)
    # 提交事务
    session.commit()
    
    # 4.关闭数据库连接
    session.close()

    1.4.增加数据

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session
    import models
    
    #1.创建连接池
    engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    #2.从连接池中获取数据库连接
    session = Session()
    # session = scoped_session(Session)
    
    # #3.执行ORM操作
    # obj1 = models.Users(name="jack",email='jack@163.com')
    # session.add(obj1)
    # # 提交事务
    # session.commit()
    
    #批量增加,里面是列表
    session.add_all([
        models.Users(name="jack1",email='jack1@163.com'),
        models.Users(name="jack2",email='jack2@163.com'),
    ])
    session.commit()
    
    # 4.关闭数据库连接
    session.close()

    1.5.查看和删除数据

    #查看数据
    user_list = session.query(models.Users).all()
    for row in user_list:
        print(row.id)
        print(row.name)
        print(row.email)
        print(row.ctime)
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session
    import models
    
    #1.创建连接池
    engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    #2.从连接池中获取数据库连接
    session = Session()
    # session = scoped_session(Session)
    
    # #3.执行ORM操作
    # obj1 = models.Users(name="jack",email='jack@163.com')
    # session.add(obj1)
    # # 提交事务
    # session.commit()
    
    # #批量增加,里面是列表
    # session.add_all([
    #     models.Users(name="jack1",email='jack1@163.com'),
    #     models.Users(name="jack2",email='jack2@163.com'),
    # ])
    # session.commit()
    
    #查看数据
    user_list = session.query(models.Users).all()
    for row in user_list:
        print(row.id)
        print(row.name)
        print(row.email)
        print(row.ctime)
    
    # 4.关闭数据库连接
    session.close()
    View Code

    查看的结果:

    添加过滤条件

    user_list = session.query(models.Users).filter(models.Users.id > 2)   #id大于2的

     删除数据

    #删除数据
    session.query(models.Users).filter(models.Users.id > 4).delete()
    session.commit()
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session
    import models
    
    #1.创建连接池
    engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    #2.从连接池中获取数据库连接
    session = Session()
    # session = scoped_session(Session)
    
    # #3.执行ORM操作
    # obj1 = models.Users(name="jack",email='jack@163.com')
    # session.add(obj1)
    # # 提交事务
    # session.commit()
    
    # #批量增加,里面是列表
    # session.add_all([
    #     models.Users(name="jack1",email='jack1@163.com'),
    #     models.Users(name="jack2",email='jack2@163.com'),
    # ])
    # session.commit()
    
    #查看数据
    # user_list = session.query(models.Users).all()
    # user_list = session.query(models.Users).filter(models.Users.id > 2)   #id大于2的
    # for row in user_list:
    #     print(row.id)
    #     print(row.name)
    #     print(row.email)
    #     print(row.ctime)
    
    #删除数据
    session.query(models.Users).filter(models.Users.id > 4).delete()
    session.commit()
    
    
    # 4.关闭数据库连接
    session.close()
    View Code

    1.6.修改数据

    #修改数据
    session.query(models.Users).filter(models.Users.id == 4).update({'name':'Tom'})
    session.query(models.Users).filter(models.Users.id == 1).update({'name': models.Users.name + "099"}, synchronize_session=False)
    session.query(models.Users).filter(models.Users.id == 3).update({"age": models.Users.age + 1}, synchronize_session="evaluate") 
    session.commit()

    结果:

  • 相关阅读:
    jquery实现选项卡(两句即可实现)
    常用特效积累
    jquery学习笔记
    idong常用js总结
    织梦添加幻灯片的方法
    LeetCode "Copy List with Random Pointer"
    LeetCode "Remove Nth Node From End of List"
    LeetCode "Sqrt(x)"
    LeetCode "Construct Binary Tree from Inorder and Postorder Traversal"
    LeetCode "Construct Binary Tree from Preorder and Inorder Traversal"
  • 原文地址:https://www.cnblogs.com/derek1184405959/p/9032613.html
Copyright © 2011-2022 走看看