zoukankan      html  css  js  c++  java
  • 六、drf认证权限频率解析

    drf认证权限频率解析

    一、认证组件(authentication)

    1. 使用流程

    1. 写一个类,继承BaseAuthentication,重写authenticate,认证的逻辑写在里面.
        认证通过,返回两个值(user、啥都可以,一般是token),一个值最终给了包装以后的request对象, 视图中就可以通过request.user获取,
        认证失败,抛异常:APIException 或者 AuthenticationFailed
            注意: 本质抛出的异常只是AuthenticationFailed, 而AuthenticationFailed又继承了APIException, 因此源码中只需要捕获父类异常, 在属性查找时必然会找到其子类AuthenticationFailed
            class AuthenticationFailed(APIException):
                status_code = status.HTTP_401_UNAUTHORIZED
                default_detail = _('Incorrect authentication credentials.')
                default_code = 'authentication_failed'            
            
            
        提示: BaseAuthentication可以不继承, BaseAuthentication作用仅仅是对继承它的子类必须要定义authentication方法
            本质采用的就是通过抛出异常的形式去规范认证的方法名写法.
            def authenticate(self, request):
                raise NotImplementedError(".authenticate() must be overridden.")        
    2. 全局使用认证,局部使用认证
    

    2. 认证源码解析

    # 1. APIVIew --> dispatch方法 --> self.initial(request, *args, **kwargs) -->有认证,权限,频率
    
    # 2. 只读认证源码: self.perform_authentication(request)
    
    # 3. self.perform_authentication(request)就一句话:request.user,需要去drf的Request类中找user属性(方法) 
    
    # 4. Request类中的user方法,刚开始来,没有_user,走 self._authenticate()
    
    # 5. 核心,就是Request类的 _authenticate(self):
        def _authenticate(self):
            # 遍历拿到一个个认证器,进行认证
            # self.authenticators配置的一堆认证类产生的认证类对象组成的 list
            # self.authenticators 你在视图类中配置的一个个的认证类:authentication_classes=[认证类1,认证类2],对象的列表
            for authenticator in self.authenticators:
                try:
                    # 认证器(对象)调用认证方法authenticate(认证类对象self, request请求对象)
                    # 返回值:登陆的用户与认证的信息组成的 tuple
                    # 该方法被try包裹,代表该方法会抛异常,抛异常就代表认证失败
                    user_auth_tuple = authenticator.authenticate(self)
                except exceptions.APIException:
                    self._not_authenticated()
                    raise
    
                # 返回值的处理
                if user_auth_tuple is not None:
                    self._authenticator = authenticator
                    # 如何有返回值,就将 登陆用户 与 登陆认证 分别保存到 request.user、request.auth
                    self.user, self.auth = user_auth_tuple
                    return
            # 如果返回值user_auth_tuple为空,代表认证通过,但是没有 登陆用户 与 登陆认证信息,代表游客
            self._not_authenticated()
    

    3. 自定义认证功能实现

    (1) model.py

    from django.db import models
    
    # Create your models here.
    # 待查询数据
    class Book(models.Model):
        id = models.AutoField(primary_key=True)
        title = models.CharField(max_length=32)
        price = models.DecimalField(max_digits=5,decimal_places=2)
        publish = models.CharField(max_length=32)
        author = models.CharField(max_length=32)
    
    # 用户信息
    class Userinfo(models.Model):
        username = models.CharField(max_length=32)
        password = models.CharField(max_length=32)
        user_type = models.IntegerField(choices=(('1','超级用户'),('2','普通用户'),('3','二笔用户'),))
    
    # 用户的token信息 
    class Usertoken(models.Model):
        token = models.CharField(max_length=64)
        user = models.OneToOneField(to='Userinfo')
    

    (2) 序列化组件类:ser.py

    from app01 import models
    from rest_framework import serializers
    class BookSerializer(serializers.ModelSerializer):
        class Meta:
            model = models.Book
            fields = "__all__"
            extra_kwargs = {
                'id':{'read_only':'True'}
            }
    

    (3) 认证组件类:app_auth.py

    from app01 import models
    from rest_framework.exceptions import AuthenticationFailed
    from rest_framework.authentication import BaseAuthentication
    
    class MyMyAuthentication(BaseAuthentication):
        def authenticate(self, request):
            # 获取用户传过来的token串
            token = request.query_params.get('token') # 也可以通过request.GET.get('token')获取
            # 判断token是否存在
            if token: # 存在
                # 通token串取Usertoken模型表中获取该用户信息
                user_token = models.Usertoken.objects.filter(token=token).first() # 这是token模型表
                # 判断是否存在用户
                if user_token:
                    # 存在,就是通过验证
                    return user_token.user,token
                else:
                    raise AuthenticationFailed('认证失败')
            else:
                raise AuthenticationFailed('请求地址中需要携带token参数')
    

    (4) views.py

    from django.shortcuts import render
    
    # Create your views here.
    from rest_framework.viewsets import ModelViewSet
    
    import uuid
    from app01 import models
    
    from app01.ser import BookSerializer
    from rest_framework.views import APIView
    from rest_framework.decorators import action
    from rest_framework.response import Response
    from app01.app_auth import MyMyAuthentication
    
    # 待查询的数据
    class BooksView(ModelViewSet):
        # 此时已经设置好了全局认证,所有此处不再需要额外写authentication_classes了
        # authentication_classes = [MyMyAuthentication]
        queryset = models.Book.objects.all()
        serializer_class = BookSerializer
       
        @action(methods=['GET'], detail=False)
        def get_3(self,request):
            book_queryset = self.get_queryset()[:3]
            book_ser = self.get_serializer(book_queryset,many=True)
            return Response(book_ser.data)
    
        @action(methods=['get',],detail=True)
        def get_1(self,request,pk):
            book_obj = self.get_object()
            book_ser = self.get_serializer(book_obj)
            return Response(book_ser.data)
    
    # 登录视图类
    class LoginView(APIView):
        # 此时已经设置好的全局认证配置,但是登录功能不能添加认证(还没登录,那来的token串)
        # 解决措施:置空authentication_classes列表即可
        authentication_classes = []
        # 用户登录视图函数
        def post(self,request):
            # 获取用户名和密码
            username = request.data.get('username')
            password = request.data.get('password')
            # 根据用户名和密码取模型表中查询该用户的数据
            user_obj = models.Userinfo.objects.filter(username=username, password=password).first()
            # 判断用户是否在数据库中
            if user_obj: # 登陆成功,生成一个随机字符串
                # 生成token字符串
                token = uuid.uuid4()
                # 在Usertoken模型表中创建一条数据,表示当前用户已登录
                # update_or_create(default,**kwargs):根据**kwargs的信息取Usertoken模型表中查询数据,有就将default的数据更新,没有就创建一条数据
                models.Usertoken.objects.update_or_create(defaults={"token":token,},user=user_obj)
                return Response({'status':'1000','msg':'成功','token':token})
            else:
                return Response({'status':'1001','msg':'用户名或密码错误'})
    

    (5) urls.py

    from django.conf.urls import url
    from django.contrib import admin
    from app01 import views
    from rest_framework import routers
    
    
    router = routers.SimpleRouter()
    router.register('books',views.BooksView) # 不要加斜杠
    
    urlpatterns = [
        url(r'^admin/', admin.site.urls),
        # 登录的路由
        url(r'^login/', views.LoginView.as_view()),
    ]
    # 自动生成的路由
    urlpatterns += router.urls
    

    (6) settings.py

    ......
    # 认证的全局配置
    REST_FRAMEWORK={
        "DEFAULT_AUTHENTICATION_CLASSES":["app01.app_auth.MyMyAuthentication",]
    }
    

    4. 内置认证类配置

    # 配置文件中配置
    REST_FRAMEWORK = {
        'DEFAULT_AUTHENTICATION_CLASSES': (
            'rest_framework.authentication.SessionAuthentication',  # session认证
            'rest_framework.authentication.BasicAuthentication',    # 基本认证
        )
    }
    
    # 视图中设置authentication_classess属性来设置
    from rest_framework.authentication import SessionAuthentication, BasicAuthentication
    from rest_framework.views import APIView
    
    class ExampleView(APIView):
        # 类属性
        authentication_classes = [SessionAuthentication, BasicAuthentication]
        ...
    

    二、权限组件(permission)

    1. 自定义权限

    ①.源码分析

    # 位置: APIView的对象方法
    
    # 流程: APIView --> dispatch --> initial --> self.check_permissions(request)
    
    # 源码
    def check_permissions(self, request):
        for permission in self.get_permissions():
                if not permission.has_permission(request, self):
                    self.permission_denied(
                        request, message=getattr(permission, 'message', None)
                    )
    
    # 源码分析步骤:
        self.get_permissions()表示的是权限对象列表.
        permission.has_permission(request, self)表示调用permission对象中的has_permission方法,
        意思就是如果你自定义了权限类, 并且配置了全局或者局部的配置. 就会将你自定义的权限类中的has_permission方法,
        执行完毕该方法中, 通过你的返回值来确定是否抛出异常.
            如果返回结果的布尔值是True:  权限认证就通过了
            如果返回结果的布尔值是False: 就会指定self.permission_denied中的方法抛出异常
                抛出的异常默认有2种情况:
                第一种情况: exceptions.NotAuthenticated()
                第二种情况: exceptions.PermissionDenied(detail=message)
            注意: 需要注意的是, 一旦抛出异常, dispatch中try中之下的代码就不会执行 (之下的代码包含: 频率校验, 反射执行视图类中的方法)
                接着就会直接响应客户端请求, 返回response了. 此时整个这次客户端的交互就结束了.
    

    ②.实现步骤

    # 实现权限控制思路: 关键点就取决于自定义的权限类中定义的has_permission的方法的返回值来控制权限的返回
    
    #  实现步骤:
        第一步: 新建.py文件. 书写权限类
        第二步: 权限类中必须要重写has_permission方法
            注意: 重写的has_permission方法的参数
            request: APIView中dispatch方法中包装过后的request, 不是原生的了.
            self: 继承了APIView的自定义类实例化的对象
        第三步: 因为权限前面就是认证, 而认证如果通过了request.user是可以获取到认证通过的用户对象的
            注意: 有二种情况下request.user获取的是匿名用户.
            第一种: 没有自定义书写认证类
            第二种: 最后一个认证类中定义的认证方法的返回值结果是None.
        第四步: 全局 或者 局部配置
            查找: APIView -> api_settings -> DEFAULTS ->
            全局:
                'DEFAULT_PERMISSION_CLASSES': [
                        'app01.app_permission.UserPermission',  # 配置你自定义认证类的路径
                    ],
            局部: permission_classes = [UserPermission, ]
            禁用: permission_classes = []
    

    ③.编写权限类

    from rest_framework.permissions import BasePermission
    
    class USerPermission(BasePermission):
        def has_permission(self, request, view):
            # self:权限对象
            # request:请求对象
            # view:视图类对象
            
            """
            可能的错误一:
                AttributeError at /app01/superuser/
                'AnonymousUser' object has no attribute 'get_user_type_display'
            可能的错误二:
                AttributeError: 'AnonymousUser' object has no attribute 'user_type'
            权限认证失败返回False时:  You do not have permission to perform this action
                APIView -> dispatch  -> initial -> check_permissions -> permission_denied -> raise exceptions.PermissionDenied(detail=message)
            """
            """
            try:
                print('request.user.get_user_type_display():', request.user.get_user_type_display())
                return True if request.user.user_type == 1 else False
            except AttributeError:
                return False
            """
            # 现在这你可能出现的异常交给exception_handler捕获
            user = request.user  # 当前登录用户
            # 如果该字段用了choice,通过get_字段名_display()就能取出choice后面的中文
            print(user.get_user_type_display())
            if user.user_type == 1:
                return True
            else:
                return False
    

    ④.全局配置

    REST_FRAMEWORK={
        'DEFAULT_PERMISSION_CLASSES': [
            'app01.app_permission.UserPermission',  # 配置你自定义认证类的路径
        ],
    }
    

    ⑤.局部配置

    # 局部使用只需要在视图类里加入
    permission_classes = [UserPermission, ]
    

    ⑥.总结

    1. 新建.py文件书写权限控制类, 继承 BasePermission
        from rest_framework.permissions import BasePermission
    2. 新建的类中重写 has_permission 方法
        三个参数: self, request, view
        self: 自定义认证类的丢下
        request: APIView中的dispatch中封装过后的request对象
        view: 自定义视图类实例化过后的对象
    3. 根据has_permission的返回布尔值是否是True 或者 False进行判断
        True:  权限通过
        False: 权限不通过, 抛出异常(抛出2种不同类型异常, 根据实际情况分析)
    4. 全局配置  或者  局部配置
    

    2. 内置权限类

    ①.内置权限类

    from rest_framework.permissions import AllowAny,IsAuthenticated,IsAdminUser,IsAuthenticatedOrReadOnly
     # AllowAny 允许所有用户
     # IsAuthenticated 仅通过认证的用户
     # IsAdminUser 仅管理员用户
     # IsAuthenticatedOrReadOnly 已经登陆认证的用户可以对数据进行增删改操作,没有登陆认证的只能查看数据。
    

    ②.全局配置和局部配置

    # 全局配置
        REST_FRAMEWORK = {
            'DEFAULT_PERMISSION_CLASSES': [
                    rest_framework.permissions.AllowAny',  # 内置
                ],
        }
    
    # 局部配置
        '''
        # IsAdminUser
            return bool(request.user and request.user.is_staff)
    
        # SessionAuthentication
            user = getattr(request._request, 'user', None)
                if not user or not user.is_active:
                    return None
            SessionAuthentication源码控制的是user 和 user.is_active
                控制情况: 匿名用户user.is_action布尔值是False
                1. 原生的request对象中时候有user属性
                2. user中is_active的布尔值为True.
        '''
        from rest_framework.authentication import SessionAuthentication
        from rest_framework.permissions import IsAdminUser
        authentication_classes = [SessionAuthentication]
        permission_classes = [IsAdminUser]
    

    ③.代码示例

    # 内置局部认证+内置局部权限控制: is_active控制认证 is_staff控制权限
    from rest_framework.response import Response
    from rest_framework.views import APIView
    from rest_framework.authentication import SessionAuthentication
    from rest_framework.permissions import IsAdminUser
    
    class TextView2(APIView):
        """
        SessionAuthentication认证的判断依据: is_active
            if not user or not user.is_active:
                return None
            失败返回:
                Authentication credentials were not provided.
    
        IsAdminUser权限的判断依据:  is_staff
            return bool(request.user and request.user.is_staff)
            失败返回: You do not have permission to perform this action.
        """
        authentication_classes = [SessionAuthentication]
        permission_classes = [IsAdminUser]
    
        def get(self, request):
            return Response('这是活跃的工作')
    

    ④.总结

    内置认证: 
        from rest_framework.authentication import SessionAuthentication
        控制的是is_active
    内置权限:  
        from rest_framework.permissions import isAdminUser
        控制的是is_staff
    

    三、频率组件(throttle)

    1. 自定义频率类

    ①.编写频率类

    # 自定义的逻辑
    #(1)取出访问者ip
    #(2)判断当前ip不在访问字典里,添加进去,并且直接返回True,表示第一次访问,在字典里,继续往下走
    #(3)循环判断当前ip的列表,有值,并且当前时间减去列表的最后一个时间大于60s,把这种数据pop掉,这样列表中只有60s以内的访问时间,
    #(4)判断,当列表小于3,说明一分钟以内访问不足三次,把当前时间插入到列表第一个位置,返回True,顺利通过
    #(5)当大于等于3,说明一分钟内访问超过三次,返回False验证失败
    class MyThrottles():
        VISIT_RECORD = {}
        def __init__(self):
            self.history=None
        def allow_request(self,request, view):
            #(1)取出访问者ip
            # print(request.META)
            ip=request.META.get('REMOTE_ADDR')
            import time
            ctime=time.time()
            # (2)判断当前ip不在访问字典里,添加进去,并且直接返回True,表示第一次访问
            if ip not in self.VISIT_RECORD:
                self.VISIT_RECORD[ip]=[ctime,]
                return True
            self.history=self.VISIT_RECORD.get(ip)
            # (3)循环判断当前ip的列表,有值,并且当前时间减去列表的最后一个时间大于60s,把这种数据pop掉,这样列表中只有60s以内的访问时间,
            while self.history and ctime-self.history[-1]>60:
                self.history.pop()
            # (4)判断,当列表小于3,说明一分钟以内访问不足三次,把当前时间插入到列表第一个位置,返回True,顺利通过
            # (5)当大于等于3,说明一分钟内访问超过三次,返回False验证失败
            if len(self.history)<3:
                self.history.insert(0,ctime)
                return True
            else:
                return False
        def wait(self):
            import time
            ctime=time.time()
            return 60-(ctime-self.history[-1])
    

    ②.全局配置

    REST_FRAMEWORK = {
        'DEFAULT_THROTTLE_CLASSES':['app01.utils.MyThrottles',],
    }
    

    ③.局部配置

    #在视图类里使用
    throttle_classes = [MyThrottles,]
    

    2. 根据用户ip限制

    # 写一个类,继承自SimpleRateThrottle,(根据ip限制)
    from rest_framework.throttling import SimpleRateThrottle
    class VisitThrottle(SimpleRateThrottle):
        scope = 'luffy' # 必须指定,需要在settings.py中配置限制,默认是None
        def get_cache_key(self, request, view):
            return request.META.get("REMOTE_ADDR")  # 返回什么就是通过获取请求用户ip来进行限制的逻辑
        
    #在setting里配置:(一分钟访问三次)
    REST_FRAMEWORK = {
        'DEFAULT_THROTTLE_RATES':{
            'luffy':'3/m'  # key要跟类中的scop对应
        }
    }
    

    3. 限制用户(匿名、已登录)

    # 全局配置:
        REST_FRAMEWORK = {
            'DEFAULT_THROTTLE_CLASSES': [
                'rest_framework.throttling.AnonRateThrottle',
                'rest_framework.throttling.UserRateThrottle',
            ],
            'DEFAULT_THROTTLE_RATES': {
                'user': '10/m',            # 对登录用户的频率控制:   每分钟10次
                'anon': '5/m',             # 对未登录用户的频率控制: 每分钟5次
                # 'anon': '5/mqweasdzxc',  # 提示: 这样也识别, 因为默认只会取5/m
            },
        }
    
    # 局部配置:
        # 控制未登录用户的频率访问
            from rest_framework.throttling import AnonRateThrottle
            throttle_classes = [AnonRateThrottle]
    
        # 控制登录用户的频率访问:
            from rest_framework.throttling import UserRateThrottle
            throttle_classes = [UserRateThrottle]
    

    视图中代码显示

    from rest_framework.response import Response
    from rest_framework.views import APIView
    
    
    # 禁用认证+禁用权限控制+禁用频率校验: 未登录用户访问评频率不做限制
    class TextView3(APIView):
        authentication_classes = []
        permission_classes = []
        throttle_classes = []
    
        def get(self, request, *args, **kwargs):
    
            return Response('我是未登录用户 TextView3')
    
    
    # 禁用认证+禁用权限控制+匿名用户局部频率校验: 控制未登录用户的访问频率 -> 每分钟访问5次
    from rest_framework.throttling import AnonRateThrottle
    
    
    class TextView4(APIView):
        """
        当访问次数超出时抛出异常:
        Request was throttled. Expected available in 54 seconds.
        """
        authentication_classes = []
        permission_classes = []
        throttle_classes = [AnonRateThrottle]
    
        def get(self, request, *args, **kwargs):
            return Response('我是未登录用户. TextView4')
    
    
    # 局部内置认证+禁用权限控制+内置用户全局频率校验: 控制登录用户的访问频率 -> 每分钟访问10次
    from rest_framework.authentication import SessionAuthentication
    
    
    class TextView5(APIView):
        """
        注意: 这里是内置认证以后的频率校验, 如果使用的自定义的认证, request.user是没有is_authenticated属性的
        抛出异常: 'User' object has no attribute 'is_authenticated'
    
        """
        authentication_classes = [SessionAuthentication]  # 认证依据is_active
        permission_classes = []
    
        def get(self, request, *args, **kwargs):
            return Response('我是未登录用户. TextView5')
    

    4. 拓展:错误信息显示中文

    class Course(APIView):
        authentication_classes = [TokenAuth, ]
        permission_classes = [UserPermission, ]
        throttle_classes = [MyThrottles,]
    
        def get(self, request):
            return HttpResponse('get')
    
        def post(self, request):
            return HttpResponse('post')
        def throttled(self, request, wait):
            from rest_framework.exceptions import Throttled
            class MyThrottled(Throttled):
                default_detail = '傻逼啊'
                extra_detail_singular = '还有 {wait} second.'
                extra_detail_plural = '出了 {wait} seconds.'
            raise MyThrottled(wait)
    
  • 相关阅读:
    查找链表中是否有环linked-list-cycle
    reverse-integer
    AVL树之 Java的实现
    single-number
    Best Time to Buy and Sell Stock II
    maximun-depth-of-binary-tree
    minimun-depth-of-binary-tree
    剑指offer--矩阵中的路径
    grep的几个参数
    fsck和badlocks
  • 原文地址:https://www.cnblogs.com/borntodie/p/14330799.html
Copyright © 2011-2022 走看看