08 jwt源码剖析
JSON Web Tokens,是一种开发的行业标准 RFC 7519 ,用于安全的表示双方之间的声明。目前,jwt广泛应用在系统的用户认证方面,特别是现在前后端分离项目。
1. jwt认证流程
在项目开发中,一般会按照上图所示的过程进行认证,即:用户登录成功之后,服务端给用户浏览器返回一个token,以后用户浏览器要携带token再去向服务端发送请求,服务端校验token的合法性,合法则给用户看数据,否则,返回一些错误信息。
传统token方式和jwt在认证方面有什么差异?
-
传统token方式
用户登录成功后,服务端生成一个随机token给用户,并且在服务端(数据库或缓存)中保存一份token,以后用户再来访问时需携带token,服务端接收到token之后,去数据库或缓存中进行校验token的是否超时、是否合法。
-
jwt方式
用户登录成功后,服务端通过jwt生成一个随机token给用户(服务端无需保留token),以后用户再来访问时需携带token,服务端接收到token之后,通过jwt对token进行校验是否超时、是否合法。
2.jwt创建token
2.1 原理
-
jwt的生成token格式如下,即:由
.
连接的三段字符串组成。eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
-
生成规则如下:
- 第一段HEADER部分,固定包含算法和token类型,对此json进行base64url加密,这就是token的第一段。
{ "alg": "HS256", "typ": "JWT" }
- 第二段PAYLOAD部分,包含一些数据,对此json进行base64url加密,这就是token的第二段
{ "sub": "1234567890", "name": "John Doe", "iat": 1516239022 ... }
- 第三段SIGNATURE部分,把前两段的base密文通过
.
拼接起来,然后对其进行HS256
加密,再然后对hs256
密文进行base64url加密,最终得到token的第三段。
base64url( HMACSHA256( base64UrlEncode(header) + "." + base64UrlEncode(payload), your-256-bit-secret (秘钥加盐) ) )
-
最后将三段字符串通过
.
拼接起来就生成了jwt的token。 -
注意:base64url加密是先做base64加密,然后再将
-
替代+
及_
替代/
。
2.2 jwt校验token
- 一般在认证成功后,把jwt生成的token返回给用户,以后用户再次访问时候需要携带token,此时jwt需要对token进行
超时
及合法性
校验。 - 获取token之后,会按照以下步骤进行校验:
- 将token分割成
header_segment
、payload_segment
、crypto_segment
三部分 - 对第一部分
header_segment
进行base64url解密,得到header
- 对第二部分
payload_segment
进行base64url解密,得到payload
- 对第三部分
crypto_segment
进行base64url解密,得到signature
- 对第三部分
signature
部分数据进行合法性校验- 拼接前两段密文,即:
signing_input
- 从第一段明文中获取加密算法,默认:
HS256
- 使用 算法+盐 对
signing_input
进行加密,将得到的结果和signature
密文进行比较。
- 拼接前两段密文,即:
3. jwt使用
-
安装
pip3 install djangorestframework-jwt
-
setting配置文件
import datetime JWT_AUTH = { "JWT_EXPIRATION_DELTA":datetime.timedelta(minutes=10) }
-
app中注册
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'api.apps.ApiConfig', 'rest_framework', 'rest_framework_jwt' ]
-
用户登录
from rest_framework.views import APIView from rest_framework.response import Response from api import models class LoginView(APIView): """ 登录接口 """ def post(self,request,*args,**kwargs): # 基于jwt的认证 # 1.去数据库获取用户信息 from rest_framework_jwt.settings import api_settings jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER user = models.UserInfo.objects.filter(**request.data).first() if not user: return Response({'code':1000,'error':'用户名或密码错误'}) payload = jwt_payload_handler(user) token = jwt_encode_handler(payload) return Response({'code':1001,'data':token})
-
用户认证
from rest_framework.views import APIView from rest_framework.response import Response # from rest_framework.throttling import AnonRateThrottle,BaseThrottle class ArticleView(APIView): def get(self,request,*args,**kwargs): # 获取用户提交的token,进行一步一步校验 import jwt from rest_framework import exceptions from rest_framework_jwt.settings import api_settings jwt_decode_handler = api_settings.JWT_DECODE_HANDLER jwt_value = request.query_params.get('token') try: payload = jwt_decode_handler(jwt_value) except jwt.ExpiredSignature: msg = '签名已过期' raise exceptions.AuthenticationFailed(msg) except jwt.DecodeError: msg = '认证失败' raise exceptions.AuthenticationFailed(msg) except jwt.InvalidTokenError: raise exceptions.AuthenticationFailed() print(payload) return Response('文章列表')
4. 源码剖析
-
首先从路由看起
from rest_framework_jwt.views import obtain_jwt_token urlpatterns = [ url(r'^login/', account.LoginView.as_view()), url(r'^jwt/login/',obtain_jwt_token), # ObtainJSONWebToken.as_view() url(r'^article/', article.ArticleView.as_view()), ] # obtain_jwt_token = ObtainJSONWebToken.as_view()
-
ObtainJSONWebToken类
class ObtainJSONWebToken(JSONWebTokenAPIView): serializer_class = JSONWebTokenSerializer
-
JSONWebTokenSerializer类,进行用户认证
from rest_framework_jwt.settings import api_settings jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER class JSONWebTokenSerializer(Serializer): def validate(self, attrs): credentials = { self.username_field: attrs.get(self.username_field), 'password': attrs.get('password') } if all(credentials.values()): user = authenticate(**credentials) if user: payload = jwt_payload_handler(user) return { 'token': jwt_encode_handler(payload), 'user': user }
-
用户认证成功后会将user对象当作参数执行jwt_payload_handler函数
函数内部会将用户id、用户名、以及超时时间放到一个payload的字典中
def jwt_payload_handler(user): username_field = get_username_field() username = get_username(user) payload = { 'user_id': user.pk, 'username': username, 'exp': datetime.utcnow() + api_settings.JWT_EXPIRATION_DELTA } return payload
-
将payload当作参数执行jwt_encode_handler函数
def jwt_encode_handler(payload): key = api_settings.JWT_PRIVATE_KEY or jwt_get_secret_key(payload) return jwt.encode( payload, key, api_settings.JWT_ALGORITHM ).decode('utf-8')
-
encode的方法内部会将包括类型以及加密的算法进行base64加密
def encode(self, payload, # type: Union[Dict, bytes] key, # type: str algorithm='HS256', # type: str headers=None, # type: Optional[Dict] json_encoder=None # type: Optional[Callable] ): json_payload = json.dumps( payload, separators=(',', ':'), cls=json_encoder ).encode('utf-8') return super(PyJWT, self).encode( json_payload, key, algorithm, headers, json_encoder )
-
执行super().encode()方法
将前两段拼接起来进行hs256加密后,再进行base64加密,再将这三段拼接起来
def encode(self, payload, # type: Union[Dict, bytes] key, # type: str algorithm='HS256', # type: str headers=None, # type: Optional[Dict] json_encoder=None # type: Optional[Callable] ): segments = [] # Header header = {'typ': self.header_typ, 'alg': algorithm} json_header = force_bytes( json.dumps( header, separators=(',', ':'), cls=json_encoder ) ) segments.append(base64url_encode(json_header)) segments.append(base64url_encode(payload)) # Segments signing_input = b'.'.join(segments) alg_obj = self._algorithms[algorithm] key = alg_obj.prepare_key(key) signature = alg_obj.sign(signing_input, key) segments.append(base64url_encode(signature)) return b'.'.join(segments)
-
用户下次请求进来,进行验证
class BaseJSONWebTokenAuthentication(BaseAuthentication): def authenticate(self, request): jwt_value = self.get_jwt_value(request) if jwt_value is None: return None try: payload = jwt_decode_handler(jwt_value) except jwt.ExpiredSignature: msg = _('Signature has expired.') raise exceptions.AuthenticationFailed(msg) except jwt.DecodeError: msg = _('Error decoding signature.') raise exceptions.AuthenticationFailed(msg) except jwt.InvalidTokenError: raise exceptions.AuthenticationFailed() user = self.authenticate_credentials(payload) return (user, jwt_value)
总结:
- 请求来时会执行ObtainJSONWebToken类的serializer_class的序列化方法,
- 用户认证成功后会将user对象当作参数执行jwt_payload_handler函数,
- 在这个函数内部会将用户id、用户名、以及超时时间放到一个payload的字典中,
- 接着将payload当作参数执行jwt_encode_handler函数,
- 在encode的方法内部会将包括类型以及加密的算法进行base64加密,
- 将payload进行base64加密,
- 将前两段拼接起来进行hs256加密后,再进行base64加密,再将这三段拼接起来