zoukankan      html  css  js  c++  java
  • 抽屉之Tornado实战(7)--form表单验证

      在这里,我们把form表单验证的代码进行工具化了,以后稍微修改一下参数就可以拿来用了

      先贴上代码

    forms.py

    from backend.form import fields
    
    
    class BaseForm:
        """Base class for request forms built from validator fields.

        A subclass assigns field objects (from ``fields``) to public instance
        attributes and then calls this constructor. ``valid`` looks up the
        submitted value for every public attribute on the handler and runs the
        field's ``match`` against it.
        """

        def __init__(self):
            # Validated values, keyed by field name.
            self._value_dict = {}
            # Error messages, keyed by field name.
            self._error_dict = {}
            # Becomes False as soon as one field fails validation.
            self._valid_status = True

        def valid(self, handler):
            """Validate every public field against the request held by *handler*.

            Attributes whose name starts with ``_`` are skipped. Checkbox fields
            receive the list of submitted values, file fields receive the list of
            uploaded file names, and every other field receives a single argument
            (``None`` when it was not submitted).

            Returns ``True`` when all fields passed, ``False`` otherwise; results
            are collected in ``_value_dict`` and ``_error_dict``.
            """
            for field_name, field_obj in self.__dict__.items():
                if field_name.startswith('_'):
                    continue

                # isinstance (not an exact type comparison) so that subclasses of
                # the checkbox/file fields are routed the same way.
                if isinstance(field_obj, fields.CheckBoxField):
                    # The second positional parameter of get_arguments is `strip`,
                    # not a default; passing None there switched stripping off.
                    post_value = handler.get_arguments(field_name)
                elif isinstance(field_obj, fields.FileField):
                    post_value = [
                        file_item['filename']
                        for file_item in handler.request.files.get(field_name, [])
                    ]
                else:
                    post_value = handler.get_argument(field_name, None)

                field_obj.match(field_name, post_value)
                if field_obj.is_valid:
                    self._value_dict[field_name] = field_obj.value
                else:
                    self._error_dict[field_name] = field_obj.error
                    self._valid_status = False
            return self._valid_status
    

    fields.py

    import re
    import os
    
    class Field:
        """Base validator holding the outcome of the most recent ``match`` call.

        ``match`` reads ``self.required``, ``self.custom_error_dict`` and
        ``self.REGULAR``; this class does not define them, so they must be
        provided by the subclass (or set on the instance) before ``match`` runs.
        """

        def __init__(self):

            self.is_valid = False
            self.name = None
            self.value = None
            self.error = None

        def match(self, name, value):
            """Validate *value* for the field called *name*.

            The outcome is stored on the instance: ``is_valid``, ``value`` (only
            when valid) and ``error`` (only when invalid). An optional field
            accepts any value without checking it against ``REGULAR``.
            """
            self.name = name
            # Start from a clean slate so validating the same object twice cannot
            # leave a stale is_valid/value/error from the previous call behind.
            self.is_valid = False
            self.value = None
            self.error = None

            if not self.required:
                self.is_valid = True
                self.value = value
                return

            if not value:
                self.error = (self.custom_error_dict.get('required')
                              or "%s is required" % name)
                return

            if re.match(self.REGULAR, value):
                self.is_valid = True
                self.value = value
            else:
                self.error = (self.custom_error_dict.get('valid')
                              or "%s is invalid" % name)
    
    
    class StringField(Field):
        """Field whose pattern accepts any single-line text."""

        REGULAR = "^.*$"

        def __init__(self, custom_error_dict=None, required=True):
            """Store the optional message overrides and the required flag.

            *custom_error_dict* may carry the keys ``'required'`` and ``'valid'``
            to replace the default error texts; it is copied, not kept by
            reference.
            """
            # e.g. {'required': '<text when empty>', 'valid': '<text when malformed>'}
            self.custom_error_dict = dict(custom_error_dict) if custom_error_dict else {}
            self.required = required

            super().__init__()
    
    
    class IPField(Field):
        """Field that accepts a dotted-quad IPv4 address (each octet 0-255)."""

        # Raw string with the escapes restored: the previous literal had lost its
        # backslashes, so it matched the letter "d" and an arbitrary character
        # instead of digits and a literal dot.
        REGULAR = r"^(25[0-5]|2[0-4]\d|[0-1]?\d?\d)(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}$"

        def __init__(self, custom_error_dict=None, required=True):
            """Store the optional message overrides and the required flag.

            *custom_error_dict* may carry the keys ``'required'`` and ``'valid'``
            to replace the default error texts.
            """
            # e.g. {'required': '<text when empty>', 'valid': '<text when malformed>'}
            self.custom_error_dict = {}
            if custom_error_dict:
                self.custom_error_dict.update(custom_error_dict)

            self.required = required
            super(IPField, self).__init__()
    
    
    class EmailField(Field):
        """Field that accepts an e-mail address of the form local@domain.tld."""

        # Raw string with the escapes restored: the previous literal had lost its
        # backslashes, so "w" matched only the letter w and the dot before the
        # top-level domain matched any character.
        REGULAR = r"^\w+([-+.']\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*$"

        def __init__(self, custom_error_dict=None, required=True):
            """Store the optional message overrides and the required flag.

            *custom_error_dict* may carry the keys ``'required'`` and ``'valid'``
            to replace the default error texts.
            """
            # e.g. {'required': '<text when empty>', 'valid': '<text when malformed>'}
            self.custom_error_dict = {}
            if custom_error_dict:
                self.custom_error_dict.update(custom_error_dict)

            self.required = required
            super(EmailField, self).__init__()
    
    
    class IntegerField(Field):
        """Field that accepts a non-empty run of digits."""

        # Raw string with the escape restored: "^d+$" matched the letter "d",
        # not digits.
        REGULAR = r"^\d+$"

        def __init__(self, custom_error_dict=None, required=True):
            """Store the optional message overrides and the required flag.

            *custom_error_dict* may carry the keys ``'required'`` and ``'valid'``
            to replace the default error texts.
            """
            # e.g. {'required': '<text when empty>', 'valid': '<text when malformed>'}
            self.custom_error_dict = {}
            if custom_error_dict:
                self.custom_error_dict.update(custom_error_dict)

            self.required = required
            super(IntegerField, self).__init__()
    
    
    class CheckBoxField(Field):
        """Field for multi-valued inputs; the submitted value is a list."""

        # Not consulted by this class's match(); kept as a well-formed pattern
        # (escape restored) for anything that reads the attribute.
        REGULAR = r"^\d+$"

        def __init__(self, custom_error_dict=None, required=True):
            """Store the optional message overrides and the required flag.

            *custom_error_dict* may carry the keys ``'required'`` and ``'valid'``
            to replace the default error texts.
            """
            # e.g. {'required': '<text when empty>', 'valid': '<text when malformed>'}
            self.custom_error_dict = {}
            if custom_error_dict:
                self.custom_error_dict.update(custom_error_dict)

            self.required = required
            super(CheckBoxField, self).__init__()

        def match(self, name, value):
            """Validate the list of submitted values for the field *name*.

            An optional field accepts anything. A required field needs a
            non-empty list; a non-list value is reported as invalid.
            """
            self.name = name
            # Clear the previous outcome so repeated calls cannot leak state.
            self.is_valid = False
            self.value = None
            self.error = None

            if not self.required:
                self.is_valid = True
                self.value = value
                return

            if not value:
                self.error = (self.custom_error_dict.get('required')
                              or "%s is required" % name)
                return

            # The check must look at the submitted value; testing `name` (always
            # a string) made every required checkbox fail validation.
            if isinstance(value, list):
                self.is_valid = True
                self.value = value
            else:
                self.error = (self.custom_error_dict.get('valid')
                              or "%s is invalid" % name)
    
    
    class FileField(Field):
        """Field for uploaded files; validates names and can write them to disk."""

        # Escapes restored and the alternation grouped so that both anchors apply
        # to every extension: word characters, a literal dot, then pdf/mp3/py.
        REGULAR = r"^\w+\.(pdf|mp3|py)$"

        def __init__(self, custom_error_dict=None, required=True):
            """Store the optional message overrides and the required flag.

            *custom_error_dict* may carry the keys ``'required'`` and ``'valid'``
            to replace the default error texts.
            """
            # e.g. {'required': '<text when empty>', 'valid': '<text when malformed>'}
            self.custom_error_dict = {}
            if custom_error_dict:
                self.custom_error_dict.update(custom_error_dict)

            self.required = required

            super(FileField, self).__init__()

        def match(self, name, file_name_list):
            """Validate the uploaded file names for the field *name*.

            An optional field accepts anything. A required field needs at least
            one name, and every name must be non-blank and match ``REGULAR``.
            On success ``value`` holds *file_name_list*.
            """
            self.name = name
            # Clear the previous outcome so repeated calls cannot leak state.
            self.is_valid = False
            self.value = None
            self.error = None

            if not self.required:
                self.is_valid = True
                self.value = file_name_list
                return

            required_error = (self.custom_error_dict.get('required')
                              or "%s is required" % name)
            if not file_name_list:
                self.error = required_error
                return

            for file_name in file_name_list:
                if not file_name or not file_name.strip():
                    self.error = required_error
                    return
                if not re.match(self.REGULAR, file_name):
                    self.error = (self.custom_error_dict.get('valid')
                                  or "%s is invalid" % name)
                    return

            self.is_valid = True
            # Previously left unset on this path, so a valid required upload had
            # value None and save() failed when mapping over it.
            self.value = file_name_list

        def save(self, request, upload_to=""):
            """Write every uploaded file of this field below *upload_to*.

            *request* must expose ``files`` as a mapping from field name to a list
            of dicts with ``'filename'`` and ``'body'``. Afterwards ``value`` is
            the list of paths that were written (empty when nothing was uploaded).
            """
            saved_paths = []
            for meta in request.files.get(self.name, []):
                # The name is supplied by the client: keep only its last path
                # component so it cannot escape upload_to ("../x", "/etc/x", ...).
                file_name = os.path.basename(meta['filename'].replace('\\', '/'))
                file_path_name = os.path.join(upload_to, file_name)
                with open(file_path_name, 'wb') as up:
                    up.write(meta['body'])
                saved_paths.append(file_path_name)

            self.value = saved_paths
    

      在forms.py这个文件里,做了一件什么事呢?代码定义了父类BaseForm,它的valid方法主要是判断要验证内容的类型,然后取值,再调用fields.py里各个类的match方法,最后把验证后的结果信息返回。

      而在fields.py文件里,match方法主要做为空检测和合法性检测,并把检测结果保存在字段对象上,供forms.py的valid方法读取。

    首先

    • form组件只做为空检测,合法性检测,并没做超时检测,内部可获取验证状态_valid_status--True/False,错误信息_error_dict,验证通过时的用户数据_value_dict

    • 验证类型:字符串,IP,邮箱,数字,复选框,文件

    再者,怎么用?

    • 分析你的应用场景,需要对哪几个类型进行验证,定义一个类,把需要的验证类型写入到构造方法里,记得继承一下BaseForm类,并且在构造方法的最后调用一下父类的构造方法

    • 在构造方法里,实例Field对象时,可以传入自定制错误类型信息custom_error_dict,required=False可为空设置

    from backend.form.forms import BaseForm
    from backend.form.fields import StringField
    from backend.form.fields import IntegerField
    from backend.form.fields import EmailField
    
    
    class SendMsgForm(BaseForm):
        """Form with a single e-mail field that carries custom error texts."""

        def __init__(self):
            email_errors = {
                'required': '注册邮箱不能为空.',
                'valid': '注册邮箱格式错误.',
            }
            self.email = EmailField(custom_error_dict=email_errors)

            # Fields are assigned first; the base constructor then adds the
            # underscore-prefixed bookkeeping attributes.
            super().__init__()
    
    class RegisterForm(BaseForm):
        """Form with the fields username, email, password and email_code."""

        def __init__(self):
            # (attribute name, field class) in the order the fields are created.
            layout = (
                ('username', StringField),
                ('email', EmailField),
                ('password', StringField),
                ('email_code', StringField),
            )
            for attr_name, field_cls in layout:
                setattr(self, attr_name, field_cls())

            super().__init__()
    
    class LoginForm(BaseForm):
        """Form with the string fields user, pwd and code."""

        def __init__(self):
            # All three inputs are plain required strings with default messages.
            for attr_name in ('user', 'pwd', 'code'):
                setattr(self, attr_name, StringField())

            super().__init__()
    

     最后

    • 在post方法里,调用一下form对象的valid方法(把handler对象,也就是self传入),接下来只要根据form对象里检测完后的信息进行相应的操作

    import io
    import datetime
    import json
    from backend.utils import check_code
    from backend.core.request_handler import BaseRequestHandler
    from forms import account
    from backend.utils.response import BaseResponse
    from backend import commons
    from models import chouti_orm as ORM
    from sqlalchemy import and_, or_
    
    
    class CheckCodeHandler(BaseRequestHandler):
        """Serve a freshly generated verification-code image."""

        def get(self, *args, **kwargs):
            """Create a code image, remember its text in the session, send the PNG."""
            stream = io.BytesIO()
            img, code = check_code.create_validate_code()
            img.save(stream, "png")
            # Stored so the login handler can compare it with the user's input.
            self.session["CheckCode"] = code
            # The body is PNG data; declare it instead of leaving the handler's
            # default content type in place.
            self.set_header("Content-Type", "image/png")
            self.write(stream.getvalue())
    
    
    class LoginHandler(BaseRequestHandler):
        """Check the login form, the verification code and the credentials."""

        def post(self, *args, **kwargs):
            """Write a JSON-encoded BaseResponse describing the login attempt."""
            # Per the original author's note, a BaseResponse starts out with
            # status=False, data=None, summary=None, message={}.
            rep = BaseResponse()
            form = account.LoginForm()

            if not form.valid(self):
                rep.message = form._error_dict
                self.write(json.dumps(rep.__dict__))
                return

            # Case-insensitive comparison with the code stored in the session.
            if form._value_dict['code'].lower() != self.session["CheckCode"].lower():
                rep.message = {'code': '验证码错误'}
                self.write(json.dumps(rep.__dict__))
                return

            conn = ORM.session()
            # The 'user' input may be either the e-mail or the user name.
            by_email = and_(ORM.UserInfo.email == form._value_dict['user'],
                            ORM.UserInfo.password == form._value_dict['pwd'])
            by_username = and_(ORM.UserInfo.username == form._value_dict['user'],
                               ORM.UserInfo.password == form._value_dict['pwd'])
            obj = conn.query(ORM.UserInfo).filter(or_(by_email, by_username)).first()
            if not obj:
                rep.message = {'user': '用户名邮箱或密码错误'}
                self.write(json.dumps(rep.__dict__))
                return

            self.session['is_login'] = True
            self.session['user_info'] = obj.__dict__
            rep.status = True
            self.write(json.dumps(rep.__dict__))
    
    
    class RegisterHandler(BaseRequestHandler):
        """Create a user account from the registration form."""

        def post(self, *args, **kwargs):
            """Validate the form, check code/e-mail/user name, then insert the user.

            Always writes a JSON-encoded BaseResponse; on failure ``message``
            maps the offending form field to an error text.
            """
            rep = BaseResponse()
            form = account.RegisterForm()
            if not form.valid(self):
                rep.message = form._error_dict
                self.write(json.dumps(rep.__dict__))
                return

            current_date = datetime.datetime.now()
            # Only e-mail codes created within the last minute are accepted.
            limit_day = current_date - datetime.timedelta(minutes=1)
            conn = ORM.session()
            is_valid_code = conn.query(ORM.SendMsg).filter(ORM.SendMsg.email == form._value_dict['email'],
                                                           ORM.SendMsg.code == form._value_dict['email_code'],
                                                           ORM.SendMsg.ctime > limit_day).count()
            if not is_valid_code:
                rep.message['email_code'] = '邮箱验证码不正确或过期'
                self.write(json.dumps(rep.__dict__))
                return

            has_exists_email = conn.query(ORM.UserInfo).filter(
                ORM.UserInfo.email == form._value_dict['email']).count()
            if has_exists_email:
                rep.message['email'] = '邮箱已经存在'
                self.write(json.dumps(rep.__dict__))
                return

            has_exists_username = conn.query(ORM.UserInfo).filter(
                ORM.UserInfo.username == form._value_dict['username']).count()
            if has_exists_username:
                # Reported under 'username' (was 'email', a copy-paste slip) so
                # the error is attached to the field it is about.
                rep.message['username'] = '用户名已经存在'
                self.write(json.dumps(rep.__dict__))
                return

            # The remaining form values plus ctime become the UserInfo columns;
            # email_code is dropped before they are passed to the constructor.
            form._value_dict['ctime'] = current_date
            form._value_dict.pop('email_code')
            obj = ORM.UserInfo(**form._value_dict)
            conn.add(obj)
            # The pending verification rows for this address are no longer needed.
            conn.query(ORM.SendMsg).filter_by(email=form._value_dict['email']).delete()
            conn.commit()
            self.session['is_login'] = True
            self.session['user_info'] = obj.__dict__
            rep.status = True

            self.write(json.dumps(rep.__dict__))
    
    
    class SendMsgHandler(BaseRequestHandler):
        """Create or refresh the e-mail verification code for an address."""

        def post(self, *args, **kwargs):
            """Validate the address, enforce the send limit and store a new code.

            Always writes a JSON-encoded BaseResponse; ``summary`` carries the
            reason when no code was stored.
            """
            rep = BaseResponse()
            form = account.SendMsgForm()
            if not form.valid(self):
                rep.summary = form._error_dict['email']
                self.write(json.dumps(rep.__dict__))
                return

            email = form._value_dict['email']
            conn = ORM.session()

            already_registered = conn.query(ORM.UserInfo).filter(
                ORM.UserInfo.email == form._value_dict['email']).count()
            if already_registered:
                rep.summary = "此邮箱已经被注册"
                self.write(json.dumps(rep.__dict__))
                return

            current_date = datetime.datetime.now()
            code = commons.random_code()

            existing_rows = conn.query(ORM.SendMsg).filter_by(**form._value_dict).count()
            if not existing_rows:
                # First request for this address: insert a new row.
                conn.add(ORM.SendMsg(code=code, email=email, ctime=current_date))
                conn.commit()
                rep.status = True
                self.write(json.dumps(rep.__dict__))
                return

            limit_day = current_date - datetime.timedelta(hours=1)
            # Refused when a row younger than one hour has reached 10 sends.
            over_limit = conn.query(ORM.SendMsg).filter(
                ORM.SendMsg.email == email,
                ORM.SendMsg.ctime > limit_day,
                ORM.SendMsg.times >= 10,
            ).count()
            if over_limit:
                rep.summary = "'已经超过今日最大次数(1小时后重试)'"
                self.write(json.dumps(rep.__dict__))
                return

            # A last send older than one hour resets the counter first.
            stale = conn.query(ORM.SendMsg).filter(
                ORM.SendMsg.email == email,
                ORM.SendMsg.ctime < limit_day).count()
            if stale:
                conn.query(ORM.SendMsg).filter_by(email=email).update({"times": 0})

            new_values = {
                "times": ORM.SendMsg.times + 1,
                "code": code,
                "ctime": current_date,
            }
            conn.query(ORM.SendMsg).filter_by(email=email).update(
                new_values, synchronize_session="evaluate")
            conn.commit()
            rep.status = True
            self.write(json.dumps(rep.__dict__))
    
  • 相关阅读:
    rs
    stm32f767 usoc3
    stm32f767 RTT 日志
    stm32f767 标准库 工程模板
    stm32f767 HAL 工程模板
    docker tab 补全 linux tab 补全
    docker anconda 依赖 下载 不了
    docker run 常用 指令
    linux scp 命令
    Dockerfile 常用参数说明
  • 原文地址:https://www.cnblogs.com/xinsiwei18/p/5890141.html
Copyright © 2011-2022 走看看