一、什么是RESTful
- REST与技术无关,代表的是一种软件架构风格,REST是Representational State Transfer的简称,中文翻译为“表征状态转移”
- REST从资源的角度来审视整个网络,它将分布在网络中某个节点的资源通过URL进行标识,客户端应用通过URL来获取资源的表征,获得这些表征致使这些应用转变状态
- REST与技术无关,代表的是一种软件架构风格,REST是Representational State Transfer的简称,中文翻译为“表征状态转移”
- 所有的数据,不管是通过网络获取的还是操作(增删改查)的数据,都是资源,将一切数据视为资源是REST区别于其他架构风格的最本质属性
- 对于REST这种面向资源的架构风格,有人提出一种全新的结构理念,即:面向资源架构(ROA:Resource Oriented Architecture)
二、认证补充:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication
from rest_framework.permissions import BasePermission
from rest_framework.request import Request
from rest_framework import exceptions

# Tokens accepted by TestAuthentication (hard-coded for this demo).
token_list = [
    'sfsfss123kuf3j123',
    'asijnfowerkkf9812',
]


class TestAuthentication(BaseAuthentication):
    """Authenticate a request by its ``token`` query parameter."""

    def authenticate(self, request):
        """
        Authenticate the user.

        An authenticator can signal one of three outcomes:

        * return ``None`` -- skip this authenticator. If every authenticator
          is skipped, the user and token fall back to the settings
          ``UNAUTHENTICATED_USER`` (default: an anonymous user) and
          ``UNAUTHENTICATED_TOKEN`` (default: ``None``).
        * return ``(user, token)`` -- authentication succeeded; the pair
          becomes ``request.user`` / ``request.auth``.
        * raise ``AuthenticationFailed`` -- authentication failed.

        This implementation only uses the last two outcomes.

        :param request: the incoming request
        :return: ``(user, token)`` tuple when the token is in ``token_list``
        :raises exceptions.AuthenticationFailed: when the token is unknown
        """
        val = request.query_params.get('token')
        if val not in token_list:
            raise exceptions.AuthenticationFailed("用户认证失败")
        return ('登录用户', '用户token')

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
        return None


class TestPermission(BasePermission):
    """Grant access only when ``request.user`` equals "管理员"."""

    # Message used when the permission check fails.
    message = "权限验证失败"

    def has_permission(self, request, view):
        """
        Decide whether the current request may access the view.

        :param request: the incoming request
        :param view: the view being accessed
        :return: True if permission is granted, False otherwise
        """
        # NOTE(review): TestAuthentication above always sets request.user to
        # '登录用户', so paired with it this check never passes.
        # Always return an explicit bool instead of falling through to None.
        return request.user == "管理员"

    def has_object_permission(self, request, view, obj):
        """
        Object-level check, triggered when a view inheriting GenericAPIView
        fetches an object through ``get_object``.

        :param request: the incoming request
        :param view: the view being accessed
        :param obj: the object being accessed
        :return: True if permission is granted, False otherwise
        """
        return request.user == "管理员"


class TestView(APIView):
    """Demo view wiring the authenticator and permission together."""

    # Authentication is triggered by accessing request.user.
    authentication_classes = [TestAuthentication, ]
    # Permissions: every class in the list is checked in turn.
    permission_classes = [TestPermission, ]

    def get(self, request, *args, **kwargs):
        """Print the authenticated user/token and return a fixed response."""
        print(request.user)
        print(request.auth)
        return Response('GET请求,响应内容')

    def post(self, request, *args, **kwargs):
        """Return a fixed response for POST."""
        return Response('POST请求,响应内容')

    def put(self, request, *args, **kwargs):
        """Return a fixed response for PUT."""
        return Response('PUT请求,响应内容')
