zoukankan      html  css  js  c++  java
  • PyJWT 详解

    1.首先,我们需要先了解 JWT 的概念,所以我们先看pyjwt的官网

    https://jwt.io/

    2.对于官方 JWT 有两篇博文写的不错分别如下:

    https://blog.csdn.net/qq_15766181/article/details/80707923

    https://blog.csdn.net/u011277123/article/details/78918390

    3.然后我们需要了解 python 的jwt ------- PyJWT

    (1).官网

    https://pyjwt.readthedocs.io/en/latest/

    (2).本文完善的pyjwt demo 来源

    https://www.thatyou.cn/flask-pyjwt-%E5%AE%9E%E7%8E%B0%E5%9F%BA%E4%BA%8Ejson-web-token%E7%9A%84%E7%94%A8%E6%88%B7%E8%AE%A4%E8%AF%81%E6%8E%88%E6%9D%83/

    4.开始学习 3(2) 中的大神的一个很有借鉴意义的demo

    (1).项目整体框架如下

    (2).auth和users中的 __init__.py 文件都为空,本文不涉及这两个文件

    首先,进入项目根目录,执行

    pip install pyjwt

    (3).根目录的__init__.py:

    from flask import Flask, request
    
    
    def create_app(config_filename):
        app = Flask(__name__)
        app.config.from_object(config_filename)
    
        # send CORS headers
        @app.after_request
        def after_request(response):
            response.headers.add('Access-Control-Allow-Origin', '*')
            if request.method == 'OPTIONS':
                response.headers['Access-Control-Allow-Methods'] = 'DELETE, GET, POST, PUT'
                headers = request.headers.get('Access-Control-Request-Headers')
                if headers:
                    response.headers['Access-Control-Allow-Headers'] = headers
            return response
    
        from app.users.model import db
        db.init_app(app)
    
        from app.users.api import init_api
        init_api(app)
    
        return app
    

    (4).auths.py:

    import jwt, datetime, time
    from flask import jsonify
    from app.users.model import Users
    from .. import config
    from .. import common
    from flask_sqlalchemy import SQLAlchemy
    db = SQLAlchemy()
    
    
    class Auth():
        @staticmethod
        def encode_auth_token(user_id):
            # 申请Token,参数为自定义,user_id不必须,此处为以后认证作准备,程序员可以根据情况自定义不同参数
            """
            生成认证Token
            :param user_id: int
            :param login_time: int(timestamp)
            :return: string
            """
            try:
    
                headers = {
                    "typ": "JWT",
                    "alg": "HS256",
                    "user_id": user_id
                }
    
                playload = {
                    "headers": headers,
                    "iss": 'ly',
                    "exp": datetime.datetime.utcnow() + datetime.timedelta(days=0, hours=0, minutes=1, seconds=0),
                    'iat': datetime.datetime.utcnow()
                }
    
                signature = jwt.encode(playload, config.SECRET_KEY, algorithm='HS256')
                return signature
    
            except Exception as e:
                return e
    
            # encode为加密函数,decode为解密函数(HS256)
    
            # JWT官网的三个加密参数为
            # 1.header(type,algorithm)
            #  {
            #  "alg": "HS256",
            #  "typ": "JWT"
            #  }
            # 2.playload(iss,sub,aud,exp,nbf,lat,jti)
            #   iss: jwt签发者
            #   sub: jwt所面向的用户
            #   aud: 接收jwt的一方
            #   exp: jwt的过期时间,这个过期时间必须要大于签发时间
            #   nbf: 定义在什么时间之前,该jwt都是不可用的.
            #   iat: jwt的签发时间
            #   jti: jwt的唯一身份标识,主要用来作为一次性token,从而回避重放攻击。
            # 3.signature
            #
            # jwt的第三部分是一个签证信息,这个签证信息由三部分组成:
            #
            #    header (base64后的)
            #    payload (base64后的)
            #    secret
    
            # PyJwt官网的三个加密参数为
            # jwt.encode(playload, key, algorithm='HS256')
            # playload 同上,key为app的 SECRET_KEY algorithm 为加密算法
    
            # 二者应该都可以用,但我们用的是python的 pyjwt ,那就入乡随俗吧
    
    
        @staticmethod
        def decode_auth_token(auth_token):
            """
            验证Token
            :param auth_token:
            :return: integer|string
            """
            try:
                payload = jwt.decode(auth_token, config.SECRET_KEY, options={'verify_exp': False})
                if payload:
                    return payload
                else:
                    raise jwt.InvalidTokenError
    
            except jwt.ExpiredSignatureError:
                return 'Token过期'
    
            except jwt.InvalidTokenError:
                return '无效Token'
    
        def authenticate(self, username, password):
            """
            用户登录,登录成功返回token,写将登录时间写入数据库;登录失败返回失败原因
            :param password:
            :return: json
            """
            info = Users.query.filter(username == Users.username).first()
            if info is None:
                return jsonify(common.falseReturn('', '找不到用户'))
            else:
                if info.password == password:
                    login_time = int(time.time())
                    info.login_time = login_time
                    db.session.commit()
                    token = self.encode_auth_token(info.id)
                    return jsonify(common.trueReturn(token.decode(), '登录成功'))
                    # return jsonify(common.trueReturn(jwt.decode(token, config.SECRET_KEY, algorithms='HS256'), '登录成功'))
                else:
                    return jsonify(common.falseReturn('', '密码不正确'))
    
        def identify(self, request):
            """
            用户鉴权
            :return: list
            """
            try:
                auth_token = jwt.decode(request.headers.get('Authorization'), config.SECRET_KEY, algorithms='HS256')
                if auth_token:
    
                    if not auth_token or auth_token['headers']['typ'] != 'JWT':
                        result = common.falseReturn('', '请传递正确的验证头信息')
                    else:
                        user = Users.query.filter(Users.id == auth_token['headers']['user_id']).first()
                        if user is None:
                            result = common.falseReturn('', '找不到该用户信息')
                        else:
                            result = common.trueReturn(user.id, '请求成功')
    
                    return result
    
            except jwt.ExpiredSignatureError:
                result = common.falseReturn('Time_Out', 'Token已过期')
                return result
    
            except jwt.InvalidTokenError:
                result = common.falseReturn('Time_Out', '未提供认证Token')
                return result
    
    (5).api.py:
    from flask import jsonify, request
    from app.users.model import Users
    from app.auth.auths import Auth
    from .. import common
    from flask_sqlalchemy import SQLAlchemy
    db = SQLAlchemy()
    
    
    def init_api(app):
        @app.route('/register', methods=['POST'])
        def register():
            """
            用户注册
            :return: json
            """
            email = request.form.get('email')
            username = request.form.get('username')
            password = request.form.get('password')
            user = Users(id=None, username=username, password=password, email=email)
            db.session.add(user)
            db.session.commit()
    
            info = Users.query.filter(Users.username == username).first()
            if info:
                data = {
                    'id': info.id,
                    'username': info.username,
                    'email': info.email,
                    'login_time': info.login_time
                }
                return jsonify(common.trueReturn(data, "用户注册成功"))
            else:
                return jsonify(common.falseReturn('you are the error', '用户注册失败'))
    
        @app.route('/login', methods=['POST'])
        def login():
            """
            用户登录
            :return: json
            """
            username = request.form.get('username')
            password = request.form.get('password')
            if not username or not password:
                return jsonify(common.falseReturn('', '用户名和密码不能为空'))
            else:
                return Auth.authenticate(Auth, username, password)
    
        @app.route('/user', methods=['GET'])
        def get():
            """
            获取用户信息
            :return: json
            """
            result = Auth.identify(Auth, request)
            if result['status'] and result['data']:
                user = Users.query.filter(Users.id == result['data']).first()
                data = {
                    'id': user.id,
                    'username': user.username,
                    'email': user.email,
                    'login_time': user.login_time
                }
                result = common.trueReturn(data, "请求成功")
            return jsonify(result)
    

    (6).model.py:

    from flask_sqlalchemy import SQLAlchemy
    db = SQLAlchemy()
    
    
    class Users(db.Model):
        id = db.Column(db.Integer, primary_key=True)
        email = db.Column(db.String(250),  unique=True, nullable=False)
        username = db.Column(db.String(250),  unique=True, nullable=False)
        password = db.Column(db.String(250))
        login_time = db.Column(db.Integer)
    
        def __init__(self, id, username, password, email):
            self.id = id
            self.username = username
            self.password = password
            self.email = email
    
        def __str__(self):
            return "Users(id='%s')" % self.id
    
        def set_password(self, password):
            self.password = password
            db.session.commit()
    
        def check_password(self, hash, password):
            if password == self.password:
                return True
            else:
                return False
    
        def get(self, id):
            return self.query.filter_by(id=id).first()
    
        def add(self, user):
            db.session.add(user)
            return session_commit()
    
        def update(self):
            return session_commit()
    
    
    def session_commit():
        db.session.commit()
    
    
    (7).common.py:
    def trueReturn(data, msg):
        return {
            "status": True,
            "data": data,
            "msg": msg
        }
    
    
    def falseReturn(data, msg):
        return {
            "status": False,
            "data": data,
            "msg": msg
        }
    
    (8).config.py:
    DB_USER = 'root'
    DB_PASSWORD = '005'
    DB_HOST = 'localhost'
    DB_DB = 'jwt_test'
    
    DEBUG = True
    PORT = 3306
    HOST = "127.0.0.1"
    SECRET_KEY = "lanyue"
    
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://' + DB_USER + ':' + DB_PASSWORD + '@' + DB_HOST + '/' + DB_DB
    

    (9).db.py:

    from flask_script import Manager
    from flask_migrate import Migrate, MigrateCommand
    from run import app
    
    app.config.from_object('app.config')
    
    from app.users.model import db
    db.init_app(app)
    
    migrate = Migrate(app, db)
    manager = Manager(app)
    manager.add_command('db', MigrateCommand)
    
    if __name__ == '__main__':
        manager.run()
    

    (10).run.py:

    from app import create_app
    
    app = create_app('app.config')
    
    if __name__ == '__main__':
        app.run(host=app.config['HOST'], port=app.config['PORT'], debug=app.config['DEBUG'])
    
    

    5.调试详解:

    (1).首先,我们下载火狐浏览器的  RESTClient  插件,应为这样做可以省去前台的代码(包括向后台的get post)

    (2).然后,运行项目,打开火狐浏览器的 RESTClient 插件

    我们先熟悉一下这个插件的使用方法

    这个插件中有一个参数挺重要的,推荐给大家

    https://blog.csdn.net/wh_xmy/article/details/70873370

    (3).打开插件

    请求方法指的是我们获取数据的方式,一般只用到 get post

    网址指的是我们将数据提交的路由网址

    身份认证我也不清楚,本文可以不用

    表头就是设置数据类型和发送的各种状态的

    正文是提交的数据,如果是表单数据需要用到正文右上角的那个工具(本例就是使用的它)

    HTTP响应就是指我们针对网址发送的数据经由服务器根据我们指定的参数执行完后返回的数据和状态

    因为我们先要注册用户所以用到我们的register路由(每种语言叫法不一样),点击增加HTTP头字段,key值选Content-Type,value值根据图中选好,填好网址后,点击正文右上角的那个工具出来如下界面,根据图填好内容

    点击确定,点击发送出来如下界面,表示我们注册成功:

    接下来我们验证登录

    还是POST,把register改为login,表中数据不变,如下图:

    点击发送按钮,出现如下提示表明我们成功:

    把data字段对应的值复制下来,准备测试/user路由用

    选择GET方法,点击增加请求头,key值为Authorization,value值为刚才复制的内容,login改为user,点击发送按钮,出来如下内容,表示我们成功,但是为什么提示Token过期啊,因为我们设置了Token的有效期为一分钟,聪明的你们能不能动作快点,来使他访问成功呢

    最后,demo下载地址为:

    https://download.csdn.net/download/itlanyue/10625344

    下载完成后,使用 tar(bzip2) 解压缩即可

    作者:蓝月

    -------------------------------------------

    个性签名:能我之人何其多,戒骄戒躁,脚踏实地地走好每一步

  • 相关阅读:
    js处理富文本编辑器转义、去除转义、去除HTML标签
    web sec tools / Arachni
    OS + Linux IP / Inside IP / Outside IP
    OS + Linux SMB / Samba
    OS + Linux NTP Server
    my live / PC NAS / weiliantong QNAP TS-532X 4.5.1.1495 / cipanzhenlie / raid / Synology
    summarise() regrouping output 警告
    binary_crossentropy和BinaryCrossentropy的区别
    损失函数BinaryCrossentropy例子说明
    理解功能强大的sed替换命令
  • 原文地址:https://www.cnblogs.com/viplanyue/p/12700607.html
Copyright © 2011-2022 走看看