zoukankan      html  css  js  c++  java
  • rest_framework组件之认证,权限,访问频率

    共用的models
     1 from django.db import models
     2 
     3 
     4 # Create your models here.
     5 
     6 
     7 class User(models.Model):
     8     username = models.CharField(max_length=32)
     9     password = models.CharField(max_length=32)
    10     user_type = models.IntegerField(choices=((1, '超级用户'), (2, '普通用户'), (3, '第三方用户')))
    11 
    12 
    13 class UserToken(models.Model):
    14     user = models.OneToOneField(to='User')
    15     token = models.CharField(max_length=64)
    View Code

    urls

    1 from django.conf.urls import url
    2 from django.contrib import admin
    3 from app01 import views
    4 urlpatterns = [
    5     url(r'^admin/', admin.site.urls),
    6     url(r'^course/',views.Course.as_view()),
    7     url(r'^login/',views.Login.as_view()),
    8     url(r'^get_ip/',views.GetIp.as_view()),
    9 ]
    View Code

    views

     1 from django.shortcuts import render, HttpResponse
     2 
     3 # Create your views here.
     4 
     5 import json
     6 from django.views import View
     7 from rest_framework.views import APIView
     8 from app01 import models
     9 from utils.common import *
    10 from rest_framework.response import Response
    11 
    12 
    13 class Login(APIView):
    14     def post(self, request, *args, **kwargs):
    15         response = MyResponse()
    16         name = request.data.get('username')
    17         pwd = request.data.get('password')
    18         user = models.User.objects.filter(username=name, password=pwd).first()
    19         if user:
    20             token = get_token(name)
    21             ret = models.UserToken.objects.update_or_create(user=user, defaults={'token': token})
    22             response.status = 100
    23             response.msg = '登陆成功'
    24             response.token = token
    25 
    26         else:
    27             response.msg = '用户名或密码错误'
    28 
    29         return Response(response.get_dic())
    30 
    31 
    32 class Course(APIView):
    33     authentication_classes = [MyAuth, ]
    34     permission_classes = [MyPermission, ]
    35     throttle_classes = [VisitThrottle, ]
    36 
    37     def get(self, request):
    38         print(request.user)
    39         print(request.auth)
    40         return HttpResponse(json.dumps({'name': 'Python'}))
    View Code


    认证组件

    1 写一个认证类
    from rest_framework.authentication import BaseAuthentication
    class MyAuth(BaseAuthentication):
    def authenticate(self,request):
    # request 是封装后的
    token = request.query_params.get('token')
    ret = models.UserToken.objects.filter(token=token).first()
    if ret:
    # 认证通过
    return
    else:
    raise AuthenticationFailed('认证失败')
    #可以不写了
    def authenticate_header(self,ss):
    pass
    2 局部使用
    authentication_classes=[MyAuth,MyAuth2]
    3 全局使用
    查找顺序:自定义的APIView里找---》项目settings里找---》内置默认的
    REST_FRAMEWORK={
    'DEFAULT_AUTHENTICATION_CLASSES':['utils.common.MyAuth',]

    }

    认证:
     1 import hashlib
     2 import time
     3 
     4 
     5 class MyResponse():
     6     def __init__(self):
     7         self.status = 1001
     8         self.msg = None
     9 
    10     def get_dic(self):
    11         return self.__dict__
    12 
    13 
    14 # res = MyResponse()
    15 #
    16 # print(res.get_dic())
    17 
    18 def get_token(name):
    19     md = hashlib.md5()
    20     md.update(name.encode('utf-8'))
    21     md.update(str(time.time()).encode('utf-8'))
    22 
    23     return md.hexdigest()  # 返回的是包含name和time的hash值
    24 
    25 
    26 from rest_framework.authentication import BaseAuthentication
    27 from app01 import models
    28 from rest_framework.exceptions import APIException, AuthenticationFailed
    29 
    30 
    31 class MyAuth(BaseAuthentication):
    32     def authenticate(self, request):
    33         token = request.query_params.get('token')
    34         ret = models.UserToken.objects.filter(token=token).first()
    35         if ret:
    36 
    37             return ret.user,ret
    38         else:
    39 
    40             raise AuthenticationFailed("认证失败")
    View Code

    权限组件
    1 写一个类
    class MyPermission():
    def has_permission(self,request,view):
    token=request.query_params.get('token')
    ret=models.UserToken.objects.filter(token=token).first()
    if ret.user.type==2:
    # 超级用户可以访问
    return True
    else:
    return False
    2 局部使用:
    permission_classes=[MyPermission,]
    3 全局使用:
    REST_FRAMEWORK={
    'DEFAULT_PERMISSION_CLASSES':['utils.common.MyPermission',]
    }

    权限组件:
     1 from rest_framework.permissions import BasePermission
     2 
     3 class MyPermission(BasePermission):
     4     message = "不是超级用户,查看不了"
     5     def has_permission(self, request, view):
     6         token = request.query_params.get("token")
     7         ret=models.UserToken.objects.filter(token=token).first()
     8         print(ret.user.get_user_type_display())
     9         if ret.user.user_type == 1:
    10             return True
    11         else:
    12             return False
    View Code
    
    


    频率组件
    1 写一个类:
    from rest_framework.throttling import SimpleRateThrottle
    class VisitThrottle(SimpleRateThrottle):
    scope = 'xxx'
    def get_cache_key(self, request, view):
    return self.get_ident(request)
    2 在setting里配置:
    'DEFAULT_THROTTLE_RATES':{
    'xxx':'5/h',
    }
    3 局部使用
    throttle_classes=[VisitThrottle,]
    4 全局使用
    REST_FRAMEWORK={
    'DEFAULT_THROTTLE_CLASSES':['utils.common.MyPermission',]
    }
    访问频率组件:
    1 from rest_framework.throttling import SimpleRateThrottle
    2 
    3 
    4 class VisitThrottle(SimpleRateThrottle):
    5     scope = 'da peng ya'
    6 
    7     def get_cache_key(self, request, view):
    8         return self.get_ident(request)
    View Code



    作业:
    1 认证类,不存数据库的token验证
    token='idfjasfsadfas|userid'
     1 def get_token(id,salt='123'):
     2     import hashlib
     3     md=hashlib.md5()
     4     md.update(bytes(str(id),encoding='utf-8'))
     5     md.update(bytes(salt,encoding='utf-8'))
     6 
     7     return md.hexdigest()+'|'+str(id)
     8 
     9 def check_token(token,salt='123'):
    10     ll=token.split('|')
    11     import hashlib
    12     md=hashlib.md5()
    13     md.update(bytes(ll[-1],encoding='utf-8'))
    14     md.update(bytes(salt,encoding='utf-8'))
    15     if ll[0]==md.hexdigest():
    16         return True
    17     else:
    18         return False
    19 
    20 class TokenAuth():
    21     def authenticate(self, request):
    22         token = request.GET.get('token')
    23         success=check_token(token)
    24         if success:
    25             return
    26         else:
    27             raise AuthenticationFailed('认证失败')
    28     def authenticate_header(self,request):
    29         pass
    30 class Login(APIView):
    31     def post(self,reuquest):
    32         back_msg={'status':1001,'msg':None}
    33         try:
    34             name=reuquest.data.get('name')
    35             pwd=reuquest.data.get('pwd')
    36             user=models.User.objects.filter(username=name,password=pwd).first()
    37             if user:
    38                 token=get_token(user.pk)
    39                 # models.UserToken.objects.update_or_create(user=user,defaults={'token':token})
    40                 back_msg['status']='1000'
    41                 back_msg['msg']='登录成功'
    42                 back_msg['token']=token
    43             else:
    44                 back_msg['msg'] = '用户名或密码错误'
    45         except Exception as e:
    46             back_msg['msg']=str(e)
    47         return Response(back_msg)
    48 from rest_framework.authentication import BaseAuthentication
    49 class TokenAuth():
    50     def authenticate(self, request):
    51         token = request.GET.get('token')
    52         token_obj = models.UserToken.objects.filter(token=token).first()
    53         if token_obj:
    54             return
    55         else:
    56             raise AuthenticationFailed('认证失败')
    57     def authenticate_header(self,request):
    58         pass
    59 
    60 class Course(APIView):
    61     authentication_classes = [TokenAuth, ]
    62 
    63     def get(self, request):
    64         return HttpResponse('get')
    65 
    66     def post(self, request):
    67         return HttpResponse('post')
    View Code


    2 自己写一个每分钟限制三次访问的频率类
    ip:拿到ip
    全局变量:{ip:[时间1,时间2,时间3]}

    获取IP
    1     def get(self, request):
    2         x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
    3         if x_forwarded_for:
    4             ip = x_forwarded_for.split(',')[0]  # 所以这里是真实的ip
    5         else:
    6             ip = request.META.get('REMOTE_ADDR')  # 这里获得代理ip
    7         return ip
    
    
    

    待补充。。








  • 相关阅读:
    java 静态代码块和spring @value等注解注入顺序
    中秋节
    IP切换
    MMF循环队列实现RPC
    Redis 集群方案
    简单Http多线程下载实现
    信息采集
    大四了
    懒懒交流会《前端,架构,框架与库》里面提到的一些问题
    [知识整理] 导数据工具
  • 原文地址:https://www.cnblogs.com/Roc-Atlantis/p/9833238.html
Copyright © 2011-2022 走看看