一.认证
定义一个用户表和一个保存用户Token的表:
class UserInfo(models.Model):
username = models.CharField(max_length=16)
password = models.CharField(max_length=32)
type = models.SmallIntegerField(
choices=((1, '普通用户'), (2, 'VIP用户')),
default=1
)
class Token(models.Model):
user = models.OneToOneField(to='UserInfo')
token_code = models.CharField(max_length=128)
1.定义一个登录视图:
def get_random_token(username):
"""
根据用户名和时间戳生成随机token
:param username:
:return:
"""
import hashlib, time
timestamp = str(time.time())
m = hashlib.md5(bytes(username, encoding="utf8"))
m.update(bytes(timestamp, encoding="utf8"))
return m.hexdigest()
class LoginView(APIView):
"""
校验用户名密码是否正确从而生成token的视图
"""
def post(self, request):
res = {"code": 0}
print(request.data)
username = request.data.get("username")
password = request.data.get("password")
user = models.UserInfo.objects.filter(username=username, password=password).first()
if user:
# 如果用户名密码正确
token = get_random_token(username)
models.Token.objects.update_or_create(defaults={"token_code": token}, user=user)
res["token"] = token
else:
res["code"] = 1
res["error"] = "用户名或密码错误"
return Response(res)
2.定义一个认证类
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
class MyAuth(BaseAuthentication):
def authenticate(self, request):
if request.method in ["POST", "PUT", "DELETE"]:
request_token = request.data.get("token", None)
if not request_token:
raise AuthenticationFailed('缺少token')
token_obj = models.Token.objects.filter(token_code=request_token).first()
if not token_obj:
raise AuthenticationFailed('无效的token')
return token_obj.user.username, None
else:
return None, None
3.视图级别认证
class CommentViewSet(ModelViewSet):
queryset = models.Comment.objects.all()
serializer_class = app01_serializers.CommentSerializer
authentication_classes = [MyAuth, ]
4.全局级别认证
# 在settings.py中配置
REST_FRAMEWORK = {
"DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.MyAuth", ]
}
二.权限
只有VIP用户才能看的内容。
1.自定义一个权限类
# 自定义权限
class MyPermission(BasePermission):
message = 'VIP用户才能访问'
def has_permission(self, request, view):
"""
自定义权限只有VIP用户才能访问
"""
# 因为在进行权限判断之前已经做了认证判断,所以这里可以直接拿到request.user
if request.user and request.user.type == 2: # 如果是VIP用户
return True
else:
return False
2.视图级别配置
class CommentViewSet(ModelViewSet):
queryset = models.Comment.objects.all()
serializer_class = app01_serializers.CommentSerializer
authentication_classes = [MyAuth, ]
permission_classes = [MyPermission, ]
3.全局级别设置
# 在settings.py中设置rest framework相关配置项
REST_FRAMEWORK = {
"DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.MyAuth", ],
"DEFAULT_PERMISSION_CLASSES": ["app01.utils.MyPermission", ]
}
三.限制
1.自定义限制类
VISIT_RECORD = {}
# 自定义限制
class MyThrottle(object):
def __init__(self):
self.history = None
def allow_request(self, request, view):
"""
自定义频率限制60秒内只能访问三次
"""
# 获取用户IP
ip = request.META.get("REMOTE_ADDR")
timestamp = time.time()
if ip not in VISIT_RECORD:
VISIT_RECORD[ip] = [timestamp, ]
return True
history = VISIT_RECORD[ip]
self.history = history
history.insert(0, timestamp)
while history and history[-1] < timestamp - 60:
history.pop()
if len(history) > 3:
return False
else:
return True
def wait(self):
"""
限制时间还剩多少
"""
timestamp = time.time()
return 60 - (timestamp - self.history[-1])
2.视图使用
class CommentViewSet(ModelViewSet):
queryset = models.Comment.objects.all()
serializer_class = app01_serializers.CommentSerializer
throttle_classes = [MyThrottle, ]
3.全局使用
# 在settings.py中设置rest framework相关配置项
REST_FRAMEWORK = {
"DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.MyAuth", ],
"DEFAULT_PERMISSION_CLASSES": ["app01.utils.MyPermission", ]
"DEFAULT_THROTTLE_CLASSES": ["app01.utils.MyThrottle", ]
}
4.使用内置限制类
from rest_framework.throttling import SimpleRateThrottle
class VisitThrottle(SimpleRateThrottle):
scope = "xxx"
def get_cache_key(self, request, view):
return self.get_ident(request)
5.全局配置
# 在settings.py中设置rest framework相关配置项
REST_FRAMEWORK = {
"DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.MyAuth", ],
# "DEFAULT_PERMISSION_CLASSES": ["app01.utils.MyPermission", ]
"DEFAULT_THROTTLE_CLASSES": ["app01.utils.VisitThrottle", ],
"DEFAULT_THROTTLE_RATES": {
"xxx": "5/m",
}
}
#频率限制详解
import time
from django.utils.deprecation import MiddlewareMixin
from django.shortcuts import HttpResponse
# 访问IP池
visit_ip_pool = {}
class RequestBlockingMiddleware(MiddlewareMixin):
def process_request(self,request):
# 获取访问者IP
ip=request.META.get("REMOTE_ADDR")
# 获取访问当前时间
visit_time=time.time()
# 判断如果访问IP不在池中,就将访问的ip时间插入到对应ip的key值列表,如{"127.0.0.1":[时间1]}
if ip not in visit_ip_pool:
visit_ip_pool[ip]=[visit_time]
return None
# 然后在从池中取出时间列表
history_time = visit_ip_pool.get(ip)
# 循环判断当前ip的时间列表,有值,并且当前时间减去列表的最后一个时间大于60s,把这种数据pop掉,这样列表中只有60s以内的访问时间,
while history_time and visit_time-history_time[-1]>60:
history_time.pop()
# 如果访问次数小于10次就将访问的ip时间插入到对应ip的key值列表的第一位置,如{"127.0.0.1":[时间2,时间1]}
print(history_time)
if len(history_time)<10:
history_time.insert(0, visit_time)
return None
else:
# 如果大于10次就禁止访问
return HttpResponse("访问过于频繁,还需等待%s秒才能继续访问"%int(60-(visit_time-history_time[-1])))