三大认证
流程
- 由于DRF中, 所有的视图类都要直接和间接继承APIView类, 也只有APIView类中才有dispatch方法, 所以所有的请求都要经过三大认证, 认证通过后执行相应请求的视图函数
def dispatch(self, request, *args, **kwargs):
"..."
try:
# 三大认证
self.initial(request, *args, **kwargs)
"..."
except Exception as exc:
# 异常处理
response = self.handle_exception(exc)
def initial(self, request, *args, **kwargs):
"..."
# 认证
self.perform_authentication(request)
# 权限
self.check_permissions(request)
# 频率
self.check_throttles(request)
-
认证组件
- 请求未携带token ==> 游客
- 请求携带token
- token认证通过 ==> 合法用户
- token认证未通过 ==> 非法用户
-
权限组件
-
频率组件
认证组件
-
DRF认证类的默认配置
# rest_framework/settings.py
'DEFAULT_AUTHENTICATION_CLASSES': [
# 默认采用的session认证
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.BasicAuthentication'
-
自定义认证类
- 继承BaseAuthentication
- 重写authenticate方法
- 方法体
- 从请求头HTTP_AUTHORIZATION中拿token
- 没有token返回None, 游客
- 有token, 校验不通过, 抛AuthenticationFailed异常, 非法用户
- 有token, 校验通过, 返回(user, token), 合法用户
# authentications.py
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
# 自定义认证类继承BaseAuthentication
class MyAuthentication(BaseAuthentication):
"""
1.从请求中获取token(一般是HTTP_AUTHORIZATION)
2.token为None, 返回None, ==> 游客
3.token不为None
认证失败, raise AuthenticationFailed(''), ==> 非法用户
认证通过, 返回(user, token)或者(user, None), ==> 合法用户
"""
def authenticate(self, request):
pass
-
全局配置认证类
# settings.py
REST_FRAMEWORK = {
# 全局配置认证类
'DEFAULT_AUTHENTICATION_CLASSES': [
'api.authentications.MyAuthentication',
'rest_framework.authentication.BasicAuthentication',
]
}
权限组件
-
DRF权限类的默认配置
# rest_framework/settings.py
'DEFAULT_PERMISSION_CLASSES': [
# 默认不做权限限制
'rest_framework.permissions.AllowAny',
],
class AllowAny(BasePermission):
def has_permission(self, request, view):
return True
"""
DRF默认提供的些权限类:
AllowAny:游客和登录用户有全权限
IsAuthenticated:只有登录用户有全权限
IsAdminUser:只有后台用户(admin用户)有全权限
IsAuthenticatedOrReadOnly:游客有读权限,登录用户有全权限
"""
-
自定义权限类
- 继承BasePermission
- 重写has_permission方法
- 方法体
- 根据实际需求设置条件
- 返回True, 代表有权限
- 返回False, 代表无权限
# permissions.py
from rest_framework.permissions import BasePermission
# vip用户权限
class VipPermission(BasePermission):
def has_permission(self, request, view):
for group in request.user.groups.all():
if group.name.lower() == 'vip':
return True
return False
-
局部配置权限类
# views.py
# 只有vip用户才能进行单查
class UserViewSet(ViewSet):
# 局部配置权限类
permission_classes = [permissions.VipPermission]
def retrieve(self, request, *args, **kwargs):
return APIResponse(results={
'username': request.user.username,
'email': request.user.email,
'mobile': request.user.mobile
})
频率组件
-
自定义频率组件
- 继承SimpleRateThrottle
- 设置
scope='xxx'
, xxx
对应的是在settings.py设置的频率 'xxx': '1/min'
- 重写get_cache_key方法
- 方法体:
- 返回None, 不做限制
- 返回self.cache_format, 有频率限行
# throttles.py
from rest_framework.throttling import SimpleRateThrottle
# 自定义频率限制类: 按照手机号限行
class MobileRateThrottle(SimpleRateThrottle):
scope = 'mobile'
def get_cache_key(self, request, view):
# 匿名用户和没有手机号的不做限制
if not request.user.is_authenticated or not request.user.mobile:
return None # 反正None就是不做限制
return self.cache_format % {
'scope': self.scope,
'ident': request.user.mobile
}
# settings.py
REST_FRAMEWORK = {
# 频率类一般是局部配置, 但是频率调节在settings.py中配置
'DEFAULT_THROTTLE_RATES': {
'user': '5/min',
'anon': '3/min',
'mobile': '1/min'
},
}
自定义token的签发
class LoginModelSerializer(serializers.ModelSerializer):
username = serializers.CharField(max_length=12, min_length=3)
password = serializers.CharField(min_length=6)
class Meta:
model = models.User
fields = ('username', 'password')
# 全局钩子完成token签发
def validate(self, attrs):
# 1.获取user对象
user = self._validate_user(attrs)
# 2.获取payload
payload = jwt_payload_handler(user)
# 3.获取token
token = jwt_encode_handler(payload)
# 4.将user和token保存到serializer对象中, 以便在视图类中使用
self.content = {
'user': user,
'token': token
}
return attrs
# 多方式登录
def _validate_user(self, attrs):
username = attrs.get('username')
password = attrs.get('password')
# 邮箱
if re.match(r'.*@.*', username):
user = models.User.objects.filter(email=username).first()
# 手机号
elif re.match(r'1[3-9][0-9]{9}$', username):
user = models.User.objects.filter(mobile=username).first()
# 用户名
else:
user = models.User.objects.filter(username=username).first()
if not user or not user.check_password(password):
raise serializers.ValidationError({'msg': '用户名或密码错误'})
return user