zoukankan      html  css  js  c++  java
  • REST framework之认证,权限与频率

    认证组件

    局部视图认证

    在app01.service.auth.py:

    复制代码
    class Authentication(BaseAuthentication):
        """Authenticate a request by the ``token`` query-string parameter."""

        def authenticate(self, request):
            """Return ``(user, token_obj)`` for the token in the query string.

            Raises:
                exceptions.AuthenticationFailed: if the token is missing or
                    matches no ``UserToken`` row.
            """
            token = request._request.GET.get("token")
            # Reject a missing/empty token before querying: filter(token=None)
            # is an IS NULL lookup and could otherwise match a token-less row.
            token_obj = (
                UserToken.objects.filter(token=token).first() if token else None
            )
            if not token_obj:
                raise exceptions.AuthenticationFailed("验证失败!")
            return (token_obj.user, token_obj)
    复制代码

    在views.py:

    复制代码
    def get_random_str(user):
        """Return a fresh 32-character hexadecimal login token.

        The token used to be an MD5 digest of the user name plus the current
        time, which can be guessed by anyone who knows both. It is now drawn
        from the operating system's CSPRNG.

        Args:
            user: Name of the user the token is issued for. Kept for
                backward compatibility; it no longer influences the value.

        Returns:
            A string of 32 lowercase hexadecimal characters (128 random bits).
        """
        import secrets

        return secrets.token_hex(16)
    
    
    from app01.service.auth import *
    
    from django.http import JsonResponse
    class LoginViewSet(APIView):
        """Login endpoint: check the posted credentials and issue a token."""

        # NOTE(review): Authentication raises AuthenticationFailed when no
        # valid ?token= is supplied, which presumably blocks a first login
        # on this view -- confirm whether this should be an empty list.
        authentication_classes = [Authentication,]

        def post(self, request, *args, **kwargs):
            """Validate ``user``/``pwd`` from the form body and return JSON.

            Response codes: 1000 success (with ``token``), 1001 wrong
            credentials, 1002 unexpected error (``msg`` holds its text).
            """
            res = {"code": 1000, "msg": None}
            try:
                user = request._request.POST.get("user")
                pwd = request._request.POST.get("pwd")
                user_obj = UserInfo.objects.filter(user=user, pwd=pwd).first()
                # The debug print of user/pwd was dropped: it wrote the
                # plaintext password to stdout on every login attempt.
                if not user_obj:
                    res["code"] = 1001
                    res["msg"] = "用户名或者密码错误"
                else:
                    token = get_random_str(user)
                    UserToken.objects.update_or_create(
                        user=user_obj, defaults={"token": token}
                    )
                    res["token"] = token

            except Exception as e:
                res["code"] = 1002
                # An exception instance is not JSON serializable; storing it
                # made JsonResponse itself fail. Send its message text.
                res["msg"] = str(e)

            return JsonResponse(res, json_dumps_params={"ensure_ascii": False})
    复制代码

    全局视图认证组件

    settings.py配置如下:

    # Global DRF default: authenticate with the token class from
    # app01/service/auth.py.
    REST_FRAMEWORK = {
        "DEFAULT_AUTHENTICATION_CLASSES": [
            "app01.service.auth.Authentication",
        ],
    }

    权限组件

    局部视图权限

    在app01.service.permissions.py中:

    复制代码
    from rest_framework.permissions import BasePermission
    class SVIPPermission(BasePermission):
        """Grant access only to users whose ``user_type`` equals 3."""

        # Detail message reported when permission is refused.
        message = "SVIP才能访问!"

        def has_permission(self, request, view):
            """Return True when ``request.user.user_type == 3``.

            A user object without a ``user_type`` attribute is denied
            instead of raising AttributeError.
            """
            return getattr(request.user, "user_type", None) == 3
    复制代码

    在views.py:

    复制代码
    from app01.service.permissions import *
    
    class BookViewSet(generics.ListCreateAPIView):
        """List/create view over all ``Book`` rows, guarded by SVIPPermission."""

        serializer_class = BookSerializers
        queryset = Book.objects.all()
        permission_classes = [SVIPPermission]
    复制代码

    全局视图权限

    settings.py配置如下:

    # Global DRF defaults: token authentication plus the SVIP permission check.
    REST_FRAMEWORK = {
        "DEFAULT_AUTHENTICATION_CLASSES": [
            "app01.service.auth.Authentication",
        ],
        "DEFAULT_PERMISSION_CLASSES": [
            "app01.service.permissions.SVIPPermission",
        ],
    }

    throttle(访问频率)组件

    局部视图throttle

    在app01.service.throttles.py中:

    复制代码
    from rest_framework.throttling import BaseThrottle
    
    # In-memory visit log: remote address -> request timestamps, newest first.
    VISIT_RECORD = {}


    class VisitThrottle(BaseThrottle):
        """Allow each remote address ``max_requests`` hits per ``window`` seconds."""

        # These limits were hard-coded in the method bodies; as class
        # attributes a subclass can override them. Defaults are unchanged.
        max_requests = 3
        window = 60

        def __init__(self):
            # Timestamp list of the address seen by the last allow_request().
            self.history = None

        def allow_request(self, request, view):
            """Record the visit and return True if it is within the limit."""
            import time

            remote_addr = request.META.get('REMOTE_ADDR')
            ctime = time.time()

            # setdefault gives first-time and returning visitors one code
            # path, so self.history is always populated for wait().
            history = VISIT_RECORD.setdefault(remote_addr, [])
            self.history = history

            # The oldest timestamps sit at the end; drop the expired ones.
            while history and history[-1] < ctime - self.window:
                history.pop()

            if len(history) < self.max_requests:
                history.insert(0, ctime)
                return True
            return False

        def wait(self):
            """Return seconds until the oldest recorded visit expires.

            Returns None when no history is available, instead of failing
            on ``None[-1]``.
            """
            import time

            if not self.history:
                return None
            return self.window - (time.time() - self.history[-1])
    复制代码

    在views.py中:

    复制代码
    from app01.service.throttles import *
    
    class BookViewSet(generics.ListCreateAPIView):
        """List/create view over all ``Book`` rows, throttled by VisitThrottle."""

        serializer_class = BookSerializers
        queryset = Book.objects.all()
        throttle_classes = [VisitThrottle]
    复制代码

    全局视图throttle

    # Global DRF defaults: token authentication, SVIP permission check and
    # the visit throttle.
    REST_FRAMEWORK = {
        "DEFAULT_AUTHENTICATION_CLASSES": [
            "app01.service.auth.Authentication",
        ],
        "DEFAULT_PERMISSION_CLASSES": [
            "app01.service.permissions.SVIPPermission",
        ],
        "DEFAULT_THROTTLE_CLASSES": [
            "app01.service.throttles.VisitThrottle",
        ],
    }

    内置throttle类

    在app01.service.throttles.py修改为:

    class VisitThrottle(SimpleRateThrottle):
        """Rate throttle registered under the ``visit_rate`` scope."""

        scope = "visit_rate"

        def get_cache_key(self, request, view):
            """Return the value of ``self.get_ident(request)`` as the cache key."""
            client_ident = self.get_ident(request)
            return client_ident

    settings.py设置:

    # Global DRF defaults; the "visit_rate" entry supplies the rate for the
    # throttle whose scope is "visit_rate".
    REST_FRAMEWORK = {
        "DEFAULT_AUTHENTICATION_CLASSES": [
            "app01.service.auth.Authentication",
        ],
        "DEFAULT_PERMISSION_CLASSES": [
            "app01.service.permissions.SVIPPermission",
        ],
        "DEFAULT_THROTTLE_CLASSES": [
            "app01.service.throttles.VisitThrottle",
        ],
        "DEFAULT_THROTTLE_RATES": {"visit_rate": "5/m"},
    }

    根据访问频率组件Throttle 设计一个程序:

    每分钟同一个IP只能访问十次
    from django.db import models
    
    # Create your models here.
    
    class Userinfo(models.Model):
        """User record holding a name, a password and an optional token."""

        name = models.CharField(verbose_name='用户名', max_length=32)
        pwd = models.CharField(verbose_name='密码', max_length=32)
        token = models.CharField(null=True, max_length=64)

        def __str__(self):
            """Display the record by its ``name``."""
            return self.name
    models
    from django.shortcuts import render
    
    # Create your views here.
    
    import time
    
    
    from rest_framework.views import APIView
    from rest_framework.response import Response
    from rest_framework.throttling import BaseThrottle
    from rest_framework import exceptions
    
    # Create your views here.
    
    
    # In-memory visit log: client IP -> request timestamps, newest first.
    AllOW = {}


    class MyThrottle(BaseThrottle):
        '''
        Limit each client IP to 10 requests per minute.
        '''
        # Fallback key, used only when the request carries no REMOTE_ADDR.
        ip = '127.0.0.1'

        def allow_request(self, request, view):
            """Record the visit and return True if the IP is within its limit."""
            ctime = time.time()
            # Key on the caller's own address. The address used to be the
            # hard-coded class attribute, so every client shared one bucket.
            self.ip = request.META.get('REMOTE_ADDR') or self.ip
            time_list = AllOW.setdefault(self.ip, [])
            # The oldest timestamps sit at the end; drop those older than 60 s.
            while time_list and ctime - 60 > time_list[-1]:
                time_list.pop()
            # Ten visits already recorded within the last minute: refuse.
            if len(time_list) > 9:
                return False
            time_list.insert(0, ctime)
            return True

        def wait(self):
            """Return seconds until the oldest recorded visit leaves the window."""
            first_in_time = AllOW[self.ip][-1]
            # 60 seconds minus the age of the oldest visit still counted.
            return 60 - (time.time() - first_in_time)
    
    
    class Limitview(APIView):
        """Demo view with a GET handler, throttled by ``MyThrottle``."""

        throttle_classes = [MyThrottle]

        def get(self, request):
            """Return the welcome message."""
            greeting = '欢迎进入python的世界!!!'
            return Response(greeting)

        def throttled(self, request, wait):
            """Raise a ``Throttled`` subclass that carries custom detail texts."""

            class InnerThrottled(exceptions.Throttled):
                default_detail = '您的请求已被限制.'
                extra_detail_singular = 'Expected available in {wait} second.'
                extra_detail_plural = '请耐心等待{wait}秒'

            raise InnerThrottled(wait)
    views.py
    # Routes: the Django admin site and the throttled demo view.
    urlpatterns = [
        url(r'^admin/', admin.site.urls),
        url(r'^Limitview/', views.Limitview.as_view()),
    ]
    urls
    # DRF settings for the throttle demo: no global authentication or
    # permission classes are enabled, and one throttle rate is declared.
    REST_FRAMEWORK = {
        "UNAUTHENTICATED_USER": None,
        "UNAUTHENTICATED_TOKEN": None,
        # Candidate global authenticators, currently disabled:
        #   "app01.utils.MyAuthentication"
        #   "app02.utils.MyAuthentication"
        "DEFAULT_AUTHENTICATION_CLASSES": [],
        # Candidate global permissions, currently disabled:
        #   "app02.utils.MyPermission"
        #   "app02.utils.AdminPermission"
        "DEFAULT_PERMISSION_CLASSES": [],
        "DEFAULT_THROTTLE_RATES": {
            "tiga": "10/m",
        },
    }
    settings

    那么我们还可以把认证,权限与频率组合起来进行一个访问频率限制:

    from django.shortcuts import render
    from rest_framework.views import APIView
    from rest_framework.response import Response
    from rest_framework.authentication import BaseAuthentication
    from rest_framework.permissions import BasePermission
    from rest_framework.throttling import SimpleRateThrottle
    from rest_framework import exceptions
    
    from app01 import models
    
    # Create your views here.
    class MyAuthentication(BaseAuthentication):
        """Authenticate by matching the ``token`` query parameter to a Userinfo."""

        def authenticate(self, request):
            """Return ``(user.name, user)`` for a known token, else None."""
            token = request.query_params.get('token')
            # Userinfo.token is nullable, and filter(token=None) is an IS NULL
            # lookup: without this guard a request carrying no token would be
            # authenticated as the first user who has no token stored.
            if not token:
                return None
            user = models.Userinfo.objects.filter(token=token).first()
            if user:
                return (user.name, user)
            return None
    
    class UserPermission(BasePermission):
        """Grant access when ``request.user`` is truthy."""

        # Detail message reported when permission is refused.
        message = '登录用户才可以访问'

        def has_permission(self, request, view):
            """Return True if ``request.user`` is truthy, otherwise False."""
            return bool(request.user)
    class AdminPermission(BasePermission):
        """Grant access only when ``request.user`` equals ``'ctz'``."""

        # Detail message reported when permission is refused.
        message = '管理员才能访问'

        def has_permission(self, request, view):
            """Return True if ``request.user == 'ctz'``, otherwise False."""
            return request.user == 'ctz'
    
    
    class AnnoThrottle(SimpleRateThrottle):
        """Throttle anonymous requests under the ``anno`` scope."""

        scope = 'anno'

        def get_cache_key(self, request, view):
            """Return a scope-qualified key for anonymous requests, else None."""
            # Anonymous request: throttle it. The key includes the scope so
            # it cannot collide with another throttle keyed on the same ident.
            if not request.user:
                return self.cache_format % {
                    'scope': self.scope,
                    'ident': self.get_ident(request),
                }
            # Not anonymous: this throttle does not apply.
            return None
    
    class UserThrottle(SimpleRateThrottle):
        """Throttle logged-in, non-administrator users under the ``user`` scope."""

        scope = 'user'

        def get_cache_key(self, request, view):
            """Return a scope-qualified key for regular users, else None."""
            # The user is logged in and is not the administrator. The key
            # includes the scope so it cannot collide with another throttle
            # keyed on the same ident.
            if request.user and request.user != 'ctz':
                return self.cache_format % {
                    'scope': self.scope,
                    'ident': self.get_ident(request),
                }
            # Anonymous user or administrator: this throttle does not apply.
            return None
    
    class AdminThrottle(SimpleRateThrottle):
        """Throttle the administrator (user ``'ctz'``) under the ``admin`` scope."""

        scope = 'admin'

        def get_cache_key(self, request, view):
            """Return a scope-qualified key for the administrator, else None."""
            # Administrator request. The key includes the scope so it cannot
            # collide with another throttle keyed on the same ident.
            if request.user == 'ctz':
                return self.cache_format % {
                    'scope': self.scope,
                    'ident': self.get_ident(request),
                }
            # Not the administrator: this throttle does not apply.
            return None
    
    class IndexView(APIView):
        '''
        Requirement: every user may access; anonymous users 5/m, regular
        users 10/m, administrators unlimited.
        '''
        # NOTE(review): AdminThrottle is listed in throttle_classes, so the
        # administrator appears to be rate-limited too rather than
        # unlimited -- confirm which is intended.
        authentication_classes = [MyAuthentication,]
        permission_classes = []
        throttle_classes = [AnnoThrottle, UserThrottle, AdminThrottle]

        def get(self, request):
            """Return the home-page message."""
            return Response('首页')

        def throttled(self, request, wait):
            """Raise a ``Throttled`` subclass that carries custom detail texts."""
            class UserInnerThrottled(exceptions.Throttled):
                default_detail = '请求被限制.'
                extra_detail_singular = 'Expected available in {wait} second.'
                extra_detail_plural = '还需要再等待{wait}秒'
            raise UserInnerThrottled(wait)
    
    
    
    class UserView(APIView):
        '''
        Requirement: only logged-in users may access; regular users 10/m,
        administrators 20/m.
        '''
        authentication_classes = [MyAuthentication,]
        permission_classes = [UserPermission,]
        throttle_classes = [UserThrottle, AdminThrottle]

        def get(self, request):
            """Return the user-page message."""
            return Response('用户界面')

        def permission_denied(self, request, message=None):
            """
            If request is not permitted, determine what kind of exception to raise.
            """

            # Authenticators are configured but none succeeded: report the
            # request as unauthenticated rather than forbidden.
            if request.authenticators and not request.successful_authenticator:
                raise exceptions.NotAuthenticated('无权访问')
            raise exceptions.PermissionDenied(detail=message)

        def throttled(self, request, wait):
            """Raise a ``Throttled`` subclass that carries custom detail texts."""
            class UserInnerThrottled(exceptions.Throttled):
                default_detail = '请求被限制.'
                extra_detail_singular = 'Expected available in {wait} second.'
                extra_detail_plural = '还需要再等待{wait}秒'
            raise UserInnerThrottled(wait)
    
    class ManageView(APIView):
        '''
        Requirement: only the administrator may access, 5/m.
        '''
        # NOTE(review): AdminThrottle uses the 'admin' scope, which the
        # settings snippet in this file rates at 20/m, not the 5/m stated
        # above -- confirm which is intended.
        authentication_classes = [MyAuthentication,]
        permission_classes = [AdminPermission,]
        throttle_classes = [AdminThrottle]

        def get(self, request):
            """Return the admin-page message."""
            return Response('管理员界面')

        def permission_denied(self, request, message=None):
            """
            If request is not permitted, determine what kind of exception to raise.
            """

            # Authenticators are configured but none succeeded: report the
            # request as unauthenticated rather than forbidden.
            if request.authenticators and not request.successful_authenticator:
                raise exceptions.NotAuthenticated('无权访问')
            raise exceptions.PermissionDenied(detail=message)

        def throttled(self, request, wait):
            """Raise a ``Throttled`` subclass that carries custom detail texts."""
            class UserInnerThrottled(exceptions.Throttled):
                default_detail = '请求被限制.'
                extra_detail_singular = 'Expected available in {wait} second.'
                extra_detail_plural = '还需要再等待{wait}秒'
            raise UserInnerThrottled(wait)
    views.py
    from django.db import models
    
    # Create your models here.
    class Userinfo(models.Model):
        """User record holding a name, a password and an optional token."""

        name = models.CharField(max_length=32, verbose_name='用户名')
        pwd = models.CharField(max_length=32, verbose_name='密码')
        # Nullable: a row may exist with no token stored.
        token = models.CharField(max_length=64, null=True)

        def __str__(self):
            """Display the record by its ``name``."""
            return self.name
    models
    # DRF settings for the combined authentication/permission/throttle demo.
    # The typographic quotes of the original listing are restored to ASCII
    # quotes so the block is valid Python.
    REST_FRAMEWORK = {
        'UNAUTHENTICATED_USER': None,
        'UNAUTHENTICATED_TOKEN': None,
        "DEFAULT_AUTHENTICATION_CLASSES": [
          # "app01.utils.MyAuthentication",
            "app02.utils.MyAuthentication",
        ],
        "DEFAULT_PERMISSION_CLASSES": [
           "app02.utils.MyPermission",
           "app02.utils.AdminPermission",
        ],
        # Rates keyed by throttle scope name.
        "DEFAULT_THROTTLE_RATES": {
           'tiga': '10/m',
           'anno': '5/m',
           'user': '10/m',
           'admin': '20/m',
        }
    }
    settings
       url(r'^index/', views.IndexView.as_view()),
       url(r'^user/', views.UserView.as_view()),
       url(r'^manage/', views.ManageView.as_view()),
    urls
  • 相关阅读:
    在Jenkins中使用sonar进行静态代码检查
    privoxy自动请求转发到多个网络
    实现自动SSH连接
    云平台的微服务架构实践
    JHipster技术栈理解
    部署模式
    部署模式
    部署模式
    JHipster生成单体架构的应用示例
    JHipster生成微服务架构的应用栈(二)- 认证微服务示例
  • 原文地址:https://www.cnblogs.com/zhangsanfeng/p/9416152.html
Copyright © 2011-2022 走看看