zoukankan      html  css  js  c++  java
  • drf三大认证补充

    频率认证

    源码分析部分

    def check_throttles(self, request):
            for throttle in self.get_throttles():
                if not throttle.allow_request(request, self):
                    self.throttled(request, throttle.wait())
        def throttled(self, request, wait):
            #抛异常,可以自定义异常,实现错误信息的中文显示
            raise exceptions.Throttled(wait)
    class SimpleRateThrottle(BaseThrottle):
        # 咱自己写的放在了全局变量,他的在django的缓存中
        cache = default_cache
        # 获取当前时间,跟咱写的一样
        timer = time.time
        # 做了一个字符串格式化,
        cache_format = 'throttle_%(scope)s_%(ident)s'
        scope = None
        # 从配置文件中取DEFAULT_THROTTLE_RATES,所以咱配置文件中应该配置,否则报错
        THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES
    
        def __init__(self):
            if not getattr(self, 'rate', None):
                # 从配置文件中找出scope配置的名字对应的值,比如咱写的‘3/m’,他取出来
                self.rate = self.get_rate()
            #     解析'3/m',解析成 3      m
            self.num_requests, self.duration = self.parse_rate(self.rate)
        # 这个方法需要重写
        def get_cache_key(self, request, view):
            """
            Should return a unique cache-key which can be used for throttling.
            Must be overridden.
    
            May return `None` if the request should not be throttled.
            """
            raise NotImplementedError('.get_cache_key() must be overridden')
        
        def get_rate(self):
            if not getattr(self, 'scope', None):
                msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
                       self.__class__.__name__)
                raise ImproperlyConfigured(msg)
    
            try:
                # 获取在setting里配置的字典中的之,self.scope是 咱写的luffy
                return self.THROTTLE_RATES[self.scope]
            except KeyError:
                msg = "No default throttle rate set for '%s' scope" % self.scope
                raise ImproperlyConfigured(msg)
        # 解析 3/m这种传参
        def parse_rate(self, rate):
            """
            Given the request rate string, return a two tuple of:
            <allowed number of requests>, <period of time in seconds>
            """
            if rate is None:
                return (None, None)
            num, period = rate.split('/')
            num_requests = int(num)
            # 只取了第一位,也就是 3/mimmmmmmm也是代表一分钟
            duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
            return (num_requests, duration)
        # 逻辑跟咱自定义的相同
        def allow_request(self, request, view):
            """
            Implement the check to see if the request should be throttled.
    
            On success calls `throttle_success`.
            On failure calls `throttle_failure`.
            """
            if self.rate is None:
                return True
    
            self.key = self.get_cache_key(request, view)
            if self.key is None:
                return True
    
            self.history = self.cache.get(self.key, [])
            self.now = self.timer()
    
            # Drop any requests from the history which have now passed the
            # throttle duration
            while self.history and self.history[-1] <= self.now - self.duration:
                self.history.pop()
            if len(self.history) >= self.num_requests:
                return self.throttle_failure()
            return self.throttle_success()
        # 成功返回true,并且插入到缓存中
        def throttle_success(self):
            """
            Inserts the current request's timestamp along with the key
            into the cache.
            """
            self.history.insert(0, self.now)
            self.cache.set(self.key, self.history, self.duration)
            return True
        # 失败返回false
        def throttle_failure(self):
            """
            Called when a request to the API has failed due to throttling.
            """
            return False
    
        def wait(self):
            """
            Returns the recommended next request time in seconds.
            """
            if self.history:
                remaining_duration = self.duration - (self.now - self.history[-1])
            else:
                remaining_duration = self.duration
    
            available_requests = self.num_requests - len(self.history) + 1
            if available_requests <= 0:
                return None
    
            return remaining_duration / float(available_requests)

    自定义频率类

    # 1) 自定义一个继承 SimpleRateThrottle 类 的频率类
    # 2) 设置一个 scope 类属性,属性值为任意见名知意的字符串
    # 3) 在settings配置文件中,配置drf的DEFAULT_THROTTLE_RATES,格式为 {scope字符串: '次数/时间'}
    # 4) 在自定义频率类中重写 get_cache_key 方法
        # 限制的对象返回 与限制信息有关的字符串
        # 不限制的对象返回 None (只能放回None,不能是False或是''等)

    例子:短信接口 1/min 频率限制

    频率:api/throttles.py
    from rest_framework.throttling import SimpleRateThrottle
    
    class SMSRateThrottle(SimpleRateThrottle):
        scope = 'sms'
    
        # 只对提交手机号的get方法进行限制
        def get_cache_key(self, request, view):
            mobile = request.query_params.get('mobile')
            # 没有手机号,就不做频率限制
            if not mobile:
                return None
            # 返回可以根据手机号动态变化,且不易重复的字符串,作为操作缓存的key
            return 'throttle_%(scope)s_%(ident)s' % {'scope': self.scope, 'ident': mobile}
    配置:settings.py
    # drf配置
    REST_FRAMEWORK = {
        # 频率限制条件配置
        'DEFAULT_THROTTLE_RATES': {
            'sms': '1/min'
        },
    }
    视图:views.py
    from .throttles import SMSRateThrottle
    class TestSMSAPIView(APIView):
        # 局部配置频率认证
        throttle_classes = [SMSRateThrottle]
        def get(self, request, *args, **kwargs):
            return APIResponse(0, 'get 获取验证码 OK')
        def post(self, request, *args, **kwargs):
            return APIResponse(0, 'post 获取验证码  OK')
    路由:api/url.py
    url(r'^sms/$', views.TestSMSAPIView.as_view()),
    限制的接口
    # 只会对 /api/sms/?mobile=具体手机号 接口才会有频率限制
    # 1)对 /api/sms/ 或其他接口发送无限制
    # 2)对数据包提交mobile的/api/sms/接口无限制
    # 3)对不是mobile(如phone)字段提交的电话接口无限制

    总结:

    DRF中频率认证源码部分执行流程简述

    '''
        先写一个频率校验的类(继承SimpleRateThrottle) ---> 在该类中 配置 scope参数---> 重写 get_cache_key 方法 并返回self.get_ident(request)---->在需要校验的类中配置 该校验类
        
        
        执行步骤:
            1.APIView
            2.---->dispatch
            3.---->self.initial(request, *args, **kwargs)
            4.---->self.check_throttles(request)
            5.---->throttle.allow_request(request, self)
            6.---->self.get_cache_key(request, view)(自己重写的)(返回的是ip)
            7.---->self.cache.get(self.key, []) 根据ip获取当前ip的 时间列表
            8.---->需要用到参数(self.rate,self.num_requests,self.duration)在类实例化的时已经执行了如下代码(__init__):
                    8.1--> self.rate = self.get_rate()-->self.THROTTLE_RATES[self.scope]
                        -->api_settings.DEFAULT_THROTTLE_RATES(配置文件中找)
                        这里 self.rate  ---> 是前面配置的 'qzk':'3/m'
                    8.2-->self.num_requests, self.duration = self.parse_rate(self.rate)
                        这里 self.num_requests 是设定的访问限制次数(3)
                            self.duration  是访问的时间限制(60)
            9.----> 通过如上数据,并进行一通逻辑判断, 返回
                    True:——> 执行throttle_success:
                                self.history.insert(0, self.now)  # 把当前访问时间添加到时间列表
                                # 再将 key,history,duration 在 {} 中更新
                                self.cache.set(self.key, self.history, self.duration)
                    False :--->  执行throttle_failure ,返回 Ture
    '''
  • 相关阅读:
    雪花算法 适用于ID 自增
    SB ,mybatis generator 插件 实现 更新操作
    spark优化
    Hive的导入导出方式汇总
    推荐系统架构图
    DBScan算法
    机器学习-逻辑回归算法
    机器学习-微博精准营销
    机器学习-TF-IDF算法
    机器学习-KNN识别手写数字
  • 原文地址:https://www.cnblogs.com/AbrahamChen/p/11729595.html
Copyright © 2011-2022 走看看