class MyAuthtication(BasicAuthentication):
    """Authenticate a request by comparing its ``token`` query parameter
    with a single hard-coded value."""

    def authenticate(self, request):
        """
        Return ``(user, auth)`` when the token matches.

        :param request: the incoming request
        :return: ``('uuuuuu', 'afsdsgdf')``, which become
            ``request.user`` / ``request.auth``
        :raises AuthenticationFailed: when the token is missing or wrong
        """
        # Imported locally so this snippet is self-contained.
        from rest_framework.exceptions import AuthenticationFailed

        # Read the token from the query string via query_params.
        token = request.query_params.get('token')
        if token == 'zxxzzxzc':
            return ('uuuuuu', 'afsdsgdf')
        # AuthenticationFailed (not a bare APIException, which yields a 500)
        # is the exception that leads to authenticate_header being used.
        raise AuthenticationFailed('认证错误')

    def authenticate_header(self, request):
        """Return the ``WWW-Authenticate`` header value used when
        authentication fails."""
        return 'Basic realm="api"'


class UserView(APIView):
    """List users; protected by MyAuthtication."""

    authentication_classes = [MyAuthtication, ]

    def get(self, request, *args, **kwargs):
        """Print the authenticated user/token and return a fixed response."""
        print(request.user)
        print(request.auth)
        return Response('用户列表')
django的中间件比rest_framework执行的早
认证的功能放到中间件也是可以做的
认证一般只做一件事:检查用户是否存在。如果存在,则设置request.user/request.auth;如果不存在,则request.user/request.auth为None
认证小总结:
---类:authenticate/authenticate_header
---返回值:None,元组(user,auth),异常
---配置:
---视图:
class IndexView(APIView):
authentication_classes = [MyAuthentication,]
---全局
REST_FRAMEWORK = {
'UNAUTHENTICATED_USER': None,
'UNAUTHENTICATED_TOKEN': None,
"DEFAULT_AUTHENTICATION_CLASSES": [
# "app02.utils.MyAuthentication",
],
}
三、权限
权限的应用:
from django.conf.urls import url
from django.contrib import admin
from app01 import views
from app02 import views as app02_view
from app03 import views as app03_view
from app04 import views as app04_view

# URL routes for the authentication / permission examples (app02 views).
urlpatterns = [
    # django rest framework
    url(r'^auth/', app02_view.AuthView.as_view()),
    url(r'^hosts/', app02_view.HostView.as_view()),
    url(r'^users/', app02_view.UserView.as_view()),
    url(r'^salary/', app02_view.SalaryView.as_view()),
]
from django.views import View
from rest_framework.views import APIView
from rest_framework.authentication import SessionAuthentication
from rest_framework.authentication import BasicAuthentication
from rest_framework.authentication import BaseAuthentication
from rest_framework.permissions import AllowAny
from rest_framework.permissions import BasePermission
from rest_framework.renderers import JSONRenderer, BrowsableAPIRenderer
from rest_framework.request import Request
from rest_framework.exceptions import APIException, AuthenticationFailed
from rest_framework import exceptions
from rest_framework.response import Response
from app02 import models
import binascii
import hashlib
import os
import time


class AuthView(APIView):
    """Login endpoint: checks username/password and issues a token."""

    # No authentication is required to log in.
    authentication_classes = []

    def get(self, request):
        """
        Receive a username and password and return a freshly issued token.

        :param request: request carrying ``user`` and ``pwd`` query parameters
        :return: Response with ``code`` 1000 and ``token`` on success, or
            ``code`` 1001 and an error ``msg`` on failure
        """
        ret = {'code': 1000, 'msg': None}
        user = request.query_params.get('user')
        pwd = request.query_params.get('pwd')
        # NOTE(review): credentials arrive in the query string and the
        # password is compared as stored; acceptable for a demo only.
        obj = models.UserInfo.objects.filter(username=user, password=pwd).first()
        if not obj:
            ret['code'] = 1001
            ret['msg'] = "用户名或密码错误"
            return Response(ret)

        # Create an unpredictable random token (32 hex characters, the same
        # length as the previous md5 digest) instead of hashing user + time.
        token = binascii.hexlify(os.urandom(16)).decode()

        # Persist the token on the user record.
        obj.token = token
        obj.save()
        ret['token'] = token
        return Response(ret)


class MyAuthentication(BaseAuthentication):
    """Authenticate by looking up the ``token`` query parameter in UserInfo."""

    def authenticate(self, request):
        """
        :param request: the incoming request
        :return: ``(username, user_obj)`` when the token matches a user,
            otherwise ``None`` (the request stays anonymous)
        """
        token = request.query_params.get('token')
        # Without this guard a missing token becomes filter(token=None),
        # which matches users whose token column is NULL.
        if not token:
            return None
        obj = models.UserInfo.objects.filter(token=token).first()
        if obj:
            return (obj.username, obj)
        return None

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
        # return 'Basic realm="api"'
        return None


