一.DRF的认证组件
token
学习过使用cookie和session两种方式可以保存用户信息,这两种方式不同的是cookie保存在客户端浏览器中,而session保存在服务器中,他们各有优缺点,配合起来使用,可将重要的敏感的信息存储在session中,而在cookie中可以存储不太敏感的数据。
token认证的大致步骤:
- 用户登录,服务器端获取用户名密码,查询用户表,如果存在该用户且第一次登录(或者token过期),生成token,否则返回错误信息
- 如果不是第一次登录,且token未过期,更新token值
model.py
from django.db import models # Create your models here. class User(models.Model): username = models.CharField(max_length=32) password = models.CharField(max_length=32) user_type_entry = ( (1, 'Delux'), (2, 'SVIP'), (3, "VVIP") ) user_type = models.IntegerField(choices=user_type_entry) address = models.CharField(max_length=32) def __str__(self): return self.username class UserToken(models.Model): user = models.OneToOneField("User", on_delete=models.CASCADE) token = models.CharField(max_length=128)
我们无需实现get方法,因为涉及登录认证,所有写post方法接口,登录都是post请求,视图类如下所示:
from django.http import JsonResponse from rest_framework.views import APIView from .models import User, Book, UserToken from .utils import get_token class UserView(APIView): def post(self, request): response = dict() try: username = request.data['username'] password = request.data['password'] user_instance = User.objects.filter( user_name=username, password=password ).first() if user_instance: access_token = get_token.generater_token() UserToken.objects.update_or_create(user=user_instance, defaults={ "token": access_token }) response["status_code"] = 200 response["status_message"] = "登录成功" response["access_token"] = access_token response["user_role"] = user_instance.get_user_type_display() else: response["status_code"] = 201 response["status_message"] = "登录失败,用户名或密码错误" except Exception as e: response["status_code"] = 202 response["status_message"] = str(e) return JsonResponse(response)
获取随机字符串的方法用来生成token值:
import uuid def generater_token(): random_str = ''.join(str(uuid.uuid4()).split('-')) return random_str
DRF认证组件的使用
定义一个认证类
class UserAuth(object): def authenticate_header(self, request): pass def authenticate(self, request): user_post_token = request.query_params.get('token') token_object = UserToken.objects.filter(token=user_post_token).first() if token_object: return token_object.user.username, token_object.token else: raise APIException("认证失败")
在需要认证的数据接口里面指定认证类
class BookView(ModelViewSet): authentication_classes = [UserAuth, UserAuth2] queryset = Book.objects.all() serializer_class = BookSerializer
DRF认证源码解析

