zoukankan      html  css  js  c++  java
  • Django REST framework认证权限和限制和频率

    认证、权限和限制

    身份验证是将传入请求与一组标识凭据(例如请求来自的用户或其签名的令牌)相关联的机制。然后 权限 和 限制 组件决定是否拒绝这个请求。

    简单来说就是:

    认证确定了你是谁

    权限确定你能不能访问某个接口

    限制确定你访问某个接口的频率

    认证

    REST framework 提供了一些开箱即用的身份验证方案,并且还允许你实现自定义方案。

    个人 敲码log:

    1.

    # Create your models here.
    class UserInfo1(models.Model):
        id = models.AutoField(primary_key=True)  # 创建一个自增的主键字段
        # 创建一个varchar(64)的唯一的不为空的字段
        name = models.CharField(null=False, max_length=20)
        pwd = models.CharField(max_length=32,default=123)
    
    class Token(models.Model):
        user = models.OneToOneField("UserInfo1",on_delete=models.CASCADE)
        token = models.CharField(max_length=128)

    2.

    # 我需要一个随机字符串
    def get_random_str(user):
        import hashlib, time
        ctime = str(time.time())
        # 封装bytes类型一个MD5的加密对象
        md5 = hashlib.md5(bytes(user, encoding="utf8"))
        # md5.update 是拼接的效果,随机生成md5值
        md5.update(bytes(ctime, encoding="utf8"))
        return md5.hexdigest()
    
    
    # 认证、权限和限制
    class LoginModelView(APIView):
        def post(self, request):
            name = request.data.get("name")
            pwd = request.data.get("pwd")
    
            user = models.UserInfo1.objects.filter(name=name, pwd=pwd).first()
            res = {"state_code": 1000, "msg": None}
            if user:
                # 返回了一个usermd5 加密的字符串
                random_str = get_random_str(user.name)
                """
                当存在token时,则更新,不存在则创建,defaults: 是由 (field, value) 对组成的字典,用于更新对象。 
                返回一个由 (object, created)组成的元组,
                object: 是一个创建的或者是被更新的对象,
                created: 是一个标示是否创建了新的对象的布尔值。                      
                """
                token = models.Token.objects.update_or_create(user=user, defaults={"token": random_str})
                res["token"] = random_str
            else:
                res["state_code"] = 1001  # 错误状态码
                res["msg"] = "用户名或者密码错误"
    
            import json
            # 这是因为json.dumps 序列化时对中文默认使用的ascii编码.想输出真正的中文需要指定ensure_ascii=False:
            # 如果你这样的话就能把中文转成json字符串,而不是 u4e2du56fd
            return Response(json.dumps(res,ensure_ascii=False))

    Django update_or_create 分析

    update_or_create(defaults=None, **kwargs)
    defaults 的值不同则创建,相同则更新

    Member.objects.update_or_create(defaults={'user':1}, others={'field1':1,'field2':1})
    当存在user=1时,则更新,不存在则创建

    update_or_create 用法:

    update_or_create(defaults=None, **kwargs)

    kwargs: 来更新对象或创建一个新的对象。

    defaults: 是由 (field, value) 对组成的字典,用于更新对象。

     

    返回一个由 (object, created)组成的元组,

    object: 是一个创建的或者是被更新的对象,

    created: 是一个标示是否创建了新的对象的布尔值。

     

    update_or_create: 方法通过给出的kwarg

     

    定义一个认证类(详情请看 源码分析)

    好处:

    1.他能判断你是否是当前登录用户,当你没有带认证码 向我后端发请求的时候我是不会给你数据的。

    2.他每一次登录的认证都会改变。

    局部视图认证

    示例1:

    class TokenAuth(object):
        def authenticate(self, request):
            # 取到 request里面的 token值
            totken = request.GET.get("token")
            token_obj = models.Token.objects.filter(token=totken).first()
            if not token_obj:
                # 抛认证字段的异常
                raise exceptions.AuthenticationFailed("验证失败")
            else:
                return token_obj.user.name,token_obj.token
    
        def authenticate_header(self,request):
            pass
    # 多条数据
    class BookView(APIView):
        # 定义一个认证类
        authentication_classes = [TokenAuth]

    而没有定义 token类的依然可以访问。

     

    全局级别认证

    # 定义全局认证,所有视图都需要认证
    REST_FRAMEWORK = {
        "DEFAULT_AUTHENTICATION_CLASSES" : ["app01.utils.TokenAuth"]
    
    }

    from app01 import models
    from rest_framework import exceptions
    
    
    class TokenAuth(object):
        def authenticate(self, request):
            # 取到 request里面的 token值
            totken = request.GET.get("token")
            token_obj = models.Token.objects.filter(token=totken).first()
            if not token_obj:
                # 抛认证字段的异常
                raise exceptions.AuthenticationFailed("验证失败")
            else:
                return token_obj.user.name, token_obj.token
    
        def authenticate_header(self, request):
            pass

    权限

    只有VIP用户才能看的内容。

     

    from rest_framework.permissions import BasePermission
    class SVIPPermission(BasePermission):
        message="只有超级用户才能访问"
        def has_permission(self,request,view):
            username=request.user
            user_type=User.objects.filter(name=username).first().user_type
    
            if user_type==3:
    
                return True # 通过权限认证
            else:
                return False
    # 封装了3层
    class AuthorDetaiView(viewsets.ModelViewSet):
        permission_classes = [SVIPpermission,]
        throttle_classes = [] # 限制某个ip 每分钟访问次数不能超过20次
        # authentication_classes = [TokenAuth]
        # queryset serializer 这两个方法一定要定义成这个不然取不到值
        queryset = models.Author.objects.all()
        serializer_class = AuthorModelSerializers

    限制(频率组件)

    # 自定义局部限制

     

    import time
    
    # 自定义限制
    VISIT_RECORD = {}
    class VisitRateThrottle(object):
        def __init__(self):
            self.history = None
    
        def allow_request(self, request, view):
    
            """
            自定义频率限制60秒内只能访问三次
            """
            # 获取用户IP
            ip = request.META.get("REMOTE_ADDR")
            # 获取当前时间戳
            timestamp = time.time()
    
            # 如果当前访问ip没有在列表中 我就新建一个IP访问记录
            if ip not in VISIT_RECORD:
                VISIT_RECORD[ip] = [timestamp, ]
                # 可以通过验证
                return True
    
            # 如果列表中有值,我就取当当前ip地址 赋值给变量
            history = VISIT_RECORD[ip]
            self.history = history
            # 在列表头部 插入一个时间戳
            history.insert(0, timestamp)
            # 如果列表有值,最先插入的值小于 当前值-60 ,tiemstamp是当前时间是在增加的
            while history and history[-1] < timestamp - 60:
                # pop 取出 最后一个条件成立的值
                history.pop()
            # 列表中的时间戳 大于3 就返回falsse,否则通过
            if len(history) > 3:
                return False
            else:
                return True
    
        def wait(self):
            # 返回给前端还剩多少时间可以访问
            timestamp = time.time()
            # 求出还剩多少时间可以访问
            return 60 - (timestamp - self.history[-1])
    # 视图使用
    from
    rest_framework.response import Response class AuthorModelView(viewsets.ModelViewSet): authentication_classes = [TokenAuth, ] permission_classes = [SVIPPermission, ] throttle_classes = [VisitRateThrottle, ] # 限制某个IP每分钟访问次数不能超过20次 queryset = Author.objects.all() serializer_class = AuthorModelSerializers # 分页 # pagination_class = MyPageNumberPagination # renderer_classes = []

    # 全局使用

    # 在settings.py中设置rest framework相关配置项
    REST_FRAMEWORK = {
        "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.TokenAuth", ],
        "DEFAULT_PERMISSION_CLASSES": ["app01.utils.SVIPPermission", ],
        "DEFAULT_THROTTLE_CLASSES": ["app01.utils.MyThrottle", ],
    }

    二:内置频率类的使用

    定义频率类,必须继承SimpleRateThrottle类:

    复制代码
    from rest_framework.throttling import SimpleRateThrottle
    class VisitThrottle(SimpleRateThrottle):
    # 必须配置scope参数 通过scope参数去settings中找频率限定的规则 scope = 'throttle'
    # 必须定义 get_cache_key函数 返回用户标识的key 这里借用源码中BaseThrottle类(SimpleRateThrottle的父类)中的get_ident函数返回用户ip地址 def get_cache_key(self, request, view): return self.get_ident(request)
    复制代码

    局部使用:

    视图函数中加上
    throttle_classes = [VisitThrottle,]

    全局使用:settings中配置

    REST_FRAMEWORK={
        'DEFAULT_THROTTLE_CLASSES':['utils.common.VisitThrottle'],
    # 局部使用也要在settings中配置上 DEFAULT_THROTTLE_RATES 通过self.scope取频率规则 (一分钟访问3次) 'DEFAULT_THROTTLE_RATES':{'throttle':'3/m',} }

    设置错误信息为中文:

    复制代码
    class Course(APIView):
        authentication_classes = [TokenAuth, ]
        permission_classes = [UserPermission, ]
        throttle_classes = [MyThrottles,]
    
        def get(self, request):
            return HttpResponse('get')
    
        def post(self, request):
            return HttpResponse('post')
    # 函数名为throttled 重写Throttled类中默认的错误信息 def throttled(self, request, wait): from rest_framework.exceptions import Throttled class MyThrottled(Throttled): default_detail = '访问频繁' extra_detail_singular = '等待 {wait} second.' extra_detail_plural = '等待 {wait} seconds.' raise MyThrottled(wait)



  • 相关阅读:
    Delphi公用函数单元
    Delphi XE5 for Android (十一)
    Delphi XE5 for Android (十)
    Delphi XE5 for Android (九)
    Delphi XE5 for Android (八)
    Delphi XE5 for Android (七)
    Delphi XE5 for Android (五)
    Delphi XE5 for Android (四)
    Delphi XE5 for Android (三)
    Delphi XE5 for Android (二)
  • 原文地址:https://www.cnblogs.com/Rivend/p/11844648.html
Copyright © 2011-2022 走看看