zoukankan      html  css  js  c++  java
  • JWT

    HTTP协议是无状态协议,为了解决这个问题产生了cookie和session技术。

    传统的session-cookie机制

    浏览器发起第一次请求到服务器,服务器发现浏览器没有提供session id,就认为这是第一次请求,会返回一个新的session id给浏览器端。
    浏览器只要不关闭,这个session id就会随着每一次请求重新发给服务器端,服务器端查找这个session id,如果查到,就认为是同一个会话。如果没有查到,就认为是新的请求。
    session是会话级的,可以在一个session中创建很多数据,连接断开session清除,包括session id。
    session还得有过期的机制,一段时间如果没有发起请求,就清除session。浏览器端也会清除响应的cookie信息。
    服务器端保存着大量session信息,很消耗服务器内存,而且如果多服务器部署,还要考虑session共享的问题,比如redis、memcached等方案。
    

    无session方案

    既然服务端就是需要一个ID来表示身份,那么不使用session也可以创建一个ID返回给客户端。
    服务端生成一个标识,并使用某种算法对标识签名。
    服务端收到客户端发来的标识,需要检查签名。
    这种方案的缺点是,加密、解密需要消耗CPU计算资源,无法让浏览器自己主动检查过期的数据以清除。
    这种技术称作JWT(Json WEB Token)
    
    jwt中所有数据都是明文传输的,只是做了base64,如果是敏感信息,请不要使用jwt。
    数据签名的目的不是为了隐藏数据,而是保证数据不被篡改,确认唯一身份。如果数据篡改了,发回到服务端,服务端使用自己的key再计算一遍,然后进行签名校验,一定对不上签名。
    
    认证是jwt最常用的场景,一旦用户登录成功,就会得到token,然后请求中就可以带上这个token。
    服务端校验通过,就可以被允许访问资源。甚至可以在不同域名中传递,在单点登录(Single Sign On)中应用广泛。
    
    PyJWT is a Python library which allows you to encode and decode JSON Web Tokens (JWT).
    

    命令行操作

    pip3.7 install pyjwt
    pyjwt --key 'secret' encode 'some'='payload'
    token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzb21lIjoicGF5bG9hZCJ9.Joh1R2dYzkRvDkqv3sygm5YyK8Gi4ShZqbhK2gxcs2U
    pyjwt --key 'secret' decode $token
    pyjwt decode --no-verify $token
    

    编码

    import jwt
    
    jwt_encode = jwt.encode({'some': 'payload'}, key='secret', algorithm='HS256')
    print(jwt_encode)
    # b'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzb21lIjoicGF5bG9hZCJ9.Joh1R2dYzkRvDkqv3sygm5YyK8Gi4ShZqbhK2gxcs2U'
    

    TOKEN由三部分拼接而成

    • HEADER: ALGORITHM & TOKEN TYPE
    • PAYLOAD: DATA
    • VERIFY SIGNATURE

    源码实现过程

    1. 先用json.dumps对payload进行json格式转换,并encode()转换成bytes
    2. 将header转换成bytes
    3. 分别使用base64对json_header和payload编码
    4. 将key转换成bytes
    5. 使用加密算法和key生成校验签名
    6. 使用base64对签名编码
    7. 将三部分使用b'.'拼接在一起

    解码

    jwt_decode = jwt.decode(jwt_encode, key='secret', algorithms=['HS256'])
    print(jwt_decode)
    # {'some': 'payload'}
    

    不校验获取数据

    payload_no_verify = jwt.decode(jwt_encode, verify=False)
    print(payload_no_verify)
    # {'some': 'payload'}
    

    不校验获取header

    header_no_verify = jwt.get_unverified_header(jwt_encode)
    print(header_no_verify)
    # {'typ': 'JWT', 'alg': 'HS256'}
    

    过期时间声明

    The “exp” (expiration time) claim identifies the expiration time on or after which the JWT MUST NOT be accepted for processing.
    The processing of the “exp” claim requires that the current date/time MUST be before the expiration date/time listed in the “exp” claim.
    Implementers MAY provide for some small leeway, usually no more than a few minutes, to account for clock skew.
    Its value MUST be a number containing a NumericDate value.
    
    You can pass the expiration time as a UTC UNIX timestamp (an int) or as a datetime, which will be converted into an int.
    Expiration time is automatically verified in jwt.decode() and raises jwt.ExpiredSignatureError if the expiration time is in the past.
    Expiration time will be compared to the current UTC time, so be sure to use a UTC timestamp or datetime.
    
    You can turn off expiration time verification with the verify_exp parameter in the options argument.
    
    import time
    from datetime import datetime, timedelta
    
    d1 = jwt.encode({'exp': int(time.time()) + 5}, key='secret')
    d2 = jwt.encode({'exp': datetime.utcnow() + timedelta(seconds=5)}, key='secret')
    
    # time.sleep(10)
    print(jwt.decode(d1, key='secret'))
    print(jwt.decode(d2, key='secret'))
    # Signature has expired
    

    手动 base64 解码

    from base64 import urlsafe_b64decode
    
    data = {'name': 'tom', 'email': '123@qq.com'}
    key = 'secret'
    token = jwt.encode(payload=data, key=key)
    print(token)
    # b'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJuYW1lIjoidG9tIiwiZW1haWwiOiIxMjNAcXEuY29tIn0.wB5vRj2K2-_ZPMRJIrdRL4HIPpByZuNpgvLTVbNeSV4'
    print(jwt.decode(token, key=key))
    # {'name': 'tom', 'email': '123@qq.com'}
    
    header, payload, signature = token.split(b'.')
    print(header)
    print(payload)
    print(signature)
    # b'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9'
    # b'eyJuYW1lIjoidG9tIiwiZW1haWwiOiIxMjNAcXEuY29tIn0'
    # b'wB5vRj2K2-_ZPMRJIrdRL4HIPpByZuNpgvLTVbNeSV4'
    
    def addequal(b: bytes):
        # 补齐base64编码的等号
        equals = 4 - len(b) % 4
        return b + b'=' * equals
    
    print('header =', urlsafe_b64decode(addequal(header)))
    print('payload =', urlsafe_b64decode(addequal(payload)))
    print('signature =', urlsafe_b64decode(addequal(signature)))
    # header = b'{"typ":"JWT","alg":"HS256"}'
    # payload = b'{"name":"tom","email":"123@qq.com"}'
    # signature = b'xc0x1eoF=x8axdbxefxd9<xc4I"xb7Q/x81xc8>x90rfxe3ix82xf2xd3Uxb3^I^'
    

    实际使用示例

    from django.http import JsonResponse, HttpRequest, HttpResponseBadRequest
    import json
    from .models import User
    import jwt
    import bcrypt
    from django.conf import settings
    import time
    from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
    
    # Create your views here.
    
    key = settings.SECRET_KEY
    print(settings, settings.SECRET_KEY)
    
    expire = 24 * 60 * 60 * 365  # 过期时间
    
    
    def gen_token(user_id):
        data = {
            'user_id': user_id,
            'exp': int(time.time()) + expire,
        }
        return jwt.encode(payload=data, key=key).decode()
    
    
    # 注册
    def reg(request: HttpRequest):
        print(request, type(request))
        # <WSGIRequest: POST '/user/reg/'> <class 'django.core.handlers.wsgi.WSGIRequest'>
        print(request.GET)
        print(request.POST)
        print(request.body)
        # b'{
    	"email": "b@qq.com",
    	"name": "b",
    	"password": "123"
    }'
    
        payload = json.loads(request.body)  # 请求数据在request body中
        print(payload)
    
        try:
            email = payload['email']
            query = User.manager.filter(email=email)
            # <QuerySet []> <class 'django.db.models.query.QuerySet'>
            print(query.query)  # 查看SQL语句
            # SELECT `user`.`id`, `user`.`name`, `user`.`email`, `user`.`password` FROM `user` WHERE `user`.`email` = 1234@qq.com
            if query.first():
                return HttpResponseBadRequest('该邮箱已注册')
    
            name = payload['name']
            password = payload['password']
    
            confirm = payload['confirm']
            print(password, confirm)
            if password != confirm:
                return HttpResponseBadRequest('两次输入密码不一致')
    
            # 添加一个user
            user = User()
            user.email = email
            user.name = name
            user.password = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
            user.save()
    
            return JsonResponse({'user_id': user.id}, status=201)
        except Exception as e:
            return HttpResponseBadRequest(e)
    
    
    # 认证
    def auth(view):
        def wrapper(request: HttpRequest):
            # print(request.META)
            token = request.META.get('HTTP_JWT')
            print(token)
            if not token:
                return HttpResponseBadRequest('请先登陆')
            try:
                data = jwt.decode(token, key=key)
                print(data)
                user_id = data.get('user_id', -1)
                user = User.manager.filter(pk=user_id).get()
                request.user = user
            except jwt.ExpiredSignatureError as e:
                return HttpResponseBadRequest(e)
            except ObjectDoesNotExist:
                return HttpResponseBadRequest("user does't exist")
            except MultipleObjectsReturned:
                return HttpResponseBadRequest("more than one user was found")
    
            ret = view(request)
            return ret
    
        return wrapper
    
    
    # @auth
    def login(request: HttpRequest):
        payload = json.loads(request.body)
        try:
            email = payload['email']
            password = payload['password']
            print(email, password)
    
            user = User.manager.filter(email=email).first()
            # print(user.password)
            matched = bcrypt.checkpw(password.encode(), user.password.encode())
            if user and matched:
                token = gen_token(user.id)
                res = JsonResponse({
                    "user": {
                        "user_id": user.id,
                        "name": user.name,
                        "email": user.email
                    }, "token": token
                })
    
                res.set_cookie('jwt', value=token)
                return res
            else:
                return HttpResponseBadRequest('用户名或密码错误')
                # raise Exception
        except:
            return HttpResponseBadRequest('登录失败')
    

    参考:
    https://jwt.io/
    https://pyjwt.readthedocs.io/en/latest/
    https://github.com/jpadilla/pyjwt
    https://zhuanlan.zhihu.com/p/38942172
    https://auth0.com/learn/json-web-tokens/
    https://developer.atlassian.com/cloud/jira/software/understanding-jwt/
    https://segmentfault.com/a/1190000018058541

  • 相关阅读:
    9.2模拟题解
    NOI1995 石子合并
    NOIP2012 借教室
    织梦内页读取栏目banner图
    mysql数据库版本引发的问题
    简单修改hosts文件加快打开网页速度
    详细剖析电脑hosts文件的作用和修改
    内部标签样式
    织梦让当前的子栏目拥有特殊的样式
    织梦获取单个顶级栏目名
  • 原文地址:https://www.cnblogs.com/keithtt/p/12795770.html
Copyright © 2011-2022 走看看