执行self.initial()方法 执行self.perform_authentication(request),方法,注意,新的request对象被传递进去了 该方法只有一行request.user,根据之前的经验,解析器(request.data),我们知道这个user肯定也是request对的一个属性方法 所料不错,该方法继续执行self._authenticate(),注意此时的self是request对象 该方法会循环self.authenticators,而这个变量是在重新实例化request对象时通过参数传递的 传递该参数是通过get_authenticatos()的返回值来确定的,它的返回值是 [ auth for auth in self.authentication_classes ] 也就是我们的BookView里面定义的那个类变量,也就是认证类 一切都明朗了,循环取到认证类,实例化,并且执行它的authenticate方法 这就是为什么认证类里面需要有该方法 如果没有该方法,认证的逻辑就没办法执行 至于类里面的header方法,照着写就行,有兴趣的可以研究源码,这里就不细究了 该方法如果执行成功就返回一个元组,执行完毕 如果失败,它会捕捉一个APIException 如果我们不希望认证通过,可以raise一个APIException
多个认证组件的使用
class UserAuth2(object): def authenticate(self, request): raise APIException("认证失败") class UserAuth(object): def authenticate_header(self, request): pass def authenticate(self, request): user_post_token = request.query_params.get('token') token_object = UserToken.objects.filter(token=user_post_token).first() if token_object: return token_object.user.username, token_object.token else: raise APIException("认证失败") class BookView(ModelViewSet): authentication_classes = [UserAuth, UserAuth2]
注意:如果需要返回什么数据,请在最后一个认证类中返回,因为如果在前面返回,在self._authentication()方法中会对返回值进行判断,如果不为空,认证的过程就会中止
如果不希望每次都写那个无用的authenticate_header方法,继承BaseAuthentication类即可。
from rest_framework.authentication import BaseAuthentication class UserAuth2(BaseAuthentication): def authenticate(self, request): raise APIException("认证失败") class UserAuth(BaseAuthentication): def authenticate(self, request): user_post_token = request.query_params.get('token') token_object = UserToken.objects.filter(token=user_post_token).first() if token_object: return token_object.user.user_name, token_object.token else: raise APIException("认证失败")
全局认证组件
如果认证类自己没有authentication_classes,就会到settings中去找,通过这个机制,我们可以将认证类写入到settings文件中即可实现全局认证。
REST_FRAMEWORK = { 'DEFAULT_AUTHENTICATION_CLASSES': ( 'authenticator.utils.authentication.UserAuth', 'authenticator.utils.authentication.UserAuth2', ), }
二.权限组件
定义一个权限类
class UserPerms(): message = "您没有权限访问该数据" def has_permission(self, request, view): if request.user.user_type > 2: return True else: return False
在需要认证的数据接口里面指定权限类
class BookView(ModelViewSet): authentication_classes = [UserAuth] permission_classes = [UserPerms2] queryset = Book.objects.all() serializer_class = BookSerializer
三.频率组件的使用
定义一个频率类
import time import math from rest_framework import exceptions class MyException(exceptions.Throttled): default_detail = '连接次数过多' extra_detail_plural = extra_detail_singular = '请在{wait}秒内访问' def __init__(self, wait=None, detail=None, code=None): super().__init__(wait=wait, detail=detail, code=code) class VisitThrottle(): user_visit_information = dict() visited_times = 1 period = 60 allow_times_per_minute = 5 first_time_visit = True def allow_request(self, request, view): self.request_host = request_host = request.META.get("REMOTE_ADDR") current_user_info = self.user_visit_information.get(request_host, None) if not self.__class__.first_time_visit: self.user_visit_information[request_host][0] += 1 current_visit_times = self.user_visit_information[request_host][0] if current_visit_times > self.allow_times_per_minute: if self._current_time - current_user_info[1] <= self.period: if len(current_user_info) > 2: current_user_info[2] = self._time_left else: current_user_info.append(self._time_left) view.throttled = self.throttled return None else: self.__class__.first_time_visit = True if self.first_time_visit: self.__class__.first_time_visit = False self._initial_infomation() return True def wait(self): return self.period - self.user_visit_information[self.request_host][2] def throttled(self, request, wait): raise MyException(wait=wait) @property def _current_time(self): return time.time() @property def _time_left(self): return math.floor(self._current_time - self.user_visit_information.get(self.request_host)[1]) def _initial_infomation(self): self.user_visit_information[self.request_host] = [self.visited_times, self._current_time]
指定频率类
class BookView(ModelViewSet): throttle_classes = [ VisitThrottle ] queryset = Book.objects.all() serializer_class = BookSerializer
使用DRF的简单频率控制来控制用户访问频率(局部)
局部访问频率的控制
from rest_framework.throttling import SimpleRateThrottle class RateThrottle(SimpleRateThrottle): # 指定访问频率 rate = '5/m' # 指定通过什么方式来区分用户 def get_cache_key(self, request, view): return self.get_ident(request)
指定频率类
from .utils.throttles import RateThrottle
# Create your views here.
class BookView(ModelViewSet):
throttle_classes = [ RateThrottle ]
queryset = Book.objects.all()
serializer_class = BookSerializer
全局访问频率的控制
指定一个类
class RateThrottle(SimpleRateThrottle): scope = "visit_rate" def get_cache_key(self, request, view): return self.get_ident(request)
在settings里面指定频率类和访问频率
REST_FRAMEWORK = { "DEFAULT_THROTTLE_CLASSES": ('throttler.utils.throttles.RateThrottle',), "DEFAULT_THROTTLE_RATES": { "visit_rate": "5/m" } }