zoukankan      html  css  js  c++  java
  • day102:MoFang:后端完成对短信验证码的校验&基于celery完成异步短信发送&flask_jwt_extended&用户登录的API接口

    目录

    1.用户注册

      1.后端完成对短信验证码的校验

      2.基于celery实现短信异步发送

    2.用户登录

      1.jwt登录验证:flask_jwt_extended

      2.服务端提供用户登录的API接口

    1.用户注册

    1.后端完成对短信验证码的校验

    application.apps.users.marshmallow,代码:

    from marshmallow import Schema,fields,validate,validates,ValidationError
    from message import ErrorMessage as Message
    from .models import User,db
    class MobileSchema(Schema):
        ...
    
    from marshmallow_sqlalchemy import SQLAlchemyAutoSchema,auto_field
    from marshmallow import post_load,pre_load,validates_schema
    from application import redis
    class UserSchema(SQLAlchemyAutoSchema):
        ...
    
        @validates_schema
        def validate(self,data, **kwargs):
            ....
    
            '''校验短信验证码'''
            # 1. 从redis中提取验证码
            redis_sms_code = redis.get("sms_%s" % data["mobile"])
            if redis_sms_code is None:
                raise ValidationError(message=Message.sms_code_expired,field_name="sms_code")
            redis_sms_code = redis_sms_code.decode()
            
            #2. 从客户端提交的数据data中提取验证码
            sms_code = data["sms_code"]
            
            #3. 字符串比较,如果失败,则抛出异常,否则,直接删除验证码
            if sms_code != redis_sms_code:
                raise ValidationError(message=Message.sms_code_error, field_name="sms_code")
            redis.delete("sms_%s" % data["mobile"])
    
            return data

    2.基于celery实现短信异步发送

    1.安装celery

    pip install celery==4.4.0

    2.celery主程序文件:main.py

    在项目根目录下创建mycelery目录,同时创建celery启动主程序文件main.py,代码:

    from __future__ import absolute_import
    from celery import Celery
    from application import init_app
    
    # 初始化celery对象
    app = Celery("flask")
    
    # 初始化flask 
    flask_app = init_app("application.settings.dev").app
    
    # 加载配置
    app.config_from_object("mycelery.config")
    
    # 自动注册任务
    app.autodiscover_tasks(["mycelery.sms"])

    3.celery配置文件:config.py

    配置文件,mycelery.config,代码:

    # 任务队列地址
    broker_url = 'redis://127.0.0.1:6379/15'
    
    # 结果队列地址
    result_backend = "redis://127.0.0.1:6379/14"

    4.创建任务模块:sms/tasks.py

    在mycelery下创建任务模块包sms,并创建tasks.py任务模块文件,

    同时,在任务执行过程中, 基于监听器和任务bind属性对失败任务进行记录和重新尝试执行. 代码:

    import json
    from application import redis
    from flask import current_app
    from ronglian_sms_sdk import SmsSDK
    from mycelery.main import app,flask_app
    
    @app.task(name="send_sms",bind=True)
    def send_sms(self,mobile,sms_code):
        """发送短信"""
        try:
            with flask_app.app_context(): 
                sdk = SmsSDK(
                    current_app.config.get("SMS_ACCOUNT_ID"),
                    current_app.config.get("SMS_ACCOUNT_TOKEN"),
                    current_app.config.get("SMS_APP_ID")
                )
                ret = sdk.sendMessage(
                    current_app.config.get("SMS_TEMPLATE_ID"),
                    mobile,
                    (sms_code, current_app.config.get("SMS_EXPIRE_TIME") // 60)
                )
                result = json.loads(ret)
    
                if result["statusCode"] == "000000":
                    pipe = redis.pipeline()
                    pipe.multi()  # 开启事务
                    # 保存短信记录到redis中
                    pipe.setex("sms_%s" % mobile, current_app.config.get("SMS_EXPIRE_TIME"), sms_code)
                    # 进行冷却倒计时
                    pipe.setex("int_%s" % mobile, current_app.config.get("SMS_INTERVAL_TIME"), "_")
                    
                    pipe.execute()  # 提交事务
                else:
                    current_app.log.error("短信发送失败!
    %s" % ret)
                    raise Exception
        except Exception as exc:
            # 重新尝试执行失败任务
            print(self.request.retries) # 本次执行的次数
            self.retry(exc=exc, countdown=3, max_retries=5)
    
    """基于监听器完成任务监听"""
    from celery.app.task import Task
    class SMSTask(Task):
        def on_success(self, retval, task_id, args, kwargs):
            print( '任务执行成功!')
            return super().on_success(retval, task_id, args, kwargs)
    
        def on_failure(self, exc, task_id, args, kwargs, einfo):
            print('任务执行失败!%s' % self.request.retries)
            # 重新尝试执行失败任务,时间间隔:3秒,最大尝试次数:5次
            self.retry(exc=exc, countdown=3, max_retries=5)
            return super().on_failure(exc, task_id, args, kwargs, einfo)
    
        def after_return(self, status, retval, task_id, args, kwargs, einfo):
            print('this is after return')
            return super().after_return(status, retval, task_id, args, kwargs, einfo)
    
        def on_retry(self, exc, task_id, args, kwargs, einfo):
            print('this is retry')
            return super().on_retry(exc, task_id, args, kwargs, einfo)

    5.flask项目调用异步任务发送短信

    flask项目调用异步任务发送短信,application.apps.home.views,代码:

    @jsonrpc.method(name="Home.sms")
    def sms(mobile):
        """发送短信验证码"""
        # 验证手机
        if not re.match("^1[3-9]d{9}$",mobile):
            return {"errno": status.CODE_VALIDATE_ERROR, "errmsg": message.mobile_format_error}
    
        # 短信发送冷却时间
        ret = redis.get("int_%s" % mobile)
        if ret is not None:
            return {"errno": status.CODE_INTERVAL_TIME, "errmsg": message.sms_interval_time}
    
        # 生成验证码
        sms_code = "%06d" % random.randint(0,999999)
        
        try:
            # 异步发送短信 ******
            from mycelery.sms.tasks import send_sms
            send_sms.delay(mobile=mobile, sms_code=sms_code)
            # 返回结果
            return {"errno":status.CODE_OK, "errmsg": message.sms_is_send}
        except Exception as e:
            return {"errno": status.CODE_SMS_ERROR, "errmsg": message.sms_send_error}

    6.运行celery

    在第一个终端运行celery

    主程序终端下启动: celery -A mycelery.main worker -l info
    调度器终端下启动: celery -A mycelery.main beat

    再开一个终端,输入如下指令

    python manage.py shell
    >>> from mycelery.sms.tasks import send_sms
    >>> send_sms.delay(mobile="13928836666",sms_code="123456")

    2.用户登录

    1.jwt登录验证:flask_jwt_extended

    1.flask_jwt_extended简介

    当前我们开发的项目属于前后端分离,而目前最适合我们使用的认证方式就是jwt token认证。

    在flask中,我们可以通过flask_jwt_extended模块来快速实现jwt用户登录认证。

    注意:

    1. flask_jwt_extended的作者开发当前模块主要适用于flask的普通视图方法的。其认证方式主要通过装饰器来完成。而我们当前所有服务端接口都改造成了jsonrpc规范接口,所以我们在使用过程中,需要对部分源代码进行调整才能正常使用。

    2. 事实上,在我们当前使用的flask_jsonrpc也提供了用户登陆认证功能,但是这个功能是依靠用户账户username和密码password来实现。如果我们基于当前这种方式,也可以实现jwt登陆认证,只是相对于上面的flask_jwt_extended模块而言,要补充的代码会更多,所以在此,我们放弃这块功能的使用。

    2.模块安装

    pip install flask-jwt-extended

    官网文档:https://flask-jwt-extended.readthedocs.io/en/latest/

    配置说明:https://flask-jwt-extended.readthedocs.io/en/latest/options/

    3.初始化

    在魔方项目中对模块进行初始化,application/__init__.py,代码:

    import os,sys
    # 引入flask_jwt_extended模块
    from flask_jwt_extended import JWTManager
    
    # jwt认证模块实例化
    jwt = JWTManager()
    
    def init_app(config_path):
        """全局初始化"""
       
        # jwt初始化
        jwt.init_app(app)
    
        return manager

    4.jwt相关配置

    配置文件,application.settings.dev,代码:

    from . import InitConfig
    class Config(InitConfig):
        """项目开发环境下的配置"""
        ......
    
        # jwt 相关配置
        # 加密算法,默认: HS256
        JWT_ALGORITHM  = "HS256"
        # 秘钥,默认是flask配置中的SECRET_KEY
        JWT_SECRET_KEY = "y58Rsqzmts6VCBRHes1Sf2DHdGJaGqPMi6GYpBS4CKyCdi42KLSs9TQVTauZMLMw"
        # token令牌有效期,单位: 秒/s,默认: datetime.timedelta(minutes=15) 或者 15 * 60
        JWT_ACCESS_TOKEN_EXPIRES = 60
        # refresh刷新令牌有效期,单位: 秒/s,默认:datetime.timedelta(days=30) 或者 30*24*60*60
        JWT_REFRESH_TOKEN_EXPIRES = 30*24*60*60
        # 设置通过哪种方式传递jwt,默认是http请求头,也可以是query_string,json,cookies
        JWT_TOKEN_LOCATION = "headers"
        # 当通过http请求头传递jwt时,请求头参数名称设置,默认值: Authorization
        JWT_HEADER_NAME="Authorization"
        # 当通过http请求头传递jwt时,令牌的前缀。
        # 默认值为 "Bearer",例如:Authorization: Bearer <JWT>
        JWT_HEADER_TYPE="jwt"

    5.编写登录视图函数(和jwt部分相关的)

    application.apps.users.views,代码:

    from application import jsonrpc,db
    from .marshmallow import MobileSchema,UserSchema
    from marshmallow import ValidationError
    from message import ErrorMessage as Message
    from status import APIStatus as status
    ......
    
    from flask_jwt_extended import create_access_token,create_refresh_token,jwt_required,get_jwt_identity,jwt_refresh_token_required
    from flask import jsonify,json
    
    @jsonrpc.method("User.login")
    def login(account,password):
        """根据用户登录信息生成token"""
        # 1. todo 根据账户信息和密码获取用户
        
        # 2. 生成jwt token
        access_token = create_access_token(identity=account)
        refresh_token = create_refresh_token(identity=account)
        return "ok"
    
    @jsonrpc.method("User.info")
    @jwt_required # 验证jwt
    def info():
        """获取用户信息"""
        user_data = json.loads(get_jwt_identity()) # get_jwt_identity 用于获取载荷中的数据
        return "ok"
    
    @jsonrpc.method("User.refresh")
    @jwt_refresh_token_required
    def refresh():
        """重新获取新的认证令牌token"""
        current_user = get_jwt_identity()
        # 重新生成token
        access_token = create_access_token(identity=current_user)
        return access_token

    6.修改jwt源码

    装饰器jwt_required就是用于获取客户端提交的数据中的jwt的方法,这里,我们需要进行2处调整。以方便它更好的展示错误信息。

    flask_jwt_extended/view_decorators.py,代码:

    from jwt.exceptions import ExpiredSignatureError
    from flask_jwt_extended.exceptions import InvalidHeaderError
    from message import ErrorMessage as message
    from status import APIStatus as status
    def jwt_required(fn):
    @wraps(fn) def wrapper(*args, **kwargs): try: verify_jwt_in_request() except NoAuthorizationError: return {"errno":status.CODE_NO_AUTHORIZATION,"errmsg":message.no_authorization} except ExpiredSignatureError: return {"errno":status.CODE_SIGNATURE_EXPIRED,"errmsg":message.authorization_has_expired} except InvalidHeaderError: return {"errno":status.CODE_INVALID_AUTHORIZATION,"errmsg":message.authorization_is_invalid} return fn(*args, **kwargs) return wrapper

    当前文件,另一个验证函数jwt_refresh_token_required,代码:

    def jwt_refresh_token_required(fn):
    
        @wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                verify_jwt_refresh_token_in_request()
            except NoAuthorizationError:
                return {"errno":status.CODE_NO_AUTHORIZATION,"errmsg":message.no_authorization}
            except ExpiredSignatureError:
                return {"errno":status.CODE_SIGNATURE_EXPIRED,"errmsg":message.authorization_has_expired}
            except InvalidHeaderError:
                return {"errno":status.CODE_INVALID_AUTHORIZATION,"errmsg":message.authorization_is_invalid}
            return fn(*args, **kwargs)
        return wrapper

    2.服务端提供用户登录的API接口

    application.apps.users.views,视图实现并完成登陆接口,代码:

    from flask_jwt_extended import create_access_token,create_refresh_token,jwt_required,get_jwt_identity,jwt_refresh_token_required
    from flask import jsonify,json
    from sqlalchemy import or_
    from .models import User
    from message import ErrorMessage as message
    from status import APIStatus as status
    @jsonrpc.method("User.login")
    def login(account,password):
        """根据用户登录信息生成token"""
        # 1. 根据账户信息和密码获取用户
        if len(account) < 1:
            return {"errno":status.CODE_NO_ACCOUNT,"errmsg":message.account_no_data}
        user = User.query.filter(or_(
            User.mobile==account,
            User.email==account,
            User.name==account
        )).first()
    
        # 检测用户是否存在
        if user is None: 
            return {"errno": status.CODE_NO_USER,"errmsg":message.user_not_exists}
    
        # 验证密码
        if not user.check_password(password): 
            return {"errno": status.CODE_PASSWORD_ERROR, "errmsg":message.password_error}
    
        # 2. 生成jwt token
        access_token = create_access_token(identity=user.id)
        refresh_token = create_refresh_token(identity=user.id)
    
        return {"access_token": access_token,"refresh_token":refresh_token}
    
    @jsonrpc.method("User.info")
    @jwt_required # 验证jwt
    def info():
        """获取用户信息"""
        user_data = json.loads(get_jwt_identity()) # get_jwt_identity 用于获取载荷中的数据
        print(user_data)
        return "ok"
    
    @jsonrpc.method("User.refresh")
    @jwt_refresh_token_required
    def refresh():
        """重新获取新的认证令牌token"""
        current_user = get_jwt_identity()
        # 重新生成token
        access_token = create_access_token(identity=current_user)
        return access_token
  • 相关阅读:
    Python3.5 学习三
    心灵鸡汤20180727
    Python3.5 学习二
    spring心得4--setter注入集合(set、list、map、properties等多种集合,配有案例解析)@基本装(引用)
    drop user和drop user cascade的区别(转)
    数据库的导入 导出
    OracleDBConsole服务无法启动原因
    create XML
    C#里面Console.Write与Console.WriteLine有什么区别????
    将字符串 按照规定编码方式编码
  • 原文地址:https://www.cnblogs.com/libolun/p/14083206.html
Copyright © 2011-2022 走看看