废话不多说,直接上代码。
目录结构, 由于我也是刚开始学这个框架,只是了解了怎么注册蓝图,JWT的集成,数据库的集成,想了解更多,自行打开官方文档去详细阅读。fastapi官网文档链接
创建一个main.py文件, 我这个是添加了蓝图, 关键字:
from fastapi import FastAPI from text import demo from wx import test_client from sql_conf import models, database models.Base.metadata.create_all(bind=database.engine) app = FastAPI() app.include_router(demo.router, prefix="/api") app.include_router(test_client.router, prefix="/wx") if __name__ == '__main__': import uvicorn uvicorn.run( app='main:app', host="0.0.0.0", port=8082, reload=True, debug=True )
auth.py用来jwt的校验和生成
#! /usr/bin/env python # -*- coding: utf-8 -*- # Eamil: 1922878025@qq.com # @Author: Wyc # @Time: 3:25 下午 import jwt from fastapi import HTTPException, Security, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBasic from passlib.context import CryptContext from datetime import datetime, timedelta class AuthHandler(): security = HTTPBasic() pwd_content = CryptContext(schemes=['bcrypt'], deprecated='auto') secret = 'SECRET' # 密码加密 def get_password_has(self, password): return self.pwd_content.hash(password) # 密码校验 def verify_password(self, plain_password, hashed_password): return self.pwd_content.verify(plain_password, hashed_password) # 生成token def encode_token(self, user_id): payload = { 'exp': datetime.utcnow() + timedelta(days=0, minutes=120), 'iat': datetime.utcnow(), 'sub': user_id } return str(jwt.encode(payload, self.secret, algorithm='HS256')) # token解码 def decode_token(self, token): try: payload = jwt.decode(token, self.secret, algorithms=['HS256']) return payload['sub'] except jwt.ExpiredSignatureError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="签名已过期" ) except jwt.InvalidTokenError: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='签名无效!') def auth_wrapper(self, oauth: HTTPAuthorizationCredentials = Security(security)): return self.decode_token(oauth.credentials)
wx,test两个目录,都是单独的一个app。
wx目录下的test_client.py
#! /usr/bin/env python # -*- coding: utf-8 -*- # Eamil: 1922878025@qq.com # @Author: Wyc # @Time: 4:00 下午 from fastapi import status, APIRouter, Depends, Path, Query from fastapi.responses import JSONResponse from sql_conf.database import SessionLocal from typing import Union, Any from sql_conf.models import User from sql_conf.crud import db_create from sqlalchemy.orm import Session from sql_conf.db_session import get_db from .data_model import UserItem, USerLgoin from auth import AuthHandler auth_middle = AuthHandler() router = APIRouter( tags=["wx"], responses={404: {"description": "未找到接口!"}} ) @router.get('/') async def read_main(token_data: Union[str, Any] = Depends(auth_middle.decode_token),): content = {"msg": "hello wx"} return JSONResponse(status_code=status.HTTP_200_OK, content=content) @router.post('/create_user') async def create_user_info( Item: UserItem, db: Session = Depends(get_db)): result = {'code': 0, "message": ""} request_dict = Item.dict() login = request_dict.get('login') email = request_dict.get('email') pwd = request_dict.get("hashed_password") if not login: return JSONResponse(status_code=status.HTTP_201_CREATED, content={"code": 201, "msg": '请输入账号!'}) has_pwd = auth_middle.get_password_has(pwd) save_user_info = { 'email': email, 'login': login, 'hashed_password': has_pwd } comiit_user = db_create(db=db, payload=save_user_info) if comiit_user: result['code'] = 200 result['message'] = '注册成功!' else: result['code'] = 201 result['message'] = '注册失败!' return JSONResponse(status_code=status.HTTP_200_OK, content=result) @router.post('/login_user') async def user_login(Data: USerLgoin, db: Session=Depends(get_db)): result = {} user_dict = Data.dict() user_name = user_dict.get('username') pwd = user_dict.get('password') filter_data = db.query(User).filter_by(login=user_name).first() version_pwd = auth_middle.verify_password(plain_password=pwd, hashed_password=filter_data.hashed_password) print(version_pwd) if version_pwd: token = auth_middle.encode_token(user_id=filter_data.id) print(token) result['code'] = 200 result['msg'] = '登录成功!' result['token'] = token else: result['code'] = 201 result['msg'] = '登录失败!' return JSONResponse(status_code=status.HTTP_200_OK, content=result) @router.get('/get_user/', summary="获取用户信息") async def get_user_info(token_data: Union[str, Any] = Depends(auth_middle.decode_token), db: Session = Depends(get_db)): result = {} filter_user = db.query(User).filter_by(id=int(token_data)).first() if filter_user: result['code'] = 200 result['message'] = '用户信息' result['userinfo'] = { 'login': filter_user.login, 'email': filter_user.email, 'pwd': filter_user.hashed_password } else: result['code'] = 201 result['message'] = "未获取到用户信息!" return JSONResponse(status_code=status.HTTP_200_OK, content=result)
data_model.py 用来创建数据模型
#! /usr/bin/env python # -*- coding: utf-8 -*- # Eamil: 1922878025@qq.com # @Author: Wyc # @Time: 4:02 下午 from pydantic import BaseModel class UserItem(BaseModel): login: str email: str hashed_password: str class USerLgoin(BaseModel): username: str password: str
text目录demo.py文件
#! /usr/bin/env python # -*- coding: utf-8 -*- # Eamil: 1922878025@qq.com # @Author: Wyc # @Time: 3:48 下午 from typing import Optional from fastapi import FastAPI, Body, status, APIRouter from pydantic import BaseModel from fastapi.responses import JSONResponse router = APIRouter( tags=["items"], responses={404: {"description": "Not found"}} ) class Item(BaseModel): name: str price: float is_offer: Optional[bool] = None @router.get('/') def index(): content = {"hello": "world"} return JSONResponse(status_code=status.HTTP_200_OK, content=content) @router.get("/items/{item_id}") def read_item(item_id: int, q: Optional[str] = None): return {'itrm_id': item_id, "q": q} @router.put("/items/{item_id}") def update_item(item_id: int, item: Item): print(item_id) print(item) return {"item_name": item.name, "item_id": item_id}
sql_conf目录用于配置数据库
创建 crud.py, database.py, db_session.py, models.py
crud.py 数据库的增删改查
#! /usr/bin/env python # -*- coding: utf-8 -*- # Eamil: 1922878025@qq.com # @Author: Wyc # @Time: 4:52 下午 from sql_conf import models from sqlalchemy.orm import Session def db_create(db: Session, payload): try: db_user = models.User(**payload) db.add(db_user) db.flush() db.commit() return db_user except Exception as e: print("错误: {}".format(e))
数据库的连接 databases.py
#! /usr/bin/env python # -*- coding: utf-8 -*- # Eamil: 1922878025@qq.com # @Author: Wyc # @Time: 4:52 下午 from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker db_conf = { 'user': 'user', 'password': '密码', 'host': 'ip', 'port': '3306', 'name': 'demo' } # 数据库访问地址 SQLALCHEMY_DATABASE_URL = "mysql+pymysql://{user}:{password}@{host}:{port}/{name}".format( user=db_conf['user'], password=db_conf['password'], host=db_conf['host'], port=db_conf['port'], name=db_conf['name'] ) # 启动引擎 engine = create_engine( SQLALCHEMY_DATABASE_URL ) # 创建会话 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) # 数据模型基类 Base = declarative_base()
models.py创建模型
#! /usr/bin/env python # -*- coding: utf-8 -*- # Eamil: 1922878025@qq.com # @Author: Wyc # @Time: 4:52 下午 from sqlalchemy import Boolean, Column, ForeignKey, Integer, String from sqlalchemy.orm import relationship from .database import Base class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True, index=True) login = Column(String(length=60), index=True) email = Column(String(length=255), unique=True, index=True, ) hashed_password = Column(String(length=255)) is_active = Column(Boolean, default=True)
db_session.py 数据库会话信息
#! /usr/bin/env python # -*- coding: utf-8 -*- # Eamil: 1922878025@qq.com # @Author: Wyc # @Time: 2:59 下午 from .database import SessionLocal def get_db(): db = "" try: db = SessionLocal() yield db finally: db.close()