zoukankan      html  css  js  c++  java
  • 认证、权限与频率组件

    一、认证组件

    1 局部视图认证

        # Map the login/ path to LoginView; as_view() is needed because it is a class-based view.
        url(r'^login/$', views.LoginView.as_view(),name="login"),

    models.py

    from django.db import models
    
    class User(models.Model):
        """Account record: a login name plus its password."""

        name = models.CharField(max_length=32)
        pwd = models.CharField(max_length=32)
    
    class Token(models.Model):
        """Login token issued to a user (at most one token row per user)."""

        # on_delete is mandatory from Django 2.0 onward. CASCADE matches the
        # implicit default of earlier versions: deleting the user removes the token.
        user = models.OneToOneField("User", on_delete=models.CASCADE)
        token = models.CharField(max_length=128)

        def __str__(self):
            return self.token
    
    class Book(models.Model):
        """A book with its price, publication date, publisher and authors."""

        title = models.CharField(max_length=32)
        price = models.IntegerField()
        pub_date = models.DateField()
        # on_delete is mandatory from Django 2.0 onward. CASCADE matches the
        # implicit default of earlier versions.
        publish = models.ForeignKey("Publish", on_delete=models.CASCADE)
        authors = models.ManyToManyField("Author")

        def __str__(self):
            return self.title

    先看认证组件源代码流程:

    每个请求(包括用户登录)进入视图时,都会先走APIView类下的dispatch方法

    class APIView(View):
        authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES#
        
        # for authenticator in self.authenticators:中的authenticators最终来源于这里
        def get_authenticators(self):# (12)
    
            return [auth() for auth in self.authentication_classes] #[TokenAuthor()]  (13)
            #如果我们定义了authentication_classes就使用自己定义的,没有定义就使用上面的
            #最终生成的列表中放着一个个认证类的实例对象
    
        def perform_authentication(self, request):# (4)
    
            request.user #查找request:->perform_authentication(self, request)->initial(self, request, *args, **kwargs)
            #->self.initial(request, *args, **kwargs)->dispatch(self, request, *args, **kwargs)最后找到的是dispatch类下
            #的request方法:request = self.initialize_request(request, *args, **kwargs),这个request是initialize_request类
            #下Request的实例化对象,可知user是Request类中的静态方法
            # 实例化对象,而且是新构建的request,  (5)
            #
    
        def initialize_request(self, request, *args, **kwargs):#
            return Request(#
                request,#
                authenticators=self.get_authenticators(),  #[TokenAuthor()] (11)
            )
    
        def initial(self, request, *args, **kwargs):# (2)
            #认证组件
            self.perform_authentication(request) (3)
          #权限组件
            self.check_permissions(request)
            #访问频率组件
            self.check_throttles(request)
    
        def dispatch(self, request, *args, **kwargs):#
            request = self.initialize_request(request, *args, **kwargs)#
            self.request = request#
    
            try:
                self.initial(request, *args, **kwargs)#  (1)
    
    #request.py
    class Request:
        """Abridged excerpt of DRF's Request wrapper.

        The (n) markers in the comments continue the authentication trace
        that starts in APIView.dispatch().
        """

        def __init__(self, request, parsers=None, authenticators=None,):
            self._request = request

            # (10) Authenticator instances handed over by APIView.initialize_request
            # (authenticators=self.get_authenticators()); an empty tuple when none are given.
            self.authenticators = authenticators or ()

        @property
        def user(self):  # (6)
            if not hasattr(self, '_user'):
                with wrap_attributeerrors():
                    self._authenticate()  # (7)
            return self._user

        # All of the authentication logic lives here.
        def _authenticate(self):  # (8)
            for authenticator in self.authenticators:  # (9) e.g. [TokenAuth()]
                # Each authenticator is an instance of one of the view's authentication classes.
                try:
                    # The argument is this Request object; it is what a custom
                    # authenticate(self, request) receives as `request`.
                    user_auth_tuple = authenticator.authenticate(self)
                except exceptions.APIException:
                    self._not_authenticated()
                    raise

                if user_auth_tuple is not None:
                    self._authenticator = authenticator
                    self.user, self.auth = user_auth_tuple
                    return
            # (excerpt ends here; what happens when no authenticator matches is not shown)

    使用:

    在app01.service.auth.py:

    from rest_framework import exceptions
    from rest_framework.authentication import BaseAuthentication #
    class TokenAuth(BaseAuthentication):
        """Authenticate a request by its ``token`` query-string parameter."""

        def authenticate(self, request):
            """Return ``(user name, token string)`` for a known token.

            Raises:
                exceptions.AuthenticationFailed: no Token row matches the supplied value.
            """
            supplied = request.GET.get("token")
            match = Token.objects.filter(token=supplied).first()
            if match:
                return match.user.name, match.token
            raise exceptions.AuthenticationFailed("验证失败!")

    views.py

    #生成token随机字符串
    def get_random_str(user):
        """Return a fresh 32-character hexadecimal token string for a user.

        Args:
            user: The user name (str); it is mixed into the digest.

        Returns:
            The lowercase hex MD5 digest of the user name, the current
            timestamp and 16 random bytes.
        """
        import hashlib
        import secrets
        import time

        digest = hashlib.md5(bytes(user, encoding="utf8"))
        digest.update(bytes(str(time.time()), encoding="utf8"))
        # A user name plus a timestamp is guessable; add OS-level randomness so
        # the token cannot be reconstructed from those two values alone.
        digest.update(secrets.token_bytes(16))
        return digest.hexdigest()
    #登录
    from .models import User
    from app01.service.auth import *
    class LoginView(APIView):
        """Check a name/password pair and issue (or refresh) the user's token."""

        # TokenAuth is attached here only to demonstrate per-view authentication.
        # A login endpoint should not require authentication itself: once a global
        # authentication class is configured, set authentication_classes = [] here.
        authentication_classes = [TokenAuth, ]

        def post(self, request):
            """Return the state code, message and (on success) the new token as a JSON string."""
            import json

            name = request.data.get("name")
            pwd = request.data.get("pwd")
            user = User.objects.filter(name=name, pwd=pwd).first()
            res = {"state_code": 1000, "msg": None}
            if user:
                random_str = get_random_str(user.name)  # fresh token string
                # Create the user's token row, or overwrite the existing one.
                Token.objects.update_or_create(user=user, defaults={"token": random_str})
                res["token"] = random_str
            else:
                res["state_code"] = 1001  # error state code
                res["msg"] = "用户名或者密码错误"
            return Response(json.dumps(res, ensure_ascii=False))

    以上只是对登录进行了认证

    2 全局视图认证组件 

    settings.py配置如下:

    REST_FRAMEWORK = {
        # Authentication classes used by every view that does not set its own.
        "DEFAULT_AUTHENTICATION_CLASSES": [
            "app01.service.auth.TokenAuth",
        ],
    }

     二、权限组件

    1 局部视图权限

    源码:流程

    class APIView(View):
    
        permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES (7)
        #用户定义了自己的权限就用用户定义的,用户没有定义就使用这里的
        #……
        
        def get_permissions(self):# (5)
            """
            Instantiates and returns the list of permissions that this view requires.
            """
            return [permission() for permission in self.permission_classes]#(6)
    
        #……
        def check_permissions(self, request):#  (3)
            """
            Check if the request should be permitted.
            Raises an appropriate exception if the request is not permitted.
            """
            for permission in self.get_permissions():# (4)
                if not permission.has_permission(request, self): #self是当前的view
                    self.permission_denied(
                        request, message=getattr(permission, 'message', None)#
                    )
        #……
        def initial(self, request, *args, **kwargs):#
    
            #认证组件
            self.perform_authentication(request)
            #权限组件
            self.check_permissions(request)   (2)
            self.check_throttles(request)
    
        def dispatch(self, request, *args, **kwargs):# 
            #……
            try:
                self.initial(request, *args, **kwargs)# (1)
            #……

    用法:

    app01/models.py

    class User(models.Model):
        """Account record extended with a membership tier."""

        # (stored value, display label) pairs accepted by user_type.
        type_choices = (
            (1, "普通用户"),
            (2, "VIP"),
            (3, "SVIP"),
        )

        name = models.CharField(max_length=32)
        pwd = models.CharField(max_length=32)
        user_type = models.IntegerField(choices=type_choices, default=1)

    在app01.service.permissions.py中:

    from rest_framework.authentication import BaseAuthentication
    from rest_framework.permissions import BasePermission
    class SVIPPermission(BasePermission):
        """Grant access only to users whose ``user_type`` is 3 (SVIP).

        Subclasses BasePermission rather than an authentication base class so
        that the permission also provides the object-level hook that DRF calls
        on detail routes.
        """

        message = "只有超级用户才能访问"

        def has_permission(self, request, view):
            """Return True when the requesting user is an SVIP, False otherwise.

            ``request.user`` holds the user name here: it is the first element
            of the tuple returned by TokenAuth.authenticate.
            """
            username = request.user
            user = User.objects.filter(name=username).first()
            # No matching row: deny access instead of failing with
            # AttributeError on None.
            if user is None:
                return False
            return user.user_type == 3

    app01/views.py:

    from app01.service.permissions import SVIPPermission
    class AuthorModelView(viewsets.ModelViewSet):
        """Author endpoints guarded by TokenAuth and SVIPPermission."""

        queryset = Author.objects.all()
        serializer_class = AuthorModelSerializers

        # Per-view authentication and permission classes.
        authentication_classes = [TokenAuth]
        permission_classes = [SVIPPermission]
    

    2 全局视图权限

    settings.py配置如下:

    REST_FRAMEWORK = {
        # The dotted path must name the class defined in app01/service/auth.py,
        # which is TokenAuth.
        "DEFAULT_AUTHENTICATION_CLASSES": ["app01.service.auth.TokenAuth", ],
        "DEFAULT_PERMISSION_CLASSES": ["app01.service.permissions.SVIPPermission", ],
    }

    三、throttle(访问频率)组件

     在app01.service.throttles.py中:

    源码:和权限组件类似

    class APIView(View):
        # Abridged excerpt of DRF's APIView showing only the throttling path.

        # Default used when the view does not set throttle_classes itself.
        throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES


        def get_throttles(self):
            # One throttle instance per class listed in throttle_classes.
            return [throttle() for throttle in self.throttle_classes]
        # ...

        def check_throttles(self, request):
            # Collect the wait() value of every throttle that rejects the request.
            throttle_durations = []
            for throttle in self.get_throttles():
                if not throttle.allow_request(request, self):
                    throttle_durations.append(throttle.wait())

            if throttle_durations:
                # None values are filtered out before taking the longest wait.
                durations = [
                    duration for duration in throttle_durations
                    if duration is not None
                ]

                duration = max(durations, default=None)
                self.throttled(request, duration)
                # ...

        def initial(self, request, *args, **kwargs):

            # Throttling
            self.check_throttles(request)

        def dispatch(self, request, *args, **kwargs):

            try:
                self.initial(request, *args, **kwargs)
                # ...
    
    
    # settings.py (framework side, excerpt)
    DEFAULTS = {

        # Empty list: no throttle classes unless the project configures some.
        'DEFAULT_THROTTLE_CLASSES': [],

    }
    
    
    class APISettings:
        # Excerpt: resolves a setting from the project's REST_FRAMEWORK dict,
        # falling back to the defaults mapping.
        # NOTE(review): import_strings and _cached_attrs are read in __getattr__
        # but never assigned in this excerpt - presumably set in an omitted part
        # of __init__; confirm against the full source.

        def __init__(self, user_settings=None, defaults=None, import_strings=None):
            if user_settings:
                self._user_settings = self.__check_user_settings(user_settings)
            self.defaults = defaults or DEFAULTS

        @property
        def user_settings(self):
            # Read REST_FRAMEWORK from the Django settings on first access only.
            if not hasattr(self, '_user_settings'):
                self._user_settings = getattr(settings, 'REST_FRAMEWORK', {})
            return self._user_settings

        def __getattr__(self, attr):
            # Invoked only when normal attribute lookup fails, i.e. for a setting
            # that has not yet been stored on the instance by the setattr below.
            if attr not in self.defaults:
                raise AttributeError("Invalid API setting: '%s'" % attr)

            try:
                # Check if present in user settings
                val = self.user_settings[attr]
            except KeyError:
                # Fall back to defaults
                val = self.defaults[attr]

            # Coerce import strings into classes
            if attr in self.import_strings:
                val = perform_import(val, attr)

            # Cache the result
            self._cached_attrs.add(attr)
            setattr(self, attr, val)
            return val
    
    # Module-level instance; APIView reads its class defaults from it
    # (e.g. api_settings.DEFAULT_THROTTLE_CLASSES).
    api_settings = APISettings(None, DEFAULTS, IMPORT_STRINGS)
    源码

    用法:

    from rest_framework.throttling import BaseThrottle
    
    VISIT_RECORD = {}  # remote address -> list of visit timestamps, newest first


    class VisitThrottle(BaseThrottle):
        """Allow each remote address at most MAX_VISITS requests per WINDOW_SECONDS.

        Visit history is kept in the module-level VISIT_RECORD dict.
        """

        MAX_VISITS = 3
        WINDOW_SECONDS = 60

        def __init__(self):
            # History list of the address seen by the latest allow_request()
            # call that found an existing record; read by wait().
            self.history = None

        def allow_request(self, request, view):
            """Record the visit and return True while the address is under its limit."""
            import time

            remote_addr = request.META.get('REMOTE_ADDR')
            ctime = time.time()

            if remote_addr not in VISIT_RECORD:
                VISIT_RECORD[remote_addr] = [ctime, ]
                return True

            history = VISIT_RECORD.get(remote_addr)
            self.history = history

            # Drop timestamps that fell out of the window; the oldest sit at the end.
            while history and history[-1] < ctime - self.WINDOW_SECONDS:
                history.pop()

            if len(history) < self.MAX_VISITS:
                history.insert(0, ctime)
                return True
            return False

        def wait(self):
            """Return the seconds until the oldest recorded visit expires.

            Returns None when no history is available, which the caller
            (APIView.check_throttles) filters out.
            """
            import time

            if not self.history:
                return None
            return self.WINDOW_SECONDS - (time.time() - self.history[-1])

    在views.py中:

    from app01.service.throttles import *
    
    class BookViewSet(generics.ListCreateAPIView):
        """Book endpoint built on ListCreateAPIView, throttled by VisitThrottle."""

        queryset = Book.objects.all()
        serializer_class = BookSerializers
        throttle_classes = [VisitThrottle]

    全局视图throttle

    REST_FRAMEWORK = {
        # The dotted path must name the class defined in app01/service/auth.py,
        # which is TokenAuth.
        "DEFAULT_AUTHENTICATION_CLASSES": ["app01.service.auth.TokenAuth", ],
        "DEFAULT_PERMISSION_CLASSES": ["app01.service.permissions.SVIPPermission", ],
        "DEFAULT_THROTTLE_CLASSES": ["app01.service.throttles.VisitThrottle", ],
    }

    内置throttle类

    在app01.service.throttles.py修改为:

    class VisitThrottle(SimpleRateThrottle):
        """Throttle built on SimpleRateThrottle, registered under the "visit_rate" scope."""

        scope = "visit_rate"

        def get_cache_key(self, request, view):
            """Use the value of get_ident(request) as the cache key."""
            ident = self.get_ident(request)
            return ident

    settings.py设置:

    REST_FRAMEWORK = {
        # The dotted path must name the class defined in app01/service/auth.py,
        # which is TokenAuth.
        "DEFAULT_AUTHENTICATION_CLASSES": ["app01.service.auth.TokenAuth", ],
        "DEFAULT_PERMISSION_CLASSES": ["app01.service.permissions.SVIPPermission", ],
        "DEFAULT_THROTTLE_CLASSES": ["app01.service.throttles.VisitThrottle", ],
        # Rate for each throttle scope; "visit_rate" is the scope declared on VisitThrottle.
        "DEFAULT_THROTTLE_RATES": {
            "visit_rate": "5/m",
        },
    }

     rest framework框架访问频率限制推荐放到 redis/memcached

  • 相关阅读:
    [转]MySQL索引类型
    [转]数据结构(全)
    [转]数据结构基础概念篇
    [转]从零开始搭建创业公司后台技术栈
    将博客搬至CSDN
    编译android源代码(aosp)
    【Android】Fresco图片加载框架(二)————Producer
    【Android】Fresco图片加载框架(一)————源码简要分析
    TSL(SSL)相关内容
    谨慎使用AsyncTask
  • 原文地址:https://www.cnblogs.com/zh-xiaoyuan/p/13056424.html
Copyright © 2011-2022 走看看