zoukankan      html  css  js  c++  java
  • drf-jwt认证组件、权限组件、频率组件的使用

    drf-jwt认证组件、权限组件、频率组件的使用

    三大认证流程图:

    认证组件

    在restframework中自带认证组件,而其自带的认证组件是如何认证校验的呢:

    class BaseAuthentication:
        """
        All authentication classes should extend BaseAuthentication.
        所有身份验证类都应扩展BaseAuthentication。
        """
    
        def authenticate(self, request):
            """
            Authenticate the request and return a two-tuple of (user, token).
            验证请求并返回(user, token)的二元组。
            """
            raise NotImplementedError(".authenticate() must be overridden.")
    
        def authenticate_header(self, request):
            """
            Return a string to be used as the value of the `WWW-Authenticate`
            header in a `401 Unauthenticated` response, or `None` if the
            authentication scheme should return `403 Permission Denied` responses.
            
            返回一个字符串,用作“ 401 Unauthenticated”响应中的“ WWW-Authenticate”标头的值;如果身份验证方案应返回“ 403 Permission Denied”响应,则返回“ None”。
            """
            pass
    
    
    class BasicAuthentication(BaseAuthentication):
        """
        HTTP Basic authentication against username/password.
        针对用户名/密码的HTTP基本身份验证。
        """
        www_authenticate_realm = 'api'
    
        def authenticate(self, request):
            """
            Returns a `User` if a correct username and password have been supplied
            using HTTP Basic authentication.  Otherwise returns `None`.
            如果使用HTTP Basic身份验证提供了正确的用户名和密码,则返回“用户”。 否则返回“无”。
            """
            auth = get_authorization_header(request).split()#按空格拆分,拆分的结果长度为2才合法
    
            if not auth or auth[0].lower() != b'basic':#auth的结构大致是“basic abc.def.hig ”
                #如果没有token,认证方法直接返回None,代表游客方式访问
                return None
    
            #判断auth是否被切成了两份
            if len(auth) == 1:
                msg = _('Invalid basic header. No credentials provided.')
                raise exceptions.AuthenticationFailed(msg)
            elif len(auth) > 2:
                msg = _('Invalid basic header. Credentials string should not contain spaces.')
                raise exceptions.AuthenticationFailed(msg)
    		#作用:当提交的token格式有误则抛出异常,(非法用户访问,可用于反爬)
            try:
                #反解token(auth是被查分的列表,0是头,1是token)
                auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(':')
            except (TypeError, UnicodeDecodeError, binascii.Error):
                msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
                raise exceptions.AuthenticationFailed(msg)
                #如果反解失败则抛出异常,代表该用户是非法用户
    
            userid, password = auth_parts[0], auth_parts[2]
            return self.authenticate_credentials(userid, password, request)
    		#token反解成功,返回(user,token)组成的元组,代表合法用户
            #元组0位的user会被存储到request.user中
            #元组1位的token会被存储到request.auth中,通常也可以不保存,所以可以用none填充。
            
        def authenticate_credentials(self, userid, password, request=None):
            """
            Authenticate the userid and password against username and password
            with optional request for context.
            针对用户名和密码对用户标识和密码进行身份验证,并提供可选的上下文请求。
            """
            credentials = {
                get_user_model().USERNAME_FIELD: userid,
                'password': password
            }
            user = authenticate(request=request, **credentials)
    
            if user is None:
                raise exceptions.AuthenticationFailed(_('Invalid username/password.'))
    
            if not user.is_active:
                raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
    
            return (user, None)
    
        def authenticate_header(self, request):
            return 'Basic realm="%s"' % self.www_authenticate_realm
    

    这里我们不采用rest-framework的身份认证组件而是采用drf-jwt框架的认证组件,下面看一下相关源码:

    class BaseJSONWebTokenAuthentication(BaseAuthentication):
        """
        Token based authentication using the JSON Web Token standard.
        """
    
        def authenticate(self, request):
            """
            Returns a two-tuple of `User` and token if a valid signature has been
            supplied using JWT-based authentication.  Otherwise returns `None`.
            """
            jwt_value = self.get_jwt_value(request)
            #从请求中拿出token
            if jwt_value is None:
                return None
            #如果没有token则返回None,以游客方式访问
            
    
            #反解token,反解失败,抛出异常,用户为非法用户;反解成功则反解的数据就是载荷,存放在payload中
            try:
                payload = jwt_decode_handler(jwt_value)
            except jwt.ExpiredSignature:
                msg = _('Signature has expired.')
                raise exceptions.AuthenticationFailed(msg)
            except jwt.DecodeError:
                msg = _('Error decoding signature.')
                raise exceptions.AuthenticationFailed(msg)
            except jwt.InvalidTokenError:
                raise exceptions.AuthenticationFailed()
    
                #载荷校验得到登录用户
            user = self.authenticate_credentials(payload)
    
            #得到登录用户,返回(user,token)
            return (user, jwt_value)
    
        def authenticate_credentials(self, payload):
            """
            Returns an active user that matches the payload's user id and email.
            """
            User = get_user_model()
            username = jwt_get_username_from_payload(payload)
    
            if not username:
                msg = _('Invalid payload.')
                raise exceptions.AuthenticationFailed(msg)
    
            try:
                user = User.objects.get_by_natural_key(username)
            except User.DoesNotExist:
                msg = _('Invalid signature.')
                raise exceptions.AuthenticationFailed(msg)
    
            if not user.is_active:
                msg = _('User account is disabled.')
                raise exceptions.AuthenticationFailed(msg)
    
            return user
    
    
    class JSONWebTokenAuthentication(BaseJSONWebTokenAuthentication):
        """
        Clients should authenticate by passing the token key in the "Authorization"
        HTTP header, prepended with the string specified in the setting
        `JWT_AUTH_HEADER_PREFIX`. For example:
    
            Authorization: JWT eyJhbGciOiAiSFMyNTYiLCAidHlwIj
        """
        www_authenticate_realm = 'api'
    
        def get_jwt_value(self, request):
            auth = get_authorization_header(request).split()
            auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()
    #这里的校验和rest-framework类似
            if not auth:
                if api_settings.JWT_AUTH_COOKIE:
                    return request.COOKIES.get(api_settings.JWT_AUTH_COOKIE)
                return None
    
            if smart_text(auth[0].lower()) != auth_header_prefix:
                return None
    
            if len(auth) == 1:
                msg = _('Invalid Authorization header. No credentials provided.')
                raise exceptions.AuthenticationFailed(msg)
            elif len(auth) > 2:
                msg = _('Invalid Authorization header. Credentials string '
                        'should not contain spaces.')
                raise exceptions.AuthenticationFailed(msg)
    
            return auth[1]
    
        def authenticate_header(self, request):
            """
            Return a string to be used as the value of the `WWW-Authenticate`
            header in a `401 Unauthenticated` response, or `None` if the
            authentication scheme should return `403 Permission Denied` responses.
            """
            return '{0} realm="{1}"'.format(api_settings.JWT_AUTH_HEADER_PREFIX, self.www_authenticate_realm)
    
    

    我们还可以自定义认证组件,方法是自己新建一个authentications文件,然后重写校验代码,并在settings中进行配置,但是我们一般不采用自定义的方法而是使用drf-jwt认证组件进行身份认证。由于身份认证只是判断一下来访问的客户端是什么身份,并不做其他的处理,所以我们一般需要对其进行全局配置,因为所有来访问的人是什么身份,以便权限组件进行用户权限的处理。

    认证组件的全局配置:

    '''
    1.settings文件全局配置drf-jwt框架的认证类
    REST_FRAMEWORK = {
        # 认证组件
        'DEFAULT_AUTHENTICATION_CLASSES': [
            'rest_framework_jwt.authentication.JSONWebTokenAuthentication'
        ],
    }
    
    2.在自定义的authentications文件中导入认证组件
    from rest_framework_jwt.authentication import JSONWebTokenAuthentication
    class MyAuthentication(JSONWebTokenAuthentication):
        def authenticate(self, request):
            """
            1) 从请求头中拿到前台提交的token(一般从HTTP_AUTHORIZATION中拿,也可以与前台约定)
                  -- 如果设置了反爬等措施,校验一下反爬(头 token)
            2)没有token,返回None,代表游客
            3)有token,进入校验
                  -- 不通过:抛AuthenticationFailed异常,代表非法用户
                  -- 通过:返回 (user, token),代表合法用户
            """
            pass#这里的pass相当于return None
    '''
    

    自定义认证类的步骤:

    # 自定义认证类
    # 1) 如果使用session认证,drf默认提供了SessionAuthentication
    # 2) 如果使用drf-jwt认证框架,drf-jwt框架提供了JSONWebTokenAuthentication
    # 3) 如果是自定义签发与校验token,才需要将校验token的算法封装到自定义的认证类中
    from rest_framework.authentication import BaseAuthentication
    class MyAuthentication(BaseAuthentication):
        def authenticate(self, request):
          
            pass#这里的pass相当于return None
    

    权限组件

    权限组件的校验套路都是一样的,如果验证通过就return True,如不通过就return False。

    class AllowAny(BasePermission):
        """
        Allow any access.
        This isn't strictly required, since you could use an empty
        permission_classes list, but it's useful because it makes the intention
        more explicit.
        """
    
        def has_permission(self, request, view):
            return True
    
    drf默认提供了一些权限类:
    AllowAny:游客和登录用户有全权限
    IsAuthenticated:只有登录用户有全权限
    IsAdminUser:只有后台用户(admin用户)有全权限
    IsAuthenticatedOrReadOnly:游客有读权限,登录用户有全权限
    

    权限组件为我们提供的校验功能是有限的,我们可以自定义自己的校验功能。自定义权限类的步骤如下:

    1.新建自己的permissions文件,定义自己的校验类并继承BasePermission类

    from rest_framework.permissions import BasePermission
    
    """如果有特殊需要,需要自定义权限类
        如:只有superuser有权限、只有vip用户有权限、只有某ip网段用户有权限、只有某个视图及其子类有权限
    """
    class MyPermission(BasePermission):
        def has_permission(self, request, view):
            """
            1) 根据需求,request和view的辅助,制定权限规则判断条件
            2)如果条件通过,返回True
            3)如果条件不通过,返回False
            """
            print(request.user, request.auth)
            return False
    
    # VIP用户权限
    class VIPUserPermission(BasePermission):
        def has_permission(self, request, view):
            for group in request.user.groups.all():
                if group.name.lower() == 'vip':#如果用户的group是vip则校验通过,否则不通过。
                    return True
            return False
    

    我们还可结合权限组件的权限类使用,方法:

    from rest_framework.permissions import IsAuthenticated, IsAdminUser, AllowAny, IsAuthenticatedOrReadOnly
        permission_classes = [IsAuthenticated]
        permission_classes = [IsAdminUser]#这里是在重写APIView的权限校验属性,属于局部配置。如果我们需要添加多个权限类则直接在中括号内添加。
    

    直接将对应的类导入进行属性添加即可。

    频率组件

      # 频率组件:频率类一般做局部配置,但是频率调节在settings中配置
        'DEFAULT_THROTTLE_RATES': {
            'user': '5/min',
            'anon': '3/min',
            'mobile': '1/min'
        },
    

    drf-jwt签发token源码分析

    # drf-jwt自定义配置
    import datetime
    
    JWT_AUTH = {
        # 过期时间
        'JWT_EXPIRATION_DELTA': datetime.timedelta(seconds=300),
        # 是否允许刷新
        'JWT_ALLOW_REFRESH': False,
        # 最大刷新的过期时间
        'JWT_REFRESH_EXPIRATION_DELTA': datetime.timedelta(days=7),
    }
    
    

    自定义签发token实现多方式登录

    源码分析

    首先我们来分析一下rest-framework-jwt签发token的代码:

    class JSONWebTokenSerializer(Serializer):
        """
        Serializer class used to validate a username and password.
    
        'username' is identified by the custom UserModel.USERNAME_FIELD.
    
        Returns a JSON Web Token that can be used to authenticate later calls.
        """
        def __init__(self, *args, **kwargs):
            """
            Dynamically add the USERNAME_FIELD to self.fields.
            """
            super(JSONWebTokenSerializer, self).__init__(*args, **kwargs)
    		#jwt的校验规则只校验username和password
            self.fields[self.username_field] = serializers.CharField()
            self.fields['password'] = PasswordField(write_only=True)
    
        @property
        def username_field(self):
            return get_username_field()
    
        def validate(self, attrs):
            #将username和password包装到字典中
            credentials = {
                self.username_field: attrs.get(self.username_field),
                'password': attrs.get('password')
            }
    
            #用auth组件的authenticate方法进行校验,该方法也只能校验username和password
            if all(credentials.values()):
                user = authenticate(**credentials)
                
    			#判断用户是否激活
                if user:
                    if not user.is_active:
                        msg = _('User account is disabled.')
                        raise serializers.ValidationError(msg)
    				
                    #将激活的用户添加到载荷中
                    payload = jwt_payload_handler(user)
    				
                    #将token和user存放在serializer对象中
                    return {
                        'token': jwt_encode_handler(payload),#签发token字符串
                        'user': user
                    }
                else:
                    msg = _('Unable to log in with provided credentials.')
                    raise serializers.ValidationError(msg)
            else:
                msg = _('Must include "{username_field}" and "password".')
                msg = msg.format(username_field=self.username_field)
                raise serializers.ValidationError(msg)
    
    

    通过上面源码我们可以总结drf-jwt签发token的步骤如下:

    1. username、password通过auth组件的authenticate方法得到user对象

    2. user对象通过drf-jwt框架的jwt_payload_handler函数包装payload载荷

    3. payload载荷通过drf-jwt框架的jwt_encode_handler函数签发token字符串

      注:我们可以借助jwt_payload_handler和jwt_encode_handler两个函数完成自定义jws-token的签发。

    多方式登陆签发token实例

    # 多方式登录
    #视图类
    from rest_framework.views import APIView
    class LoginAPIView(APIView):
        """ 重点
        1)token只能由登录接口签发
        2)登录接口也是APIView的子类,使用一定会进行认证、权限组件的校验
        结论:不管系统默认、或是全局settings配置的是何认证与权限组件,登录接口不用参与任何认证与权限的校验
        所以,登录接口一定要进行认证与权限的局部禁用
        """
        authentication_classes = []
        pagination_class = []
    
        def post(self, request, *args, **kwargs):
            serializer = serializers.LoginModelSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)  # 内部在全局钩子中完成token的签发
            return APIResponse(results={
                'username': serializer.content.get('user').username,
                'token': serializer.content.get('token')
            })
    
      
    
     #序列化类 
    from rest_framework_jwt.serializers import jwt_payload_handler, jwt_encode_handler
    import re
    class LoginModelSerializer(serializers.ModelSerializer):
        # post请求,序列化默认当做create动作进行校验,需要校验数据库,create动作username会抛用户已存在异常
        # 抛用户已存在异常是多余的,所以自定义系统校验规则即可
        username = serializers.CharField(min_length=3, max_length=16)
        password = serializers.CharField(min_length=3, max_length=16)
        class Meta:
            model = models.User
            fields = ('username', 'password')
    
        # 用全局钩子,完成token的签发
        def validate(self, attrs):
            # 1)通过 username 和 password 完成多方式登录校验,得到user对象
            user = self._validate_user(attrs)
            # 2)user对象包装payload载荷
            payload = jwt_payload_handler(user)
            # 3)payload载荷签发token
            token = jwt_encode_handler(payload)
            # 4)将user与token存储到serializer对象中,方便在视图类中使用
            self.content = {
                'user': user,
                'token': token
            }
            return attrs
    
        def _validate_user(self, attrs):
            username = attrs.get('username')
            password = attrs.get('password')
    
            if re.match(r'.*@.*', username):  # 邮箱
                user = models.User.objects.filter(email=username).first()  # type: models.User
            elif re.match(r'^1[3-9][0-9]{9}$', username):  # 电话
                user = models.User.objects.filter(mobile=username).first()
            else:  # 用户名
                user = models.User.objects.filter(username=username).first()
    
            if not user or not user.check_password(password):
                raise serializers.ValidationError({'message': '用户信息异常'})
    
    
    """
    
    

    频率组件

    AnonRateThrottle:只对游客进行频率限制
    UserRateThrottle:对所有用户进行频率限制

    看一下频率组件的源码:

    class AnonRateThrottle(SimpleRateThrottle):
        """
        Limits the rate of API calls that may be made by a anonymous users.
    
        The IP address of the request will be used as the unique cache key.
        限制匿名用户可能进行的API调用的速率。
    
         请求的IP地址将用作唯一的缓存密钥。
        """
        scope = 'anon'
    
        def get_cache_key(self, request, view):
            if request.user.is_authenticated:
                #登录用户返回none代表登录用户可以无限次访问,我们可以重新在settings中配置需要限制的频率
                return None  # Only throttle unauthenticated requests.
    
            return self.cache_format % {
                #匿名用户,返回一个与当前匿名用户有关的字符串
                'scope': self.scope,
                'ident': self.get_ident(request)
            }
    
    
    
    # 配置drf自带的频率类
    'DEFAULT_THROTTLE_CLASSES': [],
    #在settings文件中全局配置
    from rest_framework.throttling import AnonRateThrottle, UserRateThrottle
    throttle_classes = [UserRateThrottle]
    #在视图类中局部配置
    
    #在我们的settings文件中的REST_FRAMEWORK中进行如下频率配置
    'DEFAULT_THROTTLE_RATES': {
            'user': '5/min',#自定义已登录用户每分钟访问次数
            'anon': '3/min',#自定义匿名用户每分钟访问次数
            'mobile': '1/min'
        },
    

    自定义频率类

    在实际应用中如对ip进行限次、对电话进行限次、对视图某些信息进行限次我们需要自定义频率类。

    以手机号为例:

    from rest_framework.throttling import SimpleRateThrottle
    
    class MobileRateThrottle(SimpleRateThrottle):
        """
        1)设置scope字符串类属性,同时在settings中进行drf配置DEFAULT_THROTTLE_RATES
            eg: DEFAULT_THROTTLE_RATES = {'mobile': '1/min'}
        2)重写get_catch_key方法:
            返回与限制条件有关的字符串,表示限制
            返回None,表示不限制
        """
        scope = 'mobile'
        def get_cache_key(self, request, view):
            if not request.user.is_authenticated or not request.user.mobile:
                return None  # 匿名用户 或 没有电话号的用户 都不限制
    
            # 只要有电话号的用户踩进行限制
            return self.cache_format % {
                'scope': self.scope,
                'ident': request.user.mobile
            }
    
    
  • 相关阅读:
    Java 将Clob字段转换成String字符串
    Java BigDecimal详解
    java循环创建对象应该在循环体内还是循环体外
    Java 通过身份证获取生日和性别
    Java将数据进行分组处理
    Java 2进制和16进制的转换
    Map-HashMap 与 IF 判断内存占用对比
    Groovy脚本-通用SQL开关
    Oracle用户、授权、角色管理
    【docker随笔系列】安装sqlserver
  • 原文地址:https://www.cnblogs.com/ghylpb/p/12142751.html
Copyright © 2011-2022 走看看