一、认证
认证涉及登录,登录为一次post请求,则涉及跨站请求伪造,django中间件csrf很好的解决了这个问题,但是在前后端分离开发的过程中,使用Postman发送请求无法使用该组件,所以将该中间件注释后,自己简单实现一个类似csrftoken的功能:
登录后需要验证token值,每次登录,token值都会变。登录后,所有请求必须带着token值才能通过验证
import hashlib import time # 利用摘要算法生成一条token值,采用加盐方式 class GetToken(object): def __init__(self, username): self.username = username def get_md5_token(self): md = hashlib.md5(bytes(self.username, encoding="utf-8")) md.update(bytes(str(time.time()), encoding="utf-8")) return md.hexdigest() # 登录视图 class LoginView(APIView): def post(self, request): username = request.data.get('username') password = request.data.get('password') obj = models.User.objects.filter(username=username, password=password).first() res = {"state": 200, "msg": None} if obj: token = GetToken(obj.username) str_token = token.get_md5_token() models.Token.objects.update_or_create(user=obj, defaults={"token": str_token}) res["token"] = str_token else: res["state"] = 404 res["msg"] = "登录失败" return Response(res)
认证类:
from app01 import models from rest_framework import exceptions # 认证类 class Authentication(object): def authenticate(self, request): token = request.GET.get('token') obj = models.Token.objects.filter(token=token).first() if obj: return obj.user.username, obj.token else: raise exceptions.AuthenticationFailed('验证失败') # 不加这个方法会报错 def authenticate_header(self, request): pass
为其它视图添加认证组件:
from rest_framework import viewsets from app01 import utils class BookView(viewsets.ModelViewSet): authentication_classes = [utils.Authentication, ] # 认证组件,一个列表,类别中放认证类 queryset = models.Book.objects.all() serializer_class = serializers.BookSerializers
全局视图添加认证类,需要在settings中配置:
REST_FRAMEWORK={ "DEFAULT_AUTHENTICATION_CLASSES":["app01.utils.Authentication",] }
如果添加了全局,某个视图不许要认证,只需要在视图中设置为空即可:
authentication_classes = []
2、权限
一条url代表一条权限,所有权限类验证时,应该是取到用户所有的url后,根据访问的地址进行验证,这里就不详细示例:
class User(models.Model): username = models.CharField(max_length=48) password = models.CharField(max_length=64) roles_choices = ((1, "普通用户"), (2, "管理员"), (3, "超级管理员")) role = models.IntegerField(choices=roles_choices, default=1) def __str__(self): return self.username
# 权限类 class PermissionCheck(object): message = "你不是超级管理员" # 这里的request属于APIview重新构造的reuqest,经过认证后,认证组件返回一个request.user和request.auth, # 这两个的返回结果在自己的认证类中自定义返回的结果,所以这里能够直接调用 def has_permission(self, request, view): if request.user.role == 3: return True else: return False
class BookView(viewsets.ModelViewSet): authentication_classes = [utils.Authentication, ] # 认证组件,一个列表,类别中放认证类 permission_classes = [utils.PermissionCheck, ] # 权限组件, 一个列表, 列表中放权限类 queryset = models.Book.objects.all() serializer_class = serializers.BookSerializers
全局视图加入权限验证需要在settings中配置:
REST_FRAMEWORK={ "DEFAULT_AUTHENTICATION_CLASSES":["app01.utils.Authentication",], # 认证 "DEFAULT_PERMISSION_CLASSES":["app01.utils.PermissionCheck",] # 权限 }
3、频率
rest_framework提供内置throttle类,继承BaseThrottle
from rest_framework.throttling import BaseThrottle VISIT_RECORD = {} # {IP地址:[12:23:05, 12:22:45, 12:22:30]} class VisitThrottle(BaseThrottle): def __init__(self): self.history = None def allow_request(self, request, view): remote_addr = request.META.get('REMOTE_ADDR') # 获取IP地址 print(remote_addr) # xff = request.META.get('HTTP_X_FORWARDED_FOR') # print(xff) import time ctime = time.time() # 第一次访问 if remote_addr not in VISIT_RECORD: # IP地址不在VISIT_RECORD地点当中 VISIT_RECORD[remote_addr] = [ctime, ] # {IP地址:[12:23:05, ]} return True # 第一次访问通过频率验证 # 不是第一次访问 history = VISIT_RECORD.get(remote_addr) # 获取访问的时间列表[12:23:07, 12:23:05] self.history = history # 访问时间列表有值并且最久一次访问的时间距离现在时间已经过去60秒,把这个时间拿出来 while history and history[-1] < ctime - 60: history.pop() # 因为列表pop操作只能把最后一条数据取出,所以列表中的访问时间应该是把最近一次访问时间放在列表最前 # 如果60秒内访问此时小于3次,把这次访问的时间添加到列表最前面。如果大于3次,限制访问 if len(history) < 3: history.insert(0, ctime) return True else: return False # 距离限制还有多久 def wait(self): import time ctime = time.time() return 60 - (ctime - self.history[-1])
from rest_framework import viewsets from app01 import utils class BookView(viewsets.ModelViewSet): authentication_classes = [utils.Authentication, ] # 认证组件,一个列表,类别中放认证类 permission_classes = [utils.PermissionCheck, ] # 权限组件, 一个列表, 列表中放权限类 throttle_classes = [utils.VisitThrottle, ] # 频率组件 queryset = models.Book.objects.all() serializer_class = serializers.BookSerializers
全局视图设置频率:
REST_FRAMEWORK={ "DEFAULT_AUTHENTICATION_CLASSES":["app01.utils.Authentication",], # 认证 "DEFAULT_PERMISSION_CLASSES":["app01.utils.PermissionCheck",], # 权限 "DEFAULT_THROTTLE_CLASSES":["app01utils.VisitThrottle",] # 频率 }
上面是内置频率方法实现原理,使用内置频率方法:
class VisitThrottle(SimpleRateThrottle): scope="visit_rate" # settings中的标识 def get_cache_key(self, request, view): return self.get_ident(request) #get_ident方法是获取用户IP地址,你也可以返回用户名request.user.username
需要在settings中设置:
REST_FRAMEWORK={ "DEFAULT_AUTHENTICATION_CLASSES":["app01.utils.Authentication",], # 认证 "DEFAULT_PERMISSION_CLASSES":["app01.utils.PermissionCheck",], # 权限 "DEFAULT_THROTTLE_CLASSES":["app01utils.VisitThrottle",], # 频率 "DEFAULT_THROTTLE_RATES":{ "visit_rate":"5/m", # m代表分钟,每分钟限制访问5次 } }