zoukankan      html  css  js  c++  java
  • day102:MoFang:后端完成对短信验证码的校验&基于celery完成异步短信发送&flask_jwt_extended&用户登录的API接口

    目录

    1.用户注册

      1.后端完成对短信验证码的校验

      2.基于celery实现短信异步发送

    2.用户登录

      1.jwt登录验证:flask_jwt_extended

      2.服务端提供用户登录的API接口

    1.用户注册

    1.后端完成对短信验证码的校验

    application.apps.users.marshmallow,代码:

    from marshmallow import Schema,fields,validate,validates,ValidationError
    from message import ErrorMessage as Message
    from .models import User,db
    class MobileSchema(Schema):
        ...
    
    from marshmallow_sqlalchemy import SQLAlchemyAutoSchema,auto_field
    from marshmallow import post_load,pre_load,validates_schema
    from application import redis
    class UserSchema(SQLAlchemyAutoSchema):
        ...
    
        @validates_schema
        def validate(self,data, **kwargs):
            ....
    
            '''校验短信验证码'''
            # 1. 从redis中提取验证码
            redis_sms_code = redis.get("sms_%s" % data["mobile"])
            if redis_sms_code is None:
                raise ValidationError(message=Message.sms_code_expired,field_name="sms_code")
            redis_sms_code = redis_sms_code.decode()
            
            #2. 从客户端提交的数据data中提取验证码
            sms_code = data["sms_code"]
            
            #3. 字符串比较,如果失败,则抛出异常,否则,直接删除验证码
            if sms_code != redis_sms_code:
                raise ValidationError(message=Message.sms_code_error, field_name="sms_code")
            redis.delete("sms_%s" % data["mobile"])
    
            return data

    2.基于celery实现短信异步发送

    1.安装celery

    pip install celery==4.4.0

    2.celery主程序文件:main.py

    在项目根目录下创建mycelery目录,同时创建celery启动主程序文件main.py,代码:

    from __future__ import absolute_import
    from celery import Celery
    from application import init_app
    
    # 初始化celery对象
    app = Celery("flask")
    
    # 初始化flask 
    flask_app = init_app("application.settings.dev").app
    
    # 加载配置
    app.config_from_object("mycelery.config")
    
    # 自动注册任务
    app.autodiscover_tasks(["mycelery.sms"])

    3.celery配置文件:config.py

    配置文件,mycelery.config,代码:

    # 任务队列地址
    broker_url = 'redis://127.0.0.1:6379/15'
    
    # 结果队列地址
    result_backend = "redis://127.0.0.1:6379/14"

    4.创建任务模块:sms/tasks.py

    在mycelery下创建任务模块包sms,并创建tasks.py任务模块文件,

    同时,在任务执行过程中, 基于监听器和任务bind属性对失败任务进行记录和重新尝试执行. 代码:

    import json
    from application import redis
    from flask import current_app
    from ronglian_sms_sdk import SmsSDK
    from mycelery.main import app,flask_app
    
    @app.task(name="send_sms",bind=True)
    def send_sms(self,mobile,sms_code):
        """发送短信"""
        try:
            with flask_app.app_context(): 
                sdk = SmsSDK(
                    current_app.config.get("SMS_ACCOUNT_ID"),
                    current_app.config.get("SMS_ACCOUNT_TOKEN"),
                    current_app.config.get("SMS_APP_ID")
                )
                ret = sdk.sendMessage(
                    current_app.config.get("SMS_TEMPLATE_ID"),
                    mobile,
                    (sms_code, current_app.config.get("SMS_EXPIRE_TIME") // 60)
                )
                result = json.loads(ret)
    
                if result["statusCode"] == "000000":
                    pipe = redis.pipeline()
                    pipe.multi()  # 开启事务
                    # 保存短信记录到redis中
                    pipe.setex("sms_%s" % mobile, current_app.config.get("SMS_EXPIRE_TIME"), sms_code)
                    # 进行冷却倒计时
                    pipe.setex("int_%s" % mobile, current_app.config.get("SMS_INTERVAL_TIME"), "_")
                    
                    pipe.execute()  # 提交事务
                else:
                    current_app.log.error("短信发送失败!
    %s" % ret)
                    raise Exception
        except Exception as exc:
            # 重新尝试执行失败任务
            print(self.request.retries) # 本次执行的次数
            self.retry(exc=exc, countdown=3, max_retries=5)
    
    """基于监听器完成任务监听"""
    from celery.app.task import Task
    class SMSTask(Task):
        def on_success(self, retval, task_id, args, kwargs):
            print( '任务执行成功!')
            return super().on_success(retval, task_id, args, kwargs)
    
        def on_failure(self, exc, task_id, args, kwargs, einfo):
            print('任务执行失败!%s' % self.request.retries)
            # 重新尝试执行失败任务,时间间隔:3秒,最大尝试次数:5次
            self.retry(exc=exc, countdown=3, max_retries=5)
            return super().on_failure(exc, task_id, args, kwargs, einfo)
    
        def after_return(self, status, retval, task_id, args, kwargs, einfo):
            print('this is after return')
            return super().after_return(status, retval, task_id, args, kwargs, einfo)
    
        def on_retry(self, exc, task_id, args, kwargs, einfo):
            print('this is retry')
            return super().on_retry(exc, task_id, args, kwargs, einfo)

    5.flask项目调用异步任务发送短信

    flask项目调用异步任务发送短信,application.apps.home.views,代码:

    @jsonrpc.method(name="Home.sms")
    def sms(mobile):
        """发送短信验证码"""
        # 验证手机
        if not re.match("^1[3-9]d{9}$",mobile):
            return {"errno": status.CODE_VALIDATE_ERROR, "errmsg": message.mobile_format_error}
    
        # 短信发送冷却时间
        ret = redis.get("int_%s" % mobile)
        if ret is not None:
            return {"errno": status.CODE_INTERVAL_TIME, "errmsg": message.sms_interval_time}
    
        # 生成验证码
        sms_code = "%06d" % random.randint(0,999999)
        
        try:
            # 异步发送短信 ******
            from mycelery.sms.tasks import send_sms
            send_sms.delay(mobile=mobile, sms_code=sms_code)
            # 返回结果
            return {"errno":status.CODE_OK, "errmsg": message.sms_is_send}
        except Exception as e:
            return {"errno": status.CODE_SMS_ERROR, "errmsg": message.sms_send_error}

    6.运行celery

    在第一个终端运行celery

    主程序终端下启动: celery -A mycelery.main worker -l info
    调度器终端下启动: celery -A mycelery.main beat

    再开一个终端,输入如下指令

    python manage.py shell
    >>> from mycelery.sms.tasks import send_sms
    >>> send_sms.delay(mobile="13928836666",sms_code="123456")

    2.用户登录

    1.jwt登录验证:flask_jwt_extended

    1.flask_jwt_extended简介

    当前我们开发的项目属于前后端分离,而目前最适合我们使用的认证方式就是jwt token认证。

    在flask中,我们可以通过flask_jwt_extended模块来快速实现jwt用户登录认证。

    注意:

    1. flask_jwt_extended的作者开发当前模块主要适用于flask的普通视图方法的。其认证方式主要通过装饰器来完成。而我们当前所有服务端接口都改造成了jsonrpc规范接口,所以我们在使用过程中,需要对部分源代码进行调整才能正常使用。

    2. 事实上,在我们当前使用的flask_jsonrpc也提供了用户登陆认证功能,但是这个功能是依靠用户账户username和密码password来实现。如果我们基于当前这种方式,也可以实现jwt登陆认证,只是相对于上面的flask_jwt_extended模块而言,要补充的代码会更多,所以在此,我们放弃这块功能的使用。

    2.模块安装

    pip install flask-jwt-extended

    官网文档:https://flask-jwt-extended.readthedocs.io/en/latest/

    配置说明:https://flask-jwt-extended.readthedocs.io/en/latest/options/

    3.初始化

    在魔方项目中对模块进行初始化,application/__init__.py,代码:

    import os,sys
    # 引入flask_jwt_extended模块
    from flask_jwt_extended import JWTManager
    
    # jwt认证模块实例化
    jwt = JWTManager()
    
    def init_app(config_path):
        """全局初始化"""
       
        # jwt初始化
        jwt.init_app(app)
    
        return manager

    4.jwt相关配置

    配置文件,application.settings.dev,代码:

    from . import InitConfig
    class Config(InitConfig):
        """项目开发环境下的配置"""
        ......
    
        # jwt 相关配置
        # 加密算法,默认: HS256
        JWT_ALGORITHM  = "HS256"
        # 秘钥,默认是flask配置中的SECRET_KEY
        JWT_SECRET_KEY = "y58Rsqzmts6VCBRHes1Sf2DHdGJaGqPMi6GYpBS4CKyCdi42KLSs9TQVTauZMLMw"
        # token令牌有效期,单位: 秒/s,默认: datetime.timedelta(minutes=15) 或者 15 * 60
        JWT_ACCESS_TOKEN_EXPIRES = 60
        # refresh刷新令牌有效期,单位: 秒/s,默认:datetime.timedelta(days=30) 或者 30*24*60*60
        JWT_REFRESH_TOKEN_EXPIRES = 30*24*60*60
        # 设置通过哪种方式传递jwt,默认是http请求头,也可以是query_string,json,cookies
        JWT_TOKEN_LOCATION = "headers"
        # 当通过http请求头传递jwt时,请求头参数名称设置,默认值: Authorization
        JWT_HEADER_NAME="Authorization"
        # 当通过http请求头传递jwt时,令牌的前缀。
        # 默认值为 "Bearer",例如:Authorization: Bearer <JWT>
        JWT_HEADER_TYPE="jwt"

    5.编写登录视图函数(和jwt部分相关的)

    application.apps.users.views,代码:

    from application import jsonrpc,db
    from .marshmallow import MobileSchema,UserSchema
    from marshmallow import ValidationError
    from message import ErrorMessage as Message
    from status import APIStatus as status
    ......
    
    from flask_jwt_extended import create_access_token,create_refresh_token,jwt_required,get_jwt_identity,jwt_refresh_token_required
    from flask import jsonify,json
    
    @jsonrpc.method("User.login")
    def login(account,password):
        """根据用户登录信息生成token"""
        # 1. todo 根据账户信息和密码获取用户
        
        # 2. 生成jwt token
        access_token = create_access_token(identity=account)
        refresh_token = create_refresh_token(identity=account)
        return "ok"
    
    @jsonrpc.method("User.info")
    @jwt_required # 验证jwt
    def info():
        """获取用户信息"""
        user_data = json.loads(get_jwt_identity()) # get_jwt_identity 用于获取载荷中的数据
        return "ok"
    
    @jsonrpc.method("User.refresh")
    @jwt_refresh_token_required
    def refresh():
        """重新获取新的认证令牌token"""
        current_user = get_jwt_identity()
        # 重新生成token
        access_token = create_access_token(identity=current_user)
        return access_token

    6.修改jwt源码

    装饰器jwt_required就是用于获取客户端提交的数据中的jwt的方法,这里,我们需要进行2处调整。以方便它更好的展示错误信息。

    flask_jwt_extended/view_decorators.py,代码:

    from jwt.exceptions import ExpiredSignatureError
    from flask_jwt_extended.exceptions import InvalidHeaderError
    from message import ErrorMessage as message
    from status import APIStatus as status
    def jwt_required(fn):
    @wraps(fn) def wrapper(*args, **kwargs): try: verify_jwt_in_request() except NoAuthorizationError: return {"errno":status.CODE_NO_AUTHORIZATION,"errmsg":message.no_authorization} except ExpiredSignatureError: return {"errno":status.CODE_SIGNATURE_EXPIRED,"errmsg":message.authorization_has_expired} except InvalidHeaderError: return {"errno":status.CODE_INVALID_AUTHORIZATION,"errmsg":message.authorization_is_invalid} return fn(*args, **kwargs) return wrapper

    当前文件,另一个验证函数jwt_refresh_token_required,代码:

    def jwt_refresh_token_required(fn):
    
        @wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                verify_jwt_refresh_token_in_request()
            except NoAuthorizationError:
                return {"errno":status.CODE_NO_AUTHORIZATION,"errmsg":message.no_authorization}
            except ExpiredSignatureError:
                return {"errno":status.CODE_SIGNATURE_EXPIRED,"errmsg":message.authorization_has_expired}
            except InvalidHeaderError:
                return {"errno":status.CODE_INVALID_AUTHORIZATION,"errmsg":message.authorization_is_invalid}
            return fn(*args, **kwargs)
        return wrapper

    2.服务端提供用户登录的API接口

    application.apps.users.views,视图实现并完成登陆接口,代码:

    from flask_jwt_extended import create_access_token,create_refresh_token,jwt_required,get_jwt_identity,jwt_refresh_token_required
    from flask import jsonify,json
    from sqlalchemy import or_
    from .models import User
    from message import ErrorMessage as message
    from status import APIStatus as status
    @jsonrpc.method("User.login")
    def login(account,password):
        """根据用户登录信息生成token"""
        # 1. 根据账户信息和密码获取用户
        if len(account) < 1:
            return {"errno":status.CODE_NO_ACCOUNT,"errmsg":message.account_no_data}
        user = User.query.filter(or_(
            User.mobile==account,
            User.email==account,
            User.name==account
        )).first()
    
        # 检测用户是否存在
        if user is None: 
            return {"errno": status.CODE_NO_USER,"errmsg":message.user_not_exists}
    
        # 验证密码
        if not user.check_password(password): 
            return {"errno": status.CODE_PASSWORD_ERROR, "errmsg":message.password_error}
    
        # 2. 生成jwt token
        access_token = create_access_token(identity=user.id)
        refresh_token = create_refresh_token(identity=user.id)
    
        return {"access_token": access_token,"refresh_token":refresh_token}
    
    @jsonrpc.method("User.info")
    @jwt_required # 验证jwt
    def info():
        """获取用户信息"""
        user_data = json.loads(get_jwt_identity()) # get_jwt_identity 用于获取载荷中的数据
        print(user_data)
        return "ok"
    
    @jsonrpc.method("User.refresh")
    @jwt_refresh_token_required
    def refresh():
        """重新获取新的认证令牌token"""
        current_user = get_jwt_identity()
        # 重新生成token
        access_token = create_access_token(identity=current_user)
        return access_token
  • 相关阅读:
    Linux常用命令-centos
    USACO 2006 Open, Problem. The Country Fair 动态规划
    USACO 2007 March Contest, Silver Problem 1. Cow Traffic
    USACO 2007 December Contest, Silver Problem 2. Building Roads Kruskal最小生成树算法
    USACO 2015 February Contest, Silver Problem 3. Superbull Prim最小生成树算法
    LG-P2804 神秘数字/LG-P1196 火柴排队 归并排序, 逆序对
    数据结构 并查集
    浴谷国庆集训 对拍
    1999 NOIP 回文数
    2010 NOIP 普及组 第3题 导弹拦截
  • 原文地址:https://www.cnblogs.com/libolun/p/14083206.html
Copyright © 2011-2022 走看看