服务器不要存储token,token交给每一个客户端自己存储,服务器压力小, 服务器存储的是 签发和校验token 两段算法,签发认证的效率高,算法完成各集群服务器同步成本低,路由项目完成集群部署(适应高并发)
格式
jwt token采用三段式:头部.载荷.签名
每一部分都是一个json字典加密形参的字符串
头部和载荷采用的是base64可逆加密(前台后台都可以解密)
签名采用hash256不可逆加密(后台校验采用碰撞校验)
各部分字典的内容:
头部:基础信息 - 公司信息、项目组信息、可逆加密采用的算法
载荷:有用但非私密的信息 - 用户可公开信息、过期时间
签名:头部+载荷+秘钥 不可逆加密后的结果
注:服务器jwt签名加密秘钥一定不能泄露
签发token:固定的头部信息加密.当前的登陆用户与过期时间加密.头部+载荷+秘钥生成不可逆加密
校验token:头部可校验也可以不校验,载荷校验出用户与过期时间,头部+载荷+秘钥完成碰撞检测校验token是否被篡改
drf-jwt插件:https://github.com/jpadilla/django-rest-framework-jwt
安装: pip3 install djangorestframework-jwt
api/urls.py
from django.conf.urls import url from . import views from rest_framework_jwt.views import ObtainJSONWebToken,obtain_jwt_token urlpatterns = [ # url(r'^jogin', ObtainJSONWebToken.as_view()), url(r'^jogin', obtain_jwt_token), url(r'^user/detail/$', views.UserDetail.as_view()), ] # 认证信息,必须在请求头的Authorization 中携带'jwt'后台签发的'token'格式的认证字符串
认证 - 校验token:全局或局部配置drf-jwt的认证类 JSONWebTokenAuthentication
from rest_framework.views import APIView from utils.response import APIResponse # 必须登录后,才能访问, 通过认证权限组件 from rest_framework.permissions import IsAuthenticated from rest_framework_jwt.authentication import JSONWebTokenAuthentication class UserDetail(APIView): authentication_classes = [JSONWebTokenAuthentication] # jwt-token校验request.user permission_classes = [IsAuthenticated] # 结合权限组件筛选掉游客 def get(self, request, *args, **kwargs): return APIResponse(results={'username': request.user.username})
签发token
前提:给一个局部禁用了所有 认证与权限 的视图类发送用户信息得到token,其实就是登录接口
rest_framework_jwt.views.ObtainJSONWebToken 的 父类 JSONWebTokenAPIView 的 post 方法, 接受有username、password的post请求
post方法将请求数据交给 rest_framework_jwt.serializer.JSONWebTokenSerializer 处理,完成数据的校验,会走序列化类的 全局钩子校验规则,校验得到登录用户并签发token存储在序列化对象
核心源码:rest_framework_jwt.serializer.JSONWebTokenSerializer的validate(self, attrs)方法
def validate(self, attrs): # 账号密码字典 credentials = { self.username_field: attrs.get(self.username_field), 'password': attrs.get('password') } if all(credentials.values()): # 签发token第1步:用账号密码得到user对象 user = authenticate(**credentials) if user: if not user.is_active: msg = _('User account is disabled.') raise serializers.ValidationError(msg) # 签发token第2步:通过user得到payload,payload包含着用户信息与过期时间 payload = jwt_payload_handler(user) # 在视图类中,可以通过 序列化对象.object.get('user'或者'token') 拿到user和token return { # 签发token第3步:通过payload签发出token 'token': jwt_encode_handler(payload), 'user': user } else: msg = _('Unable to log in with provided credentials.') raise serializers.ValidationError(msg) else: msg = _('Must include "{username_field}" and "password".') msg = msg.format(username_field=self.username_field) raise serializers.ValidationError(msg)
手动签发token逻辑
通过username、password得到user对象,通过user对象生成payload:jwt_payload_handler(user) => payload, from rest_framework_jwt.serializers import jwt_payload_handler 通过payload签发token:jwt_encode_handler(payload) => token from rest_framework_jwt.serializers import jwt_encode_handler
校验token
前提:访问一个配置了jwt认证规则的视图类,就需要提交认证字符串token,在认证类中完成token的校验, rest_framework_jwt.authentication.JSONWebTokenAuthentication 的 父类 BaseJSONWebTokenAuthentication 的 authenticate 方法, 请求头拿认证信息jwt-token => 通过反爬小规则确定有用的token => payload => user
核心源码:rest_framework_jwt.authentication.BaseJSONWebTokenAuthentication的authenticate(self, request)方法
def authenticate(self, request): """ Returns a two-tuple of `User` and token if a valid signature has been supplied using JWT-based authentication. Otherwise returns `None`. """ # 带有反爬小规则的获取token:前台必须按 "jwt token字符串" 方式提交 # 校验user第1步:从请求头 HTTP_AUTHORIZATION 中拿token,并提取 jwt_value = self.get_jwt_value(request) # 游客 if jwt_value is None: return None # 校验 try: # 校验user第2步:token => payload payload = jwt_decode_handler(jwt_value) except jwt.ExpiredSignature: msg = _('Signature has expired.') raise exceptions.AuthenticationFailed(msg) except jwt.DecodeError: msg = _('Error decoding signature.') raise exceptions.AuthenticationFailed(msg) except jwt.InvalidTokenError: raise exceptions.AuthenticationFailed() # 校验user第3步:token => payload user = self.authenticate_credentials(payload) return (user, jwt_value)
手动校验token逻辑
从请求头中获取token,根据token解析出payload:jwt_decode_handler(token) => payloay, from rest_framework_jwt.authentication import jwt_decode_handler, 根据payload解析出user:self.authenticate_credentials(payload) => user, 继承drf-jwt的BaseJSONWebTokenAuthentication,拿到父级的authenticate_credentials方法
实现多方式登陆签发token
models.py
from django.db import models from django.contrib.auth.models import AbstractUser class User(AbstractUser): mobile = models.CharField(max_length=11, unique=True) class Meta: db_table = 'api_user' verbose_name = '用户表' verbose_name_plural = verbose_name def __str__(self): return self.username
serializers.py
from rest_framework import serializers from . import models import re from rest_framework_jwt.serializers import jwt_payload_handler from rest_framework_jwt.serializers import jwt_encode_handler # 前台提交多种登录信息都采用一个key,所以后台可以自定义反序列化字段进行对应 # 序列化要处理序列化与反序列化,要在fields中绑定的Model类所有使用到的字段 # 区分序列化字段与反序列化字段 read_only | write_only # 在自定义校验规则中,校验数据是否合法,确定登录用户,根据用户签发token # 将登录的用户与签发的token保存在序列化对象中 class UserModelSerializer(serializers.ModelSerializer): # 自定义反序列字段:一定要设置write_only,只参与反序列化,不会与model类字段映射 usr = serializers.CharField(write_only=True) pwd = serializers.CharField(write_only=True) class Meta: model = models.User fields = ['usr', 'pwd', 'username', 'mobile', 'email'] # 系统校验规则 extra_kwargs = { 'username': { 'read_only': True }, 'mobile': { 'read_only': True }, 'email': { 'read_only': True }, } def validate(self, attrs): usr = attrs.get('usr') pwd = attrs.get('pwd') # 多方式登录:各分支处理得到该方式下对应的用户 if re.match(r'.+@.+', usr): user_query = models.User.objects.filter(email=usr) elif re.match(r'1[3-9][0-9]{9}', usr): user_query = models.User.objects.filter(mobile=usr) else: user_query = models.User.objects.filter(username=usr) user_obj = user_query.first() # 签发:得到登录用户,签发token并存储在实例化对象中 if user_obj and user_obj.check_password(pwd): # 签发token,将token存放到 实例化类对象的token 名字中 payload = jwt_payload_handler(user_obj) token = jwt_encode_handler(payload) # 将当前用户与签发的token都保存在序列化对象中 self.user = user_obj self.token = token return attrs raise serializers.ValidationError({'data': '数据有误'})
views.py
from rest_framework.views import APIView from utils.response import APIResponse # 必须登录后,才能访问, 通过认证权限组件 from rest_framework.permissions import IsAuthenticated from rest_framework_jwt.authentication import JSONWebTokenAuthentication from .authentications import JWTAuthentication class UserDetail(APIView): # authentication_classes = [JSONWebTokenAuthentication] authentication_classes = [JWTAuthentication] permission_classes = [IsAuthenticated] def get(self, request, *args, **kwargs): return APIResponse(results={'username': request.user.username})
自定义认证反爬规则的认证类
authentications.py
import jwt from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication from rest_framework_jwt.authentication import jwt_decode_handler from rest_framework.exceptions import AuthenticationFailed class JWTAuthentication(BaseJSONWebTokenAuthentication): def authenticate(self, request): jwt_token = request.META.get('HTTP_AUTHORIZATION') # 自定义校验规则:auth token jwt token = self.parse_jwt_token(jwt_token) if token is None: return None try: payload = jwt_decode_handler(token) except jwt.ExpiredSignature: raise AuthenticationFailed('token 已过期') except: raise AuthenticationFailed('非法用户') user = self.authenticate_credentials(payload) return (user, token) # 自定义校验检测:auth token jwt,auth为前盐,jwt为后盐 def parse_jwt_token(self, jwt_token): tokens = jwt_token.split() if len(tokens) != 3 or tokens[0].lower() != 'auth' or tokens[2].lower() != 'jwt': return None return tokens[1]
views.py
from utils.response import APIResponse # 必须登录后,才能访问, 通过认证权限组件 from rest_framework.permissions import IsAuthenticated from rest_framework_jwt.authentication import JSONWebTokenAuthentication from .authentications import JWTAuthentication class UserDetail(APIView): # authentication_classes = [JSONWebTokenAuthentication] authentication_classes = [JWTAuthentication] permission_classes = [IsAuthenticated] def get(self, request, *args, **kwargs): return APIResponse(results={'username': request.user.username})
admin使用自定义User表:新增用户密码密文
from . import models from django.contrib.auth.admin import UserAdmin class MyUserAdmin(UserAdmin): add_fieldsets = ( ( None, { 'classes': ('wide',), 'fields': ('username', 'password1', 'password2', 'mobile', 'email'), }) ) admin.site.register(models.User, MyUserAdmin)
群查接口各种筛选组件
urls.py
url(r'^cars/$', views.CarListAPIView.as_view()),
models.py
class Car(models.Model): name = models.CharField(max_length=16, unique=True, verbose_name='车名') price = models.DecimalField(max_digits=10, decimal_places=2, verbose_name='价格') brand = models.CharField(max_length=16, verbose_name='品牌') class Meta: db_table = 'api_car' verbose_name = '汽车表' verbose_name_plural = verbose_name def __str__(self): return self.name
admin.py
admin.site.register(models.Car)
serializers.py
class CarModelSerializer(serializers.ModelSerializer): class Meta: model = models.Car fields = ['name', 'price', 'brand']
views.py
from rest_framework.generics import ListAPIView class CarListAPIView(ListAPIView): queryset = models.Car.objects.all() serializer_class = serializers.CarModelSerializer
drf搜索过滤组件
from rest_framework.generics import ListAPIView # 第一步:drf的SearchFilter - 搜索过滤 from rest_framework.filters import SearchFilter class CarListAPIView(ListAPIView): queryset = models.Car.objects.all() serializer_class = serializers.CarModelSerializer # 第二步:局部配置 过滤类 们(全局配置用DEFAULT_FILTER_BACKENDS) filter_backends = [SearchFilter] # 第三步:SearchFilter过滤类依赖的过滤条件 => 接口:/cars/?search=... search_fields = ['name', 'price'] # eg:/cars/?search=1,name和price中包含1的数据都会被查询出
drf排序过滤组件
from rest_framework.generics import ListAPIView # 第一步:drf的OrderingFilter - 排序过滤 from rest_framework.filters import OrderingFilter class CarListAPIView(ListAPIView): queryset = models.Car.objects.all() serializer_class = serializers.CarModelSerializer # 第二步:局部配置 过滤类 们(全局配置用DEFAULT_FILTER_BACKENDS) filter_backends = [OrderingFilter] # 第三步:OrderingFilter过滤类依赖的过滤条件 => 接口:/cars/?ordering=... ordering_fields = ['pk', 'price'] # eg:/cars/?ordering=-price,pk,先按price降序,如果出现price相同,再按pk升序
drf基础分页组件
pahenations.py
from rest_framework.pagination import PageNumberPagination class MyPageNumberPagination(PageNumberPagination): # ?page=页码 page_query_param = 'page' # ?page=页面 下默认一页显示的条数 page_size = 3 # ?page=页面&page_size=条数 用户自定义一页显示的条数 page_size_query_param = 'page_size' # 用户自定义一页显示的条数最大限制:数值超过5也只显示5条 max_page_size = 5
views.py
from rest_framework.generics import ListAPIView from . import pahenations
class CarListAPIView(ListAPIView): # 如果queryset没有过滤条件,就必须 .all(),不然分页会出问题 queryset = models.Car.objects.all() serializer_class = serializers.CarModelSerializer # 分页组件 - 给视图类配置分页类即可 - 分页类需要自定义,继承drf提供的分页类即可 pagination_class = pagenations.MyPageNumberPagination
drf偏移分页组件
pahenations.py
from rest_framework.pagination import LimitOffsetPagination class MyLimitOffsetPagination(LimitOffsetPagination): # ?offset=从头偏移的条数&limit=要显示的条数 limit_query_param = 'limit' offset_query_param = 'offset' # ?不传offset和limit默认显示前3条,只设置offset就是从偏移位往后再显示3条 default_limit = 3 # ?limit可以自定义一页显示的最大条数 max_limit = 5
views.py
from rest_framework.generics import ListAPIView class CarListAPIView(ListAPIView): # 如果queryset没有过滤条件,就必须 .all(),不然分页会出问题 queryset = models.Car.objects.all() serializer_class = serializers.CarModelSerializer # 分页组件 - 给视图类配置分页类即可 - 分页类需要自定义,继承drf提供的分页类即可 pagination_class = pagenations.MyLimitOffsetPagination
drf游标分页组件
pahenations.py
# 1)如果接口配置了OrderingFilter过滤器,那么url中必须传ordering # 1)如果接口没有配置OrderingFilter过滤器,一定要在分页类中声明ordering按某个字段进行默认排序 from rest_framework.pagination import CursorPagination class MyCursorPagination(CursorPagination): cursor_query_param = 'cursor' page_size = 3 page_size_query_param = 'page_size' max_page_size = 5 ordering = '-pk'
views.py
from rest_framework.generics import ListAPIView class CarListAPIView(ListAPIView): # 如果queryset没有过滤条件,就必须 .all(),不然分页会出问题 queryset = models.Car.objects.all() serializer_class = serializers.CarModelSerializer # 分页组件 - 给视图类配置分页类即可 - 分页类需要自定义,继承drf提供的分页类即可 pagination_class = pagenations.MyCursorPagination
自定义过滤器
filters.py
# 自定义过滤器,接口:?limit=显示的条数 class LimitFilter: def filter_queryset(self, request, queryset, view): # 前台固定用 ?limit=... 传递过滤参数 limit = request.query_params.get('limit') if limit: limit = int(limit) return queryset[:limit] return queryset
views.py
from rest_framework.generics import ListAPIView class CarListAPIView(ListAPIView): # 如果queryset没有过滤条件,就必须 .all(),不然分页会出问题 queryset = models.Car.objects.all() serializer_class = serializers.CarModelSerializer # 局部配置 过滤类 们(全局配置用DEFAULT_FILTER_BACKENDS) filter_backends = [LimitFilter]
过滤器插件:django-filter
安装 pip3 install django-filter
filters.py
from django_filters.rest_framework.filterset import FilterSet from . import models # 自定义过滤字段 from django_filters import filters class CarFilterSet(FilterSet): min_price = filters.NumberFilter(field_name='price', lookup_expr='gte') max_price = filters.NumberFilter(field_name='price', lookup_expr='lte') class Meta: model = models.Car fields = ['brand', 'min_price', 'max_price'] # brand是model中存在的字段,一般都是可以用于分组的字段 # min_price、max_price是自定义字段,需要自己自定义过滤条件
views.py
from django_filters.rest_framework import DjangoFilterBackend from .filters import CarFilterSet class CarListAPIView(ListAPIView): queryset = models.Car.objects.all() serializer_class = serializers.CarModelSerializer
# django-filter过滤器插件使用
filter_backends = [DjangoFilterBackend]