频率认证
源码分析部分
def check_throttles(self, request):
for throttle in self.get_throttles():
if not throttle.allow_request(request, self):
self.throttled(request, throttle.wait())
def throttled(self, request, wait):
#抛异常,可以自定义异常,实现错误信息的中文显示
raise exceptions.Throttled(wait)
class SimpleRateThrottle(BaseThrottle):
# 咱自己写的放在了全局变量,他的在django的缓存中
cache = default_cache
# 获取当前时间,跟咱写的一样
timer = time.time
# 做了一个字符串格式化,
cache_format = 'throttle_%(scope)s_%(ident)s'
scope = None
# 从配置文件中取DEFAULT_THROTTLE_RATES,所以咱配置文件中应该配置,否则报错
THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES
def __init__(self):
if not getattr(self, 'rate', None):
# 从配置文件中找出scope配置的名字对应的值,比如咱写的‘3/m’,他取出来
self.rate = self.get_rate()
# 解析'3/m',解析成 3 m
self.num_requests, self.duration = self.parse_rate(self.rate)
# 这个方法需要重写
def get_cache_key(self, request, view):
"""
Should return a unique cache-key which can be used for throttling.
Must be overridden.
May return `None` if the request should not be throttled.
"""
raise NotImplementedError('.get_cache_key() must be overridden')
def get_rate(self):
if not getattr(self, 'scope', None):
msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
self.__class__.__name__)
raise ImproperlyConfigured(msg)
try:
# 获取在setting里配置的字典中的之,self.scope是 咱写的luffy
return self.THROTTLE_RATES[self.scope]
except KeyError:
msg = "No default throttle rate set for '%s' scope" % self.scope
raise ImproperlyConfigured(msg)
# 解析 3/m这种传参
def parse_rate(self, rate):
"""
Given the request rate string, return a two tuple of:
<allowed number of requests>, <period of time in seconds>
"""
if rate is None:
return (None, None)
num, period = rate.split('/')
num_requests = int(num)
# 只取了第一位,也就是 3/mimmmmmmm也是代表一分钟
duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
return (num_requests, duration)
# 逻辑跟咱自定义的相同
def allow_request(self, request, view):
"""
Implement the check to see if the request should be throttled.
On success calls `throttle_success`.
On failure calls `throttle_failure`.
"""
if self.rate is None:
return True
self.key = self.get_cache_key(request, view)
if self.key is None:
return True
self.history = self.cache.get(self.key, [])
self.now = self.timer()
# Drop any requests from the history which have now passed the
# throttle duration
while self.history and self.history[-1] <= self.now - self.duration:
self.history.pop()
if len(self.history) >= self.num_requests:
return self.throttle_failure()
return self.throttle_success()
# 成功返回true,并且插入到缓存中
def throttle_success(self):
"""
Inserts the current request's timestamp along with the key
into the cache.
"""
self.history.insert(0, self.now)
self.cache.set(self.key, self.history, self.duration)
return True
# 失败返回false
def throttle_failure(self):
"""
Called when a request to the API has failed due to throttling.
"""
return False
def wait(self):
"""
Returns the recommended next request time in seconds.
"""
if self.history:
remaining_duration = self.duration - (self.now - self.history[-1])
else:
remaining_duration = self.duration
available_requests = self.num_requests - len(self.history) + 1
if available_requests <= 0:
return None
return remaining_duration / float(available_requests)
自定义频率类
# 1) 自定义一个继承 SimpleRateThrottle 类 的频率类
# 2) 设置一个 scope 类属性,属性值为任意见名知意的字符串
# 3) 在settings配置文件中,配置drf的DEFAULT_THROTTLE_RATES,格式为 {scope字符串: '次数/时间'}
# 4) 在自定义频率类中重写 get_cache_key 方法
# 限制的对象返回 与限制信息有关的字符串
# 不限制的对象返回 None (只能放回None,不能是False或是''等)
例子:短信接口 1/min 频率限制
频率:api/throttles.py
from rest_framework.throttling import SimpleRateThrottle
class SMSRateThrottle(SimpleRateThrottle):
scope = 'sms'
# 只对提交手机号的get方法进行限制
def get_cache_key(self, request, view):
mobile = request.query_params.get('mobile')
# 没有手机号,就不做频率限制
if not mobile:
return None
# 返回可以根据手机号动态变化,且不易重复的字符串,作为操作缓存的key
return 'throttle_%(scope)s_%(ident)s' % {'scope': self.scope, 'ident': mobile}
配置:settings.py
# drf配置
REST_FRAMEWORK = {
# 频率限制条件配置
'DEFAULT_THROTTLE_RATES': {
'sms': '1/min'
},
}
视图:views.py
from .throttles import SMSRateThrottle
class TestSMSAPIView(APIView):
# 局部配置频率认证
throttle_classes = [SMSRateThrottle]
def get(self, request, *args, **kwargs):
return APIResponse(0, 'get 获取验证码 OK')
def post(self, request, *args, **kwargs):
return APIResponse(0, 'post 获取验证码 OK')
路由:api/url.py
url(r'^sms/$', views.TestSMSAPIView.as_view()),
限制的接口
# 只会对 /api/sms/?mobile=具体手机号 接口才会有频率限制
# 1)对 /api/sms/ 或其他接口发送无限制
# 2)对数据包提交mobile的/api/sms/接口无限制
# 3)对不是mobile(如phone)字段提交的电话接口无限制
总结:
DRF中频率认证源码部分执行流程简述
'''
先写一个频率校验的类(继承SimpleRateThrottle) ---> 在该类中 配置 scope参数---> 重写 get_cache_key 方法 并返回self.get_ident(request)---->在需要校验的类中配置 该校验类
执行步骤:
1.APIView
2.---->dispatch
3.---->self.initial(request, *args, **kwargs)
4.---->self.check_throttles(request)
5.---->throttle.allow_request(request, self)
6.---->self.get_cache_key(request, view)(自己重写的)(返回的是ip)
7.---->self.cache.get(self.key, []) 根据ip获取当前ip的 时间列表
8.---->需要用到参数(self.rate,self.num_requests,self.duration)在类实例化的时已经执行了如下代码(__init__):
8.1--> self.rate = self.get_rate()-->self.THROTTLE_RATES[self.scope]
-->api_settings.DEFAULT_THROTTLE_RATES(配置文件中找)
这里 self.rate ---> 是前面配置的 'qzk':'3/m'
8.2-->self.num_requests, self.duration = self.parse_rate(self.rate)
这里 self.num_requests 是设定的访问限制次数(3)
self.duration 是访问的时间限制(60)
9.----> 通过如上数据,并进行一通逻辑判断, 返回
True:——> 执行throttle_success:
self.history.insert(0, self.now) # 把当前访问时间添加到时间列表
# 再将 key,history,duration 在 {} 中更新
self.cache.set(self.key, self.history, self.duration)
False :---> 执行throttle_failure ,返回 Ture
'''