DRF
什么是DRF?
DRF从本质上来讲, 它就是一个Django的App, 有了这样一个App, 我们就可以更好的设计出符合RESTful规范的web应用 实际上, 即便没有DRF, 我们也能够自行设计出符合RESTful规范的web应用,是一个框架
为什么要使用DRF?
DRF就是这样一个优秀的工具, 另外, 它不仅能够帮助我们快速的设计出符合RESTful规范的接口, 还提供了诸如 认证 , 权限 等等其他强大的功能.使用了drf之后,可以快速帮我们开发restful规范来开发接口
drf组件的功能:
+ 根据请求方式不同做不同操作 get/post/put/patch/delete
+ 视图,继承APIView(在内部apiview继承了django的View)
+ 解析器,解析请求体中的数据,将其变成我们想要的格式。request.data,query_params
+ 序列化,对对象或对象列表(queryset)进行序列化操作以及表单验证的功能。
+ 渲染器,渲染页面
记忆:请求到来之后,先执行视图的dispatch方法。
1. 视图
2. 版本处理
3. 认证
4. 权限
5. 节流(频率限制)
6. 解析器
7. 筛选器
8. 分页
9. 序列化
10. 渲染
DRF的应用场景
参与前后端分离项目、参与为app写接口时,用drf会比较方便。
restful规范
第一步:整体说restful规范是什么?
接口的规范,规则,程序之间做数据交互遵循的规则
第二步:再详细说restful建议
1. https代替http,保证数据传输时安全。
2. 在url中一般要体现api标识,这样看到url就知道他是一个api。
http://www.luffycity.com/api/....(建议,因为他不会存在跨域的问题)
http://api.luffycity.com/....
假设:
前段:https://www.luffycity.com/home
后端:https://www.luffycity.com/api/
3. 在接口中要体现版本
http://www.luffycity.com/api/v1....(建议,因为他不会存在跨域的问题)
注意:版本还可以放在请求头中
http://www.luffycity.com/api/
accept: ...
4. restful也称为面向资源编程,视网络上的一切都是资源,对资源可以进行操作,所以一般资源都用名词。
http://www.luffycity.com/api/user/
5. 如果要加入一些筛选条件,可以添加在url中
http://www.luffycity.com/api/user/?page=1&type=9
6. 根据method不同做不同操作。get/post/delete/patch/put
7. 返回给用户状态码
- 200,成功
- 300,301永久 /302临时
- 400,403拒绝 /404找不到
- 500,服务端代码错误
很多公司:
def get(self,request,*args,**kwargs):
result = {'code':1000,'data':None,'error':None}
try:
val = int('你好')
except Exception as e:
result['code'] = 10001
result['error'] = '数据转换错误'
return Response(result)
8. 返回值
GET http://www.luffycity.com/api/user/
[
{'id':1,'name':'alex','age':19},
{'id':1,'name':'alex','age':19},
]
POST http://www.luffycity.com/api/user/
{'id':1,'name':'alex','age':19}
GET http://www.luffycity.com/api/user/2/
{'id':2,'name':'alex','age':19}
PUT http://www.luffycity.com/api/user/2/
{'id':2,'name':'alex','age':19}
PATCH https//www.luffycity.com/api/user/2/
{'id':2,'name':'alex','age':19}
DELETE https//www.luffycity.com/api/user/2/
空
9. 操作异常时,要返回错误信息
{
error: "Invalid API key"
}
10. 对于下一个请求要返回一些接口:Hypermedia AP
{
'id':2,
'name':'alex',
'age':19,
'depart': "http://www.luffycity.com/api/user/30/"
}
安装DRF
pip3 install djangorestframework
使用DRF步骤
0.设计表结构
from django.db import models
class Category(models.Model):
"""
文章分类
"""
name = models.CharField(verbose_name='分类',max_length=32)
class Article(models.Model):
"""
文章表
"""
title = models.CharField(verbose_name='标题',max_length=32)
summary = models.CharField(verbose_name='简介',max_length=255)
content = models.TextField(verbose_name='文章内容')
1.注册drf,在settings中INSTALLED_APPS注册drf:
'rest_framework'
2.写路由
from django.conf.urls import url
from django.contrib import admin
from drf_app import views
urlpatterns = [
url(r'^admin/', admin.site.urls),
# 文章类型url
url(r'^drf/categore/$', views.DrfCategoreView.as_view()), #展示用
url(r'^drf/categore/(?P<pk>d+)/$', views.DrfCategoreView.as_view()), #编辑删除用
]
3.写视图
from django.shortcuts import render
from drf_app import models
from rest_framework.response import Response
from rest_framework.views import APIView
from django.forms import model_to_dict
# 文章类型
class DrfCategoreView(APIView): #继承APIView
#接口:实现访问接口时,创建一个文章类型
def post(self,request,*args,**kwargs): #添加文章类型数据
name = request.POST.get('name') #获取提交的内容,key_values
category_obj = models.Category(
name=name #创建文章类型数据
)
category_obj.save()
return Response('成功') #给前端返回结果
#接口:获取所有文章类型
def get(self,request,*args,**kwargs): #获取数据,在前端展示
pk = kwargs.get('pk') #获取单条数据的p值k
if not pk: #判断pk知否存在
obj = models.Article.objects.all().values()
data = list(obj)
return Response(data) #Response只接受列表,字典,字符串类型的数据
else:
obj_dict = models.Article.objects.filter(pk=pk).first()
data = model_to_dict(obj_dict) #把对象转换成字典
return Response(data)
#接口:更新文章类型
def put(self,request,*args,**kwargs): #更新数据
pk = kwargs.get('pk')
models.Category.objects.filter(id=pk).update(**request.data)
#request.data 返回解析之后的请求体数据
return Response('更新成功')
#接口:删除文章类型
def delete(self,request,*args,**kwargs): #删除数据
pk = kwargs.get('pk')
if not pk:
return Response('删除失败,没有该条数据')
models.Article.objects.filter(pk=pk).delete()
return Response('删除成功')
DRF的request.data方法
request.data 返回解析之后的请求体数据
包含了解析之后的文件和非文件数据
包含了对POST、PUT、PATCH请求方式解析后的数据
利用了REST framework的parsers解析器,不仅支持表单类型数据,也支持JSON数据
request.query_params.get('xx')类似于django的GET方法
DRF的序列化
基于上一个代码添加校验功能
from rest_framework import serializers
class NewCategorySerializer(serializers.ModelSerializer):
class Meta:
model = models.Category
# fields = "__all__"
fields = ['id','name']
class NewCategoryView(APIView):
def get(self,request,*args,**kwargs):
pk = kwargs.get('pk')
if not pk:
queryset = models.Category.objects.all()
ser = NewCategorySerializer(instance=queryset,many=True)
return Response(ser.data)
else:
model_object = models.Category.objects.filter(id=pk).first()
ser = NewCategorySerializer(instance=model_object, many=False)
return Response(ser.data)
def post(self,request,*args,**kwargs):
ser = NewCategorySerializer(data=request.data)
if ser.is_valid():
ser.save()
return Response(ser.data)
return Response(ser.errors)
def put(self,request,*args,**kwargs):
pk = kwargs.get('pk')
category_object = models.Category.objects.filter(id=pk).first()
ser = NewCategorySerializer(instance=category_object,data=request.data)
if ser.is_valid():
ser.save()
return Response(ser.data)
return Response(ser.errors)
def delete(self,request,*args,**kwargs):
pk = kwargs.get('pk')
models.Category.objects.filter(id=pk).delete()
return Response('删除成功')
#局部更新
def patch(self,request,*args,**kwargs):
pk = kwargs.get('pk')
article_obj = models.Article.objects.filter(pk=pk).first()
ser = ArticleSerializer(instance=article_obj, data=request.data,partial=True) # partial=True
if ser.is_valid():
ser.save()
return Response('更新成功')
return Response(ser.errors)
postman工具的安装使用
安装:
安装:https://www.getpostman.com/
使用:
显示关联表的数据值
from rest_framework import serializers
from drf_app import models
# 文章的Serializer
class ArticleSerializer(serializers.ModelSerializer):
class Meta:
model = models.Article
fields = '__all__' #拿到该表的全部字段
上述的代码用的这种方法来展示获取数据,全部字段但如果有关联的表的字段,会显示一下内容:
{
"id": 2,
"status": 1,
"title": "标题1",
"summary": "简介1",
"content": "标题1内容",
"category": 1
},
{
"id": 3,
"status": 1,
"title": "标题3",
"summary": "简介3",
"content": "标题3内容",
"category": 2
},
关联的category表都为数字,但想要他显示该id对应的中文,需要对Serializer进行一些操作!
对ArticleSerializer进行操作!
有四种方法:
1. from rest_framework import serializers
from drf_app import models
# 文章的Serializer
class ArticleSerializer(serializers.ModelSerializer):
category = serializers.CharField(source='category.name')
class Meta:
model = models.Article
fields = ['id','title','summary','content','category']
2. from rest_framework import serializers
from drf_app import models
# 文章的Serializer
class ArticleSerializer(serializers.ModelSerializer):
category = serializers.SerializerMethodField()
class Meta:
model = models.Article
fields = ['id','title','summary','content','category']
def get_category(self,obj): #obj是表中的一行记录的对象
return obj.category.name
3.可以直接进行筛选值:
article_obj = models.Article.objects.all().values('刷选想要的数据,可以进行跨表查')
4.
from rest_framework import serializers
from drf_app import models
# 文章的Serializer
class ArticleSerializer(serializers.ModelSerializer):
class Meta:
model = models.Article
fields = '__all__'
depth = 1 #会把关联的表的数据全部拿出来,可以理解为与当前的表关联表的个数
{
"id": 2,
"status": 1,
"title": "标题1",
"summary": "简介1",
"content": "标题1内容",
"category": {
"id": 1,
"name": "广告"
}
},
{
"id": 3,
"status": 1,
"title": "标题3",
"summary": "简介3",
"content": "标题3内容",
"category": {
"id": 2,
"name": "it"
}
},
推荐前两种!!!!!!!!!!
显示choice类型字段的对应数据
1.
from rest_framework import serializers
from drf_app import models
# 文章的Serializer
class ArticleSerializer(serializers.ModelSerializer):
category = serializers.CharField(source='category.name')
status = serializers.CharField(source='get_status_display') # get_status_display方法,不用加括号,drf中会检查get_status_display是否可以执行,自动会加括号执行
class Meta:
model = models.Article
fields = ['id','title','summary','content','category','status']
2.
from rest_framework import serializers
from drf_app import models
# 文章的Serializer
class ArticleSerializer(serializers.ModelSerializer):
category = serializers.SerializerMethodField()
class Meta:
model = models.Article
fields = ['id','title','summary','content','category','status']
def get_category(self,obj): #obj是表中的一行记录的对象
return get_status_display()
DRF分页
方式一:PageNumberPagination
针对于每页固定显示数据个数,在访问路径需要加:?page=xx,看第几页的数据
# settings.py
#定义每页显示的数据个数
REST_FRAMEWORK = {
'PAGE_SIZE': 2
}
# serializer.py
class PageArticleSerializer(serializers.ModelSerializer):
class Meta:
model = models.Article
fields = "__all__"
# view.py
from drf_app.serializer import PageArticleSerializer
from rest_framework.pagination import PageNumberPagination
class PageArticleView(APIView):
def get(self,request,*args,**kwargs):
queryset = models.Article.objects.all()
# 方式一:仅数据
# 分页对象
page_object = PageNumberPagination()
# 调用 分页对象.paginate_queryset方法进行分页,得到的结果是分页之后的数据
# result就是分完页的一部分数据
result = page_object.paginate_queryset(queryset,request,self)
# 序列化分页之后的数据
ser = PageArticleSerializer(instance=result,many=True)
return Response(ser.data)
# 方式二:数据 + 分页信息
page_object = PageNumberPagination()
result = page_object.paginate_queryset(queryset, request, self)
ser = PageArticleSerializer(instance=result, many=True)
return page_object.get_paginated_response(ser.data)
# 方式三:数据 + 部分分页信息
page_object = PageNumberPagination()
result = page_object.paginate_queryset(queryset, request, self)
ser = PageArticleSerializer(instance=result, many=True)
return Response({'count':page_object.page.paginator.count,'result':ser.data})
方式二: LimitOffsetPagination
针对于每页显示个数不固定,在访问路径上加/?offset=xx$limit=xx
from rest_framework.pagination import PageNumberPagination
from rest_framework.pagination import LimitOffsetPagination
from rest_framework import serializers
class PageArticleSerializer(serializers.ModelSerializer):
class Meta:
model = models.Article
fields = "__all__"
class HulaLimitOffsetPagination(LimitOffsetPagination):
max_limit = 2
class PageArticleView(APIView):
def get(self,request,*args,**kwargs):
queryset = models.Article.objects.all()
page_object = HulaLimitOffsetPagination()
result = page_object.paginate_queryset(queryset, request, self)
ser = PageArticleSerializer(instance=result, many=True)
return Response(ser.data)
分页扩展:
# url
url(r'^page/view/article/$', views.PageViewArticleView.as_view()),
# view.py
from rest_framework.generics import ListAPIView
class PageViewArticleSerializer(serializers.ModelSerializer):
class Meta:
model = models.Article
fields = "__all__"
class PageViewArticleView(ListAPIView):
queryset = models.Article.objects.all()
serializer_class = PageViewArticleSerializer
# 源码里调用执行,在settings中配置用哪种分页方法
# settings.py
REST_FRAMEWORK = {
"PAGE_SIZE":2,
"DEFAULT_PAGINATION_CLASS":"rest_framework.pagination.PageNumberPagination"
}
筛选
class ArticleView(APIView):
""" 文章视图类 """
def get(self,request,*args,**kwargs):
""" 获取文章列表 """
pk = kwargs.get('pk')
if not pk:
condition = {}
category = request.query_params.get('category')
if category:
condition['category'] = category
queryset = models.Article.objects.filter(**condition).order_by('date') pager = PageNumberPagination()
result = pager.paginate_queryset(queryset,request,self)
ser = ArticleListSerializer(instance=result,many=True)
return Response(ser.data)
article_object = models.Article.objects.filter(id=pk).first()
ser = PageArticleSerializer(instance=article_object,many=False)
return Response(ser.data)
DRF内置组件筛选
from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
from . import models
from rest_framework.filters import BaseFilterBackend
class MyFilterBackend(BaseFilterBackend):
def filter_queryset(self, request, queryset, view):
val = request.query_params.get('cagetory')
return queryset.filter(category_id=val)
class IndexView(APIView):
def get(self,request,*args,**kwargs):
# http://www.xx.com/cx/index/
# models.News.objects.all()
# http://www.xx.com/cx/index/?category=1
# models.News.objects.filter(category=1)
# http://www.xx.com/cx/index/?category=1
# queryset = models.News.objects.all()
# obj = MyFilterBackend()
# result = obj.filter_queryset(request,queryset,self)
# print(result)
return Response('...')
DRF视图
class TagSer(serializers.ModelSerializer):
category = serializers.CharField(source="get_category_display")
author = serializers.SerializerMethodField()
class Meta:
model = models.Article
fields = "__all__"
def get_author(self,obj):
return obj.author.username
class AddTagSer(serializers.ModelSerializer):
class Meta:
model = models.Article
fields = "__all__"
# generics的展示,添加
class NewArticleView(ListAPIView,CreateAPIView):
queryset = models.Article.objects.all() #提供全部数据
serializer_class = TagSer #提供序列化器,源码中的serializer_class为None
def get_serializer_class(self): #源码中提供的方法 返回值为return self.serializer_class,所以可以自定义serializer_class一个序列化器
if self.request.method == 'GET':
return TagSer
elif self.request.method == 'POST':
return AddTagSer
def perform_create(self,serializer):
serializer.save(author_id=2) #多添加一个存储数据,关联的数据
# generics的更新,删除
class OldArticleView(DestroyAPIView,UpdateAPIView,RetrieveAPIView):
queryset = models.Article.objects.all()
serializer_class = TagSer
ListAPIView:展示 执行get --->list
CreateAPIView:添加 执行perform_create方法
DestroyAPIView:删除 执行delete方法
UpdateAPIView:更新 执行perform_update方法
RetrieveAPIView:展示单条数据,不能和ListAPIView同用,因为都有get方法 , 执行retrieve方法
instance = self.get_object()获取的单条数据
当GenericAPIView提供的方法不能满足要求是,需要自定义方法
drf视图源码大致流程
# GenericAPIView提供了一些方法: 取数据,筛选,分页,验证等方法,充当一个桥梁的作用
get_object
get_serializer--->self.get_serializer_class()
get_queryset---->self.queryset
filter_queryset--->self.filter_backends
paginator_queryset--->self.pagination_class()
# ListAPIView源码,当发送一个请求时(get请求)源码的执行流程,一个get请求
class NewsView(ListAPIView):
queryset = models.News.objects.all()
filter_backends = [NewFilterBackend, ]
serializer_class = NewSerializers
pagination_class = PageNumberPagination
1.去NewsView找get方法
def get(self, request, *args, **kwargs):
return self.list(request, *args, **kwargs)
2.执行self.list方法,本类中没有该方法,去父类中找list方法,执行list方法:
def list(self, request, *args, **kwargs):
queryset = self.filter_queryset(self.get_queryset()) #筛选
page = self.paginate_queryset(queryset) #分页
if page is not None:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = self.get_serializer(queryset, many=True) #校验
return Response(serializer.data)
3.
先执行list中的get_queryset()方法,首先去自己的类中找get_queryset()方法,没有就去父类中找get_queryset(self)方法,
调用了一个self.queryset,先去自己的类中找,自己的类中定义了queryset,接着执行list中filter_queryset方法,
自己的类中没有该方法,去父类中找,父类中没有再去另一个父类中找,去GenericAPIView中找filter_queryset方法,
在执行循环filter_backends方法,自己有定义filter_backends,是一个列表,循环中实例化序列化器,在执行filter_queryset方法
DRF版本控制
第一步:写路由url(r'^api/(P<version>w+)/user/$',views.UserView.as_view()),
第二步:写模块导入from rest_framework.versioning import URLPathVersioning
第三步:写视图 可不写
request.version获取版本号
class UserView(APIView): # DEFAULT_VERSIONING_CLASS在APIView中默认配置
def get(self,request,*args,**kwargs):
print(request.version)
return Response('....')
第四步:写settings配置:
REST_FRAMEWORK = {
"DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.URLPathVersioning", #配置全局的版本信息
"ALLOWED_VERSIONS":['v1','v2'] #配置允许版本号范围
}
源码
class APIView(View):
versioning_class = api_settings.DEFAULT_VERSIONING_CLASS
def dispatch(self, request, *args, **kwargs):
# ###################### 第一步 ###########################
"""
request,是django的request,它的内部有:request.GET/request.POST/request.method
args,kwargs是在路由中匹配到的参数,如:
url(r'^order/(d+)/(?P<version>w+)/$', views.OrderView.as_view()),
http://www.xxx.com/order/1/v2/
"""
self.args = args
self.kwargs = kwargs
"""
request = 生成了一个新的request对象,此对象的内部封装了一些值。
request = Request(request)
- 内部封装了 _request = 老的request
"""
request = self.initialize_request(request, *args, **kwargs)
self.request = request
self.headers = self.default_response_headers # deprecate?
try:
# ###################### 第二步 ###########################
self.initial(request, *args, **kwargs)
执行视图函数。。
def initial(self, request, *args, **kwargs):
# ############### 2.1 处理drf的版本 ##############
version, scheme = self.determine_version(request, *args, **kwargs)
request.version, request.versioning_scheme = version, scheme
...
def determine_version(self, request, *args, **kwargs):
if self.versioning_class is None:
return (None, None)
scheme = self.versioning_class() # obj = XXXXXXXXXXXX()
return (scheme.determine_version(request, *args, **kwargs), scheme)
class OrderView(APIView):
versioning_class = URLPathVersioning
def get(self,request,*args,**kwargs):
print(request.version)
print(request.versioning_scheme)
return Response('...')
def post(self,request,*args,**kwargs):
return Response('post')
class URLPathVersioning(BaseVersioning):
"""
urlpatterns = [
url(r'^(?P<version>[v1|v2]+)/users/$', users_list, name='users-list'),
]
"""
invalid_version_message = _('Invalid version in URL path.')
def determine_version(self, request, *args, **kwargs):
version = kwargs.get(self.version_param, self.default_version)
if version is None:
version = self.default_version
if not self.is_allowed_version(version):
raise exceptions.NotFound(self.invalid_version_message)
return version
使用(局部)
-
url中写version
url(r'^(?P<version>[v1|v2]+)/users/$', users_list, name='users-list'),
-
在视图中应用
from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.request import Request from rest_framework.versioning import URLPathVersioning class OrderView(APIView): versioning_class = URLPathVersioning def get(self,request,*args,**kwargs): print(request.version) print(request.versioning_scheme) return Response('...') def post(self,request,*args,**kwargs): return Response('post')
-
在settings中配置
REST_FRAMEWORK = { "PAGE_SIZE":2, "DEFAULT_PAGINATION_CLASS":"rest_framework.pagination.PageNumberPagination", "ALLOWED_VERSIONS":['v1','v2'], 'VERSION_PARAM':'version' }
使用(全局)推荐
-
url中写version
url(r'^(?P<version>[v1|v2]+)/users/$', users_list, name='users-list'), url(r'^(?P<version>w+)/users/$', users_list, name='users-list'),
-
在视图中应用
from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.request import Request from rest_framework.versioning import URLPathVersioning class OrderView(APIView): def get(self,request,*args,**kwargs): print(request.version) print(request.versioning_scheme) return Response('...') def post(self,request,*args,**kwargs): return Response('post')
-
在settings中配置
REST_FRAMEWORK = { "PAGE_SIZE":2, "DEFAULT_PAGINATION_CLASS":"rest_framework.pagination.PageNumberPagination", "DEFAULT_VERSIONING_CLASS":"rest_framework.versioning.URLPathVersioning", "ALLOWED_VERSIONS":['v1','v2'], 'VERSION_PARAM':'version' }
DRF认证
认证(面试)
from django.conf.urls import url,include
from django.contrib import admin
from . import views
urlpatterns = [
url(r'^login/$', views.LoginView.as_view()),
url(r'^order/$', views.OrderView.as_view()),
url(r'^user/$', views.UserView.as_view()),
]
import uuid
from django.shortcuts import render
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from rest_framework.versioning import URLPathVersioning
from rest_framework.views import APIView
from rest_framework.response import Response
from . import models
class LoginView(APIView):
def post(self,request,*args,**kwargs):
user_object = models.UserInfo.objects.filter(**request.data).first()
if not user_object:
return Response('登录失败')
random_string = str(uuid.uuid4())
user_object.token = random_string
user_object.save()
return Response(random_string)
class MyAuthentication:
def authenticate(self, request):
"""
Authenticate the request and return a two-tuple of (user, token).
"""
token = request.query_params.get('token')
user_object = models.UserInfo.objects.filter(token=token).first()
if user_object:
return (user_object,token)
return (None,None)
class OrderView(APIView):
authentication_classes = [MyAuthentication, ]
def get(self,request,*args,**kwargs):
print(request.user)
print(request.auth)
return Response('order')
class UserView(APIView):
authentication_classes = [MyAuthentication,]
def get(self,request,*args,**kwargs):
print(request.user)
print(request.auth)
return Response('user')
源码分析
class Request:
def __init__(self, request,authenticators=None):
self._request = request
self.authenticators = authenticators or ()
@property
def user(self):
"""
Returns the user associated with the current request, as authenticated
by the authentication classes provided to the request.
"""
if not hasattr(self, '_user'):
with wrap_attributeerrors():
self._authenticate()
return self._user
def _authenticate(self):
"""
Attempt to authenticate the request using each authentication instance
in turn.
"""
for authenticator in self.authenticators: #认证的对象列表[TokenAuthentication,]
try:
user_auth_tuple = authenticator.authenticate(self)
except exceptions.APIException:
self._not_authenticated()
raise
if user_auth_tuple is not None:
self._authenticator = authenticator
self.user, self.auth = user_auth_tuple
return
self._not_authenticated()
@user.setter
def user(self, value):
"""
Sets the user on the current request. This is necessary to maintain
compatibility with django.contrib.auth where the user property is
set in the login and logout functions.
Note that we also set the user on Django's underlying `HttpRequest`
instance, ensuring that it is available to any middleware in the stack.
"""
self._user = value
self._request.user = value
class APIView(View):
authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
def dispatch(self, request, *args, **kwargs):
"""
`.dispatch()` is pretty much the same as Django's regular dispatch,
but with extra hooks for startup, finalize, and exception handling.
"""
# ###################### 第一步 ###########################
"""
request,是django的request,它的内部有:request.GET/request.POST/request.method
args,kwargs是在路由中匹配到的参数,如:
url(r'^order/(d+)/(?P<version>w+)/$', views.OrderView.as_view()),
http://www.xxx.com/order/1/v2/
"""
self.args = args
self.kwargs = kwargs
"""
request = 生成了一个新的request对象,此对象的内部封装了一些值。
request = Request(request)
- 内部封装了 _request = 老的request
- 内部封装了 authenticators = [MyAuthentication(), ]
"""
request = self.initialize_request(request, *args, **kwargs)
self.request = request
def initialize_request(self, request, *args, **kwargs):
"""
Returns the initial request object.
"""
parser_context = self.get_parser_context(request)
return Request(
request,
parsers=self.get_parsers(),
authenticators=self.get_authenticators(), # [MyAuthentication(),]
negotiator=self.get_content_negotiator(),
parser_context=parser_context
)
def get_authenticators(self):
"""
Instantiates and returns the list of authenticators that this view can use.
"""
return [ auth() for auth in self.authentication_classes ]
# 执行上一个代码块的class Request,判断user
# 执行上一个代码块的class Request,判断user
# 执行上一个代码块的class Request,判断user
class LoginView(APIView):
authentication_classes = []
def post(self,request,*args,**kwargs):
user_object = models.UserInfo.objects.filter(**request.data).first()
if not user_object:
return Response('登录失败')
random_string = str(uuid.uuid4())
user_object.token = random_string
user_object.save()
return Response(random_string)
class OrderView(APIView):
# authentication_classes = [TokenAuthentication, ]
def get(self,request,*args,**kwargs):
print(request.user)
print(request.auth)
if request.user:
return Response('order')
return Response('滚')
class UserView(APIView):
同上
总结
当用户发来请求时,找到认证的所有类并实例化成为对象列表,然后将对象列表封装到新的request对象中。
以后在视同中调用request.user
在内部会循环认证的对象列表,并执行每个对象的authenticate方法,该方法用于认证,他会返回两个值分别会赋值给
request.user/request.auth
DRF权限
权限
使用
也可以把权限全局应用,添加到settings中
from rest_framework.permissions import BasePermission
from rest_framework import exceptions
class MyPermission(BasePermission):
message = {'code': 10001, 'error': '你没权限'} #没有权限的报错信息
def has_permission(self, request, view):
"""
Return `True` if permission is granted, `False` otherwise.
"""
if request.user:
return True
# raise exceptions.PermissionDenied({'code': 10001, 'error': '你没权限'})
return False
def has_object_permission(self, request, view, obj): #使用单条数据的时候
"""
Return `True` if permission is granted, `False` otherwise.
"""
return False
class OrderView(APIView):
permission_classes = [MyPermission,] #在这个视图中添加权限
# permission_classes = [] 不需要加权限就把改列表设置为空
def get(self,request,*args,**kwargs):
return Response('order')
class UserView(APIView):
permission_classes = [MyPermission, ]
def get(self,request,*args,**kwargs):
return Response('user')
REST_FRAMEWORK = {
"PAGE_SIZE":2,
"DEFAULT_PAGINATION_CLASS":"rest_framework.pagination.PageNumberPagination",
"DEFAULT_VERSIONING_CLASS":"rest_framework.versioning.URLPathVersioning",
"ALLOWED_VERSIONS":['v1','v2'],
'VERSION_PARAM':'version',
"DEFAULT_AUTHENTICATION_CLASSES":["kka.auth.TokenAuthentication",]
}
在同一个视图中对不同的功能进行权限
方式一:
class MyPermission(BasePermission):
message = '你没有权限'
def has_permission(self, request, view): #获取多个对象的时候执行
if request.method == "GET": #对用户请求进行权限添加,当用户的请求为get方法,就不加权限,否则就添加权限
return True
else:
if request.user and request.auth: #有权限
print(request.user)
return True
return False #无权限
def has_object_permission(self, request, view, obj): #获取单个对象的时候执行
return self.has_permission(request, view)
# 其余的同上
方式二:
# 加装饰器
from functools import update_wrapper
# 同一个视图中对不同功能添加权限的装饰器
def wrap_permission(*permissions, validate_permission=True):
"""custom permissions for special route"""
def decorator(func):
def wrapper(self, request, *args, **kwargs):
self.permission_classes = permissions
if validate_permission:
self.check_permissions(request)
return func(self, request, *args, **kwargs)
return update_wrapper(wrapper, func)
return decorator
def permission_classes(permission_classes):
def decorator(func):
func.permission_classes = permission_classes
return func
return decorator
class HandleArticleView(UpdateAPIView,DestroyAPIView,RetrieveAPIView):
queryset = models.Article.objects.all()
serializer_class = DetailArticleSerializer
@wrap_permission(MyPermission) # 只对删除添加权限
def destroy(self, request, *args, **kwargs):
instance = self.get_object()
self.perform_destroy(instance)
return Response('删除成功')
源码分析
class APIView(View):
permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES
def dispatch(self, request, *args, **kwargs):
封装request对象
self.initial(request, *args, **kwargs)
通过反射执行视图中的方法
# 认证已经执行完了,request已经封装,接着向下执行initial
def initial(self, request, *args, **kwargs):
版本的处理
# 认证
self.perform_authentication(request)
# 权限判断
self.check_permissions(request)
self.check_throttles(request)
def perform_authentication(self, request):
request.user
def check_permissions(self, request):
# [对象,对象,]
for permission in self.get_permissions():
if not permission.has_permission(request, self):
self.permission_denied(request, message=getattr(permission, 'message', None))
def permission_denied(self, request, message=None):
if request.authenticators and not request.successful_authenticator:
raise exceptions.NotAuthenticated()
raise exceptions.PermissionDenied(detail=message)
def get_permissions(self):
return [permission() for permission in self.permission_classes] #权限对象列表
###
class UserView(APIView):
permission_classes = [MyPermission, ]
def get(self,request,*args,**kwargs):
return Response('user')
同源跨域
跨域
跨域:由于浏览器具有“同源策略”的限制, 当一个请求url的协议、域名、端口三者之间任意一个与当前页面url不同即为跨域
由于浏览器具有“同源策略”的限制。
如果在同一个域下发送ajax请求,浏览器的同源策略不会阻止。
如果在不同域下发送ajax,浏览器的同源策略会阻止。
限制ajax请求
一个域名两个端口访问时就得加端口,IP:端口
举例:
访问http:/www.xxxx.com,返回一个页面,页面上有按钮这个按钮可以发送ajax请求到其他网站获取数据,请求的网站是http:/api.xxxx.com. 这个网站和http:/www.xxxx.com不是同一个域,可以送出请求,访问的网站也有响应给浏览器,但浏览器有同源策略,当访问走到nginx的时候,就会报错,
+ 域相同,永远不会存在跨域。
+ crm,非前后端分离,没有跨域。
+ 路飞学城,前后端分离,没有跨域(之前有,现在没有)。
+ 域不同时,才会存在跨域。
+ 拉勾网,前后端分离,存在跨域(设置响应头解决跨域)
解决跨域:CORS
本质在数据返回值设置响应头
from django.shortcuts import render,HttpResponse
def json(request):
response = HttpResponse("JSONasdfasdf")
response['Access-Control-Allow-Origin'] = "*"
return response
跨域时,发送了2次请求?
在跨域时,发送的请求会分为两种:
-
简单请求,发一次请求。
设置响应头就可以解决 from django.shortcuts import render,HttpResponse def json(request): response = HttpResponse("JSONasdfasdf") response['Access-Control-Allow-Origin'] = "*" return response
-
复杂请求,发两次请求。
-
预检
-
请求
@csrf_exempt def put_json(request): response = HttpResponse("JSON复杂请求") if request.method == 'OPTIONS': # 处理预检 response['Access-Control-Allow-Origin'] = "*" response['Access-Control-Allow-Methods'] = "PUT" return response elif request.method == "PUT": return response
-
条件:
1、请求方式:HEAD、GET、POST
2、请求头信息:
Accept
Accept-Language
Content-Language
Last-Event-ID
Content-Type 对应的值是以下三个中的任意一个
application/x-www-form-urlencoded
multipart/form-data
text/plain
注意:同时满足以上两个条件时,则是简单请求,否则为复杂请求
DRF的访问频率限制
-
频率限制在认证、权限之后
-
知识点
{ throttle_anon_1.1.1.1:[100121340,], 1.1.1.2:[100121251,100120450,] } 限制:60s能访问3次 来访问时: 1.获取当前时间 100121280 2.100121280-60 = 100121220,小于100121220所有记录删除 3.判断1分钟以内已经访问多少次了? 4 4.无法访问 停一会 来访问时: 1.获取当前时间 100121340 2.100121340-60 = 100121280,小于100121280所有记录删除 3.判断1分钟以内已经访问多少次了? 0 4.可以访问
源码
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.throttling import AnonRateThrottle,BaseThrottle
class ArticleView(APIView):
throttle_classes = [AnonRateThrottle,]
def get(self,request,*args,**kwargs):
return Response('文章列表')
class ArticleDetailView(APIView):
def get(self,request,*args,**kwargs):
return Response('文章列表')
class BaseThrottle:
"""
Rate throttling of requests.
"""
def allow_request(self, request, view):
"""
Return `True` if the request should be allowed, `False` otherwise.
"""
raise NotImplementedError('.allow_request() must be overridden')
def get_ident(self, request):
"""
Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
if present and number of proxies is > 0. If not use all of
HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
"""
xff = request.META.get('HTTP_X_FORWARDED_FOR')
remote_addr = request.META.get('REMOTE_ADDR')
num_proxies = api_settings.NUM_PROXIES
if num_proxies is not None:
if num_proxies == 0 or xff is None:
return remote_addr
addrs = xff.split(',')
client_addr = addrs[-min(num_proxies, len(addrs))]
return client_addr.strip()
return ''.join(xff.split()) if xff else remote_addr
def wait(self):
"""
Optionally, return a recommended number of seconds to wait before
the next request.
"""
return None
class SimpleRateThrottle(BaseThrottle):
"""
A simple cache implementation, that only requires `.get_cache_key()`
to be overridden.
The rate (requests / seconds) is set by a `rate` attribute on the View
class. The attribute is a string of the form 'number_of_requests/period'.
Period should be one of: ('s', 'sec', 'm', 'min', 'h', 'hour', 'd', 'day')
Previous request information used for throttling is stored in the cache.
"""
cache = default_cache
timer = time.time
cache_format = 'throttle_%(scope)s_%(ident)s'
scope = None
THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES
def __init__(self):
if not getattr(self, 'rate', None):
self.rate = self.get_rate()
self.num_requests, self.duration = self.parse_rate(self.rate)
def get_cache_key(self, request, view):
"""
Should return a unique cache-key which can be used for throttling.
Must be overridden.
May return `None` if the request should not be throttled.
"""
raise NotImplementedError('.get_cache_key() must be overridden')
def get_rate(self):
"""
Determine the string representation of the allowed request rate.
"""
if not getattr(self, 'scope', None):
msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
self.__class__.__name__)
raise ImproperlyConfigured(msg)
try:
return self.THROTTLE_RATES[self.scope]
except KeyError:
msg = "No default throttle rate set for '%s' scope" % self.scope
raise ImproperlyConfigured(msg)
def parse_rate(self, rate):
"""
Given the request rate string, return a two tuple of:
<allowed number of requests>, <period of time in seconds>
"""
if rate is None:
return (None, None)
num, period = rate.split('/')
num_requests = int(num)
duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
return (num_requests, duration)
def allow_request(self, request, view):
"""
Implement the check to see if the request should be throttled.
On success calls `throttle_success`.
On failure calls `throttle_failure`.
"""
if self.rate is None:
return True
# 获取请求用户的IP
self.key = self.get_cache_key(request, view)
if self.key is None:
return True
# 根据IP获取他的所有访问记录,[]
self.history = self.cache.get(self.key, [])
self.now = self.timer()
# Drop any requests from the history which have now passed the
# throttle duration
while self.history and self.history[-1] <= self.now - self.duration:
self.history.pop()
if len(self.history) >= self.num_requests:
return self.throttle_failure()
return self.throttle_success()
def throttle_success(self):
"""
Inserts the current request's timestamp along with the key
into the cache.
"""
self.history.insert(0, self.now)
self.cache.set(self.key, self.history, self.duration)
return True
def throttle_failure(self):
"""
Called when a request to the API has failed due to throttling.
"""
return False
def wait(self):
"""
Returns the recommended next request time in seconds.
"""
if self.history:
remaining_duration = self.duration - (self.now - self.history[-1])
else:
remaining_duration = self.duration
available_requests = self.num_requests - len(self.history) + 1
if available_requests <= 0:
return None
return remaining_duration / float(available_requests)
class AnonRateThrottle(SimpleRateThrottle):
"""
Limits the rate of API calls that may be made by a anonymous users.
The IP address of the request will be used as the unique cache key.
"""
scope = 'anon'
def get_cache_key(self, request, view):
if request.user.is_authenticated:
return None # Only throttle unauthenticated requests.
return self.cache_format % {
'scope': self.scope,
'ident': self.get_ident(request)
}
自定义频率限制
总结
-
如何实现的频率限制
- 匿名用户,用IP作为用户唯一标记,但如果用户换代理IP,无法做到真正的限制。 - 登录用户,用用户名或用户ID做标识。 具体实现: 在django的缓存中 = { throttle_anon_1.1.1.1:[100121340,], 1.1.1.2:[100121251,100120450,] } DRF中的频率控制基本原理是基于访问次数和时间的,当然我们可以通过自己定义的方法来实现。 当我们请求进来,走到我们频率组件的时候,DRF内部会有一个字典来记录访问者的IP, 以这个访问者的IP为key,value为一个列表,存放访问者每次访问的时间, { IP1: [第三次访问时间,第二次访问时间,第一次访问时间],} 把每次访问最新时间放入列表的最前面,记录这样一个数据结构后,通过什么方式限流呢~~ 如果我们设置的是10秒内只能访问5次, -- 1,判断访问者的IP是否在这个请求IP的字典里 -- 2,保证这个列表里都是最近10秒内的访问的时间 判断当前请求时间和列表里最早的(也就是最后的)请求时间的差 如果差大于10秒,说明请求以及不是最近10秒内的,删除掉, 继续判断倒数第二个,直到差值小于10秒 -- 3,判断列表的长度(即访问次数),是否大于我们设置的5次, 如果大于就限流,否则放行,并把时间放入列表的最前面
jwt
基本原理:
用于在前后端分离时,实现用户认证相关的一项技术。
一般用户认证有2种方式:
-
token
用户登录成功之后,生成一个随机字符串,自己保留一分+给前端返回一份。 以后前端再来发请求时,需要携带字符串。 后端对字符串进行校验。
-
jwt
用户登录成功之后,生成一个随机字符串,给前端。 - 生成随机字符串 {typ:"jwt","alg":'HS256'} {id:1,username:'alx','exp':10} 98qow39df0lj980945lkdjflo.saueoja8979284sdfsdf.asiuokjd978928374 - 类型信息通过base64加密 - 数据通过base64加密 - 两个密文拼接在h256加密+加盐 - 给前端返回 98qow39df0lj980945lkdjflo.saueoja8979284sdfsdf.asiuokjd978928375 前端获取随机字符串之后,保留起来。 以后再来发送请求时,携带98qow39df0lj980945lkdjflo.saueoja8979284sdfsdf.asiuokjd978928375。 后端接受到之后, 1.先做时间判断 2.字符串合法性校验。
安装
pip3 install djangorestframework-jwt
案例
-
app中注册
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'api.apps.ApiConfig', 'rest_framework', 'rest_framework_jwt' ]
-
用户登录
import uuid from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.versioning import URLPathVersioning from rest_framework import status from api import models class LoginView(APIView): """ 登录接口 """ def post(self,request,*args,**kwargs): # 基于jwt的认证 # 1.去数据库获取用户信息 from rest_framework_jwt.settings import api_settings jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER user = models.UserInfo.objects.filter(**request.data).first() if not user: return Response({'code':1000,'error':'用户名或密码错误'}) payload = jwt_payload_handler(user) token = jwt_encode_handler(payload) return Response({'code':1001,'data':token})
-
用户认证
from rest_framework.views import APIView from rest_framework.response import Response # from rest_framework.throttling import AnonRateThrottle,BaseThrottle class ArticleView(APIView): # throttle_classes = [AnonRateThrottle,] def get(self,request,*args,**kwargs): # 获取用户提交的token,进行一步一步校验 import jwt from rest_framework import exceptions from rest_framework_jwt.settings import api_settings jwt_decode_handler = api_settings.JWT_DECODE_HANDLER jwt_value = request.query_params.get('token') try: payload = jwt_decode_handler(jwt_value) except jwt.ExpiredSignature: msg = '签名已过期' raise exceptions.AuthenticationFailed(msg) except jwt.DecodeError: msg = '认证失败' raise exceptions.AuthenticationFailed(msg) except jwt.InvalidTokenError: raise exceptions.AuthenticationFailed() print(payload) return Response('文章列表')
使用
举例
有文章,评论,订单视图,当匿名用户访问时文章可以访问,评论可以访问,添加评论需要认证,订单全部要认证,并且给访问文章页面加频率限制
# url.py from django.conf.urls import url from .views import account from .views import article from .views import comment from .views import order urlpatterns = [ url(r'^login/$',account.LoginView.as_view()), url(r'^article/$',article.ArticleAPIView.as_view()), url(r'^comment/$',comment.CommentAPIView.as_view()), url(r'^order/$',order.OrderAPIView.as_view()), ]
# account.py from rest_framework.views import APIView from rest_framework.response import Response from api import models from rest_framework_jwt.settings import api_settings class LoginView(APIView): authentication_classes = [] def post(self,request,*args,**kwargs): user = models.UserInfo.objects.filter(username=request.data.get('username'),password=request.data.get('password')).first() if not user: return Response('用户名或密码错误') jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER payload = jwt_payload_handler(user) jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER token= jwt_encode_handler(payload) return Response({'code':10000,'data':token})
# auth.py import jwt from rest_framework import exceptions from rest_framework.authentication import BaseAuthentication from rest_framework_jwt.settings import api_settings from api import models class HulaQueryParamAuthentication(BaseAuthentication): def authenticate(self, request): """ # raise Exception(), 不在继续往下执行,直接返回给用户。 # return None ,本次认证完成,执行下一个认证 # return ('x',"x"),认证成功,不需要再继续执行其他认证了,继续往后权限、节流、视图函数 """ token = request.query_params.get('token') if not token: raise exceptions.AuthenticationFailed({'code':10002,'error':"登录成功之后才能操作"}) jwt_decode_handler = api_settings.JWT_DECODE_HANDLER try: payload = jwt_decode_handler(token) except jwt.ExpiredSignature: raise exceptions.AuthenticationFailed({'code':10003,'error':"token已过期"}) except jwt.DecodeError: raise exceptions.AuthenticationFailed({'code':10004,'error':"token格式错误"}) except jwt.InvalidTokenError: raise exceptions.AuthenticationFailed({'code':10005,'error':"认证失败"}) jwt_get_username_from_payload = api_settings.JWT_PAYLOAD_GET_USERNAME_HANDLER username = jwt_get_username_from_payload(payload) user_object = models.UserInfo.objects.filter(username=username).first() return (user_object,token)
# article.py , comment.py , order.py from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.throttling import AnonRateThrottle #频率 #文章 class ArticleAPIView(APIView): throttle_classes = [AnonRateThrottle, ] #频率 def get(self,request,*args,**kwargs): return Response('文章列表') def get_authenticators(self): if self.request.method == 'GET': return [] #不需要认证就加[] # 评论 class CommentAPIView(APIView): def get(self,request,*args,**kwargs): return Response('评论列表') def post(self,request,*args,**kwargs): return Response('添加评论') def get_authenticators(self): if self.request.method == 'GET': return [] elif self.request.method == 'POST': return super().get_authenticators() #执行父类的get_authenticators()会在settings中找DEFAULT_AUTHENTICATION_CLASSES # 订单 class OrderAPIView(APIView): def get(self, request, *args, **kwargs): return Response('订单列表') def post(self, request, *args, **kwargs): return Response('添加订单')
```python
# settings.py
import datetime
REST_FRAMEWORK = {
"DEFAULT_VERSIONING_CLASS":"rest_framework.versioning.URLPathVersioning",
"ALLOWED_VERSIONS":['v1','v2'],#版本
"DEFAULT_AUTHENTICATION_CLASSES":['api.extensions.auth.HulaQueryParamAuthentication',],
"DEFAULT_THROTTLE_RATES":{
"anon":'10/m' #1分钟访问3次
}
}
JWT_AUTH={
'JWT_EXPIRATION_DELTA': datetime.timedelta(minutes=10), #token过期时间
}