zoukankan      html  css  js  c++  java
  • FastAPI 安全机制(二) 基于OAuth2和JWT的Token认证机制(一)生成token

    作者:麦克煎蛋   出处:https://www.cnblogs.com/mazhiyong/ 转载请保留这段声明,谢谢!

    JWT简介

    基于JWT的Token认证机制实现(一)概念 

    基于JWT的Token认证机制实现(二)认证过程

    OAuth2PasswordBearer

    OAuth2PasswordBearer是接收URL作为参数的一个类:客户端会向该URL发送usernamepassword参数,然后得到一个token值。

    OAuth2PasswordBearer并不会创建相应的URL路径操作,只是指明了客户端用来获取token的目标URL。

    当请求到来的时候,FastAPI会检查请求的Authorization头信息,如果没有找到Authorization头信息,或者头信息的内容不是Bearer token,它会返回401状态码(UNAUTHORIZED)。

    我们可以看一下OAuth2PasswordBearer的源码:

    class OAuth2PasswordBearer(OAuth2):
        def __init__(
            self,
            tokenUrl: str,
            scheme_name: str = None,
            scopes: dict = None,
            auto_error: bool = True,
        ):
            if not scopes:
                scopes = {}
            flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes})
            super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)
    
        async def __call__(self, request: Request) -> Optional[str]:
            authorization: str = request.headers.get("Authorization")
            scheme, param = get_authorization_scheme_param(authorization)
    if not authorization or scheme.lower() != "bearer":
    if self.auto_error:
                    raise HTTPException(
                        status_code=HTTP_401_UNAUTHORIZED,
                        detail="Not authenticated",
                        headers={"WWW-Authenticate": "Bearer"},
                    )
                else:
                    return None
            return param

    我们需要安装PyJWT来产生和校验JWT token。

    pip install pyjwt

    我们也需要安装python-multipart,因为OAuth2需要通过表单数据来发送usernamepassword信息。

    pip install python-multipart

    获取token的代码示例如下:

    from datetime import datetime, timedelta
    from typing import Optional
    
    from fastapi import Depends, FastAPI
    from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
    import jwt
    from pydantic import BaseModel
    
    # to get a string like this run:
    # openssl rand -hex 32
    SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30
    
    
    class Token(BaseModel):
        access_token: str
        token_type: str
    
    
    app = FastAPI()
    
    # oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
    
    
    # 生成token
    def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt
    # 请求接口 @app.post(
    "/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": "test"}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}

    可以使用postman或者交互式文档来测试这个接口。

    如果单纯作为API使用来获取token值,下面这行代码暂时是用不到的:

    # oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
  • 相关阅读:
    27. Remove Element
    26. Remove Duplicates from Sorted Array
    643. Maximum Average Subarray I
    674. Longest Continuous Increasing Subsequence
    1. Two Sum
    217. Contains Duplicate
    448. Find All Numbers Disappeared in an Array
    566. Reshape the Matrix
    628. Maximum Product of Three Numbers
    UVa 1349 Optimal Bus Route Design (最佳完美匹配)
  • 原文地址:https://www.cnblogs.com/mazhiyong/p/13214643.html
Copyright © 2011-2022 走看看