zoukankan      html  css  js  c++  java
  • Django Rest Framwork的认证组件 权限组件以及频率组件

    提示:

    再看这篇博客之前 最好先看看关于CBV FBV的那篇博客  看完之后再来理解源码以及流程就相对方便多了 https://www.cnblogs.com/wakee/p/12553402.html

    本片博客部分参考:https://www.cnblogs.com/wupeiqi/articles/7805382.html

    1 DRF简介

    '''
    基于cbv完成满足restful规范的接口
    '''
    

    2 安装drf

    pip install djangorestframework

    # 安装好之后再settings.py注册app: INSTALLED_APPS = [..., 'rest_framework']

      

    3 认证组件

    注意

    组件中有俩种获取数据的方式
    1 request.data.get()
    2 request._request.GET.get()
    
    区别是request.data是前端放在body中的数据
    request._request.GET.get()是get请求带的参数
    

      

    # 对用户进行登录认证
    class AuthView(APIView):
    
        def post(self,request,*args,**kwargs):
            ret = {
                'code':1000,
                "msg":None
            }
            # 从post 里面取数据
            print(request.data)
    		# 获取数据
            username = request.data.get("username")
            password = request.data.get("password")
            
            
            
            
    class Authtication(BasicAuthentication):
        def authenticate(self,request):
    
            # 获取数据
            print(request.data)  # body中的数据
            token = request._request.GET.get("token")
            print(token)
    

      

     

    3.1 authentication

    """
    系统:session认证
    rest_framework.authentication.SessionAuthentication
    ajax请求通过认证:
    cookie中要携带 sessionid、csrftoken,请求头中要携带 x-csrftoken
    
    第三方:jwt认证 
    rest_framework_jwt.authentication.JSONWebTokenAuthentication
    ajax请求通过认证:
    请求头中要携带 authorization,值为 jwt空格token
    
    自定义:基于jwt、其它
    1)自定义认证类,继承BaseAuthentication(或其子类),重写authenticate
    2)authenticate中完成
        拿到认证标识 auth
        反解析出用户 user
        前两步操作失败 返回None => 游客
        前两步操作成功 返回user,auth => 登录用户
        注:如果在某个分支抛出异常,直接定义失败 => 非法用户
    """
    

      

    3.2源码流程

    dispatch
    	--封装request
        	--获取定义的认证类(全局/局部),通过列表生成式创建对象
        --initial
        	--perform_auhentication
            	--request.user(内部循环列表生成式创建的对象)
    

     具体源码

    # 第一步
        def dispatch(self, request, *args, **kwargs):
            """
            `.dispatch()` is pretty much the same as Django's regular dispatch,
            but with extra hooks for startup, finalize, and exception handling.
            """
            self.args = args
            self.kwargs = kwargs
            # 对原生的request进行加工,看第二步 
            request = self.initialize_request(request, *args, **kwargs)  # 点击initialize_request进入第二步看request
            self.request = request
            self.headers = self.default_response_headers  # deprecate?
    
            try:
                self.initial(request, *args, **kwargs)
    
                # Get the appropriate handler method
                if request.method.lower() in self.http_method_names:
                    handler = getattr(self, request.method.lower(),
                                      self.http_method_not_allowed)
                else:
                    handler = self.http_method_not_allowed
    
                response = handler(request, *args, **kwargs)
    
            except Exception as exc:
                response = self.handle_exception(exc)
    
            self.response = self.finalize_response(request, response, *args, **kwargs)
            return self.response
        
        
        
        
      # 第二步  
        
        def initialize_request(self, request, *args, **kwargs):
            """
            Returns the initial request object.
            """
            parser_context = self.get_parser_context(request)
    
            # 对request进行了加工 不仅有原生request 还有其他的类对象比如authentication_classes
            return Request(
                request,
                parsers=self.get_parsers(),
                authenticators=self.get_authenticators(), # 点击get_authenticators()进入第三步
                negotiator=self.get_content_negotiator(),
                parser_context=parser_context
            )
        
        
        
       # 第三步
        
         def get_authenticators(self):
            """
            Instantiates and returns the list of authenticators that this view can use.
            """
            # 谁调用他就拿到那个类的对象
            return [auth() for auth in self.authentication_classes]  # 点击authentication_classes 第四步
        
        
        
        # 第四步:
        class APIView(View):
    
            # The following policies may be set at either globally, or per-view.
            renderer_classes = api_settings.DEFAULT_RENDERER_CLASSES
            parser_classes = api_settings.DEFAULT_PARSER_CLASSES
            authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES # 点击后看的部分
            throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES
            permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES
            content_negotiation_class = api_settings.DEFAULT_CONTENT_NEGOTIATION_CLASS
            metadata_class = api_settings.DEFAULT_METADATA_CLASS
            versioning_class = api_settings.DEFAULT_VERSIONING_CLASS
        
        
      
        
        
        # 第五步
        def dispatch(self, request, *args, **kwargs):
            """
            `.dispatch()` is pretty much the same as Django's regular dispatch,
            but with extra hooks for startup, finalize, and exception handling.
            """
            self.args = args
            self.kwargs = kwargs
            # 对原生的request进行加工 ,其实就是
            request = self.initialize_request(request, *args, **kwargs)  # 点击initialize_request进入第二步看request
            self.request = request
            self.headers = self.default_response_headers  # deprecate?
    
            try:
                self.initial(request, *args, **kwargs)  # 点击进入第六步
    
                # Get the appropriate handler method
                if request.method.lower() in self.http_method_names:
                    handler = getattr(self, request.method.lower(),
                                      self.http_method_not_allowed)
                else:
                    handler = self.http_method_not_allowed
    
                response = handler(request, *args, **kwargs)
    
            except Exception as exc:
                response = self.handle_exception(exc)
    
            self.response = self.finalize_response(request, response, *args, **kwargs)
            return self.response
        
        
        
        
        # 第六步
        def initial(self, request, *args, **kwargs):
            """
            Runs anything that needs to occur prior to calling the method handler.
            """
            self.format_kwarg = self.get_format_suffix(**kwargs)
    
            # Perform content negotiation and store the accepted info on the request
            neg = self.perform_content_negotiation(request)
            request.accepted_renderer, request.accepted_media_type = neg
    
            # Determine the API version, if versioning is in use.
            version, scheme = self.determine_version(request, *args, **kwargs)
            request.version, request.versioning_scheme = version, scheme
    
            # Ensure that the incoming request is permitted
            self.perform_authentication(request)  # 点击进入下一步
            self.check_permissions(request)
            self.check_throttles(request)
            
         
        
         # 第七步   
        def perform_authentication(self, request):
            """
            Perform authentication on the incoming request.
    
            Note that if you override this and simply 'pass', then authentication
            will instead be performed lazily, the first time either
            `request.user` or `request.auth` is accessed.
            """
            request.user # 点击user进入下一步
    
        
        # 第八步
        @property
        def user(self):
            """
            Returns the user associated with the current request, as authenticated
            by the authentication classes provided to the request.
            """
            if not hasattr(self, '_user'):
                with wrap_attributeerrors():
                    self._authenticate() # 点击进入下一步
            return self._user
        
        
        # 第九步
    def _authenticate(self):
            # 遍历拿到一个个认证器,进行认证
            # self.authenticators配置的一堆认证类产生的认证类对象组成的 list
            for authenticator in self.authenticators:
                try:
                    # 认证器(对象)调用认证方法authenticate(认证类对象self, request请求对象)
                    # 返回值:登陆的用户与认证的信息组成的 tuple(request.user,request.auth)
                    # 该方法被try包裹,代表该方法会抛异常,抛异常就代表认证失败
                    user_auth_tuple = authenticator.authenticate(self)
                except exceptions.APIException:
                    self._not_authenticated()
                    raise
    
                # 返回值的处理
                if user_auth_tuple is not None:
                    self._authenticator = authenticator
                    # 如何有返回值,就将 登陆用户 与 登陆认证 分别保存到 request.user、request.auth
                    self.user, self.auth = user_auth_tuple
                    return
            # 如果返回值user_auth_tuple为空,代表认证通过,但是没有 登陆用户 与 登陆认证信息,代表游客
            self._not_authenticated()
    

      

    3.3  自定登录认证

    models.py

    from django.db import models
    
    class Userinfo(models.Model):
        user_type_choice ={
            (1,'普通用户'),
            (2,'VIP用户'),
            (3,'SVIP用户')
        }
        user_type = models.IntegerField(choices=user_type_choice)
        username = models.CharField(max_length=32)
        password = models.CharField(max_length=64)
    
    
    # 用户token表
    class UserToken(models.Model):
        user = models.OneToOneField(to="Userinfo",on_delete=True)
        token = models.CharField(max_length=64)
    

      

    views.py

    from django.shortcuts import render
    from rest_framework.views import APIView
    from api import models
    from django.http import JsonResponse,HttpResponse
    
    
    # 生成随机字符创(token)  并且进行更新
    def md5(username):
        import hashlib
        import time
        ctime = str(time.time())
        m = hashlib.md5(bytes(username,encoding="utf-8") )
        m.update(bytes(ctime,encoding="utf-8"))
        return m.hexdigest()
    
    
    # 对用户进行登录认证
    class AuthView(APIView):
    
        def get(self,request):
            return HttpResponse("GET请求")
    
        def post(self,request,*args,**kwargs):
            ret = {
                'code':1000,
                "msg":None
            }
    
            # 从post 里面取数据
            # 注意 获取值是request.data.get()来获取
            print(request.data)  # 这是前端带来的数据
            username = request.data.get("username")
            password = request.data.get("password")
    
            print(username,password)
            obj = models.Userinfo.objects.filter(username=username,password=password).first()
            if not obj:
                ret["code"] = 1001
                ret["msg"] = "用户名或密码错误"
                return JsonResponse(ret)
            if obj:
    
                # 为用户创建token
                token = md5(username=username)
                # 存在就更新 不存在就创建
                models.UserToken.objects.update_or_create(user=obj,defaults={"token":token})
                ret["token"] = token
                return JsonResponse(ret)
    
            else:
                ret["code"] =1002
                ret["msg"] = "请求异常"
                return JsonResponse(ret)
    

      

    3.4 自定义认证类

    自定义类模板

    1 创建类 继承BasicAuthentication 自己实现authenticate方法(就是自定义实现认证的逻辑)
    2 返回值:
        --None,表示不管当前认证,让下一位类来进行认证
        --raise exceptions.AuthenticationFailed("用户认证失败")  #  from rest_framework import exceptions
        --(元素1,元素2)  # 元素1赋值给request.user  元素二赋值给request.auth

    3 局部使用或全局使用

     自定义认证案例 

    from rest_framework import exceptions
    from rest_framework.authentication import BasicAuthentication

    # 第一步要写一个认证类 继承BasicAuthentication class Authtication(BasicAuthentication):

      # 第二步重写authenticate方法 def authenticate(self,request): token = request._request.GET.get("token") token_obj = models.UserToken.objects.filter(token=token).first() if not token_obj: raise exceptions.AuthenticationFailed("用户认证失败") # 在restframwork内部会将两个字段赋值给request,以供后续操作 return (token_obj.user,token_obj) def authenticate_header(self, request): # 此方法带着 不必写代码逻辑 pass

      

    使用自定义类

    局部使用  views.py

    from rest_framework import exceptions
    from rest_framework.authentication import BasicAuthentication
    # 第一步要写一个认证类
    class Authtication(BasicAuthentication):
        
        # 第二步重写authenticate方法
        def authenticate(self,request):
            # token = request._request.get("token")
            token_obj = models.UserToken.objects.filter(token=token).first()
            if not token_obj:
                raise exceptions.AuthenticationFailed("用户认证失败")
    
            # 在restframwork内部会将两个字段赋值给request,以供后续操作
            return (token_obj.user,token_obj)
    
        def authenticate_header(self, request):
            pass
    
    
        
        
    class OrderVIew(APIView):
        '''
        登陆之后可以查看订单信息
        '''  
        
        # 第三步使用自定义类
        authentication_classes = [Authtication,]
        
        
        # 获取订单信息
        # self.dispatch
        def get(self,request,*args,**kwargs):
            # 要首先判断是否是登录  登录的才能看订单信息
            # request.user 对应class Authtication(object):中的return (token_obj.user,token_obj)第一个参数
            # request.auth 对应class Authtication(object):中的return (token_obj.user,token_obj)第二个参数
            ret = {"code":1000,"msg":None,"data":None}
    
            try:
                ret["data"] = order_dict
            except Exception as e:
                pass
            return JsonResponse(ret)
    

      

    全局使用

    在settings.py中

    REST_FRAMEWORK = {
        # 'DEFAULT_AUTHENTICATION_CLASSES': ['apps.api.utils.auth.Authtication',],
        'DEFAULT_AUTHENTICATION_CLASSES': ['apps.api.utils.auth.Authtication'],  # 是自定义认证类的路径
        'UNAUTHENTICATED_USER': None,
        'UNAUTHENTICATED_TOKEN': None,
    }
    

    或者 

    REST_FRAMEWORK = {
    	# python中认证的配置
        'DEFAULT_AUTHENTICATION_CLASSES': (
            'rest_framework.authentication.BasicAuthentication',   # 基本认证
            'rest_framework.authentication.SessionAuthentication',  # session认证
        ),
        
        # python中权限的配置,如果没有指明,系统默认的权限是允许所有人访问的
        'DEFAULT_PERMISSION_CLASSES': (
            'rest_framework.permissions.AllowAny',
            
            # 全局配置:一站式网站(所有操作都需要登录后才能访问)
            # 'rest_framework.permissions.IsAuthenticated',
        )
    }
    

      

     

    4 权限组件

    功能:为不同的视图赋予不同的权限    用法和认证组件类似  源码流程和认证类似

    源码

    # 和认证组件类似 也是从dispatch开始往后看
    self.check_permissions(request)
        认证细则:
        def check_permissions(self, request):
            # 遍历权限对象列表得到一个个权限对象(权限器),进行权限认证
            for permission in self.get_permissions():
                # 权限类一定有一个has_permission权限方法,用来做权限认证的
                # 参数:权限对象self、请求对象request、视图类对象
                # 返回值:有权限返回True,无权限返回False
                if not permission.has_permission(request, self):
                    self.permission_denied(
                        request, message=getattr(permission, 'message', None)
                    )
    

      

    #权限组件自带的四个权限,默认全局配置的是AllowAny,
        - 可以在settings.py中更换全局配置
        - 可以在视图类中针对性的局部配置
        - 可以自定义
    
    class AllowAny(BasePermission):
        # 游客与登陆用户都有所有权限
        def has_permission(self, request, view):
            return True
    ​
    class IsAuthenticated(BasePermission):
        # 游客没有任何权限,登陆用户才有权限
        def has_permission(self, request, view):
            return bool(request.user and request.user.is_authenticated)
    ​
    class IsAdminUser(BasePermission):
        # 游客没有任何权限,登陆用户才有权限
        def has_permission(self, request, view):
            return bool(request.user and request.user.is_staff)
    ​
    class IsAuthenticatedOrReadOnly(BasePermission):
        # 认证规则必须是只读请求或是合法用户: 游客只读,合法用户无限制
        def has_permission(self, request, view):
            return bool(
                request.method in SAFE_METHODS or 
                request.user and
                request.user.is_authenticated
            )
    

      

    自定义权限类

    '''
    1) 创建继承BasePermission的权限类
    2) 实现has_permission方法
    3) 实现体根据权限规则 确定有无权限
    4) 进行全局或局部配置
    ​
    认证规则
    i.满足设置的用户条件,代表有权限,返回True
    ii.不满足设置的用户条件,代表有权限,返回False
    '''
    

    使用

    局部使用

    # 第一步 自定义权限类  继承BasePermission
    from rest_framework.permissions import BasePermission
    class MyPermission(BasePermission):
        def has_permission(self,request,view):
            if request.user.user_type !=3:
                return False  # 无权访问
            return True  # 有权访问
    
        
        
    class OrderVIew(APIView):
    
        # 添加一个需求 用户等级是3就可以访问
        permission_classes = [MyPermission,]  # 局部权限配置
    
        def get(self,request,*args,**kwargs):
            ret = {"code":1000,"msg":None,"data":None}
    
            try:
                ret["data"] = order_dict
            except Exception as e:
                pass
            return JsonResponse(ret)
    

      

    全局使用

    settings.py进行配置  如有视图中还有    permission_classes = [],优先 使用视图中 没有的再去配置文件中找

    REST_FRAMEWORK = {
    	# DRF中认证的配置
        'DEFAULT_AUTHENTICATION_CLASSES': (
            'rest_framework.authentication.BasicAuthentication',   # 基本认证
            'rest_framework.authentication.SessionAuthentication',  # session认证
        )
        
         
        # DRF中权限的配置
        # python中权限的配置,如果没有指明,系统默认的权限是允许所有人访问的
        'DEFAULT_PERMISSION_CLASSES': (
            'rest_framework.permissions.AllowAny',  # 所有用户都应权限
            
            # 一站式网站(所有操作都需要登录后才能访问)
            # 'rest_framework.permissions.IsAuthenticated',
            
            # 自定义配置的类
            #‘DEFAULT_PERMISSION_CLASSES‘:[‘app01.utils.permission.SVIPPermission‘,]#可以自定义多个权限  可以写地址
            
            # 自定义配置的类
            # 'rest_framework.permissions.MyPermission',
    
        )
    }
    

      

     

    5 频率组件

    源码流程

    1. 和认证的流程一样,进入initial(request)
    2. 其中check_throttles(request)是节流的函数
    3. allow_request()就是节流函数(要复写)(get_throttles循环所有节流类)
    4. VisitThrottle自定义权限类
       allow_request()返回值:
    
        - True, 允许访问
        - False, 访问太频繁
        
       wait()返回值:返回一个整数,表示下次还有多久可以访问
    

      

    两种使用方式

    1 继承BaseThrottle,自己实现allow_request(返回True或者False表示可以访问或者访问频率太高)
    	和wait方法 (表示还需等待到少秒)
        
    2 继承SimpleRateThrottle,自己实现 get_cache_key和设置一个scope(配置文件字段) 
        from rest_framework.throttling import  SimpleRateThrottle
        class VisitThrottle(SimpleRateThrottle):
            scope = "Vistor"
    		# 获取用户的IP
            def get_cache_key(self, request, view):
                return self.get_ident(request)
    
        class UserThrottle(SimpleRateThrottle):
            scope = "User"
    
            def get_cache_key(self, request, view):
                return self.user.username
      
    局部配置  全局配置
    class AuthView(APIView):
     	# 局部配置
        throttle_classes = [VisitThrottle,]
    
    全局配置
    1 : 针对第一种继承类方法
    'DEFAULT_THROTTLE_CLASSES': ['apps.api.utils.throttle.UserThrottle'], # 是自定义类的路径
    
     
    2 : 针对第二种继承类方法
    	'DEFAULT_THROTTLE_RATES': {
          'Vistor': '3/m',
          'User': '10/m'
         },
    

      

    第一种方式(建议使用)

    最好写在专门的组件文件夹中 和views视图分开

    from rest_framework.throttling import  SimpleRateThrottle
    class VisitThrottle(SimpleRateThrottle):
        scope = "Vistor"
    
        def get_cache_key(self, request, view):
            return self.get_ident(request)
    
    class UserThrottle(SimpleRateThrottle):
        scope = "User"
    
        def get_cache_key(self, request, view):
            return self.user.username
    

      

    第二种方式

    最好写在专门的组件文件夹中 和views视图分开

    # 节流控制
    from rest_framework.throttling import BaseThrottle
    import time
    
    # 设置一个空字典 放每次匿名登录的时间
    # 有个缺点 将用户ip放到全局变量中 每次启动时都会清空
    VISIT_REVORD={}
    
    class VisitThrottle(BaseThrottle):
        '''
        根据IP来限制用户的访问次数
        '''
        def __init__(self):
            self.history = None
    
        def allow_request(self, request, view):
            # 获取用户的ip
            remote_addr = request.META.get('REMOTE_ADDR')
    
            ctime = time.time()
    
            if remote_addr not in VISIT_REVORD:
                # 讲用户IP添加到字典中
                VISIT_REVORD[remote_addr] = [ctime,]
                return True
    
    
            # 获取到用户ip
            history = VISIT_REVORD.get(remote_addr)
            self.history  = history
    
            # 判断是否有ip  且判断字典最后一位是否小于当前时间减去30秒时
            while history and history[-1] < ctime-30:
                # 如果是减去最后一位 pop默认减去最后一位
                history.pop()
    
    
            if len(history) <3:
                # 当次数小于三时 将当前操作时间放进去
                history.insert(0,ctime)
                return True
    
    
        def wait(self):
            '''
            告知用户还需要等多少时间
            '''
            ctime = time.time()
            # history[-1]使用户最用一次访问时间
            wait_time = 30 -(ctime-self.history[-1]) # 提示用户还有多少秒才能访问
            return wait_time
    

      

     
     
     
     

     

  • 相关阅读:
    翻转二叉树 递归
    移动零 双指针
    多数元素
    laravel 使用PhantomMagick导出pdf ,在Linux下安装字体
    jQuery验证控件jquery.validate.js汉化
    windows 下使用Linux子系统
    Oracle12C RAC数据库grid&Oracle打补丁升级指导-来自ORACLE官方文档
    关于oracle数据库性能监控指标
    oracle在线添加日志组和日志组成员
    chr(39)表示单引'
  • 原文地址:https://www.cnblogs.com/wakee/p/12556212.html
Copyright © 2011-2022 走看看