zoukankan      html  css  js  c++  java
  • FastAPI 安全机制(三) 基于OAuth2和JWT的Token认证机制(二)用户登陆及验证

    作者:麦克煎蛋   出处:https://www.cnblogs.com/mazhiyong/ 转载请保留这段声明,谢谢!

    下面我们模拟用户登陆的过程,具体讲解下登陆验证的流程,并完善代码逻辑。

    密码哈希

    为了数据安全,我们利用PassLib对入库的用户密码进行加密处理,推荐的加密算法是"Bcrypt"。我们需要安装依赖包:

    pip install passlib
    pip install bcrypt

    PassLib也可以对密码进行校验。

    用户登陆

    用户通过终端发送usernamepassword到后端。后端收到数据后,进行一下操作:

    1、用户信息校验:查询当前系统是否存在该用户,以及密码是否正确。

    2、如果用户存在,则生成JWT token并返回,JWT payload中可以携带自定义数据。

    from datetime import datetime, timedelta
    from typing import Optional
    
    from fastapi import Depends, FastAPI, HTTPException, status
    from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
    import jwt
    from pydantic import BaseModel
    from passlib.context import CryptContext
    
    
    # to get a string like this run:
    # openssl rand -hex 32
    SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30
    
    
    # 模拟数据库数据 fake_users_db
    = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, } } class Token(BaseModel): access_token: str token_type: str class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None class UserInDB(User): hashed_password: str pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token") app = FastAPI() # 校验密码 def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) # 密码哈希 def get_password_hash(password): return pwd_context.hash(password) # 模拟从数据库读取用户信息 def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) # 用户信息校验:username和password分别校验 def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return user # 生成token,带有过期时间 def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt @app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): # 首先校验用户信息 user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # 生成并返回token信息 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": "test"}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}

    数据请求

    终端获取到token信息后,必须在后续请求的Authorization头信息中带有Bearer token,才能被允许访问。

    我们添加一个校验函数,对请求的合法性进行校验,读取token内容解析并进行验证:

    async def get_current_user(token: str = Depends(oauth2_scheme)):
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            username: str = payload.get("sub")
            if username is None:
                raise credentials_exception
            token_data = TokenData(username=username)
        except PyJWTError:
            raise credentials_exception
        user = get_user(fake_users_db, username=token_data.username)
        if user is None:
            raise credentials_exception
        return user

    添加数据请求接口,依赖上面的校验函数:

    @app.get("/users/me/", response_model=User)
    async def read_users_me(current_user: User = Depends(get_current_user)):
        return current_user

    以下是Postman测试结果:

    参照以上业务流程,我们可以实现其他的需要权限校验的数据请求接口。

    个人认为,如果只是单纯的提供开发接口,通过JWT的校验机制已经足以满足业务需求。

  • 相关阅读:
    洛谷P1455 搭配购买(并查集)
    洛谷 P2078 朋友(并查集)
    TheZealous的集训日常之 离线算法与在线算法区别
    综合练习: Python自动化测试--从Excel读取数据并录入mysql
    读取excel/CSV/json数据
    Python 日志
    python 操作数据库
    python requests 接口: 调用百度开发者平台图片文字识别接口
    接口测试理论 2
    接口测试理论 1
  • 原文地址:https://www.cnblogs.com/mazhiyong/p/13219300.html
Copyright © 2011-2022 走看看