zoukankan      html  css  js  c++  java
  • 签发token、校验token、多方式登录签发token的实现、自定义认证反爬规则的认证类、admin使用自定义User表:新增用户密码密文、群查接口各种筛选组件数据准备、drf搜索过滤组件、drf排序过滤组件、drf基础分页组件

    签发token

    源码入口
    # 前提:给一个局部禁用了所有 认证与权限 的视图类发送用户信息得到token,其实就是登录接口
    
    # 1)rest_framework_jwt.views.ObtainJSONWebToken 的 父类 JSONWebTokenAPIView 的 post 方法
    #        接收有username、password的post请求
    # 2)post方法将请求得到的数据交给 rest_framework_jwt.serializer.JSONWebTokenSerializer 处理
    #        完成数据的校验,会走序列化类的 全局钩子校验规则,校验得到登录用户并签发token存储在序列化对象中

    核心源码:rest_framework_jwt.serializer.JSONWebTokenSerializer的validate(self, attrs)方法

    def validate(self, attrs):
        # 账号密码字典
        credentials = {
            self.username_field: attrs.get(self.username_field),
            'password': attrs.get('password')
        }
        if all(credentials.values()):
            # 签发token第1步:用账号密码得到user对象
            user = authenticate(**credentials)
            if user:
                if not user.is_active:
                    msg = _('User account is disabled.')
                    raise serializers.ValidationError(msg)
                # 签发token第2步:通过user得到payload,payload包含着用户信息与过期时间
                payload = jwt_payload_handler(user)
                # 在视图类中,可以通过 序列化对象.object.get('user'或者'token') 拿到user和token 
                return {
                    # 签发token第3步:通过payload签发出token
                    'token': jwt_encode_handler(payload),
                    'user': user
                }
            else:
                msg = _('Unable to log in with provided credentials.')
                raise serializers.ValidationError(msg)
        else:
            msg = _('Must include "{username_field}" and "password".')
            msg = msg.format(username_field=self.username_field)
            raise serializers.ValidationError(msg)

    完整源码:

    1.
    from rest_framework_jwt.views import ObtainJSONWebToken, obtain_jwt_token
    urlpatterns = [
        # url(r'^jogin/$', ObtainJSONWebToken.as_view()),
        url(r'^jogin/$', obtain_jwt_token),# 入口
    ]
    2.
    # 签发的请求来的时候走父类JSONWebTokenAPIView的post方法.
    class ObtainJSONWebToken(JSONWebTokenAPIView):
        serializer_class = JSONWebTokenSerializer
    3.
    class JSONWebTokenAPIView(APIView):
        # 自动禁用了权限和认证组件的认证.
        permission_classes = ()
        authentication_classes = ()
    
        def get_serializer_context(self):
            return {
                'request': self.request,
                'view': self,
            }
    
        def get_serializer_class(self):
            assert self.serializer_class is not None, (
                "'%s' should either include a `serializer_class` attribute, "
                "or override the `get_serializer_class()` method."
                % self.__class__.__name__)
            return self.serializer_class
    
        def get_serializer(self, *args, **kwargs):
            serializer_class = self.get_serializer_class()
            kwargs['context'] = self.get_serializer_context()
            return serializer_class(*args, **kwargs)
    
        def post(self, request, *args, **kwargs):
            # 从ObtainJSONWebToken类中获取serializer_class = JSONWebTokenSerializer并实例化,走__init__方法.
            serializer = self.get_serializer(data=request.data)
            #调用is_valid()方法.
            if serializer.is_valid():
                user = serializer.object.get('user') or request.user
                token = serializer.object.get('token')
                response_data = jwt_response_payload_handler(token, user, request)
                response = Response(response_data)
                if api_settings.JWT_AUTH_COOKIE:
                    expiration = (datetime.utcnow() +
                                  api_settings.JWT_EXPIRATION_DELTA)
                    response.set_cookie(api_settings.JWT_AUTH_COOKIE,
                                        token,
                                        expires=expiration,
                                        httponly=True)
                return response
    
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
    4.
    class JSONWebTokenSerializer(Serializer):
        # __init__方法获取了用户名和密码.
        def __init__(self, *args, **kwargs):
            super(JSONWebTokenSerializer, self).__init__(*args, **kwargs)
    
            self.fields[self.username_field] = serializers.CharField()
            self.fields['password'] = PasswordField(write_only=True)
    
        @property
        def username_field(self):
            return get_username_field()
        # 走全局钩子函数
        def validate(self, attrs):
            credentials = {
                self.username_field: attrs.get(self.username_field),
                'password': attrs.get('password')
            }
            # 进行校验用户名和密码是否正确,依赖RBAC权限六表.
            if all(credentials.values()):
                user = authenticate(**credentials)
    
                if user:
                    if not user.is_active:
                        msg = _('User account is disabled.')
                        raise serializers.ValidationError(msg)
                    # 将user作为参数传入得到载荷.
                    payload = jwt_payload_handler(user)
    
                    return {
                        # 将载荷作为参数传入获取token.
                        'token': jwt_encode_handler(payload),
                        'user': user
                    }
                else:
                    msg = _('Unable to log in with provided credentials.')
                    raise serializers.ValidationError(msg)
            else:
                msg = _('Must include "{username_field}" and "password".')
                msg = msg.format(username_field=self.username_field)
                raise serializers.ValidationError(msg)

    手动签发token逻辑思路:

    # 1)通过username、password得到user对象
    # 2)通过user对象生成payload:jwt_payload_handler(user) => payload
    #        from rest_framework_jwt.serializers import jwt_payload_handler
    # 3)通过payload签发token:jwt_encode_handler(payload) => token
    #        from rest_framework_jwt.serializers import jwt_encode_handler

    校验token

    源码入口
    # 前提:访问一个配置了jwt认证规则的视图类,把需要提交认证字符串token,在认证类中完成token的校验
    
    # 1)rest_framework_jwt.authentication.JSONWebTokenAuthentication 的父类 BaseJSONWebTokenAuthentication 的 authenticate 方法
    #        请求头拿认证信息jwt-token => 通过反爬小规则确定有用的token => payload => user

    核心源码:rest_framework_jwt.authentication.BaseJSONWebTokenAuthentication的authenticate(self, request)方法

    def authenticate(self, request):
        """
        Returns a two-tuple of `User` and token if a valid signature has been
        supplied using JWT-based authentication.  Otherwise returns `None`.
        """
        # 带有反爬小规则的获取token:前台必须按 "jwt token字符串" 方式提交
        # 校验user第1步:从请求头 HTTP_AUTHORIZATION 中拿token,并提取
        jwt_value = self.get_jwt_value(request)
        # 游客
        if jwt_value is None:
            return None
        # 校验
        try:
            # 校验user第2步:token => payload
            payload = jwt_decode_handler(jwt_value)
        except jwt.ExpiredSignature:
            msg = _('Signature has expired.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.DecodeError:
            msg = _('Error decoding signature.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.InvalidTokenError:
            raise exceptions.AuthenticationFailed()
        # 校验user第3步:token => payload
        user = self.authenticate_credentials(payload)
    
        return (user, jwt_value)

    完整源码:

    1.
    from .authentications import JWTAuthentication
    class UserDetail(APIView):
        # 使用JSONWebTokenAuthentication认证组件
        authentication_classes = [JSONWebTokenAuthentication]
        def get(self, request, *args, **kwargs):
            return APIResponse(results={'username': request.user.username})
    2.
    # 调用父类BaseJSONWebTokenAuthentication的authenticate方法.
    class JSONWebTokenAuthentication(BaseJSONWebTokenAuthentication):
        www_authenticate_realm = 'api'
        def get_jwt_value(self, request):
            auth = get_authorization_header(request).split()
            auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()
    
            if not auth:
                if api_settings.JWT_AUTH_COOKIE:
                    return request.COOKIES.get(api_settings.JWT_AUTH_COOKIE)
                return None
    
            if smart_text(auth[0].lower()) != auth_header_prefix:
                return None
    
            if len(auth) == 1:
                msg = _('Invalid Authorization header. No credentials provided.')
                raise exceptions.AuthenticationFailed(msg)
            elif len(auth) > 2:
                msg = _('Invalid Authorization header. Credentials string '
                        'should not contain spaces.')
                raise exceptions.AuthenticationFailed(msg)
    
            return auth[1]
    3.
    class BaseJSONWebTokenAuthentication(BaseAuthentication):
        """
        Token based authentication using the JSON Web Token standard.
        """
    
        def authenticate(self, request):
            """
            Returns a two-tuple of `User` and token if a valid signature has been
            supplied using JWT-based authentication.  Otherwise returns `None`.
            """
            # 调用子类JSONWebTokenAuthentication的方法获取jwt_value
            jwt_value = self.get_jwt_value(request)
            if jwt_value is None:
                return None
    
            try:
                # 由token解析出载荷
                payload = jwt_decode_handler(jwt_value)
            except jwt.ExpiredSignature:
                msg = _('Signature has expired.')
                raise exceptions.AuthenticationFailed(msg)
            except jwt.DecodeError:
                msg = _('Error decoding signature.')
                raise exceptions.AuthenticationFailed(msg)
            except jwt.InvalidTokenError:
                raise exceptions.AuthenticationFailed()
             # 由载荷解析出用户
            user = self.authenticate_credentials(payload)
            # 返回用户和token
            return (user, jwt_value)
    
        def authenticate_credentials(self, payload):
            """
            Returns an active user that matches the payload's user id and email.
            """
            User = get_user_model()
            username = jwt_get_username_from_payload(payload)
    
            if not username:
                msg = _('Invalid payload.')
                raise exceptions.AuthenticationFailed(msg)
    
            try:
                user = User.objects.get_by_natural_key(username)
            except User.DoesNotExist:
                msg = _('Invalid signature.')
                raise exceptions.AuthenticationFailed(msg)
    
            if not user.is_active:
                msg = _('User account is disabled.')
                raise exceptions.AuthenticationFailed(msg)
            return user

    手动校验token逻辑思路:

    # 1)从请求头中获取token
    # 2)根据token解析出payload:jwt_decode_handler(token) => payload
    #        from rest_framework_jwt.authentication import jwt_decode_handler
    # 3)根据payload解析出user:self.authenticate_credentials(payload) => user
    #        继承drf-jwt的BaseJSONWebTokenAuthentication,拿到父级的authenticate_credentials方法

    案例:实现多方式登陆签发token

    models.py

    from django.db import models
    
    from django.contrib.auth.models import AbstractUser
    class User(AbstractUser):
        mobile = models.CharField(max_length=11, unique=True)
    
        class Meta:
            db_table = 'api_user'
            verbose_name = '用户表'
            verbose_name_plural = verbose_name
    
        def __str__(self):
            return self.username

    serializers.py

    from rest_framework import serializers
    from . import models
    import re
    
    # 拿到前台token的两个函数: user => payload => token
    # from rest_framework_jwt.settings import api_settings
    # jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
    # jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
    from rest_framework_jwt.serializers import jwt_payload_handler
    from rest_framework_jwt.serializers import jwt_encode_handler
    
    
    # 1) 前台提交多种登录信息都采用一个key,所以后台可以自定义反序列化字段进行对应
    # 2) 序列化类要处理序列化与反序列化,要在fields中设置model绑定的Model类所有使用到的字段
    # 3) 区分序列化字段与反序列化字段 read_only | write_only
    # 4) 在自定义校验规则中(局部钩子、全局钩子)校验数据是否合法、确定登录的用户、根据用户签发token
    # 5) 将登录的用户与签发的token保存在序列化类对象中
    class UserModelSerializer(serializers.ModelSerializer):
        # 自定义反序列字段:一定要设置write_only,只参与反序列化,不会与model类字段映射
        usr = serializers.CharField(write_only=True)
        pwd = serializers.CharField(write_only=True)
        class Meta:
            model = models.User
            fields = ['usr', 'pwd', 'username', 'mobile', 'email']
            # 系统校验规则
            extra_kwargs = {
                'username': {
                    'read_only': True
                },
                'mobile': {
                    'read_only': True
                },
                'email': {
                    'read_only': True
                },
            }
    
        def validate(self, attrs):
            usr = attrs.get('usr')
            pwd = attrs.get('pwd')
    
            # 多方式登录:各分支处理得到该方式下对应的用户
            if re.match(r'.+@.+', usr):
                user_query = models.User.objects.filter(email=usr)
            elif re.match(r'1[3-9][0-9]{9}', usr):
                user_query = models.User.objects.filter(mobile=usr)
            else:
                user_query = models.User.objects.filter(username=usr)
            user_obj = user_query.first()
    
            # 签发:得到登录用户,签发token并存储在实例化对象中
            if user_obj and user_obj.check_password(pwd):
                # 签发token,将token存放到 实例化类对象的token 名字中
                payload = jwt_payload_handler(user_obj)
                token = jwt_encode_handler(payload)
                # 将当前用户与签发的token都保存在序列化对象中
                self.user = user_obj
                self.token = token
                return attrs
    
            raise serializers.ValidationError({'data': '数据有误'})

    views.py

    #实现多方式登陆签发token:账号、手机号、邮箱等登陆
    # 1) 禁用认证与权限组件
    # 2) 拿到前台登录信息,交给序列化类
    # 3) 序列化类校验得到登录用户与token存放在序列化对象中
    # 4) 取出登录用户与token返回给前台
    import re
    from . import serializers, models
    from utils.response import APIResponse
    
    from rest_framework_jwt.serializers import jwt_payload_handler
    from rest_framework_jwt.serializers import jwt_encode_handler
    
    class LoginAPIView(APIView):
        # 1) 禁用认证与权限组件
        authentication_classes = []
        permission_classes = []
        def post(self, request, *args, **kwargs):
            # 2) 拿到前台登录信息,交给序列化类,规则:账号用usr传,密码用pwd传
            user_ser = serializers.UserModelSerializer(data=request.data)
            # 3) 序列化类校验得到登录用户与token存放在序列化对象中
            user_ser.is_valid(raise_exception=True)
            # 4) 取出登录用户与token返回给前台
            return APIResponse(token=user_ser.token, results=serializers.UserModelSerializer(user_ser.user).data)
    
        # "一根筋" 思考方式:所有逻辑都在视图类中处理
        def my_post(self, request, *args, **kwargs):
            usr = request.data.get('usr')
            pwd = request.data.get('pwd')
            if re.match(r'.+@.+', usr):
                user_query = models.User.objects.filter(email=usr)
            elif re.match(r'1[3-9][0-9]{9}', usr):
                user_query = models.User.objects.filter(mobile=usr)
            else:
                user_query = models.User.objects.filter(username=usr)
            user_obj = user_query.first()
            if user_obj and user_obj.check_password(pwd):
                payload = jwt_payload_handler(user_obj)
                token = jwt_encode_handler(payload)
                return APIResponse(results={'username': user_obj.username}, token=token)
            return APIResponse(data_msg='不可控错误')

    案例:自定义认证反爬规则的认证类

    authentications.py

    import jwt
    from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
    from rest_framework_jwt.authentication import jwt_decode_handler
    from rest_framework.exceptions import AuthenticationFailed
    class JWTAuthentication(BaseJSONWebTokenAuthentication):
        def authenticate(self, request):
            jwt_token = request.META.get('HTTP_AUTHORIZATION')
    
            # 自定义校验规则:auth token jwt
            token = self.parse_jwt_token(jwt_token)
    
            if token is None:
                return None
    
            try:
                # token => payload
                payload = jwt_decode_handler(token)
            except jwt.ExpiredSignature:
                raise AuthenticationFailed('token已过期')
            except:
                raise AuthenticationFailed('非法用户')
            # payload => user
            user = self.authenticate_credentials(payload)
    
            return (user, token)
    
        # 自定义校验规则:auth token jwt,auth为前盐,jwt为后盐
        def parse_jwt_token(self, jwt_token):
            tokens = jwt_token.split()
            if len(tokens) != 3 or tokens[0].lower() != 'auth' or tokens[2].lower() != 'jwt':
                return None
            return tokens[1]

    views.py

    from rest_framework.views import APIView
    from utils.response import APIResponse
    # 必须登录后才能访问 - 通过了认证权限组件
    from rest_framework.permissions import IsAuthenticated
    # 自定义jwt校验规则
    from .authentications import JWTAuthentication
    class UserDetail(APIView):
        authentication_classes = [JWTAuthentication]
        permission_classes = [IsAuthenticated]
        def get(self, request, *args, **kwargs):
            return APIResponse(results={'username': request.user.username})

    admin使用自定义User表:新增用户密码密文

    from django.contrib import admin
    from . import models
    
    # 自定义User表,admin后台管理,采用密文密码
    from django.contrib.auth.admin import UserAdmin
    
    class MyUserAdmin(UserAdmin):
        add_fieldsets = (
            (None, {
                'classes': ('wide',),
                'fields': ('username', 'password1', 'password2', 'mobile', 'email'),
            }),
        )
    
    admin.site.register(models.User, MyUserAdmin)

    群查接口各种筛选组件数据准备

    models.py

    class Car(models.Model):
        name = models.CharField(max_length=16, unique=True, verbose_name='车名')
        price = models.DecimalField(max_digits=10, decimal_places=2, verbose_name='价格')
        brand = models.CharField(max_length=16, verbose_name='品牌')
    
        class Meta:
            db_table = 'api_car'
            verbose_name = '汽车表'
            verbose_name_plural = verbose_name
    
        def __str__(self):
            return self.name

    admin.py

    admin.site.register(models.Car)

    serializers.py

    class CarModelSerializer(serializers.ModelSerializer):
        class Meta:
            model = models.Car
            fields = ['name', 'price', 'brand']

    views.py

    # Car的群查接口
    from rest_framework.generics import ListAPIView
    
    class CarListAPIView(ListAPIView):
        queryset = models.Car.objects.all()
        serializer_class = serializers.CarModelSerializer

    urls.py

    url(r'^cars/$', views.CarListAPIView.as_view()),

    drf搜索过滤组件

    源码分析:

    1.
    class ListAPIView(mixins.ListModelMixin,
                      GenericAPIView):
        """
        Concrete view for listing a queryset.
        """
        def get(self, request, *args, **kwargs):
            return self.list(request, *args, **kwargs)
    2.
    class ListModelMixin:
        """
        List a queryset.
        """
        def list(self, request, *args, **kwargs):
            queryset = self.filter_queryset(self.get_queryset())
    
            page = self.paginate_queryset(queryset)
            if page is not None:
                serializer = self.get_serializer(page, many=True)
                return self.get_paginated_response(serializer.data)
    
            serializer = self.get_serializer(queryset, many=True)
            return Response(serializer.data)
    3.rest_framework/Generics.py
    class GenericAPIVIew(views.APIView):
        def filter_queryset(self, queryset):
            """
            Given a queryset, filter it with whichever filter backend is in use.
    
            You are unlikely to want to override this method, although you may need
            to call it either from a list view, or from a custom `get_object`
            method if you want to apply the configured filtering backend to the
            default queryset.
            """
            for backend in list(self.filter_backends):
                queryset = backend().filter_queryset(self.request, queryset, self)
            return queryset
    4.
        # The filter backend classes to use for queryset filtering
        filter_backends = api_settings.DEFAULT_FILTER_BACKENDS
    views.py
    from rest_framework.generics import ListAPIView
    
    # 第一步:drf的SearchFilter - 搜索过滤
    from rest_framework.filters import SearchFilter
    
    class CarListAPIView(ListAPIView):
        queryset = models.Car.objects.all()
        serializer_class = serializers.CarModelSerializer
    
        # 第二步:局部配置 过滤类 们(全局配置用DEFAULT_FILTER_BACKENDS)
        filter_backends = [SearchFilter]
    
        # 第三步:SearchFilter过滤类依赖的过滤条件 => 接口:/cars/?search=...
        search_fields = ['name', 'price']
        # eg:/cars/?search=1,name和price中包含1的数据都会被查询出

    drf排序过滤组件

    源码分析:

    # 1.继承ListAPIView的视图类get方法走的是ListModelMixin的List方法。
    class ListModelMixin:
        """
        List a queryset.
        """
        def list(self, request, *args, **kwargs):
            # 调用filter_queryset方法
            queryset = self.filter_queryset(self.get_queryset())
    # 2.调用GenericAPIView中的filter_queryset方法
    class GenericAPIView(views.APIView):
        def filter_queryset(self, queryset):
            # 从你的全局或局部配置中获取过滤类们实例化并调用filter_queryset方法
            for backend in list(self.filter_backends):
                queryset = backend().filter_queryset(self.request, queryset, self)
            return queryset
    # 3.rest_framework.filters.py中的排序组件OrderingFilter
    class OrderingFilter(BaseFilterBackend):
        ordering_param = api_settings.ORDERING_PARAM
        ordering_fields = None
        ordering_title = _('Ordering')
        ordering_description = _('Which field to use when ordering the results.')
        template = 'rest_framework/filters/ordering.html'
        # 调用排序组件的filter_queryset方法
        def filter_queryset(self, request, queryset, view):
            # 调用get_ordering方法获取排序字段
            ordering = self.get_ordering(request, queryset, view)
            if ordering:
                return queryset.order_by(*ordering)
    # 4.get_ordering方法,处理主从排序字段,调用了remove_invalid_fields方法
        def get_ordering(self, request, queryset, view):
            """
            Ordering is set by a comma delimited ?ordering=... query parameter.
            """
            params = request.query_params.get(self.ordering_param)
            if params:
                # 若排序字段多个需要用逗号隔开(主排序字段,从排序字段)
                fields = [param.strip() for param in params.split(',')]
                # 调用remove_invalid_fields方法
                ordering = self.remove_invalid_fields(queryset, fields, view, request)
                if ordering:
                    return ordering
    # 5.调用get_valid_fields方法
        def remove_invalid_fields(self, queryset, fields, view, request):
            # 调用get_valid_fields方法
            valid_fields = [item[0] for item in self.get_valid_fields(queryset, view, {'request': request})]
            return [term for term in fields if term.lstrip('-') in valid_fields and ORDER_PATTERN.match(term)]
    # 6.返回排序字段valid_fields
        def get_valid_fields(self, queryset, view, context={}):
            # 从视图类的ordering_fields字段中获取valid_fields的值
            valid_fields = getattr(view, 'ordering_fields', self.ordering_fields)
    views.py
    from rest_framework.generics import ListAPIView
    
    # 第一步:drf的OrderingFilter - 排序过滤
    from rest_framework.filters import OrderingFilter
    
    class CarListAPIView(ListAPIView):
        queryset = models.Car.objects.all()
        serializer_class = serializers.CarModelSerializer
    
        # 第二步:局部配置 过滤类 们(全局配置用DEFAULT_FILTER_BACKENDS)
        filter_backends = [OrderingFilter]
    
        # 第三步:OrderingFilter过滤类依赖的过滤条件 => 接口:/cars/?ordering=...
        ordering_fields = ['pk', 'price']
        # eg:/cars/?ordering=-price,pk,先按price降序,如果出现price相同,再按pk升序

    drf基础分页组件

    pahenations.py
    from rest_framework.pagination import PageNumberPagination
    
    class MyPageNumberPagination(PageNumberPagination):
        # ?page=页码
        page_query_param = 'page'
        # ?page=页面 下默认一页显示的条数
        page_size = 3
        # ?page=页面&page_size=条数 用户自定义一页显示的条数
        page_size_query_param = 'page_size'
        # 用户自定义一页显示的条数最大限制:数值超过5也只显示5条
        max_page_size = 5
    views.py
    from rest_framework.generics import ListAPIView
    
    class CarListAPIView(ListAPIView):
        # 如果queryset没有过滤条件,就必须 .all(),不然分页会出问题
        queryset = models.Car.objects.all()
        serializer_class = serializers.CarModelSerializer
        
        # 分页组件 - 给视图类配置分页类即可 - 分页类需要自定义,继承drf提供的分页类即可
        pagination_class = pagenations.MyPageNumberPagination
  • 相关阅读:
    2017中国大学生程序设计竞赛
    2017中国大学生程序设计竞赛
    2017中国大学生程序设计竞赛
    2017中国大学生程序设计竞赛
    计算几何之凸包模板
    计算几何之凸包模板
    Kafka知识点汇总
    python 爬虫1 開始,先拿新浪微博開始
    iOS 7的手势滑动返回
    Ctrl+Enter 选中文本提交
  • 原文地址:https://www.cnblogs.com/yangjiaoshou/p/11734816.html
Copyright © 2011-2022 走看看