zoukankan      html  css  js  c++  java
  • django restframework 基于token实现用户认证详解

    本文主要介绍django restframework 用户认证部分的内容

    • 环境配置
    • 基于 token 认证
    • JWT 认证

    1、环境配置

    pip install django==2.0
    
    pip install djangorestframework==3.10.0
    
    pip install pymysql==1.0.2
    

    2、基于token认证(非全局配置)

    2.1 数据库模型设计:models.py

    from django.db import models
    from werkzeug.security import generate_password_hash, check_password_hash
    
    class User(models.Model):
        db_table = 'adminuser_user'
    
        id = models.AutoField(primary_key=True)
        username = models.CharField(max_length=30)
        password_hash = models.TextField()
        root = models.IntegerField()
        email = models.EmailField(max_length=30, default='')
        mobile = models.CharField(max_length=11, default='')
        status = models.IntegerField(default=0)
    
    
        @property
        def password(self):
            raise AttributeError('Can not read password!')
    
        @password.setter
        def password(self, password):
            self.password_hash = generate_password_hash(password)
    
        def verify_password(self, password):
            return check_password_hash(self.password_hash, password)
    

    2.2 同步数据库:数据同步好了之后添加几条用户信息

    image

    2.3 路由:urls.py

    from django.urls import path,re_path
    from . import views, loginView
    
    urlpatterns = [
        # 登录接口
        path('admin/login', loginView.LoginView.as_view()),
        path('admin/users', views.UserView.as_view()),
    ]
    app_name = 'Admin'
    

    2.4 视图:views.py

    -------------------------------------------loginView.py-------------------------------------------
    from .models import User
    import datetime
    from rest_framework.response import Response
    from rest_framework.views import APIView
    import jwt
    from PetHome import settings
    
    
    class LoginView(APIView):
        def post(self, request):
            ret = {
                "data": {},
                "meta": {
                    "status": 200,
                    "message": ""
                }
            }
            try:
                username = request.data["username"]
                password = request.data["password"]
                print(username, password)
                if username and password:
                    user = User.objects.filter(username=username)
                    print(2)
                    if user.count == 0:
                        print(3)
                        ret["meta"]["status"] = 500
                        ret["meta"]["message"] = "用户不存在或密码错误"
                        return Response(ret)
                    elif user and user.first().verify_password(password):
                        print(4)
                        dict = {
                            "exp": datetime.datetime.now() + datetime.timedelta(days=1),  # 过期时间
                            "iat": datetime.datetime.now(),  # 开始时间
                            "id": user.first().id,
                            "username": user.first().username,
                            "mobile": user.first().mobile
                        }
                        token = jwt.encode(dict, settings.SECRET_KEY, algorithm="HS256")
                        ret["data"]["token"] = token
                        ret["data"]["username"] = user.first().username
                        ret["data"]["user_id"] = user.first().id
                        ret["meta"]["status"] = 200
                        ret["meta"]["message"] = "登录成功"
                        return Response(ret)
                    else:
                        ret["meta"]["status"] = 500
                        ret["meta"]["message"] = "用户不存在或密码错误"
                        return Response(ret)
            except Exception as error:
                print(error)
                ret["meta"]["status"] = 500
                ret["meta"]["message"] = "用户不存在或密码错误"
                return Response(ret)
    
    -------------------------------------------views.py-------------------------------------------
    
    from werkzeug.security import generate_password_hash
    from .TokenAuthtication import TokenAuthtication
    from rest_framework.response import Response
    from rest_framework.views import APIView
    from adminuser.serializer.userSerializers import *
    from rest_framework.pagination import PageNumberPagination
    from collections import OrderedDict
    from .models import User
    
    class UserView(APIView):
        authentication_classes = [TokenAuthtication, ]
        def get(self, request, *args, **kwargs):
            """
            获取用户列表
            :param request:
            :param args:
            :param kwargs:
            :return:
            """
            pk = kwargs.get('pk')  # 获取单挑数据 id 值
            query = request.GET.get('query')
            print(query)
            ret = {
                "data": {
                    "users": []
                },
                "meta": {
                    "status": 200,
                    "message": ""
                }
            }
            page = LargeResultsSetPagination()
            if not pk:
                if not query:
                    queryset = User.objects.all().order_by("-id")
                    page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
                    ser = UserSerializer(instance=page_user, many=True)
                    ret["data"]["users"] = ser.data
                else:
                    queryset = User.objects.filter(Q(mobile__contains=query) | Q(username__contains=query) | Q(email__contains=query)).order_by("-id")
                    page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
                    ser = UserSerializer(instance=page_user, many=True)
                    ret["data"]["users"] = ser.data
                return page.get_paginated_response(ret)
            else:
                if User.objects.filter(pk=pk):
                    obj_dict = User.objects.filter(pk=pk).first()
                    ser = UserSerializer(instance=obj_dict, many=False)
                    ret["data"]["users"] = ser.data
                else:
                    ret["meta"]["status"] = 500
                    ret["meta"]["message"] = "系统报错"
                return Response(ret)
    
    
    -------------------------------------------TokenAuthtication.py-------------------------------------------
    
    from rest_framework.authentication import BaseAuthentication
    from rest_framework import exceptions
    import time
    from PetHome import settings
    import jwt
    from .models import User
    
    
    class TokenAuthtication(BaseAuthentication):
        def authenticate(self, request):
            """
            先获取 header 中的token进行解析,在进行校验
                - 获取token,检查过期时间是否大于当前时间
            :param request:
            :return:
            """
            try:
                # header 中的token 在 request.MEAT.get("HTTP_TOKEN")获取
                token = request.META.get("HTTP_TOKEN")
                print("检查token:" + token)
                data = jwt.decode(token, settings.SECRET_KEY, 'HS256')
                print(data)
                if data["exp"] < int(time.time()):
                    raise exceptions.AuthenticationFailed("登录超时,请重新登录")
                elif User.objects.filter(username=data["username"]).count == 0:
                    raise exceptions.AuthenticationFailed("认证用户不存在")
    
            except Exception as error:
                print(error)
                raise exceptions.AuthenticationFailed("用户未登录,请登录")
    

    2.5 测试

    http://127.0.0.1:8089/admin/login

    image

    http://127.0.0.1:8089/admin/users

    1、用户未登录时 (请求headers未携带token)
    image
    2、用户已经登 (请求headers携带token)
    image

    3、全局配置

    • setting.py添加配置
      -------------------------------------------setting.py.py-------------------------------------------
    # django restframework 配置
    REST_FRAMEWORK = {
        'DEFAULT_AUTHENTICATION_CLASSES': ['adminuser.TokenAuthtication.TokenAuthtication', ]
    }
    

    1、全局配置用户认证后,所有的接口类都不需要添加 authentication_classes = [TokenAuthtication, ] 属性,默认所有接口都会进行登录校验
    ------------------------------------------例子1:views.py-------------------------------------------

    from werkzeug.security import generate_password_hash
    from .TokenAuthtication import TokenAuthtication
    from rest_framework.response import Response
    from rest_framework.views import APIView
    from adminuser.serializer.userSerializers import *
    from rest_framework.pagination import PageNumberPagination
    from collections import OrderedDict
    from .models import User
    
    class UserView(APIView):
        def get(self, request, *args, **kwargs):
            """
            获取用户列表
            :param request:
            :param args:
            :param kwargs:
            :return:
            """
            pk = kwargs.get('pk')  # 获取单挑数据 id 值
            query = request.GET.get('query')
            print(query)
            ret = {
                "data": {
                    "users": []
                },
                "meta": {
                    "status": 200,
                    "message": ""
                }
            }
            page = LargeResultsSetPagination()
            if not pk:
                if not query:
                    queryset = User.objects.all().order_by("-id")
                    page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
                    ser = UserSerializer(instance=page_user, many=True)
                    ret["data"]["users"] = ser.data
                else:
                    queryset = User.objects.filter(Q(mobile__contains=query) | Q(username__contains=query) | Q(email__contains=query)).order_by("-id")
                    page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
                    ser = UserSerializer(instance=page_user, many=True)
                    ret["data"]["users"] = ser.data
                return page.get_paginated_response(ret)
            else:
                if User.objects.filter(pk=pk):
                    obj_dict = User.objects.filter(pk=pk).first()
                    ser = UserSerializer(instance=obj_dict, many=False)
                    ret["data"]["users"] = ser.data
                else:
                    ret["meta"]["status"] = 500
                    ret["meta"]["message"] = "系统报错"
                return Response(ret)
    

    2、为解决部分接口不需要登录验证问题,只需要在该类中设置 authentication_classes = [] ,[] 表示不需要认证

    ------------------------------------------例子2:views.py-------------------------------------------

    from werkzeug.security import generate_password_hash
    from .TokenAuthtication import TokenAuthtication
    from rest_framework.response import Response
    from rest_framework.views import APIView
    from adminuser.serializer.userSerializers import *
    from rest_framework.pagination import PageNumberPagination
    from collections import OrderedDict
    from .models import User
    
    class UserView(APIView):
        authentication_classes = []
        def get(self, request, *args, **kwargs):
            """
            获取用户列表
            :param request:
            :param args:
            :param kwargs:
            :return:
            """
            pk = kwargs.get('pk')  # 获取单挑数据 id 值
            query = request.GET.get('query')
            print(query)
            ret = {
                "data": {
                    "users": []
                },
                "meta": {
                    "status": 200,
                    "message": ""
                }
            }
            page = LargeResultsSetPagination()
            if not pk:
                if not query:
                    queryset = User.objects.all().order_by("-id")
                    page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
                    ser = UserSerializer(instance=page_user, many=True)
                    ret["data"]["users"] = ser.data
                else:
                    queryset = User.objects.filter(Q(mobile__contains=query) | Q(username__contains=query) | Q(email__contains=query)).order_by("-id")
                    page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
                    ser = UserSerializer(instance=page_user, many=True)
                    ret["data"]["users"] = ser.data
                return page.get_paginated_response(ret)
            else:
                if User.objects.filter(pk=pk):
                    obj_dict = User.objects.filter(pk=pk).first()
                    ser = UserSerializer(instance=obj_dict, many=False)
                    ret["data"]["users"] = ser.data
                else:
                    ret["meta"]["status"] = 500
                    ret["meta"]["message"] = "系统报错"
                return Response(ret)
    

    不传 token 也能返回数据啦

    参考文档:
    1、https://www.cnblogs.com/zhangqunshi/p/8432209.html
    2、https://www.cnblogs.com/xingxingnbsp/articles/12597565.html

  • 相关阅读:
    idea--不能启动问题
    linux--mysql5.7安装
    vmware
    debezium
    java枚举
    springfox
    日志级别
    lombok--知识点
    es6--箭头函数
    网址访问过慢
  • 原文地址:https://www.cnblogs.com/DeryKong/p/15743275.html
Copyright © 2011-2022 走看看