一、什么是Token?
Token是服务端生成的一串字符串,以作客户端进行请求的一个令牌,当第一次登录后,服务器会生成一个Token并将此Token返回给客户端,以后客户端只需带上这个Token前来请求数据即可,无需再次带上用户名和密码。
二、为什么要使用Token?
在很多项目案例中,需要实现账户的功能,客户端所有的功能都基于用户已登陆的前提下才可以使用。这就要求每次客户端像服务器请求数据时都要验证账户是否正确,如果正确则按正常方式返回数据,如果错误则进行拦截并返回错误信息。但是当客户端频繁向服务器请求数据的话,每次服务器都要频繁地查询数据库。而Token正是为了减轻服务器的压力,减少频繁的查询数据库,使服务器更加健壮。并取代传统使用session的方法来进行验证。
三、JWT json-web-token
1.三大组成
1,header
格式为字典,元数据格式如下
{'alg':'HS256', 'typ':'JWT'} #alg代表要使用的 算法 #typ表明该token的类别 - 此处必须为 大写的 JWT
该部分数据需要转成json串并用base64加密
2,payload
格式为字典,此部分分为公有声明和私有声明
公共声明:JWT提供了内置关键字用于描述常见的问题
此部分均为可选项,用户根据自己需求 按需添加key,常见公共声明如下:
{'exp':xxx, # Expiration Time 此token的过期时间的时间戳 'iss':xxx,# (Issuer) Claim 指明此token的签发者 'aud':xxx, #(Audience) Claim 指明此token的 'iat':xxx, # (Issued At) Claim 指明此创建时间的时间戳 'aud':xxx, # (Audience) Claim 指明此token签发面向群体 }
私有声明:用户可根据自己业务需求,添加自定义的key,例如如下:
{'username': 'guoxiaonao'}
公共声明和私有声明均在同一个字典中;转成json串并用base64加密
签名规则如下:
根据header中的alg确定具体算法,以下用 HS256为例
HS256(自定义的key , base64后的header + '.' + base64后的payload)
解释:用自定义的key, 对base64后的header + '.' + base64后的payload进行hmac计算
base64(header) + '.' + base64(payload) + '.' + base64(sign)
最终结果如下: b'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VybmFtZSI6Imd1b3hpYW9uYW8iLCJpc3MiOiJnZ2cifQ.Zzg1u55DCBqPRGf9z3-NAn4kbA-MJN83SxyLFfc5mmM'
3,校验jwt规则
1,解析header, 确认alg
3,获取payload自定义内容
4,pyjwt
方法 | 参数说明 | 返回值 |
---|---|---|
encode(payload, key, algorithm) | payload: jwt三大组成中的payload,需要组成字典,按需添加公有声明和私有声明 例如: {'username': 'guoxiaonao', 'exp': 1562475112} 参数类型: dict | token串 返回类型:bytes |
key : 自定义的加密key 参数类型:str | ||
algorithm: 需要使用的加密算法[HS256, RSA256等] 参数类型:str | ||
decode(token,key,algorithm,) | token: token串 参数类型: bytes/str | payload明文 返回类型:dict |
key : 自定义的加密key ,需要跟encode中的key保持一致 参数类型:str | ||
algorithm: 同encode | ||
issuer: 发布者,若encode payload中添加 'iss' 字段,则可针对该字段校验 参数类型:str | 若iss校验失败,则抛出jwt.InvalidIssuerError | |
audience:签发的受众群体,若encode payload中添加'aud'字段,则可针对该字段校验 参数类型:str | 若aud校验失败,则抛出jwt.InvalidAudienceError |
老师手写的Jwt类,很厉害:
import base64 import copy import hmac import json import time class Jwt(): def __init__(self): pass @staticmethod def encode(payload, key, exp=300): #init header header = {'typ': 'JWT', 'alg': 'HS256'} #separators - 指定序列化后的json串格式, 第一个参数 #指每个键值对之间的连接符号,第二个参数指的是每一个键值对中键和值之间的连接符号 #sort_keys - 将序列化后的字符串进行排序 header_json = json.dumps(header, separators=(',',':'), sort_keys=True) #生成b64 header header_bs = Jwt.b64encode(header_json.encode()) #参数中的 payload {'username': 'aaa'} payload = copy.deepcopy(payload) #添加公有声明 - exp 且值为未来时间戳 payload['exp'] = int(time.time()) + exp payload_json = json.dumps(payload, separators=(',',':'), sort_keys=True) payload_bs = Jwt.b64encode(payload_json.encode()) #签名 #判断传入的key的类型 if isinstance(key, str): key = key.encode() hm = hmac.new(key, header_bs + b'.' + payload_bs, digestmod='SHA256') hm_bs = Jwt.b64encode(hm.digest()) return header_bs + b'.' + payload_bs + b'.' + hm_bs @staticmethod def b64encode(j_s): #替换生成出来的b64串中的占位符 = return base64.urlsafe_b64encode(j_s).replace(b'=',b'') @staticmethod def b64decode(b64_s): rem = len(b64_s) % 4 if rem > 0: b64_s += b'=' * (4-rem) return base64.urlsafe_b64decode(b64_s) @staticmethod def decode(token, key): #校验两次HMAC结果 #检查exp公有声明的有效性 #注意 b64 = 要补全 #校验成功 返回 payload 字典对象, 失败的话 raise header_b , payload_b , sign = token.split(b'.') if isinstance(key, str): key = key.encode() #比较两次HMAC结果 hm = hmac.new(key, header_b + b'.' + payload_b, digestmod='SHA256') if sign != Jwt.b64encode(hm.digest()): raise JwtSignError('---sign error !!! ') #获取payload payload_json = Jwt.b64decode(payload_b) payload = json.loads(payload_json.decode()) #校验exp是否过期 exp = payload['exp'] now = time.time() if now > exp: #过期 raise JwtExpireError('---The token is expire !!!') return payload class JwtSignError(Exception): def __init__(self, error_msg): self.error_msg = error_msg def __str__(self): return '<JwtSignError is %s>'%(self.error_msg) class JwtExpireError(Exception): def __init__(self, error_msg): self.error_msg = error_msg def __str__(self): return '<JwtExpireError is %s>' % (self.error_msg) if __name__ == '__main__': s = Jwt.encode({'username':'guoxiaonao'}, 'abcde') #time.sleep(2) #res = Jwt.decode(s, 'abcde') print(s)