zoukankan      html  css  js  c++  java
  • 6 drf-认证权限解析频率

    一. 认证组件

    1. 流程

    1. 写一个类,继承BaseAuthentication,重写authenticate,认证的逻辑写在里面.
        认证通过,返回两个值,一个值最终给了包装以后的request对象, 视图中就可以通过request.user获取,
        认证失败,抛异常:APIException 或者 AuthenticationFailed
            注意: 本质抛出的异常只是AuthenticationFailed, 而AuthenticationFailed又继承了APIException, 因此源码中只需要捕获父类异常, 在属性查找时必然会找到其子类AuthenticationFailed
            class AuthenticationFailed(APIException):
                status_code = status.HTTP_401_UNAUTHORIZED
                default_detail = _('Incorrect authentication credentials.')
                default_code = 'authentication_failed'            
            
            
        提示: BaseAuthentication可以不继承, BaseAuthentication作用仅仅是对继承它的子类必须要定义authentication方法
            本质采用的就是通过抛出异常的形式去规范认证的方法名写法.
            def authenticate(self, request):
                raise NotImplementedError(".authenticate() must be overridden.")        
    2. 全局使用认证,局部使用认证
    

    2. 认证的源码分析

    # 1. APIVIew --> dispatch方法 --> self.initial(request, *args, **kwargs) -->有认证,权限,频率
    
    # 2. 只读认证源码: self.perform_authentication(request)
    
    # 3. self.perform_authentication(request)就一句话:request.user,需要去drf的Request类中找user属性(方法) 
    
    # 4. Request类中的user方法,刚开始来,没有_user,走 self._authenticate()
    
    # 5. 核心,就是Request类的 _authenticate(self):
        def _authenticate(self):
            # 遍历拿到一个个认证器,进行认证
            # self.authenticators配置的一堆认证类产生的认证类对象组成的 list
            # self.authenticators 你在视图类中配置的一个个的认证类:authentication_classes=[认证类1,认证类2],对象的列表
            for authenticator in self.authenticators:
                try:
                    # 认证器(对象)调用认证方法authenticate(认证类对象self, request请求对象)
                    # 返回值:登陆的用户与认证的信息组成的 tuple
                    # 该方法被try包裹,代表该方法会抛异常,抛异常就代表认证失败
                    user_auth_tuple = authenticator.authenticate(self)
                except exceptions.APIException:
                    self._not_authenticated()
                    raise
    
                # 返回值的处理
                if user_auth_tuple is not None:
                    self._authenticator = authenticator
                    # 如何有返回值,就将 登陆用户 与 登陆认证 分别保存到 request.user、request.auth
                    self.user, self.auth = user_auth_tuple
                    return
            # 如果返回值user_auth_tuple为空,代表认证通过,但是没有 登陆用户 与 登陆认证信息,代表游客
            self._not_authenticated()
    

    3. 自定义认证功能实现

    1) models.py

    from django.db import models
    
    
    # Create your models here.
    class User(models.Model):
        username = models.CharField(max_length=64)
        password = models.CharField(max_length=64)
    
    
    class DjangoToken(models.Model):
        user = models.OneToOneField(to='User')
        token = models.CharField(max_length=255)
    
        class Meta:
            db_table = 'django_token'
    

    3) 新建序列化.py文件: serializer.py

    from rest_framework import serializers
    
    from app01.models import Book
    
    
    class BookModelSerializer(serializers.ModelSerializer):
        class Meta:
            model = Book
            fields = '__all__'
    

    4) 新建认证.py文件: app_auth.py

    from rest_framework.exceptions import APIException
    from .models import DjangoToken
    from rest_framework.authentication import BaseAuthentication
    
    
    class TokenAuthentication(BaseAuthentication):
        # 注意1: 这里的request是APIView中包装以后的request对象.
        # 注意2: 这里面的异常不会被exception_handler捕获, 因为这里是在Request中完成的, perform_authentication进行是request.user操作
        def authenticate(self, request):   
            token = request.META.get("HTTP_TOKEN")
            print('token:', token)
            if token:
                token_obj = Token.objects.filter(token=token).first()
                if token_obj:
                    '''
                    # 如果返回值不是元组或者直接抛出的异常那么response的返回结果是: Authentication credentials were not provided
                    APIView -> dispatch  -> initial -> check_permissions -> permission_denied -> raise exceptions.NotAuthenticated()
                    
                    {
                        "detail": "Authentication credentials were not provided."
                    }
                    '''
                    # return
                    return token_obj.user, token
                else:
                    raise AuthenticationFailed('校验失败, token值不匹配!')
            raise AuthenticationFailed('请求头中需要携带token!')
    

    5) views.py

    from rest_framework.views import APIView
    from rest_framework.response import Response
    from rest_framework.viewsets import ModelViewSet
    
    from rest_framework.decorators import action
    
    from .models import User
    from .models import DjangoToken
    from app01.models import Book
    from app01.serializer import BookModelSerializer
    
    
    class SelfResponse(Response):
        """继承Response封装的自定义类"""
    
        def __init__(self, status=1000, messages='登录成功',
                     error=None, results=None, headers=None,
                     *args, **kwargs):
            data = {
                'status': status,
                'messages': messages,
            }
            if results:
                data['results'] = results
            elif error:
                data['error'] = error
            super().__init__(data=data, headers=headers, *args, **kwargs)
    
    
    class LoginView(APIView):
        """
        登录功能
        """
        authentication_classes = []
    
        def post(self, request):
            """
            注意: request.META的格式
            请求头中指定的格式是: token: 随机字符串
            获取时:
                print(request.META)  
                'HTTP_TOKEN': '6da7c382-c1ec-11ea-8971-48ba4e4e6384',
            """
            username = request.data.get('username')
            password = request.data.get('password')
            user_obj = User.objects.filter(username=username).first()
            # 判断用户账号
            if user_obj:
                # 判断用户密码
                if user_obj.password == password:
                    import uuid
                    token_uuid = uuid.uuid1()
                    # 利用update_or_create有就更新数据, 无就新增数据
                    DjangoToken.objects.update_or_create(defaults={'token': token_uuid}, user=user_obj)
                    obj = SelfResponse(headers={'token': token_uuid})
                else:
                    obj = SelfResponse(2000, '登陆失败, 用户密码错误', error={'username': username, 'password': password})
            else:
                obj = SelfResponse(3000, '登陆失败, 用户名不存在', error={'username': username, 'password': password})
            return obj
    
    
    class BookModelViewSet(ModelViewSet):
        queryset = Book.objects.all()  # 具体这里为什么还需要.all()老刘也没说
        serializer_class = BookModelSerializer
        lookup_url_kwarg = 'num'       # 修改默认的又名分组,  让视图中关键字参数必须pk接收
    
        # 使用detail=True为路由中新增一条可获取一下自定义方法的路由, 并且格式是^books/(?P<num>[^/.]+)/get/$ [name='book-get']. 
        @action(detail=True)
        def get(self, request, num):
            book_queryset = self.get_queryset()[:int(num)]  # 这里使用num可以动态获取想要的数据条数
            serializer = self.serializer_class(instance=book_queryset, many=True)
            return SelfResponse(results=serializer.data)
    

    6) urls.py

    Copyfrom django.conf.urls import url
    
    from rest_framework import routers
    
    from .views import LoginView
    from .views import BookModelViewSet
    
    router = routers.SimpleRouter()
    
    router.register('books', BookModelViewSet)
    
    urlpatterns = [
        url(r'^login/', LoginView.as_view())
    ]
    urlpatterns += router.urls
    print('router.urls:', router.urls)
    

    4. 内置认证类配置

    # 配置文件中配置
    REST_FRAMEWORK = {
        'DEFAULT_AUTHENTICATION_CLASSES': (
            'rest_framework.authentication.SessionAuthentication',  # session认证
            'rest_framework.authentication.BasicAuthentication',    # 基本认证
        )
    }
    
    
    # 视图中设置authentication_classess属性来设置
    from rest_framework.authentication import SessionAuthentication, BasicAuthentication
    from rest_framework.views import APIView
    
    class ExampleView(APIView):
        # 类属性
        authentication_classes = [SessionAuthentication, BasicAuthentication]
        ...
    

    二. 权限组件

    1. 自定义权限

    1) 源码分析

    # 位置: APIView的对象方法
    
    # 流程: APIView --> dispatch --> initial --> self.check_permissions(request)
    
    # 源码
    def check_permissions(self, request):
        for permission in self.get_permissions():
                if not permission.has_permission(request, self):
                    self.permission_denied(
                        request, message=getattr(permission, 'message', None)
                    )
    
    # 源码分析步骤:
        self.get_permissions()表示的是权限对象列表.
        permission.has_permission(request, self)表示调用permission对象中的has_permission方法,
        意思就是如果你自定义了权限类, 并且配置了全局或者局部的配置. 就会将你自定义的权限类中的has_permission方法,
        执行完毕该方法中, 通过你的返回值来确定是否抛出异常.
            如果返回结果的布尔值是True:  权限认证就通过了
            如果返回结果的布尔值是False: 就会指定self.permission_denied中的方法抛出异常
                抛出的异常默认有2种情况:
                第一种情况: exceptions.NotAuthenticated()
                第二种情况: exceptions.PermissionDenied(detail=message)
            注意: 需要注意的是, 一旦抛出异常, dispatch中try中之下的代码就不会执行 (之下的代码包含: 频率校验, 反射执行视图类中的方法)
                接着就会直接响应客户端请求, 返回response了. 此时整个这次客户端的交互就结束了.
    

    2) 实现步骤

    # 实现权限控制思路: 关键点就取决于自定义的权限类中定义的has_permission的方法的返回值来控制权限的返回
    
    #  实现步骤:
        第一步: 新建.py文件. 书写权限类
        第二步: 权限类中必须要重写has_permission方法
            注意: 重写的has_permission方法的参数
            request: APIView中dispatch方法中包装过后的request, 不是原生的了.
            self: 继承了APIView的自定义类实例化的对象
        第三步: 因为权限前面就是认证, 而认证如果通过了request.user是可以获取到认证通过的用户对象的
            注意: 有二种情况下request.user获取的是匿名用户.
            第一种: 没有自定义书写认证类
            第二种: 最后一个认证类中定义的认证方法的返回值结果是None.
        第四步: 全局 或者 局部配置
            查找: APIView -> api_settings -> DEFAULTS ->
            全局:
                'DEFAULT_PERMISSION_CLASSES': [
                        'app01.app_permission.UserPermission',  # 配置你自定义认证类的路径
                    ],
            局部: permission_classes = [UserPermission, ]
            禁用: permission_classes = []
    

    3) 编写权限类

    from rest_framework.permissions import BasePermission
    
    class USerPermission(BasePermission):
        def has_permission(self, request, view):
            """
            可能的错误一:
                AttributeError at /app01/superuser/
                'AnonymousUser' object has no attribute 'get_user_type_display'
            可能的错误二:
                AttributeError: 'AnonymousUser' object has no attribute 'user_type'
            权限认证失败返回False时:  You do not have permission to perform this action
                APIView -> dispatch  -> initial -> check_permissions -> permission_denied -> raise exceptions.PermissionDenied(detail=message)
            """
            """
            try:
                print('request.user.get_user_type_display():', request.user.get_user_type_display())
                return True if request.user.user_type == 1 else False
            except AttributeError:
                return False
            """
            # 现在这你可能出现的异常交给exception_handler捕获
            return True if request.user.user_type == 1 else False
    

    4) 全局配置

    REST_FRAMEWORK={
        'DEFAULT_PERMISSION_CLASSES': [
            'app01.app_permission.UserPermission',  # 配置你自定义认证类的路径
        ],
    }
    

    5) 局部配置

    # 局部使用只需要在视图类里加入
    permission_classes = [UserPermission, ]
    

    6) 总结

    1. 新建.py文件书写权限控制类, 继承 BasePermission
        from rest_framework.permissions import BasePermission
    2. 新建的类中重写 has_permission 方法
        三个参数: self, request, view
        self: 自定义认证类的丢下
        request: APIView中的dispatch中封装过后的request对象
        view: 自定义视图类实例化过后的对象
    3. 根据has_permission的返回布尔值是否是True 或者 False进行判断
        True:  权限通过
        False: 权限不通过, 抛出异常(抛出2种不同类型异常, 根据实际情况分析)
    4. 全局配置  或者  局部配置
    

    2. 内置权限类

    1) 内置权限类

    from rest_framework.permissions import AllowAny,IsAuthenticated,IsAdminUser,IsAuthenticatedOrReadOnly
    - AllowAny 允许所有用户
    - IsAuthenticated 仅通过认证的用户
    - IsAdminUser 仅管理员用户
    - IsAuthenticatedOrReadOnly 已经登陆认证的用户可以对数据进行增删改操作,没有登陆认证的只能查看数据。
    

    2) 全局配置 和 局部配置

    # 全局配置
        REST_FRAMEWORK = {
            'DEFAULT_PERMISSION_CLASSES': [
                    rest_framework.permissions.AllowAny',  # 内置
                ],
        }
    
    # 局部配置
        '''
        # IsAdminUser
            return bool(request.user and request.user.is_staff)
    
        # SessionAuthentication
            user = getattr(request._request, 'user', None)
                if not user or not user.is_active:
                    return None
            SessionAuthentication源码控制的是user 和 user.is_active
                控制情况: 匿名用户user.is_action布尔值是False
                1. 原生的request对象中时候有user属性
                2. user中is_active的布尔值为True.
        '''
        from rest_framework.authentication import SessionAuthentication
        from rest_framework.permissions import IsAdminUser
        authentication_classes = [SessionAuthentication]
        permission_classes = [IsAdminUser]
    

    3) 代码实例

    # 内置局部认证+内置局部权限控制: is_active控制认证 is_staff控制权限
    from rest_framework.response import Response
    from rest_framework.views import APIView
    from rest_framework.authentication import SessionAuthentication
    from rest_framework.permissions import IsAdminUser
    
    
    class TextView2(APIView):
        """
        SessionAuthentication认证的判断依据: is_active
            if not user or not user.is_active:
                return None
            失败返回:
                Authentication credentials were not provided.
    
        IsAdminUser权限的判断依据:  is_staff
            return bool(request.user and request.user.is_staff)
            失败返回: You do not have permission to perform this action.
        """
        authentication_classes = [SessionAuthentication]
        permission_classes = [IsAdminUser]
    
        def get(self, request):
            return Response('这是活跃的工作
    

    4) 总结

    内置认证: 
        from rest_framework.authentication import SessionAuthentication
        控制的是is_active
    内置权限:  
        from rest_framework.permissions import isAdminUser
        控制的是is_staff
    

    三. 频率组件

    1. 自定义频率类

    1) 编写频率类

    # 自定义的逻辑
    #(1)取出访问者ip
    #(2)判断当前ip不在访问字典里,添加进去,并且直接返回True,表示第一次访问,在字典里,继续往下走
    #(3)循环判断当前ip的列表,有值,并且当前时间减去列表的最后一个时间大于60s,把这种数据pop掉,这样列表中只有60s以内的访问时间,
    #(4)判断,当列表小于3,说明一分钟以内访问不足三次,把当前时间插入到列表第一个位置,返回True,顺利通过
    #(5)当大于等于3,说明一分钟内访问超过三次,返回False验证失败
    class MyThrottles():
        VISIT_RECORD = {}
        def __init__(self):
            self.history=None
        def allow_request(self,request, view):
            #(1)取出访问者ip
            # print(request.META)
            ip=request.META.get('REMOTE_ADDR')
            import time
            ctime=time.time()
            # (2)判断当前ip不在访问字典里,添加进去,并且直接返回True,表示第一次访问
            if ip not in self.VISIT_RECORD:
                self.VISIT_RECORD[ip]=[ctime,]
                return True
            self.history=self.VISIT_RECORD.get(ip)
            # (3)循环判断当前ip的列表,有值,并且当前时间减去列表的最后一个时间大于60s,把这种数据pop掉,这样列表中只有60s以内的访问时间,
            while self.history and ctime-self.history[-1]>60:
                self.history.pop()
            # (4)判断,当列表小于3,说明一分钟以内访问不足三次,把当前时间插入到列表第一个位置,返回True,顺利通过
            # (5)当大于等于3,说明一分钟内访问超过三次,返回False验证失败
            if len(self.history)<3:
                self.history.insert(0,ctime)
                return True
            else:
                return False
        def wait(self):
            import time
            ctime=time.time()
            return 60-(ctime-self.history[-1])
    

    2) 全局配置

    REST_FRAMEWORK = {
        'DEFAULT_THROTTLE_CLASSES':['app01.utils.MyThrottles',],
    }
    

    3) 局部配置

    #在视图类里使用
    throttle_classes = [MyThrottles,]
    

    2. 根据用户ip限制

    # 写一个类,继承自SimpleRateThrottle,(根据ip限制)
    from rest_framework.throttling import SimpleRateThrottle
    class VisitThrottle(SimpleRateThrottle):
        scope = 'luffy'
        def get_cache_key(self, request, view):
            return self.get_ident(request)  # get_ident中就是通过获取请求用户ip来进行限制的逻辑
        
    #在setting里配置:(一分钟访问三次)
    REST_FRAMEWORK = {
        'DEFAULT_THROTTLE_RATES':{
            'luffy':'3/m'  # key要跟类中的scop对应
        }
    }
    

    3. 限制匿名用户每分钟访问5次, 限制登陆用户每分钟访问10次

    # 全局配置:
        REST_FRAMEWORK = {
            'DEFAULT_THROTTLE_CLASSES': [
                'rest_framework.throttling.AnonRateThrottle',
                'rest_framework.throttling.UserRateThrottle',
            ],
            'DEFAULT_THROTTLE_RATES': {
                'user': '10/m',            # 对登录用户的频率控制:   每分钟10次
                'anon': '5/m',             # 对未登录用户的频率控制: 每分钟5次
                # 'anon': '5/mqweasdzxc',  # 提示: 这样也识别, 因为默认只会取5/m
            },
        }
    
    # 局部配置:
        # 控制未登录用户的频率访问
            from rest_framework.throttling import AnonRateThrottle
            throttle_classes = [AnonRateThrottle]
    
        # 控制登录用户的频率访问:
            from rest_framework.throttling import UserRateThrottle
            throttle_classes = [UserRateThrottle]
    

    视图中代码展示

    from rest_framework.response import Response
    from rest_framework.views import APIView
    
    
    # 禁用认证+禁用权限控制+禁用频率校验: 未登录用户访问评频率不做限制
    class TextView3(APIView):
        authentication_classes = []
        permission_classes = []
        throttle_classes = []
    
        def get(self, request, *args, **kwargs):
    
            return Response('我是未登录用户 TextView3')
    
    
    # 禁用认证+禁用权限控制+匿名用户局部频率校验: 控制未登录用户的访问频率 -> 每分钟访问5次
    from rest_framework.throttling import AnonRateThrottle
    
    
    class TextView4(APIView):
        """
        当访问次数超出时抛出异常:
        Request was throttled. Expected available in 54 seconds.
        """
        authentication_classes = []
        permission_classes = []
        throttle_classes = [AnonRateThrottle]
    
        def get(self, request, *args, **kwargs):
            return Response('我是未登录用户. TextView4')
    
    
    # 局部内置认证+禁用权限控制+内置用户全局频率校验: 控制登录用户的访问频率 -> 每分钟访问10次
    from rest_framework.authentication import SessionAuthentication
    
    
    class TextView5(APIView):
        """
        注意: 这里是内置认证以后的频率校验, 如果使用的自定义的认证, request.user是没有is_authenticated属性的
        抛出异常: 'User' object has no attribute 'is_authenticated'
    
        """
        authentication_classes = [SessionAuthentication]  # 认证依据is_active
        permission_classes = []
    
        def get(self, request, *args, **kwargs):
            return Response('我是未登录用户. TextView5')
    

    4. 拓展: 错误信息中文显示

    class Course(APIView):
        authentication_classes = [TokenAuth, ]
        permission_classes = [UserPermission, ]
        throttle_classes = [MyThrottles,]
    
        def get(self, request):
            return HttpResponse('get')
    
        def post(self, request):
            return HttpResponse('post')
        def throttled(self, request, wait):
            from rest_framework.exceptions import Throttled
            class MyThrottled(Throttled):
                default_detail = '傻逼啊'
                extra_detail_singular = '还有 {wait} second.'
                extra_detail_plural = '出了 {wait} seconds.'
            raise MyThrottled(wait)
    
  • 相关阅读:
    Nim or not Nim? hdu3032 SG值打表找规律
    Maximum 贪心
    The Super Powers
    LCM Cardinality 暴力
    Longge's problem poj2480 欧拉函数,gcd
    GCD hdu2588
    Perfect Pth Powers poj1730
    6656 Watching the Kangaroo
    yield 小用
    wpf DropDownButton 源码
  • 原文地址:https://www.cnblogs.com/wait59/p/13976480.html
Copyright © 2011-2022 走看看