zoukankan      html  css  js  c++  java
  • Django REST framework

    访问admin站点,先修改站点的语言配置

    settings.py

    LANGUAGE_CODE = 'zh-hans'  # 中文
    
    TIME_ZONE = 'Asia/Shanghai' # 时区是亚洲上海
    
    USE_I18N = True # 国际化
    
    USE_L10N = True # 本地化
    
    USE_TZ = True # 数据库是否使用TIME_ZONE,True表示使用上海的时区,False表示不使用,使用UTC时间,然后转成上海,会差8个小时

    认证介绍和源码分析

    1 只有认证通过的用户才能访问指定的url地址,
    比如:查询课程信息,需要登录之后才能查看,没有登录,就不能查看,这时候需要用到认证组件
    2 APIVIew--->dispatche--->self.initial--->写的 self.perform_authentication(request)# 认证 self.check_permissions(request) # 权限 self.check_throttles(request) # 频率 3 APIView的perform_authentication -request.user # 新的request对象,drf的Request类 4 Request类的user -被包装成了数据属性,内部有 self._authenticate() -Request类的_authenticate()方法 5 Request类的_authenticate()方法 def _authenticate(self): for authenticator in self.authenticators: try: user_auth_tuple = authenticator.authenticate(self) except exceptions.APIException: self._not_authenticated() raise if user_auth_tuple is not None: self._authenticator = authenticator self.user, self.auth = user_auth_tuple return self._not_authenticated() 6 drf的Request对象实例化是再什么时候? -再APIVIew的dispatch最上面完成的 - return Request( request, parsers=self.get_parsers(), authenticators=self.get_authenticators(), # 看它 negotiator=self.get_content_negotiator(), parser_context=parser_context ) 7 APIView的get_authenticators def get_authenticators(self): return [auth() for auth in self.authentication_classes] -如果我再视图类中写:authentication_classes=[类名,类名1] -返回[对象,对象1]

    认证类前奏登录功能,认证类编写

    登录功能

    ## 模型层models.py 先写两个表
    class User(models.Model):
        name = models.CharField(max_length=32)
        password = models.CharField(max_length=32)
    
    
    class UserToken(models.Model):
        user = models.OneToOneField(to='User',on_delete=models.CASCADE)
        token=models.CharField(max_length=32)

    登录视图类

    # views.py
    
    class UserView(ViewSetMixin, CreateAPIView):
        queryset = models.User.objects.all()
        serializer_class = serializer.UserModelSerializer
        # 全局使用下的局部禁用需要配置
        authentication_classes = []  # 认证局部禁用
        permission_classes = []    # 权限局部禁用
        throttle_classes = []       # 频率局部禁用
    
        #基于原生session版
        # @action(methods=['POST'], detail=False)
        # def login(self, request):
        #     username = request.data.get('username')
        #     password = request.data.get('password')
        #     user = models.User.objects.filter(name=username, password=password).first()
        #     request.session['name'] = user.name
        #     request.session['id'] = user.id
        #     print(type(request.session))
        #     request.session.save()
        #     print(request.session.session_key)
        #     from  django.contrib.sessions.backends.db import SessionStore
        #     if user:
        #         return APIResponse(msg='登录成功', token=request.session.session_key)
        #     else:
        #         return APIResponse(status=101, msg='用户名或密码错误')
    
        # 基于自己写的UserToken表版
        @action(methods=['POST'], detail=False)
        def login(self, request):
            username = request.data.get('username')
            password = request.data.get('password')
            user = models.User.objects.filter(name=username, password=password).first()
            token = uuid.uuid4()  # 生成一个uuid的随机字符串
            # 这个是错误的:user.usertoken是None
            # user.usertoken.user=user
            # user.usertoken.token=token
            # 如果每次都是新增,如果它登录过,这个地方会报错
            # models.UserToken.objects.create(user=user,token=token)
            # 如果有就更新,如果没有就创建
            # 根据user去查询,如果能查到,就修改token,如果查不到,就新增一条
            models.UserToken.objects.update_or_create(defaults={'token': token}, user=user)
            if user:
                return APIResponse(msg='登录成功', token=token)
            else:
                return APIResponse(status=101, msg='用户名或密码错误')

    路由

    # urls.py
    
    from django.contrib import admin
    from django.urls import path,include
    from rest_framework.routers import SimpleRouter
    from app01 import views
    
    router = SimpleRouter()
    router.register('user', views.UserView)
    
    urlpatterns = [
        path('admin/', admin.site.urls),
        path('api/', include(router.urls)),
    ]

    认证类编写和使用

    认证类的编写

    ####基于自己写的UserToken表(掌握这个就行)
    class LoginAuth(BaseAuthentication):
        def authenticate(self, request):
            token = request.GET.get('token')
            user_token = models.UserToken.objects.filter(token=token).first()
            if user_token:
                # 登录了
                # 返回两个值,第一个值,给了新的request对象的user属性,通常情况我们把当前登录用户给它
                return user_token.user, ''
            else:
                raise AuthenticationFailed('您没有登录')
    
    
    ####基于session的(了解即可)
    from django.contrib.sessions.models import Session
    from importlib import import_module
    from django.conf import settings
    
    class LoginAuth(BaseAuthentication):
        def authenticate(self, request):
            token = request.GET.get('token')
    
            # 通过传入的token(session_key,取到当前key的session对象)
            engine = import_module(settings.SESSION_ENGINE)
            self.SessionStore = engine.SessionStore
            request.session = self.SessionStore(token)
    
            Session.objects.filter(session_key=token).first()
            # print(request.session['name'])
            if request.session.get('name', None):
                print(request.session['name'])
                print(request.session['id'])
                return '', ''
            else:
                raise AuthenticationFailed('您没有登录')

    使用认证类(全局用,局部用)

    #全局用,setting中配置(所有接口都需要认证)
    REST_FRAMEWORK={
        "DEFAULT_AUTHENTICATION_CLASSES":["app01.auth.LoginAuth",]
    }
    # 登录功能需要局部禁用,在视图类中加入
        authentication_classes = []
        
    #只在局部使用,只在视图类中加
    authentication_classes = [LoginAuth,]

    登录认证

    • post 携带 usernamepassword 请求 token路由

    • JSONWebTokenAPIView 的post方法

    • post方法中调用 JSONWebTokenSerializer 序列化器,进行全局校验钩子

    • 调用 authenticate 方法,取到 AUTHENTICATION_BACKENDS 里的校验类并实例化

    • 去循环调用 校验类中的 authenticate(request、username、password) 方法,进行校验

    • 通过后返回一个user对象,判断user对象合法,然后签发token,并保存到 validated_data

    • 最终根据重写jwt_response_payload_handler(token, user, request)返回给前端。

    权限类编写和使用

    编写权限类

    from rest_framework.permissions import BasePermission   # 导入BasePermission权限基类
    
    class MyPermission(BasePermission): # 自定义权限类继承BasePermission
        message='你没有权限'   # 自定义返回给前端的访问错误信息
        def has_permission(self, request, view):
            if request.user.user_type == 1:
    
                return True
            else:
                self.message='你是%s用户,你没有权限'%request.user.get_user_type_display()
                return False

    权限类的使用

    # 局部使用(在视图类中加)
    permission_classes = [MyPermission,]
    # 全局使用(在配置文件中配置)
    REST_FRAMEWORK={
        "DEFAULT_PERMISSION_CLASSES":["app01.auth.MyPermission",],
    }

    权限类源码分析

    • BasePermission 基类

      • has_permission 相当于判断功能权限,如:看是否有操作这个功能权限。

      • has_object_permission 相当于判断数据权限,如:看是否有操作这条数据的权限。

      • 详细:has_object_permission 是用户过了 has_permission 判断有权限以后,再判断这个用户有没有对一个具体的对象有没有操作权限

    • 原理:

      • APIViewdispatch方法中,执行initial方法

      • 再执行check_permissions方法,再调用get_permissions方法

      • 会循环实例化permission_classes中的权限校验类,

      • 再去调用权限校验类中的has_permission方法,传requestAPIview 进行权限判断,如果不通过,抛出 permission_denied 错误

      • 如果不抛错再进行has_object_permission方法,传requestAPIviewobj(当前操作的对象) 进行权限判断,如果不通过,抛出 permission_denied 错误

    频率类的使用

    定义一个频率类

    from rest_framework.throttling import BaseThrottle, SimpleRateThrottle
    class MyThrottle(SimpleRateThrottle):
        scope = 'ip_th'
        def get_cache_key(self, request, view):
            return self.get_ident(request)

    在配置文件中配置

    REST_FRAMEWORK = {
        'DEFAULT_THROTTLE_RATES': {
            'ip_th': '5/m',  #一分钟访问5次
        },
    }

    局部使用,全局使用

    # 局部用,在视图类中配置
    throttle_classes = [MyThrottle,]
    # 全局用,在配置文件中配置
    REST_FRAMEWORK = {
        "DEFAULT_THROTTLE_CLASSES": ["app01.auth.MyThrottle", ],
        'DEFAULT_THROTTLE_RATES': {
            'ip_th': '5/m',  #一分钟访问5次
        },
    
    }

    从根上自定义频率类(了解)

    from rest_framework.throttling import BaseThrottle  # 导入BaseThrottle频率基类
    
    # {'127.0.0.1':[],'192.168.1.1':['8:31:13',],'192.168.1.2':[]}
    
    class MyThrottling(BaseThrottle):
        VISIT_RECORD = {}  # 记录访问者的大字典
    
        def __init__(self):
            self.history = None
    
        def allow_request(self, request, view):
            # (1)取出访问者ip
            # (2)判断当前ip不在访问字典里,添加进去,并且直接返回True,表示第一次访问,在字典里,继续往下走
            # (3)循环判断当前ip的列表,有值,并且当前时间减去列表的最后一个时间大于60s,把这种数据pop掉,这样列表中只有60s以内的访问时间,
            # (4)判断,当列表小于3,说明一分钟以内访问不足三次,把当前时间插入到列表第一个位置,返回True,顺利通过
            # (5)当大于等于3,说明一分钟内访问超过三次,返回False验证失败
            # (1)取出访问者ip
            # print(request.META)
            ip = request.META.get('REMOTE_ADDR')
            import time
            ctime = time.time()
            # (2)判断当前ip不在访问字典里,添加进去,并且直接返回True,表示第一次访问
            if ip not in self.VISIT_RECORD:
                self.VISIT_RECORD[ip] = [ctime, ]
                return True
            self.history = self.VISIT_RECORD.get(ip,[])
            # (3)循环判断当前ip的列表,有值,并且当前时间减去列表的最后一个时间大于60s,把这种数据pop掉,这样列表中只有60s以内的访问时间,
            while self.history and ctime - self.history[-1] > 60:
                self.history.pop()
            # (4)判断,当列表小于3,说明一分钟以内访问不足三次,把当前时间插入到列表第一个位置,返回True,顺利通过
            # (5)当大于等于3,说明一分钟内访问超过三次,返回False验证失败
            if len(self.history) < 3:
                self.history.insert(0, ctime)
                return True
            else:
                return False
    
        def wait(self):
            import time
            ctime = time.time()
            # return 60 - (ctime - self.history[-1])
            return 1

    SimpleRateThrottle:其实跟这个写的一样,只不过通过配置,可扩展性更高

    频率类源码分析

    • · 基类 BaseThrottle

      • allow_request 抽象方法

      • get_ident 检测有没有代理

      • wait 一旦用户访问次数到达阀值,显示用户需要等待的时间

    • SimpleRateThrottle 频率控制类 ***

      • 请求次数

      • 请求频率间隔时间

    • AnonRateThrottle 继承SimpleRateThrottle类,通过重写get_cache_key 实现根据IP 为key粒度的频率限制

    • UserRateThrottle 继承SimpleRateThrottle类,通过重写get_cache_key,实现根据IP + userID 为key粒度的频率限制

    • ScopedRateThrottle 继承SimpleRateThrottle类,通过重写allow_request 方法和重写 get_cache_key 方法,实现根据IP + userID + 视图 为key粒度的频率限制

    • 原理:

      • APIViewdispatch方法中,执行initial 方法

      • 再执行check_throttles方法,再调用get_throttles方法

      • 会循环实例化throttles_classes中的频率校验类,

      • 再去调用频率校验类中的allow_request方法,传requestAPIview 进行频率判断,如果不通过,抛出 exceptions.Throttled(需要等待的时间)错误

    认证、权限、频率演示总代码

    1 注册接口,使用postman注册用户,用户名不能以sb开头,密码必须大于3位,小于16位
    2 登陆接口,登陆成功返回token
    3 图书增加,图书修改,图书删除,图书查询所有,查询一本5个接口
      -所有操作必须登录后才能
      -只有超级用户有图书增加,图书修改,图书删除权限
    4 所有接口限制每分钟只能访问5次

    from django.db import models
    
    
    class User(models.Model):
        username = models.CharField(max_length=32)
        password = models.CharField(max_length=32)
        user_type = models.IntegerField(choices=((1, '超级管理员'), (2, '普通用户'), (3, '2B用户')),default=3)
    
    
    class UserToken(models.Model):
        user = models.OneToOneField(to='User', on_delete=models.CASCADE)
        token = models.CharField(max_length=16)
    
    
    class Book(models.Model):
        name = models.CharField(max_length=32)
        price = models.DecimalField(max_digits=5,decimal_places=2)
        publish = models.CharField(max_length=32)
    
    # 不要忘了数据库迁移两条命令
    模型层models.py
    from rest_framework.response import Response
    
    
    class APIResponse(Response) :
        def __init__(self, code=100, msg=None, data=None, status=None,
                     template_name=None, headers=None,
                     exception=False, content_type=None, **kwargs) :
            dic = {'status' : code, 'msg' : msg}
            if data :
                dic['data'] = data
            if kwargs :
                dic.update(kwargs)
    
            super().__init__(data=dic, status=status, template_name=template_name,
                             headers=headers, exception=exception, content_type=content_type)
    自定义 response.py
    from app01 import models
    from rest_framework.serializers import ModelSerializer
    from django.core.exceptions import ValidationError
    
    class BookModelSerializer(ModelSerializer):
        class Meta:
            model = models.Book
            fields = "__all__"
    
    
    class UserModelSerializer(ModelSerializer):
        class Meta:
            model = models.User
            fields = "__all__"
            extra_kwargs = {
                # 注册密码必须大于3位,小于16位
                'password': {'write_only': True, 'max_length': 16, 'min_length': 3}
            }
    
        # 局部钩子: 用户名不能以sb开头
        def validate_username(self, data):
            # data就是当前字段的值
            if data.startswith('sb'):
                raise ValidationError('不能以sb开头')
            else :
                return data
    序列化类 serializer.py
    from app01 import models
    from rest_framework.exceptions import AuthenticationFailed
    from rest_framework.authentication import BaseAuthentication
    
    ####写一个认证类:基于自己写的UserToken表
    class LoginAuth(BaseAuthentication):
        # 重写 authenticate 方法
        def authenticate(self, request):
            # 实现认证逻辑
            token = request.GET.get('token')
            user_token = models.UserToken.objects.filter(token=token).first()
            if user_token:
                # 认证通过后,返回一个元组(user对象,token值)
                return user_token.user,''
            else:
                # 失败后报错,如过要执行后面的认证,则返回None
                raise AuthenticationFailed('您还没有登录')
    
    
    ####写一个权限类
    from rest_framework.permissions import BasePermission
    
    class CustomPermission(BasePermission):
        message = '您没有权限'
        def has_permission(self, request, view):
            if hasattr(request,"user") and hasattr(request.user,"user_type") and request.user.user_type == 1:
                return True
            else:
                # 如果该字段用了choice,通过get_字段名_display()就能取出choice后面的中文
                self.message = '您是%s用户,您没有权限' % request.user.get_user_type_display()
    
                return False
    
    
    ### 写一个频率类
    
    from rest_framework.throttling import SimpleRateThrottle
    
    class CustomThrottle(SimpleRateThrottle):
        # 实例化时就会读取.scope的范围, 根据配置文件中配置的scope去选择频率如:4/s
        scope = 'admin'  # 在此处定义配置文件的控制评率的键
        def get_cache_key(self, request, view):
            ### 返回谁就以谁做限制
    
            #返回当前请求者的ip地址
            return self.get_ident(request)
            # return request.Meta.get('REMOTE_ADDR')
            #如果想按用户限制
            # return request.user.id
    自定义认证,权限,频率 auth.py
    from django.shortcuts import render
    from app01 import models,serializer
    from rest_framework.generics import GenericAPIView,ListAPIView,CreateAPIView,RetrieveAPIView,RetrieveUpdateDestroyAPIView
    from rest_framework.viewsets import ViewSetMixin
    from rest_framework.viewsets import ModelViewSet
    from rest_framework.decorators import action
    from app01.response import APIResponse
    import uuid
    
    
    class BookView(ViewSetMixin, ListAPIView, RetrieveAPIView):
        queryset = models.Book.objects.all()
        serializer_class = serializer.BookModelSerializer
        permission_classes = []
    
    
    class BookAdminView(ModelViewSet):
        queryset = models.Book.objects.all()
        serializer_class = serializer.BookModelSerializer
    
    
    class UserView(ViewSetMixin, CreateAPIView):
        queryset = models.User.objects.all()
        serializer_class = serializer.UserModelSerializer
        # 全局使用下的局部禁用需要配置
        authentication_classes = []  # 认证局部禁用
        permission_classes = []    # 权限局部禁用
        throttle_classes = []       # 频率局部禁用
    
        # 基于自己写的UserToken表版
        @action(methods=['POST'], detail=False)
        def login(self, request):
            username = request.data.get('username')
            password = request.data.get('password')
            user = models.User.objects.filter(username=username, password=password).first()
            token = uuid.uuid4()  # 生成一个uuid的随机字符串
            # 如果有就更新,如果没有就创建
            # 根据user去查询,如果能查到,就修改token,如果查不到,就新增一条
            models.UserToken.objects.update_or_create(defaults={'token': token}, user=user)
            if user:
                return APIResponse(msg='登录成功', token=token)
            else:
                return APIResponse(status=101, msg='用户名或密码错误')
    视图层 views.py
    from django.contrib import admin
    from django.urls import path,include
    from rest_framework.routers import SimpleRouter
    from app01 import views
    router = SimpleRouter()
    router.register('book', views.BookView)
    router.register('books', views.BookAdminView)
    router.register('user', views.UserView)
    
    urlpatterns = [
        path('admin/', admin.site.urls),
        path('', include(router.urls))
    ]
    路由层 urls.py
    REST_FRAMEWORK = {
        "DEFAULT_AUTHENTICATION_CLASSES" : ["app01.auth.LoginAuth", ],  # 认证
        "DEFAULT_PERMISSION_CLASSES" : ["app01.auth.CustomPermission", ],  # 权限
        "DEFAULT_THROTTLE_CLASSES" : ["app01.auth.CustomThrottle", ],  # 频率
        'DEFAULT_THROTTLE_RATES' : {
            'admin' : '5/m',  # 一分钟访问5次
        },
    
    }
    配置文件 settings.py
    从来就没有正确的选择,我们只不过是要努力奋斗,使当初的选择变得正确。
  • 相关阅读:
    Java中DAO的实现
    使用Fabric自动化你的任务
    JAVA中处理事务的程序--多条更新SQL语句的执行(包括回滚)
    财政业务流程
    ubuntu 14.04 install google chrom stable
    gcc编译命令
    textread 用法
    ubuntu software
    Coursera.org打不开怎么办
    Presentation__short sentence review mining
  • 原文地址:https://www.cnblogs.com/gfeng/p/14675103.html
Copyright © 2011-2022 走看看