zoukankan      html  css  js  c++  java
  • 认真写写SQLAlchemy

    SQLAlchemy 是一款十分强大的ORM数据处理组件,以面向对象的形式来操作数据库,支持连接池、支持事务以及一些复杂查询功能。极大程度提高了编程语言操作数据库的能力。而为什么现代编程语言都强调使用ORM对象,而不是原生方式来操作数据库的原因,此处不再赘述,可百度了解。

    一、创建引擎

    任何的 SQLAlchemy 应用都开始与一个叫 Engine 的对象。此对象是到特定数据库的链接中心源,为这些数据库链接提供一个称为连接池的统一工厂。它是一个典型的为特定数据库服务器创建的全局对象。

    _engine = create_engine(_connstr, pool_size=5, pool_recycle=1200)
    

    create_engine 方法还有一个名为 echo 的参数,默认值为 False ,如果设置为 True ,会在执行 SQL 语句时,将语句输出到控制台。

    数据库连接字符串的格式为:

    dialect[+driver]://user:password@host/dbname[?key=value..] 
    

    dialect 为某个指定的数据库名称,例如: mysql , oracle 等;driver 为数据库接口名称,例如:pyodbccx_oracle 等。常见的两个链接字符串:

    • mysql+pymysql://user:password@hostname:port/dbname?charset=uft8
    • mssql+pyodbc://user:password@server/dbname?driver={ODBC+Driver+17+for+SQL+Server}

    在建立成功后,一旦调用 Engine.connect() 或调用依赖于它的方法(如 Engine.execute()),新生成的引擎将从基础池请求连接。当收到此请求时,池将依次建立第一个实际的DBAPI连接。

    引起是延迟加载的,这意味着,当使用 create_engine() 创建对象时,并不直接建立任何实际的DBAPI连接。

    如果链接字符串中包含诸如 @ 、# 等特殊字符,可能会导致创建失败,可以用 quote_plus 方法进行包装:

    from urllib.parse import quote_plus
    _connectionString = f'mysql+pymysql://user:{quote_plus(password)}@hostname:port/dbname'
    

    二、管理ORM

    ORM是对象关系模型的简称,对于所有类似的数据库操作组件来说,本质上都是开发语言操作数据库的工具。对下,必须依赖于底层数据库驱动;对上,提供了核心工具和ORM两种工作模式。核心工具,可以不准确的抽象理解为就是编写原生SQL语句,通过核心工具来传输到数据库执行;而ORM模式,则是此类工具真正灵魂的地方,即用面向对象变成的方式来操作数据库。

    因此,我们首先要了解的是 对象 ,即如何在编程语言中使用数据库对象,可以粗暴的将其理解为数据库对象在代码中的模型。而这个数据库对象,对于目前的SQLAlchemy来说,主要是: MetaData (元数据)、Table (表)和 Column (列)。

    SQLAlchemy 的官方文档,从几个角度描述了如何声明一个数据表对象,例如: MetaData()registry()declarative_base() 。后两者实际上前者的简化扩展,不必过分纠结使用哪个更合适。我会已 declarative_base 做为主要使用说明,想了解更多,参看文末的元数据。

    简单定义

    下面的代码,定义了一个最简单的对象模型 Blog ,包含主键和2个文本字段。

    # 定义一个元数据基类
    Base = declarative_base()
    
    # 定义一个数据表模型对象
    class Blog(Base):
        __tablename__ = 'blog'
        id = Column(Integer, primary_key=True, autoincrement=True)
        blog_name = Column(String(40))
        blog_content = Column(String(8000))
    
    # 提交到数据库中
    Base.metadata.create_all(engine)
    

    从文本中可以看出 __tablename__ 的值即为数据库中的表名称。当然这也意味着,这个属性必须有唯一的值。

    关于字段

    从上面的代码中可以看出,一个 Column 定义了表中的一个字段,上面只是最简单的用法。下面演示了另外几个比较常用的参数:

    # 非空及注释
    blog_name = Column(String(40), nullable=False, comment='博文标题')
    # 默认值
    blog_vote = Column(Integer, default=0, server_default='0')
    
    # 特殊默认值
    time1 = Column(TIMESTAMP, server_default=text('CURRENT_TIMESTAMP'))
    time2 = Column(TIMESTAMP, server_default=func.now())
    time3 = Column(TIMESTAMP, server_default=text('CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP'))
    

    关于 defaultserver_default 的主要区别在于,前者仅用于 Python 端调用,而后者是实现在数据库端的。

    对于时间类型的默认值,有几种特殊的用法。func 用于调用数据库函数做为默认值, text 用于包装数据库关键字做为默认值。对于 MySQL 数据库,务必注意 server_onupdate 参数并不能实现数据库中的 ON UPDATE CURRENT_TIMESTAMP 操作,只能通过默认值方式来实现。

    约束及索引

    外键是约束的一部分,因此在描述外键和索引之前,先定义另外一个表对象,并同时修改上面的表定义,最终得到的代码如下:

    class Blog(Base):
        __tablename__ = 'blog'
        id = Column(Integer, primary_key=True, autoincrement=True, comment='主键,自增列')
        blog_name = Column(String(40), nullable=False, comment='博文标题')
        blog_content = Column(String(8000), comment='博文内容')
    
        create_time = Column(TIMESTAMP, server_default=text('CURRENT_TIMESTAMP'), comment='创建时间')
    
        blog_property = relationship('BlogProperty')
    
    
    Index('idx_blog_search', Blog.create_time.desc())
    
    
    class BlogProperty(Base):
        __tablename__ = 'blog_property'
        id = Column(Integer, primary_key=True, autoincrement=True, comment='主键,自增列')
        blog_id = Column(Integer, ForeignKey('blog.id'), nullable=False, comment='外键,博文编号')
        blog_vote = Column(Integer, default=0, server_default='0', comment='投票数')
        blog_view = Column(Integer, default=0, server_default='0', comment='浏览数')
        last_update_time = Column(TIMESTAMP, server_default=text('CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP'), comment='最后修改时间')
    
    
    UniqueConstraint(BlogProperty.blog_id)
    

    与前面代码的显著区别,在于新增了几个声明:

    • Index : 用于声明索引
    • ForeignKey : 用于声明外键
    • UniqueConstraint : 用于声明唯一性约束

    上面演示了如何从代码关联到数据库,但在实际的操作中一般不太会利用代码来管理数据库结构。一个核心原因是,SQLAlchemy 并没有提供很好的结构化修改方式,还是要依赖于执行 SQL 语句。因此,最好将其只用作管理 数据,而不是 结构

    三、生成会话

    会话是 SQLAlchemy 中非常重要的一个概念。前面的引擎只是构建了一个底层的管道,模型是被操作的对象,而会话正是通过引擎来操作模型的接口。会话本身的概念十分复杂,可以专门浏览官方文档,这里只展示操作。

    创建会话

    官方推荐使用 sessionmaker 方法来创建会话对象,由于应用程序通常在模块范围内有一个引擎对象,sessionmaker 则可以为使用该引擎的所有会话对象提供统一的工厂。

    # 一般用法
    from sqlalchemy.orm import sessionmaker
    
    _DBSession = sessionmaker(bind=_engine)
    
    _session = _DBSession()
    _session2 = _DBSession()
    print(_session is _session2)
    

    从输出结果来看,大家也不难理解,这不就是实例化了两个对象么~但是对于数据库操作来说,这种状况就不是那么友好了,这表示即使你使用的是一个引擎实例化出的会话,也并不是一个,同样意味着,每次的会话都是新的会话,这并不是太好的体验,因此下面介绍会话范围对象 scoped_session

    会话范围

    # 推荐用法,用 scoped_session 创建线程安全的会话对象
    from sqlalchemy.orm import scoped_session
    _session_factory = sessionmaker(bind=_engine)
    _DBSession = scoped_session(_session_factory)
    _session = _DBSession()
    _session2 = _DBSession()
    print(_session is _session2)
    _DBSession.remove()
    

    此时的输出结果,可以看出两个会话是同一个对象,对照着前面的描述,针对实际项目中经常疑惑的比如会话何时开启、何时提交、何时关闭,也就不难得出,对于使用了 SQLAlchemyPython 项目来说,强烈推荐使用 scoped_session 来管理会话。

    Web应用

    注意上文在描述会话及会话范围时,提到个不太重要的重要词:模块内 。很明显,一般的Web应用,并不能算作单一的模块。根据官方文档的说法,想如此使用会话,有两个必要要求:

    • 当web第一次启动时,要注册独立的 scoped_session,并且确保应用程序的其他部分可以访问这个对象;
    • 确保当web请求结束时,调用 scoped_session.remove() , 这通常要与web框架的系统事件继承来建立 on request end 事件。

    然而,这种操作已然不属于数据库编程的范畴。因此,可以有两个选择:

    • 无视,毕竟对于多数百万、千万级的一般应用,完全可以通过堆服务器性能来解决这个平静;
    • 重视,利用web框架提供的集成组件来替代远程组件,例如:flask-sqlalchemy(另文开说)。

    四、编辑数据

    构建数据库的原因自然是要操作数据库,官方提供了所谓 1.x Style2.0 Style 两种操作数据的方法,至少目前官网没有明确说明两者的显著差别,可以视情况使用。我们首先讲述数据的增、删和改。

    插入数据

    _model = Blog(blog_name=blog_name, blog_content=blog_content, blog_property=BlogProperty())
    _session.add(_model)
    _session.commit()
    

    继承了 declarative_base 的对象模型,无需自行编写 __init__ 来实现构造方法,可直接使用。同时,如上面所演示的,在创建关联表数据时无需关心具体的实现,只需要根据相关定义,给对象的关联属性赋值即可,具体值会有组件自行解决。

    提交数据

    _session.flush()
    _session.commit()
    

    在很多文档中,都会特别说明 add 对象后调用 commit 方法的必要性。而有时,又会跟另外一个类似的方法 flush 产生一些混淆。其重要区别在于 commit 会产生实际的提交行为,将数据写入到数据库中;而 flush 并不会,但是其会增加自增列的数值。在官方文档中,明确说明了实际上在执行 commit 之前会主动调用 flush 方法。当然,也可以在 sessionmaker 生成会话对象时,指定不自动执行 flush 方法。

    修改和删除数据

    修改和删除数据的操作方式基本类似,按照 1.x Style 的风格,就是:筛选 --> 操作 --> 提交。

    _session.query(Blog).filter(Blog.id == id).update({"blog_name": blog_name, "blog_content": blog_content}, synchronize_session="fetch")
    _session.commit()
    
    _session.query(Blog).filter(Blog.id == id).delete(synchronize_session="fetch")
    _session.commit()
    

    其中,将 synchronize_session 参数的值,设置为 : fetch ,表示应通过独立的查询语句来获取受影响的结果,否则返回的只是本地内存中更新后的结果。这主要的意义,在于一些必须要发送到数据库中校验的数据,在回写时可能会引发验证错误,不能仅仅在本地更新。

    批量插入

    多数数据库的写入操作,都会涉及到海量数据的写入,下面整理了多种批量写入数据的方法。首先 直接排除 遍历 \ _session.add() 的添加方式,不再测试。

    _list = [
    	Blog(blog_name='BLOG_' + str(i), blog_content='content' + str(i), blog_property=BlogProperty())
    	for i in range(1, 10001)
    ]
    
    _dicts = [
        dict(blog_name="NAME"+str(i), blog_content='CONTENT'+str(i)
        for i in range(1, len)
    ]
    
    # 38+s
    _session.add_all(_list)
    
    # 1s- ,不支持 relationship 数据写入
    _session.bulk_save_objects(_list)
    
    # 1s-,不支持 relationship 数据写入 
    _session.bulk_insert_mappings(Blog, _dicts)
    
    # 0.5s,不支持 relationship 数据写入
    _session.execute(Blog.__table__.insert(), _dicts)
    
    # 1.7s,不支持 relationship 数据写入
    _stmt = insert(Blog).values(_dicts)
    _return = _session.execute(_stmt)
    print(_return.rowcount)
    

    整理了目前已知的几种批量写入数据的方式,却没有两全的方式。支持关联数据写入的(add_all),速度最慢;最快的几种方式,均不支持 relationship 数据的写入;所以,如何取舍,只能取决于具体的应用场景。

    同时要注意, add_allbulk_save_objects 方法的参数是 对象列表,后面三种是 字典列表

    五、查询数据

    相比于编辑而言,对于数据库而言,更多的操作是查询,因此 SQLAlchemy 也提供了非常丰富的查询语句。

    查询单一值

    _val1 = _session.query(Blog.blog_name).filter_by(id=blog_id).scalar()
    _val2 = _session.query(func.count(Blog.id)).scalar()
    _val3 = _session.query(func.count('*')).select_from(Blog).scalar()
    

    查询单一值的核心方法是 scalar() ,区别只是在于前面的查询字句。后两句,展示了两种调用系统函数的方法。如果查询到的字句超过1条,会警告异常并返回 None

    查询指定记录

    前面的修改和删除语句,从代码中可以直观的看到,实际上就是使用 query 对象的 filter_by 方法过滤出需要操作的数据,然后进行操作而已。所以查询方法,也就已经如此展现出来了:

    _blog1 = _session.query(Blog.blog_name).filter_by(id=blog_id).first()
    _blog2 = _session.query(Blog.blog_name).filter_by(id=blog_id).one()
    _blog3 = _session.query(Blog.blog_name).filter_by(id=blog_id).one_or_none()
    

    上面查询对象 queryfirst 方法,可以在不同场景下用 one 方法或 one_or_none 方法来替代,其区别在于:

    1. first,当查询结果超过1条时,仅返回第一条;如果没有结果,则返回None;
    2. one,当且仅当查询结果只有一条数据时可用;当超过一条或者没有结果时,会报错;
    3. one_or_none,是one的增强版本,无结果时返回none;超过一条时,依旧报错;
    4. 针对各个简单查询方法的特征,可以自行选择在不同场景中的使用,或者简单的全部使用 first 方法即可。

    连接查询

    当我们打印上面查询到的 Blog 对象,或者从 SQL 打印语句可以看到,在查询 blog 表时,并没有查询设置了外键关系的 blog_property 表,这是因为 relationship 方法默认的链接加载方式是 select ,即:

    blog_property = relationship('BlogProperty', lazy='select')
    

    这表示,当我们使用/打印 _blog.blog_property 属性时,才会执行对应的 SQL 语句。但是,前提是在同一个 Session 中,即:在使用该属性时,查询主键对象的 Session 没有关闭 !!!

    针对于一些范式比较规范,基本需要同时查询的场景,也有其他方式可以解决,即在查询主对象时,默认链接查询外键对象,只需要将 lazy 参数改为 joined 值即可:

    blog_property = relationship('BlogProperty', lazy='joined')
    

    当打印 blog_property 这个属性时,会看到输出结果是一个数组类型,但是理论上这个属性应该是 1:1 的类型,而不是 1:n 的类型,可以通过 uselist 参数来指定该关联不是列表类型:

    blog_property = relationship('BlogProperty', uselist=False)
    

    接下来,又有个嵌套的需要,如果想要在 BlogProperty 对象中注册一个 Blog 实例属性,该如何做?当然并不需要在BlogProperty 中再定义一个 relationship 对象,而是用该对象的 backref 属性来反向注册:

    blog_property = relationship('BlogProperty', lazy='select', uselist=False, backref='blog')
    
    # 其他查询代码……
    _blog = _session.query(Blog).filter_by(id=1).first()
    print(_blog, _blog.blog_property.blog)
    

    可以看到,在 blog_property 中已经反向注册了 blog 对象。在一些文章中,可以看到下面这种用法:

    class Parent(Base):
    	relationship("Children", back_populates="parent")
    
    class Children(Base):
    	relationship("Parent", back_populates="children")
    

    其实,本质上来说 relationship 方法的 backref 参数,只是对这一种写法的简化。

    连接查询动态加载

    为了演示这部分,再次创建一个对象 BlogComments , 用于表示博文的评论:

    class BlogComments(Base):
        __tablename__ = 'blog_comments'
    
        id = Column(Integer, primary_key=True, autoincrement=True, comment='默认主键')
        blog_id = Column(Integer, ForeignKey('blog.id'), nullable=False, comment='外键,博文编号')
        comments_content = Column(String(2000), nullable=False)
        comments_vote = Column(Integer, default=0, server_default='0', comment='投票数')
        create_time = Column(TIMESTAMP, server_default=text('CURRENT_TIMESTAMP'), comment='创建时间')
    

    同时,在 Blog 对象中也增加关系映射:

    class Blog(Base):
    	blog_comments = relationship('BlogComments', lazy='dynamic', order_by={BlogComments.create_time.desc()})
    

    进行查询:

    _blog = _session.query(Blog).filter_by(id=1).first()
    _blog_comment1 = _blog1.blog_comments
    _blog_comment2 = _blog1.blog_comments.all()
    _blog_comment3 = _blog1.blog_comments.limit(10).offset(9).all()
    

    对照前面的属性查询,可以看到当把 lazy 参数设置为 dynamic 后,查询主对象和访问属性时,均不会实际查询数据库。只有当调用该属性的方法时,才会执行真实的 SQL 查询。或者换个更容易理解的方式,即此时,返回的是一个 dynamic query(动态查询) 对象。

    复杂条件查询

    在前面的查询中,我们只使用了单一的条件查询 filter_by() 方法,该方法仅支持键值的相等查询,这很显然不适用于多数情况,因此引入一个更强大的过滤查询方法: filter()

    # 链式查询
    _query = _session.query(Blog).filter(Blog.id == 1).filter(Blog.create_time > '2021-01-01').all()
    
    # 组合查询条件
    _filters = {
        or_(
            and_(
            	Blog.id > 0, Blog.id < 10
            ),
            and_(
            	Blog.id > 11, Blog.id < 20
            )
        )
    }
    _query = _session.query(Blog).filter(_filters).all()
    

    分页查询

    _query = _session.query(Blog).order_by(Blog.create_time.desc()).offset(偏移量).limit(取出量).all()
    _query2 = _session.query(Blog).order_by(Blog.create_time.desc()).slice(下索引, 上索引).all()
    

    六、管理事务

    [思索中]

    七、上下文会话

    数据库链接在打开进行完操作之后,一定要及时关闭。为了减少代码的使用,可以通过 Python 提供的上下文管理以及生成器技术来实现:

    import contextlib
    
    @contextlib.contextmanager
    def open_session(self):
        _engine = create_engine(self.__connectionString, echo=True)
        _DBSession = scoped_session(sessionmaker(bind=_engine))
        _session = _DBSession()
        try:
        	yield _session
        except Exception as ex:
        	_session.rollback()
        return
    
    # 调用上下文:
    with self.open_session() as _session:
    	return _session.query(func.count('*')).scalar()
    

    参考资料

    曾经我以为我是个程序员攻城狮,,现在我发现,必须要要前面加上“广告公司”四字。
  • 相关阅读:
    如何使用Jquery 引入css文件
    html如何绘制带尖角(三角)的矩形
    让HTML标签、DIV、SPAN拥有focus事件和blur事件,聚焦和失焦
    html如何引用另一个html的内容
    HTML中块级元素与内联元素有什么区别 ?
    一个js文件如何加载另外一个js文件
    在线工具-程序员的工具箱-在线Cron表达式生成器
    oracle fm格式化
    html如何让label在div中的垂直方向居中显示?
    服务发现框架选型: Consul、Zookeeper还是etcd ?
  • 原文地址:https://www.cnblogs.com/bashenandi/p/15656512.html
Copyright © 2011-2022 走看看