class MyPermission(BasePermission):
    """Allow any authenticated (non-anonymous) user."""

    message = "无权访问"

    def has_permission(self, request, view):
        """Return True when ``request.user`` is truthy, False otherwise."""
        return bool(request.user)


class AdminPermission(BasePermission):
    """Allow only the user named 'alex'."""

    message = "无权访问"

    def has_permission(self, request, view):
        """Return True only when ``request.user`` equals 'alex'."""
        return request.user == 'alex'


class HostView(APIView):
    """
    Accessible to both anonymous and logged-in users.
    """
    authentication_classes = [MyAuthentication, ]
    permission_classes = []

    def get(self, request, *args, **kwargs):
        """Print the current user and return the host list response."""
        # The original request object is django.core.handlers.wsgi.WSGIRequest;
        # the request received here is rest_framework.request.Request.
        print(request.user)
        return Response('主机列表')


class UserView(APIView):
    """
    Accessible to logged-in users.
    """
    authentication_classes = [MyAuthentication, ]
    permission_classes = [MyPermission, ]

    def get(self, request, *args, **kwargs):
        """Return the user list response."""
        return Response('用户列表')


class SalaryView(APIView):
    """
    Accessible only to a logged-in user who also passes AdminPermission.
    """
    authentication_classes = [MyAuthentication, ]
    permission_classes = [MyPermission, AdminPermission, ]

    def get(self, request, *args, **kwargs):
        """Return the salary list response."""
        return Response('薪资列表')

    def permission_denied(self, request, message=None, code=None):
        """
        If request is not permitted, determine what kind of exception to raise.

        ``code`` is accepted (and forwarded) because newer versions of the
        framework pass it as a keyword argument.

        :raises exceptions.NotAuthenticated: authenticators are configured
            but none of them succeeded
        :raises exceptions.PermissionDenied: in every other case
        """
        if request.authenticators and not request.successful_authenticator:
            raise exceptions.NotAuthenticated(detail='xxxxxxxx')
        raise exceptions.PermissionDenied(detail=message, code=code)
REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None, "DEFAULT_AUTHENTICATION_CLASSES": [ # "app02.utils.MyAuthentication", ], 'DEFAULT_PERMISSION_CLASSES':[ ], 'DEFAULT_THROTTLE_RATES':{ 'wdp_anon':'5/minute', 'wdp_user':'10/minute', } }
权限阶段才真正使用request.user/request.auth:拿到它们之后,判断是否允许访问
权限小总结:
---类: has_permission/has_object_permission
---返回值:True、False、exceptions.PermissionDenied(detail="错误信息")
---配置:
---视图:
class IndexView(APIView):
permission_classes = [MyPermission,]
---全局:
REST_FRAMEWORK = {
"DEFAULT_PERMISSION_CLASSES": [
# "app02.utils.MyPermission",
],
}
四、限制访问的频率
限制访问频率的应用:
a、对匿名用户进行限制,每个用户1分钟允许访问10次
在这里用唯一标识:self.get_ident(request)
from django.conf.urls import url from django.contrib import admin from app01 import views from app02 import views as app02_view from app03 import views as app03_view from app04 import views as app04_view urlpatterns = [ # url(r'^limit/', app03_view.LimitView.as_view()), ]
import time

from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.throttling import BaseThrottle, SimpleRateThrottle
from rest_framework import exceptions

# In-memory access log: client identity -> request timestamps, newest first.
RECORD = {
}


class MyThrottle(BaseThrottle):
    """Limit each client to ``max_requests`` requests per ``window`` seconds."""

    max_requests = 10  # requests allowed per window
    window = 60        # window length in seconds

    def allow_request(self, request, view):
        """
        Decide whether the request may proceed.

        :param request: the incoming request
        :param view: the view being accessed
        :return: True to let the request through, False to throttle it
        """
        now = time.time()
        # Remember who is asking so that wait() can find the same history.
        self.ident = self.get_ident(request)
        history = RECORD.setdefault(self.ident, [])

        # Drop timestamps older than the window; the oldest is at the end.
        # Checking `history` first avoids indexing into an empty list.
        while history and history[-1] < now - self.window:
            history.pop()

        # Reject once the limit is reached (>= so the 11th request is refused).
        if len(history) >= self.max_requests:
            return False

        history.insert(0, now)
        return True

    def wait(self):
        """
        :return: seconds until the oldest recorded request leaves the window,
            or None when there is no history for the current client
        """
        history = RECORD.get(getattr(self, 'ident', None))
        if not history:
            return None
        return self.window - (time.time() - history[-1])


class MySimpleRateThrottle(SimpleRateThrottle):
    """Throttle by client identity using the built-in rate machinery."""

    # NOTE: the rate for this scope is looked up in the
    # DEFAULT_THROTTLE_RATES setting, so a "wdp" entry must exist there.
    scope = "wdp"

    def get_cache_key(self, request, view):
        """Use the client identity as the cache key."""
        return self.get_ident(request)


class LimitView(APIView):
    """Example view demonstrating rate limiting."""

    authentication_classes = []
    permission_classes = []
    throttle_classes = [MySimpleRateThrottle, ]

    def get(self, request, *args, **kwargs):
        """Return a fixed response."""
        return Response('控制访问频率示例')

    def throttled(self, request, wait):
        """
        If request is throttled, determine what kind of exception to raise.

        :raises exceptions.Throttled: always, using customised messages
        """
        class MyThrottled(exceptions.Throttled):
            default_detail = '请求被限制.'
            extra_detail_singular = 'Expected available in {wait} second.'
            extra_detail_plural = '还需要再等待{wait}'

        raise MyThrottled(wait)
REST_FRAMEWORK = {
    'UNAUTHENTICATED_USER': None,
    'UNAUTHENTICATED_TOKEN': None,
    "DEFAULT_AUTHENTICATION_CLASSES": [
        # "app02.utils.MyAuthentication",
    ],
    'DEFAULT_PERMISSION_CLASSES': [
    ],
    'DEFAULT_THROTTLE_RATES': {
        # Rate for the "wdp" throttle scope used in this section:
        # 10 requests per minute per client.
        'wdp': '10/minute',
        'wdp_anon': '5/minute',
        'wdp_user': '10/minute',
    }
}
b、对匿名用户进行限制:每个用户1分钟 允许访问5次,登录用户1分钟允许访问10次,VIP1分钟允许访问20次
from django.conf.urls import url from django.contrib import admin from app01 import views from app02 import views as app02_view from app03 import views as app03_view from app04 import views as app04_view urlpatterns = [ # url(r'^limit/', app03_view.LimitView.as_view()), url(r'^index/', app04_view.IndexView.as_view()), url(r'^manage/', app04_view.ManageView.as_view()), ]
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.throttling import BaseThrottle, SimpleRateThrottle
from rest_framework.authentication import BaseAuthentication
from app02 import models


class MyAuthentication(BaseAuthentication):
    """Authenticate by looking up the ``token`` query parameter in UserInfo."""

    def authenticate(self, request):
        """
        :param request: the incoming request
        :return: ``(username, user_obj)`` when the token matches a user,
            otherwise ``None`` (the request stays anonymous)
        """
        token = request.query_params.get('token')
        # Without this guard a missing token becomes filter(token=None),
        # which matches users whose token column is NULL.
        if not token:
            return None
        obj = models.UserInfo.objects.filter(token=token).first()
        if obj:
            return (obj.username, obj)
        return None

    def authenticate_header(self, request):
        """No ``WWW-Authenticate`` header is provided."""
        return None


class MyPermission(object):
    """Allow any authenticated (non-anonymous) user."""

    message = "无权访问"

    def has_permission(self, request, view):
        """Return True when ``request.user`` is truthy, False otherwise."""
        return bool(request.user)


class AdminPermission(object):
    """Allow only the user named 'alex'."""

    message = "无权访问"

    def has_permission(self, request, view):
        """Return True only when ``request.user`` equals 'alex'."""
        return request.user == 'alex'


class AnonThrottle(SimpleRateThrottle):
    """Rate-limit anonymous requests by client identity."""

    scope = "wdp_anon"

    def get_cache_key(self, request, view):
        """
        :return: a cache key for anonymous requests, or ``None`` (meaning
            "do not throttle") for logged-in users
        """
        # Logged-in users are handled by UserThrottle.
        if request.user:
            return None
        # Anonymous user: namespace the key with the scope so it cannot
        # collide with keys written by other throttles.
        return self.cache_format % {
            'scope': self.scope,
            'ident': self.get_ident(request),
        }


class UserThrottle(SimpleRateThrottle):
    """Rate-limit logged-in requests by user."""

    scope = "wdp_user"

    def get_cache_key(self, request, view):
        """
        :return: a cache key for logged-in users, or ``None`` (meaning
            "do not throttle") for anonymous requests
        """
        # Anonymous users are handled by AnonThrottle.
        if not request.user:
            return None
        return self.cache_format % {
            'scope': self.scope,
            'ident': request.user,
        }


# Accessible without logging in.
class IndexView(APIView):
    authentication_classes = [MyAuthentication, ]
    permission_classes = []
    throttle_classes = [AnonThrottle, UserThrottle, ]

    def get(self, request, *args, **kwargs):
        """Return a fixed response."""
        return Response('访问首页')


# Login required.
class ManageView(APIView):
    authentication_classes = [MyAuthentication, ]
    permission_classes = [MyPermission, ]
    throttle_classes = [AnonThrottle, UserThrottle, ]

    def get(self, request, *args, **kwargs):
        """Return a fixed response."""
        return Response('访问首页')
REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None, "DEFAULT_AUTHENTICATION_CLASSES": [ # "app02.utils.MyAuthentication", ], 'DEFAULT_PERMISSION_CLASSES':[ ], 'DEFAULT_THROTTLE_RATES':{ 'wdp_anon':'5/minute', 'wdp_user':'10/minute', } } CACHES = { 'default': { 'BACKEND': 'django.core.cache.backends.filebased.FileBasedCache', 'LOCATION': 'cache', } }
c、基于用户IP限制访问频率
from django.conf.urls import url, include from web.views import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]
import time
from rest_framework.views import APIView
from rest_framework.response import Response

from rest_framework import exceptions
from rest_framework.throttling import BaseThrottle
from rest_framework.settings import api_settings

# Access log: client identity -> request timestamps, newest first.
# The entry below only illustrates the shape of the data.
RECORD = {
    '用户IP': [12312139, 12312135, 12312133, ]
}


class TestThrottle(BaseThrottle):
    """Allow each client 10 requests per 60 seconds, tracked in RECORD."""

    # Clock used for all timestamps.
    ctime = time.time

    def get_ident(self, request):
        """
        Use the client IP together with proxy information as the unique
        identity of the requester.

        Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
        if present and number of proxies is > 0. If not use all of
        HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
        """
        xff = request.META.get('HTTP_X_FORWARDED_FOR')
        remote_addr = request.META.get('REMOTE_ADDR')
        num_proxies = api_settings.NUM_PROXIES

        if num_proxies is not None:
            if num_proxies == 0 or xff is None:
                return remote_addr
            addrs = xff.split(',')
            client_addr = addrs[-min(num_proxies, len(addrs))]
            return client_addr.strip()

        return ''.join(xff.split()) if xff else remote_addr

    def allow_request(self, request, view):
        """
        Check whether the client is still within its allowance.

        :param request: the incoming request
        :param view: the view being accessed
        :return: True if the request may proceed; False if the limit has
            been exceeded
        """
        # Allow 10 requests per minute per client identity (e.g. IP).
        num_request = 10
        time_request = 60

        now = self.ctime()
        ident = self.get_ident(request)
        # Remember the identity so wait() can find the same history.
        self.ident = ident
        if ident not in RECORD:
            RECORD[ident] = [now, ]
            return True
        history = RECORD[ident]
        # Discard timestamps that have left the window (oldest at the end).
        while history and history[-1] <= now - time_request:
            history.pop()
        if len(history) < num_request:
            history.insert(0, now)
            return True
        # Limit reached: reject explicitly instead of returning None.
        return False

    def wait(self):
        """
        Optionally, return a recommended number of seconds to wait before
        the next request.

        A slot frees up when the oldest recorded request (the last element)
        leaves the 60-second window, so the wait is measured from it rather
        than from the newest request.
        """
        oldest = RECORD[self.ident][-1]
        now = self.ctime()
        return int(60 + oldest - now)


class TestView(APIView):
    """Demo view throttled by TestThrottle."""

    throttle_classes = [TestThrottle, ]

    def get(self, request, *args, **kwargs):
        """Print the current user/token and return a fixed response."""
        print(request.user)
        print(request.auth)
        return Response('GET请求,响应内容')

    def post(self, request, *args, **kwargs):
        """Return a fixed response for POST."""
        return Response('POST请求,响应内容')

    def put(self, request, *args, **kwargs):
        """Return a fixed response for PUT."""
        return Response('PUT请求,响应内容')

    def throttled(self, request, wait):
        """
        Customise the error message raised when the request is throttled.
        """
        class Throttled(exceptions.Throttled):
            default_detail = '请求被限制.'
            extra_detail_singular = '请 {wait} 秒之后再重试.'
            extra_detail_plural = '请 {wait} 秒之后再重试.'

        raise Throttled(wait)
d. 基于用户IP限制访问频率(利用Django缓存)
REST_FRAMEWORK = { 'DEFAULT_THROTTLE_RATES': { 'test_scope': '10/m', }, }
from django.conf.urls import url, include from web.views import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import exceptions
from rest_framework.throttling import SimpleRateThrottle


class TestThrottle(SimpleRateThrottle):
    """Cache-backed throttle keyed by user, or by client identity when the
    request is anonymous."""

    # Key of the rate entry defined in the settings file.
    scope = "test_scope"

    def get_cache_key(self, request, view):
        """
        Build the unique cache key used for throttling this request.

        :param request: the incoming request
        :param view: the view being accessed
        :return: the formatted cache key
        """
        user = request.user
        # Fall back to the client identity when there is no user.
        who = user if user else self.get_ident(request)
        key_parts = {'scope': self.scope, 'ident': who}
        return self.cache_format % key_parts


class TestView(APIView):
    """Demo view throttled by TestThrottle."""

    throttle_classes = [TestThrottle, ]

    def get(self, request, *args, **kwargs):
        """Print the current user/token and answer with a fixed body."""
        print(request.user)
        print(request.auth)
        return Response('GET请求,响应内容')

    def post(self, request, *args, **kwargs):
        """Answer POST with a fixed body."""
        return Response('POST请求,响应内容')

    def put(self, request, *args, **kwargs):
        """Answer PUT with a fixed body."""
        return Response('PUT请求,响应内容')

    def throttled(self, request, wait):
        """
        Raise a Throttled error carrying customised messages when the
        request count has been limited.
        """
        retry_message = '请 {wait} 秒之后再重试.'

        class Throttled(exceptions.Throttled):
            default_detail = '请求被限制.'
            extra_detail_singular = retry_message
            extra_detail_plural = retry_message

        raise Throttled(wait)
e. view中限制请求频率
REST_FRAMEWORK = { 'DEFAULT_THROTTLE_RATES': { 'xxxxxx': '10/m', }, }
REST_FRAMEWORK = { 'DEFAULT_THROTTLE_RATES': { 'xxxxxx': '10/m', }, }
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import exceptions
from rest_framework.throttling import ScopedRateThrottle


# Inherits from ScopedRateThrottle.
class TestThrottle(ScopedRateThrottle):
    """Scoped throttle keyed by user, or by client identity when the
    request is anonymous."""

    def get_cache_key(self, request, view):
        """
        Build the unique cache key used for throttling this request.

        :param request: the incoming request
        :param view: the view being accessed
        :return: the formatted cache key
        """
        user = request.user
        # Fall back to the client identity when there is no user.
        who = user if user else self.get_ident(request)
        key_parts = {'scope': self.scope, 'ident': who}
        return self.cache_format % key_parts


class TestView(APIView):
    """Demo view whose rate limit is selected through ``throttle_scope``."""

    throttle_classes = [TestThrottle, ]

    # The rate for "xxxxxx" is read from the settings.
    throttle_scope = "xxxxxx"

    def get(self, request, *args, **kwargs):
        """Print the current user/token and answer with a fixed body."""
        print(request.user)
        print(request.auth)
        return Response('GET请求,响应内容')

    def post(self, request, *args, **kwargs):
        """Answer POST with a fixed body."""
        return Response('POST请求,响应内容')

    def put(self, request, *args, **kwargs):
        """Answer PUT with a fixed body."""
        return Response('PUT请求,响应内容')

    def throttled(self, request, wait):
        """
        Raise a Throttled error carrying customised messages when the
        request count has been limited.
        """
        retry_message = '请 {wait} 秒之后再重试.'

        class Throttled(exceptions.Throttled):
            default_detail = '请求被限制.'
            extra_detail_singular = retry_message
            extra_detail_plural = retry_message

        raise Throttled(wait)
f. 匿名时用IP限制+登录时用Token限制
REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None, 'DEFAULT_THROTTLE_RATES': { 'luffy_anon': '10/m', 'luffy_user': '20/m', }, }
from django.conf.urls import url, include from web.views.s3_throttling import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response

from rest_framework.throttling import SimpleRateThrottle


class LuffyAnonRateThrottle(SimpleRateThrottle):
    """
    Anonymous users: throttle by client identity (IP).
    """
    scope = "luffy_anon"

    def get_cache_key(self, request, view):
        """
        :return: cache key for anonymous requests, or ``None`` to skip
            throttling when the user is logged in
        """
        # Logged-in users skip the anonymous rate limit.
        if request.user:
            return None

        return self.cache_format % {
            'scope': self.scope,
            'ident': self.get_ident(request)
        }


class LuffyUserRateThrottle(SimpleRateThrottle):
    """
    Logged-in users: throttle by the user's token.
    """
    scope = "luffy_user"

    def get_ident(self, request):
        """
        Return a per-user identity for the throttle key.

        After successful authentication ``request.user`` is the user object
        and ``request.auth`` is the token object.

        :param request: the incoming request
        :return: the token value when ``request.auth`` exposes one,
            otherwise the string form of ``request.user``
        """
        # A constant identity here would put every logged-in user into one
        # shared bucket, so derive it from the request instead.
        token = getattr(request.auth, 'token', None)
        if token:
            return str(token)
        return str(request.user)

    def get_cache_key(self, request, view):
        """
        Build the cache key.

        :param request: the incoming request
        :param view: the view being accessed
        :return: cache key for logged-in users, or ``None`` to skip
            throttling for anonymous requests
        """
        # Anonymous users skip the token-based limit.
        if not request.user:
            return None

        return self.cache_format % {
            'scope': self.scope,
            'ident': self.get_ident(request)
        }


class TestView(APIView):
    """Demo view combining the anonymous and logged-in throttles."""

    throttle_classes = [LuffyUserRateThrottle, LuffyAnonRateThrottle, ]

    def get(self, request, *args, **kwargs):
        """Print the current user/token and return a fixed response."""
        print(request.user)
        print(request.auth)
        return Response('GET请求,响应内容')

    def post(self, request, *args, **kwargs):
        """Return a fixed response for POST."""
        return Response('POST请求,响应内容')

    def put(self, request, *args, **kwargs):
        """Return a fixed response for PUT."""
        return Response('PUT请求,响应内容')
限制访问的频率小总结:
---类: allow_request/wait PS: scope = "wdp_user"
---返回值:True、False
---配置:
---视图:
class IndexView(APIView):
throttle_classes=[AnonThrottle,UserThrottle,]
def get(self,request,*args,**kwargs):
self.dispatch
return Response('访问首页')
---全局:
REST_FRAMEWORK = {
"DEFAULT_THROTTLE_CLASSES":[
],
'DEFAULT_THROTTLE_RATES':{
'wdp_anon':'5/minute',
'wdp_user':'10/minute',
}
}
五、认证、权限、限制访问频率返回结果的比较:
认证、权限、限制访问频率这三个返回的结果:
认证返回的三种结果:
a、None
b、元组(user,auth)
c、异常 raise AuthenticationFailed(...)(APIException的子类)
权限的返回值三种结果:
a、True 有权限
b、False 没权限
c、异常
限制访问的频率返回值的两种结果:
a、True
b、False