zoukankan      html  css  js  c++  java
  • FastAPI安全系列(三) 基于Hash Password和JWT Bearer Token的OAuth2 .0认证

    一、准备

    1、python-jose

    JavaScript对象签名和加密(JOSE)技术。

    • JSON Web Signatures(JWS)
    • JSON Web Encryption(JWK)
    • JSON Web Key(JWK)
    • JSON Web Algorithms(JWA)

    使用各种算法对内容进行加密和签名,其中JSON Web Signatures是对JSON编码对象进行签名,然后将其编成复杂的URL安全字符串。支持的算法有:

    Algorithm Value Digital Signature or MAC Algorithm
    HS256 HMAC using SHA-256 hash algorithm
    HS384 HMAC using SHA-384 hash algorithm
    HS512 HMAC using SHA-512 hash algorithm
    RS256 RSASSA using SHA-256 hash algorithm
    RS384 RSASSA using SHA-384 hash algorithm
    RS512 RSASSA using SHA-512 hash algorithm
    ES256 ECDSA using SHA-256 hash algorithm
    ES384 ECDSA using SHA-384 hash algorithm
    ES512 ECDSA using SHA-512 hash algorithm

    在使用前先进行安装包:

    pip insstall python-jose

    然后使用:

    >>> from jose import jwt
    >>> token = jwt.encode({"key":"value"},"secret",algorithm="HS256")
    >>> token
    'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJrZXkiOiJ2YWx1ZSJ9.FG-8UppwHaFp1LgRYQQeS6EDQF7_6-bMFegNucHjmWg'
    >>> jwt.decode(token,"secret",algorithms=["HS256"])
    {'key': 'value'}

    2、passlib

      passlib是一个用于python2 & 3的密码散列包,它提供了超过30多种密码散列算法,同时也是一个管理现有密码散列的框架。

      passlib可以大致分为四类:

    • Password Hashes
    • Password Contexts
    • Two-Factor Authentication
    • Application Helpers

    使用这个模块,需要先安装passlib:

    pip install passlib

    然后这里以Password Hashes为例:

    # 加密
    >>> from passlib.hash import pbkdf2_sha256
    >>> hash = pbkdf2_sha256.hash("password")
    >>> hash
    '$pbkdf2-sha256$29000$8l4rpTRmDGEsRUhJac05Bw$eajW7PFThCDyQ2DiCbVIeaMw6pF/bLXj5XkLlI3dOY0'
    
    # 进行验证
    >>> pbkdf2_sha256.verify("password", hash)
    True

    二、Hash Password和JWT Bearer Token认证

    (一)流程

    •  客户端发送用户名和密码到生成token的路径操作
    • 服务器路径操作函数生成对应的JWT Token
    • 返回JWT Token到客户端
    • 客户端发送请求,并且请求头中携带对应的Token
    • 服务端检查JWT Token,并且从Token中得到用户信息
    • 服务端将用户信息返回到客户端

    (二)获取jwt token

    from pydantic import BaseModel
    from fastapi import FastAPI, Depends, HTTPException, status
    from typing import Optional
    from passlib.context import CryptContext
    from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
    from datetime import datetime, timedelta
    from jose import jwt, JWTError
    
    app = FastAPI()
    
    fake_users_db = {
        "johndoe": {
            "username": "johndoe",
            "full_name": "John Doe",
            "email": "johndoe@example.com",
            "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
            "disabled": False,
        }
    }
    
    SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTE = 30
    
    
    class User(BaseModel):
        username: str
        email: Optional[str] = None
        full_name: Optional[str] = None
        disabled: Optional[bool] = None
    
    
    class UserInDB(User):
        hashed_password: str
    
    
    class Token(BaseModel):
        """返回给用户的Token"""
        access_token: str
        token_type: str
    
    
    oauth2_schema = OAuth2PasswordBearer(tokenUrl="/jwt/token")
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    
    
    def verity_password(plain_password: str, hashed_password: str):
        """对密码进行校验"""
        return pwd_context.verify(plain_password, hashed_password)
    
    
    def jwt_get_user(db, username: str):
        if username in db:
            user_dict = db[username]
            return UserInDB(**user_dict)
    
    
    def jwt_authenticate_user(db, username: str, password: str):
        """根据前台传递的用户名来从数据库中取出用户,然后进行密码验证"""
        user = jwt_get_user(db=db, username=username)
        if not user:
            return False
        if not verity_password(plain_password=password, hashed_password=user.hashed_password):
            return False
        return user
    
    
    def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
        """用户验证成功,并且取出用户信息,然后据此创建jwt token"""
        to_encode = data.copy()
        if expires_delta:
            expire = datetime.utcnow() + expires_delta
        else:
            expire = datetime.utcnow() + timedelta(minutes=15)
        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(claims=to_encode, key=SECRET_KEY, algorithm=ALGORITHM)
        return encoded_jwt
    
    
    @app.post("/jwt/token", response_model=Token)
    async def login_jwt_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
        user = jwt_authenticate_user(db=fake_users_db, username=form_data.username, password=form_data.password)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Incorrect username or password",
                headers={"WWW-Authenticate": "Bearer"}
            )
        access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTE)
        access_token = create_access_token(
            data={"sub": user.username},
            expires_delta=access_token_expires
        )
        return {"access_token": access_token, "token_type": "bearer"}

    在交互文档中输入用户名和密码获取jwt token:

    (三)获取活跃用户

    from pydantic import BaseModel
    from fastapi import FastAPI, Depends, HTTPException, status
    from typing import Optional
    from passlib.context import CryptContext
    from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
    from datetime import datetime, timedelta
    from jose import jwt, JWTError
    
    app = FastAPI()
    
    fake_users_db = {
        "johndoe": {
            "username": "johndoe",
            "full_name": "John Doe",
            "email": "johndoe@example.com",
            "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
            "disabled": False,
        }
    }
    
    SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTE = 30
    
    
    class User(BaseModel):
        username: str
        email: Optional[str] = None
        full_name: Optional[str] = None
        disabled: Optional[bool] = None
    
    
    class UserInDB(User):
        hashed_password: str
    
    
    class Token(BaseModel):
        """返回给用户的Token"""
        access_token: str
        token_type: str
    
    
    oauth2_schema = OAuth2PasswordBearer(tokenUrl="/jwt/token")
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    
    
    def verity_password(plain_password: str, hashed_password: str):
        """对密码进行校验"""
        return pwd_context.verify(plain_password, hashed_password)
    
    
    def jwt_get_user(db, username: str):
        if username in db:
            user_dict = db[username]
            return UserInDB(**user_dict)
    
    
    def jwt_authenticate_user(db, username: str, password: str):
        """根据前台传递的用户名来从数据库中取出用户,然后进行密码验证"""
        user = jwt_get_user(db=db, username=username)
        if not user:
            return False
        if not verity_password(plain_password=password, hashed_password=user.hashed_password):
            return False
        return user
    
    
    def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
        """用户验证成功,并且取出用户信息,然后据此创建jwt token"""
        to_encode = data.copy()
        if expires_delta:
            expire = datetime.utcnow() + expires_delta
        else:
            expire = datetime.utcnow() + timedelta(minutes=15)
        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(claims=to_encode, key=SECRET_KEY, algorithm=ALGORITHM)
        return encoded_jwt
    
    
    @app.post("/jwt/token", response_model=Token)
    async def login_jwt_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
        user = jwt_authenticate_user(db=fake_users_db, username=form_data.username, password=form_data.password)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Incorrect username or password",
                headers={"WWW-Authenticate": "Bearer"}
            )
        access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTE)
        access_token = create_access_token(
            data={"sub": user.username},
            expires_delta=access_token_expires
        )
        return {"access_token": access_token, "token_type": "bearer"}
    
    
    async def jwt_get_current_user(token: str = Depends(oauth2_schema)):
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Bearer"}
        )
        try:
            payload = jwt.decode(token=token, key=SECRET_KEY, algorithms=[ALGORITHM])
            username = payload.get("sub")
            if not username:
                raise credentials_exception
        except JWTError:
            raise credentials_exception
        user = jwt_get_user(db=fake_users_db, username=username)
        if not user:
            raise credentials_exception
        return user
    
    
    async def jwt_get_current_active_user(current_user: User = Depends(jwt_get_current_user)):
        if current_user.disabled:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Inactive user"
            )
        return current_user
    
    
    @app.get("/jwt/users/me")
    async def jwt_read_users_me(current_user: User = Depends(jwt_get_current_active_user)):
        return current_user
    作者:iveBoy
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须在文章页面给出原文连接,否则保留追究法律责任的权利。
  • 相关阅读:
    2016工作目标
    Android简化xml sax解析
    dynamic-load-apk插件原理整理
    Spring boot 拾遗 —— Spring Cache 扩展 Duration
    Spring boot 拾遗 —— Spring Cache 使用 Jackson 与 自定义 TTL
    Spring boot 拾遗 —— 错误验证
    简化 Java 代码 ——(一)使用 PropertyMapper
    Java 开源项目 OpenFeign —— feign 结合 SpringBoot
    Java 开源项目 OpenFeign —— feign 的基本使用
    Java 定时任务 Quartz (三)—— 并发
  • 原文地址:https://www.cnblogs.com/shenjianping/p/14870309.html
Copyright © 2011-2022 走看看