zoukankan      html  css  js  c++  java
  • FastAPI(44)- 操作关系型数据库

    ORM

    • FastAPI 可与任何数据库和任何样式的库配合使用并和数据库通信
    • object-relational mapping 对象关系映射
    • ORM 具有在代码和数据库表(关系)中的对象之间进行转换(映射)的工具
    • 使用 ORM,通常会创建一个表示 SQL 数据表的类,该类的每个属性都表示一个列,具有名称和类型

    小栗子

    • Pet 类可以表示 SQL 表 pets
    • 并且 Pet 类的每个实例对象代表数据库中的一行数据
    • 例如,对象 orion_cat(Pet 的一个实例)可以具有属性 orion_cat.type,用于列类型,属性的值可以是:猫

    项目架构

    .
    └── sql_app
        ├── __init__.py
        ├── curd.py
        ├── database.py
        ├── main.py
        ├── models.py
        └── schemas.py

      

    前提

    需要先安装 sqlalchemy

    pip install sqlalchemy

    使用 sqlite

    • 后面的栗子,暂时跟着官网,先使用 sqlite 数据库来演示
    • 后面有时候再通过 Mysql 来写多一篇文章

    database.py 代码

    # 1、导入 sqlalchemy 部分的包
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker
    
    # 2、声明 database url
    
    SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
    # SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
    
    # 3、创建 sqlalchemy 引擎
    engine = create_engine(
        url=SQLALCHEMY_DATABASE_URL,
        connect_args={"check_same_thread": False}
    )
    
    # 4、创建一个 database 会话
    session = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    
    # 5、返回一个 ORM Model
    Base = declarative_base()

    声明 database 连接 url

    SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
    # SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db" 

    第一行是 slite 连接 url

    其他数据库连接 url 的写法

    # sqlite-pysqlite 库
    sqlite+pysqlite:///file_path
    
    # mysql-mysqldb 库
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
      
    # mysql-pymysql 库
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
      
    # mysql-mysqlconnector 库
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
      
    # oracle-cx_Oracle 库
    oracle+cx_oracle://user:pass@hostname:port[/dbname][?service_name=<service>[&key=value&key=value...]]
    
    # postgresql-pypostgresql 库
    postgresql+pypostgresql://user:password@host:port/dbname[?key=value&key=value...]
    
    # SQL Server-PyODBC 库
    mssql+pyodbc://<username>:<password>@<dsnname>

    创建一个数据库引擎

    engine = create_engine(
        url=SQLALCHEMY_DATABASE_URL,
        connect_args={"check_same_thread": False}
    )
    •  {"check_same_thread": False} 仅适用于 SQlite,其他数据库不需要用到
    • 默认情况下,SQLite 将只允许一个线程与其通信,假设每个线程只处理一个独立的请求
    • 这是为了防止被不同的事物(对于不同的请求)共享相同的连接
    • 但是在 FastAPI 中,使用普通函数 (def) 可以针对同一请求与数据库的多个线程进行交互,因此需要让 SQLite 知道它应该允许使用多线程
    • 需要确保每个请求在依赖项中都有自己的数据库连接会话,因此不需要设置为同一个线程

    创建一个数据库会话

    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    • SessionLocal 类的每个实例都是一个数据库会话
    • 但 sessionmaker 本身还不是数据库会话
    • 但是一旦创建了 SessionLocal 类的实例,这个实例就会成为实际的数据库会话
    • 将其命名为 SessionLocal ,方便区分从 SQLAlchemy 导入的 Session
    • 稍后将使用 Session(从 SQLAlchemy 导入的那个)

    创建一个 ORM 模型基类

    Base = declarative_base()

    后面会通过继承这个 Base 类,来创建每个数据库 Model,也称为 ORM Model

    models.py 代码

    from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
    from sqlalchemy.orm import relationship
    
    from .database import Base
    
    
    class User(Base):
        # 1、表名
        __tablename__ = "users"
    
        # 2、类属性,每一个都代表数据表中的一列
        # Column 就是列的意思
        # Integer、String、Boolean 就是数据表中,列的类型
        id = Column(Integer, primary_key=True, index=True, autoincrement=True)
        email = Column(String, unique=True, index=True)
        hashed_password = Column(String)
        is_active = Column(Boolean, default=True)
    
        items = relationship("Item", back_populates="owner")
    
    
    class Item(Base):
        __tablename__ = "items"
    
        id = Column(Integer, primary_key=True, index=True, autoincrement=True)
        title = Column(String, index=True)
        description = Column(String, index=True)
        owner_id = Column(Integer, ForeignKey("users.id"))
    
        owner = relationship("User", back_populates="items")

    注意:有 autoincrement 就不要用 default 了哈 

    Column

    列,一个属性代表数据表中的一列

    常用参数

    参数 作用
    primary_key 如果设为 True ,这列就是表的主键
    unique 如果设为 True ,这列不允许出现重复的值
    index 如果设为 True ,为这列创建索引,提升查询效率
    nullable
    • 如果设为 True ,这列允许使用空值;
    • 如果设为 False ,这列不允许使用空值
    default 为这列定义默认值

    autoincrement

    如果设为 True ,这列自增

    String、Integer、Boolean

    代表数据表中每一列的数据类型

    schemas.py 代码

    背景

    为了避免混淆 SQLAlchemy 模型和 Pydantic 模型之间,将使用文件 models.py 编写 SQLAlchemy 模型和文件 schemas.py 编写 Pydantic 模型

    实际代码

    from typing import List, Optional
    
    from pydantic import BaseModel
    
    
    # Item 的基类,表示创建和查询 Item 时共有的属性
    class ItemBase(BaseModel):
        title: str
        description: Optional[str] = None
    
    
    # 创建 Item 时的 Model
    class ItemCreate(ItemBase):
        pass
    
    
    # 查询 Item 时的 Model
    class Item(ItemBase):
        id: int
        owner_id: int
    
        # 向 Pydantic 提供配置
        class Config:
            #  orm_mode 会告诉 Pydantic 模型读取数据,即使它不是字典,而是 ORM 模型(或任何其他具有属性的任意对象)
            orm_mode = True
    
    
    class UserBase(BaseModel):
        email: str
    
    
    class UserCreate(UserBase):
        password: str
    
    
    class User(UserBase):
        id: int
        is_active: bool
        items: List[Item] = []
    
        class Config:
            orm_mode = True

    ItemBase、UserBase

    基类,声明在创建或读取数据时共有的属性

    ItemCreate、UserCreate

    创建数据时使用的 Model

    Item、User

    读取数据时使用的 Model

    orm_mode

    class Config:
       orm_mode = True
    • 这是一个 Pydantic 配置项
    • orm_mode 会告诉 Pydantic 模型读取数据,即使它不是字典,而是 ORM 模型(或任何其他具有属性的任意对象)
    # 正常情况
    id = data["id"]
    
    # 还会尝试从对象获取属性
    id = data.id

    设置了 orm_mode,Pydantic 模型与 ORM 就兼容了,只需在路径操作的 response_model 参数中声明它即可

    orm_mode 的技术细节

    • SQLAlchemy 默认情况下 lazy loading 懒加载,即需要获取数据时,才会主动从数据库中获取对应的数据
    • 比如获取属性 current_user.items ,SQLAlchemy 会从 items 表中获取该用户的 item 数据,但在这之前不会主动获取

    如果没有 orm_mode

    • 从路径操作中返回一个 SQLAlchemy 模型,它将不会包括关系数据(比如 user 中有 item,则不会返回 item,后面再讲实际的栗子)
    • 在 orm_mode 下,Pydantic 会尝试从属性访问它要的数据,可以声明要返回的特定数据,它甚至可以从 ORM 中获取它

    curd.py 代码

    作用

    • 主要用来编写与数据库交互的函数,增删改查,方便整个项目不同地方都能进行复用
    • 并且给这些函数添加专属的单元测试

    实际代码

    代码只实现了查询和创建

    1. 根据 id 查询 user
    2. 根据 email 查询 user
    3. 查询所有 user
    4. 创建 user
    5. 查询所有 item
    6. 创建 item
    from sqlalchemy.orm import Session
    from .models import User, Item
    from .schemas import UserCreate, ItemCreate
    
    
    # 根据 id 获取 user
    def get_user(db: Session, user_id: int):
        return db.query(User).filter(User.id == user_id).first()
    
    
    # 根据 email 获取 user
    def get_user_by_email(db: Session, email: str):
        return db.query(User).filter(User.email == email).first()
    
    
    # 获取所有 user
    def get_users(db: Session, size: int = 0, limit: int = 100):
        return db.query(User).offset(size).limit(limit).all()
    
    
    # 创建 user,user 类型是 Pydantic Model
    def create_user(db: Session, user: UserCreate):
        fake_hashed_password = user.password + "superpolo"
        # 1、使用传进来的数据创建 SQLAlchemy Model 实例对象
        db_user = User(email=user.email, hashed_password=fake_hashed_password)
        # 2、将实例对象添加到数据库会话 Session 中
        db.add(db_user)
        # 3、将更改提交到数据库
        db.commit()
        # 4、刷新实例,方便它包含来自数据库的任何新数据,比如生成的 ID
        db.refresh(db_user)
        return db_user
    
    
    # 获取所有 item
    def get_items(db: Session, size: int = 0, limit: int = 100):
        return db.query(Item).offset(size).limit(limit).all()
    
    
    # 创建 item,item 类型是 Pydantic Model
    def create_item(db: Session, item: ItemCreate, user_id: int):
        db_item = Item(**item.dict(), owner_id=user_id)
        db.add(db_item)
        db.commit()
        db.refresh(db_item)
        return db_item

    create_user、create_item

    函数内的操作步骤如下

    # 1、使用传进来的数据创建 SQLAlchemy Model 实例对象
    db_user = User(email=user.email, hashed_password=fake_hashed_password)
    
    # 2、将实例对象添加到数据库会话 Session 中
    db.add(db_user)
    
    # 3、将更改提交到数据库
    db.commit()
    
    # 4、刷新实例,方便它包含来自数据库的任何新数据,比如生成的 ID
    db.refresh(db_user)

    main.py 代码 

    from typing import List
    import uvicorn
    from fastapi import Depends, FastAPI, HTTPException, status, Path, Query, Body
    from sqlalchemy.orm import Session
    from models import Base
    from schemas import User, UserCreate, ItemCreate, Item
    from database import SessionLocal, engine
    import curd
    
    Base.metadata.create_all(bind=engine)
    
    app = FastAPI()
    
    
    # 依赖项,获取数据库会话对象
    def get_db():
        db = SessionLocal()
        try:
            yield db
        finally:
            db.close()
    
    
    # 创建用户
    @app.post("/users", response_model=User)
    async def create_user(user: UserCreate, db: Session = Depends(get_db)):
        # 1、先查询用户是否有存在
        db_user = curd.get_user_by_email(db, user.email)
        if db_user:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="user 已存在")
        res_user = curd.create_user(db, user)
        return res_user
    
    
    # 根据 user_id 获取用户
    @app.get("/user_id/{user_id}", response_model=User)
    async def get_user(user_id: int = Path(...), db: Session = Depends(get_db)):
        return curd.get_user(db, user_id)
    
    
    # 根据 email 获取用户
    @app.get("/user_email/{email}", response_model=User)
    async def get_user_by_email(email: str = Path(...), db: Session = Depends(get_db)):
        return curd.get_user_by_email(db, email)
    
    
    # 获取所有用户
    @app.get("/users_all/", response_model=List[User])
    async def get_users(skip: int = Query(0),
                        limit: int = Query(100),
                        db: Session = Depends(get_db)):
        return curd.get_users(db, skip, limit)
    
    
    # 创建 item
    @app.post("/users/{user_id}/items", response_model=Item)
    async def get_user_item(user_id: int = Path(...), item: ItemCreate = Body(...), db: Session = Depends(get_db)):
        return curd.create_user_item(db, item, user_id)
    
    
    # 获取所有 item
    @app.get("/items/", response_model=List[Item])
    async def get_items(skip: int = Query(0),
                        limit: int = Query(100),
                        db: Session = Depends(get_db)):
        return curd.get_items(db, skip, limit)
    
    
    if __name__ == "__main__":
        uvicorn.run(app="main:app", host="127.0.0.1", port=8080, reload=True, debug=True)

    依赖项

    def get_db():
        db = SessionLocal()
        try:
            yield db
        finally:
            db.close()
    • 每个请求都有一个独立的数据库会话(SessionLocal)
    • 在请求完成后会自动关闭它
    • 然后下一个请求来的时候,会创建一个新会话

    声明依赖项

    async def create_user(user: UserCreate, db: Session = Depends(get_db)) 
    • SessionLocal 是 sessionmaker() 创建的,是 SQLAlchemy Session 的代理
    • 通过声明 db: Session ,IDE 就可以提供智能代码提示啦

    使用中间件 middleware 代替依赖项声明数据库会话

    # 中间件
    @app.middleware("http")
    async def db_session_middleware(request: Request, call_next):
        # 默认响应
        response = Response("Internal server error", status_code=500)
        try:
            request.state.db = SessionLocal()
            response = await call_next(request)
        finally:
            # 关闭数据库会话
            request.state.db.close()
        return response
    
    
    # 依赖项,获取数据库会话对象
    def get_db(request: Request):
        return request.state.db

    request.state

    • request.state 是每个 Request 对象的一个属性
    • 它用于存储附加到请求本身的任意对象,例如本例中的数据库会话 db
    • 也就是说,我不叫 db,叫 sqlite_db 也可以,只是一个属性名

    使用中间件 middleware 和使用 yield 的依赖项的区别

    • 中间件需要更多的代码,而且稍微有点复杂
    • 中间件必须是一个 async 函数,而且需要有 await 的代码,可能会阻塞程序并稍稍降低性能
    • 每个请求运行的时候都会先运行中间件,所以会为每个请求都创建一个数据库连接,即使某个请求的路径操作函数并不需要和数据库交互

    建议

    • 创建数据库连接对象最好还是用带有 yield 的依赖项来完成
    • 在其他使用场景也是,能满足需求的前提下,最好用带有 yield 的依赖项来完成
  • 相关阅读:
    logstash定义表达式
    redis
    HTTP 错误 500.19 请求的页面的相关配置数据无效 解决办法
    redis sentinel集群
    Elasticsearch6.0及其head插件安装
    Elasticsearch5.2.0部署过程的坑
    Centos7远程桌面 vnc/vnc-server的设置
    python-day27--configparser模块
    python-day27--hashlib模块-摘要算法
    python-day21--os模块
  • 原文地址:https://www.cnblogs.com/poloyy/p/15361037.html
Copyright © 2011-2022 走看看