zoukankan      html  css  js  c++  java
  • rest_framework自己总结的

    settings.py

    REST_FRAMEWORK = {
        "DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.URLPathVersioning",
        "DEFAULT_VERSION": "v1",
        "ALLOWED_VERSIONS": ["v1", "v2"],
        "VERSION_PARAM": "version",
        # "DEFAULT_AUTHENTICATION_CLASSES": ["utils.Auth.MyAuth"],
        "DEFAULT_THROTTLE_RATES": {
            "WD": "3/m"
        }
    }
    

    urls.py

    from django.conf.urls import url
    from django.contrib import admin
    # from serializers.views import StudentAPIView, StudentEditView, BooksAPIView
    from serializers.views import StudentView, VersionView, UserView, TestAuthView,BookPageView
    
    urlpatterns = [
        url(r'^admin/', admin.site.urls),
        # url(r'^api/students', StudentAPIView.as_view()),   # get post请求
        # url(r'^api/student/(?P<id>d+)', StudentEditView.as_view()), #带有id的get patch delete请求
    
        # 使用我们自己写的ModelViewSet,继承ViewSetMixin进行请求的分发
        # url(r'^api/students', BooksAPIView.as_view({"get": "list", "post": "create"})),
        # url(r'^api/student/(?P<id>d+)', BooksAPIView.as_view({"get": "retrieve", "patch": "update", "delete": "destroy"})),
    
        # 使用rest_framework自带的ModelViewSet进行请求的分发,这里要注意url携带的命名参数,名字应该是pk
        # url(r'^api/students', BooksAPIView.as_view({"get": "list", "post": "create"})),
        # url(r'^api/student/(?P<pk>d+)', BooksAPIView.as_view({"get": "retrieve", "patch": "update", "delete": "destroy"})),
    
        # 自己写的测试版二,可以通用的版本
        #  url(r'^api/student$', StudentView.as_view()),
        #  url(r'^api/student/(?P<id>d+)', StudentView.as_view()),
    
        # 带版本控制的
        url(r'^(?P<version>[v1|v2]+)/api/student$', StudentView.as_view()),  # http://127.0.0.1:8000/v1/api/student
        url(r'^(?P<version>[v1|v2]+)/api/student/(?P<id>d+)', StudentView.as_view()),
    
        # 版本控制测试
        url(r'^(?P<version>[v1|v2]+)/book$', VersionView.as_view()),
    
        # 认证测试,权限测试,频率测试
        url(r'^user$', UserView.as_view()),
        url(r'^test', TestAuthView.as_view()),  # http://127.0.0.1:8000/test?token=20609ddd01fe4faeb0ffe7d8d8c39881
    
        # 分页测试
        url(r'^book_page', BookPageView.as_view()),   # http://127.0.0.1:8000/book_page?page=1&size=1
                                                      # http://127.0.0.1:8000/book_page?limit=2&offset=0
                                                      # http://127.0.0.1:8000/book_page
    
    ]
    

      utils文件夹下的文件  Auth.py(认证)        pagenation.py(分页)       permissions.py(权限)     throttle(频率)

    # Auth.py
    from rest_framework.authentication import BaseAuthentication
    from rest_framework.exceptions import AuthenticationFailed
    from serializers.models import UserInfo
    from rest_framework.response import Response
    
    
    class MyAuth(BaseAuthentication):
        def authenticate(self, request):
            # 第一步先拿到前端传过来的token
            token = request.query_params["token"]
            # 验证token是否存在
            user_obj = UserInfo.objects.filter(token=token).first()
            if user_obj:
                return (user_obj, token)
            else:
                raise AuthenticationFailed("认证失败")
    			
    
    
    # pagenation.py
    from rest_framework import pagination  # 分页
    
    
    class MyPagenation(pagination.PageNumberPagination):
        page_size = 2
        page_query_param = 'page'
        page_size_query_param = "size"
        max_page_size = 3
    
    
    class MyLimitPage(pagination.LimitOffsetPagination):
        default_limit = 1
        limit_query_param = 'limit'  # limit限制每页显示的个数
        offset_query_param = 'offset'  # 相对第一个数据的偏移量
        max_limit = 2
    
    
    class MyCursorPage(pagination.CursorPagination):
        cursor_query_param = 'cursor'
        page_size = 2
        ordering = '-id'
    
    
    
    
    
    
    # permissions.py
    class MyPermission(object):
        message = "您没有权限,请充值"
        def has_permission(self, request, view):
            # 权限逻辑 有权限返回True 没有返回False
            # 认证是在权限前面执行
            # request.user   user_obj
            user_obj = request.user
            if user_obj.type == 1:
                return True
            else:
                return False
    
    
    
    
    
    # throttle.py
    from rest_framework import throttling  # 频率
    import time
    VISIT_RECORD = {}
    class MyThrottle(object):
        """
        60秒访问3次
        """
        def __init__(self):
            self.history = None
    
        def allow_request(self, request, view):
            """
            频率限制的逻辑
            通过返回True
            不通过返回False
            :param request:
            :param view:
            :return:
            """
            # 获取用户IP
            ip = request.META.get("REMOTE_ADDR")
            # 判断ip是否在访问记录里
            now = time.time()
            if ip not in VISIT_RECORD:
                VISIT_RECORD[ip] = [now,]
            # 如果ip在访问记录里
            history = VISIT_RECORD[ip]
            # 把当然访问时间添加到列表最前面
            history.insert(0, now)
    
            # 确保列表内的时间都是范围内时间
            while history and now - history[-1] > 60:
                history.pop()
            self.history = history
            # 看列表长度是否符合限制次数
            if len(history) <= 3:    # 经过这样的限制后,history列表中(60秒内)最多有三次访问记录,列表中最后的数据是据现在60秒内最早的访问记录(访问时间)
                return True
            else:
                return False
    
        def wait(self):
            """
            返回还剩多久可以访问
            :return:
            """
            now = time.time()
            return 60 - (now - self.history[-1])
    
    class MyVisitThrottle(throttling.SimpleRateThrottle):
        scope = "WD"
        """
        第一  自己的类里要有scope
        第二  settings   DEFAULT_THROTTLE_RATES
        第三  DEFAULT_THROTTLE_RATES = { scope配置的变量值:xxx}
        第四 重写 get_cache_key(self, request, view)
        """
        def get_cache_key(self, request, view):
            return self.get_ident(request)			
    

      

    views.py

    from django.shortcuts import render
    from rest_framework.views import APIView
    from rest_framework.response import Response
    from .models import Student,UserInfo
    from .serializers import StudentSerializer
    from utils.Auth import MyAuth
    from utils.permissions import MyPermission
    from utils.throttle import MyThrottle, MyVisitThrottle
    import uuid
    from utils.pagenation import MyPagenation,MyLimitPage,MyCursorPage
    from rest_framework.viewsets import ViewSetMixin, ModelViewSet
    
    # Create your views here.
    """
    class GenericAPIView(APIView):
        queryset = None
        serializer_class = None
    
        def get_queryset(self):
            return self.queryset.all()
    
        def get_serializer(self, *args, **kwargs):
            return self.serializer_class(*args, **kwargs)
    
    
    class ListModelMixin(object):
        def list(self, request):
            queryset = self.get_queryset()
            ser_obj = self.get_serializer(queryset, many=True)
            return Response(ser_obj.data)
    
    
    class CreateModelMixin(object):
        def create(self, request):
            ser_obj = self.get_serializer(data=request.data)
            if ser_obj.is_valid():
                print(ser_obj.validated_data)
                ser_obj.save()
                return Response(ser_obj.validated_data)
            else:
                return Response(ser_obj.errors)
    
    
    class All(GenericAPIView, ListModelMixin, CreateModelMixin):
        pass
    
    
    class RetrieveModelMixin(object):
        def retrieve(self, request, id):
            student_obj = self.get_queryset().filter(id=id).first()
            ser_obj = self.get_serializer(student_obj)
            return Response(ser_obj.data)
    
    
    class UpdateModelMixin(object):
        def update(self, request, id):
            student_obj = self.get_queryset().filter(id=id).first()
            ser_obj = self.get_serializer(instance=student_obj, data=request.data, partial=True)
            if ser_obj.is_valid():
                ser_obj.save()
                return Response(ser_obj.validated_data)
            else:
                return Response(ser_obj.errors)
    
    
    class DestroyModelMixin(object):
        def destroy(self, request, id):
            student_obj = self.get_queryset().filter(id=id).first()
            if student_obj:
                student_obj.delete()
                return Response("")
            else:
                return Response("删除对象不存在")
    
    
    class EditAll(GenericAPIView, RetrieveModelMixin, UpdateModelMixin, DestroyModelMixin):
        pass
    
    
    class StudentAPIView(All):
        queryset = Student.objects.all()
        serializer_class = StudentSerializer
    
        def get(self, request):
            return self.list(request)
    
        def post(self, request):
            return self.create(request)
    
    
    class StudentEditView(EditAll):
        queryset = Student.objects.all()
        serializer_class = StudentSerializer
    
        def get(self, request, id):  # 不知道为什么这里的形参id的名字必须与url(?P<id>d+)中传递参数的名字一样,可能这里传的是关键字参数吧
            return self.retrieve(request, id)
    
        def patch(self, request, id):
            return self.update(request, id)
    
        def delete(self, request, id):
            return self.destroy(request, id)
    
    
    # class ModelViewSet(ViewSetMixin, All, EditAll):  # 打开注释用的是自己写的ModelViewSet,不打开用的是rest_framework封装好的ModelViewSet
    #     pass
    
    
    class BooksAPIView(ModelViewSet):
        queryset = Student.objects.all()
        serializer_class = StudentSerializer
    """
    
    """
    #  自己的测试版一
    class GenericAPIView(APIView):
        queryset = None
        serializer_class = None
    
        def get_queryset(self):
            return self.queryset.all()
    
        def get_serializer(self, *args, **kwargs):
            return self.serializer_class(*args, **kwargs)
    
    
    class RetrieveModelMixin(object):
        def retrieve(self, request, id):
            if not id:
                queryset = self.get_queryset()
                ser_obj = self.get_serializer(queryset, many=True)
            else:
                student_obj = self.get_queryset().filter(id=id).first()
                ser_obj = self.get_serializer(student_obj)
    
            return Response(ser_obj.data)
    
    class CreateModelMixin(object):
        def create(self, request):
            ser_obj = self.get_serializer(data=request.data)
            if ser_obj.is_valid():
                print(ser_obj.validated_data)
                ser_obj.save()
                return Response(ser_obj.validated_data)
            else:
                return Response(ser_obj.errors)
    
    class UpdateModelMixin(object):
        def update(self, request, id):
            student_obj = self.get_queryset().filter(id=id).first()
            ser_obj = self.get_serializer(instance=student_obj, data=request.data, partial=True)
            if ser_obj.is_valid():
                ser_obj.save()
                return Response(ser_obj.validated_data)
            else:
                return Response(ser_obj.errors)
    
    class DestroyModelMixin(object):
        def destroy(self, request, id):
            student_obj = self.get_queryset().filter(id=id).first()
            if student_obj:
                student_obj.delete()
                return Response("")
            else:
                return Response("删除对象不存在")
    
    class All(GenericAPIView,RetrieveModelMixin,CreateModelMixin,UpdateModelMixin,DestroyModelMixin):
        pass
    class StudentView(All):
        queryset = Student.objects.all()
        serializer_class = StudentSerializer
    
        def get(self, request, id=None):
            return self.retrieve(request, id)
    
        def post(self, request,id=None):
            return self.create(request)
    
        def patch(self, request, id=None):
            return self.update(request, id)
    
        def delete(self, request, id=None):
            return self.destroy(request, id)
    """
    
    
    """
    #  自己的测试版二,这版整理的比较好可以通用
    class GenericAPIView(APIView):
        queryset = None
        serializer_class = None
        def get_queryset(self):
            return self.queryset.all()
        def get_serializer(self, *args, **kwargs):
            return self.serializer_class(*args, **kwargs)
        def retrieve(self, request, id):
            if not id:
                queryset = self.get_queryset()
                ser_obj = self.get_serializer(queryset, many=True)
            else:
                student_obj = self.get_queryset().filter(id=id).first()
                ser_obj = self.get_serializer(student_obj)
    
            return Response(ser_obj.data)
        def create(self, request):
            ser_obj = self.get_serializer(data=request.data)
            if ser_obj.is_valid():
                print(ser_obj.validated_data)
                ser_obj.save()
                return Response(ser_obj.validated_data)
            else:
                return Response(ser_obj.errors)
        def update(self, request, id):
            student_obj = self.get_queryset().filter(id=id).first()
            ser_obj = self.get_serializer(instance=student_obj, data=request.data, partial=True)
            if ser_obj.is_valid():
                ser_obj.save()
                return Response(ser_obj.validated_data)
            else:
                return Response(ser_obj.errors)
        def destroy(self, request, id):
            student_obj = self.get_queryset().filter(id=id).first()
            if student_obj:
                student_obj.delete()
                return Response("")
            else:
                return Response("删除对象不存在")
    
    
    class StudentView(GenericAPIView):
        queryset = Student.objects.all()
        serializer_class = StudentSerializer
    
        def get(self, request, id=None):        
            return self.retrieve(request, id)
    
        def post(self, request,id=None):
            return self.create(request)
    
        def patch(self, request, id=None):
            return self.update(request, id)
    
        def delete(self, request,id=None):
            return self.destroy(request, id)
    """
    
    
    # url带版本验证
    class GenericAPIView(APIView):
        queryset = None
        serializer_class = None
        def get_queryset(self):
            return self.queryset.all()
        def get_serializer(self, *args, **kwargs):
            return self.serializer_class(*args, **kwargs)
        def retrieve(self, request,version, id):
            if not id:
                queryset = self.get_queryset()
                ser_obj = self.get_serializer(queryset, many=True)
            else:
                student_obj = self.get_queryset().filter(id=id).first()
                ser_obj = self.get_serializer(student_obj)
            return Response(ser_obj.data)
        def create(self, request):
            ser_obj = self.get_serializer(data=request.data)
            if ser_obj.is_valid():
                print(ser_obj.validated_data)
                ser_obj.save()
                return Response(ser_obj.validated_data)
            else:
                return Response(ser_obj.errors)
        def update(self, request, id):
            student_obj = self.get_queryset().filter(id=id).first()
            ser_obj = self.get_serializer(instance=student_obj, data=request.data, partial=True)
            if ser_obj.is_valid():
                ser_obj.save()
                return Response(ser_obj.validated_data)
            else:
                return Response(ser_obj.errors)
        def destroy(self, request, id):
            student_obj = self.get_queryset().filter(id=id).first()
            if student_obj:
                student_obj.delete()
                return Response("")
            else:
                return Response("删除对象不存在")
    
    class StudentView(GenericAPIView):
        queryset = Student.objects.all()
        serializer_class = StudentSerializer
    
        def get(self, request,version=None, id=None):
            # 这里做些版本控制的逻辑,不同版本请求做不同处理,当然这里的逻辑最好还是写到GenericAPIView类里面
            if request.version == "v1":
                return Response("v1版本的返回")
            elif request.version == "v2":
                return Response("v2版本的返回")
            return self.retrieve(request,version, id)
    
        def post(self, request,version=None,id=None):
            return self.create(request)
    
        def patch(self, request,version=None, id=None):
            return self.update(request, id)
    
        def delete(self, request,version=None, id=None):
            return self.destroy(request, id)
    
    class VersionView(APIView):
        def get(self, request, version):
            print(request.version)
            print(request.versioning_scheme)
            if request.version == "v1":
                return Response("v1版本的返回")
            elif request.version == "v2":
                return Response("v2版本的返回")
            return Response("版本不存在")
    
    class UserView(APIView):
        def post(self, request):
            # 这相当于注册
            username = request.data["username"]
            UserInfo.objects.create(username=username, token=uuid.uuid4())
            return Response("ok")
    
    class TestAuthView(APIView):
        authentication_classes = [MyAuth,]   # 这里是局部注册认证,写在settings中是全局注册
        permission_classes = [MyPermission, ]
        throttle_classes = [MyThrottle, ]          # 使用自己写的
        # throttle_classes = [MyVisitThrottle, ]   # 使用提供的频率控制
    
        # 这相当于登录的认证
        def get(self, request):
            print(request.user)
            print(request.auth)
            return Response("认证测试")
    
    
    
    from rest_framework.negotiation import DefaultContentNegotiation
    class BookPageView(APIView):
        def get(self, request):
            queryset = Student.objects.all()
            my_pagenation = MyPagenation()   # page size分页方法
            # my_pagenation = MyLimitPage()        # limit offset分页方法
            # my_pagenation = MyCursorPage()      # cursor 游标分页
            page_queryset = my_pagenation.paginate_queryset(queryset, request, view=self)
            ser_obj = StudentSerializer(page_queryset, many=True)
            # return Response(ser_obj.data)
            return my_pagenation.get_paginated_response(ser_obj.data)
    
        def post(self, request):
            ser_obj = StudentSerializer(data=request.data)
            if ser_obj.is_valid():
                ser_obj.save()
                return Response(ser_obj.validated_data)
            else:
                return Response(ser_obj.errors)
    

      

    serializers.py
    from rest_framework import serializers
    from .models import Student
    
    
    class ClassesSerializer(serializers.Serializer):
        id = serializers.IntegerField()
        name = serializers.CharField(max_length=32)
    
    
    class CourseSerializer(serializers.Serializer):
        id = serializers.IntegerField()
        name = serializers.CharField(max_length=32)
    
    
    class StudentSerializer(serializers.ModelSerializer):
        classes_info = serializers.SerializerMethodField(read_only=True)
        course_info = serializers.SerializerMethodField(read_only=True)
    
        def get_classes_info(self, obj):
            ret = {
                "id": obj.classes.id,
                "name": obj.classes.name
            }
            return ret
    
        def get_course_info(self, obj):
            courses = obj.courses.all()
            ret = []
            for course in courses:
                ret.append({
                    "id": course.id,
                    'name': course.name
                })
            return ret
    
        class Meta:
            model = Student
            fields = "__all__"
            # fields = ["id", "name", "classes_info","course_info"]
            extra_kwargs = {
                "classes": {"write_only": True},
                "courses": {'write_only': True},
            }
    

      

    models.py
    from django.db import models
    
    # Create your models here.
    __all__ = ['Student', 'Classes', 'Course']
    
    
    class Student(models.Model):
        name = models.CharField(max_length=32, verbose_name='姓名')
        age = models.IntegerField()
        classes = models.ForeignKey(to='Classes')
        courses = models.ManyToManyField(to='Course')
        def __str__(self):
            return self.name
        class Meta:
            db_table='01-学生表'
            verbose_name_plural=db_table
    class Classes(models.Model):
        name = models.CharField(max_length=32,verbose_name='班级')
        def __str__(self):
            return self.name
        class Meta:
            db_table='02-班级表'
            verbose_name_plural=db_table
    class Course(models.Model):
        name=models.CharField(max_length=32,verbose_name='课程')
        def __str__(self):
            return self.name
        class Meta:
            db_table='03-课程表'
            verbose_name_plural=db_table
    class UserInfo(models.Model):
        username = models.CharField(max_length=32)
        token = models.UUIDField()
        CHOICES = ((1, "vip"), (2, "svip"), (3, "普通用户"))
        type = models.IntegerField(choices=CHOICES, default=3)
    

     

    app下的admin.py

    from django.contrib import admin
    from . import models
    
    # Register your models here.
    for table in models.__all__:
        admin.site.register(getattr(models,table))
    

      

  • 相关阅读:
    随笔
    转:windows 下 netsh 实现 端口映射(端口转发)
    2015年01月01日:新年第一天:happy new year to myself
    谨记一次问题排查经历
    新机器,分区为NTFS, 安装 Windows XP、Windows Server 2003 时蓝屏问题,修改为 FAT32 即可
    Oracle 11g 的bug?: aix 上,expdp 11.2.0.1 导出,impdp 11.2.0.3 导入,Interval 分区的 【Interval】 分区属性成了【N】
    Cursor for loop in Oracle
    Oracle date 详解
    oracle中to_timestamp和to_date什么区别
    Oracle FM FM09999999 确保8位数字 即使全是0
  • 原文地址:https://www.cnblogs.com/perfey/p/9959851.html
Copyright © 2011-2022 走看看