zoukankan      html  css  js  c++  java
  • DRF(六)

    一.postman实现登录

    urls.py:

        # 登录
        url(r'^v1/login/$', views.LoginSessionAPIView.as_view()),

    views.py:

    from rest_framework.views import APIView
    from django.contrib import auth
    class LoginSessionAPIView(APIView):
        # 登录要禁用认证与权限
        authentication_classes = []
        permission_classes = []
        def post(self, request, *args, **kwargs):
            username = request.data.get('username')
            password = request.data.get('password')
            if not (username and password):
                return Response('信息有误')
            # user = models.User.objects.filter(username=username).first()  # type: models.User
            # user.check_password(password)
            user = auth.authenticate(request, username=username, password=password)
            if not user:
                return Response('登录失败')
            auth.login(request, user=user)
            return Response('登录成功')

    注 : auth.login(request, user=user) 做了三件事  =>

             a.把登录用户的session信息保存到数据库的django_session表中

             b.把session_id签发给前台,保存到cookie中

             c.把登录的user保存到request.user中

    用户登录后做其他的增删改查操作:

            session认证中对登录的用户会开启csrftoken认证

    系统:session认证
    rest_framework.authentication.SessionAuthentication
    ajax请求通过认证:
    cookie中要携带 sessionid、csrftoken,请求头中要携带 x-csrftoken,后台从META字典中获取

            Postman请求header如图:

         

         Postman添加cookie如图:

        

    普通自定义认证类 , 模仿class SessionAuthentication (BaseAuthentication):

    from rest_framework.authentication import BaseAuthentication
    def authenticate(self, request):
        auth = 从request中得到
        user = 从auth中得到 (session校验中user是由sessionid解析出来的,而jwt校验user是由token字符串解析出来的)
        if not user:
            return None
        return user, auth

    二. jwt

    jwt:json web tokens 采用json格式在web上传输的 认证字符串
    
    jwt字符串:头.载荷.签名
    
    头:公司基本信息、项目组基本信息、常规加密算法名
    载荷:用户信息、过期时间
    签名:头、载荷、秘钥
    
    {头信息字典,采用base64加密算法}.{载荷信息字典,采用base64加密算法}.{头加密串、载荷加密串、服务器秘钥,采用hs256加密算法}
    
    base64是可逆加密
    hash256是不可逆加密

    原理图:

    安装:

    pip install djangorestframework-jwt

    1.jwt登录 (取消了认证和权限校验):

        urls.py:

       from rest_framework_jwt.views import ObtainJSONWebToken, obtain_jwt_token
    
        # url(r'^v2/login/$', ObtainJSONWebToken.as_view()),
        url(r'^v2/login/$', obtain_jwt_token),

       源码校验及视图:

     校验代码serializers.py(签发,编码形成token):

      APIView:

     def post(self, request, *args, **kwargs):
            serializer = self.get_serializer(data=request.data)
    
            if serializer.is_valid():
                user = serializer.object.get('user') or request.user
                token = serializer.object.get('token')
                response_data = jwt_response_payload_handler(token, user, request)
                response = Response(response_data)
                if api_settings.JWT_AUTH_COOKIE:
                    expiration = (datetime.utcnow() +
                                  api_settings.JWT_EXPIRATION_DELTA)
                    response.set_cookie(api_settings.JWT_AUTH_COOKIE,
                                        token,
                                        expires=expiration,
                                        httponly=True)
                return response
    
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    发送账号密码post请求,成功返回token字符串,错误返回错误信息.

    2.自定义jwt登录:

    路由:

    url(r'^v3/login/$', views.LoginJWTAPIView.as_view()),

    视图:

     username可能携带的不止是用户名,可能还是用户的其它唯一标识 手机号 邮箱
     校验密码,手动通过 user 签发编码形成 jwt-token
    import re
    from utils.response import APIResponse
    from rest_framework_jwt.serializers import jwt_encode_handler, jwt_payload_handler
    class LoginJWTAPIView(APIView):
        authentication_classes = ()
        permission_classes = ()
        def post(self, request, *args, **kwargs):
            # username可能携带的不止是用户名,可能还是用户的其它唯一标识 手机号 邮箱
            username = request.data.get('username')
            password = request.data.get('password')
    
            # 如果username匹配上手机号正则 => 可能是手机登录
            if re.match(r'1[3-9][0-9]{9}', username):
                try:
                    # 手动通过 user 签发 jwt-token
                    user = models.User.objects.get(mobile=username)
                except:
                    return APIResponse(1, '该手机未注册')
    
            # 邮箱登录 等
    
            # 账号登录
            else:
                try:
                    # 手动通过 user 签发 jwt-token
                    user = models.User.objects.get(username=username)
                except:
                    return APIResponse(1, '该账号未注册')
    
            # 获得用户后,校验密码并签发token
            if not user.check_password(password):
                return APIResponse(1, '密码错误')
            payload = jwt_payload_handler(user)
            token = jwt_encode_handler(payload)
            return APIResponse(0, 'ok', results={
                'username': user.username,
                'mobile': user.mobile,
                'token': token
            })

    3.jwt认证:

    rest_framework_jwt.authentication.JSONWebTokenAuthentication
    ajax请求通过认证:
    请求头中要携带 authorization,值为 jwt+空格+token

    路由:

      url(r'^v3/cars/$', views.CarV3ModelViewSet.as_view({
            'get': 'list',
            'post': 'create'
        })),
        url(r'^v3/cars/(?P<pk>.*)/$', views.CarV3ModelViewSet.as_view({
            'get': 'retrieve',
            'put': 'update',
            'patch': 'partial_update',
            'delete': 'destroy',
        })),

    视图:

    from rest_framework_jwt.authentication import JSONWebTokenAuthentication
    class CarV3ModelViewSet(ModelViewSet):
        authentication_classes = [JSONWebTokenAuthentication]
        permission_classes = [IsAuthenticatedOrReadOnly]
        queryset = models.Car.objects.filter(is_delete=False)
        serializer_class = serializers.CarModelSerializer
        def destroy(self, request, *args, **kwargs):
            obj = self.get_object()
            obj.is_delete = True
            obj.save()
            return Response('删除成功')

    authonticate方法认证(游客返回None,登录由token字符串反解析为user):

     def authenticate(self, request):
            """
            Returns a two-tuple of `User` and token if a valid signature has been
            supplied using JWT-based authentication.  Otherwise returns `None`.
            """
            jwt_value = self.get_jwt_value(request)
            if jwt_value is None:
                return None
    
            try:
                payload = jwt_decode_handler(jwt_value)
            except jwt.ExpiredSignature:
                msg = _('Signature has expired.')
                raise exceptions.AuthenticationFailed(msg)
            except jwt.DecodeError:
                msg = _('Error decoding signature.')
                raise exceptions.AuthenticationFailed(msg)
            except jwt.InvalidTokenError:
                raise exceptions.AuthenticationFailed()
    
            user = self.authenticate_credentials(payload)
    
            return (user, jwt_value)
    get_jwt_value方法获取token值(settings中拿到前缀小写就是jwt,规定前台格式为jwt token字符串):
     def get_jwt_value(self, request):
            auth = get_authorization_header(request).split()
            auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()
    
            if not auth:
                if api_settings.JWT_AUTH_COOKIE:
                    return request.COOKIES.get(api_settings.JWT_AUTH_COOKIE)
                return None
    
            if smart_text(auth[0].lower()) != auth_header_prefix:
                return None
    
            if len(auth) == 1:
                msg = _('Invalid Authorization header. No credentials provided.')
                raise exceptions.AuthenticationFailed(msg)
            elif len(auth) > 2:
                msg = _('Invalid Authorization header. Credentials string '
                        'should not contain spaces.')
                raise exceptions.AuthenticationFailed(msg)
    
            return auth[1]
    get_authorization_header拿到二进制字符串:
    
    
    def get_authorization_header(request):
        """
        Return request's 'Authorization:' header, as a bytestring.
    
        Hide some test client ickyness where the header can be unicode.
        """
        auth = request.META.get('HTTP_AUTHORIZATION', b'')
        if isinstance(auth, str):
            # Work around django test client oddness
            auth = auth.encode(HTTP_HEADER_ENCODING)
        return auth
    
    

     payload签发内容:

       payload = {
            'user_id': user.pk,
            'username': username,
            'exp': datetime.utcnow() + api_settings.JWT_EXPIRATION_DELTA
        }

    在settings.py中可找到token的过期时间:300s

        'JWT_EXPIRATION_DELTA': datetime.timedelta(seconds=300),

    4.自定义jwt认证类:

    自定义:基于jwt、其它
    1)自定义认证类,继承BaseAuthentication(或其子类),重写authenticate
    2)authenticate中完成
        拿到认证标识 auth
        反解析出用户 user
        前两步操作失败 返回None => 游客
        前两步操作成功 返回user,auth => 登录用户
        注:如果在某个分支抛出异常,直接定义失败 => 非法用户

        settings.py配置 (修改过期时间和前缀):

    # jwt配置
    import datetime
    JWT_AUTH = {
        'JWT_EXPIRATION_DELTA': datetime.timedelta(seconds=30000),
        'JWT_AUTH_HEADER_PREFIX': 'TOKEN',
    }

      把authentication认证配置换成自定义的jwt认证:

    REST_FRAMEWORK = {
        'DEFAULT_AUTHENTICATION_CLASSES': [
            # django默认session校验:校验规则 游客 及 登录用户
            # 'rest_framework.authentication.SessionAuthentication',
            # 'rest_framework.authentication.BasicAuthentication',
            # 'rest_framework_jwt.authentication.JSONWebTokenAuthentication',
            'api.authentications.JWTAuthentication',
        ],
    }

    自定义authtications.py (前台发送auth的token,而且不传前缀):

    from rest_framework.authentication import BaseAuthentication
    from rest_framework.exceptions import AuthenticationFailed
    import jwt
    from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
    from rest_framework_jwt.authentication import jwt_decode_handler
    
    class JWTAuthentication(BaseJSONWebTokenAuthentication):
        # 自定义认证类,重写authenticate方法
        def authenticate(self, request):
            # 认证通过,返回user,auth
            # 认证失败,返回None
            auth = request.META.get('HTTP_AUTH')  # 前台用auth携带token
            if not auth:
                return None
    
            try:
                payload = jwt_decode_handler(auth)
    
            # 出现jwt解析异常,直接抛出异常,代表非法用户,也可以返回None,作为游客处理
            except jwt.ExpiredSignature:
                raise AuthenticationFailed('token已过期')
            except:
                raise AuthenticationFailed('token非法')
    
    
            user = self.authenticate_credentials(payload)
            return (user, auth)

    视图:

    from rest_framework_jwt.authentication import JSONWebTokenAuthentication
    class CarV3ModelViewSet(ModelViewSet):
        #authentication_classes = [JSONWebTokenAuthentication]
        permission_classes = [IsAuthenticatedOrReadOnly]
        queryset = models.Car.objects.filter(is_delete=False)
        serializer_class = serializers.CarModelSerializer
        def destroy(self, request, *args, **kwargs):
            obj = self.get_object()
            obj.is_delete = True
            obj.save()
            return Response('删除成功')

    5.自定义权限类:

    系统:
    1)AllowAny:允许所有用户,校验方法直接返回True
    2)IsAuthenticated:只允许登录用户
        必须request.user和request.user.is_authenticated都通过
    3)IsAuthenticatedOrReadOnly:游客只读,登录用户无限制
        get、option、head 请求无限制
        前台请求必须校验 request.user和request.user.is_authenticated
    4)IsAdminUser:是否是后台用户
        校验 request.user和request.user.is_staff    is_staff(可以登录后台管理系统的用户)
        
    
    自定义:基于auth的Group与Permission表
    1)自定义权限类,继承BasePermission,重写has_permission
    2)has_permission中完成
        拿到登录用户 user <= request.user
        校验user的分组或是权限
        前两步操作失败 返回False => 无权限
        前两步操作成功 返回True => 有权限

    查看所有用户路由:

        # 权限: 只有管理员分组成员才可以查看所以用户信息
        url(r'^users/$', views.UserListAPIView.as_view()),

    视图:

    from rest_framework.generics import ListAPIView
    from .permissions import AdminPermission
    
    class UserListAPIView(ListAPIView):
        queryset = models.User.objects.filter(is_active=True)
        serializer_class = serializers.UserModelSerializer
        # 登录的用户必须是自定义管理员分组成员
        permission_classes = [AdminPermission]

    自定义permissions.py:

    from rest_framework.permissions import BasePermission
    
    class AdminPermission(BasePermission):
        # 继承BasePermission,重写has_permission
        def has_permission(self, request, view):
            # 有权限,返回True
            # 无权限,返回False
            user = request.user
            if not user:
                return False
            # 用户是 管理员 分组 (管理员分组是Group表中的一条自定义记录)
            if not user.groups.filter(name='管理员'):
                return False
            # 登录的用户必须是自定义管理员分组成员
            return True

    6.admin添加用户密文密码及其他信息:

    7.频率校验throttle 

    系统:
    1)AnonRateThrottle:对同一IP游客的限制
    2)UserRateThrottle:对同一IP登录用户的限制
    必须在settings.py中
    'DEFAULT_THROTTLE_RATES': {
        'user': '10/min',  # 登录的用户一分钟可以访问10次
        'anon': '3/min',  # 游客一分钟可以访问3次
    }
    在视图类中:
    class TempAPIView(APIView):
        ...
        throttle_classes = [AnonRateThrottle, UserRateThrottle]
        
        
    
    自定义:基于auth的Group与Permission表
    1)自定义频率类,继承SimpleRateThrottle,重写get_cache_key,明确scope
        SimpleRateThrottle已经帮我们实现了 allow_request、wait
    2)scope与settings.py的DEFAULT_THROTTLE_RATES配合使用
    3)get_cache_key中完成
        拿到限制信息 ident <= request中获取
        没有限制信息 返回None => 不限制
        有限制信息 返回限制信息字符串 => 有限制
    
    

    视图类中校验频率:

     def check_throttles(self, request):
            """
            Check if request should be throttled.
            Raises an appropriate exception if the request is throttled.
            """
            throttle_durations = []
            for throttle in self.get_throttles():
                if not throttle.allow_request(request, self):
                    throttle_durations.append(throttle.wait())
    
            if throttle_durations:
                self.throttled(request, max(throttle_durations))

    SimpleRateThrottle类

    throttle调方法前要初始化:

        def __init__(self):
            if not getattr(self, 'rate', None):
                self.rate = self.get_rate()
            self.num_requests, self.duration = self.parse_rate(self.rate)

    获取scope:

     def get_rate(self):
            """
            Determine the string representation of the allowed request rate.
            """
            if not getattr(self, 'scope', None):
                msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
                       self.__class__.__name__)
                raise ImproperlyConfigured(msg)
    
            try:
                return self.THROTTLE_RATES[self.scope]
            except KeyError:
                msg = "No default throttle rate set for '%s' scope" % self.scope
                raise ImproperlyConfigured(msg)
    THROTTLE_RATES:
    THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES

      找配置:

        # Throttling
        'DEFAULT_THROTTLE_RATES': {
            'user': None,
            'anon': None,
        },

    parse_rate推出配置中格式为'user':'1/min':

     def parse_rate(self, rate):
            """
            Given the request rate string, return a two tuple of:
            <allowed number of requests>, <period of time in seconds>
            """
            if rate is None:
                return (None, None)
            num, period = rate.split('/')
            num_requests = int(num)
            duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
            return (num_requests, duration)

    小结:

    allow_request方法:

        def allow_request(self, request, view):
            """
            Implement the check to see if the request should be throttled.
    
            On success calls `throttle_success`.
            On failure calls `throttle_failure`.
            """
            if self.rate is None:
                return True
             限制标识:
            self.key = self.get_cache_key(request, view)
            if self.key is None:
                return True
    
            self.history = self.cache.get(self.key, [])
            self.now = self.timer()
    
            # Drop any requests from the history which have now passed the
            # throttle duration
            while self.history and self.history[-1] <= self.now - self.duration:
                self.history.pop()
            if len(self.history) >= self.num_requests:
                return self.throttle_failure()
            return self.throttle_success()

     get_cache_key方法要重写:

        def get_cache_key(self, request, view):
            """
            Should return a unique cache-key which can be used for throttling.
            Must be overridden.
    
            May return `None` if the request should not be throttled.
            """
            raise NotImplementedError('.get_cache_key() must be overridden')

    到user类:

    class UserRateThrottle(SimpleRateThrottle):
        """
        Limits the rate of API calls that may be made by a given user.
    
        The user id will be used as a unique cache key if the user is
        authenticated.  For anonymous requests, the IP address of the request will
        be used.
        """
        scope = 'user'
    
        def get_cache_key(self, request, view):
            if request.user.is_authenticated:
                ident = request.user.pk
            else:
                ident = self.get_ident(request)
    
            return self.cache_format % {
                'scope': self.scope,
                'ident': ident   游客的ident为None
            }

    simple类中有cache_format字符串:

        cache_format = 'throttle_%(scope)s_%(ident)s'

    游客类:

    class AnonRateThrottle(SimpleRateThrottle):
        """
        Limits the rate of API calls that may be made by a anonymous users.
    
        The IP address of the request will be used as the unique cache key.
        """
        scope = 'anon'
    
        def get_cache_key(self, request, view):
            if request.user.is_authenticated:
                return None  # Only throttle unauthenticated requests.
    
            return self.cache_format % {
                'scope': self.scope,
                'ident': self.get_ident(request)
            }

    自定义频率校验throttle :

    配置:

    REST_FRAMEWORK = {
        'DEFAULT_AUTHENTICATION_CLASSES': [
            # django默认session校验:校验规则 游客 及 登录用户
            # 'rest_framework.authentication.SessionAuthentication',
            # 'rest_framework.authentication.BasicAuthentication',
            # 'rest_framework_jwt.authentication.JSONWebTokenAuthentication',
            'api.authentications.JWTAuthentication',
        ],
        'DEFAULT_PERMISSION_CLASSES': [
            # 'rest_framework.permissions.AllowAny',
            # 全局配置:一站式网站(所有操作都需要登录后才能访问)
            # 'rest_framework.permissions.IsAuthenticated',
        ],
        'DEFAULT_THROTTLE_RATES': {
            'user': '5/min',
            'anon': '3/min',
            'count_time': '3/min'
        }
    }

    自定义throttles.py:

    from rest_framework.throttling import SimpleRateThrottle
    
    class ThreeMinRateThrottle(SimpleRateThrottle):
        scope = 'count_time'
        def get_cache_key(self, request, view):
            # 就是限制的标识:用户id、IP地址、手机号
            ident = request.user.pk
            return self.cache_format % {
                'scope': self.scope,
                'ident': ident
            }

    或者对手机号的频率限制:

    from rest_framework.throttling import SimpleRateThrottle
    
    class ThreeMinRateThrottle(SimpleRateThrottle):
        scope = 'sms'
        def get_cache_key(self, request, view):
            # 对手机号频率限制
            ident = request.data.get('mobile')
            if not ident:  # 为发现限制条件,返回None代表不进行频率限制
                return None
            return self.cache_format % {
                'scope': self.scope,
                'ident': ident
            }
    # settings.py
    'DEFAULT_THROTTLE_RATES': {
        'user': '10/min',  # 登录的用户一分钟可以访问10次
        'anon': '3/min',  # 游客一分钟可以访问3次
        'sms': '1/min',
    }
    
    
    
  • 相关阅读:
    Codeforces Gym 100015F Fighting for Triangles 状压DP
    Codeforces Gym 100015B Ball Painting 找规律
    Codeforces Gym 100015A Another Rock-Paper-Scissors Problem 找规律
    Codeforces Gym 100231G Voracious Steve 记忆化搜索
    Codeforces Gym 100231F Solitaire 折半搜索
    Codeforces Gym 100231L Intervals 数位DP
    Codeforces Gym 100231B Intervals 线段树+二分+贪心
    Codeforces Round #339 (Div. 1) A. Peter and Snow Blower 计算几何
    Codeforces Round #339 (Div. 2) B. Gena's Code 水题
    Codeforces Round #339 (Div. 2) A. Link/Cut Tree 水题
  • 原文地址:https://www.cnblogs.com/sima-3/p/11494494.html
Copyright © 2011-2022 走看看