zoukankan      html  css  js  c++  java
  • Flask进阶编程

    flask restful

    1.flask与django对比

    • flask与django开发效率对比

      意义不大,当把常用插件都安装到flask那么flask与django基本差不多,只是flask没有配置插件,用户可以自定义选择相应插件,而这些插件为了帮助我们减少重复代码,方便开发。
      
    • django与flask谁更优秀

      gitHub Starts 数量几乎相同
      主流框架就是这两个
      优秀程度来说没有意义
      
    • 上手速度

      django什么都集成好了
      flask根据自己所需进行配置
      各有优劣。
      
    • 哪个框架更适合做大型项目

      大型项目都是跨语言,跨框架。这也没有意义
      
    • flask轻量级,django重量级,就像flask像小姐姐,django像御姐

    • 如果真的要比较和选择

      接受不了框架的数据,那么选flask
      
      想要完成功能,那么用django
      
    • 自己遇到问题debug调试

    2.必备环境与软件

    1.Python3.6 (虚拟环境pipenv)
    2.Pycharm(开发工具)
    3.mysql(数据库)
    4.Navicat(数据库可视化工具)
    5.PostMan(API测试工具)
    

    3.flask版本

    • 使用flask 1.0版本
    1.0 版本相比以往0.x版本,主要体现在代码重构,机制重写,废弃2.6与3.3
    

    4.项目目录搭建

    • 项目基本目录

    • 代码:

      • ginger.py
      from app.app import create_app
      # 调用app create_app方法,获得app然后启动:
      app = create_app()
      
      if __name__ == '__main__':
          app.run(debug=True)
      
      • libs/redprint.py
      class Redprint:
          """
          复写蓝图方法,用于注册函数
          """
          def __init__(self,name):
              self.name = name
              self.mound = []
          def route(self, rule, **options):
          	# 定义路由
              def decorator(f):
                  # f要执行的函数,rule为路由。
                  self.mound.append((f, rule, options))
                  return f
              return decorator
          def register(self,bp,url_prefix=None):
              if url_prefix is None:
                  # 判断如果没有url_prefix,指定它的name为路由前缀
                  url_prefix = '/' + self.name
              # 实现视图函数向蓝图注册
              for f, rule, options in self.mound:
                  # 视图中有endpoint取endpoint,没有就取视图名字作为endpoint
                  endpoint = options.pop("endpoint", f.__name__)
                  # 添加路由
                  bp.add_url_rule(url_prefix + rule, endpoint, f, **options)
      
      • app/api/v1/__init__.py
      from flask import Blueprint
      from app.api.v1 import user, book
      def create_blueprint_v1():
          bp_v1 = Blueprint('v1',__name__)
          #将红图注册到蓝图中
          # user.api.register(bp_v1,url_prefix='/user')
          # book.api.register(bp_v1,url_prefix='/book')
          user.api.register(bp_v1)
          book.api.register(bp_v1)
          return bp_v1
      
      • 业务代码:book.py
      from app.libs.redprint import Redprint
      # 用Redprint 注册函数
      api = Redprint('book')
      @api.route('',methods=['GET'])
      def get_book():
          return 'I am book'
      
      @api.route('',methods=['POST'])
      def create_book():
          return 'create book'
      
    • 项目结构思路

    首先自己注册Redprint,然后再每个分文件中都实例化一个Redprint对象,然后把Redprint注册到蓝图上,再把蓝图注册到flask核心对象中
    
    • 整个反向实现过程思路
    首先在业务层面代码:实例化一个Redprint,用Redprint注册函数。
    
    然后再 业务层面__init__里创建一个蓝图,把Redprint注册到Blueprint中,并且通过url_prefix增加一个url前缀
    
    在app.py中,再一次把蓝图注册flask核心对象中,并且通过url_prefix增加一个url前缀
    

    5.restful

    • 又称表述性状态转移,一种架构设计风格。将资源(如上面book,user),使用url定位资源,使用HTTP操作资源(GET,POST,PUT,DELETE)。

    6.客户端:

    • 比如客户注册,除了传统意义上的用户,你应该考虑到第三方要调你的接口,APP,小程序等都可以要调用你的接口,所以你的设计不仅仅是用户。当然注册形式也是多样化的短信,邮件,QQ,微信等等。所以你要设计好的代码结构,这样让你的代码不会特别杂乱。

    7.用户注册

    • 功能实现演示:

    • 提交数据首先进行表单验证,这里使用WTForm,因表单提交一般用于网页中,json数据提交一般用于移动端。那么我们提交表单,如何json传入什么参数,如何通过表单进行校验。

      from app.libs.enums import ClientTypeEnum
      from app.libs.error_code import Success
      from app.libs.redprint import Redprint
      from app.validators.forms import ClientForm, UserEmailForm
      from models.user import User
      
      api = Redprint('client')
      @api.route('/register', methods=['POST'])
      def create_create():
          # validate_for_api这里封装验证表单是否正确,不正确会自动抛出异常
          # 第一次验证,验证账号密码合法性
          form = ClientForm().validate_for_api()
          # if form.validate():
          # 如果新增其他注册类型如微信,公众号,QQ等,只需在promise定义枚举。
          # 再定义相应的函数像:__register_user_by_email即可
          # promise 为一个字典,key=枚举中键,value=相应验证的函数。
          # ClientTypeEnum 标识不同类型注册的枚举
          # 你也可以在promise定义其他类型注册,下面再定义相应方法。这里只用email进行演示
          promise = {
              ClientTypeEnum.USER_EMAIL: __register_user_by_email,
          }
          # 从form中取到用户是哪个类型注册,这里以email注册为例
          promise[form.type.data]()
          # # 引发一个异常,咱么可以自定义一个异常,此引发错误时html错误。
          # # 那么错误异常信息如何返回json格式,自定义错误异常json信息
          """
              有些异常我们时可以预知:已知异常 通过APIException
              有些异常我们时无法预知:未知异常
              # 通过AOP处理未知异常。通过装饰器捕捉未知异常
          """
          # 最终:我们可以接收定义时的复杂,但不能接受调用时候的复杂
          # 因为定义时候只定义一次。
          return Success()
      
      def __register_user_by_email():
          # 必须从form中拿到参数,因为form中数据是经过校验器校验
          # UserEmailForm,这里如果用户用email注册需要对email进行校验。
          # 第二次验证,验证email注册用户的email
          form = UserEmailForm().validate_for_api()
          User.reqister_by_email(form.nickname.data,
                                 form.account.data,
                                 form.secret.data)
      
      # 注意: 此时validate_for_api 为自定义的BaseForm中方法用于验证表单是否正确。不正确会抛出异常
      

    8.form表单验证

    • 在validators中定义一个j基类BaseForm,默认继承Form
    from flask import request
    from wtforms import Form
    # ParameterException 为自定义返回错误信息,后面会详细叙述
    from app.libs.error_code import ParameterException
    
    class BaseForm(Form):
        """
        重写form,让验证form验证继承它
        """
        def __init__(self):
            # 在基类中实例化时,获取到请求的数据,并转换json格式
            data = request.json
            # 继承父类的实例化方法,将请求数据传入form表单中
            super(BaseForm, self).__init__(data=data)
        # 这里定义的是一个全局的form表单验证,用于验证是否满足表单验证,不满足抛出异常,此时self为form对象
        def validate_for_api(self):
            valid = super(BaseForm, self).validate()
            print(valid)
            if not valid:
                raise ParameterException(msg=self.errors)
            # 将form返回
            return self
    
    • 其他form表单验证。其他表单的验证,都要直接间接继承父类
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    """
    # Author Xu Junkai
    # coding=utf-8
    # @Time    : 2020/2/25 16:20
    # @Site    :
    # @File    : forms.py
    # @Software: PyCharm
    """
    from wtforms import StringField, IntegerField
    from wtforms.validators import DataRequired, length, Email, Regexp, ValidationError
    
    from app.libs.enums import ClientTypeEnum
    from models.user import User
    # 此处继承BaseForm
    from app.validators.baseform import BaseForm as Form
    
    class ClientForm(Form):
        # 定义账号验证规则
        account = StringField(validators=[DataRequired(message='没有输入账号'), length(
            min=5, max=32,message='密码长度要求6-32'
        )])
        # 定义密码验证规则,此处没有定义
        secret = StringField()
        # 定义用户是通过哪种方式注册的验证(email,qq...)这里注意我们采用是枚举方式定义注册类型,见下面代码
        type = IntegerField(validators=[DataRequired(message='没有输入类型'),])
        def validate_type(self,value):
            # 定义传入注册类型,必须是我们指定的类型,不是的话抛出异常
            try:
                client = ClientTypeEnum(value.data)
            except ValueError as e:
                raise e
            # 将枚举类型赋值给type
            self.type.data = client
    
    class UserEmailForm(ClientForm):
        # 默认继承ClientForm,通过email注册的用户需要对email进行验证
        # 通过email注册方式,进行校验。这样减少重复代码编写
        account = StringField(
            validators=[Email(message='invalidate email')]
        )
        secret = StringField(validators=[
            DataRequired(),
            Regexp(r'^[A-Za-z0-9_*&$#@]{6,22}$')
        ])
        nickname = StringField(validators=[DataRequired(),
                                           length(min=2, max=22)])
        def validate_account(self,value):
            # 重复验证,从数据库查询有没有注册,有注册,引发一个wtf错误。
            if User.query.filter_by(email=value.data).first():
                raise ValidationError()
    
    

    9.枚举的应用

    • 以往验证表单我们都是通过choice验证,通过枚举方式进行验证:libs/enums.py
    from enum import Enum
    class ClientTypeEnum(Enum):
        """
        继承Enum,定义一个枚举,用于不同形式客户端注册,(微信,EMAIL,QQ...)
        """
        # email注册
        USER_EMAIL = 100
        # 手机注册
        USER_MOBILE = 1001
        # 微信小程序
        USER_MINA = 200
        # 微信公众号
        USER_WX = 201
    

    10.异常捕捉--->自定义

    • libs/error.py定义一个基类,用于异常返回相应格式的异常信息,类似如下:
    {
        "msg": "sorry, we make a mistake (~~ _ ~~)",
        "error_code": 999,
        "request": "POST /v1/client/register"
    }
    
    • 自定义,基类异常处理,都是默认配置,让其返回数据结构为json
    import json
    from flask import request
    from werkzeug.exceptions import HTTPException
    class APIException(HTTPException):
        """
        本身继承wekzeug的HTTPException
        要求返回错误信息格式:
        {
            "msg": "sorry, we make a mistake (~~ _ ~~)",
            "error_code": 999,
            "request": "POST /v1/client/register"
        }
        """
        # 默认code,msg,error_code  code为返回服务端状态码
        code = 500
        msg = 'sorry, we make a mistake (~~ _ ~~)'
        error_code = 999
        def __init__(self, msg=None, code=None, error_code=None, headers=None):
            if code:
                self.code = code
            if error_code:
                self.error_code = error_code
            if msg:
                self.msg = msg
            super(APIException,self).__init__(msg, None)
    
        def get_body(self, environ=None):
            """
            封装json格式,将数据转为我们所需要格式
            :param environ:
            :return:
            """
            body = dict(
                msg = self.msg,
                error_code = self.error_code,
                request = request.method + " " +self.get_url_no_param()
            )
            text = json.dumps(body)
            return text
        # 定义返回的Content-Type类型
        def get_headers(self, environ=None):
            """
            定义我们要返回数据类型为json
            :param environ: 
            :return: 
            """
            return [("Content-Type", "application/json")]
        @staticmethod
        def get_url_no_param():
            # 返回发生异常的url进行处理,去除 ? 后面的内容
            full_path = str(request.full_path)
            main_path = full_path.split("?")
            return main_path[0]
    
    
    • libs/error_code.py 自定义返回数据:错误码,信息,或是成功状态码...
    from app.libs.error import APIException
    # 400 请求参数错误
    # 401 未授权
    # 403 禁止访问
    # 404 没有找到页面
    # 500 服务器产生未知错误
    # 200 查询成功
    # 201 创建或更新成功
    # 204 删除成功
    # 301/302 重定向
    class Success(APIException):
        """
        返回成功数据
        """
        code = 201
        msg = "ok"
        error_code = 0
    
    class ServerError(APIException):
        """定义python中最原始的错误"""
        code = 500
        msg = "sorry, we made a mistake.(~__~!)"
        error_code = 999
    
    
    class ClientTypeError(APIException):
        code = 400
        msg = 'client is invalid'
        error_code = 1006
    
    class ParameterException(APIException):
        # 定义通用异常提示,APIException定义返回异常的格式
        code = 400
        msg = 'invalid parameter'
        error_code = 1000
    
    class NotFound(APIException):
        code = 404
        msg = "the resource ar not found!"
        error_code = 1001
    
    class AuthFailed(APIException):
        code = 401
        error_code = 1005
        msg = "authorization failed"
    
    
    • 未知错误异常捕捉,前面考虑到有些异常是我们可以捕捉的,但是有些未知异常我们是无法预测的。那么怎么办?
    • 这里再主程序入口文件,定义全局捕捉异常,通过装饰器方式
    # 捕捉未知异常 只有在 flask 1.0版本及以上,可以捕捉通用异常
    @app.errorhandler(Exception)
    def framework_error(e):
        # 有可能是APIExcepiton
        # 有可能是HTTPException
        # 有可能是Excepiton  python中最原始的错误
        if isinstance(e,APIException):
            return e
        if isinstance(e,HTTPException):
            code = e.code
            msg = e.description
            error_code = 1007
            return APIException(msg, code, error_code)
        else:
            #这里进行判断,如果是调试模式,返回详细信息(因为开发节点需要看到详细信息)
            if not app.config['DEBUG']:
                # 当然此处可以进行日志记录
                # python中最原始的错误 就返回默认未知错误
                return ServerError
            else:
                raise e
    
    

    11.数据库连接

    • 这里我们重写Query替换BaseQuery:,并自定义一个基本model.
    from datetime import datetime
    from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy, BaseQuery
    from sqlalchemy import Column, Integer, SmallInteger
    from contextlib import contextmanager
    
    from app.libs.error_code import NotFound
    
    
    class SQLAlchemy(_SQLAlchemy):
        # 创建一个上下文管理器,可以用with语法
        @contextmanager
        def auto_commit(self):
            # 异常就回滚数据,并抛出异常
            try:
                yield
                self.session.commit()
            except Exception as e:
                db.session.rollback()
                raise e
    class Query(BaseQuery):
        """
        覆盖查询类 重写filter_by方法
        """
        def filter_by(self,**kwargs):
            # 为每个model中加入status,只有等于1才会被找到
            if "status" not in kwargs.keys():
                kwargs['status'] = 1
            return super(Query,self).filter_by(**kwargs)
        def get_or_404(self, ident, description=None):
            # 重写get_or_404方法,没有找到触发自己定义异常。
            # 因原本get_or_404异常返回时一个html页面,但是我们想返回是一个json
            rv = self.get(ident)
            if not rv:
                raise NotFound()
            return rv
        def first_or_404(self, description=None):
            # 同理
            rv = self.first()
            if not rv:
                raise NotFound()
            return rv
    # query_class指定自定义的Query,实例化一个db对象
    db = SQLAlchemy(query_class=Query)
    
    
    class Base(db.Model):
        """
        模型的基类,为所有模型添加create_time,status属性
        为方便好用:添加了模型公共方法,比如删除一个模型
        """
        __abstract__ = True
        create_time = Column(Integer)
        status = Column(SmallInteger,default=1)
        def __init__(self):
            # 实例化定义创建时间
            self.create_time = int(datetime.now().timestamp())
        @property
        def create_datetime(self):
            if self.create_time:
                return datetime.fromtimestamp(self.create_time)
            else:
                return None
        def set_attrs(self, attrs_dict):
            for key,value in attrs_dict.items():
                if hasattr(self, key) and key != 'id':
                    setattr(self, key, value)
        def delete(self):
            self.status = 0
    
    

    12.model定义User表

    from sqlalchemy import Column, Integer, String, SmallInteger
    from werkzeug.security import generate_password_hash, check_password_hash
    
    from app.libs.error_code import NotFound, AuthFailed
    from models.base import Base, db
    
    
    class User(Base):
        # 定义字段名
        id = Column(Integer, primary_key=True)
        email = Column(String(24), unique=True,nullable=False)
        nickname = Column(String(24), unique=True)
        auth = Column(SmallInteger, default=1)
        _password = Column('password', String(100))
        @property
        def password(self):
            return self._password
        @password.setter
        def password(self,raw):
            # 将原始密码加密
            self._password = generate_password_hash(raw)
    
        # 注册代码,在一个对象下面创建对象本身,这样不是合理的,但是如果定义静态方法,是可行的
        @staticmethod
        def reqister_by_email(nickname, account, secret):
            # email注册数据提交数据库
            # 因之前创建db指定定义auto_commit对加上上下文管理器装饰器,所以这里可以用with语法
            with db.auto_commit():
                user = User()
                user.nickname = nickname
                user.email = account
                user.password = secret
                # 插入数据库中
                db.session.add(user)
        @staticmethod
        def verify(email, password):
            """
            主要做登陆的验证
            """
            # 验证用户是否存在
            user = User.query.filter_by(email=email).first_or_404()
            # 通过方法check_password,验证密码是否正确
            if not user.check_password(password):
                raise AuthFailed
            return {"uid": user.id}
        def check_password(self, raw):
            # 验证密码是否正确
            if not self._password:
                return False
            return check_password_hash(self._password, raw)
    

    13.Token获取:

    • 通过用户输入账号,密码,登陆类型,获取token

    • 效果如下:

    • 视图定义:获取token

      from flask import current_app, jsonify
      
      from app.libs.enums import ClientTypeEnum
      from app.libs.redprint import Redprint
      from app.validators.forms import ClientForm
      from models.user import User
      #生成Token令牌
      from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
      api = Redprint("token")
      
      # 发起请求类型:
      # {"account":"xujunkaipy@163.com","secret":"123456","type":100} post
      # url = http://127.0.0.1:5000/v1/token
      @api.route('', methods=['POST'])
      def get_token():
          # 表单验证是否合法
          form = ClientForm().validate_for_api()
          # 定义验证登陆用户的方法,verify写在User类下用于验证账号密码是否正确
          promise = {
              ClientTypeEnum.USER_EMAIL: User.verify,
          }
          # 传入账号密码,进行验证。验证用户账号是否存在,密码是否正确
          identity = promise[ClientTypeEnum(form.type.data)](
              form.account.data,
              form.secret.data
          )
          # Token 生成 TOKEN_EXPIRATION 为配置文件设置过期时间
          expiration = current_app.config['TOKEN_EXPIRATION']
          # generate_auth_token生成令牌
          token = generate_auth_token(identity['uid'],
                                      form.type.data,
                                      None,
                                      expiration)
          response_token = {
              'token': token.decode('ascii')
          }
          # 返回token,返回http状态码 201
          return jsonify(response_token), 201
      
      
      def generate_auth_token(uid,ac_type,scope=None,expiration=7200):
          """
              生成令牌
              uid:用户id
              ac_type: 用户类型
              scope:权限作用域
              expiration: 令牌有效期
          """
          s = Serializer(current_app.config['SECRET_KEY'],
                         expires_in=expiration)
          return s.dumps(
              {
                  "uid": uid,
                  "type": ac_type.value
              }
          )
      
      

    14.限制没有token进行api访问

    • 往往有些数据必须登陆才能看得到,怎样限制那些没有登陆用户访问。我们通过用户访问接口是否携带token,通过装饰器方式对进行token校验来确定用户是否登陆。这里我们使用HTTPBasicAuth一个第三方组件完成功能。

    • 显示效果:

      libs/token_auth.py

      #!/usr/bin/env python
      # -*- coding:utf-8 -*-
      """
      # Author Xu Junkai
      # coding=utf-8
      # @Time    : 2020/2/26 15:10
      # @Site    :
      # @File    : token_auth.py
      # @Software: PyCharm
      """
      from collections import namedtuple
      
      from flask_httpauth import HTTPBasicAuth
      from itsdangerous import TimedJSONWebSignatureSerializer 
          as Serializer,BadSignature, SignatureExpired
      from flask import current_app, g
      
      from app.libs.error_code import AuthFailed
      # 实例化HTTPBasicAuth对象。
      auth = HTTPBasicAuth()
      # 定义一个namedtuple 用于解密token返回
      User = namedtuple("User",['uid', 'ac_type', 'scpoe'])
      @auth.verify_password
      def verify_password(token,password):
          # token验证
          # HTTPBasicAuth 规范要传递账号,密码的方式
          # key= Authorization
          # value = basic base64(账号:密码)
          """
          HTTPBasicAuth 规范可以将账号密码放到Authorization传入。
          传入格式是: basic + ' ' +base64(账号:密码)
          相当于:key=Authorization,value=basic + ' ' +base64(账号:密码)
          这里我们变通:只传入token, 这样函数第二个参数我们不用,
          通过获取到了token,执行verify_auth_token方法进行解密。解密失败抛出一系列异常
      
          """
          user_info = verify_auth_token(token)
          if not user_info:
              return False
          else:
              # 如果校验成功,把用户信息保存在g变量中,后续用
              g.user = user_info
              return True
      
      
      # 解密token
      def verify_auth_token(token):
          # 根据配置密钥解密
          s = Serializer(current_app.config['SECRET_KEY'])
          try:
              data = s.loads(token)
          except BadSignature:
              # 令牌不合法,抛出异常
              raise AuthFailed(msg='token is invalid', error_code=1002)
          except SignatureExpired:
              # 验证token是否过期
              raise AuthFailed(msg='token is expired', error_code=1003)
          uid = data['uid']
          ac_type = data['type']
          # 返回
          return User(uid, ac_type, '')
      
      
      • 在需要验证token的视图上面添加装饰器
      from app.libs.redprint import Redprint
      from app.libs.token_auth import auth
      from models.user import User
      
      api = Redprint('user')
      @api.route('/<int:uid>',methods=["GET"])
      @auth.login_required
      def get_user(uid):
          user = User.query.get_or_404(uid)
          return 'I am junKai'
      
    • 知识点:

      class A:
          name = "xjk"
          age = 18
          def __init__(self):
              self.gender = "male"
      print(A.__dict__)  # {"gender":"male"}
      #由上面代码可以看出来,__dict__方法是不可以把类变量转化成字典的,那么如何拿到类变量中键和值呢
      
      class A:
          name = "xjk"
          age = 18
          def __init__(self):
              self.gender = "male"
          def keys(self):
              # 首先会调用keys方法,获取keys
              return ("name", "age", "gender")
          def __getitem__(self, item):
              # 通过__getitem__ 让对象可以用[] 访问。
              # 通过getattr 获取对象值
              return getattr(self, item)
      o = A()
      # 通过dict 调用keys方法
      print(dict(o))
      # {'name': 'xjk', 'age': 18, 'gender': 'male'}
      

    15.权限控制

    • 这里设计权限思路而不是把权限放在库里,因为当用户多了,频繁操作你的数据库,会给你数据库带来很多压力,这里就是把权限存在文件中,然后后端逻辑判断

    • 首先下面有一个视图

    @api.route('/<int:uid>',methods=["GET"])
    @auth.login_required
    def super_get_user(uid):
        """
        超管用户获取用户信息,获取哪个都可以
        """
        user = User.query.filter_by(id = uid).first_or_404()
        return jsonify(dict(user))
    
    • 我们首先好奇的是,user为对象,我们如何通过jsonify将user转换成dict的。这里我们在组件的入口文件:也就是app.py中重写JSONEncoder方法实现的
    from flask import Flask as _Flask
    from flask.json import JSONEncoder as _JSONEncoder
    
    from app.libs.error_code import ServerError
    from datetime import date
    
    class JSONEncoder(_JSONEncoder):
        def default(self, o):
            # default会被递归调用。因为如果对象的元素无法序列化,它会再调用default进行深层次的序列化
            # 判断有没有keys和getitem方法
            if hasattr(o, 'keys') and hasattr(o, "__getitem__"):
                return dict(o)
            # 没有的话,报一个服务错误。
            # 这里判断如果是时间,再对时间对象进行处理,当然你还可以定义其他对象处理方式比如uuid
            if isinstance(o, date):
                return o.strftime('%Y-%m-%d')
            raise ServerError()
    
    class Flask(_Flask):
        # 将json_encoder替换掉自己写的JSONEncoder
        # 这样就能跑到自己定义的JSONEncoder中
        json_encoder = JSONEncoder
    
    • 首先改写model中User类的verify方法
    class User(Base):
    	...
    	@staticmethod
        def verify(email, password):
            """
            主要做登陆的验证
            """
            # 验证用户是否存在
            user = User.query.filter_by(email=email).first_or_404()
            # 通过方法check_password,验证密码是否正确
            if not user.check_password(password):
                raise AuthFailed
            # 判断是不是管理员
            scope = "AdminScope" if user.auth == 2 else "UserScope"
            # 将结果返回
            return {"uid": user.id,"scope":scope}
    
    • 在token中通过generate_auth_token函数生成token时候,将当前用户的类型也加在token中
    def generate_auth_token(uid,ac_type,scope=None,expiration=7200):
        """
            生成令牌
            uid:用户id
            ac_type: 用户类型
            scope:权限作用域
            expiration: 令牌有效期
        """
        s = Serializer(current_app.config['SECRET_KEY'],
                       expires_in=expiration)
        return s.dumps(
            {
                "uid": uid,
                "type": ac_type.value,
                "scope": scope
            }
        )
    
    
    • 在解密token时候:执行verify_auth_token函数进行解密,可以获取到当前用户是普通用户还是超级用户
    # 解密token
    def verify_auth_token(token):
        # 根据配置密钥解密
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except BadSignature:
            # 令牌不合法,抛出异常
            raise AuthFailed(msg='token is invalid', error_code=1002)
        except SignatureExpired:
            # 验证token是否过期
            raise AuthFailed(msg='token is expired', error_code=1003)
        uid = data['uid']
        ac_type = data['type']
        scope = data["scope"]
        # 可知道scope和request 所访问视图函数
        # 判断用户是否有权限
        allow = is_in_scope(scope,request.endpoint)
        print("allow",allow)
        if not allow:
            raise Forbidden()
        return User(uid, ac_type, scope)
    
    
    • 关键点在于 is_in_scope方法。它是用来判断用户权限的。Scope中定义2个列表allow_api用户存放视图函数的api,allow_module则用户存放一个py文件所有的允许api,这样,通过这两个权限粒度不同,能更好进行权限判断。
    class Scope:
        allow_api = []
        # 一个py文件所有视图函数
        allow_module = []
        # 不允许访问的权限 为什么要设置forbidden,
        # 比如说,如果user.py视图函数有100个,只有2个不允许访问,这样需要在UserScope内的allow_api配置98个项
        # 显然这是费工夫的,如果我通过排除法做就会简单很多
        forbidden = []
        def __add__(self, other):
            # 运算符重造
            self.allow_api = self.allow_api + other.allow_api
            # 权限去重:
            self.allow_api = list(set(self.allow_api))
            # 最后一定要return 当前对象否则无法用链式方法书写
    
            # 红图级别也要支持相加,也就是user.py文件所有视图
            self.allow_module = self.allow_module + other.allow_module
            self.allow_module = list(set(self.allow_module))
    
            # 禁止访问
            self.forbidden = self.forbidden + other.forbidden
            self.forbidden = list(set(self.forbidden))
            return self
    
    
    class AdminScope(Scope):
        # 超级用户
        allow_api = ['v1.user+super_get_user','v1.user+super_delete_user']
        allow_module = ["v1.user"]
        def __init__(self):
            # 链式方法添加UserScope和AdminScope权限
            self + UserScope()
    
    class UserScope(Scope):
        # 普通用户
        allow_api = ["v1.user+get_user","v1.user+delete_user"]
        forbidden = ['v1.user+super_get_user','v1.user+super_delete_user']
        def __init__(self):
            self + AdminScope()
    # class SuperScope(Scope):
    #     allow_api = ["v1.C", "v1.D"]
    #     allow_module = ["v1.user",]
    #     def __init__(self):
    #         # 链式方法添加UserScope和AdminScope权限
    #         self + UserScope() + AdminScope()
    
    def is_in_scope(scope, endpoint):
        # 首先需要让endpoint包含视图所对应模块,
        # 有些用户可以访问当前py文件所有的视图函数。通过moudle_name实现权限粒度更粗分配
        # 构建格式: 如果是视图函数 v1.view_func
        # 如果是模块的话: v1.module_name+view_func,其实module_name就是红图名字
        # 我们可以再红图做拼接
        """
        判断是否允许访问
        :param scope:
        :param endpoint:
        :return:
        """
        # scope 为 UserScope 或 AdminScope字符串
        #globals可以把当前变量,类,函数封装成一个字典
        # {"UserScope":UserScope,"AdminScope":AdminScope ...}
        # 通过 globals[]
        # 实例化相应类的对象
        scope = globals()[scope]()
        # scope.allow_api获取允许访问权限的接口
        # endpoint = v1.red_name+view_func
        splits = endpoint.split("+")
        # red_name = v1.red_name
        red_name = splits[0]
        if endpoint in scope.forbidden:
            # 首先检测是否是禁止访问的
            return False
        if endpoint in scope.allow_api:
            # 再判断视图函数是否再允许权限中
            return True
        if red_name in scope.allow_module:
            # 最后判断py文件内所有视图函数是否允许
            return True
        else:
            return False
    
    • 解密成功,并将用户信息保存在g变量中。
    @auth.verify_password
    def verify_password(token,password):
        # token验证
        # HTTPBasicAuth 规范要传递账号,密码的方式
        # key= Authorization
        # value = basic base64(账号:密码)
        """
        HTTPBasicAuth 规范可以将账号密码放到Authorization传入。
        传入格式是: basic + ' ' +base64(账号:密码)
        相当于:key=Authorization,value=basic + ' ' +base64(账号:密码)
        这里我们变通:只传入token, 这样函数第二个参数我们不用,
        通过获取到了token,执行verify_auth_token方法进行解密。解密失败抛出一系列异常
    
        """
        user_info = verify_auth_token(token)
        if not user_info:
            return False
        else:
            # 如果校验成功,把用户信息保存在g变量中,后续用
            g.user = user_info
            return True
    
    • 如果is_in_scope返回False则会引发一个错误,表示该用户没有权限
    allow = is_in_scope(scope,request.endpoint)
    if not allow:
    	raise Forbidden()
    
  • 相关阅读:
    VS2010版快捷键
    Win7旗舰版中的IIS配置asp.net的运行环境
    实现软件自动在线升级的原理
    view_countInfo
    C#尝试读取或写入受保护的内存。这通常指示其他内存已损坏。
    error: 40
    SQL Server 2008 阻止保存要求重新创建表的更改问题的设置方法
    继承实现圆柱体面积体积的计算
    圆柱模板价格计算器V1.0版本
    python3.7内置函数整理笔记
  • 原文地址:https://www.cnblogs.com/xujunkai/p/12384561.html
Copyright © 2011-2022 走看看