第六章 restframework——认证组件、权限组件、频率组件
一、认证、权限、频率组件起始位置
二、认证组件
三、权限组件
四、频率组件
一、认证、权限、频率组件起始位置
第二章节分析了APIView,主要核心在于内部dispatch的执行,那认证、权限、频率的发生位于哪里呢?
通过查看initial方法来继续深入
二、认证组件
什么是认证(what):
只有认证通过的用户才能访问指定的url地址,比如:查询课程信息,需要登录之后才能查看,没有登录,就不能查看,这时候需要用到认证组件。
普通版本登录认证(与认证组件无关)

urlpatterns = [ url(r'^admin/', admin.site.urls), # url(r'^authors/$',views.AuthorView.as_view(),name='authors'), # url(r'^authors/(?P<pk>d+)/$',views.AuthorDetailView.as_view(),name='author_detail'), url(r'^authors/$',views.AuthorModelView.as_view({"get":"list","post":"create"}),name='authors'), url(r'^authors/(?P<pk>d+)/$',views.AuthorModelView.as_view({ 'get': 'retrieve', 'put': 'update', 'patch': 'partial_update', 'delete': 'destroy' }),name='author_detail'), url(r'^publishes/$',views.PublishView.as_view(),name='publishes'), url(r'^publishes/(?P<pk>d+)/$',views.PublishDetailView.as_view(),name='publish_detail'), url(r'^books/$',views.BookView.as_view(),name='books'), url(r'^books/(d+)/$',views.BookDetailView.as_view(),name='book_detail'), url(r'^login/$',views.LoginView.as_view(),name='login') ]

class User(models.Model): name = models.CharField(max_length=32) pwd = models.CharField(max_length=32) class Token(models.Model): user = models.OneToOneField("User") token = models.CharField(max_length=128) def __str__(self): return self.token

from .models import User class LoginView(APIView): def post(self,request): name = request.data.get('name') pwd = request.data.get('pwd') user = User.objects.filter(name=name,pwd=pwd).first() res = {"status_code":1000,"msg":None} if user : random_str = get_random_str(user.name) Token.objects.update_or_create(user=user,defaults={"token":random_str}) res["token"] = random_str res["msg"] = "登录成功" else: res["status_code"] = 1001 # 错误状态码 res["msg"] = "用户名或密码错误" import json return Response(json.dumps(res,ensure_ascii=False))
1.局部视图认证
新建认证类(验证通过return两个参数)
from rest_framework import exceptions class TokenAuth(object): def authenticate(self,request): token = request.GET.get("token") token_obj = Token.objects.filter(token=token).first() if not token_obj: raise exceptions.AuthenticationFailed("验证失败!") else: return token_obj.user.name,token_obj.token def authenticate_header(self,request): pass
from rest_framework import exceptions from rest_framework.authentication import BaseAuthentication # class TokenAuth(object): # 可以直接引入BaseAuthentication类,这里面的写法实际就和我们的写法一样 class TokenAuth(BaseAuthentication): def authenticate(self,request): token = request.GET.get("token") token_obj = Token.objects.filter(token=token).first() if not token_obj: raise exceptions.AuthenticationFailed("验证失败!") else: return token_obj.user.name,token_obj.token # def authenticate_header(self,request): # pass
views.py
class BookView(APIView): authentication_classes = [TokenAuth] def get(self,request): print(request.user) print(request.auth) # get请求数据 book_list = Book.objects.all() bs = BookModelSerializers(book_list,many=True) return Response(bs.data) def get_random_str(user): import hashlib import time md = hashlib.md5() md.update(bytes(str(time.time()), encoding='utf-8')) md.update(bytes(user, encoding='utf-8')) return md.hexdigest() from .models import User class LoginView(APIView): def post(self,request): name = request.data.get('name') pwd = request.data.get('pwd') user = User.objects.filter(name=name,pwd=pwd).first() res = {"status_code":1000,"msg":None} if user : random_str = get_random_str(user.name) Token.objects.update_or_create(user=user,defaults={"token":random_str}) res["token"] = random_str res["msg"] = "登录成功" else: res["status_code"] = 1001 # 错误状态码 res["msg"] = "用户名或密码错误" import json return Response(json.dumps(res,ensure_ascii=False))
总结:局部使用,只需要在视图类里加入:
authentication_classes = [TokenAuth, ]
2.全局视图认证
也可以将自己写的认证类放到其他py文件内,并引用进去(绿色部分)
REST_FRAMEWORK={ "DEFAULT_AUTHENTICATION_CLASSES":["app01.service.auth.Authentication",] }
如果想全局生效,局部几个视图不想生效,只需将局部视图等于空列表即可
authentication_classes = []
3.源码分析
点进来查看认证组件,你会发现就短短的一句request.user,这里需要弄明白这个request是谁,并且这个user是什么
回到initial方法(很明显也是通过传参传入的,再往前找)
再查看initialize_request
找到这个Request类
点进去看找到静态方法user(认证组件做的事情就在这里)
所以最核心的认证的源代码就在这里了
通过__init__初始化发现,authenticators默认是None,默认赋值authenicators or ()
其实之前应该应该注意到返回的Request初始化对象已经给予赋值
这就是我们为什么要写authentication_classes的原因,这个列表放的是一个个认证类的实例对象
再往前推,之前request.user的时候
所以我们的TokenAuth类里要写authenicate方法(这才是关键)
假如没有写局部认证组件,那么APIView默认帮我们写了全局组件,只需要去settings内注册即可
查看api_settings,实际就是一个APISettings实例化对象
查看APISettings类
认证类使用顺序:先用视图类中的验证类,再用settings里配置的验证类,最后用默认的验证类
三、权限组件
什么是权限(what):
只用超级用户才能访问指定的数据,普通用户不能访问,所以就要有权限组件对其限制1.局部视图认证。
准备:
models.py
class User(models.Model): name = models.CharField(max_length=32) pwd = models.CharField(max_length=32) type_choices=((1,'普通用户'),(2,'VIP'),(3,'SVIP')) user_type = models.IntegerField(choices=type_choices,default=1)
1.局部视图权限
注意:has_permission(self,request,view)有三个参数,而认证组件只有两个,没有view
新建权限类(可以指定message报错信息)
class SVIPPermission(object): # 这个message是这个类公共的,不是has_permission内的 message = "只有超级用户才能访问" def has_permission(self,request,view): # 拿到当前用户的类型 user_name = request.user user_type = User.objects.filter(name=user_name).first().user_type if user_type == 3: # 通过权限认证 return True else: return False
也可以继承,重写方法,效果相同
from rest_framework.permissions import BasePermission class SVIPPermission(BasePermission): # 这个message是这个类公共的,不是has_permission内的 message = "只有超级用户才能访问" def has_permission(self,request,view): # 拿到当前用户的类型 user_name = request.user user_type = User.objects.filter(name=user_name).first().user_type if user_type == 3: # 通过权限认证 return True else: return False
总结:局部使用只需要在视图类里加入
permission_classes = [SVIPPermission,]
2.全局视图权限(可以将自己写的类放到某个文件夹,并引用进来)
与认证组件相同
REST_FRAMEWORK={ "DEFAULT_AUTHENTICATION_CLASSES":["app01.service.auth.Authentication",], "DEFAULT_PERMISSION_CLASSES":["app01.service.permissions.SVIPPermission",] }
3.源码分析
其大致逻辑与认证组件完全相同
找到dispatch的initial
而我们自己的权限组件也必须根据restframework的一样来写
四、频率组件
什么是频率(what):
为了控制用户对某个url请求的频率,比如,一分钟以内,只能访问三次
1.局部视图频率
新建频率类(访问频率正常return True,访问频率超过次数return False)
from rest_framework.throttling import BaseThrottle class VisitRateThrottle(BaseThrottle): VISIT_RECORD = {} def __init__(self): # 这是用于存储某一IP每次访问登录的时间 self.history = None def allow_request(self, request, view): # (1)取出访问者ip # print(request.META) ip = request.META.get('REMOTE_ADDR') import time ctime = time.time() # (2)判断当前ip不在访问字典里,添加进去,并且直接返回True,表示第一次访问 if ip not in self.VISIT_RECORD: self.VISIT_RECORD[ip] = [ctime, ] return True # 把ip对应的每次登录时间赋值给history self.history = self.VISIT_RECORD.get(ip) print(self.history) # (3)循环判断当前ip的列表,有值,并且当前时间减去列表的最后一个时间大于60s,把这种数据pop掉,这样列表中只有60s以内的访问时间, while self.history and ctime - self.history[-1] > 60: self.history.pop() # (4)判断,当列表小于3,说明一分钟以内访问不足三次,把当前时间插入到列表第一个位置,返回True,顺利通过 # (5)当大于等于3,说明一分钟内访问超过三次,返回False验证失败 if len(self.history) < 3: self.history.insert(0, ctime) return True else: return False def wait(self): import time ctime = time.time() return 60 - (ctime - self.history[-1])
views.py
from rest_framework import viewsets class AuthorModelView(viewsets.ModelViewSet): authentication_classes = [TokenAuth,] permission_classes = [SVIPPermission,] # 限制某个IP每分钟访问次数不超过20次 throttle_classes = [VisitRateThrottle,] queryset = Author.objects.all() serializer_class = AuthorModelSerializers
2.内置频率类及局部使用
新建频率类继承SimpleRateThrottle
from rest_framework.throttling import SimpleRateThrottle class VisitThrottle(SimpleRateThrottle): scope = 'abc' def get_cache_key(self, request, view): return self.get_ident(request)
settings.py
REST_FRAMEWORK = { 'DEFAULT_THROTTLE_RATES':{ 'abc':'3/m' } }
在视图类里使用
throttle_classes = [VisitThrottle,]
错误信息的中文提示:
class Course(APIView): authentication_classes = [TokenAuth, ] permission_classes = [SVIPPermission, ] throttle_classes = [VisitThrottle,] def get(self, request): return HttpResponse('get') def post(self, request): return HttpResponse('post') def throttled(self, request, wait): from rest_framework.exceptions import Throttled class MyThrottled(Throttled): default_detail = '你' extra_detail_singular = '还有 {wait} second.' extra_detail_plural = '出了 {wait} seconds.' raise MyThrottled(wait)
3.全局视图频率(可以将自己写的类放到某个文件夹,并引用进来)
REST_FRAMEWORK={ "DEFAULT_AUTHENTICATION_CLASSES":["app01.service.auth.Authentication",], "DEFAULT_PERMISSION_CLASSES":["app01.service.permissions.SVIPPermission",], "DEFAULT_THROTTLE_CLASSES":["app01.service.throttles.VisitThrottle",] }
4.源码分析
还是类似的方法(APIView)下,读取自己的throttle_classes列表
找到dispatch的initial
找到check_throttles方法