认证组件、权限组件、频率组件
一、Django权限六张表
1.1、content_type表
"""
Course:
name、type、days、price、vip_type
基础 免费课 7 0
中级 学位课 180 69
究极 会员课 360 至尊会员
Course:
name、type、days、content_type_id
基础 免费课 7 null
中级 学位课 180 1
究极 会员课 360 2
app01_course_1
id、price
app01_course_2
id vip_type
content_type表(Django提供)
id、app_label、model
1 app01 course_1
2 app01 course_2
"""
!
content_type表作用: 作为两张表之间的关联第三张表
- 应用一:权限表的权限是操作表的,所有在权限表中有一个content_type表的外键,标识改权限具有操作哪一张表的权限
- 应用二:价格策略
- 缺点:增加了查询效率
1.2 admin关联自定义
通过后台在浏览器中显示对象的字段或者对密码进行加密等操作
from django.contrib import admin
from . import models
# admin注册自定义User表:密文操作密码
from django.contrib.auth.admin import UserAdmin as AuthUserAdmin
class UserAdmin(AuthUserAdmin):
add_fieldsets = (
(None, {
'classes': ('wide',),
# 添加用户界面可操作的字段
'fields': ('username', 'password1', 'password2', 'mobile', 'email', 'is_staff', 'is_active'),
}),
)
list_display = ('username', 'mobile', 'email', 'is_staff', 'is_active')
# 明文操作密码,admin可视化添加的用户密码都是明文,登录时用的是密文,所以用户无法登录
# admin.site.register(models.User)
# 新建的类添加进去
admin.site.register(models.User, UserAdmin)
二、认证组件和权限组件的使用
2.1全局配置
# settings.py
# drf的配置
REST_FRAMEWORK = {
# 渲染模块
'DEFAULT_RENDERER_CLASSES': ['rest_framework.renderers.JSONRenderer'],
# 异常模块
# 'EXCEPTION_HANDLER': 'rest_framework.views.exception_handler',
'EXCEPTION_HANDLER': 'utilss.exception.exception_handler',
# 全局配置 认证
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.BasicAuthentication'
],
# 全局配置 权限
'DEFAULT_PERMISSION_CLASSES': [
'rest_framework.permissions.AllowAny',
],
}
2.2局部配置
from utilss.authentications import TokenAuthentication
from utilss.permissions import SuperUserPermission
class UserListAPIView(ListAPIView):
# 局部配置
authentication_classes = [TokenAuthentication]
permission_classes = [SuperUserPermission]
三、认证和权限实现
3.1 urls.py
urlpatterns = [
url(r'^user/', views.UserListAPIView.as_view()),
url(r'^login/', views.LoginAPIView.as_view()),
]
3.2 view.py
from rest_framework.generics import ListAPIView # 视图工具类
from api import models, serializers
from utilss.authentications import TokenAuthentication # 自定义验证
from utilss.permissions import SuperUserPermission # 自定义权限
class UserListAPIView(ListAPIView):
"""
查看所有用户信息,前提,必须是登录的超级管理员
同电商网站,多接口是不需要登录的,少接口需要登录,使用在需要登录的接口中完成局部配置,进行局部接口校验
"""
authentication_classes = [TokenAuthentication]
permission_classes = [SuperUserPermission]
queryset = models.User.objects.filter(is_active=True, is_superuser=False).all()
serializer_class = serializers.UserSerializers
def get(self, request, *args, **kwargs):
response = self.list(request, *args, **kwargs)
return APIResponse(data=response.data)
# 登录接口:如果是超级管理员登录,返回一个可以校验出超级管理员的token字符串
# 只要有用户登录,就可以返回一个与登录用户相关的token字符串 => 返回给前台 => 签发token => user_obj -> token_str
class LoginAPIView(APIView):
# 登录接口一定要做局部禁用认证与权限的校验
authentication_classes = []
permission_classes = []
def post(self, request, *args, **kwargs):
login_serial = serializers.LoginModelSerializer(data=request.data)
login_serial.is_valid(raise_exception=True)
# 重点:校验成功后,就可以返回信息,一定不能调用save方法,因为该post方法只完成数据库查操作
# 所以校验会得到user对象,并且在校验过程中,会完成token签发(user_obj -> token_str)
return APIResponse(data={
"username": login_serial.user.username,
"token": login_serial.token
})
总结:
- 登录接口:
- 如果是超级管理员登录,返回一个可以校验出超级管理员的token字符串
- 只有其他用户登录就返回一个与登录登录相关的token字符串,(返回给前台 => 签发token => user_obj -> token_str)
- 注意:登录忌口一定要做局部禁用认证和权限的校验
- 校验成功后,就返回一个字符串token,一定不能调用save方法,因为post方法只完成数据库的查操作,所以在自定义校验序列化类是要自定义字段进行校验用户是否存在
- 校验成功后会得到user对象,并且在校验过程,完成对token的签发(生成一个字符串,user_obj -> token_str)
3.3 自定义序列化类
from . import models
from rest_framework import serializers
# 用户信息查看序列化接口
class UserSerializers(serializers.ModelSerializer):
class Meta:
model = models.User
fields = ("username", "mobile", "email")
from django.contrib import auth
from rest_framework.serializers import ValidationError
# 用户登录
class LoginModelSerializer(serializers.ModelSerializer):
user = serializers.CharField(max_length=20)
pwd = serializers.CharField(max_length=30)
class Meta:
model = models.User
fields = ("user", "pwd")
def validate(self, attrs):
"""
全局钩子校验用户是否存在,存在返回token
:param attrs:
:return:
"""
user = attrs.get("user")
pwd = attrs.get("pwd")
try:
user_obj = auth.authenticate(username=user, password=pwd)
except:
raise serializers.ValidationError({"user": "提供用户信息有误"})
# 将当前用户放到名称空间中
self.user = user_obj
# 将生成token放到名称空间中
self.token = _get_token(user_obj)
return attrs
# 签发算法:b64encode(用户名).b64encode(用户主键).md5(用户名+用户主键+服务器秘钥)
# 自定义签发token
# 分析:拿user得到token,后期还需要通过token得到user
# token:用户名(base64加密).用户主键(base64加密).用户名+用户主键+服务器秘钥(md5加密)
# eg: YWJj.Ao12bd.2c953ca5144a6c0a187a264ef08e1af1
# 校验算法(认证类)与签发算法配套
"""
拆封token:一段 二段 三段
用户名:b64decode(一段)
用户主键:b64decode(二段)
碰撞解密:md5(用户名+用户主键+服务器秘钥) == 三段
"""
import base64, json, hashlib
from d_pro import settings
# 获取token
def _get_token(obj):
"""
签发算法:b64encode(用户名).b64encode(用户主键).md5(用户名+用户主键+服务器秘钥)
自定义签发token
分析:拿user得到token,后期还需要通过token得到user
token:用户名(base64加密).用户主键(base64加密).用户名+用户主键+服务器秘钥(md5加密)
eg: YWJj.Ao12bd.2c953ca5144a6c0a187a264ef08e1af1
:param obj:
:return:
"""
user_base = base64.b64encode(json.dumps({"username": obj.username}).encode()).decode()
id_base = base64.b64encode(json.dumps({"pk": obj.id}).encode()).decode()
user_md5 = hashlib.md5(
json.dumps({
"username": obj.username,
"pk": obj.id,
"key": settings.SECRET_KEY
}).encode()
).hexdigest()
print(f"{user_base}.{id_base}.{user_md5}")
return f"{user_base}.{id_base}.{user_md5}"
总结:
- 用户登录校验序列化类,校验用户用户是否存在, 并返回token字符传
- 校验字段必须是自己定义的,如果使用自带的字段怎会报错,因为post是保存数据的,会执行到save方法,所有要自己定义字段通过全局钩子实现校验用户是否登录,并将token添加到名称空间中
- 自定义token字符串
- 签发算法:b64encode(用户名).b64encode(用户主键).md5(用户名+用户主键+服务器秘钥)
- 进入其他的页面会通过token得到user,判断用户是否登录,所以用户名和组件采用可解码加密,最后一个采用不可解码加密
- 使用base64模块,json模块,hashlib模块
3.4 自定义认证类.py
# 自定义认证类
"""
认证模块工作原理
1)继承BaseAuthentication类,重写authenticate方法
2)认证规则(authenticate方法实现体):
没有携带认证信息,直接返回None => 游客
有认证信息,校验失败,抛异常 => 非法用户
有认证信息,校验出User对象 => 合法用户
"""
from rest_framework.authentication import BasicAuthentication
from rest_framework.exceptions import AuthenticationFailed
class TokenAuthentication(BasicAuthentication):
prefix = "token"
def authenticate(self, request):
# 获取头携带的token
auth = request.META.get("HTTP_AUTHORIZATION")
# 如果为空返回None,进行校验
if not auth:
return None
# 获取 token和值
auth_list = auth.split()
# 判断是否正确
if not (len(auth_list) == 2 and auth_list[0].lower() == self.prefix.lower()):
raise AuthenticationFailed("非法用户")
token = auth_list[1]
# 校验算法
user = _get_obj(token)
# 校验失败抛异常,成功返回(user, token)
return (user, token)
import base64, hashlib, json
from d_pro import settings
from api.models import User
def _get_obj(token):
"""
校验算法(认证类)与签发算法配套
拆封token:一段 二段 三段
用户名:b64decode(一段)
用户主键:b64decode(二段)
碰撞解密:md5(用户名+用户主键+服务器秘钥) == 三段
"""
token_list = token.split(".")
# 判断是否为三分
if not len(token_list):
raise AuthenticationFailed("token异常")
# 获取用户名和pk
username = json.loads(base64.b64decode(token_list[0])).get("username")
pk = json.loads(base64.b64decode(token_list[1])).get("pk")
md5_dic = {
"username": username,
"pk": pk,
"key": settings.SECRET_KEY
}
# 判断token是否相等
if token_list[2] != hashlib.md5(json.dumps(md5_dic).encode()).hexdigest():
raise AuthenticationFailed("token内容异常")
user_obj = User.objects.filter(pk=pk, username=username).first()
return user_obj
""" 认证类的认证核心规则
def authenticate(self, request):
token = get_token(request)
try:
user = get_user(token) # 校验算法
except:
raise AuthenticationFailed()
return (user, token)
"""
总结:
- 自定义验证模块,继承BaseAuthentication类,重写authenticate方法
- 认证规则(authenticate方法实现体):
- 没有携带认证信息,直接返回None => 游客
- 有认证信息,校验失败,抛异常 => 非法用户
- 有认证信息,校验出User对象 => 合法用户
- 获取token进行解密
- 校验算法(认证类)与签发算法配套
- 拆封token:一段 二段 三段
- 用户名:b64decode(一段)
- 用户主键:b64decode(二段)
- 碰撞解密:md5(用户名+用户主键+服务器秘钥) == 三段
- 认证规则(authenticate方法实现体)
3.5 自定义权限类
# 自定义权限类
"""
权限模块工作原理
1)继承BasePermission类,重写has_permission方法
2)权限规则(has_permission方法实现体):
返回True,代表有权限
返回False,代表无权限
"""
class SuperUserPermission(BasePermission):
def has_permission(self, request, view):
return request.user and request.user.is_superuser
# 自定义权限类
"""
权限模块工作原理
1)继承BasePermission类,重写has_permission方法
2)权限规则(has_permission方法实现体):
返回True,代表有权限
返回False,代表无权限
"""
总结:
- 继承BasePermission类,重写has_permission方法
- 权限规则(has_permission方法实现体):返回True,代表有权限,返回False,代表无权限
四、源码分析
4.1认证模块源码分析
# 全局配置 认证
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.BasicAuthentication'
],
# 1.全局配置,SessionAuthentication,BasicAuthentication
# SessionAuthentication内容
def authenticate(self, request):
"""
Returns a `User` if the request session currently has a logged in user.
Otherwise returns `None`.
"""
# Get the session-based user from the underlying HttpRequest object
user = getattr(request._request, 'user', None)
# Unauthenticated, CSRF validation not required
if not user or not user.is_active:
return None
self.enforce_csrf(request)
# CSRF passed with authenticated user
return (user, None)
# 2. BasicAuthentication
def authenticate(self, request):
# 获取认证头
# 1)从请求头中获取 认证字符串(auth(后台) - token(前台))
auth = get_authorization_header(request).split()
# 2)如果没有auth,返回None =>游客 在认证模块中就是返回None
if not auth or auth[0].lower() != b'basic':
return None
# 3) 如果有认证字段一定校验
# 校验失败: 抛出校验失败的异常
# 校验成功: 返回长度为2的元组,且第一位是登录user对象,第二位none 空字符传auth
if len(auth) == 1:
msg = _('Invalid basic header. No credentials provided.')
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _('Invalid basic header. Credentials string should not contain spaces.')
raise exceptions.AuthenticationFailed(msg)
try:
# 反向解密获取用户id和用户密码
auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(':')
except (TypeError, UnicodeDecodeError, binascii.Error):
msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
raise exceptions.AuthenticationFailed(msg)
# 4)校验成功
userid, password = auth_parts[0], auth_parts[2]
# 5)返回用户
return self.authenticate_credentials(userid, password, request)
# 查询用户是否存在,返回用户对象
def authenticate_credentials(self, userid, password, request=None):
credentials = {
get_user_model().USERNAME_FIELD: userid,
'password': password
}
# 校验用户
user = authenticate(request=request, **credentials)
if user is None:
raise exceptions.AuthenticationFailed(_('Invalid username/password.'))
if not user.is_active:
raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
return (user, None)
# 认证范围 当前api接口
def authenticate_header(self, request):
return 'Basic realm="%s"' % self.www_authenticate_realm
# 3重写认证模块
class TokenAuthentication(BasicAuthentication):
prefix = "token"
def authenticate(self, request):
# 获取头携带的token
auth = request.META.get("HTTP_AUTHORIZATION")
# 如果为空返回None,进行校验
if not auth:
return None
# 获取 token和值
auth_list = auth.split()
# 判断是否正确
if not (len(auth_list) == 2 and auth_list[0].lower() == self.prefix.lower()):
raise AuthenticationFailed("非法用户")
token = auth_list[1]
# 校验算法
user = _get_obj(token)
# 校验失败抛异常,成功返回(user, token)
return (user, token)
def _get_obj(token):
"""
校验算法(认证类)与签发算法配套 拆封token:一段 二段 三段 用户名:b64decode(一段)
用户主键:b64decode(二段) 碰撞解密:md5(用户名+用户主键+服务器秘钥) == 三段
"""
token_list = token.split(".")
# 判断是否为三分
if not len(token_list):
raise AuthenticationFailed("token异常")
# 获取用户名和pk
username = json.loads(base64.b64decode(token_list[0])).get("username")
pk = json.loads(base64.b64decode(token_list[1])).get("pk")
md5_dic = {
"username": username,
"pk": pk,
"key": settings.SECRET_KEY
}
# 判断token是否相等
if token_list[2] != hashlib.md5(json.dumps(md5_dic).encode()).hexdigest():
raise AuthenticationFailed("token内容异常")
user_obj = User.objects.filter(pk=pk, username=username).first()
return user_obj
总结:
-
SessionAuthentication,BasicAuthentication这两个类都是继承BaseAuthentication类
-
SessionAuthentication类解析
- 该类重写了authenticate方法,在方法中获取登录对象,判断用户是否已经登录,如果为空或者不活跃,则返回None,只具有游客的权限
- 如果用户存在则返回用户对象
-
BasicAuthentication类解析
- 从请求头中获取认证的字符串token(auth后台 --->token(前台)),判断auth如果为空返回None,用户则具有游客功能,只要是游客,返回的都是None>
- 如果auth部位空则一定进行校验,校验失败则抛出异常,校验成功返回长度为2的元组且第一位登录的user对象,第二个为None
- authenticate_credentials函数实现对用户是否存在的校验
-
认证模块工作原理
- 继承BaseAuthentication类,重写authenticate方法
- 认证规则:
- 没有携带认证信息,直接返回None => 游客
- 有认证信息,校验失败,抛异常 => 非法用户
- 有认证信息,校验出User对象 => 合法用户
-
认证模块核心实现
# 重写函数 def authenticate(self, request): token = get_token(request) try: user = get_user(token) # 校验算法 except: raise AuthenticationFailed() return (user, token)
4.2解析模块源码分析
# 全局配置 权限
'DEFAULT_PERMISSION_CLASSES': [
'rest_framework.permissions.AllowAny',
],
#
class BasePermission(metaclass=BasePermissionMetaclass):
"""
A base class from which all permission classes should inherit.
"""
def has_permission(self, request, view):
"""
Return `True` if permission is granted, `False` otherwise.
"""
return True
def has_object_permission(self, request, view, obj):
"""
Return `True` if permission is granted, `False` otherwise.
"""
return True
# 重写权限类
class SuperUserPermission(BasePermission):
def has_permission(self, request, view):
return request.user and request.user.is_superuser
总结
- 权限模块都是继承BasePermission类,重写has_permission方法
- 权限规则(has_permission方法实现体):
- 返回True,代表有权限返回False,代表无权限
五、频率组件
5.1 自定义频率类
from rest_framework.throttling import SimpleRateThrottle
"""
1. 定义类继承SimpleRateThrottle,重写get_cache_key方法,设置scope类属性
2. scope就是一个认证字符串,在配置文件中配置scope字符串对象的频率设置
3. get_cache_key的返回值是字符串,该字符串是缓存访问次数key
"""
class ThreeTimeUserThrottle(SimpleRateThrottle):
scope = "three"
# 当前用户缓存的key(key要唯一)
def get_cache_key(self, request, view):
return "throttle:user_%s" % request.user.id
5.2时间频率时间配置
# 频率设置
'DEFAULT_THROTTLE_RATES': {
'three': '3/min',
},
5.3使用
from rest_framework.permissions import IsAdminUser
from utilss.throttling import ThreeTimeUserThrottle
class JwtUserCenterAPIView(APIView):
# 权限认证
permission_classes = [IsAdminUser]
# 频率认证
throttle_classes = [ThreeTimeUserThrottle]
def get(self, request, *args, **kwargs):
user = request.user
serializer_obj = serializers.JwtUserModelSerializer(user)
return APIResponse(data=serializer_obj.data)
总结:
1. 作用:用于一个接口在一定时间范围内可以访问几次,访问接口的次数在设定的时间范围内是否过快,配置访问频率,每次访问都要缓存几次,超次后需要等待的时间
2. 使用:定义类继承SimpleRateThrottle,重写get_cache_key方法,设置scope类属性
3. scope就是一个认证字符串,在配置文件中配置scope字符串对象的频率设置
4. get_cache_key的返回值是字符串,该字符串是缓存访问次数key
5. 在django的settings.py文件中配置频率访问次数, 'three': '3/min',在get_cache_key中使用,还有一个参数可以在重写方法中规定频率次数,rate属性也可以设置访问的频率
六、总结
1.登录接口总结:
- 登录接口:
- 如果是超级管理员登录,返回一个可以校验出超级管理员的token字符串
- 只有其他用户登录就返回一个与登录相关的token字符串,(返回给前台 => 签发token => user_obj -> token_str)
- 注意:登录接口一定要做局部禁用认证和权限的校验
- 校验成功后,就返回一个字符串token,一定不能调用save方法,因为post方法只完成数据库的查操作,所以在自定义校验序列化类是要自定义字段进行校验用户是否存在
- 校验成功后会得到user对象,并且在校验过程,完成对token的签发(生成一个字符串,user_obj -> token_str)
2.用户登录校验序列化类总结:
- 用户登录校验序列化类,校验用户用户是否存在, 并返回token字符
- 校验字段必须是自己定义的,如果使用自带的字段就会报错,因为post是保存数据的,会执行到save方法,所有要自己定义字段通过全局钩子实现校验用户是否登录,并将token添加到名称空间中
- 自定义token字符串
- 签发算法:b64encode(用户名).b64encode(用户主键).md5(用户名+用户主键+服务器秘钥)
- 进入其他的页面会通过token得到user,判断用户是否登录,所以用户名和组件采用可解码加密,最后一个采用不可解码加密
- 使用base64模块,json模块,hashlib模块
3.自定义验证模块总结:
- 自定义验证模块,继承BaseAuthentication类,重写authenticate方法
- 认证规则(authenticate方法实现体):
- 没有携带认证信息,直接返回None => 游客
- 有认证信息,校验失败,抛异常 => 非法用户
- 有认证信息,校验出User对象 => 合法用户
- 获取token进行解密
- 校验算法(认证类)与签发算法配套
- 拆封token:一段 二段 三段
- 用户名:b64decode(一段)
- 用户主键:b64decode(二段)
- 碰撞解密:md5(用户名+用户主键+服务器秘钥) == 三段
- 认证规则(authenticate方法实现体):
4.自定义权限类
- 继承BasePermission类,重写has_permission方法
- 权限规则(has_permission方法实现体):返回True,代表有权限,返回False,代表无权限
5认证模块源码分析
-
SessionAuthentication,BasicAuthentication这两个类都是继承BaseAuthentication类
-
SessionAuthentication类解析
- 该类重写了authenticate方法,在方法中获取登录对象,判断用户是否已经登录,如果为空或者不活跃,则返回None,只具有游客的权限
- 如果用户存在则返回用户对象
-
BasicAuthentication类解析
- 从请求头中获取认证的字符串token(auth后台 --->token(前台)),判断auth如果为空返回None,用户则具有游客功能,只要是游客,返回的都是None>
- 如果auth部位空则一定进行校验,校验失败则抛出异常,校验成功返回长度为2的元组且第一位登录的user对象,第二个为None
- authenticate_credentials函数实现对用户是否存在的校验
-
认证模块工作原理
- 继承BaseAuthentication类,重写authenticate方法
- 认证规则:
- 没有携带认证信息,直接返回None => 游客
- 有认证信息,校验失败,抛异常 => 非法用户
- 有认证信息,校验出User对象 => 合法用户
6.认证模块核心实现
# 重写函数
def authenticate(self, request):
token = get_token(request)
try:
user = get_user(token) # 校验算法
except:
raise AuthenticationFailed()
return (user, token)
7.自定义权限源码分析
- 权限模块都是继承BasePermission类,重写has_permission方法
- 权限规则(has_permission方法实现体):
- 返回True,代表有权限返回False,代表无权限(自定义权限判断)
- AllowAny:不限制,IsAuthenticated:必须是登录用户,IsAdminUser:必须是后台用户,IsAuthenticatedOrReadOnly:读操作无限制,其他操作需要登录
8.自定义频率配置
- 作用:用于一个接口在一定时间范围内可以访问几次,访问接口的次数在设定的时间范围内是否过快,配置访问频率,每次访问都要缓存几次,超次后需要等待的时间
- 使用:定义类继承SimpleRateThrottle,重写get_cache_key方法,设置scope类属性
- scope就是一个认证字符串,在配置文件中配置scope字符串对象的频率设置
- get_cache_key的返回值是字符串,该字符串是缓存访问次数key
- 在django的settings.py文件中配置频率访问次数, 'three': '3/min',在get_cache_key中使用,还有一个参数可以在重写方法中规定频率次数,rate属性也可以设置访问的频率