1、drf-jwt手动签发与校验
2、drf小组件:过滤、筛选、排序、分页 => 针对与群查接口
jwt_token源码分析(入口)
rest_framework_jwt --> views.py --> ObtainJSONWebToken(JSONWebTokenAPIView)
class ObtainJSONWebToken(JSONWebTokenAPIView):
serializer_class = JSONWebTokenSerializer
然后到父类中JSONWebTokenAPIView的post方法
def post(self, request, *args, **kwargs):
#从get_serializer获取serializer
serializer = self.get_serializer(data=request.data)
........
点击get_serializer
def get_serializer(self, *args, **kwargs):
#获取到serializer_class类
serializer_class = self.get_serializer_class()
kwargs['context'] = self.get_serializer_context()
return serializer_class(*args, **kwargs)
回到
def post(self, request, *args, **kwargs):
# 这里就是将数据传入,然后反序列化做校验
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
user = serializer.object.get('user') or request.user
token = serializer.object.get('token')
response_data = jwt_response_payload_handler(token, user, request)
response = Response(response_data)
if api_settings.JWT_AUTH_COOKIE:
expiration = (datetime.utcnow() +
api_settings.JWT_EXPIRATION_DELTA)
response.set_cookie(api_settings.JWT_AUTH_COOKIE,
token,
expires=expiration,
httponly=True)
return response
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
然后找到jwt的资源文件serializers.py中的JSONWebTokenSerializer(Serializer)
class JSONWebTokenSerializer(Serializer):
def __init__(self, *args, **kwargs):
"""
Dynamically add the USERNAME_FIELD to self.fields.
"""
super(JSONWebTokenSerializer, self).__init__(*args, **kwargs)
#username_field是活的,而‘password’是死的
self.fields[self.username_field] = serializers.CharField()
#write_only=True说明只能是反序列化
self.fields['password'] = PasswordField(write_only=True)
点击PasswordField类
class PasswordField(serializers.CharField):
def __init__(self, *args, **kwargs):
if 'style' not in kwargs:
kwargs['style'] = {'input_type': 'password'}
else:
kwargs['style']['input_type'] = 'password'
super(PasswordField, self).__init__(*args, **kwargs)
当我们发向接口发送post请求就要携带username和password字段,不然会报错,因为在反序列化的时候校验的只是这两个字段,所以说这两个字段是必填,然后返回token
value :记得加jwt空格+token
签发token源码分析
#通过账号密码签发token(依赖auth组件的RBAC用户权限六表)
class ObtainJSONWebToken(JSONWebTokenAPIView):
"""
API View that receives a POST with a user's username and password.
Returns a JSON Web Token that can be used for authenticated requests.
"""
serializer_class = JSONWebTokenSerializer
点击JSONWebTokenAPIView,post方法是签发token
def post(self, request, *args, **kwargs):
#传入data进行反序列化校验
serializer = self.get_serializer(data=request.data)
#如果校验成功返回response
if serializer.is_valid():
#从序列化对象object是个字典,这个字典存放着user,token,就是这一步serializer = self.get_serializer(data=request.data)将数据给序列化类,这个类再通过username,password产生了用户并且产生了token,通过这个对象获取已经处理好的user和token
#总结:获取user与签发token都是序列化类完成的
#获取用户
user = serializer.object.get('user') or request.user
#获取token
token = serializer.object.get('token')
#根据token,user,request封装成要被返回的数据
response_data = jwt_response_payload_handler(token, user, request)
#将数据做初始化得到response对象
response = Response(response_data)
if api_settings.JWT_AUTH_COOKIE:
expiration = (datetime.utcnow() +
api_settings.JWT_EXPIRATION_DELTA)
response.set_cookie(api_settings.JWT_AUTH_COOKIE,
token,
expires=expiration,
httponly=True)
#返回response对象
return response
#校验失败返回错误信息
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
查看序列化类资源文件serializers.py中的JSONWebTokenSerializer
class JSONWebTokenSerializer(Serializer):
"""
Serializer class used to validate a username and password.
'username' is identified by the custom UserModel.USERNAME_FIELD.
Returns a JSON Web Token that can be used to authenticate later calls.
"""
def __init__(self, *args, **kwargs):
"""
Dynamically add the USERNAME_FIELD to self.fields.
"""
super(JSONWebTokenSerializer, self).__init__(*args, **kwargs)
self.fields[self.username_field] = serializers.CharField()
self.fields['password'] = PasswordField(write_only=True)
@property
def username_field(self):
return get_username_field()
想要进入到super中在is_valid()之前,也就是serializer这一步进行操作,不能下一步到is_valid()这一步进入就会没有数据,因为是再校验之前做的实例化
class JSONWebTokenSerializer(Serializer):
def __init__(self, *args, **kwargs):
"""
Dynamically add the USERNAME_FIELD to self.fields.
"""
super(JSONWebTokenSerializer, self).__init__(*args, **kwargs)
self.fields[self.username_field] = serializers.CharField()
self.fields['password'] = PasswordField(write_only=True)
@property
def username_field(self):
return get_username_field()
##重点看这个校验函数方法
#全局钩子,就是post方法serializer.is_valid():做校验的时候就会走全局钩子校验
def validate(self, attrs):
#获取用户和密码
credentials = {
self.username_field: attrs.get(self.username_field),
'password': attrs.get('password')
}
if all(credentials.values()):
#调用authenticate,就是将账号密码传入做认证,得到用户
user = authenticate(**credentials)
if user:
#判断用户是否是活跃用户
if not user.is_active:
msg = _('User account is disabled.')
raise serializers.ValidationError(msg)
#根据用户产生payload对象,也就是用户的载荷,可以对外展示的信息,如用户的id,用户名,电话,过期时间等,但是没有密码,如果需要获取到用户的密码可以根据用户id来映射出用户密码
payload = jwt_payload_handler(user)
#返回有user和token的字典
return {
#传入payload产生签名
'token': jwt_encode_handler(payload),
'user': user
}
else:
msg = _('Unable to log in with provided credentials.')
raise serializers.ValidationError(msg)
else:
msg = _('Must include "{username_field}" and "password".')
msg = msg.format(username_field=self.username_field)
raise serializers.ValidationError(msg)
def post(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
#校验通过就说明可以获取到用户和token,所以说user和token在全局钩子里已经产生了,在序列化类中就已经解析出user和password了,所以可以通过序列化对象获取到user和token
user = serializer.object.get('user') or request.user
token = serializer.object.get('token')
response_data = jwt_response_payload_handler(token, user, request)
response = Response(response_data)
if api_settings.JWT_AUTH_COOKIE:
expiration = (datetime.utcnow() +
api_settings.JWT_EXPIRATION_DELTA)
response.set_cookie(api_settings.JWT_AUTH_COOKIE,
token,
expires=expiration,
httponly=True)
return response
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
校验token源码分析
发送请求
urls.py
from rest_framework_jwt.views import ObtainJSONWebToken, obtain_jwt_token
from django.urls import re_path
from api import views
urlpatterns = [
re_path(r'^jogin/$', obtain_jwt_token),
re_path(r'^user/detail/$', views.UserDetail.as_view()),
]
views.py
from rest_framework.views import APIView
from utils.response import APIResponse
# 必须登录后才能访问 - 通过了认证权限组件
from rest_framework.permissions import IsAuthenticated
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
class UserDetail(APIView):
#请求过来走JSONWebTokenAuthentication
authentication_classes = [JSONWebTokenAuthentication] # jwt-token校验request.user
#IsAuthenticated这个认证是判断request.user是否有值
permission_classes = [IsAuthenticated] # 结合权限组件筛选掉游客
def get(self, request, *args, **kwargs):
return APIResponse(results={'username': request.user.username})
走JSONWebTokenAuthentication,这个类没有authenticate认证方法,所以走父类BaseJSONWebTokenAuthentication
class BaseJSONWebTokenAuthentication(BaseAuthentication):
"""
Token based authentication using the JSON Web Token standard.
"""
def authenticate(self, request):
"""
Returns a two-tuple of `User` and token if a valid signature has been
supplied using JWT-based authentication. Otherwise returns `None`.
"""
#传过来的authentication是带着请求头jwt,但是这里获取出来的没有带jwt
#没有认证,代表游客,返回none
jwt_value = self.get_jwt_value(request)
if jwt_value is None:
return None
#进行一通校验
try:
#通过token得到payload
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
#token过期捕获异常
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
#如果校验失败就抛异常
raise exceptions.AuthenticationFailed()
#通过payload来获取用户,点击authenticate_credentials username = jwt_get_username_from_payload(payload)
#如果校验通过拿到user
user = self.authenticate_credentials(payload)
#返回user
return (user, jwt_value)
点击get_jwt_value
点击get_authorization_header()
想要获取到authenticate_credentials就可以views.py视图类中继承BaseJSONWebTokenAuthentication, authenticate_credentials是BaseJSONWebTokenAuthentication的方法
手动签发token
# 1)通过username、password得到user对象
# 2)通过user对象生成payload:jwt_payload_handler(user) => payload
# from rest_framework_jwt.serializers import jwt_payload_handler
# 3)通过payload签发token:jwt_encode_handler(payload) => token
# from rest_framework_jwt.serializers import jwt_encode_handler
签发token的请求 - 登录请求 - ObtainJSONWebToken - post - 将账号密码丢给序列化类处理 - 得到user、token => 序列化类的全局钩子
签发源码小总结:
# 前提:给一个局部禁用了所有 认证与权限 的视图类发送用户信息得到token,其实就是登录接口
# 1)rest_framework_jwt.views.ObtainJSONWebToken 的 父类 JSONWebTokenAPIView 的 post 方法
# 接受有username、password的post请求
# 2)post方法将请求数据交给 rest_framework_jwt.serializer.JSONWebTokenSerializer 处理
# 完成数据的校验,会走序列化类的 全局钩子校验规则,校验得到登录用户并签发token存储在序列化对象中
自己的理解:
(只需要一条路由就可以签发token,视图类都不需要写)
携带json格式的用户密码超url发送post请求,url: obtain_jwt_token也就是ObtainJSONWebToken.as_view()方法 , ObtainJSONWebToken没有as_view(),到父类中JSONWebTokenAPIView中也没有,到JSONWebTokenAPIView的APIView中找到as_view并且返回调用view函数的返回值,view函数中dispatch方法,就是请求分发,所以post请求来到ObtainJSONWebToken没有,到父类JSONWebTokenAPIView找到,只写了这个post方法,所以get方法不行,serializer = self.get_serializer(data=request.data)调用ObtainJSONWebToken的对象调用get_serializer将data进去进行反序列化,get_serializer(data=request.data),get_serializer这个方法传入数据,是调用get_serializer中的 self.get_serializer_class()找到的serializer_class,serializer_class是在ObtainJSONWebToken中找到 serializer_class = JSONWebTokenSerializer,然后将JSONWebTokenSerializer(data=request.data)传入进行反序列操作,所以走JSONWebTokenSerializer的__init__方法,调用父类序列化,然后对传入的账号密码声明,校验走 validate方法,根据用户产生payload对象,也就是用户的载荷,可以对外展示的信息,如用户的id,用户名,电话,过期时间等,但是没有密码,如果需要获取到用户的密码可以根据用户id来映射出用户密码
payload = jwt_payload_handler(user)
#返回有user和token的字典
return {
#传入payload产生签名
'token': jwt_encode_handler(payload),
'user': user
}
核心源码:rest_framework_jwt.serializer.JSONWebTokenSerializer的validate(self, attrs)方法
def validate(self, attrs):
# 账号密码字典
credentials = {
self.username_field: attrs.get(self.username_field),
'password': attrs.get('password')
}
if all(credentials.values()):
# 签发token第1步:用账号密码得到user对象
user = authenticate(**credentials)
if user:
if not user.is_active:
msg = _('User account is disabled.')
raise serializers.ValidationError(msg)
# 签发token第2步:通过user得到payload,payload包含着用户信息与过期时间
payload = jwt_payload_handler(user)
# 在视图类中,可以通过 序列化对象.object.get('user'或者'token') 拿到user和token
return {
# 签发token第3步:通过payload签发出token
'token': jwt_encode_handler(payload),
'user': user
}
else:
msg = _('Unable to log in with provided credentials.')
raise serializers.ValidationError(msg)
else:
msg = _('Must include "{username_field}" and "password".')
msg = msg.format(username_field=self.username_field)
raise serializers.ValidationError(msg)
手动校验token
# 1)从请求头中获取token
# 2)根据token解析出payload:jwt_decode_handler(token) => payloay
# from rest_framework_jwt.authentication import jwt_decode_handler
# 3)根据payload解析出user:self.authenticate_credentials(payload) => user
# 继承drf-jwt的BaseJSONWebTokenAuthentication,拿到父级的authenticate_credentials方法
校验源码小总结
# 前提:访问一个配置了jwt认证规则的视图类,就需要提交认证字符串token,在认证类中完成token的校验
# 1)rest_framework_jwt.authentication.JSONWebTokenAuthentication 的 父类 BaseJSONWebTokenAuthentication 的 authenticate 方法
# 请求头拿认证信息jwt-token => 通过反爬小规则确定有用的token => payload => user
核心源码:rest_framework_jwt.authentication.BaseJSONWebTokenAuthentication的authenticate(self, request)方法
def authenticate(self, request):
"""
Returns a two-tuple of `User` and token if a valid signature has been
supplied using JWT-based authentication. Otherwise returns `None`.
"""
# 带有反爬小规则的获取token:前台必须按 "jwt token字符串" 方式提交
# 校验user第1步:从请求头 HTTP_AUTHORIZATION 中拿token,并提取
jwt_value = self.get_jwt_value(request)
# 游客
if jwt_value is None:
return None
# 校验
try:
# 校验user第2步:token => payload
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
# 校验user第3步:token => payload
user = self.authenticate_credentials(payload)
return (user, jwt_value)
代码详见:
E: en_djangopage