zoukankan      html  css  js  c++  java
  • rest-framework框架——认证、权限、频率组件

    一、rest-framework登录验证

      网站登录之后就有个人中心可以对个人信息进行修改,但是在每次向服务器发请求时,由于http是无状态的,导致每次都是新的请求。

      服务端则需要对每次请求都进行认证,确认用户是否登录及登录用户是谁。

      因此需要将认证逻辑抽离出来,之前的做法主要是加装饰器或中间件。在前后端不分离的项目中通常是使用cookie和session,前后端分离的项目通常是使用token。

    1、models.py添加User和Token模型

    class User(models.Model):
        name = models.CharField(max_length=32)
        pwd = models.CharField(max_length=32)
    
    
    class Token(models.Model):
        user = models.OneToOneField("User", on_delete=models.CASCADE)
        token = models.CharField(max_length=128)
    
        def __str__(self):
            return self.token
    

      添加后执行数据库迁移,添加app01_user和app01_token表。

    2、给login配置url

    urlpatterns = [
        ...
        re_path(r'^login/$', views.LoginView.as_view(), name="login"),
    ]
    

    3、配置视图

    def get_random_str(user):
        """ 生成随机字符串 """
        import hashlib, time
        ctime = str(time.time())
        md5 = hashlib.md5(bytes(user, encoding='utf-8'))
        md5.update(bytes(ctime, encoding="utf-8"))
        return md5.hexdigest()
    
    
    class LoginView(APIView):
        def post(self, request):
            # 验证逻辑:获取用户名密码与数据库比对
            name = request.data.get("name")
            pwd = request.data.get("pwd")
            user = User.objects.filter(name=name, pwd=pwd).first()
            res = {"state_code": 1000, "msg": None}   # 成功或失败需要返回的字典标识这次的状态
            if user:
                # 通过校验  拿到随机字符串交给这个人
                random_str = get_random_str(user.name)   # 获取随机字符串
                token = Token.objects.update_or_create(user=user, defaults={"token": random_str})
                res["token"] = str(token)   # json不能序列化对象,因此转为字符串
            else:
                # 校验失败
                res["state_code"]=1001   # 错误状态码
                res["msg"] = "用户名或密码错误"
    
            return Response(json.dumps(res))
    

      注意:

    (1)登录验证逻辑

    class LoginView(APIView):
        def post(self, request):
            # 验证逻辑:获取用户名密码与数据库比对
            name = request.data.get("name")
            pwd = request.data.get("pwd")
            user = User.objects.filter(name=name, pwd=pwd).first()
            if user:
                # 通过校验  拿到随机字符串交给这个人
                pass
            else:
                # 校验失败
                pass
    
            return Response("login....")
    

    (2)生成随机字符串

    def get_random_str(user):
        """ 生成随机字符串 """
        import hashlib, time
        ctime = str(time.time())
        md5 = hashlib.md5(bytes(user, encoding='utf-8'))
        md5.update(bytes(ctime, encoding="utf-8"))
        return md5.hexdigest()
    

       注意:ctime=str(time.time()) ,世界上一直在变化的就是时间变量,因此ctime这个变量每个都是不同的。

             hashlib.md5()构建md5对象。实例化md5时传递参数叫做加盐

    md5 = hashlib.md5(bytes(user, encoding='utf-8'))
    

       md5.update()写入要加密的字节:(这里的md5是实例化出来的对象)

    md5.update(bytes(ctime, encoding="utf-8"))
    

       md5_obj.hexdigest():获取密文

    return md5.hexdigest()
    

      加盐之后,即使要加密的数据完全一样,但是用户名肯定不一样,因此产生的密文一定唯一。

    (3)update_or_create(self, defaults=None, **kwargs)方法

    Token.objects.update_or_create(user=user,defaults={"token":random_str})
    

      用给定的**kwargs值查找对象(这里是user=user),如果defaults不为None则用defaults的值{"token":random_str}更新对象;如果为None则创建一个新对象。

    class QuerySet:
        def update_or_create(self, defaults=None, **kwargs):
            """
            Look up an object with the given kwargs, updating one with defaults
            if it exists, otherwise create a new one.
            Return a tuple (object, created), where created is a boolean
            specifying whether an object was created.
            """
            defaults = defaults or {}
    

      如果是create操作返回值是添加的数据,如果是update操作返回值是更新的条数。

    二、局部视图认证

    from rest_framework import exceptions
    
    class TokenAuth(object):   # 这个类名可以任意取
        def authenticate(self, request):   # 这个方法名不可变动
            token = request.GET.get("token")
            token_obj = Token.objects.filter(token=token).first()
            if not token_obj:
                raise exceptions.AuthenticationFailed("验证失败!")
            # 如果有值  return两个值中间加逗号,就构成了一个元组
            return token_obj.user.name, token_obj.token  # 元组:(关联用户对象的名字,当前登录对象token)
    
        def authenticate_header(self, request):   # 不加会报错,要求要传入两个参数
            pass
    
    class BookView(APIView):
        authentication_classes = [TokenAuth, ]
    
        def get(self, request):
            book_list = Book.objects.all()
            bs = BookModelSerializers(book_list, many=True, context={"request": request})  # 序列化结果
            # return HttpResponse(bs.data)
            return Response(bs.data)
    
        def post(self, request):
            # POST请求的数据
            bs = BookModelSerializers(data=request.data)
            if bs.is_valid():  # 验证数据是否合格
                print(bs.validated_data)
                bs.save()  # create方法
                return Response(bs.data)  # 当前添加的数据
            else:
                return Response(bs.errors)
    

    1、分析源码

      每次请求都要执行dispatch

    (1)dispatch分发前的认证权限验证

      在用户访问时执行APIView的dispatch方法,在dispatch进行分发操作前,需要先执行self.initial(request, *args, **kwargs),执行认证、权限、频率操作。

    def dispatch(self, request, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs
        request = self.initialize_request(request, *args, **kwargs)   # 构建新request
        self.request = request
        self.headers = self.default_response_headers  # deprecate?
    
        try:
            self.initial(request, *args, **kwargs)    # 认证权限频率
    
            # Get the appropriate handler method
            if request.method.lower() in self.http_method_names:
                handler = getattr(self, request.method.lower(),
                                  self.http_method_not_allowed)
            else:
                handler = self.http_method_not_allowed
    
            response = handler(request, *args, **kwargs)
    
        except Exception as exc:
            response = self.handle_exception(exc)
    
        self.response = self.finalize_response(request, response, *args, **kwargs)
        return self.response
    

    (2)initial()方法中的认证权限频率组件

    class APIView(View): 
        def initial(self, request, *args, **kwargs):
            """代码省略"""
            # Ensure that the incoming request is permitted 
            # 初始化认证组件
            self.perform_authentication(request)
            # 权限组件
            self.check_permissions(request)
            # 频率组件
            self.check_throttles(request)

    (3)查看perform_authentication()方法

    def perform_authentication(self, request):
        request.user
    

      由于在dispatch函数中,self.initial(request, *args, **kwargs)晚于request = self.initialize_request(request, *args, **kwargs)。因此这里的request是新构建的request。request.user即需要去Request类中寻找user静态方法。

      这个新request通过initialize_request方法返回Request类对象:

    def initialize_request(self, request, *args, **kwargs):
        parser_context = self.get_parser_context(request)
    
        return Request(
            request,
            parsers=self.get_parsers(),
            authenticators=self.get_authenticators(),
            negotiator=self.get_content_negotiator(),
            parser_context=parser_context
        )
    

    (4)Request类中有方法user

      紧跟with后面的语句会被求值,返回对象的__enter__()方法被调用,这个方法的返回值将被赋值给as关键字后面的变量,当with后面的代码块全部被执行完之后,将调用前面返回对象的__exit__()方法。

      with语句最关键的地方在于被求值对象必须有__enter__()__exit__()这两个方法。

      由此可知 self._authenticate()一定会执行。

    @property
    def user(self):
        """
        Returns the user associated with the current request, as authenticated
        by the authentication classes provided to the request.
      当request已经通过认证类提供的认证,返回当前请求关联的用户 """ if not hasattr(self, '_user'): with wrap_attributeerrors(): self._authenticate() return self._user

      注意:@property装饰器就是负责把一个方法变成属性调用。未通过验证的需要用self._authenticate()方法来进行校验。

    (5)_authenticate方法分析(认证核心)

      循环 self.authenticators,拿到我们配置的每个认证类的实例化对象。

      配置的认证类中一定要实现authenticate()方法,否则一定会报错。

      需要注意的是:这里的 self 就是 request。authenticate()方法的返回值是一个元组——user_auth_tuple

    class Request:
        def _authenticate(self):
            """
            Attempt to authenticate the request using each authentication instance in turn.
            """
            # 循环 self.authenticators,拿到我们配置的每个认证类的实例化对象。
            for authenticator in self.authenticators:
                try:
                    # 配置的认证类中一定要实现authenticate()方法,否则一定会报错
                    # 这里的 self 就是 request。authenticate()方法的返回值是一个元组——user_auth_tuple
                    user_auth_tuple = authenticator.authenticate(self)
                except exceptions.APIException:
                    self._not_authenticated()
                    raise
    
                if user_auth_tuple is not None:
                    self._authenticator = authenticator
                    # 元组的值赋给:request.user和request.auth
                    self.user, self.auth = user_auth_tuple
                    return
    
            self._not_authenticated()

      继续追溯 self.authenticators 的来源是Request实例化时传入的:

    class Request:
        """
        Wrapper allowing to enhance a standard `HttpRequest` instance.
    
        Kwargs:
            - request(HttpRequest). The original request instance.
            - parsers_classes(list/tuple). The parsers to use for parsing the
              request content.
            - authentication_classes(list/tuple). The authentications used to try
              authenticating the request's user.
        """
    
        def __init__(self, request, parsers=None, authenticators=None,
                     negotiator=None, parser_context=None):
            assert isinstance(request, HttpRequest), (
                'The `request` argument must be an instance of '
                '`django.http.HttpRequest`, not `{}.{}`.'
                .format(request.__class__.__module__, request.__class__.__name__)
            )
    
            self._request = request
            self.parsers = parsers or ()
            self.authenticators = authenticators or ()
            ...

    (6)追溯self.authenticators

      找到Request实例化的方法:initialize_request

    class APIView(View):
        def initialize_request(self, request, *args, **kwargs):
            return Request(
                authenticators=self.get_authenticators(),
            )
    

      再由此找到get_authenticators方法:

    class APIView(View):
        def get_authenticators(self):
            """
            Instantiates and returns the list of authenticators that this view can use.
            """
            return [auth() for auth in self.authentication_classes] 
    

      继续追溯 self.authentication_classes:

    # rest_framework/views.py
    class APIView(View):
    
        # The following policies may be set at either globally, or per-view.
        renderer_classes = api_settings.DEFAULT_RENDERER_CLASSES
        parser_classes = api_settings.DEFAULT_PARSER_CLASSES
        authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES

      self.authentication_classes 去配置文件中拿所有的认证类

      authentication_classes就是我们在视图函数中定义的列表:

    class BookView(APIView):
        authentication_classes = [TokenAuth, ]
        def get(self, request):...
    
        def post(self, request):...
    

       [auth() for auth in self.authentication_classes]这个语句的含义:循环每一个认证类并进行实例化,放在数组中。以[TokenAuth, ]为例返回值是[TokenAuth(), ]。 

      因此回传回去authenticators=[TokenAuth(), ]。

    (7)_authenticate方法处理

    class Request(object):
        def __init__(self, request, parsers=None, authenticators=None,
                     negotiator=None, parser_context=None):
            self._request = request
            self.parsers = parsers or ()
            self.authenticators = authenticators or ()   # 如果是None返回空元组,如果有值返回authenticators
            """
            print(3 and 0)   # 0
            print(0 and 2)   # 0
            print(0 or 1)    # 1  
            print(4 or 1)    # 4
            """
    
        def _authenticate(self):
            """
            Attempt to authenticate the request using each authentication instance
            in turn.
            """
            for authenticator in self.authenticators:   # [TokenAuth(), ],  authenticator:TokenAuth()
                try:
                    user_auth_tuple = authenticator.authenticate(self)  # TokenAuth必须有authenticate方法
                    # authenticator.authenticate(self):是一个实例对象调用自己的实例方法,本不需要传self,这里一定是传的一个形参。
                    # 这个方法是在Request类中,追溯调用关系可知,这里的self是一个新request对象
                except exceptions.APIException:  # 抛出错误
                    self._not_authenticated()   # 没有验证成功
                    raise
    
                if user_auth_tuple is not None:   # 如果user_auth_tuple有值
                    self._authenticator = authenticator
                    self.user, self.auth = user_auth_tuple  # 将元组的值赋给self.user和self.auth
                    return
    
            self._not_authenticated()   # 没有验证成功
    

      authenticator.authenticate(self):是一个实例对象调用自己的实例方法,本不需要传self,这里一定是传的一个形参。这个方法是在Request类中,追溯调用关系可知,这里的self是一个新request对象。因此在构建自定义的TokenAuth时一定要在def authenticate(self, request):  添加request参数。

    2、测试验证

      

      访问时添加数据库查到的token信息,验证通过:

      

       打印之前在_authenticate将元组的值赋给self.user和self.auth的值:

    class BookView(APIView):
        authentication_classes = [TokenAuth, ]
    
        def get(self, request):
            print(request.user)   # egon
            print(request.auth)   # 02aee930be6011068e24f68935d52b02
    

      可以看到正好对应authoerticate函数的返回值:return token_obj.user.name, token_obj.token.

      因此在视图中类中配置authentication_classes,即可实现局部认证。

    3、BaseAuthentication模块引入规范简化认证类编写

    from rest_framework import exceptions
    from rest_framework.authentication import BaseAuthentication
    
    class TokenAuth(BaseAuthentication):
        def authenticate(self, request):
            token = request.GET.get("token")
            token_obj = Token.objects.filter(token=token).first()
            if not token_obj:
                raise exceptions.AuthenticationFailed("验证失败!")
            # 如果有值  return两个值中间加逗号,就构成了一个元组
            return token_obj.user.name, token_obj.token  # 元组:(关联用户对象的名字,当前登录对象token)
    
        # def authenticate_header(self, request):   # 不加会报错,且要求要传入两个参数
        #     pass
    

      BaseAuthentication包含authenticate和authenticate_header函数。默认用来被覆盖。

    三、全局视图认证

     1、源码分析

      如果没有在局部定义authentication_classes=[TokenAuth, ]。回溯查找默认的authentication_classes。

    (1)APIView有变量authentication_classes

    class APIView(View):
        authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
    

      api_settings.DEFAULT_AUTHENTICATION_CLASSES是类的实例对象.属性的模式。当调用不存在的属性时,Python会试图调用__getattr__(self,attr)来获取属性,并且返回attr。

    (2)查看api_settings

      该语句在rest_framework/settings.py中。发现api_settings是一个实例对象。实例化时执行相应的init方法。

    api_settings = APISettings(None, DEFAULTS, IMPORT_STRINGS)
    

      DEFAULTS这个值是一个字典,每一个键后面都是跟着一个元组,保存了关于rest_framework所有默认配置。也定义在rest_framework/settings.py文件中。

      

    (3)查看APISettings类__init__方法

    class APISettings(object):
        def __init__(self, user_settings=None, defaults=None, import_strings=None):
            if user_settings:
                self._user_settings = self.__check_user_settings(user_settings)
            self.defaults = defaults or DEFAULTS
            self.import_strings = import_strings or IMPORT_STRINGS
            self._cached_attrs = set()
    

      user_settings默认为None,如果有值拿到user_settings.

      self.defaults = defaults or DEFAULTS  拿到DEFAULTS字典值。

    (4)APISettings类的def __getattr__(self, attr)方法

    class APISettings(object):
        def __getattr__(self, attr):
            if attr not in self.defaults:
                raise AttributeError("Invalid API setting: '%s'" % attr)
    
            try:
                # Check if present in user settings
                val = self.user_settings[attr]
            except KeyError:   # 当self.user_settings的值是一个空字典,取值报错KeyError
                # Fall back to defaults
                val = self.defaults[attr]  # 异常处理去取默认的DEFAULT值
    
            # Coerce import strings into classes
            if attr in self.import_strings:
                val = perform_import(val, attr)
    
            # Cache the result
            self._cached_attrs.add(attr)
            setattr(self, attr, val)
            return val
    

      __getattr__是python里的一个内建函数,可以很方便地动态返回一个属性;当调用不存在的属性时,Python会试图调用__getattr__(self,item)来获取属性,并且返回item;

    class Person(object):
        def __init__(self, name):
            self.name = name
    
        def __getattr__(self, item):
            print("item", item)
    
        def dream(self):
            print("dreaming。。。。。")
            
    alex = Person("alex")
    alex.yuan   # 打印:item yuan
    

      val = self.user_settings[attr]:user_settings执行的返回值后面加上[attr]

    (5)APISettings类的user_settings方法执行

    class APISettings(object):
        @property
        def user_settings(self):  # 静态方法
            if not hasattr(self, '_user_settings'):
                self._user_settings = getattr(settings, 'REST_FRAMEWORK', {})
            return self._user_settings
    

      这里的settings指的是restDemo项目中的restDemo/settings.py。

      因此getattr(settings, 'REST_FRAMEWORK', {})  代表的意思是:去settings.py中去拿REST_FRAMEWORK变量,如果拿不到则取一个空字典。因此self._user_settings一定是一个字典。

      因此self.user_settings[attr]是在字典中取键attr的值.当字典为空时,取不到值会抛出Keyerror错误,进行异常处理去取默认的DEFAULT字典中的值:

    DEFAULTS = {
        """省略代码"""
        'DEFAULT_AUTHENTICATION_CLASSES': (
            'rest_framework.authentication.SessionAuthentication',
            'rest_framework.authentication.BasicAuthentication'
        ),
        """省略代码"""
    

    2、在settings.py中配置全局视图

    REST_FRAMEWORK = {
        # 仿照DEFAULT配置认证类路径
        "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.TokenAuth"]
    }
    

    (1)REST_FRAMEWORK是一个字典

      键必须是DEFAULT_AUTHENTICATION_CLASSES

    (2)值是认证类路径

      由于要指定认证类路径因此要把之前写的TokenAuth从views.py迁移到一个新文件(文件名自定义)中,这里是:/app01/utils.py。在这里定义认证类TokenAuth,认证类必须包含authenticate方法。

      注意:不能继承object类,必须继承基础认证类——BaseAuthentication

    from .models import *
    from rest_framework import exceptions
    from rest_framework.authentication import BaseAuthentication
    
    class TokenAuth(BaseAuthentication):
        def authenticate(self, request):
            token = request.GET.get("token")
            token_obj = Token.objects.filter(token=token).first()
            if not token_obj:
                raise exceptions.AuthenticationFailed("验证失败!")
            # 如果有值  return两个值中间加逗号,就构成了一个元组
            return token_obj.user.name, token_obj.token  # 元组:(关联用户对象的名字,当前登录对象token)
    

    3、在全局认证情况下,配置部分访问不进行认证

    class LoginView(APIView):
        authentication_classes = []    # 配置为空,优先级高于全局认证,不用进行认证
        def post(self, request):....
    

    四、权限组件

      权限是对某件事情决策的范围和程度,权限在项目开发中经常会用到。视频网站中,很多视频需要vip用户才能观看,甚至还有一些视频需要svip、ssvip才能观看。

    1、权限组件源码分析

      DRF的权限与版本、认证、频率组件都非常相似,均是在initial方法里初始化的。一般来说权限基于登录,因此在DRF源码中先执行版本和认证才执行权限。

    (1)访问请求交给APIView类下的dispatch方法处理

    class APIView(View):
        def dispatch(self, request, *args, **kwargs):
            self.args = args
            self.kwargs = kwargs
            request = self.initialize_request(request, *args, **kwargs)
            self.request = request
            self.headers = self.default_response_headers  # deprecate?
    
            try:
                self.initial(request, *args, **kwargs)
            """代码省略"""
    

      在dispatch中使用的initial()方法,包含了权限组件:

    def initial(self, request, *args, **kwargs):
      """代码省略"""
        # Ensure that the incoming request is permitted 
        # 认证组件
        self.perform_authentication(request)
        # 权限组件
        self.check_permissions(request)
        # 频率组件
        self.check_throttles(request)
    

    (2)权限函数——check_permissions

      这里循环拿到的permission是我们配置的每一个权限类的实例对象(MyPermission等);

      permission_denied用于抛出异常,因此权限类必须有has_permission方法,否则将抛出异常。

      注意:这里的self一直是我们视图类的实例化对象

      失败抛出异常时,通过反射在实例化对象中找message属性,因此可以在权限类中定义message来定义异常信息。

    class APIView(View):
        def check_permissions(self, request):
            """
            Check if the request should be permitted.
            Raises an appropriate exception if the request is not permitted.
            """
            for permission in self.get_permissions():   # 循环的是[SVIPPermission(), ]  permission是SVIPPermission()——权限实例
                if not permission.has_permission(request, self):   # 由此可见权限类必须有has_permission方法
            # 通过权限认证什么都不用做,不通过执行以下代码 self.permission_denied( request, message=getattr(permission, 'message', None) )

      self.get_permissions()是权限类实例对象组成的列表。

      permission.has_permission()是权限类中的权限方法。

      self.permission_denied()是在权限没有权限方法时,抛出异常。

      1)查看get_permissions方法

    class APIView(View):
        def get_permissions(self):
            """
            Instantiates and returns the list of permissions that this view requires.
            """
            return [permission() for permission in self.permission_classes]
    

      [permission() for permission in self.permission_classes]与认证组件完全类似:循环每一个权限类并进行实例化,组成一个列表返回。

      2)查看确认permission_classes的值

    class APIView(View):
        # The following policies may be set at either globally, or per-view.
        authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
        permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES
    

      如果在自己的视图类中定义了permission_classes(优先取当前视图的局部配置) ,说明配置了局部视图权限,就取自己定义的。

      如果没有定义:

      由于权限组件依然是用了api_settings这个APISettings类实例对象,实例化时执行__init__函数,因此也执行了user_settings静态方法。

    class APISettings(object):
        def __init__(self, user_settings=None, defaults=None, import_strings=None):
            if user_settings:
                self._user_settings = self.__check_user_settings(user_settings)
            self.defaults = defaults or DEFAULTS
    
        def user_settings(self):
            if not hasattr(self, '_user_settings'):
                self._user_settings = getattr(settings, 'REST_FRAMEWORK', {})  # 去settings.py中去拿REST_FRAMEWORK变量,如果拿不到则取一个空字典
            return self._user_settings
    

      这里user_settings函数通过反射取settings下是否有配置‘REST_FRAMEWORK’,如果有配置即说明配置了全局视图权限。self._user_settings取配置的值

      getattr(settings, 'REST_FRAMEWORK', {})里的settings是从django引入过来的:

    from django.conf import settings
    

      如果没有配置则self._user_settings是一个空字典,需要进一步分析api_settings.DEFAULT_PERMISSION_CLASSES:

      APISettings类中包含__getattr__方法,当调用不存在的属性时,Python会试图调用__getattr__(self,item)来获取属性,并且返回item:

    class APISettings(object):
        def __getattr__(self, attr):
            if attr not in self.defaults:
                raise AttributeError("Invalid API setting: '%s'" % attr)
    
            try:
                # Check if present in user settings
                val = self.user_settings[attr]    # 全局权限视图未设置时self.user_settings返回值是一个空字典,设置时取到全局配置
            except KeyError:   # 空字典取值报错抛出异常
                # Fall back to defaults
                val = self.defaults[attr]    # 获取DEFAULT中默认值
    
            # Coerce import strings into classes
            if attr in self.import_strings:
                val = perform_import(val, attr)
    
            # Cache the result
            self._cached_attrs.add(attr)
            setattr(self, attr, val)
            return val
    

      由于全局权限视图未未设置时self.user_settings返回值是一个空字典,因此取值会失败,通过异常处理获取DEFAULT默认配置中的值

      3)总结

      self.get_permissions()的返回值是权限实例列表,以下面的示例为例是[SVIPPermission(), ]。因此for循环拿到的permission是一个个权限实例:SVIPPermission()

      因此如果permission.has_permission返回值是true,直接完成权限验证;如果返回值是False,则返回没有权限的提示。

      因此,权限类一定要有has_permission方法,否则会抛出异常。

    2、局部视图权限

    (1)前置准备

      修改models.py中user表结构,设置用户类型和默认值:

    class User(models.Model):
        name = models.CharField(max_length=32)
        pwd = models.CharField(max_length=32)
        type_choice = ((1, "普通用户"), (2, "VIP"), (3, "SVIP"))
        user_type = models.IntegerField(choices=type_choice, default=1)
    

       修改后完成数据库迁移。

    (2)配置权限类

      自定义工具包文件夹utils,创建permission.py权限文件:

    from rest_framework.permissions import BasePermission
    
    class SVIPPermission(BasePermission):
        message = "您没有权限"
        # 超级用户可用
        def has_permission(self, request, view):
            username = request.user   # 获取前面认证过的信息:egon
            user_type = User.objects.filter(name=username).first().user_type
            if user_type == 3:
                # 通过权限认证
                return True
            else:
                # 未通过权限认证
                return False

      视图配置如下所示:

    from rest_framework import viewsets
    from utils.permission import SVIPPermission    # 权限
    
    class AuthorViewSet(viewsets.ModelViewSet):
        # authentication_classes = [TokenAuth, ]
        permission_classes = [SVIPPermission, ]  
        queryset = Author.objects.all()  # 配置queryset:告知这个类这次处理的数据
        serializer_class = AuthorModelSerializers  # 告知处理用到的序列化组件

      显示效果:

      

    (3)用message自定义配置错误提示

      查看源码可以看到,check_permissions中失败抛出异常时,通过反射在实例化对象中找message属性,因此可以在权限类中定义message来定义异常信息。

    class SVIPPermission(object):
        # 超级用户可用
        message = "只有超级用户才能访问"
        def has_permission(self, request, view):
            username = request.user   # 获取前面认证过的信息:egon
            user_type = User.objects.filter(name=username).first().user_type
            if user_type == 3:
                # 通过权限认证
                return True
            else:
                # 未通过权限认证
                return False
    

      显示效果:

      

    3、全局视图权限

      在settings.py中配置全局权限

    REST_FRAMEWORK = {
        # 认证类路径
        "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.TokenAuth"],
        # 权限类路径
        "DEFAULT_PERMISSION_CLASSES": ["utils.permission.SVIPPermission"]
    }
    

    五、throttle(频率访问)组件

      开放平台的API接口调用需要限制其频率,以节约服务器资源和避免恶意的频繁调用。

    1、源码分析

      频率组件的源码和前面的版本、认证、权限是一个流程。

    class APIView(View):
        def dispatch(self, request, *args, **kwargs):
            try:
                self.initial(request, *args, **kwargs)
    
        def initial(self, request, *args, **kwargs):
            self.check_throttles(request)
    
        def check_throttles(self, request):
            """
            Check if request should be throttled.
            Raises an appropriate exception if the request is throttled.
            """
            for throttle in self.get_throttles():
                if not throttle.allow_request(request, self):
                    self.throttled(request, throttle.wait())
    
        def get_throttles(self):
            return [throttle() for throttle in self.throttle_classes]   # 频率对象列表
    

      get_throttles:去配置文件中拿到所有频率控制类,并实例化放入列表中。

      因此 throttle 是配置中每个频率控制类的实例化对象。

      self.throttled 则是失败抛出异常。

      throttle.wait方法主要用来向客户端返回还需要多少时间可以继续访问。

    2、频率组件原理和模板

    (1)频率组件原理

      DRF中频率控制基本原理是基于访问次数和时间,可以通过自己定义的方法来实现。

      请求走到频率组件的时候,DRF内部会有一个字典记录访问者的IP,以这个IP为KEY,value为一个列表,存放访问者每次访问的时间。

    {ip: [time1, time2, time3],}

      把每次访问最新时间放入列表的最前面,记录这样一个数据结构后,既可以用如下示例(设置10秒内只能访问次)实现限流:

      -- 1、判断访问者的IP是否在这个请求IP的字典里

      -- 2、保证这个列表里都是最近10秒内的访问的时间

          判断当前请求时间和列表里最早的(也就是最后的)请求时间的查

          如果差大于10秒,说明请求以及不是最近10秒内的,删除掉,

          继续判断倒数第二个,直到差值小于10秒

      -- 3、判断列表的长度(即访问次数),是否大于我们设置的5次,

          如果大于就限流,否则放行,并把时间放入列表的最前面。

    (2)频率组件模板

      创建文件DRFDemo/utils/throttle.py:

    import time
    from rest_framework.throttling import BaseThrottle
    
    VISIT_RECORD = {}
    
    class MyThrottle(BaseThrottle):
    
        def allow_request(self, request, view):
            # 实现限流的逻辑
            # 以ip地址限流
            # 访问列表  {ip: [time1,time2,time3]}
            # 1、获取请求的ip地址
            # 2、判断ip地址是否在访问列表
                # 2.1 不在,给访问列表添加key,value
                # 2.2 在,需要获取该ip的访问记录,把当前时间加入列表
            # 3、确保列表里最新访问即最老访问的时间差 是1分钟
            # 4、得到列表的长度,判断是否是允许的次数
            
            pass
    
        def wait(self):
            # 返回还剩多久可继续访问
            pass 

    3、局部视图throttle

    (1)限制类定义

      直接将频率限制类定义在DRFDemo/utils/throttle.py中:

    import time
    from rest_framework.throttling import BaseThrottle
    
    VISIT_RECORD = {}
    
    class MyThrottle(BaseThrottle):
    
        def __init__(self):
            self.history = None
    
        def allow_request(self, request, view):
            # 实现限流的逻辑
            # 以ip地址限流:要求访问站点的频率一分钟不超过3次
            # 访问列表  {ip: [time1,time2,time3]}
            # 1、获取请求的ip地址
            remote_ip = request.META.get("REMOTE_ADDR")
            ctime = time.time()
            # 2、判断ip地址是否在访问列表
            if remote_ip not in VISIT_RECORD:
                # 2.1 不在,给访问列表添加key,value
                VISIT_RECORD[remote_ip] = [ctime, ]
                return True
    
            # 2.2 在,需要获取该ip的访问记录,把当前时间加入列表
            history = VISIT_RECORD.get(remote_ip)    # 取到列表
    
            # 3、确保列表里最新访问即最老访问的时间差 是1分钟
            while history and history[0] - history[-1] > 60:
                history.pop()               # 删最后一个
            self.history = history
            # 4、得到列表的长度,判断是否是允许的次数
            if len(history) < 3:
                # 未达到频率限制
                history.insert(0, ctime)    # 加到第一个
                return True
            else:
                return False
    
        def wait(self):
            # 返回还剩多久可继续访问
            ctime = 60 - (self.history[0] - self.history[-1])    # 最新的时间减去最老的时间(可能是已经删除的时间)
            return ctime

    (2)视图中配置局部频率限制

      在视图类中中配置局部频率限制:

    import uuid      # UUID对象和生成函数
    from .models import User
    from utils.auth import MyAuth    # 认证
    from utils.permission import MyPermission    # 权限
    from utils.throttle import MyThrottle        # 频率
    from rest_framework.views import APIView
    from rest_framework.response import Response
    
    class DemoView(APIView):
        def get(self, request):
            return Response("认证demo~")
    
    class LoginView(APIView):
        def post(self, request):
            username = request.data.get("username")
            pwd = request.data.get("pwd")
            # 登录成功,生成token会将token给你返回
            token = uuid.uuid4()   # 通过随机数来生成UUID
            User.objects.create(username=username, pwd=pwd, token=token)
            return Response("创建用户成功")
    
    class TestView(APIView):
        authentication_classes = [MyAuth, ]    # 局部认证
        # permission_classes = [MyPermission, ]  # 局部权限
        throttle_classes = [MyThrottle, ]
    
        def get(self, request):
            # 访问:GET /auth/test?token=98bc3edce82143f8ad638f9cad336807
            print(request.user)     # User object (1)
            print(request.auth)     # 98bc3edce82143f8ad638f9cad336807
            return Response("认证测试")

      注意:request.META 是一个Python字典,包含了所有本次HTTP请求的Header信息,比如用户IP地址和用户Agent(通常是浏览器的名称和版本号)。 注意,Header信息的完整列表取决于用户所发送的Header信息和服务器端设置的Header信息。

    (3)频率组件测试

      连续访问:http://127.0.0.1:8000/auth/test?token=98bc3edce82143f8ad638f9cad336807,前三次均显示“认证测试”。第四次显示如下所示:

      

    4、全局视图throttle

      在restDemo/settings.py中配置:

    REST_FRAMEWORK = {
        # "DEFAULT_VERSIONING_CLASS": "utils.version.MyVersion",
        # 默认使用的版本控制类
        "DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.QueryParameterVersioning",
        # 默认使用的版本
        "DEFAULT_VERSION": "v1",
        # 允许的版本
        "ALLOWED_VERSIONS": "v1, v2",
        # 版本使用的参数名称
        "VERSION_PARAM": "ver",
        # 默认认证类
        "DEFAULT_AUTHENTICATION_CLASSES": ["utils.auth.MyAuth",],
        # 默认权限类
        "DEFAULT_PERMISSION_CLASSES": ["utils.permission.MyPermission"],
        # 默认频率类
        "DEFAULT_THROTTLE_CLASSES": ["utils.throttle.MyThrottle"]
    }

    5、框架内置throttle限流类

      将频率配置类修改为:

    class MyThrottle(SimpleRateThrottle):
        scope = "WD"
    
        def get_cache_key(self, request, view):
            # 如果以ip地址做限流返回ip地址
            key = self.get_ident(request)
            return key

      settings.py设置:

    REST_FRAMEWORK = {
        # 默认使用的版本控制类
        "DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.QueryParameterVersioning",
        # 默认使用的版本
        "DEFAULT_VERSION": "v1",
        # 允许的版本
        "ALLOWED_VERSIONS": "v1, v2",
        # 版本使用的参数名称
        "VERSION_PARAM": "ver",
        "DEFAULT_THROTTLE_RATES": {
            "WD": "3/m"
        }
    }
  • 相关阅读:
    五、页脚footer
    一、页眉header
    四、(2)列布局+媒体查询
    二、导航栏nav
    coredns介绍
    pandas指定列索引和行索引
    学习笔记246—国家自然科学基金申请书写作攻略【收藏】
    Axios请求传参的格式
    NodeJspm2常用命令
    FastAPI实现谷歌DialogFlow 接口问答批量导入导出和批量删除 DialogFlow batch import and export Q&A interface
  • 原文地址:https://www.cnblogs.com/xiugeng/p/9579220.html
Copyright © 2011-2022 走看看