zoukankan      html  css  js  c++  java
  • JWT认证

    一. jwt实现过程

    1. 构建jwt过程#

    第一: 用户提交用户名和密码给服务端,如果登录成功,使用jwt创建一个token,并给用户返回

    Copy
    eyJ0eXAiOiJqd3QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoxLCJ1c2VybmFtZSI6InpjYyIsImV4cCI6MTU5NDczODg5MX0.OCG4mUhs_yXIkxtxvG9MWJWjpbvnSGDcqMVtpsn_0mo
    

    第二步: 构建三段字符串之间的关系

    Copy
    # 第一段字符串 headers内部包含了算法 和 token类型。
       	流程: 先将python类型对象装换成json格式字符串, 然后做base64加密
        headers = {
            'typ': 'jwt',
            'alg': 'HS256',
        }    
    
        
    # 第二段字符串payload,自定义的值
        流程: 先将python类型对象装换成json格式字符串,然后做base64加密
        payload = {
            'user_id': user.pk,
            'username': username,
            'exp': datetime.datetime.utcnow() + datetime.timedelta(seconds=300),  # 超时时间
        }
        
    # 第三段字符串 
        第一步:把12部分base64加密过后的结果进行拼接加密
        第二步:对前2部分的加密结果进行hs256加密 + 加盐
        第三步:对hs256加密后的密文在进行base64url加密再拼接到前1, 2部分base64格式的末尾作为sign.
    

    第三步: 以后用户访问时,需要携带token,后端需要对token校验

    2. 校验jwt过程

    第一步: 获取token

    第二步: 对token进行切割, 获取第二段内容进行base64解密,获取payload信息, 检查超时时间是否超时

    第三步:由于第三部分的字符串不能反解,把第一和第二段在进行hs256加密

    Copy
    1.12部分base64的密文拼接加密
    2. 对前2部分加密进行hs256加密+加盐得到密文
    3. 再将密文机进行base64加密, 与前两段的base64d格式的密文进行对比, 如果相等,表示token没有修改通过.
    

    二. drf-jwt安装

    官网: http://getblimp.github.io/django-rest-framework-jwt/

    安装: pip install djangorestframework-jwt

    三. 使用内置jwt认证+签发token

    1. 快速使用

    Copy
    # 路由中配置
    # 提示: 
    '''
    obtain_jwt_token本质是由ObtainJSONWebToken类调用as_view类方法实例化出来的, 其实路由中这样写也可以:
        path('login/', ObtainJSONWebToken.as_view()),
    '''
    
    from rest_framework_jwt.views import ObtainJSONWebToken, obtain_jwt_token, JSONWebTokenAPIView, VerifyJSONWebToken
    
    urlpatterns = [
        # path('login/', ObtainJSONWebToken.as_view()),
        path('login/', obtain_jwt_token),
    ]
    

    解析: 为什么路由中配置了obtain_jwt_token用户认证, 签发token等等都不需要写了?

    Copy
    # 帮我们写了视图认证实现接受用户请求及基于请求响应:
        看继承关系: obtain_jwt_token = ObtainJSONWebToken.as_view() -> ObtainJSONWebToken -> JSONWebTokenAPIView
        JSONWebTokenAPIView就是我们的视图类. 它里面写了post方法, 处理我们的认证请求.
        
    # 帮我们写了序列化器实现了token的签发:
        class ObtainJSONWebToken(JSONWebTokenAPIView):
            # JSONWebTokenSerializer内部就在序列换器里面使用了validate钩子, 实现了token的签发
            serializer_class = JSONWebTokenSerializer
    

    2. 使用内置认证快速实现认证+签发token引发的三大缺陷及解决.

    1) 缺陷1: jwt提供的视图中is_valid校验成功时, 无法自定义返回的response的结构.

    解决: 需要自定义jwt的response函数, 用来覆盖它默认使用的jwt_response_payload_handler函数的返回格式

    Copy
    # 路由代码:
    
    
    # utils/jwt_response.py 自定义认证成功返回格式:
    from rest_framework_jwt.utils import jwt_response_payload_handler
    
    def custom_jwt_response_payload_handler(token, user=None, request=None):
        # 返回什么, 认证成功时就返回什么格式
        return {
            'status': 1000,
            'messages': '登录成功',
            'token': token,
        }
    
    # 配置文件中配置:
    JWT_AUTH = {
        'JWT_RESPONSE_PAYLOAD_HANDLER':
        'apps.utils.jwt_response.custom_jwt_response_payload_handler',
    }
    

    2) 缺陷2: jwt提供的视图中is_valid校验失败时, 无法自定义返回response的结构. (基于缺陷1的升级)

    新建类继承jwt提供的JSONWebTokenAPIView视图类, 重写post方法, 使用super, 拿到返回的response对象, 判断response.data字典中是否有token这个key, 如果有token表示用户登录认证成功, 返回正确的response. 没有token表示认证失败, 返回错误的response

    Copy
    # 路由代码:
    from user.views import CustomObtainJSONWebToken
    
    urlpatterns = [
        path('login/', CustomObtainJSONWebToken.as_view()),
    ]
    
    # 视图代码:
    from rest_framework_jwt.views import ObtainJSONWebToken
    
    class CustomObtainJSONWebToken(ObtainJSONWebToken):
        def post(self, request, *args, **kwargs):
            response = super().post(request, *args, **kwargs)
            # 有token表示用户登录认证成功, 返回正确的response. 没有token表示认证失败, 返回错误的response.
            if response.data.get('token'):
                obj = CommonResponse(messages='登陆成功', results=response.data)
            else:
                obj = CommonResponse(messages='登录失败', results=response.data)
            return obj
    

    3) 缺陷3: jwt提供的认证签发token机制无法实现用户多方式登录 (基于缺陷2的再次封装. 其实这样封装你会发现意义以及不大了!!!!)

    解决: 新建类基础jwt提供的JSONWebTokenSerializer序列化类, 重写validate方法, 实现用户多方式登录签发token.

    Copy
    # 路由代码:
    from rest_framework_jwt.views import ObtainJSONWebToken, obtain_jwt_token, JSONWebTokenAPIView, VerifyJSONWebToken
    # import user.views as views
    from user.views import CustomObtainJSONWebToken
    
    urlpatterns = [
        path('admin/', admin.site.urls),
        path('api/', include('user.urls')),
        path('login_jwt/', ObtainJSONWebToken.as_view()),
        path('login_custom/', CustomObtainJSONWebToken.as_view()),
        path('login_custom/', CustomObtainJSONWebToken.as_view()),
        # path(r'xadmin/', xadmin.site.urls)
    ]
    
    # 视图代码:
    from rest_framework_jwt.views import ObtainJSONWebToken
    from .ser import CostomJSONWebTokenSerializer
    
    
    class CustomObtainJSONWebToken(ObtainJSONWebToken):
        serializer_class = CostomJSONWebTokenSerializer
    
        def post(self, request, *args, **kwargs):
            response = super().post(request, *args, **kwargs)
            # 有token表示用户登录认证成功, 返回正确的response. 没有token表示认证失败, 返回错误的response.
            if response.data.get('token'):
                obj = CommonResponse(messages='登陆成功', results=response.data)
            else:
                obj = CommonResponse(code=2000, messages='登录失败', results=response.data)
            return obj
    
    # 序列化器代码:
    import re
    import jwt
    import datetime
    from django.conf import settings
    from rest_framework.exceptions import ValidationError
    from rest_framework_jwt.views import JSONWebTokenSerializer
    
    
    class CostomJSONWebTokenSerializer(JSONWebTokenSerializer):
        # 使用pyjwt
        def verify_username(self, username):
            """多方式登录校验"""
            if re.search(r'^1[3-9][0-9]{9}$', username):
                user = models.User.objects.filter(mobile=username).first()
            elif re.search(r'^.*?@.*?.com$', username):
                user = models.User.objects.filter(email=username).first()
            else:
                user = models.User.objects.filter(username=username).first()
    
            if user:
                return user
            raise ValidationError("用户名错误!")
    
        def verify_password(self, user, password):
            """校验密码"""
            is_success = user.check_password(raw_password=password)
            if not is_success:
                raise ValidationError("用户密码错误!")
    
        def sign_token(self, user):
            """签发token"""
            headers = {
                'typ': 'jwt',
                'alg': 'HS256',
            }
            payload = {
                'user_id': user.pk,
                'username': user.username,
                'exp': datetime.datetime.utcnow() + datetime.timedelta(seconds=300),
            }
    
            slat = settings.SECRET_KEY
            token = jwt.encode(payload=payload, key=slat, headers=headers)
            return token
    
        def validate(self, attrs):
            """校验用户名, 校验密码, 签发token"""
            username = attrs.get('username')
            password = attrs.get('password')
    
            user = self.verify_username(username)
    
            self.verify_password(user, password)
    
            token = self.sign_token(user)
            # 返回什么格式由jwt_response_payload_handler来控制. 因此任然需要重写自定义jwt_response_payload_handler
            return {
                'token': token,
                'user': user
            }
    
    # utils/jwt_response.py 自定义认证成功返回格式:
    from rest_framework_jwt.utils import jwt_response_payload_handler
    
    def custom_jwt_response_payload_handler(token, user=None, request=None):
        # 返回什么, 认证成功时就返回什么格式
        return {
            # 'status': 1000,
            # 'messages': '登录成功',
            'token': token,
            'username': user.username,
        }
    
    
    # 配置文件配置
    JWT_AUTH = {
        'JWT_RESPONSE_PAYLOAD_HANDLER':
        'apps.utils.jwt_response.custom_jwt_response_payload_handler',
    }
    

    3. 使用内置提供的认证

    头部访问格式: 使用内置的如果没有修改配置文件中配置的前缀, 那么jwt前缀必须要加, 如果不加前缀认证就返回None, 认证就失效了. 大小写都行.

    Authorizationjwt eyJ0eXAiOiJqd3QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoxLCJ1c2VybmFtZSI6InlhbmciLCJleHAiOjE1OTUwODY3NDZ9.aaehvGOl3AMI5gfU2Z9L8GH015pWIitOCXLgBJ5zl8E
    Authorization JWT eyJ0eXAiOiJqd3QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoxLCJ1c2VybmFtZSI6InlhbmciLCJleHAiOjE1OTUwODY3NDZ9.aaehvGOl3AMI5gfU2Z9L8GH015pWIitOCXLgBJ5zl8E

    拓展: 认证前缀可以修改

    Copy
    from rest_framework_jwt import settings
    'JWT_AUTH_HEADER_PREFIX': 'JWT',
    

    内置认证配置: JSONWebTokenAuthentication要和IsAuthenticated连用, 因为不符合内置的认证返回的是None, 那么就获取不到用户对象, 此时是匿名用户IsAuthenticated就对匿名用户做了认证. 因此2个要搭配使用

    Copy
    from rest_framework.views import APIView
    from rest_framework.permissions import IsAuthenticated
    from rest_framework_jwt.authentication import JSONWebTokenAuthentication
    from rest_framework.response import Response
    
    
    class OderAPIView(APIView):
        authentication_classes = (JSONWebTokenAuthentication, )
        permission_classes = [IsAuthenticated]
    
        def get(self, request, *args, **kwargs):
            return Response('订单视图')
    

    4. 使用内置认证控制登录成功时response返回的格式

    Copy
    from rest_framework_jwt.utils import jwt_response_payload_handler
    
    
    def custom_jwt_response_payload_handler(token, user=None, request=None):
        """返回值就是登录成功以后返回的数据格式"""
        return {
            'status': 1000,
            'messages': 'ok',
            'user_id': user.pk,
            'username': user.username,
            'token': token,
        }
    
    # 第二步: settings.py文件中配置成自己的路径即可
    JWT_AUTH = {
        # utils.jwt_response_payload_handler.custom_jwt_response_payload_handler
        'JWT_RESPONSE_PAYLOAD_HANDLER':
            'utils.jwt_response_payload_handler.custom_jwt_response_payload_handler',
    }
    

    三. 自定义jwt认证+签发

    1. 自定义jwt认证

    1) 继承BaseAuthentication实现

    Copy
    import jwt
    from rest_framework_jwt.authentication import BaseAuthentication
    
    from rest_framework_jwt.authentication import jwt_decode_handler
    from rest_framework_jwt.utils import jwt_decode_handler
    
    from rest_framework.exceptions import AuthenticationFailed
    from rest_framework.exceptions import APIException
    
    from app01.models import User
    
    class CustomJwtAuthentication(BaseAuthentication):
        def authenticate(self, request):
            jwt_value = request.META.get('HTTP_AUTHORIZATION')
            if jwt_value:
                try:
                    payload = jwt_decode_handler(jwt_value)
                except jwt.ExpiredSignature:
                    raise AuthenticationFailed('签名过期!')
                except jwt.DecodeError:
                    raise AuthenticationFailed("签名解码错误!")
                except jwt.InvalidTokenError:
                    raise AuthenticationFailed('token无效!')
                except Exception as e:
                    raise AuthenticationFailed(str(e))
    
                # print('payload:', payload)   # payload: {'user_id': 1, 'username': 'egon', 'exp': 1594819676, 'email': ''}
                # 方式一: 缺点, 查数据库耗费时间
                # user_obj = User.objects.get(pk=payload.get('user_id'))
                # print('user_obj.phone:', user_obj.phone)  # 17621839222
    
                # 方式二: 缺点, 没有传递的数据就获取不到
                user_obj = User(id=payload.get('user_id'), username=payload.get('username'))
                print('user_obj.phone:', [user_obj.phone])
                return user_obj, jwt_value    # ['']
            raise AuthenticationFailed('请携带认证信息!')
            
            
    # 全局使用
    REST_FRAMEWORK = {
        # 认证模块
        'DEFAULT_AUTHENTICATION_CLASSES': (
            'users.app_auth.CustomJwtAuthentication',
        ),
    }
    
    # 局部使用
    from user.authentications import CustomJwtAuthentication
    authentication_classes = [CustomJwtAuthentication]            
    

    2) 继承BaseJSONWebTokenAuthentication + 手动get获取jwt_value 或者 自动获取jwt_value实现#

    Copy
    import jwt
    from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
    
    from rest_framework_jwt.authentication import jwt_decode_handler
    from rest_framework_jwt.utils import jwt_decode_handler
    
    from rest_framework.exceptions import AuthenticationFailed
    from rest_framework.exceptions import APIException
    
    from rest_framework.authentication import get_authorization_header
    
    
    class CustomJwtAuthentication(BaseJSONWebTokenAuthentication):
        def authenticate(self, request):
            # 手动获取jwt_value
            # jwt_value = request.META.get('HTTP_AUTHORIZATION')
    
            # 自动获取jwt_value
            jwt_value = get_authorization_header(request)
            if jwt_value:
                try:
                    payload = jwt_decode_handler(jwt_value)
                except jwt.ExpiredSignature:
                    raise AuthenticationFailed("签名过期!")
                except jwt.DecodeError:
                    raise AuthenticationFailed('解码错误!')
                except jwt.InvalidTokenError:
                    raise AuthenticationFailed('token无效!')
                except Exception as e:
                    raise AuthenticationFailed(str(e))
                user = self.authenticate_credentials(payload)
                # print('user.phone:', user.phone)   # 17621839222
                return user, jwt_value
            raise AuthenticationFailed('请携带token信息!')
            
    # 全局使用
    REST_FRAMEWORK = {
        # 认证模块
        'DEFAULT_AUTHENTICATION_CLASSES': (
            'users.app_auth.CustomJwtAuthentication',
        ),
    }
    
    # 局部使用
    from user.authentications import CustomJwtAuthentication
    authentication_classes = [CustomJwtAuthentication]            
    

    3) 继承JSONWebTokenAuthentication + copy源码及导入模块快速实现

    提示: 只需要删除一部分, 新增一句代码即可!!!!

    Copy
    import jwt
    from rest_framework_jwt.authentication import JSONWebTokenAuthentication
    from rest_framework_jwt.utils import jwt_decode_handler
    from django.utils.translation import ugettext as _
    from rest_framework import exceptions
    
    
    class CustomJSONWebTokenAuthentication(JSONWebTokenAuthentication):
        def authenticate(self, request):
            jwt_value = self.get_jwt_value(request)
            # 删除这一部分
            # if jwt_value is None:
            #     return None
    
            try:
                payload = jwt_decode_handler(jwt_value)
            except jwt.ExpiredSignature:
                msg = _('Signature has expired.')
                raise exceptions.AuthenticationFailed(msg)
            except jwt.DecodeError:
                msg = _('Error decoding signature.')
                raise exceptions.AuthenticationFailed(msg)
            except jwt.InvalidTokenError:
                raise exceptions.AuthenticationFailed()
    
            # 新增这一部分
            except Exception as e:
                raise exceptions.AuthenticationFailed(str(e))
    
            user = self.authenticate_credentials(payload)
    
            return (user, jwt_value)
    
        
    # 全局使用
    REST_FRAMEWORK = {
        # 认证模块
        'DEFAULT_AUTHENTICATION_CLASSES': (
            'users.app_auth.CustomJSONWebTokenAuthentication',
        ),
    }
    
    # 局部使用
    from user.authentications import CustomJSONWebTokenAuthentication
    authentication_classes = [CustomJSONWebTokenAuthentication]    
    

    4) 使用pyjwt模块并继承JSONWebTokenAuthentication实现

    Copy
    import jwt
    
    '''
    from .exceptions import (
        InvalidTokenError, DecodeError, InvalidAlgorithmError,
        InvalidAudienceError, ExpiredSignatureError, ImmatureSignatureError,
        InvalidIssuedAtError, InvalidIssuerError, ExpiredSignature,
        InvalidAudience, InvalidIssuer, MissingRequiredClaimError,
        InvalidSignatureError,
        PyJWTError,
    )
    '''
    from django.conf import settings
    from rest_framework_jwt.authentication import JSONWebTokenAuthentication
    from rest_framework.exceptions import AuthenticationFailed
    
    
    class JWTHeadersAuthentication(JSONWebTokenAuthentication):
        def authenticate(self, request):
            token = request.META.get("HTTP_AUTHORIZATION")
            if token:
                salt = settings.SECRET_KEY
                """
                def decode(self,
                   jwt,  # type: str
                   key='',   # type: str
                   verify=True,  # type: bool
                   algorithms=None,  # type: List[str]
                   options=None,  # type: Dict
                   **kwargs):
                """
                try:
                    payload = jwt.decode(jwt=token, key=salt)
                except jwt.ExpiredSignatureError:
                    raise AuthenticationFailed('签名过期!')
                except jwt.DecodeError:
                    raise AuthenticationFailed("解码失败!")
                except jwt.InvalidTokenError:
                    raise AuthenticationFailed("token校验错误!")
                except Exception as e:
                    raise AuthenticationFailed(str(e))
    
                user = self.authenticate_credentials(payload)            
                return user, token
            raise AuthenticationFailed('请求头中请携带token!')
            
    # 全局使用
    REST_FRAMEWORK = {
        # 认证模块
        'DEFAULT_AUTHENTICATION_CLASSES': (
            'users.app_auth.JWTHeadersAuthentication',
        ),
    }
    
    # 局部使用
    from user.authentications import JWTHeadersAuthentication
    authentication_classes = [JWTHeadersAuthentication]
    

    2. 自定义签发token

    1) 多方式登录,逻辑写在视图类中

    Copy
    # 视图代码
    import re
    from rest_framework.viewsets import ViewSet
    from rest_framework_jwt.utils import jwt_payload_handler
    from rest_framework_jwt.utils import jwt_encode_handler
    from rest_framework.exceptions import ValidationError
    
    from app01.models import User
    
    
    class LoginAPIView(ViewSet):
        """
        继承ViewSet意义:
            1. 修改视图类中方法, 使用login明确提意
            2. 继承了APIView, 具有较高的可控性
        """
        def login(self, request, *args, **kwargs):
            username = request.data.get('username')
            password = request.data.get('password')
    
            # username=egon/111@qq.com/17621839222
            if re.search(r'^1[3-9][0-9]{9}$', username):
                user = User.objects.filter(phone=username).first()
            elif re.search(r'^.*?@.*?qq.com$', username):
                user = User.objects.filter(email=username).first()
            else:
                user = User.objects.filter(username=username).first()
    
            if user:
                is_login = user.check_password(raw_password=password)
                if is_login:
                    # 签发token
                    payload = jwt_payload_handler(user)
                    token = jwt_encode_handler(payload)
                    return Response({'status': 1000, 'token': token, 'results': {'username': user.username, 'email': user.email}})
                raise ('用户密码错误!')
            raise ValidationError("用户名错误!")
    

    2) 多方式登录,逻辑写在序列化类中

    Copy
    # 视图代码
    from rest_framework.viewsets import ViewSet
    
    from app01.models import User
    from .ser import LoginModelSerializer
    
    
    class LoginAPIView(ViewSet):
        """
        继承ViewSet意义:
            1. 修改视图类中方法, 使用login明确提意
            2. 继承了APIView, 具有较高的可控性
        """
        def login(self, request, *args, **kwargs):
            serializer = LoginModelSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)
            token = serializer.context.get('token')
            user = serializer.context.get('user')
            return Response({'status': 1000, 'token': token, 'results': {'username': user.username, 'email': user.email}})
    
    
    # 序列化器代码
    import re
    
    from rest_framework import serializers
    from rest_framework.exceptions import ValidationError
    
    from rest_framework_jwt.utils import jwt_payload_handler
    from rest_framework_jwt.utils import jwt_encode_handler
    
    from app01.models import User
    
    
    class LoginModelSerializer(serializers.ModelSerializer):
        username = serializers.CharField()
        class Meta:
            model = User
            fields = ('username', 'password')
    
        # 实现方式一: 多方式登录,逻辑写在视图类中
        def validate(self, validate_date):
        username = validate_date.get('username')
        password = validate_date.get('password')
        # username=egon/111@qq.com/17621839222
        print('validate_date:', validate_date)
    
        if re.search(r'^1[3-9][0-9]{9}$', username):
            user = User.objects.filter(phone=username).first()
        elif re.search(r'^.*?@.*?qq.com$', username):
            user = User.objects.filter(email=username).first()
        else:
            user = User.objects.filter(username=username).first()
    
        print('user:', user, type(user))
        if user:
            is_login = user.check_password(raw_password=password)
            if is_login:
                # 签发token
                payload = jwt_payload_handler(user)
                token = jwt_encode_handler(payload)
                self.context['token'] = token
                self.context['user'] = user
                return validate_date
            raise ValidationError('用户密码错误!')
        raise ValidationError("用户名错误!")
    

    3) 多方式登录, 使用drf-jwt的解耦合完整版实现

    Copy
    # 路由代码:
    url(r'^login/', views.LoginView.as_view(actions={'post': 'login'})),
    
    # 视图代码:
    from rest_framework.viewsets import ViewSet
    
    from . import ser
    from apps.utils.response import CommonResponse
    
    
    class LoginView(ViewSet):
        authentication_classes = []
    
        def login(self, request, *args, **kwargs):
            """登陆接口"""
            serializer = ser.LoginModelSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)
    
            token = serializer.context.get('token')
            user = serializer.context.get('user')
            username = user.username
            email = user.email
            mobile = user.mobile
            return CommonResponse(token=token, results={'username': username, 'email': email, 'mobile': mobile})
    # 序列化器代码
    import re
    from rest_framework import serializers
    from rest_framework_jwt.utils import jwt_payload_handler
    from rest_framework_jwt.utils import jwt_encode_handler
    
    from rest_framework.exceptions import ValidationError
    
    from . import models
    
    
    class LoginModelSerializer(serializers.ModelSerializer):
        """登陆接口,jwt方式返回token,格式为{status:100,msg:登陆成功,token:safasdfa}"""
    
        username = serializers.CharField()
    
        class Meta:
            model = models.User
            fields = ('username', 'password')
    
        # 使用drf-jwt实现没有解耦合之前
        # def validate(self, validate_data):
        #     print('validate_data:', validate_data)
        #     username = validate_data.get('username')
        #     password = validate_data.get('password')
        #     # 支持多方式登录
        #     if re.search(r'^1[3-9][0-9]{9}$', username):
        #         user = models.User.objects.filter(mobile=username).first()
        #     elif re.search(r'^.*?@.*?.com$', username):
        #         user = models.User.objects.filter(email=username).first()
        #     else:
        #         user = models.User.objects.filter(username=username).first()
        #
        #     if not user.is_delete:
        #         if user:
        #             if user.check_password(raw_password=password):
        #                 # 签发token
        #                 payload = jwt_payload_handler(user)
        #                 token = jwt_encode_handler(payload)
        #                 self.context['token'] = token
        #                 self.context['user'] = user
        #                 return validate_data
        #             else:
        #                 raise ValidationError('用户密码错误!')
        #         raise ValidationError('用户名错误!')
        #     raise ValidationError('该用户已经被管理员注销!')
    
        def verify_username(self, username):
            """校验用户不同登录方式"""
            if re.search(r'^1[3-9][0-9]{9}$', username):
                user = models.User.objects.filter(mobile=username).first()
            elif re.search(r'^.*?@.*?.com$', username):
                user = models.User.objects.filter(email=username).first()
            else:
                user = models.User.objects.filter(username=username).first()
    
            if user:
                return user
            raise ValidationError('用户名错误!')
    
        def verify_password(self, user, password):
            """校验密码"""
            is_succeed = user.check_password(raw_password=password)
            if not is_succeed:
                raise ValidationError('用户密码错误!')
    
        def sign_token(self, user):
            """签发token"""
            payload = jwt_payload_handler(user)
            token = jwt_encode_handler(payload)
            self.context['token'] = token
            self.context['user'] = user
    
        def validate(self, validate_data):
            """校验用户,密码,签发token接口"""
            print('validate_data:', validate_data)
            username = validate_data.get('username')
            password = validate_data.get('password')
            # 校验用户
            user = self.verify_username(username)
            # 校验密码
            self.verify_password(user, password)
    
            # 签发token
            self.sign_token(user)
            return validate_data
    

    4) 多方式登录, 使用pyjwt的解耦合完整版实现

    Copy
    # 路由代码:
    url(r'^login/', views.LoginView.as_view(actions={'post': 'login'})),
    
    # 视图代码:
    from rest_framework.viewsets import ViewSet
    
    from . import ser
    from apps.utils.response import CommonResponse
    
    
    class LoginView(ViewSet):
        authentication_classes = []
    
        def login(self, request, *args, **kwargs):
            """登陆接口"""
            serializer = ser.LoginModelSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)
    
            token = serializer.context.get('token')
            user = serializer.context.get('user')
            username = user.username
            email = user.email
            mobile = user.mobile
            return CommonResponse(token=token, results={'username': username, 'email': email, 'mobile': mobile})
        
        
    # 序列化器代码:
    import re
    import jwt
    import datetime
    from django.conf import settings
    from rest_framework import serializers
    from rest_framework.exceptions import ValidationError
    
    from . import models
    
    
    class LoginModelSerializer(serializers.ModelSerializer):
        username = serializers.CharField()
    
        class Meta:
            model = models.User
            fields = ('username', 'password')
            
        # 使用pyjwt实现没解耦之前
        '''
            def validate(self, attrs):
            """校验用户名, 用户密码, 签发toekn"""
            username = attrs.get('username')
            password = attrs.get('password')
    
            if re.search(r'^1[3-9][0-9]{9}$', username):
                user = models.User.objects.filter(mobile=username).first()
            elif re.search(r'^.*?@.*?.com$', username):
                user = models.User.objects.filter(email=username).first()
            else:
                user = models.User.objects.filter(username=username).first()
            if user:
                is_success = user.check_password(raw_password=password)
                if is_success:
                    # 签发token
                    headers = {
                        'typ': 'jwt',
                        'alg': 'HS256',
                    }
                    payload = {
                        'user_id': user.pk,
                        'username': user.username,
                        'exp': datetime.datetime.utcnow() + datetime.timedelta(seconds=300),
                    }
    
                    slat = settings.SECRET_KEY
                    token = jwt.encode(payload=payload, key=slat, headers=headers)
                    self.context['token'] = token
                    self.context['user'] = user
                    return attrs
                raise ValidationError("用户密码错误!")
            raise ValidationError("用户名错误!")
        '''
        
        def verify_username(self, username):
            """校验多方式用户名"""
            if re.search(r'^1[3-9][0-9]{9}$', username):
                user = models.User.objects.filter(mobile=username).first()
            elif re.search(r'^.*?@.*?.com$', username):
                user = models.User.objects.filter(email=username).first()
            else:
                user = models.User.objects.filter(username=username).first()
            if user:
                return user
            raise ValidationError("用户名错误!")
        
        def verify_password(self, user, password):
            """校验密码"""
            is_success = user.check_password(raw_password=password)
            if not is_success:
                raise ValidationError("用户密码错误!")
            
            
        def sign_token(self, user):   
            """签发token"""
            headers = {
                'typ': 'jwt',
                'alg': 'HS256',
            }
            payload = {
                'user_id': user.pk,
                'username': user.username,
                'exp': datetime.datetime.utcnow() + datetime.timedelta(seconds=300),
            }
    
            slat = settings.SECRET_KEY
            token = jwt.encode(payload=payload, key=slat, headers=headers)
            self.context['token'] = token
            self.context['user'] = user
        
        def validate(self, attrs):
            """校验用户名, 用户密码, 接口"""
            username = attrs.get('username')
            password = attrs.get('password')
            # 校验用户名   
            user = self.verify_username(username)
            # 校验用户密码
            self.verify_password(user, password)
            # 签发token
            self.sign_token(user)
            return attrs
    

    四. jwt的配置参数: 过期时间配置

    Copy
    import datetime
    JWT_AUTH = {
        'JWT_EXPIRATION_DELTA': datetime.timedelta(seconds=10),
    }
    

    五. base64编码与解码

    Copy
    import base64
    import json
    
    # 使用base64进行解密
    base64_str = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9'
    bytes_json = base64.b64decode(base64_str.encode('utf-8'))
    header = json.loads(bytes_json)   # {'typ': 'JWT', 'alg': 'HS256'}
    print(header)
    
    
    # 使用base64进行加密
    json_str = json.dumps(header)
    base64_bytes = base64.b64encode(json_str.encode('utf-8'))
    base64_str = base64_bytes.decode('utf-8')
    print(base64_str)   # eyJ0eXAiOiAiSldUIiwgImFsZyI6ICJIUzI1NiJ9
    
  • 相关阅读:
    flv mime IIS设置
    正则表达式
    MATLAB out of memory
    Cyclic prefix
    Windows Live Writer Test
    zz排序算法的稳定性
    MATLAB的分数和小数
    young矩阵学习
    Python初体验(二)
    利用VS2010调用Excel的宏
  • 原文地址:https://www.cnblogs.com/Tornadoes-Destroy-Parking-Lots/p/13390975.html
Copyright © 2011-2022 走看看