zoukankan      html  css  js  c++  java
  • rest_framework组件之认证,权限,访问频率

    共用的models
     1 from django.db import models
     2 
     3 
     4 # Create your models here.
     5 
     6 
     7 class User(models.Model):
     8     username = models.CharField(max_length=32)
     9     password = models.CharField(max_length=32)
    10     user_type = models.IntegerField(choices=((1, '超级用户'), (2, '普通用户'), (3, '第三方用户')))
    11 
    12 
    13 class UserToken(models.Model):
    14     user = models.OneToOneField(to='User')
    15     token = models.CharField(max_length=64)
    View Code

    urls

    1 from django.conf.urls import url
    2 from django.contrib import admin
    3 from app01 import views
    4 urlpatterns = [
    5     url(r'^admin/', admin.site.urls),
    6     url(r'^course/',views.Course.as_view()),
    7     url(r'^login/',views.Login.as_view()),
    8     url(r'^get_ip/',views.GetIp.as_view()),
    9 ]
    View Code

    views

     1 from django.shortcuts import render, HttpResponse
     2 
     3 # Create your views here.
     4 
     5 import json
     6 from django.views import View
     7 from rest_framework.views import APIView
     8 from app01 import models
     9 from utils.common import *
    10 from rest_framework.response import Response
    11 
    12 
    13 class Login(APIView):
    14     def post(self, request, *args, **kwargs):
    15         response = MyResponse()
    16         name = request.data.get('username')
    17         pwd = request.data.get('password')
    18         user = models.User.objects.filter(username=name, password=pwd).first()
    19         if user:
    20             token = get_token(name)
    21             ret = models.UserToken.objects.update_or_create(user=user, defaults={'token': token})
    22             response.status = 100
    23             response.msg = '登陆成功'
    24             response.token = token
    25 
    26         else:
    27             response.msg = '用户名或密码错误'
    28 
    29         return Response(response.get_dic())
    30 
    31 
    32 class Course(APIView):
    33     authentication_classes = [MyAuth, ]
    34     permission_classes = [MyPermission, ]
    35     throttle_classes = [VisitThrottle, ]
    36 
    37     def get(self, request):
    38         print(request.user)
    39         print(request.auth)
    40         return HttpResponse(json.dumps({'name': 'Python'}))
    View Code


    认证组件

    1 写一个认证类
    from rest_framework.authentication import BaseAuthentication
    class MyAuth(BaseAuthentication):
    def authenticate(self,request):
    # request 是封装后的
    token = request.query_params.get('token')
    ret = models.UserToken.objects.filter(token=token).first()
    if ret:
    # 认证通过
    return
    else:
    raise AuthenticationFailed('认证失败')
    #可以不写了
    def authenticate_header(self,ss):
    pass
    2 局部使用
    authentication_classes=[MyAuth,MyAuth2]
    3 全局使用
    查找顺序:自定义的APIView里找---》项目settings里找---》内置默认的
    REST_FRAMEWORK={
    'DEFAULT_AUTHENTICATION_CLASSES':['utils.common.MyAuth',]

    }

    认证:
     1 import hashlib
     2 import time
     3 
     4 
     5 class MyResponse():
     6     def __init__(self):
     7         self.status = 1001
     8         self.msg = None
     9 
    10     def get_dic(self):
    11         return self.__dict__
    12 
    13 
    14 # res = MyResponse()
    15 #
    16 # print(res.get_dic())
    17 
    18 def get_token(name):
    19     md = hashlib.md5()
    20     md.update(name.encode('utf-8'))
    21     md.update(str(time.time()).encode('utf-8'))
    22 
    23     return md.hexdigest()  # 返回的是包含name和time的hash值
    24 
    25 
    26 from rest_framework.authentication import BaseAuthentication
    27 from app01 import models
    28 from rest_framework.exceptions import APIException, AuthenticationFailed
    29 
    30 
    31 class MyAuth(BaseAuthentication):
    32     def authenticate(self, request):
    33         token = request.query_params.get('token')
    34         ret = models.UserToken.objects.filter(token=token).first()
    35         if ret:
    36 
    37             return ret.user,ret
    38         else:
    39 
    40             raise AuthenticationFailed("认证失败")
    View Code

    权限组件
    1 写一个类
    class MyPermission():
    def has_permission(self,request,view):
    token=request.query_params.get('token')
    ret=models.UserToken.objects.filter(token=token).first()
    if ret.user.type==2:
    # 超级用户可以访问
    return True
    else:
    return False
    2 局部使用:
    permission_classes=[MyPermission,]
    3 全局使用:
    REST_FRAMEWORK={
    'DEFAULT_PERMISSION_CLASSES':['utils.common.MyPermission',]
    }

    权限组件:
     1 from rest_framework.permissions import BasePermission
     2 
     3 class MyPermission(BasePermission):
     4     message = "不是超级用户,查看不了"
     5     def has_permission(self, request, view):
     6         token = request.query_params.get("token")
     7         ret=models.UserToken.objects.filter(token=token).first()
     8         print(ret.user.get_user_type_display())
     9         if ret.user.user_type == 1:
    10             return True
    11         else:
    12             return False
    View Code
    
    


    频率组件
    1 写一个类:
    from rest_framework.throttling import SimpleRateThrottle
    class VisitThrottle(SimpleRateThrottle):
    scope = 'xxx'
    def get_cache_key(self, request, view):
    return self.get_ident(request)
    2 在setting里配置:
    'DEFAULT_THROTTLE_RATES':{
    'xxx':'5/h',
    }
    3 局部使用
    throttle_classes=[VisitThrottle,]
    4 全局使用
    REST_FRAMEWORK={
    'DEFAULT_THROTTLE_CLASSES':['utils.common.MyPermission',]
    }
    访问频率组件:
    1 from rest_framework.throttling import SimpleRateThrottle
    2 
    3 
    4 class VisitThrottle(SimpleRateThrottle):
    5     scope = 'da peng ya'
    6 
    7     def get_cache_key(self, request, view):
    8         return self.get_ident(request)
    View Code



    作业:
    1 认证类,不存数据库的token验证
    token='idfjasfsadfas|userid'
     1 def get_token(id,salt='123'):
     2     import hashlib
     3     md=hashlib.md5()
     4     md.update(bytes(str(id),encoding='utf-8'))
     5     md.update(bytes(salt,encoding='utf-8'))
     6 
     7     return md.hexdigest()+'|'+str(id)
     8 
     9 def check_token(token,salt='123'):
    10     ll=token.split('|')
    11     import hashlib
    12     md=hashlib.md5()
    13     md.update(bytes(ll[-1],encoding='utf-8'))
    14     md.update(bytes(salt,encoding='utf-8'))
    15     if ll[0]==md.hexdigest():
    16         return True
    17     else:
    18         return False
    19 
    20 class TokenAuth():
    21     def authenticate(self, request):
    22         token = request.GET.get('token')
    23         success=check_token(token)
    24         if success:
    25             return
    26         else:
    27             raise AuthenticationFailed('认证失败')
    28     def authenticate_header(self,request):
    29         pass
    30 class Login(APIView):
    31     def post(self,reuquest):
    32         back_msg={'status':1001,'msg':None}
    33         try:
    34             name=reuquest.data.get('name')
    35             pwd=reuquest.data.get('pwd')
    36             user=models.User.objects.filter(username=name,password=pwd).first()
    37             if user:
    38                 token=get_token(user.pk)
    39                 # models.UserToken.objects.update_or_create(user=user,defaults={'token':token})
    40                 back_msg['status']='1000'
    41                 back_msg['msg']='登录成功'
    42                 back_msg['token']=token
    43             else:
    44                 back_msg['msg'] = '用户名或密码错误'
    45         except Exception as e:
    46             back_msg['msg']=str(e)
    47         return Response(back_msg)
    48 from rest_framework.authentication import BaseAuthentication
    49 class TokenAuth():
    50     def authenticate(self, request):
    51         token = request.GET.get('token')
    52         token_obj = models.UserToken.objects.filter(token=token).first()
    53         if token_obj:
    54             return
    55         else:
    56             raise AuthenticationFailed('认证失败')
    57     def authenticate_header(self,request):
    58         pass
    59 
    60 class Course(APIView):
    61     authentication_classes = [TokenAuth, ]
    62 
    63     def get(self, request):
    64         return HttpResponse('get')
    65 
    66     def post(self, request):
    67         return HttpResponse('post')
    View Code


    2 自己写一个每分钟限制三次访问的频率类
    ip:拿到ip
    全局变量:{ip:[时间1,时间2,时间3]}

    获取IP
    1     def get(self, request):
    2         x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
    3         if x_forwarded_for:
    4             ip = x_forwarded_for.split(',')[0]  # 所以这里是真实的ip
    5         else:
    6             ip = request.META.get('REMOTE_ADDR')  # 这里获得代理ip
    7         return ip
    
    
    

    待补充。。








  • 相关阅读:
    Codeforces Beta Round #92 (Div. 2 Only) B. Permutations 模拟
    POJ 3281 Dining 最大流 Dinic算法
    POJ 2441 Arrange the BUlls 状压DP
    URAL 1152 Faise Mirrors 状压DP 简单题
    URAL 1039 Anniversary Party 树形DP 水题
    URAL 1018 Binary Apple Tree 树形DP 好题 经典
    pytorch中的forward前向传播机制
    .data()与.detach()的区别
    Argparse模块
    pytorch代码调试工具
  • 原文地址:https://www.cnblogs.com/Roc-Atlantis/p/9833238.html
Copyright © 2011-2022 走看看