zoukankan      html  css  js  c++  java
  • DRF的认证权限频率组件

    一.DRF的认证组件

    token

      学习过使用cookie和session两种方式可以保存用户信息,这两种方式不同的是cookie保存在客户端浏览器中,而session保存在服务器中,他们各有优缺点,配合起来使用,可将重要的敏感的信息存储在session中,而在cookie中可以存储不太敏感的数据。

    token认证的大致步骤:

    • 用户登录,服务器端获取用户名密码,查询用户表,如果存在该用户且第一次登录(或者token过期),生成token,否则返回错误信息
    • 如果不是第一次登录,且token未过期,更新token值

    model.py

    from django.db import models
    
    # Create your models here.
    
    
    class User(models.Model):
        username = models.CharField(max_length=32)
        password = models.CharField(max_length=32)
        user_type_entry = (
            (1, 'Delux'),
            (2, 'SVIP'),
            (3, "VVIP")
        )
        user_type = models.IntegerField(choices=user_type_entry)
        address = models.CharField(max_length=32)
    
        def __str__(self):
            return self.username
    
    
    class UserToken(models.Model):
        user = models.OneToOneField("User", on_delete=models.CASCADE)
        token = models.CharField(max_length=128)

    我们无需实现get方法,因为涉及登录认证,所有写post方法接口,登录都是post请求,视图类如下所示:

    from django.http import JsonResponse
    
    from rest_framework.views import APIView
    
    from .models import User, Book, UserToken
    from .utils import get_token
    
    
    class UserView(APIView):
    
        def post(self, request):
            response = dict()
            try:
                username = request.data['username']
                password = request.data['password']
    
                user_instance = User.objects.filter(
                    user_name=username,
                    password=password
                ).first()
    
                if user_instance:
                    access_token = get_token.generater_token()
    
                    UserToken.objects.update_or_create(user=user_instance, defaults={
                        "token": access_token
                    })
                    response["status_code"] = 200
                    response["status_message"] = "登录成功"
                    response["access_token"] = access_token
                    response["user_role"] = user_instance.get_user_type_display()
                else:
                    response["status_code"] = 201
                    response["status_message"] = "登录失败,用户名或密码错误"
            except Exception as e:
                response["status_code"] = 202
                response["status_message"] = str(e)
    
            return JsonResponse(response)

    获取随机字符串的方法用来生成token值:

    import uuid
    
    
    def generater_token():
        random_str = ''.join(str(uuid.uuid4()).split('-'))
        return random_str

    DRF认证组件的使用

      定义一个认证类

    class UserAuth(object):
    
        def authenticate_header(self, request):
            pass
    
        def authenticate(self, request):
            user_post_token = request.query_params.get('token')
    
            token_object = UserToken.objects.filter(token=user_post_token).first()
            if token_object:
                return token_object.user.username, token_object.token
            else:
                raise APIException("认证失败")

      在需要认证的数据接口里面指定认证类

    class BookView(ModelViewSet):
    
        authentication_classes = [UserAuth, UserAuth2]
    
        queryset = Book.objects.all()
        serializer_class =  BookSerializer

    DRF认证源码解析

    执行self.initial()方法
    执行self.perform_authentication(request),方法,注意,新的request对象被传递进去了
    该方法只有一行request.user,根据之前的经验,解析器(request.data),我们知道这个user肯定也是request对的一个属性方法
    所料不错,该方法继续执行self._authenticate(),注意此时的self是request对象
    该方法会循环self.authenticators,而这个变量是在重新实例化request对象时通过参数传递的
    传递该参数是通过get_authenticatos()的返回值来确定的,它的返回值是
    [ auth for auth in self.authentication_classes ]
    也就是我们的BookView里面定义的那个类变量,也就是认证类
    一切都明朗了,循环取到认证类,实例化,并且执行它的authenticate方法
    这就是为什么认证类里面需要有该方法
    如果没有该方法,认证的逻辑就没办法执行
    至于类里面的header方法,照着写就行,有兴趣的可以研究源码,这里就不细究了
    该方法如果执行成功就返回一个元组,执行完毕
    如果失败,它会捕捉一个APIException
    如果我们不希望认证通过,可以raise一个APIException
    View Code

    多个认证组件的使用

    class UserAuth2(object):
    
        def authenticate(self, request):
            raise APIException("认证失败")
    
    
    class UserAuth(object):
    
        def authenticate_header(self, request):
            pass
    
        def authenticate(self, request):
            user_post_token = request.query_params.get('token')
    
            token_object = UserToken.objects.filter(token=user_post_token).first()
            if token_object:
                return token_object.user.username, token_object.token
            else:
                raise APIException("认证失败")
    
    
    class BookView(ModelViewSet):
    
        authentication_classes = [UserAuth, UserAuth2]

      注意:如果需要返回什么数据,请在最后一个认证类中返回,因为如果在前面返回,在self._authentication()方法中会对返回值进行判断,如果不为空,认证的过程就会中止

      如果不希望每次都写那个无用的authenticate_header方法,继承BaseAuthentication类即可。

    from rest_framework.authentication import BaseAuthentication
    
    class UserAuth2(BaseAuthentication):
    
        def authenticate(self, request):
            raise APIException("认证失败")
    
    
    class UserAuth(BaseAuthentication):
    
        def authenticate(self, request):
            user_post_token = request.query_params.get('token')
    
            token_object = UserToken.objects.filter(token=user_post_token).first()
            if token_object:
                return token_object.user.user_name, token_object.token
            else:
                raise APIException("认证失败")

    全局认证组件

      如果认证类自己没有authentication_classes,就会到settings中去找,通过这个机制,我们可以将认证类写入到settings文件中即可实现全局认证。

    REST_FRAMEWORK = {
        'DEFAULT_AUTHENTICATION_CLASSES': (
            'authenticator.utils.authentication.UserAuth',
            'authenticator.utils.authentication.UserAuth2',
        ),
    }

    二.权限组件

      定义一个权限类

    class UserPerms():
        message = "您没有权限访问该数据"
        def has_permission(self, request, view):
            if request.user.user_type > 2:
                return True
            else:
                return False

      在需要认证的数据接口里面指定权限类

    class BookView(ModelViewSet):
    
        authentication_classes = [UserAuth]
        permission_classes = [UserPerms2]
    
        queryset = Book.objects.all()
        serializer_class =  BookSerializer

    三.频率组件的使用

      定义一个频率类

    import time
    import math
    
    from rest_framework import exceptions
    
    
    class MyException(exceptions.Throttled):
        default_detail = '连接次数过多'
        extra_detail_plural = extra_detail_singular = '请在{wait}秒内访问'
    
        def __init__(self, wait=None, detail=None, code=None):
            super().__init__(wait=wait, detail=detail, code=code)
    
    
    class VisitThrottle():
        user_visit_information = dict()
        visited_times = 1
        period = 60
        allow_times_per_minute = 5
        first_time_visit = True
    
        def allow_request(self, request, view):
            self.request_host = request_host = request.META.get("REMOTE_ADDR")
            current_user_info = self.user_visit_information.get(request_host, None)
    
            if not self.__class__.first_time_visit:
                self.user_visit_information[request_host][0] += 1
                current_visit_times = self.user_visit_information[request_host][0]
    
                if current_visit_times > self.allow_times_per_minute:
                    if self._current_time - current_user_info[1] <= self.period:
                        if len(current_user_info) > 2:
                            current_user_info[2] = self._time_left
                        else:
                            current_user_info.append(self._time_left)
    
                        view.throttled = self.throttled
                        return None
                    else:
                        self.__class__.first_time_visit = True
    
            if self.first_time_visit:
                self.__class__.first_time_visit = False
                self._initial_infomation()
    
            return True
    
        def wait(self):
            return self.period - self.user_visit_information[self.request_host][2]
    
        def throttled(self, request, wait):
            raise MyException(wait=wait)
    
        @property
        def _current_time(self):
            return time.time()
    
        @property
        def _time_left(self):
            return math.floor(self._current_time - self.user_visit_information.get(self.request_host)[1])
    
        def _initial_infomation(self):
            self.user_visit_information[self.request_host] = [self.visited_times, self._current_time]

      指定频率类

    class BookView(ModelViewSet):
        throttle_classes = [ VisitThrottle ]
        queryset = Book.objects.all()
        serializer_class = BookSerializer

    使用DRF的简单频率控制来控制用户访问频率(局部)

      局部访问频率的控制

    from rest_framework.throttling import SimpleRateThrottle
    
    
    class RateThrottle(SimpleRateThrottle):
          # 指定访问频率
        rate = '5/m'
    
         # 指定通过什么方式来区分用户
        def get_cache_key(self, request, view):
            return self.get_ident(request)

      指定频率类

    from .utils.throttles import RateThrottle
    
    # Create your views here.
    
    
    class BookView(ModelViewSet):
        throttle_classes = [ RateThrottle ]
        queryset = Book.objects.all()
        serializer_class = BookSerializer

    全局访问频率的控制

      指定一个类

    class RateThrottle(SimpleRateThrottle):
        scope = "visit_rate"
    
        def get_cache_key(self, request, view):
            return self.get_ident(request)

      在settings里面指定频率类和访问频率

    REST_FRAMEWORK = {
        "DEFAULT_THROTTLE_CLASSES": ('throttler.utils.throttles.RateThrottle',),
        "DEFAULT_THROTTLE_RATES": {
            "visit_rate": "5/m"
        }
    }
  • 相关阅读:
    (转)Linux netstat命令详解
    4G模块*99#拨号上网
    (转)Linux系统-tcpdump常用抓包命令
    (转)Makefile介绍
    导航和渲染首页文章列表
    删除项目开发中的.pyc文件
    django之media配置
    基于Ajax提交formdata数据、错误信息展示和局部钩子、全局钩子的校验。
    点击头像上传文件的效果
    使用python实现滑动验证码
  • 原文地址:https://www.cnblogs.com/chenxi67/p/10102771.html
Copyright © 2011-2022 走看看