zoukankan      html  css  js  c++  java
  • restframework 认证、权限、频率组件

    一、认证

     1、表的关系

    class User(models.Model):
        name = models.CharField(max_length=32)
        pwd = models.CharField(max_length=64)
    
        def __str__(self):
            return self.name
    
    
    class Token(models.Model):
        users = models.OneToOneField(to="User", on_delete=models.CASCADE)
        token = models.CharField(max_length=128)
    
        def __str__(self):
            return self.token
    模型

     2、设置用户的token

    def get_random_str(user):
        import hashlib
        import time
        md5 = hashlib.md5(bytes(user, encoding='utf-8'))
        md5.update(bytes(str(time.time()), encoding='utf-8'))
        return md5.hexdigest()
    
    
    class LoginView(APIView):
        def post(self, request):
            res = {'status_code': 2000, 'msg': None}
            name = request.data.get('name')
            pwd = request.data.get('pwd')
            user_obj = User.objects.filter(name=name, pwd=pwd).first()
            if user_obj:
                token = get_random_str(user_obj.name)
                Token.objects.update_or_create(users=user_obj, defaults={'token': token})
                res['token'] = token
            else:
                res['status_code'] = 1001
                res['msg'] = "账号或密码错误"
            return JsonResponse(res)
    view 设置Token

     3、认证

    from rest_framework.authentication import BaseAuthentication

    类继承BaseAuthentication,就不用写下面的方法

      File "D:Pythonlibsite-packages est_frameworkviews.py", line 190, in get_authenticate_header
        return authenticators[0].authenticate_header(request)

        def authenticate_header(self, request):
            """
            Return a string to be used as the value of the `WWW-Authenticate`
            header in a `401 Unauthenticated` response, or `None` if the
            authentication scheme should return `403 Permission Denied` responses.
            """
            pass

     局部配置

    """
    源码:
    1.authentication_classes
    APIView 中的 dispath() -> initial找到user静态方法 ->initialize_request->Request(找到静态方法user)->_authenticate()查找 authenticators->authenticators=self.get_authenticators()->authentication_classes
    2.authenticate(self)
    authenticators 是实例化对象的列表
    authenticator 对象
    authenticator
    authenticate(self) 方法 self 是request
    """
    class TestToken:
        def authenticate(self, request):
            token = request.GET.get('token')
            # print("==============>", token)
            token_obj = Token.objects.filter(token=token).first()
            if token_obj:
                return token_obj.users.name, token_obj.token
            else:
                raise exceptions.ValidationError('token错误')
    
    
    class PublishView(mixins.ListModelMixin, mixins.CreateModelMixin, generics.GenericAPIView):
        authentication_classes = [TestToken]
        queryset = Publish.objects.all()
        serializer_class = PublishSerializer
    
        def get(self, request, *args, **kwargs):
            return self.list(request, *args, **kwargs)
    
        def post(self, request, *args, **kwargs):
            return self.create(request, *args, **kwargs)
    
    
    from rest_framework.response import Response
    
    
    class PublishDetailView(APIView):
        authentication_classes = [TestToken]
        def get(self, request, pk):
            publish_obj = Publish.objects.filter(pk=pk).first()
            ps = PublishSerializer(publish_obj, many=False)
            return Response(ps.data)
    
        def put(self, request, pk):
            public_obj = Publish.objects.filter(pk=pk).first()
            ps = PublishSerializer(public_obj, data=request.data)
            if ps.is_valid():
                ps.save()
                # print(ps.data)
                return Response(ps.data)
            else:
                return Response(ps.errors)
    
        def delete(self, request, pk):
            Publish.objects.filter(pk=pk).delete()
            return Response()
    认证方法1

     注意:在视图类中可以通过 request.user和request.auth,调用认证类的两个返回值

    全局配置

    方法2

    修改配置文件,不需要每个视图类都要添加 authentication_classes

    """
    源码分析
    1.找到api_settings
    get_authenticators()-->authentication_classes-->authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
    
    
    2.APISettings-->__getattr__(作用:访问一个不存在的属性,APISettings 不存在attr这个属性)-->user_settings静态方法,获取返回值,
    如果settings.py文件中没有'REST_FRAMEWORK',则返回{}-->val = self.defaults[attr]
    
    3.若settings文件中有'REST_FRAMEWORK'-->self._user_settings,等于setting.REST_FRAMEWORK-->.user_settings[attr], 
    attr为authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES api_settings后面的值DEFAULT_AUTHENTICATION_CLASSES,
    设置authentication_classes为自己定义的类(重写authentication_classes)
    """
    REST_FRAMEWORK = {
        'DEFAULT_AUTHENTICATION_CLASSES': [
            'app01.my_test.Myking',
        ]
    }

     重点:

    self.authentication_classes 视图类中没有authentication_classes方法时,要看父类

     全局设置的时候,如果想要某个类没有认证 authentication_classes =[ ]  即可

    二、权限

    1、模型表

    给用户添加一个身份字段

    # 用户表
    class User(models.Model):
        name = models.CharField(max_length=32)
        pwd = models.CharField(max_length=32)
        per_choice = ((1, '普通用户'), (2, 'vip'))
        permission = models.IntegerField(choices=per_choice, default=1)
    
        def __str__(self):
            return self.name

    2、自定义权限类

    方法名称和用法,看源码(APIView->dispatch()->initial()->check_permissions())

    class PermissionSet:
        # 自定义message字符串,被has_permission方法调用
        message = "只有vip用户才有权限访问,请充值"
    
        def has_permission(self, request, view):
            # 先认证再权限,request.user是认证组件里的
            name = request.user
            print("=========>", name)
            per = User.objects.filter(name=name).first().permission
            if per == 2:
                return True
            else:
                return False

    3、配置

    a、局部

    视图类中添加

    permission_classes = [PermissionSet]

    b、全局

    配置settings.py文件,看源码

    REST_FRAMEWORK = {
        DEFAULT_PERMISSION_CLASSES': [
            'app01.my_test.PermissionSet',
        ],
    }

    如果想要某个视图类没有权限的限制,permission_classes = [ ]

    三、频率

    参考链接

    https://www.cnblogs.com/wdliu/p/9114537.html
    https://www.cnblogs.com/liuqingzheng/articles/9766408.html

    1、内部原理简述

    定义字典用来存放数据
    {
    IP1:[第三次请求时间,第二次请求时间,第一次请求时间,],
    IP2:[第二次请求时间,第一次请求时间,],
    .....
    }
    举例说明,比如我现在配置了5秒内只能访问2次,每次请求到达频率控制时候先判断请求者IP是否已经在这个请求字典中,若存在,在判断用户请求5秒内的请求次数,若次数小于等于2,则允许请求,若大于2,则超过频率,不允许请求。
    关于请求频率的的算法(以5秒内最多访问两次为例):
    1.首先删除掉列表里5秒之前的请求,循环判断当前请求时间和最早请求时间之差记作t1,若t1大于5则代表列表中最早的请求已经在5秒外了,删除掉,继续判断倒数第二个请求,直到t1小于5.
    2.当确保请求列表中只有5秒内请求时候,接着判断其请求次数(列表长度),若长度大于2,则证明超过5秒内访问超过2次了,则不允许,否则,通过并将此次访问时间插入到列表最前面,作为最新访问时间。

    先剔除不合适的,再根据访问频率进行计算

    2、源码解析

    1.APIView->dispatch()->initial()->check_throttles()->get_throttles()->[throttle() for throttle in self.throttle_classes] 类的实例化对象->throttle_classes(全局变量)
    2.方法
    allow_request()->SimpleRateThrottle类->allow_request()
        self.rate -> self.get_rate()==>必须设置`.scope` or `.rate`==>THROTTLE_RATES(api_settings.DEFAULT_THROTTLE_RATES 为空)
    参数:
        a、self.key -> self.get_cache_key()==>必须override
        b、self.history -> self.cache.get() ==> cache -> default_cache -> 类DefaultCacheProxy()
             def __getattr__(self, name):
                return getattr(caches[DEFAULT_CACHE_ALIAS], name)
        c、self.now -> time.time
    
    while self.history and self.history[-1] <= self.now - self.duration: 频率判断实现原理,删除超过时间的请求
    self.num_requests, self.duration = self.parse_rate(self.rate) --> parse_rate(self.rate)-> rate -> get_rate() 格式化速率  scope = 'xx'  'xx'=5/m
    
    3.找get_ident()方法, 该方法用于获取请求的IP
    SimpleRateThrottle ->    BaseThrottle    get_ident
         xff = request.META.get('HTTP_X_FORWARDED_FOR')
        remote_addr = request.META.get('REMOTE_ADDR')
        #这里request是封装以后的requst,django原生的是request._request.META 这样也可以获取
    4.def wait(self):返回建议的等待秒数
    下一个请求。

    3内置频率控制类

    BaseThrottle:最基本的频率控制需要重写allow_request方法和wait方法
    AnonRateThrottle:匿名用户频率控制
    UserRateThrottle:基于SimpleRateThrottle,对用户的频率控制

    4、自定义频率控制类

    from rest_framework.throttling import BaseThrottle
    import time
    
    REQUEST_RECORD = {}  # 存储访问记录
    
    class VisitThrottle(BaseThrottle):
        '''60s内最多能访问5次'''
    
        def __init__(self):
            self.history = None
    
        def allow_request(self, request, view):
            # 获取用户ip (get_ident)
            remote_addr = self.get_ident(request)
            ctime = time.time()
    
            if remote_addr not in REQUEST_RECORD:
                REQUEST_RECORD[remote_addr] = [ctime, ]  # 保持请求的时间,形式{ip:[时间,]}
                return True  # True表示可以访问
            # 获取当前ip的历史访问记录
            history = REQUEST_RECORD.get(remote_addr)
           
            self.history = history
    
           
            while history and history[-1] < ctime - 60:
                # while循环确保每列表中是最新的60秒内的请求
                
                history.pop()
            # 访问记录小于5次,将本次请求插入到最前面,作为最新的请求
            if len(history) < 5:
                history.insert(0, ctime)
                return True
    
        def wait(self):
            '''返回等待时间'''
            ctime = time.time()
            return 60 - (ctime - self.history[-1])

    5、使用SimpleRateThrottle

    频率类

    from rest_framework.throttling import SimpleRateThrottle
    
    class VisitThrottle(SimpleRateThrottle):
        scope = "tom"  #settings配置文件中的key,用于获取配置的频率
    
        def get_cache_key(self, request, view):
            return self.get_ident(request)

    settings.py文件

    REST_FRAMEWORK = {
        #频率控制配置
        "DEFAULT_THROTTLE_CLASSES":['CBV.utils.VisitThrottle', ],   #全局配置,
        "DEFAULT_THROTTLE_RATES":{
            'tom':'3/m',   
    
        }
    }

    视图类

    class PublishView(mixins.ListModelMixin, mixins.CreateModelMixin, generics.GenericAPIView):
        # authentication_classes = [TestToken]
        # throttle_classes = [VisitThrottle, ]
        queryset = Publish.objects.all()
        serializer_class = PublishSerializer
    
        def get(self, request, *args, **kwargs):
            return self.list(request, *args, **kwargs)
    
        def post(self, request, *args, **kwargs):
            return self.create(request, *args, **kwargs)

    注意:单一视图类使用和全局使用

  • 相关阅读:
    thinkphp 事物回滚
    文字超出部分以省略号隐藏
    js倒计时
    js 日期转为时间戳
    jquery 获取url地址参数
    spreadjs 自定义菜单事件
    spreadjs 点击事件
    spreadjs 自定义上传文件单元格
    spreadjs 小记
    Json数组排序
  • 原文地址:https://www.cnblogs.com/wt7018/p/11468717.html
Copyright © 2011-2022 走看看