zoukankan      html  css  js  c++  java
  • FastAPI 学习之路(五十八)对之前的代码进行优化

    我们之前登录认证的一些内容都直接写入到代码中,我们现在统一的给放到config文件中。

    SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30

    我们需要把uer.py修改

    from config import *
    #对应的方法
    def create_access_token(data: dict):
        to_encode = data.copy()
        encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
        return encoded_jwt
    
    
    async def get_cure_user(request: Request, token: Optional[str] = Header(...)) -> UserBase:
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="验证失败"
        )
        credentials_FOR_exception = HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="用户未登录或者登陆token已经失效"
        )
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            username: str = payload.get("sub")
            if username is None:
                raise credentials_exception
            useris = await request.app.state.redis.get(username)
            if not useris and useris!=token:
                raise credentials_FOR_exception
            user = UserBase(email=username)
            return user
        except JWTError:
            raise credentials_exception

     在存储redis的地方也需要修改

    request.app.state.redis.set(user.email, token,
     expire=ACCESS_TOKEN_EXPIRE_MINUTES * 60)

    我们把main.py的redis相关的也放在配置里

    #config.py
    redishost='127.0.0.1'
    redisport='6379'
    redisdb='0'

     main.py修改

    async def get_redis_pool() -> Redis:
        redis = await create_redis_pool(f"redis://:@"+redishost+":"+redisport+"/"+redisport+"?encoding=utf-8")
        return redis

    调整之前的websocket和file相关的代码。统一放在routers文件下,创建

    websoocket.py。

    from fastapi import APIRouter,WebSocket,Depends,WebSocketDisconnect
    from typing import List, Dict
    from routers.user import get_cure_user
    socketRouter = APIRouter()
    class ConnectionManager:
        def __init__(self):
            # 存放**的链接
            self.active_connections: List[Dict[str, WebSocket]] = []
    
        async def connect(self, user: str, ws: WebSocket):
            # 链接
            await ws.accept()
            self.active_connections.append({"user": user, "ws": ws})
    
        def disconnect(self, user: str, ws: WebSocket):
            # 关闭时 移除ws对象
            self.active_connections.remove({"user": user, "ws": ws})
    
        async def send_other_message_json(self, message: dict, user: str):
            # 发送个人消息
            for connection in self.active_connections:
                if connection["user"] == user:
                    await connection['ws'].send_json(message)
    
        async def broadcast_json(self, data: dict):
            # 广播消息
            for connection in self.active_connections:
                await connection['ws'].send_json(data)
    
        @staticmethod
        async def send_personal_message(message: str, ws: WebSocket):
            # 发送个人消息
            await ws.send_text(message)
    
        async def send_other_message(self, message: str, user: str):
            # 发送个人消息
            for connection in self.active_connections:
                if connection["user"] == user:
                    await connection['ws'].send_text(message)
    
        async def broadcast(self, data: str):
            # 广播消息
            for connection in self.active_connections:
                await connection['ws'].send_text(data)
    
    
    manager = ConnectionManager()
    
    @socketRouter.websocket("/ws/{user}/")
    async def websocket_many_point(
            websocket: WebSocket,
            user: str,
            cookie_or_token: str = Depends(get_cure_user),
    ):
        await manager.connect(user, websocket)
        try:
            while True:
                data = await websocket.receive_json()
                senduser = data['username']
                if senduser:
                    await manager.send_other_message_json(data, senduser)
                else:
                    await  manager.broadcast_json(data)
        except WebSocketDisconnect as e:
            manager.disconnect(user, websocket)

     对于file相关的也放在files文件下

    from fastapi import APIRouter, File, UploadFile
    from typing import List
    fileRouter = APIRouter()
    @fileRouter.post("/uploadfile/")
    async def upload_file(fileS: List[UploadFile] = File(...)):
        return {"filenames": [file.filename for file in fileS]}

    然后我们在main.py优化

    from routers.websoocket import socketRouter
    from routers.file import  fileRouter
    #我们在最后注册这个router
    app.include_router(socketRouter, prefix="/socket", tags=['socket'])
    app.include_router(fileRouter, prefix="/file", tags=['file'])

    调整完整后,我们的接口更加清晰了。我们启动下,看我们先有的接口。

    代码存储
    https://gitee.com/liwanlei/fastapistuday
    

      

    文章首发在公众号,欢迎关注。

  • 相关阅读:
    2.java基础语法(上)
    1.java概述
    Qt layout透明的问题
    Duilib 关于ChildLayout崩溃的问题
    关于注册表使用的几个问题
    win32接口获取ping值
    Web开发中遇到的问题
    DuiLib 窗口透明方法
    通过进程名杀死进程的方法--WIN32
    关于在Qt的MainWindow窗口中添加Layout的问题
  • 原文地址:https://www.cnblogs.com/leiziv5/p/15416945.html
Copyright © 2011-2022 走看看